modify scripts

This commit is contained in:
oscarz
2025-06-24 10:02:28 +08:00
parent 882ee5047a
commit 12c53b043d
8 changed files with 2569 additions and 0 deletions

39
src/config/config.py Normal file

@@ -0,0 +1,39 @@
import os
from pathlib import Path
# MySQL configuration
db_config = {
'host': 'testdb',
'user': 'root',
'password': 'mysqlpw',
'database': 'stockdb'
}
home_dir = os.path.expanduser("~")
global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
global_share_data_dir = f'{home_dir}/sharedata'
# Directory containing this file
current_dir = Path(__file__).resolve().parent
# Locate the 'src' directory by walking upwards (assumes the project root contains a src folder)
project_root = current_dir
while project_root.name != 'src' and project_root != project_root.parent:
project_root = project_root.parent
# Return the src directory
def get_src_directory():
return project_root
# Return the src/config directory
def get_src_config_directory():
return project_root / 'config'
# Return the log directory
def get_log_directory():
"""
Return the path of the log directory that sits alongside src, creating it if it does not exist.
"""
log_dir = project_root.parent / 'log'
log_dir.mkdir(parents=True, exist_ok=True)
return log_dir
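The directory helpers above are resolved once at import time by walking up from config.py to the src directory. A minimal usage sketch, assuming the package is importable as src.config.config:

# Hypothetical usage sketch: write into the auto-created log directory next to src/.
from src.config.config import get_log_directory, get_src_directory

log_file = get_log_directory() / "example.log"   # <project>/log/example.log
log_file.write_text(f"src lives at {get_src_directory()}\n", encoding="utf-8")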

71
src/crawling/craw_common.py Normal file

@@ -0,0 +1,71 @@
import logging
import cloudscraper
from bs4 import BeautifulSoup
import src.utils.utils as utils
# Set up the headers and the scraper
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
}
# Define the cookies
cookies = {
}
scraper = cloudscraper.create_scraper()
http_code_404 = 404
http_code_login = 401
http_code_local = 99
logging.getLogger().setLevel(logging.DEBUG)
# Fetch a page with CloudScraper and validate it; supports different parsers and an optional preprocessor
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None, headers=headers, cookies=cookies):
for attempt in range(max_retries):
try:
if not utils.is_valid_url(url):
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=headers, cookies=cookies)
# Handle HTTP status codes
if response.status_code == 404:
logging.debug(f"Page not found (404): {url}")
return None, http_code_404 # Return 404 directly so the caller can skip this page
response.raise_for_status() # Raise on other HTTP errors
# Check whether the request was redirected, e.g. to a login page
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
soup = BeautifulSoup(response.text, parser)
# Decide whether this is the login page
if soup.find('div', id='ageVerify'):
logging.warning(f"Page redirected to login page on {url}.")
return None, http_code_login
# Preprocess the HTML if a preprocessor was provided
html_text = preprocessor(response.text) if preprocessor else response.text
soup = BeautifulSoup(html_text, parser)
if validator(soup): # Run the caller-supplied page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retrying...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # Still failing after the maximum number of retries
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
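fetch_page takes the target URL plus a validator callback and returns a (soup, status_code) pair; functools.partial is the usual way to bind generic_validator's arguments, as the javbus crawler below does. A minimal sketch, assuming the module is importable as src.crawling.craw_common:

# Hypothetical usage sketch: fetch a page and require a <div id="waterfall"> before parsing.
from functools import partial
import src.crawling.craw_common as craw_common

soup, status = craw_common.fetch_page(
    "https://www.javbus.com/uncensored/actresses/1",
    partial(craw_common.generic_validator, tag="div", identifier="waterfall", attr_type="id"),
    max_retries=1,
)
if soup:
    print(soup.title.string if soup.title else "no <title>")
else:
    print(f"fetch failed, status: {status}")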

515
src/crawling/craw_javbus.py Normal file

@@ -0,0 +1,515 @@
import cloudscraper
import logging
import re
import json
from functools import partial
from urllib.parse import urljoin
import src.config.config as config
import src.utils.utils as utils
import src.crawling.craw_common as scraper
# Base URL and request parameters
host_url = "https://www.javbus.com"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
}
cookies = {
'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
'existmag': 'all',
'age': 'verified'
}
# Parse the HTML and extract the actor list
def parse_actors_list(soup, href):
div_actors = soup.find("div", id='waterfall')
if not div_actors:
logging.warning(f"Warning: No actors div found ")
return None, None
# Parse the items
rows = div_actors.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
# Actor detail link
actor_link = row.find('a')['href']
# Actor name
actor_name = row.find('span').text.strip()
# Avatar image URL
avatar_url = row.find('img')['src']
list_data.append({
'name' : actor_name,
'href' : urljoin(host_url, actor_link),
'pic' : avatar_url
})
# Look for the "next page" button
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return list_data, next_url
# Parse the HTML and extract the actor's detail data
def parse_actor_detail(soup, href):
# Look for aliases first
alias_list = []
div_meta = soup.find('span', class_='actor-section-name')
if not div_meta:
logging.warning(f'warning: no meta data found in page {href}')
return None, None
alias_div = soup.find('div', class_='column section-title')
if alias_div:
meta_list = alias_div.find_all('span', class_='section-meta')
if len(meta_list) > 1:
alias_list = meta_list[0].text.strip().split(", ")
# Avatar
pic = ''
avatar = soup.find("div", class_="column actor-avatar")
if avatar:
pic = parse_avatar_image(avatar)
# Data to return
actor = {}
# Use a regex to find the div whose class contains 'movie-list h cols-'
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return None, None
# Parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
actor = {
'pic' : pic,
'alias' : alias_list,
'movies' : list_data
}
return actor, next_url
# Parse a single field value
def parse_movie_one(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
return key_span.text.strip()
return None
# Parse a value and its link
def parse_movie_val_href(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
a_tag = key_span.find('a')
if a_tag:
return a_tag.text.strip(), host_url + a_tag.get('href')
else:
return key_span.text.strip(), None
return None, None
# Parse multiple values and their links
def parse_movie_arr(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
actors = []
a_tags = key_span.find_all('a')
for a_tag in a_tags:
actors.append({
'name': a_tag.text.strip(),
'href': host_url + a_tag.get('href')
})
return actors
return []
# Parse the HTML and extract the movie detail data
def parse_movie_detail(soup, href, title):
div_video = soup.find("div", class_='video-meta-panel')
if not div_video:
logging.warning(f"Warning: No movies div found ")
return None
result = {}
result['href'] = href
result['title'] = title
# Cover image
cover_img = soup.select_one('.column-video-cover a')
result['cover_url'] = cover_img['href'] if cover_img else None
# Serial number
result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:'])
result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:'])
result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:'])
# Maker / series / publisher
result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:'])
result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:'])
result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:'])
# Tags and actors
result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:'])
result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):'])
return result
# Parse the HTML and extract the uncensored series list
def parse_series_uncensored(soup, href):
div_series = soup.find("div", id='series')
if not div_series:
logging.warning(f"Warning: No div_series div found ")
return None, None
# Parse the items
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
link = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + link if link else '',
'movies' : movies
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the series' movie list
def parse_series_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the uncensored makers list
def parse_makers_uncensored(soup, href):
div_series = soup.find("div", id='makers')
if not div_series:
logging.warning(f"Warning: No makers div found ")
return None, None
# Parse the items
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
link = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + link if link else '',
'movies' : movies
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the maker's movie list
def parse_maker_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the publisher's movie list
def parse_publisher_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the uncensored movie list
def parse_uncensored(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the items
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# Look for the "next page" button
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
def pretty_print_json(data, n=10, indent=4, sort_keys=False):
"""
Pretty-print the first n elements of a list; the remaining elements are summarised as "...".
Parameters:
- data: the data to print (expected to be a list)
- n: number of elements to show
- indent: number of spaces to indent
- sort_keys: whether to sort keys
"""
try:
# Handle non-list data
if not isinstance(data, list):
print(json.dumps(data, indent=indent, ensure_ascii=False, sort_keys=sort_keys))
return
# Copy the original data to avoid modifying it
data_copy = data.copy()
# Take the first n elements
first_n_elements = data_copy[:n]
# If the list is longer than n, append a "..." marker
if len(data) > n:
result = first_n_elements + ["... ({} more elements)".format(len(data) - n)]
else:
result = first_n_elements
# Format the output
formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
print(formatted)
except TypeError as e:
print(f"Error: unable to format the data. Details: {e}")
except Exception as e:
print(f"Unexpected error while printing: {e}")
def test_actor_list(url='https://www.javbus.com/uncensored/actresses/1'):
next_url = url
all_data = []
while next_url:
print(f'fetching page {next_url}')
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"),max_retries=1, headers=headers, cookies=cookies)
if soup:
list_data, next_url = parse_actors_list(soup, next_url)
if list_data :
all_data.extend(list_data)
pretty_print_json(all_data)
else:
print('get wrong page.')
if next_url:
print(f"\n\nnext url: {next_url}")
else:
print(f"wrong request. url: {next_url}, status_code: {status_code}")
break
if __name__ == "__main__":
#test_actors_list()
#test_actor()
#test_movie_detail()
#test_series_list()
#test_series_detail()
logging.getLogger().setLevel(logging.DEBUG)
test_actor_list()
test_actor_list('https://www.javbus.com/en/actresses')
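The parse_movie_one / parse_movie_val_href / parse_movie_arr helpers above all assume the same layout: a <strong> label followed by a sibling <span class="value">. A small self-contained sketch against a hand-written snippet (the HTML here is illustrative, not a real page):

# Hypothetical sketch of the label/value layout the movie-detail helpers expect.
from bs4 import BeautifulSoup
import src.crawling.craw_javbus as craw_javbus

snippet = '''
<div><strong>日期:</strong> <span class="value">2024-01-01</span></div>
<div><strong>演員:</strong> <span class="value"><a href="/actors/x1">Actor A</a> <a href="/actors/x2">Actor B</a></span></div>
'''
demo_soup = BeautifulSoup(snippet, "html.parser")
print(craw_javbus.parse_movie_one(demo_soup, ['日期:', 'Released Date:']))   # -> 2024-01-01
print(craw_javbus.parse_movie_arr(demo_soup, ['演員:', 'Actor(s):']))        # -> two actors with absolute hrefs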

121
src/db_utils/db_common.py Normal file

@@ -0,0 +1,121 @@
import sqlite3
import json
import logging
from datetime import datetime
import src.config.config as config
# Connect to the SQLite database
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # Replace with your database file
# Check the SQLite version
lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
lower_sqlite_version = True
# Get a table's column names and default values
def get_table_columns_and_defaults(cursor, tbl_name):
try:
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
column_info = {}
for col in columns:
col_name = col[1]
default_value = col[4]
column_info[col_name] = default_value
return column_info
except sqlite3.Error as e:
logging.error(f"Error getting table columns: {e}")
return None
# Validate and normalize the data before writing it
def check_and_process_data(cursor, data, tbl_name):
column_info = get_table_columns_and_defaults(cursor=cursor, tbl_name=tbl_name)
if column_info is None:
return None
processed_data = {}
for col, default in column_info.items():
if col == 'id' or col == 'created_at': # Auto-increment primary key and creation timestamp: rely on the table defaults
continue
if col == 'updated_at': # Timestamp column: set it explicitly here
processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if col in data:
processed_data[col] = data[col]
return processed_data
# Insert or update a row (UPSERT)
def insert_or_update_common(cursor, conn, data, tbl_name, uniq_key='url'):
if lower_sqlite_version:
return insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key)
try:
processed_data = check_and_process_data(cursor, data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key])
sql = f'''
INSERT INTO {tbl_name} ({columns})
VALUES ({placeholders})
ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
'''
cursor.execute(sql, values)
conn.commit()
# Fetch the id of the inserted or updated row
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Insert or update a row (fallback for SQLite without UPSERT support)
def insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key='url'):
try:
processed_data = check_and_process_data(cursor, data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
# Try the insert first
try:
sql = f'''
INSERT INTO {tbl_name} ({columns})
VALUES ({placeholders})
'''
cursor.execute(sql, values)
conn.commit()
except sqlite3.IntegrityError: # Unique-key conflict: fall back to an UPDATE
update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key])
update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
update_values.append(data[uniq_key])
sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
cursor.execute(sql, update_values)
conn.commit()
# Fetch the id of the inserted or updated row
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Test code
if __name__ == "__main__":
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
tbl_name_actors = 'javhd_models'
print(get_table_columns_and_defaults(cursor, tbl_name_actors))
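insert_or_update_common relies on SQLite's native UPSERT (ON CONFLICT ... DO UPDATE, available since SQLite 3.24.0), falling back to insert_or_update_common_lower on older versions. A minimal self-contained sketch of the same pattern against a throwaway in-memory table (the table and columns are illustrative only):

# Hypothetical sketch of the UPSERT pattern used above (requires SQLite >= 3.24).
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, url TEXT UNIQUE, title TEXT)")

def upsert(url, title):
    cur.execute(
        "INSERT INTO demo (url, title) VALUES (?, ?) "
        "ON CONFLICT (url) DO UPDATE SET title = EXCLUDED.title",
        (url, title),
    )
    conn.commit()
    cur.execute("SELECT id FROM demo WHERE url = ?", (url,))
    return cur.fetchone()[0]

print(upsert("https://example.com/a", "first"))   # inserts a new row -> 1
print(upsert("https://example.com/a", "second"))  # updates the same row -> 1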

1036
src/db_utils/db_javbus.py Normal file

File diff suppressed because it is too large

521
src/javbus/fetch.py Normal file

@@ -0,0 +1,521 @@
import json
import time
import csv
import argparse
import textwrap
import logging
from functools import partial
from urllib.parse import urljoin, urlparse
import src.config.config as config
import src.logger.logger as logger
import src.db_utils.db_javbus as db_tools
import src.crawling.craw_common as scraper_base
import src.crawling.craw_javbus as scraper
import src.utils.utils as utils
logger.setup_logging()
debug = False
skip_local = False
scan_mode = 0
update_mode = 0
# Fetch the actor list for one language
def fetch_actor_list_lang(lang="en", uncensored=None):
if uncensored:
un_flag = 1
s_url = f"/{lang}/uncensored/actresses" if lang != 'zh' else f"/uncensored/actresses"
else:
un_flag = 0
s_url = f"/{lang}/actresses" if lang != 'zh' else f"/actresses"
current_url = urljoin(scraper.host_url, s_url)
num = 1
while current_url:
logging.info(f"fetching url {current_url}")
soup, status_code = scraper_base.fetch_page(current_url, partial(scraper_base.generic_validator, tag="div", identifier="waterfall", attr_type="id"), headers=scraper.headers, cookies=scraper.cookies)
if soup:
list_data, current_url = scraper.parse_actors_list(soup, current_url)
if list_data :
# Write to the database
for row in list_data:
row[f'{lang}_name'] = row['name']
row['href'] = utils.normalize_url(row['href'])
row_id = db_tools.insert_actor_index(row, uncensored=un_flag, from_actor_list=1)
if row_id:
logging.debug(f'insert actor to db. row_id:{row_id}, data: {row}')
else:
logging.warning(f'insert actor failed. data: {row}')
else:
logging.warning(f'fetch actor error. {current_url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {current_url}')
break
time.sleep(0.3)
# Break early in debug mode
if debug:
return True
# Fetch the actor lists
def fetch_actor_list():
#for lang in ["en", "ja", "zh"]:
for lang in ['ja']:
fetch_actor_list_lang(lang=lang, uncensored=1)
#for lang in ["en", "ja", "zh"]:
for lang in ['ja']:
fetch_actor_list_lang(lang=lang)
# Fetch the actor list
def fetch_actor_list2():
next_url = scraper.actors_uncensored_base_url
while next_url:
logging.info(f'fetching page {next_url}')
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="actors", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_actors_uncensored(soup, next_url)
if list_data :
# Write to the database
for row in list_data:
actor_id = db_tools.insert_actor_index(name=row['name'], href=row.get('href', ''), from_actor_list=1)
if actor_id:
logging.debug(f'insert performer index to db. performer_id:{actor_id}, name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'insert performer index failed. name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'fetch actor error. {next_url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Fetch the makers list
def fetch_makers_list():
next_url = scraper.makers_uncensored_base_url
while next_url:
logging.info(f'fetching page {next_url}')
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="makers", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_makers_uncensored(soup, next_url)
if list_data :
# Write to the database
for row in list_data:
maker_id = db_tools.insert_or_update_makers(row, caller='list')
if maker_id:
logging.debug(f'insert maker to db. maker_id:{maker_id}, name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'insert maker failed. name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'fetch makers page error. {next_url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Fetch the series list
def fetch_series_list():
next_url = scraper.series_uncensored_base_url
while next_url:
logging.info(f'fetching page {next_url}')
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="series", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_series_uncensored(soup, next_url)
if list_data :
# Write to the database
for row in list_data:
maker_id = db_tools.insert_or_update_series(row, caller='list')
if maker_id:
logging.debug(f'insert series to db. maker_id:{maker_id}, name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'insert series failed. name: {row['name']}, href:{row['href']}')
else:
logging.warning(f'fetch series page error. {next_url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Update movie info for entries in the makers list
def fetch_movies_by_maker():
if debug:
url_list = db_tools.query_maker_hrefs(name='muramura')
else:
if scan_mode==1:
url_list = db_tools.query_maker_hrefs(from_list=1)
elif scan_mode==0:
url_list = db_tools.query_maker_hrefs(from_list=0)
else:
url_list = db_tools.query_maker_hrefs()
for row in url_list:
url = row['href']
row_id = row['id']
uncensored = row['from_list'] if row['from_list'] > 0 else None
# Strip the download flag from the URL (if present)
next_url = utils.remove_url_query(url)
while next_url:
logging.info(f"Fetching data for maker url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_maker_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_makers=1, maker_id=row_id, uncensored=uncensored)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Break early in debug mode
if debug:
return True
# Update movie info for entries in the series list
def fetch_movies_by_series():
if debug:
url_list = db_tools.query_series_hrefs(name='10musume')
else:
if scan_mode == 1:
url_list = db_tools.query_series_hrefs(from_list=1)
elif scan_mode == 0:
url_list = db_tools.query_series_hrefs(from_list=0)
else:
url_list = db_tools.query_series_hrefs()
for row in url_list:
url = row['href']
row_id = row['id']
uncensored = row['from_list'] if row['from_list'] > 0 else None
# Strip the download flag from the URL (if present)
next_url = utils.remove_url_query(url)
while next_url:
logging.info(f"Fetching data for series url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_series_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_series=1, series_id=row_id, uncensored=uncensored)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Break early in debug mode
if debug:
return True
# Update movie info for entries in the publishers list
def fetch_movies_by_publishers():
if debug:
url_list = db_tools.query_publishers_hrefs(limit=1)
else:
if scan_mode == 1:
url_list = db_tools.query_publishers_hrefs(from_list=1)
elif scan_mode == 0:
url_list = db_tools.query_publishers_hrefs(from_list=0)
else:
url_list = db_tools.query_publishers_hrefs()
for row in url_list:
url = row['href']
row_id = row['id']
# Strip the download flag from the URL (if present)
next_url = utils.remove_url_query(url)
while next_url:
logging.info(f"Fetching data for publisher url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="modal-card", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_publisher_detail(soup, next_url)
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_publishers=1, pub_id=row_id)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {next_url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
break
# Break early in debug mode
if debug:
return True
# Update actor details
def fetch_performers_detail():
limit_count = 5 if debug else 100
performers_list = []
last_performer_id = 0
abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
def get_performers(**kwargs):
if scan_mode == 1:
kwargs["from_actor_list"] = 1
elif scan_mode == 0:
kwargs["from_actor_list"] = 0
else:
logging.debug(f"scan all records")
kwargs["order_by"] = 'id asc'
return db_tools.query_actors(limit=limit_count, **kwargs)
while True:
if update_mode == 0: # Only new records
performers_list = get_performers(start_id=0, is_full_data=0)
elif update_mode == 1: # Only records that already have full data
performers_list = get_performers(start_id=last_performer_id, is_full_data=1)
elif update_mode == 2: # 0+1
performers_list = get_performers(start_id=last_performer_id, is_full_data_not_in=abnormal_codes)
elif update_mode == 3: # Abnormal records
performers_list = get_performers(start_id=last_performer_id, is_full_data_in =abnormal_codes)
else: # Everything
performers_list = get_performers(start_id=last_performer_id)
if len(performers_list) < 1:
logging.info(f'all performers fetched.')
break
succ_rows = 0
for performer in performers_list:
url = performer['href']
person = performer['name']
pic = ''
alias = []
next_url = url
all_movies = []
need_insert = True
while next_url:
logging.debug(f"Fetching data for actor ({person}), url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="span", identifier="actor-section-name", attr_type="class"))
if soup:
data, next_url = scraper.parse_actor_detail(soup, next_url)
if data:
pic = data.get('pic', '')
alias = data.get('alias', [])
all_movies.extend(data.get('movies', []))
elif status_code and status_code == scraper.http_code_404:
actor_id = db_tools.insert_or_update_actor_404(name=person, href=url, is_full_data=scraper.http_code_404)
logging.warning(f'404 page. id: {actor_id}, name: ({person}), url: {url}, Skipping...')
need_insert = False
break
elif status_code and status_code == scraper.http_code_login:
actor_id = db_tools.insert_or_update_actor_404(name=person, href=url, is_full_data=scraper.http_code_login)
logging.warning(f'401 page(need login). id: {actor_id}, name: ({person}), url: {url}, Skipping...')
need_insert = False
break
else:
logging.warning(f'fetch_page error. url: {url}')
# A 401/404 was already handled above, so skip this record
if not need_insert:
continue
# All of this actor's movies have been fetched; insert the data
performer_id = db_tools.insert_or_update_actor({
'href': url,
'name': person,
'pic' : pic,
'alias' : alias,
'credits':all_movies
})
if performer_id:
logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {url}')
last_performer_id = performer_id
succ_rows += 1
else:
logging.warning(f'insert person: ({person}) {url} failed.')
time.sleep(0.5)
logging.info(f'total request: {len(performers_list)}, succ: {succ_rows}, last performer id: {last_performer_id}')
# Break early in debug mode
if debug:
return True
# Update movie details
def fetch_movies_detail():
limit_count = 10 if debug else 100
movies_list = []
last_movie_id = 0
abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
def get_movies(**kwargs):
if scan_mode == 1:
kwargs["uncensored"] = 1
elif scan_mode == 0:
kwargs["uncensored"] = 0
else:
logging.debug(f"scan all records.")
kwargs["order_by"] = 'id asc'
return db_tools.query_movie_hrefs(limit=limit_count, **kwargs)
while True:
if update_mode == 0: # Only new records
movies_list = get_movies(start_id=0, is_full_data=0)
elif update_mode == 1: # Only records that already have full data
movies_list = get_movies(start_id=last_movie_id, is_full_data=1)
elif update_mode == 2: # 0+1
movies_list = get_movies(start_id=last_movie_id, is_full_data_not_in=abnormal_codes)
elif update_mode == 3: # Abnormal records
movies_list = get_movies(start_id=last_movie_id, is_full_data_in =abnormal_codes)
else: # Everything
movies_list = get_movies(start_id=last_movie_id)
if len(movies_list) < 1:
logging.info(f'all movies fetched.')
break
succ_count = 0
for movie in movies_list:
url = movie['href']
title = movie['title']
curr_id = movie['id']
logging.debug(f"Fetching data for movie ({title}), url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="video-meta-panel", attr_type="class"))
# Page came from the local cache; skip it
if skip_local and status_code == scraper.http_code_local :
last_movie_id = curr_id
succ_count += 1
continue
# Parse the page and write to the database
if soup:
movie_data = scraper.parse_movie_detail(soup, url, title)
if movie_data :
movie_id = db_tools.insert_or_update_movie(movie_data)
if movie_id:
logging.debug(f'insert one movie, id: {movie_id}, title: ({title}) url: {url}')
last_movie_id = movie_id
succ_count += 1
else:
logging.warning(f'insert movie {url} failed.')
else:
logging.warning(f'parse_page_movie error. url: {url}')
elif status_code and status_code == scraper.http_code_404:
movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_404)
logging.warning(f'404 page. id: {movie_id}, title: ({title}), url: {url}, Skipping...')
elif status_code and status_code == scraper.http_code_login:
movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_login)
logging.warning(f'401 page(need login). id: {movie_id}, title: ({title}), url: {url}, Skipping...')
else:
logging.warning(f'fetch_page error. url: {url}')
time.sleep(0.5)
logging.info(f'total request: {len(movies_list)}, succ: {succ_count}. last movie id: {last_movie_id}')
# Break early in debug mode
if debug:
return True
# Map shortcut names to functions
function_map = {
"actor_list": fetch_actor_list,
"maker_list": fetch_makers_list,
"series_list": fetch_series_list,
"makers": fetch_movies_by_maker,
"series" : fetch_movies_by_series,
"pub" : fetch_movies_by_publishers,
"actors" : fetch_performers_detail,
"movies" : fetch_movies_detail,
}
# Main entry point
def main(cmd, args):
# Start a task record
task_id = db_tools.insert_task_log()
if task_id is None:
logging.warning(f'insert task log error.')
return None
logging.info(f"running task. id: {task_id}, args: {args}")
# Run only the requested functions
if cmd:
function_names = args.cmd.split(",") # Split the comma-separated input
for short_name in function_names:
func = function_map.get(short_name.strip()) # Look up the corresponding function
if callable(func):
db_tools.update_task_log(task_id, task_status=f'Running {short_name}')
func()
else:
logging.warning(f" {short_name} is not a valid function shortcut.")
else: # Run everything
for name, func in function_map.items():
if callable(func):
db_tools.update_task_log(task_id, task_status=f'Running {name}')
func()
else:
logging.warning(f" {name} is not a valid function shortcut.")
logging.info(f'all process completed!')
db_tools.finalize_task_log(task_id)
# TODO:
# 1,
# Set global flags from the command-line arguments
def set_env(args):
global debug
debug = args.debug
if debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
global skip_local
skip_local = args.skip_local
global scan_mode
scan_mode = args.scan_mode
global update_mode
if args.update:
update_mode = args.update
if __name__ == "__main__":
# Command-line argument handling
keys_str = ",".join(function_map.keys())
usage_examples = textwrap.dedent('''
Usage examples:
python3 ./fetch.py # Traverse all newly added records
python3 ./fetch.py --scan_mode=1 # Traverse newly added uncensored records
python3 ./fetch.py --scan_mode=0 # Traverse newly added censored (non-uncensored) records
python3 ./fetch.py --scan_mode=2 # Traverse everything newly added
python3 ./fetch.py --update=4 # Traverse the full set of records
python3 ./fetch.py --update=4 --scan_mode=1 # Traverse all uncensored records
python3 ./fetch.py --update=4 --scan_mode=0 # Traverse all censored (non-uncensored) records
python3 ./fetch.py --update=4 --scan_mode=2 # Traverse every record
''')
parser = argparse.ArgumentParser(
description='fetch javdb data.\n\n' + usage_examples,
formatter_class=argparse.RawDescriptionHelpFormatter
)
#parser = argparse.ArgumentParser(description='fetch javdb data.')
parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0 - only is_full_data=0 (default), 1 - only is_full_data=1, 2 - is_full_data<=1, 3 - only is_full_data>1 (abnormal records), 4 - everything')
parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1, help='1 - only uncensored makers/series/actors/movies (default), 0 - the opposite, 2 - everything')
parser.add_argument('--skip_local', action='store_true', help='Skip database writes when the page is served from the local cache')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
set_env(args)
main(args.cmd, args)
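fetch_performers_detail and fetch_movies_detail both page through the database in batches, remembering the id of the last processed row and stopping once a batch comes back empty. A simplified, self-contained sketch of that loop (query_batch stands in for the db_tools.query_* helpers and is not a real function in this repo):

# Hypothetical sketch of the batch loop used by fetch_performers_detail / fetch_movies_detail.
RECORDS = [{"id": i, "href": f"https://example.com/{i}"} for i in range(1, 8)]

def query_batch(start_id=0, limit=3):
    # Stand-in for db_tools.query_actors / query_movie_hrefs, ordered by id asc.
    return [r for r in RECORDS if r["id"] > start_id][:limit]

last_id = 0
while True:
    batch = query_batch(start_id=last_id, limit=3)
    if not batch:
        break                              # every record has been visited
    for row in batch:
        print("processing", row["href"])   # fetch + parse + insert in the real code
        last_id = row["id"]                # remember progress for the next batch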

99
src/logger/logger.py Normal file

@@ -0,0 +1,99 @@
import logging
import os
import inspect
import time
from datetime import datetime
from pathlib import Path
from logging.handlers import RotatingFileHandler
from collections import defaultdict
from src.config.config import get_log_directory, get_src_directory
# Track log message frequency
log_count = defaultdict(int) # Number of times each message has been logged
last_log_time = defaultdict(float) # Timestamp of the last write for each message
class RateLimitFilter(logging.Filter):
"""
Rate-limiting filter:
within any 60-second window, the same message is written at most LOG_LIMIT times;
anything beyond that is silently dropped.
"""
LOG_LIMIT = 600 # Record the same message at most 600 times per minute
def filter(self, record):
global log_count, last_log_time
message_key = record.getMessage() # The formatted log message
# Current time
now = time.time()
elapsed = now - last_log_time[message_key]
# Throttle identical messages
if elapsed < 60: # Within the 60-second window
log_count[message_key] += 1
if log_count[message_key] > self.LOG_LIMIT:
return False # Drop the record
else:
log_count[message_key] = 1 # More than 60 seconds elapsed; restart the count
last_log_time[message_key] = now
return True # Allow the record to be logged
def get_caller_filename():
# Get the call stack
stack = inspect.stack()
# This script's file name
current_script = os.path.basename(__file__)
# Walk the stack frames to find the first caller that is not this script
for frame_info in stack[1:]:
if os.path.basename(frame_info.filename) != current_script:
caller_path = Path(frame_info.filename)
# Try to resolve the path relative to the src directory
try:
relative_path = caller_path.relative_to(get_src_directory())
# Drop the file extension
relative_path_without_ext = relative_path.with_suffix('')
# Replace path separators with dashes
return str(relative_path_without_ext).replace(os.sep, '-')
except ValueError:
# Fall back to the bare file name if the path is not under src
return os.path.splitext(os.path.basename(frame_info.filename))[0]
return None
def setup_logging(log_filename=None):
# If no log_filename is given, derive one from the calling script's name
if log_filename is None:
caller_filename = get_caller_filename()
common_log_dir = get_log_directory()
current_date = datetime.now().strftime('%Y%m%d')
# Build the log file name, putting the date before the extension
log_filename = f'{common_log_dir}/{caller_filename}_{current_date}.log'
max_log_size = 100 * 1024 * 1024 # 100 MB
max_log_files = 10 # Keep at most 10 rotated log files
file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
# Create the root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers = [] # Avoid adding duplicate handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Attach the rate limiter
rate_limit_filter = RateLimitFilter()
file_handler.addFilter(rate_limit_filter)
console_handler.addFilter(rate_limit_filter)
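Entry scripts call setup_logging() once (as fetch.py does above) and then use the standard logging module everywhere else; both handlers share the RateLimitFilter. A minimal usage sketch, assuming the package is importable as src.logger.logger:

# Hypothetical usage sketch: one setup_logging() call per entry script.
import logging
import src.logger.logger as logger

logger.setup_logging()                          # log file name derived from the calling script's name
logging.info("spider started")                  # goes to both the rotating file and the console
logging.debug("not shown: root level is INFO")  # filtered out unless the level is lowered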

167
src/utils/utils.py Normal file

@@ -0,0 +1,167 @@
import re
import os
import json
import time
import csv
from datetime import datetime
from urllib.parse import urlparse
import logging
import src.config.config as config
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
update_dir = f'{config.global_host_data_dir}/javdb'
def is_valid_url(url: str) -> bool:
"""Check whether the URL is well-formed"""
try:
result = urlparse(url)
# Make sure both a scheme (http/https) and a netloc (e.g. example.com) are present
return all([result.scheme, result.netloc])
except ValueError:
return False
# Create a subdirectory
def create_sub_directory(base_dir, name):
# Use the first character of the name, lower-cased, as the subdirectory
sub_dir = name[:1].lower()
full_path = os.path.join(base_dir, sub_dir)
if not os.path.exists(full_path):
os.makedirs(full_path)
return full_path
# Only extract movie URLs
def extract_id_from_href(href):
# Check whether the URL matches the expected pattern
if 'javdb.com/v/' in href:
# Regex pattern
pattern = r'javdb.com/v/([^?&]+)'
# Look for a match
match = re.search(pattern, href)
if match:
# Extract the matched id and lowercase it
result = match.group(1).lower()
return result
return ''
# Save the raw fetched HTML for later verification
def write_raw_html(href, html_text):
# Resolve the target directory
id = extract_id_from_href(href)
if 'javdb.com/v/' in href.lower():
dir_prefix = 'raw_movies'
else:
return
file_dir = create_sub_directory(f"{update_dir}/{dir_prefix}", id)
file_name = f"{id}.html"
full_path = os.path.join(file_dir, file_name)
try:
with open(full_path, 'w', encoding='utf-8') as file:
file.write(html_text)
except FileNotFoundError:
logging.warning(f"Error: the path {full_path} does not exist.")
except PermissionError:
logging.warning(f"Error: no permission to write to {full_path}")
except Exception as e:
logging.warning(f"Unexpected error: {e}")
# Read back previously saved raw HTML if it is still fresh
def read_raw_html(href, expire_date_str="2025-03-01"):
# Resolve the target directory
id = extract_id_from_href(href)
if 'javdb.com/v/' in href.lower():
dir_prefix = 'raw_movies'
else:
return
file_dir = create_sub_directory(f"{update_dir}/{dir_prefix}", id)
file_name = f"{id}.html"
full_path = os.path.join(file_dir, file_name)
try:
if os.path.exists(full_path):
# Last modification time of the file
last_modified_timestamp = os.path.getmtime(full_path)
# Convert the timestamp to a datetime object
last_modified_date = datetime.fromtimestamp(last_modified_timestamp)
# Check whether the file was modified after the given date
expire_date = datetime.strptime(expire_date_str, "%Y-%m-%d")
if last_modified_date > expire_date:
logging.debug(f"find local file on href {href}")
with open(full_path, 'r', encoding='utf-8') as file:
return file.read()
else:
logging.debug(f"expired file {last_modified_date} on href {href}")
return None
else:
return None
except FileNotFoundError:
logging.warning(f"Error: the path {full_path} does not exist.")
except PermissionError:
logging.warning(f"Error: no permission to read {full_path}")
except Exception as e:
logging.warning(f"Unexpected error: {e}")
return None
# Strip the query string, e.g. https://www.javdb.com/makers/16w?f=download -> https://www.javdb.com/makers/16w
def remove_url_query(url: str) -> str:
try:
parsed_url = urlparse(url)
clean_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
return clean_url
except Exception as e:
print(f"Failed to parse URL: {e}")
return url
# Write a CSV file
def json_to_csv(data, output_file):
if not data:
return
headers = list(data[0].keys())
with open(f"{update_dir}/{output_file}", 'w', encoding='utf-8', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer.writeheader()
for row in data:
writer.writerow(row)
def normalize_url(url: str) -> str:
"""
Normalize a URL by removing the language prefix so that different language versions resolve to the same URL.
Examples:
https://www.javbus.com/ja/star/p8y → https://www.javbus.com/star/p8y
https://www.javbus.com/en/star/p8y → https://www.javbus.com/star/p8y
"""
try:
# Parse the URL
parsed = urlparse(url)
# Extract the path component
path = parsed.path
# Common language codes
LANGUAGES = {'ja', 'en', 'ko', 'zh', 'fr', 'de', 'es', 'ru'}
# Split the path into components
path_components = path.strip('/').split('/')
# If the first component is a language code, remove it
if path_components and path_components[0] in LANGUAGES:
path_components = path_components[1:]
# Rebuild the normalized path
normalized_path = '/' + '/'.join(path_components)
# Build the normalized URL, keeping the scheme and host but replacing the path
normalized_url = parsed._replace(path=normalized_path).geturl()
return normalized_url
except Exception as e:
print(f"URL normalization failed: {url}, error: {e}")
return url # Return the original URL on error
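A quick behaviour sketch for the URL helpers, based on the docstring examples above and assuming the module import path used elsewhere in this commit:

# Hypothetical sketch: expected behaviour of the URL helpers.
import src.utils.utils as utils

assert utils.normalize_url("https://www.javbus.com/ja/star/p8y") == "https://www.javbus.com/star/p8y"
assert utils.remove_url_query("https://www.javdb.com/makers/16w?f=download") == "https://www.javdb.com/makers/16w"
assert utils.is_valid_url("not a url") is False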