import requests
from bs4 import BeautifulSoup
import os
import sys
import random
import time
import re
import logging
from datetime import datetime
from datetime import date

import config  # logging configuration
from down_list import novel_map

# Logging
config.setup_logging()

# Base URL and output files
base_url = 'https://aabook.xyz'
list_url_wordcount = 'https://aabook.xyz/category.html?pageNum={}&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=wordcount'
list_url_update = 'https://aabook.xyz/category.html?pageNum={}&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=update'
curr_novel_pages = 0
meta_dir = 'meta'
list_file = f'{meta_dir}/list.txt'
details_file = f'{meta_dir}/details.txt'
down_list_file = f'{meta_dir}/down_list.txt'

# User-Agent pool
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
    "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Mobile Safari/537.36"
]


# Fetch a page, retrying on failure
def get_page_content(url, max_retries=100, sleep_time=5, default_timeout=10):
    retries = 0
    # Pick a random User-Agent
    headers = {
        'User-Agent': random.choice(user_agents)
    }
    while retries < max_retries:
        try:
            response = requests.get(url, headers=headers, timeout=default_timeout, stream=True)
            response.raise_for_status()
            return response.text  # request succeeded, return the content
        except requests.RequestException as e:
            retries += 1
            logging.info(f"Warn fetching page {url}: {e}. Retrying {retries}/{max_retries}...")
            if retries >= max_retries:
                logging.error(f"Failed to fetch page {url} after {max_retries} retries.")
                return None
            time.sleep(sleep_time)  # sleep for a while, then retry


# Fetch the ranking list
def get_list(write_list_file=list_file, list_url=list_url_wordcount, start_date='2000-01-01', order_by_date=False):
    page_num = 1
    start_time = datetime.strptime(f'{start_date} 00:00:00', "%Y-%m-%d %H:%M:%S")
    with open(write_list_file, 'w', encoding='utf-8') as f:
        while True:
            # Issue the request
            list_url = list_url.format(page_num)
            logging.info(f"Fetching page [{page_num}] {list_url}")
            content = get_page_content(list_url)
            soup = BeautifulSoup(content, 'html.parser')

            # Find the book list
            list_main = soup.find('div', class_='list_main')
            if not list_main:
                logging.info("No list_main found. retry...")
                continue
            tbody = list_main.find('tbody')
            if not tbody:
                logging.info("No tbody found. retry...")
                continue

            # Basic info for each book: ranking, category, title, author, monthly tickets,
            # update time (total word count when ordered by word count, last update date when ordered by date)
            for tr in tbody.find_all('tr'):
                tds = tr.find_all('td')
                if len(tds) < 6:
                    logging.info("Invalid tr format.")
                    continue
                ranking = tds[0].text.strip()
                category = tds[1].text.strip()
                book_link_tag = tds[2].find('a')
                book_name = book_link_tag.text.strip()
                book_link = base_url + '/' + book_link_tag['href']
                author = tds[3].text.strip()
                monthly_tickets = tds[4].text.strip()
                update_time = tds[5].text.strip()  # actually the word count when ordered by word count; the last update date when ordered by date

                # When ordered by date, stop once books are older than start_date
                if order_by_date:
                    up_time = datetime.strptime(update_time, "%Y-%m-%d %H:%M:%S")
                    if start_time > up_time:
                        return

                # Write to the list file:
                # ranking  category  title  author  monthly_tickets  word_count(update_date)  book_link
                f.write(f"{ranking}\t{category}\t{book_name}\t{author}\t{monthly_tickets}\t{update_time}\t{book_link}\n")
                f.flush()

            # Find the next-page link
            next_page_tag = soup.find('a', title='下一页')
            if next_page_tag:
                list_url = base_url + next_page_tag['href']
                page_num += 1
            else:
                logging.info("No next page, stopping.")
                break
            time.sleep(3)
            #break ## for test


# Fetch a detail page and validate it
def fetch_detail_and_check(url, book_name):
    while True:
        contenxt = get_page_content(url)
        soup = BeautifulSoup(contenxt, 'html.parser')

        # Parse the book details
        book_info_tag = soup.find('li', class_='zuopinxinxi')
        if not book_info_tag:
            logging.info(f"No details found for {book_name}, retry...")
            continue
        book_info_lis = book_info_tag.find_all('li')
        if len(book_info_lis) < 4:
            logging.info(f"invalid book info. {book_name}. retry...")
            continue
        return contenxt


# Fetch details for each book
def get_detail(write_list_file=list_file, wirte_details_file=details_file):
    # Load links of books whose details have already been fetched
    if os.path.exists(wirte_details_file):
        with open(wirte_details_file, 'r', encoding='utf-8') as f:
            completed_links = set(line.split('\t')[4] for line in f.readlines())
    else:
        completed_links = set()

    with open(write_list_file, 'r', encoding='utf-8') as f_list, open(wirte_details_file, 'a', encoding='utf-8') as f_details:
        for line in f_list:
            fields = line.strip().split('\t')
            if len(fields) < 7:
                continue
            book_link = fields[6]
            book_name = fields[2]
            if book_link in completed_links:
                logging.info(f"Skipping {book_name} {book_link}, already processed.")
                continue

            # Visit the book detail page
            logging.info(f"Fetching details for {book_name} {book_link}")
            #contenxt = get_page_content(book_link)
            contenxt = fetch_detail_and_check(book_link, book_name)
            soup = BeautifulSoup(contenxt, 'html.parser')

            # Parse the book details
            book_info_tag = soup.find('li', class_='zuopinxinxi')
            if not book_info_tag:
                logging.info(f"No details found for {book_name}, skipping.")
                continue
            book_info_lis = book_info_tag.find_all('li')
            if len(book_info_lis) < 4:
                logging.info(f"invalid book info. {book_name}")
                continue
            book_category = book_info_lis[0].find('span').text.strip()
            book_status = book_info_lis[1].find('span').text.strip()
            total_word_count = book_info_lis[2].find('span').text.strip()
            total_clicks = book_info_lis[3].find('span').text.strip()
            # Strip the trailing Chinese characters, keep only the digits
            total_word_count = int(re.search(r'\d+', total_word_count).group())

            # Read the creation time
            creation_time_tag = soup.find('li', class_='update_time')
            creation_time = creation_time_tag.text.strip() if creation_time_tag else 'N/A'

            # Get the first-chapter link and its page number
            start_page_tag = soup.find('ul', class_='gezhonganniu').find_all('li')[0].find('a')
            start_page_link = base_url + '/' + start_page_tag['href']
            start_page_number = start_page_link.split('-')[-1].replace('.html', '')

            # Write to the details file:
            # ranking  category  title  author  book_link  start_page_link  start_page_number
            # status  total_word_count  total_clicks  word_count(update_date)  creation_time
            f_details.write(f"{fields[0]}\t{book_category}\t{fields[2]}\t{fields[3]}\t{book_link}\t"
                            f"{start_page_link}\t{start_page_number}\t{book_status}\t{total_word_count}\t"
                            f"{total_clicks}\t{fields[5]}\t{creation_time}\n")
            f_details.flush()
            time.sleep(5)


# Strip watermark fragments from chapter content
def clean_watermarks(html):
    """
    Filter out watermark tags that carry a class attribute, together with their inner
    content, while keeping the rest of the tag structure.
    """
    # Use a regex to match and remove any HTML tag that has a class attribute, along with its content
    cleaned_html = re.sub(r'<[^>]+class="[^"]+">.*?</[^>]+>', '', html, flags=re.DOTALL)
    return cleaned_html


def process_paragraph(paragraph):
    # Keep the full HTML structure instead of calling get_text() directly
    paragraph_html = str(paragraph)
    # Remove the watermark tags
    cleaned_html = clean_watermarks(paragraph_html)
    # Parse the cleaned HTML with BeautifulSoup and extract the text
    soup = BeautifulSoup(cleaned_html, 'html.parser')
    cleaned_text = soup.get_text().strip()
    return cleaned_text


# Extract content_url from the script tags
def extract_content_url(soup, base_url, chapid):
    # Find all