# src/static/akshare_finace.py
#
# Reconstructed from git patch fa1bb1df02208370897b435afbd56dd713850e93
# ("[PATCH] modify scripts", oscarz, Sat, 26 Apr 2025 17:44:13 +0800),
# which creates this file.
"""Build yearly income-statement summaries for Hong Kong listed stocks.

For every stock listed in the input CSV files the script downloads the annual
income statement, pivots it to one row per report date, derives margin ratios
and year-over-year growth rates, and writes one result CSV per input file.
"""
import logging
import os
import time
from datetime import datetime
import pandas as pd
#import akshare as ak
import src.crawler.em.stock_finance_hk_em as ak
import src.logger.logger as logger
import src.config.config as config

# NOTE: akshare's implementation first queries
# RPT_CUSTOM_HKSK_APPFN_CASHFLOW_SUMMARY to determine the fiscal-year data.
# That report may be updated slowly, so the latest annual report cannot be
# fetched through it. (Presumably this is why the project-local crawler is
# imported as `ak` above instead of akshare itself.)


# Configure logging.
logger.setup_logging()

# Date stamp (YYYYMMDD) used in the names of the result files.
current_date = datetime.now().strftime("%Y%m%d")


def get_stock_financial_data(stock):
    """Fetch the annual income statement of one stock.

    Args:
        stock: Stock code passed through to the crawler.

    Returns:
        Whatever the crawler returns for the annual ("年度") income statement
        ("利润表"); an empty DataFrame when the crawler raises.
    """
    try:
        return ak.stock_financial_hk_report_em(stock=stock, symbol="利润表", indicator="年度")
    except Exception as e:
        # Best effort: one failing stock must not abort the whole batch.
        logging.error(f"获取股票 {stock} 财务数据时出错: {e}")
        return pd.DataFrame()


def transform_data(df):
    """Pivot the long-format report into one row per report date.

    Every distinct STD_ITEM_NAME becomes a column holding the first AMOUNT
    seen for that report date; the identifying columns are carried along.

    Args:
        df: Long-format report with the identifying columns listed below plus
            STD_ITEM_NAME and AMOUNT.

    Returns:
        The wide DataFrame, or None when ``df`` is None or empty.
    """
    if df is None or df.empty:
        return None

    # Work on a copy so the caller's DataFrame is not modified.
    df = df.copy()
    # Keep only the date part of REPORT_DATE.
    df['REPORT_DATE'] = pd.to_datetime(df['REPORT_DATE']).dt.date

    # Identifying columns that are repeated on every item row.
    fixed_columns = ['SECUCODE', 'SECURITY_CODE', 'SECURITY_NAME_ABBR', 'ORG_CODE', 'REPORT_DATE',
                     'DATE_TYPE_CODE', 'FISCAL_YEAR', 'START_DATE']
    fixed_part = df[fixed_columns].drop_duplicates()

    # Turn STD_ITEM_NAME into columns with AMOUNT as the cell value.
    pivot_df = df.pivot_table(index='REPORT_DATE', columns='STD_ITEM_NAME', values='AMOUNT', aggfunc='first')

    # Join the identifying columns with the pivoted items.
    return pd.merge(fixed_part, pivot_df, on='REPORT_DATE', how='left')


def calculate_growth_rates(df, column):
    """Compute year-over-year growth rates of ``column``.

    Rows are ordered by REPORT_DATE and each row is compared with the one
    before it. A rate is produced only when the two report dates are exactly
    one calendar year apart, both values are present and the earlier value is
    non-zero.

    Args:
        df: Wide DataFrame containing REPORT_DATE.
        column: Name of the item column to analyse.

    Returns:
        Dict mapping the later report date to
        ``(current - previous) / previous``. Empty when ``column`` is absent.
    """
    ordered = df.sort_values(by='REPORT_DATE')
    if column not in ordered.columns:
        return {}

    dates = ordered['REPORT_DATE'].tolist()
    values = ordered[column].tolist()

    growth_rates = {}
    for prev_date, prev_value, cur_date, cur_value in zip(dates, values, dates[1:], values[1:]):
        # Only consecutive years are comparable.
        if pd.Timestamp(cur_date).year - pd.Timestamp(prev_date).year != 1:
            continue
        # A missing value on either side would only yield NaN; skip the pair.
        if pd.isna(prev_value) or pd.isna(cur_value) or prev_value == 0:
            continue
        growth_rates[cur_date] = (cur_value - prev_value) / prev_value
    return growth_rates


def _ratio_to_revenue(year_df, numerator_column):
    """Return sum(numerator_column) / sum('营运收入'), or None if not computable."""
    if '营运收入' not in year_df.columns or numerator_column not in year_df.columns:
        return None
    revenue = year_df['营运收入'].sum()
    if revenue == 0:
        return None
    return year_df[numerator_column].sum() / revenue


def calculate_ratios(df):
    """Compute per-report-date ratios relative to operating revenue.

    Args:
        df: Wide DataFrame containing REPORT_DATE and the item columns.

    Returns:
        Dict mapping each distinct REPORT_DATE to a dict with the keys
        '毛利率', '销售费率' and '净利率'. A ratio is None when a needed column
        is missing or the revenue sum is zero.
    """
    ratios = {}
    for report_date in df['REPORT_DATE'].unique():
        year_df = df[df['REPORT_DATE'] == report_date]
        ratios[report_date] = {
            '毛利率': _ratio_to_revenue(year_df, '毛利'),
            '销售费率': _ratio_to_revenue(year_df, '销售及分销费用'),
            '净利率': _ratio_to_revenue(year_df, '除税后溢利'),
        }
    return ratios


def check_columns(pd):
    """Tell whether the report has every column the summary needs.

    Args:
        pd: The DataFrame to check. NOTE: the parameter name shadows the
            pandas module inside this function; it is kept as-is for
            interface compatibility.

    Returns:
        True when all required columns are present, False otherwise.
    """
    required_columns = ['REPORT_DATE', 'FISCAL_YEAR', 'SECUCODE', 'SECURITY_NAME_ABBR',
                        '营运收入', '销售及分销费用', '经营溢利', '股东应占溢利', '每股基本盈利', '毛利', '除税后溢利'
                        ]
    return all(col in pd.columns for col in required_columns)


def main_process(stock_code, stock_name, required_report_date='2024-12-31'):
    """Produce the yearly summary rows for one stock.

    The pivoted report is always written to ``<data dir>/tmp/<code>.csv``.
    The stock is then skipped (with a warning) unless the report has all
    required columns, its fiscal year ends on 12-31 and it contains a row for
    ``required_report_date``.

    Args:
        stock_code: Stock code used for the download and the tmp file name.
        stock_name: Display name, used only in log messages.
        required_report_date: Report date that must be present for the stock
            to be processed. Defaults to '2024-12-31'.

    Returns:
        List of result dicts, one per report date; empty when the stock is
        skipped.
    """
    stock_str = f"({stock_code}:{stock_name})"
    logging.info(f"处理股票 {stock_str}")

    # Fetch the data; tolerate a fetcher that returns None.
    df = get_stock_financial_data(stock_code)
    if df is None or df.empty:
        logging.warning(f"get_stock_financial_data empty. {stock_str}")
        return []

    transformed_df = transform_data(df)
    save_to_csv(transformed_df, f"{config.global_host_data_dir}/tmp", f'{stock_code}.csv')

    if not check_columns(transformed_df):
        logging.warning(f"columns not fit. {stock_str}")
        return []

    fiscal_year = transformed_df['FISCAL_YEAR'].values[0]
    if fiscal_year != '12-31':
        logging.warning(f"股票 {stock_str} 的财年是 {fiscal_year} 跳过")
        return []

    required_date = pd.Timestamp(required_report_date).date()
    if required_date not in transformed_df['REPORT_DATE'].values:
        logging.warning(f"股票 {stock_str} 没有 REPORT_DATE = {required_date} 的数据,跳过")
        return []

    revenue_growth = calculate_growth_rates(transformed_df, '营运收入')
    sales_expense_growth = calculate_growth_rates(transformed_df, '销售及分销费用')
    # The original also computed growth for '经营溢利' but overwrote the result
    # before using it; only the '股东应占溢利' series is reported.
    net_income_growth = calculate_growth_rates(transformed_df, '股东应占溢利')
    basic_eps_growth = calculate_growth_rates(transformed_df, '每股基本盈利')

    has_impairment = '减值及拨备' in transformed_df.columns

    results = []
    for year, ratio in calculate_ratios(transformed_df).items():
        year_df = transformed_df[transformed_df['REPORT_DATE'] == year]
        if year_df.empty:
            continue

        def first(col):
            # First value of `col` among the rows of this report date.
            return year_df[col].values[0]

        results.append({
            'code': first('SECUCODE'),
            'name': first('SECURITY_NAME_ABBR'),
            '年度': year,
            '营运收入': first('营运收入'),
            '毛利': first('毛利'),
            '销售及分销费用': first('销售及分销费用'),
            '经营溢利': first('经营溢利'),
            '除税后溢利': first('除税后溢利'),
            '股东应占溢利': first('股东应占溢利'),
            '每股基本盈利': first('每股基本盈利'),
            '毛利率': ratio['毛利率'],
            '销售费率': ratio['销售费率'],
            '净利率': ratio['净利率'],
            '营收增长率': revenue_growth.get(year, None),
            '销售费用增长率': sales_expense_growth.get(year, None),
            '股东应占溢利增长率': net_income_growth.get(year, None),
            '每股基本盈利增长率': basic_eps_growth.get(year, None),
            '减值及拨备': first('减值及拨备') if has_impairment else None,
        })

    return results


def load_csv(csv_file_path):
    """Read a stock list from a tab-separated, UTF-16 encoded CSV file.

    Args:
        csv_file_path: Path of the file to read.

    Returns:
        The parsed DataFrame ('代码' is read as str), or an empty DataFrame
        when the file is missing or a KeyError is raised while reading.
    """
    try:
        return pd.read_csv(csv_file_path, encoding='utf-16', sep='\t', dtype={'代码': str})
    except FileNotFoundError:
        logging.error(f"未找到 CSV 文件: {csv_file_path}")
    except KeyError:
        # NOTE(review): kept from the original; it is unclear whether
        # read_csv can raise KeyError here - confirm.
        logging.error("CSV 文件中未找到 '代码' 列。")
    return pd.DataFrame()


def save_to_csv(pd, save_path, file_name):
    """Write a DataFrame to ``save_path/file_name`` without the index.

    The target directory is created when it does not exist.

    Args:
        pd: The DataFrame to write. NOTE: the parameter name shadows the
            pandas module inside this function; it is kept as-is for
            interface compatibility.
        save_path: Directory to write into.
        file_name: Name of the CSV file.
    """
    os.makedirs(save_path, exist_ok=True)
    full_name = f"{save_path}/{file_name}"
    pd.to_csv(full_name, index=False)


def main():
    """Process every configured stock list and save one result CSV per list."""
    stock_files = {
        'hk_game_plat' : f'{config.global_host_input_dir}/hk_game_plat.csv',
        'hk_tech_plat' : f'{config.global_host_input_dir}/hk_tech_plat.csv',
        #'hk_house_plate' : f'{config.global_host_input_dir}/hk_house_plate.csv',
    }

    for key, ff in stock_files.items():
        df = load_csv(ff)
        if df.empty:
            logging.warning(f"read file {ff} empty.")
            continue

        # Process each (code, name) pair of the list.
        all_results = []
        for stock_code, stock_name in zip(df['代码'], df['名称']):
            all_results.extend(main_process(stock_code, stock_name))

        result_df = pd.DataFrame(all_results)
        file_name = f"{key}_{current_date}.csv"
        save_to_csv(result_df, config.global_host_data_dir, file_name)
        logging.info(f"data processed and saved to {file_name}")


# Script entry point.
if __name__ == "__main__":
    main()