This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/iafd/src/sqlite_utils.py
2025-07-01 10:48:30 +08:00

1053 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import json
import config
import utils
import logging
import sys
from datetime import datetime
# Connect to the SQLite database. The connection and cursor are created once at
# import time and are used by the helper functions in this module.
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # Replace with your own database file
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")
def get_id_by_href(table: str, href: str) -> int:
    """Look up the ``id`` of the row in ``table`` whose ``href`` matches.

    The table name is interpolated directly into the SQL text, so only
    trusted table names may be passed. Returns ``None`` when ``href`` is
    ``None`` or when no row matches.
    """
    if href is not None:
        cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,))
        match = cursor.fetchone()
        if match:
            return match[0]
    return None
def insert_performer_index(name, href, from_astro_list=None, from_birth_list=None, from_ethnic_list=None, from_movie_list=None):
    """Insert or refresh a performer index row discovered on a listing page.

    If a row with the same ``href`` already exists, its ``name`` is
    overwritten and each ``from_*_list`` flag is replaced only when the caller
    passed a non-``None`` value; otherwise the stored flag is kept. A newly
    inserted row stores missing flags as 0.

    Returns the performer's row id, or ``None`` when an error occurred (the
    transaction is rolled back in that case).
    """
    try:
        # Check whether the performer is already known.
        cursor.execute("""
            SELECT id, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list
            FROM iafd_performers WHERE href = ?
        """, (href,))
        stored = cursor.fetchone()

        if not stored:
            # Unknown performer: create the index row.
            cursor.execute("""
                INSERT INTO iafd_performers (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list)
                VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
            """, (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list))
        else:
            # Known performer: flags the caller did not supply keep their stored value.
            _, _, stored_astro, stored_birth, stored_ethnic, stored_movie = stored
            if from_astro_list is None:
                from_astro_list = stored_astro
            if from_birth_list is None:
                from_birth_list = stored_birth
            if from_ethnic_list is None:
                from_ethnic_list = stored_ethnic
            if from_movie_list is None:
                from_movie_list = stored_movie
            cursor.execute("""
                UPDATE iafd_performers
                SET name = ?,
                    from_astro_list = ?,
                    from_birth_list = ?,
                    from_ethnic_list = ?,
                    from_movie_list = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list, href))

        conn.commit()

        performer_id = get_id_by_href('iafd_performers', href)
        if performer_id:
            logging.debug(f'Inserted/Updated performer index, id: {performer_id}, name: {name}, href: {href}')
        return performer_id
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None
def insert_movie_index(title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None):
    """Insert or refresh a movie index row discovered on a listing page.

    If a row with the same ``href`` already exists, its ``title`` is
    overwritten. ``release_year`` and the ``from_*_list`` flags are replaced
    only when the caller supplied a value; otherwise the stored value is kept.
    A missing release year is expressed as 0 (the default) and ``None`` is
    treated the same way, so a failed year parse upstream cannot wipe a year
    that is already stored. A newly inserted row stores missing values as 0.

    Returns the movie's row id, or ``None`` when an error occurred (the
    transaction is rolled back in that case).
    """
    try:
        # Check whether the movie is already known.
        cursor.execute("""
            SELECT id, title, release_year, from_performer_list, from_dist_list, from_stu_list
            FROM iafd_movies WHERE href = ?
        """, (href,))
        stored = cursor.fetchone()

        if stored:
            _, _, stored_year, stored_performer, stored_dist, stored_stu = stored
            # 0 and None both mean "no year supplied": keep what is stored.
            if not release_year:
                release_year = stored_year
            if from_performer_list is None:
                from_performer_list = stored_performer
            if from_dist_list is None:
                from_dist_list = stored_dist
            if from_stu_list is None:
                from_stu_list = stored_stu
            cursor.execute("""
                UPDATE iafd_movies
                SET title = ?,
                    release_year = ?,
                    from_performer_list = ?,
                    from_dist_list = ?,
                    from_stu_list = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (title, release_year, from_performer_list, from_dist_list, from_stu_list, href))
        else:
            # Unknown movie: create the index row; a None year is stored as 0,
            # matching the function's own "unknown" default.
            cursor.execute("""
                INSERT INTO iafd_movies (title, href, release_year, from_performer_list, from_dist_list, from_stu_list)
                VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
            """, (title, href, release_year, from_performer_list, from_dist_list, from_stu_list))

        conn.commit()

        movie_id = get_id_by_href('iafd_movies', href)
        if movie_id:
            logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}')
        return movie_id
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None
def insert_performer_movie(performer_id, movie_id, role, notes):
    """Insert or update one performer-movie relation row.

    On a ``(movie_id, performer_id)`` conflict the existing row's ``role``
    and ``notes`` are overwritten with the new values.

    Returns ``performer_id`` on success, or ``None`` after rolling back when
    the statement fails.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_performers_movies (performer_id, movie_id, role, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(movie_id, performer_id) DO UPDATE SET notes=excluded.notes, role=excluded.role
            """,
            (performer_id, movie_id, role, notes)
        )
        conn.commit()
        return performer_id
    except Exception as e:
        conn.rollback()
        # The message names the relation table; the previous wording
        # ("Error inserting movie") pointed at the wrong operation.
        logging.error("Error inserting performer-movie relation: %s", e)
        return None
def insert_movie_appears_in(movie_id, appears_in_id, gradation=0, notes=''):
    """Insert or update one movie-to-movie "appears in" relation row.

    On a ``(movie_id, appears_in_id)`` conflict the existing row's
    ``gradation`` and ``notes`` are overwritten with the new values.

    Returns ``movie_id`` on success, or ``None`` after rolling back when the
    statement fails.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_movies_appers_in (movie_id, appears_in_id, gradation, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(movie_id, appears_in_id) DO UPDATE SET notes=excluded.notes, gradation=excluded.gradation
            """,
            (movie_id, appears_in_id, gradation, notes)
        )
        conn.commit()
        return movie_id
    except Exception as e:
        conn.rollback()
        # The message names the relation table; the previous wording
        # ("Error inserting movie") pointed at the wrong operation.
        logging.error("Error inserting movie appears-in relation: %s", e)
        return None
def insert_or_update_performer(data):
    """Insert or update a fully scraped performer together with aliases and credits.

    ``data`` is a dict that must contain ``href`` and ``person`` (the name).
    The remaining profile fields are read with ``dict.get`` and may be absent.
    The raw ``height`` / ``weight`` strings are stored as-is, and also passed
    through ``utils.parse_height`` / ``utils.parse_weight`` for the numeric
    columns (presumably returning numbers; verify against ``utils``).

    Steps:
      1. Upsert the performer row and mark it ``is_full_data = 1``.
      2. Insert every alias from ``performer_aka`` except the placeholder
         "No known aliases".
      3. For every movie listed in ``credits`` (a mapping of role to a list of
         movies), make sure the movie has an index row and link it to the
         performer.

    Returns the performer's row id, or ``None`` on failure.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_performers (href, name, gender, birthday, astrology, birthplace, years_active, ethnicity, nationality, hair_colors,
                eye_color, height_str, weight_str, measurements, tattoos, piercings, weight, height, movies_cnt, vixen_cnt,
                blacked_cnt, tushy_cnt, x_art_cnt, is_full_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                gender = excluded.gender,
                birthday = excluded.birthday,
                astrology = excluded.astrology,
                birthplace = excluded.birthplace,
                years_active = excluded.years_active,
                ethnicity = excluded.ethnicity,
                nationality = excluded.nationality,
                hair_colors = excluded.hair_colors,
                eye_color = excluded.eye_color,
                height_str = excluded.height_str,
                weight_str = excluded.weight_str,
                measurements = excluded.measurements,
                tattoos = excluded.tattoos,
                piercings = excluded.piercings,
                weight = excluded.weight,
                height = excluded.height,
                movies_cnt = excluded.movies_cnt,
                vixen_cnt = excluded.vixen_cnt,
                blacked_cnt = excluded.blacked_cnt,
                tushy_cnt = excluded.tushy_cnt,
                x_art_cnt = excluded.x_art_cnt,
                is_full_data = 1,
                updated_at = datetime('now', 'localtime')
        """, (
            data["href"], data["person"], data.get("gender"), data.get("birthday"), data.get("astrology"), data.get("birthplace"), data.get("years_active"),
            data.get("ethnicity"), data.get("nationality"), data.get("hair_colors"), data.get("eye_color"), data.get("height"),
            data.get("weight"), data.get("measurements"), data.get("tattoos"), data.get("piercings"), utils.parse_weight(data.get('weight')), utils.parse_height(data.get('height')),
            data.get("movies_cnt", 0), data.get("vixen_cnt", 0), data.get("blacked_cnt", 0), data.get("tushy_cnt", 0), data.get("x_art_cnt", 0)
        ))

        # Resolve the row id of the performer that was just written.
        performer_id = get_id_by_href('iafd_performers', data["href"])
        if performer_id is None:
            return None
        logging.debug(f"insert one performer, id: {performer_id}, name: {data['person']}, href: {data['href']}")

        # Store aliases, skipping the site's "no aliases" placeholder text.
        for alias in data.get("performer_aka") or []:
            if alias.lower() != "no known aliases":
                cursor.execute("INSERT OR IGNORE INTO iafd_performer_aliases (performer_id, alias) VALUES (?, ?) ", (performer_id, alias))

        # Commit exactly once here so the performer row is persisted even when
        # there are no aliases to insert.
        conn.commit()

        # Credits may list movies under several roles for the same performer.
        # `or {}` also covers the case where the key is present but set to
        # None, which `data.get('credits', {})` would not.
        credits = data.get('credits') or {}
        for role, movies in credits.items():
            for movie in movies or []:
                movie_id = get_id_by_href('iafd_movies', movie['href'])
                # Unknown movie: create its index row first.
                if movie_id is None:
                    movie_id = insert_movie_index(movie['title'], movie['href'], utils.to_number(movie['year']), from_performer_list=1)
                if not movie_id:
                    logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}")
                    continue
                if insert_performer_movie(performer_id, movie_id, role, movie['notes']):
                    logging.debug(f"insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}")
                else:
                    logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}")

        return performer_id
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None
def insert_or_update_performer_404(name, href, is_full_data=1):
    """Insert or update a performer whose page could not be scraped (e.g. a 404 link).

    Only ``name``, ``is_full_data`` and ``updated_at`` are written. An
    existing row with the same ``href`` has those three columns overwritten.

    Returns the performer's row id, or ``None`` on failure (the transaction is
    rolled back in that case).
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_performers (href, name, is_full_data, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                is_full_data = excluded.is_full_data,
                updated_at = datetime('now', 'localtime')
        """, (
            href, name, is_full_data
        ))
        # Persist the change right away, as insert_or_update_movie_404 does;
        # otherwise the write stays pending until some later commit.
        conn.commit()

        # Resolve the row id of the performer that was just written.
        performer_id = get_id_by_href('iafd_performers', href)
        if performer_id is None:
            return None
        logging.debug(f'insert one performer, id: {performer_id}, name: {name}, href: {href}')
        return performer_id
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None
def delete_performer(identifier):
    """Delete a performer by row id (``int``) or by ``href`` (``str``).

    Any other identifier type is rejected with a warning and nothing is
    executed. Database errors are logged and rolled back; nothing is returned.
    """
    # Choose the statement up front; the identifier is always bound as a parameter.
    if isinstance(identifier, int):
        statement = "DELETE FROM iafd_performers WHERE id = ?"
    elif isinstance(identifier, str):
        statement = "DELETE FROM iafd_performers WHERE href = ?"
    else:
        logging.warning("无效的删除参数")
        return

    try:
        cursor.execute(statement, (identifier,))
        conn.commit()
        logging.info(f"成功删除演员: {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")
def query_performer(identifier):
    """Fetch one performer by row id, ``href`` or partial name.

    An ``int`` is matched against ``id``. A string containing ``"http"`` is
    matched against ``href``. Any other string is used as a ``LIKE '%...%'``
    pattern on ``name``, and the first matching row is returned.

    Returns a dict mapping the performer's column names to values, plus a
    ``performer_aka`` key holding the list of aliases. Returns ``None`` when
    nothing matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_performers WHERE id = ?", (identifier,))
        elif "http" in identifier:
            cursor.execute("SELECT * FROM iafd_performers WHERE href = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_performers WHERE name LIKE ?", (f"%{identifier}%",))

        performer = cursor.fetchone()
        if not performer:
            logging.warning(f"未找到演员: {identifier}")
            return None

        # Read the column names BEFORE running the alias query: the shared
        # cursor's `description` always describes the most recent statement.
        columns = [desc[0] for desc in cursor.description]
        result = dict(zip(columns, performer))

        cursor.execute("SELECT alias FROM iafd_performer_aliases WHERE performer_id = ?", (result["id"],))
        result["performer_aka"] = [row[0] for row in cursor.fetchall()]
        return result
    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None
def query_performer_hrefs(**filters):
    """List performers matching the given keyword filters.

    Supported filters: ``id``, ``href``, ``name`` (substring match),
    ``is_full_data``, ``is_full_data_in`` / ``is_full_data_not_in``
    (sequences; ignored when empty), ``before_updated_at``,
    ``after_updated_at``, ``start_id`` (exclusive lower bound on ``id``),
    ``order_by`` (a column name, ascending) and ``limit``.

    Returns a list of dicts with keys ``href``, ``name``, ``id`` and
    ``movies_cnt``, or ``None`` when the query fails.
    """
    # A column name cannot be passed as a bound parameter: SQLite would sort
    # by the constant string and the ordering would silently do nothing. The
    # name is therefore written into the SQL text, restricted to this
    # allow-list so that no arbitrary text reaches the statement.
    sortable_columns = ("id", "href", "name", "movies_cnt", "is_full_data", "updated_at")
    try:
        sql = "SELECT href, name, id, movies_cnt FROM iafd_performers WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        if "href" in filters:
            sql += " AND href = ?"
            params.append(filters["href"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")
        if "is_full_data" in filters:
            sql += " AND is_full_data = ?"
            params.append(filters["is_full_data"])
        if "is_full_data_in" in filters:
            values = filters["is_full_data_in"]
            if values:
                placeholders = ", ".join(["?"] * len(values))
                sql += f" AND is_full_data IN ({placeholders})"
                params.extend(values)
        if "is_full_data_not_in" in filters:
            values = filters["is_full_data_not_in"]
            if values:
                placeholders = ", ".join(["?"] * len(values))
                sql += f" AND is_full_data NOT IN ({placeholders})"
                params.extend(values)
        if "before_updated_at" in filters:
            sql += " AND updated_at <= ?"
            params.append(filters["before_updated_at"])
        if "after_updated_at" in filters:
            sql += " AND updated_at >= ?"
            params.append(filters["after_updated_at"])
        if "start_id" in filters:
            sql += " AND id > ?"
            params.append(filters["start_id"])
        if "order_by" in filters:
            order_by = filters["order_by"]
            if order_by in sortable_columns:
                sql += f" order by {order_by} asc"
            else:
                # Unknown columns are skipped, leaving the result unordered.
                logging.warning("ignoring unsupported order_by column: %s", order_by)
        if 'limit' in filters:
            sql += " limit ?"
            params.append(filters["limit"])

        logging.debug(f"query sql: {sql}")
        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1], 'id':row[2], 'movies_cnt':row[3]} for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
def insert_or_update_ethnic(data):
    """Insert or update one ethnicity metadata row.

    ``data`` must contain ``name`` and ``href``. An existing row with the
    same ``href`` has its ``name`` overwritten.

    Returns the row id, or ``None`` when the row cannot be found afterwards
    or a database error occurs (rolled back).
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_meta_ethnic (name, href)
            VALUES (?, ?)
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name
        """, (data["name"], data["href"]))
        conn.commit()

        # Use the shared lookup helper: it returns None for a missing row
        # instead of raising on `fetchone()[0]`.
        ethnic_id = get_id_by_href('iafd_meta_ethnic', data["href"])
        if ethnic_id:
            logging.debug(f"成功插入/更新ethnic: {data['name']}")
            return ethnic_id
        return None
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
def query_ethnic_hrefs(**filters):
    """List ethnicity metadata rows matching the given keyword filters.

    Supported filters: ``id``, ``href`` (``url`` is accepted as an alias) and
    ``name`` (substring match).

    Returns a list of dicts with keys ``href`` and ``name``, or ``None`` when
    the query fails.
    """
    try:
        sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        # The old code tested for "url" but read filters["href"], which raised
        # KeyError for url= and silently ignored href=. Accept either key.
        if "href" in filters or "url" in filters:
            sql += " AND href = ?"
            params.append(filters["href"] if "href" in filters else filters["url"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
def insert_or_update_distributor(data):
    """Insert or update one distributor row.

    ``data`` must contain ``name`` and ``href``. An existing row with the
    same ``href`` has its ``name`` and ``updated_at`` overwritten.

    Returns the row id, or ``None`` when the row cannot be found afterwards
    or a database error occurs (rolled back).
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_distributors (name, href, updated_at)
            VALUES (?, ? , datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        # Use the shared lookup helper: it returns None for a missing row
        # instead of raising on `fetchone()[0]`.
        dist_id = get_id_by_href('iafd_distributors', data["href"])
        if dist_id:
            logging.debug(f"成功插入/更新发行商: {data['name']}")
            return dist_id
        return None
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
def delete_distributor(identifier):
    """Delete a distributor by row id (``int``) or by exact name (``str``).

    Any other identifier type is rejected with a warning instead of being
    reported as a successful delete. Database errors are logged and rolled
    back; nothing is returned.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_distributors WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_distributors WHERE name = ?", (identifier,))
        else:
            # Same handling as delete_performer: nothing was executed, so do
            # not commit or log success.
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"成功删除发行商: {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")
def query_distributor(identifier):
    """Fetch one distributor by row id (``int``) or by partial name.

    Non-integer identifiers are used as a ``LIKE '%...%'`` pattern on
    ``name``, and the first matching row is returned.

    Returns a dict mapping column names to values, or ``None`` when nothing
    matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_distributors WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_distributors WHERE name LIKE ?", (f"%{identifier}%",))

        row = cursor.fetchone()
        if not row:
            logging.warning(f"未找到发行商: {identifier}")
            return None

        column_names = [column[0] for column in cursor.description]
        return dict(zip(column_names, row))
    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None
def query_distributor_hrefs(**filters):
    """List distributor hrefs matching the given keyword filters.

    Supported filters: ``id``, ``href`` (``url`` is accepted as an alias) and
    ``name`` (substring match).

    Returns a list of lower-cased href strings, or ``None`` when the query
    fails.
    """
    try:
        sql = "SELECT href FROM iafd_distributors WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        # The old code tested for "url" but read filters["href"], which raised
        # KeyError for url= and silently ignored href=. Accept either key.
        if "href" in filters or "url" in filters:
            sql += " AND href = ?"
            params.append(filters["href"] if "href" in filters else filters["url"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        cursor.execute(sql, params)
        # Links are returned lower-cased.
        return [row[0].lower() for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
def insert_or_update_studio(data):
    """Insert or update one studio row.

    ``data`` must contain ``name`` and ``href``. An existing row with the
    same ``href`` has its ``name`` and ``updated_at`` overwritten.

    Returns the row id, or ``None`` when the row cannot be found afterwards
    or a database error occurs (rolled back).
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_studios (name, href, updated_at)
            VALUES (?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        # Use the shared lookup helper: it returns None for a missing row
        # instead of raising on `fetchone()[0]`.
        stu_id = get_id_by_href('iafd_studios', data["href"])
        if stu_id:
            # The message now says "studio"; it previously reused the
            # distributor wording.
            logging.debug(f"成功插入/更新制作公司: {data['name']}")
            return stu_id
        return None
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
def delete_studio(identifier):
    """Delete a studio by row id (``int``) or by exact name (``str``).

    Any other identifier type is rejected with a warning instead of being
    reported as a successful delete. Database errors are logged and rolled
    back; nothing is returned.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_studios WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_studios WHERE name = ?", (identifier,))
        else:
            # Same handling as delete_performer: nothing was executed, so do
            # not commit or log success.
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"成功删除制作公司: {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")
def query_studio(identifier):
    """Fetch one studio by row id (``int``) or by partial name.

    Non-integer identifiers are used as a ``LIKE '%...%'`` pattern on
    ``name``, and the first matching row is returned.

    Returns a dict mapping column names to values, or ``None`` when nothing
    matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_studios WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_studios WHERE name LIKE ?", (f"%{identifier}%",))

        row = cursor.fetchone()
        if not row:
            logging.warning(f"未找到制作公司: {identifier}")
            return None

        column_names = [column[0] for column in cursor.description]
        return dict(zip(column_names, row))
    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None
def query_studio_hrefs(**filters):
    """List studio hrefs matching the given keyword filters.

    Supported filters: ``id``, ``href`` and ``name`` (substring match).

    Returns a list of lower-cased href strings, or ``None`` when the query
    fails.
    """
    try:
        clauses = ["SELECT href FROM iafd_studios WHERE 1=1"]
        params = []

        # Exact-match filters, appended in a fixed order.
        for key, clause in (("id", " AND id = ?"), ("href", " AND href = ?")):
            if key in filters:
                clauses.append(clause)
                params.append(filters[key])
        if "name" in filters:
            clauses.append(" AND name LIKE ?")
            params.append(f"%{filters['name']}%")

        cursor.execute("".join(clauses), params)
        # Links are returned lower-cased.
        hrefs = []
        for row in cursor.fetchall():
            hrefs.append(row[0].lower())
        return hrefs
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
def insert_or_update_movie(movie_data):
    """Insert or update a fully scraped movie and all of its relations.

    ``movie_data`` is a dict that must contain ``title``, ``href``,
    ``Minutes``, ``ReleaseDate``, ``AddedtoIAFDDate``, ``All-Girl``,
    ``All-Male``, ``Compilation``, ``Webscene``, ``DistributorHref``,
    ``StudioHref``, ``DirectorHref`` and ``Director``. It may also contain
    ``Directors`` (items with ``name`` / ``href``), ``Performers`` (items with
    ``name`` / ``href`` / ``tags``) and ``AppearsIn`` (items with ``title`` /
    ``href``).

    Unknown directors, performers and referenced movies get index rows
    created on the fly. Distributor and studio ids fall back to 0 when the
    href is unknown.

    Returns the movie's row id, or ``None`` on failure (rolled back).
    """
    try:
        # Resolve the ids of the related rows.
        distributor_id = get_id_by_href('iafd_distributors', movie_data['DistributorHref'])
        studio_id = get_id_by_href('iafd_studios', movie_data['StudioHref'])
        director_id = get_id_by_href('iafd_performers', movie_data['DirectorHref'])

        # Unknown director with a valid person URL: create an index row.
        if director_id is None and utils.is_valid_person_url(movie_data['DirectorHref']):
            director_id = insert_performer_index( movie_data['Director'], movie_data['DirectorHref'], from_movie_list=1)

        studio_id = 0 if studio_id is None else studio_id
        distributor_id = 0 if distributor_id is None else distributor_id

        # Upsert the movie row itself.
        movie_values = (
            movie_data['title'], movie_data['Minutes'], distributor_id, studio_id, movie_data['ReleaseDate'],
            movie_data['AddedtoIAFDDate'], movie_data['All-Girl'], movie_data['All-Male'],
            movie_data['Compilation'], movie_data['Webscene'], director_id, movie_data['href'],
        )
        cursor.execute(
            """
            INSERT INTO iafd_movies (title, minutes, distributor_id, studio_id, release_date, added_to_IAFD_date,
                all_girl, all_male, compilation, webscene, director_id, href, is_full_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title, minutes=excluded.minutes, distributor_id=excluded.distributor_id,
                studio_id=excluded.studio_id, release_date=excluded.release_date,
                added_to_IAFD_date=excluded.added_to_IAFD_date, all_girl=excluded.all_girl,
                all_male=excluded.all_male, compilation=excluded.compilation, webscene=excluded.webscene,
                director_id=excluded.director_id, is_full_data=1, updated_at = datetime('now', 'localtime')
            """,
            movie_values
        )
        conn.commit()

        # Resolve the id of the movie that was just written.
        movie_id = get_id_by_href('iafd_movies', movie_data['href'])
        if movie_id is None:
            return None
        logging.debug(f"insert one move, id: {movie_id}, title: {movie_data['title']}, href: {movie_data['href']}")

        # Link the primary director to the movie.
        if director_id:
            if insert_performer_movie(director_id, movie_id, 'directoral', ''):
                logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")

        # Link every additional director.
        for director in movie_data.get('Directors', []):
            director_id = get_id_by_href('iafd_performers', director['href'])
            # Unknown director with a valid person URL: create an index row first.
            if director_id is None and utils.is_valid_person_url(director['href']):
                director_id = insert_performer_index(director['name'], director['href'], from_movie_list=1)
                logging.debug(f"insert one director. perfomer_id: {director_id}, movie_id:{movie_id} ")
            if director_id:
                if insert_performer_movie(director_id, movie_id, 'directoral', ''):
                    logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")

        # Link every performer.
        for performer in movie_data.get('Performers', []):
            performer_id = get_id_by_href('iafd_performers', performer['href'])
            # Unknown performer: create an index row first.
            if performer_id is None:
                performer_id = insert_performer_index(performer['name'], performer['href'], from_movie_list=1)
            if not performer_id:
                logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")
                continue
            # Notes are the performer's tags, minus any tag equal to the name itself.
            kept_tags = [tag for tag in performer['tags'] if tag != performer['name']]
            notes = '|'.join(kept_tags)
            if insert_performer_movie(performer_id, movie_id, 'personal', notes):
                logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
            else:
                logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")

        # Link every movie this one appears in.
        for appears in movie_data.get("AppearsIn", []):
            appears_in_id = get_id_by_href('iafd_movies', appears['href'])
            # Unknown movie: create an index row first.
            if appears_in_id is None:
                appears_in_id = insert_movie_index( appears['title'], appears['href'])
            if not appears_in_id:
                logging.warning(f"get appears_in_id failed. title: {appears['title']}, href: {appears['href']}")
                continue
            if insert_movie_appears_in(movie_id, appears_in_id):
                logging.debug(f"insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
            else:
                logging.warning(f"insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}")

        return movie_id
    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None
def insert_or_update_movie_404(title, href, is_full_data=1):
    """Insert or update a movie whose page could not be scraped (e.g. a 404 link).

    Only ``title``, ``is_full_data`` and ``updated_at`` are written. An
    existing row with the same ``href`` has those three columns overwritten.

    Returns the movie's row id (``None`` if it cannot be found), or ``None``
    after rolling back when an error occurs.
    """
    try:
        row_values = (title, href, is_full_data)
        cursor.execute(
            """
            INSERT INTO iafd_movies (title, href, is_full_data, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime')
            """,
            row_values
        )
        conn.commit()

        # The lookup already yields None for a missing row, so return it directly.
        return get_id_by_href('iafd_movies', href)
    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None
def delete_movie(identifier):
    """Delete a movie by row id (``int``) or by ``href`` (``str``).

    Any other identifier type is rejected with a warning and nothing is
    executed. Database errors are logged and rolled back; nothing is returned.
    """
    # Choose the statement up front; the identifier is always bound as a parameter.
    if isinstance(identifier, int):
        statement = "DELETE FROM iafd_movies WHERE id = ?"
    elif isinstance(identifier, str):
        statement = "DELETE FROM iafd_movies WHERE href = ?"
    else:
        logging.warning("无效的删除参数")
        return

    try:
        cursor.execute(statement, (identifier,))
        conn.commit()
        logging.info(f"Deleted movie with {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error("Error deleting movie: %s", e)
def query_movies(identifier):
    """Fetch one movie by row id, ``href`` or partial title.

    An ``int`` is matched against ``id``. A string containing ``"http"`` is
    matched against ``href``. Any other string is used as a ``LIKE '%...%'``
    pattern on ``title``, and the first matching row is returned.

    Returns a dict mapping the movie's column names to values, plus a
    ``performers`` key holding the ids of the performers linked to the movie.
    Returns ``None`` when nothing matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_movies WHERE id = ?", (identifier,))
        elif "http" in identifier:
            cursor.execute("SELECT * FROM iafd_movies WHERE href = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_movies WHERE title LIKE ?", (f"%{identifier}%",))

        movie = cursor.fetchone()
        if not movie:
            logging.warning(f"find no data: {identifier}")
            return None

        # Read the column names BEFORE running the relation query: the shared
        # cursor's `description` always describes the most recent statement.
        columns = [desc[0] for desc in cursor.description]
        result = dict(zip(columns, movie))

        # Relations are keyed by movie_id; the old query compared the movie's
        # id against performer_id and so returned unrelated rows.
        cursor.execute("SELECT performer_id FROM iafd_performers_movies WHERE movie_id = ?", (result["id"],))
        result["performers"] = [row[0] for row in cursor.fetchall()]
        return result
    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None
def query_movie_hrefs(**filters):
    """List movies matching the given keyword filters.

    Supported filters: ``id``, ``href``, ``title`` (substring match),
    ``is_full_data``, ``is_full_data_in`` / ``is_full_data_not_in``
    (sequences; ignored when empty), ``before_updated_at``,
    ``after_updated_at``, ``start_id`` (exclusive lower bound on ``id``),
    ``order_by`` (a column name) and ``limit``.

    Returns a list of dicts with keys ``href``, ``title`` and ``id``, or an
    empty list when the query fails.
    """
    # A column name cannot be passed as a bound parameter: SQLite would sort
    # by the constant string and the ordering would silently do nothing. The
    # name is therefore written into the SQL text, restricted to this
    # allow-list so that no arbitrary text reaches the statement.
    sortable_columns = ("id", "href", "title", "release_year", "release_date",
                        "minutes", "is_full_data", "updated_at")
    try:
        sql = "SELECT href, title, id FROM iafd_movies WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        if "href" in filters:
            sql += " AND href = ?"
            params.append(filters["href"])
        if "title" in filters:
            sql += " AND title LIKE ?"
            params.append(f"%{filters['title']}%")
        if "is_full_data" in filters:
            sql += " AND is_full_data = ?"
            params.append(filters["is_full_data"])
        if "is_full_data_in" in filters:
            values = filters["is_full_data_in"]
            if values:
                placeholders = ", ".join(["?"] * len(values))
                sql += f" AND is_full_data IN ({placeholders})"
                params.extend(values)
        if "is_full_data_not_in" in filters:
            values = filters["is_full_data_not_in"]
            if values:
                placeholders = ", ".join(["?"] * len(values))
                sql += f" AND is_full_data NOT IN ({placeholders})"
                params.extend(values)
        if "before_updated_at" in filters:
            sql += " AND updated_at <= ?"
            params.append(filters["before_updated_at"])
        if "after_updated_at" in filters:
            sql += " AND updated_at >= ?"
            params.append(filters["after_updated_at"])
        if "start_id" in filters:
            sql += " AND id > ?"
            params.append(filters["start_id"])
        if "order_by" in filters:
            order_by = filters["order_by"]
            if order_by in sortable_columns:
                sql += f" order by {order_by}"
            else:
                # Unknown columns are skipped, leaving the result unordered.
                logging.warning("ignoring unsupported order_by column: %s", order_by)
        if 'limit' in filters:
            sql += " limit ?"
            params.append(filters["limit"])

        logging.debug(f"query sql: {sql}")
        cursor.execute(sql, params)
        return [{'href': row[0], 'title': row[1], 'id':row[2]} for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []
def get_performers_needed_update(limit=None):
    """List performers whose aggregated movie count disagrees with the stored one.

    Reads ``view_iafd_performers_movies`` and selects the rows where
    ``actual_movies_cnt != movies_cnt``.

    Args:
        limit: optional maximum number of rows; ``None`` means no limit.

    Returns a list of dicts with keys ``href`` and ``name``, or an empty list
    when the query fails.
    """
    try:
        sql = """
            SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt
        """
        params = []
        if limit is not None:
            # Bind the limit instead of formatting it into the SQL text.
            sql += " LIMIT ?"
            params.append(limit)

        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []
def check_and_create_stat_table(taskid = 0):
    """(Re)build the per-task performer statistics table.

    The table ``iafd_tmp_performers_stat_<taskid>`` holds, for every
    performer, the scraped ``movies_cnt`` next to two aggregated counts: the
    rows in ``iafd_performers_movies`` (``actor_movie_count``) and the movies
    that name the performer as director (``director_movie_count``). Comparing
    them shows whether the counts aggregated from movie data match the count
    scraped from the performer page.

    Supporting indexes are created first if missing. An existing statistics
    table for the same ``taskid`` is dropped and rebuilt. ``taskid`` is
    formatted into the table name, so it must come from trusted code.

    Database errors are logged as warnings; nothing is returned.
    """
    try:
        # Create the supporting indexes when they do not exist yet.
        indexes = [
            ("idx_iafd_performers_movies_performer_id",
             "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"),
            ("idx_iafd_movies_director_id",
             "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"),
            ("idx_iafd_performers_id",
             "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);")
        ]
        for index_name, create_index_sql in indexes:
            cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
            if not cursor.fetchone():
                cursor.execute(create_index_sql)
                logging.info(f"Index {index_name} created successfully.")
            else:
                logging.info(f"Index {index_name} already exists.")

        # Drop a previous statistics table for this task, if any.
        table_name = f"iafd_tmp_performers_stat_{taskid}"
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
        if cursor.fetchone():
            # Identifiers cannot be bound as parameters ("drop table ?" is a
            # syntax error), so the name has to be part of the SQL text.
            cursor.execute(f"DROP TABLE {table_name}")
            conn.commit()

        create_table_sql = f"""
            CREATE table {table_name} AS
            SELECT
                id,
                href,
                name,
                movies_cnt,
                SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count,
                SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count
            FROM (
                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(apm.movie_id) AS movie_count,
                    'actor' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_performers_movies apm ON p.id = apm.performer_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt
                UNION ALL
                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(im.id) AS movie_count,
                    'director' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_movies im ON p.id = im.director_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt
            ) combined
            GROUP BY
                id, href, name, movies_cnt;
        """
        cursor.execute(create_table_sql)
        logging.info(f"table {table_name} created successfully.")

        # Persist the changes.
        conn.commit()
    except sqlite3.Error as e:
        logging.warning(f"An error occurred: {e}")
def reset_actor_movies(check_and_do = 0):
    """Recompute ``iafd_performers.movies_cnt`` from the relation table.

    Adds the ``movies_cnt`` column if it is missing, then sets it for every
    performer to the number of rows in ``iafd_performers_movies`` for that
    performer (0 when there are none). The counts are staged in a temporary
    table to keep the update fast.

    Args:
        check_and_do: accepted for interface compatibility; not used by this
            function.

    Database errors are logged and rolled back; nothing is returned.
    """
    try:
        # Add the movies_cnt column when the table does not have it yet.
        cursor.execute("PRAGMA table_info(iafd_performers);")
        columns = [row[1] for row in cursor.fetchall()]

        if 'movies_cnt' not in columns:
            cursor.execute("""
                ALTER TABLE iafd_performers ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
            """)
            logging.info("成功添加movies_cnt字段")
        else:
            logging.info("movies_cnt字段已存在跳过添加")

        # Make sure the relation table is indexed on performer_id.
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id
            ON iafd_performers_movies(performer_id);
        """)

        # A temp table left behind by an earlier failed run on this connection
        # would make the CREATE below fail, so clear it first.
        cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;")

        # Stage the per-performer counts in a temporary table.
        cursor.execute("""
            CREATE TEMPORARY TABLE temp_actor_counts AS
            SELECT performer_id, COUNT(movie_id) AS cnt
            FROM iafd_performers_movies
            GROUP BY performer_id;
        """)

        # Index the temporary table for the correlated lookup below.
        cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);")

        # Write the counts back to the main table.
        cursor.execute("""
            UPDATE iafd_performers
            SET movies_cnt = COALESCE((
                SELECT cnt FROM temp_actor_counts
                WHERE performer_id = iafd_performers.id
            ), 0); -- 使用COALESCE处理没有影片的演员
        """)

        updated_rows = cursor.rowcount
        logging.info(f"成功更新{updated_rows}个演员的影片数量")

        # Clean up the staging table.
        cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;")
        conn.commit()

        logging.info("任务执行完成!")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error("Error updating actor movie_cnt: %s", e)
def insert_task_log():
    """Create a new task-log row with status ``'Start'``.

    After the row is inserted, its count columns are filled in through
    ``update_task_log``.

    Returns the new task id, or ``None`` when no id is available or the
    insert fails.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_task_log (task_status) VALUES ('Start')
        """)
        conn.commit()

        new_task_id = cursor.lastrowid
        if new_task_id is not None:
            # Record the current table counts on the fresh row.
            update_task_log(task_id=new_task_id, task_status='Start')
        return new_task_id
    except sqlite3.Error as e:
        logging.error(f"插入任务失败: {e}")
        return None
def update_task_log_inner(task_id, **kwargs):
    """Update arbitrary columns of one task-log row.

    Each keyword argument names a column of ``iafd_task_log`` and gives its
    new value; ``updated_at`` is always refreshed. Column names are written
    into the SQL text, so they must come from trusted code. Values are bound
    as parameters.

    Calling with no keyword arguments only refreshes ``updated_at``.
    Database errors are logged; nothing is returned.
    """
    try:
        assignments = [f"{key} = ?" for key in kwargs]
        # Appending the timestamp to the list (rather than after a trailing
        # comma) keeps the statement valid when kwargs is empty.
        assignments.append("updated_at = datetime('now', 'localtime')")
        params = [*kwargs.values(), task_id]

        sql = f"UPDATE iafd_task_log SET {', '.join(assignments)} WHERE task_id = ?"
        cursor.execute(sql, params)
        conn.commit()
    except sqlite3.Error as e:
        logging.error(f"更新任务 {task_id} 失败: {e}")
def update_task_log(task_id, task_status):
    """Refresh a task-log row with the current table counts and a new status.

    Counts the performers and movies (all rows, and those with
    ``is_full_data=1``), the distributors and the studios, then writes those
    numbers together with ``task_status`` through ``update_task_log_inner``.

    Database errors are logged; nothing is returned.
    """
    # (task-log column, counting query) pairs, in the order they are written.
    count_queries = (
        ("full_data_performers", "SELECT COUNT(*) FROM iafd_performers where is_full_data=1"),
        ("total_performers", "SELECT COUNT(*) FROM iafd_performers"),
        ("full_data_movies", "SELECT COUNT(*) FROM iafd_movies where is_full_data=1"),
        ("total_movies", "SELECT COUNT(*) FROM iafd_movies"),
        ("total_distributors", "SELECT COUNT(*) FROM iafd_distributors"),
        ("total_studios", "SELECT COUNT(*) FROM iafd_studios"),
    )
    try:
        counts = {}
        for column, query in count_queries:
            cursor.execute(query)
            counts[column] = cursor.fetchone()[0]

        # Write the counts and the status to the task log.
        update_task_log_inner(task_id, **counts, task_status=task_status)
    except sqlite3.Error as e:
        logging.error(f"更新任务 {task_id} 失败: {e}")
def finalize_task_log(task_id):
    """Mark a task as finished: set its status to ``"Success"`` and refresh its counts."""
    final_status = "Success"
    try:
        update_task_log(task_id, task_status=final_status)
    except sqlite3.Error as e:
        logging.error(f"任务 {task_id} 结束失败: {e}")
# Script entry point: build the performer statistics table with the default task id.
if __name__ == "__main__":
    check_and_create_stat_table()
# The string literal below is disabled sample code (loading performers from a
# JSON dump and querying them). As a bare string expression it does nothing.
'''
try:
with open('../result/detail.json', 'r') as file:
performers = json.load(file)
for performer in performers:
insert_or_update_performer(performer)
print(query_performer("Kirsten"))
#delete_performer("https://www.iafd.com/person.rme/id=ca699282-1b57-4ce7-9bcc-d7799a292e34")
print(query_performer_hrefs())
except FileNotFoundError:
logging.info("detail.json not found, starting fresh.")
'''