# resources/aabook/src/fetch.py
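"""Fetcher for aabook data.

Runs up to three stages, selected with --cmd:
  list     fetch_book_list:         crawl the paginated book list and upsert each book.
  toc      fetch_table_of_contents: open each book's detail page, parse its table of
                                    contents, and insert section and chapter rows.
  content  fetch_contents:          fetch chapters that have no content yet and store
                                    the joined paragraph text.

--debug limits batch sizes and stops after the first pass; --force is intended to force
a full re-download and is currently only referenced by the disabled fetch_book_data.
"""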
import json
import time
import csv
import argparse
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import scraper
import utils
config.setup_logging()
debug = False
force = False
# Fetch the paginated book list and store every entry in the books table.
def fetch_book_list():
url = scraper.list_url_update
while True:
logging.info(f'fetching book list. url: {url}')
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="list_main", attr_type="class"))
if soup:
            # Parse the book entries on this page and upsert them
list_data, next_url = scraper.parse_book_list(soup, url=url)
for item in list_data:
row_id = db_tools.insert_or_update_common(item, db_tools.tbl_name_books)
if row_id:
logging.debug(f"insert one book. row_id: {row_id}, name: {item['name']}")
else:
logging.warning(f"insert book error. name: {item['name']}, href: {item['href']}")
if next_url is None:
                logging.info('fetched all pages.')
return True
else:
url = next_url
        elif status_code and status_code == 404:
            logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, skipping...')
            # a 404 here leaves no next page to continue from, so stop instead of retrying forever
            return False
else:
logging.warning(f'fetch page error. {url} ...')
# Fetch the real content page of a chapter and return its paragraphs.
def fetch_real_content(url):
soup, status_code = scraper.fetch_page(url, scraper.content_validator)
if soup:
data = scraper.parse_content_page(soup, url)
if data:
            return data  # list of paragraphs
elif status_code and status_code == 404:
        logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, skipping...')
else:
logging.warning(f'fetch page error. {url} ...')
return None
# Fetch a chapter page: return its title and content paragraphs plus the next chapter URL.
def fetch_chapter_content(url):
chapter_data = {}
next_url = None
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="h1", identifier="chapter_title", attr_type="class"))
if soup:
data, next_url = scraper.parse_chapter_page(soup, url)
if data:
chapter_data['title'] = data['title']
contents = fetch_real_content(data['content_url'])
if contents:
chapter_data['contents'] = contents
else:
logging.warning(f"fetching real content faild. url: {data['content_url']}")
return None, None
else:
logging.warning(f'fetch chapter page no data. url: {url}')
return None, None
else:
logging.warning(f"fetch chapter page error. url: {url}, status_code: {status_code}")
return None, None
return chapter_data, next_url
# Fetch a book's detail page to obtain, among other fields, its table-of-contents URL.
def fetch_book_detail(url):
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="li", identifier="zuopinxinxi", attr_type="class"))
if soup:
detail = scraper.parse_book_detail(soup, url)
return detail
else:
return None
# Fetch the table-of-contents page of a single book.
def fetch_book_toc(url):
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="page_main", attr_type="class"))
if soup:
listdata = scraper.pase_chapter_list(soup, url)
return listdata
else:
return None
# Fetch books' table-of-contents pages and insert sections and chapters into the database.
def fetch_table_of_contents():
while True:
update_list = db_tools.query_books(id=2547, is_latest=0, limit = 2 if debug else 100)
        if update_list is None or len(update_list) < 1:
            logging.info('no more data to fetch.')
return
for row in update_list:
name = row['name']
href = row['href']
bookid = row['id']
            # Open the detail page first
logging.info(f'----------fetching book {name}: {href}-------------')
book_detail = fetch_book_detail(href)
if book_detail is None:
logging.warning(f'get book detail failed. url: {href}')
continue
            # Get the table-of-contents page
toc_url = book_detail['table_of_contents_href']
if toc_url is None or toc_url == '':
logging.warning(f'table_of_contents_href is not correct. url: {href}')
continue
logging.info(f'fetching page: {toc_url}')
toc_data = fetch_book_toc(toc_url)
            # Check the parsed table-of-contents data
if toc_data is None:
logging.warning(f'fetch_book_toc error. url: {toc_url}')
continue
            # Insert all table-of-contents data
            succ = 1
            for toc_row in toc_data:
                section_title = toc_row['title']
                chapters = toc_row['chapters']
section_id = db_tools.insert_or_update_book_sections({
'book_id' : int(bookid),
'section' : section_title,
'bookid_section': f'{bookid}_{section_title}'
})
if section_id is None:
logging.warning(f'insert section error. url: {toc_url}, section: {section_title}')
succ = 0
break
else:
logging.debug(f'insert one books_sections record. id:{section_id}, key: {bookid}_{section_title}')
                # Insert the chapter rows for this section
for chap in chapters:
chap_row_id = db_tools.insert_chapter_data({
'book_id': bookid,
'chapter_id': chap['chapter_id'],
'section_id': section_id,
'title': chap['title'],
'href': chap['href'],
'content': '',
'has_content' : 0
})
if chap_row_id is None:
logging.warning(f'insert_chapter_data error. url: {toc_url}')
succ = 0
break
            if succ == 0:
logging.warning(f'fetch_book_toc data error. url: {toc_url}')
continue
            # Done reading; update the book record
row_id = db_tools.update_book_detail({
'href' : href,
**book_detail
})
if row_id:
logging.debug(f'update book succ. id: {row_id}, url: {href}')
else:
logging.warning(f'update book failed. url: {href}')
if debug:
return
# Fetch chapter contents directly for chapters already stored without content.
def fetch_contents():
while True:
list_data = db_tools.query_no_content_chapters(limit = 10 if debug else 100)
        if list_data is None or len(list_data) < 1:
            logging.info('no more data to fetch.')
return
for row in list_data:
url = row['href']
            logging.info(f"fetching content ({row['title']}) from {url}")
content, next_url = fetch_chapter_content(url)
if content and content['title'] and content['contents']:
                # Write the chapter text into the chapters table
db_tools.insert_chapter_data({
'book_id': row['book_id'],
'chapter_id': row['chapter_id'],
'section_id': row['section_id'],
'title': row['title'],
'href': url,
'content': '\n\n'.join(content['contents']),
'has_content': 1
})
else:
logging.warning(f'fetch content error. url: {url}')
if debug:
return
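# The full-book downloader below is kept for reference but disabled (wrapped in a string literal).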
'''
# Download a complete book
def fetch_book_data():
update_list = db_tools.query_books(need_update=1, limit = 1)
if update_list:
for row in update_list:
name = row['name']
href = row['href']
bookid = row['id']
            # Open the detail page first
logging.info(f'----------fetching book {name}: {href}-------------')
book_detail = fetch_book_detail(href)
if book_detail:
                # Get the start page, then loop through the chapters
chapter_url = book_detail['start_page_href']
chapter_id = utils.extract_page_num(chapter_url)
                # Resume from the last fetched chapter unless --force is given
if not force:
last_chapter_url = db_tools.query_last_chapter_by_book(bookid)
if last_chapter_url:
chapter_url = last_chapter_url
while chapter_url:
logging.info(f'fetching page: {chapter_url}')
content, next_url = fetch_chapter_content(chapter_url)
if content and content['title'] and content['contents']:
                        # Write the chapter text into the chapters table
db_tools.insert_chapter_data({
'book_id': bookid,
'chapter_id': chapter_id,
'title': content['title'],
'href': chapter_url,
'content': '\n\n'.join(content['contents']),
'has_content': 1
})
if debug:
return
else:
logging.warning(f'fetch content error. url: {chapter_url}')
chapter_url = next_url
                # Done reading; update the book record
row_id = db_tools.update_book_detail({
'href' : href,
**book_detail
})
if row_id:
logging.debug(f'update book succ. id: {row_id}, url: {href}')
else:
logging.warning(f'update book failed. url: {href}')
else:
logging.warning(f'get book detail failed. url: {href}')
else:
        logging.warning('no books need updating.')
'''
# Map command shortcuts to their functions
function_map = {
"list": fetch_book_list,
"toc" : fetch_table_of_contents,
"content": fetch_contents,
}
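# Shortcuts can be combined on the command line, e.g. --cmd list,toc; if --cmd is
# omitted, main() runs every stage in function_map in order.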
# Main entry point
def main(cmd, args_debug, args_force):
global debug
debug = args_debug
global force
force = args_force
    # Run the requested functions
    if cmd:
        function_names = cmd.split(",")  # split the comma-separated shortcuts
for short_name in function_names:
            func = function_map.get(short_name.strip())  # look up the function for this shortcut
if callable(func):
func()
else:
logging.warning(f" {short_name} is not a valid function shortcut.")
    else:  # no --cmd given: run every stage
for name, func in function_map.items():
if callable(func):
func()
            else:
                logging.warning(f"{name} is not callable.")
    logging.info('all processes completed!')
# TODO:
# 1,
if __name__ == "__main__":
    # Command-line argument handling
keys_str = ",".join(function_map.keys())
parser = argparse.ArgumentParser(description='fetch aabook data.')
parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
parser.add_argument('--force', action='store_true', help='force update (true for rewrite all)')
args = parser.parse_args()
main(args.cmd, args.debug, args.force)
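# Example invocations (for reference; actual data depends on the scraper and config modules):
#   python fetch.py --cmd list              # crawl the book list only
#   python fetch.py --cmd toc,content       # fetch tables of contents, then chapter text
#   python fetch.py --debug --cmd content   # small batches, stop after the first pass
#   python fetch.py                         # run all stages: list, toc, content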