Files
stock/scripts/iafd/src/sqlite_utils.py
2025-03-04 16:05:47 +08:00

510 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import json
import config
import utils
import logging
from datetime import datetime
# 连接 SQLite 数据库
DB_PATH = f"{config.global_share_data_dir}/shared.db" # 替换为你的数据库文件
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 获取当前时间
def get_current_time():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 插入演员信息
def insert_or_update_performer(data):
try:
cursor.execute("""
INSERT INTO iafd_performers (href, name, gender, birthday, astrology, birthplace, years_active, ethnicity, nationality, hair_colors,
eye_color, height_str, weight_str, measurements, tattoos, piercings, weight, height, movies_cnt, vixen_cnt,
blacked_cnt, tushy_cnt, x_art_cnt, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
gender = excluded.gender,
birthday = excluded.birthday,
astrology = excluded.astrology,
birthplace = excluded.birthplace,
years_active = excluded.years_active,
ethnicity = excluded.ethnicity,
nationality = excluded.nationality,
hair_colors = excluded.hair_colors,
eye_color = excluded.eye_color,
height_str = excluded.height_str,
weight_str = excluded.weight_str,
measurements = excluded.measurements,
tattoos = excluded.tattoos,
piercings = excluded.piercings,
weight = excluded.weight,
height = excluded.height,
movies_cnt = excluded.movies_cnt,
vixen_cnt = excluded.vixen_cnt,
blacked_cnt = excluded.blacked_cnt,
tushy_cnt = excluded.tushy_cnt,
x_art_cnt = excluded.x_art_cnt,
updated_at = datetime('now', 'localtime')
""", (
data["href"], data["person"], data.get("gender"), data.get("birthday"), data.get("astrology"), data.get("birthplace"), data.get("years_active"),
data.get("ethnicity"), data.get("nationality"), data.get("hair_colors"), data.get("eye_color"), data.get("height"),
data.get("weight"), data.get("measurements"), data.get("tattoos"), data.get("piercings"), utils.parse_weight(data.get('weight')), utils.parse_height(data.get('height')),
data.get("movies_cnt", 0), data.get("vixen_cnt", 0), data.get("blacked_cnt", 0), data.get("tushy_cnt", 0), data.get("x_art_cnt", 0)
))
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_performers WHERE href = ?", (data["href"],))
performer_id = cursor.fetchone()[0]
# 删除旧的 alias
cursor.execute("DELETE FROM iafd_performer_aliases WHERE performer_id = ?", (performer_id,))
# 插入新的 alias
#for alias in data.get("performer_aka", []):
for alias in data.get("performer_aka") or []:
if alias.lower() != "no known aliases":
cursor.execute("INSERT INTO iafd_performer_aliases (performer_id, alias) VALUES (?, ?) ON CONFLICT(performer_id, alias) DO NOTHING ", (performer_id, alias))
conn.commit()
logging.debug(f"成功插入/更新演员: {data['person']}")
return performer_id
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
conn.rollback()
logging.error(f"未知错误: {e}")
return None
# 按 id 或 href 删除演员
def delete_performer(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_performers WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_performers WHERE href = ?", (identifier,))
else:
logging.warning("无效的删除参数")
return
conn.commit()
logging.info(f"成功删除演员: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# 按 id、href 或 name 查询演员信息
def query_performer(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_performers WHERE id = ?", (identifier,))
elif "http" in identifier:
cursor.execute("SELECT * FROM iafd_performers WHERE href = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_performers WHERE name LIKE ?", (f"%{identifier}%",))
performer = cursor.fetchone()
if performer:
cursor.execute("SELECT alias FROM iafd_performer_aliases WHERE performer_id = ?", (performer[0],))
aliases = [row[0] for row in cursor.fetchall()]
result = dict(zip([desc[0] for desc in cursor.description], performer))
result["performer_aka"] = aliases
return result
else:
logging.warning(f"未找到演员: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# 按条件查询 href 列表
def query_performer_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_performers WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 返回小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 插入或更新发行商 """
def insert_or_update_distributor(data):
try:
cursor.execute("""
INSERT INTO iafd_distributors (name, href, updated_at)
VALUES (?, ? , datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
updated_at = datetime('now', 'localtime')
""", (data["name"], data["href"]))
conn.commit()
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_distributors WHERE href = ?", (data["href"],))
dist_id = cursor.fetchone()[0]
if dist_id:
logging.debug(f"成功插入/更新发行商: {data['name']}")
return dist_id
else:
return None
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
# 删除发行商(按 id 或 name """
def delete_distributor(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_distributors WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_distributors WHERE name = ?", (identifier,))
conn.commit()
logging.info(f"成功删除发行商: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# 查询发行商(按 id 或 name """
def query_distributor(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_distributors WHERE id = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_distributors WHERE name LIKE ?", (f"%{identifier}%",))
distributor = cursor.fetchone()
if distributor:
return dict(zip([desc[0] for desc in cursor.description], distributor))
else:
logging.warning(f"未找到发行商: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# 按条件查询 href 列表
def query_distributor_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_distributors WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "url" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# """ 插入或更新制作公司 """
def insert_or_update_studio(data):
try:
cursor.execute("""
INSERT INTO iafd_studios (name, href, updated_at)
VALUES (?, ?, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
updated_at = datetime('now', 'localtime')
""", (data["name"], data["href"]))
conn.commit()
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_studios WHERE href = ?", (data["href"],))
stu_id = cursor.fetchone()[0]
if stu_id:
logging.debug(f"成功插入/更新发行商: {data['name']}")
return stu_id
else:
return None
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
# """ 删除制作公司(按 id 或 name """
def delete_studio(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_studios WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_studios WHERE name = ?", (identifier,))
conn.commit()
logging.info(f"成功删除制作公司: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# """ 查询制作公司(按 id 或 name """
def query_studio(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_studios WHERE id = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_studios WHERE name LIKE ?", (f"%{identifier}%",))
studio = cursor.fetchone()
if studio:
return dict(zip([desc[0] for desc in cursor.description], studio))
else:
logging.warning(f"未找到制作公司: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# 按条件查询 href 列表
def query_studio_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_studios WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# """从指定表中通过 href 查找 id"""
def get_id_by_href(table: str, href: str) -> int:
cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,))
row = cursor.fetchone()
return row[0] if row else None
# """插入或更新电影数据"""
def insert_or_update_movie(movie_data):
try:
# 获取相关 ID
distributor_id = get_id_by_href('iafd_distributors', movie_data['DistributorHref'])
studio_id = get_id_by_href('iafd_studios', movie_data['StudioHref'])
director_id = get_id_by_href('iafd_performers', movie_data['DirectorHref'])
# 插入或更新电影信息
cursor.execute(
"""
INSERT INTO iafd_movies (title, minutes, distributor_id, studio_id, release_date, added_to_IAFD_date,
all_girl, all_male, compilation, webscene, director_id, href, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
title=excluded.title, minutes=excluded.minutes, distributor_id=excluded.distributor_id,
studio_id=excluded.studio_id, release_date=excluded.release_date,
added_to_IAFD_date=excluded.added_to_IAFD_date, all_girl=excluded.all_girl,
all_male=excluded.all_male, compilation=excluded.compilation, webscene=excluded.webscene,
director_id=excluded.director_id, updated_at = datetime('now', 'localtime')
""",
(movie_data['title'], movie_data['Minutes'], distributor_id, studio_id, movie_data['ReleaseDate'],
movie_data['AddedtoIAFDDate'], movie_data['All-Girl'], movie_data['All-Male'],
movie_data['Compilation'], movie_data['Webscene'], director_id, movie_data['href'])
)
conn.commit()
logging.debug("Movie inserted/updated: %s", movie_data['title'])
# 获取插入的 movie_id
cursor.execute("SELECT id FROM iafd_movies WHERE href = ?", (movie_data['href'],))
movie_id = cursor.fetchone()[0]
# 插入 performers_movies 关系表
for performer in movie_data.get('Performers', []):
performer_id = get_id_by_href('iafd_performers', performer['href'])
if performer_id:
notes = '|'.join(performer['tags'])
cursor.execute(
"""
INSERT INTO iafd_performers_movies (performer_id, movie_id, role, notes)
VALUES (?, ?, ?, ?)
ON CONFLICT(movie_id, performer_id) DO UPDATE SET notes=excluded.notes
""",
(performer_id, movie_id, "Actor", notes)
)
logging.debug(f"Performers {performer['href']} linked to movie: %s", movie_data['title'])
else:
logging.warning(f'missing performer, url {performer['href']}, in movie: ({movie_data['title']}) {movie_data['href']}')
# 插入 movies_appers_in 表
for appears in movie_data.get("AppearsIn", []):
appears_in_id = get_id_by_href('iafd_movies', appears['href'])
if appears_in_id:
appears_in_id = appears_in_id[0]
cursor.execute("""
INSERT INTO iafd_movies_appers_in (movie_id, appears_in_id, gradation, notes)
VALUES (?, ?, ?, ?)
ON CONFLICT(movie_id, appears_in_id) DO NOTHING
""", (movie_id, appears_in_id, 1, appears["title"]))
else:
logging.warning(f'missing AppearsIn movie in movies table, parent_url {appears['href']}, in movie: ({movie_data['title']}) {movie_data['href']}')
conn.commit()
return movie_id
except Exception as e:
conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# 删除电影数据"""
def delete_movie(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_movies WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_movies WHERE href = ?", (identifier,))
else:
logging.warning("无效的删除参数")
return
conn.commit()
logging.info(f"Deleted movie with {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error("Error deleting movie: %s", e)
# 查找电影数据"""
def query_movies(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_movies WHERE id = ?", (identifier,))
elif "http" in identifier:
cursor.execute("SELECT * FROM iafd_movies WHERE href = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_movies WHERE title LIKE ?", (f"%{identifier}%",))
movie = cursor.fetchone()
if movie:
cursor.execute("SELECT * FROM iafd_performers_movies WHERE performer_id = ?", (movie[0],))
performers = [row[0] for row in cursor.fetchall()]
result = dict(zip([desc[0] for desc in cursor.description], performers))
result["performers"] = performers
return result
else:
logging.warning(f"find no data: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# 按条件查询 href 列表
def query_movie_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_movies WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "title" in filters:
sql += " AND title LIKE ?"
params.append(f"%{filters['title']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return []
# 插入一条任务日志
def insert_task_log():
try:
cursor.execute("""
INSERT INTO iafd_task_log (task_status) VALUES ('Start')
""")
conn.commit()
return cursor.lastrowid # 获取插入的 task_id
except sqlite3.Error as e:
logging.error(f"插入任务失败: {e}")
return None
# 更新任务日志的字段
def update_task_log(task_id, **kwargs):
try:
fields = ", ".join(f"{key} = ?" for key in kwargs.keys())
params = list(kwargs.values()) + [task_id]
sql = f"UPDATE iafd_task_log SET {fields}, updated_at = datetime('now', 'localtime') WHERE task_id = ?"
cursor.execute(sql, params)
conn.commit()
except sqlite3.Error as e:
logging.error(f"更新任务 {task_id} 失败: {e}")
# 任务结束,更新字段
def finalize_task_log(task_id):
try:
# 获取 performers、studios 等表的最终行数
cursor.execute("SELECT COUNT(*) FROM iafd_performers")
after_performers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_movies")
after_movies = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_distributors")
after_distributors = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_studios")
after_studios = cursor.fetchone()[0]
# 更新 task_log
update_task_log(task_id,
after_performers=after_performers,
after_movies=after_movies,
after_distributors=after_distributors,
after_studios=after_studios,
task_status="Success")
except sqlite3.Error as e:
logging.error(f"任务 {task_id} 结束失败: {e}")
if __name__ == "__main__":
try:
with open('../result/detail.json', 'r') as file:
performers = json.load(file)
for performer in performers:
insert_or_update_performer(performer)
print(query_performer("Kirsten"))
#delete_performer("https://www.iafd.com/person.rme/id=ca699282-1b57-4ce7-9bcc-d7799a292e34")
print(query_performer_hrefs())
except FileNotFoundError:
logging.info("detail.json not found, starting fresh.")