Files
stock/scripts/iafd/src_json/performers_details.py
2025-03-03 09:02:59 +08:00

393 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Script Name:
Description: 从 https://www.iafd.com 上获取信息。利用cloudscraper绕过cloudflare
detail_fetch.py 从本地已经保存的列表数据,逐个拉取详情,并输出到文件。
list_fetch_astro.py 按照星座拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_birth.py 按照生日拉取数据,获得演员的信息列表。数据量适中,各详细字段较全
list_fetch_ethnic.py 按照人种拉取数据,获得演员的信息列表。数据量大,但详细字段很多无效的
list_merge.py 上面三个列表的数据,取交集,得到整体数据。
iafd_scrape.py 借助 https://github.com/stashapp/CommunityScrapers 实现的脚本,可以输入演员的 iafd链接,获取兼容 stashapp 格式的数据。(作用不大,因为国籍、照片等字段不匹配)
html_format.py 负责读取已经保存的html目录, 提取信息,格式化输出。
data_merge.py 负责合并数据,它把从 iafd, javhd, thelordofporn 以及搭建 stashapp, 从上面更新到的演员数据(需导出)进行合并;
stashdb_merge.py 负责把从stashapp中导出的单个演员的json文件, 批量合并并输出; 通常我们需要把stashapp中导出的批量文件压缩并传输到data/tmp目录,解压后合并
从而获取到一份完整的数据列表。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
import config
# Configure logging (handlers/format come from the project-local config module)
config.setup_logging()
# Result / input paths, all relative to the script's working directory
res_dir = '../result'
res_json_file = f'{res_dir}/detail.json'          # accumulated details (resume checkpoint)
res_csv_file = f'{res_dir}/detail.csv'            # CSV export of the same data
input_json_file = f'{res_dir}/merged.json'        # input list produced by list_merge.py
performers_dir = f'{res_dir}/performers'          # per-performer JSON files, bucketed by initial
# Accumulated results; shared by the fetch loop, the exit handler and main()'s finally block
final_data = []
def load_existing_hrefs():
    """Load previously scraped entries from detail.json to support resuming.

    Populates the module-level ``final_data`` list with the saved entries and
    returns the set of ``href`` values that were already processed, so the
    main loop can skip them. Starts fresh (empty set, ``final_data`` left
    as-is) when detail.json does not exist yet.
    """
    existing_hrefs = set()
    global final_data
    try:
        # detail.json is written with UTF-8 and ensure_ascii=False, so read it
        # with an explicit encoding instead of the platform default (which
        # would break on Windows/cp1252 for non-ASCII names).
        with open(res_json_file, 'r', encoding='utf-8') as file:
            final_data = json.load(file)
        for entry in final_data:
            existing_hrefs.add(entry['href'])
    except FileNotFoundError:
        logging.info("detail.json not found, starting fresh.")
    return existing_hrefs
# Parse a credits table (performer appearances or directing credits).
def parse_credits_table(table, distributor_list):
    """Parse one credits table ('personal' or 'directoral') of a performer page.

    Args:
        table: BeautifulSoup ``<table>`` element holding the credits.
        distributor_list: lowercase distributor substrings to tally.

    Returns:
        (movies, distributor_count): list of per-movie dicts (title, year,
        distributor, notes, rev, formats) and a dict mapping each entry of
        ``distributor_list`` to the number of rows whose distributor cell
        contains that substring.
    """
    # Only the tbody rows carry data; previously the code also decomposed the
    # thead, which needlessly mutated the caller's soup — just read the tbody.
    tbody = table.find('tbody')
    rows = tbody.find_all('tr') if tbody else []
    movies = []
    distributor_count = {key: 0 for key in distributor_list}
    for row in rows:
        cols = row.find_all('td')
        # A complete credit row has at least 6 cells; skip malformed rows.
        if len(cols) < 6:
            continue
        title = cols[0].text.strip()
        year = cols[1].text.strip()
        # Lowercased so the substring tally below is case-insensitive
        distributor = cols[2].text.strip().lower()
        notes = cols[3].text.strip()
        rev = cols[4].text.strip()
        formats = cols[5].text.strip()
        for key in distributor_list:
            if key in distributor:
                distributor_count[key] += 1
        movies.append({
            'title': title,
            'year': year,
            'distributor': distributor,
            'notes': notes,
            'rev': rev,
            'formats': formats
        })
    return movies, distributor_count
# Fetch a performer page and extract bio metadata plus credit tables.
def fetch_and_parse_page(url, scraper):
    """Fetch ``url`` through the cloudscraper session and parse the page.

    Returns:
        (data, credits_list) on success: ``data`` maps the field keys below,
        plus the aggregate counters (movies_cnt, vixen_cnt, ...), to the
        scraped values; ``credits_list`` maps 'personal'/'directoral' to
        movie lists. Returns (None, None) when the HTTP status is not 200 or
        a RequestException is raised — callers treat None as "retry".
    """
    try:
        response = scraper.get(url)
        if response.status_code != 200:
            logging.warning(f"Failed to fetch {url}, Status code: {response.status_code}")
            return None, None
        # Parse the HTML body
        soup = BeautifulSoup(response.text, 'html.parser')
        data = {}
        # Output keys mapped to the heading labels used on the page
        fields = {
            'performer_aka': 'Performer AKA',
            'birthday': 'Birthday',
            'astrology': 'Astrology',
            'birthplace': 'Birthplace',
            'gender': 'Gender',
            'years_active': 'Years Active',
            'ethnicity': 'Ethnicity',
            'nationality': 'Nationality',
            'hair_colors': 'Hair Colors',
            'eye_color': 'Eye Color',
            'height': 'Height',
            'weight': 'Weight',
            'measurements': 'Measurements',
            'tattoos': 'Tattoos',
            'piercings': 'Piercings'
        }
        # Heading label -> output key, for the bioheading loop below
        reversed_map = {v: k for k, v in fields.items()}
        # Credit tables: acted-in ('personal') and directed ('directoral')
        role_list = ['personal', 'directoral']
        distributor_list = ['vixen', 'blacked', 'tushy', 'x-art']
        credits_list = {}
        # Per-distributor movie counts, aggregated over both tables
        distributor_count = {key: 0 for key in distributor_list}
        for role in role_list:
            table = soup.find('table', id=role)
            if table :
                movies, stat_map = parse_credits_table(table, distributor_list)
                credits_list[role] = movies
                # Merge this table's distributor tallies into the totals
                for distributor in distributor_list:
                    distributor_count[distributor] += stat_map.get(distributor, 0)
        # Total number of movies across both roles
        movies_cnt = sum(len(credits_list.get(role, [])) for role in role_list if credits_list.get(role, []))
        # No credit table found at all — log it but still return the bio data
        if len(credits_list) == 0 :
            logging.warning(f"movie table empty. url: {url} ")
        # Walk the bio headings and collect the metadata values
        bioheadings = soup.find_all('p', class_='bioheading')
        for bio in bioheadings:
            heading = bio.text.strip()
            biodata = None
            # 'Performer AKA' is special: the aliases live in a div and there
            # may be several, separated here with '|' so they stay a list
            if 'Performer' in heading:
                heading = 'Performer AKA'
                biodata_div = bio.find_next('div', class_='biodata')
                if biodata_div:
                    div_text = biodata_div.get_text(separator='|').strip()
                    biodata = [b.strip() for b in div_text.split('|') if b.strip()]
            else:
                biodata = bio.find_next('p', class_='biodata').text.strip() if bio.find_next('p', class_='biodata') else ''
            # Keep only headings we know how to map; others are discarded
            if heading in reversed_map:
                kkey = reversed_map[heading]
                data[kkey] = biodata
        # Attach the aggregate counters
        data['movies_cnt'] = movies_cnt
        data['vixen_cnt'] = distributor_count['vixen']
        data['blacked_cnt'] = distributor_count['blacked']
        data['tushy_cnt'] = distributor_count['tushy']
        data['x_art_cnt'] = distributor_count['x-art']
        return data, credits_list
    except RequestException as e:
        logging.error(f"Error fetching {url}: {e}")
        return None, None
# Persist the accumulated records to detail.json.
def write_to_detail_json(data):
    """Serialize ``data`` and write it to detail.json (UTF-8, pretty-printed)."""
    serialized = json.dumps(data, indent=4, ensure_ascii=False)
    with open(res_json_file, 'w', encoding='utf-8') as json_file:
        json_file.write(serialized)
# Export the accumulated records to detail.csv.
def write_to_csv(data):
    """Write one CSV row per entry of ``data`` to detail.csv.

    Missing text fields become '', missing counters become 0, and
    performer_aka is normalized to a '|'-joined list. Any failure is logged
    rather than raised (best-effort export).
    """
    header = ['person', 'href', 'performer_aka', 'birthday', 'astrology', 'birthplace', 'gender', 'years_active', 'ethnicity',
              'nationality', 'hair_colors', 'eye_color', 'height', 'weight', 'measurements', 'tattoos', 'piercings',
              'movies_cnt', 'vixen_cnt', 'blacked_cnt', 'tushy_cnt', 'x_art_cnt']
    # Columns whose absent value defaults to 0 instead of ''
    count_columns = {'movies_cnt', 'vixen_cnt', 'blacked_cnt', 'tushy_cnt', 'x_art_cnt'}
    try:
        with open(res_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            writer.writerow(header)
            for entry in data:
                # performer_aka may be missing, None, or a bare string —
                # normalize to a list before joining
                aka = entry.get('performer_aka', [])
                if aka is None:
                    aka = []
                elif not isinstance(aka, list):
                    aka = [aka]
                row = []
                for column in header:
                    if column == 'performer_aka':
                        row.append('|'.join(aka))
                    elif column in count_columns:
                        row.append(entry.get(column, 0))
                    else:
                        row.append(entry.get(column, ''))
                writer.writerow(row)
    except Exception as e:
        logging.error(f"Error writing to CSV: {e}")
def handle_exit_signal(signum, frame):
    """SIGINT/SIGTERM handler: flush collected data to disk, then exit.

    Parameters follow the signal-handler protocol and are unused; the first
    one was renamed from ``signal`` so it no longer shadows the imported
    ``signal`` module.
    """
    logging.info("Gracefully exiting... Saving remaining data to Json and CSV.")
    write_to_csv(final_data)  # Ensure final data is written when exiting
    write_to_detail_json(final_data)
    sys.exit(0)
# Create (if needed) the bucket directory for a performer.
def create_directory_for_person(person):
    """Return the bucket directory for ``person``, creating it if needed.

    Performers are bucketed under ``performers_dir`` by the lower-cased
    FIRST character of the name (the old comment claimed two letters, but
    the code has always used one).
    """
    person_dir = person[:1].lower()
    full_path = os.path.join(performers_dir, person_dir)
    # exist_ok avoids the check-then-create race of the old exists()+makedirs()
    os.makedirs(full_path, exist_ok=True)
    return full_path
# Pull the id value out of a URL such as
# https://www.iafd.com/person.rme/id=21898a3c-1ddd-4793-8d93-375d6db20586
def extract_id_from_href(href):
    """Return the ``id`` query value embedded in ``href``, or '' if absent."""
    found = re.search(r'id=([a-f0-9\-]+)', href)
    if found:
        return found.group(1)
    return ''
# Save one performer's record as an individual JSON file.
def write_person_json(person, href, data):
    """Write ``data`` to ``<bucket>/<Name-With-Dashes>(<id>).json`` (UTF-8)."""
    target_dir = create_directory_for_person(person)
    person_id = extract_id_from_href(href)
    # File name: spaces become '-', the extracted id goes in parentheses
    filename = f"{person.replace(' ', '-')}({person_id}).json"
    full_path = os.path.join(target_dir, filename)
    try:
        with open(full_path, 'w', encoding='utf-8') as json_file:
            json.dump(data, json_file, indent=4, ensure_ascii=False)
    except Exception as e:
        logging.error(f"Error writing file {full_path}: {e}")
# Fetch a single, explicitly given URL.
def process_one(href):
    """Scrape one performer page and save the full record to ``<id>.json``.

    Unlike process_all(), the output file lands in the current working
    directory and the shared detail.json/CSV are not touched. Retries
    forever (3 s backoff) until the page is fetched successfully.
    """
    # Fresh cloudscraper session to get past Cloudflare
    scraper = cloudscraper.create_scraper()
    while True:
        data, movies = fetch_and_parse_page(href, scraper)
        if data is None:
            logging.warning(f'Retrying {href} ')  # was misspelled 'Retring'
            time.sleep(3)
        else:
            break
    # Bio fields plus the credits tables in one document
    full_data = {
        **data,
        'credits': movies if movies else {}
    }
    person_id = extract_id_from_href(href)
    person_filename = f"{person_id}.json"
    try:
        with open(person_filename, 'w', encoding='utf-8') as json_file:
            json.dump(full_data, json_file, indent=4, ensure_ascii=False)
    except Exception as e:
        logging.error(f"Error writing file {person_filename}: {e}")
    print(f'fetch succ. saved result in {person_filename}')
def process_all():
    """Scrape every performer listed in merged.json that is not done yet.

    Resumable: hrefs already present in detail.json are skipped. Each new
    record is appended to the module-level ``final_data``, written as an
    individual per-performer JSON file, and detail.json/detail.csv are
    flushed every 100 new entries (the final flush happens in main()'s
    finally block and in the signal handler).
    """
    # Cloudscraper session to get past Cloudflare
    scraper = cloudscraper.create_scraper()
    global final_data
    existing_hrefs = load_existing_hrefs()
    logging.info(f"load data from {res_json_file}, count: {len(final_data)}")
    # merged.json is produced by list_merge.py and may contain non-ASCII
    # names, so read it explicitly as UTF-8 (the old code used the platform
    # default encoding).
    with open(input_json_file, 'r', encoding='utf-8') as file:
        merged_data = json.load(file)
    loop = 0
    for entry in merged_data:
        href = entry.get('href')
        person = entry.get('person')
        if href in existing_hrefs:
            logging.info(f"Skipping {href} - already processed")
            continue
        logging.info(f"Processing {href} - {person}")
        # Retry forever (3 s backoff) until the page parses; fetch failures
        # return None. 'credit_map' renamed from 'credits' (shadowed builtin).
        while True:
            data, credit_map = fetch_and_parse_page(href, scraper)
            if data is None:
                logging.warning(f'Retrying {href} - {person} ')  # was misspelled 'Retring'
                time.sleep(3)
            else:
                break
        # Append the summary record to the shared result list
        final_data.append({
            'href': href,
            'person': person,
            **data
        })
        # Full record (summary + credits) goes to the per-performer file
        full_data = {
            'href': href,
            'person': person,
            **data,
            'credits': credit_map if credit_map else {}
        }
        write_person_json(person.strip(), href, full_data)
        loop = loop + 1
        # Periodic flush so a crash loses at most the last 100 entries
        if loop % 100 == 0:
            logging.info(f'flush data to json file. now fetched data count: {loop}, total count: {len(final_data)}')
            write_to_detail_json(final_data)
            write_to_csv(final_data)
        existing_hrefs.add(href)
        # Throttle requests to avoid getting blocked
        time.sleep(1)
# Full run: scrape everything listed in merged.json.
def main():
    """Entry point for a full run: install exit handlers, scrape, always flush."""
    try:
        # Persist partial results on Ctrl+C / kill as well
        signal.signal(signal.SIGINT, handle_exit_signal)
        signal.signal(signal.SIGTERM, handle_exit_signal)
        process_all()
    finally:
        # Runs on normal completion and on unhandled errors alike
        write_to_csv(final_data)
        write_to_detail_json(final_data)
        logging.info("Data processing completed.")
if __name__ == "__main__":
    # With a URL argument: fetch just that one performer; otherwise full run
    if len(sys.argv) > 1:
        process_one(sys.argv[1])
    else:
        main()