modify scripts

This commit is contained in:
oscarz
2025-06-24 19:03:44 +08:00
parent c5feab2c22
commit 7e14a5f247
4 changed files with 610 additions and 226 deletions

View File

@ -1,6 +1,7 @@
import logging
import sys
import requests
import re
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import src.utils.utils as utils
@ -8,6 +9,7 @@ import src.utils.utils as utils
http_code_404 = 404
http_code_redirect = 401
http_code_url = 601
http_code_local = 99
# 通用的爬取类,主要实现了底层的网络交互封装
class GenericCrawler:
@ -166,138 +168,293 @@ class JavbusCrawler(GenericCrawler):
return list_data, next_url
# 获取演员详情
def parse_actor_detail(self, soup, href):
# 先找一下别名
alias_list = []
div_meta = soup.find('span', class_='actor-section-name')
if not div_meta:
logging.warning(f'warning: no meta data found in page {href}')
return None, None
alias_div = soup.find('div', class_='column section-title')
if alias_div:
meta_list = alias_div.find_all('span', class_='section-meta')
if len(meta_list) > 1:
alias_list = meta_list[0].text.strip().split(", ")
# 头像
pic = ''
avatar = soup.find("div", class_="column actor-avatar")
if avatar:
pic = self.parse_avatar_image(avatar)
# 返回数据
actor = {}
# 使用正则表达式查找 class 包含 'movie-list h cols-4' 的 div 元素
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return None, None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href': host_url + link if link else '',
'serial_number': serial_number,
'title': title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
actor = {
'pic': pic,
'alias': alias_list,
'movies': list_data
"""
解析Javbus网页内容提取演员信息和影片列表
"""
result = {
'avatar': {},
'movies': []
}
try:
# 解析演员信息
avatar_box = soup.find('div', class_='avatar-box')
if avatar_box:
result['avatar'] = self.parse_avatar_info(avatar_box)
else:
logging.debug(f"avatar-box not found. href: {href}")
# 解析影片列表
movie_boxes = soup.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# 查找 "下一页" 按钮
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
return actor, next_url
def parse_movie_one(self, soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
return key_span.text.strip()
return None
def parse_movie_val_href(self, soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
a_tag = key_span.find('a')
if a_tag:
return a_tag.text.strip(), host_url + a_tag.get('href')
def parse_avatar_info(self, avatar_box):
"""
解析演员信息
"""
avatar_info = {}
# 定义映射关系:包含各种语言的字段名称及其对应的目标键名
field_mapping = {
'birth_date': ['生日', 'D.O.B', '生年月日', 'Birthday'],
'age': ['年齡', 'Age', '年龄'],
'height': ['身高', 'Height', '身長'],
'breast_size': ['罩杯', 'Cup', 'ブラのサイズ'],
'bust': ['胸圍', 'Bust', 'バスト'],
'waist': ['腰圍', 'Waist', 'ウエスト'],
'hip': ['臀圍', 'Hips', 'ヒップ'],
'hobby': ['愛好', 'Hobby', '趣味']
}
# 提取演员名称
name_span = avatar_box.find('span', class_='pb10')
if name_span:
avatar_info['name'] = name_span.get_text(strip=True)
else:
logging.debug("未找到演员名称")
# 提取生日、年龄等信息
p_tags = avatar_box.find_all('p')
for p in p_tags:
text = p.get_text(strip=True)
# 使用正则表达式匹配冒号前后的内容
match = re.search(r'^(.*?)[:](.*)$', text)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
# 查找对应的目标键名
target_key = next((k for k, v in field_mapping.items() if any(x in key for x in v)), None)
if target_key:
# 特殊处理数字类型和单位转换
if target_key in ['age', 'height', 'bust', 'waist', 'hip']:
# 提取数字部分
num_match = re.search(r'(\d+\.?\d*)', value)
if num_match:
try:
avatar_info[target_key] = float(num_match.group(1))
# 保留整数(如果是整数)
if avatar_info[target_key].is_integer():
avatar_info[target_key] = int(avatar_info[target_key])
except ValueError:
logging.debug(f"转换数字失败: {value}")
avatar_info[target_key] = value
else:
logging.debug(f"未找到数字部分: {value}")
avatar_info[target_key] = value
else:
avatar_info[target_key] = value
else:
return key_span.text.strip(), None
return None, None
logging.debug(f"未知的演员信息类型: {key}")
else:
logging.debug(f"无法解析的演员信息: {text}")
def parse_movie_arr(self, soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
actors = []
a_tags = key_span.find_all('a')
for a_tag in a_tags:
actors.append({
'name': a_tag.text.strip(),
'href': host_url + a_tag.get('href')
})
return actors
return []
avatar_info['measurements'] = f"{avatar_info.get('bust', '')}-{avatar_info.get('waist', '')}-{avatar_info.get('hip', '') }"
return avatar_info
def parse_movie_info(self, movie_box):
"""
解析影片信息
"""
movie_info = {}
try:
# 提取影片链接
href = movie_box.get('href')
if href:
movie_info['href'] = href
else:
logging.warning("未找到影片链接")
return None
# 提取图片链接
img_tag = movie_box.find('img')
if img_tag and 'src' in img_tag.attrs:
movie_info['cover_url'] = img_tag['src']
movie_info['title'] = img_tag['title']
else:
logging.warning("未找到影片图片链接")
# 提取标题、番号和发布日期
photo_info = movie_box.find('div', class_='photo-info')
if photo_info:
# 提取标题 (span标签中的文本排除date标签)
span_tag = photo_info.find('span')
if span_tag:
# 获取span下的纯文本内容 (不包含date标签)
title = ''.join(span_tag.find_all(text=True, recursive=False)).strip()
# 移除常见的分隔符模式
if title.endswith('\n\n /'):
clean_title = title[:-4].strip()
elif title.endswith('\n /'):
clean_title = title[:-3].strip()
else:
clean_title = title
movie_info['title'] = clean_title
# 提取番号和日期 (date标签)
date_tags = span_tag.find_all('date')
if len(date_tags) >= 2:
movie_info['serial_number'] = date_tags[0].get_text(strip=True)
movie_info['release_date'] = date_tags[1].get_text(strip=True)
else:
logging.warning(f"date标签数量不足无法提取番号和日期")
else:
logging.warning("未找到span标签")
else:
logging.warning("未找到影片信息区域")
except Exception as e:
logging.error(f"解析影片信息时发生错误: {str(e)}", exc_info=True)
return None
return movie_info
# 解析Javbus影片详情页内容
def parse_movie_detail(self, soup, href, title):
div_video = soup.find("div", class_='video-meta-panel')
if not div_video:
logging.warning(f"Warning: No movies div found ")
return None, None
result = {}
result['href'] = href
result['title'] = title
# 获取封面图片
cover_img = soup.select_one('.column-video-cover a')
result['cover_url'] = cover_img['href'] if cover_img else None
# 获取番号
result['serial_number'] = self.parse_movie_one(soup, ['番號:', 'ID:'])
result['release_date'] = self.parse_movie_one(soup, ['日期:', 'Released Date:'])
result['duration'] = self.parse_movie_one(soup, ['時長:', 'Duration:'])
# 获取maker系列
result['maker_name'], result['maker_link'] = self.parse_movie_val_href(soup, ['片商:', 'Maker:'])
result['series_name'], result['series_link'] = self.parse_movie_val_href(soup, ['系列:', 'Series:'])
result['pub_name'], result['pub_link'] = self.parse_movie_val_href(soup, ['發行:', 'Publisher:'])
# 获取演员tags
result['tags'] = self.parse_movie_arr(soup, ['類別:', 'Tags:'])
result['actors'] = self.parse_movie_arr(soup, ['演員:', 'Actor(s):'])
result = {
'title': title,
'href': href,
'serial_number': '',
'release_date': '',
'duration': '',
'studio': {'name': '', 'href': ''},
'label': {'name': '', 'href': ''},
'series': {'name': '', 'href': ''},
'tags': [],
'actors': []
}
try:
# 提取标题
div_container = soup.find('div', class_='container')
if not div_container:
logging.warning(f"found no container tag.")
return None
title_element = div_container.find('h3')
if title_element:
result['title'] = title_element.get_text(strip=True)
else:
logging.debug("未找到影片标题")
# 提取基本信息(识别码、发行日期等)
info_div = div_container.find('div', class_='info')
if not info_div:
logging.warning(f"found no div info tag.")
return None
# 定义字段映射关系(多种语言支持)
field_mapping = {
'serial_number': ['識別碼:', '识别码:', 'ID:', '品番:'],
'release_date': ['發行日期:', '发行日期:', 'Release Date:', '発売日:'],
'duration': ['長度:', '长度:', 'Length:', '収録時間:'],
'studio': ['製作商:', '制作商:', 'Studio:', 'メーカー:'],
'label': ['發行商:', '发行商:', 'Label:', 'レーベル:'],
'series': ['系列:', 'Series:', 'シリーズ:']
}
# 遍历所有p标签查找信息
p_tags = info_div.find_all('p')
for p in p_tags:
# 查找header标签
header = p.find('span', class_='header')
if header:
header_text = header.get_text(strip=True)
# 查找匹配的目标键名
target_key = next((k for k, v in field_mapping.items() if header_text in v), None)
if target_key:
# 获取值(处理文本和链接)
if target_key in ['studio', 'label', 'series']:
# 处理有链接的字段
a_tag = p.find('a')
if a_tag:
result[target_key]['name'] = a_tag.get_text(strip=True)
result[target_key]['href'] = a_tag.get('href', '')
else:
# 没有链接,直接获取文本
value_text = p.get_text(strip=True)
# 移除header文本
value_text = value_text.replace(header_text, '').strip()
result[target_key]['name'] = value_text
logging.debug(f"{header_text} 没有链接,直接提取文本")
else:
# 处理普通文本字段
value_text = p.get_text(strip=True)
# 移除header文本
value_text = value_text.replace(header_text, '').strip()
# 特殊处理:提取时长的数字部分(咱不处理)
if target_key == 'duration' and False:
num_match = re.search(r'(\d+)', value_text)
if num_match:
result[target_key] = num_match.group(1)
else:
result[target_key] = value_text
else:
result[target_key] = value_text
# 处理类别字段
tag_lables = info_div.find_all('label')
for item in tag_lables:
link = item.find('a')
if link:
genre = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['tags'].append(genre)
# 提取演员信息
star_p = info_div.find('p', class_='star-show')
if star_p:
# 查找演员列表
star_list = star_p.find_next('ul')
if star_list:
star_items = star_list.find_all('div', class_='star-name')
for item in star_items:
link = item.find('a')
if link:
actor = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['actors'].append(actor)
else:
logging.debug(f"actors not found.")
else:
logging.warning("未找到演员列表区域")
else:
logging.warning("未找到演员标题")
except Exception as e:
logging.error(f"解析影片详情时发生错误: {str(e)}", exc_info=True)
return result
def parse_series_uncensored(self, soup, href):
@ -527,67 +684,3 @@ class JavbusCrawler(GenericCrawler):
next_url = host_url + next_page_url
return list_data, next_url
@staticmethod
def pretty_print_json(data, n=10, indent=4, sort_keys=False):
"""
以美化格式打印数组的前n个元素其他元素用"..."表示
参数:
- data: 要打印的数据(应为数组)
- n: 要显示的元素数量
- indent: 缩进空格数
- sort_keys: 是否按键排序
"""
try:
# 处理非数组数据
if not isinstance(data, list):
print(formatted)
return
# 复制原始数据,避免修改原数组
data_copy = data.copy()
# 切片取前n个元素
first_n_elements = data_copy[:n]
# 如果数组长度超过n添加"..."标记
if len(data) > n:
result = first_n_elements + ["... ({} more elements)".format(len(data) - n)]
else:
result = first_n_elements
# 格式化输出
formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
print(formatted)
except TypeError as e:
print(f"错误:无法格式化数据。详情:{e}")
except Exception as e:
print(f"打印时发生意外错误:{e}")
def test_actor_list(self, url='https://www.javbus.com/uncensored/actresses/1'):
next_url = url
all_data = []
while next_url:
print(f'fetching page {next_url}')
soup, status_code = self.fetch_page(next_url, partial(self.generic_validator, tag="div", identifier="waterfall", attr_type="id"),
max_retries=1)
if soup:
list_data, next_url = self.parse_actors_list(soup, next_url)
if list_data:
all_data.extend(list_data)
self.pretty_print_json(all_data)
else:
print('get wrong page.')
if next_url:
print(f"\n\nnext url: {next_url}")
else:
print(f"wrong request. url: {next_url}, status_code: {status_code}")
break
def url_page_num(self, url):
# 这里需要根据实际情况实现提取页码的逻辑
return None