""" Script Name: Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。 调用em历史数据接口。 Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import pymysql import logging import csv import os import re import time import pandas as pd import numpy as np from datetime import datetime import argparse import src.crawling.stock_hist_em as his_em import src.logger.logger as logger from src.config.config import global_stock_data_dir from src.crawler.zixuan.xueqiu_zixuan import XueQiuStockFetcher from src.sqlalchemy.models.stockdb import DailySanpModel, Base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from src.sqlalchemy.config import global_db_url from .trading_day import TradingDayChecker from src.utils.send_to_wecom import send_to_wecom # 配置日志 logger.setup_logging() current_date = datetime.now().strftime("%Y%m%d") current_year = datetime.now().strftime("%Y") res_dir = global_stock_data_dir debug = False # 拉取数据 market_fs = { "cn": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048", "hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2", "us": "m:105,m:106,m:107" } # 刷新代码列表,并返回 def flush_code_map(): code_id_map_em_df = his_em.code_id_map_em() print(code_id_map_em_df) return code_id_map_em_df # 获取所有市场的当年股价快照,带重试机制。 def fetch_snap_all(market_id, trading_date) -> pd.DataFrame: # 检查文件是否存在 os.makedirs(res_dir, exist_ok=True) file_name = f'{res_dir}/snapshot_em_{market_id}_{trading_date}.csv' if os.path.exists(file_name) and debug: try: # 读取本地文件 snap_data = pd.read_csv(file_name, encoding='utf-8') logging.info(f"load snapshot data from local: {file_name}\n\n") return snap_data except Exception as e: logging.warning(f"读取本地文件失败: {e},将重新拉取数据\n\n") result = pd.DataFrame() fs = market_fs.get(market_id, None) if not fs: logging.error(f"未找到市场 {market_id} 的数据源配置,请检查 market_fs 配置") return result df = his_em.stock_zh_a_spot_em(fs, fs_desc=market_id) if df.empty: logging.warning(f'{market_id} empty data. please check.') return pd.DataFrame() else: logging.info(f'get {market_id} stock snapshot. 
        # Key step: add a market_id column identifying the current market
        df['market_id'] = market_id  # new column recording which market the data belongs to
        df['curr_date'] = trading_date
        result = pd.concat([result, df], ignore_index=True)

    result.to_csv(file_name, index=False, encoding='utf-8')
    logging.info(f"get snapshot data and write to file: {file_name}\n\n")
    return result


def load_xueqiu_codes():
    # Replace with your actual cookie
    USER_COOKIES = "u=5682299253; HMACCOUNT=AA6F9D2598CE96D7; xq_is_login=1; snbim_minify=true; _c_WBKFRo=BuebJX5KAbPh1PGBVFDvQTV7x7VF8W2cvWtaC99v; _nb_ioWEgULi=; cookiesu=661740133906455; device_id=fbe0630e603f726742fec4f9a82eb5fb; s=b312165egu; bid=1f3e6ffcb97fd2d9b4ddda47551d4226_m7fv1brw; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751852390; xq_a_token=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xqat=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjU2ODIyOTkyNTMsImlzcyI6InVjIiwiZXhwIjoxNzU0NzAzMjk5LCJjdG0iOjE3NTIxMTEyOTkyODYsImNpZCI6ImQ5ZDBuNEFadXAifQ.Vbs-LDgB4bCJI2N644DwfeptdcamKsAm2hbXxlPnJ_0fnTJhXp6T-2Gc6b6jmhTjXJIsWta8IuS0rQBB1L-9fKpUliNFHkv4lr7FW2x7QhrZ1D4lrvjihgBxKHq8yQl31uO6lmUOJkoRaS4LM1pmkSL_UOVyw8aUeuVjETFcJR1HFDHwWpHCLM8kY55fk6n1gEgDZnYNh1_FACqlm6LU4Vq14wfQgyF9sfrGzF8rxXX0nns_j-Dq2k8vN3mknh8yUHyzCyq6Sfqn6NeVdR0vPOciylyTtNq5kOUBFb8uJe48aV2uLGww3dYV8HbsgqW4k0zam3r3QDErfSRVIg-Usw; xq_r_token=1b73cbfb47fcbd8e2055ca4a6dc7a08905dacd7d; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1752714700; is_overseas=0; ssxmod_itna=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkTtDlxWeDZDG9dDqx0orXU7BB411D+iENYYe2GG+=3X0xOguYo7I=xmAkwKhSSIXNG2A+DnmeDQKDoxGkDivoD0IYwDiiTx0rD0eDPxDYDG4mDDvvQ84DjmEmFfoGImAeQIoDbORhz74DROdDS73A+IoGqW3Da1A3z8RGDmKDIhjozmoDFOL3Yq0k54i3Y=Ocaq0OZ+BGR0gvh849m1xkHYRr/oRCYQD4KDx5qAxOx20Z3isrfDxRvt70KGitCH4N4DGbh5gYH7x+GksdC58CNR3sx=1mt2qxkGd+QmoC5ZGYdixKG52q4iiqPj53js4D; ssxmod_itna2=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkwYDioSBbrtN4=Htz/DUihxz=w4aD"

    # Initialise the fetcher
    fetcher = XueQiuStockFetcher(
        cookies=USER_COOKIES,
        size=1000,
        retry_count=3
    )

    all_codes = []
    stocks = fetcher.get_stocks_by_group(
        category=1,  # stocks
        pid=-1       # all groups
    )
    if stocks:
        for item in stocks:
            code = item['symbol']
            mkt = item['marketplace']
            if mkt:
                if mkt.lower() == 'cn':
                    # format_stock_code is assumed to be provided elsewhere in this project
                    code = format_stock_code(code)
                elif mkt.lower() == 'hk':
                    code = f"HK.{code}"
                else:
                    code = f"US.{code}"
            all_codes.append({'code': code, 'code_name': item['name']})
    return all_codes


def insert_stock_data_to_db(dataframe, db_url=global_db_url):
    """
    Insert stock data from a pandas DataFrame into the MySQL database.

    Args:
        dataframe: pandas DataFrame holding the stock data
        db_url: database connection string, e.g.
            'mysql+mysqldb://user:password@host:port/dbname?charset=utf8mb4'
    """
    # Create the database engine
    engine = create_engine(db_url)
    # Create the table if it does not exist
    Base.metadata.create_all(engine)
    # Open a session
    Session = sessionmaker(bind=engine)
    session = Session()

    # Note: pandas uses np.nan for missing numeric values and pd.NA for missing
    # string values; normalise both to None before writing to the database.
    dataframe = dataframe.replace({np.nan: None, pd.NA: None})

    try:
        count_insert = 0
        count_update = 0
        # Walk through the DataFrame row by row
        for _, row in dataframe.iterrows():
            # Skip rows without a valid code
            if not row.get('代码'):
                logging.warning(f"invalid code value, skipping row: {row['名称']}")
                continue
            # Build the ORM object for this row
            stock = DailySanpModel(
                code=row['代码'],
                curr_date=row['curr_date'],
                name=row['名称'],
                market_id=row['market_id'],
                code_prefix=row['代码前缀'],
                industry=row['所处行业'],
                listing_date=pd.to_datetime(row['上市时间']).date() if row['上市时间'] else None,
                latest_price=row['最新价'],
                price_change_percent=row['涨跌幅'],
                price_change=row['涨跌额'],
                volume=row['成交量'],
                turnover=row['成交额'],
                amplitude=row['振幅'],
                turnover_rate=row['换手率'],
                pe_dynamic=row['市盈率动'],
                volume_ratio=row['量比'],
                change_5min=row['5分钟涨跌'],
                highest=row['最高'],
                lowest=row['最低'],
                opening=row['今开'],
                previous_close=row['昨收'],
                price_speed=row['涨速'],
                total_market_cap=row['总市值'],
                circulating_market_cap=row['流通市值'],
                pb_ratio=row['市净率'],
                change_60d=row['60日涨跌幅'],
                change_ytd=row['年初至今涨跌幅'],
                weighted_roe=row['加权净资产收益率'],
                total_shares=row['总股本'],
                circulating_shares=row['已流通股份'],
                operating_revenue=row['营业收入'],
                revenue_growth=row['营业收入同比增长'],
                net_profit=row['归属净利润'],
                net_profit_growth=row['归属净利润同比增长'],
                undistributed_profit_per_share=row['每股未分配利润'],
                gross_margin=row['毛利率'],
                asset_liability_ratio=row['资产负债率'],
                reserve_per_share=row['每股公积金'],
                earnings_per_share=row['每股收益'],
                net_asset_per_share=row['每股净资产'],
                pe_static=row['市盈率静'],
                pe_ttm=row['市盈率TTM'],
                report_period=row['报告期']
            )
            # Merge: update the row if it already exists, insert it otherwise
            merged_stock = session.merge(stock)
            # Count inserts vs. updates
            if merged_stock in session.new:      # newly inserted
                count_insert += 1
            elif merged_stock in session.dirty:  # updated
                count_update += 1
        # Commit the transaction
        session.commit()
        logging.info(f"inserted {count_insert} rows, updated {count_update} rows")
    except Exception as e:
        # Roll back on error
        session.rollback()
        logging.warning(f"failed to insert data: {str(e)}")
    finally:
        # Close the session
        session.close()


def main(markets, args_debug, notify):
    global debug
    debug = args_debug

    # Fetch snapshot data for every requested market
    market_list = markets.split(',')
    if not market_list:
        logging.error("no market list specified; use --list to pick markets (e.g. cn,hk,us)")
        return

    em_code_map = {}
    for market_id in market_list:
        # Resolve the trading date for this market
        trading_day_checker = TradingDayChecker()
        if not trading_day_checker.is_trading_day_today(market_id.upper()) and not debug:
            logging.info(f"not a trading day, skipping market {market_id}, current date: {current_date}")
            continue
        trading_date = trading_day_checker.get_trading_date(market_id.upper())
        if not trading_date:
            logging.error(f"could not determine the trading date for market {market_id}")
            continue

        # Fetch the snapshot data
        snap_data = fetch_snap_all(market_id, trading_date)
        if snap_data.empty:
            logging.error(f"fetching snapshot data failed for {market_id}!")
            continue

        insert_stock_data_to_db(dataframe=snap_data)
        logging.info(f"fetched snapshot data for market {market_id}, record count: {len(snap_data)}")
        if notify:
            send_to_wecom(f"fetched {market_id} snap data, counts: {len(snap_data)}")
        em_code_map.update({row['代码']: row['代码前缀'] for _, row in snap_data.iterrows()})
        time.sleep(5)


if __name__ == "__main__":
    # Command-line argument handling
    parser = argparse.ArgumentParser(description='Fetch snapshot data for the given markets and store it in the database')
    parser.add_argument('--list', type=str, default='cn,hk,us', help='Stock list to process (cn,hk,us)')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
    parser.add_argument('--notify', action='store_true', help='notify to wecom')
    args = parser.parse_args()

    main(args.list, args.debug, args.notify)
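
# Illustrative invocations (a sketch only; "daily_snapshot.py" is a hypothetical
# file name, adjust it to this script's actual path in the repository):
#
#   python daily_snapshot.py --list cn,hk,us           # fetch all three markets
#   python daily_snapshot.py --list cn --debug         # reuse cached CSVs and skip the trading-day check
#   python daily_snapshot.py --list cn --notify        # push a WeCom message after each market is stored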