import json
import sys
import os
import time
import re
import logging
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation
from urllib.parse import urlparse, parse_qs

from yt_dlp import YoutubeDL

import config
from custom_pornhub import CustomPornHubIE

# yt-dlp options used to fetch video metadata.
ydl_opts = {
    'http_headers': {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Referer': 'https://www.pornhub.com/',
    },
    'extract_flat': True,  # only extract metadata, do not download the video
    'skip_download': True,
    'verbose': True,  # emit detailed logs
}

meta_dir = './meta'
list_file = f'{meta_dir}/video_list.json'
detail_file = f'{meta_dir}/video_details.json'

config.setup_logging()

# A run of digits/dots optionally followed by a K (thousand) or M (million) suffix.
_NUMBER_WITH_UNIT_RE = re.compile(r'^([\d.]+)([KkMm]?)$')
_UNIT_MULTIPLIERS = {'': 1, 'K': 1000, 'M': 1000000}


def convert_to_number(value):
    """Parse a count such as ``"1234"``, ``"1.5K"`` or ``"2M"`` into an int.

    Args:
        value: A string with an optional K (thousand) / M (million) suffix.
            ``None`` and already-numeric values are accepted as well, because
            extractor fields are frequently missing or pre-parsed.

    Returns:
        The integer value (fractions are truncated), or ``None`` when the
        input is missing or is not a recognisable number.
    """
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):  # NaN / infinity
            return None

    match = _NUMBER_WITH_UNIT_RE.match(str(value).strip())
    if not match:
        return None
    try:
        # Decimal keeps the scaling exact: float("1.001") * 1000 would
        # truncate to 1000 instead of 1001.
        number = Decimal(match.group(1))
        return int(number * _UNIT_MULTIPLIERS[match.group(2).upper()])
    except (InvalidOperation, ValueError):
        # The regex admits strings like "1.2.3" or "." that are not numbers.
        return None


def is_recent_video(upload_date):
    """Return True if ``upload_date`` (``YYYYMMDD``) is within the last 365 days.

    Missing or malformed dates are treated as not recent.
    """
    try:
        video_date = datetime.strptime(upload_date, "%Y%m%d")
    except (TypeError, ValueError):
        # TypeError covers a missing (None) or non-string upload date.
        return False
    return video_date >= datetime.now() - timedelta(days=365)


def _ensure_parent_dir(file_name):
    """Create the directory that will contain ``file_name`` if it is missing."""
    parent = os.path.dirname(file_name)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _clean_entry(entry):
    """Stringify every value and replace line breaks with spaces.

    This guarantees that each record serialises to exactly one line.
    """
    return {
        key: str(val).replace("\n", " ").replace("\r", " ")
        for key, val in entry.items()
    }


def _write_json_line(f, entry):
    """Write ``entry`` to the open file ``f`` as a single JSON line."""
    f.write(json.dumps(_clean_entry(entry), ensure_ascii=False) + "\n")


def _read_json_lines(file_name):
    """Read a JSON-lines file into a list, skipping blank or malformed lines.

    Malformed lines are logged and skipped rather than raised, since an
    interrupted append can leave a truncated last line behind.
    """
    records = []
    with open(file_name, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                logging.warning("Skipping malformed JSON at %s:%d: %s", file_name, line_no, e)
    return records


def fetch_video_list(output_file="video_list.json"):
    """Fetch the listing page's entries and write them to ``output_file``.

    The file is overwritten and holds one JSON object per line.
    """
    # Alternative listing: "https://www.pornhub.com/video?o=mr"
    # (original note: sorted by time, most recent year).
    url = "https://www.pornhub.com/video?o=cm"  # original note: "recent"

    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        # extract_info may yield nothing usable; treat that as an empty list.
        entries = (info_dict or {}).get('entries') or []

        _ensure_parent_dir(output_file)
        with open(output_file, 'w', encoding='utf-8') as f:
            for entry in entries:
                if entry:
                    _write_json_line(f, entry)


def load_processed_details(file_name="video_details.json"):
    """Load the details that were already fetched.

    Returns:
        A ``(video_list, id_list)`` tuple: the stored records and the list of
        their non-empty ``"id"`` values. Both are empty when the file does not
        exist.
    """
    if not os.path.exists(file_name):
        return [], []

    video_list = _read_json_lines(file_name)
    stored_ids = (video.get("id") for video in video_list)
    id_list = [video_id for video_id in stored_ids if video_id]
    return video_list, id_list


def save_processed_details(data, file_name="video_details.json"):
    """Append one detail record to ``file_name`` as a single JSON line."""
    _ensure_parent_dir(file_name)
    with open(file_name, 'a+', encoding='utf-8') as f:
        _write_json_line(f, data)


def fetch_video_details(list_file="video_list.json", details_file="video_details.json"):
    """Fetch details for every video in ``list_file`` and append to ``details_file``.

    Videos whose ``viewkey`` is already present in ``details_file`` are
    skipped, so an interrupted run can be resumed. A failure on one video is
    logged and does not stop the remaining ones.
    """
    video_list = _read_json_lines(list_file)

    # A set gives O(1) membership tests; it is also updated as we go so
    # duplicate entries in the list are not fetched twice in one run.
    _, id_list = load_processed_details(details_file)
    seen_ids = set(id_list)

    with YoutubeDL(ydl_opts) as ydl:
        # Register the custom extractor.
        ydl.add_info_extractor(CustomPornHubIE())

        for video in video_list:
            title = video.get('title')
            try:
                url = video.get('url')
                if not url:
                    logging.info("Wrong video, no url: %s", title)
                    continue

                video_id = parse_qs(urlparse(url).query).get('viewkey', [None])[0]
                if video_id in seen_ids:
                    logging.info("Skipping existed video: (ID: %s) %s", video_id, title)
                    continue

                logging.info("processing video: %s (ID: %s)", title, video_id)
                video_info = ydl.extract_info(url, download=False)

                # Favorites/likes etc. are custom fields (original note:
                # assumes they can be parsed from the web page).
                video_data = {
                    'title': video_info.get('title'),
                    'url': url,
                    'id': video_id,
                    "upload_date": video_info.get('upload_date'),
                    'view_count': video_info.get('view_count'),
                    'like_count': video_info.get('like_count'),
                    'dislike_count': video_info.get('dislike_count'),
                    # Original note: may need manual extraction.
                    'favorite_count': convert_to_number(video_info.get('favorite_count')),
                }

                # Persist after every video so progress survives a crash.
                save_processed_details(video_data, details_file)
                if video_id:
                    seen_ids.add(video_id)
                logging.info("get video detail succ: %s", title)
            except Exception as e:
                # Per-item boundary: log and move on to the next video.
                logging.error("get video detail error: %s, msg: %s", title, e)

            # Pause between requests.
            time.sleep(1)

    logging.info("fetch_video_details done!")


def main():
    """Command-line entry point: ``get_list``, ``get_detail`` or ``get_all``."""
    if len(sys.argv) != 2:
        print("Usage: python script.py <cmd>")
        print("cmd: get_list, get_detail, get_all")
        sys.exit(1)

    cmd = sys.argv[1]
    if cmd == "get_list":
        fetch_video_list(list_file)
    elif cmd == "get_detail":
        fetch_video_details(list_file, detail_file)
    elif cmd == "get_all":
        fetch_video_list(list_file)
        fetch_video_details(list_file, detail_file)
    else:
        print(f"Unknown command: {cmd}")
        # Signal failure to the calling shell instead of exiting with 0.
        sys.exit(1)


if __name__ == '__main__':
    main()