Files
stock/stockapp/get_plat_list.py
2024-10-09 11:46:30 +08:00

105 lines
3.2 KiB
Python

"""
Script Name:
Description: 从富途获取板块列表。
参考地址: https://openapi.futunn.com/futu-api-doc/quote/get-plate-list.html
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import time
from futu import *
import pymysql
from datetime import datetime
import logging
# MySQL 配置
db_config = {
'host': '172.18.0.2',
'user': 'root',
'password': 'mysqlpw',
'database': 'stockdb'
}
# 市场映射
market_mapping = {
'US': Market.US,
'HK': Market.HK,
'SZ': Market.SZ,
'SH': Market.SH
}
# 配置日志记录
logging.basicConfig(filename='mysql_errors.log', level=logging.ERROR,
format='%(asctime)s %(levelname)s:%(message)s')
# 板块集合类型映射
plat_mapping = {
'INDUSTRY': Plate.INDUSTRY,
'ALL': Plate.ALL,
'CONCEPT': Plate.CONCEPT
}
# 建立 MySQL 连接
connection = pymysql.connect(**db_config)
# 定义插入或更新函数
def insert_or_update_data(connection, data, market, plat):
try:
with connection.cursor() as cursor:
for index, row in data.iterrows():
code = row['code']
plate_name = row['plate_name']
plate_id = row['plate_id']
up_date = datetime.now().strftime('%Y-%m-%d') # 当前日期
# MySQL 插入或更新语句
sql = """
INSERT INTO futu_plat_list (code, plate_name, plate_id, market, plat, up_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
plate_name = VALUES(plate_name),
plate_id = VALUES(plate_id),
market = VALUES(market),
up_date = VALUES(up_date)
"""
cursor.execute(sql, (code, plate_name, plate_id, market, plat, up_date))
# 提交事务
connection.commit()
except pymysql.MySQLError as e:
# 捕获 MySQL 错误并打印日志
print(f"Error occurred while inserting/updating data for market {market}, plat {plat}: {e}")
# 可根据需要记录到文件或其他日志工具
# 初始化行情连接
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
try:
# 双重循环:市场和板块类型
for market in market_mapping:
for plat_name, plat_enum in plat_mapping.items():
# 拉取数据
ret, data = quote_ctx.get_plate_list(market_mapping[market], plat_enum)
if ret == RET_OK:
row_count = len(data) # 获取行数
print(f"成功获取 {market} 市场的 {plat_name} 板块数据,共 {row_count}")
# 插入或更新数据到 MySQL
insert_or_update_data(connection, data, market, plat_name)
else:
print(f"获取 {market} 市场的 {plat_name} 板块数据失败: {data}")
# 每次循环后休眠10秒
time.sleep(10)
finally:
quote_ctx.close() # 关闭行情连接
connection.close() # 关闭 MySQL 连接