modify scripts

This commit is contained in:
oscarz
2025-06-24 11:39:29 +08:00
parent 12c53b043d
commit c5feab2c22
7 changed files with 808 additions and 1773 deletions

src/crawling/craw.py Normal file

@@ -0,0 +1,593 @@
import json
import logging
import re
import sys
from functools import partial
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

import src.utils.utils as utils

http_code_404 = 404
http_code_redirect = 401  # sentinel: redirected to a verification/login page
http_code_url = 601       # sentinel: malformed URL (internal, non-standard code)
# Generic crawler class that encapsulates the low-level network interaction.
class GenericCrawler:
    def __init__(self, use_cloudscraper=None, headers=None, cookies=None, max_retries=3, html_parser='html.parser'):
        if use_cloudscraper is None:
            # default to cloudscraper only on Python >= 3.8
            use_cloudscraper = sys.version_info >= (3, 8)
        self.use_cloudscraper = use_cloudscraper
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
        }
        self.cookies = cookies or {}
        self.scraper = None  # lazily initialized
        self.max_retries = max_retries
        self.parser = html_parser
        # cloudscraper is not imported here; it is imported on demand
    def _initialize_scraper(self):
        """Lazily initialize the request client to avoid an unnecessary cloudscraper import."""
        if self.scraper is not None:
            return
        if self.use_cloudscraper:
            try:
                # deferred import of cloudscraper
                import cloudscraper
                self.scraper = cloudscraper.create_scraper()
                logging.info("Using cloudscraper for requests")
            except ImportError:
                logging.warning("cloudscraper not installed. Falling back to requests.")
                self.use_cloudscraper = False
                self.scraper = requests.Session()
        else:
            self.scraper = requests.Session()
            logging.info("Using requests for HTTP operations")
    def fetch_page(self, url, validator, max_retries=None):
        # initialize the scraper before first use
        self._initialize_scraper()
        retries = max_retries if max_retries is not None else self.max_retries
        for attempt in range(retries):
            try:
                if not utils.is_valid_url(url):
                    logging.error(f'wrong url format: {url}')
                    return None, http_code_url
                response = self.scraper.get(url, headers=self.headers, cookies=self.cookies)
                # handle HTTP status codes
                if response.status_code == http_code_404:
                    logging.debug(f"Page not found (404): {url}")
                    return None, http_code_404  # return 404 directly; the caller can skip this page
                response.raise_for_status()  # raise on other HTTP errors
                # check whether the request was redirected, e.g. to a verification page
                if response.history:
                    logging.debug(f"Page redirected on {url}. Checking if it's a verify page.")
                    soup = BeautifulSoup(response.text, self.parser)
                    if self.check_redirect(soup):
                        logging.warning(f"Page redirected to verify page on {url}.")
                        return None, http_code_redirect
                # old login-page check, kept for reference:
                # if soup.find('div', id='ageVerify'):
                # preprocess the HTML via the preprocessor hook
                html_text = self.preprocessor(response.text)
                soup = BeautifulSoup(html_text, self.parser)
                if validator(soup):  # run the caller's page check
                    return soup, response.status_code
                logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
            except Exception as e:
                logging.error(f"Unexpected error on {url}: {e}, Retrying...")
        logging.error(f'Fetching failed after max retries. {url}')
        return None, None  # still failing after the maximum number of retries
    # Preprocessing hook for the raw page, typically used to repair broken tags.
    def preprocessor(self, html):
        return html
    # Check whether a redirect diverted us from the page we wanted to parse.
    def check_redirect(self, soup):
        """Default redirect check; subclasses may override."""
        return False  # assume no problematic redirect by default
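    # Example override restoring the old age-verification check (a sketch
    # based on the commented-out 'ageVerify' lookup in fetch_page above):
    #
    #     class AgeGatedCrawler(GenericCrawler):
    #         def check_redirect(self, soup):
    #             return soup.find('div', id='ageVerify') is not None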
@staticmethod
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
# Crawler for javbus.com pages.
class JavbusCrawler(GenericCrawler):
def __init__(self, use_cloudscraper=None):
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
}
cookies = {
'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
'existmag': 'all',
'age': 'verified'
}
super().__init__(use_cloudscraper, headers=headers, cookies=cookies)
self.host_url = "https://www.javbus.com"
    # The original parsing functions below are kept unchanged.
    def parse_actors_list(self, soup, href):
        div_actors = soup.find("div", id='waterfall')
        if not div_actors:
            logging.warning("No actors div found")
            return None, None
        # parse the items
        rows = div_actors.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            # actor detail link
            actor_link = row.find('a')['href']
            # actor name
            actor_name = row.find('span').text.strip()
            # avatar image URL
            avatar_url = row.find('img')['src']
            list_data.append({
                'name': actor_name,
                'href': urljoin(self.host_url, actor_link),
                'pic': avatar_url
            })
        # look for the "next page" button
        div_link = soup.find("div", class_='text-center hidden-xs')
        if div_link:
            next_page_element = soup.find('a', id='next')
            if next_page_element:
                next_page_url = next_page_element['href']
                next_url = urljoin(href, next_page_url)
        return list_data, next_url
    def parse_actor_detail(self, soup, href):
        # collect aliases first
        alias_list = []
        div_meta = soup.find('span', class_='actor-section-name')
        if not div_meta:
            logging.warning(f'no meta data found in page {href}')
            return None, None
        alias_div = soup.find('div', class_='column section-title')
        if alias_div:
            meta_list = alias_div.find_all('span', class_='section-meta')
            if len(meta_list) > 1:
                alias_list = meta_list[0].text.strip().split(", ")
        # avatar
        pic = ''
        avatar = soup.find("div", class_="column actor-avatar")
        if avatar:
            pic = self.parse_avatar_image(avatar)
        # assemble the result
        actor = {}
        # use a regex to match divs whose class starts with 'movie-list h cols-'
        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
        if not div_movies:
            logging.warning("No movies div found")
            return None, None
        # parse the items
        rows = div_movies.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            link = row.find('a', class_='box')['href']
            serial_number = row.find('strong').text.strip()
            title = row.find('div', class_='video-title').text.strip()
            release_date = row.find('div', class_='meta').text.strip()
            list_data.append({
                'href': self.host_url + link if link else '',
                'serial_number': serial_number,
                'title': title,
                'release_date': release_date
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        actor = {
            'pic': pic,
            'alias': alias_list,
            'movies': list_data
        }
        return actor, next_url
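    # parse_avatar_image is called above but is not defined anywhere in this
    # commit; a hypothetical sketch (assumes the avatar div carries an <img>
    # tag or a CSS background-image style -- an assumption about the markup):
    def parse_avatar_image(self, avatar):
        img = avatar.find('img')
        if img and img.get('src'):
            return img['src']
        # fall back to a style="background-image: url(...)" attribute
        styled = avatar.find(style=re.compile(r'background-image'))
        if styled:
            match = re.search(r'url\((.*?)\)', styled['style'])
            if match:
                return match.group(1).strip('\'"')
        return ''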
def parse_movie_one(self, soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
return key_span.text.strip()
return None
    def parse_movie_val_href(self, soup, keys):
        key_strong = soup.find('strong', string=lambda text: text in keys)
        if key_strong:
            key_span = key_strong.find_next_sibling('span', class_='value')
            if key_span:
                a_tag = key_span.find('a')
                if a_tag:
                    return a_tag.text.strip(), self.host_url + a_tag.get('href')
                else:
                    return key_span.text.strip(), None
        return None, None
    def parse_movie_arr(self, soup, keys):
        key_strong = soup.find('strong', string=lambda text: text in keys)
        if key_strong:
            key_span = key_strong.find_next_sibling('span', class_='value')
            if key_span:
                actors = []
                a_tags = key_span.find_all('a')
                for a_tag in a_tags:
                    actors.append({
                        'name': a_tag.text.strip(),
                        'href': self.host_url + a_tag.get('href')
                    })
                return actors
        return []
    def parse_movie_detail(self, soup, href, title):
        div_video = soup.find("div", class_='video-meta-panel')
        if not div_video:
            logging.warning("No video meta panel found")
            return None, None
        result = {}
        result['href'] = href
        result['title'] = title
        # cover image
        cover_img = soup.select_one('.column-video-cover a')
        result['cover_url'] = cover_img['href'] if cover_img else None
        # serial number, release date, duration
        result['serial_number'] = self.parse_movie_one(soup, ['番號:', 'ID:'])
        result['release_date'] = self.parse_movie_one(soup, ['日期:', 'Released Date:'])
        result['duration'] = self.parse_movie_one(soup, ['時長:', 'Duration:'])
        # maker, series, publisher
        result['maker_name'], result['maker_link'] = self.parse_movie_val_href(soup, ['片商:', 'Maker:'])
        result['series_name'], result['series_link'] = self.parse_movie_val_href(soup, ['系列:', 'Series:'])
        result['pub_name'], result['pub_link'] = self.parse_movie_val_href(soup, ['發行:', 'Publisher:'])
        # tags and actors
        result['tags'] = self.parse_movie_arr(soup, ['類別:', 'Tags:'])
        result['actors'] = self.parse_movie_arr(soup, ['演員:', 'Actor(s):'])
        return result
    def parse_series_uncensored(self, soup, href):
        div_series = soup.find("div", id='series')
        if not div_series:
            logging.warning("No series div found")
            return None, None
        # parse the items
        rows = div_series.find_all('a', class_='box')
        list_data = []
        next_url = None
        for row in rows:
            name = row.find('strong').text.strip()
            link = row['href']  # do not shadow the href parameter; it is needed below
            div_movies = row.find('span')
            movies = 0
            if div_movies:
                match = re.search(r'\((\d+)\)', div_movies.text.strip())
                if match:
                    movies = int(match.group(1))
            list_data.append({
                'name': name,
                'href': self.host_url + link if link else '',
                'movies': movies
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    def parse_series_detail(self, soup, href):
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
        if not div_movies:
            logging.warning("No movies div found")
            return [], None
        # parse the items
        rows = div_movies.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            link = row.find('a', class_='box')['href']
            serial_number = row.find('strong').text.strip()
            title = row.find('div', class_='video-title').text.strip()
            release_date = row.find('div', class_='meta').text.strip()
            list_data.append({
                'href': self.host_url + link if link else '',
                'serial_number': serial_number,
                'title': title,
                'release_date': release_date
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    def parse_makers_uncensored(self, soup, href):
        div_series = soup.find("div", id='makers')
        if not div_series:
            logging.warning("No makers div found")
            return None, None
        # parse the items
        rows = div_series.find_all('a', class_='box')
        list_data = []
        next_url = None
        for row in rows:
            name = row.find('strong').text.strip()
            link = row['href']  # do not shadow the href parameter; it is needed below
            div_movies = row.find('span')
            movies = 0
            if div_movies:
                match = re.search(r'\((\d+)\)', div_movies.text.strip())
                if match:
                    movies = int(match.group(1))
            list_data.append({
                'name': name,
                'href': self.host_url + link if link else '',
                'movies': movies
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    def parse_maker_detail(self, soup, href):
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
        if not div_movies:
            logging.warning("No movies div found")
            return [], None
        # parse the items
        rows = div_movies.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            link = row.find('a', class_='box')['href']
            serial_number = row.find('strong').text.strip()
            title = row.find('div', class_='video-title').text.strip()
            release_date = row.find('div', class_='meta').text.strip()
            list_data.append({
                'href': self.host_url + link if link else '',
                'serial_number': serial_number,
                'title': title,
                'release_date': release_date
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    def parse_publisher_detail(self, soup, href):
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
        if not div_movies:
            logging.warning("No movies div found")
            return [], None
        # parse the items
        rows = div_movies.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            link = row.find('a', class_='box')['href']
            serial_number = row.find('strong').text.strip()
            title = row.find('div', class_='video-title').text.strip()
            release_date = row.find('div', class_='meta').text.strip()
            list_data.append({
                'href': self.host_url + link if link else '',
                'serial_number': serial_number,
                'title': title,
                'release_date': release_date
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    def parse_uncensored(self, soup, href):
        # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
        if not div_movies:
            logging.warning("No movies div found")
            return [], None
        # parse the items
        rows = div_movies.find_all('div', class_='item')
        list_data = []
        next_url = None
        for row in rows:
            link = row.find('a', class_='box')['href']
            serial_number = row.find('strong').text.strip()
            title = row.find('div', class_='video-title').text.strip()
            release_date = row.find('div', class_='meta').text.strip()
            list_data.append({
                'href': self.host_url + link if link else '',
                'serial_number': serial_number,
                'title': title,
                'release_date': release_date
            })
        # look for the "next page" button
        next_page_element = soup.find('a', class_='pagination-next')
        if next_page_element:
            next_page_url = next_page_element['href']
            next_page_number = self.url_page_num(next_page_url)
            current_page_number = self.url_page_num(href)
            if current_page_number is None:
                current_page_number = 0
            if next_page_number and next_page_number > current_page_number:
                next_url = self.host_url + next_page_url
        return list_data, next_url
    @staticmethod
    def pretty_print_json(data, n=10, indent=4, sort_keys=False):
        """
        Pretty-print the first n elements of a list, marking the rest with "...".
        Arguments:
        - data: the data to print (expected to be a list)
        - n: number of elements to show
        - indent: indentation width
        - sort_keys: whether to sort dictionary keys
        """
        try:
            # non-list data is printed as-is
            if not isinstance(data, list):
                print(json.dumps(data, indent=indent, ensure_ascii=False, sort_keys=sort_keys))
                return
            # copy the data so the original list is not modified
            data_copy = data.copy()
            # slice off the first n elements
            first_n_elements = data_copy[:n]
            # if the list is longer than n, append a "..." marker
            if len(data) > n:
                result = first_n_elements + ["... ({} more elements)".format(len(data) - n)]
            else:
                result = first_n_elements
            # formatted output
            formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
            print(formatted)
        except TypeError as e:
            print(f"Error: could not format the data. Details: {e}")
        except Exception as e:
            print(f"Unexpected error while printing: {e}")
    def test_actor_list(self, url='https://www.javbus.com/uncensored/actresses/1'):
        next_url = url
        all_data = []
        while next_url:
            print(f'fetching page {next_url}')
            soup, status_code = self.fetch_page(
                next_url,
                partial(self.generic_validator, tag="div", identifier="waterfall", attr_type="id"),
                max_retries=1)
            if soup:
                list_data, next_url = self.parse_actors_list(soup, next_url)
                if list_data:
                    all_data.extend(list_data)
                    self.pretty_print_json(all_data)
                else:
                    print('got an unexpected page.')
                if next_url:
                    print(f"\n\nnext url: {next_url}")
            else:
                print(f"request failed. url: {next_url}, status_code: {status_code}")
                break
    def url_page_num(self, url):
        # TODO: extract the page number from the URL according to the actual URL scheme
        return None
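    # A possible implementation sketch for url_page_num (assumes the page
    # number is the trailing path segment, as in the list URLs used by
    # test_actor_list above, e.g. .../uncensored/actresses/2 -> 2):
    #
    #     match = re.search(r'/(\d+)(?:[?#]|$)', url)
    #     return int(match.group(1)) if match else None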


@@ -1,71 +0,0 @@
import logging
import cloudscraper
from bs4 import BeautifulSoup
import src.utils.utils as utils
# set up headers and the scraper
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
}
# define cookies
cookies = {
}
scraper = cloudscraper.create_scraper()
http_code_404 = 404
http_code_login = 401
http_code_local = 99
logging.getLogger().setLevel(logging.DEBUG)
# Fetch pages with cloudscraper and run page validation; supports different parsers and preprocessing.
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None, headers=headers, cookies=cookies):
for attempt in range(max_retries):
try:
if not utils.is_valid_url(url):
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=headers, cookies=cookies)
            # handle HTTP status codes
            if response.status_code == 404:
                logging.debug(f"Page not found (404): {url}")
                return None, http_code_404  # return 404 directly; the caller can skip this page
            response.raise_for_status()  # raise on other HTTP errors
            # check whether the request was redirected, e.g. to a login page
            if response.history:
                logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
                soup = BeautifulSoup(response.text, parser)
                # check for a login page
                if soup.find('div', id='ageVerify'):
                    logging.warning(f"Page redirected to login page on {url}.")
                    return None, http_code_login
            # preprocess the HTML if a preprocessor was provided
            html_text = preprocessor(response.text) if preprocessor else response.text
            soup = BeautifulSoup(html_text, parser)
            if validator(soup):  # run the caller's page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
        except cloudscraper.exceptions.CloudflareCode1020 as e:
            logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
        except Exception as e:
            logging.error(f"Unexpected error on {url}: {e}, Retrying...")
    logging.error(f'Fetching failed after max retries. {url}')
    return None, None  # still failing after the maximum number of retries
# generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False


@@ -1,515 +0,0 @@
import cloudscraper
import logging
import re
import json
from functools import partial
from urllib.parse import urljoin
import src.config.config as config
import src.utils.utils as utils
import src.crawling.craw_common as scraper
# base URL and request parameters
host_url = "https://www.javbus.com"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
}
cookies = {
'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
'existmag': 'all',
'age': 'verified'
}
# Parse the HTML content and extract the data we need.
def parse_actors_list(soup, href):
div_actors = soup.find("div", id='waterfall')
if not div_actors:
logging.warning(f"Warning: No actors div found ")
return None, None
    # parse the items
rows = div_actors.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
        # actor detail link
        actor_link = row.find('a')['href']
        # actor name
        actor_name = row.find('span').text.strip()
        # avatar image URL
        avatar_url = row.find('img')['src']
list_data.append({
'name' : actor_name,
'href' : urljoin(host_url, actor_link),
'pic' : avatar_url
})
    # look for the "next page" button
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_actor_detail(soup, href):
    # collect aliases first
alias_list = []
div_meta = soup.find('span', class_='actor-section-name')
if not div_meta:
logging.warning(f'warning: no meta data found in page {href}')
return None, None
alias_div = soup.find('div', class_='column section-title')
if alias_div:
meta_list = alias_div.find_all('span', class_='section-meta')
if len(meta_list) > 1:
alias_list = meta_list[0].text.strip().split(", ")
    # avatar
pic = ''
avatar = soup.find("div", class_="column actor-avatar")
if avatar:
pic = parse_avatar_image(avatar)
    # assemble the result
    actor = {}
    # use a regex to match divs whose class starts with 'movie-list h cols-'
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return None, None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
actor = {
'pic' : pic,
'alias' : alias_list,
'movies' : list_data
}
return actor, next_url
# parse a single value
def parse_movie_one(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
return key_span.text.strip()
return None
# parse a value and its link
def parse_movie_val_href(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
a_tag = key_span.find('a')
if a_tag:
return a_tag.text.strip(), host_url + a_tag.get('href')
else:
return key_span.text.strip(), None
return None, None
# parse multiple values with links
def parse_movie_arr(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
actors = []
a_tags = key_span.find_all('a')
for a_tag in a_tags:
actors.append({
'name': a_tag.text.strip(),
'href': host_url + a_tag.get('href')
})
return actors
return []
# Parse the HTML content and extract the data we need.
def parse_movie_detail(soup, href, title):
div_video = soup.find("div", class_='video-meta-panel')
if not div_video:
logging.warning(f"Warning: No movies div found ")
return None, None
result = {}
result['href'] = href
result['title'] = title
    # cover image
    cover_img = soup.select_one('.column-video-cover a')
    result['cover_url'] = cover_img['href'] if cover_img else None
    # serial number, release date, duration
    result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:'])
    result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:'])
    result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:'])
    # maker, series, publisher
    result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:'])
    result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:'])
    result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:'])
    # tags and actors
    result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:'])
    result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):'])
return result
# Parse the HTML content and extract the data we need.
def parse_series_uncensored(soup, href):
div_series = soup.find("div", id='series')
if not div_series:
logging.warning(f"Warning: No div_series div found ")
return None, None
    # parse the items
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + href if href else '',
'movies' : movies
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_series_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
    # parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_makers_uncensored(soup, href):
div_series = soup.find("div", id='makers')
if not div_series:
logging.warning(f"Warning: No makers div found ")
return None, None
    # parse the items
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + href if href else '',
'movies' : movies
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_maker_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
    # parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_publisher_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
    # parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML content and extract the data we need.
def parse_uncensored(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
    # parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
    # look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
def pretty_print_json(data, n=10, indent=4, sort_keys=False):
    """
    Pretty-print the first n elements of a list, marking the rest with "...".
    Arguments:
    - data: the data to print (expected to be a list)
    - n: number of elements to show
    - indent: indentation width
    - sort_keys: whether to sort dictionary keys
    """
    try:
        # non-list data is printed as-is
        if not isinstance(data, list):
            print(json.dumps(data, indent=indent, ensure_ascii=False, sort_keys=sort_keys))
            return
        # copy the data so the original list is not modified
        data_copy = data.copy()
        # slice off the first n elements
        first_n_elements = data_copy[:n]
        # if the list is longer than n, append a "..." marker
        if len(data) > n:
            result = first_n_elements + ["... ({} more elements)".format(len(data) - n)]
        else:
            result = first_n_elements
        # formatted output
        formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
        print(formatted)
    except TypeError as e:
        print(f"Error: could not format the data. Details: {e}")
    except Exception as e:
        print(f"Unexpected error while printing: {e}")
def test_actor_list(url='https://www.javbus.com/uncensored/actresses/1'):
next_url = url
all_data = []
while next_url:
print(f'fetching page {next_url}')
        soup, status_code = scraper.fetch_page(next_url,
                                               partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"),
                                               max_retries=1, headers=headers, cookies=cookies)
if soup:
list_data, next_url = parse_actors_list(soup, next_url)
            if list_data:
all_data.extend(list_data)
pretty_print_json(all_data)
else:
print('get wrong page.')
if next_url:
print(f"\n\nnext url: {next_url}")
else:
print(f"wrong request. url: {next_url}, status_code: {status_code}")
break
if __name__ == "__main__":
#test_actors_list()
#test_actor()
#test_movie_detail()
#test_series_list()
#test_series_detail()
logging.getLogger().setLevel(logging.DEBUG)
test_actor_list()
test_actor_list('https://www.javbus.com/en/actresses')
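
With the crawler now class-based, the same smoke test might be driven through the new JavbusCrawler (a minimal sketch, assuming craw.py is importable as src.crawling.craw):

import logging
from src.crawling.craw import JavbusCrawler

logging.getLogger().setLevel(logging.DEBUG)
crawler = JavbusCrawler()  # lazily selects cloudscraper or plain requests
crawler.test_actor_list()
crawler.test_actor_list('https://www.javbus.com/en/actresses')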