import sqlite3
import logging
import os
from datetime import datetime
import src.config.config as config

default_dbpath = f"{config.global_share_data_dir}/sqlite/shared.db"


# Base database class that wraps the generic SQLite operations.
class DatabaseHandler:
    """Thin wrapper around one SQLite connection with generic upsert helpers."""

    def __init__(self, db_path=None):
        """Open the database at ``db_path``, or at ``default_dbpath`` when not given.

        When ``db_path`` is given explicitly, its parent directory is created if
        it does not exist yet.
        """
        self.DB_PATH = db_path or default_dbpath
        if db_path:
            parent_dir = os.path.dirname(db_path)
            # dirname() is '' for a bare file name (current directory);
            # os.makedirs('') would raise, so only create a real directory.
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

        self.conn = sqlite3.connect(self.DB_PATH, check_same_thread=False)
        self.cursor = self.conn.cursor()

        # The ON CONFLICT ... DO UPDATE (upsert) syntax needs SQLite >= 3.24.0;
        # on older libraries the *_lower insert-then-update fallbacks are used.
        self.lower_sqlite_version = sqlite3.sqlite_version_info < (3, 24, 0)

    def get_table_columns_and_defaults(self, tbl_name):
        """Return ``{column_name: default_value}`` for ``tbl_name``, or None on error.

        Built from ``PRAGMA table_info`` rows (index 1 = name, index 4 =
        declared default). A table that does not exist yields ``{}``.
        """
        try:
            self.cursor.execute(f"PRAGMA table_info({tbl_name})")
            return {col[1]: col[4] for col in self.cursor.fetchall()}
        except sqlite3.Error as e:
            logging.error(f"Error getting table columns: {e}")
            return None

    def check_and_process_data(self, data, tbl_name):
        """Keep only the keys of ``data`` that are columns of ``tbl_name``.

        ``id`` (auto-increment primary key) and ``created_at`` (filled by the
        table default) are always dropped. ``updated_at`` is set to the current
        local time unless ``data`` supplies its own value.

        Returns the filtered dict (in table column order), or None if the table
        schema could not be read.
        """
        column_info = self.get_table_columns_and_defaults(tbl_name)
        if column_info is None:
            return None

        processed_data = {}
        for col in column_info:
            if col in ('id', 'created_at'):
                continue
            if col in data:
                processed_data[col] = data[col]
            elif col == 'updated_at':
                processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return processed_data

    def insert_or_update_common(self, data, tbl_name, uniq_key='url'):
        """Upsert ``data`` into ``tbl_name`` keyed on the unique column ``uniq_key``.

        Only columns present in ``data`` (plus ``updated_at``) are written, so an
        update leaves the other columns untouched.

        Returns:
            The ``id`` of the inserted/updated row, or None on failure.
        """
        if self.lower_sqlite_version:
            return self.insert_or_update_common_lower(data, tbl_name, uniq_key)
        try:
            processed_data = self.check_and_process_data(data, tbl_name)
            if processed_data is None:
                return None
            if uniq_key not in processed_data:
                # Without the key there is nothing to conflict on and no way to
                # look the row up afterwards; refuse instead of inserting an orphan.
                logging.error(f"Error inserting or updating data: missing key column '{uniq_key}' for table {tbl_name}")
                return None

            columns = list(processed_data)
            placeholders = ', '.join('?' for _ in columns)
            update_cols = [col for col in columns if col != uniq_key]
            if update_cols:
                conflict_action = 'DO UPDATE SET ' + ', '.join(f"{col}=EXCLUDED.{col}" for col in update_cols)
            else:
                # Only the key itself was supplied: "DO UPDATE SET" with an empty
                # list is a syntax error, and there is nothing to update anyway.
                conflict_action = 'DO NOTHING'

            sql = (
                f"INSERT INTO {tbl_name} ({', '.join(columns)}) VALUES ({placeholders}) "
                f"ON CONFLICT ({uniq_key}) {conflict_action}"
            )
            self.cursor.execute(sql, list(processed_data.values()))
            self.conn.commit()

            # Fetch the id of the inserted or updated row.
            return self.get_id_by_key(tbl_name, uniq_key, processed_data[uniq_key])
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_or_update_common_lower(self, data, tbl_name, uniq_key='url'):
        """Same contract as ``insert_or_update_common`` for SQLite < 3.24.0.

        Tries a plain INSERT first and falls back to an UPDATE on
        ``sqlite3.IntegrityError``.
        """
        try:
            processed_data = self.check_and_process_data(data, tbl_name)
            if processed_data is None:
                return None
            if uniq_key not in processed_data:
                logging.error(f"Error inserting or updating data: missing key column '{uniq_key}' for table {tbl_name}")
                return None

            columns = list(processed_data)
            placeholders = ', '.join('?' for _ in columns)
            try:
                self.cursor.execute(
                    f"INSERT INTO {tbl_name} ({', '.join(columns)}) VALUES ({placeholders})",
                    list(processed_data.values()),
                )
                self.conn.commit()
            except sqlite3.IntegrityError:
                # Usually a unique-key conflict, so update the existing row instead.
                # IntegrityError also covers NOT NULL/CHECK failures; then the
                # UPDATE matches no row and the lookup below returns None.
                update_cols = [col for col in columns if col != uniq_key]
                if update_cols:
                    set_clause = ', '.join(f"{col}=?" for col in update_cols)
                    update_values = [processed_data[col] for col in update_cols]
                    update_values.append(processed_data[uniq_key])
                    self.cursor.execute(f"UPDATE {tbl_name} SET {set_clause} WHERE {uniq_key} = ?", update_values)
                # Also closes the transaction the failed INSERT opened.
                self.conn.commit()

            # Fetch the id of the inserted or updated row.
            row_id = self.get_id_by_key(tbl_name, uniq_key, processed_data[uniq_key])
            if row_id is None:
                logging.warning(f"no row found in {tbl_name} for {uniq_key}={processed_data[uniq_key]} after insert/update")
            return row_id
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def get_id_by_key(self, tbl, uniq_key, val):
        """Return the ``id`` of the row in ``tbl`` whose ``uniq_key`` equals ``val``, or None."""
        self.cursor.execute(f"SELECT id FROM {tbl} WHERE {uniq_key} = ?", (val,))
        row = self.cursor.fetchone()
        return row[0] if row else None

    def insert_task_log(self):
        """Placeholder task-log hook; always returns 1."""
        return 1

    def update_task_log(self, task_id, task_status):
        """Placeholder task-log hook; always returns 1."""
        return 1

    def finalize_task_log(self, task_id):
        """Placeholder task-log hook; always returns 1."""
        return 1

    def get_statics(self):
        """Return table statistics; the base class has none and returns ``{}``."""
        return {}

    def close(self):
        """Close the cursor and the connection."""
        self.cursor.close()
        self.conn.close()


# Handler for the javbus tables.
class JavbusDBHandler(DatabaseHandler):
    """Data access for the ``javbus_*`` tables (actors, movies, studios, tags, ...)."""

    def __init__(self, db_path=None):
        super().__init__(db_path)
        self.tbl_name_actors = 'javbus_actors'
        self.tbl_name_movies = 'javbus_movies'
        self.tbl_name_studios = 'javbus_studios'
        self.tbl_name_labels = 'javbus_labels'
        self.tbl_name_series = 'javbus_series'
        self.tbl_name_tags = 'javbus_tags'
        self.tbl_name_movie_tags = 'javbus_movies_tags'
        self.tbl_name_actor_movie = 'javbus_actors_movies'

    def insert_actor_index(self, data, **kwargs):
        """Upsert an actor row keyed on ``href``; return its id or None.

        Keyword arguments ``uncensored``, ``from_actor_list`` and
        ``from_movie_list`` overwrite the matching keys of ``data`` (in place)
        when given and not None; otherwise the existing values are kept.
        """
        for field in ('uncensored', 'from_actor_list', 'from_movie_list'):
            if kwargs.get(field) is not None:
                data[field] = kwargs[field]
        try:
            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_movie_index(self, data, **kwargs):
        """Upsert a movie row keyed on ``href``; return its id or None.

        Keyword arguments for the flag/foreign-key fields listed below overwrite
        the matching keys of ``data`` (in place) when given and not None.
        """
        fields = (
            'uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels',
            'from_movie_series', 'studio_id', 'label_id', 'series_id',
        )
        for field in fields:
            if kwargs.get(field) is not None:
                data[field] = kwargs[field]
        try:
            return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_actor_movie(self, performer_id, movie_id, tags=''):
        """Upsert an actor-movie link row; return ``performer_id`` or None on failure."""
        if self.lower_sqlite_version:
            return self.insert_actor_movie_lower(performer_id, movie_id, tags)
        try:
            self.cursor.execute("""
                INSERT INTO javbus_actors_movies (actor_id, movie_id, tags, updated_at)
                VALUES (?, ?, ?, datetime('now', 'localtime'))
                ON CONFLICT(actor_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime')
                """,
                (performer_id, movie_id, tags)
            )
            self.conn.commit()
            return performer_id
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting actor_movie: %s", e)
            return None

    def insert_actor_movie_lower(self, performer_id, movie_id, tags=''):
        """``insert_actor_movie`` for SQLite < 3.24.0 (insert, then update on conflict)."""
        try:
            # Try a plain insert first.
            self.cursor.execute("""
                INSERT INTO javbus_actors_movies (actor_id, movie_id, tags, updated_at)
                VALUES (?, ?, ?, datetime('now', 'localtime'))
                """,
                (performer_id, movie_id, tags)
            )
            self.conn.commit()
            return performer_id
        except sqlite3.IntegrityError:
            # Unique-constraint conflict: update the existing link instead.
            try:
                self.cursor.execute("""
                    UPDATE javbus_actors_movies
                    SET tags=?, updated_at=datetime('now', 'localtime')
                    WHERE actor_id=? AND movie_id=?
                    """,
                    (tags, performer_id, movie_id)
                )
                self.conn.commit()
                return performer_id
            except Exception as e:
                self.conn.rollback()
                logging.error("Error updating actor_movie: %s", e)
                return None
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting actor_movie: %s", e)
            return None

    def update_actor_detail_404(self, data, is_full_data=1):
        """Mark an actor (keyed on ``data['href']``) with ``is_full_data``; return its id or None.

        ``data`` is modified in place.
        """
        try:
            data['is_full_data'] = is_full_data
            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def update_actor_detail(self, data, is_full_data=1):
        """Store an actor detail page: the actor row, its movies and the actor-movie links.

        ``data['href']`` identifies the actor, the optional ``data['avatar']``
        dict holds actor columns, and ``data['credits']`` lists movie index dicts.
        Returns the actor id, or None on failure.
        """
        try:
            # Update the actor table. `or {}` also covers an explicit None avatar.
            avatar = data.get('avatar') or {}
            avatar['href'] = data['href']
            avatar['is_full_data'] = is_full_data

            avatar_id = self.insert_or_update_common(avatar, self.tbl_name_actors, uniq_key='href')
            if not avatar_id:
                logging.warning(f"get actor id error. href: {data['href']}")
                return None
            logging.debug(f"update actor data. href: {data['href']} avatar: {avatar}")

            # Update the movies table and link each movie to the actor.
            uncensored = data.get('uncensored', 0)
            for movie in data.get('credits') or []:
                movie_id = self.insert_movie_index(movie, from_actor_list=1, uncensored=uncensored)
                if not movie_id:
                    # Nothing to link without a movie id.
                    continue
                logging.debug(f"insert one movie index. data: {movie}")

                if self.insert_actor_movie(avatar_id, movie_id):
                    logging.debug(f"insert one actor_movie record. actor id: {avatar_id}, movie id: {movie_id}")

            return avatar_id
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    @staticmethod
    def _build_filtered_query(base_sql, conditions, filters, like_key=None, with_full_data_sets=False):
        """Append the clauses requested by ``filters`` to ``base_sql``; return ``(sql, params)``.

        ``conditions`` maps a filter name to its ``" AND ... ?"`` fragment; the
        ``like_key`` filter is wrapped as ``%value%`` for a LIKE match. When
        ``with_full_data_sets`` is true, ``is_full_data_in`` /
        ``is_full_data_not_in`` (non-empty sequences) add IN / NOT IN clauses.
        ``order_by`` is interpolated verbatim (a column name cannot be bound as a
        parameter), so it must come from trusted code, never from user input.
        """
        sql = base_sql
        params = []
        for key, condition in conditions.items():
            if key in filters:
                sql += condition
                params.append(f"%{filters[key]}%" if key == like_key else filters[key])

        if with_full_data_sets:
            for key, operator in (("is_full_data_in", "IN"), ("is_full_data_not_in", "NOT IN")):
                values = filters.get(key)
                if values:
                    placeholders = ", ".join(["?"] * len(values))
                    sql += f" AND is_full_data {operator} ({placeholders})"
                    params.extend(values)

        if "order_by" in filters:
            sql += f" ORDER BY {filters['order_by']} "

        if "limit" in filters:
            sql += " LIMIT ?"
            params.append(filters["limit"])
        return sql, params

    def query_actors(self, **filters):
        """Return actors matching ``filters`` as ``[{'href', 'name', 'uncensored'}]``, or None on error.

        Supported filters: id, href, en_name (substring), is_full_data, start_id
        (id > value), uncensored, is_full_data_in, is_full_data_not_in, order_by, limit.
        """
        conditions = {
            "id": " AND id = ?",
            "href": " AND href = ?",
            "en_name": " AND en_name LIKE ?",
            "is_full_data": " AND is_full_data = ?",
            "start_id": " AND id > ?",
            "uncensored": " AND uncensored = ?",
        }
        try:
            sql, params = self._build_filtered_query(
                f"SELECT href, en_name as name, uncensored FROM {self.tbl_name_actors} WHERE 1=1",
                conditions, filters, like_key="en_name", with_full_data_sets=True,
            )
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'name': row[1], 'uncensored': row[2]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def query_movies(self, **filters):
        """Return movies matching ``filters`` as ``[{'href', 'title', 'uncensored', 'id'}]``, or None on error.

        Supported filters: id, href, title (substring), is_full_data, start_id
        (id > value), uncensored, is_full_data_in, is_full_data_not_in, order_by, limit.
        """
        conditions = {
            "id": " AND id = ?",
            "href": " AND href = ?",
            "title": " AND title LIKE ?",
            "is_full_data": " AND is_full_data = ?",
            "start_id": " AND id > ?",
            "uncensored": " AND uncensored = ?",
        }
        try:
            sql, params = self._build_filtered_query(
                f"SELECT href, title, uncensored, id FROM {self.tbl_name_movies} WHERE 1=1",
                conditions, filters, like_key="title", with_full_data_sets=True,
            )
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'title': row[1], 'uncensored': row[2], 'id': row[3]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    # Return the id of the record if it exists; insert it first otherwise.
    def check_and_get_id(self, item, uncensored, tbl, uniq_key='href'):
        """Return the id of the row for ``item['href']`` in ``tbl``, inserting it if missing."""
        name = item['name']
        href = item['href']
        row_id = self.get_id_by_key(tbl, uniq_key, href)
        if row_id is None:
            row_id = self.insert_or_update_common(
                {'name': name, 'href': href, 'uncensored': uncensored, 'from_movie_list': 1},
                tbl_name=tbl, uniq_key=uniq_key,
            )
        return row_id

    def insert_or_update_tags(self, data, uniq_key='href'):
        """Upsert a tag row; return its id or None."""
        return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key)

    def insert_movie_tags(self, movie_id, tag_id, tags):
        """Upsert a movie-tag link row; return ``movie_id`` or None on failure."""
        if self.lower_sqlite_version:
            return self.insert_movie_tags_lower(movie_id, tag_id, tags)
        try:
            self.cursor.execute("""
                INSERT INTO javbus_movies_tags (movie_id, tag_id, tags, updated_at)
                VALUES (?, ?, ?, datetime('now', 'localtime'))
                ON CONFLICT(tag_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime')
                """,
                (movie_id, tag_id, tags)
            )
            self.conn.commit()
            return movie_id
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting movie_tags: %s", e)
            return None

    def insert_movie_tags_lower(self, movie_id, tag_id, tags):
        """``insert_movie_tags`` for SQLite < 3.24.0 (insert, then update on conflict)."""
        try:
            # Try a plain insert first.
            self.cursor.execute("""
                INSERT INTO javbus_movies_tags (movie_id, tag_id, tags, updated_at)
                VALUES (?, ?, ?, datetime('now', 'localtime'))
                """,
                (movie_id, tag_id, tags)
            )
            self.conn.commit()
            return movie_id
        except sqlite3.IntegrityError:
            # Unique-constraint conflict: update the existing link instead.
            try:
                self.cursor.execute("""
                    UPDATE javbus_movies_tags
                    SET tags=?, updated_at=datetime('now', 'localtime')
                    WHERE tag_id=? AND movie_id=?
                    """,
                    (tags, tag_id, movie_id)
                )
                self.conn.commit()
                return movie_id
            except Exception as e:
                self.conn.rollback()
                logging.error("Error updating movie_tags: %s", e)
                return None
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting movie_tags: %s", e)
            return None

    def insert_or_update_movie_404(self, data, is_full_data=1):
        """Mark a movie (keyed on ``data['href']``) with ``is_full_data``; return its id or None.

        ``data`` is modified in place.
        """
        try:
            data['is_full_data'] = is_full_data
            return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_or_update_movie(self, movie, is_full_data=1):
        """Insert or update a full movie record with its studio/label/series, actors and tags.

        ``movie`` is modified in place (``studio_id``/``label_id``/``series_id``,
        ``is_full_data`` and ``actors_cnt`` are filled in). Returns the movie id,
        or None on failure.
        """
        try:
            uncensored = movie.get('uncensored', 0)

            # Resolve (creating when missing) the related studio/label/series rows.
            related = (
                ('studio', self.tbl_name_studios),
                ('label', self.tbl_name_labels),
                ('series', self.tbl_name_series),
            )
            for field, tbl in related:
                item = movie.get(field)
                if item is None:
                    continue
                ref_id = self.check_and_get_id(item, uncensored, tbl)
                if ref_id:
                    movie[f'{field}_id'] = ref_id

            # `or []` tolerates an explicit None as well as a missing key.
            actors = movie.get('actors') or []
            movie['is_full_data'] = is_full_data
            movie['actors_cnt'] = len(actors)

            movie_id = self.insert_or_update_common(movie, self.tbl_name_movies, uniq_key='href')
            if movie_id is None:
                logging.warning(f"insert/update movie error. data:{movie}")
                return None
            logging.debug(f"insert one move, id: {movie_id}, title: {movie.get('title')}, href: {movie.get('href')}")

            # Fill the actor-movie link table, inserting unknown actors first.
            for performer in actors:
                performer_id = self.get_id_by_key(self.tbl_name_actors, 'href', performer['href'])
                if performer_id is None:
                    performer_id = self.insert_actor_index(
                        {'zh_name': performer['name'], 'href': performer['href']},
                        uncensored=uncensored, from_movie_list=1,
                    )
                    logging.debug(f"insert new perfomer. perfomer_id: {performer_id}, name:{performer['name']}")
                if not performer_id:
                    logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")
                    continue

                if self.insert_actor_movie(performer_id, movie_id):
                    logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")

            # Fill the tags table and the movie-tag link table.
            for tag in movie.get('tags') or []:
                tag_name = tag.get('name', '')
                tag_href = tag.get('href', '')
                tag_id = self.insert_or_update_tags({'name': tag_name, 'href': tag_href}, uniq_key='href')
                if not tag_id:
                    logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}")
                    continue

                logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}")
                if self.insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name):
                    logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
                else:
                    logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")

            return movie_id
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting movie: %s", e)
            return None

    def _pub_table_name(self, tbl):
        """Map 'studio'/'label'/'series' to its table name; warn and return None for anything else."""
        tbl_name = {
            'studio': self.tbl_name_studios,
            'label': self.tbl_name_labels,
            'series': self.tbl_name_series,
        }.get(tbl)
        if not tbl_name:
            logging.warning(f"wrong table. table: {tbl}")
        return tbl_name

    # Update the multi-language fields of studio / label / series rows.
    def update_pubs_multilang(self, data, tbl, **filters):
        """Upsert ``data`` (keyed on ``href``) into the studio/label/series table named by ``tbl``.

        ``filters`` is accepted for call compatibility but unused. Returns the row
        id, or None for an unknown ``tbl`` or on failure.
        """
        tbl_name = self._pub_table_name(tbl)
        if not tbl_name:
            return None
        return self.insert_or_update_common(data=data, tbl_name=tbl_name, uniq_key='href')

    def query_list_common(self, tbl, **filters):
        """Query the studio/label/series table named by ``tbl``.

        Returns ``[{'href', 'name', 'uncensored', 'id'}]``, or None for an unknown
        ``tbl`` or on a database error. Supported filters: id, href, name
        (substring), start_id (id > value), uncensored, order_by, limit.
        """
        tbl_name = self._pub_table_name(tbl)
        if not tbl_name:
            return None
        conditions = {
            "id": " AND id = ?",
            "href": " AND href = ?",
            "name": " AND name LIKE ?",
            "start_id": " AND id > ?",
            "uncensored": " AND uncensored = ?",
        }
        try:
            sql, params = self._build_filtered_query(
                f"SELECT href, name, uncensored, id FROM {tbl_name} WHERE 1=1",
                conditions, filters, like_key="name",
            )
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id': row[3]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def update_tags(self, data):
        """Upsert a tag row keyed on ``href``; return its id or None."""
        return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href')

    def query_tags(self, **filters):
        """Return tags matching ``filters`` as ``[{'href', 'name', 'id'}]``, or None on error.

        Supported filters: id, href, name (substring), start_id (id > value),
        order_by, limit.
        """
        conditions = {
            "id": " AND id = ?",
            "href": " AND href = ?",
            "name": " AND name LIKE ?",
            "start_id": " AND id > ?",
        }
        try:
            sql, params = self._build_filtered_query(
                f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1",
                conditions, filters, like_key="name",
            )
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def get_statics(self):
        """Return row counts for the actor and movie tables.

        On a database error the counts gathered so far are returned.
        """
        # (result key, table, WHERE clause)
        stat_specs = [
            ('actors', self.tbl_name_actors, ''),
            ('act_un', self.tbl_name_actors, 'where uncensored=1'),
            ('act_full', self.tbl_name_actors, 'where is_full_data=1'),
            ('act_unc_full', self.tbl_name_actors, 'where uncensored=1 and is_full_data=1'),
            ('movies', self.tbl_name_movies, ''),
            ('mov_un', self.tbl_name_movies, 'where uncensored=1'),
            ('mov_full', self.tbl_name_movies, 'where is_full_data=1'),
            ('mov_un_full', self.tbl_name_movies, 'where uncensored=1 and is_full_data=1'),
        ]
        result = {}
        try:
            for key, tbl, where in stat_specs:
                self.cursor.execute(f"SELECT COUNT(*) FROM {tbl} {where}")
                result[key] = self.cursor.fetchone()[0]
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
        return result

    def reset_movies_uncensored(self):
        """Recompute ``javbus_movies.uncensored`` from the related rows.

        A movie is marked uncensored (1) when any linked actor, or its label,
        series or studio, has an href containing 'uncensored'; every other movie
        is set to 0. All updates are committed together, or rolled back on error.
        """
        try:
            logging.info("创建临时表以便于保存待更新记录")
            self.cursor.execute("""
                CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update (
                    movie_id INTEGER PRIMARY KEY
                )
            """)
            # Empty the temp table in case rows are left over from an earlier run.
            self.cursor.execute("DELETE FROM temp_movies_to_update")

            logging.info("开始收集需要更新的影片ID...")
            # Collect all matching movie ids in one statement.
            self.cursor.execute("""
                INSERT OR IGNORE INTO temp_movies_to_update (movie_id)
                SELECT DISTINCT m.id
                FROM javbus_movies m
                -- 连接演员表
                LEFT JOIN javbus_actors_movies am ON m.id = am.movie_id
                LEFT JOIN javbus_actors a ON am.actor_id = a.id
                -- 连接标签/系列/工作室表
                LEFT JOIN javbus_labels l ON m.label_id = l.id
                LEFT JOIN javbus_series s ON m.series_id = s.id
                LEFT JOIN javbus_studios st ON m.studio_id = st.id
                -- 筛选条件:任一表的href包含'uncensored'
                WHERE a.href LIKE '%uncensored%'
                    OR l.href LIKE '%uncensored%'
                    OR s.href LIKE '%uncensored%'
                    OR st.href LIKE '%uncensored%'
            """)

            total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0]
            total_movies = self.cursor.execute("SELECT COUNT(*) FROM javbus_movies").fetchone()[0]
            logging.info(f"共收集到 {total_count} 部需要更新的影片, 共有 {total_movies} 部影片")

            # 1. Reset uncensored to 0 for every movie.
            logging.info("开始将所有影片的uncensored设为默认值0...")
            self.cursor.execute("UPDATE javbus_movies SET uncensored = 0")
            logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为0")

            # 2. Set uncensored to 1 for the collected movies.
            logging.info("开始将匹配的影片的uncensored设为1...")
            self.cursor.execute("""
                UPDATE javbus_movies
                SET uncensored = 1
                WHERE id IN (SELECT movie_id FROM temp_movies_to_update)
            """)
            logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为1")

            # 3. The temp table is intentionally kept, so it can be spot-checked.
            self.conn.commit()
            logging.info("所有更新已提交")
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error resetting movies uncensored: %s", e)