add some scripts.

This commit is contained in:
2025-02-11 16:07:43 +08:00
parent 2cab12ea34
commit 62a2fbdc77
20 changed files with 148909 additions and 4 deletions

View File

@ -0,0 +1,205 @@
import requests
from bs4 import BeautifulSoup
import os
import sys
import random
import time
import re
import logging
import csv
from datetime import datetime
from datetime import date
import config # 日志配置
import cloudscraper
# 日志
config.setup_logging()
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.DEBUG)
# 配置基础URL和输出文件
base_url = 'https://thelordofporn.com/'
list_url_scenes = 'https://thelordofporn.com/category/top-10/porn-scenes-movies/'
list_url_pornstars = 'https://thelordofporn.com/category/pornstars-top-10/'
curr_novel_pages = 0
res_dir = 'result'
top_scenes_file = f'{res_dir}/top_scenes_list.csv'
top_pornstars_file = f'{res_dir}/top_pornstars_list.csv'
# 请求头和 Cookies模拟真实浏览器
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
}
COOKIES = {
"cf_clearance": "your_clearance_token_here" # 需要根据 Cloudflare 的验证情况更新
}
# 定义获取页面内容的函数,带重试机制
def get_page_content(url, max_retries=100, sleep_time=5, default_timeout=10):
scraper = cloudscraper.create_scraper(
browser={"browser": "chrome", "platform": "windows", "mobile": False}
)
retries = 0
while retries < max_retries:
try:
response = scraper.get(url, headers=HEADERS, cookies=COOKIES, timeout=default_timeout)
if response.status_code == 200 and "content-area content-area--full-width" in response.text :
return response.text # 请求成功,返回内容
except requests.RequestException as e:
retries += 1
logging.info(f"Warn fetching page {url}: {e}. Retrying {retries}/{max_retries}...")
if retries >= max_retries:
logging.error(f"Failed to fetch page {url} after {max_retries} retries.")
return None
time.sleep(sleep_time) # 休眠指定的时间,然后重试
# 获取 top scenes and movies
def get_scenes(base_url, output_file=top_scenes_file):
# 初始化变量
current_url = base_url
all_data = []
while current_url:
try:
logging.info(f"Fetching URL: {current_url}")
# 发起网络请求
content = get_page_content(current_url)
# 解析网页内容
soup = BeautifulSoup(content, "html.parser")
articles = soup.find_all("article", class_="loop-item loop-item--top loop-item--ca_prod_movies__scen")
if not articles:
logging.warning(f"No articles found on page: {current_url}")
# 解析每个 article 标签
for article in articles:
try:
# 获取 href 和 title
a_tag = article.find("a", class_="loop-item__image")
title = a_tag.get("title", "").strip()
href = a_tag.get("href", "").strip()
if title and href:
all_data.append({
'title': title,
'href': href
})
logging.info(f"Extracted: {title} -> {href}")
else:
logging.warning("Missing title or href in an article.")
except Exception as e:
logging.error(f"Error parsing article: {e}")
# 找下一页链接
next_page = soup.find("a", class_="next page-numbers")
if next_page:
current_url = next_page.get("href", "").strip()
else:
current_url = None
logging.info("No more pages to fetch.")
# 等待一段时间以避免被目标网站封禁
time.sleep(2)
except requests.exceptions.RequestException as e:
logging.error(f"Network error while fetching {current_url}: {e}")
break
except Exception as e:
logging.error(f"Unexpected error: {e}")
break
# 保存结果到文件
csv_headers = ["title", "href"]
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
writer.writeheader()
writer.writerows(all_data)
logging.info(f"Data successfully saved to {output_file}.")
# 获取 top pornstars
def get_pornstars(base_url, output_file=top_pornstars_file):
# 初始化变量
current_url = base_url
all_data = []
while current_url:
try:
logging.info(f"Fetching URL: {current_url}")
# 发起网络请求
content = get_page_content(current_url)
# 解析网页内容
soup = BeautifulSoup(content, "html.parser")
articles = soup.find_all("article", class_="loop-item loop-item--top loop-item--ca_prod_pornstars")
if not articles:
logging.warning(f"No articles found on page: {current_url}")
# 解析每个 article 标签
for article in articles:
try:
# 获取 href 和 title
a_tag = article.find("a", class_="loop-item__image")
title = a_tag.get("title", "").strip()
href = a_tag.get("href", "").strip()
if title and href:
all_data.append({
'title':title,
'href': href
})
logging.info(f"Extracted: {title} -> {href}")
else:
logging.warning("Missing title or href in an article.")
except Exception as e:
logging.error(f"Error parsing article: {e}")
# 找下一页链接
next_page = soup.find("a", class_="next page-numbers")
if next_page:
current_url = next_page.get("href", "").strip()
else:
current_url = None
logging.info("No more pages to fetch.")
# 等待一段时间以避免被目标网站封禁
time.sleep(2)
except requests.exceptions.RequestException as e:
logging.error(f"Network error while fetching {current_url}: {e}")
break
except Exception as e:
logging.error(f"Unexpected error: {e}")
break
# 保存结果到文件
csv_headers = ["title", "href"]
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
writer.writeheader()
writer.writerows(all_data)
logging.info(f"Data successfully saved to {output_file}.")
def main():
if len(sys.argv) < 2:
print("Usage: python script.py <cmd>")
print("cmd: scenes, pornstars")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "scenes":
get_scenes(list_url_scenes) # 之前已经实现的获取列表功能
elif cmd == "pornstars":
get_pornstars(list_url_pornstars) # 之前已经实现的获取详情功能
else:
print(f"Unknown command: {cmd}")
if __name__ == '__main__':
main()