"""
Script Name:
Description: Fetch the actress list from thelordofporn.com and then fetch the
    detail page of every actress in it.
    The site sits behind Cloudflare and cannot be crawled directly, so
    cloudscraper is used to get past the restriction.
    list_fetch.py fetches the list from the site, writes the result to a local
    file as JSON and also generates a CSV file;
    actress_fetch.py reads the list produced by that step, loads each detail
    page and merges the extra details in.

Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0

Modification History:
    - YYYY-MM-DD [Your Name]:
    - YYYY-MM-DD [Your Name]:
    - YYYY-MM-DD [Your Name]:
"""

import csv
import json
import os
import random
import re
import time
from typing import Optional

import cloudscraper
from bs4 import BeautifulSoup

import config

# File paths
DIR_RES = config.global_host_data_dir
ACTRESSES_FILE = f"{DIR_RES}/actresses.json"
DETAILS_JSON_FILE = f"{DIR_RES}/thelordofporn_pornstars.json"
DETAILS_CSV_FILE = f"{DIR_RES}/thelordofporn_pornstars.csv"

# Request headers and cookies (imitate a real browser)
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
}
COOKIES = {
    # Must be refreshed to match the current Cloudflare verification state.
    "cf_clearance": "your_clearance_token_here"
}

# Patterns for the free-text attribute values, compiled once at import time.
_BIRTH_RE = re.compile(r"(.+?) (\d{1,2}), (\d{4}) in (.+)")
_HEIGHT_RE = re.compile(r"(\d+)\s*ft\s*(\d*)\s*in\s*\((\d+)\s*cm\)")
_WEIGHT_RE = re.compile(r"(\d+)\s*lbs\s*\((\d+)\s*kg\)")


def parse_birth_info(text: str) -> dict:
    """Split a "<month> <day>, <year> in <place>" string into its parts.

    Returns a dict with ``birth_date``, ``birth_year`` and ``birth_place``.
    When *text* does not match that shape, ``birth_date`` carries the raw
    text and the other two fields are empty strings.
    """
    match = _BIRTH_RE.match(text)
    if match is None:
        return {"birth_date": text, "birth_year": "", "birth_place": ""}
    month, day, year, place = match.groups()
    return {
        "birth_date": f"{month} {day}, {year}",
        "birth_year": year,
        "birth_place": place,
    }


def parse_height(text: str) -> dict:
    """Split a "<n> ft <m> in (<k> cm)" string into imperial and metric parts.

    Returns a dict with ``height_ft`` (formatted like ``5'7"``) and
    ``height_cm``. When *text* does not match, ``height_ft`` carries the raw
    text and ``height_cm`` is an empty string.
    """
    match = _HEIGHT_RE.match(text)
    if match is None:
        return {"height_ft": text, "height_cm": ""}
    feet, inches, centimetres = match.groups()
    return {"height_ft": f"{feet}'{inches}\"", "height_cm": centimetres}


def parse_weight(text: str) -> dict:
    """Split a "<n> lbs (<k> kg)" string into pounds and kilograms.

    Returns a dict with ``weight_lbs`` and ``weight_kg``. When *text* does not
    match, ``weight_lbs`` carries the raw text and ``weight_kg`` is empty.
    """
    match = _WEIGHT_RE.match(text)
    if match is None:
        return {"weight_lbs": text, "weight_kg": ""}
    return {"weight_lbs": match.group(1), "weight_kg": match.group(2)}


def _bold_text(tag) -> str:
    """Return the stripped text of the first <b> inside *tag*, or "" if absent."""
    bold = tag.find("b")
    return bold.text.strip() if bold is not None else ""


def _label_and_value(item):
    """Return the (label, value) pair of a specification item, or None.

    The label is read from the item's <h5> and the value from its <span>;
    None is returned when either tag is missing.
    """
    label_el = item.find("h5")
    value_el = item.find("span")
    if label_el is None or value_el is None:
        return None
    return label_el.text.strip(), value_el.text.strip()


def parse_page(actress: dict, html: str) -> Optional[dict]:
    """Parse a detail page and merge it with the list entry *actress*.

    Args:
        actress: list entry; its ``pornstar``, ``rating``, ``rank``, ``votes``
            and ``href`` keys are copied into the result.
        html: raw HTML of the detail page.

    Returns:
        A flat dict of the list fields plus the parsed details, or None when
        the page lacks the expected ``<main id="content" class="site-content">``
        element.

    Raises:
        KeyError: if *actress* lacks one of the copied keys.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Make sure the page has the expected structure.
    if not soup.find("main", {"id": "content", "class": "site-content"}):
        return None

    # Basic information.
    entry_header = soup.find("header", class_="entry-header")
    name_el = (
        entry_header.find("h1", class_="entry-title")
        if entry_header is not None
        else None
    )
    name = name_el.text.strip() if name_el is not None else ""

    date_modified_el = soup.find("time", itemprop="dateModified")
    date_modified = ""
    if date_modified_el is not None:
        date_modified = date_modified_el.get("content", "").strip()

    # Ranking metadata.
    global_rank = ""
    weekly_rank = ""
    last_month_rating = ""
    current_rating = ""
    total_votes = ""

    # The header may be missing; previously this loop dereferenced it
    # unconditionally and raised AttributeError on such pages.
    if entry_header is not None:
        for div in entry_header.find_all("div", class_="porn-star-rank__item"):
            text = div.text.strip()
            if "Global Rank" in text:
                global_rank = _bold_text(div)
            elif "Weekly Rank" in text:
                weekly_rank = _bold_text(div)

    for item in soup.find_all("div", class_="specifications__item--horizontal"):
        text = item.text.strip()
        if "Last Month" in text:
            last_month_rating = _bold_text(item)
        elif "Rating Av." in text:
            current_rating = _bold_text(item)
        elif "Total of" in text:
            total_votes = _bold_text(item)

    # Detailed attributes: only rows holding exactly two items are read.
    attributes = {}
    for row in soup.find_all("div", class_="specifications-grid-row"):
        items = row.find_all("div", class_="specifications-grid-item")
        if len(items) != 2:
            continue
        for item in items:
            pair = _label_and_value(item)
            if pair is not None:
                label, value = pair
                attributes[label] = value

    # Break the free-text birth, height and weight values into fields.
    birth_info = parse_birth_info(attributes.get("Born", ""))
    height_info = parse_height(attributes.get("Height", ""))
    weight_info = parse_weight(attributes.get("Weight", ""))

    return {
        "pornstar": actress['pornstar'],
        "rating": actress['rating'],
        "rank": actress['rank'],
        "votes": actress['votes'],
        "href": actress['href'],
        'name': name,
        "alias": attributes.get("Name", ""),
        "career_start": attributes.get("Career start", ""),
        "measurements": attributes.get("Measurements", ""),
        "born": attributes.get("Born", ""),
        "height": attributes.get("Height", ""),
        "weight": attributes.get("Weight", ""),
        "date_modified": date_modified,
        "global_rank": global_rank,
        "weekly_rank": weekly_rank,
        "last_month_rating": last_month_rating,
        "current_rating": current_rating,
        "total_votes": total_votes,
        **birth_info,
        **height_info,
        **weight_info,
    }


def load_existing_data() -> dict:
    """Load already-processed records, keyed by their ``pornstar`` field.

    Returns an empty dict when the details JSON file does not exist yet.
    """
    if not os.path.exists(DETAILS_JSON_FILE):
        return {}
    with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as f:
        return {item["pornstar"]: item for item in json.load(f)}


def fetch_page(url: str, max_retries: int = 500) -> Optional[str]:
    """Download *url* through cloudscraper, retrying on failure.

    A response counts as successful only when its status is 200 and its body
    contains the ``specifications-grid-row`` marker.

    Args:
        url: address of the detail page.
        max_retries: maximum number of attempts. The default of 500 is the
            value the code has always used; the original inline note said 5,
            so pass a smaller number if that was the intent.

    Returns:
        The page HTML, or None when every attempt failed.
    """
    scraper = cloudscraper.create_scraper()
    for _ in range(max_retries):
        try:
            response = scraper.get(url, headers=HEADERS, cookies=COOKIES, timeout=10)
            if response.status_code == 200 and "specifications-grid-row" in response.text:
                return response.text
        except Exception as e:
            # Retry boundary: report any request failure and try again.
            print(f"请求 {url} 失败,错误: {e}")
        time.sleep(random.uniform(2, 5))  # random delay between attempts
    return None


def _save_details(records: list) -> None:
    """Write *records* to the details JSON file, replacing its content."""
    with open(DETAILS_JSON_FILE, "w", encoding="utf-8") as jsonfile:
        json.dump(records, jsonfile, indent=4, ensure_ascii=False)


def process_data() -> None:
    """Fetch and parse the detail page of every actress not yet processed.

    Reads the list from ACTRESSES_FILE, skips names already present in the
    details JSON file, and rewrites that file after each newly parsed record
    so an interrupted run can pick up where it stopped.
    """
    with open(ACTRESSES_FILE, "r", encoding="utf-8") as f:
        actresses = json.load(f)

    existing_data = load_existing_data()
    updated_data = list(existing_data.values())

    for actress in actresses:
        name, url = actress["pornstar"], actress["href"]

        if name in existing_data:
            print(f"跳过已处理: {name}")
            continue

        print(f"正在处理: {name} - {url}")
        html = fetch_page(url)
        if not html:
            print(f"无法获取页面: {url}")
            continue

        details = parse_page(actress, html)
        if details:
            updated_data.append(details)
            existing_data[name] = details
            _save_details(updated_data)


def json_to_csv() -> None:
    """Generate the CSV file from the details JSON file.

    Does nothing (beyond printing a message) when the JSON file is missing or
    holds no records. The CSV header is the ordered union of the keys of all
    records, so a record with extra or missing keys cannot break the export.
    """
    if not os.path.exists(DETAILS_JSON_FILE):
        print("没有 JSON 文件,跳过 CSV 生成")
        return

    with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as jsonfile:
        data = json.load(jsonfile)

    if not data:
        # An empty list has no first record to take the header from.
        print("JSON 文件中没有数据,跳过 CSV 生成")
        return

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))
    with open(DETAILS_CSV_FILE, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)


def main() -> None:
    """Run the whole pipeline: fetch details, then export them as CSV."""
    # Make sure the output directory exists.
    os.makedirs(DIR_RES, exist_ok=True)
    process_data()
    json_to_csv()
    print("数据处理完成!")


if __name__ == '__main__':
    main()