""" Script Name: Description: 从富途获取复权因子数据。 参考地址: https://openapi.futunn.com/futu-api-doc/quote/get-rehab.html Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import logging import pymysql import time from futu import * import pandas as pd import config # 设置日志 config.setup_logging("./log/futu_rehab.log") # 连接 MySQL 数据库 def get_mysql_connection(): return pymysql.connect(**config.db_config) # 获取股票代码列表 def get_stock_codes(table_name): connection = get_mysql_connection() with connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(f"SELECT code, code_name FROM {table_name} ") result = cursor.fetchall() connection.close() return result # 插入或更新复权信息 def insert_or_update_rehab_data(connection, rehab_data, code, name): try: with connection.cursor() as cursor: sql = """ INSERT INTO futu_rehab (code, name, ex_div_date, forward_adj_factorA, forward_adj_factorB, backward_adj_factorA, backward_adj_factorB) VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE forward_adj_factorA = VALUES(forward_adj_factorA), forward_adj_factorB = VALUES(forward_adj_factorB), backward_adj_factorA = VALUES(backward_adj_factorA), backward_adj_factorB = VALUES(backward_adj_factorB) """ for row in rehab_data.itertuples(index=False): cursor.execute(sql, (code, name, row.ex_div_date, row.forward_adj_factorA, row.forward_adj_factorB, row.backward_adj_factorA, row.backward_adj_factorB)) connection.commit() except pymysql.MySQLError as e: logging.error(f"Error occurred while inserting or updating rehab data: {e}", exc_info=True) # 从 Futu API 获取复权信息 def get_rehab_data(code): quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) ret, data = quote_ctx.get_rehab(code) quote_ctx.close() if ret == RET_OK: return data else: logging.error(f"Failed to get rehab data for {code}: {data}") return None # 主函数 def process_stock_data(table_name, prefix=''): stocks = get_stock_codes(table_name) connection = get_mysql_connection() for stock in stocks: code = stock['code'] name = stock['code_name'] # 拼接 'US.' 前缀对于 sp500 表中的股票 full_code = f"{prefix}{code}" if prefix else code logging.info(f"Processing {full_code} ({name})") # 获取复权数据 rehab_data = get_rehab_data(full_code) if rehab_data is not None: # 插入或更新复权数据 insert_or_update_rehab_data(connection, rehab_data, full_code, name) logging.info(f"Inserted/Updated rehab data for {full_code} ({name})") time.sleep(3) connection.close() if __name__ == "__main__": # 处理 hs300 表数据,不需要加 'US.' 前缀 process_stock_data("hs300") # 处理 sp500 表数据,加 'US.' 前缀 process_stock_data("sp500", prefix='US.')