import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
#import config
#import utils

# Base URL and the variable path segments built on it
host_url = "https://www.javdb.com"
actors_uncensored_base_url = f'{host_url}/actors/uncensored'
series_uncensored_base_url = f'{host_url}/series/uncensored'
makers_uncensored_base_url = f'{host_url}/makers/uncensored'

# Set up the request headers and the scraper
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
scraper = cloudscraper.create_scraper()

http_code_404 = 404
http_code_login = 401
http_code_local = 99  # pseudo status code: page was served from the local cache

save_raw_html = True
load_from_local = True


def common_parser(html, page, **kwargs):
    parser = "html.parser"
    soup = BeautifulSoup(html, parser)
    if not soup:
        return None
    if page == 'actor_list':
        # parse_actors_uncensored(soup, href) -> (list_data, next_url)
        return parse_actors_uncensored(soup, **kwargs)
    elif page == 'actor':
        # parse_actor_detail(soup, href) -> (actor, next_url)
        return parse_actor_detail(soup, **kwargs)
    elif page == 'makers_series_list':
        # parse_maker_series_list(soup, href, category) -> (list_data, next_url);
        # category is 'series' or 'makers'
        return parse_maker_series_list(soup, **kwargs)
    elif page == 'movie_list':
        # parse_movie_list(soup, href) -> (list_data, next_url)
        return parse_movie_list(soup, **kwargs)
    elif page == 'movies':
        # parse_movie_detail(soup, href, title) -> result
        return parse_movie_detail(soup, **kwargs)
    elif page == 'search':
        # parse_uncensored(soup, href) -> (list_data, next_url)
        return parse_uncensored(soup, **kwargs)
    else:
        logging.warning(f"wrong page: {page}")
        return None
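
# Usage sketch: the cache filename below is illustrative only; pass any raw
# HTML string plus a page type and the matching (data, next_url) pair comes
# back.
#
#   with open('raw/actors_page1.html', encoding='utf-8') as f:
#       list_data, next_url = common_parser(f.read(), 'actor_list',
#                                           href=actors_uncensored_base_url)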


'''
# Fetch a page with CloudScraper and run a page-specific validation check;
# supports custom parsers and an optional HTML preprocessor.
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
    if load_from_local:  # try the local cache first
        html = utils.read_raw_html(url)
        if html:
            # Preprocess the HTML (if a preprocessor was provided)
            html_text = preprocessor(html) if preprocessor else html

            soup = BeautifulSoup(html_text, parser)
            if validator(soup):  # run the custom page check
                logging.debug(f"read from local. href: {url}")
                return soup, http_code_local  # a code below 100 signals a local-cache hit

    for attempt in range(max_retries):
        try:
            if 'javdb.com' not in url.lower():
                logging.error(f'wrong url format: {url}')
                return None, None

            response = scraper.get(url, headers=headers)

            # Handle HTTP status codes
            if response.status_code == 404:
                logging.debug(f"Page not found (404): {url}")
                return None, http_code_404  # return 404 directly so the caller can skip

            response.raise_for_status()  # raise on other HTTP errors

            # Check whether we were redirected, e.g. to the login page
            if response.history:
                logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
                soup = BeautifulSoup(response.text, parser)
                # Decide whether this is the login page
                if soup.find('nav', class_='panel form-panel'):
                    logging.debug(f"Page redirected to login page on {url}.")
                    return None, http_code_login

            if save_raw_html:
                utils.write_raw_html(url, response.text)

            # Preprocess the HTML (if a preprocessor was provided)
            html_text = preprocessor(response.text) if preprocessor else response.text

            soup = BeautifulSoup(html_text, parser)
            if validator(soup):  # run the custom page check
                return soup, response.status_code

            logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
        except cloudscraper.exceptions.CloudflareCode1020 as e:
            logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
        except Exception as e:
            logging.error(f"Unexpected error on {url}: {e}, Retrying...")

    logging.error(f'Fetching failed after max retries. {url}')
    return None, None  # still failing after reaching max retries
'''


# Fix up the HTML: drop stray <br> tags and patch <a> tags to open in a new
# tab; needed when extracting the ethnicity info.
def preprocess_html(html):
    return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
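# Example (follows directly from the two replacements above):
#   preprocess_html('<a href="/x">y</a><br>') == '<a target="_blank" href="/x">y</a>'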


# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
    if attr_type == "id":
        return soup.find(tag, id=identifier) is not None
    elif attr_type == "class":
        return bool(soup.find_all(tag, class_=identifier))
    elif attr_type == "name":
        return bool(soup.find('select', {'name': identifier}))
    return False
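
# Sketch: the functools.partial import above suggests validators get
# specialized per page type before being handed to fetch_page; the pairings
# below are assumptions for illustration, not confirmed call sites.
#
#   actors_validator = partial(generic_validator, tag='div', identifier='actors')
#   movie_validator = partial(generic_validator, tag='div',
#                             identifier='video-meta-panel', attr_type='class')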


# Parse the page number out of a link
def url_page_num(href):
    if href is None:
        return None
    match = re.search(r'page=(\d+)', href)
    return int(match.group(1)) if match else None
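# Examples:
#   url_page_num('/actors/uncensored?page=3')  -> 3
#   url_page_num('/actors/uncensored')         -> None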


# <span class="avatar" style="background-image: url(https://c0.jdbstatic.com/avatars/md/mdRn.jpg)"></span>
def parse_avatar_image(soup):
    try:
        span = soup.find("span", class_="avatar")
        if not span:
            return ""  # no <span> element found

        style = span.get("style", "")
        match = re.search(r'url\(["\']?(.*?)["\']?\)', style)
        return match.group(1) if match else ""  # the URL on success, else an empty string
    except Exception:
        return ""  # swallow any parsing exception and return an empty string


# Parse the HTML content and extract the data we need
def parse_actors_uncensored(soup, href):
    div_actors = soup.find("div", id='actors')
    if not div_actors:
        logging.warning("No actors div found")
        return None, None

    # Parse the entries
    rows = div_actors.find_all('div', class_='box actor-box')

    list_data = []
    next_url = None
    for row in rows:
        # Actor detail link
        actor_link = row.find('a')['href']
        # Actor name
        actor_name = row.find('strong').text.strip()
        # Avatar image URL
        avatar_url = row.find('img', class_='avatar')['src']
        # Aliases from the title attribute
        alias_list = row.find('a')['title'].split(", ")

        list_data.append({
            'name' : actor_name,
            'href' : host_url + actor_link if actor_link else '',
            'pic'  : avatar_url,
            'alias': alias_list
        })

    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url

    return list_data, next_url


# Parse the HTML content and extract the data we need
def parse_actor_detail(soup, href):
    # Look up the aliases first
    alias_list = []
    movies_text = ''
    movies_cnt = 0

    div_meta = soup.find('span', class_='actor-section-name')
    if not div_meta:
        logging.warning(f'warning: no meta data found in page {href}')
        return None, None
    alias_div = soup.find('div', class_='column section-title')

    if alias_div:
        meta_list = alias_div.find_all('span', class_='section-meta')
        if len(meta_list) > 1:
            alias_list = meta_list[0].text.strip().split(", ")
            movies_text = meta_list[1].text.strip()
            try:
                match = re.search(r'(\d+)\s+movie\(s\)', movies_text, re.IGNORECASE)
                if match:
                    movies_cnt = int(match.group(1))
            except Exception:
                movies_cnt = 0

    # Avatar
    pic = ''
    avatar = soup.find("div", class_="column actor-avatar")
    if avatar:
        pic = parse_avatar_image(avatar)

    # Use a regex to match the div whose class starts with 'movie-list h cols-'
    # (the trailing vcols-N part varies from page to page)
    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
    if not div_movies:
        logging.warning("No movies div found")
        return None, None

    # Parse the entries
    rows = div_movies.find_all('div', class_='item')

    list_data = []
    next_url = None
    for row in rows:
        link = row.find('a', class_='box')['href']
        serial_number = row.find('strong').text.strip()
        title = row.find('div', class_='video-title').text.strip()
        release_date = row.find('div', class_='meta').text.strip()
        list_data.append({
            'href' : host_url + link if link else '',
            'serial_number' : serial_number,
            'title' : title,
            'release_date': release_date
        })

    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url

    # Data to return
    actor = {
        'pic' : pic,
        'movies_text' : movies_text,
        'movies_cnt' : movies_cnt,
        'movies' : list_data
    }

    return actor, next_url


# Parse a single value by its label
def parse_movie_one(soup, keys):
    key_strong = soup.find('strong', string=lambda text: text in keys)
    if key_strong:
        key_span = key_strong.find_next_sibling('span', class_='value')
        if key_span:
            return key_span.text.strip()
    return None


# Parse a value together with its link
def parse_movie_val_href(soup, keys):
    key_strong = soup.find('strong', string=lambda text: text in keys)
    if key_strong:
        key_span = key_strong.find_next_sibling('span', class_='value')
        if key_span:
            a_tag = key_span.find('a')
            if a_tag:
                return a_tag.text.strip(), host_url + a_tag.get('href')
            else:
                return key_span.text.strip(), None
    return None, None


# Parse multiple values and links
def parse_movie_arr(soup, keys):
    key_strong = soup.find('strong', string=lambda text: text in keys)
    if key_strong:
        key_span = key_strong.find_next_sibling('span', class_='value')
        if key_span:
            actors = []
            a_tags = key_span.find_all('a')
            for a_tag in a_tags:
                actors.append({
                    'name': a_tag.text.strip(),
                    'href': host_url + a_tag.get('href')
                })
            return actors
    return []
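
# Sketch of the meta-panel markup these three helpers expect (structure
# inferred from the selectors above, not captured from a live page):
#
#   <strong>Maker:</strong><span class="value"><a href="/makers/xx">Some Maker</a></span>
#
# parse_movie_one returns the bare text, parse_movie_val_href additionally
# resolves the absolute link, and parse_movie_arr collects every <a> tag.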


# Parse the HTML content and extract the data we need
def parse_movie_detail(soup, href, title):
    div_video = soup.find("div", class_='video-meta-panel')
    if not div_video:
        logging.warning("No video-meta-panel div found")
        return None

    result = {}
    result['href'] = href
    result['title'] = title

    # Cover image
    cover_img = soup.select_one('.column-video-cover a')
    result['cover_url'] = cover_img['href'] if cover_img else None

    # Serial number, release date, duration
    result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:'])
    result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:'])
    result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:'])

    # Maker, series, publisher
    result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:'])
    result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:'])
    result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:'])

    # Actors and tags
    result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:'])
    result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):'])

    return result


# Parse the HTML content and extract the data we need
def parse_maker_series_list(soup, href, category):
    div_series = soup.find("div", id=category)
    if not div_series:
        logging.warning(f"No {category} div found")
        return None, None

    # Parse the entries
    rows = div_series.find_all('a', class_='box')

    list_data = []
    next_url = None
    for row in rows:
        name = row.find('strong').text.strip()
        # Distinct local name so the page-URL parameter `href`, which the
        # pagination check below relies on, is not clobbered.
        link = row['href']
        div_movies = row.find('span')
        movies = 0
        if div_movies:
            match = re.search(r'\((\d+)\)', div_movies.text.strip())
            if match:
                movies = int(match.group(1))

        list_data.append({
            'name'  : name,
            'href'  : host_url + link if link else '',
            'movies': movies
        })

    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url

    return list_data, next_url


# Parse the HTML content and extract the data we need
def parse_movie_list(soup, href):
    # The list div's class varies between 'vcols-5' and 'vcols-8', so match both
    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
    if not div_movies:
        logging.warning("No movies div found")
        return [], None

    # Parse the entries
    rows = div_movies.find_all('div', class_='item')

    list_data = []
    next_url = None
    for row in rows:
        link = row.find('a', class_='box')['href']
        serial_number = row.find('strong').text.strip()
        title = row.find('div', class_='video-title').text.strip()
        release_date = row.find('div', class_='meta').text.strip()
        list_data.append({
            'href' : host_url + link if link else '',
            'serial_number' : serial_number,
            'title' : title,
            'release_date': release_date
        })

    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url

    return list_data, next_url
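

# Lightweight smoke test (an added sketch, not part of the crawl flow):
# exercises the pure helpers against inline fixtures so the parsing logic
# can be checked without touching the network.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    assert url_page_num(f'{actors_uncensored_base_url}?page=2') == 2
    assert url_page_num(actors_uncensored_base_url) is None

    fixture = ('<div class="column actor-avatar"><span class="avatar" '
               'style="background-image: url(https://c0.jdbstatic.com/avatars/md/mdRn.jpg)">'
               '</span></div>')
    soup = BeautifulSoup(fixture, 'html.parser')
    assert parse_avatar_image(soup) == 'https://c0.jdbstatic.com/avatars/md/mdRn.jpg'
    print('parser smoke tests passed')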