modify scripts

This commit is contained in:
2025-07-19 17:06:27 +08:00
parent ad47bcf511
commit cbd9b18ef4
9 changed files with 1171 additions and 696 deletions

View File

@ -0,0 +1,127 @@
import akshare as ak
import pandas as pd
import datetime
import time
import logging
# Logging configuration: records at INFO level and above are written to
# stock_signals.log as "<timestamp> [<level>] <message>".
logging.basicConfig(
filename='stock_signals.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
)
# Board names to scan. Each entry is passed to fetch_sector_stocks(), which
# logs a warning and returns an empty frame for a name it cannot find.
TARGET_SECTORS = ['互联网服务', '芯片', '消费', '房地产']
def fetch_sector_stocks(sector_name):
    """Return the constituent stocks of one industry board.

    Args:
        sector_name: Board name to look up in the '板块名称' column of the
            industry-board list.

    Returns:
        A DataFrame with the '代码' and '名称' columns of the board's
        constituents, or an empty DataFrame when the board is unknown or any
        request fails (the failure is logged, never raised).
    """
    try:
        board_names = ak.stock_board_industry_name_em()['板块名称']
        is_known = board_names.isin([sector_name]).any()
        if is_known:
            members = ak.stock_board_industry_cons_em(symbol=sector_name)
            return members[['代码', '名称']]
        logging.warning(f"{sector_name} not exists!")
    except Exception as e:
        logging.error(f"获取板块 {sector_name} 股票失败: {e}")
    # Unknown board and request failure both end up here.
    return pd.DataFrame()
def compute_signals(df):
    """Derive trading signals from per-stock quote metrics.

    Args:
        df: DataFrame with the columns '代码', '名称', '当日涨跌幅',
            '年内涨跌幅', '市盈率' and '市净率'. May be empty.

    Returns:
        A DataFrame with one row per stock that triggered at least one
        signal. The '信号' column holds the triggered labels joined by '; '.
        Stocks without any signal are omitted; an empty input yields an
        empty DataFrame.
    """
    signals = []
    for _, row in df.iterrows():
        try:
            pct_today = row['当日涨跌幅']
            pct_year = row['年内涨跌幅']
            pe = row['市盈率']

            labels = []
            if pct_today < -5:
                labels.append('今日大跌')
            if pct_year < -20:
                labels.append('年内大跌')
            # A zero or negative P/E means the company has no earnings; it is
            # not "cheap", so it must not raise the low-P/E signal. NaN fails
            # both comparisons and is skipped as well.
            if 0 < pe < 50:
                labels.append('低市盈率')

            if labels:
                signals.append({
                    '代码': row['代码'],
                    '名称': row['名称'],
                    '当日涨跌幅': pct_today,
                    '年内涨跌幅': pct_year,
                    '市盈率': pe,
                    '市净率': row['市净率'],
                    # join() avoids the dangling '; ' separator that
                    # str.strip() cannot fully remove.
                    '信号': '; '.join(labels),
                })
        except Exception as e:
            # One malformed row must not abort the whole scan.
            logging.warning(f"处理股票 {row} 时出错: {e}")
    return pd.DataFrame(signals)
def _summarize_kline(kline):
    """Compute percentage changes from a daily K-line frame.

    Args:
        kline: Non-empty DataFrame indexed by trading date with '开盘' and
            '收盘' columns. Assumed to be sorted oldest-first, as the caller
            treats the last row as the latest session.

    Returns:
        Tuple ``(pct_today, pct_5d, pct_month, pct_year)``, each in percent.
    """
    closes = kline['收盘']
    close_today = closes.iloc[-1]

    def close_sessions_ago(sessions):
        # With too little history fall back to the latest close, which
        # reports a 0% change instead of failing.
        if len(closes) > sessions:
            return closes.iloc[-1 - sessions]
        return close_today

    def pct_from(base):
        return (close_today / base - 1) * 100

    # The daily change is measured against the previous close; close/open
    # would hide any overnight gap. Only a single-row history falls back to
    # the session's own open.
    base_today = closes.iloc[-2] if len(closes) >= 2 else kline['开盘'].iloc[-1]

    # Year-to-date baseline: last close of the previous year. The first row
    # of the frame is only used when no earlier year is available (e.g. a
    # stock listed this year).
    current_year = datetime.date.today().year
    prior_years = closes[kline.index.year < current_year]
    base_year = prior_years.iloc[-1] if not prior_years.empty else closes.iloc[0]

    return (
        pct_from(base_today),
        pct_from(close_sessions_ago(5)),    # 5 sessions back, i.e. iloc[-6]
        pct_from(close_sessions_ago(20)),   # 20 sessions back, i.e. iloc[-21]
        pct_from(base_year),
    )


def fetch_and_analyze():
    """Fetch quotes for every target sector, compute signals, write a CSV.

    For each stock of each board in TARGET_SECTORS the daily K-line and the
    valuation indicators are downloaded; the derived metrics are passed to
    compute_signals(). A non-empty result is written to
    ``stock_signals_<today>.csv``. A failure for a single stock is logged and
    the stock is skipped.
    """
    logging.info("开始获取并分析行情数据")
    # Rows are collected in a list and converted once; concatenating a
    # DataFrame per stock copies the whole frame on every iteration.
    rows = []
    for sector in TARGET_SECTORS:
        df = fetch_sector_stocks(sector)
        if df.empty:
            continue
        logging.info(f"获取到板块 [{sector}] {len(df)} 只股票")
        for code, name in zip(df['代码'], df['名称']):
            try:
                # Daily K-line, forward-adjusted.
                kline = ak.stock_zh_a_hist(symbol=code, period='daily', adjust='qfq')
                kline['日期'] = pd.to_datetime(kline['日期'])
                kline.set_index('日期', inplace=True)
                pct_today, pct_5d, pct_month, pct_year = _summarize_kline(kline)

                # Valuation indicators (P/E, P/B).
                # NOTE(review): function name and column labels are kept as
                # in the original; confirm they match the installed akshare
                # version.
                fundamentals = ak.stock_a_lg_indicator(symbol=code)
                latest = fundamentals.iloc[-1]

                rows.append({
                    '代码': code,
                    '名称': name,
                    '当日涨跌幅': pct_today,
                    '5日涨跌幅': pct_5d,
                    '本月涨跌幅': pct_month,
                    '年内涨跌幅': pct_year,
                    '市盈率': latest['市盈率(TTM)'],
                    '市净率': latest['市净率'],
                })
            except Exception as e:
                logging.error(f"获取股票 {code} 数据失败: {e}")
                continue

    signals = compute_signals(pd.DataFrame(rows))
    if not signals.empty:
        signals.to_csv(f'stock_signals_{datetime.date.today()}.csv', index=False, encoding='utf-8-sig')
        logging.info(f"生成信号 {len(signals)} 条,已写入 CSV。")
    else:
        logging.info("未发现符合条件的交易信号")


if __name__ == "__main__":
    fetch_and_analyze()

View File

@ -0,0 +1,361 @@
import akshare as ak
import schedule
import time
import logging
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
# Logging configuration: INFO and above goes both to stock_analysis.log
# (UTF-8) and to the console stream.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("stock_analysis.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger("stock_analyzer")
# Watched sectors: display name -> board name handed to get_sector_stocks().
WATCH_SECTORS = {
"互联网": "互联网服务",
"芯片": "半导体及元件",
"消费": "消费电子",
"房地产": "房地产开发"
}
# Window lengths (in rows of the history frame) for the technical indicators.
TECHNICAL_PARAMS = {
"rsi_period": 14,
"macd_fast": 12,
"macd_slow": 26,
"macd_signal": 9,
"bollinger_period": 20
}
# Thresholds used by detect_anomalies_and_signals().
ANOMALY_THRESHOLDS = {
"daily_drop": -5, # daily change below -5%
"yearly_drop": -20, # year-to-date change below -20%
"pe_threshold": 50 # P/E below 50
}
def fetch_sector_stocks(sector_name):
    """Return the constituent stocks of one industry board.

    Args:
        sector_name: Board name to look up in the '板块名称' column of the
            industry-board list.

    Returns:
        A DataFrame with the '代码' and '名称' columns of the board's
        constituents, or an empty DataFrame when the board is unknown or a
        request fails. Failures are logged, never raised.
    """
    try:
        df_plates = ak.stock_board_industry_name_em()
        if not df_plates['板块名称'].isin([sector_name]).any():
            # Use the module logger like every other function in this file,
            # so the record carries the "stock_analyzer" name.
            logger.warning(f"{sector_name} not exists!")
            return pd.DataFrame()
        df = ak.stock_board_industry_cons_em(symbol=sector_name)
        return df[['代码', '名称']]
    except Exception as e:
        logger.error(f"获取板块 {sector_name} 股票失败: {e}")
        return pd.DataFrame()
def get_sector_stocks(sector_name: str, sector_code: str, limit: int = 20) -> List[str]:
    """Return up to ``limit`` unique stock codes of a board.

    Args:
        sector_name: Display name, used only in log messages.
        sector_code: Board name passed to fetch_sector_stocks().
        limit: Maximum number of codes returned, to keep the number of
            follow-up requests small. Defaults to 20.

    Returns:
        Stock codes in the order the board lists them, without duplicates.
        An empty list when the board has no data or the lookup fails.
    """
    try:
        logger.info(f"获取[{sector_name}]板块股票列表")
        stock_df = fetch_sector_stocks(sector_code)
        if stock_df.empty:
            # fetch_sector_stocks() already logged the reason; indexing the
            # column-less frame would only add a misleading KeyError.
            return []
        # dict.fromkeys() de-duplicates while keeping the board's order.
        # A set has no stable order, so slicing it would select a different,
        # arbitrary subset on every run.
        stock_codes = list(dict.fromkeys(stock_df["代码"].tolist()))
        logger.info(f"成功获取[{sector_name}]板块股票{len(stock_codes)}")
        return stock_codes[:limit]
    except Exception as e:
        logger.error(f"获取[{sector_name}]板块股票列表失败: {str(e)}", exc_info=True)
        return []
def get_stock_data(stock_code: str) -> Dict:
    """Collect spot, history, valuation and technical data for one stock.

    Args:
        stock_code: Bare stock code as listed in the '代码' column of the
            spot snapshot.

    Returns:
        A dict with the keys '代码', '名称', '当前价', '当日涨跌幅(%)',
        '五日涨跌幅(%)', '本月涨跌幅(%)', '本年涨跌幅(%)', '市盈率(PE)',
        '市净率(PB)', '技术指标' and '更新时间'; or None when the stock has no
        spot or history data or any request fails (the error is logged).
    """
    try:
        logger.info(f"获取股票[{stock_code}]数据")
        # Real-time snapshot.
        # NOTE(review): this downloads the full snapshot on every call;
        # consider fetching it once per run and passing it in.
        spot_df = ak.stock_zh_a_spot_em()
        stock_info = spot_df[spot_df["代码"] == stock_code]
        if stock_info.empty:
            logger.warning(f"未找到股票[{stock_code}]的实时数据")
            return None
        stock_info = stock_info.iloc[0]
        current_price = stock_info["最新价"]
        stock_name = stock_info["名称"]

        # History window: 1 January of the current year through today.
        now = datetime.now()
        today = now.strftime("%Y%m%d")
        year_start = now.replace(month=1, day=1).strftime("%Y%m%d")

        # stock_zh_a_hist accepts the bare code and returns the Chinese
        # column names ('日期', '开盘', '收盘', ...) that calculate_change()
        # and calculate_technical_indicators() read. stock_zh_a_daily expects
        # an exchange-prefixed symbol and returns English column names, so
        # every derived metric failed with it.
        history_df = ak.stock_zh_a_hist(
            symbol=stock_code,
            period="daily",
            start_date=year_start,
            end_date=today,
        )
        if history_df.empty:
            logger.warning(f"未找到股票[{stock_code}]的历史数据")
            return None

        indicators = {
            "代码": stock_code,
            "名称": stock_name,
            "当前价": current_price,
            "当日涨跌幅(%)": stock_info["涨跌幅"],
            "五日涨跌幅(%)": calculate_change(history_df, 5),
            "本月涨跌幅(%)": calculate_change(history_df, days=30),
            "本年涨跌幅(%)": calculate_change(history_df, start_date=year_start),
            "市盈率(PE)": get_valuation_indicator(stock_code, "pe"),
            "市净率(PB)": get_valuation_indicator(stock_code, "pb"),
            "技术指标": calculate_technical_indicators(history_df),
            "更新时间": now.strftime("%Y-%m-%d %H:%M:%S")
        }
        logger.info(f"成功获取股票[{stock_code}({stock_name})]数据")
        return indicators
    except Exception as e:
        logger.error(f"获取股票[{stock_code}]数据失败: {str(e)}", exc_info=True)
        return None
def calculate_change(history_df: pd.DataFrame, days: int = None, start_date: str = None) -> float:
    """Return the percentage change up to the latest close.

    Exactly one of ``days`` / ``start_date`` is expected; ``days`` wins when
    both are given.

    Args:
        history_df: Daily history, oldest row first, with '收盘' and (for
            ``start_date``) '日期' and '开盘' columns. Not modified.
        days: Look-back in sessions. The baseline is the close ``days`` rows
            before the latest row.
        start_date: ``YYYYMMDD`` string. The baseline is the open of the
            first row dated on or after it.

    Returns:
        The change in percent, or None when the frame is empty, the history
        is too short, no row matches ``start_date``, neither selector is
        given, or an error occurs (which is logged).
    """
    try:
        if history_df.empty:
            return None
        end_price = history_df["收盘"].iloc[-1]

        # Look-back by number of sessions.
        if days:
            # An N-session change needs N+1 rows: iloc[-N] is only N-1
            # sessions before the latest row.
            if len(history_df) <= days:
                return None
            start_price = history_df["收盘"].iloc[-days - 1]
            return (end_price - start_price) / start_price * 100

        # Look-back to a calendar date.
        if start_date:
            # Parse into a local Series; writing the result back into
            # history_df would change the caller's frame.
            dates = pd.to_datetime(history_df["日期"])
            start_date_obj = datetime.strptime(start_date, "%Y%m%d")
            on_or_after = history_df[dates >= start_date_obj]
            if on_or_after.empty:
                return None
            start_price = on_or_after["开盘"].iloc[0]
            return (end_price - start_price) / start_price * 100

        return None
    except Exception as e:
        logger.error(f"计算涨跌幅失败: {str(e)}")
        return None
def get_valuation_indicator(stock_code: str, indicator: str) -> float:
    """Return the most recent value of one valuation column.

    Args:
        stock_code: Stock code passed to the valuation-indicator request.
        indicator: Column name to read, e.g. "pe" or "pb".

    Returns:
        The last value of that column, or None when the table is empty, the
        column is missing, or the request fails (the error is logged).
    """
    try:
        table = ak.stock_a_indicator_lg(symbol=stock_code)
        has_value = not table.empty and indicator in table.columns
        # The last row is taken as the latest value.
        return table[indicator].iloc[-1] if has_value else None
    except Exception as e:
        logger.error(f"获取股票[{stock_code}]的{indicator}失败: {str(e)}")
        return None
def calculate_technical_indicators(history_df: pd.DataFrame) -> Dict:
    """Compute RSI, MACD and Bollinger-band values from the close prices.

    Window lengths come from TECHNICAL_PARAMS.

    Args:
        history_df: Daily history, oldest row first, with a '收盘' column.

    Returns:
        A dict with the latest 'RSI', 'MACD', 'MACD_Signal', '布林带上轨',
        '布林带中轨' and '布林带下轨' values. The dict is empty when the frame
        is empty or has fewer rows than the Bollinger window, and may be
        partially filled when an error occurs (which is logged).
    """
    indicators = {}
    try:
        if history_df.empty:
            return indicators
        # Require at least one full Bollinger window of rows.
        if len(history_df) < TECHNICAL_PARAMS["bollinger_period"]:
            logger.warning("历史数据不足,无法计算所有技术指标")
            return indicators
        close_prices = history_df["收盘"]

        # RSI from simple rolling means of gains and losses.
        rsi_period = TECHNICAL_PARAMS["rsi_period"]
        delta = close_prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
        rs = gain / loss
        indicators["RSI"] = (100 - 100 / (1 + rs)).iloc[-1]

        # MACD. The signal line is an EMA over the whole MACD series, so it
        # has to be computed before reducing to the latest value; calling
        # .ewm() on the scalar raised and dropped every later indicator.
        ema_fast = close_prices.ewm(span=TECHNICAL_PARAMS["macd_fast"], adjust=False).mean()
        ema_slow = close_prices.ewm(span=TECHNICAL_PARAMS["macd_slow"], adjust=False).mean()
        macd_line = ema_fast - ema_slow
        signal_line = macd_line.ewm(span=TECHNICAL_PARAMS["macd_signal"], adjust=False).mean()
        indicators["MACD"] = macd_line.iloc[-1]
        indicators["MACD_Signal"] = signal_line.iloc[-1]

        # Bollinger bands: rolling mean +/- 2 rolling standard deviations.
        band_window = close_prices.rolling(window=TECHNICAL_PARAMS["bollinger_period"])
        sma = band_window.mean()
        std = band_window.std()
        indicators["布林带上轨"] = (sma + 2 * std).iloc[-1]
        indicators["布林带中轨"] = sma.iloc[-1]
        indicators["布林带下轨"] = (sma - 2 * std).iloc[-1]
        return indicators
    except Exception as e:
        logger.error(f"计算技术指标失败: {str(e)}")
        return indicators
def detect_anomalies_and_signals(stock_data: Dict) -> Tuple[List[str], List[str]]:
    """Turn one stock's metrics into anomaly warnings and trading signals.

    Args:
        stock_data: Dict as produced by get_stock_data(); may be None/empty.

    Returns:
        ``(anomalies, signals)`` - two lists of human-readable messages.
        Both are empty when ``stock_data`` is falsy.
    """
    anomalies = []
    signals = []
    if not stock_data:
        return anomalies, signals

    # --- Anomalies -------------------------------------------------------
    # Daily drop beyond the threshold.
    daily_pct = stock_data["当日涨跌幅(%)"]
    if daily_pct is not None and daily_pct < ANOMALY_THRESHOLDS["daily_drop"]:
        anomalies.append(
            f"当日跌幅超过{abs(ANOMALY_THRESHOLDS['daily_drop'])}%: {daily_pct:.2f}%"
        )

    # Year-to-date drop beyond the threshold.
    yearly_pct = stock_data["本年涨跌幅(%)"]
    if yearly_pct is not None and yearly_pct < ANOMALY_THRESHOLDS["yearly_drop"]:
        anomalies.append(
            f"本年跌幅超过{abs(ANOMALY_THRESHOLDS['yearly_drop'])}%: {yearly_pct:.2f}%"
        )

    # Positive P/E below the threshold (negative P/E is excluded).
    pe = stock_data["市盈率(PE)"]
    if pe is not None and pe > 0 and pe < ANOMALY_THRESHOLDS["pe_threshold"]:
        anomalies.append(
            f"市盈率低于{ANOMALY_THRESHOLDS['pe_threshold']}: {pe:.2f}"
        )

    # --- Signals ---------------------------------------------------------
    ti = stock_data.get("技术指标", {})

    # RSI: below 30 is reported as oversold, above 70 as overbought.
    if "RSI" in ti:
        rsi = ti["RSI"]
        if rsi < 30:
            signals.append(f"RSI超卖({rsi:.2f}),可能的买入信号")
        elif rsi > 70:
            signals.append(f"RSI超买({rsi:.2f}),可能的卖出信号")

    # MACD relative to its signal line.
    if "MACD" in ti and "MACD_Signal" in ti:
        macd_above_signal = ti["MACD"] > ti["MACD_Signal"]
        signals.append(
            "MACD金叉可能的买入信号" if macd_above_signal else "MACD死叉可能的卖出信号"
        )

    # Price outside the Bollinger bands.
    has_bands = "布林带上轨" in ti and "布林带下轨" in ti
    if has_bands and stock_data["当前价"] is not None:
        price = stock_data["当前价"]
        if price < ti["布林带下轨"]:
            signals.append("价格低于布林带下轨,可能的买入信号")
        elif price > ti["布林带上轨"]:
            signals.append("价格高于布林带上轨,可能的卖出信号")

    return anomalies, signals
def analyze_sector(sector_name: str, sector_code: str):
    """Analyze every stock of one board and log the results.

    Args:
        sector_name: Display name used in log messages.
        sector_code: Board name passed on to get_sector_stocks().

    Stocks for which get_stock_data() returns nothing are skipped.
    """

    def describe(label, value, suffix=""):
        # Only a missing value counts as "no data". A truthiness test would
        # also report a legitimate 0.00 (unchanged price) as missing.
        if value is None or pd.isna(value):
            return f"{label}: 数据不足"
        return f"{label}: {value:.2f}{suffix}"

    logger.info(f"\n===== 开始分析[{sector_name}]板块 =====")
    stock_codes = get_sector_stocks(sector_name, sector_code)
    if not stock_codes:
        logger.warning(f"[{sector_name}]板块没有可分析的股票")
        return

    for stock_code in stock_codes:
        stock_data = get_stock_data(stock_code)
        if not stock_data:
            continue
        anomalies, signals = detect_anomalies_and_signals(stock_data)

        # Per-stock summary.
        logger.info(f"\n----- 股票: {stock_data['代码']} {stock_data['名称']} -----")
        logger.info(describe("当前价", stock_data['当前价']))
        logger.info(describe("当日涨跌幅", stock_data['当日涨跌幅(%)'], "%"))
        logger.info(describe("五日涨跌幅", stock_data['五日涨跌幅(%)'], "%"))
        logger.info(describe("本月涨跌幅", stock_data['本月涨跌幅(%)'], "%"))
        logger.info(describe("本年涨跌幅", stock_data['本年涨跌幅(%)'], "%"))
        logger.info(describe("市盈率(PE)", stock_data['市盈率(PE)']))
        logger.info(describe("市净率(PB)", stock_data['市净率(PB)']))

        if anomalies:
            logger.warning("----- 异动警告 -----")
            for anomaly in anomalies:
                logger.warning(f"- {anomaly}")
        if signals:
            logger.info("----- 交易信号 -----")
            for signal in signals:
                logger.info(f"- {signal}")
    logger.info(f"===== [{sector_name}]板块分析完成 =====")
def daily_analysis():
    """Run the daily job: log a banner, then analyze each watched sector."""
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    banner = (
        "\n\n=====================================",
        "===== 开始每日股票分析任务 =====",
        f"分析时间: {started_at}",
        "=====================================",
    )
    for line in banner:
        logger.info(line)

    # WATCH_SECTORS maps the display name to the board name.
    for display_name in WATCH_SECTORS:
        analyze_sector(display_name, WATCH_SECTORS[display_name])

    logger.info("\n===== 每日股票分析任务完成 =====")
def _poll_schedule_forever():
    """Run due scheduled jobs, checking once per minute, until interrupted."""
    while True:
        schedule.run_pending()
        time.sleep(60)


def main():
    """Entry point: run one analysis immediately, then every day at 17:00.

    The scheduler loop ends on Ctrl-C or on any exception raised inside it;
    both cases are logged before the program exits.
    """
    logger.info("股票分析程序启动")

    # One immediate run, useful while testing.
    logger.info("执行首次测试分析")
    daily_analysis()

    # Register the daily job.
    logger.info("设置定时任务每天17:00执行分析")
    schedule.every().day.at("17:00").do(daily_analysis)

    try:
        _poll_schedule_forever()
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"程序运行出错: {str(e)}", exc_info=True)
    finally:
        logger.info("股票分析程序退出")


if __name__ == "__main__":
    main()

View File

@ -18,6 +18,7 @@ import pymysql
import logging
import csv
import os
import re
import time
from datetime import datetime
from futu import OpenQuoteContext, RET_OK # Futu API client
@ -26,6 +27,7 @@ import argparse
import src.crawling.stock_hist_em as his_em
import src.logger.logger as logger
import src.config.config as config
from src.crawler.zixuan.xueqiu_zixuan import XueQiuStockFetcher
# 配置日志
logger.setup_logging()
@ -41,58 +43,36 @@ def flush_code_map():
print(code_id_map_em_df)
return code_id_map_em_df
# 获取历史K线如果失败就重试
def fetch_with_retry(code: str, s_date, e_date, adjust: str = '', max_retries: int = 3,
                     retry_delay: float = 3) -> pd.DataFrame:
    """Fetch daily K-line history, retrying on empty results and errors.

    Args:
        code: Stock code passed to ``his_em.stock_zh_a_hist``.
        s_date: Start date passed through as ``start_date``.
        e_date: End date passed through as ``end_date``.
        adjust: Price-adjustment mode passed through unchanged.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to wait between attempts. Defaults to 3.

    Returns:
        The first non-empty DataFrame obtained, or an empty DataFrame when
        every attempt returned nothing or raised.
    """
    for attempt in range(1, max_retries + 1):
        try:
            df = his_em.stock_zh_a_hist(
                symbol=code,
                period="daily",
                start_date=s_date,
                end_date=e_date,
                adjust=adjust,
            )
            if not df.empty:
                return df
            logging.info(f'{code} empty data. retry...')
        except Exception as e:
            # Record why the attempt failed; swallowing the exception made
            # persistent failures impossible to diagnose.
            logging.warning(f'{code} fetch failed (attempt {attempt}/{max_retries}): {e}')
        # Wait only when another attempt follows; sleeping after the final
        # failure just delays the caller.
        if attempt < max_retries:
            time.sleep(retry_delay)
    logging.warning(f'{code} no data after {max_retries} attempts.')
    return pd.DataFrame()
# 获取所有市场的当年股价快照,带重试机制。
def fetch_snap_all(max_retries: int = 3) -> pd.DataFrame:
# 检查文件是否存在
file_name = f'{res_dir}/snapshot_em_{current_date}.csv'
if os.path.exists(file_name):
try:
# 读取本地文件
snap_data = pd.read_csv(file_name, encoding='utf-8')
logging.info(f"load snapshot data from local: {file_name}\n\n")
return snap_data
except Exception as e:
logging.warning(f"读取本地文件失败: {e},将重新拉取数据\n\n")
# 拉取数据
market_fs = {"china_a": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2",
"us": "m:105,m:106,m:107"}
result = pd.DataFrame()
for market_id, fs in market_fs.items():
retries = 0
while retries < max_retries:
try:
df = his_em.stock_zh_a_spot_em(fs)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
logging.warning(f'{market_id} empty data. retry...')
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
print(f'get {market_id} stock snapshot. stock count: {len(df)}')
result = pd.concat([result, df], ignore_index=True)
break
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
if retries >= max_retries:
logging.warning(f'{market_id} fetching error.')
df = his_em.stock_zh_a_spot_em(fs, fs_desc=market_id)
if df.empty:
logging.warning(f'{market_id} empty data. please check.')
return pd.DataFrame()
else:
logging.info(f'get {market_id} stock snapshot. stock count: {len(df)}')
result = pd.concat([result, df], ignore_index=True)
result.to_csv(file_name, index=False, encoding='utf-8')
logging.info(f"get snapshot data and write to file: {file_name}\n\n")
return result
@ -127,6 +107,55 @@ def load_index_codes():
conn.close()
return hs300_data + hk_data + us_data
def format_stock_code(code):
    """Insert a dot between the letter prefix and the digits of a code.

    For example ``"SZ300750"`` becomes ``"SZ.300750"``. A code that is not
    "letters followed by digits" (already dotted, digits only, other
    characters) is returned unchanged.
    """
    # One or more letters at the start, then one or more digits to the end.
    parsed = re.match(r'^([A-Za-z]+)(\d+)$', code)
    if not parsed:
        return code
    prefix, digits = parsed.group(1), parsed.group(2)
    return f"{prefix}.{digits}"
def load_xueqiu_codes(cookies=None):
    """Load the Xueqiu watchlist as ``MARKET.CODE`` entries.

    Args:
        cookies: Cookie header of a logged-in Xueqiu session. When omitted,
            the ``XUEQIU_COOKIES`` environment variable is used.

    Returns:
        A list of ``{'code': ..., 'code_name': ...}`` dicts. CN symbols are
        dotted via format_stock_code(), HK symbols get an ``HK.`` prefix and
        every other market a ``US.`` prefix. Returns an empty list when no
        cookie is available or the fetcher returns nothing.
    """
    # SECURITY: the session cookie is a credential and must not be committed
    # to source control. It is read from the environment; the value that was
    # previously hard-coded here should be treated as leaked and revoked.
    if cookies is None:
        cookies = os.environ.get("XUEQIU_COOKIES", "")
    if not cookies:
        logging.error("XUEQIU_COOKIES is not set; cannot load the Xueqiu watchlist.")
        return []

    fetcher = XueQiuStockFetcher(
        cookies=cookies,
        size=1000,
        retry_count=3
    )
    stocks = fetcher.get_stocks_by_group(
        category=1,  # stocks
        pid=-1       # all groups
    )

    all_codes = []
    for item in stocks or []:
        code = item['symbol']
        mkt = (item['marketplace'] or '').lower()
        if not mkt:
            # Without a marketplace the code cannot be given the MARKET.
            # prefix that callers split on, so it is skipped and reported.
            logging.warning(f"xueqiu item {code} has no marketplace, skipping.")
            continue
        if mkt == 'cn':
            code = format_stock_code(code)
        elif mkt == 'hk':
            code = f"HK.{code}"
        else:
            code = f"US.{code}"
        all_codes.append({'code': code, 'code_name': item['name']})
    return all_codes
# 读取富途自选股的指定分类股
def load_futu_all_codes():
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
@ -143,14 +172,6 @@ def load_futu_all_codes():
return stock_data
# 获取特定的行
def get_specific_date_row(data, date):
    """Return the first row whose '日期' equals ``date``, or None if absent.

    Args:
        data: Iterable of mappings that each have a '日期' key.
        date: Value to compare against each row's '日期'.
    """
    matches = (row for row in data if row['日期'] == date)
    return next(matches, None)
# 获取股票数据,并统计收益率
def calculate_stock_statistics(market, code, code_name):
try:
@ -158,10 +179,16 @@ def calculate_stock_statistics(market, code, code_name):
last_year = datetime.now().year - 1
last_year_str = str(last_year)
# 获取历史数据
data = fetch_with_retry(code, "20210101", current_date, 'qfq')
# 调用 stock_zh_a_hist 获取历史数据
data = his_em.stock_zh_a_hist_new(
em_symbol=code,
period="daily",
start_date="20210101",
end_date=current_date,
adjust='qfq',
)
if data.empty:
logging.warning(f'{code}, {code_name} has no data. skipping...')
#logging.warning(f'fetch data for {code}, {code_name} failed. skipping...')
return None
# 获取当前日期的股价
@ -173,7 +200,7 @@ def calculate_stock_statistics(market, code, code_name):
# 获取年初股价,也就是上一年的最后一个交易日的收盘价
year_data = data[data['日期'].str.startswith(last_year_str)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到上一年的数据 ({last_year_str}), 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到上一年的数据 ({last_year_str}), 以 {defaut_row['日期']} 的数据来代替")
year_begin_row = defaut_row
else:
year_begin_row = year_data.loc[year_data['日期'].idxmax()]
@ -182,35 +209,35 @@ def calculate_stock_statistics(market, code, code_name):
try:
row_0923 = data[data['日期'] == '2024-09-23'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0923的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到0923的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0923 = defaut_row
# 获取0930收盘价
try:
row_0930 = data[data['日期'] == '2024-09-30'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0930的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到0930的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0930 = defaut_row
# 获取1008开盘价、收盘价
try:
row_1008 = data[data['日期'] == '2024-10-08'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到1008的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到1008的数据, 以 {defaut_row['日期']} 的数据来代替")
row_1008 = defaut_row
# 获取0403收盘价
try:
row_0403 = data[data['日期'] == '2025-04-03'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0403的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到0403的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0403 = defaut_row
# 获取0407收盘价
try:
row_0407 = data[data['日期'] == '2025-04-07'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0407的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到0407的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0407 = defaut_row
# 获取2021年以来的最高价
@ -221,7 +248,7 @@ def calculate_stock_statistics(market, code, code_name):
# 获取年内的最高价、最低价
year_data = data[data['日期'].str.startswith(current_year)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到年内的数据, 以 {defaut_row['日期']} 的数据来代替")
logging.debug(f"{code}, {code_name} 未找到年内的数据, 以 {defaut_row['日期']} 的数据来代替")
year_min_row = defaut_row
year_max_row = defaut_row
else:
@ -300,27 +327,29 @@ def write_to_csv(results, filename):
# 主函数,执行逻辑
def main(list, debug):
futu_codes = []
xueqiu_codes = []
index_codes = []
if list == 'futu':
futu_codes = load_futu_all_codes()
elif list == 'xueqiu':
xueqiu_codes = load_xueqiu_codes()
elif list == 'all':
futu_codes = load_futu_all_codes()
xueqiu_codes = load_xueqiu_codes()
index_codes = load_index_codes()
else:
index_codes = load_index_codes()
codes = futu_codes + index_codes
codes = futu_codes + index_codes + xueqiu_codes
all_results = []
# 获取快照数据,并保存到文件
# 获取快照数据
snap_data = fetch_snap_all()
if snap_data.empty:
logging.error(f"fetching snapshot data error!")
return
file_name = f'{res_dir}/snapshot_em_{current_date}.csv'
snap_data.to_csv(file_name, index=False, encoding='utf-8')
logging.info(f"市场快照数据已经写入 CSV 文件 {file_name}\n\n")
em_code_map = {row['代码']: row['代码前缀'] for _, row in snap_data.iterrows()}
for item in codes:
code = item['code']
@ -331,9 +360,13 @@ def main(list, debug):
market, clean_code = code.split(".")
except ValueError:
logging.error(f"wrong format code: {code}")
if clean_code not in em_code_map:
logging.warning(f"wrong stock code {clean_code}, please check.")
continue
em_code = f"{em_code_map[clean_code]}.{clean_code}"
logging.info(f"正在处理股票 {market}.{clean_code}, {code_name}...")
result = calculate_stock_statistics(market, clean_code, code_name)
result = calculate_stock_statistics(market, em_code, code_name)
if result:
match = snap_data.loc[snap_data['代码'] == clean_code]
if not match.empty: # 如果找到了匹配项
@ -344,6 +377,9 @@ def main(list, debug):
logging.warning(f'{market}.{clean_code} has no snapshot data.')
all_results.append(result)
logging.info(f"get data succ. {market}.{clean_code}, em_code: {em_code}, name: {code_name}...")
else:
logging.warning(f"get data faild. {market}.{clean_code}, em_code: {em_code}, name: {code_name}")
if debug:
break
@ -367,4 +403,6 @@ if __name__ == "__main__":
# 调用主函数
#flush_code_map()
#print(load_futu_all_codes())
#print(load_xueqiu_codes())
main(args.list, args.debug)