import time from futu import * import pymysql from datetime import datetime import logging # MySQL 配置 db_config = { 'host': '172.18.0.2', 'user': 'root', 'password': 'mysqlpw', 'database': 'stockdb' } # 市场映射 market_mapping = { 'US': Market.US, 'HK': Market.HK, 'SZ': Market.SZ, 'SH': Market.SH } # 配置日志记录 logging.basicConfig(filename='mysql_errors.log', level=logging.ERROR, format='%(asctime)s %(levelname)s:%(message)s') # 板块集合类型映射 plat_mapping = { 'INDUSTRY': Plate.INDUSTRY, 'ALL': Plate.ALL, 'CONCEPT': Plate.CONCEPT } # 建立 MySQL 连接 connection = pymysql.connect(**db_config) # 定义插入或更新函数 def insert_or_update_data(connection, data, market, plat): try: with connection.cursor() as cursor: for index, row in data.iterrows(): code = row['code'] plate_name = row['plate_name'] plate_id = row['plate_id'] up_date = datetime.now().strftime('%Y-%m-%d') # 当前日期 # MySQL 插入或更新语句 sql = """ INSERT INTO plat_list (code, plate_name, plate_id, market, plat, up_date) VALUES (%s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE plate_name = VALUES(plate_name), plate_id = VALUES(plate_id), market = VALUES(market), up_date = VALUES(up_date) """ cursor.execute(sql, (code, plate_name, plate_id, market, plat, up_date)) # 提交事务 connection.commit() except pymysql.MySQLError as e: # 捕获 MySQL 错误并打印日志 print(f"Error occurred while inserting/updating data for market {market}, plat {plat}: {e}") # 可根据需要记录到文件或其他日志工具 # 初始化行情连接 quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) try: # 双重循环:市场和板块类型 for market in market_mapping: for plat_name, plat_enum in plat_mapping.items(): # 拉取数据 ret, data = quote_ctx.get_plate_list(market_mapping[market], plat_enum) if ret == RET_OK: row_count = len(data) # 获取行数 print(f"成功获取 {market} 市场的 {plat_name} 板块数据,共 {row_count} 行") # 插入或更新数据到 MySQL insert_or_update_data(connection, data, market, plat_name) else: print(f"获取 {market} 市场的 {plat_name} 板块数据失败: {data}") # 每次循环后休眠10秒 time.sleep(10) finally: quote_ctx.close() # 关闭行情连接 connection.close() # 关闭 MySQL 连接