"""SQLite access helpers for the javdb actor / movie / maker / series tables.

All functions share one module-level connection and cursor.  Write helpers
commit on success and roll back on failure; most of them log the error and
return ``None`` instead of raising.
"""

import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional

import config

# Connect to the shared SQLite database.
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db"  # replace with your database file
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()


def get_id_by_href(table: str, href: str) -> Optional[int]:
    """Return the ``id`` of the row in *table* whose ``href`` equals *href*.

    Returns ``None`` when *href* is ``None`` or no row matches.

    NOTE: *table* is interpolated directly into the SQL text, so it must be a
    trusted, hard-coded table name; only *href* is bound as a parameter.
    """
    if href is None:
        return None

    cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,))
    row = cursor.fetchone()
    return row[0] if row else None


def insert_actor_index(name, href, from_actor_list=None, from_movie_list=None):
    """Insert or update a minimal (index-only) actor row keyed by *href*.

    When the actor already exists, flags passed as ``None`` keep their stored
    value; when it does not exist, ``None`` flags are stored as 0.

    Returns the actor id, or ``None`` on error.
    """
    try:
        # Check whether the actor already exists.
        cursor.execute(
            "SELECT id, name, from_actor_list, from_movie_list FROM javdb_actors WHERE href = ?",
            (href,),
        )
        existing_actor = cursor.fetchone()

        if existing_actor:
            _, _, existing_actor_list, existing_movie_list = existing_actor

            # Keep the stored flag when the caller did not supply one.
            if from_actor_list is None:
                from_actor_list = existing_actor_list
            if from_movie_list is None:
                from_movie_list = existing_movie_list

            cursor.execute("""
                UPDATE javdb_actors
                SET name = ?,
                    from_actor_list = ?,
                    from_movie_list = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (name, from_actor_list, from_movie_list, href))
        else:
            # Actor does not exist yet: insert it.
            cursor.execute("""
                INSERT INTO javdb_actors (href, name, from_actor_list, from_movie_list)
                VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0))
            """, (href, name, from_actor_list, from_movie_list))

        conn.commit()

        performer_id = get_id_by_href('javdb_actors', href)
        if performer_id:
            logging.debug(f'Inserted/Updated actor index, id: {performer_id}, name: {name}, href: {href}')

        return performer_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None
    except Exception as e:
        conn.rollback()
        logging.error(f"未知错误: {e}")
        return None


def insert_movie_index(title, href, from_actor_list=None, from_movie_makers=None, from_movie_series=None):
    """Insert or update a minimal (index-only) movie row keyed by *href*.

    When the movie already exists, flags passed as ``None`` keep their stored
    value; when it does not exist, ``None`` flags are stored as 0.

    Returns the movie id, or ``None`` on error.
    """
    try:
        # Check whether the movie already exists.
        cursor.execute(
            "SELECT id, from_actor_list, from_movie_makers, from_movie_series FROM javdb_movies WHERE href = ?",
            (href,),
        )
        existing_movie = cursor.fetchone()

        if existing_movie:
            _, existing_actor, existing_maker, existing_series = existing_movie

            # Keep the stored flag when the caller did not supply one.
            if from_actor_list is None:
                from_actor_list = existing_actor
            if from_movie_makers is None:
                from_movie_makers = existing_maker
            if from_movie_series is None:
                from_movie_series = existing_series

            cursor.execute("""
                UPDATE javdb_movies
                SET title = ?,
                    from_actor_list = ?,
                    from_movie_makers = ?,
                    from_movie_series = ?,
                    updated_at = datetime('now', 'localtime')
                WHERE href = ?
            """, (title, from_actor_list, from_movie_makers, from_movie_series, href))
        else:
            # Movie does not exist yet: insert it.
            cursor.execute("""
                INSERT INTO javdb_movies (title, href, from_actor_list, from_movie_makers, from_movie_series)
                VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
            """, (title, href, from_actor_list, from_movie_makers, from_movie_series))

        conn.commit()

        movie_id = get_id_by_href('javdb_movies', href)
        if movie_id:
            logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}')

        return movie_id

    except Exception as e:
        conn.rollback()
        logging.error(f"Error inserting/updating movie: {e}")
        return None


def insert_actor_movie(performer_id, movie_id, tags=''):
    """Insert the actor/movie link row, updating ``tags`` if the pair exists.

    Relies on a uniqueness constraint over ``(actor_id, movie_id)`` for the
    ``ON CONFLICT`` clause.  Returns *performer_id* on success, ``None`` on
    error.
    """
    try:
        cursor.execute("""
            INSERT INTO javdb_actors_movies (actor_id, movie_id, tags)
            VALUES (?, ?, ?)
            ON CONFLICT(actor_id, movie_id) DO UPDATE SET tags=excluded.tags
        """, (performer_id, movie_id, tags))
        conn.commit()
        return performer_id

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def insert_or_update_actor(actor):
    """Upsert a full actor record together with its aliases and movie credits.

    *actor* is a dict read with the keys ``name``, ``href`` and ``pic``
    (required) plus optional ``alias`` (list of strings) and ``credits``
    (list of dicts with ``title`` and ``href``).

    Returns the actor id, or ``None`` on failure.
    """
    try:
        cursor.execute('''
            INSERT INTO javdb_actors (name, href, pic, is_full_data, updated_at)
            VALUES (?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name=excluded.name,
                pic=excluded.pic,
                is_full_data=1,
                updated_at=datetime('now', 'localtime')
        ''', (actor['name'], actor['href'], actor['pic']))
        conn.commit()

        actor_id = get_id_by_href('javdb_actors', actor['href'])
        if actor_id is None:
            logging.warning(f"insert data error. name: {actor['name']}, href: {actor['href']}")
            return None

        logging.debug(f"insert one actor, id: {actor_id}, name: {actor['name']}, href: {actor['href']}")

        # Insert aliases (duplicates are ignored by the database).
        for alias in actor.get("alias") or []:
            cursor.execute('''
                INSERT OR IGNORE INTO javdb_actors_alias (actor_id, alias, updated_at)
                VALUES (?, ?, datetime('now', 'localtime'))
            ''', (actor_id, alias))
        conn.commit()

        # Link the actor to every credited movie.
        for movie in actor.get("credits") or []:
            movie_id = get_id_by_href('javdb_movies', movie['href'])
            # Movie not known yet: create an index row for it first.
            if movie_id is None:
                movie_id = insert_movie_index(movie['title'], movie['href'], from_actor_list=1)

            # Warn whenever the link could not be created, whether the movie
            # row or the link row was the step that failed.
            if movie_id and insert_actor_movie(actor_id, movie_id):
                logging.debug(f'insert one performer_movie, performer_id: {actor_id}, movie_id: {movie_id}')
            else:
                logging.warning(f"insert performer_movie failed. performer_id: {actor_id}, moive href: {movie['href']}")

        return actor_id

    except Exception as e:
        conn.rollback()
        # Look the name up defensively: a missing 'name' key may be the very
        # error being handled, and must not raise again from the handler.
        actor_name = actor.get('name') if isinstance(actor, dict) else None
        logging.error(f"插入/更新演员 {actor_name} 失败: {e}")
        return None


def delete_actor_by_href(href):
    """Delete the actor row whose ``href`` equals *href*; errors are logged."""
    try:
        cursor.execute('DELETE FROM javdb_actors WHERE href = ?', (href,))
        conn.commit()
        logging.info(f"成功删除演员: {href}")
    except Exception as e:
        logging.error(f"删除演员 {href} 失败: {e}")
        conn.rollback()


def query_actors(**filters):
    """Return actors as ``[{'href': ..., 'name': ...}, ...]``.

    Supported keyword filters: ``id``, ``href`` (exact match), ``name``
    (substring match), ``is_full_data`` and ``limit``.

    Returns ``None`` when the query fails.
    """
    try:
        sql = "SELECT href, name FROM javdb_actors WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        if "href" in filters:
            sql += " AND href = ?"
            params.append(filters["href"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")
        if "is_full_data" in filters:
            sql += " AND is_full_data = ?"
            params.append(filters["is_full_data"])
        if 'limit' in filters:
            sql += " limit ?"
            params.append(filters["limit"])

        cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_makers(data):
    """Upsert a maker from ``data['name']`` / ``data['href']``.

    Returns the maker id, or ``None`` if it cannot be determined or a
    database error occurs.
    """
    try:
        cursor.execute("""
            INSERT INTO javdb_makers (name, href, updated_at)
            VALUES (?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        # Read the id back; a missing row yields None instead of raising.
        dist_id = get_id_by_href('javdb_makers', data["href"])
        if dist_id:
            logging.debug(f"成功插入/更新发行商: {data['name']}")
            return dist_id
        return None

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None


def delete_maker(identifier):
    """Delete a maker by id (``int``) or by exact name (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM javdb_makers WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM javdb_makers WHERE name = ?", (identifier,))
        else:
            # Nothing was deleted, so do not report success.
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"成功删除发行商: {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")


def query_maker(identifier):
    """Return the first maker matching *identifier* as a column->value dict.

    An ``int`` is matched against ``id``; anything else is used as a
    substring match on ``name``.  Returns ``None`` if nothing matches or the
    query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM javdb_makers WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM javdb_makers WHERE name LIKE ?", (f"%{identifier}%",))

        distributor = cursor.fetchone()
        if distributor:
            return dict(zip([desc[0] for desc in cursor.description], distributor))

        logging.warning(f"未找到发行商: {identifier}")
        return None

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_maker_hrefs(**filters):
    """Return the list of maker hrefs matching the given filters.

    Supported keyword filters: ``id``, ``href`` (exact match; ``url`` is
    accepted as a legacy alias) and ``name`` (substring match).

    Returns ``None`` when the query fails.
    """
    try:
        sql = "SELECT href FROM javdb_makers WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        # Accept both spellings: the filter key and the column are "href",
        # while "url" is the key this function historically tested for.
        if "href" in filters or "url" in filters:
            sql += " AND href = ?"
            params.append(filters["href"] if "href" in filters else filters["url"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        cursor.execute(sql, params)
        return [row[0] for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_series(data):
    """Upsert a series from ``data['name']`` / ``data['href']``.

    Returns the series id, or ``None`` if it cannot be determined or a
    database error occurs.
    """
    try:
        cursor.execute("""
            INSERT INTO javdb_series (name, href, updated_at)
            VALUES (?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                name = excluded.name,
                updated_at = datetime('now', 'localtime')
        """, (data["name"], data["href"]))
        conn.commit()

        # Read the id back; a missing row yields None instead of raising.
        stu_id = get_id_by_href('javdb_series', data["href"])
        if stu_id:
            logging.debug(f"成功插入/更新发行商: {data['name']}")
            return stu_id
        return None

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"数据库错误: {e}")
        return None


def delete_series(identifier):
    """Delete a series by id (``int``) or by exact name (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM javdb_series WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM javdb_series WHERE name = ?", (identifier,))
        else:
            # Nothing was deleted, so do not report success.
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"成功删除制作公司: {identifier}")
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"删除失败: {e}")


def query_series(identifier):
    """Return the first series matching *identifier* as a column->value dict.

    An ``int`` is matched against ``id``; anything else is used as a
    substring match on ``name``.  Returns ``None`` if nothing matches or the
    query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM javdb_series WHERE id = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM javdb_series WHERE name LIKE ?", (f"%{identifier}%",))

        studio = cursor.fetchone()
        if studio:
            return dict(zip([desc[0] for desc in cursor.description], studio))

        logging.warning(f"未找到制作公司: {identifier}")
        return None

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_series_hrefs(**filters):
    """Return the list of series hrefs matching the given filters.

    Supported keyword filters: ``id``, ``href`` (exact match) and ``name``
    (substring match).  Returns ``None`` when the query fails.
    """
    try:
        sql = "SELECT href FROM javdb_series WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        if "href" in filters:
            sql += " AND href = ?"
            params.append(filters["href"])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        cursor.execute(sql, params)
        return [row[0] for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


def insert_or_update_movie(movie):
    """Upsert a full movie record and link it to its actors.

    *movie* is a dict read with the keys ``href``, ``title``, ``cover_url``,
    ``serial_number``, ``release_date``, ``duration``, ``maker_link`` and
    ``series_link`` (required) plus optional ``actors`` (list of dicts with
    ``name`` and ``href``).

    Returns the movie id, or ``None`` on failure.
    """
    try:
        # Resolve the foreign keys; each is None when the link is None or unknown.
        makers_id = get_id_by_href('javdb_makers', movie['maker_link'])
        series_id = get_id_by_href('javdb_series', movie['series_link'])

        cursor.execute("""
            INSERT INTO javdb_movies (href, title, cover_url, serial_number, release_date, duration,
                                      maker_id, series_id, is_full_data, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title,
                cover_url=excluded.cover_url,
                serial_number=excluded.serial_number,
                release_date=excluded.release_date,
                duration=excluded.duration,
                maker_id=excluded.maker_id,
                series_id=excluded.series_id,
                is_full_data=1,
                updated_at=datetime('now', 'localtime')
        """, (movie['href'], movie['title'], movie['cover_url'], movie['serial_number'],
              movie['release_date'], movie['duration'], makers_id, series_id))
        conn.commit()

        movie_id = get_id_by_href('javdb_movies', movie['href'])
        if movie_id is None:
            return None

        logging.debug(f"insert one move, id: {movie_id}, title: {movie['title']}, href: {movie['href']}")

        # Populate the actor/movie link table.  ``or []`` also covers an
        # explicit ``actors: None``.
        for performer in movie.get('actors') or []:
            performer_id = get_id_by_href('javdb_actors', performer['href'])
            # Actor not known yet: create an index row for it first.
            if performer_id is None:
                performer_id = insert_actor_index(performer['name'], performer['href'], from_movie_list=1)

            if performer_id:
                tmp_id = insert_actor_movie(performer_id, movie_id)
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.debug(f'insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}')
            else:
                logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")

        return movie_id

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def insert_or_update_movie_404(title, href, is_full_data=1):
    """Upsert a stub movie row for an abnormal URL (for example a 404 page).

    Only ``title``, ``href`` and ``is_full_data`` are written.  Returns the
    movie id, or ``None`` on failure.
    """
    try:
        cursor.execute("""
            INSERT INTO javdb_movies (title, href, is_full_data, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(href) DO UPDATE SET
                title=excluded.title,
                is_full_data=excluded.is_full_data,
                updated_at = datetime('now', 'localtime')
        """, (title, href, is_full_data))
        conn.commit()

        return get_id_by_href('javdb_movies', href)

    except Exception as e:
        conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None


def delete_movie(identifier):
    """Delete a movie by id (``int``) or by href (``str``)."""
    try:
        if isinstance(identifier, int):
            cursor.execute("DELETE FROM javdb_movies WHERE id = ?", (identifier,))
        elif isinstance(identifier, str):
            cursor.execute("DELETE FROM javdb_movies WHERE href = ?", (identifier,))
        else:
            logging.warning("无效的删除参数")
            return
        conn.commit()
        logging.info(f"Deleted movie with {identifier}")

    except sqlite3.Error as e:
        conn.rollback()
        logging.error("Error deleting movie: %s", e)


def query_movies(identifier):
    """Return the first movie matching *identifier*, with its performer ids.

    An ``int`` is matched against ``id``; a string containing ``"http"`` is
    matched exactly against ``href``; any other value is used as a substring
    match on ``title``.

    The result is a column->value dict of the movie row plus a
    ``"performers"`` key holding the linked ``actor_id`` values.  Returns
    ``None`` if nothing matches or the query fails.
    """
    try:
        if isinstance(identifier, int):
            cursor.execute("SELECT * FROM javdb_movies WHERE id = ?", (identifier,))
        elif "http" in identifier:
            cursor.execute("SELECT * FROM javdb_movies WHERE href = ?", (identifier,))
        else:
            cursor.execute("SELECT * FROM javdb_movies WHERE title LIKE ?", (f"%{identifier}%",))

        movie = cursor.fetchone()
        if not movie:
            logging.warning(f"find no data: {identifier}")
            return None

        # Capture the movie columns before the next execute() replaces
        # cursor.description with the link table's columns.
        columns = [desc[0] for desc in cursor.description]
        result = dict(zip(columns, movie))

        # The link table is keyed by (actor_id, movie_id).
        cursor.execute("SELECT actor_id FROM javdb_actors_movies WHERE movie_id = ?", (result["id"],))
        result["performers"] = [row[0] for row in cursor.fetchall()]
        return result

    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return None


def query_movie_hrefs(**filters):
    """Return movies as ``[{'href': ..., 'title': ...}, ...]``.

    Supported keyword filters: ``id``, ``href`` (exact match), ``title``
    (substring match), ``is_full_data`` and ``limit``.

    Returns an empty list when the query fails.
    """
    try:
        sql = "SELECT href, title FROM javdb_movies WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        if "href" in filters:
            sql += " AND href = ?"
            params.append(filters["href"])
        if "title" in filters:
            sql += " AND title LIKE ?"
            params.append(f"%{filters['title']}%")
        if "is_full_data" in filters:
            sql += " AND is_full_data = ?"
            params.append(filters["is_full_data"])
        if 'limit' in filters:
            sql += " limit ?"
            params.append(filters["limit"])

        cursor.execute(sql, params)
        return [{'href': row[0], 'title': row[1]} for row in cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []


def insert_task_log():
    """Create a task-log row with status ``'Start'`` and fill in its counters.

    Returns the new task id, or ``None`` on failure.
    """
    try:
        cursor.execute("""
            INSERT INTO javdb_task_log (task_status) VALUES ('Start')
        """)
        conn.commit()

        task_id = cursor.lastrowid
        if task_id is None:
            return None

        update_task_log(task_id=task_id, task_status='Start')
        return task_id

    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"插入任务失败: {e}")
        return None


def update_task_log_inner(task_id, **kwargs):
    """Set the given columns on task-log row *task_id* and bump ``updated_at``.

    NOTE: keyword names are interpolated into the SQL text as column names,
    so they must be trusted, hard-coded identifiers; values are bound as
    parameters.
    """
    try:
        # Build the SET list from a list so that a call without kwargs still
        # yields valid SQL (only updated_at is touched).
        assignments = [f"{key} = ?" for key in kwargs]
        assignments.append("updated_at = datetime('now', 'localtime')")
        params = list(kwargs.values()) + [task_id]

        sql = f"UPDATE javdb_task_log SET {', '.join(assignments)} WHERE task_id = ?"
        cursor.execute(sql, params)
        conn.commit()
    except sqlite3.Error as e:
        conn.rollback()
        logging.error(f"更新任务 {task_id} 失败: {e}")


def update_task_log(task_id, task_status):
    """Refresh the row counters on task-log row *task_id* and set its status."""
    try:
        # Collect the current row counts of the actor/movie/maker/series tables.
        cursor.execute("SELECT COUNT(*) FROM javdb_actors where is_full_data=1")
        full_data_actors = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM javdb_actors")
        total_actors = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM javdb_movies where is_full_data=1")
        full_data_movies = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM javdb_movies")
        total_movies = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM javdb_makers")
        total_makers = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM javdb_series")
        total_series = cursor.fetchone()[0]

        # Write the counters and the status to the task log.
        update_task_log_inner(
            task_id,
            full_data_actors=full_data_actors,
            total_actors=total_actors,
            full_data_movies=full_data_movies,
            total_movies=total_movies,
            total_makers=total_makers,
            total_series=total_series,
            task_status=task_status,
        )

    except sqlite3.Error as e:
        logging.error(f"更新任务 {task_id} 失败: {e}")


def finalize_task_log(task_id):
    """Mark task *task_id* as ``"Success"`` and refresh its counters."""
    try:
        update_task_log(task_id, task_status="Success")
    except sqlite3.Error as e:
        logging.error(f"任务 {task_id} 结束失败: {e}")


# Manual smoke test.
if __name__ == "__main__":
    sample_data = [
        {
            'name': '上原亜衣',
            'href': 'https://www.javdb.com/actors/MkAX',
            'pic': 'https://c0.jdbstatic.com/avatars/mk/MkAX.jpg',
            'alias': ['上原亜衣', '下原舞', '早瀬クリスタル', '阿蘇山百式屏風奉行'],
        },
        {
            'name': '大橋未久',
            'href': 'https://www.javdb.com/actors/21Jp',
            'pic': 'https://c0.jdbstatic.com/avatars/21/21Jp.jpg',
            'alias': ['大橋未久'],
        },
    ]

    for actor in sample_data:
        insert_or_update_actor(actor)

    # query_actors only takes keyword filters; ``name`` is a substring match.
    print(query_actors(name='未久'))

    #delete_actor_by_href('https://www.javdb.com/actors/MkAX')
    print(query_actors())