diff --git a/iafd/src/fetch.py b/iafd/src/fetch.py index d1b5ec3..953c133 100644 --- a/iafd/src/fetch.py +++ b/iafd/src/fetch.py @@ -2,6 +2,7 @@ import json import time import csv +import sys import argparse import logging from functools import partial @@ -9,6 +10,13 @@ import config import sqlite_utils as db_tools import iafd_scraper as scraper import utils +from pathlib import Path + +# 添加 src 目录到路径 +root_dir = str(Path(__file__).resolve().parent.parent.parent) +sys.path.append(root_dir) +from src.monitor.scheduler import CommandScheduler +from src.utils.utils import pretty_print_json config.setup_logging() @@ -389,6 +397,11 @@ def fetch_movies_detail(): def reset_actor_movie_cnt(): db_tools.reset_actor_movies() +def check_task_status(): + # 命令行参数处理 + result = db_tools.get_statics() + pretty_print_json(result) + # 建立缩写到函数的映射 function_map = { "astro": fetch_performers_by_astro, @@ -399,6 +412,7 @@ function_map = { "performers": fetch_performers_detail, "movies" : fetch_movies_detail, "reset_mv" : reset_actor_movie_cnt, + "check" : check_task_status, } # 主函数 @@ -415,6 +429,10 @@ def main(cmd, args_debug, args_force, args_skip_local): global skip_local skip_local = args_skip_local + if cmd.lower() == 'check': + check_task_status() + return None + # 开启任务 task_id = db_tools.insert_task_log() if task_id is None: @@ -423,6 +441,16 @@ def main(cmd, args_debug, args_force, args_skip_local): logging.info(f'running task. id: {task_id}, debug: {debug}, force: {force}, cmd: {cmd}') + # 要执行的Shell命令(示例) + shell_command = "cd ~/projects/resources/src/monitor; chmod u+x ./run.sh; ./run.sh iafd" + + # 创建命令调度器(30分钟执行一次) + scheduler = CommandScheduler( + command=shell_command, + interval=10 if debug else 1800 + ) + scheduler.run_periodically() + # 执行指定的函数 if cmd: function_names = args.cmd.split(",") # 拆分输入 @@ -444,6 +472,8 @@ def main(cmd, args_debug, args_force, args_skip_local): logging.info(f'all process completed!') db_tools.finalize_task_log(task_id) + scheduler.stop() + # TODO: # 1, 演员列表中的影片数量,与电影列表中聚合出来的影片数量,可能不同。一个原因是某个影片有多个导演,且导演又兼了演员。比如: # https://www.iafd.com/title.rme/id=0f79d81f-25ff-40d1-967a-24b99f03b79a diff --git a/iafd/src/sqlite_utils.py b/iafd/src/sqlite_utils.py index 591a43b..57ad78b 100644 --- a/iafd/src/sqlite_utils.py +++ b/iafd/src/sqlite_utils.py @@ -1034,6 +1034,64 @@ def finalize_task_log(task_id): except sqlite3.Error as e: logging.error(f"任务 {task_id} 结束失败: {e}") +def get_statics2(): + result={} + + try: + # 获取 performers、studios 等表的最终行数 + cursor.execute("SELECT COUNT(*) FROM iafd_performers") + result['iafd_actors'] = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM iafd_performers where is_full_data=1") + result['act_full'] = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM iafd_movies") + result['movies'] = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM iafd_movies where is_full_data=1") + result['mov_full'] = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM iafd_distributors") + result['dist'] = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM iafd_studios") + result['stu'] = cursor.fetchone()[0] + + except sqlite3.Error as e: + logging.error(f"query failed: {e}") + + return result + + +def get_statics(): + try: + # 使用单个SQL查询获取所有统计数据 + cursor.execute(""" + SELECT + (SELECT COUNT(*) FROM iafd_performers) AS iafd_actors, + (SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full, + (SELECT COUNT(*) FROM iafd_movies) AS movies, + (SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full, + (SELECT COUNT(*) FROM iafd_distributors) AS dist, + (SELECT COUNT(*) FROM iafd_studios) AS stu + """) + + # 获取查询结果 + row = cursor.fetchone() + if not row: + return {} + + # 手动定义列名(与SQL中的AS别名保持一致) + #columns = ['iafd_actors', 'act_full', 'movies', 'mov_full', 'dist', 'stu'] + columns = [desc[0] for desc in cursor.description] + + # 将元组结果转换为字典 + return dict(zip(columns, row)) + + except sqlite3.Error as e: + logging.error(f"查询失败: {e}") + return {} + if __name__ == "__main__": check_and_create_stat_table() diff --git a/src/db_utils/sqlite_db.py b/src/db_utils/sqlite_db.py index 889ada2..e859e8c 100644 --- a/src/db_utils/sqlite_db.py +++ b/src/db_utils/sqlite_db.py @@ -599,7 +599,7 @@ class JavbusDBHandler(DatabaseHandler): # 查询状态 - def get_statics(self): + def get_statics2(self): result = {} try: # 获取 performers、studios 等表的最终行数 @@ -633,6 +633,36 @@ class JavbusDBHandler(DatabaseHandler): return result + def get_statics(self): + try: + self.cursor.execute(f""" + SELECT + (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors, + (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un, + (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full, + (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full, + (SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies, + (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un, + (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full, + (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full, + (SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios, + (SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels, + (SELECT COUNT(*) FROM {self.tbl_name_series}) AS series, + """) + + row = self.cursor.fetchone() + if not row: + return {} + + # 手动定义列名映射 + #columns = ['actors', 'act_un', 'act_full', 'act_unc_full', 'movies', 'mov_un', 'mov_full', 'mov_un_full'] + columns = [desc[0] for desc in cursor.description] + return dict(zip(columns, row)) + + except sqlite3.Error as e: + logging.error(f"query error: {e}") + return {} + # 处理影片的 无码 字段 def reset_movies_uncensored(self, check_and_do = 0): try: diff --git a/src/javbus/fetch.py b/src/javbus/fetch.py index 042b71d..387e711 100644 --- a/src/javbus/fetch.py +++ b/src/javbus/fetch.py @@ -11,6 +11,7 @@ import src.logger.logger as logger import src.db_utils.sqlite_db as sqlite_db import src.crawling.craw as craw import src.utils.utils as utils +import src.monitor.scheduler as scheduler_manager logger.setup_logging() db_tools = sqlite_db.JavbusDBHandler() @@ -469,6 +470,11 @@ def reset_movies_uncensored(): db_tools.reset_movies_uncensored(check_and_do=0 if debug else 1) db_tools.reset_actor_movies() +def check_task_status(): + # 命令行参数处理 + result = db_tools.get_statics() + utils.pretty_print_json(result) + # 建立缩写到函数的映射 function_map = { "actor_list": fetch_actor_list, @@ -479,11 +485,17 @@ function_map = { "movies" : fetch_movies_detail, "langs" : update_multi_langs, "tags" : update_multilang_tags, - "reset_un" : reset_movies_uncensored + "reset_un" : reset_movies_uncensored, + "check" : check_task_status } # 主函数 def main(cmd, args): + # 如果是检查状态,单独执行 + if cmd.lower() == 'check': + check_task_status() + return None + # 开启任务 task_id = db_tools.insert_task_log() if task_id is None: @@ -492,6 +504,16 @@ def main(cmd, args): logging.info(f"running task. id: {task_id}, args: {args}") + # 要执行的Shell命令(示例) + shell_command = "cd ~/projects/resources/src/monitor; chmod u+x ./run.sh; ./run.sh javbus" + + # 创建命令调度器(30分钟执行一次) + scheduler = scheduler_manager.CommandScheduler( + command=shell_command, + interval=1800 + ) + scheduler.run_periodically() + # 执行指定的函数 if cmd: function_names = args.cmd.split(",") # 拆分输入 @@ -512,6 +534,8 @@ def main(cmd, args): logging.info(f'all process completed!') db_tools.finalize_task_log(task_id) + + scheduler.stop() # TODO: # 1, tags 和 studio / label / series 的多语言 ---done diff --git a/src/monitor/check_javbus.py b/src/monitor/check_javbus.py deleted file mode 100644 index f8fb529..0000000 --- a/src/monitor/check_javbus.py +++ /dev/null @@ -1,13 +0,0 @@ -import json -import time -import src.db_utils.sqlite_db as sqlite_db -import src.utils.utils as utils - -db_tools = sqlite_db.JavbusDBHandler() - -if __name__ == "__main__": - # 命令行参数处理 - result = db_tools.get_statics() - utils.pretty_print_json(result) - - diff --git a/src/monitor/run.sh b/src/monitor/run.sh new file mode 100755 index 0000000..5861f34 --- /dev/null +++ b/src/monitor/run.sh @@ -0,0 +1,137 @@ +#!/bin/bash + +: << 'EOF' +执行本地脚本,以实现任务的状态监控。 +远程机上部署发送通知(企微)的脚本,把结果发送出来。 +EOF + +# 颜色定义 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +NC='\033[0m' # 无颜色 + +# 定义命令映射和函数 +declare -A COMMAND_MAP +declare -a COMMAND_ORDER # 记录命令注册顺序,用于帮助信息 + +REMOTE_SERVER="101.33.230.186" +REMOTR_USER="root" +SSH_OTRS="-o StrictHostKeyChecking=no -o ConnectTimeout=10" + +# 注册命令函数并添加到映射 +register_command() { + local cmd_name=$1 + local cmd_desc=$2 + COMMAND_MAP[$cmd_name]=$cmd_desc + COMMAND_ORDER+=($cmd_name) +} + +# 执行本地命令并将结果发送到远程的通用函数 +execute_local_and_send_remote() { + local local_cmd=$1 + local desc=$2 + + echo -e "${YELLOW}执行命令: ${desc}${NC}" + echo "----------------------------------------" + + # 执行本地命令 + RESULT=$(eval $local_cmd) + EXIT_CODE=$? + + # 输出结果 + if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}命令执行成功${NC}" + echo "$RESULT" + + # 调用远程脚本并将本地命令结果作为输入 + ssh $SSH_OTRS $REMOTR_USER@$REMOTE_SERVER "cd /root/projects/devops/tools; python3 ./send_to_wecom.py '$RESULT'" + return 0 + else + echo -e "${RED}命令执行失败,退出代码: $EXIT_CODE${NC}" + return $EXIT_CODE + fi +} + + +# 定义具体命令函数 +cmd_javbus() { + execute_local_and_send_remote \ + "cd /root/projects/resources && python3 -m src.javbus.fetch --cmd=check" \ + "检查服务状态" +} + +# 定义具体命令函数 +cmd_aabook() { + execute_local_and_send_remote \ + "cd /root/projects/resources/aabook/src/ && python3 ./check_status.py" \ + "检查服务状态" +} + +# 定义具体命令函数 +cmd_iafd() { + execute_local_and_send_remote \ + "cd /root/projects/resources/iafd/src/ && python3 ./fetch.py --cmd=check" \ + "检查服务状态" +} + +# 注册命令 +register_command "aabook" "查询 aabook 任务进度" +register_command "javbus" "查询 javbus 任务进度" +register_command "iafd" "查询 iafd 任务进度" + +# 显示帮助信息 +show_help() { + echo "用法: $0 [命令1] [命令2] [...]" + echo "可用命令:" + + for cmd in "${COMMAND_ORDER[@]}"; do + printf " %-10s %s\n" "$cmd" "${COMMAND_MAP[$cmd]}" + done + + echo + echo "示例:" + echo " $0 check fetch" + echo " $0 backup" +} + +# 主函数 +main() { + # 检查是否提供了命令参数 + if [ $# -eq 0 ]; then + show_help + exit 1 + fi + + local success=true + + # 处理每个命令参数 + for cmd in "$@"; do + # 检查命令是否存在 + if [[ -z "${COMMAND_MAP[$cmd]}" ]]; then + echo -e "${RED}错误: 未知命令 '$cmd'${NC}" + show_help + exit 1 + fi + + # 执行对应的命令函数 + cmd_${cmd} + if [ $? -ne 0 ]; then + success=false + fi + + echo + done + + # 输出总体执行结果 + if [ "$success" = true ]; then + echo -e "${GREEN}所有命令执行成功${NC}" + exit 0 + else + echo -e "${RED}部分命令执行失败${NC}" + exit 1 + fi +} + +# 执行主函数 +main "$@" \ No newline at end of file diff --git a/src/monitor/scheduler.py b/src/monitor/scheduler.py new file mode 100644 index 0000000..ecdb7c1 --- /dev/null +++ b/src/monitor/scheduler.py @@ -0,0 +1,133 @@ +import subprocess +import time +import threading +import logging +from typing import Callable +import signal + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +class CommandScheduler: + """命令调度器:立即执行命令并定时重复执行,退出前再执行一次""" + + def __init__(self, command: str, interval: int = 1800, execute_immediately: bool = True): + """ + 初始化命令调度器 + + 参数: + command: 要执行的Shell命令 + interval: 执行间隔(秒),默认30分钟 + execute_immediately: 是否立即执行命令,默认True + """ + self.command = command + self.interval = interval + self.execute_immediately = execute_immediately + self._stop_event = threading.Event() + self._timer_thread = None + self._should_execute_on_exit = True # 控制退出前是否执行命令 + + def run_periodically(self) -> None: + """启动周期性执行命令""" + def _execute_command(): + # 立即执行命令(如果设置了) + if self.execute_immediately: + self._run_command() + + # 然后按间隔周期性执行 + while not self._stop_event.wait(self.interval): + self._run_command() + + self._timer_thread = threading.Thread(target=_execute_command, daemon=True) + self._timer_thread.start() + logger.info(f"定时任务已启动,每 {self.interval} 秒执行一次") + + def _run_command(self) -> None: + """执行命令的辅助方法""" + try: + logger.info(f"开始执行命令: {self.command}") + result = subprocess.run( + self.command, + shell=True, + capture_output=True, + text=True + ) + + if result.returncode == 0: + logger.info("命令执行成功") + logger.debug(f"标准输出: {result.stdout}") + else: + logger.error(f"命令执行失败,返回码: {result.returncode}") + logger.error(f"错误输出: {result.stderr}") + except Exception as e: + logger.error(f"执行命令时发生异常: {e}") + + def stop(self, execute_on_exit: bool = True) -> None: + """ + 停止定时任务 + + 参数: + execute_on_exit: 退出前是否执行一次命令,默认True + """ + self._should_execute_on_exit = execute_on_exit + + # 设置停止事件 + self._stop_event.set() + + # 等待定时器线程结束 + if self._timer_thread: + self._timer_thread.join(timeout=5) + + # 在退出前执行一次命令 + if self._should_execute_on_exit: + logger.info("执行退出前的最后一次命令...") + self._run_command() + + logger.info("定时任务已停止") + + +def main(): + # 要执行的Shell命令(示例) + shell_command = "cd ~/shell; ./run.sh" + + # 创建命令调度器(30分钟执行一次,立即执行首次) + scheduler = CommandScheduler( + command=shell_command, + interval=1800, + execute_immediately=True + ) + + # 注册信号处理函数 + def signal_handler(signum, frame): + logger.info(f"接收到信号 {signum},准备退出...") + scheduler.stop() # 停止时执行最后一次命令 + raise SystemExit(0) + + # 捕获常见的终止信号 + signal.signal(signal.SIGINT, signal_handler) # Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # 普通终止信号 + + scheduler.run_periodically() + + try: + logger.info("主程序运行中,按Ctrl+C退出...") + while True: + time.sleep(60) # 主程序保持运行 + except SystemExit: + # 正常退出,已在信号处理函数中完成清理 + pass + except Exception as e: + logger.error(f"发生异常: {e}") + scheduler.stop(execute_on_exit=False) # 异常时不执行最后一次命令 + finally: + # 确保资源被释放 + if not scheduler._stop_event.is_set(): + scheduler.stop(execute_on_exit=False) + + +if __name__ == "__main__": + main() \ No newline at end of file