From 12c53b043d15b9277527e1e7bb7cc5eb072629a0 Mon Sep 17 00:00:00 2001
From: oscarz
Date: Tue, 24 Jun 2025 10:02:28 +0800
Subject: [PATCH] modify scripts

---
 src/config/config.py        |   39 ++
 src/crawling/craw_common.py |   71 +++
 src/crawling/craw_javbus.py |  515 +++++++++++++++++
 src/db_utils/db_common.py   |  121 ++++
 src/db_utils/db_javbus.py   | 1036 +++++++++++++++++++++++++++++++++++
 src/javbus/fetch.py         |  521 ++++++++++++++++++
 src/logger/logger.py        |   99 ++++
 src/utils/utils.py          |  167 ++++++
 8 files changed, 2569 insertions(+)
 create mode 100644 src/config/config.py
 create mode 100644 src/crawling/craw_common.py
 create mode 100644 src/crawling/craw_javbus.py
 create mode 100644 src/db_utils/db_common.py
 create mode 100644 src/db_utils/db_javbus.py
 create mode 100644 src/javbus/fetch.py
 create mode 100644 src/logger/logger.py
 create mode 100644 src/utils/utils.py

diff --git a/src/config/config.py b/src/config/config.py
new file mode 100644
index 0000000..b459ac2
--- /dev/null
+++ b/src/config/config.py
@@ -0,0 +1,39 @@
+import os
+from pathlib import Path
+
+# MySQL configuration
+db_config = {
+    'host': 'testdb',
+    'user': 'root',
+    'password': 'mysqlpw',
+    'database': 'stockdb'
+}
+
+home_dir = os.path.expanduser("~")
+global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
+global_share_data_dir = f'{home_dir}/sharedata'
+
+# directory containing this file
+current_dir = Path(__file__).resolve().parent
+
+# walk up to the project root, assuming the root contains a src directory
+project_root = current_dir
+while project_root.name != 'src' and project_root != project_root.parent:
+    project_root = project_root.parent
+
+# return the src directory
+def get_src_directory():
+    return project_root
+
+# return the src/config directory
+def get_src_config_directory():
+    return project_root / 'config'
+
+# return the log directory
+def get_log_directory():
+    """
+    Return the log directory that sits next to src, creating it if it
+    does not exist yet.
+    """
+    log_dir = project_root.parent / 'log'
+    log_dir.mkdir(parents=True, exist_ok=True)
+    return log_dir
\ No newline at end of file
diff --git a/src/crawling/craw_common.py b/src/crawling/craw_common.py
new file mode 100644
index 0000000..ab17f6b
--- /dev/null
+++ b/src/crawling/craw_common.py
@@ -0,0 +1,71 @@
+import logging
+import cloudscraper
+from bs4 import BeautifulSoup
+import src.utils.utils as utils
+
+# headers and scraper setup
+headers = {
+    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
+}
+# cookies (empty by default)
+cookies = {
+}
+scraper = cloudscraper.create_scraper()
+
+http_code_404 = 404
+http_code_login = 401
+http_code_local = 99
+logging.getLogger().setLevel(logging.DEBUG)
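+# NOTE (hedged sketch): fetch_page below delegates URL sanity checking to
+# utils.is_valid_url from src/utils/utils.py, which this excerpt does not
+# show. A minimal stand-in of that shape -- an assumption about the real
+# helper, kept here only as an unused reference -- could be:
+from urllib.parse import urlparse as _urlparse
+
+def _sketch_is_valid_url(url):
+    # an http/https scheme plus a non-empty host is the least we need
+    parts = _urlparse(url)
+    return parts.scheme in ('http', 'https') and bool(parts.netloc)
+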
+# Fetch a page with CloudScraper, validate it, and support custom parsers
+# and optional preprocessing of the raw HTML.
+def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None, headers=headers, cookies=cookies):
+    for attempt in range(max_retries):
+        try:
+            if not utils.is_valid_url(url):
+                logging.error(f'wrong url format: {url}')
+                return None, None
+
+            response = scraper.get(url, headers=headers, cookies=cookies)
+
+            # handle HTTP status codes
+            if response.status_code == 404:
+                logging.debug(f"Page not found (404): {url}")
+                return None, http_code_404  # return 404 so the caller can skip this page
+
+            response.raise_for_status()  # raise on other HTTP errors
+
+            # detect redirects, e.g. to a login page
+            if response.history:
+                logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
+                soup = BeautifulSoup(response.text, parser)
+                # check whether this is a login page
+                if soup.find('div', id='ageVerify'):
+                    logging.warning(f"Page redirected to login page on {url}.")
+                    return None, http_code_login
+
+            # preprocess the HTML if a preprocessor was supplied
+            html_text = preprocessor(response.text) if preprocessor else response.text
+
+            soup = BeautifulSoup(html_text, parser)
+            if validator(soup):  # run the caller's page check
+                return soup, response.status_code
+
+            logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
+        except cloudscraper.exceptions.CloudflareChallengeError as e:
+            logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
+        except cloudscraper.exceptions.CloudflareCode1020 as e:
+            logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
+        except Exception as e:
+            logging.error(f"Unexpected error on {url}: {e}, Retrying...")
+
+    logging.error(f'Fetching failed after max retries. {url}')
+    return None, None  # still failing after the maximum number of retries
+
+# generic HTML structure validator
+def generic_validator(soup, tag, identifier, attr_type="id"):
+    if attr_type == "id":
+        return soup.find(tag, id=identifier) is not None
+    elif attr_type == "class":
+        return bool(soup.find_all(tag, class_=identifier))
+    elif attr_type == "name":
+        return bool(soup.find(tag, {'name': identifier}))
+    return False
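+
+# Usage sketch (never executed on import): how callers are expected to
+# combine fetch_page with generic_validator; the URL here is a placeholder.
+def _example_fetch():
+    from functools import partial
+    url = 'https://example.com/list'  # placeholder URL
+    validator = partial(generic_validator, tag='div', identifier='waterfall', attr_type='id')
+    soup, status_code = fetch_page(url, validator, max_retries=2)
+    if soup is None:
+        logging.warning(f'fetch failed, status: {status_code}')
+        return None
+    return soup.title.string if soup.title else None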
diff --git a/src/crawling/craw_javbus.py b/src/crawling/craw_javbus.py
new file mode 100644
index 0000000..f36d0b8
--- /dev/null
+++ b/src/crawling/craw_javbus.py
@@ -0,0 +1,515 @@
+import cloudscraper
+import logging
+import re
+import json
+from functools import partial
+from urllib.parse import urljoin
+import src.config.config as config
+import src.utils.utils as utils
+import src.crawling.craw_common as scraper
+
+# base URL and variable parameters
+host_url = "https://www.javbus.com"
+
+headers = {
+    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+    "Sec-Fetch-Site": "none",
+    "Accept-Encoding": "gzip, deflate, br",
+    "Sec-Fetch-Mode": "navigate",
+    "Host": "www.javbus.com",
+    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
+    "Accept-Language": "zh-CN,zh-Hans;q=0.9",
+    "Sec-Fetch-Dest": "document",
+    "Connection": "keep-alive",
+}
+
+cookies = {
+    'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
+    'existmag': 'all',
+    'age': 'verified'
+}
+
+# parse the HTML and extract the actor list
+def parse_actors_list(soup, href):
+    div_actors = soup.find("div", id='waterfall')
+    if not div_actors:
+        logging.warning("No actors div found")
+        return None, None
+
+    # parse the items
+    rows = div_actors.find_all('div', class_='item')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        # actor detail link
+        actor_link = row.find('a')['href']
+        # actor name
+        actor_name = row.find('span').text.strip()
+        # avatar image link
+        avatar_url = row.find('img')['src']
+
+        list_data.append({
+            'name' : actor_name,
+            'href' : urljoin(host_url, actor_link),
+            'pic' : avatar_url
+        })
+
+    # find the "next page" button
+    div_link = soup.find("div", class_='text-center hidden-xs')
+    if div_link:
+        next_page_element = soup.find('a', id='next')
+        if next_page_element:
+            next_page_url = next_page_element['href']
+            next_url = urljoin(href, next_page_url)
+
+    return list_data, next_url
+
+
+# parse the HTML and extract actor details
+def parse_actor_detail(soup, href):
+    # collect aliases first
+    alias_list = []
+
+    div_meta = soup.find('span', class_='actor-section-name')
+    if not div_meta:
+        logging.warning(f'warning: no meta data found in page {href}')
+        return None, None
+    alias_div = soup.find('div', class_='column section-title')
+
+    if alias_div:
+        meta_list = alias_div.find_all('span', class_='section-meta')
+        if len(meta_list) > 1:
+            alias_list = meta_list[0].text.strip().split(", ")
+
+    # avatar
+    pic = ''
+    avatar = soup.find("div", class_="column actor-avatar")
+    if avatar:
+        pic = parse_avatar_image(avatar)
+
+    # data to return
+    actor = {}
+
+    # match any div whose class starts with 'movie-list h cols-'
+    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
+    #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
+    #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
+    if not div_movies:
+        logging.warning("No movies div found")
+        return None, None
+
+    # parse the items
+    rows = div_movies.find_all('div', class_='item')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        link = row.find('a', class_='box')['href']
+        serial_number = row.find('strong').text.strip()
+        title = row.find('div', class_='video-title').text.strip()
+        release_date = row.find('div', class_='meta').text.strip()
+        list_data.append({
+            'href' : host_url + link if link else '',
+            'serial_number' : serial_number,
+            'title' : title,
+            'release_date': release_date
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    actor = {
+        'pic' : pic,
+        'alias' : alias_list,
+        'movies' : list_data
+    }
+
+    return actor, next_url
+
+
+# parse a single value
+def parse_movie_one(soup, keys):
+    key_strong = soup.find('strong', string=lambda text: text in keys)
+    if key_strong:
+        key_span = key_strong.find_next_sibling('span', class_='value')
+        if key_span:
+            return key_span.text.strip()
+    return None
+
+# parse a value together with its link
+def parse_movie_val_href(soup, keys):
+    key_strong = soup.find('strong', string=lambda text: text in keys)
+    if key_strong:
+        key_span = key_strong.find_next_sibling('span', class_='value')
+        if key_span:
+            a_tag = key_span.find('a')
+            if a_tag:
+                return a_tag.text.strip(), host_url + a_tag.get('href')
+            else:
+                return key_span.text.strip(), None
+    return None, None
+
+# parse multiple values with links
+def parse_movie_arr(soup, keys):
+    key_strong = soup.find('strong', string=lambda text: text in keys)
+    if key_strong:
+        key_span = key_strong.find_next_sibling('span', class_='value')
+        if key_span:
+            actors = []
+            a_tags = key_span.find_all('a')
+            for a_tag in a_tags:
+                actors.append({
+                    'name': a_tag.text.strip(),
+                    'href': host_url + a_tag.get('href')
+                })
+            return actors
+    return []
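+
+# Self-contained sketch of the <strong>key</strong><span class="value">...</span>
+# pattern that parse_movie_one / parse_movie_val_href / parse_movie_arr walk;
+# the inline HTML is made up for illustration only.
+def _example_parse_kv():
+    from bs4 import BeautifulSoup as _BS
+    html = '<div><strong>ID:</strong><span class="value">ABC-123</span></div>'
+    soup = _BS(html, 'html.parser')
+    return parse_movie_one(soup, ['番號:', 'ID:'])  # -> 'ABC-123'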
+
+# parse the HTML and extract movie details
+def parse_movie_detail(soup, href, title):
+    div_video = soup.find("div", class_='video-meta-panel')
+    if not div_video:
+        logging.warning("No movies div found")
+        return None, None
+
+    result = {}
+    result['href'] = href
+    result['title'] = title
+
+    # cover image
+    cover_img = soup.select_one('.column-video-cover a')
+    result['cover_url'] = cover_img['href'] if cover_img else None
+
+    # serial number, release date, duration
+    result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:'])
+    result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:'])
+    result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:'])
+
+    # maker, series, publisher
+    result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:'])
+    result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:'])
+    result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:'])
+
+    # actors and tags
+    result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:'])
+    result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):'])
+
+    return result
+
+# parse the HTML and extract the uncensored series list
+def parse_series_uncensored(soup, href):
+    div_series = soup.find("div", id='series')
+    if not div_series:
+        logging.warning("No div_series div found")
+        return None, None
+
+    # parse the items
+    rows = div_series.find_all('a', class_='box')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        name = row.find('strong').text.strip()
+        link = row['href']  # do not shadow the href argument used below
+        div_movies = row.find('span')
+        movies = 0
+        if div_movies:
+            match = re.search(r'\((\d+)\)', div_movies.text.strip())
+            if match:
+                movies = int(match.group(1))
+
+        list_data.append({
+            'name' : name,
+            'href' : host_url + link if link else '',
+            'movies' : movies
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    return list_data, next_url
+
+
+# parse the HTML and extract a series' movie list
+def parse_series_detail(soup, href):
+    #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
+    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
+    if not div_movies:
+        logging.warning("No movies div found")
+        return [], None
+
+    # parse the items
+    rows = div_movies.find_all('div', class_='item')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        link = row.find('a', class_='box')['href']
+        serial_number = row.find('strong').text.strip()
+        title = row.find('div', class_='video-title').text.strip()
+        release_date = row.find('div', class_='meta').text.strip()
+        list_data.append({
+            'href' : host_url + link if link else '',
+            'serial_number' : serial_number,
+            'title' : title,
+            'release_date': release_date
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    return list_data, next_url
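+
+# url_page_num is used by the pagination checks above but is defined outside
+# this excerpt. A plausible implementation, assuming javbus-style ".../2" or
+# "?page=2" URLs -- an assumption, not the actual helper:
+def _sketch_url_page_num(url):
+    import re as _re
+    m = _re.search(r'(?:[?&]page=(\d+)|/(\d+)/?$)', url or '')
+    if not m:
+        return None
+    return int(m.group(1) or m.group(2))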
+
+# parse the HTML and extract the uncensored makers list
+def parse_makers_uncensored(soup, href):
+    div_series = soup.find("div", id='makers')
+    if not div_series:
+        logging.warning("No makers div found")
+        return None, None
+
+    # parse the items
+    rows = div_series.find_all('a', class_='box')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        name = row.find('strong').text.strip()
+        link = row['href']  # do not shadow the href argument used below
+        div_movies = row.find('span')
+        movies = 0
+        if div_movies:
+            match = re.search(r'\((\d+)\)', div_movies.text.strip())
+            if match:
+                movies = int(match.group(1))
+
+        list_data.append({
+            'name' : name,
+            'href' : host_url + link if link else '',
+            'movies' : movies
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    return list_data, next_url
+
+
+# parse the HTML and extract a maker's movie list
+def parse_maker_detail(soup, href):
+    #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
+    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
+    if not div_movies:
+        logging.warning("No movies div found")
+        return [], None
+
+    # parse the items
+    rows = div_movies.find_all('div', class_='item')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        link = row.find('a', class_='box')['href']
+        serial_number = row.find('strong').text.strip()
+        title = row.find('div', class_='video-title').text.strip()
+        release_date = row.find('div', class_='meta').text.strip()
+        list_data.append({
+            'href' : host_url + link if link else '',
+            'serial_number' : serial_number,
+            'title' : title,
+            'release_date': release_date
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    return list_data, next_url
+
+# parse the HTML and extract a publisher's movie list
+def parse_publisher_detail(soup, href):
+    #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
+    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
+    if not div_movies:
+        logging.warning("No movies div found")
+        return [], None
+
+    # parse the items
+    rows = div_movies.find_all('div', class_='item')
+
+    list_data = []
+    next_url = None
+    for row in rows:
+        link = row.find('a', class_='box')['href']
+        serial_number = row.find('strong').text.strip()
+        title = row.find('div', class_='video-title').text.strip()
+        release_date = row.find('div', class_='meta').text.strip()
+        list_data.append({
+            'href' : host_url + link if link else '',
+            'serial_number' : serial_number,
+            'title' : title,
+            'release_date': release_date
+        })
+
+    # find the "next page" button
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        next_page_number = url_page_num(next_page_url)
+        current_page_number = url_page_num(href)
+        if current_page_number is None:
+            current_page_number = 0
+        if next_page_number and next_page_number > current_page_number:
+            next_url = host_url + next_page_url
+
+    return list_data, next_url
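+
+# parse_maker_detail, parse_series_detail, parse_publisher_detail and
+# parse_uncensored (below) share the same grid-and-pagination body; a shared
+# helper along these lines would remove the duplication (sketch only, the
+# functions above are left as-is):
+def _sketch_parse_movie_grid(soup, href):
+    div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
+    if not div_movies:
+        return [], None
+    list_data = []
+    for row in div_movies.find_all('div', class_='item'):
+        link = row.find('a', class_='box')['href']
+        list_data.append({
+            'href': host_url + link if link else '',
+            'serial_number': row.find('strong').text.strip(),
+            'title': row.find('div', class_='video-title').text.strip(),
+            'release_date': row.find('div', class_='meta').text.strip()
+        })
+    next_url = None
+    next_page_element = soup.find('a', class_='pagination-next')
+    if next_page_element:
+        next_page_url = next_page_element['href']
+        if (url_page_num(next_page_url) or 0) > (url_page_num(href) or 0):
+            next_url = host_url + next_page_url
+    return list_data, next_url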
查找 "下一页" 按钮 + next_page_element = soup.find('a', class_='pagination-next') + if next_page_element: + next_page_url = next_page_element['href'] + next_page_number = url_page_num(next_page_url) + current_page_number = url_page_num(href) + if current_page_number is None: + current_page_number = 0 + if next_page_number and next_page_number > current_page_number : + next_url = host_url + next_page_url + + return list_data, next_url + + +def pretty_print_json(data, n=10, indent=4, sort_keys=False): + """ + 以美化格式打印数组的前n个元素,其他元素用"..."表示 + + 参数: + - data: 要打印的数据(应为数组) + - n: 要显示的元素数量 + - indent: 缩进空格数 + - sort_keys: 是否按键排序 + """ + try: + # 处理非数组数据 + if not isinstance(data, list): + print(formatted) + return + + # 复制原始数据,避免修改原数组 + data_copy = data.copy() + + # 切片取前n个元素 + first_n_elements = data_copy[:n] + + # 如果数组长度超过n,添加"..."标记 + if len(data) > n: + result = first_n_elements + ["... ({} more elements)".format(len(data) - n)] + else: + result = first_n_elements + + # 格式化输出 + formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys) + print(formatted) + + except TypeError as e: + print(f"错误:无法格式化数据。详情:{e}") + except Exception as e: + print(f"打印时发生意外错误:{e}") + +def test_actor_list(url='https://www.javbus.com/uncensored/actresses/1'): + next_url = url + all_data = [] + while next_url: + print(f'fetching page {next_url}') + soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"),max_retries=1, headers=headers, cookies=cookies) + if soup: + list_data, next_url = parse_actors_list(soup, next_url) + if list_data : + all_data.extend(list_data) + pretty_print_json(all_data) + else: + print('get wrong page.') + + if next_url: + print(f"\n\nnext url: {next_url}") + else: + print(f"wrong request. 
+
+def test_actor_list(url='https://www.javbus.com/uncensored/actresses/1'):
+    next_url = url
+    all_data = []
+    while next_url:
+        print(f'fetching page {next_url}')
+        soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"), max_retries=1, headers=headers, cookies=cookies)
+        if soup:
+            list_data, next_url = parse_actors_list(soup, next_url)
+            if list_data:
+                all_data.extend(list_data)
+                pretty_print_json(all_data)
+            else:
+                print('get wrong page.')
+
+        if next_url:
+            print(f"\n\nnext url: {next_url}")
+        else:
+            print(f"wrong request. url: {next_url}, status_code: {status_code}")
+            break
+
+if __name__ == "__main__":
+    #test_actors_list()
+    #test_actor()
+    #test_movie_detail()
+    #test_series_list()
+    #test_series_detail()
+    logging.getLogger().setLevel(logging.DEBUG)
+    test_actor_list()
+    test_actor_list('https://www.javbus.com/en/actresses')
+
\ No newline at end of file
diff --git a/src/db_utils/db_common.py b/src/db_utils/db_common.py
new file mode 100644
index 0000000..6f1969e
--- /dev/null
+++ b/src/db_utils/db_common.py
@@ -0,0 +1,121 @@
+import sqlite3
+import json
+import logging
+from datetime import datetime
+import src.config.config as config
+
+# SQLite database file; replace with your own path
+DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db"
+
+# check the SQLite version
+lower_sqlite_version = False
+sqlite_version = sqlite3.sqlite_version_info
+if sqlite_version < (3, 24, 0):
+    lower_sqlite_version = True
+
+# fetch a table's column names and default values
+def get_table_columns_and_defaults(cursor, tbl_name):
+    try:
+        cursor.execute(f"PRAGMA table_info({tbl_name})")
+        columns = cursor.fetchall()
+        column_info = {}
+        for col in columns:
+            col_name = col[1]
+            default_value = col[4]
+            column_info[col_name] = default_value
+        return column_info
+    except sqlite3.Error as e:
+        logging.error(f"Error getting table columns: {e}")
+        return None
+
+# validate and filter the incoming data against the table schema
+def check_and_process_data(cursor, data, tbl_name):
+    column_info = get_table_columns_and_defaults(cursor=cursor, tbl_name=tbl_name)
+    if column_info is None:
+        return None
+    processed_data = {}
+    for col, default in column_info.items():
+        if col == 'id' or col == 'created_at':  # auto-increment PK / creation time: use the table defaults
+            continue
+        if col == 'updated_at':  # timestamp maintained here
+            processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        if col in data:
+            processed_data[col] = data[col]
+
+    return processed_data
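+
+# Self-contained sketch of what check_and_process_data keeps and injects,
+# using a throwaway in-memory table (table and column names here are
+# illustrative only):
+def _example_check_and_process():
+    mem = sqlite3.connect(':memory:')
+    cur = mem.cursor()
+    cur.execute("""CREATE TABLE demo (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        url TEXT UNIQUE, title TEXT,
+        created_at TEXT DEFAULT (datetime('now')),
+        updated_at TEXT)""")
+    row = check_and_process_data(cur, {'url': 'u1', 'title': 't1', 'junk': 1}, 'demo')
+    # -> {'url': 'u1', 'title': 't1', 'updated_at': '...'}; id/created_at and
+    #    unknown keys ('junk') are dropped
+    mem.close()
+    return row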
+
+# insert or update a row
+def insert_or_update_common(cursor, conn, data, tbl_name, uniq_key='url'):
+    if lower_sqlite_version:
+        return insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key)
+
+    try:
+        processed_data = check_and_process_data(cursor, data, tbl_name)
+        if processed_data is None:
+            return None
+
+        columns = ', '.join(processed_data.keys())
+        values = list(processed_data.values())
+        placeholders = ', '.join(['?' for _ in values])
+        update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key])
+
+        sql = f'''
+            INSERT INTO {tbl_name} ({columns})
+            VALUES ({placeholders})
+            ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
+        '''
+        cursor.execute(sql, values)
+        conn.commit()
+
+        # fetch the id of the inserted or updated row
+        cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
+        report_id = cursor.fetchone()[0]
+        return report_id
+    except sqlite3.Error as e:
+        logging.error(f"Error inserting or updating data: {e}")
+        return None
+
+# insert or update a row (fallback for SQLite < 3.24, which lacks UPSERT)
+def insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key='url'):
+    try:
+        processed_data = check_and_process_data(cursor, data, tbl_name)
+        if processed_data is None:
+            return None
+
+        columns = ', '.join(processed_data.keys())
+        values = list(processed_data.values())
+        placeholders = ', '.join(['?' for _ in values])
+
+        # try the insert first
+        try:
+            sql = f'''
+                INSERT INTO {tbl_name} ({columns})
+                VALUES ({placeholders})
+            '''
+            cursor.execute(sql, values)
+            conn.commit()
+        except sqlite3.IntegrityError:  # unique-key conflict: update instead
+            update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key])
+            update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
+            update_values.append(data[uniq_key])
+            sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
+            cursor.execute(sql, update_values)
+            conn.commit()
+
+        # fetch the id of the inserted or updated row
+        cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
+        report_id = cursor.fetchone()[0]
+        return report_id
+    except sqlite3.Error as e:
+        logging.error(f"Error inserting or updating data: {e}")
+        return None
+
+
+# test code
+if __name__ == "__main__":
+    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
+    cursor = conn.cursor()
+
+    tbl_name_actors = 'javhd_models'
+    print(get_table_columns_and_defaults(cursor, tbl_name_actors))
diff --git a/src/db_utils/db_javbus.py b/src/db_utils/db_javbus.py
new file mode 100644
index 0000000..6c68a84
--- /dev/null
+++ b/src/db_utils/db_javbus.py
@@ -0,0 +1,1036 @@
+import sqlite3
+import json
+import logging
+from datetime import datetime
+import src.config.config as config
+import src.db_utils.db_common as db_comm
+
+# connect to the SQLite database
+conn = sqlite3.connect(db_comm.DB_PATH, check_same_thread=False)
+cursor = conn.cursor()
+
+cached_tags = {}
+tbl_name_actors = 'javbus_actors'
+
+# insert into the actors index table, updating the row when it already exists
+def insert_actor_index(data, uncensored=0, from_actor_list=0, from_movie_list=0):
+    data['uncensored'] = uncensored
+    if from_actor_list:
+        data['from_actor_list'] = from_actor_list
+    if from_movie_list:
+        data['from_movie_list'] = from_movie_list
+    try:
+        return db_comm.insert_or_update_common(cursor, conn, data, tbl_name_actors, uniq_key='href')
+    except sqlite3.Error as e:
+        logging.error(f"Error inserting or updating data: {e}")
+        return None
+
+# update the full detail record
+def update_actor_detail(data, is_full_data=1):
+    try:
+        data['is_full_data'] = is_full_data
+
+        return db_comm.insert_or_update_common(cursor, conn, data, tbl_name_actors, uniq_key='href')
+
+    except sqlite3.Error as e:
+        logging.error(f"Error inserting or updating data: {e}")
+        return None
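+
+# The index writers above take plain dicts keyed by column name, with href
+# as the unique key. A typical call shape (sketch; the values are
+# placeholders, and unknown keys are filtered out downstream):
+def _example_insert_actor_index():
+    row = {
+        'name': 'example name',
+        'href': 'https://www.javbus.com/star/xxxx',  # placeholder href
+        'pic': 'https://example.com/avatar.jpg',     # placeholder avatar
+    }
+    return insert_actor_index(row, uncensored=1, from_actor_list=1)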
+ params.append(filters["limit"]) + + cursor.execute(sql, params) + #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 + return [{'url': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + + + +# """从指定表中通过 href 查找 id""" +def get_id_by_href(table: str, href: str) -> int: + if href is None: + return None + cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,)) + row = cursor.fetchone() + return row[0] if row else None + +def insert_movie_index(title, href, **kwargs): + try: + # 先检查数据库中是否已有该电影 + cursor.execute("SELECT * FROM javdb_movies WHERE href = ?", (href,)) + existing_movie = cursor.fetchone() + + # 获取列名 + column_names = [description[0] for description in cursor.description] + + fields = [ + 'from_actor_list', 'from_movie_makers', 'from_movie_series', 'from_movie_publishers', + 'maker_id', 'series_id', 'pub_id', 'uncensored' + ] + + if existing_movie: # 如果电影已存在 + existing_values = dict(zip(column_names, existing_movie)) + movie_id = existing_values['id'] + logging.debug(f"values in db: {existing_values}") + + # 如果没有传入值,就用原来的值 + for field in fields: + kwargs[field] = kwargs.get(field) if kwargs.get(field) is not None else existing_values[field] + + set_clauses = ", ".join([f"{field} = ?" for field in fields]) + sql = f""" + UPDATE javdb_movies + SET title = ?, {set_clauses}, updated_at = datetime('now', 'localtime') + WHERE href = ? + """ + values = [title] + [kwargs[field] for field in fields] + [href] + logging.debug(f"sql: {sql}, values: {values}") + cursor.execute(sql, values) + else: # 如果电影不存在,插入 + columns = ', '.join(['title', 'href'] + fields) + placeholders = ', '.join(['?'] * (len(fields) + 2)) + sql = f"INSERT INTO javdb_movies ({columns}) VALUES ({placeholders})" + values = [title, href] + [kwargs.get(field, 0) for field in fields] + logging.debug(f"sql: {sql}, values: {values}") + cursor.execute(sql, values) + + conn.commit() + + movie_id = get_id_by_href('javdb_movies', href) + if movie_id: + logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}') + + return movie_id + + except Exception as e: + conn.rollback() + logging.error(f"Error inserting/updating movie: {e}") + return None + + +# 插入演员和电影的关联数据 +def insert_actor_movie(performer_id, movie_id, tags=''): + try: + cursor.execute(""" + INSERT INTO javdb_actors_movies (actor_id, movie_id, tags, updated_at) + VALUES (?, ?, ?, datetime('now', 'localtime')) + ON CONFLICT(actor_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime') + """, + (performer_id, movie_id, tags) + ) + conn.commit() + + #logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}') + + return performer_id + + except Exception as e: + conn.rollback() + logging.error("Error inserting movie: %s", e) + return None + +# 插入演员数据 +def insert_or_update_actor(actor): + try: + cursor.execute(''' + INSERT INTO javdb_actors (name, href, pic, is_full_data, updated_at) + VALUES (?, ?, ?, 1, datetime('now', 'localtime')) + ON CONFLICT(href) DO UPDATE SET name=excluded.name, pic=excluded.pic, is_full_data=1, updated_at=datetime('now', 'localtime') + ''', (actor['name'], actor['href'], actor['pic'])) + + conn.commit() + + # 查询刚插入的数据 + cursor.execute('SELECT id, from_actor_list FROM javdb_actors WHERE href = ?', (actor['href'],)) + actor_id, uncensored = cursor.fetchone() + if actor_id is None: + logging.warning(f'insert data error. 
+
+# insert or update an actor
+def insert_or_update_actor(actor):
+    try:
+        cursor.execute('''
+            INSERT INTO javdb_actors (name, href, pic, is_full_data, updated_at)
+            VALUES (?, ?, ?, 1, datetime('now', 'localtime'))
+            ON CONFLICT(href) DO UPDATE SET name=excluded.name, pic=excluded.pic, is_full_data=1, updated_at=datetime('now', 'localtime')
+        ''', (actor['name'], actor['href'], actor['pic']))
+
+        conn.commit()
+
+        # read back the row we just wrote
+        cursor.execute('SELECT id, from_actor_list FROM javdb_actors WHERE href = ?', (actor['href'],))
+        row = cursor.fetchone()
+        if row is None:
+            logging.warning(f"insert data error. name: {actor['name']}, href: {actor['href']}")
+            return None
+        actor_id, uncensored = row
+
+        logging.debug(f"insert one actor, id: {actor_id}, name: {actor['name']}, href: {actor['href']}")
+
+        # insert aliases
+        for alias in actor.get("alias") or []:
+            cursor.execute('''
+                INSERT OR IGNORE INTO javdb_actors_alias (actor_id, alias, updated_at)
+                VALUES (?, ?, datetime('now', 'localtime'))
+            ''', (actor_id, alias))
+
+        conn.commit()
+
+        # insert the movie list
+        for movie in actor.get("credits") or []:
+            # from_actor_list = 1 marks uncensored actors; others are left alone
+            if uncensored and uncensored > 0:
+                movie_id = insert_movie_index(movie['title'], movie['href'], from_actor_list=1, uncensored=uncensored)
+            else:
+                movie_id = insert_movie_index(movie['title'], movie['href'], from_actor_list=1)
+            if movie_id:
+                tmp_id = insert_actor_movie(actor_id, movie_id)
+                if tmp_id:
+                    logging.debug(f'insert one performer_movie, performer_id: {actor_id}, movie_id: {movie_id}')
+            else:
+                logging.warning(f"insert performer_movie failed. performer_id: {actor_id}, movie href: {movie['href']}")
+
+        return actor_id
+    except Exception as e:
+        logging.error(f"insert/update actor {actor['name']} failed: {e}")
+        conn.rollback()
+
+# insert or update an actor hit by an abnormal URL (e.g. a 404 link)
+def insert_or_update_actor_404(name, href, is_full_data=1):
+    try:
+        # insert or update the actor record
+        cursor.execute(
+            """
+            INSERT INTO javdb_actors (name, href, is_full_data, updated_at)
+            VALUES (?, ?, ?, datetime('now', 'localtime'))
+            ON CONFLICT(href) DO UPDATE SET
+                name=excluded.name, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime')
+            """,
+            (name, href, is_full_data)
+        )
+        conn.commit()
+
+        # fetch the actor id
+        actor_id = get_id_by_href('javdb_actors', href)
+        if actor_id is None:
+            return None
+
+        return actor_id
+
+    except Exception as e:
+        conn.rollback()
+        logging.error("Error inserting movie: %s", e)
+        return None
+
+
+# delete an actor
+def delete_actor_by_href(href):
+    try:
+        cursor.execute('DELETE FROM javdb_actors WHERE href = ?', (href,))
+        conn.commit()
+        logging.info(f"deleted actor: {href}")
+    except Exception as e:
+        logging.error(f"failed to delete actor {href}: {e}")
+        conn.rollback()
+ params.append(filters["limit"]) + + cursor.execute(sql, params) + #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 + return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + +# 插入或更新发行商 """ +def insert_or_update_makers(data, caller='list'): + try: + if caller == 'list': + cursor.execute(""" + INSERT INTO javdb_makers (name, href, from_list, updated_at) + VALUES (?, ? , 1, datetime('now', 'localtime')) + ON CONFLICT(href) DO UPDATE SET + name = excluded.name, + from_list = 1, + updated_at = datetime('now', 'localtime') + """, (data["name"], data["href"])) + conn.commit() + elif caller == 'movie': + cursor.execute(""" + INSERT INTO javdb_makers (name, href, from_movie_list, updated_at) + VALUES (?, ? , 1, datetime('now', 'localtime')) + ON CONFLICT(href) DO UPDATE SET + name = excluded.name, + from_movie_list = 1, + updated_at = datetime('now', 'localtime') + """, (data["name"], data["href"])) + conn.commit() + else: + logging.warning(f"unexpected caller: {caller}") + return None + + # 获取 performer_id + cursor.execute("SELECT id FROM javdb_makers WHERE href = ?", (data["href"],)) + dist_id = cursor.fetchone()[0] + if dist_id: + logging.debug(f"成功插入/更新发行商: {data['name']}") + return dist_id + else: + return None + except sqlite3.Error as e: + conn.rollback() + logging.error(f"数据库错误: {e}") + return None + +# 删除发行商(按 id 或 name) """ +def delete_maker(identifier): + try: + if isinstance(identifier, int): + cursor.execute("DELETE FROM javdb_makers WHERE id = ?", (identifier,)) + elif isinstance(identifier, str): + cursor.execute("DELETE FROM javdb_makers WHERE name = ?", (identifier,)) + conn.commit() + logging.info(f"成功删除发行商: {identifier}") + except sqlite3.Error as e: + conn.rollback() + logging.error(f"删除失败: {e}") + +# 查询发行商(按 id 或 name) """ +def query_maker(identifier): + try: + if isinstance(identifier, int): + cursor.execute("SELECT * FROM javdb_makers WHERE id = ?", (identifier,)) + else: + cursor.execute("SELECT * FROM javdb_makers WHERE name LIKE ?", (f"%{identifier}%",)) + + distributor = cursor.fetchone() + if distributor: + return dict(zip([desc[0] for desc in cursor.description], distributor)) + else: + logging.warning(f"未找到发行商: {identifier}") + return None + except sqlite3.Error as e: + logging.error(f"查询失败: {e}") + return None + +# 按条件查询 href 列表 +def query_maker_hrefs(**filters): + try: + sql = "SELECT href, id, from_list FROM javdb_makers WHERE 1=1" + params = [] + + if "id" in filters: + sql += " AND id = ?" + params.append(filters["id"]) + if "from_list" in filters: + sql += " AND from_list = ?" + params.append(filters["from_list"]) + if "url" in filters: + sql += " AND href = ?" + params.append(filters["href"]) + if "name" in filters: + sql += " AND name LIKE ?" + params.append(f"%{filters['name']}%") + if 'limit' in filters: + sql += " limit ?" + params.append(filters["limit"]) + + cursor.execute(sql, params) + #return [row[0] for row in cursor.fetchall()] # 链接使用小写 + return [{'href': row[0], 'id': row[1], 'from_list':row[2]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + +# """ 插入或更新制作公司 """ +def insert_or_update_series(data, caller='list'): + try: + if caller == 'list': + cursor.execute(""" + INSERT INTO javdb_series (name, href, from_list, updated_at) + VALUES (?, ? 
+
+# insert or update a series
+def insert_or_update_series(data, caller='list'):
+    try:
+        if caller == 'list':
+            cursor.execute("""
+                INSERT INTO javdb_series (name, href, from_list, updated_at)
+                VALUES (?, ?, 1, datetime('now', 'localtime'))
+                ON CONFLICT(href) DO UPDATE SET
+                    name = excluded.name,
+                    from_list = 1,
+                    updated_at = datetime('now', 'localtime')
+            """, (data["name"], data["href"]))
+            conn.commit()
+        elif caller == 'movie':
+            cursor.execute("""
+                INSERT INTO javdb_series (name, href, from_movie_list, updated_at)
+                VALUES (?, ?, 1, datetime('now', 'localtime'))
+                ON CONFLICT(href) DO UPDATE SET
+                    name = excluded.name,
+                    from_movie_list = 1,
+                    updated_at = datetime('now', 'localtime')
+            """, (data["name"], data["href"]))
+            conn.commit()
+        else:
+            logging.warning(f"unexpected caller: {caller}")
+            return None
+
+        # fetch the series id
+        cursor.execute("SELECT id FROM javdb_series WHERE href = ?", (data["href"],))
+        stu_id = cursor.fetchone()[0]
+        if stu_id:
+            logging.debug(f"inserted/updated series: {data['name']}")
+            return stu_id
+        else:
+            return None
+    except sqlite3.Error as e:
+        conn.rollback()
+        logging.error(f"database error: {e}")
+        return None
+
+# delete a series (by id or name)
+def delete_series(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("DELETE FROM javdb_series WHERE id = ?", (identifier,))
+        elif isinstance(identifier, str):
+            cursor.execute("DELETE FROM javdb_series WHERE name = ?", (identifier,))
+        conn.commit()
+        logging.info(f"deleted series: {identifier}")
+    except sqlite3.Error as e:
+        conn.rollback()
+        logging.error(f"delete failed: {e}")
+
+# query a series (by id or name)
+def query_series(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("SELECT * FROM javdb_series WHERE id = ?", (identifier,))
+        else:
+            cursor.execute("SELECT * FROM javdb_series WHERE name LIKE ?", (f"%{identifier}%",))
+
+        studio = cursor.fetchone()
+        if studio:
+            return dict(zip([desc[0] for desc in cursor.description], studio))
+        else:
+            logging.warning(f"series not found: {identifier}")
+            return None
+    except sqlite3.Error as e:
+        logging.error(f"query failed: {e}")
+        return None
+
+# query series hrefs by filter
+def query_series_hrefs(**filters):
+    try:
+        sql = "SELECT href, id, from_list FROM javdb_series WHERE 1=1"
+        params = []
+
+        if "id" in filters:
+            sql += " AND id = ?"
+            params.append(filters["id"])
+        if "from_list" in filters:
+            sql += " AND from_list = ?"
+            params.append(filters["from_list"])
+        if "href" in filters:
+            sql += " AND href = ?"
+            params.append(filters["href"])
+        if "name" in filters:
+            sql += " AND name LIKE ?"
+            params.append(f"%{filters['name']}%")
+        if 'limit' in filters:
+            sql += " LIMIT ?"
+            params.append(filters["limit"])
+
+        cursor.execute(sql, params)
+        #return [row[0] for row in cursor.fetchall()]  # lowercased hrefs
+        #return [{'href': row[0], 'id': row[1]} for row in cursor.fetchall()]
+        return [{'href': row[0], 'id': row[1], 'from_list': row[2]} for row in cursor.fetchall()]
+
+    except sqlite3.Error as e:
+        logging.error(f"href query failed: {e}")
+        return None
+
+# insert or update a publisher
+def insert_or_update_publishers(data, caller='list'):
+    try:
+        if caller == 'list':
+            cursor.execute("""
+                INSERT INTO javdb_publishers (name, href, from_list, updated_at)
+                VALUES (?, ?, 1, datetime('now', 'localtime'))
+                ON CONFLICT(href) DO UPDATE SET
+                    name = excluded.name,
+                    from_list = 1,
+                    updated_at = datetime('now', 'localtime')
+            """, (data["name"], data["href"]))
+            conn.commit()
+        elif caller == 'movie':
+            cursor.execute("""
+                INSERT INTO javdb_publishers (name, href, from_movie_list, updated_at)
+                VALUES (?, ?, 1, datetime('now', 'localtime'))
+                ON CONFLICT(href) DO UPDATE SET
+                    name = excluded.name,
+                    from_movie_list = 1,
+                    updated_at = datetime('now', 'localtime')
+            """, (data["name"], data["href"]))
+            conn.commit()
+        else:
+            logging.warning(f"unexpected caller: {caller}")
+            return None
+
+        # fetch the publisher id
+        cursor.execute("SELECT id FROM javdb_publishers WHERE href = ?", (data["href"],))
+        dist_id = cursor.fetchone()[0]
+        if dist_id:
+            logging.debug(f"inserted/updated publisher: {data['name']}")
+            return dist_id
+        else:
+            return None
+    except sqlite3.Error as e:
+        conn.rollback()
+        logging.error(f"database error: {e}")
+        return None
+
+# delete a publisher (by id or name)
+def delete_publishers(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("DELETE FROM javdb_publishers WHERE id = ?", (identifier,))
+        elif isinstance(identifier, str):
+            cursor.execute("DELETE FROM javdb_publishers WHERE name = ?", (identifier,))
+        conn.commit()
+        logging.info(f"deleted publisher: {identifier}")
+    except sqlite3.Error as e:
+        conn.rollback()
+        logging.error(f"delete failed: {e}")
+
+# query a publisher (by id or name)
+def query_publishers(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("SELECT * FROM javdb_publishers WHERE id = ?", (identifier,))
+        else:
+            cursor.execute("SELECT * FROM javdb_publishers WHERE name LIKE ?", (f"%{identifier}%",))
+
+        distributor = cursor.fetchone()
+        if distributor:
+            return dict(zip([desc[0] for desc in cursor.description], distributor))
+        else:
+            logging.warning(f"publisher not found: {identifier}")
+            return None
+    except sqlite3.Error as e:
+        logging.error(f"query failed: {e}")
+        return None
+
+# query publisher hrefs by filter
+def query_publishers_hrefs(**filters):
+    try:
+        sql = "SELECT href, id FROM javdb_publishers WHERE 1=1"
+        params = []
+
+        if "id" in filters:
+            sql += " AND id = ?"
+            params.append(filters["id"])
+        if "from_list" in filters:
+            sql += " AND from_list = ?"
+            params.append(filters["from_list"])
+        if "href" in filters:
+            sql += " AND href = ?"
+            params.append(filters["href"])
+        if "name" in filters:
+            sql += " AND name LIKE ?"
+            params.append(f"%{filters['name']}%")
+        if 'limit' in filters:
+            sql += " LIMIT ?"
+            params.append(filters["limit"])
+
+        cursor.execute(sql, params)
+        #return [row[0] for row in cursor.fetchall()]  # lowercased hrefs
+        return [{'href': row[0], 'id': row[1]} for row in cursor.fetchall()]
+
+    except sqlite3.Error as e:
+        logging.error(f"href query failed: {e}")
+        return None
id: {dist_id}, name: {name}") + return dist_id + else: + return None + except sqlite3.Error as e: + conn.rollback() + logging.error(f"数据库错误: {e}") + return None + +# 查询tags +def query_tags(href, name): + global cached_tags + try: + if href not in cached_tags: + cursor.execute("SELECT id, name, href FROM javdb_tags") + for row in cursor.fetchall(): + cached_tags[row[2]] = {'id': row[0], 'name':row[2]} + + if href in cached_tags: + return cached_tags[href]['id'], cached_tags[href]['name'] + except sqlite3.Error as e: + logging.error(f"查询失败: {e}") + return 0, name + +# 插入影片和tags的关联数据 +def insert_movie_tags( movie_id, tag_id, tags=''): + try: + cursor.execute(""" + INSERT INTO javdb_movies_tags (movie_id, tag_id, tags, updated_at) + VALUES (?, ?, ?, datetime('now', 'localtime')) + ON CONFLICT(tag_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime') + """, + (movie_id, tag_id, tags) + ) + conn.commit() + + #logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}') + + return movie_id + + except Exception as e: + conn.rollback() + logging.error("Error inserting movie: %s", e) + return None + +# """插入或更新电影数据""" +def insert_or_update_movie(movie): + try: + # 获取相关 ID + makers_id = get_id_by_href('javdb_makers', movie['maker_link']) if movie['maker_link'] else None + series_id = get_id_by_href('javdb_series', movie['series_link']) if movie['series_link'] else None + pub_id = get_id_by_href('javdb_publishers', movie['pub_link']) if movie['pub_link'] else None + + # 如果不存在,插入 + if makers_id is None and movie['maker_link']: + makers_id = insert_or_update_makers({'name' : movie.get('maker_name', ''), 'href' : movie.get('maker_link', '')}, caller='movie') + if series_id is None and movie['series_link']: + series_id = insert_or_update_series({'name' : movie.get('series_name', ''), 'href' : movie.get('series_link', '')}, caller='movie') + if pub_id is None and movie['pub_link']: + pub_id = insert_or_update_publishers({'name' : movie.get('pub_name', ''), 'href' : movie.get('pub_link', '')}, caller='movie') + + cursor.execute(""" + INSERT INTO javdb_movies (href, title, cover_url, serial_number, release_date, duration, + maker_id, series_id, pub_id, is_full_data, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime')) + ON CONFLICT(href) DO UPDATE SET + title=excluded.title, + cover_url=excluded.cover_url, + serial_number=excluded.serial_number, + release_date=excluded.release_date, + duration=excluded.duration, + maker_id=excluded.maker_id, + series_id=excluded.series_id, + pub_id=excluded.pub_id, + is_full_data=1, + updated_at=datetime('now', 'localtime') + """, (movie['href'], movie['title'], movie['cover_url'], movie['serial_number'], + movie['release_date'], movie['duration'], makers_id, series_id, pub_id)) + + conn.commit() + + # 获取插入的 movie_id + movie_id = get_id_by_href('javdb_movies', movie['href']) + if movie_id is None: + return None + + logging.debug(f"insert one move, id: {movie_id}, title: {movie['title']}, href: {movie['href']}") + + # 插入 performers_movies 关系表 + for performer in movie.get('actors', []): + performer_id = get_id_by_href('javdb_actors', performer['href']) + # 如果演员不存在,先插入 + if performer_id is None: + performer_id = insert_actor_index(performer['name'], performer['href'], from_movie_list=1) + logging.debug(f"insert new perfomer. 
+
+# insert or update a movie record
+def insert_or_update_movie(movie):
+    try:
+        # look up related ids
+        makers_id = get_id_by_href('javdb_makers', movie['maker_link']) if movie['maker_link'] else None
+        series_id = get_id_by_href('javdb_series', movie['series_link']) if movie['series_link'] else None
+        pub_id = get_id_by_href('javdb_publishers', movie['pub_link']) if movie['pub_link'] else None
+
+        # insert them when they do not exist yet
+        if makers_id is None and movie['maker_link']:
+            makers_id = insert_or_update_makers({'name': movie.get('maker_name', ''), 'href': movie.get('maker_link', '')}, caller='movie')
+        if series_id is None and movie['series_link']:
+            series_id = insert_or_update_series({'name': movie.get('series_name', ''), 'href': movie.get('series_link', '')}, caller='movie')
+        if pub_id is None and movie['pub_link']:
+            pub_id = insert_or_update_publishers({'name': movie.get('pub_name', ''), 'href': movie.get('pub_link', '')}, caller='movie')
+
+        cursor.execute("""
+            INSERT INTO javdb_movies (href, title, cover_url, serial_number, release_date, duration,
+                maker_id, series_id, pub_id, is_full_data, updated_at)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
+            ON CONFLICT(href) DO UPDATE SET
+                title=excluded.title,
+                cover_url=excluded.cover_url,
+                serial_number=excluded.serial_number,
+                release_date=excluded.release_date,
+                duration=excluded.duration,
+                maker_id=excluded.maker_id,
+                series_id=excluded.series_id,
+                pub_id=excluded.pub_id,
+                is_full_data=1,
+                updated_at=datetime('now', 'localtime')
+        """, (movie['href'], movie['title'], movie['cover_url'], movie['serial_number'],
+              movie['release_date'], movie['duration'], makers_id, series_id, pub_id))
+
+        conn.commit()
+
+        # fetch the movie id
+        movie_id = get_id_by_href('javdb_movies', movie['href'])
+        if movie_id is None:
+            return None
+
+        logging.debug(f"insert one movie, id: {movie_id}, title: {movie['title']}, href: {movie['href']}")
+
+        # fill the performers_movies relation table
+        for performer in movie.get('actors', []):
+            performer_id = get_id_by_href('javdb_actors', performer['href'])
+            # insert the actor first when missing
+            if performer_id is None:
+                performer_id = insert_actor_index({'name': performer['name'], 'href': performer['href']}, from_movie_list=1)
+                logging.debug(f"insert new performer. performer_id: {performer_id}, name:{performer['name']}")
+            if performer_id:
+                tmp_id = insert_actor_movie(performer_id, movie_id)
+                if tmp_id:
+                    logging.debug(f"insert one performer_movie. performer_id: {performer_id}, movie_id:{movie_id}")
+                else:
+                    logging.debug(f"insert performer_movie failed. performer_id: {performer_id}, movie_id:{movie_id}")
+            else:
+                logging.warning(f"insert performer failed. name: {performer['name']}, href: {performer['href']}")
+
+        # fill the tags tables
+        for tag in movie.get('tags', []):
+            tag_name = tag.get('name', '')
+            tag_href = tag.get('href', '')
+            tag_id = insert_or_update_tags(tag_name, tag_href)
+            if tag_id:
+                logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}")
+                tmp_id = insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name)
+                if tmp_id:
+                    logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
+                else:
+                    logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
+            else:
+                logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}")
+
+        return movie_id
+
+    except Exception as e:
+        conn.rollback()
+        logging.error("Error inserting movie: %s", e)
+        return None
+
+# insert or update a movie hit by an abnormal URL (e.g. a 404 link)
+def insert_or_update_movie_404(title, href, is_full_data=1):
+    try:
+        # insert or update the movie record
+        cursor.execute(
+            """
+            INSERT INTO javdb_movies (title, href, is_full_data, updated_at)
+            VALUES (?, ?, ?, datetime('now', 'localtime'))
+            ON CONFLICT(href) DO UPDATE SET
+                title=excluded.title, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime')
+            """,
+            (title, href, is_full_data)
+        )
+        conn.commit()
+
+        # fetch the movie id
+        movie_id = get_id_by_href('javdb_movies', href)
+        if movie_id is None:
+            return None
+
+        return movie_id
+
+    except Exception as e:
+        conn.rollback()
+        logging.error("Error inserting movie: %s", e)
+        return None
+
+
+# delete a movie
+def delete_movie(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("DELETE FROM javdb_movies WHERE id = ?", (identifier,))
+        elif isinstance(identifier, str):
+            cursor.execute("DELETE FROM javdb_movies WHERE href = ?", (identifier,))
+        else:
+            logging.warning("invalid delete argument")
+            return
+        conn.commit()
+        logging.info(f"Deleted movie with {identifier}")
+
+    except sqlite3.Error as e:
+        conn.rollback()
+        logging.error("Error deleting movie: %s", e)
+
+# query a movie
+def query_movies(identifier):
+    try:
+        if isinstance(identifier, int):
+            cursor.execute("SELECT * FROM javdb_movies WHERE id = ?", (identifier,))
+        elif "http" in identifier:
+            cursor.execute("SELECT * FROM javdb_movies WHERE href = ?", (identifier,))
+        else:
+            cursor.execute("SELECT * FROM javdb_movies WHERE title LIKE ?", (f"%{identifier}%",))
+
+        movie = cursor.fetchone()
+        if movie:
+            # capture the movie columns before running the relation query
+            result = dict(zip([desc[0] for desc in cursor.description], movie))
+            cursor.execute("SELECT actor_id FROM javdb_actors_movies WHERE movie_id = ?", (movie[0],))
+            result["performers"] = [row[0] for row in cursor.fetchall()]
+            return result
+        else:
+            logging.warning(f"find no data: {identifier}")
+            return None
+
+    except sqlite3.Error as e:
+        logging.error(f"query failed: {e}")
+        return None
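+
+# insert_or_update_movie consumes the dict shape produced by
+# craw_javbus.parse_movie_detail; a minimal example payload (all values are
+# placeholders) looks like this:
+_example_movie = {
+    'href': 'https://www.javbus.com/XYZ-001',  # placeholder
+    'title': 'example title',
+    'cover_url': None,
+    'serial_number': 'XYZ-001',
+    'release_date': '2025-01-01',
+    'duration': '120',
+    'maker_name': None, 'maker_link': None,
+    'series_name': None, 'series_link': None,
+    'pub_name': None, 'pub_link': None,
+    'tags': [], 'actors': [],
+}
+# usage would then be: insert_or_update_movie(_example_movie)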
+ params.append(filters["href"]) + if "title" in filters: + sql += " AND title LIKE ?" + params.append(f"%{filters['title']}%") + if "is_full_data" in filters: + sql += " AND is_full_data = ?" + params.append(filters["is_full_data"]) + if "from_actor_list" in filters: + sql += " AND from_actor_list = ?" + params.append(filters["from_actor_list"]) + if "is_full_data_in" in filters: + values = filters["is_full_data_in"] + if values: + placeholders = ", ".join(["?"] * len(values)) + sql += f" AND is_full_data IN ({placeholders})" + params.extend(values) + if "is_full_data_not_in" in filters: + values = filters["is_full_data_not_in"] + if values: + placeholders = ", ".join(["?"] * len(values)) + sql += f" AND is_full_data NOT IN ({placeholders})" + params.extend(values) + if "before_updated_at" in filters: + sql += " AND updated_at <= ?" + params.append(filters["before_updated_at"]) + if "after_updated_at" in filters: + sql += " AND updated_at >= ?" + params.append(filters["after_updated_at"]) + if "start_id" in filters: + sql += " AND id > ?" + params.append(filters["start_id"]) + if "order_by" in filters: + sql += " order by ?" + params.append(filters["order_by"]) + if 'limit' in filters: + sql += " limit ?" + params.append(filters["limit"]) + + cursor.execute(sql, params) + #return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 + return [{'href': row[0], 'title': row[1], 'id':row[2]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return [] +''' +# 查询 +def query_movie_hrefs(**filters): + try: + sql = "SELECT href, title, id FROM javdb_movies WHERE 1=1" + params = [] + + conditions = { + "id": " AND id = ?", + "href": " AND href = ?", + "title": " AND title LIKE ?", + "is_full_data": " AND is_full_data = ?", + "uncensored": " AND uncensored = ?", + "from_actor_list": " AND from_actor_list = ?", + "before_updated_at": " AND updated_at <= ?", + "after_updated_at": " AND updated_at >= ?", + "start_id": " AND id > ?", + } + + for key, condition in conditions.items(): + if key in filters: + sql += condition + if key == "title": + params.append(f"%{filters[key]}%") + else: + params.append(filters[key]) + + for key in ["is_full_data_in", "is_full_data_not_in"]: + if key in filters: + values = filters[key] + if values: + placeholders = ", ".join(["?"] * len(values)) + operator = "IN" if key == "is_full_data_in" else "NOT IN" + sql += f" AND is_full_data {operator} ({placeholders})" + params.extend(values) + + if "order_by" in filters: + # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 + sql += f" ORDER BY {filters['order_by']} " + + if 'limit' in filters: + sql += " LIMIT ?" + params.append(filters["limit"]) + + cursor.execute(sql, params) + #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 + #return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + return [{'href': row[0], 'title': row[1], 'id':row[2]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + +# 插入一条任务日志 +def insert_task_log(): + try: + cursor.execute(""" + INSERT INTO javdb_task_log (task_status) VALUES ('Start') + """) + conn.commit() + + task_id = cursor.lastrowid + if task_id is None: + return None + update_task_log(task_id=task_id, task_status='Start') + + return task_id # 获取插入的 task_id + except sqlite3.Error as e: + logging.error(f"插入任务失败: {e}") + return None + +# 更新任务日志的字段 +def update_task_log_inner(task_id, **kwargs): + try: + fields = ", ".join(f"{key} = ?" 
+
+# update task-log fields
+def update_task_log_inner(task_id, **kwargs):
+    try:
+        fields = ", ".join(f"{key} = ?" for key in kwargs.keys())
+        params = list(kwargs.values()) + [task_id]
+
+        sql = f"UPDATE javdb_task_log SET {fields}, updated_at = datetime('now', 'localtime') WHERE task_id = ?"
+        cursor.execute(sql, params)
+        conn.commit()
+    except sqlite3.Error as e:
+        logging.error(f"failed to update task {task_id}: {e}")
+
+# refresh the task-log counters
+def update_task_log(task_id, task_status):
+    try:
+        # current row counts of the actors, movies, makers and series tables
+        cursor.execute("SELECT COUNT(*) FROM javdb_actors where is_full_data=1")
+        full_data_actors = cursor.fetchone()[0]
+        cursor.execute("SELECT COUNT(*) FROM javdb_actors")
+        total_actors = cursor.fetchone()[0]
+
+        cursor.execute("SELECT COUNT(*) FROM javdb_movies where is_full_data=1")
+        full_data_movies = cursor.fetchone()[0]
+        cursor.execute("SELECT COUNT(*) FROM javdb_movies")
+        total_movies = cursor.fetchone()[0]
+
+        cursor.execute("SELECT COUNT(*) FROM javdb_makers")
+        total_makers = cursor.fetchone()[0]
+
+        cursor.execute("SELECT COUNT(*) FROM javdb_series")
+        total_series = cursor.fetchone()[0]
+
+        # update the task log
+        update_task_log_inner(task_id,
+                              full_data_actors=full_data_actors,
+                              total_actors=total_actors,
+                              full_data_movies=full_data_movies,
+                              total_movies=total_movies,
+                              total_makers=total_makers,
+                              total_series=total_series,
+                              task_status=task_status)
+
+    except sqlite3.Error as e:
+        logging.error(f"failed to update task {task_id}: {e}")
+
+
+# mark a task as finished
+def finalize_task_log(task_id):
+    try:
+        # update the task log
+        update_task_log(task_id, task_status="Success")
+    except sqlite3.Error as e:
+        logging.error(f"failed to finalize task {task_id}: {e}")
+
+
+# test code
+if __name__ == "__main__":
+
+    sample_data = [
+        {
+            'name': '上原亜衣',
+            'href': 'https://www.javdb.com/actors/MkAX',
+            'pic': 'https://c0.jdbstatic.com/avatars/mk/MkAX.jpg',
+            'alias': ['上原亜衣', '下原舞', '早瀬クリスタル', '阿蘇山百式屏風奉行']
+        },
+        {
+            'name': '大橋未久',
+            'href': 'https://www.javdb.com/actors/21Jp',
+            'pic': 'https://c0.jdbstatic.com/avatars/21/21Jp.jpg',
+            'alias': ['大橋未久']
+        },
+    ]
+
+    for actor in sample_data:
+        insert_or_update_actor(actor)
+
+    print(query_actors(name='未久'))
+    #delete_actor_by_href('https://www.javdb.com/actors/MkAX')
+    print(query_actors())
diff --git a/src/javbus/fetch.py b/src/javbus/fetch.py
new file mode 100644
index 0000000..92c560a
--- /dev/null
+++ b/src/javbus/fetch.py
@@ -0,0 +1,521 @@
+
+import json
+import time
+import csv
+import argparse
+import textwrap
+import logging
+from functools import partial
+from urllib.parse import urljoin, urlparse
+import src.config.config as config
+import src.logger.logger as logger
+import src.db_utils.db_javbus as db_tools
+import src.crawling.craw_common as scraper_base
+import src.crawling.craw_javbus as scraper
+import src.utils.utils as utils
+
+logger.setup_logging()
+
+debug = False
+skip_local = False
+scan_mode = 0
+update_mode = 0
+
+# fetch the actor list for one language
+def fetch_actor_list_lang(lang="en", uncensored=None):
+    if uncensored:
+        un_flag = 1
+        s_url = f"/{lang}/uncensored/actresses" if lang != 'zh' else f"/uncensored/actresses"
+    else:
+        un_flag = 0
+        s_url = f"/{lang}/actresses" if lang != 'zh' else f"/actresses"
+
+    current_url = urljoin(scraper.host_url, s_url)
+    num = 1
+    while current_url:
+        logging.info(f"fetching url {current_url}")
+        soup, status_code = scraper_base.fetch_page(current_url, partial(scraper_base.generic_validator, tag="div", identifier="waterfall", attr_type="id"), headers=scraper.headers, cookies=scraper.cookies)
+        if soup:
+            list_data, current_url = scraper.parse_actors_list(soup, current_url)
+            if list_data:
+                # write to the database
+                for row in list_data:
+                    row[f'{lang}_name'] = row['name']
+                    row['href'] = utils.normalize_url(row['href'])
+                    row_id = db_tools.insert_actor_index(row, uncensored=un_flag, from_actor_list=1)
+                    if row_id:
+                        logging.debug(f'insert actor to db. row_id:{row_id}, data: {row}')
+                    else:
+                        logging.warning(f'insert actor failed. data: {row}')
+            else:
+                logging.warning(f'fetch actor error. {current_url} ...')
+
+        elif status_code and status_code == 404:
+            logging.warning(f'fetch page error. httpcode: {status_code}, url: {current_url}')
+            break
+        time.sleep(0.3)
+
+        # break early when debugging
+        if debug:
+            return True
+
+# fetch actor lists across languages
+def fetch_actor_list():
+    #for lang in ["en", "ja", "zh"]:
+    for lang in ['ja']:
+        fetch_actor_list_lang(lang=lang, uncensored=1)
+
+    #for lang in ["en", "ja", "zh"]:
+    for lang in ['ja']:
+        fetch_actor_list_lang(lang=lang)
+
+
+# Update movie info for every maker in the list
+def fetch_movies_by_maker():
+    if debug:
+        url_list = db_tools.query_maker_hrefs(name='muramura')
+    else:
+        if scan_mode == 1:
+            url_list = db_tools.query_maker_hrefs(from_list=1)
+        elif scan_mode == 0:
+            url_list = db_tools.query_maker_hrefs(from_list=0)
+        else:
+            url_list = db_tools.query_maker_hrefs()
+
+    for row in url_list:
+        url = row['href']
+        row_id = row['id']
+        uncensored = row['from_list'] if row['from_list'] > 0 else None
+        # strip the download flag from the URL (if any)
+        next_url = utils.remove_url_query(url)
+        while next_url:
+            logging.info(f"Fetching data for maker url {next_url} ...")
+            soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
+            if soup:
+                list_data, next_url = scraper.parse_maker_detail(soup, next_url)
+                if list_data:
+                    for movie in list_data:
+                        tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_makers=1, maker_id=row_id, uncensored=uncensored)
+                        if tmp_id:
+                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
+                        else:
+                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
+                else:
+                    logging.warning(f'parse_page_movie error. url: {next_url}')
+
+            elif status_code and status_code == 404:
+                logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
+                break
+
+        # break early in debug mode
+        if debug:
+            return True
+
+# Update movie info for every series in the list
+def fetch_movies_by_series():
+    if debug:
+        url_list = db_tools.query_series_hrefs(name='10musume')
+    else:
+        if scan_mode == 1:
+            url_list = db_tools.query_series_hrefs(from_list=1)
+        elif scan_mode == 0:
+            url_list = db_tools.query_series_hrefs(from_list=0)
+        else:
+            url_list = db_tools.query_series_hrefs()
+
+    for row in url_list:
+        url = row['href']
+        row_id = row['id']
+        uncensored = row['from_list'] if row['from_list'] > 0 else None
+        # strip the download flag from the URL (if any)
+        next_url = utils.remove_url_query(url)
+        while next_url:
+            logging.info(f"Fetching data for series url {next_url} ...")
+            soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="column section-title", attr_type="class"))
+            if soup:
+                list_data, next_url = scraper.parse_series_detail(soup, next_url)
+                if list_data:
+                    for movie in list_data:
+                        tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_series=1, series_id=row_id, uncensored=uncensored)
+                        if tmp_id:
+                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
+                        else:
+                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
+                else:
+                    logging.warning(f'parse_page_movie error. url: {next_url}')
+            elif status_code and status_code == 404:
+                logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
+                break
+
+        # break early in debug mode
+        if debug:
+            return True
+
+# Update movie info for every publisher in the list
+def fetch_movies_by_publishers():
+    if debug:
+        url_list = db_tools.query_publishers_hrefs(limit=1)
+    else:
+        if scan_mode == 1:
+            url_list = db_tools.query_publishers_hrefs(from_list=1)
+        elif scan_mode == 0:
+            url_list = db_tools.query_publishers_hrefs(from_list=0)
+        else:
+            url_list = db_tools.query_publishers_hrefs()
+
+    for row in url_list:
+        url = row['href']
+        row_id = row['id']
+        # strip the download flag from the URL (if any)
+        next_url = utils.remove_url_query(url)
+        while next_url:
+            logging.info(f"Fetching data for publisher url {next_url} ...")
+            soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="modal-card", attr_type="class"))
+            if soup:
+                list_data, next_url = scraper.parse_publisher_detail(soup, next_url)
+                if list_data:
+                    for movie in list_data:
+                        tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], from_movie_publishers=1, pub_id=row_id)
+                        if tmp_id:
+                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
+                        else:
+                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
+                else:
+                    logging.warning(f'parse_page_movie error. url: {next_url}')
+            elif status_code and status_code == 404:
+                logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
+                break
+
+        # break early in debug mode
+        if debug:
+            return True
+
+
+# Update actor details
+def fetch_performers_detail():
+    limit_count = 5 if debug else 100
+    performers_list = []
+    last_performer_id = 0
+    abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
+
+    def get_performers(**kwargs):
+        if scan_mode == 1:
+            kwargs["from_actor_list"] = 1
+        elif scan_mode == 0:
+            kwargs["from_actor_list"] = 0
+        else:
+            logging.debug(f"scan all records")
+        kwargs["order_by"] = 'id asc'
+        return db_tools.query_actors(limit=limit_count, **kwargs)
+
+    while True:
+        if update_mode == 0:    # only new records
+            performers_list = get_performers(start_id=0, is_full_data=0)
+        elif update_mode == 1:  # only complete records
+            performers_list = get_performers(start_id=last_performer_id, is_full_data=1)
+        elif update_mode == 2:  # 0+1
+            performers_list = get_performers(start_id=last_performer_id, is_full_data_not_in=abnormal_codes)
+        elif update_mode == 3:  # abnormal records only
+            performers_list = get_performers(start_id=last_performer_id, is_full_data_in=abnormal_codes)
+        else:                   # everything
+            performers_list = get_performers(start_id=last_performer_id)
+
+        if len(performers_list) < 1:
+            logging.info(f'all performers fetched.')
+            break
+
+        succ_rows = 0
+        for performer in performers_list:
+            url = performer['href']
+            person = performer['name']
+            pic = ''
+            alias = []
+
+            next_url = url
+            all_movies = []
+            need_insert = True
+            while next_url:
+                logging.debug(f"Fetching data for actor ({person}), url {next_url} ...")
+                soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="span", identifier="actor-section-name", attr_type="class"))
+                if soup:
+                    data, next_url = scraper.parse_actor_detail(soup, next_url)
+                    if data:
+                        pic = data.get('pic', '')
+                        alias = data.get('alias', [])
+                        all_movies.extend(data.get('movies', []))
+
+                elif status_code and status_code == scraper.http_code_404:
+                    actor_id = db_tools.insert_or_update_actor_404(name=person, href=url, is_full_data=scraper.http_code_404)
+                    logging.warning(f'404 page. id: {actor_id}, name: ({person}), url: {url}, Skipping...')
+                    need_insert = False
+                    break
+                elif status_code and status_code == scraper.http_code_login:
+                    actor_id = db_tools.insert_or_update_actor_404(name=person, href=url, is_full_data=scraper.http_code_login)
+                    logging.warning(f'401 page(need login). id: {actor_id}, name: ({person}), url: {url}, Skipping...')
+                    need_insert = False
+                    break
+                else:
+                    logging.warning(f'fetch_page error. url: {url}')
+                    break  # give up on this actor instead of retrying the same URL forever
+
+            # a 401/404 has already been recorded; skip this actor
+            if not need_insert:
+                continue
+
+            # all of this actor's movies fetched; insert the data
+            performer_id = db_tools.insert_or_update_actor({
+                'href': url,
+                'name': person,
+                'pic': pic,
+                'alias': alias,
+                'credits': all_movies
+            })
+            if performer_id:
+                logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {url}')
+                last_performer_id = performer_id
+                succ_rows += 1
+            else:
+                logging.warning(f'insert person: ({person}) {url} failed.')
+            time.sleep(0.5)
+
+        logging.info(f'total request: {len(performers_list)}, succ: {succ_rows}, last performer id: {last_performer_id}')
+        # break early in debug mode
+        if debug:
+            return True
+
+# Update movie details
+def fetch_movies_detail():
+    limit_count = 10 if debug else 100
+    movies_list = []
+    last_movie_id = 0
+    abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
+
+    def get_movies(**kwargs):
+        if scan_mode == 1:
+            kwargs["uncensored"] = 1
+        elif scan_mode == 0:
+            kwargs["uncensored"] = 0
+        else:
+            logging.debug(f"scan all records.")
+        kwargs["order_by"] = 'id asc'
+        return db_tools.query_movie_hrefs(limit=limit_count, **kwargs)
+
+    while True:
+        if update_mode == 0:    # only new records
+            movies_list = get_movies(start_id=0, is_full_data=0)
+        elif update_mode == 1:  # only complete records
+            movies_list = get_movies(start_id=last_movie_id, is_full_data=1)
+        elif update_mode == 2:  # 0+1
+            movies_list = get_movies(start_id=last_movie_id, is_full_data_not_in=abnormal_codes)
+        elif update_mode == 3:  # abnormal records only
+            movies_list = get_movies(start_id=last_movie_id, is_full_data_in=abnormal_codes)
+        else:                   # everything
+            movies_list = get_movies(start_id=last_movie_id)
+
+        if len(movies_list) < 1:
+            logging.info(f'all movies fetched.')
+            break
+
+        succ_count = 0
+        for movie in movies_list:
+            url = movie['href']
+            title = movie['title']
+            curr_id = movie['id']
+            logging.debug(f"Fetching data for movie ({title}), url {url} ...")
+            soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="video-meta-panel", attr_type="class"))
+            # page was served from the local cache; skip the database write
+            if skip_local and status_code == scraper.http_code_local:
+                last_movie_id = curr_id
+                succ_count += 1
+                continue
+            # parse the page and write to the database
+            if soup:
+                movie_data = scraper.parse_movie_detail(soup, url, title)
+                if movie_data:
+                    movie_id = db_tools.insert_or_update_movie(movie_data)
+                    if movie_id:
+                        logging.debug(f'insert one movie, id: {movie_id}, title: ({title}) url: {url}')
+                        last_movie_id = movie_id
+                        succ_count += 1
+                    else:
+                        logging.warning(f'insert movie {url} failed.')
+                else:
+                    logging.warning(f'parse_page_movie error. url: {url}')
+
+            elif status_code and status_code == scraper.http_code_404:
+                movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_404)
+                logging.warning(f'404 page. id: {movie_id}, title: ({title}), url: {url}, Skipping...')
+            elif status_code and status_code == scraper.http_code_login:
+                movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_login)
+                logging.warning(f'401 page(need login). id: {movie_id}, title: ({title}), url: {url}, Skipping...')
+            else:
+                logging.warning(f'fetch_page error. url: {url}')
+            time.sleep(0.5)
+        logging.info(f'total request: {len(movies_list)}, succ: {succ_count}. last movie id: {last_movie_id}')
+        # break early in debug mode
+        if debug:
+            return True
+
+
+# Map shortcuts to their functions
+function_map = {
+    "actor_list": fetch_actor_list,
+    "maker_list": fetch_makers_list,
+    "series_list": fetch_series_list,
+    "makers": fetch_movies_by_maker,
+    "series": fetch_movies_by_series,
+    "pub": fetch_movies_by_publishers,
+    "actors": fetch_performers_detail,
+    "movies": fetch_movies_detail,
+}
+
+# Main entry point
+def main(cmd, args):
+    # open a task-log row
+    task_id = db_tools.insert_task_log()
+    if task_id is None:
+        logging.warning(f'insert task log error.')
+        return None
+
+    logging.info(f"running task. id: {task_id}, args: {args}")
+
+    # run the requested functions
+    if cmd:
+        function_names = cmd.split(",")  # split the comma-separated input
+        for short_name in function_names:
+            func = function_map.get(short_name.strip())  # look up the function in the map
+            if callable(func):
+                db_tools.update_task_log(task_id, task_status=f'Running {short_name}')
+                func()
+            else:
+                logging.warning(f" {short_name} is not a valid function shortcut.")
+    else:  # run everything
+        for name, func in function_map.items():
+            if callable(func):
+                db_tools.update_task_log(task_id, task_status=f'Running {name}')
+                func()
+            else:
+                logging.warning(f" {name} is not a valid function shortcut.")
+
+    logging.info(f'all processes completed!')
+    db_tools.finalize_task_log(task_id)
+
+    # TODO:
+    # 1,
+
+# Set global runtime flags from the command-line arguments
+def set_env(args):
+    global debug
+    debug = args.debug
+    if debug:
+        logger = logging.getLogger()
+        logger.setLevel(logging.DEBUG)
+
+    global skip_local
+    skip_local = args.skip_local
+
+    global scan_mode
+    scan_mode = args.scan_mode
+
+    global update_mode
+    if args.update:
+        update_mode = args.update
+
+if __name__ == "__main__":
+    # command-line argument handling
+    keys_str = ",".join(function_map.keys())
+
+    usage_examples = textwrap.dedent('''
+        Examples:
+        python3 ./fetch.py                           # scan all newly added records
+        python3 ./fetch.py --scan_mode=1             # scan newly added uncensored records
+        python3 ./fetch.py --scan_mode=0             # scan newly added censored records
+        python3 ./fetch.py --scan_mode=2             # scan all newly added records
+        python3 ./fetch.py --update=4                # scan every record
+        python3 ./fetch.py --update=4 --scan_mode=1  # scan every uncensored record
+        python3 ./fetch.py --update=4 --scan_mode=0  # scan every censored record
+        python3 ./fetch.py --update=4 --scan_mode=2  # scan every record
+    ''')
+
+    parser = argparse.ArgumentParser(
+        description='fetch javdb data.\n\n' + usage_examples,
+        formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    #parser = argparse.ArgumentParser(description='fetch javdb data.')
+    parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
+    parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0 - only is_full_data=0 (default), 1 - only is_full_data=1, 2 - is_full_data<=1, 3 - only is_full_data>1 (abnormal rows), 4 - everything')
+    parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1, help='1 - only uncensored makers/series/actors/movies (default), 0 - the opposite, 2 - everything')
+    parser.add_argument('--skip_local', action='store_true', help='skip the database write when the page comes from the local cache')
+    parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
+    args = parser.parse_args()
+
+    set_env(args)
+    main(args.cmd, args)
\ No newline at end of file
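Reviewer note: --cmd takes comma-separated shortcuts from function_map, so a narrow run that only refreshes the maker index and the movies under each maker would look like the following hypothetical invocation (flags exactly as defined above):

    python3 ./fetch.py --cmd=maker_list,makers --scan_mode=1 --debug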
diff --git a/src/logger/logger.py b/src/logger/logger.py
new file mode 100644
index 0000000..a546b38
--- /dev/null
+++ b/src/logger/logger.py
@@ -0,0 +1,99 @@
+import logging
+import os
+import inspect
+import time
+from datetime import datetime
+from pathlib import Path
+from logging.handlers import RotatingFileHandler
+from collections import defaultdict
+from src.config.config import get_log_directory, get_src_directory
+
+# Track log frequency
+log_count = defaultdict(int)        # how many times each message has been logged
+last_log_time = defaultdict(float)  # timestamp of the last write per message
+
+class RateLimitFilter(logging.Filter):
+    """
+    Rate-limiting filter: identical messages repeated less than 60 seconds
+    apart share a counter; once that counter exceeds LOG_LIMIT, further
+    copies are dropped until a 60-second quiet gap resets it.
+    """
+    LOG_LIMIT = 600  # at most 600 copies of the same message per window
+
+    def filter(self, record):
+        global log_count, last_log_time
+        message_key = record.getMessage()  # the formatted log message
+
+        # current time and time since the last occurrence
+        now = time.time()
+        elapsed = now - last_log_time[message_key]
+
+        # throttle identical messages
+        if elapsed < 60:  # still inside the window
+            log_count[message_key] += 1
+            if log_count[message_key] > self.LOG_LIMIT:
+                return False  # drop the record
+        else:
+            log_count[message_key] = 1  # window expired, restart the count
+
+        last_log_time[message_key] = now
+
+        return True  # allow the record
+
+
+def get_caller_filename():
+    # walk the call stack
+    stack = inspect.stack()
+    # name of this script
+    current_script = os.path.basename(__file__)
+    # find the first frame that does not belong to this script
+    for frame_info in stack[1:]:
+        if os.path.basename(frame_info.filename) != current_script:
+            caller_path = Path(frame_info.filename)
+            # try to resolve the path relative to the src directory
+            try:
+                relative_path = caller_path.relative_to(get_src_directory())
+                # drop the extension
+                relative_path_without_ext = relative_path.with_suffix('')
+                # replace path separators with dashes
+                return str(relative_path_without_ext).replace(os.sep, '-')
+            except ValueError:
+                # fall back to the bare file name if it lives outside src
+                return os.path.splitext(os.path.basename(frame_info.filename))[0]
+    return None
+
+
+def setup_logging(log_filename=None):
+    # default the log file name to the calling script's name
+    if log_filename is None:
+        caller_filename = get_caller_filename()
+        common_log_dir = get_log_directory()
+        current_date = datetime.now().strftime('%Y%m%d')
+        # build the log file name with the date inserted before the extension
+        log_filename = f'{common_log_dir}/{caller_filename}_{current_date}.log'
+
+    max_log_size = 100 * 1024 * 1024  # 100 MB
+    max_log_files = 10  # keep at most 10 rotated log files
+
+    file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
+    file_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
+    ))
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
+    ))
+
+    # configure the root logger
+    logger = logging.getLogger()
+    logger.setLevel(logging.INFO)
+    logger.handlers = []  # avoid duplicate handlers
+    logger.addHandler(file_handler)
+    logger.addHandler(console_handler)
+
+    # attach the rate limiter
+    rate_limit_filter = RateLimitFilter()
+    file_handler.addFilter(rate_limit_filter)
+    console_handler.addFilter(rate_limit_filter)
+
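Reviewer note: setup_logging() installs both handlers on the root logger, so plain logging calls anywhere in the package get the file/console mirroring and the rate limit. A minimal usage sketch; the log-file name derivation assumes the caller lives under src/ (e.g. src/javbus/fetch.py would log to log/javbus-fetch_<YYYYMMDD>.log):

    import logging
    import src.logger.logger as logger

    logger.setup_logging()  # attaches file + console handlers, each with a RateLimitFilter
    for _ in range(10000):
        logging.info("same message")  # copies beyond LOG_LIMIT within a 60s window are dropped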
diff --git a/src/utils/utils.py b/src/utils/utils.py
new file mode 100644
index 0000000..6421b82
--- /dev/null
+++ b/src/utils/utils.py
@@ -0,0 +1,167 @@
+import re
+import os
+import json
+import time
+import csv
+from datetime import datetime
+import logging
+import src.config.config as config
+from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
+
+update_dir = f'{config.global_host_data_dir}/javdb'
+
+def is_valid_url(url: str) -> bool:
+    """Check whether a URL is well-formed."""
+    try:
+        result = urlparse(url)
+        # require both a scheme (http/https) and a netloc (example.com)
+        return all([result.scheme, result.netloc])
+    except ValueError:
+        return False
+
+# Create a bucketed sub-directory
+def create_sub_directory(base_dir, name):
+    # use the first character of the name, lower-cased, as the bucket
+    sub_dir = name[:1].lower()
+    full_path = os.path.join(base_dir, sub_dir)
+    if not os.path.exists(full_path):
+        os.makedirs(full_path)
+    return full_path
+
+# Extract the ID from a movie URL only
+def extract_id_from_href(href):
+    # only handle movie detail URLs
+    if 'javdb.com/v/' in href:
+        # regex for the path segment after /v/
+        pattern = r'javdb.com/v/([^?&]+)'
+        # look for a match
+        match = re.search(pattern, href)
+        if match:
+            # lower-case the captured ID
+            result = match.group(1).lower()
+            return result
+    return ''
+
+# Save the raw fetched HTML for later verification
+def write_raw_html(href, html_text):
+    # resolve the target directory
+    id = extract_id_from_href(href)
+    if 'javdb.com/v/' in href.lower():
+        dir_prefix = 'raw_movies'
+    else:
+        return
+
+    file_dir = create_sub_directory(f"{update_dir}/{dir_prefix}", id)
+    file_name = f"{id}.html"
+    full_path = os.path.join(file_dir, file_name)
+
+    try:
+        with open(full_path, 'w', encoding='utf-8') as file:
+            file.write(html_text)
+    except FileNotFoundError:
+        logging.warning(f"Error: the path {full_path} does not exist.")
+    except PermissionError:
+        logging.warning(f"Error: no permission to write {full_path}.")
+    except Exception as e:
+        logging.warning(f"Unknown error: {e}")
+
+
+# Read the locally cached raw HTML, if present and not expired
+def read_raw_html(href, expire_date_str="2025-03-01"):
+    # resolve the target directory
+    id = extract_id_from_href(href)
+    if 'javdb.com/v/' in href.lower():
+        dir_prefix = 'raw_movies'
+    else:
+        return
+
+    file_dir = create_sub_directory(f"{update_dir}/{dir_prefix}", id)
+    file_name = f"{id}.html"
+    full_path = os.path.join(file_dir, file_name)
+
+    try:
+        if os.path.exists(full_path):
+            # last modification time of the file
+            last_modified_timestamp = os.path.getmtime(full_path)
+            # convert the timestamp to a datetime
+            last_modified_date = datetime.fromtimestamp(last_modified_timestamp)
+            # the cache is valid only if it is newer than the expiry date
+            expire_date = datetime.strptime(expire_date_str, "%Y-%m-%d")
+            if last_modified_date > expire_date:
+                logging.debug(f"find local file on href {href}")
+                with open(full_path, 'r', encoding='utf-8') as file:
+                    return file.read()
+            else:
+                logging.debug(f"expired file {last_modified_date} on href {href}")
+                return None
+        else:
+            return None
+    except FileNotFoundError:
+        logging.warning(f"Error: the path {full_path} does not exist.")
+    except PermissionError:
+        logging.warning(f"Error: no permission to read {full_path}.")
+    except Exception as e:
+        logging.warning(f"Unknown error: {e}")
+    return None
+
+
+
+# Strip the query string, e.g. https://www.javdb.com/makers/16w?f=download -> .../makers/16w
+def remove_url_query(url: str) -> str:
+    try:
+        parsed_url = urlparse(url)
+        clean_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
+        return clean_url
+    except Exception as e:
+        print(f"Failed to parse URL: {e}")
+        return url
+
+# Write a list of dicts to a CSV file
+def json_to_csv(data, output_file):
+    if not data:
+        return
+    headers = list(data[0].keys())
+    with open(f"{update_dir}/{output_file}", 'w', encoding='utf-8', newline='') as csvfile:
+        writer = csv.DictWriter(csvfile, fieldnames=headers)
+        writer.writeheader()
+        for row in data:
+            writer.writerow(row)
+
+
+
+def normalize_url(url: str) -> str:
+    """
+    Normalize a URL by removing the language prefix, so different language
+    versions of the same page share one canonical URL.
+
+    Examples:
+    https://www.javbus.com/ja/star/p8y → https://www.javbus.com/star/p8y
+    https://www.javbus.com/en/star/p8y → https://www.javbus.com/star/p8y
+    """
+    try:
+        # parse the URL
+        parsed = urlparse(url)
+
+        # extract the path
+        path = parsed.path
+
+        # common language codes
+        LANGUAGES = {'ja', 'en', 'ko', 'zh', 'fr', 'de', 'es', 'ru'}
+
+        # split the path into components
+        path_components = path.strip('/').split('/')
+
+        # drop the first component if it is a language code
+        if path_components and path_components[0] in LANGUAGES:
+            path_components = path_components[1:]
+
+        # rebuild the normalized path
+        normalized_path = '/' + '/'.join(path_components)
+
+        # rebuild the URL, keeping the scheme and host but replacing the path
+        normalized_url = parsed._replace(path=normalized_path).geturl()
+
+        return normalized_url
+
+    except Exception as e:
+        print(f"URL normalization failed: {url}, error: {e}")
+        return url  # return the original URL on error
\ No newline at end of file
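Reviewer note: the two URL helpers are intended to canonicalize crawl targets before they are used as database keys. Expected behavior, following directly from the implementations above:

    import src.utils.utils as utils

    utils.normalize_url('https://www.javbus.com/ja/star/p8y')
    # -> 'https://www.javbus.com/star/p8y'
    utils.remove_url_query('https://www.javdb.com/makers/16w?f=download')
    # -> 'https://www.javdb.com/makers/16w'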