resources/src/crawling/craw.py
import logging
import sys
import requests
import re
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import src.utils.utils as utils
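# Internal sentinel codes returned by fetch_page. Only 404 is a real HTTP
# status here: 601 and 99 are project-specific markers, and 401 is reused to
# flag redirects to a verification page rather than in its standard meaning.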
http_code_404 = 404
http_code_redirect = 401
http_code_url = 601
http_code_local = 99
# Generic crawler class; wraps the low-level network interaction.
class GenericCrawler:
def __init__(self, use_cloudscraper=None, headers=None, cookies=None, max_retries=3, html_parser='html.parser'):
if use_cloudscraper is None:
use_cloudscraper = sys.version_info >= (3, 8)
self.use_cloudscraper = use_cloudscraper
self.headers = headers or {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
}
self.cookies = cookies or {}
self.scraper = None # lazily initialized
self.max_retries = max_retries
self.parser = html_parser
# Do not import cloudscraper here; import it only when needed.
def _initialize_scraper(self):
"""延迟初始化请求客户端,避免不必要的 cloudscraper 导入"""
if self.scraper is not None:
return
if self.use_cloudscraper:
try:
# 延迟导入 cloudscraper
import cloudscraper
self.scraper = cloudscraper.create_scraper()
logging.info("Using cloudscraper for requests")
except ImportError:
logging.warning("cloudscraper not installed. Falling back to requests.")
self.use_cloudscraper = False
self.scraper = requests.Session()
else:
self.scraper = requests.Session()
logging.info("Using requests for HTTP operations")
def fetch_page(self, url, validator):
# Initialize the scraper before first use.
self._initialize_scraper()
for attempt in range(self.max_retries):
try:
if not utils.is_valid_url(url):
logging.error(f'invalid URL format: {url}')
return None, http_code_url
response = self.scraper.get(url, headers=self.headers, cookies=self.cookies)
# Handle HTTP status codes.
if response.status_code == http_code_404:
logging.debug(f"Page not found (404): {url}")
return None, http_code_404 # return 404 directly; the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# Check whether a redirect occurred, e.g. to a login page.
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a verify page.")
soup = BeautifulSoup(response.text, self.parser)
if self.check_redirect(soup):
logging.warning(f"Page redirected to verify page on {url}.")
return None, http_code_redirect
# Check whether this is a login page:
#if soup.find('div', id='ageVerify'):
# Preprocess the HTML (if a preprocessor override is provided).
html_text = self.preprocessor(response.text)
soup = BeautifulSoup(html_text, self.parser)
if validator(soup): # run the caller's custom page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retrying...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # still failing after exhausting retries
# Preprocessing hook for the page, typically to repair broken tags.
def preprocessor(self, html):
return html
# Check whether the request was redirected away from the expected page.
def check_redirect(self, soup):
"""Default redirect check; subclasses may override."""
return False # by default, assume no redirect happened
@staticmethod
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find(tag, {'name': identifier}))
return False
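# A minimal usage sketch of GenericCrawler (illustrative only; the subclass,
# URL, and the 'content' id below are assumptions, not part of this project):
#
#     class ExampleCrawler(GenericCrawler):
#         def check_redirect(self, soup):
#             # Treat a page carrying a login form as an unwanted redirect.
#             return soup.find('form', id='login') is not None
#
#     crawler = ExampleCrawler(use_cloudscraper=False)
#     soup, status = crawler.fetch_page(
#         'https://example.com/page',
#         lambda s: GenericCrawler.generic_validator(s, 'div', 'content'))
#     if soup is not None:
#         print(soup.title)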
# Crawler for javbus.com pages.
class JavbusCrawler(GenericCrawler):
def __init__(self, use_cloudscraper=None):
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
}
cookies = {
'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
'existmag': 'all',
'age': 'verified'
}
super().__init__(use_cloudscraper, headers=headers, cookies=cookies)
self.host_url = "https://www.javbus.com"
# The parsing functions below are kept unchanged from the original implementation.
def parse_actors_list(self, soup, href):
div_actors = soup.find("div", id='waterfall')
if not div_actors:
logging.warning(f"Warning: No actors div found ")
return None, None
# Parse the item elements.
rows = div_actors.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
# actor detail page link
actor_link = row.find('a')['href']
# actor name
actor_name = row.find('span').text.strip()
# avatar image URL
avatar_url = row.find('img')['src']
list_data.append({
'name': actor_name,
'href': urljoin(self.host_url, actor_link),
'pic': avatar_url
})
# 查找 "下一页" 按钮
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return list_data, next_url
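# Example of the returned shape (illustrative values, not real data):
#   list_data = [{'name': '...', 'href': 'https://www.javbus.com/star/...',
#                 'pic': 'https://.../avatar.jpg'}, ...]
#   next_url  = absolute URL of the next page, or None if there is none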
# Parse the actor detail page.
def parse_actor_detail(self, soup, href):
"""
Parse a Javbus actor page: extract the actor's info and movie list.
"""
result = {
'avatar': {},
'movies': []
}
try:
# Parse the actor info.
avatar_box = soup.find('div', class_='avatar-box')
if avatar_box:
result['avatar'] = self.parse_avatar_info(avatar_box)
else:
logging.debug(f"avatar-box not found. href: {href}")
# Parse the movie list.
movie_boxes = soup.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# 查找 "下一页" 按钮
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
def parse_avatar_info(self, avatar_box):
"""
Parse the actor info block.
"""
avatar_info = {}
# Field mapping: label names in several languages and their target keys.
field_mapping = {
'birth_date': ['生日', 'D.O.B', '生年月日', 'Birthday'],
'age': ['年齡', 'Age', '年龄'],
'height': ['身高', 'Height', '身長'],
'breast_size': ['罩杯', 'Cup', 'ブラのサイズ'],
'bust': ['胸圍', 'Bust', 'バスト'],
'waist': ['腰圍', 'Waist', 'ウエスト'],
'hip': ['臀圍', 'Hips', 'ヒップ'],
'hobby': ['愛好', 'Hobby', '趣味']
}
# Extract the actor name.
name_span = avatar_box.find('span', class_='pb10')
if name_span:
avatar_info['name'] = name_span.get_text(strip=True)
else:
logging.debug("未找到演员名称")
# Extract birth date, age, and the other fields.
p_tags = avatar_box.find_all('p')
for p in p_tags:
text = p.get_text(strip=True)
# Use a regex to split on the (ASCII or full-width) colon.
match = re.search(r'^(.*?)[:](.*)$', text)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
# Look up the corresponding target key.
target_key = next((k for k, v in field_mapping.items() if any(x in key for x in v)), None)
if target_key:
# Special-case numeric fields.
if target_key in ['age', 'height', 'bust', 'waist', 'hip']:
# extract the numeric part
num_match = re.search(r'(\d+\.?\d*)', value)
if num_match:
try:
avatar_info[target_key] = float(num_match.group(1))
# keep as an int if the value is integral
if avatar_info[target_key].is_integer():
avatar_info[target_key] = int(avatar_info[target_key])
except ValueError:
logging.debug(f"转换数字失败: {value}")
avatar_info[target_key] = value
else:
logging.debug(f"未找到数字部分: {value}")
avatar_info[target_key] = value
else:
avatar_info[target_key] = value
else:
logging.debug(f"未知的演员信息类型: {key}")
else:
logging.debug(f"无法解析的演员信息: {text}")
avatar_info['measurements'] = f"{avatar_info.get('bust', '')}-{avatar_info.get('waist', '')}-{avatar_info.get('hip', '')}"
return avatar_info
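# Example result (illustrative values only):
#   {'name': '...', 'birth_date': '1990-01-01', 'age': 30, 'height': 158,
#    'bust': 88, 'waist': 59, 'hip': 85, 'measurements': '88-59-85'}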
def parse_movie_info(self, movie_box):
"""
Parse a single movie box.
"""
movie_info = {}
try:
# Extract the movie link.
href = movie_box.get('href')
if href:
movie_info['href'] = href
else:
logging.warning("未找到影片链接")
return None
# Extract the cover image link.
img_tag = movie_box.find('img')
if img_tag and 'src' in img_tag.attrs:
movie_info['cover_url'] = img_tag['src']
movie_info['title'] = img_tag['title']
else:
logging.warning("未找到影片图片链接")
# Extract the title, serial number, and release date.
photo_info = movie_box.find('div', class_='photo-info')
if photo_info:
# Extract the title (text in the span tag, excluding date tags).
span_tag = photo_info.find('span')
if span_tag:
# plain text directly under the span (date tags excluded)
title = ''.join(span_tag.find_all(string=True, recursive=False)).strip()
# Strip common trailing separator patterns.
if title.endswith('\n\n /'):
clean_title = title[:-4].strip()
elif title.endswith('\n /'):
clean_title = title[:-3].strip()
else:
clean_title = title
movie_info['title'] = clean_title
# Extract the serial number and date (date tags).
date_tags = span_tag.find_all('date')
if len(date_tags) >= 2:
movie_info['serial_number'] = date_tags[0].get_text(strip=True)
movie_info['release_date'] = date_tags[1].get_text(strip=True)
else:
logging.warning(f"date标签数量不足无法提取番号和日期")
else:
logging.warning("未找到span标签")
else:
logging.warning("未找到影片信息区域")
except Exception as e:
logging.error(f"解析影片信息时发生错误: {str(e)}", exc_info=True)
return None
return movie_info
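# For reference, parse_movie_info assumes markup of roughly this shape
# (reconstructed from the parsing logic above, not copied from the live site):
#
#     <a class="movie-box" href="...">
#       <img src="..." title="...">
#       <div class="photo-info">
#         <span>Some Title <date>ABC-123</date> <date>2025-01-01</date></span>
#       </div>
#     </a>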
# Parse studio/label/series detail pages.
def parse_studios_labels_series_detail(self, soup, href):
"""
Parse a Javbus studio/label/series page: extract its metadata and movie list.
"""
result = {
'meta': {},
'movies': []
}
try:
# Parse the title.
b_tag = soup.select_one('.alert.alert-success.alert-common p b')
if not b_tag:
logging.warning(f'found no title. href: {href}')
else:
# Get the text content.
title_text = b_tag.get_text(strip=True)
# Split the text on hyphens.
parts = [part.strip() for part in title_text.split('-')]
# 定义"影片"的多种语言表示
video_keywords = ['影片', 'Video', '映画', 'Videos', 'Movies']
# 查找"影片"关键词的位置
video_index = next((i for i, part in enumerate(parts) if part in video_keywords), None)
if video_index is not None and video_index >= 2:
# The two parts before it are the studio and the role.
studio = parts[video_index - 2]
role = parts[video_index - 1]
result['meta']['title'] = studio
result['meta']['role'] = role
else:
logging.debug(f"无法按规则解析: {' - '.join(parts)}")
# Extract the total movie count and the count of titles with magnets.
# Find the a tags.
a_tags = soup.select('.alert.alert-success.alert-common a.mypointer')
if not a_tags:
logging.warning(f'found no movie cnt. href: {href}')
else:
for a in a_tags:
text = a.get_text(strip=True)
# extract the total movie count
if '全部影片' in text:
match = re.search(r'全部影片\s*(\d+)\s*', text)
if match:
result['meta']['movies_cnt'] = int(match.group(1))
# extract the count of titles with magnets
if '已有磁力' in text:
match = re.search(r'已有磁力\s*(\d+)\s*', text)
if match:
result['meta']['magnet_cnt'] = int(match.group(1))
div_waterfall = soup.find('div', id='waterfall')
if not div_waterfall:
logging.warning(f"found no records. href: {href}")
else:
# Parse the movie list.
movie_boxes = div_waterfall.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# 查找 "下一页" 按钮
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
# Parse a Javbus movie detail page.
def parse_movie_detail(self, soup, href, title):
result = {
'title': title,
'href': href,
'serial_number': '',
'release_date': '',
'duration': '',
'studio': {'name': '', 'href': ''},
'label': {'name': '', 'href': ''},
'series': {'name': '', 'href': ''},
'tags': [],
'actors': []
}
try:
# Extract the title.
div_container = soup.find('div', class_='container')
if not div_container:
logging.warning(f"found no container tag. href: {href}")
return None
title_element = div_container.find('h3')
if title_element:
result['title'] = title_element.get_text(strip=True)
else:
logging.debug("no title found. href: {href}")
# Extract the basic info (serial number, release date, etc.).
info_div = div_container.find('div', class_='info')
if not info_div:
logging.warning(f"found no div info tag. href: {href}")
return None
# Field label mapping (multi-language support).
field_mapping = {
'serial_number': ['識別碼:', '识别码:', 'ID:', '品番:'],
'release_date': ['發行日期:', '发行日期:', 'Release Date:', '発売日:'],
'duration': ['長度:', '长度:', 'Length:', '収録時間:'],
'studio': ['製作商:', '制作商:', 'Studio:', 'メーカー:'],
'label': ['發行商:', '发行商:', 'Label:', 'レーベル:'],
'series': ['系列:', 'Series:', 'シリーズ:']
}
# Iterate over all p tags looking for the info fields.
p_tags = info_div.find_all('p')
for p in p_tags:
# Find the header span.
header = p.find('span', class_='header')
if header:
header_text = header.get_text(strip=True)
# Look up the matching target key.
target_key = next((k for k, v in field_mapping.items() if header_text in v), None)
if target_key:
# Get the value (handles both plain text and links).
if target_key in ['studio', 'label', 'series']:
# Fields that carry a link.
a_tag = p.find('a')
if a_tag:
result[target_key]['name'] = a_tag.get_text(strip=True)
result[target_key]['href'] = a_tag.get('href', '')
else:
# No link; take the text directly.
value_text = p.get_text(strip=True)
# strip the header text
value_text = value_text.replace(header_text, '').strip()
result[target_key]['name'] = value_text
logging.debug(f"{header_text} 没有链接,直接提取文本")
else:
# Plain text fields.
value_text = p.get_text(strip=True)
# strip the header text
value_text = value_text.replace(header_text, '').strip()
# Duration is kept as raw text; numeric extraction is intentionally skipped.
result[target_key] = value_text
# Process the genre/tag fields.
tag_labels = info_div.find_all('label')
for item in tag_labels:
link = item.find('a')
if link:
genre = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['tags'].append(genre)
# Extract the actor info.
star_p = info_div.find('p', class_='star-show')
if star_p:
# Find the actor list.
star_list = star_p.find_next('ul')
if star_list:
star_items = star_list.find_all('div', class_='star-name')
for item in star_items:
link = item.find('a')
if link:
actor = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['actors'].append(actor)
else:
logging.debug(f"actors not found.")
else:
logging.debug("no star-name area. href: {href}")
else:
logging.debug("no star-show area. href: {href}")
except Exception as e:
logging.warning(f"parse movie detail error. href: {href}, error: {str(e)}", exc_info=True)
return result
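# A minimal end-to-end sketch, runnable as a script. The actor URL below is a
# placeholder assumption, not a real page; the validator checks for the
# 'waterfall' div that the list/detail parsers above rely on.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    crawler = JavbusCrawler()
    url = 'https://www.javbus.com/star/xxx'  # placeholder, not a real actor id
    soup, status = crawler.fetch_page(
        url,
        lambda s: GenericCrawler.generic_validator(s, 'div', 'waterfall'))
    if soup is not None:
        detail, next_url = crawler.parse_actor_detail(soup, url)
        print(detail['avatar'], len(detail['movies']), next_url)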