add em stat scripts.

This commit is contained in:
2024-12-21 09:32:01 +08:00
parent 6c3a8f7cfa
commit e94ec5d35c
16 changed files with 46547 additions and 58 deletions

View File

@ -10,7 +10,7 @@ import pandas as pd
from functools import lru_cache
def stock_zh_a_spot_em() -> pd.DataFrame:
def stock_zh_a_spot_em(fs='m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048') -> pd.DataFrame:
"""
东方财富网-沪深京 A 股-实时行情
https://quote.eastmoney.com/center/gridlist.html#hs_a_board
@ -27,7 +27,7 @@ def stock_zh_a_spot_em() -> pd.DataFrame:
"fltt": "2",
"invt": "2",
"fid": "f3",
"fs": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"fs": fs,
"fields": "f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f14,f15,f16,f17,f18,f20,f21,f22,f23,f24,f25,f26,f37,f38,f39,f40,f41,f45,f46,f48,f49,f57,f61,f100,f112,f113,f114,f115,f221",
"_": "1623833739532",
}
@ -162,9 +162,9 @@ def stock_zh_a_spot_em() -> pd.DataFrame:
return temp_df
@lru_cache()
def code_id_map_em() -> dict:
#原有版本,实现的比较繁琐,后面有个简化版本替代它。
#@lru_cache()
def code_id_map_em_older() -> dict:
"""
东方财富-股票和市场代码
http://quote.eastmoney.com/center/gridlist.html#hs_a_board
@ -286,7 +286,7 @@ def code_id_map_em() -> dict:
"invt": "2",
"fid": "f3",
"fs": "m:105,m:106,m:107",
"fields": "f12",
"fields": "f12,f13",
"_": "1623833739532",
}
r = requests.get(url, params=params)
@ -294,11 +294,62 @@ def code_id_map_em() -> dict:
if not data_json["data"]["diff"]:
return dict()
temp_df_sz = pd.DataFrame(data_json["data"]["diff"])
temp_df_sz["us_all"] = 105
code_id_dict.update(dict(zip(temp_df_sz["f12"], temp_df_sz["us_all"])))
# 把数据保存到字典中。按照f13的值分别存储
grouped = temp_df_sz.groupby('f13')
for id, group in grouped:
temp_df_sz[f"us_all_{id}"] = id
code_id_dict.update(dict(zip(group["f12"], str(id))))
#print(f"分组 f13 = {id}:")
#print(group)
#temp_df_sz["us_all"] = 105
#code_id_dict.update(dict(zip(temp_df_sz["f12"], temp_df_sz["us_all"])))
print(code_id_dict)
return code_id_dict
@lru_cache()
def code_id_map_em() -> dict:
"""
东方财富-股票和市场代码
http://quote.eastmoney.com/center/gridlist.html#hs_a_board
:return: 股票和市场代码
:rtype: dict
"""
url = "http://80.push2.eastmoney.com/api/qt/clist/get"
params = {
"pn": "1",
"pz": "50000",
"po": "1",
"np": "1",
"ut": "bd1d9ddb04089700cf9c27f6f7426281",
"fltt": "2",
"invt": "2",
"fid": "f3",
"fs": "m:1 t:2,m:1 t:23",
"fields": "f12,f13",
"_": "1623833739532",
}
market_fs = {"china_a": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2",
"us": "m:105,m:106,m:107"}
code_id_dict = dict()
for market_id, fs in market_fs.items():
params['fs'] = fs
r = requests.get(url, params=params)
data_json = r.json()
if not data_json["data"]["diff"]:
return dict()
temp_df = pd.DataFrame(data_json["data"]["diff"])
temp_df["market_id"] = 1
# 把数据保存到字典中。按照f13的值分别存储
grouped = temp_df.groupby('f13')
for id, group in grouped:
temp_df[f"{market_id}_{id}"] = id
#code_id_dict.update(dict(zip(group["f12"], str(id))))
code_id_dict.update(dict.fromkeys(group["f12"], id))
print(f'get {market_id} stock list. f13: {id}, stock count: {len(group)}')
return code_id_dict
def stock_zh_a_hist(
symbol: str = "000001",

View File

@ -44,12 +44,12 @@ end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=365*10-1)).strftime('%Y-%m-%d')
# 定义插入数据的函数
def insert_data(connection, data):
def insert_data(connection, data, s_table=selected_table):
try:
with connection.cursor() as cursor:
for index, row in data.iterrows():
sql = f"""
INSERT INTO {selected_table} (code, name, time_key, open, close, high, low, pe_ratio, turnover_rate, volume, turnover, change_rate, last_close)
INSERT INTO {s_table} (code, name, time_key, open, close, high, low, pe_ratio, turnover_rate, volume, turnover, change_rate, last_close)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
@ -80,43 +80,81 @@ def get_hs300_codes():
cursor.execute("SELECT code FROM hs300 ")
return cursor.fetchall()
# 初始化 futu 行情连接
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
def stat_growth(s_autype = selected_autype, s_table = selected_table, s_start = start_date, s_end = end_date):
# 初始化 futu 行情连接
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
try:
hs300_codes = get_hs300_codes()
for code_row in hs300_codes:
code = code_row[0] # 从数据库行中提取 code
try:
hs300_codes = get_hs300_codes()
for code_row in hs300_codes:
code = code_row[0] # 从数据库行中提取 code
# 获取历史 K 线数据,设置分页请求
ret, data, page_req_key = quote_ctx.request_history_kline(code, autype=selected_autype, start=start_date, end=end_date, max_count=500)
if ret == RET_OK:
logging.info(f"成功获取 {code} 的第一页数据,共 {len(data)}")
print(f"成功获取 {code} 的第一页数据,共 {len(data)}")
# 插入第一页数据
insert_data(connection, data)
else:
logging.error(f"获取 {code} 的数据失败: {data}")
print(f"获取 {code} 的数据失败: {data}")
# 分页拉取
while page_req_key is not None:
time.sleep(1) # 休眠 5 秒
ret, data, page_req_key = quote_ctx.request_history_kline(code, autype=selected_autype, start=start_date, end=end_date, max_count=500, page_req_key=page_req_key)
# 获取历史 K 线数据,设置分页请求
ret, data, page_req_key = quote_ctx.request_history_kline(code, autype=s_autype, start=s_start, end=s_end, max_count=500)
if ret == RET_OK:
logging.info(f"成功获取 {code}页数据,共 {len(data)}")
print(f"成功获取 {code}页数据,共 {len(data)}")
# 插入页数据
insert_data(connection, data)
logging.info(f"成功获取 {code}第一页数据,共 {len(data)}")
print(f"成功获取 {code}第一页数据,共 {len(data)}")
# 插入第一页数据
insert_data(connection, data, s_table)
else:
logging.error(f"分页数据获取失败: {data}")
print(f"分页数据获取失败: {data}")
logging.error(f"获取 {code} 的数据失败: {data}")
print(f"获取 {code} 的数据失败: {data}")
# 每次获取完一个股票的数据后,休眠 5 秒
time.sleep(2)
# 分页拉取
while page_req_key is not None:
time.sleep(1) # 休眠 5 秒
ret, data, page_req_key = quote_ctx.request_history_kline(code, autype=s_autype, start=s_start, end=s_end, max_count=500, page_req_key=page_req_key)
if ret == RET_OK:
logging.info(f"成功获取 {code} 的分页数据,共 {len(data)}")
print(f"成功获取 {code} 的分页数据,共 {len(data)}")
finally:
quote_ctx.close() # 关闭 futu 连接
connection.close() # 关闭 MySQL 连接
# 插入分页数据
insert_data(connection, data, s_table)
else:
logging.error(f"分页数据获取失败: {data}")
print(f"分页数据获取失败: {data}")
# 每次获取完一个股票的数据后,休眠 5 秒
time.sleep(2)
finally:
quote_ctx.close() # 关闭 futu 连接
connection.close() # 关闭 MySQL 连接
def print_help():
print("Usage: python script.py type end_date start_date")
print("type: qfq|hfq|none, default for none")
print('start_date: yyyy-mm-dd, default for end_date - 10 years ')
print('end_date: yyyy-mm-dd, default for current date ')
def main():
type = selected_autype
table = selected_table
start = start_date
end = end_date
args_num = len(sys.argv)
if args_num > 1 :
if sys.argv[1] == 'none':
type = AuType.NONE
table = "hs300_his_kline_none"
elif sys.argv[1] == 'qfq':
type = AuType.QFQ
table = "hs300_qfq_his"
elif sys.argv[1] == 'hfq':
type = AuType.HFQ
table = "hs300_his_kline_hfq"
else:
print_help()
exit(1)
if args_num > 2 :
end = sys.argv[2]
if args_num > 3 :
start = sys.argv[3]
print(f'fetching his kline... type: {type}, table: {table}, start: {start}, end: {end}\n\n')
stat_growth(type, table, start, end)
if __name__ == '__main__':
main()

View File

@ -1,7 +1,7 @@
from futu import *
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
ret, data = quote_ctx.get_user_security("港股")
ret, data = quote_ctx.get_user_security("全部")
if ret == RET_OK:
print(data)
if data.shape[0] > 0: # 如果自选股列表不为空

View File

@ -176,23 +176,27 @@ def calculate_yield():
cursor.execute("SELECT * FROM hs300_qfq_his WHERE code=%s AND time_key>='2021-01-01' ORDER BY pe_ratio ASC LIMIT 1", (code,))
row7 = cursor.fetchone()
# 3.6 获取 2024-10-08 的收盘数据
cursor.execute("SELECT * FROM hs300_his_kline_none WHERE code=%s AND time_key<'2024-10-09' ORDER BY time_key DESC LIMIT 1", (code,))
row8 = cursor.fetchone()
# 4. 读取复权数据
cursor.execute("SELECT * FROM futu_rehab WHERE code=%s ORDER BY ex_div_date ASC", (code,))
rehab_res = cursor.fetchall()
# 5. 计算复权价格
for row in [row1, row2, row3]:
for row in [row1, row2, row3, row8]:
qfq_close = row['close']
time_key_date = row['time_key'].date() # 将 datetime 转换为 date
for rehab in rehab_res:
if rehab['ex_div_date'] >= time_key_date:
if rehab['ex_div_date'] > time_key_date:
qfq_close = (qfq_close * rehab['forward_adj_factorA']) + rehab['forward_adj_factorB']
row['qfq_close'] = qfq_close
# 6. 计算收益率
year_yield = row1['qfq_close'] / row2['qfq_close'] - 1 if row1 and row2 else None
yield_0924 = row1['qfq_close'] / row3['qfq_close'] - 1 if row3 and row1 else None
yield_1008 = row1['qfq_close'] / row8['qfq_close'] - 1 if row8 and row1 else None
# 6.1 计算当前股价是 2021 年之后最高点及最低点的百分比
max_price_pct = row1['qfq_close'] / row4['close'] if row1 and row4 and row4['close'] !=0 else None
@ -208,23 +212,26 @@ def calculate_yield():
'code': code,
'name': name,
'year_begin_date': row2['time_key'].date(),
'year_begin_close': row2['qfq_close'],
'year_begin_close': round(row2['qfq_close'], 4),
'0924_date': row3['time_key'].date(),
'0924_close': row3['qfq_close'],
'0924_close': round(row3['qfq_close'],4),
'1008_date': row8['time_key'].date(),
'1008_close': round(row8['qfq_close'], 4),
'max_price_date': row4['time_key'].date(),
'max_price': row4['close'],
'max_price': round(row4['close'], 4),
'max_price_pe': row4['pe_ratio'],
'max_pe_date': row6['time_key'].date(),
'max_pe': row6['pe_ratio'],
'min_price_date': row5['time_key'].date(),
'min_price': row5['close'],
'min_price': round(row5['close'], 4),
'min_price_pe': row5['pe_ratio'],
'min_pe_date': row7['time_key'].date(),
'min_pe': row7['pe_ratio'],
'latest_date': row1['time_key'].date(),
'latest_close': row1['qfq_close'],
'year_yield': year_yield,
'yield_0924': yield_0924,
'latest_close': round(row1['qfq_close'], 4),
'year_yield': round(year_yield, 4),
'yield_0924': round(yield_0924, 4),
'yield_1008': round(yield_1008, 4),
'total_market_val': row1.get('total_market_val', None),
'pe_ttm_ratio': row1.get('pe_ttm_ratio', None),
'dividend_ratio_ttm': row1.get('dividend_ratio_ttm', None),
@ -235,6 +242,7 @@ def calculate_yield():
'min_price_pe_pct': min_price_pe_pct,
'max_pe_pct': max_pe_pct,
'min_pe_pct': min_pe_pct,
#'price_pe': round(max_price_pct - max_price_pe_pct,4)
}
results.append(result)
logging.info(f"{result}")
@ -245,12 +253,12 @@ def calculate_yield():
# 获取当前日期 格式化为 yyyymmdd
current_date = datetime.now()
date_string = current_date.strftime('%Y%m%d')
fieldnames = ['code', 'name', 'year_begin_date', 'year_begin_close', '0924_date', '0924_close',
fieldnames = ['code', 'name', 'year_begin_date', 'year_begin_close', '0924_date', '0924_close', '1008_date', '1008_close',
'max_price_date', 'max_price', 'max_price_pe', 'max_pe_date', 'max_pe', 'min_price_date', 'min_price', 'min_price_pe', 'min_pe_date', 'min_pe',
'latest_date', 'latest_close', 'year_yield', 'yield_0924', 'total_market_val', 'pe_ttm_ratio', 'dividend_ratio_ttm', 'dividend_lfy_ratio',
'latest_date', 'latest_close', 'year_yield', 'yield_0924', 'yield_1008', 'total_market_val', 'pe_ttm_ratio', 'dividend_ratio_ttm', 'dividend_lfy_ratio',
'max_price_pct', 'max_price_pe_pct', 'min_price_pct', 'min_price_pe_pct', 'max_pe_pct', 'min_pe_pct'
]
write_to_csv(f'./result/stat_growth{date_string}.csv', fieldnames, results)
write_to_csv(f'../result/stat_growth{date_string}.csv', fieldnames, results)
except Exception as e:
logging.error(f"计算收益率时出错: {e}")

View File

@ -0,0 +1,306 @@
"""
Script Name:
Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。
调用em历史数据接口。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import pymysql
import logging
import csv
import os
import config
import time
from datetime import datetime
from futu import OpenQuoteContext, RET_OK # Futu API client
from futu import *
import crawling.stock_hist_em as his_em
import argparse
# 配置日志
config.setup_logging()
current_date = datetime.now().strftime("%Y%m%d")
current_year = datetime.now().strftime("%Y")
# 刷新代码列表,并返回
def flush_code_map():
code_id_map_em_df = his_em.code_id_map_em()
print(code_id_map_em_df)
return code_id_map_em_df
# 获取历史K线如果失败就重试
def fetch_with_retry(code: str, s_date, e_date, adjust: str = '', max_retries: int = 3) -> pd.DataFrame :
retries = 0
while retries < max_retries:
try:
# 调用 stock_zh_a_hist 获取历史数据
df = his_em.stock_zh_a_hist(
symbol=code,
period="daily",
start_date=s_date,
end_date=e_date,
adjust=adjust,
)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
logging.info(f'{code} empty data. retry...')
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
return df
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
return pd.DataFrame()
# 获取所有市场的当年股价快照,带重试机制。
def fetch_snap_all(max_retries: int = 3) -> pd.DataFrame:
market_fs = {"china_a": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
"hk": "m:128 t:3,m:128 t:4,m:128 t:1,m:128 t:2",
"us": "m:105,m:106,m:107"}
result = pd.DataFrame()
for market_id, fs in market_fs.items():
retries = 0
while retries < max_retries:
try:
df = his_em.stock_zh_a_spot_em(fs)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
logging.warning(f'{market_id} empty data. retry...')
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
print(f'get {market_id} stock snapshot. stock count: {len(df)}')
result = pd.concat([result, df], ignore_index=True)
break
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
if retries >= max_retries:
logging.warning(f'{market_id} fetching error.')
return result
# 从数据库中读取指定指数的成分股
def load_index_codes():
conn = pymysql.connect(**config.db_config)
cursor = conn.cursor(pymysql.cursors.DictCursor)
#沪深300
#cursor.execute("SELECT code, code_name FROM hs_index where index_code='000300' ")
#中证A500
#cursor.execute("SELECT code, code_name FROM hs_index where index_code='000510' ")
#沪深300和中证A500的并集
cursor.execute("SELECT DISTINCT code, code_name FROM hs_index where index_code IN ('000300', '000510') ")
hs300_data = cursor.fetchall()
cursor.close()
conn.close()
return hs300_data
# 读取富途自选股的指定分类股
def load_futu_all_codes():
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
stock_data = []
ret, data = quote_ctx.get_user_security("全部")
if ret == RET_OK:
if data.shape[0] > 0: # 如果自选股列表不为空
stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows() if row['stock_type'] == 'STOCK']
#stock_data = [{'code': row['code'], 'code_name': row['name']} for _, row in data.iterrows()]
else:
logging.error('error:', data)
quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽
return stock_data
# 获取特定的行
def get_specific_date_row(data, date):
"""获取特定日期的行"""
for row in data:
if row['日期'] == date:
return row
return None
# 获取股票数据,并统计收益率
def calculate_stock_statistics(market, code, code_name):
try:
# 获取当前日期(用于比较)
last_year = datetime.now().year - 1
last_year_str = str(last_year)
# 获取历史数据
data = fetch_with_retry(code, "20210101", current_date, 'qfq')
if data.empty:
logging.warning(f'{code}, {code_name} has no data. skipping...')
return None
# 获取当前日期的股价
current_row = data.loc[data['日期'].idxmax()]
# 默认行如果该股票没有年初股价0923股价1008股价等以此记录代替
defaut_row = data.loc[data['日期'].idxmin()]
# 获取年初股价,也就是上一年的最后一个交易日的收盘价
year_data = data[data['日期'].str.startswith(last_year_str)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到上一年的数据 ({last_year_str}), 以 {defaut_row['日期']} 的数据来代替")
year_begin_row = defaut_row
else:
year_begin_row = year_data.loc[year_data['日期'].idxmax()]
# 获取0923收盘价
try:
row_0923 = data[data['日期'] == '2024-09-23'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到0923的数据, 以 {defaut_row['日期']} 的数据来代替")
row_0923 = defaut_row
# 获取1008收盘价
try:
row_1008 = data[data['日期'] == '2024-10-08'].iloc[0]
except IndexError:
logging.warning(f"{code}, {code_name} 未找到1008的数据, 以 {defaut_row['日期']} 的数据来代替")
row_1008 = defaut_row
# 获取2021年以来的最高价
max_close_row = data.loc[data['收盘'].idxmax()]
# 获取2021年以来的最低价
min_close_row = data.loc[data['收盘'].idxmin()]
# 获取年内的最高价、最低价
year_data = data[data['日期'].str.startswith(current_year)]
if year_data.empty:
logging.warning(f"{code}, {code_name} 未找到年内的数据, 以 {defaut_row['日期']} 的数据来代替")
year_min_row = defaut_row
year_max_row = defaut_row
else:
year_min_row = year_data.loc[year_data['收盘'].idxmin()]
year_max_row = year_data.loc[year_data['收盘'].idxmax()]
# 计算统计数据
try:
year_increase = (current_row['收盘'] / year_begin_row['收盘'] - 1)
growth_0923 = (current_row['收盘'] / row_0923['收盘'] - 1)
growth_1008 = (current_row['收盘'] / row_1008['收盘'] - 1)
growth_1008_open = (current_row['收盘'] / row_1008['开盘'] - 1)
year_amplitude = (year_max_row['收盘'] / year_min_row['收盘'] - 1)
max_amplitude = (max_close_row['收盘'] / min_close_row['收盘'] - 1)
stock_recovery = (current_row['收盘'] / max_close_row['收盘'] - 1)
except ZeroDivisionError:
logging.error(f"股票 {code} 计算时遇到除零错误")
return None
# 组织结果
result = [
market,
code,
code_name,
current_row['日期'], current_row['收盘'],
year_begin_row['日期'], year_begin_row['收盘'],
row_0923['日期'], row_0923['收盘'] ,
row_1008['日期'], row_1008['开盘'] ,row_1008['收盘'] ,
max_close_row['日期'], max_close_row['收盘'],
min_close_row['日期'], min_close_row['收盘'],
year_max_row['日期'], year_max_row['收盘'],
year_min_row['日期'], year_min_row['收盘'],
year_increase,
growth_0923 if growth_0923 is not None else 'N/A',
growth_1008 if growth_1008 is not None else 'N/A',
growth_1008_open if growth_1008_open is not None else 'N/A',
year_amplitude,
max_amplitude,
stock_recovery
]
return result
except Exception as e:
logging.error(f"处理股票 {code} 时出错: {e}")
return None
# 写入到文件中
def write_to_csv(results, filename='../stock_statistics.csv'):
"""将所有结果写入CSV文件"""
try:
with open(filename, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入表头
writer.writerow([
"股市", "股票代码", "股票名称", "当前日期", "当前收盘", "年初日期", "年初收盘", "0923日期", "0923收盘", "1008日期", "1008开盘", "1008收盘",
"最高日期", "最高收盘", "最低日期", "最低收盘", "年内最高日期", "年内最高收盘", "年内最低日期", "年内最低收盘", "年内涨幅", "相比0923收盘价涨幅",
"相比1008收盘价涨幅", "相比1008开盘价涨幅", "年内振幅", "最大振幅", "股价自最高点恢复", "市盈率TTM", "市净率", "总市值"
])
# 写入每行数据
for result in results:
writer.writerow(result)
except Exception as e:
logging.error(f"写入CSV文件时出错: {e}")
# 主函数,执行逻辑
def main(list, debug):
if list == 'futu':
codes = load_futu_all_codes()
else:
codes = load_index_codes()
all_results = []
# 获取快照数据,并保存到文件
snap_data = fetch_snap_all()
if snap_data.empty:
logging.error(f"fetching snapshot data error!")
return
snap_data.to_csv(f'../result/snapshot_em_{current_date}.csv', index=False, encoding='utf-8')
for item in codes:
code = item['code']
code_name = item['code_name']
# 清理股票代码中的前缀
try:
market, clean_code = code.split(".")
except ValueError:
logging.error(f"wrong format code: {code}")
logging.info(f"正在处理股票 {market}.{clean_code}, {code_name}...")
result = calculate_stock_statistics(market, clean_code, code_name)
if result:
match = snap_data.loc[snap_data['代码'] == clean_code]
if not match.empty: # 如果找到了匹配项
result.append(match['市盈率TTM'].iloc[0])
result.append(match['市净率'].iloc[0])
result.append(match['总市值'].iloc[0])
else:
logging.warning(f'{market}.{clean_code} has no snapshot data.')
all_results.append(result)
if debug:
break
if all_results:
file_name = f'../result/stock_statistics_{list}_{current_date}.csv'
write_to_csv(all_results, f'{file_name}')
logging.info(f"统计结果已写入 CSV 文件 {file_name}")
else:
logging.warning("没有可写入的统计数据")
if __name__ == "__main__":
# 命令行参数处理
parser = argparse.ArgumentParser(description='计算指定股票的区间收益率')
parser.add_argument('--list', type=str, default='futu', help='Stocklist to process (futu or index)')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
# 调用主函数
main(args.list, args.debug)

View File

@ -0,0 +1,145 @@
import pandas as pd
import pymysql
import backtrader as bt
import config
# 从 MySQL 提取数据
def fetch_data_from_mysql():
connection = pymysql.connect(**config.db_config)
# ['datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest']]
#query = "SELECT code, time_key, qfq_close as close FROM hs300_ajust_kline_202410 where time_key>='2023-01-01 00:00:00' "
query = "SELECT code, time_key, open, high, low, close, volume FROM hs300_his_kline_none where time_key>='2023-01-01 00:00:00' "
data = pd.read_sql(query, connection)
data['time_key'] = pd.to_datetime(data['time_key'], errors='coerce') # 确保为datetime格式
data['openinterest'] = 0 # 添加 openinterest 列并赋值为 0
connection.close()
return data
# 格式化数据为Backtrader兼容的格式
data = fetch_data_from_mysql()
data_by_code = {code: df for code, df in data.groupby('code')}
class BestPortfolioStrategy(bt.Strategy):
params = dict(
max_stocks=30,
rebalance_monthly=True,
commission=0.0003,
slippage=0.005,
maperiod = 15,
printlog = False
)
def __init__(self):
self.stocks = [] # 持有的股票
self.rebalance_date = None
def next(self):
# 判断是否是月初
if self.rebalance_date and self.data.datetime.date(0) < self.rebalance_date.date():
return
print(f"rebalance date: {self.rebalance_date}")
# 设置下次调仓日
self.rebalance_date = self.data.datetime.date(0).replace(day=1) + pd.DateOffset(months=1)
# 选股逻辑计算过去一段时间的收益率选取最高的30只股票
returns = {}
for data in self.datas:
returns[data._name] = (data.close[0] / data.close[-20]) - 1 # 上个月的收益率
# 按收益率排序并选择最佳组合
sorted_stocks = sorted(returns, key=returns.get, reverse=True)[:self.params.max_stocks]
print(sorted_stocks)
# 调仓:卖出非选中股票,买入新选股票
self.rebalance(sorted_stocks)
def rebalance(self, selected_stocks):
for stock in self.stocks:
if stock not in selected_stocks:
self.close(self.getdatabyname(stock))
for stock in selected_stocks:
if stock not in self.stocks:
self.buy(self.getdatabyname(stock), size=100)
self.stocks = selected_stocks
# 交易记录日志(可省略,默认不输出结果)
def log(self, txt, dt=None, doprint=False):
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()},{txt}')
def notify_order(self, order):
# 未被处理的订单
if order.status in [order.Submitted, order.Accepted]:
return
# 已经处理的订单
if order.status in [order.Completed, order.Canceled, order.Margin]:
if order.isbuy():
self.log(
'BUY EXECUTED, ref:%.0fPrice: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' %
(order.ref, # 订单编号
order.executed.price, # 成交价
order.executed.value, # 成交额
order.executed.comm, # 佣金
order.executed.size, # 成交量
order.data._name)) # 股票名称
else: # Sell
self.log('SELL EXECUTED, ref:%.0f, Price: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' %
(order.ref,
order.executed.price,
order.executed.value,
order.executed.comm,
order.executed.size,
order.data._name))
cerebro = bt.Cerebro()
cerebro.broker.setcash(1_000_000) # 初始资金 100万
cerebro.broker.setcommission(commission=0.0003) # 设置交易费率 万分之三
cerebro.broker.set_slippage_perc(0.005) # 设置滑点 0.005
# 加载数据
for code, df in data_by_code.items():
# 删除缺失值
df = df.dropna(subset=['time_key'])
# 设置DataFrame为Backtrader的数据格式
data_feed = bt.feeds.PandasData(
dataname=df,
datetime='time_key',
openinterest=None # 如果没有openinterest列则为None
)
#bt_data = bt.feeds.PandasData(dataname=df)
cerebro.adddata(data_feed, name=code)
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='pnl') # 返回收益率时序数据
cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='_AnnualReturn') # 年化收益率
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='_SharpeRatio') # 夏普比率
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='_DrawDown') # 回撤
# 加载策略
cerebro.addstrategy(BestPortfolioStrategy, printlog=True)
# 运行回测
results = cerebro.run()
# 获取分析结果
results = cerebro.run()
strat = results[0]
# 返回日度收益率序列
daily_return = pd.Series(strat.analyzers.pnl.get_analysis())
# 打印评价指标
print("--------------- AnnualReturn -----------------")
print(strat.analyzers._AnnualReturn.get_analysis())
print("--------------- SharpeRatio -----------------")
print(strat.analyzers._SharpeRatio.get_analysis())
print("--------------- DrawDown -----------------")
print(strat.analyzers._DrawDown.get_analysis())

View File

@ -0,0 +1,83 @@
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.optimize as sco
import yfinance as yf
# 下载资产历史数据
assets = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA'] # 资产代码
data = yf.download(assets, start="2020-01-01", end="2023-01-01")['Adj Close']
print(data)
# 计算每日收益率
returns = data.pct_change().dropna()
print(returns)
# 计算年化预期收益和协方差矩阵
annual_returns = returns.mean() * 252
cov_matrix = returns.cov() * 252
print(annual_returns)
print(cov_matrix)
# 定义组合的预期收益和风险(方差)
def portfolio_performance(weights, mean_returns, cov_matrix):
returns = np.sum(weights * mean_returns)
volatility = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
return returns, volatility
# 定义目标函数:最小化风险(方差)
def minimize_volatility(weights, mean_returns, cov_matrix):
return portfolio_performance(weights, mean_returns, cov_matrix)[1]
# 定义约束条件
def constraint_sum(weights):
return np.sum(weights) - 1
# 初始化权重和边界每个资产的权重在0-1之间
num_assets = len(assets)
bounds = tuple((0, 1) for asset in range(num_assets))
initial_weights = num_assets * [1. / num_assets] # 初始等权重
# 定义约束条件总权重为1
constraints = ({'type': 'eq', 'fun': constraint_sum})
# 优化组合,使得组合的方差最小
opt_result = sco.minimize(minimize_volatility, initial_weights, args=(annual_returns, cov_matrix),
method='SLSQP', bounds=bounds, constraints=constraints)
# 提取最优权重
optimal_weights = opt_result.x
# 计算最优组合的收益和风险
optimal_return, optimal_volatility = portfolio_performance(optimal_weights, annual_returns, cov_matrix)
# 输出最优组合结果
print("最优资产组合权重:")
for i, asset in enumerate(assets):
print(f"{asset}: {optimal_weights[i]:.2%}")
print(f"\n最优组合的年化预期收益: {optimal_return:.2%}")
print(f"最优组合的年化风险(标准差): {optimal_volatility:.2%}")
# 可视化有效前沿
def plot_efficient_frontier(mean_returns, cov_matrix, num_portfolios=10000):
results = np.zeros((3, num_portfolios))
for i in range(num_portfolios):
weights = np.random.random(num_assets)
weights /= np.sum(weights)
portfolio_return, portfolio_volatility = portfolio_performance(weights, mean_returns, cov_matrix)
results[0, i] = portfolio_return
results[1, i] = portfolio_volatility
results[2, i] = results[0, i] / results[1, i] # 计算夏普比率
plt.figure(figsize=(10, 6))
plt.scatter(results[1, :], results[0, :], c=results[2, :], cmap='viridis')
plt.colorbar(label='Sharpe Ratio')
plt.scatter(optimal_volatility, optimal_return, c='red', marker='*', s=200) # 最优组合
plt.title('Efficient Frontier')
plt.xlabel('Volatility (Risk)')
plt.ylabel('Return')
plt.show()
# 绘制有效前沿
plot_efficient_frontier(annual_returns, cov_matrix)