# resources/aabook/aabook_fetch.py

import requests
from bs4 import BeautifulSoup
import os
import sys
import random
import time
import re
import logging
from datetime import datetime
from datetime import date
import config  # logging configuration
from down_list import novel_map
import utils

# Set up logging
config.setup_logging()

# Base URLs and output files
base_url = 'https://aabook.xyz'
list_url_wordcount = 'https://aabook.xyz/category.html?pageNum={}&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=wordcount'
list_url_update = 'https://aabook.xyz/category.html?pageNum={}&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=update'
curr_novel_pages = 0  # page counter for the novel currently being downloaded
meta_dir = f'{config.global_host_data_dir}/aabook/meta'
novel_dir = f'{config.global_host_data_dir}/aabook/data'
list_file = f'{meta_dir}/list.txt'
details_file = f'{meta_dir}/details.txt'
down_list_file = f'{meta_dir}/down_list.txt'
# User-Agent pool
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
    "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Mobile Safari/537.36"
]

# Fetch a page with a retry loop
def get_page_content(url, max_retries=100, sleep_time=5, default_timeout=10):
    retries = 0
    # Pick a random User-Agent
    headers = {
        'User-Agent': random.choice(user_agents)
    }
    while retries < max_retries:
        try:
            response = requests.get(url, headers=headers, timeout=default_timeout, stream=True)
            response.raise_for_status()
            return response.text  # request succeeded, return the page body
        except requests.RequestException as e:
            retries += 1
            logging.info(f"Warn fetching page {url}: {e}. Retrying {retries}/{max_retries}...")
            if retries >= max_retries:
                logging.error(f"Failed to fetch page {url} after {max_retries} retries.")
                return None
            time.sleep(sleep_time)  # wait before retrying
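
# Illustrative usage: callers must handle the None returned once the retries are
# exhausted, e.g.
#   html = get_page_content(list_url_wordcount.format(1))
#   if html is None:
#       logging.error('giving up on this page')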

# Fetch the ranking list
def get_list(write_list_file=list_file, list_url=list_url_wordcount, start_date='2000-01-01', order_by_date=False):
    page_num = 1
    start_time = datetime.strptime(f'{start_date} 00:00:00', "%Y-%m-%d %H:%M:%S")
    with open(write_list_file, 'w', encoding='utf-8') as f:
        while True:
            # Issue the request (the template is formatted on the first pass; later
            # iterations follow the absolute next-page URL, so format() is a no-op)
            list_url = list_url.format(page_num)
            logging.info(f"Fetching page [{page_num}] {list_url}")
            content = get_page_content(list_url)
            soup = BeautifulSoup(content, 'html.parser')
            # Locate the book table
            list_main = soup.find('div', class_='list_main')
            if not list_main:
                logging.info("No list_main found, retrying...")
                continue
            tbody = list_main.find('tbody')
            if not tbody:
                logging.info("No tbody found, retrying...")
                continue
            # Per-book basics: ranking, category, title, author, monthly tickets and the
            # last column (total word count when ordered by word count, last update time
            # when ordered by date)
            for tr in tbody.find_all('tr'):
                tds = tr.find_all('td')
                if len(tds) < 6:
                    logging.info("Invalid tr format.")
                    continue
                ranking = tds[0].text.strip()
                category = tds[1].text.strip()
                book_link_tag = tds[2].find('a')
                book_name = book_link_tag.text.strip()
                book_link = base_url + '/' + book_link_tag['href']
                author = tds[3].text.strip()
                monthly_tickets = tds[4].text.strip()
                update_time = tds[5].text.strip()  # word count or last update date, depending on ordering
                # Stop once entries are older than start_date
                if order_by_date:
                    up_time = datetime.strptime(update_time, "%Y-%m-%d %H:%M:%S")
                    if start_time > up_time:
                        return
                # One tab-separated line per book:
                # ranking  category  title  author  monthly_tickets  word_count(or update date)  book_link
                f.write(f"{ranking}\t{category}\t{book_name}\t{author}\t{monthly_tickets}\t{update_time}\t{book_link}\n")
                f.flush()
            # Find the next-page link
            next_page_tag = soup.find('a', title='下一页')
            if next_page_tag:
                list_url = base_url + next_page_tag['href']
                page_num += 1
            else:
                logging.info("No next page, stopping.")
                break
            time.sleep(3)
            # break  # for testing only
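
# Each line of the resulting list file is tab-separated:
#   ranking  category  title  author  monthly_tickets  word_count_or_update_date  book_link
# get_detail() below expects at least these 7 fields when it re-reads the file.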

# Fetch a detail page and validate it before returning
def fetch_detail_and_check(url, book_name):
    while True:
        content = get_page_content(url)
        soup = BeautifulSoup(content, 'html.parser')
        # Parse the detailed book info block
        book_info_tag = soup.find('li', class_='zuopinxinxi')
        if not book_info_tag:
            logging.info(f"No details found for {book_name}, retrying...")
            continue
        book_info_lis = book_info_tag.find_all('li')
        if len(book_info_lis) < 4:
            logging.info(f"Invalid book info for {book_name}, retrying...")
            continue
        return content

# Fetch details for each book
def get_detail(write_list_file=list_file, write_details_file=details_file):
    # Collect links of books whose details are already recorded
    if os.path.exists(write_details_file):
        with open(write_details_file, 'r', encoding='utf-8') as f:
            completed_links = set(line.split('\t')[4] for line in f.readlines())
    else:
        completed_links = set()
    with open(write_list_file, 'r', encoding='utf-8') as f_list, open(write_details_file, 'a', encoding='utf-8') as f_details:
        for line in f_list:
            fields = line.strip().split('\t')
            if len(fields) < 7:
                continue
            book_link = fields[6]
            book_name = fields[2]
            if book_link in completed_links:
                logging.info(f"Skipping {book_name} {book_link}, already processed.")
                continue
            # Visit the book's detail page
            logging.info(f"Fetching details for {book_name} {book_link}")
            content = fetch_detail_and_check(book_link, book_name)
            soup = BeautifulSoup(content, 'html.parser')
            # Parse the detailed book info block
            book_info_tag = soup.find('li', class_='zuopinxinxi')
            if not book_info_tag:
                logging.info(f"No details found for {book_name}, skipping.")
                continue
            book_info_lis = book_info_tag.find_all('li')
            if len(book_info_lis) < 4:
                logging.info(f"Invalid book info for {book_name}, skipping.")
                continue
            book_category = book_info_lis[0].find('span').text.strip()
            book_status = book_info_lis[1].find('span').text.strip()
            total_word_count = book_info_lis[2].find('span').text.strip()
            total_clicks = book_info_lis[3].find('span').text.strip()
            # Keep only the digits, dropping the trailing Chinese unit
            total_word_count = int(re.search(r'\d+', total_word_count).group())
            # Read the creation time
            creation_time_tag = soup.find('li', class_='update_time')
            creation_time = creation_time_tag.text.strip() if creation_time_tag else 'N/A'
            # First-chapter link and its numeric id
            start_page_tag = soup.find('ul', class_='gezhonganniu').find_all('li')[0].find('a')
            start_page_link = base_url + '/' + start_page_tag['href']
            start_page_number = start_page_link.split('-')[-1].replace('.html', '')
            # One tab-separated line per book:
            # ranking  category  title  author  book_link  start_page_link  start_page_number
            # status  total_word_count  total_clicks  word_count(or update date)  creation_time
            f_details.write(f"{fields[0]}\t{book_category}\t{fields[2]}\t{fields[3]}\t{book_link}\t"
                            f"{start_page_link}\t{start_page_number}\t{book_status}\t{total_word_count}\t"
                            f"{total_clicks}\t{fields[5]}\t{creation_time}\n")
            f_details.flush()
            time.sleep(5)
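
# Each details record carries 12 tab-separated fields: ranking, category, title,
# author, book link, start page link, start page id, status, total word count,
# total clicks, the word-count/update column copied from the list file, and the
# creation time. download_books() below unpacks lines on exactly this field count.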

# Strip watermark fragments from chapter content
def clean_watermarks(html):
    """
    Remove any tag that carries a class attribute (the watermark markup),
    together with its inner content, while keeping the rest of the structure.
    """
    # Regex: drop any HTML tag that has a class attribute, plus its content
    cleaned_html = re.sub(r'<[^>]+class="[^"]+">.*?</[^>]+>', '', html, flags=re.DOTALL)
    return cleaned_html

def process_paragraph(paragraph):
    # Work on the paragraph's full HTML, not get_text()
    paragraph_html = str(paragraph)
    # Remove the watermark tags
    cleaned_html = clean_watermarks(paragraph_html)
    # Re-parse the cleaned HTML and extract the plain text
    soup = BeautifulSoup(cleaned_html, 'html.parser')
    cleaned_text = soup.get_text().strip()
    return cleaned_text
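
# Illustrative example (hypothetical markup, not taken from the site): given
#   <p>正文开始<span class="mark">watermark</span>正文继续</p>
# clean_watermarks() strips the classed <span> together with its content, and
# process_paragraph() then returns the plain text "正文开始正文继续". The non-greedy
# match stops at the first closing tag after a classed opening tag, which is enough
# for the flat watermark spans this site appears to inject.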

# Extract content_url from the inline <script> tags
def extract_content_url(soup, base_url, chapid):
    # Collect all <script> tags
    script_tags = soup.find_all('script')
    # Look for the one that calls ./_getcontent.php
    for script_tag in script_tags:
        script_content = script_tag.string
        if script_content and re.search(r'\.get\("./_getcontent\.php', script_content):
            # Pull the _getcontent.php URL template out of the script
            match = re.search(r'\.get\("\./_getcontent\.php\?id="\+\s*chapid\s*\+\s*"&v=([^"]+)"', script_content)
            if match:
                # Extract the v parameter
                v_value = match.group(1)
                # Build the full content_url
                content_url = f"{base_url}/_getcontent.php?id={chapid}&v={v_value}"
                return content_url
    # No matching script tag found
    return None
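
# The regex above assumes the chapter page embeds JavaScript roughly of this shape
# (a hypothetical reconstruction; the v value is illustrative):
#   $.get("./_getcontent.php?id=" + chapid + "&v=abc123", function (data) { ... });
# from which the v parameter is lifted to rebuild the body URL as
#   https://aabook.xyz/_getcontent.php?id=<chapid>&v=abc123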

# Detect responses that only contain the anti-scraping boilerplate
def check_content(content):
    if '2005-2024 疯情书库' in content or '2005-2025 疯情书库' in content:
        return False
    return True

# Page counter helpers
def reset_novel_pages():
    global curr_novel_pages
    curr_novel_pages = 0

def add_novel_pages():
    global curr_novel_pages
    curr_novel_pages += 1

def get_novel_pages():
    return curr_novel_pages

# Parse chapter content and append it to the novel's text file
def download_novel(chapid, novel_name, dir_prefix=novel_dir):
    chapter_url = f'{base_url}/read-{chapid}.html'
    novel_file = f'{dir_prefix}/{chapid}_{novel_name}.txt'
    if os.path.exists(novel_file):
        os.remove(novel_file)  # a file with the same name exists: delete it and re-download
    # Chapters collected for the epub output
    chapters = []
    reset_novel_pages()
    while chapter_url:
        logging.info(f"Processing: [{novel_name}] [{chapid}] {chapter_url}")
        # Fetch the chapter page
        html_content = get_page_content(chapter_url)
        if html_content is None:
            logging.error(f"Get page error {chapter_url}, retrying...")
            time.sleep(2)
            continue
        # Parse the chapter page
        soup = BeautifulSoup(html_content, 'html.parser')
        # Chapter title
        chapter_title_tag = soup.find('h1', class_='chapter_title')
        if chapter_title_tag:
            chapter_title = chapter_title_tag.get_text().strip()
            logging.info(f"Processing: [{novel_name}] [{chapid}] Chapter Title: {chapter_title}")
        else:
            logging.error(f"Chapter title not found in {chapter_url}, retrying...")
            time.sleep(2)
            continue
        # URL that serves the chapter body
        content_url = extract_content_url(soup, base_url, chapid)
        if content_url:
            logging.info(f"Fetching content from: {content_url}")
            # Fetch the chapter body
            content_response = get_page_content(content_url)
            if content_response:
                if not check_content(content_response):
                    logging.error(f'Error response, dirty page [{novel_name}] {content_url}, retrying...')
                    continue
                content_soup = BeautifulSoup(content_response, 'html.parser')
                paragraphs = content_soup.find_all('p')
                # Write the chapter title
                with open(novel_file, 'a', encoding='utf-8') as f:
                    f.write(chapter_title + '\n\n')
                # Write each paragraph
                content = ''
                with open(novel_file, 'a', encoding='utf-8') as f:
                    for paragraph in paragraphs:
                        cleaned_text = process_paragraph(paragraph)
                        f.write(cleaned_text + '\n\n')
                        content = content + '<p>' + cleaned_text + '</p>'  # the epub uses HTML tags for paragraph breaks
                logging.info(f"Writing content to file. [{novel_name}] [{chapid}] [{chapter_title}]")
                chapters.append((chapter_title, content))
            else:
                logging.info(f"Fetching content error: [{novel_name}] {content_url}, retrying...")
                continue
        else:
            logging.info(f"Content URL not found in [{novel_name}] {chapter_url}, retrying...")
            continue
        # Count this page
        add_novel_pages()
        # Find the link to the next chapter
        next_div = soup.find('div', class_='next_arrow')
        if next_div:
            next_page_tag = next_div.find('a', href=True, title=re.compile(r'下一章'))
            if next_page_tag:
                next_page_url = next_page_tag['href']
                # Extract the numeric chapter id
                chapid_match = re.search(r'read-(\d+)\.html', next_page_url)
                if chapid_match:
                    chapid = chapid_match.group(1)  # next chapter id
                    chapter_url = f"{base_url}/{next_page_url}"
                    logging.debug(f"Next chapter URL: {chapter_url}, chapid: {chapid}")
                else:
                    logging.info(f"Failed to extract chapid from next_page_url: {next_page_url}")
                    break
            else:
                logging.info(f"No next page found. Ending download for {novel_name}.")
                break
        else:
            logging.info(f"No 'next_arrow' div found in {chapter_url}. Ending download.")
            break
        time.sleep(3)
    # All chapters fetched: build the epub
    utils.generate_epub(novel_name, 'nobody', chapters, dir_prefix)
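
# download_novel() passes utils.generate_epub() the book title, a placeholder
# author ('nobody'), the accumulated (chapter_title, chapter_html) tuples, and the
# output directory; chapter bodies are wrapped in <p> tags so the epub keeps its
# paragraph breaks. generate_epub() itself lives in the local utils module and is
# not shown here.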

# Create a directory if it does not exist yet
def create_directory_if_not_exists(category_name):
    if not os.path.exists(category_name):
        os.makedirs(category_name)
        logging.info(f"Created directory: {category_name}")

# Download novels, skipping the ones that were already downloaded
def download_books(need_down_list_file=details_file, cursor_file=down_list_file):
    if not os.path.isfile(need_down_list_file):
        logging.error(f'Input file {need_down_list_file} does not exist!')
        return
    if not os.path.isfile(cursor_file):
        logging.info(f'Cursor file {cursor_file} does not exist, starting with an empty dict.')
    # Read the start-page ids and titles of already-downloaded books from the cursor file
    downloaded_books = {}
    if os.path.exists(cursor_file):
        with open(cursor_file, 'r', encoding='utf-8') as f:
            for line in f:
                fields = line.strip().split('\t')
                if len(fields) != 2:
                    logging.info(f'Invalid line data: {line}')
                    continue
                novel_id, novel_name = fields
                downloaded_books[novel_id] = novel_name
    # Read book records from the details file
    with open(need_down_list_file, 'r', encoding='utf-8') as details:
        for line in details:
            fields = line.strip().split('\t')
            if len(fields) != 12:
                logging.info(f'Invalid line data: {line}')
                continue  # skip incomplete records
            ranking, category, book_name, author, book_link, start_page_link, novel_id, status, total_word_count, total_clicks, update_time, creation_time = fields
            # Skip books that were already downloaded
            if novel_id in downloaded_books:
                logging.info(f"Skipping already downloaded novel: {book_name} (ID: {novel_id})")
                continue
            # Create the category directory
            down_dir = f'{novel_dir}/{category}'
            create_directory_if_not_exists(down_dir)
            # Download the book and time the run
            start_time = time.time()
            download_novel(novel_id, book_name, down_dir)
            end_time = time.time()
            elapsed_time = int(end_time - start_time)  # elapsed seconds
            novel_pages = get_novel_pages()
            # Record the finished book in the cursor file
            with open(cursor_file, 'a', encoding='utf-8') as down_list:
                down_list.write(f"{novel_id}\t{book_name}\n")
            logging.info(f"Downloaded and recorded: ({book_name}) (ID: {novel_id}) total pages: {novel_pages} time cost: {elapsed_time} s")
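
# The cursor file appended to above uses the same two-column format that is read
# back at the start of download_books() ("<novel_id>\t<book_name>" per line), so
# an interrupted run can resume without re-downloading finished books.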

# Download the specified novels
def download_map():
    # Walk novel_map and download every entry
    for novel_id, novel_name in novel_map.items():
        logging.info(f"Starting download for {novel_name} (ID: {novel_id})")
        download_novel(novel_id, novel_name, novel_dir)
        logging.info(f"Completed download for {novel_id}_{novel_name}.\n")

# Fetch the update list and optionally download everything in it
def get_update(start_date, fetch_all=False):
    today_str = date.today().strftime("%Y-%m-%d")
    update_file = f'{meta_dir}/{today_str}_list_{start_date}.txt'
    details_file = f'{meta_dir}/{today_str}_details_{start_date}.txt'
    cursor_file = f'{meta_dir}/{today_str}_down_list_{start_date}.txt'
    logging.info(f"\n\nFetching novel list by update time from {start_date}\n\n")
    get_list(update_file, list_url_update, start_date, True)
    logging.info(f"\n\nFetching novel details by update time from {start_date}\n\n")
    get_detail(update_file, details_file)
    if fetch_all:
        logging.info(f"\n\nDownloading novels updated since {start_date}\n\n")
        download_books(details_file, cursor_file)

def main():
    if len(sys.argv) < 2:
        print("Usage: python script.py <cmd>")
        print("cmd: get_list, get_detail, get_all, get_update, get_update_all, download, download_map")
        sys.exit(1)
    # Make sure the output directories exist
    create_directory_if_not_exists(meta_dir)
    create_directory_if_not_exists(novel_dir)
    cmd = sys.argv[1]
    if cmd == "get_list":
        get_list()        # fetch the ranking list
    elif cmd == "get_detail":
        get_detail()      # fetch per-book details
    elif cmd == "get_all":
        get_list()
        get_detail()
    elif cmd == "download":
        download_books()  # download books listed in the details file
    elif cmd == "download_map":
        download_map()    # download the books listed in down_list.novel_map
    elif cmd == "get_update" or cmd == "get_update_all":
        fetch_all = (cmd == "get_update_all")
        start_date = '2000-01-01'
        if len(sys.argv) == 3:
            start_date = sys.argv[2]
        get_update(start_date, fetch_all)  # fetch the update list and optionally download it
    else:
        print(f"Unknown command: {cmd}")

if __name__ == '__main__':
    main()
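
# Typical invocations (assuming the script is run as aabook_fetch.py):
#   python aabook_fetch.py get_list                    # crawl the ranking list only
#   python aabook_fetch.py get_all                     # ranking list plus per-book details
#   python aabook_fetch.py download                    # download every book in the details file
#   python aabook_fetch.py get_update 2025-01-01       # list + details for books updated since 2025-01-01
#   python aabook_fetch.py get_update_all 2025-01-01   # ...and download them as well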