modify scripts

This commit is contained in:
oscarz
2025-04-26 17:44:13 +08:00
parent 26cd6b52ca
commit fa1bb1df02

View File

@ -0,0 +1,208 @@
import logging
import os
import time
from datetime import datetime
import pandas as pd
#import akshare as ak
import src.crawler.em.stock_finance_hk_em as ak
import src.logger.logger as logger
import src.config.config as config
'''
akshare 的实现里,会先调用 RPT_CUSTOM_HKSK_APPFN_CASHFLOW_SUMMARY 来判断财年数据,而这个有可能更新的比较慢,导致最新的年报获取不到。
'''
# 配置日志
logger.setup_logging()
current_date = datetime.now().strftime("%Y%m%d")
# 获取年度利润表数据
def get_stock_financial_data(stock):
try:
income_statement_df = ak.stock_financial_hk_report_em(stock=stock, symbol="利润表", indicator="年度")
return income_statement_df
except Exception as e:
logging.error(f"获取股票 {stock} 财务数据时出错: {e}")
return pd.DataFrame()
# 把数组转置,多行拼成一行
def transform_data(df):
if df is None or df.empty:
return None
# 处理 REPORT_DATE 只保留日期
df['REPORT_DATE'] = pd.to_datetime(df['REPORT_DATE']).dt.date
# 提取固定列
fixed_columns = ['SECUCODE', 'SECURITY_CODE', 'SECURITY_NAME_ABBR', 'ORG_CODE', 'REPORT_DATE',
'DATE_TYPE_CODE', 'FISCAL_YEAR', 'START_DATE']
fixed_part = df[fixed_columns].drop_duplicates()
# 透视表将 STD_ITEM_NAME 转为列AMOUNT 为值
pivot_df = df.pivot_table(index='REPORT_DATE', columns='STD_ITEM_NAME', values='AMOUNT', aggfunc='first')
# 合并固定列和透视后的列
transformed_df = pd.merge(fixed_part, pivot_df, on='REPORT_DATE', how='left')
return transformed_df
# 计算增长率,逐年
def calculate_growth_rates(df, column):
# 按 REPORT_DATE 排序
df = df.sort_values(by='REPORT_DATE')
growth_rates = {}
for i in range(1, len(df)):
current_row = df.iloc[i]
previous_row = df.iloc[i - 1]
current_date = current_row['REPORT_DATE']
previous_date = previous_row['REPORT_DATE']
# 将日期转换为年份
current_year = pd.Timestamp(current_date).year
previous_year = pd.Timestamp(previous_date).year
# 检查是否为相邻的两年
if current_year - previous_year == 1:
if column in current_row and column in previous_row:
value_current = current_row[column]
value_previous = previous_row[column]
if pd.notna(value_previous) and value_previous != 0:
growth_rate = (value_current - value_previous) / value_previous
growth_rates[current_date] = growth_rate
return growth_rates
# 计算当年的指标
def calculate_ratios(df):
ratios = {}
for year in df['REPORT_DATE'].unique():
year_df = df[df['REPORT_DATE'] == year]
if '营运收入' in year_df.columns and '毛利' in year_df.columns:
gross_profit_margin = year_df['毛利'].sum() / year_df['营运收入'].sum() if year_df['营运收入'].sum() != 0 else None
else:
gross_profit_margin = None
if '营运收入' in year_df.columns and '销售及分销费用' in year_df.columns:
sales_rate = year_df['销售及分销费用'].sum() / year_df['营运收入'].sum() if year_df['营运收入'].sum() != 0 else None
else:
sales_rate = None
if '营运收入' in year_df.columns and '除税后溢利' in year_df.columns:
net_profit_margin = year_df['除税后溢利'].sum() / year_df['营运收入'].sum() if year_df['营运收入'].sum() != 0 else None
else:
net_profit_margin = None
ratios[year] = {
'毛利率': gross_profit_margin,
'销售费率': sales_rate,
'净利率': net_profit_margin
}
return ratios
# 检查报表的关键列是否存在
def check_columns(pd):
col_list = ['REPORT_DATE', 'FISCAL_YEAR', 'SECUCODE', 'SECURITY_NAME_ABBR',
'营运收入', '销售及分销费用', '经营溢利', '股东应占溢利', '每股基本盈利', '毛利', '除税后溢利'
]
for col in col_list:
if col not in pd.columns:
return False
return True
# 主处理函数
def main_process(stock_code, stock_name):
stock_str = f"({stock_code}:{stock_name})"
logging.info(f"处理股票 {stock_str}")
# 获取数据
df = get_stock_financial_data(stock_code)
results = []
if not df.empty :
transformed_df = transform_data(df)
save_to_csv(transformed_df, f"{config.global_host_data_dir}/tmp", f'{stock_code}.csv')
if check_columns(transformed_df):
if '12-31' == transformed_df['FISCAL_YEAR'].values[0]:
if pd.Timestamp('2024-12-31').date() in transformed_df['REPORT_DATE'].values:
revenue_growth = calculate_growth_rates(transformed_df, '营运收入')
sales_expense_growth = calculate_growth_rates(transformed_df, '销售及分销费用')
net_income_growth = calculate_growth_rates(transformed_df, '经营溢利')
net_income_growth = calculate_growth_rates(transformed_df, '股东应占溢利')
basic_eps_growth = calculate_growth_rates(transformed_df, '每股基本盈利')
ratios = calculate_ratios(transformed_df)
for year, ratio in ratios.items():
year_df = transformed_df[transformed_df['REPORT_DATE'] == year]
if not year_df.empty:
result = {
'code': year_df['SECUCODE'].values[0],
'name': year_df['SECURITY_NAME_ABBR'].values[0],
'年度': year,
'营运收入': year_df['营运收入'].values[0],
'毛利': year_df['毛利'].values[0],
'销售及分销费用': year_df['销售及分销费用'].values[0],
'经营溢利': year_df['经营溢利'].values[0],
'除税后溢利': year_df['除税后溢利'].values[0],
'股东应占溢利': year_df['股东应占溢利'].values[0],
'每股基本盈利': year_df['每股基本盈利'].values[0],
'毛利率': ratio['毛利率'],
'销售费率': ratio['销售费率'],
'净利率': ratio['净利率'],
'营收增长率': revenue_growth.get(year, None),
'销售费用增长率': sales_expense_growth.get(year, None),
'股东应占溢利增长率': net_income_growth.get(year, None),
'每股基本盈利增长率': basic_eps_growth.get(year, None),
'减值及拨备': year_df['减值及拨备'].values[0] if '减值及拨备' in transformed_df.columns else None,
}
results.append(result)
else:
logging.warning(f"股票 {stock_str} 没有 REPORT_DATE = 2024-12-31 的数据,跳过")
else:
logging.warning(f"股票 {stock_str} 的财年是 {transformed_df['FISCAL_YEAR'].values[0]} 跳过")
else:
logging.warning(f"columns not fit. {stock_str}")
else:
logging.warning(f"get_stock_financial_data empty. {stock_str}")
return results
# 读取stock code
def load_csv(csv_file_path):
try:
# 从 CSV 文件中读取股票代码
stock_df = pd.read_csv(csv_file_path, encoding='utf-16', sep='\t', dtype={'代码': str})
return stock_df
#stock_codes = stock_df['代码'].tolist()
#return stock_codes
except FileNotFoundError:
logging.error(f"未找到 CSV 文件: {csv_file_path}")
except KeyError:
logging.error("CSV 文件中未找到 '代码' 列。")
return pd.DataFrame()
# 写入结果
def save_to_csv(pd, save_path, file_name):
os.makedirs(save_path, exist_ok=True)
full_name = f"{save_path}/{file_name}"
pd.to_csv(full_name, index=False)
# 开始处理
if __name__ == "__main__":
stock_files = {
'hk_game_plat' : f'{config.global_host_input_dir}/hk_game_plat.csv',
'hk_tech_plat' : f'{config.global_host_input_dir}/hk_tech_plat.csv',
#'hk_house_plate' : f'{config.global_host_input_dir}/hk_house_plate.csv',
}
for key, ff in stock_files.items():
df = load_csv(ff)
if df.empty:
logging.warning(f"read file {ff} empty.")
continue
# 处理代码
all_results = []
for row in df.itertuples():
stock_code = getattr(row, '代码')
stock_name = getattr(row, '名称')
final_result = main_process(stock_code, stock_name)
all_results.extend(final_result)
result_df = pd.DataFrame(all_results)
file_name = f"{key}_{current_date}.csv"
save_to_csv(result_df, config.global_host_data_dir, file_name)
logging.info(f"data processed and saved to {file_name}")