From f769698f63fa7c3e1a9c402e4409824d11b94a5c Mon Sep 17 00:00:00 2001
From: sophon
Date: Mon, 21 Jul 2025 10:24:59 +0800
Subject: [PATCH] modify scripts

---
 scrapy_proj/cron/cron_weekly.sh                 | 114 ++++++++++++++++++
 .../db_wapper/spider_db_handler.py              |  85 +++++++++++++
 .../scrapy_proj/db_wapper/sqlite_base.py        |  29 ++++-
 .../scrapy_proj/extensions/stats_extension.py   |  27 ++++-
 scrapy_proj/scrapy_proj/pipelines.py            |   3 +-
 scrapy_proj/scrapy_proj/spiders/clm_spider.py   |  12 +-
 6 files changed, 258 insertions(+), 12 deletions(-)
 create mode 100755 scrapy_proj/cron/cron_weekly.sh

diff --git a/scrapy_proj/cron/cron_weekly.sh b/scrapy_proj/cron/cron_weekly.sh
new file mode 100755
index 0000000..161399c
--- /dev/null
+++ b/scrapy_proj/cron/cron_weekly.sh
@@ -0,0 +1,114 @@
+#!/bin/bash
+
+# ==============================================
+# Configuration: adjust or extend as needed
+# ==============================================
+# Project base path (auto-detected: the parent of this script's directory, i.e. scrapy_proj/)
+SCRAPY_PROJ_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)
+LOG_DIR="${SCRAPY_PROJ_DIR}/log"   # log directory
+SLEEP_SECONDS=60                   # pause between spiders (seconds)
+# Common date argument (8 days before today, yyyy-mm-dd)
+COMMON_DATE_PARAM=$(date -d "8 days ago" +%Y-%m-%d)
+
+# ==============================================
+# Spider registration: define the command for each spider here
+# Format: register_spider "spider name" "full command (variables and custom arguments allowed)"
+# ==============================================
+# Registration helper (no need to modify)
+declare -a SPIDER_REGISTRY=()   # holds the registered spider entries
+register_spider() {
+    local spider_name="$1"
+    local execute_cmd="$2"
+    SPIDER_REGISTRY+=("${spider_name}|${execute_cmd}")
+}
+
+# Registered spiders: adjust or add entries as needed
+# u3c3: only needs the common date argument
+register_spider "u3c3" "scrapy crawl u3c3 -a begin=${COMMON_DATE_PARAM}"
+
+# sis: only needs the common date argument
+register_spider "sis" "scrapy crawl sis -a begin=${COMMON_DATE_PARAM}"
+
+# clm: common date argument plus a custom argument
+register_spider "clm" "scrapy crawl clm -a begin=${COMMON_DATE_PARAM} -a mod='update' "
+
+# pbox: fully custom arguments (independent of the common date)
+register_spider "pbox" "scrapy crawl pbox "
+
+# ==============================================
+# Core execution logic (no need to modify)
+# ==============================================
+# Initialize the log directory
+mkdir -p "${LOG_DIR}"
+current_time=$(date +"%Y%m%d")   # run timestamp (used in log file names)
+main_log="${LOG_DIR}/cron_${current_time}.log"
+
+# Logging helper: prefix a timestamp and append to the main log
+log() {
+    local msg="$1"
+    local timestamp=$(date +"%Y-%m-%d %H:%M:%S")
+    echo "[$timestamp] $msg" | tee -a "${main_log}"
+}
+
+# Run a single spider
+execute_spider() {
+    local spider_name="$1"
+    local execute_cmd="$2"
+
+    log "===== Starting ${spider_name} ====="
+    log "Command: ${execute_cmd}"
+
+    # Per-spider log file (kept separate to simplify troubleshooting)
+    local spider_log="${LOG_DIR}/${spider_name}_${current_time}.log"
+
+    # Run the command (cd into the project directory so the scrapy command resolves correctly)
+    (cd "${SCRAPY_PROJ_DIR}" && eval "${execute_cmd}") > "${spider_log}" 2>&1
+    local exit_code=$?   # capture the command's exit status
+
+    # Check the result
+    if [ ${exit_code} -eq 0 ]; then
+        log "${spider_name} finished successfully (log: ${spider_log})"
+    else
+        log "ERROR: ${spider_name} failed (log: ${spider_log}, exit code: ${exit_code})"
+    fi
+    return ${exit_code}
+}
+
+# ==============================================
+# Main flow: run the registered spiders in order
+# ==============================================
+log "===== Spider scheduler started ====="
+log "Project path: ${SCRAPY_PROJ_DIR}"
+log "Common date argument: ${COMMON_DATE_PARAM}"
+log "Registered spiders: ${#SPIDER_REGISTRY[@]}"
+
+# Abort if no spiders are registered
+if [ ${#SPIDER_REGISTRY[@]} -eq 0 ]; then
+    log "ERROR: no spiders registered, aborting"
+    exit 1
+fi
+
+# Run every registered spider in turn
+for spider_info in "${SPIDER_REGISTRY[@]}"; do
+    # Parse the registry entry (split into name and command)
+    IFS="|" read -r spider_name execute_cmd <<< "${spider_info}"
+
+    # Run the current spider
+    execute_spider "${spider_name}" "${execute_cmd}"
+    last_exit_code=$?
+
+    # To enable fail-fast behaviour (stop after the first failure), uncomment the block below
+    # if [ ${last_exit_code} -ne 0 ]; then
+    #     log "ERROR: ${spider_name} failed, skipping the remaining spiders"
+    #     exit ${last_exit_code}
+    # fi
+
+    # Sleep unless this is the last spider
+    if [ "${spider_info}" != "${SPIDER_REGISTRY[-1]}" ]; then
+        log "Waiting ${SLEEP_SECONDS} seconds before the next spider..."
+        sleep ${SLEEP_SECONDS}
+    fi
+done
+
+log "===== All registered spiders finished ====="
+exit 0
\ No newline at end of file
diff --git a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py
index 10c9aa5..e14246f 100644
--- a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py
+++ b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py
@@ -42,6 +42,26 @@ class SisDBHandler(SQLiteDBHandler):
         ''')
         self.conn.commit()
 
+    # Stats helper
+    def get_stat(self):
+        try:
+            self.cursor.execute(f"""
+                SELECT
+                    (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt
+            """)
+
+            row = self.cursor.fetchone()
+            if not row:
+                logging.warning("query returned no results.")
+                return {}
+
+            columns = [desc[0] for desc in self.cursor.description]
+            return dict(zip(columns, row))
+
+        except sqlite3.Error as e:
+            logging.error(f"query error: {e}")
+            return {}
+
 
 @register_handler(comm.SPIDER_NAME_U3C3)
 class U3C3DBHandler(SQLiteDBHandler):
@@ -71,6 +91,26 @@ class U3C3DBHandler(SQLiteDBHandler):
         ''')
         self.conn.commit()
 
+    # Stats helper
+    def get_stat(self):
+        try:
+            self.cursor.execute(f"""
+                SELECT
+                    (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt
+            """)
+
+            row = self.cursor.fetchone()
+            if not row:
+                logging.warning("query returned no results.")
+                return {}
+
+            columns = [desc[0] for desc in self.cursor.description]
+            return dict(zip(columns, row))
+
+        except sqlite3.Error as e:
+            logging.error(f"query error: {e}")
+            return {}
+
 
 @register_handler(comm.SPIDER_NAME_CLM)
 class ClmDBHandler(SQLiteDBHandler):
@@ -161,6 +201,28 @@ class ClmDBHandler(SQLiteDBHandler):
         except sqlite3.Error as e:
             logging.error(f"查询 href 失败: {e}")
             return None
+
+    # Stats helper
+    def get_stat(self):
+        try:
+            self.cursor.execute(f"""
+                SELECT
+                    (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words,
+                    (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets,
+                    (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag
+            """)
+
+            row = self.cursor.fetchone()
+            if not row:
+                logging.warning("query returned no results.")
+                return {}
+
+            columns = [desc[0] for desc in self.cursor.description]
+            return dict(zip(columns, row))
+
+        except sqlite3.Error as e:
+            logging.error(f"query error: {e}")
+            return {}
 
 
 @register_handler(comm.SPIDER_NAME_IAFD)
@@ -505,6 +567,29 @@ class PboxDBHandler(SQLiteDBHandler):
             logging.error(f"查询 href 失败: {e}")
             return []
 
+    # Stats helper
+    def get_stat(self):
+        try:
+            self.cursor.execute(f"""
+                SELECT
+                    (SELECT COUNT(*) FROM {self.tbl_studios}) AS studios,
+                    (SELECT COUNT(*) FROM {self.tbl_movies}) AS movies,
+                    (SELECT COUNT(*) FROM {self.tbl_actor}) AS actors,
+                    (SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov
+            """)
+
+            row = self.cursor.fetchone()
+            if not row:
+                logging.warning("query returned no results.")
+                return {}
+
+            columns = [desc[0] for desc in self.cursor.description]
+            return dict(zip(columns, row))
+
+        except sqlite3.Error as e:
+            logging.error(f"query error: {e}")
+            return {}
+
     def close_spider(self, spider):
         # 关闭数据库连接
         self.conn.close()
\ No newline at end of file
diff --git a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py
index 827e6ef..75ce2ea 100644
--- a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py
+++ b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py
@@ -8,9 +8,23 @@ global_share_data_dir = f'{home_dir}/sharedata'
 default_dbpath = f"{global_share_data_dir}/sqlite/scrapy.db"
 shared_db_path = f"{global_share_data_dir}/sqlite/shared.db"
 
+# Singleton metaclass
+class SingletonMeta(type):
+    _instances = {}   # one shared instance per class
+
+    def __call__(cls, *args, **kwargs):
+        # Create the instance only if it does not exist yet
+        if cls not in cls._instances:
+            cls._instances[cls] = super().__call__(*args, **kwargs)
+        return cls._instances[cls]
+
 # 数据库基类,封装了通用的操作。
-class SQLiteDBHandler:
+class SQLiteDBHandler(metaclass=SingletonMeta):   # apply the singleton metaclass
     def __init__(self, db_path=None):
+        # Guard against re-initialization (__init__ may run again under the singleton pattern)
+        if hasattr(self, 'initialized') and self.initialized:
+            return
+
         # 使用传入的 db_path 或默认路径
         self.DB_PATH = db_path or default_dbpath
 
@@ -19,18 +33,22 @@ class SQLiteDBHandler:
             os.makedirs(os.path.dirname(db_path))
 
         self.conn = sqlite3.connect(self.DB_PATH, check_same_thread=False)
-        #self.conn.execute('PRAGMA journal_mode = WAL') # 启用 WAL(Write-Ahead Logging) 模式
-        #self.conn.commit()
-
         self.conn.row_factory = sqlite3.Row # 结果集支持字典式访问
         self.cursor = self.conn.cursor()
 
+        #self.conn.execute('PRAGMA journal_mode = WAL')   # enable WAL (Write-Ahead Logging) mode
+        #self.conn.commit()
+        self.conn.execute('PRAGMA journal_mode = DELETE')   # switch back to the traditional rollback journal
+        self.conn.commit()   # make sure the setting takes effect
+
         # 检查 SQLite 版本
         self.lower_sqlite_version = False
         sqlite_version = sqlite3.sqlite_version_info
         if sqlite_version < (3, 24, 0):
             self.lower_sqlite_version = True
 
+        self.initialized = True   # mark initialization as complete
+
     def __del__(self):
         try:
             self.close()
@@ -178,3 +196,6 @@ class SQLiteDBHandler:
     def close(self):
         self.cursor.close()
         self.conn.close()
+
+    def get_stat(self):
+        return {}
diff --git a/scrapy_proj/scrapy_proj/extensions/stats_extension.py b/scrapy_proj/scrapy_proj/extensions/stats_extension.py
index 5eb78e5..fb125b8 100644
--- a/scrapy_proj/scrapy_proj/extensions/stats_extension.py
+++ b/scrapy_proj/scrapy_proj/extensions/stats_extension.py
@@ -5,6 +5,7 @@ from datetime import datetime
 from scrapy import signals
 from scrapy.exceptions import NotConfigured
 from twisted.internet import task
+from scrapy_proj.db_wapper.spider_db_handler import spider_handler_registry
 
 logger = logging.getLogger() # 修改点:使用全局 logger
 
@@ -16,7 +17,12 @@ class StatsExtension:
         self.spider_name = None
         self.loop = None # 添加循环任务
         self.current_status = 'running' # 初始化状态
+        self.db_handlers = {}
 
+    def get_db_handler(self, spider_name):
+        # Look up the DB handler registered for this spider
+        return self.db_handlers.get(spider_name.lower())
+
     @classmethod
     def from_crawler(cls, crawler):
         interval = crawler.settings.getint('STATS_EXPORT_INTERVAL', 600)
@@ -36,6 +42,14 @@ class StatsExtension:
         logger.info(f"Spider {spider.name} opened - StatsExtension initialized")
         #self._export_stats(spider)
 
+        # Resolve the spider's DB handler
+        spider_name = spider.name.lower()
+        handler_class = spider_handler_registry.get(spider_name)
+        if handler_class:
+            self.db_handlers[spider_name] = handler_class()
+        else:
+            spider.logger.warning(f"spider({spider.name}) has no registered DB handler.")
+
         # 创建并启动循环任务
         self.loop = task.LoopingCall(self._export_stats, spider)
         self.loop.start(self.interval) # 每隔interval秒执行一次
@@ -48,8 +62,18 @@ class StatsExtension:
         self.current_status = f"stop({reason})" # 终止状态
         self._export_stats(spider)
         logger.info(f"Spider {spider.name} closed - reason: {reason}")
+
+        handler = self.get_db_handler(spider.name)
+        if handler:
+            handler.close()
 
     def _export_stats(self, spider):
+        # Collect database statistics
+        db_stat = {}
+        handler = self.get_db_handler(spider.name)
+        if handler:
+            db_stat = handler.get_stat()
+
         # 获取当前统计信息
         stats = self.stats.get_stats()
         # 构建统计摘要
@@ -62,7 +86,8 @@ class StatsExtension:
             '200_cnt': stats.get('downloader/response_status_count/200', 0),
             '404_cnt': stats.get('downloader/response_status_count/404', 0),
             'log_err_cnt': stats.get('log_count/ERROR', 0),
-            'status': self.current_status
+            'status': self.current_status,
+            **db_stat
         }
 
         # 打印统计信息
diff --git a/scrapy_proj/scrapy_proj/pipelines.py b/scrapy_proj/scrapy_proj/pipelines.py
index 93d0f3c..aed981a 100644
--- a/scrapy_proj/scrapy_proj/pipelines.py
+++ b/scrapy_proj/scrapy_proj/pipelines.py
@@ -30,7 +30,8 @@ class SQLitePipeline():
         spider_name = spider.name.lower()
         handler = self.db_handlers.pop(spider_name, None)
         if handler:
-            handler.close()
+            pass
+            #handler.close()   # not closed here; the stats extension closes it
 
     def process_item(self, item, spider):
         spider_name = spider.name.lower()
diff --git a/scrapy_proj/scrapy_proj/spiders/clm_spider.py b/scrapy_proj/scrapy_proj/spiders/clm_spider.py
index 7969480..11decdb 100644
--- a/scrapy_proj/scrapy_proj/spiders/clm_spider.py
+++ b/scrapy_proj/scrapy_proj/spiders/clm_spider.py
@@ -28,6 +28,7 @@ class ClmSpider(BaseSpider):
         self.begin = parse_date_to_datetime(begin) if begin else None
         self.min_size = float(min_size) if min_size else 1.0
         self.keywords_file = kwargs.get('file_path') if kwargs.get('file_path') else default_keywords_file
+        self.query_str = kwargs.get('query_str') if kwargs.get('query_str') else None
 
         self.logger.info(f"RUN CMD: {' '.join(sys.argv)}")
 
@@ -69,13 +70,12 @@ class ClmSpider(BaseSpider):
         if not self.run_task:
             return
 
-        #tmp_query_str = f" groups='actress' and tags not like '%vixen%' "
-        tmp_query_str = f" 1=1 "
+        query_args = {}
         if self.debug:
-            keywords = db_clm.get_key_words(limit =5, query_str = tmp_query_str)
-        else:
-            #keywords = db_clm.get_key_words(groups='actress', tags='vixen')
-            keywords = db_clm.get_key_words(query_str = tmp_query_str)
+            query_args['limit'] = 5
+        if self.query_str:
+            query_args['query_str'] = self.query_str
+        keywords = db_clm.get_key_words(**query_args)
 
         for item in keywords:
             words_id = item['id']
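
Deployment sketch: cron_weekly.sh is written to be launched from cron on a weekly schedule. The crontab entry below is only an assumed example; the install path /opt/scrapy_proj and the Monday 03:00 slot are placeholders, and it presumes that cron's PATH can find the scrapy executable (otherwise wrap the call so it activates the project's virtualenv first). COMMON_DATE_PARAM relies on GNU date's -d "8 days ago" form, so the script assumes a GNU/Linux host.

    # m h  dom mon dow   command
    0 3 *   *   1   /opt/scrapy_proj/cron/cron_weekly.sh >/dev/null 2>&1

The script keeps its own per-run and per-spider logs under scrapy_proj/log via the log() helper, so discarding cron's stdout here only suppresses the duplicated tee output (and the mail cron would otherwise send).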