Files
stock/stockapp/stat_adjust_kline.py
2024-10-09 11:46:30 +08:00

218 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Script Name: stat_adjust_kline.py
Description: 根据从数据源获取的历史K线计算股票的前复权和后复权值.
注意:
本程序只适合沪深300的前复权和后复权计算使用的是富途提供的不复权数据和复权因子。
处理sp500有问题yahoo 提供的不复权收据 + 富途提供的复权因子计算前复权与yahoo提供的前复权数据差别很大。
尚未找到原因所以目前只能用hs300
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import pymysql
import logging
import sys
import time
import config
import argparse
# 配置不同市场的表名管理
tables_mapping = {
'sp500': {
'none_his_kline': 'sp500_his_kline_none',
'adjust_his_kline': 'sp500_ajust_kline_202410',
'rehab_table': 'futu_rehab'
},
'hs300': {
'none_his_kline': 'hs300_his_kline_none',
'adjust_his_kline': 'hs300_ajust_kline_202410',
'rehab_table': 'futu_rehab'
}
}
# 日志配置
config.setup_logging("./log/stat_adjust_kline.log")
logger = logging.getLogger()
# MySQL数据库连接
def connect_to_db():
try:
return pymysql.connect(
**config.db_config,
cursorclass=pymysql.cursors.DictCursor
)
except pymysql.MySQLError as e:
logger.error(f"Error connecting to the database: {e}", exc_info=True)
return None
# 从指定市场表中读取code和code_name字段
def fetch_codes_from_market_table(market, debug=False):
db = connect_to_db()
if db is None:
logger.error("Failed to connect to database.")
return []
try:
with db.cursor() as cursor:
query = f"SELECT code, code_name FROM {market} "
if debug:
#query += " LIMIT 2"
query += " where code in ('GE', 'WTW')"
cursor.execute(query)
codes = cursor.fetchall()
return codes
finally:
db.close()
# 读取复权因子数据
def fetch_rehab_data(db, code):
try:
with db.cursor() as cursor:
cursor.execute(f"SELECT ex_div_date, forward_adj_factorA, forward_adj_factorB, backward_adj_factorA, backward_adj_factorB FROM futu_rehab WHERE code = %s ORDER BY ex_div_date DESC", (code,))
return cursor.fetchall()
except pymysql.MySQLError as e:
logger.error(f"Error fetching rehab data for {code}: {e}", exc_info=True)
return []
# 读取不复权的股票价格数据
def fetch_kline_none_data(db, table_name, code):
try:
with db.cursor() as cursor:
cursor.execute(f"SELECT code, time_key, open, close FROM {table_name} WHERE code = %s ORDER BY time_key ASC", (code,))
return cursor.fetchall()
except pymysql.MySQLError as e:
logger.error(f"Error fetching kline none data for {code}: {e}", exc_info=True)
return []
# 插入前后复权价格到数据库
def insert_hfq_data(db, hfq_data, hfq_table):
try:
with db.cursor() as cursor:
insert_query = f"""
INSERT INTO {hfq_table} (code, name, time_key, hfq_open, hfq_close, qfq_open, qfq_close, none_open, none_close)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
hfq_open = VALUES(hfq_open),
hfq_close = VALUES(hfq_close),
qfq_open = VALUES(qfq_open),
qfq_close = VALUES(qfq_close),
none_open = VALUES(none_open),
none_close = VALUES(none_close)
"""
cursor.executemany(insert_query, hfq_data)
db.commit()
except pymysql.MySQLError as e:
logger.error(f"Error inserting hfq data: {e}", exc_info=True)
# 计算后复权价格和前复权价格
def calculate_hfq_qfq_price(market, debug=False):
db = connect_to_db()
if db is None:
return
# 获取表名
table_names = tables_mapping[market]
none_his_kline_table = table_names['none_his_kline']
adjust_kline_table = table_names['adjust_his_kline']
# 获取股票代码
codes = fetch_codes_from_market_table(market, debug)
isSP500 = True if market == 'sp500' else False
for row in codes:
code = row['code']
name = row['code_name']
# 如果是 sp500 市场,拼接 'US.' + code
rehab_code = code
if isSP500:
rehab_code = 'US.' + code
logger.info(f"Processing {code} ({name})...")
# 获取复权因子数据,没有的话就直接使用不复权值
rehab_res = fetch_rehab_data(db, rehab_code)
if not rehab_res:
logger.warning(f"No rehab data found for {code}, use non close")
rehab_res = list()
# 反转复权因子行,为了计算前复权
rehab_res_asc = list(reversed(rehab_res))
# 获取不复权的价格数据
kline_none = fetch_kline_none_data(db, none_his_kline_table, code)
if not kline_none:
logger.warning(f"No kline none data found for {code}")
continue
hfq_data = []
# 遍历kline_none计算前后复权价格
for kline_row in kline_none:
none_open = kline_row['open']
none_close = kline_row['close']
time_key = kline_row['time_key']
# 将 time_key 转换为 date 格式
time_key_date = time_key.date()
# 计算后复权价格
hfq_open = none_open
hfq_close = none_close
tmp_close = none_close
tmp_open = none_open
for rehab_row in rehab_res:
# Yahoo Finance 提供的不复权数据,已经处理了拆股;所以对于富途复权因子表里的拆股数据,要忽略
if isSP500 and rehab_row['backward_adj_factorA'] != 1 :
continue
if rehab_row['ex_div_date'] <= time_key_date:
hfq_close = (tmp_close * rehab_row['backward_adj_factorA']) + rehab_row['backward_adj_factorB']
hfq_open = (tmp_open * rehab_row['backward_adj_factorA']) + rehab_row['backward_adj_factorB']
tmp_close = hfq_close
tmp_open = hfq_open
# 计算前复权价格
qfq_close = none_close
qfq_open = none_open
tmp_close = none_close
tmp_open = none_open
for rehab_row in rehab_res_asc:
# Yahoo Finance 提供的不复权数据,已经处理了拆股;所以对于富途复权因子表里的拆股和合股数据,要忽略
if isSP500 and rehab_row['backward_adj_factorA'] != 1 :
continue
# 富途对美股的前复权价格计算,要忽略 forward_adj_factorB
# https://openapi.futunn.com/futu-api-doc/qa/quote.html
factorB = 0 if isSP500 else rehab_row['forward_adj_factorB']
if rehab_row['ex_div_date'] > time_key_date:
qfq_close = (tmp_close * rehab_row['forward_adj_factorA']) + factorB
qfq_open = (tmp_open * rehab_row['forward_adj_factorA']) + factorB
tmp_close = qfq_close
tmp_open = qfq_open
# 保存计算后的复权价格
hfq_data.append((code, name, time_key, hfq_open, hfq_close, qfq_open, qfq_close, none_open, none_close))
# 插入后复权价格数据
insert_hfq_data(db, hfq_data, adjust_kline_table)
logger.info(f"Inserted HFQ/QFQ data for {code} ({name})")
time.sleep(1)
db.close()
if __name__ == "__main__":
# 命令行参数处理
parser = argparse.ArgumentParser(description='Calculate HFQ and QFQ Prices for Market')
parser.add_argument('--market', type=str, default='hs300', help='Market to process (sp500 or hs300)')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
# 调用主函数
calculate_hfq_qfq_price(args.market, args.debug)