""" Script Name: Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。 调用em历史数据接口。 Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import pymysql import logging import csv import os import re import time from datetime import datetime from futu import OpenQuoteContext, RET_OK # Futu API client from futu import * import argparse import src.crawling.stock_hist_em as his_em import src.logger.logger as logger import src.config.config as config from src.crawler.zixuan.xueqiu_zixuan import XueQiuStockFetcher # 配置日志 logger.setup_logging() current_date = datetime.now().strftime("%Y%m%d") current_year = datetime.now().strftime("%Y") res_dir = config.global_stock_data_dir # 刷新代码列表,并返回 def flush_code_map(): code_id_map_em_df = his_em.code_id_map_em() print(code_id_map_em_df) return code_id_map_em_df # 获取所有市场的当年股价快照,带重试机制。 def fetch_snap_all(max_retries: int = 3) -> pd.DataFrame: # 检查文件是否存在 file_name = f'{res_dir}/snapshot_em_{current_date}.csv' if os.path.exists(file_name): try: # 读取本地文件 snap_data = pd.read_csv(file_name, encoding='utf-8') logging.info(f"load snapshot data from local: {file_name}\n\n") return snap_data except Exception as e: logging.warning(f"读取本地文件失败: {e},将重新拉取数据\n\n") # 拉取数据 market_fs = {"china_a": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048", "hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2", "us": "m:105,m:106,m:107"} result = pd.DataFrame() for market_id, fs in market_fs.items(): df = his_em.stock_zh_a_spot_em(fs, fs_desc=market_id) if df.empty: logging.warning(f'{market_id} empty data. please check.') return pd.DataFrame() else: logging.info(f'get {market_id} stock snapshot. stock count: {len(df)}') result = pd.concat([result, df], ignore_index=True) result.to_csv(file_name, index=False, encoding='utf-8') logging.info(f"get snapshot data and write to file: {file_name}\n\n") return result # 从数据库中读取指定指数的成分股 def load_index_codes(): conn = pymysql.connect(**config.db_config) cursor = conn.cursor(pymysql.cursors.DictCursor) #沪深300 #cursor.execute("SELECT code, code_name FROM index_hs where index_code='000300' ") #中证A500 #cursor.execute("SELECT code, code_name FROM index_hs where index_code='000510' ") #沪深300和中证A500的并集,去重 #cursor.execute("SELECT DISTINCT CONCAT('index-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510') ") #沪深300和中证A500的合并,不去重 cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510') ") #沪深300、中证A500、中证A50、科创芯片、科创创业50,不去重 #cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510', '930050', '000685', '931643') ") hs300_data = cursor.fetchall() #港股国企指数成分股、恒生科技指数成分股等 cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hk where index_code IN ('HSCEI', 'HSTECH') ") hk_data = cursor.fetchall() #美股中概股等 cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_us where index_code IN ('CN_US') ") us_data = cursor.fetchall() cursor.close() conn.close() return hs300_data + hk_data + us_data def format_stock_code(code): """ 用正则表达式将 "SZ300750" 转换为 "SZ.300750" """ # 正则模式:匹配开头的1个或多个字母, followed by 1个或多个数字 pattern = r'^([A-Za-z]+)(\d+)$' match = re.match(pattern, code) if match: # 提取字母部分和数字部分,用点号拼接 letters = match.group(1) numbers = match.group(2) return f"{letters}.{numbers}" else: # 不匹配模式时返回原始字符串(如已包含点号、有其他字符等) return code def load_xueqiu_codes(): # 替换为你的实际cookie USER_COOKIES = "u=5682299253; HMACCOUNT=AA6F9D2598CE96D7; xq_is_login=1; snbim_minify=true; _c_WBKFRo=BuebJX5KAbPh1PGBVFDvQTV7x7VF8W2cvWtaC99v; _nb_ioWEgULi=; cookiesu=661740133906455; device_id=fbe0630e603f726742fec4f9a82eb5fb; s=b312165egu; bid=1f3e6ffcb97fd2d9b4ddda47551d4226_m7fv1brw; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751852390; xq_a_token=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xqat=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjU2ODIyOTkyNTMsImlzcyI6InVjIiwiZXhwIjoxNzU0NzAzMjk5LCJjdG0iOjE3NTIxMTEyOTkyODYsImNpZCI6ImQ5ZDBuNEFadXAifQ.Vbs-LDgB4bCJI2N644DwfeptdcamKsAm2hbXxlPnJ_0fnTJhXp6T-2Gc6b6jmhTjXJIsWta8IuS0rQBB1L-9fKpUliNFHkv4lr7FW2x7QhrZ1D4lrvjihgBxKHq8yQl31uO6lmUOJkoRaS4LM1pmkSL_UOVyw8aUeuVjETFcJR1HFDHwWpHCLM8kY55fk6n1gEgDZnYNh1_FACqlm6LU4Vq14wfQgyF9sfrGzF8rxXX0nns_j-Dq2k8vN3mknh8yUHyzCyq6Sfqn6NeVdR0vPOciylyTtNq5kOUBFb8uJe48aV2uLGww3dYV8HbsgqW4k0zam3r3QDErfSRVIg-Usw; xq_r_token=1b73cbfb47fcbd8e2055ca4a6dc7a08905dacd7d; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1752714700; is_overseas=0; ssxmod_itna=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkTtDlxWeDZDG9dDqx0orXU7BB411D+iENYYe2GG+=3X0xOguYo7I=xmAkwKhSSIXNG2A+DnmeDQKDoxGkDivoD0IYwDiiTx0rD0eDPxDYDG4mDDvvQ84DjmEmFfoGImAeQIoDbORhz74DROdDS73A+IoGqW3Da1A3z8RGDmKDIhjozmoDFOL3Yq0k54i3Y=Ocaq0OZ+BGR0gvh849m1xkHYRr/oRCYQD4KDx5qAxOx20Z3isrfDxRvt70KGitCH4N4DGbh5gYH7x+GksdC58CNR3sx=1mt2qxkGd+QmoC5ZGYdixKG52q4iiqPj53js4D; ssxmod_itna2=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkwYDioSBbrtN4=Htz/DUihxz=w4aD" # 初始化获取器 fetcher = XueQiuStockFetcher( cookies=USER_COOKIES, size=1000, retry_count=3 ) all_codes = [] stocks = fetcher.get_stocks_by_group( category=1, # 股票 pid=-1 # 全部 ) if stocks: for item in stocks: code = item['symbol'] mkt = item['marketplace'] if mkt: if mkt.lower() == 'cn': code = format_stock_code(code) elif mkt.lower() == 'hk': code = f"HK.{code}" else: code = f"US.{code}" all_codes.append({'code': code, 'code_name': item['name']}) return all_codes # 读取富途自选股的指定分类股 def load_futu_all_codes(): quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) stock_data = [] ret, data = quote_ctx.get_user_security("全部") if ret == RET_OK: if data.shape[0] > 0: # 如果自选股列表不为空 stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows() if row['stock_type'] == 'STOCK'] #stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows()] else: logging.error('error:', data) quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽 return stock_data # 获取股票数据,并统计收益率 def calculate_stock_statistics(market, code, code_name): try: # 获取当前日期(用于比较) last_year = datetime.now().year - 1 last_year_str = str(last_year) # 调用 stock_zh_a_hist 获取历史数据 data = his_em.stock_zh_a_hist_new( em_symbol=code, period="daily", start_date="20210101", end_date=current_date, adjust='qfq', ) if data.empty: #logging.warning(f'fetch data for {code}, {code_name} failed. skipping...') return None # 获取当前日期的股价 current_row = data.loc[data['日期'].idxmax()] # 默认行,如果该股票没有年初股价,0923股价,1008股价等,以此记录代替 defaut_row = data.loc[data['日期'].idxmin()] # 获取年初股价,也就是上一年的最后一个交易日的收盘价 year_data = data[data['日期'].str.startswith(last_year_str)] if year_data.empty: logging.debug(f"{code}, {code_name} 未找到上一年的数据 ({last_year_str}), 以 {defaut_row['日期']} 的数据来代替") year_begin_row = defaut_row else: year_begin_row = year_data.loc[year_data['日期'].idxmax()] # 获取0923收盘价 try: row_0923 = data[data['日期'] == '2024-09-23'].iloc[0] except IndexError: logging.debug(f"{code}, {code_name} 未找到0923的数据, 以 {defaut_row['日期']} 的数据来代替") row_0923 = defaut_row # 获取0930收盘价 try: row_0930 = data[data['日期'] == '2024-09-30'].iloc[0] except IndexError: logging.debug(f"{code}, {code_name} 未找到0930的数据, 以 {defaut_row['日期']} 的数据来代替") row_0930 = defaut_row # 获取1008开盘价、收盘价 try: row_1008 = data[data['日期'] == '2024-10-08'].iloc[0] except IndexError: logging.debug(f"{code}, {code_name} 未找到1008的数据, 以 {defaut_row['日期']} 的数据来代替") row_1008 = defaut_row # 获取0403收盘价 try: row_0403 = data[data['日期'] == '2025-04-03'].iloc[0] except IndexError: logging.debug(f"{code}, {code_name} 未找到0403的数据, 以 {defaut_row['日期']} 的数据来代替") row_0403 = defaut_row # 获取0407收盘价 try: row_0407 = data[data['日期'] == '2025-04-07'].iloc[0] except IndexError: logging.debug(f"{code}, {code_name} 未找到0407的数据, 以 {defaut_row['日期']} 的数据来代替") row_0407 = defaut_row # 获取2021年以来的最高价 max_close_row = data.loc[data['收盘'].idxmax()] # 获取2021年以来的最低价 min_close_row = data.loc[data['收盘'].idxmin()] # 获取年内的最高价、最低价 year_data = data[data['日期'].str.startswith(current_year)] if year_data.empty: logging.debug(f"{code}, {code_name} 未找到年内的数据, 以 {defaut_row['日期']} 的数据来代替") year_min_row = defaut_row year_max_row = defaut_row else: year_min_row = year_data.loc[year_data['收盘'].idxmin()] year_max_row = year_data.loc[year_data['收盘'].idxmax()] # 计算统计数据 try: year_increase = (current_row['收盘'] / year_begin_row['收盘'] - 1) growth_0923 = (current_row['收盘'] / row_0923['收盘'] - 1) growth_0930 = (current_row['收盘'] / row_0930['收盘'] - 1) growth_1008 = (current_row['收盘'] / row_1008['收盘'] - 1) growth_1008_open = (current_row['收盘'] / row_1008['开盘'] - 1) growth_0403 = (current_row['收盘'] / row_0403['收盘'] - 1) growth_0407 = (current_row['收盘'] / row_0407['收盘'] - 1) year_amplitude = (year_max_row['收盘'] / year_min_row['收盘'] - 1) max_amplitude = (max_close_row['收盘'] / min_close_row['收盘'] - 1) stock_recovery = (current_row['收盘'] / max_close_row['收盘'] - 1) except ZeroDivisionError: logging.error(f"股票 {code} 计算时遇到除零错误") return None # 组织结果 result = [ market, code, code_name, current_row['日期'], current_row['收盘'], year_begin_row['日期'], year_begin_row['收盘'], row_0923['日期'], row_0923['收盘'] , row_0930['日期'], row_0930['收盘'] , row_1008['日期'], row_1008['开盘'] ,row_1008['收盘'] , row_0403['日期'], row_0403['收盘'] , row_0407['日期'], row_0407['收盘'] , float(row_0407['涨跌幅'])/100.0 , max_close_row['日期'], max_close_row['收盘'], min_close_row['日期'], min_close_row['收盘'], year_max_row['日期'], year_max_row['收盘'], year_min_row['日期'], year_min_row['收盘'], year_increase, growth_0923 if growth_0923 is not None else 'N/A', growth_0930 if growth_0930 is not None else 'N/A', growth_1008 if growth_1008 is not None else 'N/A', growth_1008_open if growth_1008_open is not None else 'N/A', growth_0403 if growth_0403 is not None else 'N/A', growth_0407 if growth_0407 is not None else 'N/A', year_amplitude, max_amplitude, stock_recovery ] return result except Exception as e: logging.error(f"处理股票 {code} 时出错: {e}") return None # 写入到文件中 def write_to_csv(results, filename): """将所有结果写入CSV文件""" try: with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) # 写入表头 writer.writerow([ "股市", "股票代码", "股票名称", "当前日期", "当前收盘", "年初日期", "年初收盘", "0923日期", "0923收盘", "0930日期", "0930收盘", "1008日期", "1008开盘", "1008收盘", "0403日期", "0403收盘", "0407日期", "0407收盘", "0407涨跌幅", "最高日期", "最高收盘", "最低日期", "最低收盘", "年内最高日期", "年内最高收盘", "年内最低日期", "年内最低收盘", "年内涨幅", "相比0923收盘价涨幅", "相比0930收盘价涨幅", "相比1008收盘价涨幅", "相比1008开盘价涨幅", "相比0403收盘价涨幅", "相比0407收盘价涨幅", "年内振幅", "最大振幅", "股价自最高点恢复", "市盈率TTM", "市净率", "总市值" ]) # 写入每行数据 for result in results: writer.writerow(result) except Exception as e: logging.error(f"写入CSV文件时出错: {e}") # 主函数,执行逻辑 def main(list, debug): futu_codes = [] xueqiu_codes = [] index_codes = [] if list == 'futu': futu_codes = load_futu_all_codes() elif list == 'xueqiu': xueqiu_codes = load_xueqiu_codes() elif list == 'all': futu_codes = load_futu_all_codes() xueqiu_codes = load_xueqiu_codes() index_codes = load_index_codes() else: index_codes = load_index_codes() codes = futu_codes + index_codes + xueqiu_codes all_results = [] # 获取快照数据 snap_data = fetch_snap_all() if snap_data.empty: logging.error(f"fetching snapshot data error!") return em_code_map = {row['代码']: row['代码前缀'] for _, row in snap_data.iterrows()} for item in codes: code = item['code'] code_name = item['code_name'] # 清理股票代码中的前缀 try: market, clean_code = code.split(".") except ValueError: logging.error(f"wrong format code: {code}") if clean_code not in em_code_map: logging.warning(f"wrong stock code {clean_code}, please check.") continue em_code = f"{em_code_map[clean_code]}.{clean_code}" result = calculate_stock_statistics(market, em_code, code_name) if result: match = snap_data.loc[snap_data['代码'] == clean_code] if not match.empty: # 如果找到了匹配项 result.append(match['市盈率TTM'].iloc[0]) result.append(match['市净率'].iloc[0]) result.append(match['总市值'].iloc[0]) else: logging.warning(f'{market}.{clean_code} has no snapshot data.') all_results.append(result) logging.info(f"get data succ. {market}.{clean_code}, em_code: {em_code}, name: {code_name}...") else: logging.warning(f"get data faild. {market}.{clean_code}, em_code: {em_code}, name: {code_name}") if debug: break if all_results: file_name = f'{res_dir}/stock_statistics_{list}_{current_date}' if debug: file_name = f'{file_name}_debug' file_name = f'{file_name}.csv' write_to_csv(all_results, f'{file_name}') logging.info(f"统计结果已写入 CSV 文件 {file_name}") else: logging.warning("没有可写入的统计数据") if __name__ == "__main__": # 命令行参数处理 parser = argparse.ArgumentParser(description='计算指定股票的区间收益率') parser.add_argument('--list', type=str, default='all', help='Stocklist to process (futu , index, all)') parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)') args = parser.parse_args() # 调用主函数 #flush_code_map() #print(load_futu_all_codes()) #print(load_xueqiu_codes()) main(args.list, args.debug)