modify scripts

oscarz
2025-06-03 10:20:03 +08:00
parent f45886bc9f
commit e97f49bfb9
5 changed files with 825 additions and 0 deletions

javhd/src/config.py (new file, 90 lines)

@@ -0,0 +1,90 @@
import logging
import os
import inspect
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from collections import defaultdict
home_dir = os.path.expanduser("~")
global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
global_share_data_dir = f'{home_dir}/sharedata'
log_dir = '../log'
# Track how often each message is logged
log_count = defaultdict(int) # number of times each message has been logged
last_log_time = defaultdict(float) # timestamp of the last write for each message
class RateLimitFilter(logging.Filter):
"""
频率限制过滤器:
1. 在 60 秒内,同样的日志最多写入 60 次,超过则忽略
2. 如果日志速率超过 100 条/秒,发出告警
"""
LOG_LIMIT = 60 # 每分钟最多记录相同消息 10 次
def filter(self, record):
global log_count, last_log_time
message_key = record.getMessage() # the message text is the dedup key
# current time
now = time.time()
elapsed = now - last_log_time[message_key]
# throttle identical messages
if elapsed < 60: # within the 60-second window
log_count[message_key] += 1
if log_count[message_key] > self.LOG_LIMIT:
print('reach limit.')
return False # drop the record
else:
log_count[message_key] = 1 # window expired, reset the counter
last_log_time[message_key] = now
return True # allow the record through
def setup_logging(log_filename=None):
if log_filename is None:
caller_frame = inspect.stack()[1]
caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0]
current_date = datetime.now().strftime('%Y%m%d')
os.makedirs(log_dir, exist_ok=True)
log_filename = f'{log_dir}/{caller_filename}_{current_date}.log'
max_log_size = 100 * 1024 * 1024 # 100 MB
max_log_files = 10 # keep at most 10 rotated log files
file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
# create the root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers = [] # clear existing handlers to avoid duplicates
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# attach the rate-limit filter
rate_limit_filter = RateLimitFilter()
file_handler.addFilter(rate_limit_filter)
console_handler.addFilter(rate_limit_filter)
# example run
if __name__ == "__main__":
setup_logging()
for i in range(1000):
logging.info("测试日志,检测频率限制")
time.sleep(0.01) # 模拟快速写入日志
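
A minimal usage sketch of how a sibling script under javhd/src might reuse this module; the script name and the explicit log path are hypothetical, and ../log is assumed to resolve the same way it does here:

# hypothetical caller, e.g. javhd/src/some_job.py, run from javhd/src so ../log resolves
import logging
import config

config.setup_logging()               # log file named after this script and today's date
logging.info("job started")
# config.setup_logging(log_filename='../log/some_job_manual.log')  # or pass an explicit path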

javhd/src/fetch.py (new file, 225 lines)

@@ -0,0 +1,225 @@
import json
import time
import csv
import argparse
import textwrap
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import scraper
import utils
from urllib.parse import urljoin, urlparse
config.setup_logging()
debug = False
skip_local = False
scan_mode = 0
update_mode = 0
# Fetch the actor list for one language
def fetch_actor_list_lang(lang="en"):
s_url = f"/{lang}/model"
current_url = urljoin(scraper.host_url, s_url)
num = 1
while current_url:
logging.info(f"fetching url {current_url}")
data = scraper.fetch_post_page(current_url)
if not data:
logging.warning(f"fetch {current_url} error.")
break
# sanity-check the JSON structure
if not all(key in data for key in ["status", "results_count", "pagination_params", "template"]):
logging.warning(f"[error] unexpected data structure: {data}")
break
# parse the data
all_data = scraper.parse_list_json(data, num=num, lang=lang)
# insert into the database
for row in all_data:
# for non-en languages keep only the url and the localized name
if lang != 'en':
new_row = {}
new_row['url'] = utils.replace_lang_param(row['url'])
new_row[f"{lang}_name"] = row[f"{lang}_name"]
insert_row = new_row
else:
insert_row = row
row_id = db_tools.insert_actor_index(insert_row)
if row_id:
logging.debug(f"insert or update one row. row id: {row_id}, data: {insert_row}")
else:
logging.warning(f"insert or update actor failed. data: {insert_row}")
# move on to the next page
next_path = data.get("pagination_params", {}).get("next")
if next_path:
current_url = urljoin(scraper.host_url, next_path)
logging.debug(f"next page: {current_url}")
num += 1
time.sleep(0.2)
else:
logging.info(f"all pages fetched. lang: {lang}")
break
# stop early in debug mode
if debug:
return True
# Fetch the actor list for all languages
def fetch_actor_list():
for lang in ["en", "ja", "zh"]:
fetch_actor_list_lang(lang=lang)
# Update performer details
def fetch_performers_detail():
limit_count = 5 if debug else 100
performers_list = []
last_performer_id = 0
abnormal_codes = [scraper.http_code_404, scraper.http_code_login]
def get_performers(**kwargs):
kwargs["order_by"] = 'id asc'
return db_tools.query_actors(limit=limit_count, **kwargs)
while True:
if update_mode == 0: # only new records
performers_list = get_performers(start_id=0, is_full_data=0)
elif update_mode == 1: # only complete records
performers_list = get_performers(start_id=last_performer_id, is_full_data=1)
elif update_mode == 2: # 0+1
performers_list = get_performers(start_id=last_performer_id, is_full_data_not_in=abnormal_codes)
elif update_mode == 3: # abnormal records (404 / login required)
performers_list = get_performers(start_id=last_performer_id, is_full_data_in=abnormal_codes)
else: # everything
performers_list = get_performers(start_id=last_performer_id)
if len(performers_list) < 1:
logging.info(f'all performers fetched.')
break
succ_rows = 0
for performer in performers_list:
url = performer['url']
person = performer['name']
next_url = url
need_insert = True
while next_url:
logging.debug(f"Fetching data for actor ({person}), url {next_url} ...")
soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="info__features", attr_type="class"))
if soup:
data, next_url = scraper.parse_actor_detail(soup, next_url)
if data:
# finished fetching all of this performer's pages; insert the data
performer_id = db_tools.update_actor_detail(data, is_full_data=1)
if performer_id:
logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {url}')
last_performer_id = performer_id
succ_rows += 1
else:
logging.warning(f'insert person: ({person}) {url} failed.')
elif status_code and status_code == scraper.http_code_404:
actor_id = db_tools.update_actor_detail({'url': url}, is_full_data=scraper.http_code_404)
logging.warning(f'404 page. id: {actor_id}, name: ({person}), url: {url}, Skipping...')
need_insert = False
break
elif status_code and status_code == scraper.http_code_login:
actor_id = db_tools.update_actor_detail({'url': url}, is_full_data=scraper.http_code_login)
logging.warning(f'401 page (login required). id: {actor_id}, name: ({person}), url: {url}, Skipping...')
need_insert = False
break
else:
logging.warning(f'fetch_page error. url: {url}')
break # fetch_page already retried internally; stop instead of re-requesting the same URL forever
# 401/404 were already handled above; skip this performer
if not need_insert:
continue
time.sleep(0.5)
logging.info(f'total request: {len(performers_list)}, succ: {succ_rows}, last performer id: {last_performer_id}')
# stop early in debug mode
if debug:
return True
# map shortcut names to functions
function_map = {
"actor_list": fetch_actor_list,
"actors" : fetch_performers_detail,
}
# main entry point
def main(cmd, args):
# run the requested functions
if cmd:
function_names = cmd.split(",") # split the comma-separated list
for short_name in function_names:
func = function_map.get(short_name.strip()) # look up the function in the map
if callable(func):
func()
else:
logging.warning(f" {short_name} is not a valid function shortcut.")
else: # no --cmd given: run everything
for name, func in function_map.items():
if callable(func):
func()
else:
logging.warning(f" {name} is not a valid function shortcut.")
logging.info('all processing completed!')
# TODO:
# 1,
# Apply command-line options to the module-level flags
def set_env(args):
global debug
debug = args.debug
if debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
global skip_local
skip_local = args.skip_local
global scan_mode
scan_mode = args.scan_mode
global update_mode
if args.update:
update_mode = args.update
if __name__ == "__main__":
# command-line argument handling
keys_str = ",".join(function_map.keys())
usage_examples = textwrap.dedent('''
Examples:
python3 ./fetch.py # refresh the list pages and crawl newly added performers
python3 ./fetch.py --update=4 # refresh the list pages and crawl every record
python3 ./fetch.py --cmd=actor_list # refresh the list pages for all performers (three languages)
python3 ./fetch.py --cmd=actors # crawl newly added performers
''')
parser = argparse.ArgumentParser(
description='fetch javhd data.\n\n' + usage_examples,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
parser.add_argument('--update', type=int, choices=[0, 1, 2, 3, 4], default=0, help='0 - only records with is_full_data=0 (default), 1 - only is_full_data=1, 2 - is_full_data<=1, 3 - only is_full_data>1 (abnormal records), 4 - all records')
parser.add_argument('--scan_mode', type=int, choices=[0, 1, 2], default=1, help='1 - only uncensored makers/series/actors/movies (default), 0 - the opposite, 2 - everything')
parser.add_argument('--skip_local', action='store_true', help='skip the database operations when the page is already cached locally')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
args = parser.parse_args()
set_env(args)
main(args.cmd, args)
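
A hedged sketch of driving the same two stages programmatically instead of via the CLI; argparse.Namespace merely fakes the parsed arguments, the flag values mirror the defaults above, and it assumes the database and log directory from config.py / sqlite_utils.py are in place:

# hypothetical driver sitting next to fetch.py
import argparse
import fetch

fake_args = argparse.Namespace(debug=True, skip_local=False, scan_mode=1, update=0,
                               cmd="actor_list,actors")
fetch.set_env(fake_args)               # sets the module-level debug/scan/update flags
fetch.main(fake_args.cmd, fake_args)   # dispatches through function_map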

javhd/src/scraper.py (new file, 285 lines)

@@ -0,0 +1,285 @@
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
from urllib.parse import urljoin, urlparse
import config
import utils
# Base URL and shared constants
host_url = "https://javhd.com"
lang_prefix = ["ja", "en", "zh"]
http_code_404 = 404
http_code_login = 401
http_code_local = 99
save_raw_html = False
load_from_local = False
POST_HEADERS = {
"accept": "application/json, text/plain, */*",
"content-type": "application/json",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
"x-requested-with": "XMLHttpRequest",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
'cookie': 'adult-warning-popup=disabled; st_d=%7B%7D; feid=c18cd2f2cf5c034d120e5975558acc8c; xfeid=3b040b0aecba9d3df41f21732480d947; _ym_uid=1739069925634817268; _ym_d=1739069925; atas_uid=; _clck=1cd9xpy%7C2%7Cftb%7C0%7C1866; _ym_isad=2; nats=ODY0LjIuMi4yNi4yMzQuMC4wLjAuMA; nats_cookie=https%253A%252F%252Fcn.pornhub.com%252F; nats_unique=ODY0LjIuMi4yNi4yMzQuMC4wLjAuMA; nats_sess=480e7410e649efce6003c3add587a579; nats_landing=No%2BLanding%2BPage%2BURL; JAVSESSID=n42hnvj3ecr0r6tadusladpk3h; user_lang=zh; locale=ja; utm=%7B%22ads_type%22%3A%22%22%7D; sid=3679b28ec523df85ec4e7739e32f2008; _ym_visorc=w; feid_sa=62; sid_sa=2' ,
'origin': 'https://javhd.com',
'priority': 'u=1, i',
'referer': 'https://javhd.com/ja/model' ,
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Microsoft Edge";v="132"' ,
'sec-ch-ua-mobile': '?0' ,
'sec-ch-ua-platform': '"macOS"' ,
'sec-fetch-dest': 'empty' ,
'sec-fetch-mode': 'cors' ,
'sec-fetch-site': 'same-origin' ,
}
POST_DATA = {} # empty body for the POST request
HEADERS = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
'cookie': 'adult-warning-popup=disabled; st_d=%7B%7D; feid=c18cd2f2cf5c034d120e5975558acc8c; xfeid=3b040b0aecba9d3df41f21732480d947; _ym_uid=1739069925634817268; _ym_d=1739069925; atas_uid=; _clck=1cd9xpy%7C2%7Cftb%7C0%7C1866; _ym_isad=2; nats=ODY0LjIuMi4yNi4yMzQuMC4wLjAuMA; nats_cookie=https%253A%252F%252Fcn.pornhub.com%252F; nats_unique=ODY0LjIuMi4yNi4yMzQuMC4wLjAuMA; nats_sess=480e7410e649efce6003c3add587a579; nats_landing=No%2BLanding%2BPage%2BURL; JAVSESSID=n42hnvj3ecr0r6tadusladpk3h; user_lang=zh; locale=ja; utm=%7B%22ads_type%22%3A%22%22%7D; sid=3679b28ec523df85ec4e7739e32f2008; _ym_visorc=w; feid_sa=62; sid_sa=2' ,
'origin': 'https://javhd.com',
'priority': 'u=1, i',
'referer': 'https://javhd.com/ja/model' ,
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Microsoft Edge";v="132"' ,
'sec-ch-ua-mobile': '?0' ,
'sec-ch-ua-platform': '"macOS"' ,
'sec-fetch-dest': 'empty' ,
'sec-fetch-mode': 'cors' ,
'sec-fetch-site': 'same-origin' ,
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0' ,
}
scraper = cloudscraper.create_scraper()
# POST request that returns the parsed JSON response
def fetch_post_page(url, retries=3):
"""Fetch data from the given URL, with retries."""
for attempt in range(retries):
try:
response = scraper.post(url=url, headers=POST_HEADERS, json=POST_DATA, timeout=10)
response.raise_for_status()
return response.json()
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retring...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retring...")
except Exception as e:
logging.error(f"[错误] 请求失败 {url}: {e}, 重试 {attempt + 1}/{retries}")
time.sleep(2)
return None
# Fetch a page with CloudScraper, validate it, and optionally preprocess it; supports different parsers
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
if load_from_local: # optionally serve the page from the local cache
html = utils.read_raw_html(url)
if html:
# preprocess the HTML if a preprocessor was provided
html_text = preprocessor(html) if preprocessor else html
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the caller-supplied page check
logging.debug(f"read from local. href: {url}")
return soup, http_code_local # a status code below 100 signals a local-cache hit
for attempt in range(max_retries):
try:
if 'javhd.com' not in url.lower():
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=HEADERS)
# handle HTTP status codes
if response.status_code == 404:
logging.debug(f"Page not found (404): {url}")
return None, http_code_404 # return 404 so the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# check whether we were redirected, e.g. to the login page
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
soup = BeautifulSoup(response.text, parser)
# check whether this is the login page
if soup.find('nav', class_='panel form-panel'):
logging.debug(f"Page redirected to login page on {url}.")
return None, http_code_login
if save_raw_html:
utils.write_raw_html(url, response.text)
# preprocess the HTML if a preprocessor was provided
html_text = preprocessor(response.text) if preprocessor else response.text
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the caller-supplied page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retring...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retring...")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retring...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # still failing after the maximum number of retries
# Fix up the HTML: drop <br> tags and add target="_blank" to <a> tags (needed when extracting ethnicity)
def preprocess_html(html):
return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
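# Usage sketch (assumed, mirroring the partial(...) calls used elsewhere in this repo; the URL is made up):
#   validator = partial(generic_validator, tag="div", identifier="info__features", attr_type="class")
#   soup, status_code = fetch_page("https://javhd.com/en/model/Some-Name", validator)
#   # soup is None when the page 404s, redirects to the login page, or fails validation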
# Parse a list page returned as JSON
def parse_list_json(data, num, lang='en'):
template = data.get("template", "")
thumb_components = re.findall(r'<thumb-component[^>]*>', template)
list_data = []
for idx, thumb in enumerate(thumb_components, start=1):
rank = (num - 1) * 36 + idx
link_content = re.search(r'link-content="(.*?)"', thumb)
url_thumb = re.search(r'url-thumb="(.*?)"', thumb)
title = re.search(r'title="(.*?)"', thumb)
if not link_content or not url_thumb or not title:
logging.warning(f"no content for rank:{rank} title:{title} url:{url_thumb} {thumb}")
continue
pic = url_thumb.group(1)
name = title.group(1)
url = link_content.group(1)
data = {"rank": rank, "url": url, "pic": pic}
data[f"{lang}_name"] = name
list_data.append(data)
return list_data
def process_paragraph(paragraph):
# keep the full HTML structure rather than calling get_text() on the tag directly
paragraph_html = str(paragraph)
# re-parse the HTML with BeautifulSoup and extract the cleaned text
soup = BeautifulSoup(paragraph_html, 'html.parser')
cleaned_text = soup.get_text().strip()
return cleaned_text
# Parse the HTML and extract the fields we need
def parse_actor_detail(soup, href):
info_section = soup.find("div", class_="info__features")
if not info_section:
logging.warning(f"未找到 info__features 区块: {href}")
return None, None
# map page labels to database field names
FIELD_MAPPING = {
"Height": "height",
"Weight": "weight",
"Breast size": "breast_size",
"Breast factor": "breast_factor",
"Hair color": "hair_color",
"Eye color": "eye_color",
"Birth date": "birth_date",
"Ethnicity": "ethnicity",
"Birth place": "birth_place"
}
# initialize the result dict keyed by database field names
extracted_data = {db_field: "" for db_field in FIELD_MAPPING.values()}
extracted_data['url'] = href
for li in info_section.find_all("li", class_="content-desc__list-item"):
title_tag = li.find("strong", class_="content-desc__list-title")
value_tag = li.find("span", class_="content-desc__list-text")
if title_tag and value_tag:
title = process_paragraph(title_tag) # raw label from the page
value = process_paragraph(value_tag)
# map the label to a database field name
db_field = FIELD_MAPPING.get(title)
if db_field:
extracted_data[db_field] = value
return extracted_data, None
###### test code below ######
def test_actor_list():
s_url = "/ja/model"
current_url = urljoin(host_url, s_url)
while current_url:
print(f"[信息] 正在抓取 {current_url}")
data = fetch_post_page(current_url)
if not data:
print(f"[错误] 无法获取数据 {current_url}")
break
# sanity-check the JSON structure
if not all(key in data for key in ["status", "results_count", "pagination_params", "template"]):
print(f"[error] unexpected data structure: {data}")
break
all_data = parse_list_json(data, 1)
print(all_data)
# move on to the next page
next_path = data.get("pagination_params", {}).get("next")
if next_path:
current_url = urljoin(host_url, next_path)
print(f"next page: {current_url}")
else:
print("[信息] 已抓取所有页面。")
break
break
def test_actor():
next_url = 'https://javhd.com/en/model/Yui-Hatano'
all_data = []
while next_url:
print(f'fetching page {next_url}')
soup, http_status_code = fetch_page(next_url, partial(generic_validator, tag="div", identifier="info__features", attr_type="class"))
if soup:
list_data, next_url = parse_actor_detail(soup, next_url)
if list_data :
all_data.append(list_data)
else:
print('get wrong page.')
print(all_data)
if __name__ == "__main__":
test_actor_list()
test_actor()
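
As an offline illustration of how parse_list_json extracts fields from the template HTML with regular expressions, a small hedged sketch; the thumb-component attributes are invented, and importing scraper assumes cloudscraper is installed:

# hypothetical offline check sitting next to scraper.py
import scraper

fake_payload = {
    "template": '<thumb-component link-content="/en/model/Some-Name" '
                'url-thumb="https://example.com/thumb.jpg" title="Some Name"></thumb-component>'
}
rows = scraper.parse_list_json(fake_payload, num=1, lang="en")
print(rows)
# expected: [{'rank': 1, 'url': '/en/model/Some-Name',
#             'pic': 'https://example.com/thumb.jpg', 'en_name': 'Some Name'}]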

javhd/src/sqlite_utils.py (new file, 190 lines)

@@ -0,0 +1,190 @@
import sqlite3
import json
import config
import logging
from datetime import datetime
# Connect to the SQLite database
DB_PATH = f"{config.global_share_data_dir}/sqlite/shared.db" # change this to your database file
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
tbl_name_actors = 'javhd_models'
# Check the SQLite version (native UPSERT needs 3.24.0+)
lower_sqlite_version = False
sqlite_version = sqlite3.sqlite_version_info
if sqlite_version < (3, 24, 0):
lower_sqlite_version = True
# Get the table's column names and default values
def get_table_columns_and_defaults(tbl_name):
try:
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
column_info = {}
for col in columns:
col_name = col[1]
default_value = col[4]
column_info[col_name] = default_value
return column_info
except sqlite3.Error as e:
logging.error(f"Error getting table columns: {e}")
return None
# Check the incoming data and keep only keys that match the table's columns
def check_and_process_data(data, tbl_name):
column_info = get_table_columns_and_defaults(tbl_name=tbl_name)
if column_info is None:
return None
processed_data = {}
for col, default in column_info.items():
if col == 'id': # auto-increment primary key, not supplied by the caller
continue
if col == 'created_at' or col == 'updated_at': # timestamps are set in the SQL itself
continue
if col in data:
processed_data[col] = data[col]
return processed_data
# Insert or update a row (UPSERT)
def insert_or_update_common(data, tbl_name, uniq_key='url'):
if lower_sqlite_version:
return insert_or_update_common_lower(data, tbl_name, uniq_key)
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
update_clause = ', '.join([f"{col}=EXCLUDED.{col}" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
ON CONFLICT ({uniq_key}) DO UPDATE SET {update_clause}
'''
cursor.execute(sql, values)
conn.commit()
# fetch the id of the inserted or updated row
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Insert-or-update fallback for SQLite versions without native UPSERT
def insert_or_update_common_lower(data, tbl_name, uniq_key='url'):
try:
processed_data = check_and_process_data(data, tbl_name)
if processed_data is None:
return None
columns = ', '.join(processed_data.keys())
values = list(processed_data.values())
placeholders = ', '.join(['?' for _ in values])
# try a plain INSERT first
try:
sql = f'''
INSERT INTO {tbl_name} ({columns}, updated_at)
VALUES ({placeholders}, datetime('now', 'localtime'))
'''
cursor.execute(sql, values)
conn.commit()
except sqlite3.IntegrityError: # unique-key conflict: fall back to an UPDATE
update_clause = ', '.join([f"{col}=?" for col in processed_data.keys() if col != uniq_key]) + ', updated_at=datetime(\'now\', \'localtime\')'
update_values = [processed_data[col] for col in processed_data.keys() if col != uniq_key]
update_values.append(data[uniq_key])
sql = f"UPDATE {tbl_name} SET {update_clause} WHERE {uniq_key} = ?"
cursor.execute(sql, update_values)
conn.commit()
# fetch the id of the inserted or updated row
cursor.execute(f"SELECT id FROM {tbl_name} WHERE {uniq_key} = ?", (data[uniq_key],))
report_id = cursor.fetchone()[0]
return report_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Insert a list-page row into the actors table, updating it if it already exists
def insert_actor_index(data):
try:
return insert_or_update_common(data, tbl_name_actors)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Update an actor's detail fields
def update_actor_detail(data, is_full_data=1):
try:
data['is_full_data'] = is_full_data
return insert_or_update_common(data, tbl_name_actors)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Query actors
def query_actors(**filters):
try:
sql = "SELECT url, en_name as name FROM javhd_models WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"url": " AND href = ?",
"en_name": " AND name LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "en_name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# note: the ORDER BY column name is interpolated directly; a placeholder would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
cursor.execute(sql, params)
return [{'url': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# test code
if __name__ == "__main__":
print(query_actors(en_name='未久'))
print(query_actors())
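
To make the UPSERT flow concrete, a hedged sketch of how fetch.py-style callers use these helpers; the example URL and values are invented, and it assumes shared.db with a javhd_models table already exists:

# hypothetical caller
import sqlite_utils as db_tools

# list-page pass: inserts a new row keyed on url (or refreshes an existing one)
row_id = db_tools.insert_actor_index({
    "url": "https://javhd.com/en/model/Some-Name",
    "en_name": "Some Name",
})

# detail pass: the same url updates the row and marks it complete
db_tools.update_actor_detail({
    "url": "https://javhd.com/en/model/Some-Name",
    "height": "160 cm",
}, is_full_data=1)

print(db_tools.query_actors(en_name="Some Name", limit=1))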

javhd/src/utils.py (new file, 35 lines)

@@ -0,0 +1,35 @@
import re
import os
import json
import time
import csv
from datetime import datetime
from urllib.parse import urlparse
import logging
import config
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
def replace_lang_param(url: str) -> str:
"""
将URL中的lang参数统一替换为'en'支持路径中包含lang的情况
"""
parsed = urlparse(url)
# handle the lang segment in the path, e.g. /ja/model/... or /en/model/...
path_parts = parsed.path.split('/')
if len(path_parts) >= 2 and path_parts[1] in ['en', 'ja', 'zh']:
path_parts[1] = 'en' # replace the second path segment with 'en'
new_path = '/'.join(path_parts)
else:
new_path = parsed.path
# parse the query parameters (a lang query parameter, if present, is left unchanged)
query = parse_qs(parsed.query)
# rebuild the URL
new_parsed = parsed._replace(
path=new_path,
query=urlencode(query, doseq=True)
)
return urlunparse(new_parsed)
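
A quick hedged check of the intended behavior, using a made-up model slug; only the language segment of the path changes:

# hypothetical quick check sitting next to utils.py
from utils import replace_lang_param

print(replace_lang_param("https://javhd.com/ja/model/Some-Name"))
# expected: https://javhd.com/en/model/Some-Name
print(replace_lang_param("https://javhd.com/en/model/Some-Name?page=2"))
# expected: https://javhd.com/en/model/Some-Name?page=2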