modify scripts

2025-07-21 10:24:59 +08:00
parent 34728b7868
commit f769698f63
6 changed files with 258 additions and 12 deletions

scrapy_proj/cron/cron_weekly.sh (new executable file)

@ -0,0 +1,114 @@
#!/bin/bash
# ==============================================
# Configuration section: adjust or extend as needed
# ==============================================
# Project base path: the parent of this script's directory, i.e. scrapy_proj/
SCRAPY_PROJ_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)
LOG_DIR="${SCRAPY_PROJ_DIR}/log"   # directory for log files
SLEEP_SECONDS=60                   # pause (seconds) between spiders
# Common date parameter: today minus 8 days, formatted yyyy-mm-dd
COMMON_DATE_PARAM=$(date -d "8 days ago" +%Y-%m-%d)
# ==============================================
# Spider registration: define the command for each spider here
# Format: register_spider "spider name" "full command (variables and extra arguments allowed)"
# ==============================================
# Registration helper (no need to modify)
declare -a SPIDER_REGISTRY=()      # holds the registered spider entries
register_spider() {
    local spider_name="$1"
    local execute_cmd="$2"
    SPIDER_REGISTRY+=("${spider_name}|${execute_cmd}")
}
# Registered spiders: modify or add entries as needed
# u3c3: only needs the common date parameter
register_spider "u3c3" "scrapy crawl u3c3 -a begin=${COMMON_DATE_PARAM}"
# sis: only needs the common date parameter
register_spider "sis" "scrapy crawl sis -a begin=${COMMON_DATE_PARAM}"
# clm: common date plus custom arguments
register_spider "clm" "scrapy crawl clm -a begin=${COMMON_DATE_PARAM} -a mod='update'"
# pbox: fully custom arguments, does not use the common date
register_spider "pbox" "scrapy crawl pbox"
# ==============================================
# Core execution logic (no need to modify)
# ==============================================
# Initialize the log directory
mkdir -p "${LOG_DIR}"
current_time=$(date +"%Y%m%d")   # run timestamp (used in log file names)
main_log="${LOG_DIR}/cron_${current_time}.log"
# Logging helper: prefix messages with a timestamp and append them to the main log
log() {
    local msg="$1"
    local timestamp=$(date +"%Y-%m-%d %H:%M:%S")
    echo "[$timestamp] $msg" | tee -a "${main_log}"
}
# Run a single spider
execute_spider() {
    local spider_name="$1"
    local execute_cmd="$2"
    log "===== Starting ${spider_name} ====="
    log "Command: ${execute_cmd}"
    # Each spider writes to its own log file, which makes troubleshooting easier
    local spider_log="${LOG_DIR}/${spider_name}_${current_time}.log"
    # Run the command from the project directory so the scrapy command resolves correctly
    (cd "${SCRAPY_PROJ_DIR}" && eval "${execute_cmd}") > "${spider_log}" 2>&1
    local exit_code=$?   # capture the command's exit status
    # Evaluate the result
    if [ ${exit_code} -eq 0 ]; then
        log "${spider_name} finished successfully (log: ${spider_log})"
    else
        log "ERROR: ${spider_name} failed (log: ${spider_log}, exit code: ${exit_code})"
    fi
    return ${exit_code}
}
# ==============================================
# Main flow: run every registered spider in order
# ==============================================
log "===== Spider scheduler started ====="
log "Project path: ${SCRAPY_PROJ_DIR}"
log "Common date parameter: ${COMMON_DATE_PARAM}"
log "Registered spiders: ${#SPIDER_REGISTRY[@]}"
# Abort if nothing has been registered
if [ ${#SPIDER_REGISTRY[@]} -eq 0 ]; then
    log "ERROR: no spiders registered, aborting"
    exit 1
fi
# Run all registered spiders in order
for spider_info in "${SPIDER_REGISTRY[@]}"; do
    # Split the registration entry into name and command
    IFS="|" read -r spider_name execute_cmd <<< "${spider_info}"
    # Run the current spider
    execute_spider "${spider_name}" "${execute_cmd}"
    last_exit_code=$?
    # To stop on the first failure, uncomment the following block
    # if [ ${last_exit_code} -ne 0 ]; then
    #     log "ERROR: ${spider_name} failed, skipping the remaining spiders"
    #     exit ${last_exit_code}
    # fi
    # Sleep unless this was the last registered spider
    if [ "${spider_info}" != "${SPIDER_REGISTRY[-1]}" ]; then
        log "Waiting ${SLEEP_SECONDS} seconds before the next spider..."
        sleep ${SLEEP_SECONDS}
    fi
done
log "===== All registered spiders finished ====="
exit 0
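
For reference: given the cron/ directory and the file name, this script is presumably meant to be driven by cron; a weekly crontab entry along the lines of "0 3 * * 1 /path/to/scrapy_proj/cron/cron_weekly.sh" (schedule and path are illustrative, not part of this commit) would run it once a week.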

scrapy_proj/db_wapper/spider_db_handler.py

@ -42,6 +42,26 @@ class SisDBHandler(SQLiteDBHandler):
        ''')
        self.conn.commit()

    # statistics helper
    def get_stat(self):
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query returned no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3DBHandler(SQLiteDBHandler):
@ -71,6 +91,26 @@ class U3C3DBHandler(SQLiteDBHandler):
        ''')
        self.conn.commit()

    # statistics helper
    def get_stat(self):
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query returned no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_CLM)
class ClmDBHandler(SQLiteDBHandler):
@ -161,6 +201,28 @@ class ClmDBHandler(SQLiteDBHandler):
        except sqlite3.Error as e:
            logging.error(f"href query failed: {e}")
            return None

    # statistics helper
    def get_stat(self):
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words,
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets,
                    (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query returned no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_IAFD)
@ -505,6 +567,29 @@ class PboxDBHandler(SQLiteDBHandler):
logging.error(f"查询 href 失败: {e}")
return []
# 统计函数
def get_stat(self):
try:
self.cursor.execute(f"""
SELECT
(SELECT COUNT(*) FROM {self.tbl_studios}) AS studios,
(SELECT COUNT(*) FROM {self.tbl_movies}) AS movies,
(SELECT COUNT(*) FROM {self.tbl_actor}) AS actors,
(SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov
""")
row = self.cursor.fetchone()
if not row:
logging.warning(f"query no results.")
return {}
columns = [desc[0] for desc in self.cursor.description]
return dict(zip(columns, row))
except sqlite3.Error as e:
logging.error(f"query error: {e}")
return {}
    def close_spider(self, spider):
        # close the database connection
        self.conn.close()
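
As a reading aid, here is a minimal sketch of how these get_stat() implementations could be exercised through the handler registry outside of a running spider; the spider name 'clm' and the registry keys are assumptions based on how StatsExtension resolves handlers via spider.name.lower():

# Hypothetical standalone usage; spider names and registry keys are assumptions.
from scrapy_proj.db_wapper.spider_db_handler import spider_handler_registry

def print_db_stats(spider_name: str) -> dict:
    handler_class = spider_handler_registry.get(spider_name.lower())
    if handler_class is None:
        return {}
    handler = handler_class()    # singleton: repeated calls reuse one connection
    stat = handler.get_stat()    # e.g. {'cnt': ...} or {'words': ..., 'magnets': ..., 'wd_mag': ...}
    print(f"{spider_name}: {stat}")
    return stat

if __name__ == "__main__":
    print_db_stats("clm")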


@ -8,9 +8,23 @@ global_share_data_dir = f'{home_dir}/sharedata'
default_dbpath = f"{global_share_data_dir}/sqlite/scrapy.db"
shared_db_path = f"{global_share_data_dir}/sqlite/shared.db"
# Singleton metaclass
class SingletonMeta(type):
    _instances = {}   # one unique instance per class

    def __call__(cls, *args, **kwargs):
        # create the instance only if it does not exist yet
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
# Database base class that wraps the common operations.
class SQLiteDBHandler:
class SQLiteDBHandler(metaclass=SingletonMeta):   # apply the singleton metaclass
    def __init__(self, db_path=None):
        # guard against repeated initialization (defensive: under the singleton pattern __init__ could be invoked again)
        if hasattr(self, 'initialized') and self.initialized:
            return
        # use the given db_path, or fall back to the default path
        self.DB_PATH = db_path or default_dbpath
@ -19,18 +33,22 @@ class SQLiteDBHandler:
            os.makedirs(os.path.dirname(db_path))
        self.conn = sqlite3.connect(self.DB_PATH, check_same_thread=False)
        #self.conn.execute('PRAGMA journal_mode = WAL')   # enable WAL (Write-Ahead Logging) mode
        #self.conn.commit()
        self.conn.row_factory = sqlite3.Row   # allow dict-style access to result rows
        self.cursor = self.conn.cursor()
        #self.conn.execute('PRAGMA journal_mode = WAL')   # enable WAL (Write-Ahead Logging) mode
        #self.conn.commit()
        self.conn.execute('PRAGMA journal_mode = DELETE')   # switch back to the traditional journal mode
        self.conn.commit()   # make sure the setting takes effect
        # check the SQLite version
        self.lower_sqlite_version = False
        sqlite_version = sqlite3.sqlite_version_info
        if sqlite_version < (3, 24, 0):
            self.lower_sqlite_version = True
        self.initialized = True   # mark initialization as complete
    def __del__(self):
        try:
            self.close()
@ -178,3 +196,6 @@ class SQLiteDBHandler:
    def close(self):
        self.cursor.close()
        self.conn.close()

    def get_stat(self):
        return {}
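
A minimal, self-contained sketch of the behaviour the SingletonMeta / initialized pair above provides; DemoHandler is a hypothetical stand-in for an actual handler class:

# Illustrative only: DemoHandler stands in for any SQLiteDBHandler subclass.
class SingletonMeta(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class DemoHandler(metaclass=SingletonMeta):
    def __init__(self):
        # defensive guard against repeated initialization (mirrors SQLiteDBHandler)
        if hasattr(self, 'initialized') and self.initialized:
            return
        self.initialized = True
        print("initialized once")

a = DemoHandler()   # creates the instance and prints "initialized once"
b = DemoHandler()   # returns the cached instance; __init__ is not run again
assert a is b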


@ -5,6 +5,7 @@ from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
from twisted.internet import task
from scrapy_proj.db_wapper.spider_db_handler import spider_handler_registry
logger = logging.getLogger()   # change: use the global logger
@ -16,7 +17,12 @@ class StatsExtension:
        self.spider_name = None
        self.loop = None                    # looping task
        self.current_status = 'running'     # initial status
        self.db_handlers = {}

    def get_db_handler(self, spider_name):
        # fetch the DB handler registered for this spider
        return self.db_handlers.get(spider_name.lower())
    @classmethod
    def from_crawler(cls, crawler):
        interval = crawler.settings.getint('STATS_EXPORT_INTERVAL', 600)
@ -36,6 +42,14 @@ class StatsExtension:
logger.info(f"Spider {spider.name} opened - StatsExtension initialized")
#self._export_stats(spider)
# 获取数据库连接对象
spider_name = spider.name.lower()
handler_class = spider_handler_registry.get(spider_name)
if handler_class:
self.db_handlers[spider_name] = handler_class()
else:
spider.logger.warning(f"spider({spider.name}) has no db instance.")
# 创建并启动循环任务
self.loop = task.LoopingCall(self._export_stats, spider)
self.loop.start(self.interval) # 每隔interval秒执行一次
@ -48,8 +62,18 @@ class StatsExtension:
        self.current_status = f"stop({reason})"   # terminal status
        self._export_stats(spider)
        logger.info(f"Spider {spider.name} closed - reason: {reason}")
        handler = self.get_db_handler(spider.name)
        if handler:
            handler.close()

    def _export_stats(self, spider):
        # collect database statistics
        db_stat = {}
        handler = self.get_db_handler(spider.name)
        if handler:
            db_stat = handler.get_stat()
        # collect the current Scrapy stats
        stats = self.stats.get_stats()
        # build the stats summary
@ -62,7 +86,8 @@ class StatsExtension:
            '200_cnt': stats.get('downloader/response_status_count/200', 0),
            '404_cnt': stats.get('downloader/response_status_count/404', 0),
            'log_err_cnt': stats.get('log_count/ERROR', 0),
            'status': self.current_status
            'status': self.current_status,
            **db_stat
        }
        # print the stats summary
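
To illustrate the effect of the **db_stat merge: for the clm spider the exported summary would contain the Scrapy counters shown above plus the keys returned by ClmDBHandler.get_stat(), roughly like this (values invented for illustration):

# Illustrative shape only; the numbers are made up, the keys come from this diff.
{
    '200_cnt': 1520,
    '404_cnt': 3,
    'log_err_cnt': 0,
    'status': 'running',
    'words': 812,        # from ClmDBHandler.get_stat()
    'magnets': 45231,
    'wd_mag': 60110,
}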


@ -30,7 +30,8 @@ class SQLitePipeline():
        spider_name = spider.name.lower()
        handler = self.db_handlers.pop(spider_name, None)
        if handler:
            handler.close()
            pass
            #handler.close()   # do not close here; the stats middleware closes it
    def process_item(self, item, spider):
        spider_name = spider.name.lower()
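
Design note: since the handlers are now process-wide singletons, the pipeline and the stats extension share the same handler instance. The pipeline therefore leaves the connection open when the spider closes so that the extension's final export can still call get_stat(); the extension then closes the handler itself in its spider_closed hook.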


@ -28,6 +28,7 @@ class ClmSpider(BaseSpider):
        self.begin = parse_date_to_datetime(begin) if begin else None
        self.min_size = float(min_size) if min_size else 1.0
        self.keywords_file = kwargs.get('file_path') if kwargs.get('file_path') else default_keywords_file
        self.query_str = kwargs.get('query_str') if kwargs.get('query_str') else None
        self.logger.info(f"RUN CMD: {' '.join(sys.argv)}")
@ -69,13 +70,12 @@ class ClmSpider(BaseSpider):
        if not self.run_task:
            return
        #tmp_query_str = f" groups='actress' and tags not like '%vixen%' "
        tmp_query_str = f" 1=1 "
        query_args = {}
        if self.debug:
            keywords = db_clm.get_key_words(limit=5, query_str=tmp_query_str)
        else:
            #keywords = db_clm.get_key_words(groups='actress', tags='vixen')
            keywords = db_clm.get_key_words(query_str=tmp_query_str)
            query_args['limit'] = 5
        if self.query_str:
            query_args['query_str'] = self.query_str
        keywords = db_clm.get_key_words(**query_args)
        for item in keywords:
            words_id = item['id']
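
With the new query_str argument the keyword filter is no longer hard-coded in the spider; it can be supplied at crawl time, for example (arguments illustrative): scrapy crawl clm -a begin=2025-07-13 -a query_str="groups='actress'".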