"""Rename scene files on disk using metadata stored in a SQLite database.

For every row of ``scenes_files`` the script builds a new file name of the
form ``<Studio>.<yyyy.mm.dd> <performers> - <title>[ (<code>)].<ext>`` and,
in ``run`` mode, renames the file on disk. In ``check`` mode it only reports
the planned renames. The plan is always written to
``<res_dir>/rename_results.json``.
"""
import sqlite3
import os
import logging
import json
from datetime import datetime
import argparse
import re

res_dir = './result'
os.makedirs(res_dir, exist_ok=True)

# Configure logging: every message goes to both a log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'{res_dir}/rename_files.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Accepted input date layouts, tried in order. Day-first layouts are tried
# before month-first ones, so an ambiguous "01-02-2020" parses as 1 Feb 2020.
_DATE_FORMATS = (
    "%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y",
    "%Y%m%d", "%m-%d-%Y", "%m/%d/%Y",
)
_DEFAULT_DATE = "0000.00.00"

# Characters that are not allowed in file names; each one becomes '-'.
_INVALID_CHAR_TABLE = str.maketrans({char: '-' for char in '/\\:*?"<>|'})

# Quotes, whitespace, hyphens and underscores are stripped from studio names.
_STUDIO_STRIP_RE = re.compile(r'[\'"\s\-_]+')

_MAX_PART_LEN = 100      # cap for the performers part and the title part
_MAX_BASENAME_LEN = 254  # generated names longer than this are skipped


def preload_folders(conn, prefix):
    """Load folder paths into a dict mapping ``folder_id -> path``.

    Args:
        conn: Open ``sqlite3`` connection.
        prefix: Optional filter. When it is non-blank, only folders whose
            path contains ``prefix`` (SQL ``LIKE '%prefix%'``) are returned.
            LIKE wildcards (``%``, ``_``) inside ``prefix`` are still
            interpreted by SQLite.

    Returns:
        dict of ``{id: path}``.

    Raises:
        sqlite3.Error: re-raised after being logged.
    """
    sqlstr = "SELECT id, path FROM folders"
    params = ()
    if prefix and prefix.strip():
        # Bind the pattern as a parameter instead of formatting it into the
        # statement: avoids SQL injection and breakage on quotes in prefix.
        sqlstr += " WHERE path LIKE ?"
        params = (f"%{prefix}%",)

    try:
        cursor = conn.cursor()
        cursor.execute(sqlstr, params)
        return {row[0]: row[1] for row in cursor.fetchall()}
    except sqlite3.Error as e:
        logger.error(f"预加载文件夹信息失败: {str(e)}")
        raise


def preload_studios(conn):
    """Load studio names into a dict mapping ``studio_id -> name``.

    The returned dict additionally maps ``None`` to ``"UnknownStudio"``.

    Raises:
        sqlite3.Error: re-raised after being logged.
    """
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name FROM studios")
        studios = {row[0]: row[1] for row in cursor.fetchall()}
        # Default entry for scenes without a studio.
        studios[None] = "UnknownStudio"
        return studios
    except sqlite3.Error as e:
        logger.error(f"预加载工作室信息失败: {str(e)}")
        raise


def get_performers(conn, scene_id):
    """Return the performers of a scene as a comma-separated string.

    Names are ordered by the database (``ORDER BY p.name``). When the scene
    has no performers, ``"UnknownPerformers"`` is returned.

    Raises:
        sqlite3.Error: re-raised after being logged.
    """
    query = """
        SELECT p.name
        FROM performers p
        JOIN performers_scenes ps ON p.id = ps.performer_id
        WHERE ps.scene_id = ?
        ORDER BY p.name
    """
    try:
        cursor = conn.cursor()
        cursor.execute(query, (scene_id,))
        names = [row[0] for row in cursor.fetchall()]
        return ','.join(names) or "UnknownPerformers"
    except sqlite3.Error as e:
        logger.error(f"获取演员信息失败 (scene_id={scene_id}): {str(e)}")
        raise


def parse_date(date_str):
    """Normalise a date string to ``yyyy.mm.dd``.

    Returns ``"0000.00.00"`` when ``date_str`` is empty or matches none of
    the supported layouts (a warning is logged in the latter case).
    """
    if not date_str:
        return _DEFAULT_DATE

    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(date_str, fmt).strftime("%Y.%m.%d")
        except ValueError:
            continue

    logger.warning(f"无法解析日期格式: {date_str},使用默认值")
    return _DEFAULT_DATE


def get_file_extension(basename):
    """Return the lower-cased text after the last '.', or '' if there is no '.'."""
    _, dot, ext = basename.rpartition('.')
    return ext.lower() if dot else ''


def sanitize_filename(name):
    """Replace every character that is illegal in a file name with '-'."""
    return name.translate(_INVALID_CHAR_TABLE)


def _build_new_basename(folder_path, original_basename, studio_name,
                        performers, title, release_date, code):
    """Compose the target file name (without directory) for one scene file."""
    ext = get_file_extension(original_basename)
    suffix = f".{ext}" if ext else ""
    # The code ends up inside a file name, so it must not carry path
    # separators or other illegal characters.
    safe_code = sanitize_filename(code) if code else ""

    # Special rule: for folders whose path contains "FA Pro" (case
    # insensitive) the name consists only of the upper-cased code and date.
    if safe_code and "fa pro" in folder_path.lower():
        return f"{safe_code.upper()}_{release_date}{suffix}"

    studio_part = _STUDIO_STRIP_RE.sub('', sanitize_filename(studio_name))
    # Truncate the free-text parts so the final name does not grow too long.
    performers_part = sanitize_filename(performers)[:_MAX_PART_LEN]
    title_part = sanitize_filename(title)[:_MAX_PART_LEN]
    if safe_code:
        title_part = f"{title_part} ({safe_code})"

    return f"{studio_part}.{release_date} {performers_part} - {title_part}{suffix}"


def process_scene_files(conn, mode, prefix):
    """Compute (and in ``run`` mode apply) the rename for every scene file.

    Folders and studios are preloaded into dicts, all scene/file rows are
    fetched with a single joined query, and performers are looked up per
    scene. Rows with incomplete data, unknown folders/studios, missing
    source files, existing targets or over-long names are skipped.

    Args:
        conn: Open ``sqlite3`` connection.
        mode: ``'run'`` renames files on disk; any other value only plans.
        prefix: Folder path filter passed to ``preload_folders``.

    Returns:
        list of dicts with keys ``file_id``, ``scene_id``, ``original_name``
        and ``dest_name``. The same list is written to
        ``<res_dir>/rename_results.json``.

    Raises:
        sqlite3.Error: for failures outside the per-record handling.
    """
    results = []

    try:
        # 1. Preload folders and studios into memory (two queries in total).
        folders = preload_folders(conn, prefix)
        studios = preload_studios(conn)
        logger.info(f"预加载完成 - 文件夹: {len(folders)} 个, 工作室: {len(studios)} 个")

        # 2. Fetch all scene/file associations in one joined query.
        cursor = conn.cursor()
        query = """
            SELECT
                sf.scene_id, sf.file_id,
                f.id AS file_id, f.basename, f.parent_folder_id,
                s.title, s.date as release_date, s.studio_id, s.code
            FROM scenes_files sf
            LEFT JOIN files f ON sf.file_id = f.id
            LEFT JOIN scenes s ON sf.scene_id = s.id
        """
        cursor.execute(query)
        mappings = cursor.fetchall()
        total = len(mappings)
        logger.info(f"共找到 {total} 条场景-文件映射记录")

        for idx, row in enumerate(mappings, 1):
            (scene_id, file_id, joined_file_id, basename, parent_folder_id,
             title, raw_date, studio_id, code) = row
            try:
                # Validate required data; LEFT JOIN misses show up as NULLs.
                if not (file_id and joined_file_id and basename and parent_folder_id):
                    logger.debug(f"文件ID信息不完整 (scene_id={scene_id}, file_id={file_id}),跳过")
                    continue
                if not (scene_id and title and raw_date and studio_id):
                    logger.debug(f"场景信息不完整 (scene_id={scene_id}, file_id={file_id}),跳过")
                    continue

                # 3. Resolve folder path and studio name from the caches.
                folder_path = folders.get(parent_folder_id)
                if not folder_path:
                    logger.debug(f"文件夹ID不存在 (folder_id={parent_folder_id}),跳过")
                    continue

                studio_name = studios.get(studio_id)
                if not studio_name:
                    logger.debug(f"工作室ID不存在 (studio_id={studio_id}),跳过")
                    continue

                # 4. Performers still need one query per scene.
                performers = get_performers(conn, scene_id)

                # 5. Build the new file name.
                release_date = parse_date(raw_date)
                new_basename = _build_new_basename(
                    folder_path, basename, studio_name,
                    performers, title, release_date, code,
                )

                if len(new_basename) > _MAX_BASENAME_LEN:
                    logger.warning(f"生成的文件名过长,跳过 (file_id={file_id}): {new_basename}")
                    continue

                original_path = os.path.join(folder_path, basename)
                new_path = os.path.join(folder_path, new_basename)

                if not os.path.exists(original_path):
                    logger.warning(f"文件不存在,跳过: {original_path}")
                    continue

                # Must be tested before the "target exists" check: an
                # unchanged name trivially exists and would be misreported.
                if original_path == new_path:
                    logger.info(f"文件名未变化,跳过 (file_id={file_id}): {original_path}")
                    continue

                if os.path.exists(new_path):
                    logger.warning(f"目标文件已存在,跳过: {new_path}")
                    continue

                results.append({
                    'file_id': file_id,
                    'scene_id': scene_id,
                    'original_name': original_path,
                    'dest_name': new_path
                })
                logger.info(f"处理第 {idx}/{total} 条: {original_path} -> {new_path}")

                # Run mode: perform the rename on disk.
                if mode == 'run':
                    os.rename(original_path, new_path)
                    # NOTE: updating files.basename in the database is
                    # currently disabled:
                    #   UPDATE files SET basename = ? WHERE id = ?
                    #   (new_basename, joined_file_id)
                    logger.info(f"已更新文件 (file_id={joined_file_id})")

            except Exception as e:
                # Best effort: one bad record must not abort the whole batch.
                logger.error(f"处理记录失败 (scene_id={scene_id}, file_id={file_id}): {str(e)}", exc_info=True)
                if mode == 'run':
                    conn.rollback()
                continue

        # Persist the plan / outcome.
        with open(f'{res_dir}/rename_results.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        logger.info("处理完成,结果已保存到 rename_results.json")
        return results

    except sqlite3.Error as e:
        logger.error(f"数据库操作失败: {str(e)}", exc_info=True)
        if mode == 'run':
            conn.rollback()
        raise
    finally:
        if mode == 'run':
            conn.commit()


def main():
    """Parse command-line arguments and process the given database."""
    parser = argparse.ArgumentParser(description='电影文件重命名工具(优化版)')
    parser.add_argument('--mode', choices=['check', 'run'], default='check',
                        help='运行模式: check(检查) 或 run(执行)')
    parser.add_argument('--db', default='movies.db', help='SQLite数据库文件路径')
    parser.add_argument('--prefix', default='', help='目录前缀,用来过滤文件路径')
    args = parser.parse_args()

    # sqlite3.connect would silently create a missing file, so check first.
    if not os.path.exists(args.db):
        logger.error(f"数据库文件不存在: {args.db}")
        return

    conn = None
    try:
        conn = sqlite3.connect(args.db)
        logger.info(f"成功连接到数据库: {args.db}")
        process_scene_files(conn, args.mode, args.prefix)
    except sqlite3.Error as e:
        logger.error(f"数据库连接失败: {str(e)}", exc_info=True)
    finally:
        if conn:
            conn.close()
            logger.info("数据库连接已关闭")


if __name__ == "__main__":
    main()