import sqlite3
import json
import config
import utils
import logging
import sys
from datetime import datetime

# Connect to the shared SQLite database (point DB_PATH at your own database file).
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db"
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()


def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def get_id_by_href(table: str, href: str) -> "int | None":
    """Look up the ``id`` of the row in *table* whose ``href`` equals *href*.

    *table* is interpolated into the SQL text, so it must be a trusted,
    hard-coded table name (every caller in this module passes a literal).

    Returns the id, or ``None`` when *href* is ``None`` or no row matches.
    """
    if href is None:
        return None
    cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,))
    row = cursor.fetchone()
    return row[0] if row else None


def _apply_basic_filters(sql, params, filters, like_column, href_keys=("href",)):
    """Append the ``id`` / ``href`` / ``LIKE`` conditions shared by all *_hrefs queries.

    *href_keys* lists the filter keys accepted for the href condition, in
    priority order; only the first one present is used. *params* is extended
    in place and the extended SQL text is returned.
    """
    if "id" in filters:
        sql += " AND id = ?"
        params.append(filters["id"])
    for key in href_keys:
        if key in filters:
            sql += " AND href = ?"
            params.append(filters[key])
            break
    if like_column in filters:
        sql += f" AND {like_column} LIKE ?"
        params.append(f"%{filters[like_column]}%")
    return sql


def _apply_listing_filters(sql, params, filters):
    """Append the is_full_data / updated_at / paging conditions used by list queries.

    *params* is extended in place and the extended SQL text is returned.
    """
    if "is_full_data" in filters:
        sql += " AND is_full_data = ?"
        params.append(filters["is_full_data"])
    for key, operator in (("is_full_data_in", "IN"), ("is_full_data_not_in", "NOT IN")):
        values = filters.get(key)
        if values:
            placeholders = ", ".join(["?"] * len(values))
            sql += f" AND is_full_data {operator} ({placeholders})"
            params.extend(values)
    if "before_updated_at" in filters:
        sql += " AND updated_at <= ?"
        params.append(filters["before_updated_at"])
    if "after_updated_at" in filters:
        sql += " AND updated_at >= ?"
        params.append(filters["after_updated_at"])
    if "start_id" in filters:
        sql += " AND id > ?"
        params.append(filters["start_id"])
    if "order_by" in filters:
        order_by = filters["order_by"]
        # A bound parameter in ORDER BY is treated as a constant expression and
        # sorts nothing, so the column name has to be part of the SQL text.
        # Only plain identifiers are accepted to keep this injection-safe.
        if isinstance(order_by, str) and order_by.isidentifier():
            sql += f" ORDER BY {order_by} ASC"
        else:
            logging.warning("ignored invalid order_by: %r", order_by)
    if "limit" in filters:
        sql += " LIMIT ?"
        params.append(filters["limit"])
    return sql


def insert_performer_index(name, href, from_astro_list=None, from_birth_list=None,
                           from_ethnic_list=None, from_movie_list=None):
    """Insert or update a performer index row that originates from a list page.

    For an existing row (matched by *href*) every ``from_*`` flag passed as
    ``None`` keeps its stored value; for a new row ``None`` flags become 0.

    Returns the performer id, or ``None`` on any error (the transaction is
    rolled back and the error is logged).
    """
    try:
        # Check whether the performer already exists.
        cursor.execute("""
            SELECT id, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list
            FROM iafd_performers WHERE href = ?
        """, (href,))
        existing_performer = cursor.fetchone()

        if existing_performer:
            _, _, existing_astro, existing_birth, existing_ethnic, existing_movie = existing_performer

            # Keep the stored value for every flag that was not supplied.
            from_astro_list = from_astro_list if from_astro_list is not None else existing_astro
            from_birth_list = from_birth_list if from_birth_list is not None else existing_birth
            from_ethnic_list = from_ethnic_list if from_ethnic_list is not None else existing_ethnic
            from_movie_list = from_movie_list if from_movie_list is not None else existing_movie

            cursor.execute("""
                UPDATE iafd_performers
                SET name = ?,
                    from_astro_list = ?,
                    from_birth_list = ?,
                    from_ethnic_list = ?,
                    from_movie_list = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list, href))
        else:
            cursor.execute("""
                INSERT INTO iafd_performers (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list)
                VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
            """, (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list))

        conn.commit()

        performer_id = get_id_by_href('iafd_performers', href)
        if performer_id:
            logging.debug(f'Inserted/Updated performer index, id: {performer_id}, name: {name}, href: {href}')

        return performer_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None


def insert_movie_index(title, href, release_year=0, from_performer_list=None,
                       from_dist_list=None, from_stu_list=None):
    """Insert or update a movie index row that originates from a list page.

    For an existing row (matched by *href*) a *release_year* of 0 and any
    ``from_*`` flag passed as ``None`` keep their stored values; for a new row
    ``None`` flags become 0.

    Returns the movie id, or ``None`` on any error (the transaction is rolled
    back and the error is logged).
    """
    try:
        # Check whether the movie already exists.
        cursor.execute("""
            SELECT id, title, release_year, from_performer_list, from_dist_list, from_stu_list
            FROM iafd_movies WHERE href = ?
        """, (href,))
        existing_movie = cursor.fetchone()

        if existing_movie:
            _, _, existing_year, existing_performer, existing_dist, existing_stu = existing_movie

            # Keep the stored value for everything that was not supplied.
            release_year = release_year if release_year != 0 else existing_year
            from_performer_list = from_performer_list if from_performer_list is not None else existing_performer
            from_dist_list = from_dist_list if from_dist_list is not None else existing_dist
            from_stu_list = from_stu_list if from_stu_list is not None else existing_stu

            cursor.execute("""
                UPDATE iafd_movies
                SET title = ?,
                    release_year = ?,
                    from_performer_list = ?,
                    from_dist_list = ?,
                    from_stu_list = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (title, release_year, from_performer_list, from_dist_list, from_stu_list, href))
        else:
            cursor.execute("""
                INSERT INTO iafd_movies (title, href, release_year, from_performer_list, from_dist_list, from_stu_list)
                VALUES (?, ?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
            """, (title, href, release_year, from_performer_list, from_dist_list, from_stu_list))

        conn.commit()

        movie_id = get_id_by_href('iafd_movies', href)
        if movie_id:
            logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}')

        return movie_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None


def insert_performer_movie(performer_id, movie_id, role, notes):
    """Upsert one performer-movie relation; return *performer_id* or ``None`` on error."""
    try:
        cursor.execute("""
            INSERT INTO iafd_performers_movies (performer_id, movie_id, role, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(movie_id, performer_id) DO UPDATE SET notes=excluded.notes, role=excluded.role
        """, (performer_id, movie_id, role, notes))
        conn.commit()
        return performer_id

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def insert_movie_appears_in(movie_id, appears_in_id, gradation=0, notes=''):
    """Upsert one movie-to-movie "appears in" relation; return *movie_id* or ``None`` on error."""
    try:
        cursor.execute("""
            INSERT INTO iafd_movies_appers_in (movie_id, appears_in_id, gradation, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(movie_id, appears_in_id) DO UPDATE SET notes=excluded.notes, gradation=excluded.gradation
        """, (movie_id, appears_in_id, gradation, notes))
        conn.commit()
        return movie_id

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def insert_or_update_performer(data, movies_update=True):
    """Upsert a fully scraped performer, its aliases and (optionally) its credits.

    *data* must contain ``href`` and ``person``; every other field is read with
    ``dict.get``. ``credits`` is expected to map a role name to a list of movie
    dicts with ``href``, ``title``, ``year`` and ``notes`` keys.

    Returns the performer id, or ``None`` on error.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_performers (href, name, gender, birthday, astrology, birthplace, years_active, ethnicity, nationality, hair_colors,
                                    eye_color, height_str, weight_str, measurements, tattoos, piercings, weight, height, movies_cnt, vixen_cnt,
                                    blacked_cnt, tushy_cnt, x_art_cnt, is_full_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                gender = excluded.gender,
                birthday = excluded.birthday,
                astrology = excluded.astrology,
                birthplace = excluded.birthplace,
                years_active = excluded.years_active,
                ethnicity = excluded.ethnicity,
                nationality = excluded.nationality,
                hair_colors = excluded.hair_colors,
                eye_color = excluded.eye_color,
                height_str = excluded.height_str,
                weight_str = excluded.weight_str,
                measurements = excluded.measurements,
                tattoos = excluded.tattoos,
                piercings = excluded.piercings,
                weight = excluded.weight,
                height = excluded.height,
                movies_cnt = excluded.movies_cnt,
                vixen_cnt = excluded.vixen_cnt,
                blacked_cnt = excluded.blacked_cnt,
                tushy_cnt = excluded.tushy_cnt,
                x_art_cnt = excluded.x_art_cnt,
                is_full_data = 1,
                updated_at = datetime('now', 'localtime')
        """, (
            data["href"], data["person"], data.get("gender"), data.get("birthday"), data.get("astrology"), data.get("birthplace"),
            data.get("years_active"), data.get("ethnicity"), data.get("nationality"), data.get("hair_colors"), data.get("eye_color"),
            data.get("height"), data.get("weight"), data.get("measurements"), data.get("tattoos"), data.get("piercings"),
            utils.parse_weight(data.get('weight')), utils.parse_height(data.get('height')),
            data.get("movies_cnt", 0), data.get("vixen_cnt", 0), data.get("blacked_cnt", 0), data.get("tushy_cnt", 0),
            data.get("x_art_cnt", 0)
        ))

        performer_id = get_id_by_href('iafd_performers', data["href"])
        if performer_id is None:
            return None
        logging.debug(f"insert one performer, id: {performer_id}, name: {data['person']}, href: {data['href']}")

        # Store aliases, skipping the site's "no aliases" placeholder text.
        for alias in data.get("performer_aka") or []:
            if alias.lower() != "no known aliases":
                cursor.execute(
                    "INSERT OR IGNORE INTO iafd_performer_aliases (performer_id, alias) VALUES (?, ?) ",
                    (performer_id, alias))

        conn.commit()

        # Store the credit list; a person may appear under several roles
        # (e.g. as performer and as director).
        if movies_update:
            credits = data.get('credits', {})
            for role, movies in credits.items():
                if not movies:
                    continue
                for movie in movies:
                    movie_id = get_id_by_href('iafd_movies', movie['href'])
                    # Unknown movie: create an index row first.
                    if movie_id is None:
                        movie_id = insert_movie_index(
                            movie['title'], movie['href'], utils.to_number(movie['year']), from_performer_list=1)
                    if movie_id:
                        tmp_id = insert_performer_movie(performer_id, movie_id, role, movie['notes'])
                        if tmp_id:
                            logging.debug(f"insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}")
                        else:
                            logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}")

        return performer_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None


def insert_or_update_performer_404(name, href, is_full_data=1):
    """Upsert a performer whose detail page could not be fetched (e.g. a 404 link).

    Only ``name``, ``is_full_data`` and ``updated_at`` are written.
    Returns the performer id, or ``None`` on error.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_performers (href, name, is_full_data, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                is_full_data = excluded.is_full_data,
                updated_at = datetime('now', 'localtime')
        """, (href, name, is_full_data))
        # Commit here, like insert_or_update_movie_404 does, so the row does
        # not linger in an open transaction on the shared connection.
        conn.commit()

        performer_id = get_id_by_href('iafd_performers', href)
        if performer_id is None:
            return None
        logging.debug(f'insert one performer, id: {performer_id}, name: {name}, href: {href}')

        return performer_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None


def delete_performer(identifier):
    """Delete a performer by id (``int``) or by href (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_performers WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_performers WHERE href = ?", (identifier,))
        else:
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"成功删除演员: {identifier}")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")


def query_performer(identifier):
    """Fetch one performer by id (``int``), href (text containing "http") or name substring.

    Returns a dict of the performer's columns plus ``performer_aka`` (list of
    aliases), or ``None`` when nothing matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_performers WHERE id = ?", (identifier,))
        elif "http" in identifier:
            cursor.execute("SELECT * FROM iafd_performers WHERE href = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_performers WHERE name LIKE ?", (f"%{identifier}%",))

        performer = cursor.fetchone()
        if not performer:
            logging.warning(f"未找到演员: {identifier}")
            return None

        # Capture the column names now: the alias query below replaces
        # cursor.description with its own single "alias" column.
        columns = [desc[0] for desc in cursor.description]
        result = dict(zip(columns, performer))

        cursor.execute("SELECT alias FROM iafd_performer_aliases WHERE performer_id = ?", (result["id"],))
        result["performer_aka"] = [row[0] for row in cursor.fetchall()]
        return result

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_performer_hrefs(**filters):
    """List performers matching *filters*.

    Supported keys: ``id``, ``href``, ``name`` (substring), ``is_full_data``,
    ``is_full_data_in``, ``is_full_data_not_in``, ``before_updated_at``,
    ``after_updated_at``, ``start_id`` (exclusive), ``order_by`` (column name,
    ascending) and ``limit``.

    Returns a list of ``{'href', 'name', 'id', 'movies_cnt'}`` dicts, or
    ``None`` when the query fails.
    """
    try:
        sql = "SELECT href, name, id, movies_cnt FROM iafd_performers WHERE 1=1"
        params = []
        sql = _apply_basic_filters(sql, params, filters, "name")
        sql = _apply_listing_filters(sql, params, filters)

        logging.debug(f"query sql: {sql}")
        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1], 'id': row[2], 'movies_cnt': row[3]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_ethnic(data):
    """Upsert an ethnicity entry (``data['name']``, ``data['href']``); return its id or ``None``."""
    try:
        cursor.execute("""
            INSERT INTO iafd_meta_ethnic (name, href)
            VALUES (?, ?)
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name
        """, (data["name"], data["href"]))
        conn.commit()

        # get_id_by_href copes with a missing row instead of raising TypeError.
        ethnic_id = get_id_by_href('iafd_meta_ethnic', data["href"])
        if ethnic_id:
            logging.debug(f"成功插入/更新ethnic: {data['name']}")
            return ethnic_id
        return None

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None


def query_ethnic_hrefs(**filters):
    """List ethnicity entries matching ``id``, ``href`` (alias ``url``) and/or ``name`` substring.

    Returns a list of ``{'href', 'name'}`` dicts, or ``None`` when the query fails.
    """
    try:
        sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1"
        params = []
        # "url" used to be tested while "href" was read, which raised KeyError;
        # both keys are accepted now and "href" wins when both are given.
        sql = _apply_basic_filters(sql, params, filters, "name", href_keys=("href", "url"))

        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_distributor(data):
    """Upsert a distributor (``data['name']``, ``data['href']``); return its id or ``None``."""
    try:
        cursor.execute("""
            INSERT INTO iafd_distributors (name, href, updated_at)
            VALUES (?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        dist_id = get_id_by_href('iafd_distributors', data["href"])
        if dist_id:
            logging.debug(f"成功插入/更新发行商: {data['name']}")
            return dist_id
        return None

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None


def delete_distributor(identifier):
    """Delete a distributor by id (``int``) or by exact name (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_distributors WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_distributors WHERE name = ?", (identifier,))
        conn.commit()
        logging.info(f"成功删除发行商: {identifier}")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")


def query_distributor(identifier):
    """Fetch one distributor by id (``int``) or name substring; return a column dict or ``None``."""
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_distributors WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_distributors WHERE name LIKE ?", (f"%{identifier}%",))

        distributor = cursor.fetchone()
        if distributor:
            return dict(zip([desc[0] for desc in cursor.description], distributor))
        logging.warning(f"未找到发行商: {identifier}")
        return None

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_distributor_hrefs(**filters):
    """List lower-cased distributor hrefs matching ``id``, ``href`` (alias ``url``) and/or ``name``.

    Returns a list of strings, or ``None`` when the query fails.
    """
    try:
        sql = "SELECT href FROM iafd_distributors WHERE 1=1"
        params = []
        # See query_ethnic_hrefs: accept both keys, "href" takes precedence.
        sql = _apply_basic_filters(sql, params, filters, "name", href_keys=("href", "url"))

        cursor.execute(sql, params)
        return [row[0].lower() for row in cursor.fetchall()]  # hrefs are compared in lower case

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_studio(data):
    """Upsert a studio (``data['name']``, ``data['href']``); return its id or ``None``."""
    try:
        cursor.execute("""
            INSERT INTO iafd_studios (name, href, updated_at)
            VALUES (?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        stu_id = get_id_by_href('iafd_studios', data["href"])
        if stu_id:
            # The message previously said "发行商" (distributor) by copy-paste mistake.
            logging.debug(f"成功插入/更新制作公司: {data['name']}")
            return stu_id
        return None

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None


def delete_studio(identifier):
    """Delete a studio by id (``int``) or by exact name (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_studios WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_studios WHERE name = ?", (identifier,))
        conn.commit()
        logging.info(f"成功删除制作公司: {identifier}")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")


def query_studio(identifier):
    """Fetch one studio by id (``int``) or name substring; return a column dict or ``None``."""
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_studios WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_studios WHERE name LIKE ?", (f"%{identifier}%",))

        studio = cursor.fetchone()
        if studio:
            return dict(zip([desc[0] for desc in cursor.description], studio))
        logging.warning(f"未找到制作公司: {identifier}")
        return None

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_studio_hrefs(**filters):
    """List lower-cased studio hrefs matching ``id``, ``href`` and/or ``name`` substring.

    Returns a list of strings, or ``None`` when the query fails.
    """
    try:
        sql = "SELECT href FROM iafd_studios WHERE 1=1"
        params = []
        sql = _apply_basic_filters(sql, params, filters, "name")

        cursor.execute(sql, params)
        return [row[0].lower() for row in cursor.fetchall()]  # hrefs are compared in lower case

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_movie(movie_data):
    """Upsert a fully scraped movie together with its director, cast and "appears in" links.

    *movie_data* must provide the keys read below (``title``, ``href``,
    ``Minutes``, ``DistributorHref``, ``StudioHref``, ``Director``,
    ``DirectorHref``, ``ReleaseDate``, ``AddedtoIAFDDate``, ``All-Girl``,
    ``All-Male``, ``Compilation``, ``Webscene``); ``Directors``, ``Performers``
    and ``AppearsIn`` are optional lists.

    Returns the movie id, or ``None`` on error.
    """
    try:
        # Resolve the referenced rows.
        distributor_id = get_id_by_href('iafd_distributors', movie_data['DistributorHref'])
        studio_id = get_id_by_href('iafd_studios', movie_data['StudioHref'])
        director_id = get_id_by_href('iafd_performers', movie_data['DirectorHref'])

        # Unknown director with a valid person URL: create an index row.
        if (director_id is None) and utils.is_valid_person_url(movie_data['DirectorHref']):
            director_id = insert_performer_index(
                movie_data['Director'], movie_data['DirectorHref'], from_movie_list=1)
        if studio_id is None:
            studio_id = 0
        if distributor_id is None:
            distributor_id = 0

        cursor.execute(
            """
            INSERT INTO iafd_movies (title, minutes, distributor_id, studio_id, release_date, added_to_IAFD_date,
                                all_girl, all_male, compilation, webscene, director_id, href, is_full_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title, minutes=excluded.minutes, distributor_id=excluded.distributor_id,
                studio_id=excluded.studio_id, release_date=excluded.release_date,
                added_to_IAFD_date=excluded.added_to_IAFD_date, all_girl=excluded.all_girl,
                all_male=excluded.all_male, compilation=excluded.compilation, webscene=excluded.webscene,
                director_id=excluded.director_id, is_full_data=1, updated_at = datetime('now', 'localtime')
            """,
            (movie_data['title'], movie_data['Minutes'], distributor_id, studio_id, movie_data['ReleaseDate'],
             movie_data['AddedtoIAFDDate'], movie_data['All-Girl'], movie_data['All-Male'],
             movie_data['Compilation'], movie_data['Webscene'], director_id, movie_data['href'])
        )
        conn.commit()

        movie_id = get_id_by_href('iafd_movies', movie_data['href'])
        if movie_id is None:
            return None

        logging.debug(f"insert one move, id: {movie_id}, title: {movie_data['title']}, href: {movie_data['href']}")

        # Record the main director in the relation table.
        if director_id:
            tmp_id = insert_performer_movie(director_id, movie_id, 'directoral', '')
            if tmp_id:
                logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")

        # Record any additional directors ("or []" tolerates an explicit None).
        for director in movie_data.get('Directors') or []:
            director_id = get_id_by_href('iafd_performers', director['href'])
            if (director_id is None) and utils.is_valid_person_url(director['href']):
                director_id = insert_performer_index(director['name'], director['href'], from_movie_list=1)
                logging.debug(f"insert one director. perfomer_id: {director_id}, movie_id:{movie_id} ")
            if director_id:
                tmp_id = insert_performer_movie(director_id, movie_id, 'directoral', '')
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")

        # Record the cast.
        for performer in movie_data.get('Performers') or []:
            performer_id = get_id_by_href('iafd_performers', performer['href'])
            # Unknown performer: create an index row first.
            if performer_id is None:
                performer_id = insert_performer_index(performer['name'], performer['href'], from_movie_list=1)
            if performer_id:
                # Tags that merely repeat the performer's name carry no information.
                notes = '|'.join(tag for tag in performer['tags'] if tag != performer['name'])
                tmp_id = insert_performer_movie(performer_id, movie_id, 'personal', notes)
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")
            else:
                logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")

        # Record the "appears in" links to other movies.
        for appears in movie_data.get("AppearsIn") or []:
            appears_in_id = get_id_by_href('iafd_movies', appears['href'])
            # Unknown target movie: create an index row first.
            if appears_in_id is None:
                appears_in_id = insert_movie_index(appears['title'], appears['href'])
            if appears_in_id:
                tmp_id = insert_movie_appears_in(movie_id, appears_in_id)
                if tmp_id:
                    logging.debug(f"insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
                else:
                    logging.warning(f"insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
            else:
                logging.warning(f"get appears_in_id failed. title: {appears['title']}, href: {appears['href']}")

        return movie_id

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def insert_or_update_movie_404(title, href, is_full_data=1):
    """Upsert a movie whose detail page could not be fetched (e.g. a 404 link).

    Only ``title``, ``is_full_data`` and ``updated_at`` are written.
    Returns the movie id, or ``None`` on error.
    """
    try:
        cursor.execute(
            """
            INSERT INTO iafd_movies (title, href, is_full_data, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime')
            """,
            (title, href, is_full_data)
        )
        conn.commit()

        return get_id_by_href('iafd_movies', href)

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def delete_movie(identifier):
    """Delete a movie by id (``int``) or by href (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM iafd_movies WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM iafd_movies WHERE href = ?", (identifier,))
        else:
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"Deleted movie with {identifier}")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error("Error deleting movie: %s", e)


def query_movies(identifier):
    """Fetch one movie by id (``int``), href (text containing "http") or title substring.

    Returns a dict of the movie's columns plus ``performers`` (list of
    performer ids linked to the movie), or ``None`` when nothing matches or
    the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM iafd_movies WHERE id = ?", (identifier,))
        elif "http" in identifier:
            cursor.execute("SELECT * FROM iafd_movies WHERE href = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM iafd_movies WHERE title LIKE ?", (f"%{identifier}%",))

        movie = cursor.fetchone()
        if not movie:
            logging.warning(f"find no data: {identifier}")
            return None

        # Build the result from the movie row before the next query replaces
        # cursor.description.
        columns = [desc[0] for desc in cursor.description]
        result = dict(zip(columns, movie))

        # The relation has to be filtered by movie_id; the old code filtered
        # performer_id by the movie's id and returned unrelated rows.
        cursor.execute("SELECT performer_id FROM iafd_performers_movies WHERE movie_id = ?", (result["id"],))
        result["performers"] = [row[0] for row in cursor.fetchall()]
        return result

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_movie_hrefs(**filters):
    """List movies matching *filters*.

    Supported keys: ``id``, ``href``, ``title`` (substring), ``is_full_data``,
    ``is_full_data_in``, ``is_full_data_not_in``, ``before_updated_at``,
    ``after_updated_at``, ``start_id`` (exclusive), ``order_by`` (column name,
    ascending) and ``limit``.

    Returns a list of ``{'href', 'title', 'id'}`` dicts, or ``[]`` when the
    query fails.
    """
    try:
        sql = "SELECT href, title, id FROM iafd_movies WHERE 1=1"
        params = []
        sql = _apply_basic_filters(sql, params, filters, "title")
        sql = _apply_listing_filters(sql, params, filters)

        logging.debug(f"query sql: {sql}")
        cursor.execute(sql, params)
        return [{'href': row[0], 'title': row[1], 'id': row[2]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []


def get_performers_needed_update(limit=None):
    """List performers in ``view_iafd_performers_movies`` whose ``actual_movies_cnt`` differs from ``movies_cnt``.

    Returns a list of ``{'href', 'name'}`` dicts (at most *limit* when given),
    or ``[]`` when the query fails.
    """
    try:
        sql = """
            SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt
        """
        params = []
        if limit is not None:
            # Bind the limit instead of formatting it into the SQL text.
            sql += " LIMIT ?"
            params.append(limit)

        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []


def check_and_create_stat_table(taskid=0):
    """(Re)build the ``iafd_tmp_performers_stat_<taskid>`` statistics table.

    The table holds, per performer, the stored ``movies_cnt`` next to the
    number of relation rows (``actor_movie_count``) and the number of movies
    directed (``director_movie_count``), so the scraped count can be compared
    with the aggregated one. Missing supporting indexes are created first.
    Errors are logged as warnings; nothing is returned.
    """
    try:
        # Create the supporting indexes when they do not exist yet.
        indexes = [
            ("idx_iafd_performers_movies_performer_id",
             "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"),
            ("idx_iafd_movies_director_id",
             "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"),
            ("idx_iafd_performers_id",
             "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);")
        ]
        for index_name, create_index_sql in indexes:
            cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
            if not cursor.fetchone():
                cursor.execute(create_index_sql)
                logging.info(f"Index {index_name} created successfully.")
            else:
                logging.info(f"Index {index_name} already exists.")

        # Drop a previous copy of the table. Identifiers cannot be bound as
        # parameters ("drop table ?" is a syntax error), so the name is
        # formatted in; taskid must therefore come from trusted code.
        view_name = f"iafd_tmp_performers_stat_{taskid}"
        cursor.execute(f"DROP TABLE IF EXISTS {view_name}")
        conn.commit()

        create_view_sql = f"""
            CREATE table {view_name} AS
            SELECT
                id,
                href,
                name,
                movies_cnt,
                SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count,
                SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count
            FROM (
                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(apm.movie_id) AS movie_count,
                    'actor' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_performers_movies apm ON p.id = apm.performer_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt

                UNION ALL

                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(im.id) AS movie_count,
                    'director' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_movies im ON p.id = im.director_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt
            ) combined
            GROUP BY
                id, href, name, movies_cnt;
        """
        cursor.execute(create_view_sql)
        logging.info(f"table {view_name} created successfully.")

        # Commit the changes (the shared connection stays open).
        conn.commit()

    except sqlite3.Error as e:
        logging.warning(f"An error occurred: {e}")


def reset_actor_movies(check_and_do=0):
    """Recompute ``iafd_performers.movies_cnt`` from ``iafd_performers_movies``.

    Adds the ``movies_cnt`` column when it is missing, then sets it for every
    performer to the number of relation rows (0 when there are none).
    *check_and_do* is currently unused.
    """
    try:
        # Add the movies_cnt column when the table does not have it yet.
        cursor.execute("PRAGMA table_info(iafd_performers);")
        columns = [row[1] for row in cursor.fetchall()]

        if 'movies_cnt' not in columns:
            cursor.execute("""
                ALTER TABLE iafd_performers
                ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
            """)
            logging.info("成功添加movies_cnt字段")
        else:
            logging.info("movies_cnt字段已存在,跳过添加")

        # Make sure the relation table is indexed on performer_id.
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id
            ON iafd_performers_movies(performer_id);
        """)

        # A temp table left behind by an earlier failed run on this connection
        # would make the CREATE below fail, so clear it first.
        cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;")

        # Aggregate the per-performer counts into a temporary table.
        cursor.execute("""
            CREATE TEMPORARY TABLE temp_actor_counts AS
            SELECT performer_id, COUNT(movie_id) AS cnt
            FROM iafd_performers_movies
            GROUP BY performer_id;
        """)
        cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);")

        # COALESCE turns "no relation rows" into a count of 0.
        cursor.execute("""
            UPDATE iafd_performers
            SET movies_cnt = COALESCE((
                SELECT cnt FROM temp_actor_counts
                WHERE performer_id = iafd_performers.id
            ), 0);
        """)
        updated_rows = cursor.rowcount
        logging.info(f"成功更新{updated_rows}个演员的影片数量")

        # Clean up.
        cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;")
        conn.commit()

        logging.info("任务执行完成!")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error("Error updating actor movie_cnt: %s", e)


def insert_task_log():
    """Create a task-log row with status ``Start`` and fill in the current table counts.

    Returns the new task id, or ``None`` on error.
    """
    try:
        cursor.execute("""
            INSERT INTO iafd_task_log (task_status) VALUES ('Start')
        """)
        conn.commit()

        task_id = cursor.lastrowid
        if task_id is None:
            return None
        update_task_log(task_id=task_id, task_status='Start')

        return task_id
    except sqlite3.Error as e:
        logging.error(f"插入任务失败: {e}")
        return None


def update_task_log_inner(task_id, **kwargs):
    """Set the given task-log columns (keyword name = column name) and refresh ``updated_at``.

    Column names are formatted into the SQL text, so they must come from
    trusted code. With no keyword arguments only ``updated_at`` is refreshed.
    """
    try:
        assignments = [f"{key} = ?" for key in kwargs]
        assignments.append("updated_at = datetime('now', 'localtime')")
        params = list(kwargs.values()) + [task_id]

        sql = f"UPDATE iafd_task_log SET {', '.join(assignments)} WHERE task_id = ?"
        cursor.execute(sql, params)
        conn.commit()
    except sqlite3.Error as e:
        logging.error(f"更新任务 {task_id} 失败: {e}")


def update_task_log(task_id, task_status):
    """Store the current row counts of the main tables and *task_status* on a task-log row."""
    try:
        count_queries = {
            "full_data_performers": "SELECT COUNT(*) FROM iafd_performers where is_full_data=1",
            "total_performers": "SELECT COUNT(*) FROM iafd_performers",
            "full_data_movies": "SELECT COUNT(*) FROM iafd_movies where is_full_data=1",
            "total_movies": "SELECT COUNT(*) FROM iafd_movies",
            "total_distributors": "SELECT COUNT(*) FROM iafd_distributors",
            "total_studios": "SELECT COUNT(*) FROM iafd_studios",
        }
        counts = {}
        for column, sql in count_queries.items():
            cursor.execute(sql)
            counts[column] = cursor.fetchone()[0]

        update_task_log_inner(task_id, task_status=task_status, **counts)

    except sqlite3.Error as e:
        logging.error(f"更新任务 {task_id} 失败: {e}")


def finalize_task_log(task_id):
    """Mark the task as finished by writing the final counts with status ``Success``."""
    try:
        update_task_log(task_id, task_status="Success")
    except sqlite3.Error as e:
        logging.error(f"任务 {task_id} 结束失败: {e}")

def get_statics2():
    """Collect row counts of the main tables, one query per counter.

    Returns a dict with the keys ``iafd_actors``, ``act_full``, ``movies``,
    ``mov_full``, ``dist`` and ``stu``. When a query fails the error is logged
    and the counters gathered so far are returned.
    """
    counter_queries = (
        ('iafd_actors', "SELECT COUNT(*) FROM iafd_performers"),
        ('act_full', "SELECT COUNT(*) FROM iafd_performers where is_full_data=1"),
        ('movies', "SELECT COUNT(*) FROM iafd_movies"),
        ('mov_full', "SELECT COUNT(*) FROM iafd_movies where is_full_data=1"),
        ('dist', "SELECT COUNT(*) FROM iafd_distributors"),
        ('stu', "SELECT COUNT(*) FROM iafd_studios"),
    )
    result = {}
    try:
        for key, count_sql in counter_queries:
            cursor.execute(count_sql)
            result[key] = cursor.fetchone()[0]
    except sqlite3.Error as e:
        logging.error(f"query failed: {e}")

    return result


def get_statics():
    """Collect row counts of the main tables with a single SQL statement.

    Returns a dict keyed by the column aliases of the query (``iafd_actors``,
    ``act_full``, ``movies``, ``mov_full``, ``dist``, ``stu``), or ``{}`` when
    no row comes back or the query fails.
    """
    try:
        cursor.execute("""
            SELECT
                (SELECT COUNT(*) FROM iafd_performers) AS iafd_actors,
                (SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full,
                (SELECT COUNT(*) FROM iafd_movies) AS movies,
                (SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full,
                (SELECT COUNT(*) FROM iafd_distributors) AS dist,
                (SELECT COUNT(*) FROM iafd_studios) AS stu
        """)

        stats_row = cursor.fetchone()
        if stats_row:
            # The keys come from the AS aliases reported by the cursor.
            aliases = [column[0] for column in cursor.description]
            return dict(zip(aliases, stats_row))
        return {}

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return {}


if __name__ == "__main__":
    check_and_create_stat_table()
    '''
    try:
        with open('../result/detail.json', 'r') as file:
            performers = json.load(file)
            for performer in performers:
                insert_or_update_performer(performer)

        print(query_performer("Kirsten"))
        #delete_performer("https://www.iafd.com/person.rme/id=ca699282-1b57-4ce7-9bcc-d7799a292e34")
        print(query_performer_hrefs())
    except FileNotFoundError:
        logging.info("detail.json not found, starting fresh.")
    '''