modify scripts

This commit is contained in:
2025-07-31 17:42:47 +08:00
parent cbd9b18ef4
commit 76cb84d401
17 changed files with 1654 additions and 0 deletions

231
src/static/daily_snap_em.py Normal file
View File

@ -0,0 +1,231 @@
"""
Script Name:
Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。
调用em历史数据接口。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import pymysql
import logging
import csv
import os
import re
import time
import pandas as pd
import numpy as np
from datetime import datetime
import argparse
import src.crawling.stock_hist_em as his_em
import src.logger.logger as logger
from src.config.config import global_stock_data_dir
from src.crawler.zixuan.xueqiu_zixuan import XueQiuStockFetcher
from src.sqlalchemy.models.stockdb import DailySanpModel, Base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.sqlalchemy.config import global_db_url
# 配置日志
logger.setup_logging()
current_date = datetime.now().strftime("%Y%m%d")
current_year = datetime.now().strftime("%Y")
res_dir = global_stock_data_dir
# 刷新代码列表,并返回
def flush_code_map():
code_id_map_em_df = his_em.code_id_map_em()
print(code_id_map_em_df)
return code_id_map_em_df
# 获取所有市场的当年股价快照,带重试机制。
def fetch_snap_all(max_retries: int = 3) -> pd.DataFrame:
# 检查文件是否存在
file_name = f'{res_dir}/snapshot_em_{current_date}.csv'
if os.path.exists(file_name):
try:
# 读取本地文件
snap_data = pd.read_csv(file_name, encoding='utf-8')
logging.info(f"load snapshot data from local: {file_name}\n\n")
return snap_data
except Exception as e:
logging.warning(f"读取本地文件失败: {e},将重新拉取数据\n\n")
# 拉取数据
market_fs = {"cn": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2",
"us": "m:105,m:106,m:107"}
result = pd.DataFrame()
for market_id, fs in market_fs.items():
df = his_em.stock_zh_a_spot_em(fs, fs_desc=market_id)
if df.empty:
logging.warning(f'{market_id} empty data. please check.')
return pd.DataFrame()
else:
logging.info(f'get {market_id} stock snapshot. stock count: {len(df)}')
# 关键步骤添加market_id列值为当前市场标识
df['market_id'] = market_id # 新增一列,记录数据所属市场
result = pd.concat([result, df], ignore_index=True)
result.to_csv(file_name, index=False, encoding='utf-8')
logging.info(f"get snapshot data and write to file: {file_name}\n\n")
return result
def load_xueqiu_codes():
# 替换为你的实际cookie
USER_COOKIES = "u=5682299253; HMACCOUNT=AA6F9D2598CE96D7; xq_is_login=1; snbim_minify=true; _c_WBKFRo=BuebJX5KAbPh1PGBVFDvQTV7x7VF8W2cvWtaC99v; _nb_ioWEgULi=; cookiesu=661740133906455; device_id=fbe0630e603f726742fec4f9a82eb5fb; s=b312165egu; bid=1f3e6ffcb97fd2d9b4ddda47551d4226_m7fv1brw; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751852390; xq_a_token=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xqat=a0fd17a76966314ab80c960412f08e3fffb3ec0f; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjU2ODIyOTkyNTMsImlzcyI6InVjIiwiZXhwIjoxNzU0NzAzMjk5LCJjdG0iOjE3NTIxMTEyOTkyODYsImNpZCI6ImQ5ZDBuNEFadXAifQ.Vbs-LDgB4bCJI2N644DwfeptdcamKsAm2hbXxlPnJ_0fnTJhXp6T-2Gc6b6jmhTjXJIsWta8IuS0rQBB1L-9fKpUliNFHkv4lr7FW2x7QhrZ1D4lrvjihgBxKHq8yQl31uO6lmUOJkoRaS4LM1pmkSL_UOVyw8aUeuVjETFcJR1HFDHwWpHCLM8kY55fk6n1gEgDZnYNh1_FACqlm6LU4Vq14wfQgyF9sfrGzF8rxXX0nns_j-Dq2k8vN3mknh8yUHyzCyq6Sfqn6NeVdR0vPOciylyTtNq5kOUBFb8uJe48aV2uLGww3dYV8HbsgqW4k0zam3r3QDErfSRVIg-Usw; xq_r_token=1b73cbfb47fcbd8e2055ca4a6dc7a08905dacd7d; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1752714700; is_overseas=0; ssxmod_itna=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkTtDlxWeDZDG9dDqx0orXU7BB411D+iENYYe2GG+=3X0xOguYo7I=xmAkwKhSSIXNG2A+DnmeDQKDoxGkDivoD0IYwDiiTx0rD0eDPxDYDG4mDDvvQ84DjmEmFfoGImAeQIoDbORhz74DROdDS73A+IoGqW3Da1A3z8RGDmKDIhjozmoDFOL3Yq0k54i3Y=Ocaq0OZ+BGR0gvh849m1xkHYRr/oRCYQD4KDx5qAxOx20Z3isrfDxRvt70KGitCH4N4DGbh5gYH7x+GksdC58CNR3sx=1mt2qxkGd+QmoC5ZGYdixKG52q4iiqPj53js4D; ssxmod_itna2=QqfxBD2D9DRQPY5i7YYxiwS4GhDYu0D0dGMD3qiQGglDFqAPKDHKm=lerDUhGr5h044VYmkwYDioSBbrtN4=Htz/DUihxz=w4aD"
# 初始化获取器
fetcher = XueQiuStockFetcher(
cookies=USER_COOKIES,
size=1000,
retry_count=3
)
all_codes = []
stocks = fetcher.get_stocks_by_group(
category=1, # 股票
pid=-1 # 全部
)
if stocks:
for item in stocks:
code = item['symbol']
mkt = item['marketplace']
if mkt:
if mkt.lower() == 'cn':
code = format_stock_code(code)
elif mkt.lower() == 'hk':
code = f"HK.{code}"
else:
code = f"US.{code}"
all_codes.append({'code': code, 'code_name': item['name']})
return all_codes
def insert_stock_data_to_db(dataframe, db_url=global_db_url):
"""
将pandas DataFrame中的股票数据插入到MySQL数据库
参数:
dataframe: 包含股票数据的pandas DataFrame
db_url: 数据库连接字符串,格式如'mysql+mysqldb://user:password@host:port/dbname?charset=utf8mb4'
"""
# 创建数据库引擎
engine = create_engine(db_url)
# 创建数据表(如果不存在)
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 注意pandas中NaN在数值列用np.nan字符串列用pd.NA统一替换为None
dataframe = dataframe.replace({np.nan: None, pd.NA: None})
try:
count_insert = 0
count_update = 0
# 遍历DataFrame的每一行
for _, row in dataframe.iterrows():
# 先检查 code 是否存在且有效
if not row.get('代码'):
logging.warning(f"警告:发现无效的 code 值,跳过该行数据。行数据:{row['名称']}")
continue # 跳过无效行
# 创建股票数据对象
stock = DailySanpModel(
code=row['代码'],
curr_date=current_date, # TODO: 怎么判断当前的数据是哪一天的? 要看当前时间是否已经开盘,还是在盘前,还是前一个交易日的?
name=row['名称'],
market_id=row['market_id'],
code_prefix=row['代码前缀'],
industry=row['所处行业'],
listing_date=pd.to_datetime(row['上市时间']).date() if row['上市时间'] else None,
latest_price=row['最新价'],
price_change_percent=row['涨跌幅'],
price_change=row['涨跌额'],
volume=row['成交量'],
turnover=row['成交额'],
amplitude=row['振幅'],
turnover_rate=row['换手率'],
pe_dynamic=row['市盈率动'],
volume_ratio=row['量比'],
change_5min=row['5分钟涨跌'],
highest=row['最高'],
lowest=row['最低'],
opening=row['今开'],
previous_close=row['昨收'],
price_speed=row['涨速'],
total_market_cap=row['总市值'],
circulating_market_cap=row['流通市值'],
pb_ratio=row['市净率'],
change_60d=row['60日涨跌幅'],
change_ytd=row['年初至今涨跌幅'],
weighted_roe=row['加权净资产收益率'],
total_shares=row['总股本'],
circulating_shares=row['已流通股份'],
operating_revenue=row['营业收入'],
revenue_growth=row['营业收入同比增长'],
net_profit=row['归属净利润'],
net_profit_growth=row['归属净利润同比增长'],
undistributed_profit_per_share=row['每股未分配利润'],
gross_margin=row['毛利率'],
asset_liability_ratio=row['资产负债率'],
reserve_per_share=row['每股公积金'],
earnings_per_share=row['每股收益'],
net_asset_per_share=row['每股净资产'],
pe_static=row['市盈率静'],
pe_ttm=row['市盈率TTM'],
report_period=row['报告期']
)
# 2. 执行merge存在则更新不存在则插入
merged_stock = session.merge(stock)
# 3. 统计插入/更新数量
if merged_stock in session.new: # 新插入
count_insert += 1
elif merged_stock in session.dirty: # 已更新
count_update += 1
# 提交事务
session.commit()
logging.info(f"成功插入 {count_insert} 条,更新 {count_update} 条数据")
except Exception as e:
# 发生错误时回滚
session.rollback()
logging.warning(f"插入数据失败: {str(e)}")
finally:
# 关闭会话
session.close()
def main():
# 获取快照数据
snap_data = fetch_snap_all()
if snap_data.empty:
logging.error(f"fetching snapshot data error!")
return
em_code_map = {row['代码']: row['代码前缀'] for _, row in snap_data.iterrows()}
insert_stock_data_to_db(dataframe=snap_data)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,140 @@
from futu import *
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.sqlalchemy.models.stockdb import Base, FutuTradingDayModel
import datetime
import logging
import numpy as np
import src.logger.logger as logger
from src.sqlalchemy.config import global_db_url
# 配置日志
logger.setup_logging()
def get_current_year():
"""获取当前年份"""
return datetime.datetime.now().year
def get_year_date_range(year):
"""获取指定年份的年初和年末日期"""
start_date = f"{year}-01-01"
end_date = f"{year}-12-31"
return start_date, end_date
def init_db(db_url=global_db_url):
"""初始化数据库连接和表结构"""
engine = create_engine(db_url)
# 创建表(如果不存在)
Base.metadata.create_all(engine)
return sessionmaker(bind=engine)
def save_trading_days_to_db(session, market, trading_days):
"""
将交易日历数据保存到数据库
存在则更新,不存在则插入
"""
count_insert = 0
count_update = 0
for day in trading_days:
# 转换日期字符串为date对象
trade_date = datetime.datetime.strptime(day['time'], '%Y-%m-%d').date()
# 查询是否已存在
existing = session.query(FutuTradingDayModel).filter(
FutuTradingDayModel.market == market,
FutuTradingDayModel.trade_date == trade_date
).first()
if existing:
# 存在则更新类型
if existing.trade_date_type != day['trade_date_type']:
existing.trade_date_type = day['trade_date_type']
count_update += 1
else:
# 不存在则插入新记录
new_day = FutuTradingDayModel(
market=market,
trade_date=trade_date,
trade_date_type=day['trade_date_type']
)
session.add(new_day)
count_insert += 1
return count_insert, count_update
def get_futu_trading_days(market, desc, db_url=global_db_url, year=None):
"""
获取指定市场的交易日历并保存到数据库
:param market: 市场类型如TradeDateMarket.HK
:param db_url: 数据库连接字符串
:param year: 年份,默认当前年
"""
# 确定年份
target_year = year or get_current_year()
start_date, end_date = get_year_date_range(target_year)
logging.info(f"开始获取 {market} 市场 {target_year} 年的交易日历({start_date}{end_date}")
# 初始化富途连接
quote_ctx = None
try:
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
# 请求交易日历
ret, data = quote_ctx.request_trading_days(
market=market,
start=start_date,
end=end_date
)
if ret != RET_OK:
logging.error(f"获取交易日历失败: {data}")
return False
logging.info(f"成功获取 {len(data)} 条交易日数据")
# 初始化数据库会话
Session = init_db(db_url)
session = Session()
try:
# 保存到数据库
count_insert, count_update = save_trading_days_to_db(
session=session,
market=desc, # 存储枚举值(如'HK'
trading_days=data
)
session.commit()
logging.info(f"数据库操作完成:新增 {count_insert} 条,更新 {count_update}")
return True
except Exception as e:
session.rollback()
logging.error(f"数据库操作失败: {str(e)}")
return False
finally:
session.close()
except Exception as e:
logging.error(f"连接富途API失败: {str(e)}")
return False
finally:
# 确保连接关闭
if quote_ctx:
quote_ctx.close()
if __name__ == "__main__":
# 示例获取港股HK和美股US的当前年交易日历
markets = {
'CN' : TradeDateMarket.CN, # 港股
'HK' : TradeDateMarket.HK, # 港股
'US' : TradeDateMarket.US # 美股
}
for desc, market in markets.items():
get_futu_trading_days(
market=market,
desc=desc,
# db_url=DB_URL,
# year=2024 # 可选:指定年份,默认当前年
)

122
src/static/trading_day.py Normal file
View File

@ -0,0 +1,122 @@
from datetime import datetime, time, date, timedelta
from zoneinfo import ZoneInfo
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.sqlalchemy.models.stockdb import FutuTradingDayModel
from src.sqlalchemy.config import global_db_url
# 市场时间配置(东八区 Asia/Shanghai
# 新增盘前开始时间pre_market_start作为判断临界点
MARKET_HOURS = {
# A股沪深盘前集合竞价9:15开始实际交易9:30-15:00
"CN": {
"pre_market_start": time(9, 15), # 盘前开始时间(集合竞价)
"morning_start": time(9, 30),
"morning_end": time(11, 30),
"afternoon_start": time(13, 0),
"afternoon_end": time(15, 0)
},
# 港股盘前竞价9:00开始实际交易9:30-16:00
"HK": {
"pre_market_start": time(9, 0), # 盘前开始时间
"morning_start": time(9, 30),
"morning_end": time(12, 0),
"afternoon_start": time(13, 0),
"afternoon_end": time(16, 0)
},
# 美股盘前交易4:00-9:30纽约时间对应东八区夏令时16:00-21:30冬令时17:00-22:30
"US": {
# 东八区盘前开始时间(夏令时/冬令时)
"dst_pre_start": time(16, 0), # 夏令时盘前开始
"std_pre_start": time(17, 0), # 冬令时盘前开始
# 东八区交易时间(同之前)
"dst_start": time(21, 30),
"dst_end": time(4, 0),
"std_start": time(22, 30),
"std_end": time(5, 0)
}
}
def get_trading_date(market: str, db_session: Session) -> str:
"""
根据新逻辑返回交易日期:
1. 若当前日期是交易日,且当前时间 >= 盘前开始时间 → 取当前交易日
2. 否则(非交易日,或交易日但未到盘前时间) → 取前一交易日
参数:
market: 市场标识CN=A股, HK=港股, US=美股)
db_session: SQLAlchemy数据库会话对象
返回:
交易日期字符串YYYY-MM-DD
"""
# 1. 获取东八区当前时间和日期
tz_sh = ZoneInfo("Asia/Shanghai")
now = datetime.now(tz_sh) # 含时区的当前时间
current_date: date = now.date() # 东八区当前日期(仅日期)
current_time: time = now.time() # 东八区当前时间(仅时间)
# 2. 判断当前日期是否为该市场的交易日
def is_trading_day(market: str, date: date, session: Session) -> bool:
"""检查指定日期是否为该市场的交易日"""
return session.query(FutuTradingDayModel).filter(
FutuTradingDayModel.market == market,
FutuTradingDayModel.trade_date == date
).first() is not None
# 3. 获取该市场的盘前开始时间(东八区)
def get_pre_market_start(market: str, now: datetime) -> time:
"""根据市场和当前时间(判断夏令时)返回盘前开始时间"""
if market == "US":
# 美股需根据纽约时区判断夏令时
tz_ny = ZoneInfo("America/New_York")
now_ny = now.astimezone(tz_ny)
is_dst = now_ny.dst() != timedelta(0) # 夏令时判断
return MARKET_HOURS["US"]["dst_pre_start"] if is_dst else MARKET_HOURS["US"]["std_pre_start"]
else:
# A股/港股直接返回配置的盘前时间
return MARKET_HOURS[market]["pre_market_start"]
# 4. 核心逻辑判断
current_is_trading_day = is_trading_day(market, current_date, db_session)
pre_market_start = get_pre_market_start(market, now)
is_after_pre_market = current_time >= pre_market_start # 当前时间是否过了盘前开始时间
# 5. 确定查询条件
if current_is_trading_day and is_after_pre_market:
# 情况1当前是交易日且已过盘前时间 → 取当前交易日
target_date = current_date
else:
# 情况2非交易日或未到盘前时间 → 取前一交易日
# 查询小于当前日期的最大交易日
prev_trading_day = db_session.query(FutuTradingDayModel.trade_date).filter(
FutuTradingDayModel.market == market,
FutuTradingDayModel.trade_date < current_date
).order_by(FutuTradingDayModel.trade_date.desc()).first()
if not prev_trading_day:
raise ValueError(f"未查询到{market}市场的前一交易日数据")
target_date = prev_trading_day.trade_date
return target_date.strftime("%Y-%m-%d")
# 示例:获取各市场的目标交易日期
if __name__ == "__main__":
engine = create_engine(global_db_url)
Session = sessionmaker(bind=engine)
db_session = Session()
try:
# 分别获取三个市场的交易日期
print(f"A股目标交易日期{get_trading_date('CN', db_session)}")
print(f"港股目标交易日期:{get_trading_date('HK', db_session)}")
print(f"美股目标交易日期:{get_trading_date('US', db_session)}")
finally:
db_session.close()