modify scripts

This commit is contained in:
oscarz
2025-06-25 11:38:06 +08:00
parent 9cf521a0d6
commit 5ebfe7cb8c
4 changed files with 306 additions and 309 deletions

View File

@ -333,6 +333,69 @@ class JavbusCrawler(GenericCrawler):
return movie_info
# 获取演员详情
def parse_studios_labels_series_detail(self, soup, href):
"""
解析Javbus网页内容提取演员信息和影片列表
"""
result = {
'meta': {},
'movies': []
}
try:
# 解析标题
b_tag = soup.select_one('.alert.alert-success.alert-common p b')
if not b_tag:
logging.warning(f'found no title. href: {href}')
else:
# 获取文本内容
title_text = b_tag.get_text(strip=True)
# 使用横线分割文本
parts = [part.strip() for part in title_text.split('-')]
# 定义"影片"的多种语言表示
video_keywords = ['影片', 'Video', '映画', 'Videos', 'Movies']
# 查找"影片"关键词的位置
video_index = next((i for i, part in enumerate(parts) if part in video_keywords), None)
if video_index is not None and video_index >= 2:
# 提取前两个元素作为工作室和角色
studio = parts[video_index - 2]
role = parts[video_index - 1]
result['meta'] = {'title': studio, 'role': role}
else:
logging.debug(f"无法按规则解析: {' - '.join(parts)}")
div_waterfall = soup.find('div', id='waterfall')
if not div_waterfall:
logging.warning(f"found no records. href: {href}")
else:
# 解析影片列表
movie_boxes = div_waterfall.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# 查找 "下一页" 按钮
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
# 解析Javbus影片详情页内容
def parse_movie_detail(self, soup, href, title):
result = {
@ -456,231 +519,3 @@ class JavbusCrawler(GenericCrawler):
logging.error(f"解析影片详情时发生错误: {str(e)}", exc_info=True)
return result
def parse_series_uncensored(self, soup, href):
div_series = soup.find("div", id='series')
if not div_series:
logging.warning(f"Warning: No div_series div found ")
return None, None
# 解析元素
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name': name,
'href': host_url + href if href else '',
'movies': movies
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url
def parse_series_detail(self, soup, href):
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href': host_url + link if link else '',
'serial_number': serial_number,
'title': title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url
def parse_makers_uncensored(self, soup, href):
div_series = soup.find("div", id='makers')
if not div_series:
logging.warning(f"Warning: No makers div found ")
return None, None
# 解析元素
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name': name,
'href': host_url + href if href else '',
'movies': movies
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url
def parse_maker_detail(self, soup, href):
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href': host_url + link if link else '',
'serial_number': serial_number,
'title': title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url
def parse_publisher_detail(self, soup, href):
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href': host_url + link if link else '',
'serial_number': serial_number,
'title': title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url
def parse_uncensored(self, soup, href):
# div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# 解析元素
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href': host_url + link if link else '',
'serial_number': serial_number,
'title': title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = self.url_page_num(next_page_url)
current_page_number = self.url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number:
next_url = host_url + next_page_url
return list_data, next_url

View File

@ -165,7 +165,10 @@ class JavbusDBHandler(DatabaseHandler):
return None
def insert_movie_index(self, data, **kwargs):
fields = ['uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels', 'from_movie_series']
fields = [
'uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels', 'from_movie_series',
'studio_id', 'label_id', 'series_id'
]
# 如果没有传入值,就用原来的值
for field in fields:
if kwargs.get(field) is not None:
@ -430,3 +433,88 @@ class JavbusDBHandler(DatabaseHandler):
logging.error("Error inserting movie: %s", e)
return None
# 更新 studio / label / series 等的多语言
def update_pubs_multilang(self, data, tbl, **filters):
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series}
if not tbls.get(tbl):
logging.warning(f"wrong table. table: {tbl}")
return None
return self.insert_or_update_common(data=data, tbl_name=tbls[tbl], uniq_key='href')
def query_list_common(self, tbl, **filters):
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series}
if not tbls.get(tbl):
logging.warning(f"wrong table. table: {tbl}")
return None
try:
sql = f"SELECT href, name, uncensored, id FROM {tbls[tbl]} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"name": " AND name LIKE ?",
"start_id": " AND id > ?",
"uncensored": " AND uncensored = ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
def update_tags(self, data):
return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href')
def query_tags(self, **filters):
try:
sql = f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"name": " AND name LIKE ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None

View File

@ -79,32 +79,42 @@ def fetch_actor_list():
#for lang in ['en']:
fetch_actor_list_lang(lang=lang)
# 更新makers列表中的影片信息
def fetch_movies_by_maker():
# 从studio/label/series中获取影片
def fetch_movies_common(tbl):
if debug:
url_list = db_tools.query_maker_hrefs(name='muramura')
url_list = db_tools.query_list_common(tbl=tbl)
else:
if g_uncensored==1:
url_list = db_tools.query_maker_hrefs(from_list=1)
url_list = db_tools.query_list_common(tbl=tbl, uncensored=1)
elif g_uncensored==0:
url_list = db_tools.query_maker_hrefs(from_list=0)
url_list = db_tools.query_list_common(tbl=tbl, uncensored=0)
else:
url_list = db_tools.query_maker_hrefs()
url_list = db_tools.query_list_common(tbl=tbl)
for row in url_list:
url = row['href']
row_id = row['id']
uncensored = row['from_list'] if row['from_list'] > 0 else None
uncensored = row['uncensored'] if row['uncensored'] > 0 else None
# 去掉可下载的标志(如果有)
next_url = utils.remove_url_query(url)
next_url = url
while next_url:
logging.info(f"Fetching data for maker url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_maker_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_makers=1, maker_id=row_id, uncensored=uncensored)
list_data, next_url = scraper.parse_studios_labels_series_detail(soup, next_url)
if list_data:
# 根据tbl的值动态构建额外参数
extra_kwargs = {}
if tbl == 'studio':
extra_kwargs = {'from_movie_studios': 1, 'studio_id': row_id}
elif tbl == 'label':
extra_kwargs = {'from_movie_labels': 1, 'label_id': row_id}
elif tbl == 'series':
extra_kwargs = {'from_movie_series': 1, 'series_id': row_id}
extra_kwargs['uncensored'] = uncensored
for movie in list_data.get('movies', []):
tmp_id = db_tools.insert_movie_index({'title':movie['title'], 'href':movie['href']}, **extra_kwargs)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
@ -120,85 +130,106 @@ def fetch_movies_by_maker():
if debug:
return True
# 更新makers列表中的影片信息
def fetch_movies_by_studio():
fetch_movies_common('studio')
# 更新series列表中的影片信息
def fetch_movies_by_label():
fetch_movies_common('label')
# 更新series列表中的影片信息
def fetch_movies_by_series():
fetch_movies_common('series')
# 从studio/label/series中获取影片
def update_multilang_common(tbl):
if debug:
url_list = db_tools.query_series_hrefs(name='10musume')
url_list = db_tools.query_list_common(tbl=tbl, limit=3)
else:
if g_uncensored == 1:
url_list = db_tools.query_series_hrefs(from_list=1)
elif g_uncensored == 0:
url_list = db_tools.query_series_hrefs(from_list=0)
if g_uncensored==1:
url_list = db_tools.query_list_common(tbl=tbl, uncensored=1)
elif g_uncensored==0:
url_list = db_tools.query_list_common(tbl=tbl, uncensored=0)
else:
url_list = db_tools.query_series_hrefs()
url_list = db_tools.query_list_common(tbl=tbl)
for row in url_list:
url = row['href']
row_id = row['id']
uncensored = row['from_list'] if row['from_list'] > 0 else None
# 去掉可下载的标志(如果有)
next_url = utils.remove_url_query(url)
while next_url:
logging.info(f"Fetching data for series url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_series_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_series=1, series_id=row_id, uncensored=uncensored)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
if not utils.is_valid_url(url):
logging.info(f'invalid url {url} in {tbl}, skipping...')
continue
langs_url = utils.generate_multilang_urls(url)
for lang, next_url in langs_url.items():
while next_url:
logging.info(f"Fetching data for url {next_url} ..., raw url: {url}")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_studios_labels_series_detail(soup, next_url)
if list_data:
lang_meta = list_data.get('meta', {})
if lang_meta.get('title') is not None:
lang_meta['href'] = url
lang_meta[f'{lang}_name'] = lang_meta.get('title')
tmp_id = db_tools.update_pubs_multilang(lang_meta, tbl)
if tmp_id:
logging.debug(f'update pubs multi lang. data: {lang_meta}')
else:
logging.warning(f'update pubs multi lang failed. data: {lang_meta}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
# 调试增加brak
if debug:
return True
# 不要翻页,获取首页的即可
break
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# 更新series列表中的影片信息
def fetch_movies_by_publishers():
def update_multi_langs():
update_multilang_common('studio')
update_multilang_common('label')
update_multilang_common('series')
# 从studio/label/series中获取影片
def update_multilang_tags():
if debug:
url_list = db_tools.query_publishers_hrefs(limit=1)
url_list = db_tools.query_tags(limit=5)
else:
if g_uncensored == 1:
url_list = db_tools.query_publishers_hrefs(from_list=1)
elif g_uncensored == 0:
url_list = db_tools.query_publishers_hrefs(from_list=0)
else:
url_list = db_tools.query_publishers_hrefs()
url_list = db_tools.query_tags()
for row in url_list:
url = row['href']
row_id = row['id']
# 去掉可下载的标志(如果有)
next_url = utils.remove_url_query(url)
while next_url:
logging.info(f"Fetching data for publisher url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="modal-card", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_publisher_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_publishers=1, pub_id=row_id)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
if not utils.is_valid_url(url):
logging.info(f'invalid url {url}, skipping...')
continue
langs_url = utils.generate_multilang_urls(url)
for lang, next_url in langs_url.items():
while next_url:
logging.info(f"Fetching data for url {next_url} ..., raw url: {url}")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_studios_labels_series_detail(soup, next_url)
if list_data:
lang_meta = list_data.get('meta', {})
if lang_meta.get('title') is not None:
lang_meta['href'] = url
lang_meta[f'{lang}_name'] = lang_meta.get('title')
tmp_id = db_tools.update_tags(lang_meta)
if tmp_id:
logging.debug(f'update tags multi lang. data: {lang_meta}')
else:
logging.warning(f'update tags multi lang failed. data: {lang_meta}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
# 调试增加brak
if debug:
return True
# 不要翻页,获取首页的即可
break
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# 更新演员信息
def fetch_performers_detail():
@ -376,11 +407,13 @@ def fetch_movies_detail():
# 建立缩写到函数的映射
function_map = {
"actor_list": fetch_actor_list,
"makers": fetch_movies_by_maker,
"studio" : fetch_movies_by_studio,
"series" : fetch_movies_by_series,
"pub" : fetch_movies_by_publishers,
"labels" : fetch_movies_by_label,
"actors" : fetch_performers_detail,
"movies" : fetch_movies_detail,
"langs" : update_multi_langs,
"tags" : update_multilang_tags,
}
# 主函数
@ -415,7 +448,7 @@ def main(cmd, args):
db_tools.finalize_task_log(task_id)
# TODO:
# 1,
# 1, tags 和 studio / label / series 的多语言
# 设置环境变量
def set_env(args):

View File

@ -128,7 +128,7 @@ def json_to_csv(data, output_file):
writer.writerow(row)
# javbus 使用处理多语言url归一化
def normalize_url(url: str) -> str:
"""
标准化URL移除语言前缀使不同语言版本的URL保持一致
@ -166,7 +166,48 @@ def normalize_url(url: str) -> str:
print(f"URL标准化失败: {url}, 错误: {e}")
return url # 出错时返回原始URL
import json
# javbus使用归一化的url转为多语言
def generate_multilang_urls(url, languages=['en', 'ja']):
"""
根据给定的URL生成多语言版本的URL
Args:
url (str): 原始URL
languages (list): 需要生成的语言代码列表
Returns:
list: 包含多语言URL的列表
"""
try:
# 解析URL
parsed = urlparse(url)
path = parsed.path
# 处理以斜杠开头的路径
if path.startswith('/'):
path = path[1:] # 移除开头的斜杠
# 生成多语言URL
result = {}
for lang in languages:
# 构建新的路径:语言代码 + 原始路径
new_path = f'/{lang}/{path}'
# 构建新的URL
new_url = urlunparse((
parsed.scheme,
parsed.netloc,
new_path,
parsed.params,
parsed.query,
parsed.fragment
))
result[lang] = new_url
return result
except Exception as e:
print(f"生成多语言URL时出错: {e}")
return {}
def pretty_print_json(data, n=10, indent=4, sort_keys=False):
"""