modify dirs

This commit is contained in:
oscarz
2025-03-21 21:21:18 +08:00
parent 7c8b27339c
commit 564c7bd442
23 changed files with 975 additions and 7 deletions

View File

@ -0,0 +1,350 @@
"""
Script Name:
Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。
调用em历史数据接口。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import pymysql
import logging
import csv
import os
import time
from datetime import datetime
from futu import OpenQuoteContext, RET_OK # Futu API client
from futu import *
import argparse
import src.crawling.stock_hist_em as his_em
import src.logger.logger as logger
import src.config.config as config
# 配置日志
logger.setup_logging()
current_date = datetime.now().strftime("%Y%m%d")
current_year = datetime.now().strftime("%Y")
res_dir = config.global_stock_data_dir
# 刷新代码列表,并返回
def flush_code_map():
code_id_map_em_df = his_em.code_id_map_em()
print(code_id_map_em_df)
return code_id_map_em_df
# 获取历史K线如果失败就重试
def fetch_with_retry(code: str, s_date, e_date, adjust: str = '', max_retries: int = 3) -> pd.DataFrame :
retries = 0
while retries < max_retries:
try:
# 调用 stock_zh_a_hist 获取历史数据
df = his_em.stock_zh_a_hist(
symbol=code,
period="daily",
start_date=s_date,
end_date=e_date,
adjust=adjust,
)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
logging.info(f'{code} empty data. retry...')
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
return df
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
return pd.DataFrame()
# 获取所有市场的当年股价快照,带重试机制。
def fetch_snap_all(max_retries: int = 3) -> pd.DataFrame:
market_fs = {"china_a": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2",
"us": "m:105,m:106,m:107"}
result = pd.DataFrame()
for market_id, fs in market_fs.items():
retries = 0
while retries < max_retries:
try:
df = his_em.stock_zh_a_spot_em(fs)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
logging.warning(f'{market_id} empty data. retry...')
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
print(f'get {market_id} stock snapshot. stock count: {len(df)}')
result = pd.concat([result, df], ignore_index=True)
break
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
if retries >= max_retries:
logging.warning(f'{market_id} fetching error.')
return result
# 从数据库中读取指定指数的成分股
def load_index_codes():
conn = pymysql.connect(**config.db_config)
cursor = conn.cursor(pymysql.cursors.DictCursor)
#沪深300
#cursor.execute("SELECT code, code_name FROM index_hs where index_code='000300' ")
#中证A500
#cursor.execute("SELECT code, code_name FROM index_hs where index_code='000510' ")
#沪深300和中证A500的并集去重
#cursor.execute("SELECT DISTINCT CONCAT('index-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510') ")
#沪深300和中证A500的合并不去重
cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510') ")
#沪深300、中证A500、中证A50、科创芯片、科创创业50不去重
#cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hs where index_code IN ('000300', '000510', '930050', '000685', '931643') ")
hs300_data = cursor.fetchall()
#港股国企指数成分股、恒生科技指数成分股等
cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_hk where index_code IN ('HSCEI', 'HSTECH') ")
hk_data = cursor.fetchall()
#美股中概股等
cursor.execute("SELECT DISTINCT CONCAT(index_code , '-', code) as code, code_name FROM index_us where index_code IN ('CN_US') ")
us_data = cursor.fetchall()
cursor.close()
conn.close()
return hs300_data + hk_data + us_data
# 读取富途自选股的指定分类股
def load_futu_all_codes():
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
stock_data = []
ret, data = quote_ctx.get_user_security("全部")
if ret == RET_OK:
if data.shape[0] > 0: # 如果自选股列表不为空
stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows() if row['stock_type'] == 'STOCK']
#stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows()]
else:
logging.error('error:', data)
quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽
return stock_data
# 获取特定的行
def get_specific_date_row(data, date):
"""获取特定日期的行"""
for row in data:
if row['日期'] == date:
return row
return None
# 获取股票数据,并统计收益率
def calculate_stock_statistics(market, code, code_name):
try:
# 获取当前日期(用于比较)
last_year = datetime.now().year - 1
last_year_str = str(last_year)
# 获取历史数据
data = fetch_with_retry(code, "20210101", current_date, 'qfq')
if data.empty:
logging.warning(f'{code}, {code_name} has no data. skipping...')
return None
# 获取当前日期的股价
current_row = data.loc[data['日期'].idxmax()]
# 默认行如果该股票没有年初股价0923股价1008股价等以此记录代替
defaut_row = data.loc[data['日期'].idxmin()]
# 获取年初股价,也就是上一年的最后一个交易日的收盘价
year_data = data[data['日期'].str.startswith(last_year_str)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到上一年的数据 ({last_year_str}), 以 {defaut_row['日期']} 的数据来代替")
year_begin_row = defaut_row
else:
year_begin_row = year_data.loc[year_data['日期'].idxmax()]
# 获取0923收盘价
try:
row_0923 = data[data['日期'] == '2024-09-23'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0923的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0923 = defaut_row
# 获取0930收盘价
try:
row_0930 = data[data['日期'] == '2024-09-30'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0930的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0930 = defaut_row
# 获取1008开盘价、收盘价
try:
row_1008 = data[data['日期'] == '2024-10-08'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到1008的数据, 以 {defaut_row['日期']} 的数据来代替")
row_1008 = defaut_row
# 获取2021年以来的最高价
max_close_row = data.loc[data['收盘'].idxmax()]
# 获取2021年以来的最低价
min_close_row = data.loc[data['收盘'].idxmin()]
# 获取年内的最高价、最低价
year_data = data[data['日期'].str.startswith(current_year)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到年内的数据, 以 {defaut_row['日期']} 的数据来代替")
year_min_row = defaut_row
year_max_row = defaut_row
else:
year_min_row = year_data.loc[year_data['收盘'].idxmin()]
year_max_row = year_data.loc[year_data['收盘'].idxmax()]
# 计算统计数据
try:
year_increase = (current_row['收盘'] / year_begin_row['收盘'] - 1)
growth_0923 = (current_row['收盘'] / row_0923['收盘'] - 1)
growth_0930 = (current_row['收盘'] / row_0930['收盘'] - 1)
growth_1008 = (current_row['收盘'] / row_1008['收盘'] - 1)
growth_1008_open = (current_row['收盘'] / row_1008['开盘'] - 1)
year_amplitude = (year_max_row['收盘'] / year_min_row['收盘'] - 1)
max_amplitude = (max_close_row['收盘'] / min_close_row['收盘'] - 1)
stock_recovery = (current_row['收盘'] / max_close_row['收盘'] - 1)
except ZeroDivisionError:
logging.error(f"股票 {code} 计算时遇到除零错误")
return None
# 组织结果
result = [
market,
code,
code_name,
current_row['日期'], current_row['收盘'],
year_begin_row['日期'], year_begin_row['收盘'],
row_0923['日期'], row_0923['收盘'] ,
row_0930['日期'], row_0930['收盘'] ,
row_1008['日期'], row_1008['开盘'] ,row_1008['收盘'] ,
max_close_row['日期'], max_close_row['收盘'],
min_close_row['日期'], min_close_row['收盘'],
year_max_row['日期'], year_max_row['收盘'],
year_min_row['日期'], year_min_row['收盘'],
year_increase,
growth_0923 if growth_0923 is not None else 'N/A',
growth_0930 if growth_0930 is not None else 'N/A',
growth_1008 if growth_1008 is not None else 'N/A',
growth_1008_open if growth_1008_open is not None else 'N/A',
year_amplitude,
max_amplitude,
stock_recovery
]
return result
except Exception as e:
logging.error(f"处理股票 {code} 时出错: {e}")
return None
# 写入到文件中
def write_to_csv(results, filename):
"""将所有结果写入CSV文件"""
try:
with open(filename, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入表头
writer.writerow([
"股市", "股票代码", "股票名称", "当前日期", "当前收盘", "年初日期", "年初收盘",
"0923日期", "0923收盘", "0930日期", "0930收盘", "1008日期", "1008开盘", "1008收盘",
"最高日期", "最高收盘", "最低日期", "最低收盘", "年内最高日期", "年内最高收盘", "年内最低日期", "年内最低收盘", "年内涨幅",
"相比0923收盘价涨幅", "相比0930收盘价涨幅", "相比1008收盘价涨幅", "相比1008开盘价涨幅",
"年内振幅", "最大振幅", "股价自最高点恢复", "市盈率TTM", "市净率", "总市值"
])
# 写入每行数据
for result in results:
writer.writerow(result)
except Exception as e:
logging.error(f"写入CSV文件时出错: {e}")
# 主函数,执行逻辑
def main(list, debug):
futu_codes = []
index_codes = []
if list == 'futu':
futu_codes = load_futu_all_codes()
elif list == 'all':
futu_codes = load_futu_all_codes()
index_codes = load_index_codes()
else:
index_codes = load_index_codes()
codes = futu_codes + index_codes
all_results = []
# 获取快照数据,并保存到文件
snap_data = fetch_snap_all()
if snap_data.empty:
logging.error(f"fetching snapshot data error!")
return
file_name = f'{res_dir}/snapshot_em_{current_date}.csv'
snap_data.to_csv(file_name, index=False, encoding='utf-8')
logging.info(f"市场快照数据已经写入 CSV 文件 {file_name}\n\n")
for item in codes:
code = item['code']
code_name = item['code_name']
# 清理股票代码中的前缀
try:
market, clean_code = code.split(".")
except ValueError:
logging.error(f"wrong format code: {code}")
logging.info(f"正在处理股票 {market}.{clean_code}, {code_name}...")
result = calculate_stock_statistics(market, clean_code, code_name)
if result:
match = snap_data.loc[snap_data['代码'] == clean_code]
if not match.empty: # 如果找到了匹配项
result.append(match['市盈率TTM'].iloc[0])
result.append(match['市净率'].iloc[0])
result.append(match['总市值'].iloc[0])
else:
logging.warning(f'{market}.{clean_code} has no snapshot data.')
all_results.append(result)
if debug:
break
if all_results:
file_name = f'{res_dir}/stock_statistics_{list}_{current_date}'
if debug:
file_name = f'{file_name}_debug'
file_name = f'{file_name}.csv'
write_to_csv(all_results, f'{file_name}')
logging.info(f"统计结果已写入 CSV 文件 {file_name}")
else:
logging.warning("没有可写入的统计数据")
if __name__ == "__main__":
# 命令行参数处理
parser = argparse.ArgumentParser(description='计算指定股票的区间收益率')
parser.add_argument('--list', type=str, default='futu', help='Stocklist to process (futu , index, all)')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
# 调用主函数
#flush_code_map()
main(args.list, args.debug)