modify scripts

This commit is contained in:
oscarz
2025-03-20 12:18:27 +08:00
parent f8af2c44e5
commit 37b82e5e5c
3 changed files with 168 additions and 6 deletions

View File

@ -1,9 +1,18 @@
from ebooklib import epub from ebooklib import epub
from fpdf import FPDF
import os
from reportlab.lib.pagesizes import letter from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer from reportlab.platypus import Paragraph, Spacer
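# PDF generation has been failing with various CJK font problems; not pursued further for now.
# Prerequisites for the font file referenced by font_path below: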
# apt-get install fonts-wqy-microhei
# apt-get install fontforge
# fontforge -lang=ff -c 'Open($1); Generate($2)' /usr/share/fonts/truetype/wqy/wqy-microhei.ttc /usr/share/fonts/truetype/wqy/wqy-microhei.ttf
font_path = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttf"
def generate_epub(data, save_path): def generate_epub(data, save_path):
# 创建 EPUB 书籍对象 # 创建 EPUB 书籍对象
@ -21,30 +30,33 @@ def generate_epub(data, save_path):
if len(sections) == 1: if len(sections) == 1:
# 如果只有一个 section忽略 section 的 title按一级目录处理 # 如果只有一个 section忽略 section 的 title按一级目录处理
for chapter in sections[0].get('chapters', []): for index, chapter in enumerate(sections[0].get('chapters', []), start=1):
chapter_title = chapter.get('title', '未知章节') chapter_title = chapter.get('title', '未知章节')
chapter_content = chapter.get('content', '') chapter_content = chapter.get('content', '')
paragraphs = chapter_content.split('\n\n') paragraphs = chapter_content.split('\n\n')
html_content = ''.join([f'<p>{para}</p>' for para in paragraphs]) html_content = ''.join([f'<p>{para}</p>' for para in paragraphs])
chapter_obj = epub.EpubHtml(title=chapter_title, file_name=f'{chapter_title}.xhtml', lang='zh') # 为文件名添加序号,避免冲突
chapter_obj = epub.EpubHtml(title=chapter_title, file_name=f'{index}_{chapter_title}.xhtml', lang='zh')
chapter_obj.content = f'<h1>{chapter_title}</h1>{html_content}' chapter_obj.content = f'<h1>{chapter_title}</h1>{html_content}'
book.add_item(chapter_obj) book.add_item(chapter_obj)
all_chapters.append(chapter_obj) all_chapters.append(chapter_obj)
else: else:
# 如果有多个 section按两级目录处理 # 如果有多个 section按两级目录处理
for section in sections: for section_index, section in enumerate(sections, start=1):
section_title = section.get('title', '未知卷') section_title = section.get('title', '未知卷')
section_chapter = epub.EpubHtml(title=section_title, file_name=f'{section_title}.xhtml', lang='zh') # 为 section 的文件名添加序号,避免冲突
section_chapter = epub.EpubHtml(title=section_title, file_name=f'{section_index}_{section_title}.xhtml', lang='zh')
section_chapter.content = f'<h1>{section_title}</h1>' section_chapter.content = f'<h1>{section_title}</h1>'
book.add_item(section_chapter) book.add_item(section_chapter)
all_chapters.append(section_chapter) all_chapters.append(section_chapter)
for chapter in section.get('chapters', []): for chapter_index, chapter in enumerate(section.get('chapters', []), start=1):
chapter_title = chapter.get('title', '未知章节') chapter_title = chapter.get('title', '未知章节')
chapter_content = chapter.get('content', '') chapter_content = chapter.get('content', '')
paragraphs = chapter_content.split('\n\n') paragraphs = chapter_content.split('\n\n')
html_content = ''.join([f'<p>{para}</p>' for para in paragraphs]) html_content = ''.join([f'<p>{para}</p>' for para in paragraphs])
chapter_obj = epub.EpubHtml(title=chapter_title, file_name=f'{chapter_title}.xhtml', lang='zh') # 为 chapter 的文件名添加 section 序号和 chapter 序号,避免冲突
chapter_obj = epub.EpubHtml(title=chapter_title, file_name=f'{section_index}_{chapter_index}_{chapter_title}.xhtml', lang='zh')
chapter_obj.content = f'<h2>{chapter_title}</h2>{html_content}' chapter_obj.content = f'<h2>{chapter_title}</h2>{html_content}'
book.add_item(chapter_obj) book.add_item(chapter_obj)
all_chapters.append(chapter_obj) all_chapters.append(chapter_obj)
@ -69,6 +81,62 @@ def generate_epub(data, save_path):
def _validate_pdf_data(data):
    """Raise ValueError unless *data* has the structure generate_pdf renders."""
    if not isinstance(data, dict) or 'title' not in data or 'sections' not in data:
        raise ValueError("Invalid data format. 'title' and 'sections' are required.")
    if not isinstance(data['sections'], list) or not data['sections']:
        raise ValueError("'sections' must be a non-empty list.")

    for section in data['sections']:
        is_mapping = isinstance(section, dict)
        section_title = section.get('title', 'Unknown') if is_mapping else 'Unknown'
        if not is_mapping or not isinstance(section.get('chapters'), list):
            raise ValueError(f"Section '{section_title}' is missing valid 'chapters'.")
        for chapter in section['chapters']:
            if not isinstance(chapter, dict) or 'title' not in chapter or 'content' not in chapter:
                raise ValueError("Each chapter must have a 'title' and 'content'.")


def generate_pdf(data, save_path):
    """Render a book dictionary to a PDF file with FPDF.

    Args:
        data: dict with a 'title' and a non-empty list 'sections'; every
            section carries a list 'chapters', and every chapter carries
            'title' and 'content'.
        save_path: Destination path handed to ``FPDF.output``.

    Raises:
        ValueError: *data* does not have the structure described above.
        FileNotFoundError: the TTF file at ``font_path`` does not exist.
        IOError: writing the PDF failed (the original error is chained).
    """
    # Validate everything up front so no page is rendered for bad input.
    _validate_pdf_data(data)

    if not os.path.exists(font_path):
        raise FileNotFoundError(
            f"Font not found at {font_path}. Please install fonts-wqy-microhei "
            "and convert the .ttc to .ttf."
        )

    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()

    # FPDF resolves fonts by family + style, so the 'B' style used for the
    # headings below must be registered as well; otherwise set_font() fails
    # with an undefined-font error. The same TTF is reused, so headings are
    # distinguished by size rather than by an actual bold face.
    pdf.add_font("NotoSans", "", font_path, uni=True)
    pdf.add_font("NotoSans", "B", font_path, uni=True)

    # Book title, centred.
    pdf.set_font("NotoSans", size=16)
    pdf.cell(200, 10, txt=data['title'], ln=True, align='C')
    pdf.ln(10)

    # Section headings are only printed when the book has several sections.
    multiple_sections = len(data['sections']) > 1

    for section in data['sections']:
        if multiple_sections:
            pdf.set_font("NotoSans", style='B', size=14)
            # Same fallback title generate_epub uses for untitled sections.
            pdf.cell(200, 10, txt=section.get('title', '未知卷'), ln=True)
            pdf.ln(5)

        for chapter in section['chapters']:
            pdf.set_font("NotoSans", style='B', size=12)
            pdf.cell(200, 8, txt=chapter['title'], ln=True)
            pdf.set_font("NotoSans", size=10)
            pdf.multi_cell(0, 8, txt=chapter['content'])
            pdf.ln(5)

    try:
        pdf.output(save_path)
    except Exception as e:
        raise IOError(f"Failed to save PDF: {e}") from e
    print(f"PDF saved to {save_path}")
def generate_pdf_2(data, save_path):
# 创建 PDF 画布 # 创建 PDF 画布
c = canvas.Canvas(save_path, pagesize=letter) c = canvas.Canvas(save_path, pagesize=letter)
styles = getSampleStyleSheet() styles = getSampleStyleSheet()

26
aabook/src/dump_book.py Normal file
View File

@ -0,0 +1,26 @@
import json
import time
import csv
import argparse
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import scraper
import utils
import config
import convert_utils
config.setup_logging()

# Directory the generated e-books are written to.
books_dir = f"{config.global_host_data_dir}/aabook/data"

# Usage example: dump a fixed list of book ids to EPUB files.
if __name__ == "__main__":
    for book_id in (4, 2600):
        book_data = db_tools.get_contents_by_book(book_id)
        if not book_data:
            # Nothing was returned for this id; move on to the next one.
            continue
        title = book_data['title']
        convert_utils.generate_epub(book_data, f"{books_dir}/{title}.epub")
        print(f"dump {title} suss!")

View File

@ -302,6 +302,21 @@ def query_last_chapter_by_book(bookid):
logging.error(f"查询 href 失败: {e}") logging.error(f"查询 href 失败: {e}")
return None return None
def check_table_exists(table_name):
    """Report whether a table named *table_name* exists in the database.

    The name is looked up in ``sqlite_master`` through the module-level
    ``cursor``.

    Args:
        table_name: Exact name of the table to look for.

    Returns:
        True if the table exists, False if it does not, and None if the
        lookup itself raised ``sqlite3.Error`` (the error is logged).
    """
    try:
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        # A returned row means the table is present.
        return cursor.fetchone() is not None
    except sqlite3.Error as e:
        logging.error(f"查询表 {table_name} 是否存在失败: {e}")
        return None
# 获取没有内容的章节链接 # 获取没有内容的章节链接
def query_no_content_chapters(limit = 100): def query_no_content_chapters(limit = 100):
# 用于存储所有结果的列表 # 用于存储所有结果的列表
@ -370,6 +385,59 @@ def query_toc_href():
def insert_or_update_book_sections(data): def insert_or_update_book_sections(data):
return insert_or_update_common(data, tbl_name_section, uniq_key='bookid_section') return insert_or_update_common(data, tbl_name_section, uniq_key='bookid_section')
def get_contents_by_book(id):
    """Assemble the metadata, sections and chapters of one book.

    Chapters are read from the table ``{tbl_name_chapters_prefix}_{id % 100}``.

    Args:
        id: Book id; anything ``int()`` accepts.

    Returns:
        A dict with the keys 'title', 'author', 'category', 'status' and
        'sections', where 'sections' is a list of
        ``{'title': ..., 'chapters': [{'title': ..., 'content': ...}, ...]}``.
        None when the chapter table does not exist, when the book row is
        missing, or when a ``sqlite3.Error`` occurs (it is logged).
    """
    try:
        book_id = int(id)

        # Chapter content is spread over tables keyed by id modulo 100.
        chap_tbl_name = f'{tbl_name_chapters_prefix}_{book_id % 100}'
        if not check_table_exists(chap_tbl_name):
            logging.warning(f"book {id} have no data.")
            return None

        # Table names cannot be bound as parameters, but every value is.
        cursor.execute(
            f"SELECT id, name, author, category, status FROM {tbl_name_books} WHERE id = ?",
            (book_id,),
        )
        existing_book = cursor.fetchone()
        if not existing_book:
            logging.warning(f"book {id} have no meta data.")
            return None

        book_data = {
            'title': existing_book[1],
            'author': existing_book[2],
            'category': existing_book[3],
            'status': existing_book[4],
            'sections': [],
        }

        # Fetch all sections before issuing further queries on the shared
        # cursor; ORDER BY makes the section order deterministic.
        cursor.execute(
            f"SELECT id, section FROM {tbl_name_section} WHERE book_id = ? ORDER BY id ASC",
            (book_id,),
        )
        sections = cursor.fetchall()

        chapter_query = (
            f"SELECT id, title, content FROM {chap_tbl_name} "
            "WHERE book_id = ? AND section_id = ? ORDER BY id ASC"
        )
        for section_id, section_title in sections:
            cursor.execute(chapter_query, (book_id, section_id))
            chapters = [
                {'title': chap_row[1], 'content': chap_row[2]}
                for chap_row in cursor.fetchall()
            ]
            book_data['sections'].append({
                'title': section_title,
                'chapters': chapters,
            })

        return book_data

    except sqlite3.Error as e:
        logging.error(f"查询书籍 {id} 内容失败: {e}")
        return None
# 统计信息 # 统计信息
def get_statics(): def get_statics():