Compare commits

...

2 Commits

Author SHA1 Message Date
228566a6f3 modify scripts 2025-07-26 18:21:51 +08:00
5456b40d56 modify scripts 2025-07-26 18:20:49 +08:00
13 changed files with 3112 additions and 8 deletions

View File

@@ -6,6 +6,7 @@ from datetime import datetime
from typing import List, Dict
from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path
import scrapy_proj.comm.comm_def as comm
import scrapy_proj.items as items_def
from scrapy_proj.utils.utils import pretty_json_simple
# Registry mapping spider names to their DB handler classes
@@ -668,3 +669,553 @@ class LordDBHandler(SQLiteDBHandler):
except sqlite3.Error as e:
logging.error(f"query error: {e}")
return 0
@register_handler(comm.SPIDER_NAME_JAVBUS)
class JavBusDBHandler(SQLiteDBHandler):
def __init__(self, db_path=shared_db_path):
super().__init__(db_path)
self.tbl_name_actors = 'javbus_actors'
self.tbl_name_movies = 'javbus_movies'
self.tbl_name_studios = 'javbus_studios'
self.tbl_name_labels = 'javbus_labels'
self.tbl_name_series = 'javbus_series'
self.tbl_name_tags = 'javbus_tags'
self.tbl_name_movie_tags = 'javbus_movies_tags'
self.tbl_name_actor_movie = 'javbus_actors_movies'
def insert_item(self, item):
# All fields defined on the Item (including those inherited from parent classes)
all_fields = item.fields.keys()
# Fields that were actually assigned (stored in the Item's internal _values dict)
assigned_fields = set(item._values.keys())
# Keep only the assigned fields; untouched predefined fields are skipped so inserts/updates do not affect unrelated columns
processed_item = {}
for field in assigned_fields:
processed_item[field] = item[field]
if isinstance(item, items_def.JavbusActorsItem):
self.update_actor_detail(processed_item)
elif isinstance(item, items_def.JavbusMoviesItem):
self.insert_or_update_movie(processed_item)
elif isinstance(item, items_def.JavbusLabelsItem):
self.update_pubs_multilang(data=processed_item, tbl='label')
elif isinstance(item, items_def.JavbusStudiosItem):
self.update_pubs_multilang(data=processed_item, tbl='studio')
elif isinstance(item, items_def.JavbusSeriesItem):
self.update_pubs_multilang(data=processed_item, tbl='series')
elif isinstance(item, items_def.JavbusTagsItem):
self.update_pubs_multilang(data=processed_item, tbl='tags')
else:
logging.error(f"unknown item. {processed_item}")
return item
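# A hedged usage sketch (illustration only; the db path and field values are made up):
# insert_item() dispatches on the concrete Item class, so a pipeline only needs the
# handler and a populated item.
#
#     handler = JavBusDBHandler(db_path='/tmp/example.db')
#     tag = items_def.JavbusTagsItem()
#     tag['name'] = 'example tag'
#     tag['href'] = 'https://www.javbus.com/genre/xx'  # hypothetical href
#     handler.insert_item(tag)  # routed to update_pubs_multilang(tbl='tags')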
# Statistics helper
def get_stat(self):
return self.get_statics()
def has_full_data(self, href):
try:
self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,))
row = self.cursor.fetchone()
return row[0] if row else None
except sqlite3.Error as e:
logging.error(f"query error: {e}")
return 0
def insert_actor_index(self, data, **kwargs):
fields = ['uncensored', 'from_actor_list', 'from_movie_list']
# Only override a field when a value was explicitly passed; otherwise keep the existing value
for field in fields:
if kwargs.get(field) is not None:
data[field] = kwargs.get(field)
try:
return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href', exists_do_nothing=True)
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
def insert_movie_index(self, data, **kwargs):
fields = [
'uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels', 'from_movie_series',
'studio_id', 'label_id', 'series_id'
]
# Only override a field when a value was explicitly passed; otherwise keep the existing value
for field in fields:
if kwargs.get(field) is not None:
data[field] = kwargs.get(field)
try:
return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# Insert an actor-movie link record
def insert_actor_movie(self, performer_id, movie_id, tags=''):
return self.insert_or_update_with_composite_pk(
data={'actor_id':performer_id, 'movie_id':movie_id, 'tags':tags},
tbl_name = self.tbl_name_actor_movie,
composite_pk = ['actor_id', 'movie_id'],
exists_do_nothing = True
)
def update_actor_detail_404(self, data, is_full_data=1):
try:
data['is_full_data'] = is_full_data
return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
def update_actor_detail(self, data, is_full_data=1):
try:
# Update the actors table
avatar = data.get('avatar', {})
avatar['href'] = data['href']
avatar['is_full_data'] = is_full_data
avatar_id = self.insert_or_update_common(avatar, self.tbl_name_actors, uniq_key='href', exists_do_nothing=False)
if not avatar_id:
logging.warning(f"get actor id error. href: {data['href']}")
return None
else:
logging.debug(f"update actor data. href: {data['href']} avatar: {avatar}")
# Update the movies table
uncensored = data.get('uncensored', 0)
for movie in data.get('credits', []):
movie_id = self.insert_movie_index(movie, from_actor_list=1, uncensored=uncensored)
if movie_id:
logging.debug(f"insert one movie index. data: {movie}")
# Insert the link-table record
link_id = self.insert_actor_movie(avatar_id, movie_id)
if link_id:
logging.debug(f"insert one actor_movie record. actor id: {avatar_id}, movie id: {movie_id}")
return avatar_id
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
def query_actors(self, **filters):
try:
sql = f"SELECT href, en_name as name, uncensored, movies_cnt, id, is_full_data FROM {self.tbl_name_actors} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"en_name": " AND en_name LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
"uncensored": " AND uncensored = ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "en_name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# Note: ORDER BY takes the column name directly; it cannot be a bound parameter, or it would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [dict(row) for row in self.cursor.fetchall()]
#return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'movies_cnt':row[3]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"query href failed: {e}")
return None
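# A hedged usage sketch (illustration only): query_actors() builds its WHERE clause from
# keyword filters; the call below would fetch up to 50 uncensored actors that still lack
# full data, ordered by id. Because order_by is interpolated directly into the SQL (it
# cannot be a bound parameter), callers should pass only trusted column names.
#
#     rows = handler.query_actors(uncensored=1, is_full_data=0, order_by='id', limit=50)
#     for row in rows or []:
#         print(row['href'], row['movies_cnt'])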
def query_movies(self, **filters):
try:
sql = f"SELECT href, title, uncensored, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"title": " AND title LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
"uncensored": " AND uncensored = ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "title":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# Note: ORDER BY takes the column name directly; it cannot be a bound parameter, or it would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [dict(row) for row in self.cursor.fetchall()]
#return [{'href': row[0], 'title': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"query href failed: {e}")
return None
# Check whether the record exists; insert it if it does not, and return its id
def check_and_get_id(self, item, uncensored, tbl, uniq_key='href'):
name = item['name']
href = item['href']
row_id = self.get_id_by_key(tbl, uniq_key, href)
if row_id is None:
row_id = self.insert_or_update_common({'name':name, 'href': href, 'uncensored':uncensored, 'from_movie_list':1}, tbl_name=tbl, uniq_key=uniq_key)
return row_id
def insert_or_update_tags(self, data, uniq_key='href'):
return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key)
def insert_movie_tags(self, movie_id, tag_id, tags):
return self.insert_or_update_with_composite_pk(
data={'movie_id':movie_id, 'tag_id':tag_id, 'tags':tags},
tbl_name = self.tbl_name_movie_tags,
composite_pk = ['movie_id', 'tag_id'],
exists_do_nothing = True
)
def insert_or_update_movie_404(self, data, is_full_data=1):
try:
data['is_full_data'] = is_full_data
return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
except sqlite3.Error as e:
logging.error(f"Error inserting or updating data: {e}")
return None
# """插入或更新电影数据"""
def insert_or_update_movie(self, movie, is_full_data=1):
try:
# Resolve the related studio / label / series IDs
studio_id = self.check_and_get_id(movie.get('studio'), movie.get('uncensored', 0), self.tbl_name_studios) if movie.get('studio') is not None else None
label_id = self.check_and_get_id(movie.get('label'), movie.get('uncensored', 0), self.tbl_name_labels) if movie.get('label') is not None else None
series_id = self.check_and_get_id(movie.get('series'), movie.get('uncensored', 0), self.tbl_name_series) if movie.get('series') is not None else None
if studio_id:
movie['studio_id'] = studio_id
if label_id:
movie['label_id'] = label_id
if series_id:
movie['series_id'] = series_id
movie['is_full_data'] = is_full_data
movie['actors_cnt'] = len(movie.get('actors', []))
movie_id = self.insert_or_update_common(movie, self.tbl_name_movies, uniq_key='href')
if movie_id is None:
logging.warning(f"insert/update movie error. data:{movie}")
return None
logging.debug(f"insert one movie, id: {movie_id}, title: {movie['title']}, href: {movie['href']}")
# Insert rows into the actors-movies link table
uncensored = movie.get('uncensored', 0)
for performer in movie.get('actors', []):
performer_id = self.get_id_by_key(self.tbl_name_actors, 'href', performer['href'])
# If the performer does not exist yet, insert it first
if performer_id is None:
performer_id = self.insert_actor_index({'zh_name': performer['name'], 'href':performer['href']}, uncensored=uncensored, from_movie_list=1)
logging.debug(f"insert new performer. performer_id: {performer_id}, name:{performer['name']}")
if performer_id:
tmp_id = self.insert_actor_movie(performer_id, movie_id)
if tmp_id:
logging.debug(f"insert one performer_movie. performer_id: {performer_id}, movie_id:{movie_id}")
else:
logging.debug(f"insert performer_movie failed. performer_id: {performer_id}, movie_id:{movie_id}")
else:
logging.warning(f"insert performer failed. name: {performer['name']}, href: {performer['href']}")
# Insert into the tags table
for tag in movie.get('tags', []):
tag_name = tag.get('name', '')
tag_href = tag.get('href', '')
tag_id = self.insert_or_update_tags({'name':tag_name, 'href':tag_href}, uniq_key='href')
if tag_id:
logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}")
tmp_id = self.insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name)
if tmp_id:
logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
else:
logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
else:
logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}")
return movie_id
except Exception as e:
self.conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# Update multilingual names for studio / label / series / tags
def update_pubs_multilang(self, data, tbl, **filters):
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series, 'tags': self.tbl_name_tags}
if not tbls.get(tbl):
logging.warning(f"wrong table. table: {tbl}")
return None
return self.insert_or_update_common(data=data, tbl_name=tbls[tbl], uniq_key='href', exists_do_nothing=False)
def query_list_common(self, tbl, **filters):
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series}
if not tbls.get(tbl):
logging.warning(f"wrong table. table: {tbl}")
return None
try:
sql = f"SELECT href, name, uncensored, id FROM {tbls[tbl]} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"name": " AND name LIKE ?",
"start_id": " AND id > ?",
"uncensored": " AND uncensored = ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
if "order_by" in filters:
# Note: ORDER BY takes the column name directly; it cannot be a bound parameter, or it would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"query href failed: {e}")
return None
def update_tags(self, data):
return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href')
def query_tags(self, **filters):
try:
sql = f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"name": " AND name LIKE ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
if "order_by" in filters:
# Note: ORDER BY takes the column name directly; it cannot be a bound parameter, or it would be treated as a string literal
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"query href failed: {e}")
return None
def get_statics(self):
try:
self.cursor.execute(f"""
SELECT
(SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full,
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full,
(SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full,
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full,
(SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios,
(SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels,
(SELECT COUNT(*) FROM {self.tbl_name_series}) AS series
""")
row = self.cursor.fetchone()
if not row:
logging.warning(f"query no results.")
return {}
# Column names are taken from cursor.description below
#columns = ['actors', 'act_un', 'act_full', 'act_unc_full', 'movies', 'mov_un', 'mov_full', 'mov_un_full']
columns = [desc[0] for desc in self.cursor.description]
return dict(zip(columns, row))
except sqlite3.Error as e:
logging.error(f"query error: {e}")
return {}
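# A hedged usage sketch (illustration only; the numbers are invented): get_stat() returns
# a single dict keyed by the column aliases of the statistics query above.
#
#     stats = handler.get_stat()
#     # {'actors': 1200, 'act_un': 300, 'act_full': 900, 'act_unc_full': 250,
#     #  'movies': 56000, 'mov_un': 4000, 'mov_full': 50000, 'mov_un_full': 3500,
#     #  'studios': 320, 'labels': 210, 'series': 150}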
# Recompute the movies' uncensored flag
def reset_movies_uncensored(self, check_and_do = 0):
try:
logging.info("Creating a temporary table to hold the records that need updating")
self.cursor.execute("""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update (
movie_id INTEGER PRIMARY KEY
)
""")
# Clear the temporary table (in case stale data was left over)
self.cursor.execute("DELETE FROM temp_movies_to_update")
logging.info("Collecting the movie IDs that need updating...")
# A single SQL statement performs the query and insert for all conditions
self.cursor.execute("""
INSERT OR IGNORE INTO temp_movies_to_update (movie_id)
SELECT DISTINCT m.id
FROM javbus_movies m
-- join the actor tables
LEFT JOIN javbus_actors_movies am ON m.id = am.movie_id
LEFT JOIN javbus_actors a ON am.actor_id = a.id
-- join the label / series / studio tables
LEFT JOIN javbus_labels l ON m.label_id = l.id
LEFT JOIN javbus_series s ON m.series_id = s.id
LEFT JOIN javbus_studios st ON m.studio_id = st.id
-- filter: any joined table's href contains 'uncensored'
WHERE a.href LIKE '%uncensored%'
OR l.href LIKE '%uncensored%'
OR s.href LIKE '%uncensored%'
OR st.href LIKE '%uncensored%'
""")
total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0]
total_movies = self.cursor.execute("SELECT COUNT(*) FROM javbus_movies").fetchone()[0]
logging.info(f"Collected {total_count} movies to update out of {total_movies} movies in total")
if check_and_do:
# 1. Reset uncensored to the default value 0 for all records
logging.info("Resetting uncensored to the default value 0 for all movies...")
self.cursor.execute("UPDATE javbus_movies SET uncensored = 0")
logging.info(f"Set uncensored=0 on {self.cursor.rowcount} records")
# 2. Set uncensored=1 for the movies collected in the temporary table
logging.info("Setting uncensored=1 for the matched movies...")
self.cursor.execute("""
UPDATE javbus_movies
SET uncensored = 1
WHERE id IN (SELECT movie_id FROM temp_movies_to_update)
""")
logging.info(f"Set uncensored=1 on {self.cursor.rowcount} records")
self.conn.commit()
else:
logging.info("Check finished; skipping the update this time...")
logging.info("Task completed!")
except sqlite3.Error as e:
self.conn.rollback()
logging.error("Error resetting movies' uncensored flag: %s", e)
# Recompute each actor's movies_cnt from the actor-movie link table
def reset_actor_movies(self, check_and_do = 0):
try:
# Check whether the movies_cnt column already exists
self.cursor.execute(f"PRAGMA table_info({self.tbl_name_actors});")
columns = [row[1] for row in self.cursor.fetchall()]
if 'movies_cnt' not in columns:
# Column does not exist; add it
add_field_sql = f"""
ALTER TABLE {self.tbl_name_actors} ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
"""
self.cursor.execute(add_field_sql)
logging.info("Added the movies_cnt column")
else:
logging.info("movies_cnt column already exists; skipping")
# Make sure the link table has an index on actor_id
self.cursor.execute(f"""
CREATE INDEX IF NOT EXISTS idx_actor_movie_actor_id
ON {self.tbl_name_actor_movie}(actor_id);
""")
# Create a temporary table holding the per-actor movie counts
self.cursor.execute(f"""
CREATE TEMPORARY TABLE temp_actor_counts AS
SELECT actor_id, COUNT(movie_id) AS cnt
FROM {self.tbl_name_actor_movie}
GROUP BY actor_id;
""")
# Index the temporary table
self.cursor.execute("CREATE INDEX idx_temp_actor_id ON temp_actor_counts(actor_id);")
# Update the main table
self.cursor.execute(f"""
UPDATE {self.tbl_name_actors}
SET movies_cnt = COALESCE((
SELECT cnt FROM temp_actor_counts
WHERE actor_id = {self.tbl_name_actors}.id
), 0); -- COALESCE handles actors with no movies
""")
updated_rows = self.cursor.rowcount
logging.info(f"Updated movies_cnt for {updated_rows} actors")
self.conn.commit()
logging.info("Task completed!")
except sqlite3.Error as e:
self.conn.rollback()
logging.error("Error updating actor movie_cnt: %s", e)
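# A hedged usage sketch (illustration only) for the two maintenance helpers above:
# reset_movies_uncensored(check_and_do=0) only reports how many movies would change,
# check_and_do=1 also applies the update, and reset_actor_movies() backfills the
# movies_cnt column from the actor-movie link table.
#
#     handler = JavBusDBHandler()
#     handler.reset_movies_uncensored(check_and_do=0)  # dry run
#     handler.reset_movies_uncensored(check_and_do=1)  # apply the update
#     handler.reset_actor_movies()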

View File

@@ -44,7 +44,9 @@ class FailureMonitorExtension:
'''Sent when a Request, scheduled by the engine to be downloaded later, is rejected by the scheduler.'''
def request_dropped(self, request, spider):
spider.logger.warning(f"request_dropped on url {request.url}")
# Get the drop reason from request.meta
drop_reason = request.meta.get('_dropreason', 'unknown reason')
spider.logger.warning(f"request_dropped on url: {request.url} | reason: {drop_reason}")
self.calculate_failure(spider)
'''

View File

@@ -89,7 +89,9 @@ class StatsExtension:
'spider': self.spider_name,
'scrapy_req': stats.get('downloader/request_count', 0),
'middle_req': stats.get('cloudscraper/request_count', 0),
'total_req': stats.get('cloudscraper/request_count', 0) + stats.get('downloader/request_count', 0),
'cache_hits': stats.get('httpcache/hits', 0), # number of local cache hits
#'total_req': stats.get('cloudscraper/request_count', 0) + stats.get('downloader/request_count', 0),
'total_req': stats.get('httpcache/downloader/request_count', 0), # hits + misses
'total_rsp': stats.get('downloader/response_count', 0),
'200_cnt': stats.get('downloader/response_status_count/200', 0),
'404_cnt': stats.get('downloader/response_status_count/404', 0),

View File

@@ -222,3 +222,275 @@ class LordActorItem(scrapy.Item):
weight_kg = scrapy.Field()
is_full_data = scrapy.Field()
alias = scrapy.Field()
class IafdDistributorsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
class IafdMetaEthnicItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
class IafdMoviesItem(scrapy.Item):
title = scrapy.Field()
minutes = scrapy.Field()
distributor_id = scrapy.Field()
studio_id = scrapy.Field()
release_date = scrapy.Field()
added_to_IAFD_date = scrapy.Field()
all_girl = scrapy.Field()
all_male = scrapy.Field()
compilation = scrapy.Field()
webscene = scrapy.Field()
director_id = scrapy.Field()
href = scrapy.Field()
is_full_data = scrapy.Field()
release_year = scrapy.Field()
from_performer_list = scrapy.Field()
from_dist_list = scrapy.Field()
from_stu_list = scrapy.Field()
class IafdMoviesAppersInItem(scrapy.Item):
movie_id = scrapy.Field()
appears_in_id = scrapy.Field()
gradation = scrapy.Field()
notes = scrapy.Field()
class IafdPerformerAliasesItem(scrapy.Item):
performer_id = scrapy.Field()
alias = scrapy.Field()
class IafdPerformerUrlsItem(scrapy.Item):
performer_id = scrapy.Field()
position = scrapy.Field()
url = scrapy.Field()
class IafdPerformersItem(scrapy.Item):
name = scrapy.Field()
gender = scrapy.Field()
birthday = scrapy.Field()
astrology = scrapy.Field()
birthplace = scrapy.Field()
years_active = scrapy.Field()
ethnicity = scrapy.Field()
nationality = scrapy.Field()
hair_colors = scrapy.Field()
eye_color = scrapy.Field()
height_str = scrapy.Field()
weight_str = scrapy.Field()
measurements = scrapy.Field()
tattoos = scrapy.Field()
piercings = scrapy.Field()
fake_tits = scrapy.Field()
href = scrapy.Field()
weight = scrapy.Field()
height = scrapy.Field()
rating = scrapy.Field()
movies_cnt = scrapy.Field()
vixen_cnt = scrapy.Field()
blacked_cnt = scrapy.Field()
tushy_cnt = scrapy.Field()
x_art_cnt = scrapy.Field()
is_full_data = scrapy.Field()
birth_year = scrapy.Field()
from_astro_list = scrapy.Field()
from_birth_list = scrapy.Field()
from_ethnic_list = scrapy.Field()
from_movie_list = scrapy.Field()
class IafdPerformersMoviesItem(scrapy.Item):
performer_id = scrapy.Field()
movie_id = scrapy.Field()
role = scrapy.Field()
notes = scrapy.Field()
class IafdStudiosItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
class IafdTaskLogItem(scrapy.Item):
task_id = scrapy.Field()
full_data_performers = scrapy.Field()
total_performers = scrapy.Field()
full_data_movies = scrapy.Field()
total_movies = scrapy.Field()
total_distributors = scrapy.Field()
total_studios = scrapy.Field()
task_status = scrapy.Field()
class JavbusActorsItem(scrapy.Item):
ja_name = scrapy.Field()
zh_name = scrapy.Field()
en_name = scrapy.Field()
href = scrapy.Field()
pic = scrapy.Field()
birth_date = scrapy.Field()
height = scrapy.Field()
breast_size = scrapy.Field()
measurements = scrapy.Field()
uncensored = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
# Extra non-column fields added below
avatar = scrapy.Field()
credits = scrapy.Field()
class JavbusActorsMoviesItem(scrapy.Item):
actor_id = scrapy.Field()
movie_id = scrapy.Field()
tags = scrapy.Field()
class JavbusLabelsItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
class JavbusMoviesItem(scrapy.Item):
href = scrapy.Field()
title = scrapy.Field()
cover_url = scrapy.Field()
serial_number = scrapy.Field()
release_date = scrapy.Field()
duration = scrapy.Field()
studio_id = scrapy.Field()
label_id = scrapy.Field()
series_id = scrapy.Field()
is_full_data = scrapy.Field()
uncensored = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_studios = scrapy.Field()
from_movie_labels = scrapy.Field()
from_movie_series = scrapy.Field()
actors_cnt = scrapy.Field()
# Extra non-column fields added below
studio = scrapy.Field()
label = scrapy.Field()
series = scrapy.Field()
actors = scrapy.Field()
tags = scrapy.Field()
class JavbusMoviesTagsItem(scrapy.Item):
movie_id = scrapy.Field()
tag_id = scrapy.Field()
tags = scrapy.Field()
class JavbusSeriesItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
class JavbusStudiosItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
class JavbusTagsItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
class JavdbActorsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
pic = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_list = scrapy.Field()
class JavdbActorsAliasItem(scrapy.Item):
actor_id = scrapy.Field()
alias = scrapy.Field()
class JavdbActorsMoviesItem(scrapy.Item):
actor_id = scrapy.Field()
movie_id = scrapy.Field()
tags = scrapy.Field()
class JavdbMakersItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
class JavdbMoviesItem(scrapy.Item):
href = scrapy.Field()
title = scrapy.Field()
cover_url = scrapy.Field()
serial_number = scrapy.Field()
release_date = scrapy.Field()
duration = scrapy.Field()
maker_id = scrapy.Field()
series_id = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_makers = scrapy.Field()
from_movie_series = scrapy.Field()
from_movie_publishers = scrapy.Field()
pub_id = scrapy.Field()
uncensored = scrapy.Field()
class JavdbMoviesTagsItem(scrapy.Item):
movie_id = scrapy.Field()
tag_id = scrapy.Field()
tags = scrapy.Field()
class JavdbPublishersItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
class JavdbSeriesItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
class JavdbTagsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
class JavdbTaskLogItem(scrapy.Item):
task_id = scrapy.Field()
full_data_actors = scrapy.Field()
total_actors = scrapy.Field()
full_data_movies = scrapy.Field()
total_movies = scrapy.Field()
total_makers = scrapy.Field()
total_series = scrapy.Field()
task_status = scrapy.Field()

View File

@@ -175,3 +175,28 @@ class CloudScraperMiddleware:
self.stats.inc_value(f'cloudscraper/exception_type_count/{e.__class__.__name__}')
spider.logger.error(f"CloudScraper request failed: {e}")
return None # fall back to the default downloader on failure
# middlewares.py
class SmartCacheControlMiddleware:
"""
Control caching based on the spider's debug flag:
- spider.debug = True: force caching on, even when meta={'cache': True} was not set
- spider.debug = False or undefined: caching is off by default and only enabled when meta={'cache': True} is set explicitly
"""
def process_request(self, request, spider):
# Read the spider's debug flag (defaults to False)
is_debug = getattr(spider, 'debug', False)
if is_debug:
# Debug mode: force caching on (dont_cache=False)
request.meta['dont_cache'] = False
else:
# Non-debug mode: off by default; enabled only when requested explicitly
if request.meta.get('cache'):
request.meta['dont_cache'] = False
else:
request.meta['dont_cache'] = True
#spider.logger.debug(f"url: {request.url}, cached-setting: debug({is_debug}), dont_cache: {request.meta['dont_cache']}")
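# A hedged usage sketch (hypothetical spider, for illustration only): with this middleware
# running before HttpCacheMiddleware, a non-debug spider opts into caching per request via
# meta={'cache': True}, while setting debug = True on the spider caches every request.
#
#     class ExampleSpider(scrapy.Spider):
#         name = "example"
#         debug = False  # set to True to force-cache everything
#
#         def start_requests(self):
#             # cached: the middleware sets dont_cache=False for this request
#             yield scrapy.Request("https://example.com/list", meta={'cache': True})
#             # not cached: dont_cache=True because meta['cache'] is absent
#             yield scrapy.Request("https://example.com/fresh")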

View File

@@ -53,6 +53,8 @@ DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
'scrapy_useragents.downloadermiddlewares.useragents.UserAgentsMiddleware': None,
'scrapy_proj.middlewares.CloudScraperMiddleware': 543,
'scrapy_proj.middlewares.SmartCacheControlMiddleware': 800, # custom middleware (runs before the built-in HTTP cache middleware)
'scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware': 900,
}
# settings.py
@@ -83,6 +85,9 @@ STATS_EXPORT_SCRIPT = 'scrapy_proj/extensions/push_to_wecom.sh' # local shell script
TWISTED_REACTOR = 'twisted.internet.epollreactor.EPollReactor' # for Linux
# Allow 404 status codes to reach the spider callbacks
HTTPERROR_ALLOWED_CODES = [404]
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "scrapy_proj (+http://www.yourdomain.com)"
@@ -151,11 +156,11 @@ TWISTED_REACTOR = 'twisted.internet.epollreactor.EPollReactor' # for Linux
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0
HTTPCACHE_DIR = "~/sharedata/scrapy_cached"
HTTPCACHE_IGNORE_HTTP_CODES = []
HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"

View File

@@ -0,0 +1,424 @@
import scrapy
import re
import sys
from urllib.parse import urljoin, quote_plus
from scrapy_proj.spiders.base_spider import BaseSpider
from scrapy_proj.items import JavbusActorsItem, JavbusMoviesItem, JavbusActorsMoviesItem, JavbusLabelsItem, JavbusMoviesTagsItem, JavbusSeriesItem, JavbusStudiosItem, JavbusTagsItem
from scrapy_proj.db_wapper.spider_db_handler import JavBusDBHandler
from scrapy_proj.comm.comm_def import SPIDER_NAME_JAVBUS
from scrapy_proj.spiders.parser.javbus_parser import common_parser
from scrapy_proj.utils.utils import pretty_json_simple, normalize_url, generate_multilang_urls, is_valid_url
db_tools = JavBusDBHandler()
class JavbusSpiderSpider(BaseSpider):
name = SPIDER_NAME_JAVBUS
allowed_domains = ["javbus.com", "www.javbus.com"]
# Request headers (reusing the header set captured from curl)
custom_settings = {
"DEFAULT_REQUEST_HEADERS": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
},
"COOKIES_ENABLED": True # enable cookie support
}
host_url = "https://www.javbus.com"
def __init__(self, debug='false', cmd='', mod='all', *args, **kwargs):
super().__init__(*args, **kwargs)
self.debug = str(debug).lower() in ('true', '1')
self.update_mode = bool(mod) and mod.lower() == 'update'
self.logger.info(f"RUN CMD: {' '.join(sys.argv)}")
self.cmd_actors = 'actors'
self.cmd_movies = 'movies'
self.cmd_dist = 'dist_list'
self.cmd_list = [self.cmd_actors, self.cmd_movies, self.cmd_dist]
if cmd and cmd != '':
self.cmd_list = cmd.split(',')
self.existed_actors = {}
self.existed_movies = {}
self.load_existed_actors()
self.load_existed_movies()
self.requested_url = set()
# Entry point, triggered by the base class
def custom_start_requests(self):
self.crawler.stats.set_value(f"{self.name}/actor_all", 0)
self.crawler.stats.set_value(f"{self.name}/actor_done", 0)
self.crawler.stats.set_value(f"{self.name}/movie_all", 0)
self.crawler.stats.set_value(f"{self.name}/movie_done", 0)
# Dispatch on the command words
if self.cmd_actors in self.cmd_list:
meta_actor = {}
for lang in ["en", "ja", "zh"]:
s_url = f"/{lang}/uncensored/actresses" if lang != 'zh' else f"/uncensored/actresses"
url = urljoin(self.host_url, s_url)
yield scrapy.Request(url,
callback=self.parser_actor_list,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'lang': lang, 'uncensored':1, 'from_actor_list':1})
for lang in ["en", "ja", "zh"]:
s_url = f"/{lang}/actresses" if lang != 'zh' else f"/actresses"
url = urljoin(self.host_url, s_url)
yield scrapy.Request(url,
callback=self.parser_actor_list,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'lang': lang, 'uncensored':1, 'from_actor_list':1}
)
query_args = {}
if self.debug:
query_args['limit'] = 5
if self.update_mode:
query_args['is_full_data'] = 0
# Read the list of actors that still need updating
if False:
actors = db_tools.query_actors(**query_args)
if actors:
for item in actors:
href = item.get('href', '')
movies_cnt = item['movies_cnt'] if item['movies_cnt'] else 0
self.logger.info(f"fetch from db. item: {item}")
yield scrapy.Request(href,
callback=self.parse_actor_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'id': item.get('id', 0), 'name': item.get('name', ''), 'movies_cnt': movies_cnt, 'item_type':'actor', 'actor_url': href}
)
# Read the list of movies that still need updating
if False:
movies = db_tools.query_movies(**query_args)
if movies:
for item in movies:
href = item.get('href', '')
self.logger.info(f"fetch from db. item: {item}")
yield scrapy.Request(href,
callback=self.parse_movie_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'id': item.get('id', 0), 'title': item.get('title', ''), 'item_type':'movie'}
)
# Parse the actor list page
def parser_actor_list(self, response):
lang = response.meta.get('lang', '')
uncensored = response.meta.get('uncensored', 1)
data, next_url = common_parser(html=response.text, page='actor_list', href=response.url)
if data:
self.logger.info(f"fetched data from {response.url}, data count: {len(data)}")
for item in data:
url = item['href']
name = item['name']
# Record the name for the current language
item = JavbusActorsItem()
item['href'] = normalize_url(url) # normalized to a unified url
item[f"{lang}_name"] = name
yield item
# Request the detail page, but only when lang == 'zh'
if lang == 'zh' and self._can_request(url) :
yield scrapy.Request(url,
callback=self.parse_actor_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'lang': lang, 'actor_name': name, 'actor_url': url })
self.crawler.stats.inc_value(f"{self.name}/actor_all")
if next_url:
yield scrapy.Request(next_url,
callback=self.parser_actor_list,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta=response.meta
)
else:
self.logger.warning(f"parse data error. {response.url}")
# Parse the actor detail page
def parse_actor_detail_page(self, response):
actor_url = response.meta.get('actor_url', '')
actor_name = response.meta.get('actor_name', '')
lang = response.meta.get('lang', 'zh')
data, next_url = common_parser(html=response.text, page='actor_detail', href=response.url)
if data:
self.logger.debug(f"fetched data from {response.url}, data: {data}")
# Other languages: only update the language-specific name fields
if lang != 'zh' and not response.meta.get('from_cache'):
avatar = data.get('avatar',{})
item = JavbusActorsItem()
item['href'] = normalize_url(actor_url)
item[f"{lang}_name"] = avatar['name']
yield item
return None
# Skip if already up to date: full data exists and the movie count is unchanged
titles = data.get('title', {})
movies_cnt = titles.get('movies_cnt', 0)
if not self.need_update_actor(href=actor_url, movies_cnt=movies_cnt):
self.crawler.stats.inc_value(f"{self.name}/actor_done")
self.logger.info(f"actor ({actor_name}) up to date. skipping... url: {actor_url}")
return None
# Needs updating; follow the pagination first
if next_url:
yield scrapy.Request(next_url,
callback=self.parse_actor_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'lang': lang, 'actor_name': actor_name, 'actor_url': actor_url })
else:
self.logger.info(f"actor ({actor_name}) read all pages. url :{response.url}")
self.crawler.stats.inc_value(f"{self.name}/actor_done")
self.add_actor_to_existed(href=actor_url, movies_cnt=movies_cnt)
# Update the detail data
item = JavbusActorsItem()
item['href'] = actor_url
item['zh_name'] = actor_name
item['uncensored'] = 1 if 'uncensored' in actor_url else 0
item['is_full_data'] = 1
item['movies_cnt'] = movies_cnt
item['avatar'] = data.get('avatar', {})
item['credits'] = data.get('movies', [])
for k, v in data.get('avatar', {}).items():
if k in item.fields:
item[k] = v
yield item
# Movie links: decide whether each one needs to be requested
for item in data.get('movies', []):
url = item['href']
if self.need_update_movie(href=url) and self._can_request(url):
# Issue the request
self.crawler.stats.inc_value(f"{self.name}/movie_all")
yield scrapy.Request(url,
callback=self.parse_movie_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'title': item.get('title', ''), 'item_type':'movie', 'cache':True}
)
else:
self.logger.warning(f"fetched data error. {response.url}")
def parse_movie_detail_page(self, response):
title = response.meta.get('title', '')
data = common_parser(html=response.text, page='movies', href=response.url, title=title)
if data:
self.crawler.stats.inc_value(f"{self.name}/movie_done")
self.logger.info(f"fetched data from {response.url}, data: {data}")
# Persist the movie info to the DB
item = JavbusMoviesItem()
for k, v in data.items():
if k in item.fields:
item[k] = v
yield item
# Handle the actors list
for actor in data.get('actors', []):
yield from self._create_multi_langs_request(
href = actor['href'],
name = actor['name'],
callback = self.parse_actor_detail_page,
prefix = 'actor'
)
# Handle the tags list
for tag in data.get('tags', []):
# one multi-language request per tag
yield from self._create_multi_langs_request(
href = tag['href'],
name = tag['name'],
callback = self.parse_movie_list_page,
prefix = 'tags'
)
# Handle the studio
yield from self._create_multi_langs_request(
href = data['studio']['href'],
name = data['studio']['name'],
callback = self.parse_movie_list_page,
prefix = 'studio'
)
# Handle the label
yield from self._create_multi_langs_request(
href = data['label']['href'],
name = data['label']['name'],
callback = self.parse_movie_list_page,
prefix = 'label'
)
# Handle the series
yield from self._create_multi_langs_request(
href = data['series']['href'],
name = data['series']['name'],
callback = self.parse_movie_list_page,
prefix = 'series'
)
else:
self.logger.warning(f"fetched data error. {response.url}")
def _create_multi_langs_request(self, href, name, callback, prefix):
"""Create the multi-language requests for a single object"""
if href == '':
return
if is_valid_url(href):
langs_url = generate_multilang_urls(href)
for lang, next_url in langs_url.items():
if not self._can_request(next_url):
continue
if lang == 'zh' and prefix=='actor':
self.crawler.stats.inc_value(f"{self.name}/actor_all")
# Build the meta dict (uniform key names: {prefix}_name, {prefix}_url)
meta = {
'lang': lang,
f'{prefix}_name': name,
f'{prefix}_url': href,
'prefix': prefix,
'cache': lang != 'zh' # unified cache policy: only the non-zh variants are cached
}
yield scrapy.Request(
next_url,
callback=callback,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'),
meta=meta
)
else:
self.logger.warning(f"wrong url. {href}, ignore...")
# Shared handler for the tags / studio / label / series list pages
def parse_movie_list_page(self, response):
# Map each prefix to its Item class
ITEM_MAPPING = {
'tags': JavbusTagsItem,
'studio': JavbusStudiosItem,
'label': JavbusLabelsItem,
'series': JavbusSeriesItem
}
data, next_url = common_parser(html=response.text, page='movie_list', href=response.url)
lang = response.meta.get('lang', 'zh')
prefix = response.meta.get('prefix', '')
if data:
self.logger.debug(f"fetched data from {response.url}, data: {data}")
# Look up the Item class for this prefix
ItemClass = ITEM_MAPPING.get(prefix)
if not ItemClass:
self.logger.warning(f"no Item class mapped for prefix: {prefix}")
return None
# Instantiate the Item and populate it (these Items are assumed to share 'href' and name fields)
item = ItemClass()
item['href'] = response.meta.get(f'{prefix}_url', '')
title_meta = data.get('meta', {})
for k, v in title_meta.items():
if k in item.fields:
item[k] = v
name_key = f"{lang}_name" if lang !='zh' else 'name'
if name_key in item.fields:
item[name_key] = title_meta.get('title')
if not response.meta.get('from_cache'):
yield item
# Only the zh pages continue to further requests
if lang != 'zh':
return None
# Movie links: decide whether each one needs to be requested
for item in data.get('movies', []):
url = item['href']
if self.need_update_movie(href=url) and self._can_request(url):
# Issue the request
self.crawler.stats.inc_value(f"{self.name}/movie_all")
yield scrapy.Request(url,
callback=self.parse_movie_detail_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta={'title': item.get('title', ''), 'item_type':'movie', 'cache':True}
)
# Handle pagination
if next_url:
yield scrapy.Request(next_url,
callback=self.parse_movie_list_page,
headers=self.settings.get('DEFAULT_REQUEST_HEADERS'), # use the GET request headers
meta=response.meta
)
else:
self.logger.info(f"movies list ({prefix}) read all pages. url :{response.url}")
else:
self.logger.warning(f"parse data error. {response.url}")
def custom_block_check(self, response):
item_type = response.meta.get('item_type', '')
if "invalid or outdated page" in response.text.lower():
self.logger.warning(f"invalid or outdated page. url: {response.url}, item_type: {item_type}")
return "invalid or outdated page"
else:
self.logger.info(f"right content. url: {response.url}")
return None
# Handle page errors (mainly 404 and 403)
def handle_blocked(self, response, reason):
item_type = response.meta.get('item_type', '')
if response.status in [404, 403]:
self.logger.warning(f"got {response.status} page. url: {response.url}, item_type: {item_type}")
def load_existed_actors(self):
query_args = {}
rows = db_tools.query_actors(**query_args)
if rows:
for item in rows:
self.existed_actors[item['href']] = {'is_full_data': item['is_full_data'], 'movies_cnt': item['movies_cnt']}
else:
self.logger.warning(f"query_actors empty. query args: {query_args}")
def load_existed_movies(self):
query_args = {}
rows = db_tools.query_movies(**query_args)
if rows:
for item in rows:
self.existed_movies[item['href']] = item['is_full_data']
else:
self.logger.warning(f"query_movies empty. query args: {query_args}")
# In-memory cache; could be replaced with a DB lookup
def need_update_movie(self, href):
return not (href in self.existed_movies and self.existed_movies[href] >0)
# In-memory cache; could be replaced with a DB lookup
def need_update_actor(self, href, movies_cnt):
if href not in self.existed_actors:
return True
data = self.existed_actors[href]
if data['is_full_data'] <=0 :
return True
if data['movies_cnt'] < movies_cnt:
return True
return False
def add_actor_to_existed(self, href, movies_cnt, is_full_data=1):
self.existed_actors[href] = {'is_full_data': is_full_data, 'movies_cnt': movies_cnt}
def acc_movie_to_existed(self, href, is_full_data=1):
self.existed_movies[href] = is_full_data
def _can_request(self, href):
if href in self.requested_url:
return False
self.requested_url.add(href)
return True
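# A hedged usage sketch (illustration only): the spider is normally started from the
# scrapy CLI; the actual spider name is the SPIDER_NAME_JAVBUS constant defined in
# scrapy_proj.comm.comm_def, shown here as a placeholder.
#
#     scrapy crawl <SPIDER_NAME_JAVBUS> -a debug=1 -a cmd=actors,movies -a mod=update
#
# debug=1 forces HTTP caching through SmartCacheControlMiddleware and caps the DB-driven
# queries at 5 rows, mod=update restricts those queries to rows with is_full_data=0, and
# cmd selects which of the actors / movies / dist_list entry points run.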

View File

@@ -0,0 +1,10 @@
import scrapy
class JavdbSpiderSpider(scrapy.Spider):
name = "javdb_spider"
allowed_domains = ["www.javdb.com"]
start_urls = ["https://www.javdb.com"]
def parse(self, response):
pass

View File

@@ -0,0 +1,585 @@
import logging
import sys
import requests
import re
import time
from bs4 import BeautifulSoup
from urllib.parse import urljoin
#import src.utils.utils as utils
http_code_404 = 404
http_code_403 = 403
http_code_redirect = 401
http_code_url = 601
http_code_local = 99
# Generic crawler class wrapping the low-level network interaction
class GenericCrawler:
def __init__(self, use_cloudscraper=None, headers=None, cookies=None, max_retries=3, html_parser='html.parser'):
if use_cloudscraper is None:
use_cloudscraper = sys.version_info >= (3, 8)
self.use_cloudscraper = use_cloudscraper
self.headers = headers or {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0'
}
self.cookies = cookies or {}
self.scraper = None # 延迟初始化
self.max_retries = max_retries
self.parser = html_parser
# cloudscraper is not imported here; it is imported lazily when needed
def _initialize_scraper(self):
"""Lazily initialize the request client to avoid an unnecessary cloudscraper import"""
if self.scraper is not None:
return
if self.use_cloudscraper:
try:
# Import cloudscraper lazily
import cloudscraper
self.scraper = cloudscraper.create_scraper()
logging.info("Using cloudscraper for requests")
except ImportError:
logging.warning("cloudscraper not installed. Falling back to requests.")
self.use_cloudscraper = False
self.scraper = requests.Session()
else:
self.scraper = requests.Session()
logging.info("Using requests for HTTP operations")
def fetch_page(self, url, validator):
# Initialize the scraper before use
self._initialize_scraper()
for attempt in range(self.max_retries):
try:
#if not utils.is_valid_url(url):
# logging.error(f'wrong url format: {url}')
# return None, http_code_url
response = self.scraper.get(url, headers=self.headers, cookies=self.cookies)
# Handle HTTP status codes
if response.status_code in [http_code_404, http_code_403]:
logging.debug(f"get http code: {response.status_code}, url: {url}")
return None, response.status_code # return directly so the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# Check whether a redirect happened, e.g. to a verification page
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a verify page.")
soup = BeautifulSoup(response.text, self.parser)
if self.check_redirect(soup) :
logging.warning(f"Page redirected to verify page on {url}.")
return None, http_code_redirect
# Check whether this is a login page
#if soup.find('div', id='ageVerify'):
# Preprocess the HTML (via the preprocessor hook)
html_text = self.preprocessor(response.text)
soup = BeautifulSoup(html_text, self.parser)
if validator(soup): # run the custom page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retrying...")
time.sleep(0.3)
logging.error(f'Fetching failed after max retries. {url}')
return None, None # still failing after reaching max retries
# Page preprocessing hook, typically for fixing up broken tags
def preprocessor(self, html):
return html
# Check whether the response was redirected away from the expected page
def check_redirect(self, soup):
"""Default redirect check; subclasses may override"""
return False # no redirect by default
@staticmethod
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
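# A hedged usage sketch (illustration only): callers pair fetch_page() with a validator,
# typically built from generic_validator via functools.partial.
#
#     from functools import partial
#     crawler = GenericCrawler(use_cloudscraper=False)
#     validator = partial(GenericCrawler.generic_validator,
#                         tag='div', identifier='waterfall', attr_type='id')
#     soup, status = crawler.fetch_page('https://www.javbus.com/actresses', validator)
#     if soup is not None:
#         print('page validated, status:', status)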
# Crawler class for javbus.com pages
class JavbusCrawler(GenericCrawler):
def __init__(self, use_cloudscraper=None):
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Sec-Fetch-Site": "none",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Mode": "navigate",
"Host": "www.javbus.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Sec-Fetch-Dest": "document",
"Connection": "keep-alive",
}
cookies = {
'PHPSESSID': 'l9m4ugaaao1hgvl3micr22u3o6',
'existmag': 'all',
'age': 'verified'
}
super().__init__(use_cloudscraper, headers=headers, cookies=cookies)
self.host_url = "https://www.javbus.com"
# The original parsing functions below are kept unchanged
def parse_actors_list(self, soup, href):
div_actors = soup.find("div", id='waterfall')
if not div_actors:
logging.warning(f"Warning: No actors div found ")
return None, None
# Parse the list items
rows = div_actors.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
# Actor detail link
actor_link = row.find('a')['href']
# Actor name
actor_name = row.find('span').text.strip()
# Avatar image URL
avatar_url = row.find('img')['src']
list_data.append({
'name': actor_name,
'href': urljoin(self.host_url, actor_link),
'pic': avatar_url
})
# Look for the "next page" button
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return list_data, next_url
# Parse the actor detail page
def parse_actor_detail(self, soup, href):
"""
Parse the Javbus page content, extracting the actor info and the movie list
"""
result = {
'avatar': {},
'title' : {},
'movies': []
}
try:
# Parse the actor info
avatar_box = soup.find('div', class_='avatar-box')
if avatar_box:
result['avatar'] = self.parse_avatar_info(avatar_box)
else:
logging.debug(f"avatar-box not found. href: {href}")
# Parse the page title area to get the movie count and related info
result['title'] = self.parse_title_info(soup, href)
# Parse the movie list
movie_boxes = soup.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# Look for the "next page" button
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
def parse_avatar_info(self, avatar_box):
"""
Parse the actor profile info
"""
avatar_info = {}
# Mapping from localized field labels (several languages) to target key names
field_mapping = {
'birth_date': ['生日', 'D.O.B', '生年月日', 'Birthday'],
'age': ['年齡', 'Age', '年龄'],
'height': ['身高', 'Height', '身長'],
'breast_size': ['罩杯', 'Cup', 'ブラのサイズ'],
'bust': ['胸圍', 'Bust', 'バスト'],
'waist': ['腰圍', 'Waist', 'ウエスト'],
'hip': ['臀圍', 'Hips', 'ヒップ'],
'hobby': ['愛好', 'Hobby', '趣味']
}
# Extract the actor name
name_span = avatar_box.find('span', class_='pb10')
if name_span:
avatar_info['name'] = name_span.get_text(strip=True)
else:
logging.debug("actor name not found")
# Extract the birthday, age and other profile fields
p_tags = avatar_box.find_all('p')
for p in p_tags:
text = p.get_text(strip=True)
# Use a regex to split the text around the colon
match = re.search(r'^(.*?)[:](.*)$', text)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
# Find the matching target key
target_key = next((k for k, v in field_mapping.items() if any(x in key for x in v)), None)
if target_key:
# Special handling for numeric fields and unit conversion
if target_key in ['age', 'height', 'bust', 'waist', 'hip']:
# Extract the numeric part
num_match = re.search(r'(\d+\.?\d*)', value)
if num_match:
try:
avatar_info[target_key] = float(num_match.group(1))
# Keep it as an int when the value is integral
if avatar_info[target_key].is_integer():
avatar_info[target_key] = int(avatar_info[target_key])
except ValueError:
logging.debug(f"failed to convert number: {value}")
avatar_info[target_key] = value
else:
logging.debug(f"no numeric part found: {value}")
avatar_info[target_key] = value
else:
avatar_info[target_key] = value
else:
logging.debug(f"unknown actor info field: {key}")
else:
logging.debug(f"unparsable actor info: {text}")
avatar_info['measurements'] = f"{avatar_info.get('bust', '')}-{avatar_info.get('waist', '')}-{avatar_info.get('hip', '') }"
return avatar_info
def parse_movie_info(self, movie_box):
"""
Parse a single movie box
"""
movie_info = {}
try:
# Extract the movie link
href = movie_box.get('href')
if href:
movie_info['href'] = href
else:
logging.warning("movie link not found")
return None
# Extract the cover image link
img_tag = movie_box.find('img')
if img_tag and 'src' in img_tag.attrs:
movie_info['cover_url'] = img_tag['src']
movie_info['title'] = img_tag['title']
else:
logging.warning("movie cover image not found")
# Extract the title, serial number and release date
photo_info = movie_box.find('div', class_='photo-info')
if photo_info:
# Extract the title (the span's text, excluding the date tags)
span_tag = photo_info.find('span')
if span_tag:
# Plain text directly under the span (date tags excluded)
title = ''.join(span_tag.find_all(text=True, recursive=False)).strip()
# Strip common trailing separator patterns
if title.endswith('\n\n /'):
clean_title = title[:-4].strip()
elif title.endswith('\n /'):
clean_title = title[:-3].strip()
else:
clean_title = title
movie_info['title'] = clean_title
# Extract the serial number and the release date (from the date tags)
date_tags = span_tag.find_all('date')
if len(date_tags) >= 2:
movie_info['serial_number'] = date_tags[0].get_text(strip=True)
movie_info['release_date'] = date_tags[1].get_text(strip=True)
else:
logging.warning("not enough date tags; cannot extract the serial number and date")
else:
logging.warning("span tag not found")
else:
logging.warning("movie info area not found")
except Exception as e:
logging.error(f"error while parsing movie info: {str(e)}", exc_info=True)
return None
return movie_info
# Parse the info in the page header
def parse_title_info(self, soup, href):
title_info = {}
try:
# Parse the title
b_tag = soup.select_one('.alert.alert-success.alert-common p b')
if not b_tag:
logging.warning(f'found no title. href: {href}')
else:
# Get the text content
title_text = b_tag.get_text(strip=True)
# Split the text on the dash separator
parts = [part.strip() for part in title_text.split('-')]
# The word for "movies" in the languages the site uses
video_keywords = ['影片', 'Video', '映画', 'Videos', 'Movies']
# Locate the "movies" keyword
video_index = next((i for i, part in enumerate(parts) if part in video_keywords), None)
if video_index is not None and video_index >= 2:
# The two preceding parts are the studio and the role
studio = parts[video_index - 2]
role = parts[video_index - 1]
title_info['title'] = studio
title_info['role'] = role
else:
logging.debug(f"cannot parse by the expected pattern: {' - '.join(parts)}")
# Extract the total movie count and the magnet-link count
# Find the a tags
a_tags = soup.select('.alert.alert-success.alert-common a.mypointer')
if not a_tags:
logging.warning(f'found no movie cnt. href: {href}')
else:
for a in a_tags:
text = a.get_text(strip=True)
# Total movie count
if '全部影片' in text:
match = re.search(r'全部影片\s*(\d+)\s*', text)
if match:
title_info['movies_cnt'] = int(match.group(1))
# Magnet-link count
if '已有磁力' in text:
match = re.search(r'已有磁力\s*(\d+)\s*', text)
if match:
title_info['magnet_cnt'] = int(match.group(1))
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
return title_info
# Parse studio / label / series listing pages
def parse_studios_labels_series_detail(self, soup, href):
"""
Parse the Javbus listing page, extracting the header meta info and the movie list
"""
result = {
'meta': {},
'movies': []
}
try:
# Parse the title area
result['meta'] = self.parse_title_info(soup, href)
div_waterfall = soup.find('div', id='waterfall')
if not div_waterfall:
logging.warning(f"found no records. href: {href}")
else:
# Parse the movie list
movie_boxes = div_waterfall.find_all('a', class_='movie-box')
if movie_boxes:
for movie_box in movie_boxes:
movie_info = self.parse_movie_info(movie_box)
if movie_info:
result['movies'].append(movie_info)
else:
logging.debug(f"movie-box not found. href: {href}")
except Exception as e:
logging.warning(f"parse html error: {str(e)}, href: {href}", exc_info=True)
# Look for the "next page" button
next_url = None
div_link = soup.find("div", class_='text-center hidden-xs')
if div_link:
next_page_element = soup.find('a', id='next')
if next_page_element:
next_page_url = next_page_element['href']
next_url = urljoin(href, next_page_url)
return result, next_url
# Parse a Javbus movie detail page
def parse_movie_detail(self, soup, href, title):
result = {
'title': title,
'href': href,
'serial_number': '',
'release_date': '',
'duration': '',
'studio': {'name': '', 'href': ''},
'label': {'name': '', 'href': ''},
'series': {'name': '', 'href': ''},
'tags': [],
'actors': []
}
try:
# Extract the title
div_container = soup.find('div', class_='container')
if not div_container:
logging.warning(f"found no container tag. href: {href}")
return None
title_element = div_container.find('h3')
if title_element:
result['title'] = title_element.get_text(strip=True)
else:
logging.debug(f"no title found. href: {href}")
# Extract the basic info (serial number, release date, etc.)
info_div = div_container.find('div', class_='info')
if not info_div:
logging.warning(f"found no div info tag. href: {href}")
return None
# Field mapping (multiple languages supported)
field_mapping = {
'serial_number': ['識別碼:', '识别码:', 'ID:', '品番:'],
'release_date': ['發行日期:', '发行日期:', 'Release Date:', '発売日:'],
'duration': ['長度:', '长度:', 'Length:', '収録時間:'],
'studio': ['製作商:', '制作商:', 'Studio:', 'メーカー:'],
'label': ['發行商:', '发行商:', 'Label:', 'レーベル:'],
'series': ['系列:', 'Series:', 'シリーズ:']
}
# Walk all p tags looking for the info fields
p_tags = info_div.find_all('p')
for p in p_tags:
# Find the header span
header = p.find('span', class_='header')
if header:
header_text = header.get_text(strip=True)
# Find the matching target key
target_key = next((k for k, v in field_mapping.items() if header_text in v), None)
if target_key:
# Get the value (handling both plain text and links)
if target_key in ['studio', 'label', 'series']:
# Fields that carry a link
a_tag = p.find('a')
if a_tag:
result[target_key]['name'] = a_tag.get_text(strip=True)
result[target_key]['href'] = a_tag.get('href', '')
else:
# No link; take the plain text
value_text = p.get_text(strip=True)
# Strip the header text
value_text = value_text.replace(header_text, '').strip()
result[target_key]['name'] = value_text
logging.debug(f"{header_text} has no link; extracted the plain text")
else:
# Plain-text fields
value_text = p.get_text(strip=True)
# Strip the header text
value_text = value_text.replace(header_text, '').strip()
# Special case: extract the numeric part of the duration (disabled for now)
if target_key == 'duration' and False:
num_match = re.search(r'(\d+)', value_text)
if num_match:
result[target_key] = num_match.group(1)
else:
result[target_key] = value_text
else:
result[target_key] = value_text
# Handle the genre/tag labels
tag_lables = info_div.find_all('label')
for item in tag_lables:
link = item.find('a')
if link:
genre = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['tags'].append(genre)
# Extract the actor info
star_p = info_div.find('p', class_='star-show')
if star_p:
# Find the actor list
star_list = star_p.find_next('ul')
if star_list:
star_items = star_list.find_all('div', class_='star-name')
for item in star_items:
link = item.find('a')
if link:
actor = {
'name': link.get_text(strip=True),
'href': link.get('href', '')
}
result['actors'].append(actor)
else:
logging.debug(f"actors not found.")
else:
logging.debug(f"no star-name area. href: {href}")
else:
logging.debug(f"no star-show area. href: {href}")
except Exception as e:
logging.warning(f"parse movie detail error. href: {href}, error: {str(e)}", exc_info=True)
return result
javbus_parser = JavbusCrawler()
def common_parser(html, page, **kwargs):
parser = "html.parser"
soup = BeautifulSoup(html, parser)
if not soup:
return None
if page == 'actor_list':
#parse_actors_list(self, soup, href)
#return list_data, next_url
return javbus_parser.parse_actors_list(soup, **kwargs)
elif page == 'actor_detail':
#parse_actor_detail(self, soup, href)
#return result, next_url
return javbus_parser.parse_actor_detail(soup, **kwargs)
elif page == 'movie_list':
#parse_studios_labels_series_detail(self, soup, href):
#return result, next_url
return javbus_parser.parse_studios_labels_series_detail(soup, **kwargs)
elif page == 'movies':
#parse_movie_detail(self, soup, href, title):
#return result
return javbus_parser.parse_movie_detail(soup, **kwargs)
else:
logging.warning(f"wrong page: {page}")
return None
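# A hedged usage sketch (illustration only; page_html / page_url are placeholders):
# common_parser() is the thin dispatch layer the spider calls with raw HTML; the extra
# keyword arguments are forwarded to the matching parse_* method (href for list and
# detail pages, href plus title for movie pages).
#
#     data, next_url = common_parser(html=page_html, page='actor_list', href=page_url)
#     movie = common_parser(html=movie_html, page='movies', href=movie_url, title='')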

View File

@@ -0,0 +1,652 @@
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
import config
#import utils
# Base URLs and variable parameters
host_url = "https://www.javdb.com"
actors_uncensored_base_url = f'{host_url}/actors/uncensored'
series_uncensored_base_url = f'{host_url}/series/uncensored'
makers_uncensored_base_url = f'{host_url}/makers/uncensored'
# Set up the request headers and the cloudscraper session
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
scraper = cloudscraper.create_scraper()
http_code_404 = 404
http_code_login = 401
http_code_local = 99
save_raw_html = True
load_from_local = True
def common_parser(html, page, **kwargs):
parser = "html.parser"
soup = BeautifulSoup(html, parser)
if not soup:
return None
if page == 'actor_list':
#parse_actors_uncensored(soup, href):
#return list_data, next_url
return parse_actors_uncensored(soup, **kwargs)
elif page == 'series_list':
#parse_series_uncensored(soup, href):
#return list_data, next_url
return parse_series_uncensored(soup, **kwargs)
elif page == 'series':
#parse_series_detail(soup, href):
#return list_data, next_url
return parse_series_detail(soup, **kwargs)
elif page == 'makers_list':
#parse_makers_uncensored(soup, href):
#return list_data, next_url
return parse_makers_uncensored(soup, **kwargs)
elif page == 'makers':
#parse_maker_detail(soup, href):
#return list_data, next_url
return parse_maker_detail(soup, **kwargs)
elif page == 'publisher':
#parse_publisher_detail(soup, href):
#return list_data, next_url
return parse_publisher_detail(soup, **kwargs)
elif page == 'actor':
#parse_actor_detail(soup, href):
#return actor, next_url
return parse_actor_detail(soup, **kwargs)
elif page == 'movies':
#parse_movie_detail(soup, href, title):
#return result
return parse_movie_detail(soup, **kwargs)
elif page == 'search':
#parse_uncensored(soup, href):
#return list_data, next_url
return parse_uncensored(soup, **kwargs)
else:
logging.warning(f"wrong page: {page}")
return None
'''
# Fetch a page with CloudScraper, run a page validator, and support custom parsers / preprocessing
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
if load_from_local: # read from the local cache first
html = utils.read_raw_html(url)
if html:
# Preprocess the HTML if a preprocessor was provided
html_text = preprocessor(html) if preprocessor else html
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the custom page check
logging.debug(f"read from local. href: {url}")
return soup, http_code_local # a code below 100 signals the page came from the local cache
for attempt in range(max_retries):
try:
if 'javdb.com' not in url.lower():
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=headers)
# Handle HTTP status codes
if response.status_code == 404:
logging.debug(f"Page not found (404): {url}")
return None, http_code_404 # return 404 directly so the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# Check whether we got redirected, e.g. to the login page
if response.history:
logging.debug(f"Page redirected on {url}. Checking if it's a login page.")
soup = BeautifulSoup(response.text, parser)
# Is this the login page?
if soup.find('nav', class_='panel form-panel'):
logging.debug(f"Page redirected to login page on {url}.")
return None, http_code_login
if save_raw_html:
utils.write_raw_html(url, response.text)
# Preprocess the HTML if a preprocessor was provided
html_text = preprocessor(response.text) if preprocessor else response.text
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the custom page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retrying...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # still failing after the maximum number of retries
'''
# Fix up the HTML: drop stray <br> tags and patch <a> tags; needed when extracting ethnicity
def preprocess_html(html):
return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
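# Illustrative (not in the original file): generic_validator is meant to be specialised
# with functools.partial and handed to a fetch helper as its validator callback, e.g.:
#   actors_ok = partial(generic_validator, tag='div', identifier='actors', attr_type='id')
#   # actors_ok(soup) is True when the page contains <div id="actors">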
# Extract the page number from a link
def url_page_num(href):
if href is None:
return None
match = re.search(r'page=(\d+)', href)
if match:
next_page_number = int(match.group(1))
return next_page_number
else:
return None
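# Minimal sketch (an assumption drawn from the pattern repeated in the parse_* functions
# below): only follow the "next page" link when its page number is strictly greater than
# the current one, so the crawl cannot loop backwards.
def next_page_url(soup, current_href):
    next_page_element = soup.find('a', class_='pagination-next')
    if not next_page_element:
        return None
    next_href = next_page_element.get('href')
    next_num = url_page_num(next_href)
    cur_num = url_page_num(current_href) or 0
    if next_num and next_num > cur_num:
        return host_url + next_href
    return None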
# <span class="avatar" style="background-image: url(https://c0.jdbstatic.com/avatars/md/mdRn.jpg)"></span>
def parse_avatar_image(soup):
try:
span = soup.find("span", class_="avatar")
if not span:
return "" # 没有找到 <span> 元素,返回空字符串
style = span.get("style", "")
match = re.search(r'url\(["\']?(.*?)["\']?\)', style)
return match.group(1) if match else "" # return the URL on success, otherwise an empty string
except Exception as e:
return "" # on any exception, return an empty string
# Parse the HTML and extract the data we need
def parse_actors_uncensored(soup, href):
div_actors = soup.find("div", id='actors')
if not div_actors:
logging.warning(f"Warning: No actors div found ")
return None, None
# Parse the entries
rows = div_actors.find_all('div', class_='box actor-box')
list_data = []
next_url = None
for row in rows:
# Actor detail link
actor_link = row.find('a')['href']
# Actor name
actor_name = row.find('strong').text.strip()
# Avatar image URL
avatar_url = row.find('img', class_='avatar')['src']
# Aliases stored in the title attribute
alias_list = row.find('a')['title'].split(", ")
list_data.append({
'name' : actor_name,
'href' : host_url + actor_link if actor_link else '',
'pic' : avatar_url,
'alias': alias_list
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_actor_detail(soup, href):
# Look for aliases first
alias_list = []
div_meta = soup.find('span', class_='actor-section-name')
if not div_meta:
logging.warning(f'warning: no meta data found in page {href}')
return None, None
alias_div = soup.find('div', class_='column section-title')
if alias_div:
meta_list = alias_div.find_all('span', class_='section-meta')
if len(meta_list) > 1:
alias_list = meta_list[0].text.strip().split(", ")
# Avatar
pic = ''
avatar = soup.find("div", class_="column actor-avatar")
if avatar:
pic = parse_avatar_image(avatar)
# Data to return
actor = {}
# Use a regex to find the div whose class contains 'movie-list h cols-'
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-'))
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return None, None
# Parse the entries
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
logging.debug(f'current_page: {current_page_number}, next page_num: {next_page_number}')
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
actor = {
'pic' : pic,
'alias' : alias_list,
'movies' : list_data
}
return actor, next_url
# Parse the HTML and extract the data we need
def parse_movie_detail_old(soup, href, title):
div_video = soup.find("div", class_='video-meta-panel')
if not div_video:
logging.warning(f"Warning: No movies div found ")
return None, None
# Cover image
cover_img = soup.select_one('.column-video-cover a')
cover_url = cover_img['href'] if cover_img else None
# Serial number
serial = soup.select_one('.panel-block:first-child .value')
serial_number = serial.text.strip() if serial else None
# Release date
date = soup.select_one('.panel-block:nth-of-type(2) .value')
release_date = date.text.strip() if date else None
# Duration
duration = soup.select_one('.panel-block:nth-of-type(3) .value')
video_duration = duration.text.strip() if duration else None
# Maker
maker = soup.select_one('.panel-block:nth-of-type(4) .value a')
maker_name = maker.text.strip() if maker else None
maker_link = maker['href'] if maker else None
# Series
series = soup.select_one('.panel-block:nth-of-type(5) .value a')
series_name = series.text.strip() if series else None
series_link = series['href'] if series else None
# Actors (name + link)
actors = [{'name': actor.text.strip(), 'href': host_url + actor['href']} for actor in soup.select('.panel-block:nth-of-type(8) .value a')]
return {
'href' : href,
'title' : title,
'cover_url': cover_url,
'serial_number': serial_number,
'release_date': release_date,
'duration': video_duration,
'maker_name': maker_name,
'maker_link': host_url + maker_link if maker_link else '',
'series_name': series_name,
'series_link': host_url + series_link if series_link else '',
'actors': actors
}
# Parse a single value
def parse_movie_one(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
return key_span.text.strip()
return None
# Parse a value together with its link
def parse_movie_val_href(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
a_tag = key_span.find('a')
if a_tag:
return a_tag.text.strip(), host_url + a_tag.get('href')
else:
return key_span.text.strip(), None
return None, None
# Parse multiple values with their links
def parse_movie_arr(soup, keys):
key_strong = soup.find('strong', string=lambda text: text in keys)
if key_strong:
key_span = key_strong.find_next_sibling('span', class_='value')
if key_span:
actors = []
a_tags = key_span.find_all('a')
for a_tag in a_tags:
actors.append({
'name': a_tag.text.strip(),
'href': host_url + a_tag.get('href')
})
return actors
return []
# Parse the HTML and extract the data we need
def parse_movie_detail(soup, href, title):
div_video = soup.find("div", class_='video-meta-panel')
if not div_video:
logging.warning(f"Warning: No movies div found ")
return None, None
result = {}
result['href'] = href
result['title'] = title
# Cover image
cover_img = soup.select_one('.column-video-cover a')
result['cover_url'] = cover_img['href'] if cover_img else None
# Serial number
result['serial_number'] = parse_movie_one(soup, ['番號:', 'ID:'])
result['release_date'] = parse_movie_one(soup, ['日期:', 'Released Date:'])
result['duration'] = parse_movie_one(soup, ['時長:', 'Duration:'])
# Maker, series, publisher
result['maker_name'], result['maker_link'] = parse_movie_val_href(soup, ['片商:', 'Maker:'])
result['series_name'], result['series_link'] = parse_movie_val_href(soup, ['系列:', 'Series:'])
result['pub_name'], result['pub_link'] = parse_movie_val_href(soup, ['發行:', 'Publisher:'])
# Actors and tags
result['tags'] = parse_movie_arr(soup, ['類別:', 'Tags:'])
result['actors'] = parse_movie_arr(soup, ['演員:', 'Actor(s):'])
return result
# Parse the HTML and extract the data we need
def parse_series_uncensored(soup, href):
div_series = soup.find("div", id='series')
if not div_series:
logging.warning(f"Warning: No div_series div found ")
return None, None
# Parse the entries
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
# use a separate name so we do not shadow the page's href (needed for pagination below)
item_href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + item_href if item_href else '',
'movies' : movies
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_series_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the entries
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_makers_uncensored(soup, href):
div_series = soup.find("div", id='makers')
if not div_series:
logging.warning(f"Warning: No makers div found ")
return None, None
# Parse the entries
rows = div_series.find_all('a', class_='box')
list_data = []
next_url = None
for row in rows:
name = row.find('strong').text.strip()
# use a separate name so we do not shadow the page's href (needed for pagination below)
item_href = row['href']
div_movies = row.find('span')
movies = 0
if div_movies:
match = re.search(r'\((\d+)\)', div_movies.text.strip())
if match:
movies = int(match.group(1))
list_data.append({
'name' : name,
'href' : host_url + item_href if item_href else '',
'movies' : movies
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_maker_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the entries
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_publisher_detail(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-5')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the entries
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
# Parse the HTML and extract the data we need
def parse_uncensored(soup, href):
#div_movies = soup.find("div", class_='movie-list h cols-4 vcols-8')
div_movies = soup.find("div", class_=re.compile(r'movie-list h cols-4 vcols-(5|8)'))
if not div_movies:
logging.warning(f"Warning: No movies div found ")
return [], None
# Parse the entries
rows = div_movies.find_all('div', class_='item')
list_data = []
next_url = None
for row in rows:
link = row.find('a', class_='box')['href']
serial_number = row.find('strong').text.strip()
title = row.find('div', class_='video-title').text.strip()
release_date = row.find('div', class_='meta').text.strip()
list_data.append({
'href' : host_url + link if link else '',
'serial_number' : serial_number,
'title' : title,
'release_date': release_date
})
# 查找 "下一页" 按钮
next_page_element = soup.find('a', class_='pagination-next')
if next_page_element:
next_page_url = next_page_element['href']
next_page_number = url_page_num(next_page_url)
current_page_number = url_page_num(href)
if current_page_number is None:
current_page_number = 0
if next_page_number and next_page_number > current_page_number :
next_url = host_url + next_page_url
return list_data, next_url
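# Illustrative crawl loop (the `fetch` callable is an assumption standing in for whatever
# request helper is wired up elsewhere; it is not defined in this file). It shows how the
# (list_data, next_url) pairs returned by the list parsers above are meant to be consumed.
def crawl_all_pages(start_url, page_kind, fetch):
    results = []
    url = start_url
    while url:
        html = fetch(url)
        if not html:
            break
        parsed = common_parser(html, page_kind, href=url)
        if not parsed:
            break
        list_data, next_url = parsed
        results.extend(list_data or [])
        url = next_url
    return results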

View File

@ -0,0 +1,76 @@
import sqlite3
import re
def camel_case(table_name):
"""Convert a snake_case table name to CamelCase."""
parts = table_name.split('_')
return ''.join(part.capitalize() for part in parts)
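# For example (illustrative):
#   >>> camel_case('javbus_actors_movies')
#   'JavbusActorsMovies'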
def generate_scrapy_items(db_path, output_file):
"""
Generate Scrapy Item classes from a SQLite database.
:param db_path: path to the SQLite database
:param output_file: path of the generated Item file
"""
# Fields to ignore
IGNORED_FIELDS = {'id', 'created_at', 'updated_at'}
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Fetch all table names
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [row[0] for row in cursor.fetchall() if not row[0].startswith('sqlite_')]
tables.sort() # sort tables alphabetically
# Generate the Item code
item_code = ["import scrapy\n\n\n"]
for table in tables:
# Column info for the table
cursor.execute(f"PRAGMA table_info({table});")
columns = cursor.fetchall()
# Filter out the ignored fields
filtered_columns = [col[1] for col in columns if col[1] not in IGNORED_FIELDS]
if not filtered_columns:
continue # skip tables without usable fields
# Class name (CamelCase + Item suffix)
class_name = f"{camel_case(table)}Item"
# Class definition
item_code.append(f"class {class_name}(scrapy.Item):\n")
# Fixed item_type field (disabled)
#item_code.append(" item_type = scrapy.Field()\n")
# Remaining fields
for col in filtered_columns:
item_code.append(f" {col} = scrapy.Field()\n")
# Extra fields (e.g. related structures) can be added by hand below this marker
item_code.append(" # 以下为手动添加字段\n")
# Blank line between classes
item_code.append("\n")
# Close the database connection
conn.close()
# Write the generated code to the output file
with open(output_file, 'w', encoding='utf-8') as f:
f.write(''.join(item_code))
print(f"Generated {len(tables)} Item classes into {output_file}")
if __name__ == "__main__":
# Database path (adjust as needed)
DB_PATH = "/root/sharedata/sqlite/shared.db"
# Output file path
OUTPUT_FILE = "generated_items.py"
generate_scrapy_items(DB_PATH, OUTPUT_FILE)

View File

@ -0,0 +1,407 @@
import scrapy
class IafdDistributorsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
# 以下为手动添加字段
class IafdMetaEthnicItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
# 以下为手动添加字段
class IafdMoviesItem(scrapy.Item):
title = scrapy.Field()
minutes = scrapy.Field()
distributor_id = scrapy.Field()
studio_id = scrapy.Field()
release_date = scrapy.Field()
added_to_IAFD_date = scrapy.Field()
all_girl = scrapy.Field()
all_male = scrapy.Field()
compilation = scrapy.Field()
webscene = scrapy.Field()
director_id = scrapy.Field()
href = scrapy.Field()
is_full_data = scrapy.Field()
release_year = scrapy.Field()
from_performer_list = scrapy.Field()
from_dist_list = scrapy.Field()
from_stu_list = scrapy.Field()
# 以下为手动添加字段
class IafdMoviesAppersInItem(scrapy.Item):
movie_id = scrapy.Field()
appears_in_id = scrapy.Field()
gradation = scrapy.Field()
notes = scrapy.Field()
# 以下为手动添加字段
class IafdPerformerAliasesItem(scrapy.Item):
performer_id = scrapy.Field()
alias = scrapy.Field()
# 以下为手动添加字段
class IafdPerformerUrlsItem(scrapy.Item):
performer_id = scrapy.Field()
position = scrapy.Field()
url = scrapy.Field()
# 以下为手动添加字段
class IafdPerformersItem(scrapy.Item):
name = scrapy.Field()
gender = scrapy.Field()
birthday = scrapy.Field()
astrology = scrapy.Field()
birthplace = scrapy.Field()
years_active = scrapy.Field()
ethnicity = scrapy.Field()
nationality = scrapy.Field()
hair_colors = scrapy.Field()
eye_color = scrapy.Field()
height_str = scrapy.Field()
weight_str = scrapy.Field()
measurements = scrapy.Field()
tattoos = scrapy.Field()
piercings = scrapy.Field()
fake_tits = scrapy.Field()
href = scrapy.Field()
weight = scrapy.Field()
height = scrapy.Field()
rating = scrapy.Field()
movies_cnt = scrapy.Field()
vixen_cnt = scrapy.Field()
blacked_cnt = scrapy.Field()
tushy_cnt = scrapy.Field()
x_art_cnt = scrapy.Field()
is_full_data = scrapy.Field()
birth_year = scrapy.Field()
from_astro_list = scrapy.Field()
from_birth_list = scrapy.Field()
from_ethnic_list = scrapy.Field()
from_movie_list = scrapy.Field()
# 以下为手动添加字段
class IafdPerformersMoviesItem(scrapy.Item):
performer_id = scrapy.Field()
movie_id = scrapy.Field()
role = scrapy.Field()
notes = scrapy.Field()
# 以下为手动添加字段
class IafdStudiosItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
# 以下为手动添加字段
class IafdTaskLogItem(scrapy.Item):
task_id = scrapy.Field()
full_data_performers = scrapy.Field()
total_performers = scrapy.Field()
full_data_movies = scrapy.Field()
total_movies = scrapy.Field()
total_distributors = scrapy.Field()
total_studios = scrapy.Field()
task_status = scrapy.Field()
# 以下为手动添加字段
class JavbusActorsItem(scrapy.Item):
ja_name = scrapy.Field()
zh_name = scrapy.Field()
en_name = scrapy.Field()
href = scrapy.Field()
pic = scrapy.Field()
birth_date = scrapy.Field()
height = scrapy.Field()
breast_size = scrapy.Field()
measurements = scrapy.Field()
uncensored = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
# 以下为手动添加字段
class JavbusActorsMoviesItem(scrapy.Item):
actor_id = scrapy.Field()
movie_id = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class JavbusLabelsItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
# 以下为手动添加字段
class JavbusMoviesItem(scrapy.Item):
href = scrapy.Field()
title = scrapy.Field()
cover_url = scrapy.Field()
serial_number = scrapy.Field()
release_date = scrapy.Field()
duration = scrapy.Field()
studio_id = scrapy.Field()
label_id = scrapy.Field()
series_id = scrapy.Field()
is_full_data = scrapy.Field()
uncensored = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_studios = scrapy.Field()
from_movie_labels = scrapy.Field()
from_movie_series = scrapy.Field()
actors_cnt = scrapy.Field()
# 以下为手动添加字段
class JavbusMoviesTagsItem(scrapy.Item):
movie_id = scrapy.Field()
tag_id = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class JavbusSeriesItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
# 以下为手动添加字段
class JavbusStudiosItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
details = scrapy.Field()
uncensored = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
movies_cnt = scrapy.Field()
magnet_cnt = scrapy.Field()
# 以下为手动添加字段
class JavbusTagsItem(scrapy.Item):
name = scrapy.Field()
en_name = scrapy.Field()
ja_name = scrapy.Field()
href = scrapy.Field()
# 以下为手动添加字段
class JavdbActorsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
pic = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_list = scrapy.Field()
# 以下为手动添加字段
class JavdbActorsAliasItem(scrapy.Item):
actor_id = scrapy.Field()
alias = scrapy.Field()
# 以下为手动添加字段
class JavdbActorsMoviesItem(scrapy.Item):
actor_id = scrapy.Field()
movie_id = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class JavdbMakersItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
# 以下为手动添加字段
class JavdbMoviesItem(scrapy.Item):
href = scrapy.Field()
title = scrapy.Field()
cover_url = scrapy.Field()
serial_number = scrapy.Field()
release_date = scrapy.Field()
duration = scrapy.Field()
maker_id = scrapy.Field()
series_id = scrapy.Field()
is_full_data = scrapy.Field()
from_actor_list = scrapy.Field()
from_movie_makers = scrapy.Field()
from_movie_series = scrapy.Field()
from_movie_publishers = scrapy.Field()
pub_id = scrapy.Field()
uncensored = scrapy.Field()
# 以下为手动添加字段
class JavdbMoviesTagsItem(scrapy.Item):
movie_id = scrapy.Field()
tag_id = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class JavdbPublishersItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
# 以下为手动添加字段
class JavdbSeriesItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
parent_id = scrapy.Field()
details = scrapy.Field()
from_list = scrapy.Field()
from_movie_list = scrapy.Field()
# 以下为手动添加字段
class JavdbTagsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
# 以下为手动添加字段
class JavdbTaskLogItem(scrapy.Item):
task_id = scrapy.Field()
full_data_actors = scrapy.Field()
total_actors = scrapy.Field()
full_data_movies = scrapy.Field()
total_movies = scrapy.Field()
total_makers = scrapy.Field()
total_series = scrapy.Field()
task_status = scrapy.Field()
# 以下为手动添加字段
class JavhdModelsItem(scrapy.Item):
rank = scrapy.Field()
ja_name = scrapy.Field()
zh_name = scrapy.Field()
en_name = scrapy.Field()
url = scrapy.Field()
pic = scrapy.Field()
height = scrapy.Field()
weight = scrapy.Field()
breast_size = scrapy.Field()
breast_factor = scrapy.Field()
hair_color = scrapy.Field()
eye_color = scrapy.Field()
birth_date = scrapy.Field()
ethnicity = scrapy.Field()
birth_place = scrapy.Field()
is_full_data = scrapy.Field()
# 以下为手动添加字段
class PboxActorAliasesItem(scrapy.Item):
actor_id = scrapy.Field()
alias = scrapy.Field()
actor_alias = scrapy.Field()
# 以下为手动添加字段
class PboxActorsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
gender = scrapy.Field()
age = scrapy.Field()
nationality = scrapy.Field()
country = scrapy.Field()
movies_cnt = scrapy.Field()
is_full_data = scrapy.Field()
# 以下为手动添加字段
class PboxActorsMoviesItem(scrapy.Item):
actor_id = scrapy.Field()
movie_id = scrapy.Field()
actor_mov = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class PboxMoviesItem(scrapy.Item):
href = scrapy.Field()
title = scrapy.Field()
movie_id = scrapy.Field()
content_id = scrapy.Field()
duration = scrapy.Field()
publish_date = scrapy.Field()
release_date = scrapy.Field()
studio_id = scrapy.Field()
is_full_data = scrapy.Field()
# 以下为手动添加字段
class PboxMoviesAltsItem(scrapy.Item):
min_mov_id = scrapy.Field()
max_mov_id = scrapy.Field()
min_max = scrapy.Field()
# 以下为手动添加字段
class PboxMoviesTagsItem(scrapy.Item):
movie_id = scrapy.Field()
tag_id = scrapy.Field()
movid_tagid = scrapy.Field()
tags = scrapy.Field()
# 以下为手动添加字段
class PboxStudiosItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
label_id = scrapy.Field()
scene_count = scrapy.Field()
description = scrapy.Field()
# 以下为手动添加字段
class PboxTagsItem(scrapy.Item):
name = scrapy.Field()
href = scrapy.Field()
tag_id = scrapy.Field()
# 以下为手动添加字段
class ThelordofpornActressItem(scrapy.Item):
pornstar = scrapy.Field()
rating = scrapy.Field()
rank = scrapy.Field()
votes = scrapy.Field()
href = scrapy.Field()
career_start = scrapy.Field()
measurements = scrapy.Field()
born = scrapy.Field()
height = scrapy.Field()
weight = scrapy.Field()
date_modified = scrapy.Field()
global_rank = scrapy.Field()
weekly_rank = scrapy.Field()
last_month_rating = scrapy.Field()
current_rating = scrapy.Field()
total_votes = scrapy.Field()
birth_date = scrapy.Field()
birth_year = scrapy.Field()
birth_place = scrapy.Field()
height_ft = scrapy.Field()
height_cm = scrapy.Field()
weight_lbs = scrapy.Field()
weight_kg = scrapy.Field()
is_full_data = scrapy.Field()
# 以下为手动添加字段
class ThelordofpornAliasItem(scrapy.Item):
actress_id = scrapy.Field()
alias = scrapy.Field()
# 以下为手动添加字段

View File

@ -4,6 +4,17 @@ import os
from datetime import datetime, timezone
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
def is_valid_url(url: str) -> bool:
"""检查 URL 是否合法"""
try:
result = urlparse(url)
# Require both a scheme (e.g. http/https) and a netloc (e.g. example.com)
return all([result.scheme, result.netloc])
except ValueError:
return False
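# Examples (illustrative): both a scheme and a host must be present.
#   >>> is_valid_url('https://www.javbus.com/star/p8y')
#   True
#   >>> is_valid_url('/star/p8y')
#   False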
def load_json_file(file_path):
# Check that the file exists
if not os.path.exists(file_path):
@ -136,4 +147,86 @@ def pretty_json_simple(item):
except:
# 转换失败时返回原始字符串
return item
# Used by javbus: normalize multi-language URLs
def normalize_url(url: str) -> str:
"""
标准化URL移除语言前缀使不同语言版本的URL保持一致
示例:
https://www.javbus.com/ja/star/p8y → https://www.javbus.com/star/p8y
https://www.javbus.com/en/star/p8y → https://www.javbus.com/star/p8y
"""
try:
# Parse the URL
parsed = urlparse(url)
# Extract the path
path = parsed.path
# Common language codes
LANGUAGES = {'ja', 'en', 'ko', 'zh', 'fr', 'de', 'es', 'ru'}
# Split the path into components
path_components = path.strip('/').split('/')
# Drop the first component if it is a language code
if path_components and path_components[0] in LANGUAGES:
path_components = path_components[1:]
# Rebuild the normalized path
normalized_path = '/' + '/'.join(path_components)
# Rebuild the normalized URL (keep scheme and host, replace the path)
normalized_url = parsed._replace(path=normalized_path).geturl()
return normalized_url
except Exception as e:
print(f"URL标准化失败: {url}, 错误: {e}")
return url # 出错时返回原始URL
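# Examples (illustrative), matching the docstring above:
#   >>> normalize_url('https://www.javbus.com/ja/star/p8y')
#   'https://www.javbus.com/star/p8y'
#   >>> normalize_url('https://www.javbus.com/star/p8y')  # no language prefix, unchanged
#   'https://www.javbus.com/star/p8y'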
# Used by javbus: expand a normalized URL into its language-specific variants
def generate_multilang_urls(url, languages=['en', 'ja']):
"""
Generate language-specific versions of the given URL.
Args:
url (str): original URL
languages (list): language codes to generate
Returns:
dict: mapping of language code to URL
"""
try:
# Parse the URL
parsed = urlparse(url)
path = parsed.path
# Handle paths that start with a slash
if path.startswith('/'):
path = path[1:] # drop the leading slash
# Build the multi-language URLs
result = {}
for lang in languages:
# New path: language code + original path
new_path = f'/{lang}/{path}'
# Build the new URL
new_url = urlunparse((
parsed.scheme,
parsed.netloc,
new_path,
parsed.params,
parsed.query,
parsed.fragment
))
result[lang] = new_url
return result
except Exception as e:
print(f"生成多语言URL时出错: {e}")
return {}
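# Example (illustrative), the inverse of normalize_url above:
#   >>> generate_multilang_urls('https://www.javbus.com/star/p8y')
#   {'en': 'https://www.javbus.com/en/star/p8y', 'ja': 'https://www.javbus.com/ja/star/p8y'}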