This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py
2025-07-28 11:14:34 +08:00

1728 lines
72 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sqlite3
import json
import logging
from datetime import datetime
from typing import List, Dict
from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path, test_db_path
import scrapy_proj.comm.comm_def as comm
import scrapy_proj.items as items_def
from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url
# Registry: lower-cased spider name -> DB handler class for that spider.
# Filled at import time by the @register_handler decorator below.
spider_handler_registry = {}


def register_handler(spider_name):
    """Build a class decorator that registers a handler class for *spider_name*.

    The decorated class is stored in ``spider_handler_registry`` under
    ``spider_name.lower()`` (a later registration with the same key replaces
    the earlier one) and is returned unchanged.
    """
    def _register(handler_cls):
        registry_key = spider_name.lower()
        spider_handler_registry[registry_key] = handler_cls
        return handler_cls

    return _register
@register_handler(comm.SPIDER_NAME_SIS)
class SisDBHandler(SQLiteDBHandler):
    """DB handler for the SIS spider; rows are stored in the ``sis`` table."""

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_sis = 'sis'

    def insert_item(self, item):
        """Store *item* in the ``sis`` table and return None.

        Delegates to ``insert_or_update_common`` with ``uniq_key='url'`` and
        ``exists_do_nothing=True`` (exact upsert semantics live in
        SQLiteDBHandler, not visible here).
        """
        self.insert_or_update_common(
            item,
            tbl_name=self.tbl_name_sis,
            uniq_key='url',
            exists_do_nothing=True,
        )

    def get_stat(self):
        """Return ``{'cnt': <row count>}``; ``{}`` when no row is returned or sqlite3 raises."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}
@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3DBHandler(SQLiteDBHandler):
    """DB handler for the U3C3 spider; rows are stored in the ``u3c3`` table."""

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_u3c3 = 'u3c3'

    def insert_item(self, item):
        """Store *item* in the ``u3c3`` table and return None.

        Delegates to ``insert_or_update_common`` with ``uniq_key='url'`` and
        ``exists_do_nothing=True`` (exact upsert semantics live in
        SQLiteDBHandler, not visible here).
        """
        self.insert_or_update_common(
            item,
            tbl_name=self.tbl_name_u3c3,
            uniq_key='url',
            exists_do_nothing=True,
        )

    def get_stat(self):
        """Return ``{'cnt': <row count>}``; ``{}`` when no row is returned or sqlite3 raises."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}
@register_handler(comm.SPIDER_NAME_CLM)
class ClmDBHandler(SQLiteDBHandler):
    """DB handler for the CLM spider.

    Tables: ``clm_index`` (scraped index rows keyed on ``href``),
    ``clm_keywords`` (search keywords keyed on ``words``) and
    ``clm_keywords_index`` (keyword <-> index links keyed on ``wid_iid``).
    """

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_clm_index = 'clm_index'
        self.tbl_name_clm_keywords = 'clm_keywords'
        self.tbl_name_words_index = 'clm_keywords_index'

    def insert_item(self, item):
        """Store *item* according to its ``item_type`` and return it unchanged."""
        item_type = item['item_type']
        if item_type == comm.ITEM_TYPE_CLM_INDEX:
            self.insert_index(item)
        elif item_type == comm.ITEM_TYPE_CLM_KEYWORDS:
            self.insert_or_update_common(item, self.tbl_name_clm_keywords, uniq_key='words', exists_do_nothing=False)
        else:
            logging.error("unkown item.")
        return item

    def insert_index(self, item):
        """Store an index item and, for new rows, link it to its keyword.

        When ``item['is_update']`` is truthy the row is only upserted
        (``exists_do_nothing=False``) and no link is written. Otherwise the row
        is inserted with ``exists_do_nothing=True``; when that returns an id, a
        link row ``{words_id, index_id, wid_iid, tags}`` is written to the
        keyword/index table. Returns None.
        """
        if item['is_update']:
            self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=False)
            return

        index_id = self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=True)
        if not index_id:
            logging.warning(f"insert index error: {item}")
            return

        link = {
            'words_id': item['key_words_id'],
            'index_id': index_id,
            'wid_iid': f"{item['key_words_id']}_{index_id}",
            'tags': item['key_words'],
        }
        link_id = self.insert_or_update_common(link, self.tbl_name_words_index, uniq_key='wid_iid', exists_do_nothing=True)
        if link_id:
            logging.debug(f"insert one item: {link}")
        else:
            logging.warning(f"insert item error: {link}")

    def get_empty_title(self):
        """Return ``[{id, href}, ...]`` for index rows whose title is ``''``; None on sqlite3 error."""
        try:
            self.cursor.execute(f"SELECT id, href FROM {self.tbl_name_clm_index} WHERE title='' ")
            return [dict(r) for r in self.cursor.fetchall()]
        except sqlite3.Error as err:
            logging.error(f"查询 href 失败: {err}")
            return None

    def get_count_by_keywords_id(self, key_words_id):
        """Return the number of index links recorded for *key_words_id*."""
        self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_words_index} WHERE words_id = ?", (key_words_id,))
        result = self.cursor.fetchone()
        if not result:
            return None
        return result[0]

    def get_key_words(self, **filters):
        """Query keyword rows (id, words, groups) matching *filters*.

        Supported filters: ``id``, ``start_id`` (id > value), ``words`` /
        ``groups`` / ``tags`` (substring LIKE), ``query_str`` (raw SQL
        fragment appended verbatim), ``order_by`` (raw column expression) and
        ``limit``. ``query_str`` and ``order_by`` are NOT parameterised and
        must come from trusted code. Returns None on sqlite3 error.
        """
        clauses = {
            "id": " AND id = ?",
            "words": " AND words LIKE ?",
            "groups": " AND groups LIKE ?",
            "tags": " AND tags LIKE ?",
            "start_id": " AND id > ?",
        }
        substring_keys = ("words", "groups", "tags")
        try:
            sql = f"SELECT id, words, groups FROM {self.tbl_name_clm_keywords} WHERE 1=1"
            args = []
            for name, clause in clauses.items():
                if name not in filters:
                    continue
                sql += clause
                value = filters[name]
                args.append(f"%{value}%" if name in substring_keys else value)

            if "query_str" in filters:
                sql += f" AND {filters['query_str']} "
            if "order_by" in filters:
                # A column name cannot be bound as a placeholder (it would be
                # compared as a string literal), so it is interpolated directly.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                args.append(filters["limit"])

            self.cursor.execute(sql, args)
            return [dict(r) for r in self.cursor.fetchall()]
        except sqlite3.Error as err:
            logging.error(f"查询 href 失败: {err}")
            return None

    def get_stat(self):
        """Return counts ``{words, magnets, wd_mag}`` for the three tables; ``{}`` on failure."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words,
                (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets,
                (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}
@register_handler(comm.SPIDER_NAME_PBOX)
class PboxDBHandler(SQLiteDBHandler):
    """DB handler for the Pbox spider.

    Stores studios, movies, actors and tags, plus the movie<->tag,
    movie-alternate and actor<->movie link tables.
    """

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        # Note: _create_tables() is a no-op and is not invoked here.
        self.tbl_studios = 'pbox_studios'
        self.tbl_movies = 'pbox_movies'
        self.tbl_tags = 'pbox_tags'
        self.tbl_mov_tags = 'pbox_movies_tags'
        self.tbl_mov_alts = 'pbox_movies_alts'
        self.tbl_actor = 'pbox_actors'
        self.tbl_actor_alias = 'pbox_actor_aliases'
        self.tbl_actor_mov = 'pbox_actors_movies'

    def _create_tables(self):
        pass

    def insert_item(self, item):
        """Store *item* according to its ``item_type`` and return it unchanged."""
        item_type = item['item_type']
        if item_type == comm.ITEM_TYPE_STUDIO:
            self.insert_or_update_common(item, self.tbl_studios, uniq_key='href', exists_do_nothing=True)
        elif item_type == comm.ITEM_TYPE_MOVIE_DETAIL:
            self.insert_movie(item)
        else:
            logging.error("unkown item.")
        return item

    def insert_movie(self, item):
        """Upsert a movie-detail item together with its actor, tag and alias links.

        Steps: upsert the movie keyed on ``href``; for each entry of
        ``actor_index_list`` look up / insert the actor and link it; for each
        entry of ``mov_tags_list`` look up / insert the tag and link it; for
        each entry of ``mov_alt_list`` record the min/max movie-id pair.
        Failures are logged, not raised. Returns None.
        """
        mov_id = self.insert_or_update_common(item, self.tbl_movies, uniq_key='href', exists_do_nothing=False)
        if not mov_id:
            logging.error(f"insert movie error. movid: {item['movie_id']}, url: {item['href']}")
            return

        # Actors: reuse an existing row when the href is known, else insert it.
        for actor in item.get('actor_index_list', []):
            actor_id = self.get_id_by_key(self.tbl_actor, uniq_key='href', val=actor['href'])
            if not actor_id:
                actor_id = self.insert_or_update_common(actor, self.tbl_actor, uniq_key='href', exists_do_nothing=True)
            if not actor_id:
                continue
            link = {'movie_id': mov_id, 'actor_id': actor_id, 'actor_mov': f'{actor_id}_{mov_id}'}
            if not self.insert_or_update_common(link, tbl_name=self.tbl_actor_mov, uniq_key='actor_mov', exists_do_nothing=True):
                logging.error(f"insert actor_movie error. actor id: {actor_id}, mov id: {mov_id}")

        # Tags: same lookup-then-insert pattern.
        for tag in item.get('mov_tags_list', []):
            tag_id = self.get_id_by_key(self.tbl_tags, uniq_key='href', val=tag['href'])
            if not tag_id:
                tag_id = self.insert_or_update_common(tag, self.tbl_tags, uniq_key='href')
            if not tag_id:
                continue
            link = {'movie_id': mov_id, 'tag_id': tag_id, 'movid_tagid': f'{mov_id}_{tag_id}', 'tags': ''}
            if not self.insert_or_update_common(link, tbl_name=self.tbl_mov_tags, uniq_key='movid_tagid', exists_do_nothing=True):
                logging.error(f"insert movie_tag error. tag id: {tag_id}, mov id: {mov_id}")

        # Alternate versions: one row per (min_mov_id, max_mov_id) pair.
        for alt in item.get('mov_alt_list', []):
            low, high = alt['min_mov_id'], alt['max_mov_id']
            record = {'min_mov_id': low, 'max_mov_id': high, 'min_max': f"{low}_{high}"}
            if not self.insert_or_update_common(record, tbl_name=self.tbl_mov_alts, uniq_key='min_max', exists_do_nothing=True):
                logging.error(f"insert movie_alt error. item: {alt}")

    def get_studios(self, **filters):
        """Query studios (href, name, id, label_id, scene_count).

        Supported filters: ``id``, ``href``, ``name`` (substring LIKE),
        ``start_id`` (id > value), ``order_by`` (raw column expression,
        trusted callers only) and ``limit``. Returns ``[]`` on sqlite3 error.
        """
        clauses = {
            "id": " AND id = ?",
            "href": " AND href = ?",
            "name": " AND name LIKE ?",
            "start_id": " AND id > ?",
        }
        try:
            sql = f"SELECT href, name, id, label_id, scene_count FROM {self.tbl_studios} WHERE 1=1"
            args = []
            for name, clause in clauses.items():
                if name not in filters:
                    continue
                sql += clause
                args.append(f"%{filters[name]}%" if name == "name" else filters[name])
            if "order_by" in filters:
                # Column names cannot be placeholders; interpolated verbatim.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                args.append(filters["limit"])
            self.cursor.execute(sql, args)
            return [dict(r) for r in self.cursor.fetchall()]
        except sqlite3.Error as err:
            logging.error(f"查询 href 失败: {err}")
            return []

    def get_stu_mov_count(self, stu_id):
        """Return how many movies reference studio *stu_id*."""
        self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_movies} WHERE studio_id = ?", (stu_id,))
        result = self.cursor.fetchone()
        if not result:
            return None
        return result[0]

    def get_stat(self):
        """Return counts ``{studios, movies, actors, act_mov}``; ``{}`` on failure."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_studios}) AS studios,
                (SELECT COUNT(*) FROM {self.tbl_movies}) AS movies,
                (SELECT COUNT(*) FROM {self.tbl_actor}) AS actors,
                (SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}

    def close_spider(self, spider):
        """Close the database connection when the spider finishes."""
        self.conn.close()
@register_handler(comm.SPIDER_NAME_JAVHD)
class JavHDDBHandler(SQLiteDBHandler):
    """DB handler for the JavHD spider; all rows go to ``javhd_models`` keyed on ``url``."""

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_javhd = 'javhd_models'

    def insert_item(self, item):
        """Upsert actor index/detail items and return *item* unchanged."""
        item_type = item['item_type']
        # Index and detail items share the same table and the same upsert call.
        if item_type == comm.ITEM_TYPE_ACTOR_INDEX or item_type == comm.ITEM_TYPE_ACTOR_DETAIL:
            self.insert_or_update_common(item, self.tbl_name_javhd, uniq_key='url', exists_do_nothing=False)
        else:
            logging.error("unkown item.")
        return item

    def get_stat(self):
        """Return ``{'cnt': <row count>}``; ``{}`` when no row is returned or sqlite3 raises."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_javhd}) AS cnt
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}

    def has_full_data(self, href):
        """Return the number of rows with ``url == href`` and ``is_full_data=1`` (0 on sqlite3 error)."""
        try:
            self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_javhd} WHERE is_full_data=1 and url = ?", (href,))
            result = self.cursor.fetchone()
            if not result:
                return None
            return result[0]
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return 0
@register_handler(comm.SPIDER_NAME_LORD)
class LordDBHandler(SQLiteDBHandler):
    """DB handler for the thelordofporn spider (actresses and their aliases)."""

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_actors = 'thelordofporn_actress'
        self.tbl_name_alias = 'thelordofporn_alias'

    def insert_item(self, item):
        """Store actor-detail items and return *item* unchanged."""
        if item['item_type'] != comm.ITEM_TYPE_ACTOR_DETAIL:
            logging.error("unkown item.")
            return item
        self.insert_actor(item)
        return item

    def insert_actor(self, item):
        """Upsert an actress (keyed on ``href``) and each of her ``alias`` entries.

        Aliases are written with a composite key (actress_id, alias). Failures
        are logged, not raised. Returns None.
        """
        actor_id = self.insert_or_update_common(item, self.tbl_name_actors, uniq_key='href', exists_do_nothing=False)
        if not actor_id:
            logging.warning(f"insert actor data error! data: {pretty_json_simple(item)}")
            return

        for alias in item.get('alias', []):
            record = {'actress_id': actor_id, 'alias': alias}
            written = self.insert_or_update_with_composite_pk(
                data=record,
                tbl_name=self.tbl_name_alias,
                composite_pk=['actress_id', 'alias'],
                exists_do_nothing=False,
            )
            if written:
                logging.debug(f"insert/update actress_alias. data: {record}")
            else:
                logging.warning(f"insert actor alias error!. data: {record}")

    def get_stat(self):
        """Return ``{'actor_cnt': <row count>}``; ``{}`` when no row is returned or sqlite3 raises."""
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actor_cnt
        """
        try:
            self.cursor.execute(query)
            first = self.cursor.fetchone()
            if not first:
                logging.warning("query no results.")
                return {}
            names = [col[0] for col in self.cursor.description]
            return dict(zip(names, first))
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return {}

    def has_full_data(self, href):
        """Return the number of actress rows with this href and ``is_full_data=1`` (0 on sqlite3 error)."""
        try:
            self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,))
            result = self.cursor.fetchone()
            if not result:
                return None
            return result[0]
        except sqlite3.Error as err:
            logging.error(f"query error: {err}")
            return 0
@register_handler(comm.SPIDER_NAME_JAVBUS)
class JavBusDBHandler(SQLiteDBHandler):
    """DB handler for the JavBus spider.

    ``insert_item`` routes scrapy items to the actor / movie / publisher
    upserts below. The remaining public methods are query helpers used by the
    spider, plus two maintenance routines (``reset_movies_uncensored`` and
    ``reset_actor_movies``) that recompute derived columns.

    Several query helpers build results with ``dict(row)``. That assumes the
    connection's row_factory yields mapping-like rows (e.g. ``sqlite3.Row``);
    this is configured in SQLiteDBHandler and is not visible here.
    """

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_actors = 'javbus_actors'
        self.tbl_name_movies = 'javbus_movies'
        self.tbl_name_studios = 'javbus_studios'
        self.tbl_name_labels = 'javbus_labels'
        self.tbl_name_series = 'javbus_series'
        self.tbl_name_tags = 'javbus_tags'
        self.tbl_name_movie_tags = 'javbus_movies_tags'
        self.tbl_name_actor_movie = 'javbus_actors_movies'

    def insert_item(self, item):
        """Persist a scrapy item according to its class and return the item unchanged.

        Only fields that were explicitly assigned on the item (scrapy keeps
        them in ``item._values``) are forwarded. An insert/update therefore
        never overwrites columns the spider did not fill in.
        """
        processed_item = {field: item[field] for field in item._values}

        if isinstance(item, items_def.JavbusActorsItem):
            self.update_actor_detail(processed_item)
        elif isinstance(item, items_def.JavbusMoviesItem):
            self.insert_or_update_movie(processed_item)
        elif isinstance(item, items_def.JavbusLabelsItem):
            self.update_pubs_multilang(data=processed_item, tbl='label')
        elif isinstance(item, items_def.JavbusStudiosItem):
            self.update_pubs_multilang(data=processed_item, tbl='studio')
        elif isinstance(item, items_def.JavbusSeriesItem):
            self.update_pubs_multilang(data=processed_item, tbl='series')
        elif isinstance(item, items_def.JavbusTagsItem):
            self.update_pubs_multilang(data=processed_item, tbl='tags')
        else:
            logging.error(f"unkown item. {processed_item}")
        return item

    def get_stat(self):
        """Return the statistics dict produced by ``get_statics``."""
        return self.get_statics()

    def has_full_data(self, href):
        """Return how many actor rows with this href have ``is_full_data=1`` (0 on sqlite3 error)."""
        try:
            self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,))
            row = self.cursor.fetchone()
            return row[0] if row else None
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return 0

    def insert_actor_index(self, data, **kwargs):
        """Insert an actor index row keyed on ``href``; return its id, or None on sqlite3 error.

        The recognised kwargs (``uncensored``, ``from_actor_list``,
        ``from_movie_list``) are copied into *data* only when not None.
        Otherwise any value already in *data* is kept. Note that this
        mutates the caller's dict.
        """
        for field in ('uncensored', 'from_actor_list', 'from_movie_list'):
            if kwargs.get(field) is not None:
                data[field] = kwargs[field]
        try:
            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href', exists_do_nothing=True)
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_movie_index(self, data, **kwargs):
        """Upsert a movie index row keyed on ``href``; return its id, or None on sqlite3 error.

        Recognised kwargs (list-origin flags and studio/label/series ids) are
        copied into *data* (mutating it) only when not None.
        """
        fields = (
            'uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels', 'from_movie_series',
            'studio_id', 'label_id', 'series_id',
        )
        for field in fields:
            if kwargs.get(field) is not None:
                data[field] = kwargs[field]
        try:
            return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_actor_movie(self, performer_id, movie_id, tags=''):
        """Link an actor to a movie (composite key actor_id + movie_id); return the helper's result."""
        return self.insert_or_update_with_composite_pk(
            data={'actor_id': performer_id, 'movie_id': movie_id, 'tags': tags},
            tbl_name=self.tbl_name_actor_movie,
            composite_pk=['actor_id', 'movie_id'],
            exists_do_nothing=True,
        )

    def update_actor_detail_404(self, data, is_full_data=1):
        """Upsert an actor whose detail page could not be fetched, setting ``is_full_data``."""
        try:
            data['is_full_data'] = is_full_data
            return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def update_actor_detail(self, data, is_full_data=1):
        """Upsert an actor detail page plus the movies credited on it.

        *data* must contain ``href``. ``avatar`` (dict of actor columns),
        ``credits`` (list of movie index dicts) and ``uncensored`` are
        optional; ``avatar``/``credits`` may also be None. Each credit is
        upserted via ``insert_movie_index`` and linked to the actor.

        Returns the actor id, or None if the actor upsert failed or raised
        ``sqlite3.Error``.
        """
        try:
            # ``or {}`` also tolerates an explicit ``avatar: None``.
            avatar = data.get('avatar') or {}
            avatar['href'] = data['href']
            avatar['is_full_data'] = is_full_data
            avatar_id = self.insert_or_update_common(avatar, self.tbl_name_actors, uniq_key='href', exists_do_nothing=False)
            if not avatar_id:
                logging.warning(f"get actor id error. href: {data['href']}")
                return None
            logging.debug(f"update actor data. href: {data['href']} avatar: {avatar}")

            uncensored = data.get('uncensored', 0)
            for movie in data.get('credits') or []:
                movie_id = self.insert_movie_index(movie, from_actor_list=1, uncensored=uncensored)
                if not movie_id:
                    continue
                logging.debug(f"insert one movie index. data: {movie}")
                link_id = self.insert_actor_movie(avatar_id, movie_id)
                if link_id:
                    logging.debug(f"insert one actor_movie record. actor id: {avatar_id}, movie id: {movie_id}")
            return avatar_id
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def query_actors(self, **filters):
        """Query actors, returning dicts with href, name (en_name), uncensored, movies_cnt, id, is_full_data.

        Supported filters:
        - ``id``, ``href``, ``is_full_data``, ``uncensored``: exact match.
        - ``en_name``: substring LIKE.
        - ``start_id``: id > value.
        - ``is_full_data_in`` / ``is_full_data_not_in``: sequences of values;
          ignored when empty.
        - ``order_by``: raw SQL, trusted callers only.
        - ``limit``.

        Returns None on sqlite3 error.
        """
        try:
            sql = f"SELECT href, en_name as name, uncensored, movies_cnt, id, is_full_data FROM {self.tbl_name_actors} WHERE 1=1"
            params = []
            conditions = {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "en_name": " AND en_name LIKE ?",
                "is_full_data": " AND is_full_data = ?",
                "start_id": " AND id > ?",
                "uncensored": " AND uncensored = ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key == "en_name" else filters[key])
            for key in ("is_full_data_in", "is_full_data_not_in"):
                values = filters.get(key)
                if values:
                    placeholders = ", ".join(["?"] * len(values))
                    operator = "IN" if key == "is_full_data_in" else "NOT IN"
                    sql += f" AND is_full_data {operator} ({placeholders})"
                    params.extend(values)
            if "order_by" in filters:
                # Column names cannot be bound as placeholders, so order_by is
                # interpolated verbatim; it must never come from untrusted input.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])
            self.cursor.execute(sql, params)
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def query_movies(self, **filters):
        """Query movies, returning dicts with href, title, uncensored, id, is_full_data.

        Filters mirror ``query_actors``, with ``title`` (substring LIKE) in
        place of ``en_name``. Returns None on sqlite3 error.
        """
        try:
            sql = f"SELECT href, title, uncensored, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1"
            params = []
            conditions = {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "title": " AND title LIKE ?",
                "is_full_data": " AND is_full_data = ?",
                "start_id": " AND id > ?",
                "uncensored": " AND uncensored = ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key == "title" else filters[key])
            for key in ("is_full_data_in", "is_full_data_not_in"):
                values = filters.get(key)
                if values:
                    placeholders = ", ".join(["?"] * len(values))
                    operator = "IN" if key == "is_full_data_in" else "NOT IN"
                    sql += f" AND is_full_data {operator} ({placeholders})"
                    params.extend(values)
            if "order_by" in filters:
                # Interpolated verbatim (see query_actors); trusted callers only.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])
            self.cursor.execute(sql, params)
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def check_and_get_id(self, item, uncensored, tbl, uniq_key='href'):
        """Return the id of the row in *tbl* whose *uniq_key* equals ``item['href']``.

        If no such row exists, insert ``{name, href, uncensored,
        from_movie_list=1}`` first and return the new id.
        """
        name = item['name']
        href = item['href']
        row_id = self.get_id_by_key(tbl, uniq_key, href)
        if row_id is None:
            row_id = self.insert_or_update_common({'name': name, 'href': href, 'uncensored': uncensored, 'from_movie_list': 1}, tbl_name=tbl, uniq_key=uniq_key)
        return row_id

    def insert_or_update_tags(self, data, uniq_key='href'):
        """Upsert a tag row keyed on *uniq_key*; return the helper's result (row id or falsy)."""
        return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key)

    def insert_movie_tags(self, movie_id, tag_id, tags):
        """Link a movie to a tag (composite key movie_id + tag_id)."""
        return self.insert_or_update_with_composite_pk(
            data={'movie_id': movie_id, 'tag_id': tag_id, 'tags': tags},
            tbl_name=self.tbl_name_movie_tags,
            composite_pk=['movie_id', 'tag_id'],
            exists_do_nothing=True,
        )

    def insert_or_update_movie_404(self, data, is_full_data=1):
        """Upsert a movie whose detail page could not be fetched, setting ``is_full_data``."""
        try:
            data['is_full_data'] = is_full_data
            return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
        except sqlite3.Error as e:
            logging.error(f"Error inserting or updating data: {e}")
            return None

    def insert_or_update_movie(self, movie, is_full_data=1):
        """Upsert a full movie record together with its publishers, actors and tags.

        - Publishers: ``studio`` / ``label`` / ``series`` dicts (with ``name``
          and ``href``) are resolved to ids via ``check_and_get_id``, which
          creates missing rows.
        - Actors: unknown ``actors`` are created, then each is linked to the
          movie.
        - Tags: listed ``tags`` are upserted and linked.

        Returns the movie id, or None on failure. Any exception triggers
        ``self.conn.rollback()`` and is logged.
        """
        try:
            uncensored = movie.get('uncensored', 0)
            studio_id = self.check_and_get_id(movie.get('studio'), uncensored, self.tbl_name_studios) if movie.get('studio') is not None else None
            label_id = self.check_and_get_id(movie.get('label'), uncensored, self.tbl_name_labels) if movie.get('label') is not None else None
            series_id = self.check_and_get_id(movie.get('series'), uncensored, self.tbl_name_series) if movie.get('series') is not None else None
            if studio_id:
                movie['studio_id'] = studio_id
            if label_id:
                movie['label_id'] = label_id
            if series_id:
                movie['series_id'] = series_id

            actors = movie.get('actors') or []
            movie['is_full_data'] = is_full_data
            movie['actors_cnt'] = len(actors)

            movie_id = self.insert_or_update_common(movie, self.tbl_name_movies, uniq_key='href')
            if movie_id is None:
                logging.warning(f"insert/update movie error. data:{movie}")
                return None
            # insert_item forwards only assigned fields, so 'title' may be
            # absent. Indexing it directly would raise KeyError here, after a
            # successful upsert. The except below would then return None and
            # skip the actor/tag links.
            logging.debug(f"insert one move, id: {movie_id}, title: {movie.get('title')}, href: {movie.get('href')}")

            # Actor <-> movie links; create the actor first when it is unknown.
            for performer in actors:
                performer_id = self.get_id_by_key(self.tbl_name_actors, 'href', performer['href'])
                if performer_id is None:
                    performer_id = self.insert_actor_index({'zh_name': performer['name'], 'href': performer['href']}, uncensored=uncensored, from_movie_list=1)
                    logging.debug(f"insert new perfomer. perfomer_id: {performer_id}, name:{performer['name']}")
                if not performer_id:
                    logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")
                    continue
                if self.insert_actor_movie(performer_id, movie_id):
                    logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")

            # Tag rows and movie <-> tag links.
            for tag in movie.get('tags') or []:
                tag_name = tag.get('name', '')
                tag_href = tag.get('href', '')
                tag_id = self.insert_or_update_tags({'name': tag_name, 'href': tag_href}, uniq_key='href')
                if not tag_id:
                    logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}")
                    continue
                logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}")
                if self.insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name):
                    logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
                else:
                    logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
            return movie_id
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting movie: %s", e)
            return None

    def update_pubs_multilang(self, data, tbl, **filters):
        """Upsert a studio / label / series / tags row keyed on ``href``.

        *tbl* is one of ``'studio'``, ``'label'``, ``'series'``, ``'tags'``.
        An unknown *tbl* is logged and None is returned. *filters* is accepted
        but unused.
        """
        tbls = {'studio': self.tbl_name_studios, 'label': self.tbl_name_labels, 'series': self.tbl_name_series, 'tags': self.tbl_name_tags}
        if not tbls.get(tbl):
            logging.warning(f"wrong table. table: {tbl}")
            return None
        return self.insert_or_update_common(data=data, tbl_name=tbls[tbl], uniq_key='href', exists_do_nothing=False)

    def query_list_common(self, tbl, **filters):
        """Query studio / label / series rows as ``[{href, name, uncensored, id}, ...]``.

        Supported filters: ``id``, ``href``, ``name`` (substring LIKE),
        ``start_id`` (id > value), ``uncensored``, ``order_by`` (raw SQL,
        trusted callers only) and ``limit``. Returns None for an unknown *tbl*
        or on sqlite3 error.
        """
        tbls = {'studio': self.tbl_name_studios, 'label': self.tbl_name_labels, 'series': self.tbl_name_series}
        if not tbls.get(tbl):
            logging.warning(f"wrong table. table: {tbl}")
            return None
        try:
            sql = f"SELECT href, name, uncensored, id FROM {tbls[tbl]} WHERE 1=1"
            params = []
            conditions = {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "name": " AND name LIKE ?",
                "start_id": " AND id > ?",
                "uncensored": " AND uncensored = ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key == "name" else filters[key])
            if "order_by" in filters:
                # Interpolated verbatim; trusted callers only.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id': row[3]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def update_tags(self, data):
        """Upsert a tag row keyed on ``href``."""
        return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href')

    def query_tags(self, **filters):
        """Query tags as ``[{href, name, id}, ...]``; filters as in ``query_list_common`` minus ``uncensored``."""
        try:
            sql = f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1"
            params = []
            conditions = {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "name": " AND name LIKE ?",
                "start_id": " AND id > ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key == "name" else filters[key])
            if "order_by" in filters:
                # Interpolated verbatim; trusted callers only.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])
            self.cursor.execute(sql, params)
            return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def get_statics(self):
        """Return actor/movie/publisher counts as a dict keyed by the SQL column aliases; ``{}`` on failure."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors,
                    (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un,
                    (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full,
                    (SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full,
                    (SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies,
                    (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un,
                    (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full,
                    (SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full,
                    (SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios,
                    (SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels,
                    (SELECT COUNT(*) FROM {self.tbl_name_series}) AS series
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}

    def reset_movies_uncensored(self, check_and_do=0):
        """Recompute the movies' ``uncensored`` flag from related hrefs.

        A movie matches when any linked actor, or its label, series or studio,
        has an href containing ``'uncensored'``. Matching ids are collected
        into the connection-local temp table ``temp_movies_to_update``, and the
        counts are logged.

        Args:
            check_and_do: when truthy, set every movie to ``uncensored=0``,
                then set the matches to 1 and commit. When falsy, only the
                counts are reported (dry run).

        On ``sqlite3.Error`` the connection is rolled back and the error logged.
        """
        try:
            logging.info("创建临时表以便于保存待更新记录")
            self.cursor.execute("""
                CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update (
                    movie_id INTEGER PRIMARY KEY
                )
            """)
            # Clear leftovers from an earlier run on this connection.
            self.cursor.execute("DELETE FROM temp_movies_to_update")

            logging.info("开始收集需要更新的影片ID...")
            self.cursor.execute(f"""
                INSERT OR IGNORE INTO temp_movies_to_update (movie_id)
                SELECT DISTINCT m.id
                FROM {self.tbl_name_movies} m
                -- actors via the actor<->movie link table
                LEFT JOIN {self.tbl_name_actor_movie} am ON m.id = am.movie_id
                LEFT JOIN {self.tbl_name_actors} a ON am.actor_id = a.id
                -- label / series / studio
                LEFT JOIN {self.tbl_name_labels} l ON m.label_id = l.id
                LEFT JOIN {self.tbl_name_series} s ON m.series_id = s.id
                LEFT JOIN {self.tbl_name_studios} st ON m.studio_id = st.id
                -- match when any related href contains 'uncensored'
                WHERE a.href LIKE '%uncensored%'
                   OR l.href LIKE '%uncensored%'
                   OR s.href LIKE '%uncensored%'
                   OR st.href LIKE '%uncensored%'
            """)
            total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0]
            total_movies = self.cursor.execute(f"SELECT COUNT(*) FROM {self.tbl_name_movies}").fetchone()[0]
            logging.info(f"共收集到 {total_count} 部需要更新的影片, 共有 {total_movies} 部影片")

            if check_and_do:
                # 1. Reset every movie to 0.
                logging.info("开始将所有影片的uncensored设为默认值0...")
                self.cursor.execute(f"UPDATE {self.tbl_name_movies} SET uncensored = 0")
                logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为0")
                # 2. Set the collected matches to 1.
                logging.info("开始将匹配的影片的uncensored设为1...")
                self.cursor.execute(f"""
                    UPDATE {self.tbl_name_movies}
                    SET uncensored = 1
                    WHERE id IN (SELECT movie_id FROM temp_movies_to_update)
                """)
                logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为1")
                self.conn.commit()
            else:
                logging.info("check完毕本次忽略更新。。。")
            logging.info("任务执行完成!")
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error resetting movies uncensored flag: %s", e)

    def reset_actor_movies(self, check_and_do=0):
        """Recompute every actor's ``movies_cnt`` from the actor<->movie link table.

        Adds the ``movies_cnt`` column to the actors table if it is missing.
        Each actor's value becomes its number of linked movies, or 0 when it
        has none. The change is committed on success. On ``sqlite3.Error`` the
        connection is rolled back and the error logged.

        Args:
            check_and_do: accepted for signature symmetry with
                ``reset_movies_uncensored``; currently unused (the update is
                always applied).
        """
        try:
            # Add the movies_cnt column when it is not there yet.
            self.cursor.execute(f"PRAGMA table_info({self.tbl_name_actors});")
            columns = [row[1] for row in self.cursor.fetchall()]
            if 'movies_cnt' not in columns:
                self.cursor.execute(f"""
                    ALTER TABLE {self.tbl_name_actors} ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
                """)
                logging.info("成功添加movies_cnt字段")
            else:
                logging.info("movies_cnt字段已存在跳过添加")

            # Make sure the link table is indexed on actor_id.
            self.cursor.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_actor_movie_actor_id
                ON {self.tbl_name_actor_movie}(actor_id);
            """)

            # TEMP tables live until the connection closes, so a second call
            # on the same connection used to fail with "table temp_actor_counts
            # already exists". Drop any leftover first; its index goes with it.
            # The "temp." qualifier guarantees a same-named main table is untouched.
            self.cursor.execute("DROP TABLE IF EXISTS temp.temp_actor_counts;")
            self.cursor.execute(f"""
                CREATE TEMPORARY TABLE temp_actor_counts AS
                SELECT actor_id, COUNT(movie_id) AS cnt
                FROM {self.tbl_name_actor_movie}
                GROUP BY actor_id;
            """)
            self.cursor.execute("CREATE INDEX idx_temp_actor_id ON temp_actor_counts(actor_id);")

            # COALESCE gives actors without any linked movie a count of 0.
            self.cursor.execute(f"""
                UPDATE {self.tbl_name_actors}
                SET movies_cnt = COALESCE((
                    SELECT cnt FROM temp_actor_counts
                    WHERE actor_id = {self.tbl_name_actors}.id
                ), 0);
            """)
            updated_rows = self.cursor.rowcount
            logging.info(f"成功更新{updated_rows}个演员的影片数量")
            self.conn.commit()
            logging.info("任务执行完成!")
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error("Error updating actor movie_cnt: %s", e)
@register_handler(comm.SPIDER_NAME_IAFD)
class IAFDDBHandler(SQLiteDBHandler):
def __init__(self, db_path=shared_db_path):
    """Open the handler on *db_path* and record the IAFD table names.

    For local experiments, ``test_db_path`` can be passed as *db_path* instead
    of the shared database.
    """
    super().__init__(db_path)
    # Entity tables
    self.tbl_name_performers = 'iafd_performers'
    self.tbl_name_movies = 'iafd_movies'
    self.tbl_name_studio = 'iafd_studios'
    self.tbl_name_dist = 'iafd_distributors'
    self.tbl_name_ethnic = 'iafd_meta_ethnic'
    # Link / child tables. The attribute and table-name spellings are kept
    # as-is because other code may reference them.
    self.tbl_name_performer_movies = 'iafd_performers_movies'
    self.tbl_name_alias = 'iafd_performer_aliases'
    self.tbl_name_moives_appear_in = 'iafd_movies_appers_in'
    self.tbl_name_performer_urls = 'iafd_performer_urls'
def insert_item(self, item):
    """Persist a scrapy item according to its class and return the item unchanged.

    Only fields that were explicitly assigned on the item (scrapy keeps them
    in ``item._values``) are forwarded. An insert/update therefore never
    overwrites columns the spider did not fill in. Keys keep their assignment
    order, so the column order is deterministic (the original iterated a set).
    """
    processed_item = {field: item[field] for field in item._values}

    if isinstance(item, items_def.IafdPerformersItem):
        self.insert_or_update_performer(processed_item)
    elif isinstance(item, items_def.IafdMoviesItem):
        self.insert_or_update_movie(processed_item)
    elif isinstance(item, items_def.IafdDistributorsItem):
        self.insert_or_update_distributor(data=processed_item)
    elif isinstance(item, items_def.IafdStudiosItem):
        self.insert_or_update_studio(data=processed_item)
    elif isinstance(item, items_def.IafdMetaEthnicItem):
        self.insert_or_update_ethnic(data=processed_item)
    else:
        logging.error(f"unkown item. {processed_item}")
    return item
# Statistics helper.
def get_stat(self):
    """Return IAFD row counts as a dict keyed by the SQL column aliases.

    Keys: ``iafd_actors``, ``act_full``, ``movies``, ``mov_full``, ``dist``,
    ``stu``. Table names come from the instance attributes, not hard-coded
    literals. Returns ``{}`` (with a log entry) when no row comes back or on
    ``sqlite3.Error``.
    """
    performers = self.tbl_name_performers
    movies = self.tbl_name_movies
    try:
        # One round-trip: every count is a scalar sub-select.
        self.cursor.execute(f"""
            SELECT
                (SELECT COUNT(*) FROM {performers}) AS iafd_actors,
                (SELECT COUNT(*) FROM {performers} WHERE is_full_data=1) AS act_full,
                (SELECT COUNT(*) FROM {movies}) AS movies,
                (SELECT COUNT(*) FROM {movies} WHERE is_full_data=1) AS mov_full,
                (SELECT COUNT(*) FROM {self.tbl_name_dist}) AS dist,
                (SELECT COUNT(*) FROM {self.tbl_name_studio}) AS stu
        """)
        row = self.cursor.fetchone()
        if not row:
            logging.warning("query no results.")
            return {}
        columns = [desc[0] for desc in self.cursor.description]
        return dict(zip(columns, row))
    except sqlite3.Error as e:
        logging.error(f"查询失败: {e}")
        return {}
# Insert/update a performer index row coming from list-page data.
def insert_performer_index(self, name, href, **kwargs):
    """Upsert a performer keyed on ``href`` and return the helper's result.

    Optional kwargs ``from_astro_list``, ``from_birth_list``,
    ``from_ethnic_list`` and ``from_movie_list`` are written only when not
    None, so existing column values are otherwise left alone. Other kwargs
    are ignored.
    """
    optional_flags = ('from_astro_list', 'from_birth_list', 'from_ethnic_list', 'from_movie_list')
    record = {'name': name, 'href': href}
    record.update({flag: kwargs[flag] for flag in optional_flags if kwargs.get(flag) is not None})
    return self.insert_or_update_common(data=record, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False)
# """插入电影索引,来自于列表数据"""
#def insert_movie_index(self, title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None):
def insert_movie_index(self, title, href, **kwargs):
fields = [
'from_performer_list', 'from_dist_list', 'from_stu_list', 'release_year'
]
data = {'title': title, 'href': href}
# 如果没有传入值,就用原来的值
for field in fields:
if kwargs.get(field) is not None:
data[field] = kwargs.get(field)
return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_movies, uniq_key='href', exists_do_nothing=False)
# 插入演员和电影的关联数据
def insert_performer_movie(self, performer_id, movie_id, role, notes):
return self.insert_or_update_with_composite_pk(
data = {'performer_id': performer_id, 'movie_id': movie_id, 'role': role, 'notes': notes},
tbl_name = self.tbl_name_performer_movies,
composite_pk = ['performer_id', 'movie_id'],
exists_do_nothing = True
)
# 插入电影和电影的关联数据
def insert_movie_appears_in(self, movie_id, appears_in_id, gradation=0, notes=''):
return self.insert_or_update_with_composite_pk(
data = {'movie_id': movie_id, 'appears_in_id': appears_in_id, 'gradation': gradation, 'notes': notes},
tbl_name = self.tbl_name_moives_appear_in,
composite_pk = ['movie_id', 'appears_in_id'],
exists_do_nothing = True
)
# 插入演员信息
def insert_or_update_performer(self, data, movies_update=True):
try:
# 插入演员信息
performer_id = self.insert_or_update_common(data=data, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False)
if performer_id is None:
return None
logging.debug(f"insert one performer, id: {performer_id}, name: {data['name']}, href: {data['href']}")
# 插入新的 alias
alias_list = data.get("performer_aka", [])
# 确保alias是可迭代对象列表/元组),否则跳过
if not isinstance(alias_list, (list, tuple)):
alias_list = []
for alias in alias_list:
if alias.lower() != "no known aliases":
self.insert_or_update_with_composite_pk(
data={'performer_id': performer_id, 'alias': alias},
tbl_name = self.tbl_name_alias,
composite_pk = ['performer_id', 'alias'],
exists_do_nothing = True
)
# 插入影片列表,可能有 personal 和 director 两个身份
if movies_update:
credits = data.get('credits', {})
# 强制转换为字典(防止非字典类型)
if not isinstance(credits, dict):
credits = {}
for role, movies in credits.items():
# 确保movies是可迭代对象列表/元组),否则跳过
if not isinstance(movies, (list, tuple)):
continue
if movies:
for movie in movies:
movie_id = self.get_id_by_key(tbl=self.tbl_name_movies, uniq_key='href', val=movie['href'])
# 影片不存在,先插入
if movie_id is None:
release_year=0
try:
release_year=int(movie['year'])
except Exception as e:
release_year=0
movie_id = self.insert_movie_index(movie['title'], movie['href'], release_year=int(movie['year']), from_performer_list=1)
if movie_id:
tmp_id = self.insert_performer_movie(performer_id, movie_id, role, movie['notes'])
if tmp_id :
logging.debug(f"insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}")
else:
logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}")
return performer_id
except sqlite3.Error as e:
self.conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
self.conn.rollback()
logging.error(f"未知错误: {e}")
return None
    # Insert or update a performer record for an abnormal URL (e.g. a 404 link).
def insert_or_update_performer_404(self, name, href, is_full_data=1):
return self.insert_or_update_common(
data={'href': href, 'name': name, 'is_full_data': is_full_data},
tbl_name=self.tbl_name_performers,
uniq_key='href',
exists_do_nothing = False
)
    # Insert or update an ethnicity record.
def insert_or_update_ethnic(self, data):
return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_ethnic, uniq_key='href', exists_do_nothing=False)
# 按条件查询 href 列表
def query_ethnic_hrefs(self, **filters):
try:
sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "url" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
self.cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
    # Insert or update a distributor record.
def insert_or_update_distributor(self, data):
return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_dist, uniq_key='href', exists_do_nothing=False)
# 按条件查询 href 列表
def query_distributor_hrefs(self, **filters):
try:
sql = "SELECT href FROM iafd_distributors WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "url" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
self.cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# """ 插入或更新制作公司 """
def insert_or_update_studio(self, data):
return self.insert_or_update_common(data=data, tbl_name=self.tbl_name_studio, uniq_key='href', exists_do_nothing=False)
# 按条件查询 href 列表
def query_studio_hrefs(self, **filters):
try:
sql = "SELECT href FROM iafd_studios WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
self.cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 检查记录是否存在,不存在就插入
def check_and_get_id(self, name, href, tbl, uniq_key='href'):
if not is_valid_url(href):
return 0
row_id = self.get_id_by_key(tbl, uniq_key, href)
if row_id is None:
data = {'name':name, 'href': href}
if tbl == self.tbl_name_performers:
data['from_movie_list'] = 1
row_id = self.insert_or_update_common(data=data, tbl_name=tbl, uniq_key=uniq_key, exists_do_nothing=True)
return row_id if row_id else 0
# """插入或更新电影数据"""
    def insert_or_update_movie(self, movie_data):
        """Upsert a movie (matched on ``href``) together with its related rows.

        Steps, in order:
          1. Resolve distributor, studio and director ids via ``check_and_get_id``
             (which inserts a minimal row for an unknown href and returns 0 for an
             invalid URL) and store them on ``movie_data`` before the upsert.
          2. Upsert the movie row from ``movie_data``.
          3. Link the director(s) to the movie with role ``'directoral'`` and each
             performer with role ``'personal'`` in the performer-movie table.
          4. Link each ``AppearsIn`` movie, creating an index row for it if needed.

        Args:
            movie_data: dict of movie fields. ``Distributor``, ``DistributorHref``,
                ``Studio``, ``StudioHref``, ``Director``, ``DirectorHref``,
                ``title`` and ``href`` are read by key (a missing key raises
                ``KeyError``, which the broad ``except`` turns into None).
                Optional: ``Directors`` / ``Performers`` (dicts with ``name`` and
                ``href``; performers also need ``tags``) and ``AppearsIn`` (dicts
                with ``title`` and ``href``). Mutated in place: ``distributor_id``,
                ``studio_id`` and ``director_id`` are added.

        Returns:
            The movie row id, or None if the upsert returned None or any exception
            occurred (the connection is rolled back in that case).
        """
        try:
            # Resolve the related ids (each lookup inserts a minimal row when missing).
            movie_data['distributor_id']= self.check_and_get_id(movie_data['Distributor'], movie_data['DistributorHref'], self.tbl_name_dist, uniq_key='href')
            movie_data['studio_id'] = self.check_and_get_id(movie_data['Studio'], movie_data['StudioHref'], self.tbl_name_studio, uniq_key='href')
            movie_data['director_id'] = self.check_and_get_id(movie_data['Director'], movie_data['DirectorHref'], self.tbl_name_performers, uniq_key='href')
            movie_id = self.insert_or_update_common(data=movie_data, tbl_name=self.tbl_name_movies, uniq_key='href', exists_do_nothing=False)
            if movie_id is None:
                return None
            logging.debug(f"insert one move, id: {movie_id}, title: {movie_data['title']}, href: {movie_data['href']}")
            # Write the (single) director -> movie link into the relation table.
            if movie_data['director_id']:
                tmp_id = self.insert_performer_movie(movie_data['director_id'], movie_id, 'directoral', '')
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. director_id: {movie_data['director_id']}, movie_id:{movie_id}")
            # Additional directors listed under 'Directors' get the same role.
            for director in movie_data.get('Directors', []):
                director_id = self.check_and_get_id(director['name'], director['href'], self.tbl_name_performers, uniq_key='href')
                if director_id:
                    tmp_id = self.insert_performer_movie(director_id, movie_id, 'directoral', '')
                    if tmp_id:
                        logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")
            # Insert rows into the performers_movies relation table.
            for performer in movie_data.get('Performers', []):
                performer_id = self.check_and_get_id(performer['name'], performer['href'], self.tbl_name_performers, uniq_key='href')
                if performer_id:
                    # Notes = the performer's tags joined by '|', minus the tag that equals their name.
                    notes = '|'.join(tag for tag in performer['tags'] if tag != performer['name'])
                    tmp_id = self.insert_performer_movie(performer_id, movie_id, 'personal', notes)
                    if tmp_id:
                        logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                    else:
                        logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")
            # Insert rows into the movies_appers_in table.
            for appears in movie_data.get("AppearsIn", []):
                appears_in_id = self.get_id_by_key(self.tbl_name_movies, 'href', appears['href'])
                # Not known yet: create a movie index row first.
                if appears_in_id is None:
                    appears_in_id = self.insert_movie_index( appears['title'], appears['href'])
                if appears_in_id:
                    tmp_id = self.insert_movie_appears_in(movie_id, appears_in_id)
                    if tmp_id:
                        logging.debug(f"insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
                    else:
                        logging.warning(f"insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
                else:
                    logging.warning(f"get appears_in_id failed. title: {appears['title']}, href: {appears['href']}")
            return movie_id
        except Exception as e:
            self.conn.rollback()
            logging.error("Error inserting movie: %s", e)
            return None
# """插入或更新电影数据(异常url的处理比如404链接)"""
def insert_or_update_movie_404(self, title, href, is_full_data=1):
return self.insert_or_update_common(
data={'href': href, 'title':title, 'is_full_data':is_full_data},
tbl_name=self.tbl_name_movies,
uniq_key='href',
exists_do_nothing = False
)
# 按条件查询 href 列表
def query_performer_hrefs(self, **filters):
return self.get_performers(**filters)
# 按条件查询 href 列表
def get_performers(self, **filters):
try:
sql = f"SELECT href, name, id, movies_cnt, is_full_data FROM {self.tbl_name_performers} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"name": " AND name LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "name":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [dict(row) for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 按条件查询 href 列表
def query_movie_hrefs(self, **filters):
return self.get_movies(**filters)
# 按条件查询 href 列表
def get_movies(self, **filters):
try:
sql = f"SELECT href, title, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"title": " AND title LIKE ?",
"is_full_data": " AND is_full_data = ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "title":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
for key in ["is_full_data_in", "is_full_data_not_in"]:
if key in filters:
values = filters[key]
if values:
placeholders = ", ".join(["?"] * len(values))
operator = "IN" if key == "is_full_data_in" else "NOT IN"
sql += f" AND is_full_data {operator} ({placeholders})"
params.extend(values)
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [dict(row) for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 按条件查询 href 列表
def get_lord_actors(self, **filters):
try:
sql = f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1"
params = []
conditions = {
"id": " AND id = ?",
"href": " AND href = ?",
"pornstar": " AND pornstar LIKE ?",
"start_id": " AND id > ?",
}
for key, condition in conditions.items():
if key in filters:
sql += condition
if key == "pornstar":
params.append(f"%{filters[key]}%")
else:
params.append(filters[key])
if "order_by" in filters:
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
sql += f" ORDER BY {filters['order_by']} "
if 'limit' in filters:
sql += " LIMIT ?"
params.append(filters["limit"])
self.cursor.execute(sql, params)
return [dict(row) for row in self.cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# 按条件查询 href 列表
def get_iafd_actors(
self,
names: List[str],
tbl = 'stu'
) -> Dict[str, List[Dict[str, str]]]:
"""
分两步查询指定发行商对应的女性演员(使用临时表减少内存占用)
步骤1筛选目标发行商及其关联的影片存入临时表小集合
步骤2用临时表的影片ID关联演员表获取女性演员信息
"""
tbl_name = 'iafd_studios' if tbl.lower() == 'stu' else 'iafd_distributors'
join_key = 'studio_id' if tbl.lower() == 'stu' else 'distributor_id'
if not names:
return {}
# 结果容器
final_result: Dict[str, List[Dict[str, str]]] = {}
try:
# --------------------------
# 步骤1创建临时表存储目标发行商及其关联的影片
# --------------------------
# 先删除可能残留的临时表(避免冲突)
self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
# 创建临时表(只在当前连接可见,连接关闭后自动删除)
self.cursor.execute("""
CREATE TEMPORARY TABLE temp_distributor_movies (
distributor_id INTEGER,
distributor_name TEXT,
movie_id INTEGER,
PRIMARY KEY (distributor_id, movie_id)
)
""")
# 批量插入目标发行商及其关联的影片(小集合)
# 先筛选发行商,再关联影片,结果插入临时表
insert_sql = """
INSERT INTO temp_distributor_movies (distributor_id, distributor_name, movie_id)
SELECT
d.id AS distributor_id,
d.name AS distributor_name,
m.id AS movie_id
FROM
{tbl_name} d
INNER JOIN
iafd_movies m ON d.id = m.{join_key}
WHERE
d.name IN ({placeholders})
""".format(
tbl_name=tbl_name,
join_key=join_key,
placeholders=', '.join(['?'] * len(names))
)
logging.debug(f'{insert_sql}')
self.cursor.execute(insert_sql, names)
self.conn.commit() # 提交临时表数据
# --------------------------
# 步骤2用临时表关联演员信息仅处理小集合
# --------------------------
query_sql = """
SELECT
t.distributor_name,
p.name AS performer_name,
p.href AS performer_href
FROM
temp_distributor_movies t
INNER JOIN
iafd_performers_movies pm ON t.movie_id = pm.movie_id
INNER JOIN
iafd_performers p ON pm.performer_id = p.id
WHERE
p.gender = 'Woman'
ORDER BY
t.distributor_name, p.name
"""
self.cursor.execute(query_sql)
rows = self.cursor.fetchall()
# 整理结果:按发行商分组
for row in rows:
dist_name = row['distributor_name']
performer = {
'name': row['performer_name'],
'href': row['performer_href']
}
if dist_name not in final_result:
final_result[dist_name] = []
final_result[dist_name].append(performer)
# 主动清理临时表(可选,连接关闭后会自动删除)
self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
except sqlite3.Error as e:
print(f"查询失败:{e}")
return {}
return final_result
# 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。
def get_performers_needed_update(self, limit=None):
try:
sql = """
SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt
"""
if limit is not None:
sql += f" LIMIT {limit}"
self.cursor.execute(sql)
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return []
# 生成一个复杂的演员电影数量的查询视图,来判断从电影列表中聚合出来的演员-影片数量,与从演员列表中抓取到的影片数量,是否相等。
def check_and_create_stat_table(self, taskid = 0):
try:
# 检查索引是否存在,如果不存在则创建
indexes = [
("idx_iafd_performers_movies_performer_id",
"CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"),
("idx_iafd_movies_director_id",
"CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"),
("idx_iafd_performers_id",
"CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);")
]
for index_name, create_index_sql in indexes:
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
if not self.cursor.fetchone():
self.cursor.execute(create_index_sql)
logging.info(f"Index {index_name} created successfully.")
else:
logging.info(f"Index {index_name} already exists.")
# 检查视图是否存在,如果不存在则创建
view_name = f"iafd_tmp_performers_stat_{taskid}"
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (view_name,))
if self.cursor.fetchone():
self.cursor.execute("drop table ?", (view_name,))
self.conn.commit()
create_view_sql = f"""
CREATE table {view_name} AS
SELECT
id,
href,
name,
movies_cnt,
SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count,
SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count
FROM (
SELECT
p.id,
p.href,
p.name,
p.movies_cnt,
COUNT(apm.movie_id) AS movie_count,
'actor' AS role
FROM
iafd_performers p
LEFT JOIN
iafd_performers_movies apm ON p.id = apm.performer_id
GROUP BY
p.id, p.href, p.name, p.movies_cnt
UNION ALL
SELECT
p.id,
p.href,
p.name,
p.movies_cnt,
COUNT(im.id) AS movie_count,
'director' AS role
FROM
iafd_performers p
LEFT JOIN
iafd_movies im ON p.id = im.director_id
GROUP BY
p.id, p.href, p.name, p.movies_cnt
) combined
GROUP BY
id, href, name, movies_cnt;
"""
self.cursor.execute(create_view_sql)
logging.info(f"table {view_name} created successfully.")
# 提交更改并关闭连接
self.conn.commit()
except sqlite3.Error as e:
logging.warning(f"An error occurred: {e}")
    # Recompute the movies_cnt column of iafd_performers from the performer-movie relation table.
def reset_actor_movies(self, check_and_do = 0):
try:
# 检查表中是否已存在movies_cnt列
self.cursor.execute(f"PRAGMA table_info(iafd_performers);")
columns = [row[1] for row in cursor.fetchall()]
if 'movies_cnt' not in columns:
# 列不存在,添加新列
add_field_sql = f"""
ALTER TABLE iafd_performers ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
"""
self.cursor.execute(add_field_sql)
logging.info("成功添加movies_cnt字段")
else:
logging.info("movies_cnt字段已存在跳过添加")
# 确保关联表有索引
self.cursor.execute(f"""
CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id
ON iafd_performers_movies(performer_id);
""")
# 创建临时表存储统计结果
self.cursor.execute(f"""
CREATE TEMPORARY TABLE temp_actor_counts AS
SELECT performer_id, COUNT(movie_id) AS cnt
FROM iafd_performers_movies
GROUP BY performer_id;
""")
# 为临时表添加索引
self.cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);")
# 更新主表
self.cursor.execute(f"""
UPDATE iafd_performers
SET movies_cnt = COALESCE((
SELECT cnt FROM temp_actor_counts
WHERE performer_id = iafd_performers.id
), 0); -- 使用COALESCE处理没有影片的演员
""")
updated_rows = cursor.rowcount
logging.info(f"成功更新{updated_rows}个演员的影片数量")
# 清理资源
self.cursor.execute("DROP TABLE IF EXISTS temp_actor_counts;")
self.conn.commit()
logging.info("任务执行完成!")
except sqlite3.Error as e:
self.conn.rollback()
logging.error("Error updating actor movie_cnt: %s", e)