"""Download historical daily K-line data through ``crawling.stock_hist_em``.

For every stock code the script requests three adjust variants ('', 'qfq',
'hfq'), writes each non-empty result to a CSV file and, for the resumable
commands, appends the code to a "done" list so that later runs skip it.

Usage: ``python script.py CMD`` where CMD is one of ``all``, ``parts``,
``all_other`` (deprecated) or ``parts_other`` (deprecated).
"""
import logging
import os
import sys
import time

import pandas as pd

import config
import crawling.stock_hist_em as his_em

# Codes selected for download, one code per line.
file_selected_codes = './cursor/his_kline_em_codes.txt'
# Codes whose download has already completed, one code per line.
file_done_codes = './cursor/his_kline_em_done_codes.txt'
# Root directory of the CSV files written by the resumable commands.
dir_his_kline_em = '../data/his_kline_em'

# Adjust arguments requested for every code; '' is written as 'none' in
# file names and log messages.
_ADJUST_VALUES = ('', 'qfq', 'hfq')

config.setup_logging()


def flush_code_map():
    """Fetch the code map from ``his_em.code_id_map_em``, print it and return it."""
    code_id_map_em_df = his_em.code_id_map_em()
    print(code_id_map_em_df)
    return code_id_map_em_df


def fetch_with_retry(code: str, adjust: str = '', max_retries: int = 20,
                     start_date: str = "19000101", end_date: str = "20241020",
                     retry_delay: float = 3) -> pd.DataFrame:
    """Fetch the daily history of one code, retrying on errors and empty results.

    Args:
        code: Stock code, passed as ``symbol`` to ``his_em.stock_zh_a_hist``.
        adjust: Adjust argument passed through unchanged.
        max_retries: Maximum number of attempts.
        start_date: First date requested (format as used by the callers,
            e.g. "19000101").
        end_date: Last date requested.
        retry_delay: Seconds to sleep between two attempts.

    Returns:
        The first non-empty DataFrame obtained, or an empty DataFrame when
        every attempt failed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            df = his_em.stock_zh_a_hist(
                symbol=code,
                period="daily",
                start_date=start_date,
                end_date=end_date,
                adjust=adjust,
            )
        except Exception as e:
            # Deliberately broad: any failure of the fetch is treated as
            # retryable, but it is logged instead of silently dropped.
            logging.warning(f'fetch attempt failed. code: ({code}), adjust: ({adjust}), '
                            f'attempt: {attempt}/{max_retries}, error: {e!r}')
        else:
            if not df.empty:
                return df
            logging.warning(f'fetch attempt returned no data. code: ({code}), adjust: ({adjust}), '
                            f'attempt: {attempt}/{max_retries}')
        # Sleeping is only useful when another attempt follows.
        if attempt < max_retries:
            time.sleep(retry_delay)
    return pd.DataFrame()


def create_directory_if_not_exists(dir_name):
    """Create ``dir_name`` (including parents) if it does not exist yet."""
    if not os.path.exists(dir_name):
        # exist_ok guards against the directory appearing between the check
        # above and the creation.
        os.makedirs(dir_name, exist_ok=True)
        logging.info(f"Created directory: {dir_name}")


def read_stock_codes(filename: str) -> list:
    """Return the non-blank, stripped lines of ``filename``.

    Returns an empty list (and logs an error) when the file does not exist.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        logging.error(f"文件 {filename} 未找到。")
        return []


def _load_done_codes() -> set:
    """Return the set of codes recorded in ``file_done_codes`` (empty if absent)."""
    if not os.path.exists(file_done_codes):
        return set()
    with open(file_done_codes, 'r', encoding='utf-8') as f:
        return {line.strip() for line in f if line.strip()}


def _download_code(key, val) -> bool:
    """Download every adjust variant of ``key`` and record the code when all succeed.

    Each non-empty result is written to
    ``{dir_his_kline_em}/{val}_{adjust}/{key}_{adjust}_his_data.csv``.
    Returns True when all variants were fetched and the code was appended to
    ``file_done_codes``; a failing variant does not stop the remaining ones.
    """
    succ = True
    lines = 0
    start_time = time.time()
    for adjust in _ADJUST_VALUES:
        adjust_str = adjust if adjust != '' else 'none'
        stock_zh_a_hist_df = fetch_with_retry(key, adjust)
        if stock_zh_a_hist_df.empty:
            logging.error(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
            succ = False
        else:
            # Write the DataFrame to a CSV file.
            curr_dir = f'{dir_his_kline_em}/{val}_{adjust_str}'
            create_directory_if_not_exists(curr_dir)
            curr_file = f'{curr_dir}/{key}_{adjust_str}_his_data.csv'
            stock_zh_a_hist_df.to_csv(curr_file, index=False, encoding='utf-8')
            lines = stock_zh_a_hist_df.shape[0]
            logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), file: ({curr_file}) lines: ({lines})')
        # Pause between two requests.
        time.sleep(5)
    # Elapsed wall-clock seconds, pauses included.
    elapsed_time = int(time.time() - start_time)

    if succ:
        # Make sure the done list can be appended to even on a fresh checkout.
        done_dir = os.path.dirname(file_done_codes)
        if done_dir:
            os.makedirs(done_dir, exist_ok=True)
        with open(file_done_codes, 'a', encoding='utf-8') as done_list:
            done_list.write(f"{key}\n")
        # `lines` is the row count of the last variant written.
        logging.info(f"Downloaded and recorded: ({key}) total lines: {lines} time cost: {elapsed_time} s")
    return succ


def fetch_parts_by_codes():
    """Download history for the codes listed in ``file_selected_codes``.

    Codes already present in ``file_done_codes`` and codes missing from the
    code map are skipped.
    """
    codes = read_stock_codes(file_selected_codes)
    # Nothing to do without codes.
    if not codes:
        logging.error("没有找到有效的股票代码,程序终止。")
        return

    # Codes downloaded by earlier runs are skipped.
    done_codes = _load_done_codes()
    code_id_map_em_df = his_em.code_id_map_em()

    for key in codes:
        if key in done_codes:
            logging.info(f'Skipping already code. code: ({key})')
            continue
        val = code_id_map_em_df.get(key)
        if val is None:
            # No adjust value exists at this point, so only the code is logged.
            logging.error(f'cannot find stock code. code: ({key})')
            continue
        _download_code(key, val)
        time.sleep(10)


def fetch_all_by_codes():
    """Download history for every code returned by ``his_em.code_id_map_em``.

    Codes already present in ``file_done_codes`` are skipped.
    """
    done_codes = _load_done_codes()
    code_id_map_em_df = his_em.code_id_map_em()

    for key, val in code_id_map_em_df.items():
        if key in done_codes:
            logging.info(f'Skipping already code. code: ({key})')
            continue
        _download_code(key, val)
        time.sleep(10)


def _fetch_and_save_flat(key, val, adjust, adjust_str):
    """Fetch one code/adjust pair and write it to ``../data/{val}/`` (deprecated layout)."""
    stock_zh_a_hist_df = fetch_with_retry(key, adjust)
    if stock_zh_a_hist_df.empty:
        logging.error(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
    else:
        # to_csv does not create missing directories.
        create_directory_if_not_exists(f'../data/{val}')
        stock_zh_a_hist_df.to_csv(f'../data/{val}/{key}_{adjust_str}_his_data.csv', index=False, encoding='utf-8')
        lines = stock_zh_a_hist_df.shape[0]
        logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), lines: ({lines})')


def fetch_parts():
    """Deprecated: download the selected codes, one adjust variant at a time.

    Unlike ``fetch_parts_by_codes`` this keeps no done list and writes to
    ``../data/{val}/``.
    """
    codes = read_stock_codes(file_selected_codes)
    # Nothing to do without codes.
    if not codes:
        logging.error("没有找到有效的股票代码,程序终止。")
        return

    code_id_map_em_df = his_em.code_id_map_em()

    for adjust in _ADJUST_VALUES:
        adjust_str = adjust if adjust != '' else 'none'
        for key in codes:
            val = code_id_map_em_df.get(key)
            if val is None:
                logging.error(f'cannot find stock code. code: ({key}), adjust: ({adjust_str})')
                continue
            _fetch_and_save_flat(key, val, adjust, adjust_str)
            time.sleep(5)
        time.sleep(10)


def fetch_all():
    """Deprecated: download every code of the code map, one adjust variant at a time.

    Unlike ``fetch_all_by_codes`` this keeps no done list and writes to
    ``../data/{val}/``.
    """
    code_id_map_em_df = his_em.code_id_map_em()

    for adjust in _ADJUST_VALUES:
        adjust_str = adjust if adjust != '' else 'none'
        for key, val in code_id_map_em_df.items():
            _fetch_and_save_flat(key, val, adjust, adjust_str)
            time.sleep(5)
        time.sleep(10)


def main():
    """Parse the single command-line argument and run the matching download."""
    if len(sys.argv) != 2:
        print("Usage: python script.py ")
        print("cmd: all, parts")
        sys.exit(1)

    commands = {
        "all": fetch_all_by_codes,        # every code of the code map
        "parts": fetch_parts_by_codes,    # only the selected codes
        "all_other": fetch_all,           # deprecated variant of "all"
        "parts_other": fetch_parts,       # deprecated variant of "parts"
    }
    cmd = sys.argv[1]
    handler = commands.get(cmd)
    if handler is None:
        print(f"Unknown command: {cmd}")
        # Report the failure to the calling shell as well.
        sys.exit(1)
    handler()


if __name__ == '__main__':
    main()