modify scripts

This commit is contained in:
oscarz
2025-06-03 15:13:55 +08:00
parent e97f49bfb9
commit a4c4fa39d0
10 changed files with 808 additions and 6 deletions

View File

@@ -1,225 +0,0 @@
"""
Script Name:
Description: Fetch the actress list from thelordofporn.com, then fetch detail info for each actress.
The site is behind Cloudflare and cannot be crawled directly; cloudscraper is used to get around the restriction.
list_fetch.py fetches the list from the site, writes the result to a local JSON file, and also generates a CSV file;
actress_fetch.py then walks that list, reads each detail page, and merges in the extra details.
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import json
import csv
import os
import re
import time
import random
import cloudscraper
from bs4 import BeautifulSoup
import config
# File paths
DIR_RES = config.global_host_data_dir
ACTRESSES_FILE = f"{DIR_RES}/actresses.json"
DETAILS_JSON_FILE = f"{DIR_RES}/thelordofporn_pornstars.json"
DETAILS_CSV_FILE = f"{DIR_RES}/thelordofporn_pornstars.csv"
# Request headers and cookies that mimic a real browser
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
}
COOKIES = {
"cf_clearance": "your_clearance_token_here" # 需要根据 Cloudflare 的验证情况更新
}
# Parse birth date and birthplace
def parse_birth_info(text):
match = re.match(r"(.+?) (\d{1,2}), (\d{4}) in (.+)", text)
if match:
return {
"birth_date": f"{match.group(1)} {match.group(2)}, {match.group(3)}",
"birth_year": match.group(3),
"birth_place": match.group(4),
}
return {"birth_date": text, "birth_year": "", "birth_place": ""}
# Parse height
def parse_height(text):
match = re.match(r"(\d+)\s*ft\s*(\d*)\s*in\s*\((\d+)\s*cm\)", text)
if match:
height_ft = f"{match.group(1)}'{match.group(2)}\""
return {"height_ft": height_ft.strip(), "height_cm": match.group(3)}
return {"height_ft": text, "height_cm": ""}
# Parse weight
def parse_weight(text):
match = re.match(r"(\d+)\s*lbs\s*\((\d+)\s*kg\)", text)
if match:
return {"weight_lbs": match.group(1), "weight_kg": match.group(2)}
return {"weight_lbs": text, "weight_kg": ""}
# Parse the page content
def parse_page(actress, html):
soup = BeautifulSoup(html, "html.parser")
# Make sure the page structure is what we expect
if not soup.find("main", {"id": "content", "class": "site-content"}):
return None
# Extract basic info
entry_header = soup.find("header", class_="entry-header")
name_el = entry_header.find("h1", class_="entry-title") if entry_header else None
name = name_el.text.strip() if name_el else ""
date_modified_el = soup.find("time", itemprop="dateModified")
if date_modified_el:
date_modified = date_modified_el.get("content", "").strip()
else:
date_modified = ""
# Extract metadata
global_rank = ""
weekly_rank = ""
last_month_rating = ""
current_rating = ""
total_votes = ""
for div in entry_header.find_all("div", class_="porn-star-rank__item"):
text = div.text.strip()
if "Global Rank" in text:
global_rank = div.find("b").text.strip()
elif "Weekly Rank" in text:
weekly_rank = div.find("b").text.strip()
for item in soup.find_all("div", class_="specifications__item--horizontal"):
text = item.text.strip()
if "Last Month" in text:
last_month_rating = item.find("b").text.strip()
elif "Rating Av." in text:
current_rating = item.find("b").text.strip()
elif "Total of" in text:
total_votes = item.find("b").text.strip()
# Parse the detailed attributes
attributes = {}
for row in soup.find_all("div", class_="specifications-grid-row"):
items = row.find_all("div", class_="specifications-grid-item")
if len(items) == 2:
label = items[0].find("h5").text.strip()
value = items[0].find("span").text.strip()
attributes[label] = value
label2 = items[1].find("h5").text.strip()
value2 = items[1].find("span").text.strip()
attributes[label2] = value2
# Parse birth info, height, weight, etc.
birth_info = parse_birth_info(attributes.get("Born", ""))
height_info = parse_height(attributes.get("Height", ""))
weight_info = parse_weight(attributes.get("Weight", ""))
return {
"pornstar": actress['pornstar'],
"rating": actress['rating'],
"rank": actress['rank'],
"votes": actress['votes'],
"href": actress['href'],
'name': name,
"alias": attributes.get("Name", ""),
"career_start": attributes.get("Career start", ""),
"measurements": attributes.get("Measurements", ""),
"born": attributes.get("Born", ""),
"height": attributes.get("Height", ""),
"weight": attributes.get("Weight", ""),
"date_modified": date_modified,
"global_rank": global_rank,
"weekly_rank": weekly_rank,
"last_month_rating": last_month_rating,
"current_rating": current_rating,
"total_votes": total_votes,
**birth_info,
**height_info,
**weight_info,
}
# Load previously processed data
def load_existing_data():
if os.path.exists(DETAILS_JSON_FILE):
with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as f:
return {item["pornstar"]: item for item in json.load(f)}
return {}
# Fetch a page
def fetch_page(url):
scraper = cloudscraper.create_scraper()
for _ in range(500): # retry up to 500 times
try:
response = scraper.get(url, headers=HEADERS, cookies=COOKIES, timeout=10)
if response.status_code == 200 and "specifications-grid-row" in response.text:
return response.text
except Exception as e:
print(f"请求 {url} 失败,错误: {e}")
time.sleep(random.uniform(2, 5)) # 随机延迟
return None
# 处理数据并保存
def process_data():
with open(ACTRESSES_FILE, "r", encoding="utf-8") as f:
actresses = json.load(f)
existing_data = load_existing_data()
updated_data = list(existing_data.values())
for actress in actresses:
name, url = actress["pornstar"], actress["href"]
if name in existing_data:
print(f"跳过已处理: {name}")
continue
print(f"正在处理: {name} - {url}")
html = fetch_page(url)
if not html:
print(f"无法获取页面: {url}")
continue
details = parse_page(actress, html)
if details:
updated_data.append(details)
existing_data[name] = details
with open(DETAILS_JSON_FILE, "w", encoding="utf-8") as jsonfile:
json.dump(updated_data, jsonfile, indent=4, ensure_ascii=False)
# Generate CSV from the JSON file
def json_to_csv():
if not os.path.exists(DETAILS_JSON_FILE):
print("No JSON file found, skipping CSV generation")
return
with open(DETAILS_JSON_FILE, "r", encoding="utf-8") as jsonfile:
data = json.load(jsonfile)
fieldnames = data[0].keys()
with open(DETAILS_CSV_FILE, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
if __name__ == '__main__':
# Make sure the output directory exists
os.makedirs(DIR_RES, exist_ok=True)
process_data()
json_to_csv()
print("Data processing finished!")

View File

@@ -6,16 +6,16 @@ from datetime import datetime
from logging.handlers import RotatingFileHandler
from collections import defaultdict
# Directory mapped to the host machine
home_dir = os.path.expanduser("~")
global_host_data_dir = f'{home_dir}/hostdir/scripts_data/thelordofporn'
global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
global_share_data_dir = f'{home_dir}/sharedata'
log_dir = '../log'
# Track logging frequency
log_count = defaultdict(int) # number of times each message has been logged
last_log_time = defaultdict(float) # timestamp of the last write for each message
log_dir = '../log'
class RateLimitFilter(logging.Filter):
"""
Rate-limit filter:
@@ -43,8 +43,7 @@ class RateLimitFilter(logging.Filter):
last_log_time[message_key] = now
return True # allow the log record
return True # allow the log record
def setup_logging(log_filename=None):

thelordofporn/src/fetch.py (new file, 198 lines)
View File

@@ -0,0 +1,198 @@
import json
import time
import csv
import argparse
import textwrap
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import scraper
import utils
from urllib.parse import urljoin, urlparse
config.setup_logging()
debug = False
skip_local = False
scan_mode = 0
update_mode = 0
# Fetch the actor list
def fetch_actor_list():
next_url = scraper.pornstar_url
while next_url:
logging.info(f"fetching url {next_url}")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="main", identifier="content", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_actor_list(soup, next_url)
if list_data :
# Insert into the database
for row in list_data:
row_id = db_tools.insert_actor_index(row)
if row_id:
logging.debug(f"insert or update one row. row id: {row_id}, data: {row}")
else:
logging.warning(f"insert or update actor failed. data: {row}")
else:
logging.warning(f"parse_actor_list failed. url: {next_url} ")
elif status_code and status_code == scraper.http_code_404:
logging.warning(f'404 page. url: {next_url}')
break
elif status_code and status_code == scraper.http_code_login:
logging.warning(f'401 page(need login). url: {next_url}')
break
else:
logging.warning(f'fetch_page error. url: {next_url}')
if debug:
break
logging.info(f"fetch actor list finished.")
# Update actor details
def fetch_performers_detail():
limit_count = 5 if debug else 100
performers_list = []
last_performer_id = 0
abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
def get_performers(**kwargs):
kwargs["order_by"] = 'id asc'
return db_tools.query_actors(limit=limit_count, **kwargs)
while True:
if update_mode == 0: # only walk new records
performers_list = get_performers(start_id=0, is_full_data=0)
elif update_mode == 1: # only walk records that are already complete
performers_list = get_performers(start_id=last_performer_id, is_full_data=1)
elif update_mode == 2: # modes 0 and 1 combined
performers_list = get_performers(start_id=last_performer_id, is_full_data_not_in=abnormal_codes)
elif update_mode == 3: # abnormal records only
performers_list = get_performers(start_id=last_performer_id, is_full_data_in=abnormal_codes)
else: # all records
performers_list = get_performers(start_id=last_performer_id)
if len(performers_list) < 1:
logging.info(f'all performers fetched.')
break
succ_rows = 0
for performer in performers_list:
url = performer['href']
person = performer['name']
next_url = url
need_insert = True
while next_url:
logging.debug(f"Fetching data for actor ({person}), url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="main", identifier="content", attr_type="id"))
if soup:
data, next_url = scraper.parse_actor_detail(soup, next_url)
if data:
# Finished fetching everything for this performer; start inserting the data
performer_id = db_tools.update_actor_detail(data, is_full_data=1)
if performer_id:
logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {next_url}')
last_performer_id = performer_id
succ_rows += 1
else:
logging.warning(f'insert person: ({person}) {next_url} failed.')
elif status_code and status_code == scraper.http_code_404:
actor_id = db_tools.update_actor_detail({'href': next_url}, is_full_data=scraper.http_code_404)
logging.warning(f'404 page. id: {actor_id}, name: ({person}), url: {next_url}, Skipping...')
need_insert = False
break
elif status_code and status_code == scraper.http_code_login:
actor_id = db_tools.update_actor_detail({'href': next_url}, is_full_data=scraper.http_code_login)
logging.warning(f'401 page(need login). id: {actor_id}, name: ({person}), url: {next_url}, Skipping...')
need_insert = False
break
else:
logging.warning(f'fetch_page error. url: {next_url}')
# A 401 or 404 was already handled above, so skip this performer
if not need_insert:
continue
time.sleep(0.5)
logging.info(f'total request: {len(performers_list)}, succ: {succ_rows}, last performer id: {last_performer_id}')
# return early in debug mode
if debug:
return True
# Map shortcut names to functions
function_map = {
"actor_list": fetch_actor_list,
"actors" : fetch_performers_detail,
}
# Main entry point
def main(cmd, args):
# Run only the requested functions
if cmd:
function_names = args.cmd.split(",") # split the input
for short_name in function_names:
func = function_map.get(short_name.strip()) # look up the function in the map
if callable(func):
func()
else:
logging.warning(f" {short_name} is not a valid function shortcut.")
else: # run everything
for name, func in function_map.items():
if callable(func):
func()
else:
logging.warning(f" {short_name} is not a valid function shortcut.")
logging.info(f'all process completed!')
# TODO:
# 1,
# Apply command-line options to module-level flags
def set_env(args):
global debug
debug = args.debug
if debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
global skip_local
skip_local = args.skip_local
global scan_mode
scan_mode = args.scan_mode
global update_mode
if args.update:
update_mode = args.update
if __name__ == "__main__":
# Command-line argument handling
keys_str = ",".join(function_map.keys())
usage_examples = textwrap.dedent('''
Example usage:
python3 ./fetch.py # refresh the list pages and walk the newly added actors
python3 ./fetch.py --update=4 # refresh the list pages and walk all records
python3 ./fetch.py --cmd=actor_list # refresh all actors on the list pages (three languages)
python3 ./fetch.py --cmd=actors # walk the newly added actors
''')
parser = argparse.ArgumentParser(
description='fetch thelordofporn data.\n\n' + usage_examples,
formatter_class=argparse.RawDescriptionHelpFormatter
)
#parser = argparse.ArgumentParser(description='fetch javdb data.')
parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0 - only records with is_full_data=0 (default), 1 - only is_full_data=1, 2 - is_full_data<=1, 3 - only is_full_data>1 (abnormal records), 4 - all records')
parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1, help='1 - only uncensored makers/series/actors/movies (default), 0 - the opposite, 2 - everything')
parser.add_argument('--skip_local', action='store_true', help='Skip database operations when the page is already cached locally')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
set_env(args)
main(args.cmd, args)

View File

@@ -1,169 +0,0 @@
import sqlite3
import json
import re
import logging
from datetime import datetime
import config
def setup_logging():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
#db_path = "/root/sharedata/shared.db"
# Connect to the SQLite database
db_path = f"{config.global_share_data_dir}/sqlite/shared.db" # replace with your database file
def connect_db(db_name=db_path):
return sqlite3.connect(db_name)
def create_tables(conn):
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS thelordofporn_actress (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pornstar TEXT,
rating REAL,
rank INTEGER,
votes INTEGER,
href TEXT UNIQUE,
career_start TEXT,
measurements TEXT,
born TEXT,
height TEXT,
weight TEXT,
date_modified TEXT,
global_rank INTEGER,
weekly_rank INTEGER,
last_month_rating REAL,
current_rating REAL,
total_votes INTEGER,
birth_date TEXT,
birth_year TEXT,
birth_place TEXT,
height_ft TEXT,
height_cm TEXT,
weight_lbs TEXT,
weight_kg TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
updated_at TEXT DEFAULT (datetime('now', 'localtime'))
);
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS thelordofporn_alias (
actress_id INTEGER NOT NULL,
alias TEXT NOT NULL,
FOREIGN KEY (actress_id) REFERENCES thelordofporn_actress(id) ON DELETE CASCADE,
PRIMARY KEY(`actress_id`, `alias`)
);
''')
conn.commit()
def load_json(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Failed to load JSON file: {e}")
return []
def clean_alias(alias):
alias = re.sub(r'\(Age \d+\)', '', alias) # strip the "(Age XX)" suffix
return [name.strip() for name in alias.split(',') if name.strip()]
def parse_numeric(value):
try:
return float(value)
except (ValueError, TypeError):
return 0 # default to 0
def insert_actress(conn, actress):
cursor = conn.cursor()
# Insert into the thelordofporn_actress table
cursor.execute('''
INSERT INTO thelordofporn_actress (
pornstar, rating, rank, votes, href, career_start, measurements, born,
height, weight, date_modified, global_rank, weekly_rank,
last_month_rating, current_rating, total_votes,
birth_date, birth_year, birth_place, height_ft, height_cm,
weight_lbs, weight_kg, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
rating=excluded.rating,
rank=excluded.rank,
votes=excluded.votes,
career_start=excluded.career_start,
measurements=excluded.measurements,
born=excluded.born,
height=excluded.height,
weight=excluded.weight,
date_modified=excluded.date_modified,
global_rank=excluded.global_rank,
weekly_rank=excluded.weekly_rank,
last_month_rating=excluded.last_month_rating,
current_rating=excluded.current_rating,
total_votes=excluded.total_votes,
birth_date=excluded.birth_date,
birth_year=excluded.birth_year,
birth_place=excluded.birth_place,
height_ft=excluded.height_ft,
height_cm=excluded.height_cm,
weight_lbs=excluded.weight_lbs,
weight_kg=excluded.weight_kg,
updated_at=datetime('now', 'localtime');
''', (
actress.get('pornstar', ''),
parse_numeric(actress.get('rating', 0)),
parse_numeric(actress.get('rank', 0)),
parse_numeric(actress.get('votes', 0)),
actress.get('href', ''),
actress.get('career_start', ''),
actress.get('measurements', ''),
actress.get('born', ''),
actress.get('height', ''),
actress.get('weight', ''),
actress.get('date_modified', ''),
parse_numeric(actress.get('global_rank', 0)),
parse_numeric(actress.get('weekly_rank', 0)),
parse_numeric(actress.get('last_month_rating', 0)),
parse_numeric(actress.get('current_rating', 0)),
parse_numeric(actress.get('total_votes', 0)),
actress.get('birth_date', ''),
str(actress.get('birth_year', '')),
actress.get('birth_place', ''),
actress.get('height_ft', ''),
str(actress.get('height_cm', '')),
str(actress.get('weight_lbs', '')),
str(actress.get('weight_kg', ''))
))
actress_id = cursor.lastrowid if cursor.lastrowid else cursor.execute("SELECT id FROM thelordofporn_actress WHERE href = ?", (actress.get('href', ''),)).fetchone()[0]
# Insert into the thelordofporn_alias table
if 'alias' in actress:
aliases = clean_alias(actress['alias'])
cursor.execute("DELETE FROM thelordofporn_alias WHERE actress_id = ?", (actress_id,))
for alias in aliases:
cursor.execute("INSERT INTO thelordofporn_alias (actress_id, alias) VALUES (?, ?) ON CONFLICT(actress_id, alias) DO NOTHING ", (actress_id, alias))
conn.commit()
def main():
setup_logging()
conn = connect_db()
#create_tables(conn)
actresses = load_json("./result/actress_detail.json")
if actresses:
for actress in actresses:
try:
insert_actress(conn, actress)
logging.info(f"Inserted/Updated: {actress.get('pornstar', 'Unknown')}")
except Exception as e:
logging.error(f"Error inserting actress: {e}")
else:
logging.warning("No data to insert.")
conn.close()
if __name__ == "__main__":
main()

View File

@@ -1,138 +0,0 @@
"""
Script Name:
Description: Fetch the actress list from thelordofporn.com, then fetch detail info for each actress.
The site is behind Cloudflare and cannot be crawled directly; cloudscraper is used to get around the restriction.
list_fetch.py fetches the list from the site, writes the result to a local JSON file, and also generates a CSV file;
actress_fetch.py then walks that list, reads each detail page, and merges in the extra details.
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import time
import json
import csv
import os
import random
import cloudscraper
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import config
DIR_RES = config.global_host_data_dir
ACTRESSES_JSON = f"{DIR_RES}/actresses.json"
ACTRESSES_CSV = f"{DIR_RES}/actresses.csv"
# Target URL
BASE_URL = "https://thelordofporn.com/pornstars/"
# Pretend to be a real browser
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Referer": "https://thelordofporn.com/",
}
# Collected actress records
actress_list = []
# Create a CloudScraper instance to get past Cloudflare
scraper = cloudscraper.create_scraper(
browser={"browser": "chrome", "platform": "windows", "mobile": False}
)
# Scrape a page (follows pagination)
def scrape_page(url):
print(f"[INFO] 正在抓取: {url}")
# 网络访问失败时自动重试
for attempt in range(3):
try:
response = scraper.get(url, headers=HEADERS, timeout=10)
response.raise_for_status() # check the HTTP status code
# check whether a valid page was returned
soup = BeautifulSoup(response.text, "html.parser")
main_tag = soup.find("main", class_="site-content")
if main_tag:
break # page content looks right, proceed to parsing
else:
print(f"[WARNING] 服务器返回的页面不完整,尝试重新获取 ({attempt+1}/3)")
time.sleep(random.uniform(2, 5)) # 休眠 2-5 秒再试
except Exception as e:
print(f"[ERROR] 访问失败 ({attempt+1}/3): {e}")
time.sleep(random.uniform(2, 5)) # 休眠 2-5 秒再试
else:
print("[ERROR] 多次尝试后仍然失败,跳过该页面")
return None
#soup = BeautifulSoup(response.text, "html.parser")
# Parse the actress entries
articles = soup.find_all("article", class_="loop-item")
for article in articles:
try:
# Get the actress name and link
title_tag = article.find("h3", class_="loop-item__title").find("a")
title = title_tag.text.strip()
href = title_tag["href"]
# Get the rating
rating_tag = article.find("div", class_="loop-item__rating")
rating = rating_tag.text.strip() if rating_tag else "N/A"
# Get rank and votes
meta_tags = article.find("div", class_="loop-item__rank").find_all("span")
rank = meta_tags[0].find("b").text.strip() if meta_tags else "N/A"
votes = meta_tags[1].find("b").text.strip() if len(meta_tags) > 1 else "N/A"
# Append to the list
actress_list.append({
"pornstar": title,
"rating": rating,
"rank": rank,
"votes": votes,
"href": href
})
print(f"-----[INFO] 获取演员: {title} (Rank: {rank}, Votes: {votes}, Rating: {rating})-----")
except Exception as e:
print(f"[ERROR] 解析演员信息失败: {e}")
# 查找下一页链接
next_page_tag = soup.select_one(".nav-links .next.page-numbers")
if next_page_tag:
next_page_url = urljoin(BASE_URL, next_page_tag["href"])
print(f"[INFO] 发现下一页: {next_page_url}")
time.sleep(random.uniform(1, 3)) # 休眠 1-3 秒,避免被封
scrape_page(next_page_url)
else:
print("[INFO] 已抓取所有页面,爬取结束")
# 保存数据
def save_data():
# Make sure the output directory exists
os.makedirs(DIR_RES, exist_ok=True)
# Save the data as JSON
with open(ACTRESSES_JSON, "w", encoding="utf-8") as json_file:
json.dump(actress_list, json_file, ensure_ascii=False, indent=4)
print(f"[INFO] 数据已保存到 {ACTRESSES_JSON}")
# 保存数据为 CSV
with open(ACTRESSES_CSV, "w", encoding="utf-8", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=["pornstar", "rating", "rank", "votes", "href"])
writer.writeheader()
writer.writerows(actress_list)
print(f"[INFO] 数据已保存到 {ACTRESSES_CSV}")
if __name__ == '__main__':
scrape_page(BASE_URL)
save_data()

View File

@@ -0,0 +1,267 @@
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
from urllib.parse import urljoin, urlparse
import config
import utils
# Base URLs and tunable parameters
host_url = "https://thelordofporn.com/"
pornstar_url = "https://thelordofporn.com/pornstars/"
lang_prefix = ["ja", "en", "zh"]
http_code_404 = 404
http_code_login = 401
http_code_local = 99
save_raw_html = False
load_from_local = False
# Pretend to be a real browser
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Referer": "https://thelordofporn.com/",
}
# Create a CloudScraper instance to get past Cloudflare
scraper = cloudscraper.create_scraper(
browser={"browser": "chrome", "platform": "windows", "mobile": False}
)
# Fetch a page with CloudScraper and validate it; supports different parsers and an optional preprocessor
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
if load_from_local: # try the local cache first
html = utils.read_raw_html(url)
if html:
# preprocess the HTML if a preprocessor was provided
html_text = preprocessor(html) if preprocessor else html
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the custom page check
logging.debug(f"read from local. href: {url}")
return soup, http_code_local # a status code below 100 signals the page came from the local cache
for attempt in range(max_retries):
try:
if 'thelordofporn.com' not in url.lower():
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=HEADERS)
# Handle HTTP status codes
if response.status_code == 404:
logging.debug(f"Page not found (404): {url}")
return None, http_code_404 # return 404 directly so the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# Check whether we were redirected, e.g. to a login page
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
soup = BeautifulSoup(response.text, parser)
# check whether this is the login page
if soup.find('nav', class_='panel form-panel'):
logging.debug(f"Page redirected to login page on {url}.")
return None, http_code_login
if save_raw_html:
utils.write_raw_html(url, response.text)
# preprocess the HTML if a preprocessor was provided
html_text = preprocessor(response.text) if preprocessor else response.text
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the custom page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retring...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retring...")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retring...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # 达到最大重试次数仍然失败
# Fix up the HTML: drop stray <br> tags and patch <a> tags (needed when extracting the ethnicity field)
def preprocess_html(html):
return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
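# Sketch: effect of preprocess_html() on a hypothetical fragment -- it strips <br>
# tags and makes links open in a new tab.
def _example_preprocess():
    assert preprocess_html('<br><a href="/x">y</a>') == '<a target="_blank" href="/x">y</a>'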
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
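# Sketch: building a validator with functools.partial and handing it to fetch_page(),
# mirroring the calls in fetch.py; the URL is just an example.
def _example_validated_fetch():
    validator = partial(generic_validator, tag="main", identifier="content", attr_type="id")
    soup, status = fetch_page(pornstar_url, validator)
    return soup is not None and status not in (http_code_404, http_code_login)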
# Parse a list page
def parse_actor_list(soup, href):
# Parse the actress entries
actress_list = []
next_page_url = None
articles = soup.find_all("article", class_="loop-item")
for article in articles:
try:
# Get the actress name and link
title_tag = article.find("h3", class_="loop-item__title").find("a")
title = title_tag.text.strip()
href = title_tag["href"]
# Get the rating
rating_tag = article.find("div", class_="loop-item__rating")
rating = rating_tag.text.strip() if rating_tag else "N/A"
# Get rank and votes
meta_tags = article.find("div", class_="loop-item__rank").find_all("span")
rank = meta_tags[0].find("b").text.strip() if meta_tags else "N/A"
votes = meta_tags[1].find("b").text.strip() if len(meta_tags) > 1 else "N/A"
# Append to the list
actress_list.append({
"pornstar": title,
"rating": utils.parse_numeric(rating),
"rank": utils.parse_numeric(rank),
"votes": utils.parse_numeric(votes),
"href": href
})
except Exception as e:
logging.error(f"parse list faild: {e}, url: {href}")
return None, None
# Find the next-page link
next_page_tag = soup.select_one(".nav-links .next.page-numbers")
if next_page_tag:
next_page_url = urljoin(host_url, next_page_tag["href"])
logging.debug(f"next page: {next_page_url}")
else:
logging.debug("find all pages.")
return actress_list, next_page_url
# Parse the detail page HTML and extract the fields we need
def parse_actor_detail(soup, href):
# Extract basic info
entry_header = soup.find("header", class_="entry-header")
name_el = entry_header.find("h1", class_="entry-title") if entry_header else None
name = name_el.text.strip() if name_el else ""
date_modified_el = soup.find("time", itemprop="dateModified")
if date_modified_el:
date_modified = date_modified_el.get("content", "").strip()
else:
date_modified = ""
# Extract metadata
global_rank = ""
weekly_rank = ""
last_month_rating = ""
current_rating = ""
total_votes = ""
for div in entry_header.find_all("div", class_="porn-star-rank__item"):
text = div.text.strip()
if "Global Rank" in text:
global_rank = div.find("b").text.strip()
elif "Weekly Rank" in text:
weekly_rank = div.find("b").text.strip()
for item in soup.find_all("div", class_="specifications__item--horizontal"):
text = item.text.strip()
if "Last Month" in text:
last_month_rating = item.find("b").text.strip()
elif "Rating Av." in text:
current_rating = item.find("b").text.strip()
elif "Total of" in text:
total_votes = item.find("b").text.strip()
# Parse the detailed attributes
attributes = {}
for row in soup.find_all("div", class_="specifications-grid-row"):
items = row.find_all("div", class_="specifications-grid-item")
if len(items) == 2:
label = items[0].find("h5").text.strip()
value = items[0].find("span").text.strip()
attributes[label] = value
label2 = items[1].find("h5").text.strip()
value2 = items[1].find("span").text.strip()
attributes[label2] = value2
# Parse birth info, height, weight, etc.
birth_info = utils.parse_birth_info(attributes.get("Born", ""))
height_info = utils.parse_height(attributes.get("Height", ""))
weight_info = utils.parse_weight(attributes.get("Weight", ""))
alias_list = utils.clean_alias(attributes.get("Name", ""))
return {
'name': name,
'href': href,
"alias": alias_list,
"career_start": attributes.get("Career start", ""),
"measurements": attributes.get("Measurements", ""),
"born": attributes.get("Born", ""),
"height": attributes.get("Height", ""),
"weight": attributes.get("Weight", ""),
"date_modified": date_modified,
"global_rank": utils.parse_numeric(global_rank),
"weekly_rank": utils.parse_numeric(weekly_rank),
"last_month_rating": utils.parse_numeric(last_month_rating),
"current_rating": utils.parse_numeric(current_rating),
"total_votes": utils.parse_numeric(total_votes),
**birth_info,
**height_info,
**weight_info,
}, None
###### Test code below ######
def test_actor_list():
next_url = pornstar_url
all_data = []
while next_url:
print(f'fetching page {next_url}')
soup, http_status_code = fetch_page(next_url, partial(generic_validator, tag="main", identifier="content", attr_type="id"))
if soup:
list_data, next_url = parse_actor_list(soup, next_url)
if list_data :
all_data.extend(list_data)
else:
print('get wrong page.')
if next_url:
print(f"next url: {next_url}")
break
print(all_data)
def test_actor():
next_url = 'https://thelordofporn.com/pornstars/eva-elfie/'
while next_url:
print(f'fetching page {next_url}')
soup, http_status_code = fetch_page(next_url, partial(generic_validator, tag="main", identifier="content", attr_type="id"))
if soup:
data, next_url = parse_actor_detail(soup, next_url)
if data :
print(data)
else:
print('get wrong page.')
break
if __name__ == "__main__":
test_actor_list()
test_actor()

View File

@@ -0,0 +1,199 @@
import sqlite3
import json
import config
import logging
from datetime import datetime
# Connect to the SQLite database
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # replace with your database file
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
tbl_name_actors = 'thelordofporn_actress'
tbl_name_alias = 'thelordofporn_alias'
# Check the SQLite version (ON CONFLICT ... DO UPDATE requires 3.24.0+)
lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
lower_sqlite_version = True
# Get the table's column names and default values
def get_table_columns_and_defaults(tbl_name):
try:
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
column_info = {}
for col in columns:
col_name = col[1]
default_value = col[4]
column_info[col_name] = default_value
return column_info
except sqlite3.Error as e:
logging.error(f"Error getting table columns: {e}")
return None
# Validate the data against the table schema and drop unknown keys
def check_and_process_data(data, tbl_name):
column_info = get_table_columns_and_defaults(tbl_name=tbl_name)
if column_info is None:
return None
processed_data = {}
for col, default in column_info.items():
if col == 'id': # auto-increment primary key, not supplied by the caller
continue
if col == 'created_at' or col == 'updated_at': # timestamps are set explicitly in the SQL
continue
if col in data:
processed_data[col] = data[col]
return processed_data
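# Sketch: check_and_process_data() keeps only keys that are actual columns of the
# target table and drops id/created_at/updated_at; the input row here is hypothetical.
def _example_filter_row():
    row = {"pornstar": "Example Name", "href": "https://thelordofporn.com/pornstars/example/",
           "nonexistent_column": "dropped"}
    return check_and_process_data(row, tbl_name_actors)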
# Insert or update a row (UPSERT)
def insert_or_update_common(data, tbl_name, uniq_key='href'):
if lower_sqlite_version:
return insert_or_update_common_lower(data, tbl_name, uniq_key)
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
'''
cursor.execute(sql, values)
conn.commit()
# fetch the row id after the insert or update
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
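# Sketch: a minimal upsert through insert_or_update_common(); the row values are
# hypothetical. On SQLite >= 3.24 this runs ON CONFLICT(href) DO UPDATE; older
# versions fall through to insert_or_update_common_lower() below.
def _example_upsert():
    row = {"pornstar": "Example Name", "href": "https://thelordofporn.com/pornstars/example/",
           "rating": 9.1, "votes": 120}
    return insert_or_update_common(row, tbl_name_actors, uniq_key="href")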
# Insert or update a row (fallback for SQLite versions without UPSERT)
def insert_or_update_common_lower(data, tbl_name, uniq_key='href'):
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
# try a plain INSERT first
try:
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
'''
cursor.execute(sql, values)
conn.commit()
except sqlite3.IntegrityError: # unique-key conflict, fall back to an UPDATE
update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
update_values.append(data[uniq_key])
sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
cursor.execute(sql, update_values)
conn.commit()
# fetch the row id after the insert or update
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Insert an actor index row (from the list pages) and update it if it already exists
def insert_actor_index(data):
try:
return insert_or_update_common(data, tbl_name_actors)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Update an actor's detail fields
def update_actor_detail(data, is_full_data=1):
try:
data['is_full_data'] = is_full_data
row_id = insert_or_update_common(data, tbl_name_actors)
# write the aliases into the alias table
for alias in data.get("alias") or []:
cursor.execute('''
INSERT OR IGNORE INTO thelordofporn_alias (actress_id, alias, updated_at)
VALUES (?, ?, datetime('now', 'localtime'))
''', (row_id, alias))
conn.commit()
return row_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
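# Sketch: how fetch.py marks abnormal pages through this helper; the URL is
# illustrative. The HTTP code is stored in is_full_data so those rows can be
# re-queried (or skipped) later via the is_full_data_in filters.
def _example_mark_abnormal():
    return update_actor_detail({'href': 'https://thelordofporn.com/pornstars/example/'},
                               is_full_data=404)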
# Query actors with optional filters
def query_actors(**filters):
try:
sql = f"SELECT href, pornstar as name FROM {tbl_name_actors} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"pornstar": " AND pornstar LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "pornstar":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# Note: the ORDER BY column is interpolated directly; a placeholder would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # return lower-cased hrefs
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
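# Sketch: typical query_actors() calls; the filter values are illustrative and mirror
# how fetch.py pages through records in id order with a batch limit.
def _example_queries():
    new_rows = query_actors(is_full_data=0, order_by="id asc", limit=100)
    by_name = query_actors(pornstar="Eva", limit=10)
    return new_rows, by_name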
# Test code
if __name__ == "__main__":
print(query_actors(pornstar='未久'))
#delete_actor_by_href('https://www.javdb.com/actors/MkAX')
print(query_actors())

View File

@@ -1,205 +0,0 @@
import requests
from bs4 import BeautifulSoup
import os
import sys
import random
import time
import re
import logging
import csv
from datetime import datetime
from datetime import date
import config # logging configuration
import cloudscraper
# Logging
config.setup_logging()
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.DEBUG)
# Base URLs and output files
base_url = 'https://thelordofporn.com/'
list_url_scenes = 'https://thelordofporn.com/category/top-10/porn-scenes-movies/'
list_url_pornstars = 'https://thelordofporn.com/category/pornstars-top-10/'
curr_novel_pages = 0
res_dir = 'result'
top_scenes_file = f'{res_dir}/top_scenes_list.csv'
top_pornstars_file = f'{res_dir}/top_pornstars_list.csv'
# Request headers and cookies that mimic a real browser
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
}
COOKIES = {
"cf_clearance": "your_clearance_token_here" # 需要根据 Cloudflare 的验证情况更新
}
# Fetch page content, with a retry mechanism
def get_page_content(url, max_retries=100, sleep_time=5, default_timeout=10):
scraper = cloudscraper.create_scraper(
browser={"browser": "chrome", "platform": "windows", "mobile": False}
)
retries = 0
while retries < max_retries:
try:
response = scraper.get(url, headers=HEADERS, cookies=COOKIES, timeout=default_timeout)
if response.status_code == 200 and "content-area content-area--full-width" in response.text :
return response.text # request succeeded, return the content
except requests.RequestException as e:
retries += 1
logging.info(f"Warn fetching page {url}: {e}. Retrying {retries}/{max_retries}...")
if retries >= max_retries:
logging.error(f"Failed to fetch page {url} after {max_retries} retries.")
return None
time.sleep(sleep_time) # sleep for the given time, then retry
# Fetch top scenes and movies
def get_scenes(base_url, output_file=top_scenes_file):
# Initialise state
current_url = base_url
all_data = []
while current_url:
try:
logging.info(f"Fetching URL: {current_url}")
# Make the network request
content = get_page_content(current_url)
# Parse the page content
soup = BeautifulSoup(content, "html.parser")
articles = soup.find_all("article", class_="loop-item loop-item--top loop-item--ca_prod_movies__scen")
if not articles:
logging.warning(f"No articles found on page: {current_url}")
# Parse each article tag
for article in articles:
try:
# Get href and title
a_tag = article.find("a", class_="loop-item__image")
title = a_tag.get("title", "").strip()
href = a_tag.get("href", "").strip()
if title and href:
all_data.append({
'title': title,
'href': href
})
logging.info(f"Extracted: {title} -> {href}")
else:
logging.warning("Missing title or href in an article.")
except Exception as e:
logging.error(f"Error parsing article: {e}")
# Find the next-page link
next_page = soup.find("a", class_="next page-numbers")
if next_page:
current_url = next_page.get("href", "").strip()
else:
current_url = None
logging.info("No more pages to fetch.")
# Wait a bit to avoid getting banned by the site
time.sleep(2)
except requests.exceptions.RequestException as e:
logging.error(f"Network error while fetching {current_url}: {e}")
break
except Exception as e:
logging.error(f"Unexpected error: {e}")
break
# Save the results to a file
csv_headers = ["title", "href"]
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
writer.writeheader()
writer.writerows(all_data)
logging.info(f"Data successfully saved to {output_file}.")
# Fetch top pornstars
def get_pornstars(base_url, output_file=top_pornstars_file):
# Initialise state
current_url = base_url
all_data = []
while current_url:
try:
logging.info(f"Fetching URL: {current_url}")
# Make the network request
content = get_page_content(current_url)
# Parse the page content
soup = BeautifulSoup(content, "html.parser")
articles = soup.find_all("article", class_="loop-item loop-item--top loop-item--ca_prod_pornstars")
if not articles:
logging.warning(f"No articles found on page: {current_url}")
# Parse each article tag
for article in articles:
try:
# Get href and title
a_tag = article.find("a", class_="loop-item__image")
title = a_tag.get("title", "").strip()
href = a_tag.get("href", "").strip()
if title and href:
all_data.append({
'title':title,
'href': href
})
logging.info(f"Extracted: {title} -> {href}")
else:
logging.warning("Missing title or href in an article.")
except Exception as e:
logging.error(f"Error parsing article: {e}")
# Find the next-page link
next_page = soup.find("a", class_="next page-numbers")
if next_page:
current_url = next_page.get("href", "").strip()
else:
current_url = None
logging.info("No more pages to fetch.")
# Wait a bit to avoid getting banned by the site
time.sleep(2)
except requests.exceptions.RequestException as e:
logging.error(f"Network error while fetching {current_url}: {e}")
break
except Exception as e:
logging.error(f"Unexpected error: {e}")
break
# Save the results to a file
csv_headers = ["title", "href"]
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
writer.writeheader()
writer.writerows(all_data)
logging.info(f"Data successfully saved to {output_file}.")
def main():
if len(sys.argv) < 2:
print("Usage: python script.py <cmd>")
print("cmd: scenes, pornstars")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "scenes":
get_scenes(list_url_scenes) # 之前已经实现的获取列表功能
elif cmd == "pornstars":
get_pornstars(list_url_pornstars) # 之前已经实现的获取详情功能
else:
print(f"Unknown command: {cmd}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,48 @@
import re
import os
import json
import time
import csv
from datetime import datetime
from urllib.parse import urlparse
import logging
import config
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
# Parse birth date and birthplace
def parse_birth_info(text):
match = re.match(r"(.+?) (\d{1,2}), (\d{4}) in (.+)", text)
if match:
return {
"birth_date": f"{match.group(1)} {match.group(2)}, {match.group(3)}",
"birth_year": match.group(3),
"birth_place": match.group(4),
}
return {"birth_date": text, "birth_year": "", "birth_place": ""}
# Parse height
def parse_height(text):
match = re.match(r"(\d+)\s*ft\s*(\d*)\s*in\s*\((\d+)\s*cm\)", text)
if match:
height_ft = f"{match.group(1)}'{match.group(2)}\""
return {"height_ft": height_ft.strip(), "height_cm": match.group(3)}
return {"height_ft": text, "height_cm": ""}
# Parse weight
def parse_weight(text):
match = re.match(r"(\d+)\s*lbs\s*\((\d+)\s*kg\)", text)
if match:
return {"weight_lbs": match.group(1), "weight_kg": match.group(2)}
return {"weight_lbs": text, "weight_kg": ""}
def clean_alias(alias):
alias = re.sub(r'\(Age \d+\)', '', alias) # strip the "(Age XX)" suffix
return [name.strip() for name in alias.split(',') if name.strip()]
def parse_numeric(value):
try:
return float(value)
except (ValueError, TypeError):
return 0 # default to 0
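# Sketch: expected outputs of the helpers above for hypothetical sample strings
# (the values are illustrative, not scraped data).
def _example_parsers():
    assert parse_birth_info("May 27, 2000 in Moscow, Russia") == {
        "birth_date": "May 27, 2000", "birth_year": "2000", "birth_place": "Moscow, Russia"}
    assert parse_height("5 ft 3 in (160 cm)") == {"height_ft": "5'3\"", "height_cm": "160"}
    assert parse_weight("106 lbs (48 kg)") == {"weight_lbs": "106", "weight_kg": "48"}
    assert clean_alias("Jane Doe (Age 25), JD") == ["Jane Doe", "JD"]
    assert parse_numeric("9.37") == 9.37 and parse_numeric("N/A") == 0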