modify scripts

oscarz
2025-07-05 12:08:08 +08:00
parent e6d0c628af
commit 00d8ac08f4
9 changed files with 171 additions and 137 deletions

View File

@ -18,7 +18,7 @@ class FailureMonitorExtension:
@classmethod
def from_crawler(cls, crawler):
        # Read parameters from the crawler settings
max_consecutive = crawler.settings.getint('EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES', 10)
max_consecutive = crawler.settings.getint('EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES', 100)
failure_rate = crawler.settings.getfloat('EXT_FAIL_MONI_RATE_THRESHOLD', 0.5)
time_window = crawler.settings.getint('EXT_FAIL_MONI_FAILURE_TIME_WINDOW', 60)
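For context, a minimal sketch of how this extension and its settings might be wired up in settings.py (illustrative only; the import path scrapy_proj.extensions.FailureMonitorExtension is an assumption, not something shown in this diff):

# settings.py -- illustrative sketch; the extension's import path is assumed.
EXTENSIONS = {
    "scrapy_proj.extensions.FailureMonitorExtension": 500,  # hypothetical module path
}

# Settings read by from_crawler() above, with the defaults used there:
EXT_FAIL_MONI_MAX_CONSECUTIVE_FAILURES = 100  # consecutive failures tolerated before the monitor reacts
EXT_FAIL_MONI_RATE_THRESHOLD = 0.5            # failure-rate threshold
EXT_FAIL_MONI_FAILURE_TIME_WINDOW = 60        # sliding time window, in seconds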

View File

@ -13,6 +13,7 @@
import os
import sqlite3
import scrapy
import logging
from datetime import datetime
from scrapy_proj.items import U001Item, Sis001Item, IAFDPersonItem, IAFDPersonDetailItem, IAFDMovieItem, IAFDMovieDetailItem
@ -75,24 +76,24 @@ class SQLitePipeline(SQLiteDBHandler):
return item
def _process_u001_item(self, item, spider):
        logging.debug(f"insert one item. spider: {spider.name}")
        spider.logger.debug(f"insert one item. spider: {spider.name}")
return self.insert_or_update_common(item, tbl_name=self.tbl_name_u3c3, uniq_key='url', exists_do_nothing=True)
def _process_sis001_item(self, item, spider):
        logging.debug(f"insert one item. spider: {spider.name}")
        spider.logger.debug(f"insert one item. spider: {spider.name}")
return self.insert_or_update_common(item, tbl_name=self.tbl_name_sis, uniq_key='url', exists_do_nothing=True)
def _process_iafd_person_item(self, item, spider):
        logging.info(f"deal with person item. {item}")
        spider.logger.debug(f"deal with person item. {item}")
def _process_iafd_movie_item(self, item, spider):
logging.info(f"deal with movie item. {item}")
spider.logger.debug(f"deal with movie item. {item}")
def _process_iafd_person_detail_item(self, item, spider):
        logging.info(f"deal with person item. {item}")
        spider.logger.debug(f"deal with person item. {item}")
def _process_iafd_movie_detail_item(self, item, spider):
logging.info(f"deal with movie item. {item}")
spider.logger.debug(f"deal with movie item. {item}")
def close_spider(self, spider):
self.conn.close()
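The insert_or_update_common() helper is defined outside this diff; judging from uniq_key='url' and exists_do_nothing=True, it presumably boils down to an SQLite "insert unless the unique key already exists" statement. A rough sketch of that semantics (illustrative only, not the project's actual implementation; table and column names are placeholders):

import sqlite3

def insert_if_absent(conn: sqlite3.Connection, tbl_name: str, row: dict, uniq_key: str = "url"):
    # Requires a UNIQUE constraint or index on uniq_key in tbl_name.
    cols = ", ".join(row)
    placeholders = ", ".join("?" for _ in row)
    sql = (
        f"INSERT INTO {tbl_name} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT({uniq_key}) DO NOTHING"
    )
    conn.execute(sql, tuple(row.values()))
    conn.commit()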

View File

@ -29,9 +29,9 @@ NEWSPIDER_MODULE = "scrapy_proj.spiders"
ADDONS = {}
# Concurrency settings
CONCURRENT_REQUESTS = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 1
CONCURRENT_ITEMS = 100
CONCURRENT_REQUESTS = 10
CONCURRENT_REQUESTS_PER_DOMAIN = 5
CONCURRENT_ITEMS = 1000
# Download delay
DOWNLOAD_DELAY = 0.3
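For a rough sense of what these values mean in practice (an illustration, not part of the commit): with a non-zero DOWNLOAD_DELAY, Scrapy spaces consecutive requests to the same download slot (per domain by default) by roughly that delay, randomized between 0.5x and 1.5x unless RANDOMIZE_DOWNLOAD_DELAY is disabled, so the per-domain rate is bounded by about 1 / DOWNLOAD_DELAY regardless of CONCURRENT_REQUESTS_PER_DOMAIN:

# Back-of-the-envelope throughput for the settings above (illustrative only).
DOWNLOAD_DELAY = 0.3
approx_requests_per_second = 1 / DOWNLOAD_DELAY   # ~3.3 requests/s per domain
approx_requests_per_minute = 60 / DOWNLOAD_DELAY  # ~200 requests/min per domain
print(approx_requests_per_second, approx_requests_per_minute)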

View File

@ -44,7 +44,7 @@ class BaseSpider(scrapy.Spider):
            self.logger.warning(f"Page blocked: {response.url}, reason: {reason}")
return self.handle_blocked(response, reason)
'''
        # Determine the actual parse callback
callback = self._get_callback(response)
if callback:
@ -52,8 +52,8 @@ class BaseSpider(scrapy.Spider):
else:
            # If no callback is specified, fall back to the default _parse method
yield from self._parse(response)
'''
yield from self._parse(response)
#yield from self._parse(response)
    def _get_callback(self, response):
        """Get the callback method for this request."""
@ -154,3 +154,25 @@ class BaseSpider(scrapy.Spider):
dont_filter=True,
priority=response.request.priority - 1
        )
"""Extract the text content of a page element; if it contains child elements, concatenate all of their text. A non-empty title attribute takes priority."""
def extract_text_from_element(element, use_title=False):
    # Check whether the title attribute exists and is not an empty string
    if use_title:
        title_attr = element.attrib.get('title', '').strip()
        if title_attr:
            return title_attr
    # Otherwise use XPath's string(.) to collect the text of all descendant nodes
    full_text = element.xpath('string(.)').get(default='').strip()
    # If that is empty, fall back to collecting every text fragment, stripping each, and joining them
    if not full_text:
        text_parts = element.css('::text').getall()
        # Strip each text fragment
        stripped_parts = [part.strip() for part in text_parts]
        # Drop empty strings and join
        full_text = ' '.join(filter(None, stripped_parts))
    return full_text or '无标题'  # Guarantee the placeholder '无标题' ("untitled") is returned at minimum
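A quick usage sketch with an inline Selector (illustrative, not part of the commit):

from scrapy import Selector

html = '<a href="/t/1" title="Example title"><b>bold</b> rest of text</a>'
link = Selector(text=html).css('a')

print(extract_text_from_element(link, use_title=True))  # -> 'Example title'
print(extract_text_from_element(link))                  # -> 'bold rest of text'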

View File

@ -206,7 +206,7 @@ class IAFDSpider(BaseSpider):
item['from_dist_list'] = 1
item['from_stu_list'] = 0
yield item
yield scrapy.Request(dis_url, callback=self.parse_movie_detail_page)
#yield scrapy.Request(dis_url, callback=self.parse_movie_detail_page)
def parse_studios_list_page(self, response):
select_element = response.css('select[name="Studio"]')
@ -224,7 +224,7 @@ class IAFDSpider(BaseSpider):
item['from_dist_list'] = 0
item['from_stu_list'] = 1
yield item
yield scrapy.Request(stu_url, callback=self.parse_movie_detail_page)
#yield scrapy.Request(stu_url, callback=self.parse_movie_detail_page)
def parse_person_detail_page(self, response):
item = IAFDPersonDetailItem()

View File

@ -1,39 +1,20 @@
import scrapy
from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.items import Sis001Item
from urllib.parse import urljoin
import re
def extract_title(element):
    """Extract the text of an <a> tag, preferring a non-empty title attribute."""
    # Check whether the title attribute exists and is not an empty string
    title_attr = element.attrib.get('title', '').strip()
    if title_attr:
        return title_attr
    # Otherwise use XPath's string(.) to collect the text of all descendant nodes
    full_text = element.xpath('string(.)').get(default='').strip()
    # If that is empty, fall back to collecting every text fragment, stripping each, and joining them
    if not full_text:
        text_parts = element.css('::text').getall()
        # Strip each text fragment
        stripped_parts = [part.strip() for part in text_parts]
        # Drop empty strings and join
        full_text = ' '.join(filter(None, stripped_parts))
    return full_text or '无标题'  # Guarantee the placeholder '无标题' ("untitled") is returned at minimum
from scrapy_proj.spiders.base_spider import BaseSpider, extract_text_from_element
from scrapy_proj.items import Sis001Item
from scrapy_proj.utils.utils import parse_size_format, parse_date_to_datetime
class Sis001Spider(BaseSpider):
name = "sis"
allowed_domains = ["sis001.com"]
def __init__(self, debug='False', *args, **kwargs):
def __init__(self, debug='False', begin=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.debug = True if (str(debug).lower() == 'true' or str(debug).lower() == '1') else False
        self.logger.info(f"debug mode: {self.debug}")
self.begin = parse_date_to_datetime(begin) if begin else None
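The spider now takes a begin argument alongside debug. A minimal sketch of running it with these arguments from a script (equivalent to scrapy crawl sis -a debug=false -a begin=2025-07-01; the begin value here is just an illustrative date, not from the commit):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Run inside the project so SPIDER_MODULES can resolve the "sis" spider by name.
process = CrawlerProcess(get_project_settings())
process.crawl("sis", debug="false", begin="2025-07-01")  # begin date is a made-up example
process.start()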
    # Entry point, triggered by the base class
def custom_start_requests(self):
sections = [
@ -73,33 +54,34 @@ class Sis001Spider(BaseSpider):
        # Locate the target table
        tables = response.css(f'table#{ident}')
        if not tables:
            self.logger.warning(f"cannot find table. url: {response.url}")
            self.logger.error(f"cannot find table. url: {response.url}")
return
main_table = None
for table in tables:
            # Check whether the table header contains "版块主题"
            tbody_title = extract_title(table.css('thead'))
            tbody_title = extract_text_from_element(table.css('thead'))
            if "版块主题" in tbody_title:
main_table = table
break
if not main_table:
            self.logger.warning(f"cannot find the expected topic table. url: {response.url}")
            self.logger.error(f"cannot find the expected topic table. url: {response.url}")
return
need_next = False
        # Parse the table rows
        for body in main_table.css('tbody[id^="normalthread_"]'):
            for row in body.css('tr'):
                tds = row.css('td')
                if len(tds) < 6:
                    self.logger.warning(f"skip incomplete row, column count: {len(tds)}")
                    self.logger.warning(f"skip incomplete row, column count: {len(tds)}. url: {response.url}")
                    continue
                # Parse category and title
                th_lock = row.css('th')
                if not th_lock:
                    self.logger.warning("th.lock element not found")
                    self.logger.warning(f"th.lock element not found. url: {response.url}")
continue
                # Parse the category link
@ -115,7 +97,7 @@ class Sis001Spider(BaseSpider):
                # Parse size and format
size_text = tds[4].css('::text').get(default="").strip()
size_gb, file_format = self.parse_size_format(size_text)
size_gb, file_format = parse_size_format(size_text)
                # Build the item
item = Sis001Item()
@ -125,45 +107,22 @@ class Sis001Spider(BaseSpider):
item['size_text'] = size_text
item['size_gb'] = size_gb
item['update_date'] = pub_date
                # Decide whether to keep paging: stop only when every item on the page is older than the begin date
                up_date = parse_date_to_datetime(item['update_date'])
                if up_date and self.begin and up_date < self.begin:
                    self.logger.debug("found item older than begin date.")
else:
need_next = True
yield item
        # Handle pagination
        next_url = response.css('div.pages_btns a.next::attr(href)').get()
        if next_url:
            next_url = urljoin(response.url, next_url)
            self.logger.info(f"found next page: {next_url}")
if not self.debug:
yield scrapy.Request(next_url, callback=self.parse, meta=response.meta)
if need_next:
            # Handle pagination
            next_url = response.css('div.pages_btns a.next::attr(href)').get()
            if next_url:
                next_url = urljoin(response.url, next_url)
                self.logger.debug(f"found next page: {next_url}")
if not self.debug:
yield scrapy.Request(next_url, callback=self.parse_page_common, meta=response.meta)
    def parse_size_format(self, size_text: str):
        """Parse size and format (original logic unchanged)."""
try:
if not size_text:
return 0.0, "未知格式"
            # Split into size and format
            parts = size_text.split('/')
            format_part = parts[1].strip() if len(parts) > 1 else "未知格式"
            # Parse the size
            size_part = parts[0].strip()
            match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_part)
            if not match:
                self.logger.warning(f"cannot parse size: {size_part}")
return 0.0, format_part
value, unit = match.groups()
value = float(value)
if unit.lower() == 'mb' or unit.lower() == 'm':
return round(value / 1024, 2), format_part
elif unit.lower() == 'gb' or unit.lower() == 'g':
return round(value, 2), format_part
else:
                self.logger.warning(f"unknown unit: {unit}")
return 0.0, format_part
except Exception as e:
            self.logger.error(f"error while parsing size/format: {e}")
return 0.0, "未知格式"

View File

@ -1,44 +1,27 @@
import scrapy
from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.spiders.base_spider import BaseSpider, extract_text_from_element
from scrapy_proj.items import U001Item
from scrapy_proj.utils.size_converter import parse_size
def extract_title(element):
    """Extract the text of an <a> tag, preferring a non-empty title attribute."""
    # Check whether the title attribute exists and is not an empty string
    title_attr = element.attrib.get('title', '').strip()
    if title_attr:
        return title_attr
    # Otherwise use XPath's string(.) to collect the text of all descendant nodes
    full_text = element.xpath('string(.)').get(default='').strip()
    # If that is empty, fall back to collecting every text fragment, stripping each, and joining them
    if not full_text:
        text_parts = element.css('::text').getall()
        # Strip each text fragment
        stripped_parts = [part.strip() for part in text_parts]
        # Drop empty strings and join
        full_text = ' '.join(filter(None, stripped_parts))
    return full_text or '无标题'  # Guarantee the placeholder '无标题' ("untitled") is returned at minimum
from scrapy_proj.utils.utils import parse_size, parse_date_to_datetime
class U001Spider(BaseSpider):
name = "u3c3"
allowed_domains = ["u001.25img.com"]
start_urls = ["https://u001.25img.com/?p=1"]
def __init__(self, debug='False', *args, **kwargs):
def __init__(self, debug='False', begin=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.debug = True if (str(debug).lower() == 'true' or str(debug).lower() == '1') else False
        self.logger.info(f"debug mode: {self.debug}")
self.begin = parse_date_to_datetime(begin) if begin else None
    # Entry point, triggered by the base class
def _parse(self, response):
need_next = False
for row in response.css('table.torrent-list tbody tr'):
item = U001Item()
item['category'] = row.css('td:nth-child(1) a::attr(title)').get()
item['title'] = extract_title(row.css('td:nth-child(2) a'))
item['title'] = extract_text_from_element(row.css('td:nth-child(2) a'), use_title=True)
item['url'] = response.urljoin(row.css('td:nth-child(2) a::attr(href)').get())
links = row.css('td:nth-child(3) a::attr(href)').getall()
@ -50,13 +33,21 @@ class U001Spider(BaseSpider):
item['size_gb'] = parse_size(size_text)
item['update_date'] = row.css('td:nth-child(5)::text').get(default='').strip()
            # Decide whether to keep paging: stop only when every item on the page is older than the begin date
            up_date = parse_date_to_datetime(item['update_date'])
            if up_date and self.begin and up_date < self.begin:
                self.logger.debug("found item older than begin date.")
else:
need_next = True
yield item
        # Pagination logic
        current_page = int(response.url.split('=')[-1])
        total_pages = int(response.css('script:contains("totalPages")').re_first(r'totalPages:\s*(\d+)'))
        if current_page < total_pages:
            if self.debug and current_page >= 5:
                self.logger.info("debug mode, stop crawling.")
else:
yield response.follow(f"?p={current_page + 1}", self.parse)
if need_next :
            # Pagination logic
            current_page = int(response.url.split('=')[-1])
            total_pages = int(response.css('script:contains("totalPages")').re_first(r'totalPages:\s*(\d+)'))
            if current_page < total_pages:
                if self.debug and current_page >= 5:
                    self.logger.info("debug mode, stop crawling.")
else:
yield response.follow(f"?p={current_page + 1}", self._parse)
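A standalone illustration of the totalPages extraction used above (not part of the commit). parsel's CSS support includes the non-standard :contains() pseudo-class, so the selector matches the script element whose text mentions totalPages, and re_first() then applies the regex to it:

from scrapy import Selector

html = '<script>var opts = { currentPage: 3, totalPages: 42 };</script>'
sel = Selector(text=html)

total_pages = int(sel.css('script:contains("totalPages")').re_first(r'totalPages:\s*(\d+)'))
print(total_pages)  # 42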

View File

@ -1,19 +0,0 @@
import re
def parse_size(size_text):
try:
match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_text)
if not match:
return 0.0
value, unit = match.groups()
value = float(value)
if unit.lower() == 'mb':
return round(value / 1024, 2)
elif unit.lower() == 'kb':
return round(value / 1024 / 1024, 2)
elif unit.lower() == 'gb':
return round(value, 2)
else:
return 0.0
except Exception:
return 0.0

View File

@ -0,0 +1,80 @@
import re
from datetime import datetime
''' Parse strings like "xxxMB", "xxxGB", "xxxM" and normalize the size to GB '''
def parse_size(size_text):
try:
match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_text)
if not match:
return 0.0
value, unit = match.groups()
value = float(value)
if unit.lower() == 'mb' or unit.lower() == 'm':
return round(value / 1024, 2)
elif unit.lower() == 'kb' or unit.lower() == 'k':
return round(value / 1024 / 1024, 2)
elif unit.lower() == 'gb' or unit.lower() == 'g':
return round(value, 2)
else:
return 0.0
except Exception:
return 0.0
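A few illustrative checks of the conversion above (not part of the commit):

assert parse_size("512MB") == 0.5
assert parse_size("1.5 GB") == 1.5
assert parse_size("700M") == 0.68      # 700 / 1024, rounded to two decimals
assert parse_size("no size here") == 0.0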
''' Parse strings like "5GB/MP4": extract the video format and convert the size to GB. '''
def parse_size_format(size_text: str):
    try:
        if not size_text:
            return 0.0, "未知格式"
        # Split into size and format
        parts = size_text.split('/')
        format_part = parts[1].strip() if len(parts) > 1 else "未知格式"
        # Delegate the size conversion to parse_size()
        return parse_size(parts[0].strip()), format_part
    except Exception:
        return 0.0, "未知格式"

"""Convert a date string into a datetime object; several formats are supported."""
def parse_date_to_datetime(date_str):
    # Date formats handled:
    # 1. yyyy-mm-dd
    # 2. yyyy-m-d
    # 3. yyyy/mm/dd
    # 4. yyyy/m/d
    # 5. yyyy年mm月dd日 (Chinese format)
    # 6. yyyy-mm-dd hh:mm:ss
    # Try each pattern in turn
    patterns = [
        (r'^(\d{4})-(\d{1,2})-(\d{1,2})$', "%Y-%m-%d"),    # yyyy-mm-dd or yyyy-m-d
        (r'^(\d{4})/(\d{1,2})/(\d{1,2})$', "%Y/%m/%d"),    # yyyy/mm/dd or yyyy/m/d
        (r'^(\d{4})年(\d{1,2})月(\d{1,2})日$', "%Y年%m月%d日"),    # Chinese format
        (r'^(\d{4})-(\d{1,2})-(\d{1,2}) (\d{1,2}):(\d{1,2}):(\d{1,2})$', "%Y-%m-%d %H:%M:%S"),    # with time
    ]
    for pattern, format_str in patterns:
        match = re.match(pattern, date_str)
        if match:
            return datetime.strptime(date_str, format_str)
    # If no pattern matches, return None
    return None
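Illustrative checks for the two helpers above (not part of the commit):

from datetime import datetime

assert parse_size_format("5GB/MP4") == (5.0, "MP4")
assert parse_size_format("850 MB / MKV") == (0.83, "MKV")
assert parse_size_format("") == (0.0, "未知格式")

assert parse_date_to_datetime("2025-07-05") == datetime(2025, 7, 5)
assert parse_date_to_datetime("2025/7/5") == datetime(2025, 7, 5)
assert parse_date_to_datetime("2025年7月5日") == datetime(2025, 7, 5)
assert parse_date_to_datetime("2025-07-05 12:08:08") == datetime(2025, 7, 5, 12, 8, 8)
assert parse_date_to_datetime("July 5, 2025") is None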