modify scripts

This commit is contained in:
oscarz
2025-06-18 16:52:20 +08:00
parent e12fb725d5
commit 5403838793
5 changed files with 660 additions and 0 deletions

u9a9/src/config.py Normal file

@@ -0,0 +1,90 @@
import logging
import os
import inspect
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from collections import defaultdict
home_dir = os.path.expanduser("~")
global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
global_share_data_dir = f'{home_dir}/sharedata'
log_dir = '../log'
# Track logging frequency
log_count = defaultdict(int)        # number of times each message has been logged
last_log_time = defaultdict(float)  # timestamp of the last count reset for each message
class RateLimitFilter(logging.Filter):
    """
    Rate-limit filter:
    within any 60-second window, the same log message is written at most
    LOG_LIMIT times; further occurrences are dropped.
    """
    LOG_LIMIT = 60  # at most 60 identical messages per minute

    def filter(self, record):
        global log_count, last_log_time
        message_key = record.getMessage()  # the rendered log message
        # Current time and time elapsed since this message's window started
        now = time.time()
        elapsed = now - last_log_time[message_key]
        # Throttle identical messages
        if elapsed < 60:  # still inside the 60-second window
            log_count[message_key] += 1
            if log_count[message_key] > self.LOG_LIMIT:
                print('reach limit.')
                return False  # drop the record
        else:
            log_count[message_key] = 1  # window expired, restart the count
            last_log_time[message_key] = now
        return True  # allow the record through
def setup_logging(log_filename=None):
    if log_filename is None:
        caller_frame = inspect.stack()[1]
        caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0]
        current_date = datetime.now().strftime('%Y%m%d')
        os.makedirs(log_dir, exist_ok=True)
        log_filename = f'{log_dir}/{caller_filename}_{current_date}.log'
        #log_filename = f'../log/{caller_filename}_{current_date}.log'
    max_log_size = 100 * 1024 * 1024  # 100 MB per file
    max_log_files = 10  # keep at most 10 rotated log files
    file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
    ))
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
    ))
    # Create the root logger
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.handlers = []  # avoid adding duplicate handlers
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    # Attach the rate-limit filter
    rate_limit_filter = RateLimitFilter()
    file_handler.addFilter(rate_limit_filter)
    console_handler.addFilter(rate_limit_filter)
# Example run
if __name__ == "__main__":
    setup_logging()
    for i in range(1000):
        logging.info("test log message to exercise the rate limit")
        time.sleep(0.01)  # simulate rapid logging
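The intended call pattern, as fetch.py below shows, is to call setup_logging() once at startup and then use the module-level logging functions; a minimal sketch (paths follow the defaults above):

    import logging
    import config

    config.setup_logging()            # attaches the file and console handlers plus the rate-limit filter
    logging.info("pipeline started")  # each distinct message is written at most LOG_LIMIT times per window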

u9a9/src/fetch.py Normal file

@@ -0,0 +1,183 @@
import json
import time
import os
import argparse
import textwrap
import logging
from datetime import datetime, timedelta
from functools import partial
import config
import scraper
import utils
from urllib.parse import urljoin, urlparse
config.setup_logging()
debug = False
skip_local = False
scan_mode = 0
update_mode = 0
current_date_str = datetime.now().strftime("%Y-%m-%d")
target_csv = f"{config.global_share_data_dir}/u3c3.csv"
target_torrent_dir = f"{config.global_share_data_dir}/u3c3_torrents"
# Fetch the listing pages
def fetch_list(start_p=1):
    p = start_p
    total_results = []
    while True:
        url = f"https://u001.25img.com/?p={p}"
        logging.info(f"fetching url {url}")
        soup, status_code = scraper.fetch_page(
            url,
            partial(scraper.generic_validator, tag="div", identifier="table-responsive", attr_type="class"))
        if soup:
            list_data, total_pages = scraper.parse_page(soup, url)
            if list_data:
                total_results.extend(list_data)
            else:
                logging.warning(f"parse_page returned no rows. url: {url}")
            if total_pages:
                if p >= total_pages:
                    break  # last page reached
                p += 1
            else:
                logging.warning(f"no total page count found. url: {url}")
                break
        else:
            logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
            break  # stop rather than retry the same page forever
        if debug:
            break
    # Write the results to the CSV file
    lines = utils.write_to_csv(total_results, target_csv)
    if lines:
        logging.info(f"write to file succ. total lines: {lines}, file: {target_csv}")
    logging.info(f"fetch list finished. total pages: {p}")
# Download torrent files
def down_torrents():
    # Read the CSV data
    rows = utils.read_csv_data(target_csv)
    if not rows:
        return
    # Create the main download directory
    os.makedirs(target_torrent_dir, exist_ok=True)
    for row in rows:
        title = row.get('title', '')
        torrent_url = row.get('torrent_url', '')
        # Check that the URL looks valid
        if not (torrent_url.startswith('https') and torrent_url.endswith('.torrent')):
            logging.warning(f"skipping invalid torrent link: {torrent_url}")
            continue
        # Derive the file name from the URL
        try:
            parsed_url = urlparse(torrent_url)
            filename = os.path.basename(parsed_url.path)
            if not filename:
                logging.warning(f"cannot extract a file name from URL: {torrent_url}")
                continue
        except Exception as e:
            logging.warning(f"error parsing URL: {e}")
            continue
        # Create a subdirectory keyed by the lowercased first character of the file name
        first_char = filename[0].lower()
        subdir = os.path.join(target_torrent_dir, first_char)
        os.makedirs(subdir, exist_ok=True)
        # Skip files that already exist locally
        local_path = os.path.join(subdir, filename)
        if os.path.exists(local_path):
            logging.info(f"file already exists, skipping download: {title}, {local_path}")
            continue
        succ = scraper.download_torrent(torrent_url, local_path)
        if succ:
            logging.info(f"download succ. {title}, {local_path}")
        if debug:
            break
        time.sleep(1)
# Map shortcut names to functions
function_map = {
    "list": fetch_list,
    "down": down_torrents,
}
# Main entry point
def main(cmd, args):
    # Run the requested functions
    if cmd:
        function_names = args.cmd.split(",")  # split the comma-separated input
        for short_name in function_names:
            func = function_map.get(short_name.strip())  # look up the function for this shortcut
            if callable(func):
                func()
            else:
                logging.warning(f"{short_name} is not a valid function shortcut.")
    else:  # no --cmd given: run everything
        for name, func in function_map.items():
            if callable(func):
                func()
            else:
                logging.warning(f"{name} is not a valid function shortcut.")
    logging.info('all process completed!')
# TODO:
# 1,
# Apply command-line arguments to the module-level flags
def set_env(args):
    global debug
    debug = args.debug
    if debug:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
    global skip_local
    skip_local = args.skip_local
    global scan_mode
    scan_mode = args.scan_mode
    global update_mode
    if args.update:
        update_mode = args.update
if __name__ == "__main__":
    # Command-line argument handling
    keys_str = ",".join(function_map.keys())
    usage_examples = textwrap.dedent('''
        Examples:
          python3 ./fetch.py              # refresh the list and download new torrents
          python3 ./fetch.py --cmd=list   # refresh the list only
          python3 ./fetch.py --cmd=down   # download new torrents only
        ''')
    parser = argparse.ArgumentParser(
        description='fetch u9a9 data.\n\n' + usage_examples,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    #parser = argparse.ArgumentParser(description='fetch javdb data.')
    parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
    parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0,
                        help='0 - only records with is_full_data=0 (default), 1 - only is_full_data=1, 2 - is_full_data<=1, 3 - only is_full_data>1 (abnormal data), 4 - all records')
    parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1,
                        help='1 - only uncensored makers/series/actors/movies (default), 0 - the opposite, 2 - everything')
    parser.add_argument('--skip_local', action='store_true', help='Skip database operations when the page is already cached locally')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
    args = parser.parse_args()
    set_env(args)
    main(args.cmd, args)

u9a9/src/scraper.py Normal file

@@ -0,0 +1,226 @@
import time
import json
import csv
import logging
import signal
import sys
import os
import re
import requests
import random
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
import config
import utils
# Base URL and list-page URL templates
host_url = 'https://u001.25img.com'
list_url_update = f'{host_url}/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=update'
#list_url_wordcount = 'https://aabook.xyz/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=wordcount'
# User-Agent pool
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
"Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Mobile Safari/537.36"
]
# Perform the HTTP request with requests, validate the page, and support custom parsers and preprocessors
def fetch_page(url, validator, parser="html.parser", preprocessor=None, max_retries=3, sleep_time=5, default_timeout=10):
    for attempt in range(max_retries):
        try:
            if '25img.com' not in url.lower():
                logging.error(f'wrong url format: {url}')
                return None, None
            # Pick a random User-Agent
            headers = {
                'User-Agent': random.choice(user_agents)
            }
            response = requests.get(url, headers=headers, timeout=default_timeout, stream=True)
            # Handle HTTP status codes
            if response.status_code == 404:
                logging.warning(f"Page not found (404): {url}")
                return None, 404  # return 404 directly so the caller can skip this page
            response.raise_for_status()  # raise for other HTTP errors
            # Preprocess the HTML if a preprocessor was supplied
            html_text = preprocessor(response.text) if preprocessor else response.text
            soup = BeautifulSoup(html_text, parser)
            if validator(soup):  # run the caller-supplied page check
                return soup, response.status_code
            logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
        except requests.RequestException as e:
            logging.warning(f"fetching page ({url}) error: {e}, Retrying ...")
        time.sleep(sleep_time)  # wait before the next attempt
    logging.error(f'Fetching failed after max retries. {url}')
    return None, None  # still failing after the maximum number of retries
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
    if attr_type == "id":
        return soup.find(tag, id=identifier) is not None
    elif attr_type == "class":
        return bool(soup.find_all(tag, class_=identifier))
    elif attr_type == "name":
        return bool(soup.find('select', {'name': identifier}))
    return False
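For reference, callers bind the validator's arguments with functools.partial before handing it to fetch_page, as fetch.py and test_chapter_page below do; a minimal sketch:

    from functools import partial

    # Passes when the page contains a <div class="table-responsive"> element
    validator = partial(generic_validator, tag="div", identifier="table-responsive", attr_type="class")
    soup, status_code = fetch_page('https://u001.25img.com/?p=1', validator)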
def parse_size(size_text: str) -> float:
    """Parse a human-readable size string into GB, rounded to two decimals."""
    try:
        match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_text)
        if not match:
            logging.warning(f"cannot parse size text: {size_text}")
            return 0.0
        value, unit = match.groups()
        value = float(value)
        if unit.lower() == 'mb':
            return round(value / 1024, 2)
        elif unit.lower() == 'gb':
            return round(value, 2)
        else:
            logging.warning(f"unknown unit: {unit} in {size_text}")
            return 0.0
    except Exception as e:
        logging.warning(f"error parsing size: {e}")
        return 0.0
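A few illustrative inputs and the values parse_size returns under the rules above (MB is divided by 1024, GB is kept, anything else falls back to 0.0):

    parse_size('700MB')   # -> 0.68  (700 / 1024, rounded to two decimals)
    parse_size('1.5 GB')  # -> 1.5
    parse_size('2 TB')    # -> 0.0   (unknown unit, logs a warning)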
# Parse a listing page
def parse_page(soup, url):
    # Locate the torrent table
    table = soup.find('table', class_='torrent-list')
    if not table:
        logging.warning("torrent-list table not found")
        return None, None
    # Get all tr rows inside tbody
    tbody = table.find('tbody')
    if not tbody:
        logging.warning("tbody not found")
        return None, None
    rows = tbody.find_all('tr')
    if not rows:
        logging.warning("no data rows in the table")
        return None, None
    results = []
    for row in rows:
        try:
            tds = row.find_all('td')
            if len(tds) < 5:  # at least 5 td cells are required
                logging.warning("unexpected tr layout")
                continue
            # Category
            category_td = tds[0]
            category_link = category_td.find('a')
            category = category_link.get('title', 'unknown category') if category_link else 'unknown category'
            # Title and detail URL
            title_td = tds[1]
            title_link = title_td.find('a')
            title = title_link.get('title', 'unknown title') if title_link else 'unknown title'
            url = title_link.get('href', '') if title_link else ''
            url = host_url + url
            # Torrent and magnet links
            link_td = tds[2]
            links = link_td.find_all('a')
            torrent_url = links[0].get('href', '') if len(links) > 0 else ''
            magnet_url = links[1].get('href', '') if len(links) > 1 else ''
            torrent_url = host_url + torrent_url
            # Size
            size_td = tds[3]
            size_text = size_td.get_text(strip=True)
            size_gb = parse_size(size_text)
            # Update date
            date_td = tds[4]
            update_date = date_td.get_text(strip=True)
            results.append({
                'category'    : category,
                'title'       : title,
                'url'         : url,
                'torrent_url' : torrent_url,
                'magnet_url'  : magnet_url,
                'size_text'   : size_text,
                'size_gb'     : size_gb,
                'update_date' : update_date,
            })
        except Exception as e:
            logging.error(f"error parsing row: {e}")
            continue
    # Parse the total page count from the paginator script
    paginator_script = None
    for script in soup.find_all('script'):
        if 'bootstrapPaginator' in str(script):
            paginator_script = str(script)
            break
    total_pages = None
    if paginator_script:
        try:
            match = re.search(r'totalPages:\s*(\d+)', paginator_script)
            if match:
                total_pages = int(match.group(1))
            else:
                logging.warning("total page count not found")
        except Exception as e:
            logging.error(f"error parsing total page count: {e}")
    else:
        logging.warning("paginator script not found")
    return results, total_pages
def download_torrent(torrent_url, target_file):
    try:
        # Fetch the .torrent file with a random User-Agent
        headers = {
            'User-Agent': random.choice(user_agents)
        }
        response = requests.get(torrent_url, headers=headers, stream=True)
        if response.status_code != 200:
            logging.warning(f"download failed, url: {torrent_url}, status_code: {response.status_code}")
            return False  # do not write an error page to disk
        # Save the file
        with open(target_file, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except Exception as e:
        logging.warning(f"Error downloading {torrent_url}: {str(e)}")
        return False
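Callers such as down_torrents in fetch.py pass a full .torrent URL plus a local target path and branch on the boolean result; a minimal sketch (the URL and path are illustrative only):

    ok = download_torrent(f'{host_url}/example.torrent', '/tmp/example.torrent')  # illustrative URL and path
    if not ok:
        logging.warning('torrent download failed')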
def test_chapter_page(url):
    soup, status_code = fetch_page(url, partial(generic_validator, tag="div", identifier="table-responsive", attr_type="class"))
    if soup:
        data, total_pages = parse_page(soup, url)
        if data:
            print(data)
        if total_pages:
            print(total_pages)

if __name__ == "__main__":
    test_chapter_page('https://u001.25img.com/?p=1')

u9a9/src/utils.py Normal file

@@ -0,0 +1,43 @@
import csv
import os
def write_to_csv(data, filename='output.csv'):
    """Write the scraped rows to a CSV file."""
    if not data:
        print("no data to write")
        return None
    # CSV column names
    fieldnames = [
        'category', 'title', 'url',
        'torrent_url', 'magnet_url',
        'size_text', 'size_gb', 'update_date'
    ]
    try:
        # Write the CSV file
        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            # Header row
            writer.writeheader()
            # Data rows
            for row in data:
                writer.writerow(row)
        return len(data)
    except Exception as e:
        print(f"error writing CSV file: {e}")
        return None

def read_csv_data(csv_file):
    """Read the CSV file and return its rows as a list of dicts."""
    if not os.path.exists(csv_file):
        print(f"error: CSV file '{csv_file}' does not exist")
        return []
    with open(csv_file, 'r', encoding='utf-8-sig') as file:
        return list(csv.DictReader(file))
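A quick round-trip sketch of the two helpers, using the same column names parse_page produces (the path is illustrative):

    sample = [{
        'category': 'U', 'title': 'example', 'url': '',
        'torrent_url': '', 'magnet_url': '',
        'size_text': '700MB', 'size_gb': 0.68, 'update_date': '2025-06-18',
    }]
    if write_to_csv(sample, '/tmp/u3c3_sample.csv'):
        print(read_csv_data('/tmp/u3c3_sample.csv'))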

u9a9/test_u3a3.py Normal file

@@ -0,0 +1,118 @@
"""
Script Name:
Description: 获取 u9a9 数据, prompt:
我们需要访问 https://u9a9.org/?type=2&search={q}&p=4 这个地址,并返回数据,以下是需求详细描述:
q 参数,我们有一个数组,分别是 qlist = ['[BD', '合集2']
p 参数是要访问的页码它通常从1开始。
我们循环遍历 qlist对每一个值从 p=1 开始,组成一个访问的 URL 获取该 URL 的内容,它是一个页面,页面结构简化之后,就是我刚才发给你的内容。我们需要做的是:
解析 tbody 标签中的若干个 tr对每个 tr获取第二个 td 中的 title 文本,并去掉 [BD/{}] 的部分记为title
获取第三个td中的第一个链接它是一个 .torrent 文件,我们下载它,命名为 {title}..torrent ;
然后我们解析 <div class="center"> 中的内容,它是一个页码导航,我们只需要关注 li 中文本为 >> 的这一行,解析出 href 字段,并取出 p 值这个值与上面的URL拼起来就是我们要访问的下一页。如果没有 匹配到这一行,那就代表访问结束了。
请你理解上面的需求,并写出相应的 python脚本。
Author: [Your Name]
Created Date: YYYY-MM-DD
Last Modified: YYYY-MM-DD
Version: 1.0
Modification History:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
- YYYY-MM-DD [Your Name]:
"""
import requests
from bs4 import BeautifulSoup
import re
import os
import time
# Spoof a browser User-Agent to avoid being flagged as a bot
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'
}
# Search terms
qlist = ['[BD']
# Download directory
download_path = "./torrents/"
if not os.path.exists(download_path):
    os.makedirs(download_path)
def download_torrent(torrent_url, title):
    try:
        # Fetch the .torrent file
        response = requests.get(torrent_url, headers=headers, stream=True)
        torrent_file_name = f"{title}.torrent"
        torrent_path = os.path.join(download_path, torrent_file_name)
        # Save the file
        with open(torrent_path, 'wb') as f:
            f.write(response.content)
        print(f"Downloaded: {torrent_file_name}")
    except Exception as e:
        print(f"Error downloading {torrent_url}: {str(e)}")
# Parse a result page, download its torrents, and return the next page number (or None)
def parse_page(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    # Get all tr rows inside tbody
    tbody = soup.find('tbody')
    rows = tbody.find_all('tr', class_='default')
    for row in rows:
        # Take the title from the second td and strip the [BD/{}] part
        title_td = row.find_all('td')[1]
        raw_title = title_td.find('a')['title'].strip()
        #title = re.sub(r'\[BD/\d+\.\d+G\]', '', raw_title).strip()
        title = re.sub(r'\[.*?\]', '', raw_title).strip()
        # Take the first .torrent link from the third td
        magnet_td = row.find_all('td')[2]
        torrent_link = magnet_td.find('a', href=re.compile(r'.torrent'))['href']
        # Build the full link, removing '-' from the host part
        full_torrent_link = f"https://u001.25img.com{torrent_link}".replace('-', '')
        # Download the torrent file
        download_torrent(full_torrent_link, title)
        time.sleep(3)  # avoid hammering the server
    # Parse the page navigator to find the next page
    pagination = soup.find('div', class_='center').find('nav').find('ul', class_='pagination')
    next_page = pagination.find('a', text='»')
    if next_page:
        next_page_url = next_page['href']
        next_p_value = re.search(r'p=(\d+)', next_page_url).group(1)
        return next_p_value
    return None
# Crawl the pages for a given q, starting at page start_p
def scrape(q, start_p=1):
    p = start_p
    while True:
        #url = f"https://u9a9.org/?type=2&search={q}&p={p}"
        url = f"https://u001.25img.com/?search2=eelj1a3lfe1a1&search={q}&p={p}"
        print(f"Fetching URL: {url}")
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print(f"Failed to fetch {url}")
            break
        next_p = parse_page(response.text)
        if next_p:
            p = next_p
        else:
            print(f"No more pages for query {q}.")
            break

# Loop over qlist
for q in qlist:
    scrape(q, start_p=1)