""" Script Name: Description: 从yahoo获取美股股票的历史K线, 通过 auto_adjust 参数来控制是否获取前复权数据。默认为true, 如果设置为false, 那么结果中会自动带 adj Close. 参考地址: https://github.com/ranaroussi/yfinance https://aroussi.com/post/python-yahoo-finance Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import yfinance as yf import pymysql import logging import time import sys import os from datetime import datetime import config # 引入 config.py 中的配置 # 股票代码集合,如果属于这些股票,则使用 "max" 时间段 special_stock_codes = ('ABNB', 'CARR', 'CEG', 'GEHC', 'GEV', 'HUBB', 'KVUE', 'OTIS', 'PLTR', 'SOLV', 'VLTO') # K线调整选项,决定是否使用前复权价格 kline_adjust = True # 根据 kline_adjust 决定使用的表名 table_name = 'sp500_qfq_his_202410' if kline_adjust else 'sp500_his_kline_none' # 使用 config.py 中的日志配置 filename = os.path.splitext(os.path.basename(__file__))[0] config.setup_logging(f"./log/{filename}.log") logger = logging.getLogger() # MySQL数据库连接 def connect_to_db(): try: #return pymysql.connect(**config.db_config) return pymysql.connect( **config.db_config, cursorclass=pymysql.cursors.DictCursor # 确保使用字典形式的游标 ) except pymysql.MySQLError as e: logger.error(f"Error connecting to the database: {e}", exc_info=True) return None # 从MySQL读取sp500表中的股票代码和名称 def fetch_sp500_codes(): db = connect_to_db() if db is None: logger.error("Failed to connect to database.") return [] try: with db.cursor() as cursor: cursor.execute("SELECT code_inner, code_name FROM sp500") codes = cursor.fetchall() return codes finally: db.close() # 插入数据到指定表名 def insert_stock_data_to_db(data, code, name): try: db = connect_to_db() if db is None: return with db.cursor() as cursor: insert_query = f""" INSERT INTO {table_name} (time_key, open, high, low, close, adj_close, volume, dividends, stock_splits, code, name) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE open = VALUES(open), high = VALUES(high), low = VALUES(low), close = VALUES(close), adj_close = VALUES(adj_close), volume = VALUES(volume), dividends = VALUES(dividends), stock_splits = VALUES(stock_splits) """ # auto_adjust=True: Date,Open,High,Low,Close,Volume,Dividends,Stock Splits # auto_adjust=False: Date,Open,High,Low,Close,Adj Close,Volume,Dividends,Stock Splits for index, row in data.iterrows(): time_key = index.strftime('%Y-%m-%d %H:%M:%S') # 判断 row['Adj Close'] 是否存在,若不存在则使用 0 adj_close = row['Adj Close'] if 'Adj Close' in row else 0 values = (time_key, row['Open'], row['High'], row['Low'], row['Close'], adj_close, row['Volume'], row['Dividends'], row['Stock Splits'], code, name) cursor.execute(insert_query, values) db.commit() except pymysql.MySQLError as e: logger.error(f"Error occurred while inserting data: {e}", exc_info=True) finally: if db: db.close() # 拉取股票的历史数据 def fetch_and_store_stock_data(): codes = fetch_sp500_codes() for row in codes: code_inner = row['code_inner'] code_name = row['code_name'] logger.info(f"Fetching data for {code_name} ({code_inner})...") # 判断使用的时间段,特殊股票使用 max,其他使用 10y period = "max" if code_inner in special_stock_codes else "10y" try: stock = yf.Ticker(code_inner) # 拉取股票历史数据,使用 kline_adjust 决定 auto_adjust 是否为 True hist_data = stock.history(period=period, auto_adjust=kline_adjust) if not hist_data.empty: logger.info(f"Inserting data for {code_name} ({code_inner}) into {table_name}...") insert_stock_data_to_db(hist_data, code_inner, code_name) else: logger.warning(f"No data found for {code_name} ({code_inner})") # 每次请求完后休眠3秒 time.sleep(3) except Exception as e: logger.error(f"Error fetching data for {code_name} ({code_inner}): {e}", exc_info=True) if __name__ == "__main__": fetch_and_store_stock_data()