""" Script Name: Description: 获取沪深300成分股的最新股价, 并计算年内涨幅, 924以来的涨幅, 市盈率, 股息率等。 需要调用futu的获取快照接口。 https://openapi.futunn.com/futu-api-doc/quote/get-market-snapshot.html Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import pymysql import logging import csv import os import config import time from datetime import datetime from futu import OpenQuoteContext, RET_OK # Futu API client # 配置日志 config.setup_logging() # 1. 获取 hs300 表数据 def get_hs300_data(): conn = pymysql.connect(**config.db_config) cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute("SELECT code, code_name FROM hs300") hs300_data = cursor.fetchall() cursor.close() conn.close() return hs300_data # 2. 批量获取市场快照 def get_market_snapshots(codes): quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) # 替换为实际Futu API地址和端口 snapshot_results = [] batch_size = 350 for i in range(0, len(codes), batch_size): code_batch = codes[i:i+batch_size] ret, data = quote_ctx.get_market_snapshot(code_batch) if ret == RET_OK: snapshot_results.extend(data.to_dict('records')) else: logging.error(f"获取市场快照失败: {data}") quote_ctx.close() return snapshot_results # 3. 
# Insert or update rows of the futu_market_snapshot table.

# Columns written to futu_market_snapshot, in placeholder order. Each name is
# also the key read from a snapshot record.
_SNAPSHOT_COLUMNS = (
    'code', 'name', 'update_time', 'last_price', 'open_price', 'high_price',
    'low_price', 'prev_close_price', 'volume', 'turnover', 'turnover_rate',
    'suspension', 'listing_date', 'equity_valid', 'issued_shares',
    'total_market_val', 'net_asset', 'net_profit', 'earning_per_share',
    'outstanding_shares', 'net_asset_per_share', 'circular_market_val',
    'ey_ratio', 'pe_ratio', 'pb_ratio', 'pe_ttm_ratio', 'dividend_ttm',
    'dividend_ratio_ttm', 'dividend_lfy', 'dividend_lfy_ratio',
)

# Columns that are inserted but left untouched by ON DUPLICATE KEY UPDATE.
_SNAPSHOT_INSERT_ONLY = frozenset(
    ('code', 'name', 'update_time', 'listing_date', 'equity_valid')
)

_SNAPSHOT_UPSERT_SQL = (
    "INSERT INTO futu_market_snapshot ({columns}) VALUES ({marks}) "
    "ON DUPLICATE KEY UPDATE {updates}"
).format(
    columns=", ".join(_SNAPSHOT_COLUMNS),
    marks=", ".join(["%s"] * len(_SNAPSHOT_COLUMNS)),
    updates=", ".join(
        f"{col} = VALUES({col})"
        for col in _SNAPSHOT_COLUMNS
        if col not in _SNAPSHOT_INSERT_ONLY
    ),
)

# Header of the CSV report; the order matches the dict built per stock.
_YIELD_FIELDNAMES = (
    'code', 'name', 'year_begin_date', 'year_begin_close', '0924_date',
    '0924_close', 'max_price_date', 'max_price', 'max_price_pe',
    'max_pe_date', 'max_pe', 'min_price_date', 'min_price', 'min_price_pe',
    'min_pe_date', 'min_pe', 'latest_date', 'latest_close', 'year_yield',
    'yield_0924', 'total_market_val', 'pe_ttm_ratio', 'dividend_ratio_ttm',
    'dividend_lfy_ratio', 'max_price_pct', 'max_price_pe_pct',
    'min_price_pct', 'min_price_pe_pct', 'max_pe_pct', 'min_pe_pct',
)


def insert_or_update_snapshot(snapshot_data):
    """Upsert snapshot records into ``futu_market_snapshot``.

    All records are written in one transaction: any failure is logged and the
    whole batch is rolled back.

    :param snapshot_data: Iterable of dicts holding every key listed in
        ``_SNAPSHOT_COLUMNS``.
    :return: ``True`` when the batch was committed (or there was nothing to
        write), ``False`` when it was rolled back.
    """
    if not snapshot_data:
        return True

    conn = pymysql.connect(**config.db_config)
    try:
        cursor = conn.cursor()
        try:
            for record in snapshot_data:
                cursor.execute(
                    _SNAPSHOT_UPSERT_SQL,
                    tuple(record[col] for col in _SNAPSHOT_COLUMNS),
                )
            conn.commit()
            return True
        except Exception as e:
            logging.error(f"插入或更新快照数据时出错: {e}")
            conn.rollback()
            return False
        finally:
            cursor.close()
    finally:
        conn.close()


def process_snapshot_data():
    """Fetch snapshots for every hs300 code and store them in the database."""
    hs300_data = get_hs300_data()
    codes = [item['code'] for item in hs300_data]

    # Pull the market snapshots in batches.
    snapshot_data = get_market_snapshots(codes)
    if not snapshot_data:
        logging.warning("No market snapshots fetched; nothing written to db.")
        return

    # Only report success when the batch was actually committed; the failure
    # itself is logged by insert_or_update_snapshot.
    if insert_or_update_snapshot(snapshot_data):
        logging.info("Successfully get market snapshots and write to db.")


def write_to_csv(file_path, fieldnames, data):
    """Write ``data`` to a CSV file, replacing any existing file.

    The header row is always written. The parent directory is created when it
    does not exist yet.

    :param file_path: Path to the CSV file.
    :param fieldnames: A list of field names (headers) for the CSV file.
    :param data: A list of dictionaries where each dictionary represents a row.
    :raises Exception: Any error raised while writing is logged and re-raised.
    """
    try:
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # newline='' is required for correct CSV line endings.
        with open(file_path, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

        logging.info(f"Successfully wrote data to {file_path}")
    except Exception as e:
        logging.error(f"Error while writing to CSV file {file_path}: {str(e)}")
        raise


def _as_date(value):
    """Return the date part of a datetime; pass other values through."""
    return value.date() if isinstance(value, datetime) else value


def _get(row, key):
    """Return ``row[key]``, or None when the row or the key is missing."""
    return row.get(key) if row else None


def _ratio(numerator, denominator):
    """Return numerator / denominator, or None if either is None or the divisor is 0."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return numerator / denominator


def _fetch_one(cursor, sql, params):
    """Run a query and return its first row, or None when there is none."""
    cursor.execute(sql, params)
    return cursor.fetchone()


def _forward_adjusted_close(close, time_key, rehab_rows):
    """Apply the forward-adjustment factors of ``rehab_rows`` to ``close``."""
    if close is None or time_key is None:
        return None
    as_of = _as_date(time_key)
    adjusted = close
    for rehab in rehab_rows:
        # NOTE(review): a price taken on the ex-dividend date itself is also
        # adjusted (">="); confirm that this matches the Futu rehab semantics.
        if _as_date(rehab['ex_div_date']) >= as_of:
            adjusted = (adjusted * rehab['forward_adj_factorA']) + rehab['forward_adj_factorB']
    return adjusted


def _build_yield_row(cursor, code, name, year_start, rally_start, history_start):
    """Query all inputs for one stock and return its report row as a dict."""
    # 1. Latest snapshot, mapped onto the close/time_key keys used by K-lines.
    latest = _fetch_one(
        cursor,
        "SELECT * FROM futu_market_snapshot WHERE code=%s ORDER BY update_time DESC LIMIT 1",
        (code,),
    )
    if latest is not None:
        latest['close'] = latest['last_price']
        latest['time_key'] = latest['update_time']

    # 2./3. Last unadjusted K-line before each reference date.
    kline_sql = (
        "SELECT * FROM hs300_his_kline_none WHERE code=%s AND time_key<%s "
        "ORDER BY time_key DESC LIMIT 1"
    )
    year_base = _fetch_one(cursor, kline_sql, (code, year_start))
    rally_base = _fetch_one(cursor, kline_sql, (code, rally_start))

    # 3.x Extremes of the forward-adjusted history since history_start.
    extreme_sql = (
        "SELECT * FROM hs300_qfq_his WHERE code=%s AND time_key>=%s "
        "ORDER BY {order} LIMIT 1"
    )
    window = (code, history_start)
    max_price = _fetch_one(cursor, extreme_sql.format(order="close DESC"), window)
    min_price = _fetch_one(cursor, extreme_sql.format(order="close ASC"), window)
    max_pe = _fetch_one(cursor, extreme_sql.format(order="pe_ratio DESC"), window)
    min_pe = _fetch_one(cursor, extreme_sql.format(order="pe_ratio ASC"), window)

    # 4. Rehab (adjustment) factors.
    cursor.execute("SELECT * FROM futu_rehab WHERE code=%s ORDER BY ex_div_date ASC", (code,))
    rehab_rows = cursor.fetchall()

    # 5. Forward-adjusted close for each row that exists.
    for row in (latest, year_base, rally_base):
        if row is not None:
            row['qfq_close'] = _forward_adjusted_close(
                row.get('close'), row.get('time_key'), rehab_rows
            )

    latest_close = _get(latest, 'qfq_close')
    latest_pe = _get(latest, 'pe_ttm_ratio')

    # 6. Yields relative to the two reference closes.
    year_growth = _ratio(latest_close, _get(year_base, 'qfq_close'))
    rally_growth = _ratio(latest_close, _get(rally_base, 'qfq_close'))

    # 7. Assemble the report row; anything not computable stays None.
    return {
        'code': code,
        'name': name,
        'year_begin_date': _as_date(_get(year_base, 'time_key')),
        'year_begin_close': _get(year_base, 'qfq_close'),
        '0924_date': _as_date(_get(rally_base, 'time_key')),
        '0924_close': _get(rally_base, 'qfq_close'),
        'max_price_date': _as_date(_get(max_price, 'time_key')),
        'max_price': _get(max_price, 'close'),
        'max_price_pe': _get(max_price, 'pe_ratio'),
        'max_pe_date': _as_date(_get(max_pe, 'time_key')),
        'max_pe': _get(max_pe, 'pe_ratio'),
        'min_price_date': _as_date(_get(min_price, 'time_key')),
        'min_price': _get(min_price, 'close'),
        'min_price_pe': _get(min_price, 'pe_ratio'),
        'min_pe_date': _as_date(_get(min_pe, 'time_key')),
        'min_pe': _get(min_pe, 'pe_ratio'),
        'latest_date': _as_date(_get(latest, 'time_key')),
        'latest_close': latest_close,
        'year_yield': None if year_growth is None else year_growth - 1,
        'yield_0924': None if rally_growth is None else rally_growth - 1,
        'total_market_val': _get(latest, 'total_market_val'),
        'pe_ttm_ratio': latest_pe,
        'dividend_ratio_ttm': _get(latest, 'dividend_ratio_ttm'),
        'dividend_lfy_ratio': _get(latest, 'dividend_lfy_ratio'),
        # 6.1 Current price / P/E relative to the extremes since history_start.
        'max_price_pct': _ratio(latest_close, _get(max_price, 'close')),
        'max_price_pe_pct': _ratio(latest_pe, _get(max_price, 'pe_ratio')),
        'min_price_pct': _ratio(latest_close, _get(min_price, 'close')),
        'min_price_pe_pct': _ratio(latest_pe, _get(min_price, 'pe_ratio')),
        'max_pe_pct': _ratio(latest_pe, _get(max_pe, 'pe_ratio')),
        'min_pe_pct': _ratio(latest_pe, _get(min_pe, 'pe_ratio')),
    }


def calculate_yield(year_start='2024-01-01', rally_start='2024-09-24',
                    history_start='2021-01-01'):
    """Compute per-stock yield statistics and write them to a dated CSV file.

    For each hs300 code the latest snapshot is compared with the last K-line
    before ``year_start`` and before ``rally_start``, and with the price and
    P/E extremes recorded since ``history_start``. A stock with missing data
    gets ``None`` in the affected columns instead of aborting the report.

    The report goes to ``./result/stat_growth<yyyymmdd>.csv``. Any error is
    logged and not re-raised.

    :param year_start: Exclusive upper bound for the year-begin K-line.
    :param rally_start: Exclusive upper bound for the second reference K-line
        (feeds the ``0924_*`` columns).
    :param history_start: Inclusive lower bound of the extremes window.
    """
    conn = pymysql.connect(**config.db_config)
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    try:
        results = []
        for item in get_hs300_data():
            result = _build_yield_row(
                cursor, item['code'], item['code_name'],
                year_start, rally_start, history_start,
            )
            results.append(result)
            logging.info(f"{result}")
            time.sleep(0.1)  # short pause between stocks, kept from the original

        # Name the report after the current date, formatted as yyyymmdd.
        date_string = datetime.now().strftime('%Y%m%d')
        write_to_csv(f'./result/stat_growth{date_string}.csv', list(_YIELD_FIELDNAMES), results)

    except Exception as e:
        logging.error(f"计算收益率时出错: {e}", exc_info=True)
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    process_snapshot_data()
    calculate_yield()