modify scripts

This commit is contained in:
oscarz
2025-07-01 15:12:59 +08:00
parent d1ef3957b2
commit 3a3fe5624f
7 changed files with 414 additions and 15 deletions

View File

@ -2,6 +2,7 @@
import json import json
import time import time
import csv import csv
import sys
import argparse import argparse
import logging import logging
from functools import partial from functools import partial
@ -9,6 +10,13 @@ import config
import sqlite_utils as db_tools import sqlite_utils as db_tools
import iafd_scraper as scraper import iafd_scraper as scraper
import utils import utils
from pathlib import Path
# 添加 src 目录到路径
root_dir = str(Path(__file__).resolve().parent.parent.parent)
sys.path.append(root_dir)
from src.monitor.scheduler import CommandScheduler
from src.utils.utils import pretty_print_json
config.setup_logging() config.setup_logging()
@ -389,6 +397,11 @@ def fetch_movies_detail():
def reset_actor_movie_cnt(): def reset_actor_movie_cnt():
db_tools.reset_actor_movies() db_tools.reset_actor_movies()
def check_task_status():
# 命令行参数处理
result = db_tools.get_statics()
pretty_print_json(result)
# 建立缩写到函数的映射 # 建立缩写到函数的映射
function_map = { function_map = {
"astro": fetch_performers_by_astro, "astro": fetch_performers_by_astro,
@ -399,6 +412,7 @@ function_map = {
"performers": fetch_performers_detail, "performers": fetch_performers_detail,
"movies" : fetch_movies_detail, "movies" : fetch_movies_detail,
"reset_mv" : reset_actor_movie_cnt, "reset_mv" : reset_actor_movie_cnt,
"check" : check_task_status,
} }
# 主函数 # 主函数
@ -415,6 +429,10 @@ def main(cmd, args_debug, args_force, args_skip_local):
global skip_local global skip_local
skip_local = args_skip_local skip_local = args_skip_local
if cmd.lower() == 'check':
check_task_status()
return None
# 开启任务 # 开启任务
task_id = db_tools.insert_task_log() task_id = db_tools.insert_task_log()
if task_id is None: if task_id is None:
@ -423,6 +441,16 @@ def main(cmd, args_debug, args_force, args_skip_local):
logging.info(f'running task. id: {task_id}, debug: {debug}, force: {force}, cmd: {cmd}') logging.info(f'running task. id: {task_id}, debug: {debug}, force: {force}, cmd: {cmd}')
# 要执行的Shell命令示例
shell_command = "cd ~/projects/resources/src/monitor; chmod u+x ./run.sh; ./run.sh iafd"
# 创建命令调度器30分钟执行一次
scheduler = CommandScheduler(
command=shell_command,
interval=10 if debug else 1800
)
scheduler.run_periodically()
# 执行指定的函数 # 执行指定的函数
if cmd: if cmd:
function_names = args.cmd.split(",") # 拆分输入 function_names = args.cmd.split(",") # 拆分输入
@ -444,6 +472,8 @@ def main(cmd, args_debug, args_force, args_skip_local):
logging.info(f'all process completed!') logging.info(f'all process completed!')
db_tools.finalize_task_log(task_id) db_tools.finalize_task_log(task_id)
scheduler.stop()
# TODO: # TODO:
# 1, 演员列表中的影片数量,与电影列表中聚合出来的影片数量,可能不同。一个原因是某个影片有多个导演,且导演又兼了演员。比如: # 1, 演员列表中的影片数量,与电影列表中聚合出来的影片数量,可能不同。一个原因是某个影片有多个导演,且导演又兼了演员。比如:
# https://www.iafd.com/title.rme/id=0f79d81f-25ff-40d1-967a-24b99f03b79a # https://www.iafd.com/title.rme/id=0f79d81f-25ff-40d1-967a-24b99f03b79a

View File

@ -1034,6 +1034,64 @@ def finalize_task_log(task_id):
except sqlite3.Error as e: except sqlite3.Error as e:
logging.error(f"任务 {task_id} 结束失败: {e}") logging.error(f"任务 {task_id} 结束失败: {e}")
def get_statics2():
result={}
try:
# 获取 performers、studios 等表的最终行数
cursor.execute("SELECT COUNT(*) FROM iafd_performers")
result['iafd_actors'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_performers where is_full_data=1")
result['act_full'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_movies")
result['movies'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_movies where is_full_data=1")
result['mov_full'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_distributors")
result['dist'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_studios")
result['stu'] = cursor.fetchone()[0]
except sqlite3.Error as e:
logging.error(f"query failed: {e}")
return result
def get_statics():
try:
# 使用单个SQL查询获取所有统计数据
cursor.execute("""
SELECT
(SELECT COUNT(*) FROM iafd_performers) AS iafd_actors,
(SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full,
(SELECT COUNT(*) FROM iafd_movies) AS movies,
(SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full,
(SELECT COUNT(*) FROM iafd_distributors) AS dist,
(SELECT COUNT(*) FROM iafd_studios) AS stu
""")
# 获取查询结果
row = cursor.fetchone()
if not row:
return {}
# 手动定义列名与SQL中的AS别名保持一致
#columns = ['iafd_actors', 'act_full', 'movies', 'mov_full', 'dist', 'stu']
columns = [desc[0] for desc in cursor.description]
# 将元组结果转换为字典
return dict(zip(columns, row))
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return {}
if __name__ == "__main__": if __name__ == "__main__":
check_and_create_stat_table() check_and_create_stat_table()

View File

@ -599,7 +599,7 @@ class JavbusDBHandler(DatabaseHandler):
# 查询状态 # 查询状态
def get_statics(self): def get_statics2(self):
result = {} result = {}
try: try:
# 获取 performers、studios 等表的最终行数 # 获取 performers、studios 等表的最终行数
@ -633,6 +633,36 @@ class JavbusDBHandler(DatabaseHandler):
return result return result
def get_statics(self):
try:
self.cursor.execute(f"""
SELECT
(SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full,
(SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full,
(SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios,
(SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels,
(SELECT COUNT(*) FROM {self.tbl_name_series}) AS series,
""")
row = self.cursor.fetchone()
if not row:
return {}
# 手动定义列名映射
#columns = ['actors', 'act_un', 'act_full', 'act_unc_full', 'movies', 'mov_un', 'mov_full', 'mov_un_full']
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, row))
except sqlite3.Error as e:
logging.error(f"query error: {e}")
return {}
# 处理影片的 无码 字段 # 处理影片的 无码 字段
def reset_movies_uncensored(self, check_and_do = 0): def reset_movies_uncensored(self, check_and_do = 0):
try: try:

View File

@ -11,6 +11,7 @@ import src.logger.logger as logger
import src.db_utils.sqlite_db as sqlite_db import src.db_utils.sqlite_db as sqlite_db
import src.crawling.craw as craw import src.crawling.craw as craw
import src.utils.utils as utils import src.utils.utils as utils
import src.monitor.scheduler as scheduler_manager
logger.setup_logging() logger.setup_logging()
db_tools = sqlite_db.JavbusDBHandler() db_tools = sqlite_db.JavbusDBHandler()
@ -469,6 +470,11 @@ def reset_movies_uncensored():
db_tools.reset_movies_uncensored(check_and_do=0 if debug else 1) db_tools.reset_movies_uncensored(check_and_do=0 if debug else 1)
db_tools.reset_actor_movies() db_tools.reset_actor_movies()
def check_task_status():
# 命令行参数处理
result = db_tools.get_statics()
utils.pretty_print_json(result)
# 建立缩写到函数的映射 # 建立缩写到函数的映射
function_map = { function_map = {
"actor_list": fetch_actor_list, "actor_list": fetch_actor_list,
@ -479,11 +485,17 @@ function_map = {
"movies" : fetch_movies_detail, "movies" : fetch_movies_detail,
"langs" : update_multi_langs, "langs" : update_multi_langs,
"tags" : update_multilang_tags, "tags" : update_multilang_tags,
"reset_un" : reset_movies_uncensored "reset_un" : reset_movies_uncensored,
"check" : check_task_status
} }
# 主函数 # 主函数
def main(cmd, args): def main(cmd, args):
# 如果是检查状态,单独执行
if cmd.lower() == 'check':
check_task_status()
return None
# 开启任务 # 开启任务
task_id = db_tools.insert_task_log() task_id = db_tools.insert_task_log()
if task_id is None: if task_id is None:
@ -492,6 +504,16 @@ def main(cmd, args):
logging.info(f"running task. id: {task_id}, args: {args}") logging.info(f"running task. id: {task_id}, args: {args}")
# 要执行的Shell命令示例
shell_command = "cd ~/projects/resources/src/monitor; chmod u+x ./run.sh; ./run.sh javbus"
# 创建命令调度器30分钟执行一次
scheduler = scheduler_manager.CommandScheduler(
command=shell_command,
interval=1800
)
scheduler.run_periodically()
# 执行指定的函数 # 执行指定的函数
if cmd: if cmd:
function_names = args.cmd.split(",") # 拆分输入 function_names = args.cmd.split(",") # 拆分输入
@ -512,6 +534,8 @@ def main(cmd, args):
logging.info(f'all process completed!') logging.info(f'all process completed!')
db_tools.finalize_task_log(task_id) db_tools.finalize_task_log(task_id)
scheduler.stop()
# TODO: # TODO:
# 1, tags 和 studio / label / series 的多语言 ---done # 1, tags 和 studio / label / series 的多语言 ---done

View File

@ -1,13 +0,0 @@
import json
import time
import src.db_utils.sqlite_db as sqlite_db
import src.utils.utils as utils
db_tools = sqlite_db.JavbusDBHandler()
if __name__ == "__main__":
# 命令行参数处理
result = db_tools.get_statics()
utils.pretty_print_json(result)

137
src/monitor/run.sh Executable file
View File

@ -0,0 +1,137 @@
#!/bin/bash
: << 'EOF'
执行本地脚本,以实现任务的状态监控。
远程机上部署发送通知(企微)的脚本,把结果发送出来。
EOF
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # 无颜色
# 定义命令映射和函数
declare -A COMMAND_MAP
declare -a COMMAND_ORDER # 记录命令注册顺序,用于帮助信息
REMOTE_SERVER="101.33.230.186"
REMOTR_USER="root"
SSH_OTRS="-o StrictHostKeyChecking=no -o ConnectTimeout=10"
# 注册命令函数并添加到映射
register_command() {
local cmd_name=$1
local cmd_desc=$2
COMMAND_MAP[$cmd_name]=$cmd_desc
COMMAND_ORDER+=($cmd_name)
}
# 执行本地命令并将结果发送到远程的通用函数
execute_local_and_send_remote() {
local local_cmd=$1
local desc=$2
echo -e "${YELLOW}执行命令: ${desc}${NC}"
echo "----------------------------------------"
# 执行本地命令
RESULT=$(eval $local_cmd)
EXIT_CODE=$?
# 输出结果
if [ $EXIT_CODE -eq 0 ]; then
echo -e "${GREEN}命令执行成功${NC}"
echo "$RESULT"
# 调用远程脚本并将本地命令结果作为输入
ssh $SSH_OTRS $REMOTR_USER@$REMOTE_SERVER "cd /root/projects/devops/tools; python3 ./send_to_wecom.py '$RESULT'"
return 0
else
echo -e "${RED}命令执行失败,退出代码: $EXIT_CODE${NC}"
return $EXIT_CODE
fi
}
# 定义具体命令函数
cmd_javbus() {
execute_local_and_send_remote \
"cd /root/projects/resources && python3 -m src.javbus.fetch --cmd=check" \
"检查服务状态"
}
# 定义具体命令函数
cmd_aabook() {
execute_local_and_send_remote \
"cd /root/projects/resources/aabook/src/ && python3 ./check_status.py" \
"检查服务状态"
}
# 定义具体命令函数
cmd_iafd() {
execute_local_and_send_remote \
"cd /root/projects/resources/iafd/src/ && python3 ./fetch.py --cmd=check" \
"检查服务状态"
}
# 注册命令
register_command "aabook" "查询 aabook 任务进度"
register_command "javbus" "查询 javbus 任务进度"
register_command "iafd" "查询 iafd 任务进度"
# 显示帮助信息
show_help() {
echo "用法: $0 [命令1] [命令2] [...]"
echo "可用命令:"
for cmd in "${COMMAND_ORDER[@]}"; do
printf " %-10s %s\n" "$cmd" "${COMMAND_MAP[$cmd]}"
done
echo
echo "示例:"
echo " $0 check fetch"
echo " $0 backup"
}
# 主函数
main() {
# 检查是否提供了命令参数
if [ $# -eq 0 ]; then
show_help
exit 1
fi
local success=true
# 处理每个命令参数
for cmd in "$@"; do
# 检查命令是否存在
if [[ -z "${COMMAND_MAP[$cmd]}" ]]; then
echo -e "${RED}错误: 未知命令 '$cmd'${NC}"
show_help
exit 1
fi
# 执行对应的命令函数
cmd_${cmd}
if [ $? -ne 0 ]; then
success=false
fi
echo
done
# 输出总体执行结果
if [ "$success" = true ]; then
echo -e "${GREEN}所有命令执行成功${NC}"
exit 0
else
echo -e "${RED}部分命令执行失败${NC}"
exit 1
fi
}
# 执行主函数
main "$@"

133
src/monitor/scheduler.py Normal file
View File

@ -0,0 +1,133 @@
import subprocess
import time
import threading
import logging
from typing import Callable
import signal
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class CommandScheduler:
"""命令调度器:立即执行命令并定时重复执行,退出前再执行一次"""
def __init__(self, command: str, interval: int = 1800, execute_immediately: bool = True):
"""
初始化命令调度器
参数:
command: 要执行的Shell命令
interval: 执行间隔(秒)默认30分钟
execute_immediately: 是否立即执行命令默认True
"""
self.command = command
self.interval = interval
self.execute_immediately = execute_immediately
self._stop_event = threading.Event()
self._timer_thread = None
self._should_execute_on_exit = True # 控制退出前是否执行命令
def run_periodically(self) -> None:
"""启动周期性执行命令"""
def _execute_command():
# 立即执行命令(如果设置了)
if self.execute_immediately:
self._run_command()
# 然后按间隔周期性执行
while not self._stop_event.wait(self.interval):
self._run_command()
self._timer_thread = threading.Thread(target=_execute_command, daemon=True)
self._timer_thread.start()
logger.info(f"定时任务已启动,每 {self.interval} 秒执行一次")
def _run_command(self) -> None:
"""执行命令的辅助方法"""
try:
logger.info(f"开始执行命令: {self.command}")
result = subprocess.run(
self.command,
shell=True,
capture_output=True,
text=True
)
if result.returncode == 0:
logger.info("命令执行成功")
logger.debug(f"标准输出: {result.stdout}")
else:
logger.error(f"命令执行失败,返回码: {result.returncode}")
logger.error(f"错误输出: {result.stderr}")
except Exception as e:
logger.error(f"执行命令时发生异常: {e}")
def stop(self, execute_on_exit: bool = True) -> None:
"""
停止定时任务
参数:
execute_on_exit: 退出前是否执行一次命令默认True
"""
self._should_execute_on_exit = execute_on_exit
# 设置停止事件
self._stop_event.set()
# 等待定时器线程结束
if self._timer_thread:
self._timer_thread.join(timeout=5)
# 在退出前执行一次命令
if self._should_execute_on_exit:
logger.info("执行退出前的最后一次命令...")
self._run_command()
logger.info("定时任务已停止")
def main():
# 要执行的Shell命令示例
shell_command = "cd ~/shell; ./run.sh"
# 创建命令调度器30分钟执行一次立即执行首次
scheduler = CommandScheduler(
command=shell_command,
interval=1800,
execute_immediately=True
)
# 注册信号处理函数
def signal_handler(signum, frame):
logger.info(f"接收到信号 {signum},准备退出...")
scheduler.stop() # 停止时执行最后一次命令
raise SystemExit(0)
# 捕获常见的终止信号
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # 普通终止信号
scheduler.run_periodically()
try:
logger.info("主程序运行中按Ctrl+C退出...")
while True:
time.sleep(60) # 主程序保持运行
except SystemExit:
# 正常退出,已在信号处理函数中完成清理
pass
except Exception as e:
logger.error(f"发生异常: {e}")
scheduler.stop(execute_on_exit=False) # 异常时不执行最后一次命令
finally:
# 确保资源被释放
if not scheduler._stop_event.is_set():
scheduler.stop(execute_on_exit=False)
if __name__ == "__main__":
main()