""" Script Name: Description: 从 https://www.iafd.com 上获取信息。利用cloudscraper绕过cloudflare detail_fetch.py 从本地已经保存的列表数据,逐个拉取详情,并输出到文件。 list_fetch_astro.py 按照星座拉取数据,获得演员的信息列表。数据量适中,各详细字段较全 list_fetch_birth.py 按照生日拉取数据,获得演员的信息列表。数据量适中,各详细字段较全 list_fetch_ethnic.py 按照人种拉取数据,获得演员的信息列表。数据量大,但详细字段很多无效的 list_merge.py 上面三个列表的数据,取交集,得到整体数据。 iafd_scrape.py 借助 https://github.com/stashapp/CommunityScrapers 实现的脚本,可以输入演员的 iafd链接,获取兼容 stashapp 格式的数据。(作用不大,因为国籍、照片等字段不匹配) html_format.py 负责读取已经保存的html目录, 提取信息,格式化输出。 data_merge.py 负责合并数据,它把从 iafd, javhd, thelordofporn 以及搭建 stashapp, 从上面更新到的演员数据(需导出)进行合并; stashdb_merge.py 负责把从stashapp中导出的单个演员的json文件, 批量合并并输出; 通常我们需要把stashapp中导出的批量文件压缩并传输到data/tmp目录,解压后合并 从而获取到一份完整的数据列表。 Author: [Your Name] Created Date: YYYY-MM-DD Last Modified: YYYY-MM-DD Version: 1.0 Modification History: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: - YYYY-MM-DD [Your Name]: """ import cloudscraper import json import time import csv from bs4 import BeautifulSoup import logging import config config.setup_logging() # 定义基础 URL 和可变参数 host_url = "https://www.iafd.com" base_url = f"{host_url}/astrology.rme/sign=" astro_list = ['Aries', 'Taurus', 'Gemini', 'Cancer', 'Leo', 'Virgo', 'Libra', 'Scorpio', 'Sagittarius', 'Capricorn', 'Aquarius', 'Pisces'] # 设置 headers 和 scraper headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } scraper = cloudscraper.create_scraper() # 结果路径 res_dir = './result' # 记录 ethinc_map astro_map = [] # 网络请求并解析 HTML def fetch_page(url): try: response = scraper.get(url, headers=headers) response.raise_for_status() return response.text except Exception as e: logging.error(f"Failed to fetch {url}: {e}") return None # 解析 HTML 内容,提取需要的数据 def parse_page(html, astro): soup = BeautifulSoup(html, "html.parser") astro_div = soup.find("div", id="astro") if not astro_div: logging.warning(f"Warning: No 'astro' div found in {astro}") return None flag = False list_cnt = 0 birth_date = None for elem in astro_div.find_all(recursive=False): if elem.name == "h3" and "astroday" in elem.get("class", []): birth_date = elem.get_text(strip=True) elif elem.name == "div" and "perficon" in elem.get("class", []): a_tag = elem.find("a") if a_tag: href = host_url + a_tag["href"] name = a_tag.find("span", class_="perfname") if name: astro_map.append({ "astrology": astro, "birth_date": birth_date, "person": name.get_text(strip=True), "href": href }) flag = True list_cnt = list_cnt +1 if flag: logging.info(f"get {list_cnt} persons from this page. total persons: {len(astro_map)}") return soup else: return None # 处理翻页,星座的无需翻页 def handle_pagination(soup, astro): return None # 主逻辑函数:循环处理每个种族 def process_astro_data(): for astro in astro_list: url = base_url + astro next_url = url logging.info(f"Fetching data for {astro}, url {url} ...") while next_url: html = fetch_page(next_url) if html: soup = parse_page(html, astro) if soup: next_url = handle_pagination(soup, astro) else: logging.info(f"wrong html content. retring {next_url} ...") # 定期保存结果 save_data() time.sleep(2) # 控制访问频率 else: logging.info(f"Retrying {next_url} ...") time.sleep(5) # 等待后再重试 # 保存到文件 def save_data(): with open(f'{res_dir}/astro.json', 'w', encoding='utf-8') as json_file: json.dump(astro_map, json_file, indent=4, ensure_ascii=False) with open(f'{res_dir}/astro.csv', 'w', newline='', encoding='utf-8') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=['astrology', 'birth_date', 'person', 'href']) writer.writeheader() writer.writerows(astro_map) # 执行主逻辑 if __name__ == '__main__': process_astro_data() save_data() logging.info("Data fetching and saving completed.")