"""
Script Name:
Description: 从 thelordofporn.com 上获取女优列表,并逐个获取女优详细信息。
由于网站使用了cloudflare, 无法直接爬取,使用 cloudscraper 绕过限制。
list_fetch.py 从网站上获取列表, 并以json的形式把结果输出到本地文件, 同时生成csv文件;
actress_fetch.py 则把上一步获取到的列表,读取详情页面,合并进来一些详细信息。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import json
import csv
import os
import re
import time
import random
import cloudscraper
from bs4 import BeautifulSoup
import config
# File paths
DIR_RES = config.global_host_data_dir
ACTRESSES_FILE = f"{DIR_RES}/actresses.json"
DETAILS_JSON_FILE = f"{DIR_RES}/thelordofporn_pornstars.json"
DETAILS_CSV_FILE = f"{DIR_RES}/thelordofporn_pornstars.csv"
# Request headers and cookies that mimic a real browser
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
}
COOKIES = {
    "cf_clearance": "your_clearance_token_here"  # Needs updating whenever Cloudflare re-issues its check
}
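# One assumed way to obtain the token: pass the Cloudflare check once in a
# normal browser, then copy the cf_clearance cookie value out of the
# browser's developer tools.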
# Parse birth date and place
def parse_birth_info(text):
    match = re.match(r"(.+?) (\d{1,2}), (\d{4}) in (.+)", text)
    if match:
        return {
            "birth_date": f"{match.group(1)} {match.group(2)}, {match.group(3)}",
            "birth_year": match.group(3),
            "birth_place": match.group(4),
        }
    return {"birth_date": text, "birth_year": "", "birth_place": ""}
# Parse height
def parse_height(text):
    match = re.match(r"(\d+)\s*ft\s*(\d*)\s*in\s*\((\d+)\s*cm\)", text)
    if match:
        height_ft = f"{match.group(1)}'{match.group(2)}\""
        return {"height_ft": height_ft.strip(), "height_cm": match.group(3)}
    return {"height_ft": text, "height_cm": ""}
# Parse weight
def parse_weight(text):
    match = re.match(r"(\d+)\s*lbs\s*\((\d+)\s*kg\)", text)
    if match:
        return {"weight_lbs": match.group(1), "weight_kg": match.group(2)}
    return {"weight_lbs": text, "weight_kg": ""}
# Parse the page content
def parse_page(actress, html):
    soup = BeautifulSoup(html, "html.parser")
    # Make sure the page structure is as expected
    if not soup.find("main", {"id": "content", "class": "site-content"}):
        return None
    # Extract basic information
    entry_header = soup.find("header", class_="entry-header")
    name_el = entry_header.find("h1", class_="entry-title") if entry_header else None
    name = name_el.text.strip() if name_el else ""
    date_modified_el = soup.find("time", itemprop="dateModified")
    if date_modified_el:
        date_modified = date_modified_el.get("content", "").strip()
    else:
        date_modified = ""
    # Extract metadata
    global_rank = ""
    weekly_rank = ""
    last_month_rating = ""
    current_rating = ""
    total_votes = ""
    rank_items = entry_header.find_all("div", class_="porn-star-rank__item") if entry_header else []
    for div in rank_items:
        text = div.text.strip()
        if "Global Rank" in text:
            global_rank = div.find("b").text.strip()
        elif "Weekly Rank" in text:
            weekly_rank = div.find("b").text.strip()
    for item in soup.find_all("div", class_="specifications__item--horizontal"):
        text = item.text.strip()
        if "Last Month" in text:
            last_month_rating = item.find("b").text.strip()
        elif "Rating Av." in text:
            current_rating = item.find("b").text.strip()
        elif "Total of" in text:
            total_votes = item.find("b").text.strip()
    # Parse the detailed attributes
    attributes = {}
    for row in soup.find_all("div", class_="specifications-grid-row"):
        items = row.find_all("div", class_="specifications-grid-item")
        if len(items) == 2:
            label = items[0].find("h5").text.strip()
            value = items[0].find("span").text.strip()
            attributes[label] = value
            label2 = items[1].find("h5").text.strip()
            value2 = items[1].find("span").text.strip()
            attributes[label2] = value2
    # Parse birth info, height, weight, etc.
    birth_info = parse_birth_info(attributes.get("Born", ""))
    height_info = parse_height(attributes.get("Height", ""))
    weight_info = parse_weight(attributes.get("Weight", ""))
    return {
        "pornstar": actress["pornstar"],
        "rating": actress["rating"],
        "rank": actress["rank"],
        "votes": actress["votes"],
        "href": actress["href"],
        "name": name,
        "alias": attributes.get("Name", ""),
        "career_start": attributes.get("Career start", ""),
        "measurements": attributes.get("Measurements", ""),
        "born": attributes.get("Born", ""),
        "height": attributes.get("Height", ""),
        "weight": attributes.get("Weight", ""),
        "date_modified": date_modified,
        "global_rank": global_rank,
        "weekly_rank": weekly_rank,
        "last_month_rating": last_month_rating,
        "current_rating": current_rating,
        "total_votes": total_votes,
        **birth_info,
        **height_info,
        **weight_info,
    }
# Load data that has already been processed
def load_existing_data():
    if os.path.exists(DETAILS_JSON_FILE):
        with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as f:
            return {item["pornstar"]: item for item in json.load(f)}
    return {}
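# Keying the map by "pornstar" lets process_data() below skip names that are
# already in the output file, so an interrupted run can be resumed.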
# Fetch a page
def fetch_page(url):
    scraper = cloudscraper.create_scraper()
    for _ in range(5):  # Retry at most 5 times
        try:
            response = scraper.get(url, headers=HEADERS, cookies=COOKIES, timeout=10)
            if response.status_code == 200 and "specifications-grid-row" in response.text:
                return response.text
        except Exception as e:
            print(f"Request to {url} failed, error: {e}")
        time.sleep(random.uniform(2, 5))  # Random delay
    return None
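# Note: the "specifications-grid-row" substring check only accepts responses
# that actually contain the profile's specification grid, so challenge pages
# or other placeholder responses are retried rather than parsed.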
# Process the data and save it
def process_data():
    with open(ACTRESSES_FILE, "r", encoding="utf-8") as f:
        actresses = json.load(f)
    existing_data = load_existing_data()
    updated_data = list(existing_data.values())
    for actress in actresses:
        name, url = actress["pornstar"], actress["href"]
        if name in existing_data:
            print(f"Skipping already processed: {name}")
            continue
        print(f"Processing: {name} - {url}")
        html = fetch_page(url)
        if not html:
            print(f"Could not fetch page: {url}")
            continue
        details = parse_page(actress, html)
        if details:
            updated_data.append(details)
            existing_data[name] = details
    with open(DETAILS_JSON_FILE, "w", encoding="utf-8") as jsonfile:
        json.dump(updated_data, jsonfile, indent=4, ensure_ascii=False)
# Generate a CSV from the JSON
def json_to_csv():
    if not os.path.exists(DETAILS_JSON_FILE):
        print("No JSON file, skipping CSV generation")
        return
    with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as jsonfile:
        data = json.load(jsonfile)
    if not data:
        print("JSON file is empty, skipping CSV generation")
        return
    fieldnames = data[0].keys()
    with open(DETAILS_CSV_FILE, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
if __name__ == "__main__":
    # Make sure the output directory exists
    os.makedirs(DIR_RES, exist_ok=True)
    process_data()
    json_to_csv()
    print("Data processing finished!")