""" Script Name: Description: 从 https://www.iafd.com 上获取信息。利用cloudscraper绕过cloudflare detail_fetch.py 从本地已经保存的列表数据,逐个拉取详情,并输出到文件。 list_fetch_astro.py 按照星座拉取数据,获得演员的信息列表。数据量适中,各详细字段较全 list_fetch_birth.py 按照生日拉取数据,获得演员的信息列表。数据量适中,各详细字段较全 list_fetch_ethnic.py 按照人种拉取数据,获得演员的信息列表。数据量大,但详细字段很多无效的 list_merge.py 上面三个列表的数据,取交集,得到整体数据。 iafd_scrape.py 借助 https://github.com/stashapp/CommunityScrapers 实现的脚本,可以输入演员的 iafd链接,获取兼容 stashapp 格式的数据。(作用不大,因为国籍、照片等字段不匹配) html_format.py 负责读取已经保存的html目录, 提取信息,格式化输出。 data_merge.py 负责合并数据,它把从 iafd, javhd, thelordofporn 以及搭建 stashapp, 从上面更新到的演员数据(需导出)进行合并; stashdb_merge.py 负责把从stashapp中导出的单个演员的json文件, 批量合并并输出; 通常我们需要把stashapp中导出的批量文件压缩并传输到data/tmp目录,解压后合并 从而获取到一份完整的数据列表。 Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import json import os import subprocess import time import logging from typing import List # 设置日志配置 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # 预定义的 scrapers 目录 scrapers_dir = "/root/gitlabs/stashapp_CommunityScrapers/scrapers" meta_file = "./data/iafd_meta.json" cursor_file = "./data/iafd_cursor.txt" output_dir = f"{scrapers_dir}/iafd_meta" # 重试次数和间隔 MAX_RETRIES = 10 RETRY_DELAY = 5 # 5秒重试间隔 # 创建输出目录 os.makedirs(output_dir, exist_ok=True) def read_processed_hrefs() -> set: """ 读取已经处理过的 href """ processed_hrefs = set() if os.path.exists(cursor_file): with open(cursor_file, "r", encoding="utf-8") as f: processed_hrefs = {line.strip().split(",")[1] for line in f if "," in line} return processed_hrefs def execute_scraper_command(href: str, idv: str) -> bool: """ 执行命令抓取数据,成功则返回True,否则返回False。 包含重试机制。 """ command = f"cd {scrapers_dir}; python3 -m IAFD.IAFD performer {href} > {output_dir}/{idv}.json" attempt = 0 while attempt < MAX_RETRIES: try: logger.info(f"执行命令: {command}") subprocess.run(command, shell=True, check=True) return True except subprocess.CalledProcessError as e: logger.error(f"执行命令失败: {e}. 重试 {attempt + 1}/{MAX_RETRIES}...") time.sleep(RETRY_DELAY) attempt += 1 logger.error(f"命令执行失败,已尝试 {MAX_RETRIES} 次: {command}") return False def validate_json_file(idv: str) -> bool: """ 校验 JSON 文件是否有效 """ output_file = f"{output_dir}/{idv}.json" try: with open(output_file, "r", encoding="utf-8") as f: content = f.read().strip() json_data = json.loads(content) # 尝试解析 JSON if "name" not in json_data: raise ValueError("缺少 'name' 字段") return True except (json.JSONDecodeError, ValueError) as e: logger.error(f"解析失败,删除无效文件: {output_file}. 错误: {e}") os.remove(output_file) return False def process_iafd_meta(data: List[dict], processed_hrefs: set) -> None: """ 处理 iafd_meta.json 中的数据 """ for entry in data: person = entry.get("person") href = entry.get("href") if not person or not href: logger.warning(f"跳过无效数据: {entry}") continue # 解析 href 提取 id try: idv = href.split("id=")[-1] except IndexError: logger.error(f"无法解析 ID: {href}") continue output_file = f"{output_dir}/{idv}.json" # 跳过已处理的 href if href in processed_hrefs: logger.info(f"已处理,跳过: {person}, {href}") continue # 执行数据抓取 if not execute_scraper_command(href, idv): continue # 校验 JSON 文件 if not validate_json_file(idv): continue # 记录已处理数据 with open(cursor_file, "a", encoding="utf-8") as f: f.write(f"{person},{href}\n") logger.info(f"成功处理: {person} - {href}") def main(): """ 主程序执行函数 """ # 读取已处理的 href processed_hrefs = read_processed_hrefs() # 读取 iafd_meta.json 数据 try: with open(meta_file, "r", encoding="utf-8") as f: data = json.load(f) except json.JSONDecodeError as e: logger.error(f"读取 iafd_meta.json 错误: {e}") return # 处理数据 process_iafd_meta(data, processed_hrefs) if __name__ == "__main__": main()