"""SQLite helpers for the ``javhd_models`` table in the shared database.

Provides a generic insert-or-update (upsert) routine keyed on a unique column,
thin wrappers for the actor table, and a filterable query helper.  All
functions share one module-level connection and cursor.
"""
import sqlite3
import json
import config
import logging
from datetime import datetime

# Connect to the shared SQLite database.
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db"  # replace with your database file
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()

tbl_name_actors = 'javhd_models'

# Check the SQLite library version: the "INSERT ... ON CONFLICT ... DO UPDATE"
# syntax used below requires SQLite 3.24.0 or newer; older libraries take the
# insert-then-update fallback path.
lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
    lower_sqlite_version = True

# SQL expression used for every ``updated_at`` stamp.
_NOW_SQL = "datetime('now', 'localtime')"


def get_table_columns_and_defaults(tbl_name):
    """Return ``{column_name: default_value}`` for *tbl_name*.

    Reads ``PRAGMA table_info``; field 1 of each row is the column name and
    field 4 is its declared default.  Returns ``None`` (after logging) when
    SQLite raises an error.
    """
    try:
        cursor.execute(f"PRAGMA table_info({tbl_name})")
        return {col[1]: col[4] for col in cursor.fetchall()}
    except sqlite3.Error as e:
        logging.error(f"Error getting table columns: {e}")
        return None


def check_and_process_data(data, tbl_name):
    """Keep only the entries of *data* that match writable columns of *tbl_name*.

    ``id`` (auto-increment primary key) and ``created_at`` / ``updated_at``
    (timestamps set in SQL) are always skipped.  Returns the filtered dict in
    table-column order, or ``None`` when the column list could not be read.
    """
    column_info = get_table_columns_and_defaults(tbl_name=tbl_name)
    if column_info is None:
        return None

    managed_columns = ('id', 'created_at', 'updated_at')
    return {
        col: data[col]
        for col in column_info
        if col not in managed_columns and col in data
    }


def _prepare_upsert_data(data, tbl_name, uniq_key):
    """Filter *data* for *tbl_name* and verify it carries the unique key.

    Returns the filtered dict, or ``None`` (after logging) when the table
    columns are unavailable or the unique key is absent, because without the
    key the row could neither be matched on conflict nor looked up afterwards.
    """
    processed_data = check_and_process_data(data, tbl_name)
    if processed_data is None:
        return None
    if uniq_key not in processed_data:
        logging.error(
            f"Error inserting or updating data: missing unique key '{uniq_key}' for table {tbl_name}"
        )
        return None
    return processed_data


def _build_set_clause(columns, uniq_key, template):
    """Build the ``SET`` list for an update, always ending with ``updated_at``.

    *template* is formatted with ``col`` for every column except *uniq_key*.
    Building a list first avoids a leading comma when only the unique key was
    supplied.
    """
    assignments = [template.format(col=col) for col in columns if col != uniq_key]
    assignments.append(f"updated_at={_NOW_SQL}")
    return ', '.join(assignments)


def _fetch_row_id(tbl_name, uniq_key, uniq_value):
    """Return the ``id`` of the row whose *uniq_key* equals *uniq_value*, or ``None``."""
    cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (uniq_value,))
    row = cursor.fetchone()
    if row is None:
        logging.error(
            f"Error inserting or updating data: no row in {tbl_name} with {uniq_key}={uniq_value!r}"
        )
        return None
    return row[0]


def insert_or_update_common(data, tbl_name, uniq_key='url'):
    """Insert *data* into *tbl_name*, or update the existing row on conflict.

    Args:
        data: mapping of column name to value; unknown columns are ignored.
        tbl_name: target table name.
        uniq_key: column with a unique constraint used to detect conflicts.

    Returns:
        The ``id`` of the inserted/updated row, or ``None`` on any failure
        (the failure is logged).
    """
    if lower_sqlite_version:
        return insert_or_update_common_lower(data, tbl_name, uniq_key)

    try:
        processed_data = _prepare_upsert_data(data, tbl_name, uniq_key)
        if processed_data is None:
            return None

        columns = ', '.join(processed_data)
        values = list(processed_data.values())
        placeholders = ', '.join('?' for _ in values)
        update_clause = _build_set_clause(processed_data, uniq_key, "{col}=EXCLUDED.{col}")

        sql = f'''
            INSERT INTO {tbl_name} ({columns}, updated_at)
            VALUES ({placeholders}, {_NOW_SQL})
            ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
        '''
        cursor.execute(sql, values)
        conn.commit()

        # Look up the id of the row that was just inserted or updated.
        return _fetch_row_id(tbl_name, uniq_key, processed_data[uniq_key])
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None


def insert_or_update_common_lower(data, tbl_name, uniq_key='url'):
    """Fallback upsert for SQLite libraries without ``ON CONFLICT DO UPDATE``.

    Tries a plain ``INSERT`` first and, if that raises ``IntegrityError``,
    issues an ``UPDATE`` matched on *uniq_key*.  Arguments and return value
    are the same as :func:`insert_or_update_common`.
    """
    try:
        processed_data = _prepare_upsert_data(data, tbl_name, uniq_key)
        if processed_data is None:
            return None

        columns = ', '.join(processed_data)
        values = list(processed_data.values())
        placeholders = ', '.join('?' for _ in values)

        try:
            # Try the insert first.
            sql = f'''
                INSERT INTO {tbl_name} ({columns}, updated_at)
                VALUES ({placeholders}, {_NOW_SQL})
            '''
            cursor.execute(sql, values)
            conn.commit()
        except sqlite3.IntegrityError:
            # Constraint violation (normally the unique key): update instead.
            update_clause = _build_set_clause(processed_data, uniq_key, "{col}=?")
            update_values = [val for col, val in processed_data.items() if col != uniq_key]
            update_values.append(processed_data[uniq_key])
            sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
            cursor.execute(sql, update_values)
            conn.commit()

        # Look up the id of the row that was just inserted or updated.
        return _fetch_row_id(tbl_name, uniq_key, processed_data[uniq_key])
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None


def insert_actor_index(data):
    """Insert or update an actor row in ``javhd_models`` keyed on ``url``.

    Returns the row ``id``, or ``None`` on failure (errors are logged by
    :func:`insert_or_update_common`).
    """
    return insert_or_update_common(data, tbl_name_actors)


def update_actor_detail(data, is_full_data=1):
    """Upsert an actor row with its ``is_full_data`` flag set.

    The flag is added to a copy so the caller's dict is left untouched.
    Returns the row ``id``, or ``None`` on failure.
    """
    payload = {**data, 'is_full_data': is_full_data}
    return insert_or_update_common(payload, tbl_name_actors)


def query_actors(**filters):
    """Query ``javhd_models`` and return ``[{'url': ..., 'name': ...}, ...]``.

    Supported keyword filters:
        id: exact match on ``id``.
        url: exact match on ``url``.
        en_name: substring match (``LIKE %value%``) on ``en_name``.
        is_full_data: exact match on ``is_full_data``.
        start_id: rows with ``id`` greater than the value.
        is_full_data_in / is_full_data_not_in: non-empty sequence of values
            for an ``IN`` / ``NOT IN`` test; an empty sequence is ignored.
        order_by: raw ``ORDER BY`` expression.
        limit: maximum number of rows.

    Returns ``None`` (after logging) when SQLite raises an error.
    """
    try:
        sql = "SELECT url, en_name as name FROM javhd_models WHERE 1=1"
        params = []

        # Filter key -> SQL fragment.  Conditions reference the real column
        # names selected above (``url`` and ``en_name``).
        conditions = {
            "id": " AND id = ?",
            "url": " AND url = ?",
            "en_name": " AND en_name LIKE ?",
            "is_full_data": " AND is_full_data = ?",
            "start_id": " AND id > ?",
        }
        for key, condition in conditions.items():
            if key not in filters:
                continue
            sql += condition
            if key == "en_name":
                params.append(f"%{filters[key]}%")
            else:
                params.append(filters[key])

        for key, operator in (("is_full_data_in", "IN"), ("is_full_data_not_in", "NOT IN")):
            values = filters.get(key)
            if values:
                placeholders = ", ".join(["?"] * len(values))
                sql += f" AND is_full_data {operator} ({placeholders})"
                params.extend(values)

        if "order_by" in filters:
            # ORDER BY takes an identifier, so it cannot be bound as a
            # placeholder (it would be treated as a string literal).
            # NOTE(review): the value is interpolated verbatim; only pass
            # trusted, code-defined expressions here.
            sql += f" ORDER BY {filters['order_by']} "

        if "limit" in filters:
            sql += " LIMIT ?"
            params.append(filters["limit"])

        cursor.execute(sql, params)
        return [{'url': row[0], 'name': row[1]} for row in cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None


# Manual smoke test.
if __name__ == "__main__":
    # query_actors only accepts keyword filters; use en_name for a LIKE search.
    print(query_actors(en_name='未久'))
    print(query_actors())