From e6d0c628afeef45660441e746d134d0b1773220f Mon Sep 17 00:00:00 2001
From: oscarz
Date: Thu, 3 Jul 2025 19:47:10 +0800
Subject: [PATCH] modify scripts

---
 .../scrapy_proj/extensions/failure_monitor.py |  68 +++++++
 .../scrapy_proj/extensions/stats_extension.py |   7 +-
 scrapy_proj/scrapy_proj/items.py              |   5 +
 scrapy_proj/scrapy_proj/pipelines.py          |  12 +-
 scrapy_proj/scrapy_proj/settings.py           |  17 ++
 .../scrapy_proj/spiders/base_spider.py        | 156 ++++++++++++++++
 .../scrapy_proj/spiders/iafd_spider.py        |   6 +-
 scrapy_proj/scrapy_proj/spiders/sis_spider.py | 175 ++++++++++++++++--
 .../scrapy_proj/spiders/u3c3_spider.py        |   6 +-
 9 files changed, 423 insertions(+), 29 deletions(-)
 create mode 100644 scrapy_proj/scrapy_proj/extensions/failure_monitor.py
 create mode 100644 scrapy_proj/scrapy_proj/spiders/base_spider.py

diff --git a/scrapy_proj/scrapy_proj/extensions/failure_monitor.py b/scrapy_proj/scrapy_proj/extensions/failure_monitor.py
new file mode 100644
index 0000000..cac540c
--- /dev/null
+++ b/scrapy_proj/scrapy_proj/extensions/failure_monitor.py
@@ -0,0 +1,68 @@
+# extensions/failure_monitor.py
+from scrapy import signals
+from scrapy.exceptions import NotConfigured
+import time
+
+class FailureMonitorExtension:
+    def __init__(self, crawler, max_consecutive_failures, failure_rate_threshold, time_window):
+        self.crawler = crawler
+        self.max_consecutive_failures = max_consecutive_failures
+        self.failure_rate_threshold = failure_rate_threshold
+        self.time_window = time_window  # seconds
+
+        self.consecutive_failures = 0
+        self.total_requests = 0
+        self.failed_requests = 0
+        self.request_times = []  # request timestamps, used for the windowed rate
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        # read the parameters from the settings
+        max_consecutive = crawler.settings.getint('EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES', 10)
+        failure_rate = crawler.settings.getfloat('EXT_FAIL_MONI_RATE_THRESHOLD', 0.5)
+        time_window = crawler.settings.getint('EXT_FAIL_MONI_FAILURE_TIME_WINDOW', 60)
+
+        if max_consecutive <= 0 and failure_rate <= 0:
+            raise NotConfigured
+
+        ext = cls(crawler, max_consecutive, failure_rate, time_window)
+
+        # register the signal handlers
+        crawler.signals.connect(ext.request_succeeded, signal=signals.response_received)
+        crawler.signals.connect(ext.request_failed, signal=signals.request_dropped)
+        crawler.signals.connect(ext.request_failed, signal=signals.spider_error)
+
+        return ext
+
+    def request_succeeded(self, response, request, spider):
+        self.consecutive_failures = 0  # reset the consecutive-failure counter
+        self.total_requests += 1
+        self.request_times.append(time.time())
+        self._cleanup_old_requests()  # drop requests outside the time window
+
+    # request_dropped sends (request, spider) and spider_error sends (failure, response, spider),
+    # so every parameter needs a default and **kwargs absorbs the remaining signal arguments
+    def request_failed(self, request=None, exception=None, spider=None, **kwargs):
+        self.consecutive_failures += 1
+        self.failed_requests += 1
+        self.total_requests += 1
+        self.request_times.append(time.time())
+        self._cleanup_old_requests()
+
+        # check the consecutive-failure limit
+        if self.max_consecutive_failures > 0 and self.consecutive_failures >= self.max_consecutive_failures:
+            spider.logger.error(f"Consecutive-failure limit reached ({self.consecutive_failures}/{self.max_consecutive_failures}), closing spider")
+            self.crawler.engine.close_spider(spider, 'consecutive_failures_exceeded')
+
+        # check the failure rate
+        if self.total_requests > 0 and self.failure_rate_threshold > 0:
+            current_failure_rate = self.failed_requests / self.total_requests
+            if current_failure_rate >= self.failure_rate_threshold:
+                spider.logger.error(f"Failure rate above threshold ({current_failure_rate:.2%} > {self.failure_rate_threshold:.2%}), closing spider")
+                self.crawler.engine.close_spider(spider, 'failure_rate_exceeded')
+
+    def _cleanup_old_requests(self):
+        """Drop request records that fall outside the time window"""
+        cutoff_time = time.time() - self.time_window
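For reference, a quick sketch (not part of the patch) of how the two thresholds interact, mirroring the checks in request_failed above; the counts are made-up sample values:

# Illustration only: mirrors FailureMonitorExtension.request_failed with the
# values this patch sets in settings.py; either rule closes the spider.
max_consecutive_failures = 100   # EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES
failure_rate_threshold = 0.6     # EXT_FAIL_MONI_RATE_THRESHOLD

consecutive_failures, failed_requests, total_requests = 3, 7, 10  # sample counts
if max_consecutive_failures > 0 and consecutive_failures >= max_consecutive_failures:
    print("close_spider: consecutive_failures_exceeded")
if total_requests > 0 and failure_rate_threshold > 0:
    if failed_requests / total_requests >= failure_rate_threshold:  # 0.70 >= 0.60
        print("close_spider: failure_rate_exceeded")

Because from_crawler only raises NotConfigured when both thresholds are non-positive, the extension can be switched off for a single run with -s EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES=0 -s EXT_FAIL_MONI_RATE_THRESHOLD=0.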
+        self.request_times = [t for t in self.request_times if t >= cutoff_time]
+
+        # recompute the windowed total; failed_requests is not windowed, so the
+        # resulting rate is an approximation of the in-window failure rate
+        self.total_requests = len(self.request_times)
\ No newline at end of file
diff --git a/scrapy_proj/scrapy_proj/extensions/stats_extension.py b/scrapy_proj/scrapy_proj/extensions/stats_extension.py
index d339c71..b5f0f31 100644
--- a/scrapy_proj/scrapy_proj/extensions/stats_extension.py
+++ b/scrapy_proj/scrapy_proj/extensions/stats_extension.py
@@ -15,6 +15,7 @@ class StatsExtension:
         self.script_path = script_path
         self.spider_name = None
         self.loop = None  # looping task
+        self.current_status = 'running'  # initial status
 
     @classmethod
     def from_crawler(cls, crawler):
@@ -42,7 +43,8 @@ class StatsExtension:
         # stop the looping task
         if self.loop and self.loop.running:
             self.loop.stop()
-
+
+        self.current_status = f"stop({reason})"  # final status
         self._export_stats(spider)
         logger.info(f"Spider {spider.name} closed - reason: {reason}")
 
@@ -58,7 +60,8 @@ class StatsExtension:
             'total_rsp': stats.get('downloader/response_count', 0),
             '200_cnt': stats.get('downloader/response_status_count/200', 0),
             '404_cnt': stats.get('downloader/response_status_count/404', 0),
-            'log_err_cnt': stats.get('log_count/ERROR', 0)
+            'log_err_cnt': stats.get('log_count/ERROR', 0),
+            'status': self.current_status
         }
 
         # print the stats
diff --git a/scrapy_proj/scrapy_proj/items.py b/scrapy_proj/scrapy_proj/items.py
index 984b6ff..02da5ff 100644
--- a/scrapy_proj/scrapy_proj/items.py
+++ b/scrapy_proj/scrapy_proj/items.py
@@ -6,6 +6,7 @@
 # items.py
 import scrapy
 
+# u3c3.in
 class U001Item(scrapy.Item):
     category = scrapy.Field()
     title = scrapy.Field()
@@ -16,10 +17,14 @@ class U001Item(scrapy.Item):
     size_gb = scrapy.Field()
     update_date = scrapy.Field()
 
+# sis001.com
 class Sis001Item(scrapy.Item):
     title = scrapy.Field()
     url = scrapy.Field()
     plate_name = scrapy.Field()
+    size_text = scrapy.Field()
+    size_gb = scrapy.Field()
+    update_date = scrapy.Field()
 
 class IAFDPersonItem(scrapy.Item):
     name = scrapy.Field()
diff --git a/scrapy_proj/scrapy_proj/pipelines.py b/scrapy_proj/scrapy_proj/pipelines.py
index b39769d..f3ae4a5 100644
--- a/scrapy_proj/scrapy_proj/pipelines.py
+++ b/scrapy_proj/scrapy_proj/pipelines.py
@@ -79,16 +79,8 @@ class SQLitePipeline(SQLiteDBHandler):
         return self.insert_or_update_common(item, tbl_name=self.tbl_name_u3c3, uniq_key='url', exists_do_nothing=True)
 
     def _process_sis001_item(self, item, spider):
-        self.cursor.execute('''
-            INSERT OR IGNORE INTO sis001_data
-            (title, url, plate_name)
-            VALUES (?,?,?)
-        ''', (
-            item.get('title'),
-            item.get('url'),
-            item.get('plate_name')
-        ))
-        self.conn.commit()
+        logging.debug(f"insert one item. spider: {spider.name}")
+        return self.insert_or_update_common(item, tbl_name=self.tbl_name_sis, uniq_key='url', exists_do_nothing=True)
 
     def _process_iafd_person_item(self, item, spider):
         logging.info(f"deal with person item. {item}")
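Note that Sis001Item gains size_text, size_gb, and update_date, but the patch shows no schema change. If the existing sis001_data table (named in the removed SQL) only has title/url/plate_name columns, a one-off migration along these lines would presumably be needed; the table and column names are taken from the removed code and the item definition, and the database path is a placeholder:

# Hypothetical one-off migration -- not part of the patch; adjust the path and
# table name (self.tbl_name_sis may differ from 'sis001_data') before using.
import sqlite3

conn = sqlite3.connect("scrapy_data.db")  # placeholder path
for column, col_type in (("size_text", "TEXT"), ("size_gb", "REAL"), ("update_date", "TEXT")):
    try:
        conn.execute(f"ALTER TABLE sis001_data ADD COLUMN {column} {col_type}")
    except sqlite3.OperationalError:
        pass  # column already exists
conn.commit()
conn.close()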
{item}") diff --git a/scrapy_proj/scrapy_proj/settings.py b/scrapy_proj/scrapy_proj/settings.py index 0635ebe..e14bd1c 100644 --- a/scrapy_proj/scrapy_proj/settings.py +++ b/scrapy_proj/scrapy_proj/settings.py @@ -58,12 +58,29 @@ DOWNLOADER_MIDDLEWARES = { # settings.py EXTENSIONS = { 'scrapy_proj.extensions.stats_extension.StatsExtension': 500, + 'scrapy_proj.extensions.failure_monitor.FailureMonitorExtension': 500, } +# 配置参数,失败检测,并退出任务 +EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES = 100 # 连续10次失败后退出 +EXT_FAIL_MONI_RATE_THRESHOLD = 0.6 # 失败率超过30%时退出 +EXT_FAIL_MONI_FAILURE_TIME_WINDOW = 300 # 时间窗口为300秒 + + +# 配置拦截检测和重试参数 +BASE_SPIDER_MIN_CONTENT_LENGTH = 1000 +BASE_SPIDER_BLOCKED_KEYWORDS = [ +] +BASE_SPIDER_MAX_RETRIES = 5 +BASE_SPIDER_RETRY_DELAY = 5 +BASE_SPIDER_CLOSE_ON_MAX_RETRIES = False + # 配置统计导出参数 STATS_EXPORT_INTERVAL = 1800 # 每10分钟导出一次 STATS_EXPORT_SCRIPT = '/root/projects/resources/scrapy_proj/scrapy_proj/extensions/push_to_wecom.sh' # 本地shell脚本路径 +TWISTED_REACTOR = 'twisted.internet.epollreactor.EPollReactor' # 适用于Linux + # Crawl responsibly by identifying yourself (and your website) on the user-agent #USER_AGENT = "scrapy_proj (+http://www.yourdomain.com)" diff --git a/scrapy_proj/scrapy_proj/spiders/base_spider.py b/scrapy_proj/scrapy_proj/spiders/base_spider.py new file mode 100644 index 0000000..e595238 --- /dev/null +++ b/scrapy_proj/scrapy_proj/spiders/base_spider.py @@ -0,0 +1,156 @@ +# spiders/base_spider.py +import scrapy +from scrapy.exceptions import CloseSpider +from scrapy.http import Request +from twisted.internet import reactor, defer, asyncioreactor +import time + +class BaseSpider(scrapy.Spider): + def start_requests(self): + """统一处理请求生成,兼容不同入口点""" + # 如果定义了async start方法,使用它 + if hasattr(self, 'start') and callable(self.start) and \ + getattr(self.start, '__isasync__', False): + return self._wrap_async_start() + + # 如果定义了custom_start_requests,使用它 + if hasattr(self, 'custom_start_requests') and callable(self.custom_start_requests): + return self.custom_start_requests() + + # 默认使用父类的start_requests + return super().start_requests() + + async def _wrap_async_start(self): + """包装异步start方法,处理拦截和重试""" + async for request in self.start(): + if isinstance(request, Request): + # 添加元数据以便后续跟踪 + request.meta.setdefault('original_url', request.url) + yield request + else: + yield request + + def parse(self, response): + """统一的响应处理入口""" + # 记录请求耗时 + request_time = response.meta.get('request_time') + if request_time: + duration = (response.meta.get('response_time') or time.time()) - request_time + self.crawler.stats.set_value(f'response_duration/{response.url}', duration) + + # 检查是否被拦截 + is_blocked, reason = self.check_blocked(response) + if is_blocked: + self.logger.warning(f"页面被拦截: {response.url}, 原因: {reason}") + return self.handle_blocked(response, reason) + + ''' + # 确定实际的解析方法 + callback = self._get_callback(response) + if callback: + yield from callback(response) + else: + # 如果没有指定回调,尝试使用默认的_parse方法 + yield from self._parse(response) + ''' + yield from self._parse(response) + + def _get_callback(self, response): + """获取请求的回调方法""" + # 优先使用请求中指定的回调 + if 'callback' in response.request.meta: + callback = response.request.meta['callback'] + if callable(callback): + return callback + + # 检查请求的原始回调属性 + if hasattr(response.request, 'callback') and callable(response.request.callback): + return response.request.callback + + return None + + def _parse(self, response): + """实际的解析逻辑,由子类实现""" + raise NotImplementedError("子类必须实现_parse方法") + + def check_blocked(self, response): + 
"""检查响应是否被拦截,返回 (is_blocked, reason)""" + # 1. HTTP状态码检查 + if response.status != 200: + return True, f"status_code_{response.status}" + + # 2. 响应内容长度检查(异常短的内容可能是拦截页面) + if len(response.text) < self.settings.getint('BASE_SPIDER_MIN_CONTENT_LENGTH', 1000): + return True, "content_too_short" + + # 3. 响应内容类型检查 + content_type = response.headers.get('Content-Type', b'').decode('utf-8') + if 'html' not in content_type.lower() and 'json' not in content_type.lower(): + return True, f"unexpected_content_type_{content_type}" + + # 4. 响应内容关键词检查 + content = response.text.lower() + blocked_keywords = self.settings.getlist('BASE_SPIDER_BLOCKED_KEYWORDS', [ + ]) + + for keyword in blocked_keywords: + if keyword in content: + return True, f"content_contains_{keyword}" + + # 5. 自定义检查(由子类实现) + custom_check = self.custom_block_check(response) + if custom_check: + return True, custom_check + + return False, None + + def custom_block_check(self, response): + """子类可重写此方法进行自定义拦截检查""" + return None + + def handle_blocked(self, response, reason): + """处理被拦截的请求""" + # 记录统计信息 + self.crawler.stats.inc_value(f'blocked/{reason}') + self.crawler.stats.inc_value(f'blocked/{reason}/{self.name}') + + # 更新数据库状态 + item_id = response.meta.get('item_id') + if item_id and hasattr(self, 'db'): + self.db.update_status(item_id, 'blocked', reason) + + # 重试机制 + retries = response.meta.get('retry_times', 0) + if retries < self.settings.getint('BASE_SPIDER_MAX_RETRIES', 3): + retry_delay = self.get_retry_delay(retries) + self.logger.info(f"将在 {retry_delay} 秒后重试: {response.url}") + + # 使用Twisted的defer延迟执行,非阻塞 + d = defer.Deferred() + reactor.callLater(retry_delay, d.callback, None) + d.addCallback(lambda _: self._schedule_retry(response, retries, item_id)) + return d + + self.logger.error(f"达到最大重试次数,放弃: {response.url}") + if self.settings.getbool('BASE_SPIDER_CLOSE_ON_MAX_RETRIES', False): + raise CloseSpider(f"达到最大重试次数: {retries}") + + def get_retry_delay(self, retries): + """计算重试延迟时间,可被子类重写""" + # 默认使用指数退避算法 + base_delay = self.settings.getint('BASE_SPIDER_RETRY_DELAY', 5) + return base_delay * (2 ** retries) + + def _schedule_retry(self, response, retries, item_id): + """调度重试请求""" + return Request( + response.url, + callback=self.parse, + meta={ + 'retry_times': retries + 1, + 'item_id': item_id, + 'original_url': response.meta.get('original_url', response.url) + }, + dont_filter=True, + priority=response.request.priority - 1 + ) diff --git a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py index 25b7cd2..943607e 100644 --- a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py +++ b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py @@ -1,11 +1,12 @@ import scrapy import re +from scrapy_proj.spiders.base_spider import BaseSpider from scrapy_proj.items import IAFDPersonItem, IAFDMovieItem, IAFDPersonDetailItem, IAFDMovieDetailItem from scrapy_proj.db_wapper.iafd_query import IAFDQuery db_tools = IAFDQuery() -class IAFDSpider(scrapy.Spider): +class IAFDSpider(BaseSpider): name = "iafd" allowed_domains = ["iafd.com"] @@ -35,7 +36,8 @@ class IAFDSpider(scrapy.Spider): if len(self.cmd_list) == 0 : self.cmd_list = [self.cmd_astro, self.cmd_birth, self.cmd_ethnic, self.cmd_dist, self.cmd_stu, self.cmd_performers, self.cmd_movies] - def start_requests(self): + # 入口函数,由基类的方法触发 + def custom_start_requests(self): # 根据命令字执行 if self.cmd_astro in self.cmd_list: self.start_astro() diff --git a/scrapy_proj/scrapy_proj/spiders/sis_spider.py b/scrapy_proj/scrapy_proj/spiders/sis_spider.py index a17f4f7..ffb3794 
diff --git a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
index 25b7cd2..943607e 100644
--- a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
+++ b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
@@ -1,11 +1,12 @@
 import scrapy
 import re
+from scrapy_proj.spiders.base_spider import BaseSpider
 from scrapy_proj.items import IAFDPersonItem, IAFDMovieItem, IAFDPersonDetailItem, IAFDMovieDetailItem
 from scrapy_proj.db_wapper.iafd_query import IAFDQuery
 
 db_tools = IAFDQuery()
 
-class IAFDSpider(scrapy.Spider):
+class IAFDSpider(BaseSpider):
     name = "iafd"
     allowed_domains = ["iafd.com"]
 
@@ -35,7 +36,8 @@ class IAFDSpider(scrapy.Spider):
         if len(self.cmd_list) == 0 :
             self.cmd_list = [self.cmd_astro, self.cmd_birth, self.cmd_ethnic, self.cmd_dist,
                              self.cmd_stu, self.cmd_performers, self.cmd_movies]
 
-    def start_requests(self):
+    # entry point, invoked by the base class
+    def custom_start_requests(self):
         # dispatch on the configured commands
         if self.cmd_astro in self.cmd_list:
             self.start_astro()
diff --git a/scrapy_proj/scrapy_proj/spiders/sis_spider.py b/scrapy_proj/scrapy_proj/spiders/sis_spider.py
index a17f4f7..ffb3794 100644
--- a/scrapy_proj/scrapy_proj/spiders/sis_spider.py
+++ b/scrapy_proj/scrapy_proj/spiders/sis_spider.py
@@ -1,20 +1,169 @@
 import scrapy
+from scrapy_proj.spiders.base_spider import BaseSpider
 from scrapy_proj.items import Sis001Item
+from urllib.parse import urljoin
+import re
 
-class Sis001Spider(scrapy.Spider):
+
+def extract_title(element):
+    """Extract the text of an <a> tag, preferring a non-empty title attribute"""
+    # use the title attribute if it exists and is not an empty string
+    title_attr = element.attrib.get('title', '').strip()
+    if title_attr:
+        return title_attr
+
+    # otherwise use XPath string(.) to collect the text of all descendant nodes
+    full_text = element.xpath('string(.)').get(default='').strip()
+
+    # if that is still empty, collect the individual text fragments, strip and join them
+    if not full_text:
+        text_parts = element.css('::text').getall()
+        # strip each fragment
+        stripped_parts = [part.strip() for part in text_parts]
+        # drop empty strings and join
+        full_text = ' '.join(filter(None, stripped_parts))
+
+    return full_text or '无标题'  # always return at least the "no title" placeholder
+
+class Sis001Spider(BaseSpider):
     name = "sis"
     allowed_domains = ["sis001.com"]
-    start_urls = ["https://sis001.com/forum/forum-25-1.html"]
 
-    def parse(self, response):
-        for row in response.css('table[id="forum_25"] tbody[id^="normalthread_"] tr'):
-            item = Sis001Item()
-            item['title'] = row.css('td a::text').get()
-            item['url'] = response.urljoin(row.css('td a::attr(href)').get())
-            item['plate_name'] = '亚无转帖'
-            yield item
+    def __init__(self, debug='False', *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.debug = str(debug).lower() in ('true', '1')
+        self.logger.info(f"debug mode: {self.debug}")
 
-        # pagination
-        next_page = response.css('a.nxt::attr(href)').get()
-        if next_page:
-            yield response.follow(next_page, self.parse)
\ No newline at end of file
+    # entry point, invoked by the base class
+    def custom_start_requests(self):
+        sections = [
+            {
+                'plate': 'sis_asia_yc',
+                'plate_name': '亚无原创',
+                'url': 'https://sis001.com/forum/forum-143-1.html',
+                'ident': 'forum_143'
+            },
+            {
+                'plate': 'sis_asia_zt',
+                'plate_name': '亚无转帖',
+                'url': 'https://sis001.com/forum/forum-25-1.html',
+                'ident': 'forum_25'
+            },
+            {
+                'plate': 'sis_oumei_yc',
+                'plate_name': '欧无原创',
+                'url': 'https://sis001.com/forum/forum-229-1.html',
+                'ident': 'forum_229'
+            },
+            {
+                'plate': 'sis_oumei_zt',
+                'plate_name': '欧无转帖',
+                'url': 'https://sis001.com/forum/forum-77-1.html',
+                'ident': 'forum_77'
+            },
+        ]
+
+        for item in sections:
+            yield scrapy.Request(item['url'], callback=self.parse_page_common, meta=item)
+
+
+    def parse_page_common(self, response):
+        ident = response.meta['ident']
+        plate_name = response.meta['plate_name']
+        # locate the target table
+        tables = response.css(f'table#{ident}')
+        if not tables:
+            self.logger.warning(f"cannot find table. url: {response.url}")
+            return
+
+        main_table = None
+        for table in tables:
+            # check whether the table header contains "版块主题"
+            thead_title = extract_title(table.css('thead'))
+            if "版块主题" in thead_title:
+                main_table = table
+                break
+
+        if not main_table:
+            self.logger.warning(f"cannot find a table with the expected topic header. url: {response.url}")
+            return
+
+        # parse the table rows
+        for body in main_table.css('tbody[id^="normalthread_"]'):
+            for row in body.css('tr'):
+                tds = row.css('td')
+                if len(tds) < 6:
+                    self.logger.warning(f"skipping incomplete row, column count: {len(tds)}")
+                    continue
+
+                # category and title live in the th element
+                th_lock = row.css('th')
+                if not th_lock:
+                    self.logger.warning("no th element found")
+                    continue
+
+                # parse the category link
+                category = th_lock.css('a[href*="forumdisplay.php"] ::text').get(default="未知类别").strip()
+
+                # parse the title link
+                title = th_lock.css('a[href*="thread-"] ::text').get(default="未知标题").strip()
+                url = th_lock.css('a[href*="thread-"]::attr(href)').get(default="")
+                url = urljoin(response.url, url)
+
+                # parse the publish date
+                pub_date = tds[2].css('em ::text').get(default="未知日期").strip()
+
+                # parse size and format
+                size_text = tds[4].css('::text').get(default="").strip()
+                size_gb, file_format = self.parse_size_format(size_text)
+
+                # build the item
+                item = Sis001Item()
+                item['title'] = title
+                item['url'] = url
+                item['plate_name'] = plate_name
+                item['size_text'] = size_text
+                item['size_gb'] = size_gb
+                item['update_date'] = pub_date
+                yield item
+
+        # pagination
+        next_url = response.css('div.pages_btns a.next::attr(href)').get()
+        if next_url:
+            next_url = urljoin(response.url, next_url)
+            self.logger.info(f"found next page: {next_url}")
+            if not self.debug:
+                # keep the same callback; self.parse would fall through to
+                # BaseSpider._parse, which this spider does not implement
+                yield scrapy.Request(next_url, callback=self.parse_page_common, meta=response.meta)
+
+    def parse_size_format(self, size_text: str):
+        """Parse size and format (keeps the original logic)"""
+        try:
+            if not size_text:
+                return 0.0, "未知格式"
+
+            # split into size and format
+            parts = size_text.split('/')
+            format_part = parts[1].strip() if len(parts) > 1 else "未知格式"
+
+            # parse the size
+            size_part = parts[0].strip()
+            match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_part)
+
+            if not match:
+                self.logger.warning(f"cannot parse size: {size_part}")
+                return 0.0, format_part
+
+            value, unit = match.groups()
+            value = float(value)
+
+            if unit.lower() in ('mb', 'm'):
+                return round(value / 1024, 2), format_part
+            elif unit.lower() in ('gb', 'g'):
+                return round(value, 2), format_part
+            else:
+                self.logger.warning(f"unknown unit: {unit}")
+                return 0.0, format_part
+
+        except Exception as e:
+            self.logger.error(f"error while parsing size/format: {e}")
+            return 0.0, "未知格式"
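As a quick check of what parse_size_format is expected to return (not part of the patch; the size strings are hypothetical examples of the listing column):

# Illustration only -- expected behaviour of Sis001Spider.parse_size_format.
from scrapy_proj.spiders.sis_spider import Sis001Spider

spider = Sis001Spider()
print(spider.parse_size_format("700.5 MB / MP4"))   # (0.68, 'MP4') -- MB converted to GB
print(spider.parse_size_format("1.2GB/AVI"))        # (1.2, 'AVI')
print(spider.parse_size_format(""))                 # (0.0, '未知格式')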
url: {response.url}") + return + + # 解析表格行数据 + for body in main_table.css('tbody[id^="normalthread_"]'): + for row in body.css('tr'): + tds = row.css('td') + if len(tds) < 6: + self.logger.warning(f"跳过不完整的行,列数: {len(tds)}") + continue + + # 解析类别和标题 + th_lock = row.css('th') + if not th_lock: + self.logger.warning("未找到th.lock元素") + continue + + # 解析类别链接 + category = th_lock.css('a[href*="forumdisplay.php"] ::text').get(default="未知类别").strip() + + # 解析标题链接 + title = th_lock.css('a[href*="thread-"] ::text').get(default="未知标题").strip() + url = th_lock.css('a[href*="thread-"]::attr(href)').get(default="") + url = urljoin(response.url, url) + + # 解析发布日期 + pub_date = tds[2].css('em ::text').get(default="未知日期").strip() + + # 解析大小和格式 + size_text = tds[4].css('::text').get(default="").strip() + size_gb, file_format = self.parse_size_format(size_text) + + # 生成数据项 + item = Sis001Item() + item['title'] = title + item['url'] = url + item['plate_name'] = plate_name + item['size_text'] = size_text + item['size_gb'] = size_gb + item['update_date'] = pub_date + yield item + + # 处理分页 + next_url = response.css('div.pages_btns a.next::attr(href)').get() + if next_url: + next_url = urljoin(response.url, next_url) + self.logger.info(f"发现下一页: {next_url}") + if not self.debug: + yield scrapy.Request(next_url, callback=self.parse, meta=response.meta) + + def parse_size_format(self, size_text: str): + """解析大小和格式(保持原有逻辑不变)""" + try: + if not size_text: + return 0.0, "未知格式" + + # 分割大小和格式 + parts = size_text.split('/') + format_part = parts[1].strip() if len(parts) > 1 else "未知格式" + + # 解析大小 + size_part = parts[0].strip() + match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_part) + + if not match: + self.logger.warning(f"无法解析大小: {size_part}") + return 0.0, format_part + + value, unit = match.groups() + value = float(value) + + if unit.lower() == 'mb' or unit.lower() == 'm': + return round(value / 1024, 2), format_part + elif unit.lower() == 'gb' or unit.lower() == 'g': + return round(value, 2), format_part + else: + self.logger.warning(f"未知单位: {unit}") + return 0.0, format_part + + except Exception as e: + self.logger.error(f"解析大小格式时出错: {e}") + return 0.0, "未知格式" diff --git a/scrapy_proj/scrapy_proj/spiders/u3c3_spider.py b/scrapy_proj/scrapy_proj/spiders/u3c3_spider.py index 94be268..57a11af 100644 --- a/scrapy_proj/scrapy_proj/spiders/u3c3_spider.py +++ b/scrapy_proj/scrapy_proj/spiders/u3c3_spider.py @@ -1,4 +1,5 @@ import scrapy +from scrapy_proj.spiders.base_spider import BaseSpider from scrapy_proj.items import U001Item from scrapy_proj.utils.size_converter import parse_size @@ -22,7 +23,7 @@ def extract_title(element): return full_text or '无标题' # 确保至少返回"无标题" -class U001Spider(scrapy.Spider): +class U001Spider(BaseSpider): name = "u3c3" allowed_domains = ["u001.25img.com"] start_urls = ["https://u001.25img.com/?p=1"] @@ -32,7 +33,8 @@ class U001Spider(scrapy.Spider): self.debug = True if (str(debug).lower() == 'true' or str(debug).lower() == '1') else False self.logger.info(f"debug mod: {self.debug}") - def parse(self, response): + # 入口函数,由基类的方法触发 + def _parse(self, response): for row in response.css('table.torrent-list tbody tr'): item = U001Item() item['category'] = row.css('td:nth-child(1) a::attr(title)').get()