From 5403838793394c09b6e42a236108da94ee1eb2cf Mon Sep 17 00:00:00 2001
From: oscarz
Date: Wed, 18 Jun 2025 16:52:20 +0800
Subject: [PATCH] add u9a9 scraper scripts

---
 u9a9/src/config.py  |  90 ++++++++++++++++++
 u9a9/src/fetch.py   | 183 +++++++++++++++++++++++++++++++++++
 u9a9/src/scraper.py | 226 ++++++++++++++++++++++++++++++++++++++++++++
 u9a9/src/utils.py   |  43 +++++++++
 u9a9/test_u3a3.py   | 118 +++++++++++++++++++++++
 5 files changed, 660 insertions(+)
 create mode 100644 u9a9/src/config.py
 create mode 100644 u9a9/src/fetch.py
 create mode 100644 u9a9/src/scraper.py
 create mode 100644 u9a9/src/utils.py
 create mode 100644 u9a9/test_u3a3.py

diff --git a/u9a9/src/config.py b/u9a9/src/config.py
new file mode 100644
index 0000000..b318d06
--- /dev/null
+++ b/u9a9/src/config.py
@@ -0,0 +1,90 @@
+import logging
+import os
+import inspect
+import time
+from datetime import datetime
+from logging.handlers import RotatingFileHandler
+from collections import defaultdict
+
+home_dir = os.path.expanduser("~")
+global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
+global_share_data_dir = f'{home_dir}/sharedata'
+
+log_dir = '../log'
+
+# per-message rate-limit bookkeeping
+log_count = defaultdict(int)        # how many times each message has been logged
+last_log_time = defaultdict(float)  # timestamp of the last write for each message
+
+class RateLimitFilter(logging.Filter):
+    """
+    Rate-limiting filter:
+    1. within a 60-second window the same message is written at most LOG_LIMIT times; extra copies are dropped
+    2. (not implemented yet) warn when the overall log rate exceeds 100 messages/second
+    """
+    LOG_LIMIT = 60  # at most 60 identical messages per minute
+
+    def filter(self, record):
+        global log_count, last_log_time
+        message_key = record.getMessage()  # the formatted log message
+
+        # time since this message was last written
+        now = time.time()
+        elapsed = now - last_log_time[message_key]
+
+        # throttle repeated copies of the same message
+        if elapsed < 60:  # still inside the 60-second window
+            log_count[message_key] += 1
+            if log_count[message_key] > self.LOG_LIMIT:
+                print('reach limit.')
+                return False  # drop the record
+        else:
+            log_count[message_key] = 1  # window expired, start counting again
+
+        last_log_time[message_key] = now
+
+        return True  # allow the record through
+
+
+def setup_logging(log_filename=None):
+    if log_filename is None:
+        caller_frame = inspect.stack()[1]
+        caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0]
+        current_date = datetime.now().strftime('%Y%m%d')
+        os.makedirs(log_dir, exist_ok=True)
+        log_filename = f'{log_dir}/{caller_filename}_{current_date}.log'
+        #log_filename = f'../log/{caller_filename}_{current_date}.log'
+
+    max_log_size = 100 * 1024 * 1024  # 100 MB
+    max_log_files = 10  # keep at most 10 rotated log files
+
+    file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
+    file_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
+    ))
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
+    ))
+
+    # root logger
+    logger = logging.getLogger()
+    logger.setLevel(logging.INFO)
+    logger.handlers = []  # avoid adding duplicate handlers
+    logger.addHandler(file_handler)
+    logger.addHandler(console_handler)
+
+    # attach the rate limiter to both handlers
+    rate_limit_filter = RateLimitFilter()
+    file_handler.addFilter(rate_limit_filter)
+    console_handler.addFilter(rate_limit_filter)
+
+
+# usage example
+if __name__ == "__main__":
+    setup_logging()
+
+    for i in range(1000):
+        logging.info("test log message to exercise the rate limit")
+        time.sleep(0.01)  # simulate rapid logging
\ No newline at end of file
diff --git a/u9a9/src/fetch.py b/u9a9/src/fetch.py
new file mode 100644
index 0000000..87f3ae5
--- /dev/null
+++ b/u9a9/src/fetch.py
@@ -0,0 +1,183 @@
+
+import json
+import time
+import os
+import argparse
+import textwrap
+import logging
+from datetime import datetime, timedelta
+from functools import partial
+import config
+import scraper
+import utils
+from urllib.parse import urljoin, urlparse
+
+config.setup_logging()
+
+debug = False
+skip_local = False
+scan_mode = 0
+update_mode = 0
+
+current_date_str = datetime.now().strftime("%Y-%m-%d")
+target_csv = f"{config.global_share_data_dir}/u3c3.csv"
+target_torrent_dir = f"{config.global_share_data_dir}/u3c3_torrents"
+
+# fetch the torrent listing pages
+def fetch_list(start_p=1):
+    p = start_p
+    total_results = []
+    while True:
+        url = f"https://u001.25img.com/?p={p}"
+        logging.info(f"fetching url {url}")
+        soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="table-responsive", attr_type="class"))
+        if soup:
+            list_data, total_pages = scraper.parse_page(soup, url)
+            if list_data:
+                total_results.extend(list_data)
+            else:
+                logging.warning(f"fetch_list failed. url: {url}")
+            if total_pages:
+                if p >= total_pages:
+                    break  # reached the last page
+                p += 1
+            else:
+                logging.warning(f"fetch_list failed, no total_pages. url: {url}")
+                break
+        else:
+            logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
+            break
+
+        if debug:
+            break
+
+    # write the results to the CSV file
+    lines = utils.write_to_csv(total_results, target_csv)
+    if lines:
+        logging.info(f"write to file succ. total lines: {lines}, file: {target_csv}")
+    logging.info(f"fetch list finished. total pages: {p}")
+
+
+# download torrent files
+def down_torrents():
+    # read the CSV data
+    rows = utils.read_csv_data(target_csv)
+    if not rows:
+        return
+
+    # create the main download directory
+    os.makedirs(target_torrent_dir, exist_ok=True)
+
+    for row in rows:
+        title = row.get('title', '')
+        torrent_url = row.get('torrent_url', '')
+
+        # sanity-check the URL
+        if not (torrent_url.startswith('https') and torrent_url.endswith('.torrent')):
+            logging.warning(f"skipping invalid torrent link: {torrent_url}")
+            continue
+
+        # derive the local file name
+        try:
+            parsed_url = urlparse(torrent_url)
+            filename = os.path.basename(parsed_url.path)
+
+            if not filename:
+                logging.warning(f"could not derive a file name from URL: {torrent_url}")
+                continue
+        except Exception as e:
+            logging.warning(f"error parsing URL: {e}")
+            continue
+
+        # create a subdirectory keyed by the lowercased first character of the file name
+        first_char = filename[0].lower()
+        subdir = os.path.join(target_torrent_dir, first_char)
+        os.makedirs(subdir, exist_ok=True)
+
+        # skip files that already exist locally
+        local_path = os.path.join(subdir, filename)
+        if os.path.exists(local_path):
+            logging.info(f"file already exists, skipping download: {title}, {local_path}")
+            continue
+
+        succ = scraper.download_torrent(torrent_url, local_path)
+        if succ:
+            logging.info(f"download succ. {title}, {local_path}")
{title}, {local_path}") + + if debug: + break + time.sleep(1) + +# 建立缩写到函数的映射 +function_map = { + "list": fetch_list, + "down" : down_torrents, +} + +# 主函数 +def main(cmd, args): + # 执行指定的函数 + if cmd: + function_names = args.cmd.split(",") # 拆分输入 + for short_name in function_names: + func = function_map.get(short_name.strip()) # 从映射中获取对应的函数 + if callable(func): + func() + else: + logging.warning(f" {short_name} is not a valid function shortcut.") + else: # 全量执行 + for name, func in function_map.items(): + if callable(func): + func() + else: + logging.warning(f" {short_name} is not a valid function shortcut.") + + logging.info(f'all process completed!') + + # TODO: + # 1, + +# 设置环境变量 +def set_env(args): + global debug + debug = args.debug + if debug: + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + global skip_local + skip_local = args.skip_local + + global scan_mode + scan_mode = args.scan_mode + + global update_mode + if args.update: + update_mode = args.update + +if __name__ == "__main__": + # 命令行参数处理 + keys_str = ",".join(function_map.keys()) + + usage_examples = textwrap.dedent(''' + 示例用法: + python3 ./fetch.py # 刷新列表,并下载新增资源 + python3 ./fetch.py --cmd=list # 刷新列表 + python3 ./fetch.py --cmd=down # 并下载新增资源 + ''') + + parser = argparse.ArgumentParser( + description='fetch javhd data.\n\n' + usage_examples, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + #parser = argparse.ArgumentParser(description='fetch javdb data.') + parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}") + parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0-只遍历is_full_data=0(默认), 1-只遍历is_full_data=1, 2-遍历is_full_data<=1, 3-只遍历is_full_data>1(异常数据), 4-遍历所有') + parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1, help='1-只遍历所有 uncensored 的 makers/series/actors/movies(默认), 0-与前者相反, 2-全量') + parser.add_argument('--skip_local', action='store_true', help='如果本地缓存了页面,则跳过数据库操作') + parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)') + args = parser.parse_args() + + set_env(args) + main(args.cmd, args) \ No newline at end of file diff --git a/u9a9/src/scraper.py b/u9a9/src/scraper.py new file mode 100644 index 0000000..2e8110e --- /dev/null +++ b/u9a9/src/scraper.py @@ -0,0 +1,226 @@ +import time +import json +import csv +import logging +import signal +import sys +import os +import re +import requests +import random +from bs4 import BeautifulSoup +from requests.exceptions import RequestException +from functools import partial +import config +import utils + +# 定义基础 URL 和可变参数 +host_url = 'https://u001.25img.com' +list_url_update = f'{host_url}/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=update' +#list_url_wordcount = 'https://aabook.xyz/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=wordcount' + +# User-Agent 列表 +user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67", + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0", + "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 
+    "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Mobile Safari/537.36"
+]
+
+# Fetch a page with requests (rotating User-Agent), run a validator on it, and optionally preprocess the HTML; supports different parsers.
+def fetch_page(url, validator, parser="html.parser", preprocessor=None, max_retries=3, sleep_time=5, default_timeout=10):
+    for attempt in range(max_retries):
+        try:
+            if '25img.com' not in url.lower():
+                logging.error(f'wrong url format: {url}')
+                return None, None
+
+            # pick a random User-Agent
+            headers = {
+                'User-Agent': random.choice(user_agents)
+            }
+            response = requests.get(url, headers=headers, timeout=default_timeout, stream=True)
+
+            # handle HTTP status codes
+            if response.status_code == 404:
+                logging.warning(f"Page not found (404): {url}")
+                return None, 404  # return 404 directly so the caller can skip this page
+
+            response.raise_for_status()  # raise on other HTTP errors
+
+            # preprocess the HTML if a preprocessor was supplied
+            html_text = preprocessor(response.text) if preprocessor else response.text
+
+            soup = BeautifulSoup(html_text, parser)
+            if validator(soup):  # custom page validation
+                return soup, response.status_code
+
+            logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
+        except requests.RequestException as e:
+            logging.warning(f"fetching page ({url}) error: {e}, Retrying ...")
+        time.sleep(sleep_time)  # wait before retrying
+
+    logging.error(f'Fetching failed after max retries. {url}')
+    return None, None  # still failing after the maximum number of retries
+
+
+# generic HTML structure validator
+def generic_validator(soup, tag, identifier, attr_type="id"):
+    if attr_type == "id":
+        return soup.find(tag, id=identifier) is not None
+    elif attr_type == "class":
+        return bool(soup.find_all(tag, class_=identifier))
+    elif attr_type == "name":
+        return bool(soup.find('select', {'name': identifier}))
+    return False
+
+def parse_size(size_text: str) -> float:
+    """Parse a size string into GB, rounded to two decimals."""
+    try:
+        match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_text)
+        if not match:
+            logging.warning(f"could not parse size text: {size_text}")
+            return 0.0
+
+        value, unit = match.groups()
+        value = float(value)
+
+        if unit.lower() == 'mb':
+            return round(value / 1024, 2)
+        elif unit.lower() == 'gb':
+            return round(value, 2)
+        else:
+            logging.warning(f"unknown unit: {unit} in {size_text}")
+            return 0.0
+    except Exception as e:
+        logging.warning(f"error parsing size: {e}")
+        return 0.0
+
+# parse a listing page
+def parse_page(soup, url):
+    # locate the torrent table
+    table = soup.find('table', class_='torrent-list')
+    if not table:
+        logging.warning("torrent-list table not found")
+        return None, None
+
+    # all tr rows inside tbody
+    tbody = table.find('tbody')
+    if not tbody:
+        logging.warning("tbody not found")
+        return None, None
+
+    rows = tbody.find_all('tr')
+    if not rows:
+        logging.warning("no data rows in the table")
+        return None, None
+
+    results = []
+    for row in rows:
+        try:
+            tds = row.find_all('td')
+            if len(tds) < 5:  # need at least 5 cells
+                logging.warning("unexpected tr format")
+                continue
+
+            # category
+            category_td = tds[0]
+            category_link = category_td.find('a')
+            category = category_link.get('title', 'unknown category') if category_link else 'unknown category'
+
+            # title and URL
+            title_td = tds[1]
+            title_link = title_td.find('a')
+            title = title_link.get('title', 'unknown title') if title_link else 'unknown title'
+            url = title_link.get('href', '') if title_link else ''
+            url = host_url + url
+
+            # torrent and magnet links
+            link_td = tds[2]
+            links = link_td.find_all('a')
+            torrent_url = links[0].get('href', '') if len(links) > 0 else ''
+            magnet_url = links[1].get('href', '') if len(links) > 1 else ''
+            torrent_url = host_url + torrent_url
+
+            # size
+            size_td = tds[3]
+            size_text = size_td.get_text(strip=True)
+            size_gb = parse_size(size_text)
+
+            # update date
+            date_td = tds[4]
+            update_date = date_td.get_text(strip=True)
+
+            results.append({
+                'category' : category,
+                'title' : title,
+                'url' : url,
+                'torrent_url' : torrent_url,
+                'magnet_url' : magnet_url,
+                'size_text' : size_text,
+                'size_gb' : size_gb,
+                'update_date' : update_date,
+            })
+
+        except Exception as e:
+            logging.error(f"error parsing row: {e}")
+            continue
+
+    # parse the total page count
+    paginator_script = None
+    for script in soup.find_all('script'):
+        if 'bootstrapPaginator' in str(script):
+            paginator_script = str(script)
+            break
+    total_pages = None
+    if paginator_script:
+        try:
+            match = re.search(r'totalPages:\s*(\d+)', paginator_script)
+            if match:
+                total_pages = int(match.group(1))
+            else:
+                logging.warning("total page count not found")
+        except Exception as e:
+            logging.error(f"error parsing total page count: {e}")
+    else:
+        logging.warning("pagination script not found")
+
+    return results, total_pages
+
+def download_torrent(torrent_url, target_file):
+    try:
+        # fetch the .torrent file with a random User-Agent
+        headers = {
+            'User-Agent': random.choice(user_agents)
+        }
+        response = requests.get(torrent_url, headers=headers, stream=True)
+
+        if response.status_code != 200:
+            logging.warning(f"download failed, url: {torrent_url}, status_code: {response.status_code}")
+            return False
+
+        # save the file in chunks
+        with open(target_file, 'wb') as f:
+            for chunk in response.iter_content(chunk_size=8192):
+                f.write(chunk)
+
+        return True
+    except Exception as e:
+        logging.warning(f"Error downloading {torrent_url}: {str(e)}")
+        return False
+
+
+def test_list_page(url):
+    soup, status_code = fetch_page(url, partial(generic_validator, tag="div", identifier="table-responsive", attr_type="class"))
+    if soup:
+        data, total_pages = parse_page(soup, url)
+        if data:
+            print(data)
+        if total_pages:
+            print(total_pages)
+
+if __name__ == "__main__":
+    test_list_page('https://u001.25img.com/?p=1')
+
+
\ No newline at end of file
diff --git a/u9a9/src/utils.py b/u9a9/src/utils.py
new file mode 100644
index 0000000..5781680
--- /dev/null
+++ b/u9a9/src/utils.py
@@ -0,0 +1,43 @@
+import csv
+import os
+
+def write_to_csv(data, filename='output.csv'):
+    """Write the scraped rows to a CSV file."""
+    if not data:
+        print("no data to write")
+        return None
+
+    # CSV column names
+    fieldnames = [
+        'category', 'title', 'url',
+        'torrent_url', 'magnet_url',
+        'size_text', 'size_gb', 'update_date'
+    ]
+
+    try:
+        # write the CSV file
+        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+
+            # header row
+            writer.writeheader()
+
+            # data rows
+            for row in data:
+                writer.writerow(row)
+
+        return len(data)
+
+    except Exception as e:
+        print(f"error writing CSV file: {e}")
+        return None
+
+
+def read_csv_data(csv_file):
+    """Read a CSV file and return its rows as a list of dicts."""
+    if not os.path.exists(csv_file):
+        print(f"error: CSV file '{csv_file}' does not exist")
+        return []
+
+    with open(csv_file, 'r', encoding='utf-8-sig') as file:
+        return list(csv.DictReader(file))
\ No newline at end of file
diff --git a/u9a9/test_u3a3.py b/u9a9/test_u3a3.py
new file mode 100644
index 0000000..fe7cb16
--- /dev/null
+++ b/u9a9/test_u3a3.py
@@ -0,0 +1,118 @@
+"""
+Script Name:
+Description: Fetch u9a9 data. Prompt:
+    We need to visit https://u9a9.org/?type=2&search={q}&p=4 and return its data. Detailed requirements:
+    For the q parameter we have an array: qlist = ['[BD', '合集2']
+    The p parameter is the page number to visit, normally starting at 1.
+
+    We loop over qlist; for each value, starting from p=1, we build a URL and fetch its content. It is a page whose simplified structure is what I sent you earlier. What we need to do:
+    Parse the tr rows inside the tbody tag; for each tr, take the title text from the second td and strip the [BD/{}] part, calling the result title;
+    Take the first link in the third td; it points to a .torrent file, download it and save it as {title}.torrent;
+    Then parse the content of the pagination element; it is a page navigation, and we only care about the li whose text is >>. Extract its href attribute, take the p value from it, and join it with the URL above to get the next page to visit. If no such row matches, we are done.
+
+    Please read the requirements above and write the corresponding Python script.
+
+Author: [Your Name]
+Created Date: YYYY-MM-DD
+Last Modified: YYYY-MM-DD
+Version: 1.0
+
+
+Modification History:
+    - YYYY-MM-DD [Your Name]:
+    - YYYY-MM-DD [Your Name]:
+    - YYYY-MM-DD [Your Name]:
+"""
+
+import requests
+from bs4 import BeautifulSoup
+import re
+import os
+import time
+
+# spoof a browser User-Agent so requests are not flagged as a bot
+headers = {
+    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'
+}
+
+# search terms
+qlist = ['[BD']
+
+# download directory
+download_path = "./torrents/"
+if not os.path.exists(download_path):
+    os.makedirs(download_path)
+
+def download_torrent(torrent_url, title):
+    try:
+        # fetch the .torrent file
+        response = requests.get(torrent_url, headers=headers, stream=True)
+        torrent_file_name = f"{title}.torrent"
+        torrent_path = os.path.join(download_path, torrent_file_name)
+
+        # save the file
+        with open(torrent_path, 'wb') as f:
+            f.write(response.content)
+        print(f"Downloaded: {torrent_file_name}")
+    except Exception as e:
+        print(f"Error downloading {torrent_url}: {str(e)}")
+
+# parse one result page
+def parse_page(html_content):
+    soup = BeautifulSoup(html_content, 'html.parser')
+
+    # all tr rows inside tbody
+    tbody = soup.find('tbody')
+    rows = tbody.find_all('tr', class_='default')
+
+    for row in rows:
+        # title text from the second td, with the leading [...] tag removed
+        title_td = row.find_all('td')[1]
+        raw_title = title_td.find('a')['title'].strip()
+        #title = re.sub(r'\[BD/\d+\.\d+G\]', '', raw_title).strip()
+        title = re.sub(r'\[.*?\]', '', raw_title).strip()
+
+        # first link in the third td
+        magnet_td = row.find_all('td')[2]
+        torrent_link = magnet_td.find('a', href=re.compile(r'\.torrent'))['href']
+        # build the full link and strip '-' characters from it
+        full_torrent_link = f"https://u001.25img.com{torrent_link}".replace('-', '')
+
+        # download the torrent file
+        download_torrent(full_torrent_link, title)
+        time.sleep(3)  # throttle requests
+
+    # parse the pagination nav to find the next page
+    pagination = soup.find('div', class_='center').find('nav').find('ul', class_='pagination')
+    next_page = pagination.find('a', text='»')
+
+    if next_page:
+        next_page_url = next_page['href']
+        next_p_value = re.search(r'p=(\d+)', next_page_url).group(1)
+        return next_p_value
+    return None
+
+# crawl all pages for a given q, starting at page start_p
+def scrape(q, start_p=1):
+    p = start_p
+    while True:
+        #url = f"https://u9a9.org/?type=2&search={q}&p={p}"
+        url = f"https://u001.25img.com/?search2=eelj1a3lfe1a1&search={q}&p={p}"
+        print(f"Fetching URL: {url}")
+        response = requests.get(url, headers=headers)
+
+        if response.status_code != 200:
+            print(f"Failed to fetch {url}")
+            break
+
+        next_p = parse_page(response.text)
+
+        if next_p:
+            p = next_p
+        else:
+            print(f"No more pages for query {q}.")
+            break
+
+# iterate over the search terms
+for q in qlist:
+    scrape(q, start_p=1)
\ No newline at end of file
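
Note on testing: the patch ships no unit tests for the pure helpers it adds. Below is a minimal pytest-style sketch, not part of the patch, that exercises scraper.parse_size and the utils CSV round-trip without any network access. It assumes it is run from u9a9/src (so scraper.py and utils.py import directly), with requests and beautifulsoup4 installed only so that scraper.py imports cleanly; the file name test_helpers.py is hypothetical.

# test_helpers.py -- hypothetical smoke test for the pure helpers above (a sketch, not part of the patch)
import os
import tempfile

import scraper
import utils


def test_parse_size():
    # MB values are converted to GB and rounded to two decimals
    assert scraper.parse_size("512 MB") == 0.5
    assert scraper.parse_size("1.5GB") == 1.5
    # unparseable text falls back to 0.0
    assert scraper.parse_size("unknown") == 0.0


def test_csv_roundtrip():
    rows = [{
        'category': 'test', 'title': 't1', 'url': 'https://example.com/1',
        'torrent_url': '', 'magnet_url': '', 'size_text': '1.0GB',
        'size_gb': 1.0, 'update_date': '2025-06-18',
    }]
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'out.csv')
        # write_to_csv returns the number of rows written
        assert utils.write_to_csv(rows, path) == 1
        # read_csv_data returns the rows back as dicts (all values as strings)
        read_back = utils.read_csv_data(path)
        assert read_back[0]['title'] == 't1'

Running it with "pytest test_helpers.py" keeps the check independent of the remote site, which the rest of the scripts require.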