import sqlite3 import os import logging import json from datetime import datetime import argparse import re # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('./result/rename_files.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def get_performers(conn, scene_id): """获取场景对应的演员列表(按字母序排序,逗号分隔)""" try: cursor = conn.cursor() # 优化查询:使用JOIN一次性获取所需数据 query = """ SELECT p.name FROM performers p JOIN performers_scenes ps ON p.id = ps.performer_id WHERE ps.scene_id = ? ORDER BY p.name """ cursor.execute(query, (scene_id,)) results = cursor.fetchall() return ','.join([row[0] for row in results]) except sqlite3.Error as e: logger.error(f"获取演员信息失败 (scene_id={scene_id}): {str(e)}") raise def get_file_info(conn, file_id): """获取文件信息(ID、原始文件名、父目录ID)""" try: cursor = conn.cursor() cursor.execute(""" SELECT id, basename, parent_folder_id FROM files WHERE id = ? """, (file_id,)) result = cursor.fetchone() if not result: raise ValueError(f"未找到文件信息 (file_id={file_id})") return { 'id': result[0], 'basename': result[1], 'parent_folder_id': result[2] } except sqlite3.Error as e: logger.error(f"获取文件信息失败 (file_id={file_id}): {str(e)}") raise def get_folder_path(conn, folder_id): """获取文件夹路径""" try: cursor = conn.cursor() cursor.execute("SELECT path FROM folders WHERE id = ?", (folder_id,)) result = cursor.fetchone() if not result: raise ValueError(f"未找到文件夹路径 (folder_id={folder_id})") return result[0] except sqlite3.Error as e: logger.error(f"获取文件夹路径失败 (folder_id={folder_id}): {str(e)}") raise def get_scene_info(conn, scene_id): """获取场景信息(标题、日期、工作室ID)""" try: cursor = conn.cursor() cursor.execute(""" SELECT title, date as release_date, studio_id FROM scenes WHERE id = ? """, (scene_id,)) result = cursor.fetchone() if not result: raise ValueError(f"未找到场景信息 (scene_id={scene_id})") return { 'title': result[0], 'release_date': result[1], 'studio_id': result[2] } except sqlite3.Error as e: logger.error(f"获取场景信息失败 (scene_id={scene_id}): {str(e)}") raise def get_studio_name(conn, studio_id): """获取工作室名称""" try: cursor = conn.cursor() cursor.execute("SELECT name FROM studios WHERE id = ?", (studio_id,)) result = cursor.fetchone() if not result: logger.warning(f"未找到工作室信息 (studio_id={studio_id}),使用默认名称") return "UnknownStudio" return result[0] except sqlite3.Error as e: logger.error(f"获取工作室信息失败 (studio_id={studio_id}): {str(e)}") raise def parse_date(date_str): """解析日期为yyyy.mm.dd格式""" if not date_str: return "0000.00.00" # 尝试多种常见日期格式 date_formats = [ "%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y", "%Y%m%d", "%m-%d-%Y", "%m/%d/%Y" ] for fmt in date_formats: try: date_obj = datetime.strptime(date_str, fmt) return date_obj.strftime("%Y.%m.%d") except ValueError: continue logger.warning(f"无法解析日期格式: {date_str},使用默认值") return "0000.00.00" def get_file_extension(basename): """获取文件扩展名""" if '.' in basename: return basename.split('.')[-1].lower() return '' def sanitize_filename(name): """清理文件名中的非法字符""" invalid_chars = '/\\:*?"<>|' for char in invalid_chars: name = name.replace(char, '-') return name def process_scene_files(conn, mode, prefix): """处理所有场景文件映射关系""" results = [] try: cursor = conn.cursor() # 获取所有场景-文件映射关系 cursor.execute("SELECT scene_id, file_id FROM scenes_files") mappings = cursor.fetchall() logger.debug(f"共找到 {len(mappings)} 条场景-文件映射记录") for idx, (scene_id, file_id) in enumerate(mappings, 1): logger.debug(f"处理第 {idx}/{len(mappings)} 条记录 (scene_id={scene_id}, file_id={file_id})") try: # 1. 获取文件信息 file_info = get_file_info(conn, file_id) original_basename = file_info['basename'] parent_folder_id = file_info['parent_folder_id'] # 2.获取文件夹路径 folder_path = get_folder_path(conn, parent_folder_id) # 3. 获取演员信息 performers = get_performers(conn, scene_id) if not performers: performers = "UnknownPerformers" logger.warning(f"场景 {scene_id} 未找到演员信息,跳过") continue # 4. 获取场景和工作室信息 scene_info = get_scene_info(conn, scene_id) if not scene_info['title'] or not scene_info['release_date'] or not scene_info['studio_id']: logger.warning(f"场景 {scene_id} 信息不完整,跳过") continue title = scene_info['title'] or "Untitled" release_date = parse_date(scene_info['release_date']) studio_name = get_studio_name(conn, scene_info['studio_id']) # 5. 构建新文件名 ext = get_file_extension(original_basename) sanitized_studio = sanitize_filename(studio_name) sanitized_performers = sanitize_filename(performers)[0:100] # 限制长度避免过长 sanitized_title = sanitize_filename(title)[0:100] # 限制长度避免过长 if ext: new_basename = f"{sanitized_studio} - {release_date} - {sanitized_performers} - {sanitized_title}.{ext}" else: new_basename = f"{sanitized_studio} - {release_date} - {sanitized_performers} - {sanitized_title}" if len(new_basename) > 254: logger.warning(f"生成的文件名过长,跳过 (file_id={file_id}): {new_basename}") continue # 构建完整路径 original_path = os.path.join(folder_path, original_basename) new_path = os.path.join(folder_path, new_basename) # 记录结果 result = { 'file_id': file_id, 'scene_id': scene_id, 'original_name': original_path, 'dest_name': new_path } results.append(result) # 输出检查信息 logger.info(f"准备重命名: {original_path} -> {new_path}") # 在运行模式下执行操作 if mode == 'run': # 检查文件是否存在 if not os.path.exists(original_path): logger.warning(f"文件不存在,跳过: {original_path}") continue # 执行文件重命名 if original_path != new_path: os.rename(original_path, new_path) logger.info(f"已重命名: {original_path} -> {new_path}") # 更新数据库记录 cursor.execute( "UPDATE files SET basename = ? WHERE id = ?", (new_basename, file_id) ) conn.commit() logger.info(f"已更新数据库记录 (file_id={file_id})") except Exception as e: logger.error(f"处理记录失败 (scene_id={scene_id}, file_id={file_id}): {str(e)}", exc_info=True) # 回滚当前事务(如果是运行模式) if mode == 'run': conn.rollback() continue # 保存结果到文件 with open('./result/rename_results.json', 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=2) logger.info(f"处理完成,结果已保存到 rename_results.json") return results except sqlite3.Error as e: logger.error(f"数据库操作失败: {str(e)}", exc_info=True) if mode == 'run': conn.rollback() raise finally: if mode == 'run': conn.commit() def main(): # 解析命令行参数 parser = argparse.ArgumentParser(description='电影文件重命名工具') parser.add_argument('--mode', choices=['check', 'run'], default='check', help='运行模式: check(检查) 或 run(执行)') parser.add_argument('--db', default='movies.db', help='SQLite数据库文件路径') parser.add_argument('--prefix', default='', help='目录的前缀,用来匹配') args = parser.parse_args() # 验证数据库文件是否存在 if not os.path.exists(args.db): logger.error(f"数据库文件不存在: {args.db}") return os.makedirs('./result', exist_ok=True) # 连接数据库 conn = None try: conn = sqlite3.connect(args.db) conn.row_factory = sqlite3.Row # 启用行工厂,方便按列名访问 logger.info(f"成功连接到数据库: {args.db}") # 执行处理 process_scene_files(conn, args.mode, args.prefix) except sqlite3.Error as e: logger.error(f"数据库连接失败: {str(e)}", exc_info=True) finally: if conn: conn.close() logger.info("数据库连接已关闭") if __name__ == "__main__": main()