179 lines
6.6 KiB
Python
179 lines
6.6 KiB
Python
import json
|
||
import sys
|
||
import os
|
||
import time
|
||
import re
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from urllib.parse import urlparse, parse_qs
|
||
from yt_dlp import YoutubeDL
|
||
import config
|
||
from custom_pornhub import CustomPornHubIE
|
||
|
||
# yt-dlp options used to collect video metadata.
ydl_opts = dict(
    http_headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'Referer': 'https://www.pornhub.com/',
    },
    extract_flat=True,   # metadata only; do not download the video
    skip_download=True,
    verbose=True,        # emit detailed log output
)

# Locations of the list dump and the per-video detail dump.
meta_dir = './meta'
list_file = meta_dir + '/video_list.json'
detail_file = meta_dir + '/video_details.json'
|
||
|
||
# Runs at import time. Presumably installs the logging handlers/format used by
# the logging.* calls below -- setup_logging() lives in the project's config
# module, which is not visible here; confirm there.
config.setup_logging()
|
||
|
||
def convert_to_number(value):
    """
    Parse a human-readable count such as "532", "1.2K" or "3M" into an int.

    Args:
        value: Text to parse. A trailing ``K``/``k`` multiplies by 1,000 and
            a trailing ``M``/``m`` by 1,000,000. Surrounding whitespace is
            ignored. Plain ``int``/``float`` values are accepted as well.

    Returns:
        The value as an ``int`` (any fractional remainder is truncated), or
        ``None`` when *value* is ``None``, is not a string/number, or does
        not look like ``<number>[K|M]``.
    """
    from decimal import Decimal

    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # NaN / infinity cannot be represented as an int.
            return None
    if not isinstance(value, str):
        return None

    # Exactly one optional decimal point, so inputs like "1.2.3" or "." are
    # rejected here instead of blowing up during numeric conversion.
    match = re.match(r'^(\d+\.?\d*|\.\d+)([KkMm]?)$', value.strip())
    if not match:
        return None

    multipliers = {'': 1, 'K': 1000, 'M': 1000000}
    multiplier = multipliers[match.group(2).upper()]

    # Decimal arithmetic is exact for these inputs; with floats
    # "1.001K" becomes 1000.9999999999999 and would truncate to 1000.
    return int(Decimal(match.group(1)) * multiplier)
|
||
|
||
# Keep only videos uploaded within the last year.
def is_recent_video(upload_date, max_age_days=365):
    """
    Tell whether an upload date falls inside the recent-video window.

    Args:
        upload_date: Upload date as a ``YYYYMMDD`` string.
        max_age_days: Size of the window in days, counted back from now.
            Defaults to 365 (one year).

    Returns:
        ``True`` when the date is no older than *max_age_days*; ``False``
        when it is older, or when *upload_date* is missing, not a string,
        or not a valid ``YYYYMMDD`` date.
    """
    try:
        video_date = datetime.strptime(upload_date, "%Y%m%d")
    except (TypeError, ValueError):
        # TypeError: upload_date is None / not a string (metadata missing).
        # ValueError: the string is not a valid YYYYMMDD date.
        return False

    cutoff = datetime.now() - timedelta(days=max_age_days)
    return video_date >= cutoff
|
||
|
||
# Fetch the video list.
def fetch_video_list(output_file="video_list.json"):
    """
    Extract the listing page with yt-dlp and dump its entries to a file.

    The file is rewritten from scratch and holds one JSON object per line.
    Every value of an entry is converted with ``str()`` and has CR/LF
    characters replaced by spaces, so each record stays on a single line.

    Args:
        output_file: Path of the JSON-lines file to write. Missing parent
            directories are created.
    """
    # url = "https://www.pornhub.com/video?o=mr"  # sorted by time (last year)
    url = "https://www.pornhub.com/video?o=cm"  # recent

    # Crawl the listing. The entries are materialised while the YoutubeDL
    # instance is still open in case yt-dlp hands back a lazy iterable.
    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        # NOTE(review): extract_info appears able to return None when
        # extraction errors are suppressed -- treat that as an empty listing.
        entries = list((info_dict or {}).get('entries') or [])

    # Make sure the target directory exists before opening the file.
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Save to file, one record per line.
    with open(output_file, 'w', encoding='utf-8') as f:
        for entry in entries:
            if not entry:
                # Skip empty/None placeholders instead of crashing on .items().
                continue
            # Strip line breaks so the record cannot span several lines.
            cleaned_entry = {
                k: str(v).replace("\n", " ").replace("\r", " ")
                for k, v in entry.items()
            }
            f.write(json.dumps(cleaned_entry, ensure_ascii=False) + "\n")
|
||
|
||
# Load the details that have already been fetched.
def load_processed_details(file_name="video_details.json"):
    """
    Read a JSON-lines details file.

    Blank lines are ignored. Lines that are not valid JSON, or that do not
    hold a JSON object, are skipped with a warning so that one damaged
    record (for example a half-written last line) does not abort the load.

    Args:
        file_name: Path of the JSON-lines file. It may not exist yet.

    Returns:
        A ``(video_list, id_list)`` tuple: the parsed records, and the
        truthy ``"id"`` values found in them, both in file order. Both
        lists are empty when the file does not exist.
    """
    video_list = []
    id_list = []
    if not os.path.exists(file_name):
        return video_list, id_list

    with open(file_name, 'r', encoding='utf-8') as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                video = json.loads(line)
            except json.JSONDecodeError as exc:
                logging.warning(
                    "Skipping malformed line %d in %s: %s", line_no, file_name, exc
                )
                continue
            if not isinstance(video, dict):
                logging.warning(
                    "Skipping non-object line %d in %s", line_no, file_name
                )
                continue

            video_list.append(video)
            video_id = video.get("id")
            if video_id:
                id_list.append(video_id)
    return video_list, id_list
|
||
|
||
# Append one detail record to the file.
def save_processed_details(data, file_name="video_details.json"):
    """
    Append *data* to a JSON-lines file as a single line.

    Every value is converted with ``str()`` and has CR/LF characters
    replaced by spaces, so the record always occupies exactly one line.
    Note that this also turns numbers and ``None`` into strings.

    Args:
        data: Mapping of field name to value for one video.
        file_name: Path of the JSON-lines file. It is created if missing,
            together with any missing parent directories.
    """
    # Make sure the target directory exists before opening the file.
    parent_dir = os.path.dirname(file_name)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Strip line breaks so the record cannot span several lines.
    cleaned_entry = {
        k: str(v).replace("\n", " ").replace("\r", " ")
        for k, v in data.items()
    }

    # Plain append mode: the file is only written, never read back here.
    with open(file_name, 'a', encoding='utf-8') as f:
        f.write(json.dumps(cleaned_entry, ensure_ascii=False) + "\n")
|
||
|
||
# Fetch the details of every listed video, one at a time.
def fetch_video_details(list_file="video_list.json", details_file="video_details.json"):
    """
    Extract per-video metadata for each entry of *list_file*.

    Each successfully extracted record is appended to *details_file*
    immediately, so an interrupted run can be resumed: videos whose id is
    already present in *details_file* (or was stored earlier in this run)
    are skipped. A failure on one video is logged and does not stop the run.

    Args:
        list_file: JSON-lines file with one listing entry per line; each
            entry is expected to carry ``url`` and ``title`` keys.
        details_file: JSON-lines file the detail records are appended to.
    """
    # Load the video list (one JSON object per line, blank lines ignored).
    video_list = []
    with open(list_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                video_list.append(json.loads(line))

    # Load what has already been processed. A set gives O(1) membership
    # tests and lets ids stored during this run be remembered too.
    _, id_list = load_processed_details(details_file)
    seen_ids = set(id_list)

    with YoutubeDL(ydl_opts) as ydl:
        # Register the custom extractor.
        ydl.add_info_extractor(CustomPornHubIE())

        for video in video_list:
            try:
                url = video.get('url')
                if not url:
                    logging.info(f"Wrong video, no url: {video.get('title')}")
                    continue

                # The video id is the "viewkey" query parameter of the URL.
                query_params = parse_qs(urlparse(url).query)
                video_id = query_params.get('viewkey', [None])[0]

                if video_id in seen_ids:
                    logging.info(f"Skipping existed video: (ID: {video_id}) {video.get('title')}")
                    continue

                logging.info(f"processing video: {video.get('title')} (ID: {video_id})")
                video_info = ydl.extract_info(url, download=False)

                # Only textual counts ("1.2K") need parsing; a missing or
                # already-numeric value is stored as-is rather than being
                # fed to the string parser.
                favorite_raw = video_info.get('favorite_count')
                if isinstance(favorite_raw, str):
                    favorite_count = convert_to_number(favorite_raw)
                else:
                    favorite_count = favorite_raw

                # Favourites/likes etc. are assumed to be supplied by the
                # custom extractor (parsed from the page).
                video_data = {
                    'title': video_info.get('title'),
                    'url': url,
                    'id': video_id,
                    "upload_date": video_info.get('upload_date'),
                    'view_count': video_info.get('view_count'),
                    'like_count': video_info.get('like_count'),
                    'dislike_count': video_info.get('dislike_count'),
                    'favorite_count': favorite_count,  # may need manual extraction
                }

                # Persist progress after every video.
                save_processed_details(video_data, details_file)
                if video_id:
                    # Avoid re-fetching a duplicate listing entry in this run.
                    seen_ids.add(video_id)
                logging.info(f"get video detail succ: {video.get('title')}")

            except Exception as e:
                # Deliberate best-effort: log and move on to the next video.
                logging.error(f"get video detail error: {video.get('title')}, msg: {e}")

            # NOTE(review): the original indentation of this sleep was lost;
            # it is placed at loop level so that every extraction attempt
            # (successful or failed) is followed by a one-second pause.
            time.sleep(1)

    logging.info("fetch_video_details done!")
|
||
|
||
|
||
def main():
    """
    Command-line entry point.

    Expects exactly one argument, the command to run:

    * ``get_list``   - fetch the listing into ``list_file``
    * ``get_detail`` - fetch per-video details for an existing ``list_file``
    * ``get_all``    - both of the above, in that order

    Exits with status 1 when the argument count is wrong or the command is
    not recognised, so calling scripts can detect the failure.
    """
    if len(sys.argv) != 2:
        print("Usage: python script.py <cmd>")
        print("cmd: get_list, get_detail, get_all")
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd == "get_list":
        fetch_video_list(list_file)
    elif cmd == "get_detail":
        fetch_video_details(list_file, detail_file)
    elif cmd == "get_all":
        fetch_video_list(list_file)
        fetch_video_details(list_file, detail_file)
    else:
        print(f"Unknown command: {cmd}")
        print("cmd: get_list, get_detail, get_all")
        # An unrecognised command is a usage error, not a success.
        sys.exit(1)


if __name__ == '__main__':
    main()