This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/pornhub/get_list.py
2025-03-17 11:30:35 +08:00

179 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import sys
import os
import time
import re
import logging
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs
from yt_dlp import YoutubeDL
import config
from custom_pornhub import CustomPornHubIE
# yt-dlp configuration used to fetch video metadata.
_HTTP_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
    'Referer': 'https://www.pornhub.com/',
}
ydl_opts = {
    'http_headers': _HTTP_HEADERS,
    'extract_flat': True,  # metadata only; the video itself is not downloaded
    'skip_download': True,
    'verbose': True,  # emit detailed yt-dlp logging
}

# Output locations: both files are written as one JSON object per line.
meta_dir = './meta'
list_file = meta_dir + '/video_list.json'
detail_file = meta_dir + '/video_details.json'

# Logging is configured by the project-local ``config`` module.
config.setup_logging()
def convert_to_number(value):
    """
    Parse a human-readable count such as ``"1.2K"`` or ``"3M"`` into an int.

    Supported suffixes (case-insensitive): ``K`` = thousand, ``M`` = million.
    Surrounding whitespace and thousands separators (``,``) are ignored.

    Args:
        value: A string like ``"15"``, ``"1.2K"``, ``"3m"``; an int or float
            is also accepted and returned as an int.

    Returns:
        The parsed integer, or ``None`` when ``value`` is ``None``, is not a
        supported type, or does not look like a number.
    """
    # Non-string inputs: callers pass extractor output straight in, which can
    # be missing (None) or already numeric. Never raise for those.
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (ValueError, OverflowError):  # NaN / infinity
            return None
    if not isinstance(value, str):
        return None

    # Exactly one optional decimal point, so inputs such as "1.2.3" are
    # rejected here instead of blowing up inside float().
    match = re.match(
        r'^(\d+\.?\d*|\.\d+)([KkMm]?)$',
        value.strip().replace(',', ''),
    )
    if not match:
        return None

    number = float(match.group(1))
    multiplier = {'': 1, 'K': 1_000, 'M': 1_000_000}[match.group(2).upper()]
    if multiplier == 1:
        # No unit: truncate toward zero.
        return int(number)
    # round() rather than int(): binary floating point can land a hair below
    # the intended value after scaling, and truncation would lose a unit.
    return round(number * multiplier)
# Filter: keep only videos uploaded within the last year.
def is_recent_video(upload_date):
    """
    Tell whether ``upload_date`` falls within the last 365 days.

    Args:
        upload_date: Date string in ``YYYYMMDD`` format.

    Returns:
        ``True`` if the date is no older than 365 days before now; ``False``
        if it is older, is not a string, or cannot be parsed.
    """
    try:
        video_date = datetime.strptime(upload_date, "%Y%m%d")
    except (TypeError, ValueError):
        # TypeError: upload_date is None / not a string (missing metadata).
        # ValueError: string is not in YYYYMMDD format.
        return False
    return video_date >= datetime.now() - timedelta(days=365)
# Fetch the video listing.
def fetch_video_list(output_file="video_list.json"):
    """
    Extract the listing page with yt-dlp and write its entries to a file.

    The output is JSON Lines: one entry per line. Every value is converted
    with ``str()`` and has CR/LF replaced by spaces so that a record can
    never span more than one line.

    Args:
        output_file: Path of the file to (over)write. Its parent directory is
            created if it does not exist.
    """
    # NOTE(review): per the original author's notes, "o=cm" selects the
    # "recent" ordering and "o=mr" was an alternative time-based ordering;
    # confirm against the site before changing.
    url = "https://www.pornhub.com/video?o=cm"

    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        # extract_info may yield None, and 'entries' may be absent or None.
        # Materialise while the YoutubeDL instance is still open.
        entries = list((info_dict or {}).get('entries') or [])

    # Make sure the target directory exists; a bare filename has no parent.
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        for entry in entries:
            if not entry:
                # Skip empty placeholders instead of crashing on .items().
                continue
            # Strip line breaks so each record stays on a single line.
            cleaned_entry = {
                k: str(v).replace("\n", " ").replace("\r", " ")
                for k, v in entry.items()
            }
            f.write(json.dumps(cleaned_entry, ensure_ascii=False) + "\n")
# Load the detail records that have already been fetched.
def load_processed_details(file_name="video_details.json"):
    """
    Read a JSON Lines details file written by earlier runs.

    Blank lines are ignored. Lines that are not valid JSON objects (for
    example a record truncated by an interrupted run) are skipped with a
    warning so that one bad line cannot block resuming.

    Args:
        file_name: Path of the details file. A missing file is not an error.

    Returns:
        A tuple ``(video_list, id_list)``: the parsed records, and the
        truthy ``"id"`` values found in them, both in file order.
    """
    video_list = []
    id_list = []
    if not os.path.exists(file_name):
        return video_list, id_list

    with open(file_name, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                video = json.loads(line)
            except json.JSONDecodeError:
                logging.warning(
                    "Skipping malformed line %d in %s", line_no, file_name
                )
                continue
            if not isinstance(video, dict):
                logging.warning(
                    "Skipping non-object record on line %d in %s",
                    line_no, file_name,
                )
                continue
            video_list.append(video)
            video_id = video.get("id")
            if video_id:
                id_list.append(video_id)
    return video_list, id_list
# Append one detail record to the details file.
def save_processed_details(data, file_name="video_details.json"):
    """
    Append ``data`` to ``file_name`` as a single JSON line.

    Every value is converted with ``str()`` and has CR/LF characters replaced
    by spaces, so the record always occupies exactly one line.

    Args:
        data: Mapping of field name to value for one video.
        file_name: Path of the JSON Lines file to append to.
    """
    with open(file_name, 'a+', encoding='utf-8') as out:
        sanitized = {}
        for key, value in data.items():
            text = str(value)
            # Line breaks would split the record across lines.
            sanitized[key] = text.replace("\n", " ").replace("\r", " ")
        record = json.dumps(sanitized, ensure_ascii=False)
        out.write(record + "\n")
def _parse_favorite_count(raw):
    """Best-effort count conversion; never raises, returns None if unusable."""
    if raw is None:
        return None
    try:
        return convert_to_number(str(raw))
    except (TypeError, ValueError):
        return None


# Fetch the details of each listed video, one at a time.
def fetch_video_details(list_file="video_list.json", details_file="video_details.json"):
    """
    Fetch metadata for every video in ``list_file`` and append it to
    ``details_file``.

    The run is resumable: videos whose ID is already present in
    ``details_file`` are skipped, and each record is written as soon as it is
    fetched. A failure on one video is logged and does not stop the run.

    Args:
        list_file: JSON Lines file of listing entries; each is expected to
            carry a ``url`` (and usually a ``title``).
        details_file: JSON Lines file that detail records are appended to.
    """
    # Load the listing (one JSON object per non-blank line).
    video_list = []
    with open(list_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                video_list.append(json.loads(line))

    # IDs already fetched; a set gives O(1) membership checks in the loop.
    _, id_list = load_processed_details(details_file)
    seen_ids = set(id_list)

    with YoutubeDL(ydl_opts) as ydl:
        # Register the project's custom extractor.
        ydl.add_info_extractor(CustomPornHubIE())
        for video in video_list:
            try:
                url = video.get('url')
                # The list writer stringifies every value, so a missing URL
                # arrives as the literal text "None".
                if not url or url == 'None':
                    logging.info(f"Wrong video, no url: {video.get('title')}")
                    continue

                # The video ID is the "viewkey" query parameter of the URL.
                video_id = parse_qs(urlparse(url).query).get('viewkey', [None])[0]
                if video_id in seen_ids:
                    logging.info(f"Skipping existed video: (ID: {video_id}) {video.get('title')}")
                    continue

                logging.info(f"processing video: {video.get('title')} (ID: {video_id})")
                video_info = ydl.extract_info(url, download=False)
                if not video_info:
                    # Route through the common error path with a clear message.
                    raise ValueError("no metadata returned")

                video_data = {
                    'title': video_info.get('title'),
                    'url': url,
                    'id': video_id,
                    "upload_date": video_info.get('upload_date'),
                    'view_count': video_info.get('view_count'),
                    'like_count': video_info.get('like_count'),
                    'dislike_count': video_info.get('dislike_count'),
                    # May be absent or malformed; must not cost us the record.
                    'favorite_count': _parse_favorite_count(video_info.get('favorite_count')),
                }

                # Persist immediately so progress survives an interruption.
                save_processed_details(video_data, details_file)
                if video_id:
                    # Avoid refetching if the listing contains duplicates.
                    seen_ids.add(video_id)
                logging.info(f"get video detail succ: {video.get('title')}")
            except Exception as e:
                # Top-level per-video boundary: log and move on.
                logging.error(f"get video detail error: {video.get('title')}, msg: {e}")
            # Pause between requests.
            time.sleep(1)
    logging.info("fetch_video_details done!")
def main():
    """
    Command-line entry point.

    Expects exactly one argument:
      * ``get_list``   - fetch the video listing,
      * ``get_detail`` - fetch details for the videos already listed,
      * ``get_all``    - do both, in that order.

    Exits with status 1 on a usage error or an unknown command.
    """
    if len(sys.argv) != 2:
        print("Usage: python script.py <cmd>")
        print("cmd: get_list, get_detail, get_all")
        sys.exit(1)

    cmd = sys.argv[1]
    if cmd == "get_list":
        fetch_video_list(list_file)
    elif cmd == "get_detail":
        fetch_video_details(list_file, detail_file)
    elif cmd == "get_all":
        fetch_video_list(list_file)
        fetch_video_details(list_file, detail_file)
    else:
        print(f"Unknown command: {cmd}")
        # Signal failure so shells and schedulers notice the bad invocation.
        sys.exit(1)


if __name__ == '__main__':
    main()