resources/iafd/src_json/movie_list_fetch.py
"""
Script Name:
Description: 从 https://www.iafd.com 上获取信息。利用cloudscraper绕过cloudflare
detail_fetch.py 从本地已经保存的列表数据,逐个拉取详情,并输出到文件。
list_fetch_astro.py 按照星座拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_birth.py 按照生日拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_ethnic.py 按照人种拉取数据,获得演员的信息列表。数据量大,但详细字段很多无效的
list_merge.py 上面三个列表的数据,取交集,得到整体数据。
iafd_scrape.py 借助 https://github.com/stashapp/CommunityScrapers 实现的脚本,可以输入演员的 iafd链接,获取兼容 stashapp 格式的数据。(作用不大,因为国籍、照片等字段不匹配)
html_format.py 负责读取已经保存的html目录, 提取信息,格式化输出。
data_merge.py 负责合并数据,它把从 iafd, javhd, thelordofporn 以及搭建 stashapp, 从上面更新到的演员数据(需导出)进行合并;
stashdb_merge.py 负责把从stashapp中导出的单个演员的json文件, 批量合并并输出; 通常我们需要把stashapp中导出的批量文件压缩并传输到data/tmp目录,解压后合并
从而获取到一份完整的数据列表。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import cloudscraper
import json
import time
import csv
import argparse
from bs4 import BeautifulSoup
import logging
import config
config.setup_logging()
# Base URL and variable parameters
host_url = "https://www.iafd.com"
# Output directory for results
res_dir = f"{config.global_share_data_dir}/iafd"
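# Per-type fetch configuration: URL templates, the table/select names to parse, and the output file paths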
fetch_config = {
'dist': {
'base_url': f"{host_url}/distrib.rme/distrib=",
'list_page_url': f"{host_url}/distrib.asp",
'html_table_id': 'distable',
'html_select_name': 'Distrib',
'output_key_id': 'distributors',
'json_file': f'{res_dir}/distributors.json',
'csv_file': f'{res_dir}/distributors.csv',
},
'stu': {
'base_url': f"{host_url}/studio.rme/studio=",
'list_page_url': f"{host_url}/studio.asp",
'html_table_id': 'studio',
'html_select_name': 'Studio',
'output_key_id': 'studios',
'json_file': f'{res_dir}/studios.json',
'csv_file': f'{res_dir}/studios.csv',
}
}
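# Hand-picked distributor ids and names, used when --kind is 'parts'; the ids correspond to the values in the site's Distrib select list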
distr_map = {
6812 : 'nubilefilms.com',
8563 : 'teenmegaworld network',
6779 : 'x-art.com',
7133 : 'tushy.com',
6496 : 'blacked.com',
7758 : 'vixen.com',
6791 : 'teamskeet.com',
12454: 'vip4k.com',
13541: 'wow network',
9702 : 'cum4k.com',
6778 : 'tiny4k.com',
12667: 'anal4k.com',
7419 : 'exotic4k.com',
13594: 'facials4k.com',
13633: 'mom4k.com',
12335: 'slim4k.com',
16709: 'strippers4k.com',
}
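# Hand-picked studio ids and names, used when --kind is 'parts'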
studio_map = {
6812 : 'nubilefilms.com',
9811 : 'Teen Mega World',
6779 : 'x-art.com',
7133 : 'tushy.com',
6496 : 'blacked.com',
7758 : 'vixen.com',
6791 : 'teamskeet.com',
8052: 'wowgirls.com',
9702 : 'cum4k.com',
6778 : 'tiny4k.com',
12667: 'anal4k.com',
7419 : 'exotic4k.com',
13594: 'facials4k.com',
13633: 'mom4k.com',
12335: 'slim4k.com',
16709: 'strippers4k.com',
}
# Request headers and scraper setup
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
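# cloudscraper is used to get past Cloudflare's anti-bot check (see the module docstring)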
scraper = cloudscraper.create_scraper()
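# Rows parsed from every page accumulate here; save_data() periodically writes them to json/csv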
all_data = []
# Make the network request and return the page HTML (None on failure)
def fetch_page(url):
try:
response = scraper.get(url, headers=headers)
response.raise_for_status()
return response.text
except Exception as e:
logging.error(f"Failed to fetch {url}: {e}")
return None
# Parse the HTML content and extract the fields we need
def parse_page(html, name, config):
table_id = config['html_table_id']
key_id = config['output_key_id']
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table", id=table_id)
if not table:
logging.warning(f"Warning: No {table_id} table found in {name}")
return None
    # Find the thead and drop it; the header row does not need parsing
    thead = table.find('thead')
    if thead:
        thead.decompose()
    # Only the tbody rows are left now
    tbody = table.find('tbody')
    rows = tbody.find_all('tr') if tbody else []
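    # Each data row carries title, label, year, and rev columns, with the detail link in the first cell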
global all_data
for row in rows:
cols = row.find_all('td')
if len(cols) >= 5:
title = cols[0].text.strip()
label = cols[1].text.strip()
year = cols[2].text.strip()
rev = cols[3].text.strip()
a_href = cols[0].find('a')
href = host_url + a_href['href'] if a_href else ''
all_data.append({
key_id: name,
'title': title,
'label': label,
'year': year,
'rev': rev,
'href': href
})
return soup
# Handle pagination; like the sign-based lists, these listing pages need no paging, so always return None
def handle_pagination(soup, astro):
    return None
# Fetch the listing page and build an {id: name} map from its <select> options
def process_list_page(config):
    list_page_url = config['list_page_url']
    select_name = config['html_select_name']
    list_map = {}
    logging.info(f"Fetching data for {list_page_url} ...")
    select_element = None
    while True:
        html = fetch_page(list_page_url)
        if html:
            soup = BeautifulSoup(html, "html.parser")
            select_element = soup.find('select', {'name': select_name})
            if select_element:
                break
        logging.info(f"Unexpected html content, retrying {list_page_url} ...")
        time.sleep(5)  # back off before retrying
    options = select_element.find_all('option')
    for option in options:
        value = option.get('value')  # the value attribute holds the numeric id
        text = option.text.strip()   # the option text holds the display name
        if not value:
            continue  # skip placeholder options without an id
        list_map[int(value)] = text
    logging.info(f'Fetched {list_page_url} successfully, total entries: {len(list_map)}')
    return list_map
# Main logic: loop over each distributor/studio and fetch its movie list pages
def process_main_data(list_data, config):
base_url = config['base_url']
for key, name in list_data.items():
url = base_url + str(key)
next_url = url
logging.info(f"Fetching data for {name}, url {url} ...")
while next_url:
html = fetch_page(next_url)
if html:
soup = parse_page(html, name, config)
if soup:
next_url = handle_pagination(soup, name)
                else:
                    logging.info(f"Unexpected html content, retrying {next_url} ...")
                # Save results periodically
                save_data(config)
                time.sleep(2)  # throttle the request rate
            else:
                logging.info(f"Retrying {next_url} ...")
                time.sleep(5)  # wait a bit before retrying
# Write the accumulated rows to the json and csv output files
def save_data(config):
with open(config['json_file'], 'w', encoding='utf-8') as json_file:
json.dump(all_data, json_file, indent=4, ensure_ascii=False)
with open(config['csv_file'], 'w', newline='', encoding='utf-8') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=[config['output_key_id'], 'title', 'label', 'year', 'rev', 'href'])
writer.writeheader()
writer.writerows(all_data)
# Entry point
if __name__ == '__main__':
    # Command-line argument handling
    parser = argparse.ArgumentParser(description='fetch movie list from iafd.com')
    parser.add_argument('--type', type=str, default='dist', help='fetch by distributor or studio (dist, stu)')
    parser.add_argument('--kind', type=str, default='parts', help='fetch the full list or only the hand-picked subset (all, parts)')
    args = parser.parse_args()
    config = fetch_config.get(args.type)
    if not config:
        logging.warning(f'unknown type: {args.type} {args.kind}')
    else:
        list_data = {}
        if args.kind == 'all':
            list_data = process_list_page(config)
        elif args.type == 'dist':
            list_data = distr_map
        else:
            list_data = studio_map
        process_main_data(list_data, config)
        logging.info("Data fetching and saving completed.")