This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/javhd/src/sqlite_utils.py
2025-06-03 10:20:03 +08:00

191 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import json
import config
import logging
from datetime import datetime
# 连接 SQLite 数据库
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # 替换为你的数据库文件
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
tbl_name_actors = 'javhd_models'
# 检查 SQLite 版本
lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
lower_sqlite_version = True
# 获取表的列名和默认值
def get_table_columns_and_defaults(tbl_name):
try:
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
column_info = {}
for col in columns:
col_name = col[1]
default_value = col[4]
column_info[col_name] = default_value
return column_info
except sqlite3.Error as e:
logging.error(f"Error getting table columns: {e}")
return None
# 检查并处理数据
def check_and_process_data(data, tbl_name):
column_info = get_table_columns_and_defaults(tbl_name=tbl_name)
if column_info is None:
return None
processed_data = {}
for col, default in column_info.items():
if col == 'id': # 自增主键,不需要用户提供
continue
if col == 'created_at' or col == 'updated_at': # 日期函数,用户自己指定即可
continue
if col in data:
processed_data[col] = data[col]
return processed_data
# 插入或更新数据
def insert_or_update_common(data, tbl_name, uniq_key='url'):
if lower_sqlite_version:
return insert_or_update_common_lower(data, tbl_name, uniq_key)
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
'''
cursor.execute(sql, values)
conn.commit()
# 获取插入或更新后的 report_id
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# 插入或更新数据
def insert_or_update_common_lower(data, tbl_name, uniq_key='url'):
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
# 先尝试插入数据
try:
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
'''
cursor.execute(sql, values)
conn.commit()
except sqlite3.IntegrityError: # 唯一键冲突,执行更新操作
update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
update_values.append(data[uniq_key])
sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
cursor.execute(sql, update_values)
conn.commit()
# 获取插入或更新后的 report_id
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# 插入books表并判断是否需要更新
def insert_actor_index(data):
try:
return insert_or_update_common(data, tbl_name_actors)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# 更新详细信息
def update_actor_detail(data, is_full_data=1):
try:
data['is_full_data'] = is_full_data
return insert_or_update_common(data, tbl_name_actors)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# 查询
def query_actors(**filters):
try:
sql = "SELECT url, en_name as name FROM javhd_models WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"url": " AND href = ?",
"en_name": " AND name LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "en_name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # 返回小写
return [{'url': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 测试代码
if __name__ == "__main__":
print(query_actors("name LIKE '%未久%'"))
#delete_actor_by_href('https://www.javdb.com/actors/MkAX')
print(query_actors())