modify scripts

This commit is contained in:
2025-12-22 09:34:48 +08:00
parent 13bc218631
commit f7d8b790fa

View File

@ -3,6 +3,7 @@ import pymysql
from pymysql.cursors import DictCursor
import os
import argparse
from datetime import datetime
def sync_sqlite_table_to_mysql(sqlite_path, sqlite_table, mysql_conn, mysql_table):
@ -107,6 +108,180 @@ def sync_sqlite_table_to_mysql(sqlite_path, sqlite_table, mysql_conn, mysql_tabl
print(f"同步完成:{sqlite_table}{mysql_table}, 共 {rows_count} 条记录")
# Build the key used to pair Whisper rows with Stash rows.
def parse_union_key(date_str, name_str, code_str):
    """Return the normalized ``"<date>_<name>"`` key used for matching.

    The name is lower-cased and every character that is not alphanumeric
    (spaces, punctuation, symbols) is dropped. A missing or empty name
    contributes an empty string. ``date_str`` is interpolated as-is.

    ``code_str`` is accepted so callers can pass the code column, but it is
    not part of the key.
    """
    if not name_str:
        return f"{date_str}_"

    kept_chars = [ch for ch in name_str.lower() if ch.isalnum()]
    normalized_name = ''.join(kept_chars)
    return f"{date_str}_{normalized_name}"
def _union_match_key(date_value, name_value, code_value):
    """Return the match key for a row, or None when it has no date or no name."""
    if not date_value or not name_value:
        return None
    return parse_union_key(date_value, name_value, code_value)


def _union_row(whisper=None, stash=None):
    """Build one output row; the absent side is filled with 0 / None placeholders."""
    has_whisper = whisper is not None
    has_stash = stash is not None
    return {
        'whisper_id': whisper['Id'] if has_whisper else 0,
        'release_year': (whisper['release_year'] or 0) if has_whisper else 0,
        'release_date': whisper['release_date'] if has_whisper else None,
        'whisper_code': whisper['whisper_code'] if has_whisper else None,
        'title': whisper['title'] if has_whisper else None,
        'studio_name': whisper['studio_name'] if has_whisper else None,
        'date': stash['date'] if has_stash else None,
        'code': stash['code'] if has_stash else None,
        'stash_title': stash['stash_title'] if has_stash else None,
        'name': stash['name'] if has_stash else None,
    }


def generate_union_table(mysql_conn):
    """Rebuild ``resources.union_stash_whisper`` from the synced source tables.

    Reads the Whisper episodes (joined to their series) and the Stash scenes
    (joined to their studios), pairs them in memory on the key produced by
    ``parse_union_key`` (date + normalized studio name), and writes:

    * one row per Whisper episode, carrying the matching Stash columns when a
      match exists;
    * one row per Stash scene that matches no Whisper episode.

    Rows without a date or without a studio name are never matched. When
    several Stash rows share a key, the last one read is the one attached to
    matching Whisper rows.

    Args:
        mysql_conn: An open pymysql connection. Rows are accessed by column
            name, so its cursors must return dict-like rows.

    Returns:
        None. Progress is printed. A ``pymysql.MySQLError`` is printed and
        followed by a rollback instead of being raised.
    """
    # Fixed column order shared by the INSERT statement and the value tuples.
    columns = (
        'whisper_id', 'release_year', 'release_date', 'whisper_code', 'title',
        'studio_name', 'date', 'code', 'stash_title', 'name',
    )

    create_table_sql = """
        CREATE TABLE IF NOT EXISTS resources.union_stash_whisper (
            ID int AUTO_INCREMENT,
            whisper_id INT,
            release_year INT UNSIGNED,
            title TEXT,
            release_date TEXT,
            whisper_code TEXT,
            studio_name TEXT,
            date TEXT,
            code TEXT,
            stash_title TEXT,
            name TEXT,
            PRIMARY KEY (ID)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    """

    # Episodes joined to their series (the former view_wishper_all logic).
    whisper_sql = """
        SELECT
            swe.Id,
            swe.SeasonNumber AS release_year,
            swe.AirDate AS release_date,
            swe.Title AS title,
            swe.ExternalId AS whisper_code,
            sws.Title AS studio_name
        FROM resources.sync_whisper_episodes swe
        LEFT JOIN resources.sync_whisper_series sws
            ON swe.SeriesId = sws.Id
    """

    # Scenes joined to their studios (the former view_stash_all logic).
    stash_sql = """
        SELECT
            s.id,
            s.date,
            s.code,
            s.title AS stash_title,
            s2.name
        FROM resources.sync_stash_scenes s
        LEFT JOIN resources.sync_stash_studios s2
            ON s.studio_id = s2.id
        WHERE s.title IS NOT NULL
    """

    try:
        # The context manager closes the cursor on every exit path.
        with mysql_conn.cursor() as cursor:
            # 1. Create the result table if it does not exist yet.
            cursor.execute(create_table_sql)

            # 2. Load both sides into memory.
            cursor.execute(whisper_sql)
            whisper_data = cursor.fetchall()
            cursor.execute(stash_sql)
            stash_data = cursor.fetchall()
            print(f"读取数据完成Whisper 共 {len(whisper_data)}Stash 共 {len(stash_data)}")

            # 3. Index Stash rows by match key. Rows without a date or name
            #    are left out so they can never pair with a row whose name
            #    merely normalizes to an empty string.
            stash_by_key = {}
            for stash_row in stash_data:
                key = _union_match_key(
                    stash_row['date'], stash_row['name'], stash_row['code']
                )
                if key is not None:
                    stash_by_key[key] = stash_row

            # 4. Two-sided matching in memory.
            result = []
            whisper_keys = set()
            count_matched = 0
            count_whisper_only = 0
            count_stash_only = 0

            # Whisper -> Stash (left-join semantics): every episode is emitted.
            for w in whisper_data:
                key = _union_match_key(
                    w['release_date'], w['studio_name'], w['whisper_code']
                )
                if key is not None:
                    whisper_keys.add(key)
                s = stash_by_key.get(key)
                result.append(_union_row(whisper=w, stash=s))
                if s is None:
                    count_whisper_only += 1
                else:
                    count_matched += 1

            # Stash -> Whisper: emit only scenes that no episode matched.
            for s in stash_data:
                key = _union_match_key(s['date'], s['name'], s['code'])
                if key is not None and key in whisper_keys:
                    continue
                result.append(_union_row(stash=s))
                count_stash_only += 1

            print(f"匹配完成:匹配成功 {count_matched}Whisper 独有 {count_whisper_only}Stash 独有 {count_stash_only}")

            # 5. Replace the table contents. TRUNCATE commits implicitly in
            #    MySQL and cannot be rolled back, so it runs only after the
            #    reads and the matching have succeeded.
            cursor.execute("TRUNCATE TABLE resources.union_stash_whisper;")
            if result:
                column_list = ', '.join(columns)
                placeholders = ', '.join(['%s'] * len(columns))
                insert_sql = f"INSERT INTO resources.union_stash_whisper ({column_list}) VALUES ({placeholders})"
                data_values = [
                    tuple(row[col] for col in columns) for row in result
                ]
                # Bulk insert.
                cursor.executemany(insert_sql, data_values)
                mysql_conn.commit()
                print(f"成功插入 {cursor.rowcount} 条记录")
    except pymysql.MySQLError as e:
        print(f"数据库错误: {e}")
        if mysql_conn:
            mysql_conn.rollback()
# 示例 MySQL 配置
mysql_config = {
'dev': {
@ -165,6 +340,13 @@ if __name__ == "__main__":
cursorclass=DictCursor
)
# generate union table
print("\n\n开始生成合并表 union_stash_whisper ...")
generate_union_table(mysql_conn)
print("\n合并表生成完成。")
exit(0)
# parse source to list
source_list = [s.strip().lower() for s in args.source.split(",")]
@ -223,3 +405,4 @@ if __name__ == "__main__":
mysql_conn=mysql_conn,
mysql_table="sync_stash_scenes"
)