From 45271a5b23883552c31f0f9821bb1a6cae0dc86a Mon Sep 17 00:00:00 2001 From: sophon Date: Sun, 27 Jul 2025 16:03:59 +0800 Subject: [PATCH] modify scripts --- scrapy_proj/cron/cron_scheduler.sh | 1 + .../db_wapper/spider_db_handler.py | 986 +++++++++++++----- .../scrapy_proj/db_wapper/sqlite_base.py | 1 + scrapy_proj/scrapy_proj/items.py | 71 +- scrapy_proj/scrapy_proj/pipelines.py | 2 +- .../scrapy_proj/spiders/base_spider.py | 10 + .../scrapy_proj/spiders/iafd_spider.py | 216 +++- .../scrapy_proj/spiders/parser/iafd_parser.py | 41 +- 8 files changed, 974 insertions(+), 354 deletions(-) diff --git a/scrapy_proj/cron/cron_scheduler.sh b/scrapy_proj/cron/cron_scheduler.sh index a74c20d..b377305 100755 --- a/scrapy_proj/cron/cron_scheduler.sh +++ b/scrapy_proj/cron/cron_scheduler.sh @@ -136,6 +136,7 @@ if [ "${PERIOD}" = "--monthly" ]; then register_spider "pbox" "scrapy crawl pbox -a begin=${COMMON_DATE_PARAM} -a mod='update' " register_spider "javhd" "scrapy crawl javhd -a mod='update' " register_spider "lord" "scrapy crawl lord -a mod='update' " + register_spider "javbus" "scrapy crawl javbus -a cmd='actors' -s HTTPCACHE_DIR=/home/ubuntu/sharedata/scrapy_cached/ " fi diff --git a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py index 7a0086d..50baa5b 100644 --- a/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py +++ b/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py @@ -4,10 +4,10 @@ import json import logging from datetime import datetime from typing import List, Dict -from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path +from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path, test_db_path import scrapy_proj.comm.comm_def as comm import scrapy_proj.items as items_def -from scrapy_proj.utils.utils import pretty_json_simple +from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url # 注册器字典 
spider_handler_registry = {} @@ -190,251 +190,6 @@ class ClmDBHandler(SQLiteDBHandler): logging.error(f"query error: {e}") return {} - -@register_handler(comm.SPIDER_NAME_IAFD) -class IAFDDBHandler(SQLiteDBHandler): - def __init__(self, db_path=shared_db_path): - super().__init__(db_path) - self.tbl_name_performers = 'iafd_performers' - self.tbl_name_movies = 'iafd_movies' - self.uniq_key = 'href' - self.tbl_name_thelordofporn_actress = 'thelordofporn_actress' - - def insert_item(self, item): - pass - - # 按条件查询 href 列表 - def get_performers(self, **filters): - try: - sql = f"SELECT href, name, id, movies_cnt FROM {self.tbl_name_performers} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "name": " AND name LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "name": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" 
- params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - - # 按条件查询 href 列表 - def get_movies(self, **filters): - try: - sql = f"SELECT href, title, id FROM {self.tbl_name_movies} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "title": " AND title LIKE ?", - "is_full_data": " AND is_full_data = ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "title": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - for key in ["is_full_data_in", "is_full_data_not_in"]: - if key in filters: - values = filters[key] - if values: - placeholders = ", ".join(["?"] * len(values)) - operator = "IN" if key == "is_full_data_in" else "NOT IN" - sql += f" AND is_full_data {operator} ({placeholders})" - params.extend(values) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" 
- params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - - # 按条件查询 href 列表 - def get_lord_actors(self, **filters): - try: - sql = f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1" - params = [] - - conditions = { - "id": " AND id = ?", - "href": " AND href = ?", - "pornstar": " AND pornstar LIKE ?", - "start_id": " AND id > ?", - } - - for key, condition in conditions.items(): - if key in filters: - sql += condition - if key == "pornstar": - params.append(f"%{filters[key]}%") - else: - params.append(filters[key]) - - if "order_by" in filters: - # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 - sql += f" ORDER BY {filters['order_by']} " - - if 'limit' in filters: - sql += " LIMIT ?" - params.append(filters["limit"]) - - self.cursor.execute(sql, params) - return [dict(row) for row in self.cursor.fetchall()] - except sqlite3.Error as e: - logging.error(f"查询 href 失败: {e}") - return None - - # 按条件查询 href 列表 - def get_iafd_actors( - self, - names: List[str], - tbl = 'stu' - ) -> Dict[str, List[Dict[str, str]]]: - """ - 分两步查询指定发行商对应的女性演员(使用临时表减少内存占用) - - 步骤1:筛选目标发行商及其关联的影片,存入临时表(小集合) - 步骤2:用临时表的影片ID关联演员表,获取女性演员信息 - """ - tbl_name = 'iafd_studios' if tbl.lower() == 'stu' else 'iafd_distributors' - join_key = 'studio_id' if tbl.lower() == 'stu' else 'distributor_id' - if not names: - return {} - - # 结果容器 - final_result: Dict[str, List[Dict[str, str]]] = {} - - try: - # -------------------------- - # 步骤1:创建临时表,存储目标发行商及其关联的影片 - # -------------------------- - # 先删除可能残留的临时表(避免冲突) - self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies") - # 创建临时表(只在当前连接可见,连接关闭后自动删除) - self.cursor.execute(""" - CREATE TEMPORARY TABLE temp_distributor_movies ( - distributor_id INTEGER, - distributor_name TEXT, - movie_id INTEGER, - PRIMARY KEY (distributor_id, movie_id) - ) - """) - - # 
批量插入目标发行商及其关联的影片(小集合) - # 先筛选发行商,再关联影片,结果插入临时表 - insert_sql = """ - INSERT INTO temp_distributor_movies (distributor_id, distributor_name, movie_id) - SELECT - d.id AS distributor_id, - d.name AS distributor_name, - m.id AS movie_id - FROM - {tbl_name} d - INNER JOIN - iafd_movies m ON d.id = m.{join_key} - WHERE - d.name IN ({placeholders}) - """.format( - tbl_name=tbl_name, - join_key=join_key, - placeholders=', '.join(['?'] * len(names)) - ) - - logging.debug(f'{insert_sql}') - - self.cursor.execute(insert_sql, names) - self.conn.commit() # 提交临时表数据 - - # -------------------------- - # 步骤2:用临时表关联演员信息(仅处理小集合) - # -------------------------- - query_sql = """ - SELECT - t.distributor_name, - p.name AS performer_name, - p.href AS performer_href - FROM - temp_distributor_movies t - INNER JOIN - iafd_performers_movies pm ON t.movie_id = pm.movie_id - INNER JOIN - iafd_performers p ON pm.performer_id = p.id - WHERE - p.gender = 'Woman' - ORDER BY - t.distributor_name, p.name - """ - - self.cursor.execute(query_sql) - rows = self.cursor.fetchall() - - # 整理结果:按发行商分组 - for row in rows: - dist_name = row['distributor_name'] - performer = { - 'name': row['performer_name'], - 'href': row['performer_href'] - } - if dist_name not in final_result: - final_result[dist_name] = [] - final_result[dist_name].append(performer) - - # 主动清理临时表(可选,连接关闭后会自动删除) - self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies") - - except sqlite3.Error as e: - print(f"查询失败:{e}") - return {} - - return final_result - - @register_handler(comm.SPIDER_NAME_PBOX) class PboxDBHandler(SQLiteDBHandler): def __init__(self, db_path=shared_db_path): @@ -1219,3 +974,740 @@ class JavBusDBHandler(SQLiteDBHandler): except sqlite3.Error as e: self.conn.rollback() logging.error("Error updating actor movie_cnt: %s", e) + + +@register_handler(comm.SPIDER_NAME_IAFD) +class IAFDDBHandler(SQLiteDBHandler): + #def __init__(self, db_path=shared_db_path): + def __init__(self, db_path=test_db_path): + 
super().__init__(db_path) + self.tbl_name_performers = 'iafd_performers' + self.tbl_name_movies = 'iafd_movies' + self.tbl_name_performer_movies = 'iafd_performers_movies' + self.tbl_name_alias = 'iafd_performer_aliases' + self.tbl_name_moives_appear_in = 'iafd_movies_appers_in' + self.tbl_name_studio = 'iafd_studios' + self.tbl_name_dist = 'iafd_distributors' + self.tbl_name_performer_urls = 'iafd_performer_urls' + self.tbl_name_ethnic = 'iafd_meta_ethnic' + + def insert_item(self, item): + # 获取Item中所有定义的字段(包括父类继承的) + all_fields = item.fields.keys() + # 获取已被赋值的字段(存储在Item的内部属性_values中) + assigned_fields = set(item._values.keys()) + # 过滤被赋值过的字段,其他预定义的字段不处理,这样在插入/更新时才不影响无关字段的值 + processed_item = {} + for field in assigned_fields: + processed_item[field] = item[field] + + if isinstance(item, items_def.IafdPerformersItem): + self.insert_or_update_performer(processed_item) + + elif isinstance(item, items_def.IafdMoviesItem): + self.insert_or_update_movie(processed_item) + + elif isinstance(item, items_def.IafdDistributorsItem): + self.insert_or_update_distributor(data=processed_item) + + elif isinstance(item, items_def.IafdStudiosItem): + self.insert_or_update_studio(data=processed_item) + + elif isinstance(item, items_def.IafdMetaEthnicItem): + self.insert_or_update_ethnic(data=processed_item) + + else: + logging.error(f"unkown item. 
{processed_item}") + + return item + + # 统计函数 + def get_stat(self): + try: + # 使用单个SQL查询获取所有统计数据 + self.cursor.execute(""" + SELECT + (SELECT COUNT(*) FROM iafd_performers) AS iafd_actors, + (SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full, + (SELECT COUNT(*) FROM iafd_movies) AS movies, + (SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full, + (SELECT COUNT(*) FROM iafd_distributors) AS dist, + (SELECT COUNT(*) FROM iafd_studios) AS stu + """) + + # 获取查询结果 + row = self.cursor.fetchone() + if not row: + return {} + + # 手动定义列名(与SQL中的AS别名保持一致) + #columns = ['iafd_actors', 'act_full', 'movies', 'mov_full', 'dist', 'stu'] + columns = [desc[0] for desc in self.cursor.description] + + # 将元组结果转换为字典 + return dict(zip(columns, row)) + + except sqlite3.Error as e: + logging.error(f"查询失败: {e}") + return {} + + # 插入演员索引,来自于列表数据 + #def insert_performer_index(self, name, href, from_astro_list=None, from_birth_list=None, from_ethnic_list=None, from_movie_list=None): + def insert_performer_index(self, name, href, **kwargs): + fields = [ + 'from_astro_list', 'from_birth_list', 'from_ethnic_list', 'from_movie_list' + ] + data = {'name': name, 'href': href} + # 如果没有传入值,就用原来的值 + for field in fields: + if kwargs.get(field) is not None: + data[field] = kwargs.get(field) + + return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False) + + # """插入电影索引,来自于列表数据""" + #def insert_movie_index(self, title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None): + def insert_movie_index(self, title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None): + fields = [ + 'from_performer_list', 'from_dist_list', 'from_stu_list', 'release_year' + ] + data = {'title': title, 'href': href} + # 如果没有传入值,就用原来的值 + for field in fields: + if kwargs.get(field) is not None: + data[field] = kwargs.get(field) + + return 
self.insert_or_update_common(data=data, tbl_name=self.tbl_name_movies, uniq_key='href', exists_do_nothing=False) + + # 插入演员和电影的关联数据 + def insert_performer_movie(self, performer_id, movie_id, role, notes): + return self.insert_or_update_with_composite_pk( + data = {'performer_id': performer_id, 'movie_id': movie_id, 'role': role, 'notes': notes}, + tbl_name = self.tbl_name_performer_movies, + composite_pk = ['performer_id', 'movie_id'], + exists_do_nothing = True + ) + + # 插入电影和电影的关联数据 + def insert_movie_appears_in(self, movie_id, appears_in_id, gradation=0, notes=''): + return self.insert_or_update_with_composite_pk( + data = {'movie_id': movie_id, 'appears_in_id': appears_in_id, 'gradation': gradation, 'notes': notes}, + tbl_name = self.tbl_name_moives_appear_in, + composite_pk = ['movie_id', 'appears_in_id'], + exists_do_nothing = True + ) + + # 插入演员信息 + def insert_or_update_performer(self, data, movies_update=True): + try: + # 插入演员信息 + performer_id = self.insert_or_update_common(data=data, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False) + if performer_id is None: + return None + logging.debug(f"insert one performer, id: {performer_id}, name: {data['person']}, href: {data['href']}") + + # 插入新的 alias + for alias in data.get("performer_aka", []): + if alias.lower() != "no known aliases": + self.insert_or_update_with_composite_pk( + data={'performer_id': performer_id, 'alias': alias}, + tbl_name = self.tbl_name_alias, + composite_pk = ['performer_id', 'alias'], + exists_do_nothing = True + ) + conn.commit() + + # 插入影片列表,可能有 personal 和 director 两个身份 + if movies_update: + credits = data.get('credits', {}) + for role, movies in credits.items(): + if movies: + for movie in movies: + movie_id = self.get_id_by_key(tbl=self.tbl_name_movies, uniq_key='href', val=movie['href']) + # 影片不存在,先插入 + if movie_id is None: + movie_id = self.insert_movie_index(movie['title'], movie['href'], utils.to_number(movie['year']), from_performer_list=1) + if 
movie_id: + tmp_id = self.insert_performer_movie(performer_id, movie_id, role, movie['notes']) + if tmp_id : + logging.debug(f"insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}") + else: + logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}") + + return performer_id + + except sqlite3.Error as e: + conn.rollback() + logging.error(f"数据库错误: {e}") + return None + except Exception as e: + conn.rollback() + logging.error(f"未知错误: {e}") + return None + + + # """插入或更新电影数据(异常url的处理,比如404链接)""" + def insert_or_update_performer_404(self, name, href, is_full_data=1): + return self.insert_or_update_common( + data={'href': href, 'name': name, 'is_full_data': is_full_data}, + tbl_name=self.tbl_name_performers, + uniq_key='href', + exists_do_nothing = False + ) + + # 插入或更新发行商 """ + def insert_or_update_ethnic(self, data): + return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_ethnic, uniq_key='href', exists_do_nothing=False) + + # 按条件查询 href 列表 + def query_ethnic_hrefs(self, **filters): + try: + sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1" + params = [] + + if "id" in filters: + sql += " AND id = ?" + params.append(filters["id"]) + if "url" in filters: + sql += " AND href = ?" + params.append(filters["href"]) + if "name" in filters: + sql += " AND name LIKE ?" 
+ params.append(f"%{filters['name']}%") + + cursor.execute(sql, params) + #return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 + return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + # 插入或更新发行商 """ + def insert_or_update_distributor(self, data): + return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_dist, uniq_key='href', exists_do_nothing=False) + + # 按条件查询 href 列表 + def query_distributor_hrefs(self, **filters): + try: + sql = "SELECT href FROM iafd_distributors WHERE 1=1" + params = [] + + if "id" in filters: + sql += " AND id = ?" + params.append(filters["id"]) + if "url" in filters: + sql += " AND href = ?" + params.append(filters["href"]) + if "name" in filters: + sql += " AND name LIKE ?" + params.append(f"%{filters['name']}%") + + cursor.execute(sql, params) + return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + # """ 插入或更新制作公司 """ + def insert_or_update_studio(self, data): + return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_studio, uniq_key='href', exists_do_nothing=False) + + # 按条件查询 href 列表 + def query_studio_hrefs(self, **filters): + try: + sql = "SELECT href FROM iafd_studios WHERE 1=1" + params = [] + + if "id" in filters: + sql += " AND id = ?" + params.append(filters["id"]) + if "href" in filters: + sql += " AND href = ?" + params.append(filters["href"]) + if "name" in filters: + sql += " AND name LIKE ?" 
+ params.append(f"%{filters['name']}%") + + cursor.execute(sql, params) + return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写 + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + # 检查记录是否存在,不存在就插入 + def check_and_get_id(self, name, href, tbl, uniq_key='href'): + if not is_valid_url(href): + return 0 + row_id = self.get_id_by_key(tbl, uniq_key, href) + if row_id is None: + data = {'name':name, 'href': href} + if tbl == self.tbl_name_performers: + data['from_movie_list'] = 1 + row_id = self.insert_or_update_common(data=data, tbl_name=tbl, uniq_key=uniq_key, exists_do_nothing=True) + + return row_id if row_id else 0 + + # """插入或更新电影数据""" + def insert_or_update_movie(self, movie_data): + try: + # 获取相关 ID + movie_data['distributor_id']= self.check_and_get_id(movie_data['Distributor'], movie_data['DistributorHref'], self.tbl_name_dist, uniq_key='href') + movie_data['studio_id'] = self.check_and_get_id(movie_data['Studio'], movie_data['StudioHref'], self.tbl_name_studio, uniq_key='href') + movie_data['director_id'] = self.check_and_get_id(movie_data['Director'], movie_data['DirectorHref'], self.tbl_name_performers, uniq_key='href') + + movie_id = self.insert_or_update_common(data=movie_data, tbl_name=self.tbl_name_movies, uniq_key='href', exists_do_nothing=False) + if movie_id is None: + return None + logging.debug(f"insert one move, id: {movie_id}, title: {movie_data['title']}, href: {movie_data['href']}") + + # 导演-电影写入 关系表 + if movie_data['director_id']: + tmp_id = self.insert_performer_movie(movie_data['director_id'], movie_id, 'directoral', '') + if tmp_id: + logging.debug(f"insert one perfomer_movie. 
director_id: {movie_data['director_id']}, movie_id:{movie_id}") + for director in movie_data.get('Directors', []): + director_id = self.check_and_get_id(director['name'], director['href'], self.tbl_name_performers, uniq_key='href') + if director_id: + tmp_id = self.insert_performer_movie(director_id, movie_id, 'directoral', '') + if tmp_id: + logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}") + + # 插入 performers_movies 关系表 + for performer in movie_data.get('Performers', []): + performer_id = self.check_and_get_id(performer['name'], performer['href'], self.tbl_name_performers, uniq_key='href') + if performer_id: + notes = '|'.join(tag for tag in performer['tags'] if tag != performer['name']) + tmp_id = self.insert_performer_movie(performer_id, movie_id, 'personal', notes) + if tmp_id: + logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}") + else: + logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}") + else: + logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}") + + # 插入 movies_appers_in 表 + for appears in movie_data.get("AppearsIn", []): + appears_in_id = self.get_id_by_key(self.tbl_name_movies, 'href', appears['href']) + # 不存在,先插入 + if appears_in_id is None: + appears_in_id = self.insert_movie_index( appears['title'], appears['href']) + if appears_in_id: + tmp_id = self.insert_movie_appears_in(movie_id, appears_in_id) + if tmp_id: + logging.debug(f"insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}") + else: + logging.warning(f"insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}") + else: + logging.warning(f"get appears_in_id failed. 
title: {appears['title']}, href: {appears['href']}") + + return movie_id + + except Exception as e: + conn.rollback() + logging.error("Error inserting movie: %s", e) + return None + + + # """插入或更新电影数据(异常url的处理,比如404链接)""" + def insert_or_update_movie_404(self, title, href, is_full_data=1): + return self.insert_or_update_common( + data={'href': href, 'title':title, 'is_full_data':is_full_data}, + tbl_name=self.tbl_name_movies, + uniq_key='href', + exists_do_nothing = False + ) + + # 按条件查询 href 列表 + def query_performer_hrefs(self, **filters): + return self.get_performers(**filters) + + # 按条件查询 href 列表 + def get_performers(self, **filters): + try: + sql = f"SELECT href, name, id, movies_cnt, is_full_data FROM {self.tbl_name_performers} WHERE 1=1" + params = [] + + conditions = { + "id": " AND id = ?", + "href": " AND href = ?", + "name": " AND name LIKE ?", + "is_full_data": " AND is_full_data = ?", + "start_id": " AND id > ?", + } + + for key, condition in conditions.items(): + if key in filters: + sql += condition + if key == "name": + params.append(f"%{filters[key]}%") + else: + params.append(filters[key]) + + for key in ["is_full_data_in", "is_full_data_not_in"]: + if key in filters: + values = filters[key] + if values: + placeholders = ", ".join(["?"] * len(values)) + operator = "IN" if key == "is_full_data_in" else "NOT IN" + sql += f" AND is_full_data {operator} ({placeholders})" + params.extend(values) + + if "order_by" in filters: + # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 + sql += f" ORDER BY {filters['order_by']} " + + if 'limit' in filters: + sql += " LIMIT ?" 
+ params.append(filters["limit"]) + + self.cursor.execute(sql, params) + return [dict(row) for row in self.cursor.fetchall()] + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + # 按条件查询 href 列表 + def query_movie_hrefs(self, **filters): + return self.get_movies(**filters) + + # 按条件查询 href 列表 + def get_movies(self, **filters): + try: + sql = f"SELECT href, title, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1" + params = [] + + conditions = { + "id": " AND id = ?", + "href": " AND href = ?", + "title": " AND title LIKE ?", + "is_full_data": " AND is_full_data = ?", + "start_id": " AND id > ?", + } + + for key, condition in conditions.items(): + if key in filters: + sql += condition + if key == "title": + params.append(f"%{filters[key]}%") + else: + params.append(filters[key]) + + for key in ["is_full_data_in", "is_full_data_not_in"]: + if key in filters: + values = filters[key] + if values: + placeholders = ", ".join(["?"] * len(values)) + operator = "IN" if key == "is_full_data_in" else "NOT IN" + sql += f" AND is_full_data {operator} ({placeholders})" + params.extend(values) + + if "order_by" in filters: + # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 + sql += f" ORDER BY {filters['order_by']} " + + if 'limit' in filters: + sql += " LIMIT ?" 
+ params.append(filters["limit"]) + + self.cursor.execute(sql, params) + return [dict(row) for row in self.cursor.fetchall()] + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + + # 按条件查询 href 列表 + def get_lord_actors(self, **filters): + try: + sql = f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1" + params = [] + + conditions = { + "id": " AND id = ?", + "href": " AND href = ?", + "pornstar": " AND pornstar LIKE ?", + "start_id": " AND id > ?", + } + + for key, condition in conditions.items(): + if key in filters: + sql += condition + if key == "pornstar": + params.append(f"%{filters[key]}%") + else: + params.append(filters[key]) + + if "order_by" in filters: + # 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理 + sql += f" ORDER BY {filters['order_by']} " + + if 'limit' in filters: + sql += " LIMIT ?" + params.append(filters["limit"]) + + self.cursor.execute(sql, params) + return [dict(row) for row in self.cursor.fetchall()] + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return None + + # 按条件查询 href 列表 + def get_iafd_actors( + self, + names: List[str], + tbl = 'stu' + ) -> Dict[str, List[Dict[str, str]]]: + """ + 分两步查询指定发行商对应的女性演员(使用临时表减少内存占用) + + 步骤1:筛选目标发行商及其关联的影片,存入临时表(小集合) + 步骤2:用临时表的影片ID关联演员表,获取女性演员信息 + """ + tbl_name = 'iafd_studios' if tbl.lower() == 'stu' else 'iafd_distributors' + join_key = 'studio_id' if tbl.lower() == 'stu' else 'distributor_id' + if not names: + return {} + + # 结果容器 + final_result: Dict[str, List[Dict[str, str]]] = {} + + try: + # -------------------------- + # 步骤1:创建临时表,存储目标发行商及其关联的影片 + # -------------------------- + # 先删除可能残留的临时表(避免冲突) + self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies") + # 创建临时表(只在当前连接可见,连接关闭后自动删除) + self.cursor.execute(""" + CREATE TEMPORARY TABLE temp_distributor_movies ( + distributor_id INTEGER, + distributor_name TEXT, + movie_id INTEGER, + PRIMARY KEY (distributor_id, movie_id) + ) + """) + + # 
批量插入目标发行商及其关联的影片(小集合) + # 先筛选发行商,再关联影片,结果插入临时表 + insert_sql = """ + INSERT INTO temp_distributor_movies (distributor_id, distributor_name, movie_id) + SELECT + d.id AS distributor_id, + d.name AS distributor_name, + m.id AS movie_id + FROM + {tbl_name} d + INNER JOIN + iafd_movies m ON d.id = m.{join_key} + WHERE + d.name IN ({placeholders}) + """.format( + tbl_name=tbl_name, + join_key=join_key, + placeholders=', '.join(['?'] * len(names)) + ) + + logging.debug(f'{insert_sql}') + + self.cursor.execute(insert_sql, names) + self.conn.commit() # 提交临时表数据 + + # -------------------------- + # 步骤2:用临时表关联演员信息(仅处理小集合) + # -------------------------- + query_sql = """ + SELECT + t.distributor_name, + p.name AS performer_name, + p.href AS performer_href + FROM + temp_distributor_movies t + INNER JOIN + iafd_performers_movies pm ON t.movie_id = pm.movie_id + INNER JOIN + iafd_performers p ON pm.performer_id = p.id + WHERE + p.gender = 'Woman' + ORDER BY + t.distributor_name, p.name + """ + + self.cursor.execute(query_sql) + rows = self.cursor.fetchall() + + # 整理结果:按发行商分组 + for row in rows: + dist_name = row['distributor_name'] + performer = { + 'name': row['performer_name'], + 'href': row['performer_href'] + } + if dist_name not in final_result: + final_result[dist_name] = [] + final_result[dist_name].append(performer) + + # 主动清理临时表(可选,连接关闭后会自动删除) + self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies") + + except sqlite3.Error as e: + print(f"查询失败:{e}") + return {} + + return final_result + + + # 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。 + def get_performers_needed_update(self, limit=None): + try: + sql = """ + SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt + """ + + if limit is not None: + sql += f" LIMIT {limit}" + + cursor.execute(sql) + return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()] + + except sqlite3.Error as e: + logging.error(f"查询 href 失败: {e}") + return [] + + # 
生成一个复杂的演员电影数量的查询视图,来判断从电影列表中聚合出来的演员-影片数量,与从演员列表中抓取到的影片数量,是否相等。 + def check_and_create_stat_table(self, taskid = 0): + try: + # 检查索引是否存在,如果不存在则创建 + indexes = [ + ("idx_iafd_performers_movies_performer_id", + "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"), + ("idx_iafd_movies_director_id", + "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"), + ("idx_iafd_performers_id", + "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);") + ] + for index_name, create_index_sql in indexes: + cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,)) + if not cursor.fetchone(): + cursor.execute(create_index_sql) + logging.info(f"Index {index_name} created successfully.") + else: + logging.info(f"Index {index_name} already exists.") + + # 检查视图是否存在,如果不存在则创建 + view_name = f"iafd_tmp_performers_stat_{taskid}" + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (view_name,)) + if cursor.fetchone(): + cursor.execute("drop table ?", (view_name,)) + conn.commit() + + create_view_sql = f""" + CREATE table {view_name} AS + SELECT + id, + href, + name, + movies_cnt, + SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count, + SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count + FROM ( + SELECT + p.id, + p.href, + p.name, + p.movies_cnt, + COUNT(apm.movie_id) AS movie_count, + 'actor' AS role + FROM + iafd_performers p + LEFT JOIN + iafd_performers_movies apm ON p.id = apm.performer_id + GROUP BY + p.id, p.href, p.name, p.movies_cnt + + UNION ALL + + SELECT + p.id, + p.href, + p.name, + p.movies_cnt, + COUNT(im.id) AS movie_count, + 'director' AS role + FROM + iafd_performers p + LEFT JOIN + iafd_movies im ON p.id = im.director_id + GROUP BY + p.id, p.href, p.name, p.movies_cnt + ) combined + GROUP BY + id, href, name, movies_cnt; + """ + cursor.execute(create_view_sql) + logging.info(f"table 
{view_name} created successfully.") + + # 提交更改并关闭连接 + conn.commit() + except sqlite3.Error as e: + logging.warning(f"An error occurred: {e}") + + + # 处理影片的 无码 字段 + def reset_actor_movies(self, check_and_do = 0): + try: + # 检查表中是否已存在movies_cnt列 + cursor.execute(f"PRAGMA table_info(iafd_performers);") + columns = [row[1] for row in cursor.fetchall()] + + if 'movies_cnt' not in columns: + # 列不存在,添加新列 + add_field_sql = f""" + ALTER TABLE iafd_performers ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL; + """ + cursor.execute(add_field_sql) + logging.info("成功添加movies_cnt字段") + else: + logging.info("movies_cnt字段已存在,跳过添加") + + # 确保关联表有索引 + cursor.execute(f""" + CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id + ON iafd_performers_movies(performer_id); + """) + + # 创建临时表存储统计结果 + cursor.execute(f""" + CREATE TEMPORARY TABLE temp_actor_counts AS + SELECT performer_id, COUNT(movie_id) AS cnt + FROM iafd_performers_movies + GROUP BY performer_id; + """) + + # 为临时表添加索引 + cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);") + + # 更新主表 + cursor.execute(f""" + UPDATE iafd_performers + SET movies_cnt = COALESCE(( + SELECT cnt FROM temp_actor_counts + WHERE performer_id = iafd_performers.id + ), 0); -- 使用COALESCE处理没有影片的演员 + """) + + updated_rows = cursor.rowcount + logging.info(f"成功更新{updated_rows}个演员的影片数量") + + # 清理资源 + cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;") + conn.commit() + + logging.info("任务执行完成!") + + except sqlite3.Error as e: + conn.rollback() + logging.error("Error updating actor movie_cnt: %s", e) + diff --git a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py index b74cf6b..03bd852 100644 --- a/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py +++ b/scrapy_proj/scrapy_proj/db_wapper/sqlite_base.py @@ -7,6 +7,7 @@ home_dir = os.path.expanduser("~") global_share_data_dir = f'{home_dir}/sharedata' default_dbpath = 
f"{global_share_data_dir}/sqlite/scrapy.db" shared_db_path = f"{global_share_data_dir}/sqlite/shared.db" +test_db_path = f"{global_share_data_dir}/sqlite/test.db" # 单例元类 class SingletonMeta(type): diff --git a/scrapy_proj/scrapy_proj/items.py b/scrapy_proj/scrapy_proj/items.py index 0bf6df8..9c4011c 100644 --- a/scrapy_proj/scrapy_proj/items.py +++ b/scrapy_proj/scrapy_proj/items.py @@ -35,55 +35,6 @@ class Sis001Item(scrapy.Item): size_gb = scrapy.Field() update_date = scrapy.Field() -class IAFDPersonItem(scrapy.Item): - item_type = comm.ITEM_TYPE_ACTOR_INDEX - name = scrapy.Field() - href = scrapy.Field() - from_astro_list = scrapy.Field() - from_birth_list = scrapy.Field() - from_ethnic_list = scrapy.Field() - from_movie_list = scrapy.Field() - -class IAFDMovieItem(scrapy.Item): - item_type = comm.ITEM_TYPE_MOVIE_INDEX - title = scrapy.Field() - href = scrapy.Field() - release_year = scrapy.Field() - from_performer_list = scrapy.Field() - from_dist_list = scrapy.Field() - from_stu_list = scrapy.Field() - -class IAFDPersonDetailItem(scrapy.Item): - item_type = comm.ITEM_TYPE_ACTOR_DETAIL - href = scrapy.Field() - person = scrapy.Field() - gender = scrapy.Field() - birthday = scrapy.Field() - astrology = scrapy.Field() - birthplace = scrapy.Field() - years_active = scrapy.Field() - ethnicity = scrapy.Field() - nationality = scrapy.Field() - hair_colors = scrapy.Field() - eye_color = scrapy.Field() - height = scrapy.Field() - weight = scrapy.Field() - measurements = scrapy.Field() - tattoos = scrapy.Field() - piercings = scrapy.Field() - movies_cnt = scrapy.Field() - vixen_cnt = scrapy.Field() - blacked_cnt = scrapy.Field() - tushy_cnt = scrapy.Field() - x_art_cnt = scrapy.Field() - performer_aka = scrapy.Field() - -class IAFDMovieDetailItem(scrapy.Item): - item_type = comm.ITEM_TYPE_MOVIE_DETAIL - title = scrapy.Field() - href = scrapy.Field() - # 可以根据实际需求添加更多影片详情字段 - class PBoxStuItem(scrapy.Item): item_type = scrapy.Field() label_id = scrapy.Field() @@ -228,10 
+179,12 @@ class IafdDistributorsItem(scrapy.Item): href = scrapy.Field() parent_id = scrapy.Field() details = scrapy.Field() + # 以下为添加字段 class IafdMetaEthnicItem(scrapy.Item): name = scrapy.Field() href = scrapy.Field() + # 以下为添加字段 class IafdMoviesItem(scrapy.Item): title = scrapy.Field() @@ -251,21 +204,35 @@ class IafdMoviesItem(scrapy.Item): from_performer_list = scrapy.Field() from_dist_list = scrapy.Field() from_stu_list = scrapy.Field() + # 以下为添加字段 + Directors = scrapy.Field() + Distributor = scrapy.Field() + DistributorHref = scrapy.Field() + Studio = scrapy.Field() + StudioHref = scrapy.Field() + Director = scrapy.Field() + DirectorHref = scrapy.Field() + Performers = scrapy.Field() + SceneBreakdowns = scrapy.Field() + AppearsIn = scrapy.Field() class IafdMoviesAppersInItem(scrapy.Item): movie_id = scrapy.Field() appears_in_id = scrapy.Field() gradation = scrapy.Field() notes = scrapy.Field() + # 以下为添加字段 class IafdPerformerAliasesItem(scrapy.Item): performer_id = scrapy.Field() alias = scrapy.Field() + # 以下为添加字段 class IafdPerformerUrlsItem(scrapy.Item): performer_id = scrapy.Field() position = scrapy.Field() url = scrapy.Field() + # 以下为添加字段 class IafdPerformersItem(scrapy.Item): name = scrapy.Field() @@ -299,18 +266,23 @@ class IafdPerformersItem(scrapy.Item): from_birth_list = scrapy.Field() from_ethnic_list = scrapy.Field() from_movie_list = scrapy.Field() + # 以下为添加字段 + credits = scrapy.Field() + performer_aka = scrapy.Field() class IafdPerformersMoviesItem(scrapy.Item): performer_id = scrapy.Field() movie_id = scrapy.Field() role = scrapy.Field() notes = scrapy.Field() + # 以下为添加字段 class IafdStudiosItem(scrapy.Item): name = scrapy.Field() href = scrapy.Field() parent_id = scrapy.Field() details = scrapy.Field() + # 以下为添加字段 class IafdTaskLogItem(scrapy.Item): task_id = scrapy.Field() @@ -321,6 +293,7 @@ class IafdTaskLogItem(scrapy.Item): total_distributors = scrapy.Field() total_studios = scrapy.Field() task_status = scrapy.Field() + # 以下为添加字段 class 
JavbusActorsItem(scrapy.Item):
     ja_name = scrapy.Field()
diff --git a/scrapy_proj/scrapy_proj/pipelines.py b/scrapy_proj/scrapy_proj/pipelines.py
index aed981a..825d693 100644
--- a/scrapy_proj/scrapy_proj/pipelines.py
+++ b/scrapy_proj/scrapy_proj/pipelines.py
@@ -11,7 +11,7 @@
 #         return item
 import json
 import scrapy
-from scrapy_proj.items import U001Item, Sis001Item, IAFDPersonItem, IAFDPersonDetailItem, IAFDMovieItem, IAFDMovieDetailItem, PBoxStuItem
+from scrapy_proj.items import U001Item, Sis001Item, PBoxStuItem
 from scrapy_proj.db_wapper.spider_db_handler import spider_handler_registry, U3C3DBHandler, SisDBHandler, IAFDDBHandler, PboxDBHandler
 
 class SQLitePipeline():
diff --git a/scrapy_proj/scrapy_proj/spiders/base_spider.py b/scrapy_proj/scrapy_proj/spiders/base_spider.py
index a768752..fb7be36 100644
--- a/scrapy_proj/scrapy_proj/spiders/base_spider.py
+++ b/scrapy_proj/scrapy_proj/spiders/base_spider.py
@@ -6,6 +6,17 @@ from twisted.internet import reactor, defer, asyncioreactor
 import time
 
 class BaseSpider(scrapy.Spider):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)  # required: scrapy.Spider.__init__ binds name and -a arguments
+        self.requested_url = set()
+
+    # 记录本次任务已经发起的请求链接
+    def _can_request(self, href):
+        if href in self.requested_url:
+            return False
+        self.requested_url.add(href)
+        return True
+
     def start_requests(self):
         """统一处理请求生成,兼容不同入口点"""
         # 如果定义了async start方法,使用它
diff --git a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
index f288a8b..047a6c5 100644
--- a/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
+++ b/scrapy_proj/scrapy_proj/spiders/iafd_spider.py
@@ -3,11 +3,11 @@ import re
 import sys
 from urllib.parse import urljoin, quote_plus
 from scrapy_proj.spiders.base_spider import BaseSpider
-from scrapy_proj.items import IAFDPersonItem, IAFDMovieItem, IAFDPersonDetailItem, IAFDMovieDetailItem
+from scrapy_proj.items import IafdDistributorsItem, IafdMetaEthnicItem, IafdMoviesItem, IafdPerformersItem, IafdStudiosItem
 from scrapy_proj.db_wapper.spider_db_handler 
import IAFDDBHandler from scrapy_proj.comm.comm_def import SPIDER_NAME_IAFD from scrapy_proj.spiders.parser.iafd_parser import common_parser -from scrapy_proj.utils.utils import pretty_json_simple +from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url db_tools = IAFDDBHandler() @@ -40,8 +40,19 @@ class IAFDSpider(BaseSpider): if cmd and cmd != '': self.cmd_list = cmd.split(',') + self.existed_actors = {} + self.existed_movies = {} + self.load_existed_actors() + self.load_existed_movies() + # 入口函数,由基类的方法触发 def custom_start_requests(self): + self.crawler.stats.set_value(f"{self.name}/actor_all", 0) + self.crawler.stats.set_value(f"{self.name}/actor_done", 0) + self.crawler.stats.set_value(f"{self.name}/actor_404", 0) + self.crawler.stats.set_value(f"{self.name}/movie_all", 0) + self.crawler.stats.set_value(f"{self.name}/movie_done", 0) + self.crawler.stats.set_value(f"{self.name}/movie_404", 0) # 根据命令字执行 if self.cmd_astro in self.cmd_list: # 关键:迭代 start_astro 产生的生成器,转发其中的 Request @@ -117,59 +128,65 @@ class IAFDSpider(BaseSpider): async for request in super().start(): yield request + # 获得列表,查询详情 def parse_astro_page(self, response): astro = response.meta.get('astro', '') data, next_url = common_parser(html=response.text, page='astro', astro=astro) if data: - self.logger.debug(f"fetched data from {response.url}, data: {data}") + self.logger.debug(f"fetched data from {response.url}, data len: {len(data)}") + for item in data: + yield from self._create_performer_request(href=item['href'], name=item['person']) else: self.logger.warning(f"parse data error. 
{response.url}") - - item = IAFDPersonDetailItem() - #yield item + # 获得列表,查询详情 def parse_birth_page(self, response): month = response.meta['month'] day = response.meta['day'] data, next_url = common_parser(html=response.text, page='birth', month=month, day=day) if data: - self.logger.debug(f"fetched data from {response.url}, data: {data}") + self.logger.debug(f"fetched data from {response.url}, data len: {len(data)}") + for item in data: + yield from self._create_performer_request(href=item['href'], name=item['person']) else: self.logger.warning(f"parse data error. {response.url}") - - item = IAFDPersonDetailItem() - #yield item + # 获得列表,查询详情 def parse_ethnic_list_page(self, response): div_root = response.css('select#ethnicity1') if div_root: options = div_root.css('option') - self.crawler.stats.set_value(f"{self.name}/ethnic_all", len(options)) - self.crawler.stats.set_value(f"{self.name}/ethnic_done", 0) for option in options: href = option.attrib.get('value') text = option.css('::text').get().strip() if href and href.lower() != 'none': ethnic_url = urljoin(response.url , href) self.logger.info(f"ethnic: ({text}), start url: {ethnic_url}") - yield scrapy.Request(ethnic_url, callback=self.parse_ethnic_page, meta={'ethnic': text}) + item = IafdMetaEthnicItem() + item['name'] = text + item['href'] = ethnic_url + yield item + yield scrapy.Request(ethnic_url, callback=self.parse_ethnic_page, meta={'ethnic': text}) + else: + self.logger.warning(f"parse page error. 
url: {response.url}")
+
+    # 获得列表,查询详情
     def parse_ethnic_page(self, response):
         ethnic = response.meta['ethnic']
         data, next_url = common_parser(html=response.text, page='ethnic', ethnic=ethnic)
         if data:
-            self.logger.debug(f"fetched data from {response.url}, data: {data}")
+            self.logger.debug(f"fetched data from {response.url}, data len: {len(data)}")
+            for item in data:
+                yield from self._create_performer_request(href=item['href'], name=item['person'])
+
+            if next_url:
+                yield scrapy.Request(next_url, callback=self.parse_ethnic_page, meta={'ethnic': ethnic})
+            else:
+                self.logger.info(f"found all pages. ethnic: {ethnic}, url: {response.url}")
         else:
             self.logger.warning(f"parse data error. {response.url}")
-        if next_url:
-            self.logger.info(f"find next page: {next_url}")
-        else:
-            self.logger.info(f"found all pages. url: {response.url}")
-
-        item = IAFDPersonDetailItem()
-        #yield item
-
     def parse_distributors_list_page(self, response):
         select_element = response.css('select[name="Distrib"]')
         if select_element:
@@ -178,7 +195,15 @@ class IAFDSpider(BaseSpider):
                 value = option.attrib.get('value')
                 text = option.css('::text').get().strip()
                 dis_url = f"{self.host_url}/distrib.rme/distrib={value}"
+                item = IafdDistributorsItem()
+                item['name'] = text
+                item['href'] = dis_url
+
+                yield item
+
                 yield scrapy.Request(dis_url, callback=self.parse_stu_dist_page, meta={'list_type': 'dist'})
+        else:
+            self.logger.warning(f"parse page error. url: {response.url}")
 
     def parse_studios_list_page(self, response):
         select_element = response.css('select[name="Studio"]')
@@ -188,53 +213,156 @@ class IAFDSpider(BaseSpider):
                 value = option.attrib.get('value')
                 text = option.css('::text').get().strip()
                 dis_url = f"{self.host_url}/studio.rme/studio={value}"
+                item = IafdStudiosItem()
+                item['name'] = text
+                item['href'] = dis_url
+                yield item
+
                 yield scrapy.Request(dis_url, callback=self.parse_stu_dist_page, meta={'list_type': 'stu'})
+        else:
+            self.logger.warning(f"parse page error. 
url: {response.url}")
 
     def parse_stu_dist_page(self, response):
         list_type = response.meta.get('list_type', '')
         data, next_url = common_parser(html=response.text, page=list_type)
         if data:
             self.logger.debug(f"fetched data from {response.url}, data: {data}")
+            for movie in data:
+                yield from self._create_movie_request(href=movie['href'], title=movie['title'])
         else:
             self.logger.warning(f"fetched data error. {response.url}")
 
-        item = IAFDPersonDetailItem()
-        #yield item
+    # 统一处理发起影片查询的请求
+    def _create_performer_request(self, href, name):
+        if href != '' and is_valid_url(href):
+            if self._can_request(href):
+                self.crawler.stats.inc_value(f"{self.name}/actor_all")
+                yield scrapy.Request(href,
+                    callback=self.parse_person_detail_page,
+                    meta={'name': name, 'item_type':'movie'}
+                )
+        else:
+            self.logger.warning(f"wrong url. {href}, ignore...")
 
+    # 统一处理发起影片查询的请求
+    def _create_movie_request(self, href, title):
+        if href != '' and is_valid_url(href):
+            if self.need_update_movie(href) and self._can_request(href):
+                self.crawler.stats.inc_value(f"{self.name}/movie_all")
+                yield scrapy.Request(href,
+                    callback=self.parse_movie_detail_page,
+                    meta={'title': title, 'item_type':'movie'},
+                    # NOTE: 'cache' is not a scrapy.Request kwarg (TypeError); use HTTPCACHE_* settings instead
+                )
+        else:
+            self.logger.warning(f"wrong url. {href}, ignore...")
 
+    # 演员详情页解析和处理
     def parse_person_detail_page(self, response):
         data = common_parser(html=response.text, page='actor', url=response.url)
         if data:
             self.logger.debug(f"fetched data from {response.url}, data: {data}")
+            self.crawler.stats.inc_value(f"{self.name}/actor_done")
+            item = IafdPerformersItem()
+            for k, v in data.items():
+                if k in item.fields:
+                    item[k] = v
+
+            yield item
+
+            # 处理影片列表
+            for role, movies in data.get('credits', {}).items():
+                if movies:
+                    for movie in movies:
+                        yield from self._create_movie_request(href=movie['href'], title=movie['title'])
         else:
             self.logger.warning(f"fetched data error. 
{response.url}") - item = IAFDPersonDetailItem() - #yield item - + # 影片详情页解析和处理 def parse_movie_detail_page(self, response): title = response.meta.get('title', '') data = common_parser(html=response.text, page='movies', href=response.url, title=title) if data: self.logger.debug(f"fetched data from {response.url}, data: {data}") + self.crawler.stats.inc_value(f"{self.name}/movie_done") + item = IafdMoviesItem() + for k, v in data.items(): + if k in item.fields: + item[k] = v + yield item + + # 处理各种链接 + link_url = data.get('DistributorHref', '') + if is_valid_url(link_url) and self._can_request(link_url): + yield scrapy.Request(link_url, callback=self.parse_stu_dist_page, meta={'list_type': 'dist'}) + + link_url = data.get('StudioHref', '') + if is_valid_url(link_url) and self._can_request(link_url): + yield scrapy.Request(link_url, callback=self.parse_stu_dist_page, meta={'list_type': 'stu'}) + + link_url = data.get('DirectorHref', '') + yield from self._create_performer_request(href=link_url, name=data.get('Director')) + + for director in data.get('Directors', []): + yield from self._create_performer_request(href=director['href'], name=director['name']) + else: self.logger.warning(f"fetched data error. {response.url}") - - item = IAFDMovieDetailItem() - #yield item - def custom_block_check(self, response): - item_type = response.meta.get('item_type', '') - if "invalid or outdated page" in response.text.lower(): - self.logger.warning(f"invalid or outdated page. url: {response.url}, item_type: {item_type}") - return "invalid or outdated page" - else: - self.logger.info(f"right content. url: {response.url}") - - return None + # 统一判断并处理异常 + def _handle_invalid_response(self, response): + if response.status in [200]: + if "invalid or outdated page" in response.text.lower(): + self.logger.warning(f"invalid or outdated page. url: {response.url}, status_code: {response.status}") + # TODO: 更新404的演员或者影片 + else: + self.logger.warning(f"unkown page. 
url:{response.url}, content: {response.text[:500]}") - # 处理页面异常,主要是404, 403 - def handle_blocked(self, response, reason): - item_type = response.meta.get('item_type', '') - if response.status in [404, 403]: - self.logger.warning(f"get 404 page. url: {response.url}, item_type: {item_type}") \ No newline at end of file + elif response.status in [404, 403]: + self.logger.warning(f"get 404 page. url: {response.url}") + # TODO: 更新404的演员或者影片 + + else: + self.logger.warning(f"unkown page. url:{response.url}, status: {response.status}, content: {response.text[:500]}") + + + def load_existed_actors(self): + query_args = {} + rows = db_tools.query_performer_hrefs(**query_args) + if rows: + for item in rows: + self.existed_actors[item['href']] = {'is_full_data': item['is_full_data'], 'movies_cnt': item['movies_cnt']} + else: + self.logger.warning(f"query_performer_hrefs empty. query args: {query_args}") + + + def load_existed_movies(self): + query_args = {} + rows = db_tools.query_movie_hrefs(**query_args) + if rows: + for item in rows: + self.existed_movies[item['href']] = item['is_full_data'] + else: + self.logger.warning(f"query_movies empty. 
query args: {query_args}") + + # 内存缓存,也可以改为查询db + def need_update_movie(self, href): + return not (href in self.existed_movies and self.existed_movies[href] >0) + + # 内存缓存,也可以改为查询db + def need_update_actor(self, href, movies_cnt): + if href not in self.existed_actors: + return True + data = self.existed_actors[href] + if data['is_full_data'] <=0 : + return True + if data['movies_cnt'] < movies_cnt: + return True + + return False + + def add_actor_to_existed(self, href, movies_cnt, is_full_data=1): + self.existed_actors[href] = {'is_full_data': is_full_data, 'movies_cnt': movies_cnt} + + def acc_movie_to_existed(self, href, is_full_data=1): + self.existed_movies[href] = is_full_data diff --git a/scrapy_proj/scrapy_proj/spiders/parser/iafd_parser.py b/scrapy_proj/scrapy_proj/spiders/parser/iafd_parser.py index 3464612..d8c93b3 100644 --- a/scrapy_proj/scrapy_proj/spiders/parser/iafd_parser.py +++ b/scrapy_proj/scrapy_proj/spiders/parser/iafd_parser.py @@ -1,4 +1,3 @@ - import cloudscraper import time import json @@ -11,6 +10,7 @@ import re from bs4 import BeautifulSoup from requests.exceptions import RequestException from functools import partial +from datetime import datetime #import config #import utils @@ -410,8 +410,8 @@ def parse_page_performer(soup, url): 'nationality': 'Nationality', 'hair_colors': 'Hair Colors', 'eye_color': 'Eye Color', - 'height': 'Height', - 'weight': 'Weight', + 'height_str': 'Height', + 'weight_str': 'Weight', 'measurements': 'Measurements', 'tattoos': 'Tattoos', 'piercings': 'Piercings' @@ -474,6 +474,20 @@ def parse_page_performer(soup, url): return data +def extract_year_from_date_string(date_str): + """ + 从 "Apr 23, 2021" 格式的字符串中提取年份 + + :param date_str: 待解析的日期字符串(如 "Apr 23, 2021") + :return: 提取的年份(int类型),若解析失败返回 None + """ + try: + date_obj = datetime.strptime(date_str, "%b %d, %Y") + return date_obj.year + except ValueError: + return 0 + except TypeError: + return 0 # 解析网页 HTML 并提取电影信息 def parse_page_movie(soup, href, title): @@ 
-595,19 +609,20 @@ def parse_page_movie(soup, href, title): return { "href": href, "title": title, - "Minutes": movie_data.get("Minutes", ""), - "Distributor": movie_data.get("Distributor", ""), - "Studio": movie_data.get("Studio", ""), - "ReleaseDate": movie_data.get("Release Date", ""), - "AddedtoIAFDDate": movie_data.get("Date Added to IAFD", ""), - "All-Girl": movie_data.get("All-Girl", ""), - "All-Male": movie_data.get("All-Male", ""), - "Compilation": movie_data.get("Compilation", ""), - "Webscene": movie_data.get("Webscene", ""), + "minutes": movie_data.get("Minutes", ""), + "release_date": movie_data.get("Release Date", ""), + "added_to_IAFD_date": movie_data.get("Date Added to IAFD", ""), + "all_girl": movie_data.get("All-Girl", ""), + "all_male": movie_data.get("All-Male", ""), + "compilation": movie_data.get("Compilation", ""), + "webscene": movie_data.get("Webscene", ""), + 'release_year': extract_year_from_date_string(movie_data.get("Release Date", "")), "Director": movie_data.get("Director", ""), "DirectorHref": movie_data.get("DirectorHref", ""), - "DistributorHref": movie_data.get("DistributorHref", ""), + "Studio": movie_data.get("Studio", ""), "StudioHref": movie_data.get("StudioHref", ""), + "Distributor": movie_data.get("Distributor", ""), + "DistributorHref": movie_data.get("DistributorHref", ""), "Directors": movie_data.get("Directors", []), # 可能存在的元素 "Performers": performers, "SceneBreakdowns": scene_breakdowns,