Files
stock/scripts/iafd/detail_fetch.py
2025-02-26 11:06:21 +08:00

342 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Script Name:
Description: 从 https://www.iafd.com 上获取信息。利用cloudscraper绕过cloudflare
detail_fetch.py 从本地已经保存的列表数据,逐个拉取详情,并输出到文件。
list_fetch_astro.py 按照星座拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_birth.py 按照生日拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_ethnic.py 按照人种拉取数据,获得演员的信息列表。数据量大,但详细字段很多无效的
list_merge.py 上面三个列表的数据,取交集,得到整体数据。
iafd_scrape.py 借助 https://github.com/stashapp/CommunityScrapers 实现的脚本,可以输入演员的 iafd链接,获取兼容 stashapp 格式的数据。(作用不大,因为国籍、照片等字段不匹配)
html_format.py 负责读取已经保存的html目录, 提取信息,格式化输出。
data_merge.py 负责合并数据,它把从 iafd, javhd, thelordofporn 以及搭建 stashapp, 从上面更新到的演员数据(需导出)进行合并;
stashdb_merge.py 负责把从stashapp中导出的单个演员的json文件, 批量合并并输出; 通常我们需要把stashapp中导出的批量文件压缩并传输到data/tmp目录,解压后合并
从而获取到一份完整的数据列表。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
import config
# 配置日志
config.setup_logging()
# 结果路径
res_dir = './result'
res_json_file = f'{res_dir}/detail.json'
res_csv_file = f'{res_dir}/detail.csv'
input_json_file = f'{res_dir}/merged.json'
performers_dir = f'{res_dir}/performers'
# 存储结果
final_data = []
# 读取 detail.json 中的 数据,以便于断点续传
def load_existing_hrefs():
existing_hrefs = set()
global final_data
try:
with open(res_json_file, 'r') as file:
final_data = json.load(file)
for entry in final_data:
existing_hrefs.add(entry['href'])
except FileNotFoundError:
logging.info("detail.json not found, starting fresh.")
return existing_hrefs
# 请求网页并提取所需数据
def fetch_and_parse_page(url, scraper):
try:
response = scraper.get(url)
if response.status_code != 200:
logging.warning(f"Failed to fetch {url}, Status code: {response.status_code}")
return None, None
# 解析 HTML 内容
soup = BeautifulSoup(response.text, 'html.parser')
# 提取数据
data = {}
# 定义我们需要的字段名称和HTML中对应的标签
fields = {
'performer_aka': 'Performer AKA',
'birthday': 'Birthday',
'astrology': 'Astrology',
'birthplace': 'Birthplace',
'gender': 'Gender',
'years_active': 'Years Active',
'ethnicity': 'Ethnicity',
'nationality': 'Nationality',
'hair_colors': 'Hair Colors',
'eye_color': 'Eye Color',
'height': 'Height',
'weight': 'Weight',
'measurements': 'Measurements',
'tattoos': 'Tattoos',
'piercings': 'Piercings'
}
reversed_map = {v: k for k, v in fields.items()}
# 解析表格数据
movies = []
vixen_cnt = 0
blacked_cnt = 0
tushy_cnt = 0
x_art_cnt = 0
table = soup.find('table', id='personal')
if table:
# 找到thead并跳过
thead = table.find('thead')
if thead:
thead.decompose() # 去掉thead部分不需要解析
# 现在只剩下tbody部分
tbody = table.find('tbody')
rows = tbody.find_all('tr') if tbody else []
# rows = table.find_all('tr', class_='we')
for row in rows:
cols = row.find_all('td')
if len(cols) >= 6:
title = cols[0].text.strip()
year = cols[1].text.strip()
distributor = cols[2].text.strip().lower()
notes = cols[3].text.strip()
rev = cols[4].text.strip()
formats = cols[5].text.strip()
# 统计 distributor 中的关键词
if 'vixen' in distributor:
vixen_cnt += 1
if 'blacked' in distributor:
blacked_cnt += 1
if 'tushy' in distributor:
tushy_cnt += 1
if 'x_art' in distributor:
x_art_cnt += 1
movies.append({
'title': title,
'year': year,
'distributor': distributor,
'notes': notes,
'rev': rev,
'formats': formats
})
else:
logging.warning(f"movie table empty. ")
# 遍历每个 bioheading
bioheadings = soup.find_all('p', class_='bioheading')
for bio in bioheadings:
heading = bio.text.strip()
biodata = None
# 如果包含 "Performer",需要特殊处理
if 'Performer' in heading:
heading = 'Performer AKA'
biodata_div = bio.find_next('div', class_='biodata')
if biodata_div:
div_text = biodata_div.get_text(separator='|').strip()
biodata = [b.strip() for b in div_text.split('|') if b.strip()]
else:
biodata = bio.find_next('p', class_='biodata').text.strip() if bio.find_next('p', class_='biodata') else ''
# 保存数据
if heading in reversed_map:
kkey = reversed_map[heading]
data[kkey] = biodata
# 添加统计数据到 data
data['movies_cnt'] = len(movies)
data['vixen_cnt'] = vixen_cnt
data['blacked_cnt'] = blacked_cnt
data['tushy_cnt'] = tushy_cnt
data['x_art_cnt'] = x_art_cnt
return data, movies
except RequestException as e:
logging.error(f"Error fetching {url}: {e}")
return None, None
# 写入 detail.json
def write_to_detail_json(data):
with open(res_json_file, 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=4, ensure_ascii=False)
# 写入 CSV 文件
def write_to_csv(data):
try:
with open(res_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile, delimiter=',')
header = ['person', 'href', 'performer_aka', 'birthday', 'astrology', 'birthplace', 'gender', 'years_active', 'ethnicity',
'nationality', 'hair_colors', 'eye_color', 'height', 'weight', 'measurements', 'tattoos', 'piercings',
'movies_cnt', 'vixen_cnt', 'blacked_cnt', 'tushy_cnt', 'x_art_cnt']
writer.writerow(header)
for entry in data:
# 确保 performer_aka 始终为列表类型
performer_aka = entry.get('performer_aka', [])
# 如果是 None 或非列表类型,转换为一个空列表
if performer_aka is None:
performer_aka = []
elif not isinstance(performer_aka, list):
performer_aka = [performer_aka]
writer.writerow([
entry.get('person', ''),
entry.get('href', ''),
'|'.join(performer_aka),
entry.get('birthday', ''),
entry.get('astrology', ''),
entry.get('birthplace', ''),
entry.get('gender', ''),
entry.get('years_active', ''),
entry.get('ethnicity', ''),
entry.get('nationality', ''),
entry.get('hair_colors', ''),
entry.get('eye_color', ''),
entry.get('height', ''),
entry.get('weight', ''),
entry.get('measurements', ''),
entry.get('tattoos', ''),
entry.get('piercings', ''),
entry.get('movies_cnt', 0),
entry.get('vixen_cnt', 0),
entry.get('blacked_cnt', 0),
entry.get('tushy_cnt', 0),
entry.get('x_art_cnt', 0)
])
except Exception as e:
logging.error(f"Error writing to CSV: {e}")
def handle_exit_signal(signal, frame):
logging.info("Gracefully exiting... Saving remaining data to Json and CSV.")
write_to_csv(final_data) # Ensure final data is written when exiting
write_to_detail_json(final_data)
sys.exit(0)
# 创建目录
def create_directory_for_person(person):
# 获取 person 的前两个字母并转为小写
person_dir = person[:1].lower()
full_path = os.path.join(performers_dir, person_dir)
if not os.path.exists(full_path):
os.makedirs(full_path)
return full_path
# 从 https://www.iafd.com/person.rme/id=21898a3c-1ddd-4793-8d93-375d6db20586 中抽取 id 的值
def extract_id_from_href(href):
"""从href中提取id参数"""
match = re.search(r'id=([a-f0-9\-]+)', href)
return match.group(1) if match else ''
# 写入每个 performer 的单独 JSON 文件
def write_person_json(person, href, data):
# 获取目录
person_dir = create_directory_for_person(person)
person_id = extract_id_from_href(href)
person_filename = f"{person.replace(' ', '-')}({person_id}).json" # 用 - 替换空格
full_path = os.path.join(person_dir, person_filename)
try:
with open(full_path, 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"Error writing file {full_path}: {e}")
def main():
# 初始化 cloudscraper
scraper = cloudscraper.create_scraper()
# 加载已存在的 href 列表
global final_data
existing_hrefs = load_existing_hrefs()
logging.info(f"load data from {res_json_file}, count: {len(final_data)}")
# 读取 merged.json
with open(input_json_file, 'r') as file:
merged_data = json.load(file)
# 遍历 merged.json 中的数据
loop = 0
for entry in merged_data:
href = entry.get('href')
person = entry.get('person')
if href in existing_hrefs:
logging.info(f"Skipping {href} - already processed")
continue
logging.info(f"Processing {href} - {person}")
# 获取并解析数据
while True:
data, movies = fetch_and_parse_page(href, scraper)
if data is None:
logging.warning(f'Retring {href} - {person} ')
time.sleep(3)
else:
break
# 如果数据正确,加入到 final_data
final_data.append({
'href': href,
'person': person,
**data
})
# 写入 performer 的独立 JSON 文件
full_data = {
'href': href,
'person': person,
**data,
'movies': movies if movies else []
}
write_person_json(person.strip(), href, full_data)
# 更新 detail.json 文件
loop = loop + 1
if loop % 100 == 0:
logging.info(f'flush data to json file. now fetched data count: {loop}, total count: {len(final_data)}')
write_to_detail_json(final_data)
write_to_csv(final_data)
# 更新已存在的 href
existing_hrefs.add(href)
# 延时,防止请求过快被封锁
time.sleep(1)
if __name__ == "__main__":
try:
# 注册退出信号
signal.signal(signal.SIGINT, handle_exit_signal) # Handle Ctrl+C
signal.signal(signal.SIGTERM, handle_exit_signal) # Handle kill signal
main()
finally:
# 清理操作,保证在程序正常退出时执行
write_to_csv(final_data) # Write to CSV or other necessary tasks
write_to_detail_json(final_data) # Save data to JSON
logging.info("Data processing completed.")