modify scripts

This commit is contained in:
oscarz
2025-07-02 17:59:56 +08:00
parent f1a9287834
commit 2ea1eec072
12 changed files with 706 additions and 0 deletions

scrapy_proj/scrapy.cfg

@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = scrapy_proj.settings
[deploy]
#url = http://localhost:6800/
project = scrapy_proj

@ -0,0 +1,33 @@
#!/bin/bash
: << 'EOF'
Run this local script to report task status.
A notification script (WeCom) is deployed on the remote machine and sends the result out.
EOF
# Color definitions (currently unused, kept for output formatting)
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # no color
REMOTE_SERVER="101.33.230.186"
REMOTE_USER="root"
SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=10"
# Main function
main() {
    # Check whether a command-line argument was provided
    if [ $# -eq 0 ]; then
        result='test' # default value when no argument is given
    else
        result=$1 # use the first argument as the result
    fi
    # Call the remote script and pass the result along
    ssh $SSH_OPTS $REMOTE_USER@$REMOTE_SERVER "cd /root/projects/devops/tools; python3 ./send_to_wecom.py '$result'"
    return $? # return the exit status of the remote command
}
# Run the main function
main "$@"
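
The remote send_to_wecom.py is not part of this commit; a minimal sketch of what it might look like, assuming the standard WeCom group-robot webhook (the webhook key below is a placeholder):

#!/usr/bin/env python3
# Hypothetical sketch of the remote send_to_wecom.py (not included in this commit).
# Assumes a WeCom group-robot webhook; WECOM_WEBHOOK_KEY is a placeholder.
import json
import sys
import urllib.request

WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=WECOM_WEBHOOK_KEY"

def main():
    # The shell script passes the stats JSON (or a plain string) as the first argument.
    content = sys.argv[1] if len(sys.argv) > 1 else "test"
    payload = {"msgtype": "text", "text": {"content": content}}
    req = urllib.request.Request(
        WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        print(resp.read().decode("utf-8"))

if __name__ == "__main__":
    main()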


@ -0,0 +1,116 @@
import subprocess
import time
import logging
from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
from twisted.internet import task
logger = logging.getLogger()  # use the global (root) logger
class StatsExtension:
def __init__(self, stats, interval, script_path=None):
self.stats = stats
self.interval = interval
self.script_path = script_path
self.spider_name = None
        self.loop = None  # handle for the looping task
@classmethod
def from_crawler(cls, crawler):
interval = crawler.settings.getint('STATS_EXPORT_INTERVAL', 600)
script_path = crawler.settings.get('STATS_EXPORT_SCRIPT')
if interval <= 0:
raise NotConfigured
ext = cls(crawler.stats, interval, script_path)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
return ext
def spider_opened(self, spider):
self.spider_name = spider.name
logger.info(f"Spider {spider.name} opened - StatsExtension initialized")
#self._export_stats(spider)
        # create and start the looping task
        self.loop = task.LoopingCall(self._export_stats, spider)
        self.loop.start(self.interval)  # run once every interval seconds
def spider_closed(self, spider, reason):
        # stop the looping task
if self.loop and self.loop.running:
self.loop.stop()
self._export_stats(spider)
logger.info(f"Spider {spider.name} closed - reason: {reason}")
    def _export_stats(self, spider):
        # get the current stats
        stats = self.stats.get_stats()
        # compute the spider's uptime
        start_time = stats.get('start_time')
        if start_time:
            # convert the datetime object to a timestamp
            start_timestamp = start_time.timestamp()
            uptime = time.time() - start_timestamp
        else:
            uptime = 0
        # build the stats summary
        stats_summary = {
            't': datetime.now().strftime('%H:%M:%S'),
            'spider': self.spider_name,
            'uptime(s)': int(uptime),
            'recv_cnt': stats.get('response_received_count', 0),
            'total_req': stats.get('downloader/request_count', 0),
            '200_cnt': stats.get('downloader/response_status_count/200', 0),
            '404_cnt': stats.get('downloader/response_status_count/404', 0),
            'log_err_cnt': stats.get('log_count/ERROR', 0)
        }
        # log the stats summary
        logger.info(f"Stats Summary: {stats_summary}")
        # if a shell script is configured, invoke it
        if self.script_path:
            self._call_shell_script_async(stats_summary)
def _call_shell_script(self, stats):
try:
            # serialize the stats to JSON and pass it to the shell script as an argument
import json
stats_json = json.dumps(stats)
            # run the shell script via subprocess
result = subprocess.run(
[self.script_path, stats_json],
capture_output=True,
text=True,
check=True
)
logger.info(f"Shell script executed successfully: {result.stdout}")
except subprocess.CalledProcessError as e:
logger.error(f"Error executing shell script: {e.stderr}")
except Exception as e:
logger.error(f"Unexpected error calling shell script: {e}")
def _call_shell_script_async(self, stats):
try:
import json
stats_json = json.dumps(stats)
            # run the shell script without blocking
subprocess.Popen(
[self.script_path, stats_json],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
            logger.info("Shell script started in background")
except Exception as e:
logger.error(f"Error starting shell script: {e}")


@ -0,0 +1,22 @@
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
# items.py
import scrapy
class U001Item(scrapy.Item):
category = scrapy.Field()
title = scrapy.Field()
url = scrapy.Field()
torrent_url = scrapy.Field()
magnet_url = scrapy.Field()
size_text = scrapy.Field()
size_gb = scrapy.Field()
update_date = scrapy.Field()
class Sis001Item(scrapy.Item):
title = scrapy.Field()
url = scrapy.Field()
plate_name = scrapy.Field()


@ -0,0 +1,100 @@
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
class ScrapyProjSpiderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(self, response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, or item objects.
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Request or item objects.
pass
async def process_start(self, start):
# Called with an async iterator over the spider start() method or the
        # matching method of an earlier spider middleware.
async for item_or_request in start:
yield item_or_request
def spider_opened(self, spider):
spider.logger.info("Spider opened: %s" % spider.name)
class ScrapyProjDownloaderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.
# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
return None
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.
        # Must either:
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info("Spider opened: %s" % spider.name)


@ -0,0 +1,209 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
#from itemadapter import ItemAdapter
#class ScrapyProjPipeline:
# def process_item(self, item, spider):
# return item
import os
import sqlite3
import logging
from datetime import datetime
from scrapy_proj.items import U001Item, Sis001Item
home_dir = os.path.expanduser("~")
global_share_data_dir = f'{home_dir}/sharedata'
default_dbpath = f"{global_share_data_dir}/sqlite/scrapy.db"
# Database base class that wraps the common SQLite operations.
class SQLiteDBHandler:
def __init__(self, db_path=None):
        # use the provided db_path or fall back to the default path
self.DB_PATH = db_path or default_dbpath
        # make sure the parent directory exists (optional)
if db_path and not os.path.exists(os.path.dirname(db_path)):
os.makedirs(os.path.dirname(db_path))
self.conn = sqlite3.connect(self.DB_PATH, check_same_thread=False)
self.cursor = self.conn.cursor()
        # check the SQLite version (ON CONFLICT ... DO UPDATE needs >= 3.24)
self.lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
self.lower_sqlite_version = True
def get_table_columns_and_defaults(self, tbl_name):
try:
self.cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = self.cursor.fetchall()
column_info = {}
for col in columns:
col_name = col[1]
default_value = col[4]
column_info[col_name] = default_value
return column_info
except sqlite3.Error as e:
logging.error(f"Error getting table columns: {e}")
return None
def check_and_process_data(self, data, tbl_name):
column_info = self.get_table_columns_and_defaults(tbl_name)
if column_info is None:
return None
processed_data = {}
for col, default in column_info.items():
            if col == 'id' or col == 'created_at':  # auto-increment key and creation date rely on the table defaults
                continue
            if col == 'updated_at':  # always stamp updated_at with the current time
processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if col in data:
processed_data[col] = data[col]
return processed_data
def insert_or_update_common(self, data, tbl_name, uniq_key='url'):
if self.lower_sqlite_version:
return self.insert_or_update_common_lower(data, tbl_name, uniq_key)
try:
processed_data = self.check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key])
sql = f'''
INSERT INTO {tbl_name} ({columns})
VALUES ({placeholders})
ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
'''
self.cursor.execute(sql, values)
self.conn.commit()
            # fetch the id of the inserted or updated row
self.cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
record_id = self.cursor.fetchone()[0]
return record_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
def insert_or_update_common_lower(self, data, tbl_name, uniq_key='url'):
try:
processed_data = self.check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
            # try a plain insert first
try:
sql = f'''
INSERT INTO {tbl_name} ({columns})
VALUES ({placeholders})
'''
self.cursor.execute(sql, values)
self.conn.commit()
            except sqlite3.IntegrityError:  # unique-key conflict: fall back to an UPDATE
update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key])
update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
update_values.append(data[uniq_key])
sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
self.cursor.execute(sql, update_values)
self.conn.commit()
            # fetch the id of the inserted or updated row
self.cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
record_id = self.cursor.fetchone()[0]
return record_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
def get_id_by_key(self, tbl, uniq_key, val):
self.cursor.execute(f"SELECT id FROM {tbl} WHERE {uniq_key} = ?", (val,))
row = self.cursor.fetchone()
return row[0] if row else None
def close(self):
self.cursor.close()
self.conn.close()
class SQLitePipeline(SQLiteDBHandler):
def __init__(self, db_path=None):
super().__init__(db_path)
self.tbl_name_u3c3 = 'u3c3'
self.tbl_name_sis = 'sis'
self._create_tables()
def _create_tables(self):
        # create the u3c3 data table
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {self.tbl_name_u3c3} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT,
title TEXT,
url TEXT UNIQUE,
torrent_url TEXT,
magnet_url TEXT,
size_text TEXT,
size_gb REAL,
update_date TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
updated_at TEXT DEFAULT (datetime('now', 'localtime'))
)
''')
        # create the sis data table
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {self.tbl_name_sis} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plate_name TEXT,
title TEXT,
url TEXT UNIQUE,
size_text TEXT,
size_gb REAL,
update_date TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
updated_at TEXT DEFAULT (datetime('now', 'localtime'))
)
''')
self.conn.commit()
def process_item(self, item, spider):
if isinstance(item, U001Item):
self._process_u001_item(item)
elif isinstance(item, Sis001Item):
self._process_sis001_item(item)
return item
def _process_u001_item(self, item):
return self.insert_or_update_common(item, tbl_name=self.tbl_name_u3c3, uniq_key='url')
def _process_sis001_item(self, item):
        self.cursor.execute(f'''
            INSERT OR IGNORE INTO {self.tbl_name_sis}
            (title, url, plate_name)
            VALUES (?, ?, ?)
''', (
item.get('title'),
item.get('url'),
item.get('plate_name')
))
self.conn.commit()
def close_spider(self, spider):
self.conn.close()
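
The upsert in insert_or_update_common relies on SQLite's ON CONFLICT ... DO UPDATE clause (available from SQLite 3.24, which is why the class keeps a fallback for older versions). A standalone sketch of the pattern, with an illustrative table and columns:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT UNIQUE, title TEXT)")
upsert = """
    INSERT INTO demo (url, title) VALUES (?, ?)
    ON CONFLICT (url) DO UPDATE SET title = EXCLUDED.title
"""
conn.execute(upsert, ("https://example.com/a", "first title"))
conn.execute(upsert, ("https://example.com/a", "updated title"))  # same url: the row is updated, not duplicated
print(conn.execute("SELECT id, url, title FROM demo").fetchall())
# [(1, 'https://example.com/a', 'updated title')]
conn.close()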


@ -0,0 +1,140 @@
# Scrapy settings for scrapy_proj project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import os
from datetime import datetime
# create the log directory
LOG_DIR = './log'
os.makedirs(LOG_DIR, exist_ok=True)
log_date = datetime.now().strftime('%Y%m%d')
# global logging configuration
LOG_LEVEL = 'INFO'  # log at INFO level
LOG_FILE = os.path.join(LOG_DIR, f'scrapy_{log_date}.log')  # log file path
# log format
LOG_FORMAT = '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
LOG_DATEFORMAT = '%Y-%m-%d %H:%M:%S'
BOT_NAME = "scrapy_proj"
SPIDER_MODULES = ["scrapy_proj.spiders"]
NEWSPIDER_MODULE = "scrapy_proj.spiders"
ADDONS = {}
# concurrency settings
CONCURRENT_REQUESTS = 1
CONCURRENT_ITEMS = 100
# download delay
DOWNLOAD_DELAY = 1
# enable the item pipeline
ITEM_PIPELINES = {
'scrapy_proj.pipelines.SQLitePipeline': 300,
}
# user-agent pool
USER_AGENT_LIST = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
    # more UAs...
]
# user-agent middlewares (note: both entries below are set to None, i.e. disabled)
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
'scrapy_useragents.downloadermiddlewares.useragents.UserAgentsMiddleware': None,
}
# settings.py
EXTENSIONS = {
'scrapy_proj.extensions.stats_extension.StatsExtension': 500,
}
# stats-export settings
STATS_EXPORT_INTERVAL = 1800  # export every 30 minutes
STATS_EXPORT_SCRIPT = '/root/projects/resources/scrapy_proj/scrapy_proj/extensions/push_to_wecom.sh'  # path to the local shell script
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "scrapy_proj (+http://www.yourdomain.com)"
# Obey robots.txt rules
ROBOTSTXT_OBEY = True
# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32
# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
#COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
# "Accept-Language": "en",
#}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
# "scrapy_proj.middlewares.ScrapyProjSpiderMiddleware": 543,
#}
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
# "scrapy_proj.middlewares.ScrapyProjDownloaderMiddleware": 543,
#}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
# "scrapy.extensions.telnet.TelnetConsole": None,
#}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
# "scrapy_proj.pipelines.ScrapyProjPipeline": 300,
#}
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"


@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.


@ -0,0 +1,20 @@
import scrapy
from scrapy_proj.items import Sis001Item
class Sis001Spider(scrapy.Spider):
name = "sis"
allowed_domains = ["sis001.com"]
start_urls = ["https://sis001.com/forum/forum-25-1.html"]
def parse(self, response):
for row in response.css('table[id="forum_25"] tbody[id^="normalthread_"] tr'):
item = Sis001Item()
item['title'] = row.css('td a::text').get()
item['url'] = response.urljoin(row.css('td a::attr(href)').get())
item['plate_name'] = '亚无转帖'
yield item
        # pagination
next_page = response.css('a.nxt::attr(href)').get()
if next_page:
yield response.follow(next_page, self.parse)


@ -0,0 +1,32 @@
import scrapy
from scrapy_proj.items import U001Item
from scrapy_proj.utils.size_converter import parse_size
class U001Spider(scrapy.Spider):
name = "u3c3"
allowed_domains = ["u001.25img.com"]
start_urls = ["https://u001.25img.com/?p=1"]
def parse(self, response):
for row in response.css('table.torrent-list tbody tr'):
item = U001Item()
item['category'] = row.css('td:nth-child(1) a::attr(title)').get()
item['title'] = row.css('td:nth-child(2) a::attr(title)').get()
item['url'] = response.urljoin(row.css('td:nth-child(2) a::attr(href)').get())
links = row.css('td:nth-child(3) a::attr(href)').getall()
item['torrent_url'] = response.urljoin(links[0]) if links else ''
item['magnet_url'] = links[1] if len(links) > 1 else ''
size_text = row.css('td:nth-child(4)::text').get(default='').strip()
item['size_text'] = size_text
item['size_gb'] = parse_size(size_text)
item['update_date'] = row.css('td:nth-child(5)::text').get(default='').strip()
yield item
        # pagination
        current_page = int(response.url.split('=')[-1])
        total_pages = int(response.css('script:contains("totalPages")').re_first(r'totalPages:\s*(\d+)', default='0'))
        if current_page < total_pages:
            yield response.follow(f"?p={current_page + 1}", self.parse)


@ -0,0 +1,19 @@
import re
def parse_size(size_text):
    """Convert a size string such as '1.5GB' or '512 MB' to gigabytes (returns 0.0 if it cannot be parsed)."""
    try:
match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_text)
if not match:
return 0.0
value, unit = match.groups()
value = float(value)
if unit.lower() == 'mb':
return round(value / 1024, 2)
elif unit.lower() == 'kb':
return round(value / 1024 / 1024, 2)
elif unit.lower() == 'gb':
return round(value, 2)
else:
return 0.0
except Exception:
return 0.0
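
A quick usage sketch of parse_size (expected return values, in GB, shown as comments):

from scrapy_proj.utils.size_converter import parse_size

print(parse_size("1.5GB"))     # 1.5
print(parse_size("512 MB"))    # 0.5
print(parse_size("700.5 mb"))  # 0.68
print(parse_size("2 TB"))      # 0.0  (TB is not handled and falls back to the default)
print(parse_size("n/a"))       # 0.0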