From a04f02ec6dbaa510b5d74f2fde083e05771902c3 Mon Sep 17 00:00:00 2001 From: sophon Date: Mon, 28 Jul 2025 19:34:14 +0800 Subject: [PATCH] modify scripts --- .../db_wapper/spider_db_handler.py | 1076 ++++------------- .../scrapy_proj/db_wapper/sqlite_base.py | 333 ++++- .../scrapy_proj/spiders/iafd_spider.py | 11 +- scrapy_proj/scrapy_proj/tools/db_tools.py | 233 ++++ 4 files changed, 776 insertions(+), 877 deletions(-) create mode 100644 scrapy_proj/scrapy_proj/tools/db_tools.py diff --git a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py index 16e90ca..74a0314 100644 --- a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py +++ b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py @@ -29,24 +29,11 @@ class SisDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} - + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_sis, 'alias': 'cnt'}, + ] + return self.generic_stats_query(stats_config) @register_handler(comm.SPIDER_NAME_U3C3) class U3C3DBHandler(SQLiteDBHandler): @@ -59,23 +46,11 @@ class U3C3DBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_u3c3, 'alias': 'cnt'}, + ] + return self.generic_stats_query(stats_config) @register_handler(comm.SPIDER_NAME_CLM) @@ -117,78 +92,37 @@ class ClmDBHandler(SQLiteDBHandler): logging.warning(f"insert index error: {item}") def get_empty_title(self): - try: - self.cursor.execute(f"SELECT id, href FROM {self.tbl_name_clm_index} WHERE title='' ") - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_clm_index, + fields = ['id', 'href'], + title = '' + ) def get_count_by_keywords_id(self, key_words_id): - self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_words_index} WHERE words_id = ?", (key_words_id,)) - row = self.cursor.fetchone() - return row[0] if row else None + return self.generic_get_record_count( + table_name=self.tbl_name_words_index, + conditions={ + 'words_id': key_words_id, + } + ) # 按条件查询 href 列表 def get_key_words(self, **filters): - try: - sql = f"SELECT id, words, groups FROM {self.tbl_name_clm_keywords} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "words": " AND words LIKE ?", - "groups": " AND groups LIKE ?", - "tags": " AND tags LIKE ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "words" or key == 'groups' or key == 'tags': - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "query_str" in filters: - sql += f" AND {filters['query_str']} " - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_clm_keywords, + fields = ['id', 'words', 'groups'], + filters = filters + ) # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words, - (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets, - (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_clm_keywords, 'alias': 'words'}, + {'table': self.tbl_name_clm_index, 'alias': 'magnets'}, + {'table': self.tbl_name_words_index, 'alias': 'wd_mag'}, + ] + return self.generic_stats_query(stats_config) @register_handler(comm.SPIDER_NAME_PBOX) class PboxDBHandler(SQLiteDBHandler): @@ -255,39 +189,12 @@ class PboxDBHandler(SQLiteDBHandler): def get_studios(self, **filters): - try: - sql = f"SELECT href, name, id, label_id, scene_count FROM {self.tbl_studios} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return [] - + return self.generic_query( + table_name = self.tbl_studios, + fields = ['href', 'name', 'id', 'label_id', 'scene_count'], + filters = filters + ) + def get_stu_mov_count(self, stu_id): self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_movies} WHERE studio_id = ?", (stu_id,)) row = self.cursor.fetchone() @@ -295,26 +202,14 @@ class PboxDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_studios}) AS studios, - (SELECT COUNT(*) FROM {self.tbl_movies}) AS movies, - (SELECT COUNT(*) FROM {self.tbl_actor}) AS actors, - (SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} + stats_config = [ + # 演员相关统计 + {'table': self.tbl_studios, 'alias': 'studios'}, + {'table': self.tbl_movies, 'alias': 'movies'}, + {'table': self.tbl_actor, 'alias': 'actors'}, + {'table': self.tbl_actor_mov, 'alias': 'act_mov'}, + ] + return self.generic_stats_query(stats_config) def close_spider(self, spider): # 关闭数据库连接 @@ -340,33 +235,20 @@ class JavHDDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_javhd}) AS cnt - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_javhd, 'alias': 'cnt'}, + ] + return self.generic_stats_query(stats_config) def has_full_data(self, href): - try: - self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_javhd} WHERE is_full_data=1 and url = ?", (href,)) - row = self.cursor.fetchone() - return row[0] if row else None - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return 0 - + return self.generic_get_record_count( + table_name=self.tbl_name_javhd, + conditions={ + 'is_full_data': 1, # is_full_data=1 + 'url': href # url=href + } + ) @register_handler(comm.SPIDER_NAME_LORD) class LordDBHandler(SQLiteDBHandler): @@ -398,33 +280,28 @@ class LordDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actor_cnt - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_actors, 'alias': 'actor_cnt'}, + ] + return self.generic_stats_query(stats_config) def has_full_data(self, href): - try: - self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,)) - row = self.cursor.fetchone() - return row[0] if row else None - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return 0 + return self.generic_get_record_count( + table_name=self.tbl_name_actors, + conditions={ + 'is_full_data': 1, # is_full_data=1 + 'href': href # url=href + } + ) + # 按条件查询 href 列表 + def get_lord_actors(self, **filters): + return self.generic_query( + table_name = self.tbl_name_thelordofporn_actress, + fields = ['href', 'pornstar as name', 'id'], + filters = filters + ) @register_handler(comm.SPIDER_NAME_JAVBUS) class JavBusDBHandler(SQLiteDBHandler): @@ -474,16 +351,34 @@ class JavBusDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - return self.get_statics() + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_actors, 'alias': 'actors'}, + {'table': self.tbl_name_actors, 'alias': 'act_un', 'where': 'uncensored=1'}, + {'table': self.tbl_name_actors, 'alias': 'act_full', 'where': 'is_full_data=1'}, + {'table': self.tbl_name_actors, 'alias': 'act_unc_full', 'where': 'uncensored=1 AND is_full_data=1'}, + + # 电影相关统计 + {'table': self.tbl_name_movies, 'alias': 'movies'}, + {'table': self.tbl_name_movies, 'alias': 'mov_un', 'where': 'uncensored=1'}, + {'table': self.tbl_name_movies, 'alias': 'mov_full', 'where': 'is_full_data=1'}, + {'table': self.tbl_name_movies, 'alias': 'mov_un_full', 'where': 'uncensored=1 AND is_full_data=1'}, + + # 其他表统计 + {'table': self.tbl_name_studios, 'alias': 'studios'}, + {'table': self.tbl_name_labels, 'alias': 'labels'}, + {'table': self.tbl_name_series, 'alias': 'series'} + ] + return self.generic_stats_query(stats_config) def has_full_data(self, href): - try: - self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,)) - row = self.cursor.fetchone() - return row[0] if row else None - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return 0 + return self.generic_get_record_count( + table_name=self.tbl_name_actors, + conditions={ + 'is_full_data': 1, # is_full_data=1 + 'href': href # url=href + } + ) def insert_actor_index(self, data, **kwargs): fields = ['uncensored', 'from_actor_list', 'from_movie_list'] @@ -561,96 +456,18 @@ class JavBusDBHandler(SQLiteDBHandler): return None def query_actors(self, **filters): - try: - sql = f"SELECT href, en_name as name, uncensored, movies_cnt, id, is_full_data FROM {self.tbl_name_actors} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "en_name": " AND en_name LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - "uncensored": " AND uncensored = ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "en_name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - #return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'movies_cnt':row[3]} for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_actors, + fields = ['href', 'en_name as name', 'uncensored', 'movies_cnt', 'id', 'is_full_data'], + filters = filters + ) def query_movies(self, **filters): - try: - sql = f"SELECT href, title, uncensored, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "title": " AND title LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - "uncensored": " AND uncensored = ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "title": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - #return [{'href': row[0], 'title': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_actors, + fields = ['href', 'title', 'uncensored', 'id', 'is_full_data'], + filters = filters + ) # 检查记录是否存在,不存在就插入 def check_and_get_id(self, item, uncensored, tbl, uniq_key='href'): @@ -759,222 +576,22 @@ class JavBusDBHandler(SQLiteDBHandler): if not tbls.get(tbl): logging.warning(f"wrong table. table: {tbl}") return None - try: - sql = f"SELECT href, name, uncensored, id FROM {tbls[tbl]} WHERE 1=1" - params = [] - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "start_id": " AND id > ?", - "uncensored": " AND uncensored = ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = {tbls[tbl]}, + fields = ['href', 'name', 'uncensored', 'id', 'is_full_data'], + filters = filters + ) def update_tags(self, data): return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href') def query_tags(self, **filters): - try: - sql = f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - def get_statics(self): - try: - self.cursor.execute(f""" - SELECT - (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors, - (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un, - (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full, - (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full, - (SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies, - (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un, - (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full, - (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full, - (SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios, - (SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels, - (SELECT COUNT(*) FROM {self.tbl_name_series}) AS series - """) - - row = self.cursor.fetchone() - if not row: - logging.warning(f"query no results.") - return {} - - # 手动定义列名映射 - #columns = ['actors', 'act_un', 'act_full', 'act_unc_full', 'movies', 'mov_un', 'mov_full', 'mov_un_full'] - columns = [desc[0] for desc in self.cursor.description] - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"query error: {e}") - return {} - - # 处理影片的 无码 字段 - def reset_movies_uncensored(self, check_and_do = 0): - try: - logging.info("创建临时表以便于保存待更新记录") - self.cursor.execute(""" - CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update ( - movie_id INTEGER PRIMARY KEY - ) - """) - # 清空临时表(以防之前有残留数据) - self.cursor.execute("DELETE FROM temp_movies_to_update") - - logging.info(f"开始收集需要更新的影片ID...") - # 使用单个SQL语句完成所有条件的查询和插入 - self.cursor.execute(""" - INSERT OR IGNORE INTO temp_movies_to_update (movie_id) - SELECT DISTINCT m.id - FROM javbus_movies m - -- 连接演员表 - LEFT JOIN javbus_actors_movies am ON m.id = am.movie_id - LEFT JOIN javbus_actors a ON am.actor_id = a.id - -- 连接标签/系列/工作室表 - LEFT JOIN javbus_labels l ON m.label_id = l.id - LEFT JOIN javbus_series s ON m.series_id = s.id - LEFT JOIN javbus_studios st ON m.studio_id = st.id - -- 筛选条件:任一表的href包含'uncensored' - WHERE a.href LIKE '%uncensored%' - OR l.href LIKE '%uncensored%' - OR s.href LIKE '%uncensored%' - OR st.href LIKE '%uncensored%' - """) - - total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0] - total_movies = self.cursor.execute("SELECT COUNT(*) FROM javbus_movies").fetchone()[0] - logging.info(f"共收集到 {total_count} 部需要更新的影片, 共有 {total_movies} 部影片") - - if check_and_do: - # 1. 将所有记录的uncensored默认设为0 - logging.info("开始将所有影片的uncensored设为默认值0...") - self.cursor.execute("UPDATE javbus_movies SET uncensored = 0") - logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为0") - - # 2. 将临时表中匹配的记录设为1 - logging.info("开始将匹配的影片的uncensored设为1...") - self.cursor.execute(""" - UPDATE javbus_movies - SET uncensored = 1 - WHERE id IN (SELECT movie_id FROM temp_movies_to_update) - """) - logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为1") - - self.conn.commit() - else: - logging.info("check完毕,本次忽略更新。。。") - - logging.info("任务执行完成!") - - except sqlite3.Error as e: - self.conn.rollback() - logging.error("Error inserting movie: %s", e) - logging.error(f"query error: {e}") - - # 处理影片的 无码 字段 - def reset_actor_movies(self, check_and_do = 0): - try: - # 检查表中是否已存在movies_cnt列 - self.cursor.execute(f"PRAGMA table_info({self.tbl_name_actors});") - columns = [row[1] for row in self.cursor.fetchall()] - - if 'movies_cnt' not in columns: - # 列不存在,添加新列 - add_field_sql = f""" - ALTER TABLE {self.tbl_name_actors} ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL; - """ - self.cursor.execute(add_field_sql) - logging.info("成功添加movies_cnt字段") - else: - logging.info("movies_cnt字段已存在,跳过添加") - - # 确保关联表有索引 - self.cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_actor_movie_actor_id - ON {self.tbl_name_actor_movie}(actor_id); - """) - - # 创建临时表存储统计结果 - self.cursor.execute(f""" - CREATE TEMPORARY TABLE temp_actor_counts AS - SELECT actor_id, COUNT(movie_id) AS cnt - FROM {self.tbl_name_actor_movie} - GROUP BY actor_id; - """) - - # 为临时表添加索引 - self.cursor.execute("CREATE INDEX idx_temp_actor_id ON temp_actor_counts(actor_id);") - - # 更新主表 - self.cursor.execute(f""" - UPDATE {self.tbl_name_actors} - SET movies_cnt = COALESCE(( - SELECT cnt FROM temp_actor_counts - WHERE actor_id = {self.tbl_name_actors}.id - ), 0); -- 使用COALESCE处理没有影片的演员 - """) - updated_rows = self.cursor.rowcount - logging.info(f"成功更新{updated_rows}个演员的影片数量") - - self.conn.commit() - logging.info("任务执行完成!") - - except sqlite3.Error as e: - self.conn.rollback() - logging.error("Error updating actor movie_cnt: %s", e) - + return self.generic_query( + table_name = self.tbl_name_tags, + fields = ['href', 'name', 'id'], + filters = filters + ) @register_handler(comm.SPIDER_NAME_IAFD) class IAFDDBHandler(SQLiteDBHandler): @@ -990,6 +607,7 @@ class IAFDDBHandler(SQLiteDBHandler): self.tbl_name_dist = 'iafd_distributors' self.tbl_name_performer_urls = 'iafd_performer_urls' self.tbl_name_ethnic = 'iafd_meta_ethnic' + self.tbl_name_thelordofporn_actress = 'thelordofporn_actress' def insert_item(self, item): # 获取Item中所有定义的字段(包括父类继承的) @@ -1023,36 +641,22 @@ class IAFDDBHandler(SQLiteDBHandler): # 统计函数 def get_stat(self): - try: - # 使用单个SQL查询获取所有统计数据 - self.cursor.execute(""" - SELECT - (SELECT COUNT(*) FROM iafd_performers) AS iafd_actors, - (SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full, - (SELECT COUNT(*) FROM iafd_movies) AS movies, - (SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full, - (SELECT COUNT(*) FROM iafd_distributors) AS dist, - (SELECT COUNT(*) FROM iafd_studios) AS stu - """) + stats_config = [ + # 演员相关统计 + {'table': self.tbl_name_performers, 'alias': 'actors'}, + {'table': self.tbl_name_performers, 'alias': 'act_full', 'where': 'is_full_data=1'}, + + # 电影相关统计 + {'table': self.tbl_name_movies, 'alias': 'movies'}, + {'table': self.tbl_name_movies, 'alias': 'mov_full', 'where': 'is_full_data=1'}, - # 获取查询结果 - row = self.cursor.fetchone() - if not row: - return {} - - # 手动定义列名(与SQL中的AS别名保持一致) - #columns = ['iafd_actors', 'act_full', 'movies', 'mov_full', 'dist', 'stu'] - columns = [desc[0] for desc in self.cursor.description] - - # 将元组结果转换为字典 - return dict(zip(columns, row)) - - except sqlite3.Error as e: - logging.error(f"查询失败: {e}") - return {} + # 其他表统计 + {'table': self.tbl_name_studio, 'alias': 'stu'}, + {'table': self.tbl_name_dist, 'alias': 'dist'} + ] + return self.generic_stats_query(stats_config) # 插入演员索引,来自于列表数据 - #def insert_performer_index(self, name, href, from_astro_list=None, from_birth_list=None, from_ethnic_list=None, from_movie_list=None): def insert_performer_index(self, name, href, **kwargs): fields = [ 'from_astro_list', 'from_birth_list', 'from_ethnic_list', 'from_movie_list' @@ -1066,7 +670,6 @@ class IAFDDBHandler(SQLiteDBHandler): return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False) # """插入电影索引,来自于列表数据""" - #def insert_movie_index(self, title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None): def insert_movie_index(self, title, href, **kwargs): fields = [ 'from_performer_list', 'from_dist_list', 'from_stu_list', 'release_year' @@ -1175,27 +778,11 @@ class IAFDDBHandler(SQLiteDBHandler): # 按条件查询 href 列表 def query_ethnic_hrefs(self, **filters): - try: - sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "url" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - - self.cursor.execute(sql, params) - #return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 - return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_ethnic, + fields = ['href', 'name'], + filters = filters + ) # 插入或更新发行商 """ def insert_or_update_distributor(self, data): @@ -1203,26 +790,11 @@ class IAFDDBHandler(SQLiteDBHandler): # 按条件查询 href 列表 def query_distributor_hrefs(self, **filters): - try: - sql = "SELECT href FROM iafd_distributors WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "url" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - - self.cursor.execute(sql, params) - return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_dist, + fields = ['href'], + filters = filters + ) # """ 插入或更新制作公司 """ def insert_or_update_studio(self, data): @@ -1230,26 +802,11 @@ class IAFDDBHandler(SQLiteDBHandler): # 按条件查询 href 列表 def query_studio_hrefs(self, **filters): - try: - sql = "SELECT href FROM iafd_studios WHERE 1=1" - params = [] - - if "id" in filters: - sql += " AND id = ?" - params.append(filters["id"]) - if "href" in filters: - sql += " AND href = ?" - params.append(filters["href"]) - if "name" in filters: - sql += " AND name LIKE ?" - params.append(f"%{filters['name']}%") - - self.cursor.execute(sql, params) - return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_studio, + fields = ['href'], + filters = filters + ) # 检查记录是否存在,不存在就插入 def check_and_get_id(self, name, href, tbl, uniq_key='href'): @@ -1334,139 +891,29 @@ class IAFDDBHandler(SQLiteDBHandler): exists_do_nothing = False ) - # 按条件查询 href 列表 - def query_performer_hrefs(self, **filters): - return self.get_performers(**filters) - # 按条件查询 href 列表 def get_performers(self, **filters): - try: - sql = f"SELECT href, name, id, movies_cnt, is_full_data FROM {self.tbl_name_performers} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - # 按条件查询 href 列表 - def query_movie_hrefs(self, **filters): - return self.get_movies(**filters) + return self.generic_query( + table_name = self.tbl_name_performers, + fields = ['href', 'name', 'id', 'movies_cnt', 'is_full_data'], + filters = filters + ) # 按条件查询 href 列表 def get_movies(self, **filters): - try: - sql = f"SELECT href, title, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "title": " AND title LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "title": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - + return self.generic_query( + table_name = self.tbl_name_movies, + fields = ['href', 'title', 'id', 'is_full_data'], + filters = filters + ) # 按条件查询 href 列表 def get_lord_actors(self, **filters): - try: - sql = f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "pornstar": " AND pornstar LIKE ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "pornstar": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None + return self.generic_query( + table_name = self.tbl_name_thelordofporn_actress, + fields = ['href', 'pornstar as name', 'id'], + filters = filters + ) # 按条件查询 href 列表 def get_iafd_actors( @@ -1573,155 +1020,52 @@ class IAFDDBHandler(SQLiteDBHandler): return final_result - # 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。 - def get_performers_needed_update(self, limit=None): - try: - sql = """ - SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt - """ - - if limit is not None: - sql += f" LIMIT {limit}" +if __name__ == "__main__": + db = IAFDDBHandler() + print(f"case1, 预期: 有结果:") + print(db.get_movies( + is_full_data=0, + limit =5 + ) + ) - self.cursor.execute(sql) - return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] - - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return [] - - # 生成一个复杂的演员电影数量的查询视图,来判断从电影列表中聚合出来的演员-影片数量,与从演员列表中抓取到的影片数量,是否相等。 - def check_and_create_stat_table(self, taskid = 0): - try: - # 检查索引是否存在,如果不存在则创建 - indexes = [ - ("idx_iafd_performers_movies_performer_id", - "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"), - ("idx_iafd_movies_director_id", - "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"), - ("idx_iafd_performers_id", - "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);") - ] - for index_name, create_index_sql in indexes: - self.cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,)) - if not self.cursor.fetchone(): - self.cursor.execute(create_index_sql) - logging.info(f"Index {index_name} created successfully.") - else: - logging.info(f"Index {index_name} already exists.") + print(f"\n\ncase2, 预期: 有结果,正常:") + print(db.get_movies( + is_full_data__in=[1, 404], + limit =5 + ) + ) - # 检查视图是否存在,如果不存在则创建 - view_name = f"iafd_tmp_performers_stat_{taskid}" - self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (view_name,)) - if self.cursor.fetchone(): - self.cursor.execute("drop table ?", (view_name,)) - self.conn.commit() + print(f"\n\ncase3, 预期: 有结果,有告警: created_at_gt 写的不对,应该是两个下划线:") + print(db.get_movies( + title__like='creampie', + limit =5, + created_at_gt='2025-07-25', + order_by = 'id' + ) + ) - create_view_sql = f""" - CREATE table {view_name} AS - SELECT - id, - href, - name, - movies_cnt, - SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count, - SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count - FROM ( - SELECT - p.id, - p.href, - p.name, - p.movies_cnt, - COUNT(apm.movie_id) AS movie_count, - 'actor' AS role - FROM - iafd_performers p - LEFT JOIN - iafd_performers_movies apm ON p.id = apm.performer_id - GROUP BY - p.id, p.href, p.name, p.movies_cnt + print(f"\n\ncase4, 预期: 有结果(空集),正常:") + print(db.get_movies( + title__like='Creampied', + limit =5, + created_at__gt='2025-07-25', + order_by = 'id asc' + ) + ) - UNION ALL - - SELECT - p.id, - p.href, - p.name, - p.movies_cnt, - COUNT(im.id) AS movie_count, - 'director' AS role - FROM - iafd_performers p - LEFT JOIN - iafd_movies im ON p.id = im.director_id - GROUP BY - p.id, p.href, p.name, p.movies_cnt - ) combined - GROUP BY - id, href, name, movies_cnt; - """ - self.cursor.execute(create_view_sql) - logging.info(f"table {view_name} created successfully.") - - # 提交更改并关闭连接 - self.conn.commit() - except sqlite3.Error as e: - logging.warning(f"An error occurred: {e}") + print(f"\n\ncase5, 预期: 有结果,正常,仅有一个结果字段时,直接变成简单列表:") + print(db.query_studio_hrefs( + limit =10 + ) + ) - # 处理影片的 无码 字段 - def reset_actor_movies(self, check_and_do = 0): - try: - # 检查表中是否已存在movies_cnt列 - self.cursor.execute(f"PRAGMA table_info(iafd_performers);") - columns = [row[1] for row in cursor.fetchall()] - - if 'movies_cnt' not in columns: - # 列不存在,添加新列 - add_field_sql = f""" - ALTER TABLE iafd_performers ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL; - """ - self.cursor.execute(add_field_sql) - logging.info("成功添加movies_cnt字段") - else: - logging.info("movies_cnt字段已存在,跳过添加") - - # 确保关联表有索引 - self.cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id - ON iafd_performers_movies(performer_id); - """) - - # 创建临时表存储统计结果 - self.cursor.execute(f""" - CREATE TEMPORARY TABLE temp_actor_counts AS - SELECT performer_id, COUNT(movie_id) AS cnt - FROM iafd_performers_movies - GROUP BY performer_id; - """) - - # 为临时表添加索引 - self.cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);") - - # 更新主表 - self.cursor.execute(f""" - UPDATE iafd_performers - SET movies_cnt = COALESCE(( - SELECT cnt FROM temp_actor_counts - WHERE performer_id = iafd_performers.id - ), 0); -- 使用COALESCE处理没有影片的演员 - """) - - updated_rows = cursor.rowcount - logging.info(f"成功更新{updated_rows}个演员的影片数量") - - # 清理资源 - self.cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;") - self.conn.commit() - - logging.info("任务执行完成!") - - except sqlite3.Error as e: - self.conn.rollback() - logging.error("Error updating actor movie_cnt: %s", e) + print(f"\n\ncase6, 预期: 有结果,正常:") + print(db.get_lord_actors( + limit =10 + ) + ) + print(f"\n\ncase, 统计优化:") + print(db.get_stat()) \ No newline at end of file diff --git a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py index 03bd852..2ed4082 100644 --- a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py +++ b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py @@ -1,7 +1,9 @@ import os +import re import sqlite3 import logging from datetime import datetime +from typing import List, Dict, Optional, Any home_dir = os.path.expanduser("~") global_share_data_dir = f'{home_dir}/sharedata' @@ -49,6 +51,7 @@ class SQLiteDBHandler(metaclass=SingletonMeta): # 应用单例元类 self.lower_sqlite_version = True self.initialized = True # 标记初始化完成 + self._column_cache = {} # 缓存表字段信息,避免重复查询 def __del__(self): try: @@ -64,14 +67,15 @@ class SQLiteDBHandler(metaclass=SingletonMeta): # 应用单例元类 raise NotImplementedError("子类必须实现 insert_item 方法") def get_table_columns_and_defaults(self, tbl_name): + """获取表的字段信息(含默认值),并缓存结果""" + if tbl_name in self._column_cache: + return self._column_cache[tbl_name] + try: self.cursor.execute(f"PRAGMA table_info({tbl_name})") columns = self.cursor.fetchall() - column_info = {} - for col in columns: - col_name = col[1] - default_value = col[4] - column_info[col_name] = default_value + column_info = {col[1]: col[4] for col in columns} # col[1]是字段名,col[4]是默认值 + self._column_cache[tbl_name] = column_info # 缓存结果 return column_info except sqlite3.Error as e: logging.error(f"Error getting table columns: {e}") @@ -262,3 +266,322 @@ class SQLiteDBHandler(metaclass=SingletonMeta): # 应用单例元类 def get_stat(self): return {} + + + def _validate_fields(self, tbl_name: str, fields: List[str]) -> List[str]: + """验证查询字段是否合法,返回有效字段列表""" + column_info = self.get_table_columns_and_defaults(tbl_name) + if not column_info: + return [] + + valid_fields = [] + for field in fields: + # 处理带别名的字段(如 "pornstar as name") + match = re.match(r'^(\w+)\s+as\s+\w+$', field, re.IGNORECASE) + if match: + raw_field = match.group(1) # 提取原始字段名(如 "pornstar") + else: + raw_field = field # 普通字段(如 "id"、"name") + + if raw_field in column_info: + valid_fields.append(field) + else: + logging.warning(f"无效查询字段: 表={tbl_name}, 字段={field}") + return valid_fields + + def _validate_filter_fields(self, tbl_name: str, filters: Dict[str, Any], + condition_mapping: Dict[str, str]) -> Dict[str, Any]: + """验证过滤条件中的字段是否合法,返回有效条件字典""" + column_info = self.get_table_columns_and_defaults(tbl_name) + if not column_info: + return {} + + valid_filters = {} + for key, value in filters.items(): + # 跳过排序和限制(单独处理) + if key in ["order_by", "limit"]: + valid_filters[key] = value + continue + + # 解析字段名(处理 "__" 分隔的条件类型) + if "__" in key: + field_base, _ = key.split("__", 1) + else: + field_base = key + + # 映射到表实际字段 + mapped_field = condition_mapping.get(field_base, field_base) + + if mapped_field in column_info: + valid_filters[key] = value + else: + logging.warning(f"无效过滤字段: 表={tbl_name}, 字段={field_base} (映射后={mapped_field})") + return valid_filters + + def _validate_order_fields(self, tbl_name: str, allowed_order_fields: List[str]) -> List[str]: + """验证排序字段是否合法,返回有效排序字段列表""" + column_info = self.get_table_columns_and_defaults(tbl_name) + if not column_info: + return [] + + valid_order_fields = [] + for field in allowed_order_fields: + # 处理带排序方向的字段(如 "id DESC"、"name ASC") + raw_field = field.split()[0].strip() # 提取纯字段名 + if raw_field in column_info: + valid_order_fields.append(field) + else: + logging.warning(f"无效排序字段: 表={tbl_name}, 字段={field}") + return valid_order_fields + + def generic_query( + self, + table_name: str, + fields: List[str], + filters: Dict[str, Any], + condition_mapping: Optional[Dict[str, str]] = None, + allowed_order_fields: Optional[List[str]] = None, + simplify_single_field: bool = True # 新增参数:是否简化单字段结果 + ) -> Optional[List[Dict[str, Any]]]: + """ + 带字段合法性校验的通用单表查询函数 + 新增逻辑:自动校验查询字段、过滤字段和排序字段的有效性 + """ + try: + condition_mapping = condition_mapping or {} + allowed_order_fields = allowed_order_fields or [] + + # 1. 校验并过滤查询字段(fields) + valid_fields = self._validate_fields(table_name, fields) + if not valid_fields: + logging.error(f"无有效查询字段: 表={table_name}") + return None + select_fields = ", ".join(valid_fields) + is_single_field = len(valid_fields) == 1 # 判断是否单字段查询 + + # 2. 校验并过滤条件字段(filters) + valid_filters = self._validate_filter_fields( + table_name, filters, condition_mapping + ) + + # 3. 校验排序字段(allowed_order_fields) + valid_order_fields = self._validate_order_fields( + table_name, allowed_order_fields + ) + + # 构建SQL基础 + sql = f"SELECT {select_fields} FROM {table_name} WHERE 1=1" + params = [] + + # 处理查询条件(基于校验后的valid_filters) + for key, value in valid_filters.items(): + if key in ["order_by", "limit"]: + continue + + if "__" in key: + field_base, condition_type = key.split("__", 1) + else: + field_base, condition_type = key, "eq" + + field = condition_mapping.get(field_base, field_base) + + # 生成SQL片段(逻辑与之前一致) + if condition_type == "eq": + sql += f" AND {field} = ?" + params.append(value) + elif condition_type == "like": + sql += f" AND {field} LIKE ?" + params.append(f"%{value}%") + elif condition_type == "in": + if isinstance(value, list): + placeholders = ", ".join(["?"] * len(value)) + sql += f" AND {field} IN ({placeholders})" + params.extend(value) + else: + logging.warning(f"IN条件值必须是列表,键: {key}") + elif condition_type == "not_in": + if isinstance(value, list): + placeholders = ", ".join(["?"] * len(value)) + sql += f" AND {field} NOT IN ({placeholders})" + params.extend(value) + else: + logging.warning(f"NOT IN条件值必须是列表,键: {key}") + elif condition_type == "gt": + sql += f" AND {field} > ?" + params.append(value) + elif condition_type == "lt": + sql += f" AND {field} < ?" + params.append(value) + else: + logging.warning(f"不支持的条件类型: {condition_type},键: {key}") + + # 处理排序(基于校验后的valid_order_fields) + if "order_by" in valid_filters: + sql += f" ORDER BY {valid_filters["order_by"]}" + + ''' 加校验的这段屏蔽掉 + if "order_by" in valid_filters: + order_field = valid_filters["order_by"] + # 检查排序字段是否在允许的列表中(且已通过合法性校验) + if any((order_field.startswith(valid) or valid.startswith(order_field)) for valid in valid_order_fields): + sql += f" ORDER BY {order_field}" + else: + logging.warning(f"不允许的排序字段: {order_field},表={table_name}") + ''' + + # 处理限制条数 + if "limit" in valid_filters: + sql += " LIMIT ?" + params.append(valid_filters["limit"]) + + # 执行查询 + self.cursor.execute(sql, params) + rows = self.cursor.fetchall() + + # 4. 处理结果:单字段查询时简化为值数组 + if not rows: + return [] # 空结果返回空列表 + + if is_single_field and simplify_single_field: + # 提取单字段的值(支持带别名的字段,如 "pornstar as name" 取 "name") + field_key = valid_fields[0].split(" as ")[-1].strip().lower() + return [row[field_key] for row in rows] + else: + # 多字段返回字典列表 + return [dict(row) for row in rows] + + except sqlite3.Error as e: + logging.error(f"查询失败: 表={table_name}, 错误={e}") + return None + + def generic_stats_query(self, stats_config: List[Dict[str, str]]) -> Dict[str, int]: + """ + 通用统计查询方法:通过配置列表定义统计项,自动生成并执行SQL + + 参数: + stats_config: 统计项配置列表,每个元素为字典,包含: + - 'table': 要统计的表名(必填) + - 'alias': 统计结果的别名(必填,如'actors'、'mov_full') + - 'where': 过滤条件(可选,如'uncensored=1 AND is_full_data=1') + + 返回: + 统计结果字典,键为alias,值为统计数(int) + """ + try: + # 1. 生成子查询列表(每个统计项对应一个子查询) + subqueries = [] + for config in stats_config: + table = config.get('table') + alias = config.get('alias') + where_clause = config.get('where') + + # 校验必填参数 + if not (table and alias): + logging.warning(f"统计项配置不完整:{config},跳过") + continue + + # 构建单个子查询(如 "SELECT COUNT(*) FROM actors WHERE uncensored=1 AS act_un") + subquery = f"(SELECT COUNT(*) FROM {table}" + if where_clause: + subquery += f" WHERE {where_clause}" + subquery += f") AS {alias}" + subqueries.append(subquery) + + if not subqueries: + logging.warning("无有效统计项配置,返回空结果") + return {} + + # 2. 组合成完整SQL + sql = f"SELECT {', '.join(subqueries)}" + + # 3. 执行查询 + self.cursor.execute(sql) + row = self.cursor.fetchone() + if not row: + logging.warning("统计查询无结果") + return {} + + # 4. 提取列名(alias)并映射结果 + columns = [desc[0] for desc in self.cursor.description] # 获取别名列表 + result = dict(zip(columns, row)) + + # 确保所有值都是整数(COUNT(*)返回的是数字,转换为int避免类型问题) + return {k: int(v) if v is not None else 0 for k, v in result.items()} + + except sqlite3.Error as e: + logging.error(f"统计查询失败: {e}") + return {} + + def generic_get_record_count( + self, + table_name: str, + conditions: Optional[Dict[str, any]] = None, + condition_mapping: Optional[Dict[str, str]] = None + ) -> int: + """ + 通用记录数查询:查询指定表中满足条件的记录数量 + + 参数: + table_name: 要查询的表名 + conditions: 查询条件字典,格式与generic_query中的filters一致 + (如{'is_full_data': 1, 'url': 'xxx'}表示is_full_data=1 AND url='xxx') + condition_mapping: 字段映射(同generic_query,将条件键映射到表实际字段) + + 返回: + 满足条件的记录数(int),查询失败返回0 + """ + try: + condition_mapping = condition_mapping or {} + conditions = conditions or {} + + # 1. 构建基础SQL + sql = f"SELECT COUNT(*) AS cnt FROM {table_name} WHERE 1=1" + params = [] + + # 2. 处理查询条件(复用之前的条件解析逻辑) + for key, value in conditions.items(): + # 解析条件类型(支持基础的eq,其他复杂条件可按需扩展) + if "__" in key: + field_base, condition_type = key.split("__", 1) + else: + field_base, condition_type = key, "eq" # 默认等于 + + # 映射到表实际字段 + field = condition_mapping.get(field_base, field_base) + + # 生成条件SQL(目前支持eq/like/in/not_in/gt/lt,与generic_query保持一致) + if condition_type == "eq": + sql += f" AND {field} = ?" + params.append(value) + elif condition_type == "like": + sql += f" AND {field} LIKE ?" + params.append(f"%{value}%") + elif condition_type == "in": + if isinstance(value, list): + placeholders = ", ".join(["?"] * len(value)) + sql += f" AND {field} IN ({placeholders})" + params.extend(value) + elif condition_type == "not_in": + if isinstance(value, list): + placeholders = ", ".join(["?"] * len(value)) + sql += f" AND {field} NOT IN ({placeholders})" + params.extend(value) + elif condition_type == "gt": + sql += f" AND {field} > ?" + params.append(value) + elif condition_type == "lt": + sql += f" AND {field} < ?" + params.append(value) + else: + logging.warning(f"不支持的条件类型: {condition_type},键: {key}") + + # 3. 执行查询 + self.cursor.execute(sql, params) + row = self.cursor.fetchone() + + # 4. 解析结果(确保返回整数,默认0) + return int(row[0]) if row and row[0] is not None else 0 + + except sqlite3.Error as e: + logging.error(f"记录数查询失败: 表={table_name}, 错误={e}") + return 0 diff --git a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py index 1c37ab5..cb3566e 100644 --- a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py +++ b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py @@ -81,8 +81,7 @@ class IAFDSpider(BaseSpider): if self.debug: query_args['limit'] = 5 if self.update_mode: - query_args['is_full_data'] = 0 - query_args['is_full_data'] = 404 + query_args['is_full_data__in'] = [0,404] # 读取待更新的演员列表 if self.cmd_performers in self.cmd_list: @@ -347,22 +346,22 @@ class IAFDSpider(BaseSpider): def load_existed_actors(self): query_args = {} - rows = db_tools.query_performer_hrefs(**query_args) + rows = db_tools.get_performers(**query_args) if rows: for item in rows: self.existed_actors[item['href']] = {'is_full_data': item['is_full_data'], 'movies_cnt': item['movies_cnt']} else: - self.logger.warning(f"query_performer_hrefs empty. query args: {query_args}") + self.logger.warning(f"get_performers empty. query args: {query_args}") def load_existed_movies(self): query_args = {} - rows = db_tools.query_movie_hrefs(**query_args) + rows = db_tools.get_movies(**query_args) if rows: for item in rows: self.existed_movies[item['href']] = item['is_full_data'] else: - self.logger.warning(f"query_movies empty. query args: {query_args}") + self.logger.warning(f"get_movies empty. query args: {query_args}") # 内存缓存,也可以改为查询db def need_update_movie(self, href): diff --git a/scrapy_proj/scrapy_proj/tools/db_tools.py b/scrapy_proj/scrapy_proj/tools/db_tools.py new file mode 100644 index 0000000..bd4ebd9 --- /dev/null +++ b/scrapy_proj/scrapy_proj/tools/db_tools.py @@ -0,0 +1,233 @@ +import os +import sqlite3 +import json +import logging +from datetime import datetime +from typing import List, Dict +from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path, test_db_path +import scrapy_proj.comm.comm_def as comm +import scrapy_proj.items as items_def +from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url + +class IAFDDBHandler(SQLiteDBHandler): + def __init__(self, db_path=shared_db_path): + super().__init__(db_path) + self.tbl_name_performers = 'iafd_performers' + self.tbl_name_movies = 'iafd_movies' + self.tbl_name_performer_movies = 'iafd_performers_movies' + self.tbl_name_alias = 'iafd_performer_aliases' + self.tbl_name_moives_appear_in = 'iafd_movies_appers_in' + self.tbl_name_studio = 'iafd_studios' + self.tbl_name_dist = 'iafd_distributors' + self.tbl_name_performer_urls = 'iafd_performer_urls' + self.tbl_name_ethnic = 'iafd_meta_ethnic' + self.tbl_name_thelordofporn_actress = 'thelordofporn_actress' + + # 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。 + def get_performers_needed_update(self, limit=None): + try: + sql = """ + SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt + """ + + if limit is not None: + sql += f" LIMIT {limit}" + + self.cursor.execute(sql) + return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return [] + + # 生成一个复杂的演员电影数量的查询视图,来判断从电影列表中聚合出来的演员-影片数量,与从演员列表中抓取到的影片数量,是否相等。 + def check_and_create_stat_table(self, taskid = 0): + try: + # 检查索引是否存在,如果不存在则创建 + indexes = [ + ("idx_iafd_performers_movies_performer_id", + "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"), + ("idx_iafd_movies_director_id", + "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"), + ("idx_iafd_performers_id", + "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);") + ] + for index_name, create_index_sql in indexes: + self.cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,)) + if not self.cursor.fetchone(): + self.cursor.execute(create_index_sql) + logging.info(f"Index {index_name} created successfully.") + else: + logging.info(f"Index {index_name} already exists.") + + # 检查视图是否存在,如果不存在则创建 + view_name = f"iafd_tmp_performers_stat_{taskid}" + self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (view_name,)) + if self.cursor.fetchone(): + self.cursor.execute("drop table ?", (view_name,)) + self.conn.commit() + + create_view_sql = f""" + CREATE table {view_name} AS + SELECT + id, + href, + name, + movies_cnt, + SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count, + SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count + FROM ( + SELECT + p.id, + p.href, + p.name, + p.movies_cnt, + COUNT(apm.movie_id) AS movie_count, + 'actor' AS role + FROM + iafd_performers p + LEFT JOIN + iafd_performers_movies apm ON p.id = apm.performer_id + GROUP BY + p.id, p.href, p.name, p.movies_cnt + + UNION ALL + + SELECT + p.id, + p.href, + p.name, + p.movies_cnt, + COUNT(im.id) AS movie_count, + 'director' AS role + FROM + iafd_performers p + LEFT JOIN + iafd_movies im ON p.id = im.director_id + GROUP BY + p.id, p.href, p.name, p.movies_cnt + ) combined + GROUP BY + id, href, name, movies_cnt; + """ + self.cursor.execute(create_view_sql) + logging.info(f"table {view_name} created successfully.") + + # 提交更改并关闭连接 + self.conn.commit() + except sqlite3.Error as e: + logging.warning(f"An error occurred: {e}") + + # 处理影片的 无码 字段 + def reset_movies_uncensored(self, check_and_do = 0): + try: + logging.info("创建临时表以便于保存待更新记录") + self.cursor.execute(""" + CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update ( + movie_id INTEGER PRIMARY KEY + ) + """) + # 清空临时表(以防之前有残留数据) + self.cursor.execute("DELETE FROM temp_movies_to_update") + + logging.info(f"开始收集需要更新的影片ID...") + # 使用单个SQL语句完成所有条件的查询和插入 + self.cursor.execute(""" + INSERT OR IGNORE INTO temp_movies_to_update (movie_id) + SELECT DISTINCT m.id + FROM javbus_movies m + -- 连接演员表 + LEFT JOIN javbus_actors_movies am ON m.id = am.movie_id + LEFT JOIN javbus_actors a ON am.actor_id = a.id + -- 连接标签/系列/工作室表 + LEFT JOIN javbus_labels l ON m.label_id = l.id + LEFT JOIN javbus_series s ON m.series_id = s.id + LEFT JOIN javbus_studios st ON m.studio_id = st.id + -- 筛选条件:任一表的href包含'uncensored' + WHERE a.href LIKE '%uncensored%' + OR l.href LIKE '%uncensored%' + OR s.href LIKE '%uncensored%' + OR st.href LIKE '%uncensored%' + """) + + total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0] + total_movies = self.cursor.execute("SELECT COUNT(*) FROM javbus_movies").fetchone()[0] + logging.info(f"共收集到 {total_count} 部需要更新的影片, 共有 {total_movies} 部影片") + + if check_and_do: + # 1. 将所有记录的uncensored默认设为0 + logging.info("开始将所有影片的uncensored设为默认值0...") + self.cursor.execute("UPDATE javbus_movies SET uncensored = 0") + logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为0") + + # 2. 将临时表中匹配的记录设为1 + logging.info("开始将匹配的影片的uncensored设为1...") + self.cursor.execute(""" + UPDATE javbus_movies + SET uncensored = 1 + WHERE id IN (SELECT movie_id FROM temp_movies_to_update) + """) + logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为1") + + self.conn.commit() + else: + logging.info("check完毕,本次忽略更新。。。") + + logging.info("任务执行完成!") + + except sqlite3.Error as e: + self.conn.rollback() + logging.error("Error inserting movie: %s", e) + logging.error(f"query error: {e}") + + # 处理影片的 无码 字段 + def reset_actor_movies(self, check_and_do = 0): + try: + # 检查表中是否已存在movies_cnt列 + self.cursor.execute(f"PRAGMA table_info({self.tbl_name_actors});") + columns = [row[1] for row in self.cursor.fetchall()] + + if 'movies_cnt' not in columns: + # 列不存在,添加新列 + add_field_sql = f""" + ALTER TABLE {self.tbl_name_actors} ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL; + """ + self.cursor.execute(add_field_sql) + logging.info("成功添加movies_cnt字段") + else: + logging.info("movies_cnt字段已存在,跳过添加") + + # 确保关联表有索引 + self.cursor.execute(f""" + CREATE INDEX IF NOT EXISTS idx_actor_movie_actor_id + ON {self.tbl_name_actor_movie}(actor_id); + """) + + # 创建临时表存储统计结果 + self.cursor.execute(f""" + CREATE TEMPORARY TABLE temp_actor_counts AS + SELECT actor_id, COUNT(movie_id) AS cnt + FROM {self.tbl_name_actor_movie} + GROUP BY actor_id; + """) + + # 为临时表添加索引 + self.cursor.execute("CREATE INDEX idx_temp_actor_id ON temp_actor_counts(actor_id);") + + # 更新主表 + self.cursor.execute(f""" + UPDATE {self.tbl_name_actors} + SET movies_cnt = COALESCE(( + SELECT cnt FROM temp_actor_counts + WHERE actor_id = {self.tbl_name_actors}.id + ), 0); -- 使用COALESCE处理没有影片的演员 + """) + updated_rows = self.cursor.rowcount + logging.info(f"成功更新{updated_rows}个演员的影片数量") + + self.conn.commit() + logging.info("任务执行完成!") + + except sqlite3.Error as e: + self.conn.rollback() + logging.error("Error updating actor movie_cnt: %s", e)