# stock/scripts/javdb/src/scraper.py

import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
import config

# Base URL and the uncensored list endpoints
host_url = "https://www.javdb.com"
actors_uncensored_base_url = f'{host_url}/actors/uncensored'
series_uncensored_base_url = f'{host_url}/series/uncensored'
makers_uncensored_base_url = f'{host_url}/makers/uncensored'

# Request headers and the CloudScraper session
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
scraper = cloudscraper.create_scraper()

# Fetch a page with CloudScraper and validate it; supports alternative parsers
# and optional HTML preprocessing before parsing.
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
    for attempt in range(max_retries):
        try:
            if 'javdb.com' not in url.lower():
                logging.error(f'wrong url format: {url}')
                return None, None
            response = scraper.get(url, headers=headers)
            # Handle HTTP status codes
            if response.status_code == 404:
                logging.warning(f"Page not found (404): {url}")
                return None, 404  # Return 404 directly so the caller can skip this page
            response.raise_for_status()  # Raise on other HTTP errors
            # Preprocess the HTML if a preprocessor was supplied
            html_text = preprocessor(response.text) if preprocessor else response.text
            soup = BeautifulSoup(html_text, parser)
            if validator(soup):  # Run the custom page check
                return soup, response.status_code
            logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            logging.error(f"Cloudflare Challenge Error on {url}: {e}, retrying...")
        except cloudscraper.exceptions.CloudflareCode1020 as e:
            logging.error(f"Access Denied (Error 1020) on {url}: {e}, retrying...")
        except Exception as e:
            logging.error(f"Unexpected error on {url}: {e}, retrying...")
        if attempt < max_retries - 1:
            time.sleep(2 * (attempt + 1))  # Back off briefly before retrying
    logging.error(f'Fetching failed after max retries. {url}')
    return None, None  # Still failing after reaching the retry limit
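# Example usage (mirrors the test functions at the bottom of this file):
# fetch the uncensored-actors list page and verify that it contains
# <div id="actors"> before parsing.
#   soup, status = fetch_page(actors_uncensored_base_url,
#                             partial(generic_validator, tag="div", identifier="actors", attr_type="id"))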

# Repair the HTML structure: strip stray <br> tags and force <a> tags to open
# in a new tab; needed when scraping the ethnicity field on actor pages.
def preprocess_html(html):
    return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
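# For example, preprocess_html('<br><a href="/x">') returns '<a target="_blank" href="/x">'.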

# Generic HTML structure validator: confirm the expected element is present
def generic_validator(soup, tag, identifier, attr_type="id"):
    if attr_type == "id":
        return soup.find(tag, id=identifier) is not None
    elif attr_type == "class":
        return bool(soup.find_all(tag, class_=identifier))
    elif attr_type == "name":
        # Note: this branch checks for a <select> element regardless of `tag`
        return bool(soup.find('select', {'name': identifier}))
    return False

# Extract the page number from a pagination link
def url_page_num(href):
    if href is None:
        return None
    match = re.search(r'page=(\d+)', href)
    if match:
        return int(match.group(1))
    return None
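# For example, url_page_num('/actors/uncensored?page=3') returns 3, while
# url_page_num('/actors/uncensored') returns None.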

# <span class="avatar" style="background-image: url(https://c0.jdbstatic.com/avatars/md/mdRn.jpg)"></span>
def parse_avatar_image(soup):
    try:
        span = soup.find("span", class_="avatar")
        if not span:
            return ""  # No <span> element found
        style = span.get("style", "")
        match = re.search(r'url\(["\']?(.*?)["\']?\)', style)
        return match.group(1) if match else ""  # URL on success, else an empty string
    except Exception:
        return ""  # Swallow any parsing exception and return an empty string

# Parse the uncensored-actors list page and extract the actor entries
def parse_actors_uncensored(soup, href):
    div_actors = soup.find("div", id='actors')
    if not div_actors:
        logging.warning("No actors div found")
        return None, None
    # Parse the actor boxes
    rows = div_actors.find_all('div', class_='box actor-box')
    list_data = []
    next_url = None
    for row in rows:
        # Link to the actor's detail page
        actor_link = row.find('a')['href']
        # Actor name
        actor_name = row.find('strong').text.strip()
        # Avatar image URL
        avatar_url = row.find('img', class_='avatar')['src']
        # Aliases from the title attribute
        alias_list = row.find('a')['title'].split(", ")
        list_data.append({
            'name' : actor_name,
            'href' : host_url + actor_link if actor_link else '',
            'pic'  : avatar_url,
            'alias': alias_list
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    return list_data, next_url
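
# The "next page" logic above is duplicated in every parser below. A small
# helper along these lines could replace it (a sketch only; the existing
# parsers are not wired to use it):
def find_next_page_url(soup, current_href):
    element = soup.find('a', class_='pagination-next')
    if not element:
        return None
    candidate = element['href']
    next_num = url_page_num(candidate)
    current_num = url_page_num(current_href) or 0  # treat a missing page param as page 0
    if next_num and next_num > current_num:
        return host_url + candidate
    return None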

# Parse an actor detail page: aliases, avatar, and the movie list
def parse_actor_detail(soup, href):
    # Collect aliases first
    alias_list = []
    div_meta = soup.find('span', class_='actor-section-name')
    if not div_meta:
        logging.warning(f'no meta data found in page {href}')
        return None, None
    alias_div = soup.find('div', class_='column section-title')
    if alias_div:
        meta_list = alias_div.find_all('span', class_='section-meta')
        if len(meta_list) > 1:
            alias_list = meta_list[0].text.strip().split(", ")
    # Avatar
    pic = ''
    avatar = soup.find("div", class_="column actor-avatar")
    if avatar:
        pic = parse_avatar_image(avatar)
    # Movie list
    div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
    if not div_movies:
        logging.warning("No movies div found")
        return None, None
    rows = div_movies.find_all('div', class_='item')
    list_data = []
    next_url = None
    for row in rows:
        link = row.find('a', class_='box')['href']
        serial_number = row.find('strong').text.strip()
        title = row.find('div', class_='video-title').text.strip()
        release_date = row.find('div', class_='meta').text.strip()
        list_data.append({
            'href' : host_url + link if link else '',
            'serial_number' : serial_number,
            'title' : title,
            'release_date': release_date
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    actor = {
        'pic'    : pic,
        'alias'  : alias_list,
        'movies' : list_data
    }
    return actor, next_url

# Parse a movie detail page and extract its metadata
def parse_movie_detail(soup, href, title):
    div_video = soup.find("div", class_='video-meta-panel')
    if not div_video:
        logging.warning("No video meta panel found")
        return None
    # Cover image
    cover_img = soup.select_one('.column-video-cover a')
    cover_url = cover_img['href'] if cover_img else None
    # Serial number
    serial = soup.select_one('.panel-block:first-child .value')
    serial_number = serial.text.strip() if serial else None
    # Release date
    date = soup.select_one('.panel-block:nth-of-type(2) .value')
    release_date = date.text.strip() if date else None
    # Duration
    duration = soup.select_one('.panel-block:nth-of-type(3) .value')
    video_duration = duration.text.strip() if duration else None
    # Maker
    maker = soup.select_one('.panel-block:nth-of-type(4) .value a')
    maker_name = maker.text.strip() if maker else None
    maker_link = maker['href'] if maker else None
    # Series
    series = soup.select_one('.panel-block:nth-of-type(5) .value a')
    series_name = series.text.strip() if series else None
    series_link = series['href'] if series else None
    # Actors (name + link)
    actors = [{'name': actor.text.strip(), 'href': host_url + actor['href']}
              for actor in soup.select('.panel-block:nth-of-type(8) .value a')]
    return {
        'href' : href,
        'title' : title,
        'cover_url': cover_url,
        'serial_number': serial_number,
        'release_date': release_date,
        'duration': video_duration,
        'maker_name': maker_name,
        'maker_link': host_url + maker_link if maker_link else '',
        'series_name': series_name,
        'series_link': host_url + series_link if series_link else '',
        'actors': actors
    }

# Parse the uncensored-series list page and extract the series entries
def parse_series_uncensored(soup, href):
    div_series = soup.find("div", id='series')
    if not div_series:
        logging.warning("No series div found")
        return None, None
    # Parse the series boxes
    rows = div_series.find_all('a', class_='box')
    list_data = []
    next_url = None
    for row in rows:
        name = row.find('strong').text.strip()
        link = row['href']  # local name so the page-URL parameter `href` is not shadowed
        div_movies = row.find('span')
        movies = 0
        if div_movies:
            match = re.search(r'\((\d+)\)', div_movies.text.strip())
            if match:
                movies = int(match.group(1))
        list_data.append({
            'name'   : name,
            'href'   : host_url + link if link else '',
            'movies' : movies
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    return list_data, next_url

# Parse a series detail page and extract its movie list
def parse_series_detail(soup, href):
    div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
    if not div_movies:
        logging.warning("No movies div found")
        return [], None
    # Parse the movie items
    rows = div_movies.find_all('div', class_='item')
    list_data = []
    next_url = None
    for row in rows:
        link = row.find('a', class_='box')['href']
        serial_number = row.find('strong').text.strip()
        title = row.find('div', class_='video-title').text.strip()
        release_date = row.find('div', class_='meta').text.strip()
        list_data.append({
            'href' : host_url + link if link else '',
            'serial_number' : serial_number,
            'title' : title,
            'release_date': release_date
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    return list_data, next_url

# Parse the uncensored-makers list page and extract the maker entries
def parse_makers_uncensored(soup, href):
    div_makers = soup.find("div", id='makers')
    if not div_makers:
        logging.warning("No makers div found")
        return None, None
    # Parse the maker boxes
    rows = div_makers.find_all('a', class_='box')
    list_data = []
    next_url = None
    for row in rows:
        name = row.find('strong').text.strip()
        link = row['href']  # local name so the page-URL parameter `href` is not shadowed
        div_movies = row.find('span')
        movies = 0
        if div_movies:
            match = re.search(r'\((\d+)\)', div_movies.text.strip())
            if match:
                movies = int(match.group(1))
        list_data.append({
            'name'   : name,
            'href'   : host_url + link if link else '',
            'movies' : movies
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    return list_data, next_url

# Parse a maker detail page and extract its movie list
def parse_maker_detail(soup, href):
    div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
    if not div_movies:
        logging.warning("No movies div found")
        return [], None
    # Parse the movie items
    rows = div_movies.find_all('div', class_='item')
    list_data = []
    next_url = None
    for row in rows:
        link = row.find('a', class_='box')['href']
        serial_number = row.find('strong').text.strip()
        title = row.find('div', class_='video-title').text.strip()
        release_date = row.find('div', class_='meta').text.strip()
        list_data.append({
            'href' : host_url + link if link else '',
            'serial_number' : serial_number,
            'title' : title,
            'release_date': release_date
        })
    # Look for the "next page" button
    next_page_element = soup.find('a', class_='pagination-next')
    if next_page_element:
        next_page_url = next_page_element['href']
        next_page_number = url_page_num(next_page_url)
        current_page_number = url_page_num(href)
        if current_page_number is None:
            current_page_number = 0
        if next_page_number and next_page_number > current_page_number:
            next_url = host_url + next_page_url
    return list_data, next_url

###### Test code below ######
def test_actors_list():
    next_url = actors_uncensored_base_url
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="actors", attr_type="id"))
        if soup:
            list_data, next_url = parse_actors_uncensored(soup, next_url)
            if list_data:
                print(list_data)
        else:
            print('get wrong page.')
            if next_url:
                print(next_url)
            break

def test_actor():
    next_url = 'https://javdb.com/actors/mdRn'
    all_data = []
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="movie-list h cols-4 vcols-5", attr_type="class"))
        if soup:
            actor, next_url = parse_actor_detail(soup, next_url)
            if actor:
                all_data.extend(actor['movies'])  # collect the movie entries from each page
        else:
            print('get wrong page.')
            break
    print(all_data)

def test_movie_detail():
    # fetch_page already retries internally, so a single call suffices here
    movie_url = 'https://javdb.com/v/gB2Q7'
    soup, status = fetch_page(movie_url, partial(generic_validator, tag="div", identifier="video-detail", attr_type="class"))
    if soup:
        detail = parse_movie_detail(soup, movie_url, 'RED193 無碼 レッドホットフェティッシュコレクション 中出し120連発 4 : 波多野結衣, 愛乃なみ, 夢実あくび, 他多数')
        if detail:
            print(detail)

def test_series_list():
    next_url = 'https://javdb.com/series/uncensored'
    all_data = []
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="series", attr_type="id"))
        if soup:
            list_data, next_url = parse_series_uncensored(soup, next_url)
            if list_data:
                all_data.extend(list_data)
        else:
            print('get wrong page.')
            break
    print(all_data)

def test_series_detail():
    next_url = 'https://javdb.com/series/39za'
    all_data = []
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="movie-list h cols-4 vcols-5", attr_type="class"))
        if soup:
            list_data, next_url = parse_series_detail(soup, next_url)
            if list_data:
                all_data.extend(list_data)
        else:
            print('get wrong page.')
            break
    print(all_data)
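
# Untested sketches mirroring test_series_list/test_series_detail above for the
# two maker parsers, which have no test coverage; the maker slug in
# test_maker_detail is a hypothetical placeholder, not a real page.
def test_makers_list():
    next_url = makers_uncensored_base_url
    all_data = []
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="makers", attr_type="id"))
        if soup:
            list_data, next_url = parse_makers_uncensored(soup, next_url)
            if list_data:
                all_data.extend(list_data)
        else:
            print('get wrong page.')
            break
    print(all_data)

def test_maker_detail():
    next_url = f'{host_url}/makers/XXXX'  # hypothetical maker slug; substitute a real one
    all_data = []
    while next_url:
        print(f'fetching page {next_url}')
        soup, status = fetch_page(next_url, partial(generic_validator, tag="div", identifier="movie-list h cols-4 vcols-5", attr_type="class"))
        if soup:
            list_data, next_url = parse_maker_detail(soup, next_url)
            if list_data:
                all_data.extend(list_data)
        else:
            print('get wrong page.')
            break
    print(all_data)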

if __name__ == "__main__":
    #test_actors_list()
    #test_actor()
    test_movie_detail()
    #test_series_list()
    #test_series_detail()