Files
devops/docker/stash/scripts/format_filename.py
2025-11-13 08:34:28 +08:00

288 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import os
import logging
import json
from datetime import datetime
import argparse
import re
# Configure logging.
# logging.FileHandler opens its target file as soon as it is constructed, so
# the output directory has to exist before basicConfig() runs; otherwise the
# script fails at import time with FileNotFoundError on a fresh checkout.
os.makedirs('./result', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Log messages contain non-ASCII text; pin the encoding so the file
        # does not depend on the platform's default locale.
        logging.FileHandler('./result/rename_files.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def get_performers(conn, scene_id):
    """Return the performer names of a scene as one comma-separated string.

    Names come from ``performers`` joined through ``performers_scenes`` and
    are ordered by the SQL ``ORDER BY p.name`` clause.

    Args:
        conn: Open sqlite3 connection.
        scene_id: Id of the scene whose performers are wanted.

    Returns:
        The names joined with ``','``; an empty string when the scene has no
        performers.

    Raises:
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    sql = """
        SELECT p.name
        FROM performers p
        JOIN performers_scenes ps ON p.id = ps.performer_id
        WHERE ps.scene_id = ?
        ORDER BY p.name
    """
    try:
        cur = conn.cursor()
        cur.execute(sql, (scene_id,))
        names = [record[0] for record in cur.fetchall()]
    except sqlite3.Error as e:
        logger.error(f"获取演员信息失败 (scene_id={scene_id}): {str(e)}")
        raise
    return ','.join(names)
def get_file_info(conn, file_id):
    """Look up one row of the ``files`` table.

    Args:
        conn: Open sqlite3 connection.
        file_id: Primary key of the file row.

    Returns:
        A dict with the keys ``id``, ``basename`` and ``parent_folder_id``.

    Raises:
        ValueError: No row exists for ``file_id``.
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    columns = ('id', 'basename', 'parent_folder_id')
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, basename, parent_folder_id
            FROM files
            WHERE id = ?
        """, (file_id,))
        row = cur.fetchone()
        if row is None:
            raise ValueError(f"未找到文件信息 (file_id={file_id})")
        # Pair the selected columns with their values, in SELECT order.
        return dict(zip(columns, row))
    except sqlite3.Error as e:
        logger.error(f"获取文件信息失败 (file_id={file_id}): {str(e)}")
        raise
def get_folder_path(conn, folder_id):
    """Return the ``path`` column of the ``folders`` row with the given id.

    Args:
        conn: Open sqlite3 connection.
        folder_id: Primary key of the folder row.

    Returns:
        The stored folder path.

    Raises:
        ValueError: No row exists for ``folder_id``.
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    try:
        cur = conn.cursor()
        cur.execute("SELECT path FROM folders WHERE id = ?", (folder_id,))
        row = cur.fetchone()
        if row is not None:
            return row[0]
        raise ValueError(f"未找到文件夹路径 (folder_id={folder_id})")
    except sqlite3.Error as e:
        logger.error(f"获取文件夹路径失败 (folder_id={folder_id}): {str(e)}")
        raise
def get_scene_info(conn, scene_id):
    """Look up title, date and studio id of one row of the ``scenes`` table.

    Args:
        conn: Open sqlite3 connection.
        scene_id: Primary key of the scene row.

    Returns:
        A dict with the keys ``title``, ``release_date`` (the ``date``
        column) and ``studio_id``.

    Raises:
        ValueError: No row exists for ``scene_id``.
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT title, date as release_date, studio_id
            FROM scenes
            WHERE id = ?
        """, (scene_id,))
        row = cur.fetchone()
        if row is None:
            raise ValueError(f"未找到场景信息 (scene_id={scene_id})")
        title, release_date, studio_id = row[0], row[1], row[2]
        return {
            'title': title,
            'release_date': release_date,
            'studio_id': studio_id,
        }
    except sqlite3.Error as e:
        logger.error(f"获取场景信息失败 (scene_id={scene_id}): {str(e)}")
        raise
def get_studio_name(conn, studio_id):
    """Return the name of a studio, or a placeholder when it is unknown.

    Args:
        conn: Open sqlite3 connection.
        studio_id: Primary key of the ``studios`` row.

    Returns:
        The ``name`` column of the row, or ``"UnknownStudio"`` (after logging
        a warning) when no row exists for ``studio_id``.

    Raises:
        sqlite3.Error: Logged and re-raised when the query fails.
    """
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM studios WHERE id = ?", (studio_id,))
        row = cur.fetchone()
        if row is not None:
            return row[0]
        logger.warning(f"未找到工作室信息 (studio_id={studio_id}),使用默认名称")
        return "UnknownStudio"
    except sqlite3.Error as e:
        logger.error(f"获取工作室信息失败 (studio_id={studio_id}): {str(e)}")
        raise
def parse_date(date_str):
    """Normalise a date string to ``yyyy.mm.dd``.

    The input is tried against a fixed list of formats in order; the first
    format that parses wins, so ambiguous inputs such as ``01-02-2024`` are
    read as day-month-year before month-day-year.

    Args:
        date_str: Date text, or a falsy value when no date is known.

    Returns:
        The date as ``yyyy.mm.dd``, or ``"0000.00.00"`` when ``date_str`` is
        falsy or matches none of the formats (the latter logs a warning).
    """
    placeholder = "0000.00.00"
    if not date_str:
        return placeholder

    # Candidate formats, tried in this exact order.
    candidate_formats = (
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%d-%m-%Y",
        "%d/%m/%Y",
        "%Y%m%d",
        "%m-%d-%Y",
        "%m/%d/%Y",
    )
    for pattern in candidate_formats:
        try:
            return datetime.strptime(date_str, pattern).strftime("%Y.%m.%d")
        except ValueError:
            pass

    logger.warning(f"无法解析日期格式: {date_str},使用默认值")
    return placeholder
def get_file_extension(basename):
    """Return the lower-cased extension of a file name, without the dot.

    Uses ``os.path.splitext`` so that a leading dot is not mistaken for an
    extension separator: ``".hidden"`` has no extension, whereas splitting on
    ``'.'`` would have reported ``"hidden"``.

    Args:
        basename: File name (no directory part expected).

    Returns:
        The text after the last dot in lower case, or ``''`` when the name
        has no extension.
    """
    _, suffix = os.path.splitext(basename)
    # suffix is '' or starts with '.'; drop the dot.
    return suffix[1:].lower()
def sanitize_filename(name):
    """Replace characters that are illegal in file names with ``'-'``.

    The replaced characters are ``/ \\ : * ? " < > |``; everything else is
    kept as is.

    Args:
        name: Text destined to become part of a file name.

    Returns:
        ``name`` with every illegal character replaced by a hyphen.
    """
    # One translation table covers all illegal characters in a single pass.
    replacements = str.maketrans({bad: '-' for bad in '/\\:*?"<>|'})
    return name.translate(replacements)
def _build_target_basename(studio_name, release_date, performers, title, original_basename):
    """Compose ``studio - date - performers - title[.ext]`` from sanitized parts."""
    ext = get_file_extension(original_basename)
    sanitized_studio = sanitize_filename(studio_name)
    # Cap the two free-form parts at 100 characters each to avoid overlong names.
    sanitized_performers = sanitize_filename(performers)[0:100]
    sanitized_title = sanitize_filename(title)[0:100]
    stem = f"{sanitized_studio} - {release_date} - {sanitized_performers} - {sanitized_title}"
    return f"{stem}.{ext}" if ext else stem


def process_scene_files(conn, mode, prefix):
    """Plan, and in run mode apply, a rename for every scene/file mapping.

    For each row of ``scenes_files`` the new name
    ``<studio> - <yyyy.mm.dd> - <performers> - <title>[.<ext>]`` is built in
    the file's current folder. Scenes without performers, or with a missing
    title, date or studio id, are skipped, as are names whose encoded length
    exceeds 254 bytes. Every planned rename is logged and recorded; the
    record list is also written to ``./result/rename_results.json``.

    In run mode the file is renamed on disk and ``files.basename`` is
    updated and committed per record. An existing destination that is a
    different file is never overwritten, and the on-disk rename is undone
    when the database update fails, so disk and database stay in step.

    Args:
        conn: Open sqlite3 connection.
        mode: ``'run'`` applies the renames; any other value only plans them.
        prefix: Currently unused.
            NOTE(review): the CLI help describes it as a directory prefix
            used for matching, but no filtering is implemented - confirm the
            intended behaviour before wiring it in.

    Returns:
        A list of dicts with the keys ``file_id``, ``scene_id``,
        ``original_name`` and ``dest_name`` (both full paths).

    Raises:
        sqlite3.Error: Logged and re-raised when reading the mapping table
            fails. Errors for an individual record are logged and that
            record is skipped.
    """
    results = []
    run_mode = mode == 'run'
    try:
        cursor = conn.cursor()
        # Fetch every scene/file mapping up front.
        cursor.execute("SELECT scene_id, file_id FROM scenes_files")
        mappings = cursor.fetchall()
        logger.debug(f"共找到 {len(mappings)} 条场景-文件映射记录")

        for idx, (scene_id, file_id) in enumerate(mappings, 1):
            logger.debug(f"处理第 {idx}/{len(mappings)} 条记录 (scene_id={scene_id}, file_id={file_id})")
            try:
                # 1. File row and the folder it lives in.
                file_info = get_file_info(conn, file_id)
                original_basename = file_info['basename']
                folder_path = get_folder_path(conn, file_info['parent_folder_id'])

                # 2. Performers are mandatory for the new name.
                performers = get_performers(conn, scene_id)
                if not performers:
                    logger.warning(f"场景 {scene_id} 未找到演员信息,跳过")
                    continue

                # 3. Scene and studio details are mandatory as well.
                scene_info = get_scene_info(conn, scene_id)
                if not scene_info['title'] or not scene_info['release_date'] or not scene_info['studio_id']:
                    logger.warning(f"场景 {scene_id} 信息不完整,跳过")
                    continue
                release_date = parse_date(scene_info['release_date'])
                studio_name = get_studio_name(conn, scene_info['studio_id'])

                # 4. Build the new file name.
                new_basename = _build_target_basename(
                    studio_name, release_date, performers,
                    scene_info['title'], original_basename,
                )
                # File systems limit the encoded name, not the character
                # count, so measure bytes (non-ASCII names are multi-byte).
                if len(os.fsencode(new_basename)) > 254:
                    logger.warning(f"生成的文件名过长,跳过 (file_id={file_id}): {new_basename}")
                    continue

                original_path = os.path.join(folder_path, original_basename)
                new_path = os.path.join(folder_path, new_basename)

                # 5. Record and report the planned rename.
                results.append({
                    'file_id': file_id,
                    'scene_id': scene_id,
                    'original_name': original_path,
                    'dest_name': new_path
                })
                logger.info(f"准备重命名: {original_path} -> {new_path}")

                if not run_mode:
                    continue

                # 6. Apply the rename on disk, then in the database.
                if not os.path.exists(original_path):
                    logger.warning(f"文件不存在,跳过: {original_path}")
                    continue

                renamed = False
                if original_path != new_path:
                    # os.rename silently replaces an existing destination on
                    # POSIX; refuse to clobber a different file (e.g. a second
                    # file of the same scene mapping to the same name).
                    if os.path.exists(new_path) and not os.path.samefile(original_path, new_path):
                        logger.warning(f"目标文件已存在,跳过: {new_path}")
                        continue
                    os.rename(original_path, new_path)
                    renamed = True
                    logger.info(f"已重命名: {original_path} -> {new_path}")

                try:
                    cursor.execute(
                        "UPDATE files SET basename = ? WHERE id = ?",
                        (new_basename, file_id)
                    )
                    conn.commit()
                except sqlite3.Error:
                    # Keep disk and database consistent: undo the rename
                    # before the error is handled by the per-record handler.
                    if renamed:
                        os.rename(new_path, original_path)
                        logger.warning(f"数据库更新失败,已还原文件名: {new_path} -> {original_path}")
                    raise
                logger.info(f"已更新数据库记录 (file_id={file_id})")
            except Exception as e:
                # Best effort per record: log, discard any pending
                # transaction in run mode, and move on to the next mapping.
                logger.error(f"处理记录失败 (scene_id={scene_id}, file_id={file_id}): {str(e)}", exc_info=True)
                if run_mode:
                    conn.rollback()

        # Persist the plan/result list.
        with open('./result/rename_results.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        logger.info(f"处理完成,结果已保存到 rename_results.json")
        return results
    except sqlite3.Error as e:
        logger.error(f"数据库操作失败: {str(e)}", exc_info=True)
        if run_mode:
            conn.rollback()
        raise
    finally:
        if run_mode:
            conn.commit()
def main():
    """Command-line entry point: parse options, open the database, process.

    Options:
        --mode    ``check`` (default) or ``run``.
        --db      Path of the SQLite database file (default ``movies.db``).
        --prefix  Directory prefix, passed through to ``process_scene_files``.

    Returns early after logging an error when the database file does not
    exist. sqlite3 errors raised while connecting or processing are logged
    here; the connection is always closed.
    """
    cli = argparse.ArgumentParser(description='电影文件重命名工具')
    cli.add_argument(
        '--mode',
        choices=['check', 'run'],
        default='check',
        help='运行模式: check(检查) 或 run(执行)',
    )
    cli.add_argument('--db', default='movies.db', help='SQLite数据库文件路径')
    cli.add_argument('--prefix', default='', help='目录的前缀,用来匹配')
    args = cli.parse_args()

    # Refuse to continue without an existing database file.
    if not os.path.exists(args.db):
        logger.error(f"数据库文件不存在: {args.db}")
        return

    # Output directory for the JSON result file.
    os.makedirs('./result', exist_ok=True)

    conn = None
    try:
        conn = sqlite3.connect(args.db)
        # Row factory allows access to columns by name.
        conn.row_factory = sqlite3.Row
        logger.info(f"成功连接到数据库: {args.db}")
        process_scene_files(conn, args.mode, args.prefix)
    except sqlite3.Error as e:
        logger.error(f"数据库连接失败: {str(e)}", exc_info=True)
    finally:
        if conn:
            conn.close()
            logger.info("数据库连接已关闭")


# Run the command-line tool only when executed as a script.
if __name__ == "__main__":
    main()