This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/stockapp/stat_growth.py
2024-10-12 18:45:24 +08:00

217 lines
9.3 KiB
Python

"""
Script Name:
Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。
需要调用futu的获取快照接口。
https://openapi.futunn.com/futu-api-doc/quote/get-market-snapshot.html
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import pymysql
import logging
import csv
import os
import config
import time
from datetime import datetime
from futu import OpenQuoteContext, RET_OK # Futu API client
# Configure logging (runs at import time, via the project's config module).
config.setup_logging()
# 1. Load the rows of the hs300 table
def get_hs300_data():
    """
    Read every constituent from the ``hs300`` table.

    :return: The rows fetched with a ``DictCursor``; each row is a dict with
        the keys ``code`` and ``code_name``.
    :raises Exception: Any database error is propagated to the caller; the
        cursor and connection are closed before it propagates.
    """
    conn = pymysql.connect(**config.db_config)
    try:
        # The cursor context manager closes the cursor on exit, and the
        # finally clause closes the connection even when the query fails.
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute("SELECT code, code_name FROM hs300")
            return cursor.fetchall()
    finally:
        conn.close()
# 2. Fetch market snapshots in batches
def get_market_snapshots(codes):
    """
    Fetch market snapshots for the given stock codes from the Futu quote API.

    The codes are requested in batches. A batch whose request does not return
    ``RET_OK`` is logged and skipped, so the result may cover fewer codes than
    were asked for.

    :param codes: A list of stock codes to query.
    :return: A list of dicts, one per snapshot record returned by the API.
    """
    if not codes:
        # Nothing to request: avoid opening a quote connection at all.
        return []

    # Replace with the actual Futu API host and port.
    quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
    snapshot_results = []
    # NOTE(review): presumably chosen to stay under the API's per-request code
    # limit - confirm against the get_market_snapshot documentation.
    batch_size = 350
    try:
        for start in range(0, len(codes), batch_size):
            code_batch = codes[start:start + batch_size]
            ret, data = quote_ctx.get_market_snapshot(code_batch)
            if ret == RET_OK:
                snapshot_results.extend(data.to_dict('records'))
            else:
                logging.error(f"获取市场快照失败: {data}")
    finally:
        # Always release the quote connection, even if a request raises.
        quote_ctx.close()
    return snapshot_results
# 3. Insert into or update the futu_market_snapshot table
def insert_or_update_snapshot(snapshot_data):
    """
    Upsert snapshot records into the ``futu_market_snapshot`` table.

    All records are written in one transaction: it is committed after the
    last record, and rolled back (with the error logged) if anything fails.
    Errors are not re-raised.

    :param snapshot_data: An iterable of dicts; each dict must contain every
        column named in ``columns`` below.
    """
    # Column order must match the column list of the INSERT statement.
    columns = (
        'code', 'name', 'update_time', 'last_price', 'open_price',
        'high_price', 'low_price', 'prev_close_price', 'volume', 'turnover',
        'turnover_rate', 'suspension', 'listing_date', 'equity_valid', 'issued_shares',
        'total_market_val', 'net_asset', 'net_profit', 'earning_per_share', 'outstanding_shares',
        'net_asset_per_share', 'circular_market_val', 'ey_ratio', 'pe_ratio', 'pb_ratio',
        'pe_ttm_ratio', 'dividend_ttm', 'dividend_ratio_ttm', 'dividend_lfy', 'dividend_lfy_ratio',
    )
    upsert_sql = """
INSERT INTO futu_market_snapshot (
code, name, update_time, last_price, open_price, high_price, low_price, prev_close_price,
volume, turnover, turnover_rate, suspension, listing_date, equity_valid, issued_shares,
total_market_val, net_asset, net_profit, earning_per_share, outstanding_shares, net_asset_per_share,
circular_market_val, ey_ratio, pe_ratio, pb_ratio, pe_ttm_ratio, dividend_ttm, dividend_ratio_ttm,
dividend_lfy, dividend_lfy_ratio
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
last_price = VALUES(last_price), open_price = VALUES(open_price), high_price = VALUES(high_price),
low_price = VALUES(low_price), prev_close_price = VALUES(prev_close_price), volume = VALUES(volume),
turnover = VALUES(turnover), turnover_rate = VALUES(turnover_rate), suspension = VALUES(suspension),
issued_shares = VALUES(issued_shares), total_market_val = VALUES(total_market_val), net_asset = VALUES(net_asset),
net_profit = VALUES(net_profit), earning_per_share = VALUES(earning_per_share), outstanding_shares = VALUES(outstanding_shares),
net_asset_per_share = VALUES(net_asset_per_share), circular_market_val = VALUES(circular_market_val),
ey_ratio = VALUES(ey_ratio), pe_ratio = VALUES(pe_ratio), pb_ratio = VALUES(pb_ratio),
pe_ttm_ratio = VALUES(pe_ttm_ratio), dividend_ttm = VALUES(dividend_ttm), dividend_ratio_ttm = VALUES(dividend_ratio_ttm),
dividend_lfy = VALUES(dividend_lfy), dividend_lfy_ratio = VALUES(dividend_lfy_ratio)
"""
    db = pymysql.connect(**config.db_config)
    cur = db.cursor()
    try:
        for snapshot in snapshot_data:
            params = tuple(snapshot[column] for column in columns)
            cur.execute(upsert_sql, params)
        # Commit stays inside the try so a failed commit is also rolled back.
        db.commit()
    except Exception as e:
        logging.error(f"插入或更新快照数据时出错: {e}")
        db.rollback()
    finally:
        cur.close()
        db.close()
def process_snapshot_data():
    """
    Refresh the stored market snapshots for all ``hs300`` constituents.

    Reads the constituent codes, pulls their snapshots in batches, and writes
    the snapshots to the database.
    """
    constituents = get_hs300_data()
    # Pull the snapshots in batches, then insert or update them in the database.
    insert_or_update_snapshot(
        get_market_snapshots([row['code'] for row in constituents])
    )
    logging.info(f"Successfully get market snapshots and write to db.")
def write_to_csv(file_path, fieldnames, data):
    """
    Write rows to a CSV file, replacing any existing file at that path.

    The header row is always written, even when ``data`` is empty. Missing
    parent directories of ``file_path`` are created first.

    :param file_path: Path to the CSV file.
    :param fieldnames: A list of field names (headers) for the CSV file.
    :param data: A list of dictionaries where each dictionary represents a row.
    :raises Exception: Any error raised while creating the directory or
        writing the file is logged and then re-raised.
    """
    try:
        # Create the target directory if needed so a fresh checkout without
        # the output folder does not fail.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # 'w' truncates any previous file; newline='' is required by the csv
        # module for correct line endings.
        with open(file_path, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        logging.info(f"Successfully wrote data to {file_path}")
    except Exception as e:
        logging.error(f"Error while writing to CSV file {file_path}: {str(e)}")
        raise
def _forward_adjusted_close(row, rehab_rows):
    """Return the forward-adjusted close of *row*, or None when *row* is missing."""
    if not row:
        return None
    adjusted = row['close']
    as_of = row['time_key'].date()  # convert datetime to date for comparison
    for rehab in rehab_rows:
        # NOTE(review): a price dated exactly on the ex-dividend date is also
        # adjusted (>=); confirm this matches the intended adjustment rule.
        if rehab['ex_div_date'] >= as_of:
            adjusted = (adjusted * rehab['forward_adj_factorA']) + rehab['forward_adj_factorB']
    return adjusted


def _growth(latest, base):
    """Return ``latest / base - 1``, or None when a value is missing or base is zero."""
    if latest is None or not base:
        return None
    return latest / base - 1


def calculate_yield():
    """
    Compute per-stock growth figures for the ``hs300`` constituents and write them to a CSV file.

    For every constituent this reads the latest row of ``futu_market_snapshot``,
    the last ``hs300_his_kline_none`` row before 2024-01-01 and before
    2024-09-24, and the ``futu_rehab`` rows, then derives the forward-adjusted
    prices and the two yields. The output is ``./result/stat_growth<yyyymmdd>.csv``.

    A stock without a snapshot row is skipped with a warning. A stock missing
    a historical row is still reported, with the affected dates, prices and
    yield set to None. Any other error is logged and not re-raised.
    """
    conn = pymysql.connect(**config.db_config)
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        hs300_data = get_hs300_data()
        results = []
        for item in hs300_data:
            code = item['code']
            name = item['code_name']

            # 1. Latest snapshot row
            cursor.execute("SELECT * FROM futu_market_snapshot WHERE code=%s ORDER BY update_time DESC LIMIT 1", (code,))
            row1 = cursor.fetchone()
            if not row1:
                # Without a latest price nothing can be computed for this stock.
                logging.warning(f"No market snapshot found for {code}, skipping.")
                continue
            # Give the snapshot row the same keys as the k-line rows.
            row1['close'] = row1['last_price']
            row1['time_key'] = row1['update_time']

            # 2. Last historical row before 2024-01-01
            cursor.execute("SELECT * FROM hs300_his_kline_none WHERE code=%s AND time_key<'2024-01-01' ORDER BY time_key DESC LIMIT 1", (code,))
            row2 = cursor.fetchone()

            # 3. Last historical row before 2024-09-24
            cursor.execute("SELECT * FROM hs300_his_kline_none WHERE code=%s AND time_key<'2024-09-24' ORDER BY time_key DESC LIMIT 1", (code,))
            row3 = cursor.fetchone()

            # 4. Adjustment (rehab) records
            cursor.execute("SELECT * FROM futu_rehab WHERE code=%s ORDER BY ex_div_date ASC", (code,))
            rehab_res = cursor.fetchall()

            # 5. Forward-adjusted prices (None for a missing historical row)
            latest_qfq = _forward_adjusted_close(row1, rehab_res)
            year_begin_qfq = _forward_adjusted_close(row2, rehab_res)
            qfq_0924 = _forward_adjusted_close(row3, rehab_res)

            # 6. Yields
            year_yield = _growth(latest_qfq, year_begin_qfq)
            yield_0924 = _growth(latest_qfq, qfq_0924)

            # 7. Collect the result
            result = {
                'code': code,
                'name': name,
                'year_begin_date': row2['time_key'].date() if row2 else None,
                'year_begin_close': row2['close'] if row2 else None,
                '0924_date': row3['time_key'].date() if row3 else None,
                '0924_close': row3['close'] if row3 else None,
                'latest_date': row1['time_key'].date(),
                'latest_close': row1['close'],
                'year_yield': year_yield,
                'yield_0924': yield_0924,
                'total_market_val': row1.get('total_market_val', None),
                'pe_ttm_ratio': row1.get('pe_ttm_ratio', None),
                'dividend_ratio_ttm': row1.get('dividend_ratio_ttm', None),
                'dividend_lfy_ratio': row1.get('dividend_lfy_ratio', None),
            }
            results.append(result)
            logging.info(f"{result}")
            time.sleep(0.1)

        # Write the CSV; the file name carries today's date as yyyymmdd.
        current_date = datetime.now()
        date_string = current_date.strftime('%Y%m%d')
        fieldnames = ['code', 'name', 'year_begin_date', 'year_begin_close', '0924_date', '0924_close', 'latest_date', 'latest_close', 'year_yield', 'yield_0924', 'total_market_val', 'pe_ttm_ratio', 'dividend_ratio_ttm', 'dividend_lfy_ratio']
        write_to_csv(f'./result/stat_growth{date_string}.csv', fieldnames, results)
    except Exception as e:
        logging.error(f"计算收益率时出错: {e}")
    finally:
        cursor.close()
        conn.close()
# Script entry point.
if __name__ == "__main__":
    # Refresh futu_market_snapshot first; calculate_yield() reads its latest rows.
    process_snapshot_data()
    calculate_yield()