modify fetch em data files.

This commit is contained in:
2024-10-25 09:59:08 +08:00
parent c8342de5a2
commit fcf6f8a945
9 changed files with 150 additions and 5 deletions

View File

@ -0,0 +1,11 @@
689009
688981
688517
000001
000002
000001
688353
688093
688303
000063
000100

View File

@ -1,18 +1,25 @@
import time
import logging
import pandas as pd
import os
import sys
import config
import crawling.stock_hist_em as his_em
file_selected_codes = './cursor/his_kline_em_codes.txt' # 指定拉取的代码列表,每行一个代码
file_done_codes = './cursor/his_kline_em_done_codes.txt' # 已完成拉取的代码列表,每行一个代码
dir_his_kline_em = '../data/his_kline_em'
config.setup_logging()
# 刷新代码列表,并返回
def flush_code_map():
code_id_map_em_df = his_em.code_id_map_em()
print(code_id_map_em_df)
return code_id_map_em_df
# 获取历史K线如果失败就重试
def fetch_with_retry(code: str, adjust: str = '', max_retries: int = 5) -> pd.DataFrame :
def fetch_with_retry(code: str, adjust: str = '', max_retries: int = 20) -> pd.DataFrame :
retries = 0
while retries < max_retries:
try:
@ -37,6 +44,13 @@ def fetch_with_retry(code: str, adjust: str = '', max_retries: int = 5) -> pd.Da
return pd.DataFrame()
# 检查子目录是否存在,不存在则创建
def create_directory_if_not_exists(dir_name):
if not os.path.exists(dir_name):
os.makedirs(dir_name)
logging.info(f"Created directory: {dir_name}")
# 读取 code.txt 文件,并获取每个股票代码
def read_stock_codes(filename: str) -> list:
try:
@ -47,10 +61,115 @@ def read_stock_codes(filename: str) -> list:
logging.error(f"文件 {filename} 未找到。")
return []
# 从文件获取指定的代码并拉取历史K线
def fetch_parts_by_codes():
# 读取股票代码列表
codes = read_stock_codes(file_selected_codes)
# 如果没有代码,结束程序
if not codes:
logging.error("没有找到有效的股票代码,程序终止。")
return
# 读取已经下载的代码列表,后续下载时忽略
done_codes = []
if os.path.exists(file_done_codes):
with open(file_done_codes, 'r', encoding='utf-8') as f:
done_codes = [line.strip() for line in f] # 使用strip()去掉每行的换行符和多余的空白
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for key in codes:
val = code_id_map_em_df.get(key)
if key in done_codes:
logging.info(f'Skipping already code. code: ({key})')
continue
if val is None:
logging.error(f'cannot find stock code. code: ({key}), adjust: ({adjust_str})')
continue
succ = True
start_time = time.time() # 在函数执行前获取当前时间
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.info(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
succ = False
else:
# 将 DataFrame 输出为 CSV 文件
curr_dir = f'{dir_his_kline_em}/{val}_{adjust_str}'
create_directory_if_not_exists(curr_dir)
curr_file = f'{curr_dir}/{key}_{adjust_str}_his_data.csv'
stock_zh_a_hist_df.to_csv(curr_file, index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), file: ({curr_file}) lines: ({lines})')
time.sleep(5)
end_time = time.time() # 在函数执行后获取当前时间
elapsed_time = int(end_time - start_time) # 计算时间差,秒
if succ:
# 下载后,记录日志
with open(file_done_codes, 'a', encoding='utf-8') as done_list:
done_list.write(f"{key}\n")
logging.info(f"Downloaded and recorded: ({key}) total lines: {lines} time cost: {elapsed_time} s")
time.sleep(10)
# 获取全量代码的历史K线
def fetch_all_by_codes():
# 读取已经下载的代码列表,后续下载时忽略
done_codes = []
if os.path.exists(file_done_codes):
with open(file_done_codes, 'r', encoding='utf-8') as f:
done_codes = [line.strip() for line in f] # 使用strip()去掉每行的换行符和多余的空白
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for key, val in code_id_map_em_df.items():
if key in done_codes:
logging.info(f'Skipping already code. code: ({key})')
continue
succ = True
start_time = time.time() # 在函数执行前获取当前时间
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.error(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
succ = False
else:
# 将 DataFrame 输出为 CSV 文件
curr_dir = f'{dir_his_kline_em}/{val}_{adjust_str}'
create_directory_if_not_exists(curr_dir)
curr_file = f'{curr_dir}/{key}_{adjust_str}_his_data.csv'
stock_zh_a_hist_df.to_csv(curr_file, index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), file: ({curr_file}) lines: ({lines})')
time.sleep(5)
end_time = time.time() # 在函数执行后获取当前时间
elapsed_time = int(end_time - start_time) # 计算时间差,秒
if succ:
# 下载后,记录日志
with open(file_done_codes, 'a', encoding='utf-8') as done_list:
done_list.write(f"{key}\n")
logging.info(f"Downloaded and recorded: ({key}) total lines: {lines} time cost: {elapsed_time} s")
time.sleep(10)
# 从文件获取指定的代码并拉取历史K线废弃
def fetch_parts():
# 读取股票代码列表
codes = read_stock_codes('code.txt')
codes = read_stock_codes(file_selected_codes)
# 如果没有代码,结束程序
if not codes:
logging.error("没有找到有效的股票代码,程序终止。")
@ -80,7 +199,7 @@ def fetch_parts():
time.sleep(10)
# 获取全量代码的历史K线
# 获取全量代码的历史K线,废弃
def fetch_all():
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
@ -104,5 +223,20 @@ def fetch_all():
# 主函数
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: python script.py <cmd>")
print("cmd: all, parts")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "all":
fetch_all_by_codes() # 拉取所有的代码
elif cmd == "parts":
fetch_parts_by_codes() # 拉取指定的代码
elif cmd == "all_other":
fetch_all()
#fetch_parts()
elif cmd == "parts_other":
fetch_parts()
else:
print(f"Unknown command: {cmd}")