import cloudscraper import time import json import csv import logging import signal import sys import os import re from bs4 import BeautifulSoup from requests.exceptions import RequestException from functools import partial import config # 定义基础 URL 和可变参数 host_url = "https://www.javdb.com" actors_uncensored_base_url = f'{host_url}/actors/uncensored' series_uncensored_base_url = f'{host_url}/series/uncensored' makers_uncensored_base_url = f'{host_url}/makers/uncensored' # 设置 headers 和 scraper headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } scraper = cloudscraper.create_scraper() #使用 CloudScraper 进行网络请求,并执行页面验证,支持不同解析器和预处理 def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None): for attempt in range(max_retries): try: if 'javdb.com' not in url.lower(): logging.error(f'wrong url format: {url}') return None response = scraper.get(url, headers=headers) response.raise_for_status() # 处理 HTTP 错误 # 预处理 HTML(如果提供了 preprocessor) html_text = preprocessor(response.text) if preprocessor else response.text soup = BeautifulSoup(html_text, parser) if validator(soup): # 进行自定义页面检查 return soup logging.warning(f"Validation failed on attempt {attempt + 1} for {url}") except cloudscraper.exceptions.CloudflareChallengeError as e: logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retring...") except cloudscraper.exceptions.CloudflareCode1020 as e: logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retring...") except Exception as e: logging.error(f"Unexpected error on {url}: {e}, Retring...") logging.error(f'Fetching failed after max retries. {url}') return None # 达到最大重试次数仍然失败 # 修复 HTML 结构,去除多余标签并修正 标签,在获取人种的时候需要 def preprocess_html(html): return html.replace('
', '').replace('
current_page_number : next_url = host_url + next_page_url return list_data, next_url # 解析 HTML 内容,提取需要的数据 def parse_actor_detail(soup, href): div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') if not div_movies: logging.warning(f"Warning: No movies div found ") return None, None # 解析元素 rows = div_movies.find_all('div', class_='item') list_data = [] next_url = None for row in rows: link = row.find('a', class_='box')['href'] serial_number = row.find('strong').text.strip() title = row.find('div', class_='video-title').text.strip() release_date = row.find('div', class_='meta').text.strip() list_data.append({ 'href' : host_url + link if link else '', 'serial_number' : serial_number, 'title' : title, 'release_date': release_date }) # 查找 "下一页" 按钮 next_page_element = soup.find('a', class_='pagination-next') if next_page_element: next_page_url = next_page_element['href'] next_page_number = url_page_num(next_page_url) current_page_number = url_page_num(href) logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}') if current_page_number is None: current_page_number = 0 if next_page_number and next_page_number > current_page_number : next_url = host_url + next_page_url return list_data, next_url # 解析 HTML 内容,提取需要的数据 def parse_movie_detail(soup, href, title): div_video = soup.find("div", class_='video-meta-panel') if not div_video: logging.warning(f"Warning: No movies div found ") return None, None # 获取封面图片 cover_img = soup.select_one('.column-video-cover a') cover_url = cover_img['href'] if cover_img else None # 获取番号 serial = soup.select_one('.panel-block:first-child .value') serial_number = serial.text.strip() if serial else None # 获取日期 date = soup.select_one('.panel-block:nth-of-type(2) .value') release_date = date.text.strip() if date else None # 获取时长 duration = soup.select_one('.panel-block:nth-of-type(3) .value') video_duration = duration.text.strip() if duration else None # 获取片商 maker = soup.select_one('.panel-block:nth-of-type(4) .value a') maker_name = maker.text.strip() if maker else None maker_link = maker['href'] if maker else None # 获取系列 series = soup.select_one('.panel-block:nth-of-type(5) .value a') series_name = series.text.strip() if series else None series_link = series['href'] if series else None # 获取演员(名字 + 链接) actors = [{'name': actor.text.strip(), 'href': host_url + actor['href']} for actor in soup.select('.panel-block:nth-of-type(8) .value a')] return { 'href' : href, 'title' : title, 'cover_url': cover_url, 'serial_number': serial_number, 'release_date': release_date, 'duration': video_duration, 'maker_name': maker_name, 'maker_link': host_url + maker_link if maker_link else '', 'series_name': series_name, 'series_link': host_url + series_link if series_link else '', 'actors': actors } # 解析 HTML 内容,提取需要的数据 def parse_series_uncensored(soup, href): div_series = soup.find("div", id='series') if not div_series: logging.warning(f"Warning: No div_series div found ") return None, None # 解析元素 rows = div_series.find_all('a', class_='box') list_data = [] next_url = None for row in rows: name = row.find('strong').text.strip() href = row['href'] div_movies = row.find('span') movies = 0 if div_movies: match = re.search(r'\((\d+)\)', div_movies.text.strip()) if match: movies = int(match.group(1)) list_data.append({ 'name' : name, 'href' : host_url + href if href else '', 'movies' : movies }) # 查找 "下一页" 按钮 next_page_element = soup.find('a', class_='pagination-next') if next_page_element: next_page_url = next_page_element['href'] next_page_number = url_page_num(next_page_url) current_page_number = url_page_num(href) if current_page_number is None: current_page_number = 0 if next_page_number and next_page_number > current_page_number : next_url = host_url + next_page_url return list_data, next_url # 解析 HTML 内容,提取需要的数据 def parse_series_detail(soup, href): div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') if not div_movies: logging.warning(f"Warning: No movies div found ") return None, None # 解析元素 rows = div_movies.find_all('div', class_='item') list_data = [] next_url = None for row in rows: link = row.find('a', class_='box')['href'] serial_number = row.find('strong').text.strip() title = row.find('div', class_='video-title').text.strip() release_date = row.find('div', class_='meta').text.strip() list_data.append({ 'href' : host_url + link if link else '', 'serial_number' : serial_number, 'title' : title, 'release_date': release_date }) # 查找 "下一页" 按钮 next_page_element = soup.find('a', class_='pagination-next') if next_page_element: next_page_url = next_page_element['href'] next_page_number = url_page_num(next_page_url) current_page_number = url_page_num(href) if current_page_number is None: current_page_number = 0 if next_page_number and next_page_number > current_page_number : next_url = host_url + next_page_url return list_data, next_url # 解析 HTML 内容,提取需要的数据 def parse_makers_uncensored(soup, href): div_series = soup.find("div", id='makers') if not div_series: logging.warning(f"Warning: No makers div found ") return None, None # 解析元素 rows = div_series.find_all('a', class_='box') list_data = [] next_url = None for row in rows: name = row.find('strong').text.strip() href = row['href'] div_movies = row.find('span') movies = 0 if div_movies: match = re.search(r'\((\d+)\)', div_movies.text.strip()) if match: movies = int(match.group(1)) list_data.append({ 'name' : name, 'href' : host_url + href if href else '', 'movies' : movies }) # 查找 "下一页" 按钮 next_page_element = soup.find('a', class_='pagination-next') if next_page_element: next_page_url = next_page_element['href'] next_page_number = url_page_num(next_page_url) current_page_number = url_page_num(href) if current_page_number is None: current_page_number = 0 if next_page_number and next_page_number > current_page_number : next_url = host_url + next_page_url return list_data, next_url # 解析 HTML 内容,提取需要的数据 def parse_maker_detail(soup, href): div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') if not div_movies: logging.warning(f"Warning: No movies div found ") return None, None # 解析元素 rows = div_movies.find_all('div', class_='item') list_data = [] next_url = None for row in rows: link = row.find('a', class_='box')['href'] serial_number = row.find('strong').text.strip() title = row.find('div', class_='video-title').text.strip() release_date = row.find('div', class_='meta').text.strip() list_data.append({ 'href' : host_url + link if link else '', 'serial_number' : serial_number, 'title' : title, 'release_date': release_date }) # 查找 "下一页" 按钮 next_page_element = soup.find('a', class_='pagination-next') if next_page_element: next_page_url = next_page_element['href'] next_page_number = url_page_num(next_page_url) current_page_number = url_page_num(href) if current_page_number is None: current_page_number = 0 if next_page_number and next_page_number > current_page_number : next_url = host_url + next_page_url return list_data, next_url ###### 以下为测试代码 ###### def test_actors_list(): next_url = actors_uncensored_base_url while next_url: print(f'fetching page {next_url}') soup = fetch_page(next_url, partial(generic_validator, tag="div", identifier="actors", attr_type="id")) if soup: list_data, next_url = parse_actors_uncensored(soup, next_url) if list_data : print(list_data) else: print('get wrong page.') if next_url: print(next_url) break def test_actor(): next_url = 'https://javdb.com/actors/mdRn' all_data = [] while next_url: print(f'fetching page {next_url}') soup = fetch_page(next_url, partial(generic_validator, tag="div", identifier="movie-list h cols-4 vcols-5", attr_type="class")) if soup: list_data, next_url = parse_actor_detail(soup, next_url) if list_data : all_data.extend(list_data) else: print('get wrong page.') print(all_data) def test_movie_detail(): movie_url = 'https://javdb.com/v/gB2Q7' while True: soup = fetch_page(movie_url, partial(generic_validator, tag="div", identifier="video-detail", attr_type="class")) if soup: detail = parse_movie_detail(soup, movie_url, 'RED193 無碼 レッドホットフェティッシュコレクション 中出し120連発 4 : 波多野結衣, 愛乃なみ, 夢実あくび, 他多数') if detail: print(detail) break def test_series_list(): next_url = 'https://javdb.com/series/uncensored' all_data = [] while next_url: print(f'fetching page {next_url}') soup = fetch_page(next_url, partial(generic_validator, tag="div", identifier="series", attr_type="id")) if soup: list_data, next_url = parse_series_uncensored(soup, next_url) if list_data : all_data.extend(list_data) else: print('get wrong page.') break print(all_data) def test_series_detail(): next_url = 'https://javdb.com/series/39za' all_data = [] while next_url: print(f'fetching page {next_url}') soup = fetch_page(next_url, partial(generic_validator, tag="div", identifier="movie-list h cols-4 vcols-5", attr_type="class")) if soup: list_data, next_url = parse_series_detail(soup, next_url) if list_data : all_data.extend(list_data) else: print('get wrong page.') print(all_data) if __name__ == "__main__": #test_actors_list() #test_actor() test_movie_detail() #test_series_list() #test_series_detail()