# Reformatted from a collapsed patch against src/tools/sync_db.py. The code
# below relies on that module's existing top-level ``import pymysql``.

# Column order shared by every result row and by the INSERT statement.
_UNION_COLUMNS = (
    "whisper_id",
    "release_year",
    "release_date",
    "whisper_code",
    "title",
    "studio_name",
    "date",
    "code",
    "stash_title",
    "name",
)

_CREATE_UNION_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS resources.union_stash_whisper (
        ID int AUTO_INCREMENT,
        whisper_id INT,
        release_year INT UNSIGNED,
        title TEXT,
        release_date TEXT,
        whisper_code TEXT,
        studio_name TEXT,
        date TEXT,
        code TEXT,
        stash_title TEXT,
        name TEXT,
        PRIMARY KEY (ID)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
"""

# Whisper episodes joined to their series (the series title is the studio).
_SELECT_WHISPER_SQL = """
    SELECT
        swe.Id,
        swe.SeasonNumber AS release_year,
        swe.AirDate AS release_date,
        swe.Title AS title,
        swe.ExternalId AS whisper_code,
        sws.Title AS studio_name
    FROM resources.sync_whisper_episodes swe
    LEFT JOIN resources.sync_whisper_series sws
        ON swe.SeriesId = sws.Id
"""

# Stash scenes joined to their studio; scenes without a title are skipped.
_SELECT_STASH_SQL = """
    SELECT
        s.id,
        s.date,
        s.code,
        s.title AS stash_title,
        s2.name
    FROM resources.sync_stash_scenes s
    LEFT JOIN resources.sync_stash_studios s2
        ON s.studio_id = s2.id
    WHERE s.title IS NOT NULL
"""


def parse_union_key(date_str, name_str, code_str):
    """Build the normalized key used to pair a Whisper row with a Stash row.

    The studio name is lower-cased and every non-alphanumeric character
    (spaces, punctuation) is dropped, so cosmetic differences in the name do
    not prevent a match.

    Args:
        date_str: Release date; interpolated into the key unchanged.
        name_str: Studio name; ``None`` or an empty string becomes ``""``.
        code_str: Accepted for interface compatibility; it is currently not
            part of the key.

    Returns:
        The string ``"<date_str>_<normalized name>"``.
    """
    name = name_str.lower() if name_str else ""
    name = "".join(ch for ch in name if ch.isalnum())
    return f"{date_str}_{name}"


def _union_key_or_none(date_value, name_value, code_value):
    """Return the match key, or ``None`` when the date or the name is missing."""
    if not (date_value and name_value):
        return None
    return parse_union_key(date_value, name_value, code_value)


def _union_row(whisper_row, stash_row):
    """Build one result row; pass ``None`` for the side that has no match."""
    row = dict.fromkeys(_UNION_COLUMNS)
    if whisper_row is None:
        # Stash-only rows use 0 (not NULL) for the numeric Whisper columns.
        row["whisper_id"] = 0
        row["release_year"] = 0
    else:
        row["whisper_id"] = whisper_row["Id"]
        row["release_year"] = whisper_row["release_year"] or 0
        row["release_date"] = whisper_row["release_date"]
        row["whisper_code"] = whisper_row["whisper_code"]
        row["title"] = whisper_row["title"]
        row["studio_name"] = whisper_row["studio_name"]
    if stash_row is not None:
        row["date"] = stash_row["date"]
        row["code"] = stash_row["code"]
        row["stash_title"] = stash_row["stash_title"]
        row["name"] = stash_row["name"]
    return row


def _merge_whisper_stash(whisper_rows, stash_rows):
    """Full-outer-join the two row lists in memory on the union key.

    Returns ``(rows, matched, whisper_only, stash_only)``. Whisper-driven
    rows come first (in input order), followed by the Stash-only rows.
    """
    # Group Stash rows per key. Several scenes can share a date and studio;
    # keeping only one per key would silently drop the others.
    stash_by_key = {}
    for stash_row in stash_rows:
        key = _union_key_or_none(
            stash_row["date"], stash_row["name"], stash_row["code"]
        )
        if key is not None:
            stash_by_key.setdefault(key, []).append(stash_row)

    rows = []
    matched_keys = set()
    matched = whisper_only = stash_only = 0

    # Whisper -> Stash (left-join semantics: one row per matching pair).
    for whisper_row in whisper_rows:
        key = _union_key_or_none(
            whisper_row["release_date"],
            whisper_row["studio_name"],
            whisper_row["whisper_code"],
        )
        partners = stash_by_key.get(key, ()) if key is not None else ()
        if partners:
            matched_keys.add(key)
            for stash_row in partners:
                rows.append(_union_row(whisper_row, stash_row))
                matched += 1
        else:
            rows.append(_union_row(whisper_row, None))
            whisper_only += 1

    # Stash -> Whisper: append only the Stash rows that were never paired.
    for stash_row in stash_rows:
        key = _union_key_or_none(
            stash_row["date"], stash_row["name"], stash_row["code"]
        )
        if key is None or key not in matched_keys:
            rows.append(_union_row(None, stash_row))
            stash_only += 1

    return rows, matched, whisper_only, stash_only


def generate_union_table(mysql_conn):
    """Rebuild ``resources.union_stash_whisper`` from the synced tables.

    Reads the Whisper episode/series rows and the Stash scene/studio rows,
    pairs them in memory on ``parse_union_key`` (date + normalized studio
    name), and writes the matched, Whisper-only and Stash-only rows to the
    result table, replacing its previous content.

    Args:
        mysql_conn: Open MySQL connection. Rows are accessed by column name,
            so its cursors must return dict-like rows.

    Database errors (``pymysql.MySQLError``) are printed and followed by a
    rollback; they are not re-raised.
    """
    from contextlib import closing

    try:
        # closing() guarantees the cursor is released even on failure.
        with closing(mysql_conn.cursor()) as cursor:
            # 1. Create the result table if it does not exist yet.
            cursor.execute(_CREATE_UNION_TABLE_SQL)

            # 2. Load both sides.
            cursor.execute(_SELECT_WHISPER_SQL)
            whisper_data = cursor.fetchall()
            cursor.execute(_SELECT_STASH_SQL)
            stash_data = cursor.fetchall()
            print(f"读取数据完成:Whisper 共 {len(whisper_data)} 条,Stash 共 {len(stash_data)} 条")

            # 3. Two-way match in memory.
            result, count_matched, count_whisper_only, count_stash_only = (
                _merge_whisper_stash(whisper_data, stash_data)
            )
            print(f"匹配完成:匹配成功 {count_matched} 条,Whisper 独有 {count_whisper_only} 条,Stash 独有 {count_stash_only} 条")

            # 4. Clear the old content as late as possible: TRUNCATE commits
            #    implicitly in MySQL, so the rollback below cannot undo it.
            cursor.execute("TRUNCATE TABLE resources.union_stash_whisper;")

            # 5. Bulk-insert the merged rows.
            if result:
                columns = ", ".join(_UNION_COLUMNS)
                placeholders = ", ".join(["%s"] * len(_UNION_COLUMNS))
                insert_sql = f"INSERT INTO resources.union_stash_whisper ({columns}) VALUES ({placeholders})"
                data_values = [
                    tuple(row[column] for column in _UNION_COLUMNS)
                    for row in result
                ]
                cursor.executemany(insert_sql, data_values)
                mysql_conn.commit()
                print(f"成功插入 {cursor.rowcount} 条记录")

    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        if mysql_conn:
            mysql_conn.rollback()


# NOTE(review): the same patch also (a) adds ``from datetime import datetime``
# to the module imports, which nothing above uses, and (b) calls
# ``generate_union_table(mysql_conn)`` from the ``__main__`` section followed
# by what appears to be an unconditional ``exit(0)``, which would make the
# table-sync code after it unreachable - confirm this is intended and not a
# debugging leftover.