diff --git a/scripts/iafd/src/config.py b/scripts/iafd/src/config.py index fc7fc09..6820d73 100644 --- a/scripts/iafd/src/config.py +++ b/scripts/iafd/src/config.py @@ -1,26 +1,86 @@ import logging import os import inspect +import time from datetime import datetime +from logging.handlers import RotatingFileHandler +from collections import defaultdict global_share_data_dir = '/root/sharedata' global_host_data_dir = '/root/hostdir/scripts_data' -# 设置日志配置 +# 统计日志频率 +log_count = defaultdict(int) # 记录日志的次数 +last_log_time = defaultdict(float) # 记录上次写入的时间戳 + +class RateLimitFilter(logging.Filter): + """ + 频率限制过滤器: + 1. 在 60 秒内,同样的日志最多写入 60 次,超过则忽略 + 2. 如果日志速率超过 100 条/秒,发出告警(注:告警逻辑尚未实现) + """ + LOG_LIMIT = 60 # 每分钟最多记录相同消息 60 次 + + def filter(self, record): + global log_count, last_log_time + message_key = record.getMessage() # 获取日志内容 + + # 计算当前时间 + now = time.time() + elapsed = now - last_log_time[message_key] + + # 限制相同日志的写入频率 + if elapsed < 60: # 60 秒内 + log_count[message_key] += 1 + if log_count[message_key] > self.LOG_LIMIT: + print('reach limit.') + return False # 直接丢弃 + else: + log_count[message_key] = 1 # 超过 60 秒,重新计数 + + last_log_time[message_key] = now + + return True # 允许写入日志 + + + def setup_logging(log_filename=None): - # 如果未传入 log_filename,则使用当前脚本名称作为日志文件名 if log_filename is None: - # 获取调用 setup_logging 的脚本文件名 caller_frame = inspect.stack()[1] caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0] - - # 获取当前日期,格式为 yyyymmdd current_date = datetime.now().strftime('%Y%m%d') - # 拼接 log 文件名,将日期加在扩展名前 log_filename = f'../log/{caller_filename}_{current_date}.log' - logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s', - handlers=[ - logging.FileHandler(log_filename), - logging.StreamHandler() - ]) \ No newline at end of file + max_log_size = 100 * 1024 * 1024 # 100 MB + max_log_files = 10 # 最多保留 10 个日志文件 + + file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, 
backupCount=max_log_files) + file_handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s' + )) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s' + )) + + # 创建 logger + logger = logging.getLogger() + logger.setLevel(logging.INFO) + logger.handlers = [] # 避免重复添加 handler + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + # 添加频率限制 + rate_limit_filter = RateLimitFilter() + file_handler.addFilter(rate_limit_filter) + console_handler.addFilter(rate_limit_filter) + + +# 运行示例 +if __name__ == "__main__": + setup_logging() + + for i in range(1000): + logging.info("测试日志,检测频率限制") + time.sleep(0.01) # 模拟快速写入日志 \ No newline at end of file diff --git a/scripts/iafd/src/fetch.py b/scripts/iafd/src/fetch.py index 0289bcc..c20894b 100644 --- a/scripts/iafd/src/fetch.py +++ b/scripts/iafd/src/fetch.py @@ -231,54 +231,71 @@ def fetch_movies_by_stu(): if debug: break +# 更新演员信息,单次循环 +def fetch_performers_detail_once(perfomers_list): + for performer in perfomers_list: + url = performer['href'] + person = performer['name'] + logging.info(f"Fetching data for performer ({person}), url {url} ...") + soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="headshot", attr_type="id")) + if soup: + data = scraper.parse_page_performer(soup) + if data: + performer_id = db_tools.insert_or_update_performer({ + 'href': url, + 'person': person, + **data + }) + if performer_id: + logging.info(f'insert one person, id: {performer_id}, person: ({person}), url: {url}') + else: + logging.warning(f'insert person: ({person}) {url} failed.') + + # 写入到本地json文件 + utils.write_person_json(person, url, { + 'href': url, + 'person': person, + **data + }) + else: + logging.warning(f'parse_page_performer error. 
person: ({person}), url: {url}') + elif status_code and status_code == 404: + logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skiping...') + else: + logging.warning(f'fetch_page error. person: ({person}), url: {url}') + time.sleep(1) + # 更新演员信息 def fetch_performers_detail(): + limit_count = 5 if debug else 1000 perfomers_list = [] - while True: - # 每次从数据库中取一部分,避免一次全量获取 - perfomers_list = db_tools.query_performer_hrefs(is_full_data=0, limit=1000) - if len(perfomers_list) < 1: - logging.info(f'all performers fetched.') - break - for performer in perfomers_list: - url = performer['href'] - person = performer['name'] - logging.info(f"Fetching data for performer ({person}), url {url} ...") - soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="headshot", attr_type="id")) - if soup: - data = scraper.parse_page_performer(soup) - if data: - performer_id = db_tools.insert_or_update_performer({ - 'href': url, - 'person': person, - **data - }) - if performer_id: - logging.info(f'insert one person, id: {performer_id}, person: ({person}), url: {url}') - else: - logging.warning(f'insert person: ({person}) {url} failed.') - # 写入到本地json文件 - utils.write_person_json(person, url, { - 'href': url, - 'person': person, - **data - }) - else: - logging.warning(f'parse_page_performer error. person: ({person}), url: {url}') - elif status_code and status_code == 404: - logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skiping...') - else: - logging.warning(f'fetch_page error. person: ({person}), url: {url}') - # 调试break - if debug: - return True + # 获取新演员的列表 + while True: + perfomers_list = db_tools.query_performer_hrefs(is_full_data=0, limit=limit_count) + if len(perfomers_list) < 1: + logging.info(f'all new performers fetched. 
') + break + fetch_performers_detail_once(perfomers_list) + if debug: + break + + # 获取待更新的演员的列表 + while True: + perfomers_list = db_tools.get_performers_needed_update(limit=limit_count) + if len(perfomers_list) < 1: + logging.info(f'all existed performers updated. ') + break + fetch_performers_detail_once(perfomers_list) + if debug: + break # 更新影片信息 def fetch_movies_detail(): + limit_count = 10 if debug else 1000 movies_list = [] while True: - movies_list = db_tools.query_movie_hrefs(is_full_data=0, limit=1000) + movies_list = db_tools.query_movie_hrefs(is_full_data=0, limit=limit_count) if len(movies_list) < 1: logging.info(f'all movies fetched.') break @@ -309,9 +326,10 @@ def fetch_movies_detail(): logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skiping...') else: logging.warning(f'fetch_page error. url: {url}') - # 调试增加break - if debug: - return True + time.sleep(1) + # 调试增加break + if debug: + return True # 建立缩写到函数的映射 diff --git a/scripts/iafd/src/sqlite_utils.py b/scripts/iafd/src/sqlite_utils.py index dcd51ac..e2288f0 100644 --- a/scripts/iafd/src/sqlite_utils.py +++ b/scripts/iafd/src/sqlite_utils.py @@ -541,6 +541,10 @@ def insert_or_update_movie(movie_data): # 导演不存在的话,插入一条 if director_id is None: director_id = insert_performer_index( movie_data['Director'], movie_data['DirectorHref'], from_movie_list=1) + if studio_id is None: + studio_id = 0 + if distributor_id is None: + distributor_id = 0 # 插入或更新电影信息 cursor.execute( @@ -678,6 +682,23 @@ def query_movie_hrefs(**filters): logging.error(f"查询 href 失败: {e}") return [] +# 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。 +def get_performers_needed_update(limit=None): + try: + sql = """ + SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt + """ + + if limit is not None: + sql += f" LIMIT {limit}" + + cursor.execute(sql) + return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: 
{e}") + return [] + # 插入一条任务日志 def insert_task_log(): try: diff --git a/scripts/javdb/src/config.py b/scripts/javdb/src/config.py index fc7fc09..0218efe 100644 --- a/scripts/javdb/src/config.py +++ b/scripts/javdb/src/config.py @@ -1,26 +1,85 @@ import logging import os import inspect +import time from datetime import datetime +from logging.handlers import RotatingFileHandler +from collections import defaultdict global_share_data_dir = '/root/sharedata' global_host_data_dir = '/root/hostdir/scripts_data' -# 设置日志配置 +# 统计日志频率 +log_count = defaultdict(int) # 记录日志的次数 +last_log_time = defaultdict(float) # 记录上次写入的时间戳 + +class RateLimitFilter(logging.Filter): + """ + 频率限制过滤器: + 1. 在 60 秒内,同样的日志最多写入 60 次,超过则忽略 + 2. 如果日志速率超过 100 条/秒,发出告警(注:告警逻辑尚未实现) + """ + LOG_LIMIT = 60 # 每分钟最多记录相同消息 60 次 + + def filter(self, record): + global log_count, last_log_time + message_key = record.getMessage() # 获取日志内容 + + # 计算当前时间 + now = time.time() + elapsed = now - last_log_time[message_key] + + # 限制相同日志的写入频率 + if elapsed < 60: # 60 秒内 + log_count[message_key] += 1 + if log_count[message_key] > self.LOG_LIMIT: + print('reach limit.') + return False # 直接丢弃 + else: + log_count[message_key] = 1 # 超过 60 秒,重新计数 + + last_log_time[message_key] = now + + return True # 允许写入日志 + + def setup_logging(log_filename=None): - # 如果未传入 log_filename,则使用当前脚本名称作为日志文件名 if log_filename is None: - # 获取调用 setup_logging 的脚本文件名 caller_frame = inspect.stack()[1] caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0] - - # 获取当前日期,格式为 yyyymmdd current_date = datetime.now().strftime('%Y%m%d') - # 拼接 log 文件名,将日期加在扩展名前 log_filename = f'../log/{caller_filename}_{current_date}.log' - logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s', - handlers=[ - logging.FileHandler(log_filename), - logging.StreamHandler() - ]) \ No newline at end of file + max_log_size = 100 * 1024 * 1024 # 100 MB + max_log_files = 10 # 最多保留 10 个日志文件 + + file_handler 
= RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files) + file_handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s' + )) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s' + )) + + # 创建 logger + logger = logging.getLogger() + logger.setLevel(logging.INFO) + logger.handlers = [] # 避免重复添加 handler + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + # 添加频率限制 + rate_limit_filter = RateLimitFilter() + file_handler.addFilter(rate_limit_filter) + console_handler.addFilter(rate_limit_filter) + + +# 运行示例 +if __name__ == "__main__": + setup_logging() + + for i in range(1000): + logging.info("测试日志,检测频率限制") + time.sleep(0.01) # 模拟快速写入日志 \ No newline at end of file diff --git a/scripts/javdb/src/fetch.py b/scripts/javdb/src/fetch.py index 93cea3b..a3f0ab4 100644 --- a/scripts/javdb/src/fetch.py +++ b/scripts/javdb/src/fetch.py @@ -224,6 +224,7 @@ def fetch_movies_detail(): break else: logging.warning(f'fetch_page error. 
url: {url}') + time.sleep(1) # 调试增加break if debug: return True @@ -262,14 +263,14 @@ def main(cmd, args_debug, args_force): for short_name in function_names: func = function_map.get(short_name.strip()) # 从映射中获取对应的函数 if callable(func): - db_tools.update_task_log(task_id, task_status=f'Running {func}') + db_tools.update_task_log(task_id, task_status=f'Running {short_name}') func() else: logging.warning(f" {short_name} is not a valid function shortcut.") else: # 全量执行 for name, func in function_map.items(): if callable(func): - db_tools.update_task_log(task_id, task_status=f'Running {func}') + db_tools.update_task_log(task_id, task_status=f'Running {name}') func() else: logging.warning(f" {short_name} is not a valid function shortcut.") diff --git a/scripts/schema.sql b/scripts/schema.sql index 5d5dff9..6005a70 100644 --- a/scripts/schema.sql +++ b/scripts/schema.sql @@ -256,3 +256,60 @@ CREATE TABLE IF NOT EXISTS "javdb_task_log" ( `created_at` TEXT DEFAULT (datetime('now', 'localtime')), `updated_at` TEXT DEFAULT (datetime('now', 'localtime')) ); +CREATE VIEW view_iafd_movies_summary AS +SELECT + COUNT(*) AS total_count, + SUM(CASE WHEN from_performer_list = 1 THEN 1 ELSE 0 END) AS from_perfromers, + SUM(CASE WHEN from_dist_list = 1 THEN 1 ELSE 0 END) AS from_dis, + SUM(CASE WHEN from_stu_list = 1 THEN 1 ELSE 0 END) AS from_stu, + SUM(CASE WHEN from_performer_list=1 AND from_dist_list = 1 THEN 1 ELSE 0 END) AS performers_dist, + SUM(CASE WHEN from_performer_list=1 AND from_stu_list = 1 THEN 1 ELSE 0 END) AS performers_stu, + SUM(CASE WHEN from_dist_list=1 AND from_stu_list = 1 THEN 1 ELSE 0 END) AS dist_stu, + SUM(CASE WHEN from_performer_list=1 AND from_dist_list=1 AND from_stu_list = 1 THEN 1 ELSE 0 END) AS performers_dist_stu, + SUM(CASE WHEN from_performer_list=1 AND from_dist_list=0 AND from_stu_list = 0 THEN 1 ELSE 0 END) AS performers_only, + SUM(CASE WHEN from_performer_list=0 AND from_dist_list=1 AND from_stu_list = 0 THEN 1 ELSE 0 END) AS dist_only, + 
SUM(CASE WHEN from_performer_list=0 AND from_dist_list=0 AND from_stu_list = 1 THEN 1 ELSE 0 END) AS stu_only +FROM iafd_movies im +/* view_iafd_movies_summary(total_count,from_perfromers,from_dis,from_stu,performers_dist,performers_stu,dist_stu,performers_dist_stu,performers_only,dist_only,stu_only) */; +CREATE VIEW view_iafd_thelordofporn_match AS +SELECT + ia.id AS iafd_id, ia.href AS iafd_href, ia.name AS iafd_name, + tl.id AS tl_id, tl.pornstar AS tl_pornstar, tl.href AS tl_href +FROM thelordofporn_actress tl +JOIN iafd_performers ia ON tl.pornstar = ia.name +/* view_iafd_thelordofporn_match(iafd_id,iafd_href,iafd_name,tl_id,tl_pornstar,tl_href) */; +CREATE VIEW view_iafd_performers_movies AS +SELECT p.id, p.href, p.name, IFNULL(COUNT(pm.performer_id), 0) AS actual_movies_cnt, p.movies_cnt +FROM iafd_performers p +LEFT JOIN iafd_performers_movies pm ON pm.performer_id = p.id +GROUP BY p.id +/* view_iafd_performers_movies(id,href,name,actual_movies_cnt,movies_cnt) */; +CREATE VIEW view_javdb_javhd_match AS +SELECT + ja.id AS javdb_id, ja.href AS javdb_href, ja.name AS javdb_name, + jm.id AS javhd_id, jm.ja_name AS javhd_ja_name, jm.en_name AS javhd_en_name, jm.url AS javhd_url +FROM javdb_actors ja +JOIN javhd_models jm ON ja.name = jm.ja_name +/* view_javdb_javhd_match(javdb_id,javdb_href,javdb_name,javhd_id,javhd_ja_name,javhd_en_name,javhd_url) */; +CREATE VIEW view_iafd_javdb_javhd_match AS +SELECT + ip.id AS iafd_id, ip.href AS iafd_href, ip.name AS iafd_name, + jjm.javdb_id AS javdb_id, jjm.javdb_name AS javdb_name, jjm.javdb_href AS javdb_href, + jjm.javhd_id AS javhd_id, jjm.javhd_en_name AS javhd_en_name, jjm.javhd_url AS javhd_url +FROM iafd_performers ip +JOIN javdb_javhd_match jjm ON ip.name = jjm.javhd_en_name; +CREATE VIEW view_javdb_movies_summary AS +SELECT + COUNT(*) AS total_count, + SUM(CASE WHEN from_actor_list = 1 THEN 1 ELSE 0 END) AS from_actors, + SUM(CASE WHEN from_movie_makers = 1 THEN 1 ELSE 0 END) AS from_makers, + SUM(CASE WHEN 
from_movie_series = 1 THEN 1 ELSE 0 END) AS from_series, + SUM(CASE WHEN from_actor_list=1 AND from_movie_makers = 1 THEN 1 ELSE 0 END) AS actor_makers, + SUM(CASE WHEN from_actor_list=1 AND from_movie_series = 1 THEN 1 ELSE 0 END) AS actor_series, + SUM(CASE WHEN from_movie_makers=1 AND from_movie_series = 1 THEN 1 ELSE 0 END) AS makers_series, + SUM(CASE WHEN from_actor_list=1 AND from_movie_makers = 1 AND from_movie_series = 1 THEN 1 ELSE 0 END) AS actor_makers_series, + SUM(CASE WHEN from_actor_list=1 AND from_movie_makers = 0 AND from_movie_series = 0 THEN 1 ELSE 0 END) AS from_actors_only, + SUM(CASE WHEN from_actor_list=0 AND from_movie_makers = 1 AND from_movie_series = 0 THEN 1 ELSE 0 END) AS from_makers_only, + SUM(CASE WHEN from_actor_list=0 AND from_movie_makers = 0 AND from_movie_series = 1 THEN 1 ELSE 0 END) AS from_series_only +FROM javdb_movies +/* view_javdb_movies_summary(total_count,from_actors,from_makers,from_series,actor_makers,actor_series,makers_series,actor_makers_series,from_actors_only,from_makers_only,from_series_only) */;