Files
stock/stockapp/src/get_his_kline_em.py
2024-10-25 09:59:08 +08:00

243 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import logging
import pandas as pd
import os
import sys
import config
import crawling.stock_hist_em as his_em
file_selected_codes = './cursor/his_kline_em_codes.txt' # 指定拉取的代码列表,每行一个代码
file_done_codes = './cursor/his_kline_em_done_codes.txt' # 已完成拉取的代码列表,每行一个代码
dir_his_kline_em = '../data/his_kline_em'
config.setup_logging()
# 刷新代码列表,并返回
def flush_code_map():
code_id_map_em_df = his_em.code_id_map_em()
print(code_id_map_em_df)
return code_id_map_em_df
# 获取历史K线如果失败就重试
def fetch_with_retry(code: str, adjust: str = '', max_retries: int = 20) -> pd.DataFrame :
retries = 0
while retries < max_retries:
try:
# 调用 stock_zh_a_hist 获取历史数据
df = his_em.stock_zh_a_hist(
symbol=code,
period="daily",
start_date="19000101",
end_date="20241020",
adjust=adjust,
)
# 如果获取到的数据为空,记录日志并重试
if df.empty:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
else:
return df
except Exception as e:
retries += 1
time.sleep(3) # 每次重试前休眠 3 秒
return pd.DataFrame()
# 检查子目录是否存在,不存在则创建
def create_directory_if_not_exists(dir_name):
if not os.path.exists(dir_name):
os.makedirs(dir_name)
logging.info(f"Created directory: {dir_name}")
# 读取 code.txt 文件,并获取每个股票代码
def read_stock_codes(filename: str) -> list:
try:
with open(filename, 'r') as f:
codes = [line.strip() for line in f if line.strip()]
return codes
except FileNotFoundError:
logging.error(f"文件 {filename} 未找到。")
return []
# 从文件获取指定的代码并拉取历史K线
def fetch_parts_by_codes():
# 读取股票代码列表
codes = read_stock_codes(file_selected_codes)
# 如果没有代码,结束程序
if not codes:
logging.error("没有找到有效的股票代码,程序终止。")
return
# 读取已经下载的代码列表,后续下载时忽略
done_codes = []
if os.path.exists(file_done_codes):
with open(file_done_codes, 'r', encoding='utf-8') as f:
done_codes = [line.strip() for line in f] # 使用strip()去掉每行的换行符和多余的空白
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for key in codes:
val = code_id_map_em_df.get(key)
if key in done_codes:
logging.info(f'Skipping already code. code: ({key})')
continue
if val is None:
logging.error(f'cannot find stock code. code: ({key}), adjust: ({adjust_str})')
continue
succ = True
start_time = time.time() # 在函数执行前获取当前时间
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.info(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
succ = False
else:
# 将 DataFrame 输出为 CSV 文件
curr_dir = f'{dir_his_kline_em}/{val}_{adjust_str}'
create_directory_if_not_exists(curr_dir)
curr_file = f'{curr_dir}/{key}_{adjust_str}_his_data.csv'
stock_zh_a_hist_df.to_csv(curr_file, index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), file: ({curr_file}) lines: ({lines})')
time.sleep(5)
end_time = time.time() # 在函数执行后获取当前时间
elapsed_time = int(end_time - start_time) # 计算时间差,秒
if succ:
# 下载后,记录日志
with open(file_done_codes, 'a', encoding='utf-8') as done_list:
done_list.write(f"{key}\n")
logging.info(f"Downloaded and recorded: ({key}) total lines: {lines} time cost: {elapsed_time} s")
time.sleep(10)
# 获取全量代码的历史K线
def fetch_all_by_codes():
# 读取已经下载的代码列表,后续下载时忽略
done_codes = []
if os.path.exists(file_done_codes):
with open(file_done_codes, 'r', encoding='utf-8') as f:
done_codes = [line.strip() for line in f] # 使用strip()去掉每行的换行符和多余的空白
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for key, val in code_id_map_em_df.items():
if key in done_codes:
logging.info(f'Skipping already code. code: ({key})')
continue
succ = True
start_time = time.time() # 在函数执行前获取当前时间
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.error(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
succ = False
else:
# 将 DataFrame 输出为 CSV 文件
curr_dir = f'{dir_his_kline_em}/{val}_{adjust_str}'
create_directory_if_not_exists(curr_dir)
curr_file = f'{curr_dir}/{key}_{adjust_str}_his_data.csv'
stock_zh_a_hist_df.to_csv(curr_file, index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), file: ({curr_file}) lines: ({lines})')
time.sleep(5)
end_time = time.time() # 在函数执行后获取当前时间
elapsed_time = int(end_time - start_time) # 计算时间差,秒
if succ:
# 下载后,记录日志
with open(file_done_codes, 'a', encoding='utf-8') as done_list:
done_list.write(f"{key}\n")
logging.info(f"Downloaded and recorded: ({key}) total lines: {lines} time cost: {elapsed_time} s")
time.sleep(10)
# 从文件获取指定的代码并拉取历史K线废弃
def fetch_parts():
# 读取股票代码列表
codes = read_stock_codes(file_selected_codes)
# 如果没有代码,结束程序
if not codes:
logging.error("没有找到有效的股票代码,程序终止。")
return
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
for key in codes:
val = code_id_map_em_df.get(key)
if val is None:
logging.error(f'cannot find stock code. code: ({key}), adjust: ({adjust_str})')
continue
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.info(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
else:
# 将 DataFrame 输出为 CSV 文件
stock_zh_a_hist_df.to_csv(f'../data/{val}/{key}_{adjust_str}_his_data.csv', index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), lines: ({lines})')
time.sleep(5)
time.sleep(10)
# 获取全量代码的历史K线废弃
def fetch_all():
adjust_values = ['', 'qfq', 'hfq']
code_id_map_em_df = his_em.code_id_map_em()
for adjust in adjust_values:
adjust_str = adjust if adjust != '' else 'none'
for key, val in code_id_map_em_df.items():
stock_zh_a_hist_df =fetch_with_retry(key, adjust)
if stock_zh_a_hist_df.empty:
logging.error(f'fetch his data error. code: ({key}), adjust: ({adjust_str})')
else:
# 将 DataFrame 输出为 CSV 文件
stock_zh_a_hist_df.to_csv(f'../data/{val}/{key}_{adjust_str}_his_data.csv', index=False, encoding='utf-8')
lines = stock_zh_a_hist_df.shape[0]
logging.info(f'fetch his data and write to file. code: ({key}), adjust: ({adjust_str}), lines: ({lines})')
time.sleep(5)
time.sleep(10)
# 主函数
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: python script.py <cmd>")
print("cmd: all, parts")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "all":
fetch_all_by_codes() # 拉取所有的代码
elif cmd == "parts":
fetch_parts_by_codes() # 拉取指定的代码
elif cmd == "all_other":
fetch_all()
elif cmd == "parts_other":
fetch_parts()
else:
print(f"Unknown command: {cmd}")