modify scripts

oscarz
2025-07-03 19:47:10 +08:00
parent 9561f57aa5
commit e6d0c628af
9 changed files with 423 additions and 29 deletions

View File

@ -0,0 +1,68 @@
# extensions/failure_monitor.py
from scrapy import signals
from scrapy.exceptions import NotConfigured
import time


class FailureMonitorExtension:
    def __init__(self, crawler, max_consecutive_failures, failure_rate_threshold, time_window):
        self.crawler = crawler
        self.max_consecutive_failures = max_consecutive_failures
        self.failure_rate_threshold = failure_rate_threshold
        self.time_window = time_window  # seconds
        self.consecutive_failures = 0
        self.total_requests = 0
        self.failed_requests = 0
        self.request_times = []  # request timestamps, used for the sliding-window rate

    @classmethod
    def from_crawler(cls, crawler):
        # read the thresholds from the settings
        max_consecutive = crawler.settings.getint('EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES', 10)
        failure_rate = crawler.settings.getfloat('EXT_FAIL_MONI_RATE_THRESHOLD', 0.5)
        time_window = crawler.settings.getint('EXT_FAIL_MONI_FAILURE_TIME_WINDOW', 60)
        if max_consecutive <= 0 and failure_rate <= 0:
            raise NotConfigured
        ext = cls(crawler, max_consecutive, failure_rate, time_window)
        # register the signal handlers
        crawler.signals.connect(ext.request_succeeded, signal=signals.response_received)
        crawler.signals.connect(ext.request_failed, signal=signals.request_dropped)
        crawler.signals.connect(ext.request_failed, signal=signals.spider_error)
        return ext

    def request_succeeded(self, response, request, spider):
        self.consecutive_failures = 0  # reset the consecutive-failure counter
        self.total_requests += 1
        self.request_times.append(time.time())
        self._cleanup_old_requests()  # drop requests outside the time window
    def request_failed(self, request=None, exception=None, spider=None, failure=None, response=None):
        # request_dropped and spider_error send different keyword arguments,
        # so every parameter carries a default
        self.consecutive_failures += 1
        self.failed_requests += 1
        self.total_requests += 1
        self.request_times.append(time.time())
        self._cleanup_old_requests()
        # check the consecutive-failure limit
        if self.max_consecutive_failures > 0 and self.consecutive_failures >= self.max_consecutive_failures:
            spider.logger.error(f"consecutive-failure limit reached ({self.consecutive_failures}/{self.max_consecutive_failures}), stopping the spider")
            self.crawler.engine.close_spider(spider, 'consecutive_failures_exceeded')
        # check the failure rate
        if self.total_requests > 0 and self.failure_rate_threshold > 0:
            current_failure_rate = self.failed_requests / self.total_requests
            if current_failure_rate >= self.failure_rate_threshold:
                spider.logger.error(f"failure rate above threshold ({current_failure_rate:.2%} > {self.failure_rate_threshold:.2%}), stopping the spider")
                self.crawler.engine.close_spider(spider, 'failure_rate_exceeded')

    def _cleanup_old_requests(self):
        """Drop request records that fall outside the time window."""
        cutoff_time = time.time() - self.time_window
        self.request_times = [t for t in self.request_times if t >= cutoff_time]
        # keep the total in sync with the window (failed_requests itself is not windowed)
        self.total_requests = len(self.request_times)
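
Because from_crawler reads its thresholds from crawler.settings, individual spiders can also tighten or loosen them through Scrapy's standard custom_settings mechanism. A minimal sketch (the spider name and values are hypothetical, not part of this commit):

# hypothetical per-spider override, for illustration only
import scrapy

class FragileSiteSpider(scrapy.Spider):
    name = "fragile_site"
    custom_settings = {
        'EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES': 20,   # stricter than the project-wide value
        'EXT_FAIL_MONI_RATE_THRESHOLD': 0.3,            # stop at a 30% failure rate
        'EXT_FAIL_MONI_FAILURE_TIME_WINDOW': 120,       # over a 2-minute window
    }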

View File

@ -15,6 +15,7 @@ class StatsExtension:
        self.script_path = script_path
        self.spider_name = None
        self.loop = None  # handle for the periodic task
+       self.current_status = 'running'  # initial status

    @classmethod
    def from_crawler(cls, crawler):
@ -42,7 +43,8 @@ class StatsExtension:
        # stop the periodic task
        if self.loop and self.loop.running:
            self.loop.stop()
+       self.current_status = f"stop({reason})"  # final status
        self._export_stats(spider)
        logger.info(f"Spider {spider.name} closed - reason: {reason}")
@ -58,7 +60,8 @@ class StatsExtension:
            'total_rsp': stats.get('downloader/response_count', 0),
            '200_cnt': stats.get('downloader/response_status_count/200', 0),
            '404_cnt': stats.get('downloader/response_status_count/404', 0),
-           'log_err_cnt': stats.get('log_count/ERROR', 0)
+           'log_err_cnt': stats.get('log_count/ERROR', 0),
+           'status': self.current_status
        }
        # print the stats

View File

@ -6,6 +6,7 @@
# items.py
import scrapy

+# u3c3.in
class U001Item(scrapy.Item):
    category = scrapy.Field()
    title = scrapy.Field()
@ -16,10 +17,14 @@ class U001Item(scrapy.Item):
    size_gb = scrapy.Field()
    update_date = scrapy.Field()

+# sis001.com
class Sis001Item(scrapy.Item):
    title = scrapy.Field()
    url = scrapy.Field()
    plate_name = scrapy.Field()
+   size_text = scrapy.Field()
+   size_gb = scrapy.Field()
+   update_date = scrapy.Field()

class IAFDPersonItem(scrapy.Item):
    name = scrapy.Field()

View File

@ -79,16 +79,8 @@ class SQLitePipeline(SQLiteDBHandler):
        return self.insert_or_update_common(item, tbl_name=self.tbl_name_u3c3, uniq_key='url', exists_do_nothing=True)

    def _process_sis001_item(self, item, spider):
-       self.cursor.execute('''
-           INSERT OR IGNORE INTO sis001_data
-           (title, url, plate_name)
-           VALUES (?,?,?)
-       ''', (
-           item.get('title'),
-           item.get('url'),
-           item.get('plate_name')
-       ))
-       self.conn.commit()
+       logging.debug(f"insert one item. href:{spider.name}")
+       return self.insert_or_update_common(item, tbl_name=self.tbl_name_sis, uniq_key='url', exists_do_nothing=True)

    def _process_iafd_person_item(self, item, spider):
        logging.info(f"deal with persion item. {item}")

View File

@ -58,12 +58,29 @@ DOWNLOADER_MIDDLEWARES = {
# settings.py
EXTENSIONS = {
    'scrapy_proj.extensions.stats_extension.StatsExtension': 500,
+   'scrapy_proj.extensions.failure_monitor.FailureMonitorExtension': 500,
}

+# failure monitoring: detect repeated failures and shut the job down
+EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES = 100  # stop after 100 consecutive failures
+EXT_FAIL_MONI_RATE_THRESHOLD = 0.6            # stop once the failure rate exceeds 60%
+EXT_FAIL_MONI_FAILURE_TIME_WINDOW = 300       # 300-second sliding window
+
+# block detection and retry parameters for BaseSpider
+BASE_SPIDER_MIN_CONTENT_LENGTH = 1000
+BASE_SPIDER_BLOCKED_KEYWORDS = [
+]
+BASE_SPIDER_MAX_RETRIES = 5
+BASE_SPIDER_RETRY_DELAY = 5
+BASE_SPIDER_CLOSE_ON_MAX_RETRIES = False
+
# stats export parameters
STATS_EXPORT_INTERVAL = 1800  # export every 30 minutes
STATS_EXPORT_SCRIPT = '/root/projects/resources/scrapy_proj/scrapy_proj/extensions/push_to_wecom.sh'  # path of the local shell script
+TWISTED_REACTOR = 'twisted.internet.epollreactor.EPollReactor'  # Linux-only reactor

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "scrapy_proj (+http://www.yourdomain.com)"

View File

@ -0,0 +1,156 @@
# spiders/base_spider.py
import scrapy
from scrapy.exceptions import CloseSpider
from scrapy.http import Request
from twisted.internet import reactor, defer, asyncioreactor
import time


class BaseSpider(scrapy.Spider):
    def start_requests(self):
        """Unified request generation, compatible with the different entry points."""
        # if an async start() method is defined, use it
        if hasattr(self, 'start') and callable(self.start) and \
                getattr(self.start, '__isasync__', False):
            return self._wrap_async_start()
        # if custom_start_requests() is defined, use it
        if hasattr(self, 'custom_start_requests') and callable(self.custom_start_requests):
            return self.custom_start_requests()
        # otherwise fall back to the parent class start_requests()
        return super().start_requests()

    async def _wrap_async_start(self):
        """Wrap an async start() method so blocking and retries can be handled."""
        async for request in self.start():
            if isinstance(request, Request):
                # attach metadata for later tracking
                request.meta.setdefault('original_url', request.url)
                yield request
            else:
                yield request

    def parse(self, response):
        """Single entry point for response handling."""
        # record the request duration
        request_time = response.meta.get('request_time')
        if request_time:
            duration = (response.meta.get('response_time') or time.time()) - request_time
            self.crawler.stats.set_value(f'response_duration/{response.url}', duration)
        # check whether the response was blocked
        is_blocked, reason = self.check_blocked(response)
        if is_blocked:
            self.logger.warning(f"page blocked: {response.url}, reason: {reason}")
            return self.handle_blocked(response, reason)
        '''
        # resolve the actual parse callback
        callback = self._get_callback(response)
        if callback:
            yield from callback(response)
        else:
            # no callback specified, fall back to the default _parse method
            yield from self._parse(response)
        '''
        yield from self._parse(response)

    def _get_callback(self, response):
        """Resolve the callback attached to the request."""
        # prefer a callback passed through the request meta
        if 'callback' in response.request.meta:
            callback = response.request.meta['callback']
            if callable(callback):
                return callback
        # otherwise fall back to the request's own callback attribute
        if hasattr(response.request, 'callback') and callable(response.request.callback):
            return response.request.callback
        return None

    def _parse(self, response):
        """The actual parsing logic; subclasses must implement this."""
        raise NotImplementedError("subclasses must implement _parse")

    def check_blocked(self, response):
        """Check whether the response was blocked; returns (is_blocked, reason)."""
        # 1. HTTP status code check
        if response.status != 200:
            return True, f"status_code_{response.status}"
        # 2. content length check (an unusually short body is often a block page)
        if len(response.text) < self.settings.getint('BASE_SPIDER_MIN_CONTENT_LENGTH', 1000):
            return True, "content_too_short"
        # 3. content type check
        content_type = response.headers.get('Content-Type', b'').decode('utf-8')
        if 'html' not in content_type.lower() and 'json' not in content_type.lower():
            return True, f"unexpected_content_type_{content_type}"
        # 4. keyword check against the response body
        content = response.text.lower()
        blocked_keywords = self.settings.getlist('BASE_SPIDER_BLOCKED_KEYWORDS', [
        ])
        for keyword in blocked_keywords:
            if keyword in content:
                return True, f"content_contains_{keyword}"
        # 5. custom check (implemented by subclasses)
        custom_check = self.custom_block_check(response)
        if custom_check:
            return True, custom_check
        return False, None

    def custom_block_check(self, response):
        """Subclasses may override this for site-specific block detection."""
        return None

    def handle_blocked(self, response, reason):
        """Handle a blocked request."""
        # record stats
        self.crawler.stats.inc_value(f'blocked/{reason}')
        self.crawler.stats.inc_value(f'blocked/{reason}/{self.name}')
        # update the status in the database
        item_id = response.meta.get('item_id')
        if item_id and hasattr(self, 'db'):
            self.db.update_status(item_id, 'blocked', reason)
        # retry mechanism
        retries = response.meta.get('retry_times', 0)
        if retries < self.settings.getint('BASE_SPIDER_MAX_RETRIES', 3):
            retry_delay = self.get_retry_delay(retries)
            self.logger.info(f"retrying in {retry_delay} seconds: {response.url}")
            # delay through a Twisted Deferred so the reactor is not blocked
            d = defer.Deferred()
            reactor.callLater(retry_delay, d.callback, None)
            d.addCallback(lambda _: self._schedule_retry(response, retries, item_id))
            return d
        self.logger.error(f"maximum retries reached, giving up: {response.url}")
        if self.settings.getbool('BASE_SPIDER_CLOSE_ON_MAX_RETRIES', False):
            raise CloseSpider(f"maximum retries reached: {retries}")

    def get_retry_delay(self, retries):
        """Compute the retry delay; may be overridden by subclasses."""
        # exponential backoff by default
        base_delay = self.settings.getint('BASE_SPIDER_RETRY_DELAY', 5)
        return base_delay * (2 ** retries)

    def _schedule_retry(self, response, retries, item_id):
        """Build the retry request."""
        return Request(
            response.url,
            callback=self.parse,
            meta={
                'retry_times': retries + 1,
                'item_id': item_id,
                'original_url': response.meta.get('original_url', response.url)
            },
            dont_filter=True,
            priority=response.request.priority - 1
        )
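
For orientation, a minimal sketch of the contract a concrete spider is expected to fulfil against this base class (ExampleSpider and its URL are hypothetical; the real subclasses are the iafd, sis and u3c3 spiders updated below):

# hypothetical BaseSpider subclass, for illustration only
import scrapy
from scrapy_proj.spiders.base_spider import BaseSpider

class ExampleSpider(BaseSpider):
    name = "example"

    def custom_start_requests(self):
        # picked up by BaseSpider.start_requests() when no async start() is defined
        yield scrapy.Request("https://example.com/list?p=1")

    def _parse(self, response):
        # BaseSpider.parse() delegates here once check_blocked() passes
        for href in response.css("a::attr(href)").getall():
            yield {"url": response.urljoin(href)}

    def custom_block_check(self, response):
        # optional site-specific check; return a reason string to flag a block
        return "captcha_page" if "captcha" in response.text.lower() else None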

View File

@ -1,11 +1,12 @@
import scrapy
import re
+from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.items import IAFDPersonItem, IAFDMovieItem, IAFDPersonDetailItem, IAFDMovieDetailItem
from scrapy_proj.db_wapper.iafd_query import IAFDQuery

db_tools = IAFDQuery()

-class IAFDSpider(scrapy.Spider):
+class IAFDSpider(BaseSpider):
    name = "iafd"
    allowed_domains = ["iafd.com"]
@ -35,7 +36,8 @@ class IAFDSpider(scrapy.Spider):
        if len(self.cmd_list) == 0 :
            self.cmd_list = [self.cmd_astro, self.cmd_birth, self.cmd_ethnic, self.cmd_dist, self.cmd_stu, self.cmd_performers, self.cmd_movies]

-    def start_requests(self):
+    # entry point, triggered by the base class
+    def custom_start_requests(self):
        # dispatch according to the command words
        if self.cmd_astro in self.cmd_list:
            self.start_astro()

View File

@ -1,20 +1,169 @@
import scrapy
+from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.items import Sis001Item
+from urllib.parse import urljoin
+import re


+def extract_title(element):
+    """Extract the text of an <a> element, preferring a non-empty title attribute."""
+    # check whether the title attribute exists and is not an empty string
+    title_attr = element.attrib.get('title', '').strip()
+    if title_attr:
+        return title_attr
+    # otherwise use XPath string(.) to collect the text of all descendant nodes
+    full_text = element.xpath('string(.)').get(default='').strip()
+    # if that is still empty, collect the individual text fragments, strip them and join
+    if not full_text:
+        text_parts = element.css('::text').getall()
+        # strip each fragment
+        stripped_parts = [part.strip() for part in text_parts]
+        # drop empty strings and join the rest
+        full_text = ' '.join(filter(None, stripped_parts))
+    return full_text or '无标题'  # always return at least the placeholder title


-class Sis001Spider(scrapy.Spider):
+class Sis001Spider(BaseSpider):
    name = "sis"
    allowed_domains = ["sis001.com"]
-    start_urls = ["https://sis001.com/forum/forum-25-1.html"]

-    def parse(self, response):
-        for row in response.css('table[id="forum_25"] tbody[id^="normalthread_"] tr'):
-            item = Sis001Item()
-            item['title'] = row.css('td a::text').get()
-            item['url'] = response.urljoin(row.css('td a::attr(href)').get())
-            item['plate_name'] = '亚无转帖'
-            yield item
-        # pagination
-        next_page = response.css('a.nxt::attr(href)').get()
-        if next_page:
-            yield response.follow(next_page, self.parse)

+    def __init__(self, debug='False', *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.debug = True if (str(debug).lower() == 'true' or str(debug).lower() == '1') else False
+        self.logger.info(f"debug mod: {self.debug}")

+    # entry point, triggered by the base class
+    def custom_start_requests(self):
+        sections = [
+            {
+                'plate': 'sis_asia_yc',
+                'plate_name': '亚无原创',
+                'url': 'https://sis001.com/forum/forum-143-1.html',
+                'ident': 'forum_143'
+            },
+            {
+                'plate': 'sis_asia_zt',
+                'plate_name': '亚无转帖',
+                'url': 'https://sis001.com/forum/forum-25-1.html',
+                'ident': 'forum_25'
+            },
+            {
+                'plate': 'sis_oumei_yc',
+                'plate_name': '欧无原创',
+                'url': 'https://sis001.com/forum/forum-229-1.html',
+                'ident': 'forum_229'
+            },
+            {
+                'plate': 'sis_oumei_zt',
+                'plate_name': '欧无转帖',
+                'url': 'https://sis001.com/forum/forum-77-1.html',
+                'ident': 'forum_77'
+            },
+        ]
+        for item in sections:
+            yield scrapy.Request(item['url'], callback=self.parse_page_common, meta=item)

+    def parse_page_common(self, response):
+        ident = response.meta['ident']
+        plate_name = response.meta['plate_name']
+        # find the target table
+        tables = response.css(f'table#{ident}')
+        if not tables:
+            self.logger.warning(f"cannot find table. url: {response.url}")
+            return
+        main_table = None
+        for table in tables:
+            # check whether the table head contains "版块主题"
+            thead_title = extract_title(table.css('thead'))
+            if "版块主题" in thead_title:
+                main_table = table
+                break
+        if not main_table:
+            self.logger.warning(f"cannot find a table with the expected topic header. url: {response.url}")
+            return
+        # parse the table rows
+        for body in main_table.css('tbody[id^="normalthread_"]'):
+            for row in body.css('tr'):
+                tds = row.css('td')
+                if len(tds) < 6:
+                    self.logger.warning(f"skipping incomplete row, column count: {len(tds)}")
+                    continue
+                # parse category and title
+                th_lock = row.css('th')
+                if not th_lock:
+                    self.logger.warning("no th element found in row")
+                    continue
+                # parse the category link
+                category = th_lock.css('a[href*="forumdisplay.php"] ::text').get(default="未知类别").strip()
+                # parse the title link
+                title = th_lock.css('a[href*="thread-"] ::text').get(default="未知标题").strip()
+                url = th_lock.css('a[href*="thread-"]::attr(href)').get(default="")
+                url = urljoin(response.url, url)
+                # parse the publish date
+                pub_date = tds[2].css('em ::text').get(default="未知日期").strip()
+                # parse size and format
+                size_text = tds[4].css('::text').get(default="").strip()
+                size_gb, file_format = self.parse_size_format(size_text)
+                # build the item
+                item = Sis001Item()
+                item['title'] = title
+                item['url'] = url
+                item['plate_name'] = plate_name
+                item['size_text'] = size_text
+                item['size_gb'] = size_gb
+                item['update_date'] = pub_date
+                yield item
+        # handle pagination
+        next_url = response.css('div.pages_btns a.next::attr(href)').get()
+        if next_url:
+            next_url = urljoin(response.url, next_url)
+            self.logger.info(f"found next page: {next_url}")
+            if not self.debug:
+                # follow-up pages are parsed by the same board parser
+                yield scrapy.Request(next_url, callback=self.parse_page_common, meta=response.meta)
+    def parse_size_format(self, size_text: str):
+        """Parse size and format (original logic kept unchanged)."""
+        try:
+            if not size_text:
+                return 0.0, "未知格式"
+            # split into size and format
+            parts = size_text.split('/')
+            format_part = parts[1].strip() if len(parts) > 1 else "未知格式"
+            # parse the size
+            size_part = parts[0].strip()
+            match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_part)
+            if not match:
+                self.logger.warning(f"cannot parse size: {size_part}")
+                return 0.0, format_part
+            value, unit = match.groups()
+            value = float(value)
+            if unit.lower() == 'mb' or unit.lower() == 'm':
+                return round(value / 1024, 2), format_part
+            elif unit.lower() == 'gb' or unit.lower() == 'g':
+                return round(value, 2), format_part
+            else:
+                self.logger.warning(f"unknown unit: {unit}")
+                return 0.0, format_part
+        except Exception as e:
+            self.logger.error(f"error while parsing size/format: {e}")
+            return 0.0, "未知格式"
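
For reference, the return values parse_size_format would produce for a few representative inputs (illustrative strings, following the logic above):

# illustrative inputs and outputs for parse_size_format, per the code above
spider.parse_size_format("350.5MB / MP4")  # -> (0.34, "MP4"): 350.5 / 1024, rounded to 2 decimals
spider.parse_size_format("2.3GB / MKV")    # -> (2.3, "MKV")
spider.parse_size_format("")               # -> (0.0, "未知格式")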

View File

@ -1,4 +1,5 @@
import scrapy
+from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.items import U001Item
from scrapy_proj.utils.size_converter import parse_size
@ -22,7 +23,7 @@ def extract_title(element):
    return full_text or '无标题'  # always return at least the placeholder title


-class U001Spider(scrapy.Spider):
+class U001Spider(BaseSpider):
    name = "u3c3"
    allowed_domains = ["u001.25img.com"]
    start_urls = ["https://u001.25img.com/?p=1"]
@ -32,7 +33,8 @@ class U001Spider(scrapy.Spider):
        self.debug = True if (str(debug).lower() == 'true' or str(debug).lower() == '1') else False
        self.logger.info(f"debug mod: {self.debug}")

-    def parse(self, response):
+    # entry point, triggered by the base class
+    def _parse(self, response):
        for row in response.css('table.torrent-list tbody tr'):
            item = U001Item()
            item['category'] = row.css('td:nth-child(1) a::attr(title)').get()