import json
import time
import os
import argparse
import textwrap
import logging
from datetime import datetime, timedelta
from functools import partial
from urllib.parse import urljoin, urlparse

import config
import scraper
import utils

config.setup_logging()

debug = False
skip_local = False
scan_mode = 0
update_mode = 0
current_date_str = datetime.now().strftime("%Y-%m-%d")
docs_dir = f"{config.global_share_data_dir}/docs"
target_csv = f"{docs_dir}/u3c3.csv"
target_torrent_dir = f"{docs_dir}/u3c3_torrents"


# Crawl the paginated u3c3 listing and append the rows to the CSV
def fetch_list(start_p=1):
    p = start_p
    total_results = []
    # Back up the existing file first
    utils.backup_existing_file(target_csv)
    url = f"https://u001.25img.com/?p={p}"
    while url:
        logging.info(f"fetching url {url}")
        soup, status_code = scraper.fetch_page(
            url,
            partial(scraper.generic_validator, tag="div", identifier="table-responsive", attr_type="class"),
        )
        if soup:
            list_data, total_pages = scraper.parse_page(soup, url)
            if list_data:
                total_results.extend(list_data)
            else:
                logging.warning(f"fetch_list failed. url: {url}")
            if total_pages:
                if p >= total_pages:
                    url = None
                else:
                    p += 1
                    url = f"https://u001.25img.com/?p={p}"
                    # Flush the buffer every 10 pages so a crash loses little work
                    if p % 10 == 0:
                        lines = utils.append_to_csv(total_results, target_csv)
                        if lines:
                            logging.info(f"write to csv file. new lines: {len(total_results)}, total lines: {lines}")
                        total_results.clear()  # empty the buffer
                    time.sleep(1)
            else:
                logging.warning(f"fetch_list failed. url: {url}")
                url = None
        else:
            logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
        if debug:
            break
    # Flush whatever is left in the buffer to the CSV file
    lines = utils.append_to_csv(total_results, target_csv)
    total_results.clear()
    if lines:
        logging.info(f"write to csv file succ. file: {target_csv}. total lines: {lines}")
    logging.info(f"fetch list finished. total pages: {p}")


# Download the torrent files listed in the CSV
def down_torrents():
    # Read the CSV rows
    rows = utils.read_csv_data(target_csv)
    if not rows:
        return
    # Create the main download directory
    os.makedirs(target_torrent_dir, exist_ok=True)
    for row in rows:
        title = row.get('title', '')
        torrent_url = row.get('torrent_url', '')
        # Skip URLs that do not look like torrent links
        if not (torrent_url.startswith('https') and torrent_url.endswith('.torrent')):
            logging.warning(f"skipping invalid torrent link: {torrent_url}")
            continue
        # Extract the filename from the URL
        try:
            parsed_url = urlparse(torrent_url)
            filename = os.path.basename(parsed_url.path)
            if not filename:
                logging.warning(f"cannot parse a filename from URL: {torrent_url}")
                continue
        except Exception as e:
            logging.warning(f"error while parsing URL: {e}")
            continue
        # Create a subdirectory keyed by the lowercased first character of the filename
        first_char = filename[0].lower()
        subdir = os.path.join(target_torrent_dir, first_char)
        os.makedirs(subdir, exist_ok=True)
        # Skip files that already exist locally
        local_path = os.path.join(subdir, filename)
        if os.path.exists(local_path):
            logging.info(f"file already exists, skipping download: {title}, {local_path}")
            continue
        succ = scraper.download_torrent(torrent_url, local_path)
        if succ:
            logging.info(f"download succ. {title}, {local_path}")
        if debug:
            break
        time.sleep(1)


# Crawl one sis001 board listing and append the rows to the CSV
def fetch_sis_list(url='https://sis001.com/forum/forum-25-1.html',
                   target_csv_sis=f"{config.global_share_data_dir}/sis_asia_zt.csv",
                   ident='forum_25', plate_name='亚无转帖'):
    total_results = []
    cnt = 0
    while url:
        logging.info(f"fetching url {url}")
        soup, status_code = scraper.fetch_page(
            url,
            partial(scraper.generic_validator, tag="table", identifier=ident, attr_type="id"),
        )
        if soup:
            list_data, next_url = scraper.parse_sis_list(soup, url, ident, plate_name)
            if list_data:
                total_results.extend(list_data)
            else:
                logging.warning(f"fetch_list failed. url: {url}")
            if next_url:
                url = next_url
                cnt += 1
                # Flush the buffer every 10 pages
                if cnt % 10 == 0:
                    lines = utils.append_to_csv(total_results, target_csv_sis)
                    if lines:
                        logging.info(f"write to csv file. new lines: {len(total_results)}, total lines: {lines}")
                    total_results.clear()
                time.sleep(1)
            else:
                logging.warning(f"fetch_list failed. url: {url}")
                url = None
        else:
            logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
        if debug:
            break
    # Flush whatever is left in the buffer to the CSV file
    lines = utils.append_to_csv(total_results, target_csv_sis)
    total_results.clear()
    if lines:
        logging.info(f"write to csv file succ. file: {target_csv_sis}, total lines: {lines}")
    logging.info(f"fetch list finished. total pages: {cnt}")

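# Board configuration consumed by fetch_sis_all below. Each entry describes one
# sis001 forum board:
#   plate      - short key for the board (also a candidate per-board CSV name;
#                see the commented-out csv_file line below)
#   plate_name - human-readable board name stored with every row
#   url        - first page of the board's thread listing
#   ident      - id attribute of the listing <table>, used by the page validator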
def fetch_sis_all():
    sections = [
        {
            'plate': 'sis_asia_yc',
            'plate_name': '亚无原创',
            'url': 'https://sis001.com/forum/forum-143-1.html',
            'ident': 'forum_143',
        },
        {
            'plate': 'sis_asia_zt',
            'plate_name': '亚无转帖',
            'url': 'https://sis001.com/forum/forum-25-1.html',
            'ident': 'forum_25',
        },
        {
            'plate': 'sis_oumei_yc',
            'plate_name': '欧无原创',
            'url': 'https://sis001.com/forum/forum-229-1.html',
            'ident': 'forum_229',
        },
        {
            'plate': 'sis_oumei_zt',
            'plate_name': '欧无转帖',
            'url': 'https://sis001.com/forum/forum-77-1.html',
            'ident': 'forum_77',
        },
    ]
    for item in sections:
        section = item['plate']
        url = item['url']
        logging.info(f"---------------start fetching {section}, begin url: {url}")
        # csv_file = f"{config.global_share_data_dir}/{section}.csv"
        csv_file = f"{docs_dir}/sis.csv"
        # Back up the existing file first
        utils.backup_existing_file(csv_file)
        fetch_sis_list(url=url, target_csv_sis=csv_file, ident=item['ident'], plate_name=item['plate_name'])


# Map command shortcuts to their functions
function_map = {
    "list": fetch_list,
    "down": down_torrents,
    "sis": fetch_sis_list,
    "sis_all": fetch_sis_all,
}


# Main entry point
def main(cmd, args):
    # Run the requested functions
    if cmd:
        function_names = args.cmd.split(",")  # split the comma-separated input
        for short_name in function_names:
            func = function_map.get(short_name.strip())  # look up the function in the map
            if callable(func):
                func()
            else:
                logging.warning(f"{short_name} is not a valid function shortcut.")
    else:
        # No command given: run everything
        for name, func in function_map.items():
            if callable(func):
                func()
            else:
                logging.warning(f"{name} is not a valid function shortcut.")
    logging.info('all processes completed!')


# TODO:
# 1,


# Apply command-line arguments to the module-level flags
def set_env(args):
    global debug
    debug = args.debug
    if debug:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
    global skip_local
    skip_local = args.skip_local
    global scan_mode
    scan_mode = args.scan_mode
    global update_mode
    if args.update:
        update_mode = args.update


if __name__ == "__main__":
    # Command-line argument handling
    keys_str = ",".join(function_map.keys())
    usage_examples = textwrap.dedent('''
        Examples:
        python3 ./fetch.py                 # refresh the list and download new resources
        python3 ./fetch.py --cmd=list      # refresh the list
        python3 ./fetch.py --cmd=down      # download new resources
        python3 ./fetch.py --cmd=sis       # refresh the sis list, 亚无转帖 board
        python3 ./fetch.py --cmd=sis_all   # refresh the sis list, all boards
    ''')
    parser = argparse.ArgumentParser(
        description='fetch javhd data.\n\n' + usage_examples,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # parser = argparse.ArgumentParser(description='fetch javdb data.')
    parser.add_argument("--cmd", type=str,
                        help=f"Comma-separated list of function shortcuts: {keys_str}")
    parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0,
                        help='0 - only rows with is_full_data=0 (default), '
                             '1 - only is_full_data=1, 2 - is_full_data<=1, '
                             '3 - only is_full_data>1 (abnormal data), 4 - all rows')
    parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1,
                        help='1 - only uncensored makers/series/actors/movies (default), '
                             '0 - the opposite, 2 - everything')
    parser.add_argument('--skip_local', action='store_true',
                        help='skip database operations when the page is already cached locally')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode (limit records)')
    args = parser.parse_args()

    os.makedirs(docs_dir, exist_ok=True)
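    # Apply the parsed flags to the module-level switches, then dispatch commands.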
    set_env(args)
    main(args.cmd, args)
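
# Usage sketch (assumes config.global_share_data_dir points at a writable
# directory and that the local config/scraper/utils modules are importable):
#   python3 ./fetch.py --cmd=list,down --debug
# runs fetch_list and then down_torrents, with --debug stopping each loop
# after its first iteration.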