import argparse
import logging
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime

# Nothing in this script talks to MySQL; the driver import is kept for
# compatibility but must not prevent the SQLite-only sync from running.
try:
    import pymysql
    from pymysql.cursors import DictCursor
except ImportError:
    pymysql = None
    DictCursor = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def preload_folders(conn, prefix):
    """Load folder paths into a dict mapping folder id -> path.

    Args:
        conn: Open sqlite3 connection to the stash database.
        prefix: Optional text; when non-blank, only folders whose path
            contains it (SQL ``LIKE '%prefix%'``) are returned.

    Returns:
        dict of ``{folder_id: path}``.

    Raises:
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    sqlstr = "SELECT id, path FROM folders where 1=1 "
    params = ()
    if prefix and prefix.strip():
        # Bind the pattern instead of interpolating it, so quotes in the
        # prefix cannot break (or inject into) the statement.
        sqlstr += " and path like ? "
        params = (f"%{prefix}%",)
    try:
        cursor = conn.cursor()
        cursor.execute(sqlstr, params)
        return {row[0]: row[1] for row in cursor.fetchall()}
    except sqlite3.Error as e:
        logger.error(f"预加载文件夹信息失败: {str(e)}")
        raise


def preload_studios(conn):
    """Load studio names into a dict mapping studio id -> name.

    The key ``None`` is always present and maps to ``"UnknownStudio"`` as a
    default for scenes without a studio.

    Raises:
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name FROM studios")
        studios = {row[0]: row[1] for row in cursor.fetchall()}
    except sqlite3.Error as e:
        logger.error(f"预加载工作室信息失败: {str(e)}")
        raise
    # Default entry for scenes whose studio is not set.
    studios[None] = "UnknownStudio"
    return studios


def _load_whisper_rows(whisper_db_path, studio_filter):
    """Return the whisper episodes whose series title contains *studio_filter*.

    Rows are returned as plain dicts with the keys ``Id``, ``Title``,
    ``AirDate`` and ``whisper_code`` so they stay usable after the
    connection is closed.
    """
    query = """
        SELECT episodes.Id as Id,
               episodes.Title as Title,
               episodes.AirDate as AirDate,
               episodes.ExternalId as whisper_code
        from episodes
        left join series on episodes.SeriesId = series.Id
        where series.Title like ?
    """
    try:
        with closing(sqlite3.connect(whisper_db_path)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, (f"%{studio_filter}%",)).fetchall()
            return [dict(row) for row in rows]
    except sqlite3.Error as e:
        logger.error(f"读取Whisper数据库失败: {str(e)}")
        raise


def _load_scene_file_mappings(conn):
    """Return one row per scene/file link with the columns the sync reads."""
    query = """
        SELECT sf.scene_id,
               f.basename,
               f.parent_folder_id,
               s.studio_id as studio_id,
               s.code as scene_code
        FROM scenes_files sf
        LEFT JOIN files f ON sf.file_id = f.id
        LEFT JOIN scenes s ON sf.scene_id = s.id
    """
    return conn.execute(query).fetchall()


def _find_whisper_match(candidates, basename):
    """Return the first whisper row whose code appears in *basename*.

    *candidates* is a list of ``(lower_cased_code, row)`` pairs. A code
    matches when its first case-insensitive occurrence in the file name is
    not immediately followed by another letter or digit, so ``12345`` does
    not match ``123456.mp4``. Returns ``None`` when nothing matches.
    """
    if not basename:
        return None
    lowered = basename.lower()
    for code_lower, row in candidates:
        start = lowered.find(code_lower)
        if start < 0:
            continue
        end = start + len(code_lower)
        if end < len(lowered) and lowered[end].isalnum():
            # The code is only a prefix of a longer token in the file name.
            continue
        return row
    return None


def sync_whisper_to_stash(whisper_db_path, stash_db_path, dir_prefix, studio_filter, run_mode="check"):
    """Copy code/title/date from whisper episodes onto matching stash scenes.

    A scene is considered only when it has neither a code nor a studio id and
    its file lives in a folder whose path contains *dir_prefix*. It is matched
    to a whisper episode when the episode's external id occurs in the file's
    basename (see ``_find_whisper_match``).

    Args:
        whisper_db_path: Path of the whisper SQLite database (read only).
        stash_db_path: Path of the stash SQLite database (updated in "run").
        dir_prefix: Text a folder path must contain for its files to count.
        studio_filter: Text the whisper series title must contain.
        run_mode: ``"check"`` only logs matches; any other value performs the
            UPDATE and commits after each scene.

    Raises:
        sqlite3.Error: Logged and re-raised when reading either database
            fails. A failing UPDATE of a single scene is logged and skipped.
    """
    whisper_rows = _load_whisper_rows(whisper_db_path, studio_filter)
    logger.info(f"从Whisper数据库读取到 {len(whisper_rows)} 条记录")

    # Index by code (later rows win for a duplicated code), then lower-case
    # every usable code once instead of once per scene. Codes are passed
    # through str() because the external id may be stored as a number.
    idx_whisper = {}
    for row in whisper_rows:
        idx_whisper[row["whisper_code"]] = row
    candidates = [
        (str(code).lower(), row) for code, row in idx_whisper.items() if code
    ]

    try:
        stash_conn = sqlite3.connect(stash_db_path)
    except sqlite3.Error as e:
        logger.error(f"预加载关联数据失败: {str(e)}")
        raise

    # closing() guarantees the connection is released on every exit path.
    with closing(stash_conn):
        stash_conn.row_factory = sqlite3.Row
        try:
            folders = preload_folders(stash_conn, dir_prefix)
            studios = preload_studios(stash_conn)
            logger.info(f"预加载完成 - 文件夹: {len(folders)} 个, 工作室: {len(studios)} 个")
            mappings = _load_scene_file_mappings(stash_conn)
            logger.info(f"共找到 {len(mappings)} 条场景-文件映射记录")
        except sqlite3.Error as e:
            logger.error(f"预加载关联数据失败: {str(e)}")
            raise

        matched_cnt = 0
        for mapping in mappings:
            scene_id = mapping["scene_id"]
            if mapping["scene_code"] or mapping["studio_id"]:
                # Scene already carries a code or studio; leave it alone.
                continue

            parent_folder_id = mapping["parent_folder_id"]
            if not folders.get(parent_folder_id):
                logger.debug(f"文件夹ID不存在 (folder_id={parent_folder_id}),跳过")
                continue

            basename = mapping["basename"]
            whisper_row = _find_whisper_match(candidates, basename)
            if whisper_row is None:
                logger.info(f"未找到匹配的Whisper记录,跳过场景ID {scene_id}, filename {basename}")
                continue

            matched_cnt += 1
            if run_mode == "check":
                logger.info(f"[检查模式] 找到匹配的Whisper记录,场景ID {scene_id}, filename {basename}:code={whisper_row['whisper_code']}, title={whisper_row['Title']}, date={whisper_row['AirDate']}")
                continue

            new_code = whisper_row["whisper_code"]
            new_title = whisper_row["Title"]
            new_date = whisper_row["AirDate"]
            try:
                stash_conn.execute(
                    "UPDATE scenes SET code = ?, title = ?, date = ? WHERE id = ?",
                    (new_code, new_title, new_date, scene_id),
                )
                # Commit per scene so earlier updates survive a later failure.
                stash_conn.commit()
                logger.info(f"更新场景ID {scene_id}, filename {basename}:code={new_code}, title={new_title}, date={new_date}")
            except sqlite3.Error as e:
                stash_conn.rollback()
                logger.error(f"更新场景ID {scene_id}, filename {basename} 失败: {str(e)}")

        logger.info(f"处理完成,共匹配并更新 {matched_cnt} 条记录。")


# Database locations per execution environment (selected with --env).
sqlite_config = {
    'dev': {
        'shared_db_path': "/root/sharedata/sqlite/shared.db",
        'whisparr_db_path': "/root/sharedata/sqlite/whisparr2.db",
        'stash_db_path': "/root/sharedata/sqlite/stash-go.sqlite"
    },
    'nas': {
        'shared_db_path': "/root/sharedata/sqlite/shared.db",
        'whisparr_db_path': "/root/sharedata/sqlite/whisper_db/whisparr2.db",
        'stash_db_path': "/root/sharedata/sqlite/stash_db/stash-go.sqlite"
    }
}


def main():
    """Parse the command line and run the sync; exits 1 on missing options."""
    parser = argparse.ArgumentParser(
        description='update filename from whisper to stash.\n\n',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--dir_prefix", type=str, help="file directory prefix", default="")
    parser.add_argument("--studio", type=str, help="studio name filter", default="")
    parser.add_argument("--env", type=str, help="execute enviroment", default="dev")
    parser.add_argument('--mode', choices=['check', 'run'], default='check',
                        help='运行模式: check(检查) 或 run(执行)')
    args = parser.parse_args()

    current_env = args.env
    if current_env not in sqlite_config:
        # Unknown environments use the dev configuration; say so, because in
        # "run" mode this decides which database gets modified.
        logger.warning(f"unknown env {args.env!r}, falling back to 'dev'")
        current_env = 'dev'

    if not args.dir_prefix:
        print("未指定 dir_prefix,程序退出")
        sys.exit(1)
    if not args.studio:
        print("未指定 studio,程序退出")
        sys.exit(1)

    sync_whisper_to_stash(
        sqlite_config[current_env]['whisparr_db_path'],
        sqlite_config[current_env]['stash_db_path'],
        args.dir_prefix,
        args.studio,
        args.mode
    )
    sys.exit(0)


if __name__ == "__main__":
    main()