add some scripts.

This commit is contained in:
2024-11-22 14:18:08 +08:00
parent a261a9e7ed
commit ba31911477
5 changed files with 387 additions and 0 deletions

178
scripts/pornhub/get_list.py Normal file
View File

@ -0,0 +1,178 @@
import json
import sys
import os
import time
import re
import logging
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs
from yt_dlp import YoutubeDL
import config
from custom_pornhub import CustomPornHubIE
# 配置 yt-dlp 获取视频元数据
ydl_opts = {
'http_headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': 'https://www.pornhub.com/',
},
'extract_flat': True, # 只提取元数据,不下载视频
'skip_download': True,
'verbose': True, # 输出详细日志
}
meta_dir = './meta'
list_file = f'{meta_dir}/video_list.json'
detail_file = f'{meta_dir}/video_details.json'
config.setup_logging()
def convert_to_number(value):
"""
将字符串解析为实际数字,支持 K和 M百万等单位
"""
match = re.match(r'^([\d.]+)([KkMm]?)$', value)
if not match:
return None
number = float(match.group(1))
unit = match.group(2).upper()
if unit == 'K': # 千
return int(number * 1000)
elif unit == 'M': # 百万
return int(number * 1000000)
return int(number) # 无单位,直接返回数字
# 筛选最近一年的视频
def is_recent_video(upload_date):
try:
# 上传日期格式为 YYYYMMDD
video_date = datetime.strptime(upload_date, "%Y%m%d")
one_year_ago = datetime.now() - timedelta(days=365)
return video_date >= one_year_ago
except ValueError:
return False
# 获取视频列表
def fetch_video_list(output_file="video_list.json"):
url = "https://www.pornhub.com/video?o=mr" # 根据时间排序(最近一年)
# 爬取视频列表
with YoutubeDL(ydl_opts) as ydl:
info_dict = ydl.extract_info(url, download=False)
entries = info_dict.get('entries', [])
# 保存到文件,确保每条记录一行
with open(output_file, 'w', encoding='utf-8') as f:
for entry in entries:
# 清理换行符或其他不可见字符
cleaned_entry = {k: str(v).replace("\n", " ").replace("\r", " ") for k, v in entry.items()}
# 写入一行 JSON 数据
f.write(json.dumps(cleaned_entry, ensure_ascii=False) + "\n")
# 读取已完成的详情列表
def load_processed_details(file_name="video_details.json"):
video_list = []
id_list = []
if os.path.exists(file_name):
with open(file_name, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip() # 去掉首尾的空格或换行符
if line: # 确保行不为空
video = json.loads(line) # 将 JSON 字符串解析为 Python 字典
video_list.append(video) # 将 JSON 字符串解析为 Python 字典
id = video.get("id") # 提取 URL 字段
if id:
id_list.append(id) # 添加到 URL 列表
#return json.load(f)
return video_list, id_list
# 保存详情到文件
def save_processed_details(data, file_name="video_details.json"):
with open(file_name, 'a+', encoding='utf-8') as f:
# 清理换行符或其他不可见字符
cleaned_entry = {k: str(v).replace("\n", " ").replace("\r", " ") for k, v in data.items()}
# 写入一行 JSON 数据
f.write(json.dumps(cleaned_entry, ensure_ascii=False) + "\n")
#json.dump(data, f, ensure_ascii=False, indent=4)
# 逐个获取视频详情
def fetch_video_details(list_file="video_list.json", details_file="video_details.json"):
# 加载视频列表
video_list = []
with open(list_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip() # 去掉首尾的空格或换行符
if line: # 确保行不为空
video_list.append(json.loads(line)) # 将 JSON 字符串解析为 Python 字典
#video_list = json.load(f)
# 加载已处理的详情
processed_details, id_list = load_processed_details(details_file)
with YoutubeDL(ydl_opts) as ydl:
# 添加自定义提取器
ydl.add_info_extractor(CustomPornHubIE())
for video in video_list:
try:
# 获取视频详情
url = video.get('url')
if not url:
logging.info(f"Wrong video, no url: {video.get('title')}")
continue
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
video_id = query_params.get('viewkey', [None])[0]
if video_id in id_list:
logging.info(f"Skipping existed video: (ID: {video_id}) {video.get('title')}")
continue
logging.info(f"processing video: {video.get('title')} (ID: {video_id})")
video_info = ydl.extract_info(url, download=False)
# 自定义提取收藏、喜欢、点赞等信息(假设这些信息可以从网页中解析)
video_data = {
'title': video_info.get('title'),
'url': url,
'id': video_id,
"upload_date": video_info.get('upload_date'),
'view_count': video_info.get('view_count'),
'like_count': video_info.get('like_count'),
'dislike_count': video_info.get('dislike_count'),
'favorite_count': convert_to_number(video_info.get('favorite_count')), # 可能需要手动提取
}
# 每次保存进度
save_processed_details(video_data, details_file)
logging.info(f"get video detail succ: {video.get('title')}")
except Exception as e:
logging.error(f"get video detail error: {video.get('title')}, msg: {e}")
time.sleep(1)
logging.info("fetch_video_details done!")
def main():
if len(sys.argv) != 2:
print("Usage: python script.py <cmd>")
print("cmd: get_list, get_detail, get_all")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "get_list":
fetch_video_list(list_file) # 之前已经实现的获取列表功能
elif cmd == "get_detail":
fetch_video_details(list_file, detail_file) # 之前已经实现的获取详情功能
elif cmd == "get_all":
fetch_video_list(list_file)
fetch_video_details(list_file, detail_file)
else:
print(f"Unknown command: {cmd}")
if __name__ == '__main__':
main()