diff --git a/src/crawling/craw.py b/src/crawling/craw.py
new file mode 100644
index 0000000..a850a4c
--- /dev/null
+++ b/src/crawling/craw.py
@@ -0,0 +1,593 @@
+import logging
+import re
+import json
+import sys
+import requests
+from functools import partial
+from bs4 import BeautifulSoup
+from urllib.parse import urljoin
+import src.utils.utils as utils
+
+# Internal status codes: 404 is passed through from HTTP; 401 is reused as a
+# sentinel for "redirected to a verification page"; 601 is a custom code for
+# malformed URLs.
+http_code_404 = 404
+http_code_redirect = 401
+http_code_url = 601
+
+# Generic crawler class that wraps the low-level network interaction
+class GenericCrawler:
+    def __init__(self, use_cloudscraper=None, headers=None, cookies=None, max_retries=3, html_parser='html.parser'):
+        if use_cloudscraper is None:
+            use_cloudscraper = sys.version_info >= (3, 8)
+        self.use_cloudscraper = use_cloudscraper
+        self.headers = headers or {
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
+        }
+        self.cookies = cookies or {}
+        self.scraper = None  # lazy initialization
+        self.max_retries = max_retries
+        self.parser = html_parser
+
+        # cloudscraper is not imported here; it is imported on demand
+
+    def _initialize_scraper(self):
+        """Lazily initialize the request client so cloudscraper is only imported when needed"""
+        if self.scraper is not None:
+            return
+
+        if self.use_cloudscraper:
+            try:
+                # deferred import of cloudscraper
+                import cloudscraper
+                self.scraper = cloudscraper.create_scraper()
+                logging.info("Using cloudscraper for requests")
+            except ImportError:
+                logging.warning("cloudscraper not installed. Falling back to requests.")
+                self.use_cloudscraper = False
+                self.scraper = requests.Session()
+        else:
+            self.scraper = requests.Session()
+            logging.info("Using requests for HTTP operations")
+    def fetch_page(self, url, validator):
+        # initialize the scraper before first use
+        self._initialize_scraper()
+
+        for attempt in range(self.max_retries):
+            try:
+                if not utils.is_valid_url(url):
+                    logging.error(f'wrong url format: {url}')
+                    return None, http_code_url
+
+                response = self.scraper.get(url, headers=self.headers, cookies=self.cookies)
+
+                # handle HTTP status codes
+                if response.status_code == http_code_404:
+                    logging.debug(f"Page not found (404): {url}")
+                    return None, http_code_404  # return 404 directly so the caller can skip this page
+
+                response.raise_for_status()  # raise on other HTTP errors
+
+                # check whether we were redirected, e.g. to a verification page
+                if response.history:
+                    logging.debug(f"Page redirected on {url}. Checking if it's a verify page.")
+                    soup = BeautifulSoup(response.text, self.parser)
+                    if self.check_redirect(soup):
+                        logging.warning(f"Page redirected to verify page on {url}.")
+                        return None, http_code_redirect
+
+                # preprocess the HTML (subclasses may override preprocessor)
+                html_text = self.preprocessor(response.text)
+
+                soup = BeautifulSoup(html_text, self.parser)
+                if validator(soup):  # run the caller-supplied page check
+                    return soup, response.status_code
+
+                logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
+            except Exception as e:
+                logging.error(f"Unexpected error on {url}: {e}, Retrying...")
+
+        logging.error(f'Fetching failed after max retries. {url}')
+        return None, None  # still failing after the maximum number of retries
+
+    # Hook for preprocessing the page, typically to repair broken tags
+    def preprocessor(self, html):
+        return html
+
+    # Hook for detecting a redirect that derails normal parsing
+    def check_redirect(self, soup):
+        """Default redirect check; subclasses may override"""
+        return False  # by default, treat the page as not redirected
+
+    @staticmethod
+    def generic_validator(soup, tag, identifier, attr_type="id"):
+        if attr_type == "id":
+            return soup.find(tag, id=identifier) is not None
+        elif attr_type == "class":
+            return bool(soup.find_all(tag, class_=identifier))
+        elif attr_type == "name":
+            # note: this branch looks specifically for a <select> element by name
+            return bool(soup.find('select', {'name': identifier}))
+        return False
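+# Usage sketch (illustrative only; never called anywhere): fetch a page with the
+# generic client and a structural validator via partial(). The URL and the
+# "content" id below are placeholder assumptions, not values used elsewhere in
+# this module.
+def _example_generic_usage():
+    crawler = GenericCrawler(use_cloudscraper=False)
+    validator = partial(GenericCrawler.generic_validator,
+                        tag="div", identifier="content", attr_type="id")
+    soup, status = crawler.fetch_page("https://example.com/", validator)
+    if soup is not None:
+        print(status, soup.title)
+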
+# Crawler for javbus.com pages
+class JavbusCrawler(GenericCrawler):
+    def __init__(self, use_cloudscraper=None):
+        headers = {
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+            "Sec-Fetch-Site": "none",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Sec-Fetch-Mode": "navigate",
+            "Host": "www.javbus.com",
+            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
+            "Accept-Language": "zh-CN,zh-Hans;q=0.9",
+            "Sec-Fetch-Dest": "document",
+            "Connection": "keep-alive",
+        }
+
+        cookies = {
+            'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
+            'existmag': 'all',
+            'age': 'verified'
+        }
+        super().__init__(use_cloudscraper, headers=headers, cookies=cookies)
+        self.host_url = "https://www.javbus.com"
+
+    # The parsing functions below are carried over from the old module largely unchanged
+    def parse_actors_list(self, soup, href):
+        div_actors = soup.find("div", id='waterfall')
+        if not div_actors:
+            logging.warning("No actors div found")
+            return None, None
+
+        # parse the items
+        rows = div_actors.find_all('div', class_='item')
+
+        list_data = []
+        next_url = None
+        for row in rows:
+            # actor detail link
+            actor_link = row.find('a')['href']
+            # actor name
+            actor_name = row.find('span').text.strip()
+            # avatar image URL
+            avatar_url = row.find('img')['src']
+
+            list_data.append({
+                'name': actor_name,
+                'href': urljoin(self.host_url, actor_link),
+                'pic': avatar_url
+            })
+
+        # look for the "next page" button
+        div_link = soup.find("div", class_='text-center hidden-xs')
+        if div_link:
+            next_page_element = soup.find('a', id='next')
+            if next_page_element:
+                next_page_url = next_page_element['href']
+                next_url = urljoin(href, next_page_url)
+
+        return list_data, next_url
+
+    # Minimal implementation (assumed): the original code referenced an undefined
+    # parse_avatar_image; this version just extracts the <img> src from the avatar column.
+    def parse_avatar_image(self, avatar):
+        img = avatar.find('img')
+        return img['src'] if img and img.has_attr('src') else ''
+
+    def parse_actor_detail(self, soup, href):
+        # collect aliases first
+        alias_list = []
+
+        div_meta = soup.find('span', class_='actor-section-name')
+        if not div_meta:
+            logging.warning(f'warning: no meta data found in page {href}')
+            return None, None
+        alias_div = soup.find('div', class_='column section-title')
+
+        if alias_div:
+            meta_list = alias_div.find_all('span', class_='section-meta')
+            if len(meta_list) > 1:
+                alias_list = meta_list[0].text.strip().split(", ")
+
+        # avatar
+        pic = ''
+        avatar = soup.find("div", class_="column actor-avatar")
+        if avatar:
+            pic = self.parse_avatar_image(avatar)
+
+        # result container
+        actor = {}
+
+        # use a regex to match divs whose class contains 'movie-list h cols-'
+        # (covers variants such as 'movie-list h cols-4 vcols-5' and '... vcols-8')
+        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
+        if not div_movies:
+            logging.warning("No movies div found")
+            return None, None
+
+        # parse the items
+        rows = div_movies.find_all('div', class_='item')
+
+        list_data = []
+        next_url = None
+        for row in rows:
+            link = row.find('a', class_='box')['href']
+            serial_number = row.find('strong').text.strip()
+            title = row.find('div', class_='video-title').text.strip()
+            release_date = row.find('div', class_='meta').text.strip()
+            list_data.append({
+                'href': self.host_url + link if link else '',
+                'serial_number': serial_number,
+                'title': title,
+                'release_date': release_date
+            })
+
+        # look for the "next page" button
+        next_page_element = soup.find('a', class_='pagination-next')
+        if next_page_element:
+            next_page_url = next_page_element['href']
+            next_page_number = self.url_page_num(next_page_url)
+            current_page_number = self.url_page_num(href)
+            logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
+            if current_page_number is None:
+                current_page_number = 0
+            if next_page_number and next_page_number > current_page_number:
+                next_url = self.host_url + next_page_url
+
+        actor = {
+            'pic': pic,
+            'alias': alias_list,
+            'movies': list_data
+        }
+
+        return actor, next_url
+
+    def parse_movie_one(self, soup, keys):
+        key_strong = soup.find('strong', string=lambda text: text in keys)
+        if key_strong:
+            key_span = key_strong.find_next_sibling('span', class_='value')
+            if key_span:
+                return key_span.text.strip()
+        return None
+
+    def parse_movie_val_href(self, soup, keys):
+        key_strong = soup.find('strong', string=lambda text: text in keys)
+        if key_strong:
+            key_span = key_strong.find_next_sibling('span', class_='value')
+            if key_span:
+                a_tag = key_span.find('a')
+                if a_tag:
+                    return a_tag.text.strip(), self.host_url + a_tag.get('href')
+                else:
+                    return key_span.text.strip(), None
+        return None, None
+
+    def parse_movie_arr(self, soup, keys):
+        key_strong = soup.find('strong', string=lambda text: text in keys)
+        if key_strong:
+            key_span = key_strong.find_next_sibling('span', class_='value')
+            if key_span:
+                actors = []
+                a_tags = key_span.find_all('a')
+                for a_tag in a_tags:
+                    actors.append({
+                        'name': a_tag.text.strip(),
+                        'href': self.host_url + a_tag.get('href')
+                    })
+                return actors
+        return []
+
+    def parse_movie_detail(self, soup, href, title):
+        div_video = soup.find("div", class_='video-meta-panel')
+        if not div_video:
+            logging.warning("No movies div found")
+            return None, None
+
+        result = {}
+        result['href'] = href
+        result['title'] = title
+
+        # cover image
+        cover_img = soup.select_one('.column-video-cover a')
+        result['cover_url'] = cover_img['href'] if cover_img else None
+
+        # serial number, release date, duration
+        result['serial_number'] = self.parse_movie_one(soup, ['番號:', 'ID:'])
+        result['release_date'] = self.parse_movie_one(soup, ['日期:', 'Released Date:'])
+        result['duration'] = self.parse_movie_one(soup, ['時長:', 'Duration:'])
+
+        # maker, series, publisher
+        result['maker_name'], result['maker_link'] = self.parse_movie_val_href(soup, ['片商:', 'Maker:'])
+        result['series_name'], result['series_link'] = self.parse_movie_val_href(soup, ['系列:', 'Series:'])
+        result['pub_name'], result['pub_link'] = self.parse_movie_val_href(soup, ['發行:', 'Publisher:'])
+
+        # actors and tags
+        result['tags'] = self.parse_movie_arr(soup, ['類別:', 'Tags:'])
+        result['actors'] = self.parse_movie_arr(soup, ['演員:', 'Actor(s):'])
+
+        return result
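+    # Self-check sketch (assumed markup): the field helpers above expect a
+    # <strong> label followed by a sibling <span class="value">. This helper is
+    # purely illustrative and is not used by the crawler itself.
+    def _demo_parse_movie_one(self):
+        html = '<div><strong>ID:</strong> <span class="value">ABC-123</span></div>'
+        soup = BeautifulSoup(html, self.parser)
+        assert self.parse_movie_one(soup, ['番號:', 'ID:']) == 'ABC-123'
+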
+    def parse_series_uncensored(self, soup, href):
+        div_series = soup.find("div", id='series')
+        if not div_series:
+            logging.warning("No div_series div found")
+            return None, None
+
+        # parse the items
+        rows = div_series.find_all('a', class_='box')
+
+        list_data = []
+        next_url = None
+        for row in rows:
+            name = row.find('strong').text.strip()
+            link = row['href']  # renamed from href to avoid shadowing the page URL parameter
+            div_movies = row.find('span')
+            movies = 0
+            if div_movies:
+                match = re.search(r'\((\d+)\)', div_movies.text.strip())
+                if match:
+                    movies = int(match.group(1))
+
+            list_data.append({
+                'name': name,
+                'href': self.host_url + link if link else '',
+                'movies': movies
+            })
+
+        # look for the "next page" button
+        next_page_element = soup.find('a', class_='pagination-next')
+        if next_page_element:
+            next_page_url = next_page_element['href']
+            next_page_number = self.url_page_num(next_page_url)
+            current_page_number = self.url_page_num(href)
+            if current_page_number is None:
+                current_page_number = 0
+            if next_page_number and next_page_number > current_page_number:
+                next_url = self.host_url + next_page_url
+
+        return list_data, next_url
+
+    def parse_series_detail(self, soup, href):
+        div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
+        if not div_movies:
+            logging.warning("No movies div found")
+            return [], None
+
+        # parse the items
+        rows = div_movies.find_all('div', class_='item')
+
+        list_data = []
+        next_url = None
+        for row in rows:
+            link = row.find('a', class_='box')['href']
+            serial_number = row.find('strong').text.strip()
+            title = row.find('div', class_='video-title').text.strip()
+            release_date = row.find('div', class_='meta').text.strip()
+            list_data.append({
+                'href': self.host_url + link if link else '',
+                'serial_number': serial_number,
+                'title': title,
+                'release_date': release_date
+            })
+
+        # look for the "next page" button
+        next_page_element = soup.find('a', class_='pagination-next')
+        if next_page_element:
+            next_page_url = next_page_element['href']
+            next_page_number = self.url_page_num(next_page_url)
+            current_page_number = self.url_page_num(href)
+            if current_page_number is None:
+                current_page_number = 0
+            if next_page_number and next_page_number > current_page_number:
+                next_url = self.host_url + next_page_url
+
+        return list_data, next_url
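+    # Refactoring sketch (optional): the "next page" lookup is repeated verbatim
+    # in several parse_* methods below and could be consolidated into one helper
+    # like this. Kept as a sketch so the carried-over parsers stay unchanged.
+    def _parse_next_url(self, soup, current_href):
+        next_page_element = soup.find('a', class_='pagination-next')
+        if not next_page_element:
+            return None
+        next_page_url = next_page_element['href']
+        next_page_number = self.url_page_num(next_page_url)
+        current_page_number = self.url_page_num(current_href) or 0
+        if next_page_number and next_page_number > current_page_number:
+            return self.host_url + next_page_url
+        return None
+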
"下一页" 按钮 + next_page_element = soup.find('a', class_='pagination-next') + if next_page_element: + next_page_url = next_page_element['href'] + next_page_number = self.url_page_num(next_page_url) + current_page_number = self.url_page_num(href) + if current_page_number is None: + current_page_number = 0 + if next_page_number and next_page_number > current_page_number: + next_url = host_url + next_page_url + + return list_data, next_url + + def parse_publisher_detail(self, soup, href): + # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') + div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) + if not div_movies: + logging.warning(f"Warning: No movies div found ") + return [], None + + # 解析元素 + rows = div_movies.find_all('div', class_='item') + + list_data = [] + next_url = None + for row in rows: + link = row.find('a', class_='box')['href'] + serial_number = row.find('strong').text.strip() + title = row.find('div', class_='video-title').text.strip() + release_date = row.find('div', class_='meta').text.strip() + list_data.append({ + 'href': host_url + link if link else '', + 'serial_number': serial_number, + 'title': title, + 'release_date': release_date + }) + + # 查找 "下一页" 按钮 + next_page_element = soup.find('a', class_='pagination-next') + if next_page_element: + next_page_url = next_page_element['href'] + next_page_number = self.url_page_num(next_page_url) + current_page_number = self.url_page_num(href) + if current_page_number is None: + current_page_number = 0 + if next_page_number and next_page_number > current_page_number: + next_url = host_url + next_page_url + + return list_data, next_url + + def parse_uncensored(self, soup, href): + # div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8') + div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) + if not div_movies: + logging.warning(f"Warning: No movies div found ") + return [], None + + # 解析元素 + rows = div_movies.find_all('div', class_='item') + + list_data = [] + next_url = None + for row in rows: + link = row.find('a', class_='box')['href'] + serial_number = row.find('strong').text.strip() + title = row.find('div', class_='video-title').text.strip() + release_date = row.find('div', class_='meta').text.strip() + list_data.append({ + 'href': host_url + link if link else '', + 'serial_number': serial_number, + 'title': title, + 'release_date': release_date + }) + + # 查找 "下一页" 按钮 + next_page_element = soup.find('a', class_='pagination-next') + if next_page_element: + next_page_url = next_page_element['href'] + next_page_number = self.url_page_num(next_page_url) + current_page_number = self.url_page_num(href) + if current_page_number is None: + current_page_number = 0 + if next_page_number and next_page_number > current_page_number: + next_url = host_url + next_page_url + + return list_data, next_url + + @staticmethod + def pretty_print_json(data, n=10, indent=4, sort_keys=False): + """ + 以美化格式打印数组的前n个元素,其他元素用"..."表示 + + 参数: + - data: 要打印的数据(应为数组) + - n: 要显示的元素数量 + - indent: 缩进空格数 + - sort_keys: 是否按键排序 + """ + try: + # 处理非数组数据 + if not isinstance(data, list): + print(formatted) + return + + # 复制原始数据,避免修改原数组 + data_copy = data.copy() + + # 切片取前n个元素 + first_n_elements = data_copy[:n] + + # 如果数组长度超过n,添加"..."标记 + if len(data) > n: + result = first_n_elements + ["... 
({} more elements)".format(len(data) - n)] + else: + result = first_n_elements + + # 格式化输出 + formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys) + print(formatted) + + except TypeError as e: + print(f"错误:无法格式化数据。详情:{e}") + except Exception as e: + print(f"打印时发生意外错误:{e}") + + def test_actor_list(self, url='https://www.javbus.com/uncensored/actresses/1'): + next_url = url + all_data = [] + while next_url: + print(f'fetching page {next_url}') + soup, status_code = self.fetch_page(next_url, partial(self.generic_validator, tag="div", identifier="waterfall", attr_type="id"), + max_retries=1) + if soup: + list_data, next_url = self.parse_actors_list(soup, next_url) + if list_data: + all_data.extend(list_data) + self.pretty_print_json(all_data) + else: + print('get wrong page.') + + if next_url: + print(f"\n\nnext url: {next_url}") + else: + print(f"wrong request. url: {next_url}, status_code: {status_code}") + + break + + def url_page_num(self, url): + # 这里需要根据实际情况实现提取页码的逻辑 + return None diff --git a/src/crawling/craw_common.py b/src/crawling/craw_common.py deleted file mode 100644 index ab17f6b..0000000 --- a/src/crawling/craw_common.py +++ /dev/null @@ -1,71 +0,0 @@ -import logging -import cloudscraper -from bs4 import BeautifulSoup -import src.utils.utils as utils - -# 设置 headers 和 scraper -headers = { - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0' -} -# 定义 cookie -cookies = { -} -scraper = cloudscraper.create_scraper() - -http_code_404 = 404 -http_code_login = 401 -http_code_local = 99 -logging.getLogger().setLevel(logging.DEBUG) -#使用 CloudScraper 进行网络请求,并执行页面验证,支持不同解析器和预处理 -def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None, headers=headers, cookies=cookies): - for attempt in range(max_retries): - try: - if not utils.is_valid_url(url): - logging.error(f'wrong url format: {url}') - return None, None - - response = scraper.get(url, headers=headers, cookies=cookies) - - # 处理 HTTP 状态码 - if response.status_code == 404: - logging.debug(f"Page not found (404): {url}") - return None, http_code_404 # 直接返回 404,调用方可以跳过 - - response.raise_for_status() # 处理 HTTP 错误 - - # 检查是否发生跳转,比如到登录页面 - if response.history: - logging.debug(f"Page redirected on {url}. Checking if it's a login page.") - soup = BeautifulSoup(response.text, parser) - # 判断是否为登录页面, - if soup.find('div', id='ageVerify'): - logging.warning(f"Page redirected to login page on {url}.") - return None, http_code_login - - # 预处理 HTML(如果提供了 preprocessor) - html_text = preprocessor(response.text) if preprocessor else response.text - - soup = BeautifulSoup(html_text, parser) - if validator(soup): # 进行自定义页面检查 - return soup, response.status_code - - logging.warning(f"Validation failed on attempt {attempt + 1} for {url}") - except cloudscraper.exceptions.CloudflareChallengeError as e: - logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retring...") - except cloudscraper.exceptions.CloudflareCode1020 as e: - logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retring...") - except Exception as e: - logging.error(f"Unexpected error on {url}: {e}, Retring...") - - logging.error(f'Fetching failed after max retries. 
{url}') - return None, None # 达到最大重试次数仍然失败 - -# 通用的 HTML 结构验证器 -def generic_validator(soup, tag, identifier, attr_type="id"): - if attr_type == "id": - return soup.find(tag, id=identifier) is not None - elif attr_type == "class": - return bool(soup.find_all(tag, class_=identifier)) - elif attr_type == "name": - return bool(soup.find('select', {'name': identifier})) - return False diff --git a/src/crawling/craw_javbus.py b/src/crawling/craw_javbus.py deleted file mode 100644 index f36d0b8..0000000 --- a/src/crawling/craw_javbus.py +++ /dev/null @@ -1,515 +0,0 @@ -import cloudscraper -import logging -import re -import json -from functools import partial -from urllib.parse import urljoin -import src.config.config as config -import src.utils.utils as utils -import src.crawling.craw_common as scraper - -# 定义基础 URL 和可变参数 -host_url = "https://www.javbus.com" - -headers = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Sec-Fetch-Site": "none", - "Accept-Encoding": "gzip, deflate, br", - "Sec-Fetch-Mode": "navigate", - "Host": "www.javbus.com", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15", - "Accept-Language": "zh-CN,zh-Hans;q=0.9", - "Sec-Fetch-Dest": "document", - "Connection": "keep-alive", -} - -cookies = { - 'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6', - 'existmag': 'all', - 'age': 'verified' -} - -# 解析 HTML 内容,提取需要的数据 -def parse_actors_list(soup, href): - div_actors = soup.find("div", id='waterfall') - if not div_actors: - logging.warning(f"Warning: No actors div found ") - return None, None - - # 解析元素 - rows = div_actors.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - # 获取演员详情链接 - actor_link = row.find('a')['href'] - # 获取演员名字 - actor_name = row.find('span').text.strip() - # 获取头像图片链接 - avatar_url = row.find('img')['src'] - - list_data.append({ - 'name' : actor_name, - 'href' : urljoin(host_url, actor_link), - 'pic' : avatar_url - }) - - # 查找 "下一页" 按钮 - div_link = soup.find("div", class_='text-center hidden-xs') - if div_link: - next_page_element = soup.find('a', id='next') - if next_page_element: - next_page_url = next_page_element['href'] - next_url = urljoin(href, next_page_url) - - return list_data, next_url - - -# 解析 HTML 内容,提取需要的数据 -def parse_actor_detail(soup, href): - # 先找一下别名 - alias_list = [] - - div_meta = soup.find('span', class_='actor-section-name') - if not div_meta: - logging.warning(f'warning: no meta data found in page {href}') - return None, None - alias_div = soup.find('div', class_='column section-title') - - if alias_div: - meta_list = alias_div.find_all('span', class_='section-meta') - if len(meta_list) > 1: - alias_list = meta_list[0].text.strip().split(", ") - - # 头像 - pic = '' - avatar = soup.find("div", class_="column actor-avatar") - if avatar: - pic = parse_avatar_image(avatar) - - # 返回数据 - actor = {} - - # 使用正则表达式查找 class 包含 'movie-list h cols-4' 的 div 元素 - div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-')) - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8') - if not div_movies: - logging.warning(f"Warning: No movies div found ") - return None, None - - # 解析元素 - rows = div_movies.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - link = row.find('a', class_='box')['href'] - serial_number = row.find('strong').text.strip() - title = row.find('div', 
class_='video-title').text.strip() - release_date = row.find('div', class_='meta').text.strip() - list_data.append({ - 'href' : host_url + link if link else '', - 'serial_number' : serial_number, - 'title' : title, - 'release_date': release_date - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}') - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - actor = { - 'pic' : pic, - 'alias' : alias_list, - 'movies' : list_data - } - - return actor, next_url - - -# 解析单个元素 -def parse_movie_one(soup, keys): - key_strong = soup.find('strong', string=lambda text: text in keys) - if key_strong: - key_span = key_strong.find_next_sibling('span', class_='value') - if key_span: - return key_span.text.strip() - return None - -# 解析值和链接 -def parse_movie_val_href(soup, keys): - key_strong = soup.find('strong', string=lambda text: text in keys) - if key_strong: - key_span = key_strong.find_next_sibling('span', class_='value') - if key_span: - a_tag = key_span.find('a') - if a_tag: - return a_tag.text.strip(), host_url + a_tag.get('href') - else: - return key_span.text.strip(), None - return None, None - -# 解析多个值和链接 -def parse_movie_arr(soup, keys): - key_strong = soup.find('strong', string=lambda text: text in keys) - if key_strong: - key_span = key_strong.find_next_sibling('span', class_='value') - if key_span: - actors = [] - a_tags = key_span.find_all('a') - for a_tag in a_tags: - actors.append({ - 'name': a_tag.text.strip(), - 'href': host_url + a_tag.get('href') - }) - return actors - return [] - -# 解析 HTML 内容,提取需要的数据 -def parse_movie_detail(soup, href, title): - div_video = soup.find("div", class_='video-meta-panel') - if not div_video: - logging.warning(f"Warning: No movies div found ") - return None, None - - result = {} - result['href'] = href - result['title'] = title - - # 获取封面图片 - cover_img = soup.select_one('.column-video-cover a') - result['cover_url'] = cover_img['href'] if cover_img else None - - # 获取番号 - result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:']) - result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:']) - result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:']) - - # 获取maker,系列 - result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:']) - result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:']) - result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:']) - - # 获取演员,tags - result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:']) - result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):']) - - return result - -# 解析 HTML 内容,提取需要的数据 -def parse_series_uncensored(soup, href): - div_series = soup.find("div", id='series') - if not div_series: - logging.warning(f"Warning: No div_series div found ") - return None, None - - # 解析元素 - rows = div_series.find_all('a', class_='box') - - list_data = [] - next_url = None - for row in rows: - name = row.find('strong').text.strip() - href = row['href'] - div_movies = row.find('span') - movies = 0 - if div_movies: - match = re.search(r'\((\d+)\)', div_movies.text.strip()) - if match: - movies = int(match.group(1)) - - 
list_data.append({ - 'name' : name, - 'href' : host_url + href if href else '', - 'movies' : movies - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - - -# 解析 HTML 内容,提取需要的数据 -def parse_series_detail(soup, href): - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') - div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) - if not div_movies: - logging.warning(f"Warning: No movies div found ") - return [], None - - # 解析元素 - rows = div_movies.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - link = row.find('a', class_='box')['href'] - serial_number = row.find('strong').text.strip() - title = row.find('div', class_='video-title').text.strip() - release_date = row.find('div', class_='meta').text.strip() - list_data.append({ - 'href' : host_url + link if link else '', - 'serial_number' : serial_number, - 'title' : title, - 'release_date': release_date - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - - -# 解析 HTML 内容,提取需要的数据 -def parse_makers_uncensored(soup, href): - div_series = soup.find("div", id='makers') - if not div_series: - logging.warning(f"Warning: No makers div found ") - return None, None - - # 解析元素 - rows = div_series.find_all('a', class_='box') - - list_data = [] - next_url = None - for row in rows: - name = row.find('strong').text.strip() - href = row['href'] - div_movies = row.find('span') - movies = 0 - if div_movies: - match = re.search(r'\((\d+)\)', div_movies.text.strip()) - if match: - movies = int(match.group(1)) - - list_data.append({ - 'name' : name, - 'href' : host_url + href if href else '', - 'movies' : movies - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - - -# 解析 HTML 内容,提取需要的数据 -def parse_maker_detail(soup, href): - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') - div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) - if not div_movies: - logging.warning(f"Warning: No movies div found ") - return [], None - - # 解析元素 - rows = div_movies.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - link = row.find('a', class_='box')['href'] - serial_number = row.find('strong').text.strip() - title = row.find('div', class_='video-title').text.strip() - release_date = row.find('div', class_='meta').text.strip() - list_data.append({ - 'href' : 
host_url + link if link else '', - 'serial_number' : serial_number, - 'title' : title, - 'release_date': release_date - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - -# 解析 HTML 内容,提取需要的数据 -def parse_publisher_detail(soup, href): - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5') - div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) - if not div_movies: - logging.warning(f"Warning: No movies div found ") - return [], None - - # 解析元素 - rows = div_movies.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - link = row.find('a', class_='box')['href'] - serial_number = row.find('strong').text.strip() - title = row.find('div', class_='video-title').text.strip() - release_date = row.find('div', class_='meta').text.strip() - list_data.append({ - 'href' : host_url + link if link else '', - 'serial_number' : serial_number, - 'title' : title, - 'release_date': release_date - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - - -# 解析 HTML 内容,提取需要的数据 -def parse_uncensored(soup, href): - #div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8') - div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)')) - if not div_movies: - logging.warning(f"Warning: No movies div found ") - return [], None - - # 解析元素 - rows = div_movies.find_all('div', class_='item') - - list_data = [] - next_url = None - for row in rows: - link = row.find('a', class_='box')['href'] - serial_number = row.find('strong').text.strip() - title = row.find('div', class_='video-title').text.strip() - release_date = row.find('div', class_='meta').text.strip() - list_data.append({ - 'href' : host_url + link if link else '', - 'serial_number' : serial_number, - 'title' : title, - 'release_date': release_date - }) - - # 查找 "下一页" 按钮 - next_page_element = soup.find('a', class_='pagination-next') - if next_page_element: - next_page_url = next_page_element['href'] - next_page_number = url_page_num(next_page_url) - current_page_number = url_page_num(href) - if current_page_number is None: - current_page_number = 0 - if next_page_number and next_page_number > current_page_number : - next_url = host_url + next_page_url - - return list_data, next_url - - -def pretty_print_json(data, n=10, indent=4, sort_keys=False): - """ - 以美化格式打印数组的前n个元素,其他元素用"..."表示 - - 参数: - - data: 要打印的数据(应为数组) - - n: 要显示的元素数量 - - indent: 缩进空格数 - - sort_keys: 是否按键排序 - """ - try: - # 处理非数组数据 - if not isinstance(data, list): - print(formatted) - return - - # 复制原始数据,避免修改原数组 - data_copy = data.copy() - - # 切片取前n个元素 - first_n_elements = data_copy[:n] - - # 如果数组长度超过n,添加"..."标记 - if len(data) > n: - result = first_n_elements + ["... 
({} more elements)".format(len(data) - n)] - else: - result = first_n_elements - - # 格式化输出 - formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys) - print(formatted) - - except TypeError as e: - print(f"错误:无法格式化数据。详情:{e}") - except Exception as e: - print(f"打印时发生意外错误:{e}") - -def test_actor_list(url='https://www.javbus.com/uncensored/actresses/1'): - next_url = url - all_data = [] - while next_url: - print(f'fetching page {next_url}') - soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"),max_retries=1, headers=headers, cookies=cookies) - if soup: - list_data, next_url = parse_actors_list(soup, next_url) - if list_data : - all_data.extend(list_data) - pretty_print_json(all_data) - else: - print('get wrong page.') - - if next_url: - print(f"\n\nnext url: {next_url}") - else: - print(f"wrong request. url: {next_url}, status_code: {status_code}") - - break - -if __name__ == "__main__": - #test_actors_list() - #test_actor() - #test_movie_detail() - #test_series_list() - #test_series_detail() - logging.getLogger().setLevel(logging.DEBUG) - test_actor_list() - test_actor_list('https://www.javbus.com/en/actresses') - \ No newline at end of file diff --git a/src/db_utils/db_common.py b/src/db_utils/db_common.py deleted file mode 100644 index 6f1969e..0000000 --- a/src/db_utils/db_common.py +++ /dev/null @@ -1,121 +0,0 @@ -import sqlite3 -import json -import logging -from datetime import datetime -import src.config.config as config - -# 连接 SQLite 数据库 -DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # 替换为你的数据库文件 - -# 检查 SQLite 版本 -lower_sqlite_version = False -sqlite_version = sqlite3.sqlite_version_info -if sqlite_version < (3, 24, 0): - lower_sqlite_version = True - -# 获取表的列名和默认值 -def get_table_columns_and_defaults(cursor, tbl_name): - try: - cursor.execute(f"PRAGMA table_info({tbl_name})") - columns = cursor.fetchall() - column_info = {} - for col in columns: - col_name = col[1] - default_value = col[4] - column_info[col_name] = default_value - return column_info - except sqlite3.Error as e: - logging.error(f"Error getting table columns: {e}") - return None - -# 检查并处理数据 -def check_and_process_data(cursor, data, tbl_name): - column_info = get_table_columns_and_defaults(cursor=cursor, tbl_name=tbl_name) - if column_info is None: - return None - processed_data = {} - for col, default in column_info.items(): - if col == 'id' or col == 'created_at': # 自增主键,不需要用户提供; 创建日期,使用建表默认值 - continue - if col == 'updated_at': # 日期函数,用户自己指定即可 - processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if col in data: - processed_data[col] = data[col] - - return processed_data - - -# 插入或更新数据 -def insert_or_update_common(cursor, conn, data, tbl_name, uniq_key='url'): - if lower_sqlite_version: - return insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key) - - try: - processed_data = check_and_process_data(cursor, data, tbl_name) - if processed_data is None: - return None - - columns = ', '.join(processed_data.keys()) - values = list(processed_data.values()) - placeholders = ', '.join(['?' 
for _ in values]) - update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key]) - - sql = f''' - INSERT INTO {tbl_name} ({columns}) - VALUES ({placeholders}) - ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause} - ''' - cursor.execute(sql, values) - conn.commit() - - # 获取插入或更新后的 report_id - cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],)) - report_id = cursor.fetchone()[0] - return report_id - except sqlite3.Error as e: - logging.error(f"Error inserting or updating data: {e}") - return None - -# 插入或更新数据 -def insert_or_update_common_lower(cursor, conn, data, tbl_name, uniq_key='url'): - try: - processed_data = check_and_process_data(cursor, data, tbl_name) - if processed_data is None: - return None - - columns = ', '.join(processed_data.keys()) - values = list(processed_data.values()) - placeholders = ', '.join(['?' for _ in values]) - - # 先尝试插入数据 - try: - sql = f''' - INSERT INTO {tbl_name} ({columns}) - VALUES ({placeholders}) - ''' - cursor.execute(sql, values) - conn.commit() - except sqlite3.IntegrityError: # 唯一键冲突,执行更新操作 - update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key]) - update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key] - update_values.append(data[uniq_key]) - sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?" - cursor.execute(sql, update_values) - conn.commit() - - # 获取插入或更新后的 report_id - cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],)) - report_id = cursor.fetchone()[0] - return report_id - except sqlite3.Error as e: - logging.error(f"Error inserting or updating data: {e}") - return None - - -# 测试代码 -if __name__ == "__main__": - conn = sqlite3.connect(DB_PATH, check_same_thread=False) - cursor = conn.cursor() - - tbl_name_actors = 'javhd_models' - print(get_table_columns_and_defaults(cursor, tbl_name_actors)) diff --git a/src/db_utils/db_javbus.py b/src/db_utils/db_javbus.py deleted file mode 100644 index 6c68a84..0000000 --- a/src/db_utils/db_javbus.py +++ /dev/null @@ -1,1036 +0,0 @@ -import sqlite3 -import json -import logging -from datetime import datetime -import src.config.config as config -import src.db_utils.db_common as db_comm - -# 连接 SQLite 数据库 -conn = sqlite3.connect(db_comm.DB_PATH, check_same_thread=False) -cursor = conn.cursor() - -cached_tags = {} -tbl_name_actors = 'javbus_actors' - -# 插入books表,并判断是否需要更新 -def insert_actor_index(data, uncensored=0, from_actor_list=0, from_movie_list=0): - data['uncensored'] = uncensored - if from_actor_list: - data['from_actor_list'] = from_actor_list - if from_movie_list: - data['from_movie_list'] = from_movie_list - try: - return db_comm.insert_or_update_common(cursor, conn, data, tbl_name_actors, uniq_key='href') - except sqlite3.Error as e: - logging.error(f"Error inserting or updating data: {e}") - return None - -# 更新详细信息 -def update_actor_detail(data, is_full_data=1): - try: - data['is_full_data'] = is_full_data - - return db_comm.insert_or_update_common(cursor, data, conn, tbl_name_actors, uniq_key='href') - - except sqlite3.Error as e: - logging.error(f"Error inserting or updating data: {e}") - return None - -# 查询 -def query_actors(**filters): - try: - sql = f"SELECT url, en_name as name FROM {tbl_name_actors} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "url": " AND href = ?", - "en_name": " AND name LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - } 
- - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "en_name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 - return [{'url': row[0], 'name': row[1]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - - - -# """从指定表中通过 href 查找 id""" -def get_id_by_href(table: str, href: str) -> int: - if href is None: - return None - cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,)) - row = cursor.fetchone() - return row[0] if row else None - -def insert_movie_index(title, href, **kwargs): - try: - # 先检查数据库中是否已有该电影 - cursor.execute("SELECT * FROM javdb_movies WHERE href = ?", (href,)) - existing_movie = cursor.fetchone() - - # 获取列名 - column_names = [description[0] for description in cursor.description] - - fields = [ - 'from_actor_list', 'from_movie_makers', 'from_movie_series', 'from_movie_publishers', - 'maker_id', 'series_id', 'pub_id', 'uncensored' - ] - - if existing_movie: # 如果电影已存在 - existing_values = dict(zip(column_names, existing_movie)) - movie_id = existing_values['id'] - logging.debug(f"values in db: {existing_values}") - - # 如果没有传入值,就用原来的值 - for field in fields: - kwargs[field] = kwargs.get(field) if kwargs.get(field) is not None else existing_values[field] - - set_clauses = ", ".join([f"{field} = ?" for field in fields]) - sql = f""" - UPDATE javdb_movies - SET title = ?, {set_clauses}, updated_at = datetime('now', 'localtime') - WHERE href = ? 
- """ - values = [title] + [kwargs[field] for field in fields] + [href] - logging.debug(f"sql: {sql}, values: {values}") - cursor.execute(sql, values) - else: # 如果电影不存在,插入 - columns = ', '.join(['title', 'href'] + fields) - placeholders = ', '.join(['?'] * (len(fields) + 2)) - sql = f"INSERT INTO javdb_movies ({columns}) VALUES ({placeholders})" - values = [title, href] + [kwargs.get(field, 0) for field in fields] - logging.debug(f"sql: {sql}, values: {values}") - cursor.execute(sql, values) - - conn.commit() - - movie_id = get_id_by_href('javdb_movies', href) - if movie_id: - logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}') - - return movie_id - - except Exception as e: - conn.rollback() - logging.error(f"Error inserting/updating movie: {e}") - return None - - -# 插入演员和电影的关联数据 -def insert_actor_movie(performer_id, movie_id, tags=''): - try: - cursor.execute(""" - INSERT INTO javdb_actors_movies (actor_id, movie_id, tags, updated_at) - VALUES (?, ?, ?, datetime('now', 'localtime')) - ON CONFLICT(actor_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime') - """, - (performer_id, movie_id, tags) - ) - conn.commit() - - #logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}') - - return performer_id - - except Exception as e: - conn.rollback() - logging.error("Error inserting movie: %s", e) - return None - -# 插入演员数据 -def insert_or_update_actor(actor): - try: - cursor.execute(''' - INSERT INTO javdb_actors (name, href, pic, is_full_data, updated_at) - VALUES (?, ?, ?, 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET name=excluded.name, pic=excluded.pic, is_full_data=1, updated_at=datetime('now', 'localtime') - ''', (actor['name'], actor['href'], actor['pic'])) - - conn.commit() - - # 查询刚插入的数据 - cursor.execute('SELECT id, from_actor_list FROM javdb_actors WHERE href = ?', (actor['href'],)) - actor_id, uncensored = cursor.fetchone() - if actor_id is None: - logging.warning(f'insert data error. name: {actor['name']}, href: {actor['href']}') - return None - - logging.debug(f'insert one actor, id: {actor_id}, name: {actor['name']}, href: {actor['href']}') - - # 插入别名 - for alias in actor.get("alias") or []: - cursor.execute(''' - INSERT OR IGNORE INTO javdb_actors_alias (actor_id, alias, updated_at) - VALUES (?, ?, datetime('now', 'localtime')) - ''', (actor_id, alias)) - - conn.commit() - - # 插入影片列表 - for movie in actor.get("credits") or []: - # from_actor_list = 1 表示无码影星的,其他不处理 - if uncensored and uncensored > 0: - movie_id = insert_movie_index(movie['title'], movie['href'], from_actor_list=1, uncensored=uncensored) - else: - movie_id = insert_movie_index(movie['title'], movie['href'], from_actor_list=1) - if movie_id: - tmp_id = insert_actor_movie(actor_id, movie_id) - if tmp_id : - logging.debug(f'insert one performer_movie, performer_id: {actor_id}, movie_id: {movie_id}') - else: - logging.warning(f'insert performer_movie failed. 
performer_id: {actor_id}, moive href: {movie['href']}') - - return actor_id - except Exception as e: - logging.error(f"插入/更新演员 {actor['name']} 失败: {e}") - conn.rollback() - -# """插入或更新电影数据(异常url的处理,比如404链接)""" -def insert_or_update_actor_404(name, href, is_full_data=1): - try: - # 插入或更新电影信息 - cursor.execute( - """ - INSERT INTO javdb_actors (name, href, is_full_data, updated_at) - VALUES (?, ?, ?, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name=excluded.name, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime') - """, - (name, href, is_full_data) - ) - conn.commit() - - # 获取插入的 movie_id - actor_id = get_id_by_href('javdb_actors', href) - if actor_id is None: - return None - - return actor_id - - except Exception as e: - conn.rollback() - logging.error("Error inserting movie: %s", e) - return None - - -# 删除演员 -def delete_actor_by_href(href): - try: - cursor.execute('DELETE FROM javdb_actors WHERE href = ?', (href,)) - conn.commit() - logging.info(f"成功删除演员: {href}") - except Exception as e: - logging.error(f"删除演员 {href} 失败: {e}") - conn.rollback() - -# 查询 -def query_actors(**filters): - try: - sql = "SELECT href, name FROM javdb_actors WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "is_full_data": " AND is_full_data = ?", - "from_actor_list": " AND from_actor_list = ?", - "before_updated_at": " AND updated_at <= ?", - "after_updated_at": " AND updated_at >= ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 - return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - -# 插入或更新发行商 """ -def insert_or_update_makers(data, caller='list'): - try: - if caller == 'list': - cursor.execute(""" - INSERT INTO javdb_makers (name, href, from_list, updated_at) - VALUES (?, ? , 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - elif caller == 'movie': - cursor.execute(""" - INSERT INTO javdb_makers (name, href, from_movie_list, updated_at) - VALUES (?, ? 
, 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_movie_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - else: - logging.warning(f"unexpected caller: {caller}") - return None - - # 获取 performer_id - cursor.execute("SELECT id FROM javdb_makers WHERE href = ?", (data["href"],)) - dist_id = cursor.fetchone()[0] - if dist_id: - logging.debug(f"成功插入/更新发行商: {data['name']}") - return dist_id - else: - return None - except sqlite3.Error as e: - conn.rollback() - logging.error(f"数据库错误: {e}") - return None - -# 删除发行商(按 id 或 name) """ -def delete_maker(identifier): - try: - if isinstance(identifier, int): - cursor.execute("DELETE FROM javdb_makers WHERE id = ?", (identifier,)) - elif isinstance(identifier, str): - cursor.execute("DELETE FROM javdb_makers WHERE name = ?", (identifier,)) - conn.commit() - logging.info(f"成功删除发行商: {identifier}") - except sqlite3.Error as e: - conn.rollback() - logging.error(f"删除失败: {e}") - -# 查询发行商(按 id 或 name) """ -def query_maker(identifier): - try: - if isinstance(identifier, int): - cursor.execute("SELECT * FROM javdb_makers WHERE id = ?", (identifier,)) - else: - cursor.execute("SELECT * FROM javdb_makers WHERE name LIKE ?", (f"%{identifier}%",)) - - distributor = cursor.fetchone() - if distributor: - return dict(zip([desc[0] for desc in cursor.description], distributor)) - else: - logging.warning(f"未找到发行商: {identifier}") - return None - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return None - -# 按条件查询 href 列表 -def query_maker_hrefs(**filters): - try: - sql = "SELECT href, id, from_list FROM javdb_makers WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "from_list" in filters: - sql += " AND from_list = ?" - params.append(filters["from_list"]) - if "url" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - if 'limit' in filters: - sql += " limit ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0] for row in cursor.fetchall()] # 链接使用小写 - return [{'href': row[0], 'id': row[1], 'from_list':row[2]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - -# """ 插入或更新制作公司 """ -def insert_or_update_series(data, caller='list'): - try: - if caller == 'list': - cursor.execute(""" - INSERT INTO javdb_series (name, href, from_list, updated_at) - VALUES (?, ? , 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - elif caller == 'movie': - cursor.execute(""" - INSERT INTO javdb_series (name, href, from_movie_list, updated_at) - VALUES (?, ? 
, 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_movie_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - else: - logging.warning(f"unexpected caller: {caller}") - return None - - # 获取 performer_id - cursor.execute("SELECT id FROM javdb_series WHERE href = ?", (data["href"],)) - stu_id = cursor.fetchone()[0] - if stu_id: - logging.debug(f"成功插入/更新发行商: {data['name']}") - return stu_id - else: - return None - except sqlite3.Error as e: - conn.rollback() - logging.error(f"数据库错误: {e}") - return None - -# """ 删除制作公司(按 id 或 name) """ -def delete_series(identifier): - try: - if isinstance(identifier, int): - cursor.execute("DELETE FROM javdb_series WHERE id = ?", (identifier,)) - elif isinstance(identifier, str): - cursor.execute("DELETE FROM javdb_series WHERE name = ?", (identifier,)) - conn.commit() - logging.info(f"成功删除制作公司: {identifier}") - except sqlite3.Error as e: - conn.rollback() - logging.error(f"删除失败: {e}") - -# """ 查询制作公司(按 id 或 name) """ -def query_series(identifier): - try: - if isinstance(identifier, int): - cursor.execute("SELECT * FROM javdb_series WHERE id = ?", (identifier,)) - else: - cursor.execute("SELECT * FROM javdb_series WHERE name LIKE ?", (f"%{identifier}%",)) - - studio = cursor.fetchone() - if studio: - return dict(zip([desc[0] for desc in cursor.description], studio)) - else: - logging.warning(f"未找到制作公司: {identifier}") - return None - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return None - -# 按条件查询 href 列表 -def query_series_hrefs(**filters): - try: - sql = "SELECT href, id, from_list FROM javdb_series WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "from_list" in filters: - sql += " AND from_list = ?" - params.append(filters["from_list"]) - if "href" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - if 'limit' in filters: - sql += " limit ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0] for row in cursor.fetchall()] # 链接使用小写 - #return [{'href': row[0], 'id': row[1]} for row in cursor.fetchall()] - return [{'href': row[0], 'id': row[1], 'from_list':row[2]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - -# 插入或更新发行商 """ -def insert_or_update_publishers(data, caller='list'): - try: - if caller == 'list': - cursor.execute(""" - INSERT INTO javdb_publishers (name, href, from_list, updated_at) - VALUES (?, ? , 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - elif caller == 'movie': - cursor.execute(""" - INSERT INTO javdb_publishers (name, href, from_movie_list, updated_at) - VALUES (?, ? 
, 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - from_movie_list = 1, - updated_at = datetime('now', 'localtime') - """, (data["name"], data["href"])) - conn.commit() - else: - logging.warning(f"unexpected caller: {caller}") - return None - - # 获取 performer_id - cursor.execute("SELECT id FROM javdb_publishers WHERE href = ?", (data["href"],)) - dist_id = cursor.fetchone()[0] - if dist_id: - logging.debug(f"成功插入/更新发行商: {data['name']}") - return dist_id - else: - return None - except sqlite3.Error as e: - conn.rollback() - logging.error(f"数据库错误: {e}") - return None - -# 删除发行商(按 id 或 name) """ -def delete_publishers(identifier): - try: - if isinstance(identifier, int): - cursor.execute("DELETE FROM javdb_publishers WHERE id = ?", (identifier,)) - elif isinstance(identifier, str): - cursor.execute("DELETE FROM javdb_publishers WHERE name = ?", (identifier,)) - conn.commit() - logging.info(f"成功删除发行商: {identifier}") - except sqlite3.Error as e: - conn.rollback() - logging.error(f"删除失败: {e}") - -# 查询发行商(按 id 或 name) """ -def query_publishers(identifier): - try: - if isinstance(identifier, int): - cursor.execute("SELECT * FROM javdb_publishers WHERE id = ?", (identifier,)) - else: - cursor.execute("SELECT * FROM javdb_publishers WHERE name LIKE ?", (f"%{identifier}%",)) - - distributor = cursor.fetchone() - if distributor: - return dict(zip([desc[0] for desc in cursor.description], distributor)) - else: - logging.warning(f"未找到发行商: {identifier}") - return None - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return None - -# 按条件查询 href 列表 -def query_publishers_hrefs(**filters): - try: - sql = "SELECT href, id FROM javdb_publishers WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "from_list" in filters: - sql += " AND from_list = ?" - params.append(filters["from_list"]) - if "url" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - if 'limit' in filters: - sql += " limit ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0] for row in cursor.fetchall()] # 链接使用小写 - return [{'href': row[0], 'id': row[1]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - -# 插入或更新类别 """ -def insert_or_update_tags(name, href): - try: - if href in cached_tags: - return cached_tags[href]['id'] - - cursor.execute(""" - INSERT INTO javdb_tags (name, href, updated_at) - VALUES (?, ? , datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - name = excluded.name, - updated_at = datetime('now', 'localtime') - """, (name, href)) - conn.commit() - - cursor.execute("SELECT id, name, href FROM javdb_tags") - for row in cursor.fetchall(): - cached_tags[row[2]] = {'id': row[0], 'name':row[2]} - - if href in cached_tags: - dist_id = cached_tags[href]['id'] - logging.debug(f"insert/update tags succ. 
id: {dist_id}, name: {name}") - return dist_id - else: - return None - except sqlite3.Error as e: - conn.rollback() - logging.error(f"数据库错误: {e}") - return None - -# 查询tags -def query_tags(href, name): - global cached_tags - try: - if href not in cached_tags: - cursor.execute("SELECT id, name, href FROM javdb_tags") - for row in cursor.fetchall(): - cached_tags[row[2]] = {'id': row[0], 'name':row[2]} - - if href in cached_tags: - return cached_tags[href]['id'], cached_tags[href]['name'] - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return 0, name - -# 插入影片和tags的关联数据 -def insert_movie_tags( movie_id, tag_id, tags=''): - try: - cursor.execute(""" - INSERT INTO javdb_movies_tags (movie_id, tag_id, tags, updated_at) - VALUES (?, ?, ?, datetime('now', 'localtime')) - ON CONFLICT(tag_id, movie_id) DO UPDATE SET tags=excluded.tags, updated_at=datetime('now', 'localtime') - """, - (movie_id, tag_id, tags) - ) - conn.commit() - - #logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}') - - return movie_id - - except Exception as e: - conn.rollback() - logging.error("Error inserting movie: %s", e) - return None - -# """插入或更新电影数据""" -def insert_or_update_movie(movie): - try: - # 获取相关 ID - makers_id = get_id_by_href('javdb_makers', movie['maker_link']) if movie['maker_link'] else None - series_id = get_id_by_href('javdb_series', movie['series_link']) if movie['series_link'] else None - pub_id = get_id_by_href('javdb_publishers', movie['pub_link']) if movie['pub_link'] else None - - # 如果不存在,插入 - if makers_id is None and movie['maker_link']: - makers_id = insert_or_update_makers({'name' : movie.get('maker_name', ''), 'href' : movie.get('maker_link', '')}, caller='movie') - if series_id is None and movie['series_link']: - series_id = insert_or_update_series({'name' : movie.get('series_name', ''), 'href' : movie.get('series_link', '')}, caller='movie') - if pub_id is None and movie['pub_link']: - pub_id = insert_or_update_publishers({'name' : movie.get('pub_name', ''), 'href' : movie.get('pub_link', '')}, caller='movie') - - cursor.execute(""" - INSERT INTO javdb_movies (href, title, cover_url, serial_number, release_date, duration, - maker_id, series_id, pub_id, is_full_data, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - title=excluded.title, - cover_url=excluded.cover_url, - serial_number=excluded.serial_number, - release_date=excluded.release_date, - duration=excluded.duration, - maker_id=excluded.maker_id, - series_id=excluded.series_id, - pub_id=excluded.pub_id, - is_full_data=1, - updated_at=datetime('now', 'localtime') - """, (movie['href'], movie['title'], movie['cover_url'], movie['serial_number'], - movie['release_date'], movie['duration'], makers_id, series_id, pub_id)) - - conn.commit() - - # 获取插入的 movie_id - movie_id = get_id_by_href('javdb_movies', movie['href']) - if movie_id is None: - return None - - logging.debug(f"insert one move, id: {movie_id}, title: {movie['title']}, href: {movie['href']}") - - # 插入 performers_movies 关系表 - for performer in movie.get('actors', []): - performer_id = get_id_by_href('javdb_actors', performer['href']) - # 如果演员不存在,先插入 - if performer_id is None: - performer_id = insert_actor_index(performer['name'], performer['href'], from_movie_list=1) - logging.debug(f"insert new perfomer. 
perfomer_id: {performer_id}, name:{performer['name']}") - if performer_id: - tmp_id = insert_actor_movie(performer_id, movie_id) - if tmp_id: - logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}") - else: - logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}") - else: - logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}") - - # 插入 tags 表 - for tag in movie.get('tags', []): - tag_name = tag.get('name', '') - tag_href = tag.get('href', '') - tag_id = insert_or_update_tags(tag_name, tag_href) - if tag_id: - logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}") - tmp_id = insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name) - if tmp_id: - logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}") - else: - logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}") - else: - logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}") - - return movie_id - - except Exception as e: - conn.rollback() - logging.error("Error inserting movie: %s", e) - return None - -# """插入或更新电影数据(异常url的处理,比如404链接)""" -def insert_or_update_movie_404(title, href, is_full_data=1): - try: - # 插入或更新电影信息 - cursor.execute( - """ - INSERT INTO javdb_movies (title, href, is_full_data, updated_at) - VALUES (?, ?, ?, datetime('now', 'localtime')) - ON CONFLICT(href) DO UPDATE SET - title=excluded.title, is_full_data=excluded.is_full_data, updated_at = datetime('now', 'localtime') - """, - (title, href, is_full_data) - ) - conn.commit() - - # 获取插入的 movie_id - movie_id = get_id_by_href('javdb_movies', href) - if movie_id is None: - return None - - return movie_id - - except Exception as e: - conn.rollback() - logging.error("Error inserting movie: %s", e) - return None - - -# 删除电影数据""" -def delete_movie(identifier): - try: - if isinstance(identifier, int): - cursor.execute("DELETE FROM javdb_movies WHERE id = ?", (identifier,)) - elif isinstance(identifier, str): - cursor.execute("DELETE FROM javdb_movies WHERE href = ?", (identifier,)) - else: - logging.warning("无效的删除参数") - return - conn.commit() - logging.info(f"Deleted movie with {identifier}") - - except sqlite3.Error as e: - conn.rollback() - logging.error("Error deleting movie: %s", e) - -# 查找电影数据""" -def query_movies(identifier): - try: - if isinstance(identifier, int): - cursor.execute("SELECT * FROM javdb_movies WHERE id = ?", (identifier,)) - elif "http" in identifier: - cursor.execute("SELECT * FROM javdb_movies WHERE href = ?", (identifier,)) - else: - cursor.execute("SELECT * FROM javdb_movies WHERE title LIKE ?", (f"%{identifier}%",)) - - movie = cursor.fetchone() - if movie: - cursor.execute("SELECT * FROM javdb_actors_movies WHERE performer_id = ?", (movie[0],)) - performers = [row[0] for row in cursor.fetchall()] - result = dict(zip([desc[0] for desc in cursor.description], performers)) - result["performers"] = performers - return result - else: - logging.warning(f"find no data: {identifier}") - return None - - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return None -''' -# 按条件查询 href 列表 -def query_movie_hrefs_old(**filters): - try: - sql = "SELECT href, title, id FROM javdb_movies WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "href" in filters: - sql += " AND href = ?" 
- params.append(filters["href"]) - if "title" in filters: - sql += " AND title LIKE ?" - params.append(f"%{filters['title']}%") - if "is_full_data" in filters: - sql += " AND is_full_data = ?" - params.append(filters["is_full_data"]) - if "from_actor_list" in filters: - sql += " AND from_actor_list = ?" - params.append(filters["from_actor_list"]) - if "is_full_data_in" in filters: - values = filters["is_full_data_in"] - if values: - placeholders = ", ".join(["?"] * len(values)) - sql += f" AND is_full_data IN ({placeholders})" - params.extend(values) - if "is_full_data_not_in" in filters: - values = filters["is_full_data_not_in"] - if values: - placeholders = ", ".join(["?"] * len(values)) - sql += f" AND is_full_data NOT IN ({placeholders})" - params.extend(values) - if "before_updated_at" in filters: - sql += " AND updated_at <= ?" - params.append(filters["before_updated_at"]) - if "after_updated_at" in filters: - sql += " AND updated_at >= ?" - params.append(filters["after_updated_at"]) - if "start_id" in filters: - sql += " AND id > ?" - params.append(filters["start_id"]) - if "order_by" in filters: - sql += " order by ?" - params.append(filters["order_by"]) - if 'limit' in filters: - sql += " limit ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 - return [{'href': row[0], 'title': row[1], 'id':row[2]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return [] -''' -# 查询 -def query_movie_hrefs(**filters): - try: - sql = "SELECT href, title, id FROM javdb_movies WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "title": " AND title LIKE ?", - "is_full_data": " AND is_full_data = ?", - "uncensored": " AND uncensored = ?", - "from_actor_list": " AND from_actor_list = ?", - "before_updated_at": " AND updated_at <= ?", - "after_updated_at": " AND updated_at >= ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "title": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - cursor.execute(sql, params) - #return [row[0].lower() for row in cursor.fetchall()] # 返回小写 - #return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] - return [{'href': row[0], 'title': row[1], 'id':row[2]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - -# 插入一条任务日志 -def insert_task_log(): - try: - cursor.execute(""" - INSERT INTO javdb_task_log (task_status) VALUES ('Start') - """) - conn.commit() - - task_id = cursor.lastrowid - if task_id is None: - return None - update_task_log(task_id=task_id, task_status='Start') - - return task_id # 获取插入的 task_id - except sqlite3.Error as e: - logging.error(f"插入任务失败: {e}") - return None - -# 更新任务日志的字段 -def update_task_log_inner(task_id, **kwargs): - try: - fields = ", ".join(f"{key} = ?" 
for key in kwargs.keys())
-        params = list(kwargs.values()) + [task_id]
-
-        sql = f"UPDATE javdb_task_log SET {fields}, updated_at = datetime('now', 'localtime') WHERE task_id = ?"
-        cursor.execute(sql, params)
-        conn.commit()
-    except sqlite3.Error as e:
-        logging.error(f"更新任务 {task_id} 失败: {e}")
-
-# 更新任务日志的字段
-def update_task_log(task_id, task_status):
-    try:
-        # 获取 performers、studios 等表的最终行数
-        cursor.execute("SELECT COUNT(*) FROM javdb_actors where is_full_data=1")
-        full_data_actors = cursor.fetchone()[0]
-        cursor.execute("SELECT COUNT(*) FROM javdb_actors")
-        total_actors = cursor.fetchone()[0]
-
-        cursor.execute("SELECT COUNT(*) FROM javdb_movies where is_full_data=1")
-        full_data_movies = cursor.fetchone()[0]
-        cursor.execute("SELECT COUNT(*) FROM javdb_movies")
-        total_movies = cursor.fetchone()[0]
-
-        cursor.execute("SELECT COUNT(*) FROM javdb_makers")
-        total_makers = cursor.fetchone()[0]
-
-        cursor.execute("SELECT COUNT(*) FROM javdb_series")
-        total_series = cursor.fetchone()[0]
-
-        # 更新 task_log
-        update_task_log_inner(task_id,
-                              full_data_actors=full_data_actors,
-                              total_actors=total_actors,
-                              full_data_movies=full_data_movies,
-                              total_movies=total_movies,
-                              total_makers=total_makers,
-                              total_series=total_series,
-                              task_status=task_status)
-
-    except sqlite3.Error as e:
-        logging.error(f"更新任务 {task_id} 失败: {e}")
-
-
-# 任务结束,更新字段
-def finalize_task_log(task_id):
-    try:
-        # 更新 task_log
-        update_task_log(task_id, task_status="Success")
-    except sqlite3.Error as e:
-        logging.error(f"任务 {task_id} 结束失败: {e}")
-
-
-# 测试代码
-if __name__ == "__main__":
-
-    sample_data = [
-        {
-            'name': '上原亜衣',
-            'href': 'https://www.javdb.com/actors/MkAX',
-            'pic': 'https://c0.jdbstatic.com/avatars/mk/MkAX.jpg',
-            'alias': ['上原亜衣', '下原舞', '早瀬クリスタル', '阿蘇山百式屏風奉行']
-        },
-        {
-            'name': '大橋未久',
-            'href': 'https://www.javdb.com/actors/21Jp',
-            'pic': 'https://c0.jdbstatic.com/avatars/21/21Jp.jpg',
-            'alias': ['大橋未久']
-        },
-    ]
-
-    for actor in sample_data:
-        insert_or_update_actor(actor)
-
-    print(query_actors("name LIKE '%未久%'"))
-    #delete_actor_by_href('https://www.javdb.com/actors/MkAX')
-    print(query_actors())
diff --git a/src/db_utils/sqlite_db.py b/src/db_utils/sqlite_db.py
new file mode 100644
index 0000000..f21dca1
--- /dev/null
+++ b/src/db_utils/sqlite_db.py
@@ -0,0 +1,220 @@
+import sqlite3
+import logging
+import os
+from datetime import datetime
+import src.config.config as config
+
+default_dbpath = f"{config.global_share_data_dir}/sqlite/shared.db"
+
+# Base database handler that wraps the common SQLite operations.
+class DatabaseHandler:
+    def __init__(self, db_path=None):
+        # Use the caller-supplied db_path, or fall back to the default path
+        self.DB_PATH = db_path or default_dbpath
+
+        # Make sure the parent directory exists (dirname may be empty for bare filenames)
+        if os.path.dirname(self.DB_PATH):
+            os.makedirs(os.path.dirname(self.DB_PATH), exist_ok=True)
+
+        self.conn = sqlite3.connect(self.DB_PATH, check_same_thread=False)
+        self.cursor = self.conn.cursor()
+
+        # ON CONFLICT ... DO UPDATE (upsert) needs SQLite >= 3.24.0; remember if we are below that
+        self.lower_sqlite_version = False
+        sqlite_version = sqlite3.sqlite_version_info
+        if sqlite_version < (3, 24, 0):
+            self.lower_sqlite_version = True
+
+    def get_table_columns_and_defaults(self, tbl_name):
+        try:
+            self.cursor.execute(f"PRAGMA table_info({tbl_name})")
+            columns = self.cursor.fetchall()
+            column_info = {}
+            for col in columns:
+                col_name = col[1]  # PRAGMA table_info row: (cid, name, type, notnull, dflt_value, pk)
+                default_value = col[4]
+                column_info[col_name] = default_value
+            return column_info
+        except sqlite3.Error as e:
+            logging.error(f"Error getting table columns: {e}")
+            return None
+
+    def check_and_process_data(self, data, tbl_name):
+        column_info = self.get_table_columns_and_defaults(tbl_name)
+        if column_info is None:
+            return None
+        processed_data = {}
+        for col, default in column_info.items():
+            if col == 'id' or col == 'created_at':  # auto-increment key / creation time: leave them to the table defaults
+                continue
+            if col == 'updated_at':  # refresh the timestamp here; an explicit value in data still wins below
+                processed_data[col] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            if col in data:
+                processed_data[col] = data[col]
+
+        return processed_data
+
+    def insert_or_update_common(self, data, tbl_name, uniq_key='url'):
+        if self.lower_sqlite_version:
+            return self.insert_or_update_common_lower(data, tbl_name, uniq_key)
+
+        try:
+            processed_data = self.check_and_process_data(data, tbl_name)
+            if processed_data is None:
+                return None
+
+            columns = ', '.join(processed_data.keys())
+            values = list(processed_data.values())
+            placeholders = ', '.join(['?' for _ in values])
+            update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key])
+
+            sql = f'''
+                INSERT INTO {tbl_name} ({columns})
+                VALUES ({placeholders})
+                ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
+            '''
+            self.cursor.execute(sql, values)
+            self.conn.commit()
+
+            # Fetch the id of the row we just inserted or updated
+            self.cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
+            row = self.cursor.fetchone()
+            return row[0] if row else None
+        except sqlite3.Error as e:
+            logging.error(f"Error inserting or updating data: {e}")
+            return None
+
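+    # Shape of the statement built above, e.g. with uniq_key='href':
+    #   INSERT INTO <tbl> (..., href) VALUES (?, ..., ?)
+    #   ON CONFLICT (href) DO UPDATE SET <col>=EXCLUDED.<col>, ...
+    # This upsert form needs SQLite >= 3.24.0; older builds go through the fallback below.
+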
+    def insert_or_update_common_lower(self, data, tbl_name, uniq_key='url'):
+        try:
+            processed_data = self.check_and_process_data(data, tbl_name)
+            if processed_data is None:
+                return None
+
+            columns = ', '.join(processed_data.keys())
+            values = list(processed_data.values())
+            placeholders = ', '.join(['?' for _ in values])
+
+            # Try a plain INSERT first
+            try:
+                sql = f'''
+                    INSERT INTO {tbl_name} ({columns})
+                    VALUES ({placeholders})
+                '''
+                self.cursor.execute(sql, values)
+                self.conn.commit()
+            except sqlite3.IntegrityError:  # unique-key conflict, fall back to an UPDATE
+                update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key])
+                update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
+                update_values.append(data[uniq_key])
+                sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
+                self.cursor.execute(sql, update_values)
+                self.conn.commit()
+
+            # Fetch the id of the row we just inserted or updated
+            self.cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
+            row = self.cursor.fetchone()
+            return row[0] if row else None
+        except sqlite3.Error as e:
+            logging.error(f"Error inserting or updating data: {e}")
+            return None
+
+    def insert_task_log(self):
+        return 1
+
+    def update_task_log(self, task_id, task_status):
+        return 1
+
+    def finalize_task_log(self, task_id):
+        return 1
+
+    def close(self):
+        self.cursor.close()
+        self.conn.close()
+
+
+# javbus-specific handler
+class JavbusDBHandler(DatabaseHandler):
+    def __init__(self, db_path=None):
+        super().__init__(db_path)
+        self.tbl_name_actors = 'javbus_actors'
+
+    def insert_actor_index(self, data, uncensored=0, from_actor_list=0, from_movie_list=0):
+        data['uncensored'] = uncensored
+        if from_actor_list:
+            data['from_actor_list'] = from_actor_list
+        if from_movie_list:
+            data['from_movie_list'] = from_movie_list
+        try:
+            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
+        except sqlite3.Error as e:
+            logging.error(f"Error inserting or updating data: {e}")
+            return None
+
+    def update_actor_detail(self, data, is_full_data=1):
+        try:
+            data['is_full_data'] = is_full_data
+            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
+        except sqlite3.Error as e:
+            logging.error(f"Error inserting or updating data: {e}")
+            return None
+
+    def query_actors(self, **filters):
+        try:
+            sql = f"SELECT href, en_name as name FROM {self.tbl_name_actors} WHERE 1=1"  # the link lives in href, the upsert key
+            params = []
+
+            conditions = {
+                "id": " AND id = ?",
+                "url": " AND href = ?",  # callers pass 'url'; the column is href
+                "en_name": " AND en_name LIKE ?",
+                "is_full_data": " AND is_full_data = ?",
+                "start_id": " AND id > ?",
+            }
+
+            for key, condition in conditions.items():
+                if key in filters:
+                    sql += condition
+                    if key == "en_name":
+                        params.append(f"%{filters[key]}%")
+                    else:
+                        params.append(filters[key])
+
+            for key in ["is_full_data_in", "is_full_data_not_in"]:
+                if key in filters:
+                    values = filters[key]
+                    if values:
+                        placeholders = ", ".join(["?"] * len(values))
+                        operator = "IN" if key == "is_full_data_in" else "NOT IN"
+                        sql += f" AND is_full_data {operator} ({placeholders})"
+                        params.extend(values)
+
+            if "order_by" in filters:
+                # NOTE: ORDER BY takes a bare column name; a bound parameter would be treated as a literal string, so order_by must come from trusted code
+                sql += f" ORDER BY {filters['order_by']} "
+
+            if 'limit' in filters:
+                sql += " LIMIT ?"
+                params.append(filters["limit"])
+
+            self.cursor.execute(sql, params)
+            return [{'url': row[0], 'name': row[1]} for row in self.cursor.fetchall()]
+        except sqlite3.Error as e:
+            logging.error(f"actor query failed: {e}")
+            return None
+
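+# Minimal smoke test, in the spirit of the __main__ block this patch removes from
+# the old db module. It assumes the shared db already has a javbus_actors table
+# with at least href (the unique key) and en_name columns; the href is made up.
+if __name__ == "__main__":
+    handler = JavbusDBHandler()
+    actor_id = handler.insert_actor_index(
+        {'en_name': 'Test Actor', 'href': 'https://www.javbus.com/star/test'},
+        from_actor_list=1)
+    print(actor_id)
+    print(handler.query_actors(en_name='Test', limit=10))
+    handler.close()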
diff --git a/src/javbus/fetch.py b/src/javbus/fetch.py
index 92c560a..0fc8252 100644
--- a/src/javbus/fetch.py
+++ b/src/javbus/fetch.py
@@ -9,12 +9,13 @@ from functools import partial
 from urllib.parse import urljoin, urlparse
 import src.config.config as config
 import src.logger.logger as logger
-import src.db_utils.db_javbus as db_tools
-import src.crawling.craw_common as scraper_base
-import src.crawling.craw_javbus as scraper
+import src.db_utils.sqlite_db as sqlite_db
+import src.crawling.craw as craw
 import src.utils.utils as utils
 
 logger.setup_logging()
+db_tools = sqlite_db.JavbusDBHandler()
+scraper = craw.JavbusCrawler()
 
 debug = False
 skip_local = False
@@ -34,7 +35,7 @@ def fetch_actor_list_lang(lang="en", uncensored=None):
     num = 1
     while current_url:
         logging.info(f"fetching url {current_url}")
-        soup, status_code = scraper_base.fetch_page(current_url, partial(scraper_base.generic_validator, tag="div", identifier="waterfall", attr_type="id"), headers=scraper.headers, cookies=scraper.cookies)
+        soup, status_code = scraper.fetch_page(current_url, partial(scraper.generic_validator, tag="div", identifier="waterfall", attr_type="id"))
         if soup:
             list_data, current_url = scraper.parse_actors_list(soup, current_url)
             if list_data :
@@ -50,9 +51,12 @@ def fetch_actor_list_lang(lang="en", uncensored=None):
             else:
                 logging.warning(f'fetch actor error. {current_url} ...')
 
-        elif status_code and status_code == 404:
+        elif status_code:  # a definite HTTP-level failure (404 / verify redirect / bad url): stop paging
            logging.warning(f'fetch page error. httpcode: {status_code}, url: {current_url}')
             break
+        else:  # fetch_page exhausted its retries; back off, then retry the same url
+            time.sleep(5)
+
         time.sleep(0.3)
 
     # 调试break
@@ -62,36 +66,13 @@ def fetch_actor_list_lang(lang="en", uncensored=None):
 # 获取演员列表
 def fetch_actor_list():
     #for lang in ["en", "ja", "zh"]:
-    for lang in ['ja']:
+    for lang in ['en']:
         fetch_actor_list_lang(lang=lang, uncensored=1)
 
     #for lang in ["en", "ja", "zh"]:
-    for lang in ['ja']:
+    for lang in ['en']:
         fetch_actor_list_lang(lang=lang)
 
-
-# 获取演员列表
-def fetch_actor_list2():
-    next_url = scraper.actors_uncensored_base_url
-    while next_url:
-        logging.info(f'fetching page {next_url}')
-        soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="actors", attr_type="id"))
-        if soup:
-            list_data, next_url = scraper.parse_actors_uncensored(soup, next_url)
-            if list_data :
-                # 写入数据库
-                for row in list_data:
-                    actor_id = db_tools.insert_actor_index(name=row['name'], href=row.get('href', ''), from_actor_list=1)
-                    if actor_id:
-                        logging.debug(f'insert performer index to db. performer_id:{actor_id}, name: {row['name']}, href:{row['href']}')
-                    else:
-                        logging.warning(f'insert performer index failed. name: {row['name']}, href:{row['href']}')
-            else:
-                logging.warning(f'fetch actor error. {next_url} ...')
-        elif status_code and status_code == 404:
-            logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}')
-            break
-
 # 获取makers列表
 def fetch_makers_list():
     next_url = scraper.makers_uncensored_base_url