This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/aabook/src/sqlite_utils.py
2025-03-19 08:34:30 +08:00

339 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import json
import config
import utils
import logging
import sys
from datetime import datetime
# Open the SQLite connection and cursor shared by every helper in this module.
DB_PATH = config.global_sqlite_path  # path of the database file
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# Table names; chapter tables are named "<prefix>_<number>".
tbl_name_books = 'books'
tbl_name_chapters_prefix = 'chapters'
tbl_name_section = 'books_sections'

# Runtimes older than SQLite 3.24.0 are flagged so the upsert helper can take
# its insert-then-update fallback instead of "ON CONFLICT ... DO UPDATE".
sqlite_version = sqlite3.sqlite_version_info
lower_sqlite_version = sqlite_version < (3, 24, 0)
# Read a table's column names together with their declared defaults.
def get_table_columns_and_defaults(tbl_name):
    """Return ``{column_name: default_value}`` for ``tbl_name``.

    The mapping is built from ``PRAGMA table_info``. Returns ``None`` after
    logging when SQLite raises an error.
    """
    try:
        cursor.execute(f"PRAGMA table_info({tbl_name})")
        # In each table_info row, index 1 is the column name and index 4 is
        # the column's default value.
        return {row[1]: row[4] for row in cursor.fetchall()}
    except sqlite3.Error as e:
        logging.error(f"Error getting table columns: {e}")
        return None
# Align a record with the columns of its target table.
def check_and_process_data(data, tbl_name):
    """Project ``data`` onto the columns of ``tbl_name``.

    ``id`` is skipped because it is the auto-increment primary key, and
    ``created_at`` / ``updated_at`` are skipped because the timestamps are
    handled by the SQL statements themselves. Every other column takes the
    value from ``data`` when present, otherwise the column default reported
    by the table (which may be ``None``).

    Returns the resulting dict, or ``None`` when the column information could
    not be read.
    """
    column_info = get_table_columns_and_defaults(tbl_name=tbl_name)
    if column_info is None:
        return None

    skipped_columns = ('id', 'created_at', 'updated_at')
    return {
        col: (data[col] if col in data else default)
        for col, default in column_info.items()
        if col not in skipped_columns
    }
# Insert a row, or update the existing row when the unique key conflicts.
def insert_or_update_common(data, tbl_name, uniq_key='href'):
    """Upsert ``data`` into ``tbl_name`` and return the affected row's ``id``.

    Args:
        data: Mapping of column name to value; must contain ``uniq_key``.
        tbl_name: Target table name (interpolated into the SQL text).
        uniq_key: Column carrying the UNIQUE constraint used for conflict
            detection and for looking the row up afterwards.

    Returns:
        The row ``id``, or ``None`` when the unique key is missing from
        ``data``, the table columns cannot be read, SQLite reports an error,
        or the row cannot be found after the write.
    """
    if lower_sqlite_version:
        return insert_or_update_common_lower(data, tbl_name, uniq_key)

    # Without the conflict key the upsert cannot target a row and the id
    # lookup below would fail, so refuse before writing anything.
    if uniq_key not in data:
        logging.error(f"Error inserting or updating data: missing unique key {uniq_key!r}")
        return None

    try:
        processed_data = check_and_process_data(data, tbl_name)
        if processed_data is None:
            return None

        columns = ', '.join(processed_data)
        values = list(processed_data.values())
        placeholders = ', '.join('?' for _ in values)

        # Build the assignments as a list so the clause stays valid SQL even
        # when the unique key is the only writable column.
        assignments = [f"{col}=EXCLUDED.{col}" for col in processed_data if col != uniq_key]
        assignments.append("updated_at=datetime('now', 'localtime')")
        update_clause = ', '.join(assignments)

        sql = f'''
            INSERT INTO {tbl_name} ({columns}, updated_at)
            VALUES ({placeholders}, datetime('now', 'localtime'))
            ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
        '''
        cursor.execute(sql, values)
        conn.commit()

        # Fetch the id of the row that was just inserted or updated.
        cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
        row = cursor.fetchone()
        if row is None:
            logging.error(f"Error inserting or updating data: no row found for {uniq_key}={data[uniq_key]!r}")
            return None
        return row[0]
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None
# Insert-or-update without UPSERT syntax: try INSERT, fall back to UPDATE.
def insert_or_update_common_lower(data, tbl_name, uniq_key='href'):
    """Write ``data`` into ``tbl_name`` and return the affected row's ``id``.

    An INSERT is attempted first; when it raises ``sqlite3.IntegrityError``
    the row identified by ``uniq_key`` is updated instead.

    Args:
        data: Mapping of column name to value; must contain ``uniq_key``.
        tbl_name: Target table name (interpolated into the SQL text).
        uniq_key: Column used to locate the existing row.

    Returns:
        The row ``id``, or ``None`` when the unique key is missing from
        ``data``, the table columns cannot be read, SQLite reports an error,
        or the row cannot be found after the write.
    """
    # The fallback UPDATE and the id lookup both need the key value.
    if uniq_key not in data:
        logging.error(f"Error inserting or updating data: missing unique key {uniq_key!r}")
        return None

    try:
        processed_data = check_and_process_data(data, tbl_name)
        if processed_data is None:
            return None

        columns = ', '.join(processed_data)
        values = list(processed_data.values())
        placeholders = ', '.join('?' for _ in values)

        try:
            sql = f'''
                INSERT INTO {tbl_name} ({columns}, updated_at)
                VALUES ({placeholders}, datetime('now', 'localtime'))
            '''
            cursor.execute(sql, values)
            conn.commit()
        except sqlite3.IntegrityError:
            # Treated as a unique-key conflict: update the existing row.
            update_columns = [col for col in processed_data if col != uniq_key]
            # Built as a list so the clause stays valid SQL even when the
            # unique key is the only writable column.
            assignments = [f"{col}=?" for col in update_columns]
            assignments.append("updated_at=datetime('now', 'localtime')")
            update_values = [processed_data[col] for col in update_columns]
            update_values.append(data[uniq_key])
            sql = f"UPDATE {tbl_name} SET {', '.join(assignments)} WHERE {uniq_key} = ?"
            cursor.execute(sql, update_values)
            conn.commit()

        # Fetch the id of the row that was just inserted or updated.
        cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
        row = cursor.fetchone()
        if row is None:
            # Reached e.g. when the IntegrityError came from a constraint
            # other than the unique key, so the UPDATE matched nothing.
            logging.error(f"Error inserting or updating data: no row found for {uniq_key}={data[uniq_key]!r}")
            return None
        return row[0]
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None
# Register a book in the books table unless the stored row is already current.
def insert_books_index(data):
    """Insert or refresh a book index row.

    A stored row with the same ``href`` and an ``update_time`` not older than
    ``data['update_time']`` counts as current and is left untouched.
    Otherwise ``data['is_latest']`` is set to 0 (on the caller's dict) and the
    record is upserted.

    Returns:
        ``(book_id, 0)`` when the stored row is already current,
        ``(upsert_result, 1)`` when an upsert was attempted, and
        ``(None, 0)`` when the lookup raised an SQLite error.
    """
    try:
        lookup_sql = f"SELECT id FROM {tbl_name_books} WHERE href = ? and update_time >= ?"
        cursor.execute(lookup_sql, (data['href'], data['update_time'], ))
        current_row = cursor.fetchone()

        if not current_row:
            # Book is missing or outdated: mark it as needing a detail refresh.
            data['is_latest'] = 0
            book_id = insert_or_update_common(data, tbl_name_books)
            return book_id, 1

        # The book already exists and is up to date.
        logging.debug(f"book {data['href']} already exist. id: {current_row[0]}")
        return current_row[0], 0
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None, 0
# Update the detail columns of an existing book.
def update_book_detail(data):
    """Update the book identified by ``data['href']`` and return its ``id``.

    ``data['is_latest']`` is set to 1 (on the caller's dict) before the
    update. Every key of ``data`` except ``id``, ``href`` and ``created_at``
    is written to the column of the same name, and ``updated_at`` is stamped
    with the current local time.

    Returns:
        The book ``id``, or ``None`` when no book has that ``href`` or SQLite
        reports an error.
    """
    try:
        data['is_latest'] = 1

        # Only the fields present in data are updated; the identifying and
        # creation columns are never overwritten.
        fields_to_update = [field for field in data if field not in ('id', 'href', 'created_at')]
        set_clause = ', '.join(f"{field} = ?" for field in fields_to_update)
        sql = f"UPDATE {tbl_name_books} SET {set_clause}, updated_at = datetime('now', 'localtime') WHERE href = ?"

        values = [data[field] for field in fields_to_update]
        values.append(data['href'])
        cursor.execute(sql, values)
        conn.commit()

        # Look the id up by href; an unknown href yields no row, which must
        # not be indexed.
        cursor.execute(f"SELECT id FROM {tbl_name_books} WHERE href = ?", (data['href'],))
        row = cursor.fetchone()
        if row is None:
            logging.error(f"Error inserting or updating data: no book found for href {data['href']!r}")
            return None
        return row[0]
    except sqlite3.Error as e:
        logging.error(f"Error inserting or updating data: {e}")
        return None
# Query books using optional filters.
def query_books(**filters):
    """Return books matching the given keyword filters.

    Supported filters: ``id`` and ``href`` (exact match), ``name`` (substring
    match via LIKE), ``is_latest`` (exact match) and ``limit`` (row cap).
    Unknown filter names are ignored.

    Returns:
        A list of ``{'href', 'name', 'id'}`` dicts, or ``None`` when SQLite
        reports an error.
    """
    # (filter name, SQL fragment, optional value adapter), in append order.
    filter_specs = (
        ("id", " AND id = ?", None),
        ("href", " AND href = ?", None),
        ("name", " AND name LIKE ?", lambda text: f"%{text}%"),
        ("is_latest", " AND is_latest = ?", None),
        ("limit", " limit ?", None),
    )
    try:
        sql = f"SELECT href, name, id FROM {tbl_name_books} WHERE 1=1"
        params = []
        for key, fragment, adapt in filter_specs:
            if key not in filters:
                continue
            sql += fragment
            raw_value = filters[key]
            params.append(adapt(raw_value) if adapt else raw_value)

        cursor.execute(sql, params)
        books = []
        for record in cursor.fetchall():
            books.append({'href': record[0], 'name': record[1], 'id': record[2]})
        return books
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
# Make sure the chapters table for a given number exists.
def check_and_create_chapters_table(book_number):
    """Create the chapters table ``<prefix>_<book_number>`` if it is missing.

    Returns the table name, or ``None`` when SQLite reports an error.
    """
    table_name = f"{tbl_name_chapters_prefix}_{book_number}"
    ddl = f'''
        CREATE TABLE if not exists {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            book_id INTEGER,
            chapter_id INTEGER,
            section_id INTEGER,
            title TEXT,
            href TEXT UNIQUE,
            content TEXT,
            has_content INTEGER default 0,
            created_at TEXT DEFAULT (datetime('now', 'localtime')),
            updated_at TEXT DEFAULT (datetime('now', 'localtime')),
            FOREIGN KEY(book_id) REFERENCES books(id) ON DELETE CASCADE
        );
    '''
    try:
        cursor.execute(ddl)
        conn.commit()
    except sqlite3.Error as e:
        logging.error(f"create table failed: {e}")
        return None
    return table_name
# Store a chapter record in the chapters table that belongs to its book.
def insert_chapter_data(data):
    """Upsert a chapter into the table selected by ``book_id % 100``.

    The table is created first when it does not exist. Returns the result of
    the upsert, or ``None`` when the table could not be prepared.
    """
    shard = int(data['book_id']) % 100
    tbl_name = check_and_create_chapters_table(shard)
    if not tbl_name:
        return None
    return insert_or_update_common(data, tbl_name)
# Look up the most recently stored chapter link of a book.
def query_last_chapter_by_book(bookid):
    """Return the ``href`` of the chapter with the highest ``id`` for a book.

    Args:
        bookid: Book id; anything ``int()`` accepts.

    Returns:
        The chapter ``href``, or ``None`` when the book has no chapters, the
        chapters table could not be prepared, or SQLite reports an error.
    """
    # Normalise once so the table selection and the WHERE clause use the
    # same integer value.
    book_id = int(bookid)
    tbl_name = check_and_create_chapters_table(book_id % 100)
    if tbl_name is None:
        return None

    try:
        # The book id is bound as a parameter instead of being formatted
        # into the SQL text.
        sql = f"SELECT href FROM {tbl_name} WHERE book_id = ? order by id desc limit 1"
        cursor.execute(sql, (book_id,))
        row = cursor.fetchone()
        if row:  # a chapter exists for this book
            return row[0]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
    return None
# Collect chapters whose content has not been stored yet.
def query_no_content_chapters(limit = 100):
    """Return up to ``limit`` chapters with ``has_content = 0``.

    The chapter tables numbered 0..99 are scanned in order until enough rows
    have been gathered. Tables that cannot be queried are skipped.

    Args:
        limit: Maximum number of chapters to return.

    Returns:
        A list of dicts with the keys ``href``, ``title``, ``book_id``,
        ``chapter_id`` and ``section_id``.
    """
    all_results = []
    for i in range(100):
        # Stop as soon as the quota is filled.
        remaining_count = limit - len(all_results)
        if remaining_count <= 0:
            break

        table_name = f'{tbl_name_chapters_prefix}_{i}'
        try:
            # Ask each table for at most the number of rows still missing;
            # the row cap is bound as a parameter.
            query = f"SELECT href, title, book_id, chapter_id, section_id FROM {table_name} WHERE has_content = 0 LIMIT ?"
            cursor.execute(query, (remaining_count,))
            all_results.extend(
                {'href': row[0], 'title': row[1], 'book_id': row[2], 'chapter_id': row[3], 'section_id': row[4]}
                for row in cursor.fetchall()
            )
        except sqlite3.Error as e:
            # Logged at debug level, like the per-table loop in get_statics,
            # rather than printed to stdout.
            logging.debug(f"Error querying table {table_name}: {e}")
    return all_results
# Store a book's section (volume) record.
def insert_or_update_book_sections(data):
    """Upsert ``data`` into the sections table, keyed by ``bookid_section``."""
    return insert_or_update_common(
        data,
        tbl_name_section,
        uniq_key='bookid_section',
    )
# Gather summary statistics.
def get_statics():
    """Return counts describing the books table and the chapter tables.

    Keys: ``all_books`` and ``all_books_latest`` (only set when the books
    queries succeed), plus ``all_chapters``, ``all_chapters_has_contents``
    and ``finished_books`` summed over the chapter tables 0..99. A book is
    counted as finished when every one of its chapter rows has
    ``has_content = 1``.
    """
    def _scalar(sql):
        # Run a single-value query and return that value.
        cursor.execute(sql)
        return cursor.fetchone()[0]

    result = {}
    try:
        # Row counts of the books table.
        result['all_books'] = _scalar(f"SELECT COUNT(*) FROM {tbl_name_books} ")
        result['all_books_latest'] = _scalar(f"SELECT COUNT(*) FROM {tbl_name_books} where is_latest=1")
    except sqlite3.Error as e:
        logging.error(f"query error: {e}")

    totals = {
        'all_chapters': 0,
        'all_chapters_has_contents': 0,
        'finished_books': 0,
    }
    # Walk the chapter tables numbered 0..99.
    for i in range(100):
        table_name = f'{tbl_name_chapters_prefix}_{i}'
        try:
            totals['all_chapters'] += _scalar(f"SELECT COUNT(*) FROM {table_name} ")
            totals['all_chapters_has_contents'] += _scalar(f"SELECT COUNT(*) FROM {table_name} where has_content=1")
            # Count the books whose chapters all have content.
            finished_sql = f"""
                SELECT COUNT(*)
                FROM (
                SELECT book_id
                FROM {table_name}
                GROUP BY book_id
                HAVING SUM(CASE WHEN has_content = 1 THEN 1 ELSE 0 END) = COUNT(*)
                )
            """
            totals['finished_books'] += _scalar(finished_sql)
        except sqlite3.Error as e:
            logging.debug(f"Error querying table {table_name}: {e}")

    result.update(totals)
    return result