This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/src/javbus/fetch.py
2025-06-26 09:48:55 +08:00

536 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import csv
import argparse
import textwrap
import logging
from functools import partial
from urllib.parse import urljoin, urlparse
import src.config.config as config
import src.logger.logger as logger
import src.db_utils.sqlite_db as sqlite_db
import src.crawling.craw as craw
import src.utils.utils as utils
# Project logging setup; runs at import time, before the shared handler and crawler are built.
logger.setup_logging()
# Shared database handler and crawler used by every fetch_*/update_* function in this module.
db_tools = sqlite_db.JavbusDBHandler()
scraper = craw.JavbusCrawler()
# Runtime switches. set_env() assigns them from the parsed command-line arguments.
debug = False  # when true, the fetch functions query fewer rows and stop early
skip_local = False  # read by fetch_movies_detail together with craw.http_code_local
g_uncensored = 0  # 1 or 0 filters queries on that `uncensored` value; anything else means no filter
update_mode = 0  # selects which is_full_data states the detail fetchers scan
# Fetch the actor list for a single language
def fetch_actor_list_lang(lang="en", uncensored=None):
    """Crawl the paginated actress index for one language and store every entry.

    Args:
        lang: Language code. It is used as the URL prefix (``'zh'`` pages have
            no prefix) and as the ``{lang}_name`` key written on every row.
        uncensored: Truthy to crawl ``/uncensored/actresses`` and store rows
            with ``uncensored=1``; falsy to crawl ``/actresses`` with
            ``uncensored=0``.

    Returns:
        ``True`` when the module-level ``debug`` flag ends the crawl after the
        first loop iteration, otherwise ``None``.
    """
    un_flag = 1 if uncensored else 0
    section = "/uncensored/actresses" if uncensored else "/actresses"
    s_url = section if lang == 'zh' else f"/{lang}{section}"
    page_validator = partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id")

    current_url = urljoin(scraper.host_url, s_url)
    while current_url:
        logging.info(f"fetching url {current_url}")
        soup, status_code = scraper.fetch_page(current_url, page_validator)
        if soup:
            # Keep the URL of the page just fetched: the parser result overwrites
            # current_url with the next-page link, which must not end up in the
            # warning about *this* page.
            page_url = current_url
            list_data, current_url = scraper.parse_actors_list(soup, page_url)
            if list_data:
                # Write every parsed actor into the database
                for row in list_data:
                    row[f'{lang}_name'] = row['name']
                    row['href'] = utils.normalize_url(row['href'])
                    row_id = db_tools.insert_actor_index(row, uncensored=un_flag, from_actor_list=1)
                    if row_id:
                        logging.debug(f'insert actor to db. row_id:{row_id}, data: {row}')
                    else:
                        logging.warning(f'insert actor failed. data: {row}')
            else:
                logging.warning(f'fetch actor error. {page_url} ...')
        elif status_code:
            logging.warning(f'fetch page error. httpcode: {status_code}, url: {current_url}')
            break
        else:
            # No page and no status code: pause, then retry the same URL
            time.sleep(5)

        time.sleep(0.3)

        # Debug mode: stop after the first iteration
        if debug:
            return True
# Fetch the actor list: control logic across languages
def fetch_actor_list():
    """Run ``fetch_actor_list_lang`` for every language, driven by ``g_uncensored``.

    ``g_uncensored == 1`` crawls only the uncensored index, ``0`` crawls only
    the default index, and any other value crawls the uncensored index for all
    languages first and then the default index for all languages.
    """
    if g_uncensored == 1:
        passes = ({'uncensored': 1},)
    elif g_uncensored == 0:
        passes = ({},)
    else:
        passes = ({'uncensored': 1}, {})

    for extra in passes:
        for lang in ("en", "ja", "zh"):
            fetch_actor_list_lang(lang=lang, **extra)
# Collect movies from studio/label/series pages
def fetch_movies_common(tbl):
    """Index the movies listed on the pages of every row in ``tbl``.

    Each row's ``href`` is crawled page by page, and every movie found is
    stored through ``db_tools.insert_movie_index`` together with the source
    flag/id pair that matches ``tbl``.

    Args:
        tbl: ``'studio'``, ``'label'`` or ``'series'``. Any other value still
            indexes the movies but attaches no source flag/id.

    Returns:
        ``True`` when ``debug`` stops the run after the first crawled row,
        otherwise ``None``.
    """
    if debug or g_uncensored not in (0, 1):
        url_list = db_tools.query_list_common(tbl=tbl)
    else:
        url_list = db_tools.query_list_common(tbl=tbl, uncensored=g_uncensored)

    # (flag keyword, id keyword) passed to insert_movie_index for each table
    source_keys = {
        'studio': ('from_movie_studios', 'studio_id'),
        'label': ('from_movie_labels', 'label_id'),
        'series': ('from_movie_series', 'series_id'),
    }
    page_validator = partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id")

    for row in url_list:
        url = row['href']
        row_id = row['id']
        # Validate before touching the URL as a string
        if not utils.is_valid_url(url):
            logging.info(f'invalid url ({url}) in {tbl}, row id: {row_id}. skipping...')
            continue

        # A row counts as uncensored only when its URL contains "uncensored"
        uncensored = 1 if 'uncensored' in url.lower() else 0

        # The keyword arguments do not change from page to page, so build them once per row
        extra_kwargs = {'uncensored': uncensored}
        if tbl in source_keys:
            flag_key, id_key = source_keys[tbl]
            extra_kwargs[flag_key] = 1
            extra_kwargs[id_key] = row_id

        next_url = url
        while next_url:
            logging.info(f"Fetching data from {tbl} url {next_url} ...")
            soup, status_code = scraper.fetch_page(next_url, page_validator)
            if soup:
                # Remember the fetched page: the parser result replaces next_url
                # with the following page, which is the wrong URL to report.
                page_url = next_url
                list_data, next_url = scraper.parse_studios_labels_series_detail(soup, page_url)
                if list_data:
                    for movie in list_data.get('movies', []):
                        tmp_id = db_tools.insert_movie_index({'title': movie['title'], 'href': movie['href']}, **extra_kwargs)
                        if tmp_id:
                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
                        else:
                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
                else:
                    logging.warning(f'parse_page_movie error. url: {page_url}')
            elif status_code == 404:
                logging.warning(f"fetch page error. httpcode: {status_code}, url: {next_url}")
                break
            else:
                # Any other failure: pause, then retry the same URL
                time.sleep(5)

            time.sleep(0.3)

        # Debug mode: stop after the first crawled row
        if debug:
            return True
# Update movie info from the studio (maker) list
def fetch_movies_by_studio():
    """Index movies for every studio row via ``fetch_movies_common``."""
    fetch_movies_common(tbl='studio')
# Update movie info from the label list
def fetch_movies_by_label():
    """Index movies for every label row via ``fetch_movies_common``."""
    fetch_movies_common(tbl='label')
# Update movie info from the series list
def fetch_movies_by_series():
    """Index movies for every series row via ``fetch_movies_common``."""
    fetch_movies_common(tbl='series')
# Read the per-language names of studio/label/series entries from their first page
def update_multilang_common(tbl):
    """Store the per-language title of every row in ``tbl``.

    ``utils.generate_multilang_urls`` supplies one URL per language for each
    row. Only that first page is fetched; its ``meta['title']`` is written back
    as ``{lang}_name`` through ``db_tools.update_pubs_multilang``.

    Args:
        tbl: Table selector passed to ``db_tools.query_list_common`` and
            ``db_tools.update_pubs_multilang``.
    """
    if debug:
        url_list = db_tools.query_list_common(tbl=tbl, limit=3)
    elif g_uncensored == 1:
        url_list = db_tools.query_list_common(tbl=tbl, uncensored=1)
    elif g_uncensored == 0:
        url_list = db_tools.query_list_common(tbl=tbl, uncensored=0)
    else:
        url_list = db_tools.query_list_common(tbl=tbl)

    page_validator = partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id")

    for row in url_list:
        url = row['href']
        row_id = row['id']
        if not utils.is_valid_url(url):
            logging.info(f'invalid url ({url}) in {tbl}, row id: {row_id}. skipping...')
            continue

        langs_url = utils.generate_multilang_urls(url)
        for lang, lang_url in langs_url.items():
            # The loop only repeats to retry a failed fetch; a fetched page or a 404 ends it.
            while lang_url:
                logging.info(f"Fetching data for url {lang_url} ...")
                soup, status_code = scraper.fetch_page(lang_url, page_validator)
                if soup:
                    # The next-page link is discarded on purpose: the first page is enough.
                    # Keeping lang_url intact also lets the warning below name the page
                    # that actually failed to parse.
                    list_data, _ = scraper.parse_studios_labels_series_detail(soup, lang_url)
                    if list_data:
                        lang_meta = list_data.get('meta', {})
                        if lang_meta.get('title') is not None:
                            lang_meta['href'] = url
                            lang_meta[f'{lang}_name'] = lang_meta.get('title')
                            tmp_id = db_tools.update_pubs_multilang(lang_meta, tbl)
                            if tmp_id:
                                logging.debug(f'update pubs multi lang. data: {lang_meta}')
                            else:
                                logging.warning(f'update pubs multi lang failed. data: {lang_meta}')
                    else:
                        logging.warning(f'parse_page_movie error. url: {lang_url}')
                    time.sleep(0.3)
                    break
                elif status_code == 404:
                    logging.warning(f'fetch page error. httpcode: {status_code}, url: {lang_url}')
                    break
                else:
                    # Any other failure: pause, then retry the same URL
                    time.sleep(5)

        # Debug mode: stop after the first processed row
        if debug:
            break
# Update the multi-language names of studios, labels and series
def update_multi_langs():
    """Run ``update_multilang_common`` for studio, label and series, in that order."""
    for tbl in ('studio', 'label', 'series'):
        update_multilang_common(tbl)
# Read the per-language names of movie tags
def update_multilang_tags():
    """Store the per-language title of every tag returned by ``db_tools.query_tags``.

    ``utils.generate_multilang_urls`` supplies one URL per language for each
    tag. Only that first page is fetched; its ``meta['title']`` is written back
    as ``{lang}_name`` through ``db_tools.update_tags``.
    """
    url_list = db_tools.query_tags(limit=5) if debug else db_tools.query_tags()
    page_validator = partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id")

    for row in url_list:
        url = row['href']
        row_id = row['id']
        if not utils.is_valid_url(url):
            logging.info(f'invalid url ({url}), row id: {row_id}. skipping...')
            continue

        langs_url = utils.generate_multilang_urls(url)
        for lang, lang_url in langs_url.items():
            # The loop only repeats to retry a failed fetch; a fetched page or a 404 ends it.
            while lang_url:
                logging.info(f"Fetching data for url {lang_url} ...")
                soup, status_code = scraper.fetch_page(lang_url, page_validator)
                if soup:
                    # The next-page link is discarded on purpose: the first page is enough.
                    # Keeping lang_url intact also lets the warning below name the page
                    # that actually failed to parse.
                    list_data, _ = scraper.parse_studios_labels_series_detail(soup, lang_url)
                    if list_data:
                        lang_meta = list_data.get('meta', {})
                        if lang_meta.get('title') is not None:
                            lang_meta['href'] = url
                            lang_meta[f'{lang}_name'] = lang_meta.get('title')
                            tmp_id = db_tools.update_tags(lang_meta)
                            if tmp_id:
                                logging.debug(f'update tags multi lang. data: {lang_meta}')
                            else:
                                logging.warning(f'update tags multi lang failed. data: {lang_meta}')
                    else:
                        logging.warning(f'parse_page_movie error. url: {lang_url}')
                    time.sleep(0.3)
                    break
                elif status_code == 404:
                    logging.warning(f'fetch page error. httpcode: {status_code}, url: {lang_url}')
                    break
                else:
                    # Any other failure: pause, then retry the same URL
                    time.sleep(5)

        # Debug mode: stop after the first processed row
        if debug:
            break
# Update actor details
def fetch_performers_detail():
    """Crawl actor detail pages in batches and store avatar and credits.

    Batches come from ``db_tools.query_actors``; which rows are selected
    depends on the module-level ``update_mode`` and ``g_uncensored``. Every
    page of an actor is fetched, the movies are accumulated, and the result is
    written with ``db_tools.update_actor_detail``. Pages answering with
    ``craw.http_code_404`` or ``craw.http_code_redirect`` are recorded through
    ``db_tools.update_actor_detail_404`` instead.

    Returns:
        ``True`` when ``debug`` stops the run after the first batch, otherwise
        ``None`` once a query returns no rows.
    """
    limit_count = 5 if debug else 100
    last_performer_id = 0
    abnormal_codes = [craw.http_code_404, craw.http_code_redirect]
    page_validator = partial(
        scraper.generic_validator,
        tag="div", identifier="alert alert-success alert-common", attr_type="class",
    )

    def get_performers(**kwargs):
        """Query one batch of actors, applying the global uncensored filter."""
        if g_uncensored == 1:
            kwargs["uncensored"] = 1
        elif g_uncensored == 0:
            kwargs["uncensored"] = 0
        else:
            logging.debug(f"scan all records")
        kwargs["order_by"] = 'id asc'
        return db_tools.query_actors(limit=limit_count, **kwargs)

    while True:
        if update_mode == 0:    # new records only
            performers_list = get_performers(start_id=0, is_full_data=0)
        elif update_mode == 1:  # complete records only
            performers_list = get_performers(start_id=last_performer_id, is_full_data=1)
        elif update_mode == 2:  # 0 + 1
            performers_list = get_performers(start_id=last_performer_id, is_full_data_not_in=abnormal_codes)
        elif update_mode == 3:  # the rest (abnormal records)
            performers_list = get_performers(start_id=last_performer_id, is_full_data_in=abnormal_codes)
        else:                   # everything
            performers_list = get_performers(start_id=last_performer_id)

        if not performers_list:
            logging.info(f'all performers fetched.')
            break

        succ_rows = 0
        for performer in performers_list:
            url = performer['href']
            person = performer['name']
            uncensored = int(performer['uncensored'])
            avatar = None
            if not utils.is_valid_url(url):
                logging.info(f'invalid url ({url}), name: {person}. skipping...')
                continue

            next_url = url
            all_movies = []
            need_insert = True
            while next_url:
                logging.debug(f"Fetching data for actor ({person}), url {next_url} ...")
                soup, status_code = scraper.fetch_page(next_url, page_validator)
                if soup:
                    data, next_url = scraper.parse_actor_detail(soup, next_url)
                    if data:
                        if not avatar:
                            avatar = data.get('avatar')
                        all_movies.extend(data.get('movies', []))
                elif status_code and status_code in abnormal_codes:
                    is_404 = status_code == craw.http_code_404
                    marker = craw.http_code_404 if is_404 else craw.http_code_redirect
                    actor_id = db_tools.update_actor_detail_404({'href': url, 'is_full_data': marker})
                    if is_404:
                        logging.warning(f'404 page. id: {actor_id}, name: ({person}), url: {url}, Skiping...')
                    else:
                        logging.warning(f'401 page(need login). id: {actor_id}, name: ({person}), url: {url}, Skiping...')
                    # Move the cursor past this row as well. Otherwise a batch made
                    # only of 404/redirect rows (typical for update_mode 3) is
                    # returned by the next query again and the loop never ends.
                    # NOTE(review): assumes update_actor_detail_404 returns the actor
                    # row id, as the log text suggests; max() keeps the cursor from
                    # moving backwards if it does not. Confirm.
                    if actor_id:
                        last_performer_id = max(last_performer_id, actor_id)
                    need_insert = False
                    break
                else:
                    logging.warning(f'fetch_page error. url: {url}')
                    time.sleep(2)

            # 404 / redirect rows were already recorded above
            if not need_insert:
                continue

            # All pages of this actor were read; write the collected data
            performer_id = db_tools.update_actor_detail({
                'href': url,
                'name': person,
                'avatar': avatar,
                'credits': all_movies,
                'uncensored': uncensored,
            })
            if performer_id:
                logging.debug(f'insert/update one person, id: {performer_id}, person: ({person}), url: {url}')
                last_performer_id = performer_id
                succ_rows += 1
            else:
                logging.warning(f'insert person: ({person}) {url} failed.')
            time.sleep(0.5)

        logging.info(f'total request: {len(performers_list)}, succ: {succ_rows}, last performer id: {last_performer_id}')
        # Debug mode: stop after the first batch
        if debug:
            return True
# Update movie details
def fetch_movies_detail():
    """Crawl movie detail pages in batches and store the parsed data.

    Batches come from ``db_tools.query_movies``; which rows are selected
    depends on the module-level ``update_mode`` and ``g_uncensored``. Parsed
    pages are written with ``db_tools.insert_or_update_movie``. Pages answering
    with ``craw.http_code_404`` or ``craw.http_code_redirect`` are recorded
    through ``db_tools.insert_or_update_movie_404`` instead.

    Returns:
        ``True`` when ``debug`` stops the run after the first batch, otherwise
        ``None`` once a query returns no rows.
    """
    limit_count = 2 if debug else 100
    last_movie_id = 0
    abnormal_codes = [craw.http_code_404, craw.http_code_redirect]
    page_validator = partial(scraper.generic_validator, tag="div", identifier="container", attr_type="class")

    def get_movies(**kwargs):
        """Query one batch of movies, applying the global uncensored filter."""
        if g_uncensored == 1:
            kwargs["uncensored"] = 1
        elif g_uncensored == 0:
            kwargs["uncensored"] = 0
        else:
            logging.debug(f"scan all records.")
        kwargs["order_by"] = 'id asc'
        return db_tools.query_movies(limit=limit_count, **kwargs)

    while True:
        if update_mode == 0:    # new records only
            movies_list = get_movies(start_id=0, is_full_data=0)
        elif update_mode == 1:  # complete records only
            movies_list = get_movies(start_id=last_movie_id, is_full_data=1)
        elif update_mode == 2:  # 0 + 1
            movies_list = get_movies(start_id=last_movie_id, is_full_data_not_in=abnormal_codes)
        elif update_mode == 3:  # the rest (abnormal records)
            movies_list = get_movies(start_id=last_movie_id, is_full_data_in=abnormal_codes)
        else:                   # everything
            movies_list = get_movies(start_id=last_movie_id)

        if not movies_list:
            logging.info(f'all movies fetched.')
            break

        succ_count = 0
        for movie in movies_list:
            url = movie['href']
            title = movie['title']
            curr_id = movie['id']
            uncensored = int(movie['uncensored'])
            if not utils.is_valid_url(url):
                logging.info(f'invalid url ({url}), row id: {curr_id}. skipping...')
                # An invalid URL never becomes valid; move the cursor past it so
                # a batch of such rows is not queried again and again.
                last_movie_id = curr_id
                continue

            logging.debug(f"Fetching data for movie ({title}), url {url} ...")
            soup, status_code = scraper.fetch_page(url, page_validator)
            # Page came from the local cache: skip the database work
            if skip_local and status_code == craw.http_code_local:
                last_movie_id = curr_id
                succ_count += 1
                continue

            # Parse the page and write it to the database
            if soup:
                movie_data = scraper.parse_movie_detail(soup, url, title)
                if movie_data:
                    movie_data['uncensored'] = uncensored
                    movie_id = db_tools.insert_or_update_movie(movie_data)
                    if movie_id:
                        logging.debug(f'insert one movie, id: {movie_id}, title: ({title}) url: {url}')
                        last_movie_id = movie_id
                        succ_count += 1
                    else:
                        logging.warning(f'insert movie {url} failed.')
                else:
                    logging.warning(f'parse_page_movie error. url: {url}')
                    time.sleep(2)
            elif status_code and status_code in abnormal_codes:
                is_404 = status_code == craw.http_code_404
                marker = craw.http_code_404 if is_404 else craw.http_code_redirect
                movie_id = db_tools.insert_or_update_movie_404({'href': url, 'is_full_data': marker})
                if is_404:
                    logging.warning(f'404 page. id: {movie_id}, title: ({title}), url: {url}, Skiping...')
                else:
                    logging.warning(f'401 page(need login). id: {movie_id}, title: ({title}), url: {url}, Skiping...')
                # The row is handled; advance the cursor so a batch made only of
                # 404/redirect rows (typical for update_mode 3) cannot loop forever.
                last_movie_id = curr_id
            else:
                logging.warning(f'fetch_page error. url: {url}')
            time.sleep(0.5)

        logging.info(f'total request: {len(movies_list)}, succ: {succ_count}. last movie id: {last_movie_id}')
        # Debug mode: stop after the first batch
        if debug:
            return True
# Map each command-line shortcut to the function it runs.
# main() iterates this dict in definition order when no --cmd is given.
function_map = {
    "actor_list": fetch_actor_list,
    "studio" : fetch_movies_by_studio,
    "series" : fetch_movies_by_series,
    "labels" : fetch_movies_by_label,
    "actors" : fetch_performers_detail,
    "movies" : fetch_movies_detail,
    "langs" : update_multi_langs,
    "tags" : update_multilang_tags,
}
# Main entry point
def main(cmd, args):
    """Run the requested fetch steps under a task-log entry.

    Args:
        cmd: Comma-separated shortcut names (keys of ``function_map``). A
            falsy value runs every entry of ``function_map`` in definition
            order.
        args: Parsed command-line namespace; only logged here.

    Returns:
        ``None``. Returns early, without running any step, when the task-log
        row cannot be created.
    """
    # Open the task
    task_id = db_tools.insert_task_log()
    if task_id is None:
        logging.warning(f'insert task log error.')
        return None

    logging.info(f"running task. id: {task_id}, args: {args}")

    if cmd:
        # Split the `cmd` parameter itself (not args.cmd) and resolve each shortcut
        steps = [(short_name, function_map.get(short_name.strip())) for short_name in cmd.split(",")]
    else:
        # Full run
        steps = list(function_map.items())

    # One loop serves both cases, so the warning always names the step at hand
    for step_name, func in steps:
        if callable(func):
            db_tools.update_task_log(task_id, task_status=f'Running {step_name}')
            func()
        else:
            logging.warning(f" {step_name} is not a valid function shortcut.")

    logging.info(f'all process completed!')
    db_tools.finalize_task_log(task_id)
# TODO:
# 1, tags 和 studio / label / series 的多语言
# Apply the command-line options to the module-level switches
def set_env(args):
    """Copy parsed command-line options into the module-level switches.

    Args:
        args: Namespace providing ``debug``, ``skip_local``, ``uncensored``
            and ``update``. A falsy ``update`` leaves ``update_mode`` as is.
    """
    global debug, skip_local, g_uncensored, update_mode

    debug = args.debug
    if debug:
        # Debug runs also lower the root logger threshold
        logging.getLogger().setLevel(logging.DEBUG)

    skip_local = args.skip_local
    g_uncensored = args.uncensored

    if args.update:
        update_mode = args.update
if __name__ == "__main__":
    # Command-line handling: the help text lists every shortcut in function_map
    shortcut_names = ",".join(function_map.keys())

    examples_text = textwrap.dedent('''
    示例用法:
    python3 ./fetch.py # 遍历新增的所有记录
    python3 ./fetch.py --uncensored=1 # 遍历新增的 uncensored 记录(无码片)
    python3 ./fetch.py --uncensored=0 # 遍历新增的 非uncensored 记录(有码片)
    python3 ./fetch.py --uncensored=2 # 遍历所有新增
    python3 ./fetch.py --update=4 # 遍历全量的记录
    python3 ./fetch.py --update=4 --uncensored=1 # 遍历全量的 uncensored 记录(无码片)
    python3 ./fetch.py --update=4 --uncensored=0 # 遍历全量的 非uncensored 记录(有码片)
    python3 ./fetch.py --update=4 --uncensored=2 # 遍历全量记录
    ''')

    # RawDescriptionHelpFormatter keeps the line breaks of the examples in --help
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='fetch javdb data.\n\n' + examples_text,
    )
    cli.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {shortcut_names}")
    cli.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0-只遍历is_full_data=0(默认), 1-只遍历is_full_data=1, 2-遍历is_full_data<=1, 3-只遍历is_full_data>1(异常数据), 4-遍历所有')
    cli.add_argument('--uncensored', type=int, choices=[0, 1, 2], default=1, help='1-只遍历所有 uncensored 的 makers/series/actors/movies(默认), 0-与前者相反, 2-全量')
    cli.add_argument('--skip_local', action='store_true', help='如果本地缓存了页面,则跳过数据库操作')
    cli.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')

    cli_args = cli.parse_args()
    set_env(cli_args)
    main(cli_args.cmd, cli_args)