1728 lines
72 KiB
Python
1728 lines
72 KiB
Python
import os
|
||
import sqlite3
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import List, Dict
|
||
from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path, test_db_path
|
||
import scrapy_proj.comm.comm_def as comm
|
||
import scrapy_proj.items as items_def
|
||
from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url
|
||
|
||
# Registry of DB handler classes, keyed by lower-cased spider name.
spider_handler_registry = {}


def register_handler(spider_name):
    """Build a class decorator that registers a DB handler for ``spider_name``.

    When applied, the decorated class is stored in ``spider_handler_registry``
    under ``spider_name.lower()`` and returned unchanged.
    """
    def _register(handler_cls):
        registry_key = spider_name.lower()
        spider_handler_registry[registry_key] = handler_cls
        return handler_cls

    return _register
|
||
|
||
@register_handler(comm.SPIDER_NAME_SIS)
class SisDBHandler(SQLiteDBHandler):
    """SQLite handler for the SIS spider; rows live in the ``sis`` table."""

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_sis = 'sis'

    def insert_item(self, item):
        """Insert ``item`` into the ``sis`` table, keyed by ``url``.

        Existing rows with the same ``url`` are left untouched
        (``exists_do_nothing=True``).

        Returns:
            The item itself, like the other handlers in this module.
        """
        self.insert_or_update_common(item, tbl_name=self.tbl_name_sis, uniq_key='url', exists_do_nothing=True)
        return item

    # Statistics
    def get_stat(self):
        """Return ``{'cnt': <row count of the sis table>}``.

        Returns ``{}`` when the query yields no row or raises sqlite3.Error.
        """
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3DBHandler(SQLiteDBHandler):
    """SQLite handler for the U3C3 spider; rows live in the ``u3c3`` table."""

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_u3c3 = 'u3c3'

    def insert_item(self, item):
        """Insert ``item`` into the ``u3c3`` table, keyed by ``url``.

        Existing rows with the same ``url`` are left untouched
        (``exists_do_nothing=True``).

        Returns:
            The item itself, like the other handlers in this module.
        """
        self.insert_or_update_common(item, tbl_name=self.tbl_name_u3c3, uniq_key='url', exists_do_nothing=True)
        return item

    # Statistics
    def get_stat(self):
        """Return ``{'cnt': <row count of the u3c3 table>}``.

        Returns ``{}`` when the query yields no row or raises sqlite3.Error.
        """
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_CLM)
class ClmDBHandler(SQLiteDBHandler):
    """SQLite handler for the CLM spider.

    Tables used:
        clm_index           -- index entries keyed by ``href``
        clm_keywords        -- search keywords keyed by ``words``
        clm_keywords_index  -- keyword <-> index links keyed by ``wid_iid``
    """

    # Filters in get_key_words that are matched with LIKE '%value%'.
    _LIKE_FILTERS = ('words', 'groups', 'tags')

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_clm_index = 'clm_index'
        self.tbl_name_clm_keywords = 'clm_keywords'
        self.tbl_name_words_index = 'clm_keywords_index'

    def insert_item(self, item):
        """Route ``item`` to the right table based on its ``item_type``.

        Returns:
            The item unchanged.
        """
        # .get(): an item without item_type is logged as unknown instead of raising KeyError.
        item_type = item.get('item_type')
        if item_type == comm.ITEM_TYPE_CLM_INDEX:
            self.insert_index(item)
        elif item_type == comm.ITEM_TYPE_CLM_KEYWORDS:
            self.insert_or_update_common(item, self.tbl_name_clm_keywords, uniq_key='words', exists_do_nothing=False)
        else:
            logging.error("unkown item.")
        return item

    def insert_index(self, item):
        """Store an index entry and link it to its keyword.

        If ``item['is_update']`` is truthy, the existing ``clm_index`` row
        (matched by ``href``) is updated and no link row is written.
        Otherwise, the row is inserted (left alone if the href already
        exists). When a row id comes back, a ``clm_keywords_index`` row
        linking ``item['key_words_id']`` to that id is inserted.
        """
        # .get(): items that never set is_update are treated as plain inserts.
        if item.get('is_update'):
            self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=False)
            return

        row_id = self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=True)
        if not row_id:
            logging.warning(f"insert index error: {item}")
            return

        lnk_data = {
            'words_id': item['key_words_id'],
            'index_id': row_id,
            'wid_iid': f"{item['key_words_id']}_{row_id}",
            'tags': item['key_words'],
        }
        lnk_id = self.insert_or_update_common(lnk_data, self.tbl_name_words_index, uniq_key='wid_iid', exists_do_nothing=True)
        if lnk_id:
            logging.debug(f"insert one item: {lnk_data}")
        else:
            logging.warning(f"insert item error: {lnk_data}")

    def get_empty_title(self):
        """Return ``[{'id':..., 'href':...}, ...]`` for index rows whose title is ''.

        Returns None on sqlite3.Error. ``dict(row)`` assumes the base class
        configures a mapping-style row factory — TODO confirm.
        """
        try:
            self.cursor.execute(f"SELECT id, href FROM {self.tbl_name_clm_index} WHERE title='' ")
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def get_count_by_keywords_id(self, key_words_id):
        """Return the number of link rows for ``key_words_id``."""
        self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_words_index} WHERE words_id = ?", (key_words_id,))
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_key_words(self, **filters):
        """Query keywords (``id, words, groups``) with optional filters.

        Supported filters:
            id, start_id (id > value)           -- exact / range match
            words, groups, tags                 -- substring (LIKE) match
            query_str                           -- raw SQL appended after AND
            order_by                            -- raw ORDER BY expression
            limit                               -- LIMIT value

        WARNING: ``query_str`` and ``order_by`` are interpolated verbatim
        (identifiers cannot be bound as parameters), so they must never
        come from untrusted input.

        Returns:
            A list of row dicts, or None on sqlite3.Error.
        """
        try:
            sql = f"SELECT id, words, groups FROM {self.tbl_name_clm_keywords} WHERE 1=1"
            params = []

            conditions = {
                "id": " AND id = ?",
                "words": " AND words LIKE ?",
                "groups": " AND groups LIKE ?",
                "tags": " AND tags LIKE ?",
                "start_id": " AND id > ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key in self._LIKE_FILTERS else filters[key])

            if "query_str" in filters:
                sql += f" AND {filters['query_str']} "

            if "order_by" in filters:
                sql += f" ORDER BY {filters['order_by']} "

            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])

            self.cursor.execute(sql, params)
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    # Statistics
    def get_stat(self):
        """Return row counts as ``{'words', 'magnets', 'wd_mag'}``; ``{}`` on error/no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words,
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets,
                    (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
|
||
|
||
@register_handler(comm.SPIDER_NAME_PBOX)
class PboxDBHandler(SQLiteDBHandler):
    """SQLite handler for the PBOX spider (studios, movies, actors, tags, alt links)."""

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_studios = 'pbox_studios'
        self.tbl_movies = 'pbox_movies'
        self.tbl_tags = 'pbox_tags'
        self.tbl_mov_tags = 'pbox_movies_tags'
        self.tbl_mov_alts = 'pbox_movies_alts'
        self.tbl_actor = 'pbox_actors'
        self.tbl_actor_alias = 'pbox_actor_aliases'
        self.tbl_actor_mov = 'pbox_actors_movies'

    def _create_tables(self):
        """Placeholder; tables are expected to exist already."""
        pass

    def insert_item(self, item):
        """Route ``item`` by ``item_type`` (studio or movie detail); returns the item."""
        # .get(): an item without item_type is logged as unknown instead of raising KeyError.
        item_type = item.get('item_type')
        if item_type == comm.ITEM_TYPE_STUDIO:
            self.insert_or_update_common(item, self.tbl_studios, uniq_key='href', exists_do_nothing=True)
        elif item_type == comm.ITEM_TYPE_MOVIE_DETAIL:
            self.insert_movie(item)
        else:
            logging.error("unkown item.")
        return item

    def insert_movie(self, item):
        """Upsert a movie detail, then its actor links, tag links and alt links.

        The movie row is upserted by ``href``. For each entry in
        ``actor_index_list`` / ``mov_tags_list``, the actor/tag row is looked
        up by ``href`` and inserted if missing, then a link row is added.
        Each ``mov_alt_list`` entry is stored as a (min_mov_id, max_mov_id) pair.
        """
        mov_id = self.insert_or_update_common(item, self.tbl_movies, uniq_key='href', exists_do_nothing=False)
        if not mov_id:
            # .get(): a missing field must not raise here and hide the real failure.
            logging.error(f"insert movie error. movid: {item.get('movie_id')}, url: {item.get('href')}")
            return

        # Actors (`or []` tolerates a list field explicitly set to None).
        for actor in item.get('actor_index_list') or []:
            actor_id = self.get_id_by_key(self.tbl_actor, uniq_key='href', val=actor['href'])
            if not actor_id:
                actor_id = self.insert_or_update_common(actor, self.tbl_actor, uniq_key='href', exists_do_nothing=True)
            if actor_id:
                lnk_id = self.insert_or_update_common({'movie_id':mov_id, 'actor_id':actor_id, 'actor_mov': f'{actor_id}_{mov_id}'}, tbl_name=self.tbl_actor_mov, uniq_key='actor_mov', exists_do_nothing=True)
                if not lnk_id:
                    logging.error(f"insert actor_movie error. actor id: {actor_id}, mov id: {mov_id}")

        # Tags
        for tag in item.get('mov_tags_list') or []:
            tag_id = self.get_id_by_key(self.tbl_tags, uniq_key='href', val=tag['href'])
            if not tag_id:
                tag_id = self.insert_or_update_common(tag, self.tbl_tags, uniq_key='href')
            if tag_id:
                lnk_id = self.insert_or_update_common({'movie_id':mov_id, 'tag_id':tag_id, 'movid_tagid': f'{mov_id}_{tag_id}', 'tags':''}, tbl_name=self.tbl_mov_tags, uniq_key='movid_tagid', exists_do_nothing=True)
                if not lnk_id:
                    logging.error(f"insert movie_tag error. tag id: {tag_id}, mov id: {mov_id}")

        # Alternative-version links
        for alt in item.get('mov_alt_list') or []:
            min_max = f"{alt['min_mov_id']}_{alt['max_mov_id']}"
            lnk_id = self.insert_or_update_common({'min_mov_id':alt['min_mov_id'], 'max_mov_id':alt['max_mov_id'], 'min_max': min_max}, tbl_name=self.tbl_mov_alts, uniq_key='min_max', exists_do_nothing=True)
            if not lnk_id:
                logging.error(f"insert movie_alt error. item: {alt}")

    def get_studios(self, **filters):
        """Query studios with optional filters (id, href, name LIKE, start_id, order_by, limit).

        ``order_by`` is interpolated verbatim; do not pass untrusted input.

        Returns:
            A list of row dicts; ``[]`` on sqlite3.Error.
        """
        try:
            sql = f"SELECT href, name, id, label_id, scene_count FROM {self.tbl_studios} WHERE 1=1"
            params = []

            conditions = {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "name": " AND name LIKE ?",
                "start_id": " AND id > ?",
            }
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key == "name" else filters[key])

            if "order_by" in filters:
                # ORDER BY takes an identifier, which cannot be a bound parameter.
                sql += f" ORDER BY {filters['order_by']} "

            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])

            self.cursor.execute(sql, params)
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return []

    def get_stu_mov_count(self, stu_id):
        """Return the number of movies whose ``studio_id`` equals ``stu_id``."""
        self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_movies} WHERE studio_id = ?", (stu_id,))
        row = self.cursor.fetchone()
        return row[0] if row else None

    # Statistics
    def get_stat(self):
        """Return counts ``{'studios', 'movies', 'actors', 'act_mov'}``; ``{}`` on error/no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_studios}) AS studios,
                    (SELECT COUNT(*) FROM {self.tbl_movies}) AS movies,
                    (SELECT COUNT(*) FROM {self.tbl_actor}) AS actors,
                    (SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}

    def close_spider(self, spider):
        """Close the database connection when the spider finishes."""
        self.conn.close()
|
||
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_JAVHD)
class JavHDDBHandler(SQLiteDBHandler):
    """SQLite handler for the JavHD spider; models live in ``javhd_models``."""

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_javhd = 'javhd_models'

    def insert_item(self, item):
        """Upsert actor index/detail items into ``javhd_models`` by ``url``; returns the item."""
        # Index and detail items were handled identically, so both go to one branch.
        # .get(): an item without item_type is logged as unknown instead of raising KeyError.
        if item.get('item_type') in (comm.ITEM_TYPE_ACTOR_INDEX, comm.ITEM_TYPE_ACTOR_DETAIL):
            self.insert_or_update_common(item, self.tbl_name_javhd, uniq_key='url', exists_do_nothing=False)
        else:
            logging.error("unkown item.")
        return item

    # Statistics
    def get_stat(self):
        """Return ``{'cnt': <row count of javhd_models>}``; ``{}`` on error/no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_javhd}) AS cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}

    def has_full_data(self, href):
        """Return the count of rows with ``url == href`` and ``is_full_data=1`` (0 on error)."""
        try:
            self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_javhd} WHERE is_full_data=1 and url = ?", (href,))
            row = self.cursor.fetchone()
            # 0 rather than None, consistent with the error path.
            return row[0] if row else 0
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return 0
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_LORD)
class LordDBHandler(SQLiteDBHandler):
    """SQLite handler for thelordofporn actresses and their aliases."""

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_actors = 'thelordofporn_actress'
        self.tbl_name_alias = 'thelordofporn_alias'

    def insert_item(self, item):
        """Store actor-detail items; any other type is logged. Returns the item."""
        # .get(): an item without item_type is logged as unknown instead of raising KeyError.
        if item.get('item_type') == comm.ITEM_TYPE_ACTOR_DETAIL:
            self.insert_actor(item)
        else:
            logging.error("unkown item.")
        return item

    def insert_actor(self, item):
        """Upsert an actress by ``href``, then upsert each of her aliases.

        Aliases are stored with the composite key (actress_id, alias).
        """
        actor_id = self.insert_or_update_common(item, self.tbl_name_actors, uniq_key='href', exists_do_nothing=False)
        if not actor_id:
            logging.warning(f"insert actor data error! data: {pretty_json_simple(item)}")
            return

        # `or []` tolerates an alias field explicitly set to None.
        for alias in item.get('alias') or []:
            alias_data = {'actress_id':actor_id, 'alias':alias}
            affected_rows = self.insert_or_update_with_composite_pk(data=alias_data, tbl_name=self.tbl_name_alias, composite_pk=['actress_id','alias'], exists_do_nothing=False)
            if affected_rows:
                logging.debug(f"insert/update actress_alias. data: {alias_data}")
            else:
                logging.warning(f"insert actor alias error!. data: {alias_data}")

    # Statistics
    def get_stat(self):
        """Return ``{'actor_cnt': <actress row count>}``; ``{}`` on error/no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actor_cnt
            """)
            row = self.cursor.fetchone()
            if not row:
                logging.warning("query no results.")
                return {}
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}

    def has_full_data(self, href):
        """Return the count of actress rows with this ``href`` and ``is_full_data=1`` (0 on error)."""
        try:
            self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,))
            row = self.cursor.fetchone()
            # 0 rather than None, consistent with the error path.
            return row[0] if row else 0
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return 0
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_JAVBUS)
|
||
class JavBusDBHandler(SQLiteDBHandler):
|
||
def __init__(self, db_path=shared_db_path):
|
||
super().__init__(db_path)
|
||
self.tbl_name_actors = 'javbus_actors'
|
||
self.tbl_name_movies = 'javbus_movies'
|
||
self.tbl_name_studios = 'javbus_studios'
|
||
self.tbl_name_labels = 'javbus_labels'
|
||
self.tbl_name_series = 'javbus_series'
|
||
self.tbl_name_tags = 'javbus_tags'
|
||
self.tbl_name_movie_tags = 'javbus_movies_tags'
|
||
self.tbl_name_actor_movie = 'javbus_actors_movies'
|
||
|
||
def insert_item(self, item):
|
||
# 获取Item中所有定义的字段(包括父类继承的)
|
||
all_fields = item.fields.keys()
|
||
# 获取已被赋值的字段(存储在Item的内部属性_values中)
|
||
assigned_fields = set(item._values.keys())
|
||
# 过滤被赋值过的字段,其他预定义的字段不处理,这样在插入/更新时才不影响无关字段的值
|
||
processed_item = {}
|
||
for field in assigned_fields:
|
||
processed_item[field] = item[field]
|
||
|
||
if isinstance(item, items_def.JavbusActorsItem):
|
||
self.update_actor_detail(processed_item)
|
||
|
||
elif isinstance(item, items_def.JavbusMoviesItem):
|
||
self.insert_or_update_movie(processed_item)
|
||
|
||
elif isinstance(item, items_def.JavbusLabelsItem):
|
||
self.update_pubs_multilang(data=processed_item, tbl='label')
|
||
|
||
elif isinstance(item, items_def.JavbusStudiosItem):
|
||
self.update_pubs_multilang(data=processed_item, tbl='studio')
|
||
|
||
elif isinstance(item, items_def.JavbusSeriesItem):
|
||
self.update_pubs_multilang(data=processed_item, tbl='series')
|
||
|
||
elif isinstance(item, items_def.JavbusTagsItem):
|
||
self.update_pubs_multilang(data=processed_item, tbl='tags')
|
||
|
||
else:
|
||
logging.error(f"unkown item. {processed_item}")
|
||
|
||
return item
|
||
|
||
# 统计函数
|
||
def get_stat(self):
|
||
return self.get_statics()
|
||
|
||
def has_full_data(self, href):
|
||
try:
|
||
self.cursor.execute(f"SELECT count(*) as cnt from {self.tbl_name_actors} WHERE is_full_data=1 and href = ?", (href,))
|
||
row = self.cursor.fetchone()
|
||
return row[0] if row else None
|
||
except sqlite3.Error as e:
|
||
logging.error(f"query error: {e}")
|
||
return 0
|
||
|
||
def insert_actor_index(self, data, **kwargs):
|
||
fields = ['uncensored', 'from_actor_list', 'from_movie_list']
|
||
# 如果没有传入值,就用原来的值
|
||
for field in fields:
|
||
if kwargs.get(field) is not None:
|
||
data[field] = kwargs.get(field)
|
||
|
||
try:
|
||
return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href', exists_do_nothing=True)
|
||
except sqlite3.Error as e:
|
||
logging.error(f"Error inserting or updating data: {e}")
|
||
return None
|
||
|
||
def insert_movie_index(self, data, **kwargs):
|
||
fields = [
|
||
'uncensored', 'from_actor_list', 'from_movie_studios', 'from_movie_labels', 'from_movie_series',
|
||
'studio_id', 'label_id', 'series_id'
|
||
]
|
||
# 如果没有传入值,就用原来的值
|
||
for field in fields:
|
||
if kwargs.get(field) is not None:
|
||
data[field] = kwargs.get(field)
|
||
try:
|
||
return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
|
||
except sqlite3.Error as e:
|
||
logging.error(f"Error inserting or updating data: {e}")
|
||
return None
|
||
|
||
# 插入演员和电影的关联数据
|
||
def insert_actor_movie(self, performer_id, movie_id, tags=''):
|
||
return self.insert_or_update_with_composite_pk(
|
||
data={'actor_id':performer_id, 'movie_id':movie_id, 'tags':tags},
|
||
tbl_name = self.tbl_name_actor_movie,
|
||
composite_pk = ['actor_id', 'movie_id'],
|
||
exists_do_nothing = True
|
||
)
|
||
|
||
def update_actor_detail_404(self, data, is_full_data=1):
|
||
try:
|
||
data['is_full_data'] = is_full_data
|
||
return self.insert_or_update_common(data, self.tbl_name_actors, uniq_key='href')
|
||
except sqlite3.Error as e:
|
||
logging.error(f"Error inserting or updating data: {e}")
|
||
return None
|
||
|
||
def update_actor_detail(self, data, is_full_data=1):
|
||
try:
|
||
# 跟新actor表
|
||
avatar = data.get('avatar', {})
|
||
avatar['href'] = data['href']
|
||
avatar['is_full_data'] = is_full_data
|
||
|
||
avatar_id = self.insert_or_update_common(avatar, self.tbl_name_actors, uniq_key='href', exists_do_nothing=False)
|
||
if not avatar_id:
|
||
logging.warning(f"get actor id error. href: {data['href']}")
|
||
return None
|
||
else:
|
||
logging.debug(f"update actor data. href: {data['href']} avatar: {avatar}")
|
||
|
||
# 更新movies表
|
||
uncensored = data.get('uncensored', 0)
|
||
for movie in data.get('credits', []):
|
||
movie_id = self.insert_movie_index(movie, from_actor_list=1, uncensored=uncensored)
|
||
if movie_id:
|
||
logging.debug(f"insert one movie index. data: {movie}")
|
||
# 插入关系表
|
||
link_id = self.insert_actor_movie(avatar_id, movie_id)
|
||
if link_id:
|
||
logging.debug(f"insert one actor_movie record. actor id: {avatar_id}, movie id: {movie_id}")
|
||
|
||
return avatar_id
|
||
except sqlite3.Error as e:
|
||
logging.error(f"Error inserting or updating data: {e}")
|
||
return None
|
||
|
||
def query_actors(self, **filters):
|
||
try:
|
||
sql = f"SELECT href, en_name as name, uncensored, movies_cnt, id, is_full_data FROM {self.tbl_name_actors} WHERE 1=1"
|
||
params = []
|
||
|
||
conditions = {
|
||
"id": " AND id = ?",
|
||
"href": " AND href = ?",
|
||
"en_name": " AND en_name LIKE ?",
|
||
"is_full_data": " AND is_full_data = ?",
|
||
"start_id": " AND id > ?",
|
||
"uncensored": " AND uncensored = ?",
|
||
}
|
||
|
||
for key, condition in conditions.items():
|
||
if key in filters:
|
||
sql += condition
|
||
if key == "en_name":
|
||
params.append(f"%{filters[key]}%")
|
||
else:
|
||
params.append(filters[key])
|
||
|
||
for key in ["is_full_data_in", "is_full_data_not_in"]:
|
||
if key in filters:
|
||
values = filters[key]
|
||
if values:
|
||
placeholders = ", ".join(["?"] * len(values))
|
||
operator = "IN" if key == "is_full_data_in" else "NOT IN"
|
||
sql += f" AND is_full_data {operator} ({placeholders})"
|
||
params.extend(values)
|
||
|
||
if "order_by" in filters:
|
||
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
|
||
sql += f" ORDER BY {filters['order_by']} "
|
||
|
||
if 'limit' in filters:
|
||
sql += " LIMIT ?"
|
||
params.append(filters["limit"])
|
||
|
||
self.cursor.execute(sql, params)
|
||
return [dict(row) for row in self.cursor.fetchall()]
|
||
#return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'movies_cnt':row[3]} for row in self.cursor.fetchall()]
|
||
except sqlite3.Error as e:
|
||
logging.error(f"查询 href 失败: {e}")
|
||
return None
|
||
|
||
def query_movies(self, **filters):
|
||
try:
|
||
sql = f"SELECT href, title, uncensored, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1"
|
||
params = []
|
||
|
||
conditions = {
|
||
"id": " AND id = ?",
|
||
"href": " AND href = ?",
|
||
"title": " AND title LIKE ?",
|
||
"is_full_data": " AND is_full_data = ?",
|
||
"start_id": " AND id > ?",
|
||
"uncensored": " AND uncensored = ?",
|
||
}
|
||
|
||
for key, condition in conditions.items():
|
||
if key in filters:
|
||
sql += condition
|
||
if key == "title":
|
||
params.append(f"%{filters[key]}%")
|
||
else:
|
||
params.append(filters[key])
|
||
|
||
for key in ["is_full_data_in", "is_full_data_not_in"]:
|
||
if key in filters:
|
||
values = filters[key]
|
||
if values:
|
||
placeholders = ", ".join(["?"] * len(values))
|
||
operator = "IN" if key == "is_full_data_in" else "NOT IN"
|
||
sql += f" AND is_full_data {operator} ({placeholders})"
|
||
params.extend(values)
|
||
|
||
if "order_by" in filters:
|
||
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
|
||
sql += f" ORDER BY {filters['order_by']} "
|
||
|
||
if 'limit' in filters:
|
||
sql += " LIMIT ?"
|
||
params.append(filters["limit"])
|
||
|
||
self.cursor.execute(sql, params)
|
||
return [dict(row) for row in self.cursor.fetchall()]
|
||
#return [{'href': row[0], 'title': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()]
|
||
except sqlite3.Error as e:
|
||
logging.error(f"查询 href 失败: {e}")
|
||
return None
|
||
|
||
# 检查记录是否存在,不存在就插入
|
||
def check_and_get_id(self, item, uncensored, tbl, uniq_key='href'):
|
||
name = item['name']
|
||
href = item['href']
|
||
row_id = self.get_id_by_key(tbl, uniq_key, href)
|
||
if row_id is None:
|
||
row_id = self.insert_or_update_common({'name':name, 'href': href, 'uncensored':uncensored, 'from_movie_list':1}, tbl_name=tbl, uniq_key=uniq_key)
|
||
|
||
return row_id
|
||
|
||
def insert_or_update_tags(self, data, uniq_key='href'):
|
||
return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key)
|
||
|
||
def insert_movie_tags(self, movie_id, tag_id, tags):
|
||
return self.insert_or_update_with_composite_pk(
|
||
data={'movie_id':movie_id, 'tag_id':tag_id, 'tags':tags},
|
||
tbl_name = self.tbl_name_movie_tags,
|
||
composite_pk = ['movie_id', 'tag_id'],
|
||
exists_do_nothing = True
|
||
)
|
||
|
||
def insert_or_update_movie_404(self, data, is_full_data=1):
|
||
try:
|
||
data['is_full_data'] = is_full_data
|
||
return self.insert_or_update_common(data, self.tbl_name_movies, uniq_key='href')
|
||
except sqlite3.Error as e:
|
||
logging.error(f"Error inserting or updating data: {e}")
|
||
return None
|
||
|
||
# """插入或更新电影数据"""
|
||
def insert_or_update_movie(self, movie, is_full_data=1):
|
||
try:
|
||
# 获取相关 ID
|
||
studio_id = self.check_and_get_id(movie.get('studio'), movie.get('uncensored', 0), self.tbl_name_studios) if movie.get('studio') is not None else None
|
||
label_id = self.check_and_get_id(movie.get('label'), movie.get('uncensored', 0), self.tbl_name_labels) if movie.get('label') is not None else None
|
||
series_id = self.check_and_get_id(movie.get('series'), movie.get('uncensored', 0), self.tbl_name_series) if movie.get('series') is not None else None
|
||
|
||
if studio_id:
|
||
movie['studio_id'] = studio_id
|
||
if label_id:
|
||
movie['label_id'] = label_id
|
||
if series_id:
|
||
movie['series_id'] = series_id
|
||
|
||
movie['is_full_data'] = is_full_data
|
||
movie['actors_cnt'] = len(movie.get('actors', []))
|
||
|
||
movie_id = self.insert_or_update_common(movie, self.tbl_name_movies, uniq_key='href')
|
||
if movie_id is None:
|
||
logging.warning(f"insert/update movie error. data:{movie}")
|
||
return None
|
||
|
||
logging.debug(f"insert one move, id: {movie_id}, title: {movie['title']}, href: {movie['href']}")
|
||
|
||
# 插入 performers_movies 关系表
|
||
uncensored = movie.get('uncensored', 0)
|
||
for performer in movie.get('actors', []):
|
||
performer_id = self.get_id_by_key(self.tbl_name_actors, 'href', performer['href'])
|
||
# 如果演员不存在,先插入
|
||
if performer_id is None:
|
||
performer_id = self.insert_actor_index({'zh_name': performer['name'], 'href':performer['href']}, uncensored=uncensored, from_movie_list=1)
|
||
logging.debug(f"insert new perfomer. perfomer_id: {performer_id}, name:{performer['name']}")
|
||
if performer_id:
|
||
tmp_id = self.insert_actor_movie(performer_id, movie_id)
|
||
if tmp_id:
|
||
logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
|
||
else:
|
||
logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")
|
||
else:
|
||
logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")
|
||
|
||
# 插入 tags 表
|
||
for tag in movie.get('tags', []):
|
||
tag_name = tag.get('name', '')
|
||
tag_href = tag.get('href', '')
|
||
tag_id = self.insert_or_update_tags({'name':tag_name, 'href':tag_href}, uniq_key='href')
|
||
if tag_id:
|
||
logging.debug(f"insert one tags. tag_id: {tag_id}, name: {tag_name}")
|
||
tmp_id = self.insert_movie_tags(movie_id=movie_id, tag_id=tag_id, tags=tag_name)
|
||
if tmp_id:
|
||
logging.debug(f"insert one movie_tag. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
|
||
else:
|
||
logging.warning(f"insert one movie_tag error. movie_id: {movie_id}, tag_id: {tag_id}, name: {tag_name}")
|
||
else:
|
||
logging.warning(f"insert tags error. name:{tag_name}, href: {tag_href}")
|
||
|
||
return movie_id
|
||
|
||
except Exception as e:
|
||
self.conn.rollback()
|
||
logging.error("Error inserting movie: %s", e)
|
||
return None
|
||
|
||
# 更新 studio / label / series 等的多语言
|
||
def update_pubs_multilang(self, data, tbl, **filters):
|
||
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series, 'tags': self.tbl_name_tags}
|
||
if not tbls.get(tbl):
|
||
logging.warning(f"wrong table. table: {tbl}")
|
||
return None
|
||
|
||
return self.insert_or_update_common(data=data, tbl_name=tbls[tbl], uniq_key='href', exists_do_nothing=False)
|
||
|
||
def query_list_common(self, tbl, **filters):
|
||
tbls = {'studio': self.tbl_name_studios, 'label':self.tbl_name_labels, 'series':self.tbl_name_series}
|
||
if not tbls.get(tbl):
|
||
logging.warning(f"wrong table. table: {tbl}")
|
||
return None
|
||
try:
|
||
sql = f"SELECT href, name, uncensored, id FROM {tbls[tbl]} WHERE 1=1"
|
||
params = []
|
||
|
||
conditions = {
|
||
"id": " AND id = ?",
|
||
"href": " AND href = ?",
|
||
"name": " AND name LIKE ?",
|
||
"start_id": " AND id > ?",
|
||
"uncensored": " AND uncensored = ?",
|
||
}
|
||
|
||
for key, condition in conditions.items():
|
||
if key in filters:
|
||
sql += condition
|
||
if key == "name":
|
||
params.append(f"%{filters[key]}%")
|
||
else:
|
||
params.append(filters[key])
|
||
|
||
if "order_by" in filters:
|
||
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
|
||
sql += f" ORDER BY {filters['order_by']} "
|
||
|
||
if 'limit' in filters:
|
||
sql += " LIMIT ?"
|
||
params.append(filters["limit"])
|
||
|
||
self.cursor.execute(sql, params)
|
||
return [{'href': row[0], 'name': row[1], 'uncensored': row[2], 'id':row[3]} for row in self.cursor.fetchall()]
|
||
except sqlite3.Error as e:
|
||
logging.error(f"查询 href 失败: {e}")
|
||
return None
|
||
|
||
def update_tags(self, data):
|
||
return self.insert_or_update_common(data, self.tbl_name_tags, uniq_key='href')
|
||
|
||
def query_tags(self, **filters):
|
||
try:
|
||
sql = f"SELECT href, name, id FROM {self.tbl_name_tags} WHERE 1=1"
|
||
params = []
|
||
|
||
conditions = {
|
||
"id": " AND id = ?",
|
||
"href": " AND href = ?",
|
||
"name": " AND name LIKE ?",
|
||
"start_id": " AND id > ?",
|
||
}
|
||
|
||
for key, condition in conditions.items():
|
||
if key in filters:
|
||
sql += condition
|
||
if key == "name":
|
||
params.append(f"%{filters[key]}%")
|
||
else:
|
||
params.append(filters[key])
|
||
|
||
if "order_by" in filters:
|
||
# 注意:这里 order by 后面直接跟字段名,不能用占位符,否则会被当作字符串处理
|
||
sql += f" ORDER BY {filters['order_by']} "
|
||
|
||
if 'limit' in filters:
|
||
sql += " LIMIT ?"
|
||
params.append(filters["limit"])
|
||
|
||
self.cursor.execute(sql, params)
|
||
return [{'href': row[0], 'name': row[1], 'id': row[2]} for row in self.cursor.fetchall()]
|
||
except sqlite3.Error as e:
|
||
logging.error(f"查询 href 失败: {e}")
|
||
return None
|
||
|
||
def get_statics(self):
|
||
try:
|
||
self.cursor.execute(f"""
|
||
SELECT
|
||
(SELECT COUNT(*) FROM {self.tbl_name_actors}) AS actors,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1) AS act_un,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE is_full_data=1) AS act_full,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_actors} WHERE uncensored=1 AND is_full_data=1) AS act_unc_full,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_movies}) AS movies,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1) AS mov_un,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE is_full_data=1) AS mov_full,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_movies} WHERE uncensored=1 AND is_full_data=1) AS mov_un_full,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_studios}) AS studios,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_labels}) AS labels,
|
||
(SELECT COUNT(*) FROM {self.tbl_name_series}) AS series
|
||
""")
|
||
|
||
row = self.cursor.fetchone()
|
||
if not row:
|
||
logging.warning(f"query no results.")
|
||
return {}
|
||
|
||
# 手动定义列名映射
|
||
#columns = ['actors', 'act_un', 'act_full', 'act_unc_full', 'movies', 'mov_un', 'mov_full', 'mov_un_full']
|
||
columns = [desc[0] for desc in self.cursor.description]
|
||
return dict(zip(columns, row))
|
||
|
||
except sqlite3.Error as e:
|
||
logging.error(f"query error: {e}")
|
||
return {}
|
||
|
||
# 处理影片的 无码 字段
|
||
def reset_movies_uncensored(self, check_and_do = 0):
|
||
try:
|
||
logging.info("创建临时表以便于保存待更新记录")
|
||
self.cursor.execute("""
|
||
CREATE TEMPORARY TABLE IF NOT EXISTS temp_movies_to_update (
|
||
movie_id INTEGER PRIMARY KEY
|
||
)
|
||
""")
|
||
# 清空临时表(以防之前有残留数据)
|
||
self.cursor.execute("DELETE FROM temp_movies_to_update")
|
||
|
||
logging.info(f"开始收集需要更新的影片ID...")
|
||
# 使用单个SQL语句完成所有条件的查询和插入
|
||
self.cursor.execute("""
|
||
INSERT OR IGNORE INTO temp_movies_to_update (movie_id)
|
||
SELECT DISTINCT m.id
|
||
FROM javbus_movies m
|
||
-- 连接演员表
|
||
LEFT JOIN javbus_actors_movies am ON m.id = am.movie_id
|
||
LEFT JOIN javbus_actors a ON am.actor_id = a.id
|
||
-- 连接标签/系列/工作室表
|
||
LEFT JOIN javbus_labels l ON m.label_id = l.id
|
||
LEFT JOIN javbus_series s ON m.series_id = s.id
|
||
LEFT JOIN javbus_studios st ON m.studio_id = st.id
|
||
-- 筛选条件:任一表的href包含'uncensored'
|
||
WHERE a.href LIKE '%uncensored%'
|
||
OR l.href LIKE '%uncensored%'
|
||
OR s.href LIKE '%uncensored%'
|
||
OR st.href LIKE '%uncensored%'
|
||
""")
|
||
|
||
total_count = self.cursor.execute("SELECT COUNT(*) FROM temp_movies_to_update").fetchone()[0]
|
||
total_movies = self.cursor.execute("SELECT COUNT(*) FROM javbus_movies").fetchone()[0]
|
||
logging.info(f"共收集到 {total_count} 部需要更新的影片, 共有 {total_movies} 部影片")
|
||
|
||
if check_and_do:
|
||
# 1. 将所有记录的uncensored默认设为0
|
||
logging.info("开始将所有影片的uncensored设为默认值0...")
|
||
self.cursor.execute("UPDATE javbus_movies SET uncensored = 0")
|
||
logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为0")
|
||
|
||
# 2. 将临时表中匹配的记录设为1
|
||
logging.info("开始将匹配的影片的uncensored设为1...")
|
||
self.cursor.execute("""
|
||
UPDATE javbus_movies
|
||
SET uncensored = 1
|
||
WHERE id IN (SELECT movie_id FROM temp_movies_to_update)
|
||
""")
|
||
logging.info(f"已将 {self.cursor.rowcount} 条记录的uncensored设为1")
|
||
|
||
self.conn.commit()
|
||
else:
|
||
logging.info("check完毕,本次忽略更新。。。")
|
||
|
||
logging.info("任务执行完成!")
|
||
|
||
except sqlite3.Error as e:
|
||
self.conn.rollback()
|
||
logging.error("Error inserting movie: %s", e)
|
||
logging.error(f"query error: {e}")
|
||
|
||
# 处理影片的 无码 字段
|
||
def reset_actor_movies(self, check_and_do = 0):
    """Recompute ``movies_cnt`` for every row of the actors table.

    The count is the number of rows per ``actor_id`` in the actor-movie link
    table (``self.tbl_name_actor_movie``); actors without any link get 0.
    The ``movies_cnt`` column is added to the actors table first if missing.

    Args:
        check_and_do: Accepted for interface compatibility; not used.

    Errors from SQLite are logged and the transaction is rolled back; nothing
    is raised.
    """
    try:
        # Add the movies_cnt column on first run.
        self.cursor.execute(f"PRAGMA table_info({self.tbl_name_actors});")
        columns = [row[1] for row in self.cursor.fetchall()]

        if 'movies_cnt' not in columns:
            add_field_sql = f"""
                ALTER TABLE {self.tbl_name_actors} ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
            """
            self.cursor.execute(add_field_sql)
            logging.info("成功添加movies_cnt字段")
        else:
            logging.info("movies_cnt字段已存在,跳过添加")

        # Make the GROUP BY below cheap.
        self.cursor.execute(f"""
            CREATE INDEX IF NOT EXISTS idx_actor_movie_actor_id
            ON {self.tbl_name_actor_movie}(actor_id);
        """)

        # Remove any leftover from an earlier run (or an earlier failure) on
        # this connection; otherwise CREATE TEMPORARY TABLE would fail with
        # "table already exists" and the counts would never be refreshed.
        self.cursor.execute("DROP TABLE IF EXISTS temp.temp_actor_counts;")
        self.cursor.execute(f"""
            CREATE TEMPORARY TABLE temp_actor_counts AS
            SELECT actor_id, COUNT(movie_id) AS cnt
            FROM {self.tbl_name_actor_movie}
            GROUP BY actor_id;
        """)
        self.cursor.execute("CREATE INDEX idx_temp_actor_id ON temp_actor_counts(actor_id);")

        # COALESCE gives 0 to actors that have no linked movies.
        self.cursor.execute(f"""
            UPDATE {self.tbl_name_actors}
            SET movies_cnt = COALESCE((
                SELECT cnt FROM temp_actor_counts
                WHERE actor_id = {self.tbl_name_actors}.id
            ), 0);
        """)
        updated_rows = self.cursor.rowcount
        logging.info(f"成功更新{updated_rows}个演员的影片数量")

        # Dropping the temp table also drops its index.
        self.cursor.execute("DROP TABLE IF EXISTS temp.temp_actor_counts;")
        self.conn.commit()
        logging.info("任务执行完成!")

    except sqlite3.Error as e:
        self.conn.rollback()
        logging.error("Error updating actor movie_cnt: %s", e)
|
||
|
||
|
||
@register_handler(comm.SPIDER_NAME_IAFD)
|
||
class IAFDDBHandler(SQLiteDBHandler):
|
||
def __init__(self, db_path=shared_db_path):
    """Open the IAFD database and record the names of the tables it uses.

    Args:
        db_path: SQLite database path passed to ``SQLiteDBHandler``; defaults
            to ``shared_db_path`` (``test_db_path`` may be passed instead for
            test runs).
    """
    super().__init__(db_path)

    # Entity tables
    self.tbl_name_performers = 'iafd_performers'
    self.tbl_name_movies = 'iafd_movies'
    self.tbl_name_studio = 'iafd_studios'
    self.tbl_name_dist = 'iafd_distributors'
    self.tbl_name_ethnic = 'iafd_meta_ethnic'

    # Link / auxiliary tables
    self.tbl_name_performer_movies = 'iafd_performers_movies'
    self.tbl_name_alias = 'iafd_performer_aliases'
    self.tbl_name_performer_urls = 'iafd_performer_urls'
    # NOTE: attribute name and table-name string keep their existing spelling
    # because other methods (and presumably the schema) depend on them.
    self.tbl_name_moives_appear_in = 'iafd_movies_appers_in'
|
||
|
||
def insert_item(self, item):
    """Persist a scraped IAFD item into the table that matches its class.

    Only fields that were actually assigned on the item are forwarded, so an
    update never overwrites unrelated columns with unset defaults.

    Args:
        item: A Scrapy item from ``items_def``; its concrete class selects the
            handler (performer, movie, distributor, studio or ethnicity).

    Returns:
        The same ``item``, unchanged, so the pipeline can continue with it.
    """
    # ``item._values`` holds only the assigned fields; declared-but-unset
    # fields (``item.fields``) are intentionally not forwarded.
    processed_item = {field: item[field] for field in item._values}

    if isinstance(item, items_def.IafdPerformersItem):
        self.insert_or_update_performer(processed_item)
    elif isinstance(item, items_def.IafdMoviesItem):
        self.insert_or_update_movie(processed_item)
    elif isinstance(item, items_def.IafdDistributorsItem):
        self.insert_or_update_distributor(data=processed_item)
    elif isinstance(item, items_def.IafdStudiosItem):
        self.insert_or_update_studio(data=processed_item)
    elif isinstance(item, items_def.IafdMetaEthnicItem):
        self.insert_or_update_ethnic(data=processed_item)
    else:
        logging.error(f"unkown item. {processed_item}")

    return item
|
||
|
||
# 统计函数
|
||
def get_stat(self):
    """Return row counts for the main IAFD tables in a single query.

    Returns:
        dict with keys ``iafd_actors``, ``act_full``, ``movies``,
        ``mov_full``, ``dist`` and ``stu`` (the SQL column aliases). Returns
        ``{}`` when no row comes back or a ``sqlite3.Error`` is raised (the
        error is logged).
    """
    stat_sql = """
        SELECT
            (SELECT COUNT(*) FROM iafd_performers) AS iafd_actors,
            (SELECT COUNT(*) FROM iafd_performers WHERE is_full_data=1) AS act_full,
            (SELECT COUNT(*) FROM iafd_movies) AS movies,
            (SELECT COUNT(*) FROM iafd_movies WHERE is_full_data=1) AS mov_full,
            (SELECT COUNT(*) FROM iafd_distributors) AS dist,
            (SELECT COUNT(*) FROM iafd_studios) AS stu
    """
    try:
        self.cursor.execute(stat_sql)
        first = self.cursor.fetchone()
        if not first:
            return {}
        # Key each value by its column alias, taken from the cursor metadata.
        return {desc[0]: value for desc, value in zip(self.cursor.description, first)}
    except sqlite3.Error as err:
        logging.error(f"查询失败: {err}")
        return {}
|
||
|
||
# 插入演员索引,来自于列表数据
|
||
#def insert_performer_index(self, name, href, from_astro_list=None, from_birth_list=None, from_ethnic_list=None, from_movie_list=None):
|
||
def insert_performer_index(self, name, href, **kwargs):
    """Insert or update a performer stub discovered on a list page.

    Args:
        name: Performer display name.
        href: Performer page URL; the unique key used for the upsert.
        **kwargs: Optional list-origin flags ``from_astro_list``,
            ``from_birth_list``, ``from_ethnic_list`` and ``from_movie_list``.
            Flags that are absent or ``None`` are not written, so the stored
            values are kept. Other keywords are ignored.

    Returns:
        The result of ``insert_or_update_common`` (used elsewhere in this
        class as the row id, or ``None`` on failure).
    """
    flag_names = ('from_astro_list', 'from_birth_list', 'from_ethnic_list', 'from_movie_list')
    record = {'name': name, 'href': href}
    record.update({flag: kwargs[flag] for flag in flag_names if kwargs.get(flag) is not None})
    return self.insert_or_update_common(
        data=record,
        tbl_name=self.tbl_name_performers,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# """插入电影索引,来自于列表数据"""
|
||
#def insert_movie_index(self, title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None):
|
||
def insert_movie_index(self, title, href, **kwargs):
    """Insert or update a movie stub discovered on a list page.

    Args:
        title: Movie title.
        href: Movie page URL; the unique key used for the upsert.
        **kwargs: Optional ``from_performer_list``, ``from_dist_list``,
            ``from_stu_list`` and ``release_year``. Values that are absent or
            ``None`` are not written, so the stored values are kept. Other
            keywords are ignored.

    Returns:
        The result of ``insert_or_update_common`` (used elsewhere in this
        class as the movie id, or ``None`` on failure).
    """
    optional_columns = ('from_performer_list', 'from_dist_list', 'from_stu_list', 'release_year')
    row = {'title': title, 'href': href}
    for column in optional_columns:
        value = kwargs.get(column)
        if value is None:
            continue
        row[column] = value
    return self.insert_or_update_common(
        data=row,
        tbl_name=self.tbl_name_movies,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# 插入演员和电影的关联数据
|
||
def insert_performer_movie(self, performer_id, movie_id, role, notes):
    """Link a performer to a movie; an existing (performer, movie) link is kept as is.

    Args:
        performer_id: Row id in the performers table.
        movie_id: Row id in the movies table.
        role: Role label stored with the link (callers in this class pass
            values such as ``'personal'`` or ``'directoral'``).
        notes: Free-text notes stored with the link.

    Returns:
        The result of ``insert_or_update_with_composite_pk``.
    """
    link_row = {
        'performer_id': performer_id,
        'movie_id': movie_id,
        'role': role,
        'notes': notes,
    }
    key_columns = ['performer_id', 'movie_id']
    return self.insert_or_update_with_composite_pk(
        data=link_row,
        tbl_name=self.tbl_name_performer_movies,
        composite_pk=key_columns,
        exists_do_nothing=True,
    )
|
||
|
||
# 插入电影和电影的关联数据
|
||
def insert_movie_appears_in(self, movie_id, appears_in_id, gradation=0, notes=''):
    """Record a movie-to-movie "appears in" link; existing links are kept as is.

    Args:
        movie_id: Row id of the movie whose page lists the relation.
        appears_in_id: Row id of the related movie.
        gradation: Stored as-is in the ``gradation`` column.
        notes: Free-text notes stored with the link.

    Returns:
        The result of ``insert_or_update_with_composite_pk``.
    """
    relation = {
        'movie_id': movie_id,
        'appears_in_id': appears_in_id,
        'gradation': gradation,
        'notes': notes,
    }
    key_columns = ['movie_id', 'appears_in_id']
    return self.insert_or_update_with_composite_pk(
        data=relation,
        tbl_name=self.tbl_name_moives_appear_in,
        composite_pk=key_columns,
        exists_do_nothing=True,
    )
|
||
|
||
# 插入演员信息
|
||
def insert_or_update_performer(self, data, movies_update=True):
    """Insert/update a performer together with its aliases and credited movies.

    Args:
        data: Performer fields keyed by column name. Besides plain columns,
            this method reads ``performer_aka`` (list of alias strings) and
            ``credits`` (dict mapping a role to a list of movie dicts with
            ``title``, ``href``, ``year`` and optionally ``notes``).
        movies_update: When false, the ``credits`` section is skipped.

    Returns:
        The performer row id, or ``None`` if the performer write failed or an
        error occurred (the transaction is rolled back and the error logged).
    """
    try:
        performer_id = self.insert_or_update_common(data=data, tbl_name=self.tbl_name_performers, uniq_key='href', exists_do_nothing=False)
        if performer_id is None:
            return None
        logging.debug(f"insert one performer, id: {performer_id}, name: {data.get('name')}, href: {data.get('href')}")

        # Aliases: ignore anything that is not a list/tuple, and skip IAFD's
        # "No known aliases" placeholder.
        alias_list = data.get("performer_aka", [])
        if not isinstance(alias_list, (list, tuple)):
            alias_list = []
        for alias in alias_list:
            if alias.lower() != "no known aliases":
                self.insert_or_update_with_composite_pk(
                    data={'performer_id': performer_id, 'alias': alias},
                    tbl_name=self.tbl_name_alias,
                    composite_pk=['performer_id', 'alias'],
                    exists_do_nothing=True,
                )

        # Credited movies, grouped by role (e.g. personal / director).
        if movies_update:
            credits = data.get('credits', {})
            if not isinstance(credits, dict):
                credits = {}
            for role, movies in credits.items():
                if not isinstance(movies, (list, tuple)):
                    continue
                for movie in movies:
                    movie_id = self.get_id_by_key(tbl=self.tbl_name_movies, uniq_key='href', val=movie['href'])
                    if movie_id is None:
                        # Unknown movie: create a stub first. A missing or
                        # unparsable year falls back to 0 instead of aborting
                        # the whole performer.
                        try:
                            release_year = int(movie['year'])
                        except (KeyError, TypeError, ValueError):
                            release_year = 0
                        movie_id = self.insert_movie_index(movie['title'], movie['href'], release_year=release_year, from_performer_list=1)
                    if movie_id:
                        tmp_id = self.insert_performer_movie(performer_id, movie_id, role, movie.get('notes', ''))
                        if tmp_id:
                            logging.debug(f"insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}")
                    else:
                        logging.warning(f"insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}")

        return performer_id

    except sqlite3.Error as e:
        self.conn.rollback()
        logging.error(f"数据库错误: {e}, data: {data}")
        return None
    except Exception as e:
        self.conn.rollback()
        logging.error(f"未知错误: {e}, data: {data}")
        return None
|
||
|
||
|
||
# Insert or update a performer record for an abnormal URL (e.g. a 404 page)
|
||
def insert_or_update_performer_404(self, name, href, is_full_data=1):
    """Upsert a performer row for a page that could not be scraped normally (e.g. 404).

    Only ``href``, ``name`` and ``is_full_data`` are written. The default
    ``is_full_data=1`` presumably keeps the crawler from retrying the URL
    (confirm against the spider).

    Returns:
        The result of ``insert_or_update_common``.
    """
    placeholder_row = {'href': href, 'name': name, 'is_full_data': is_full_data}
    target = self.tbl_name_performers
    return self.insert_or_update_common(
        data=placeholder_row,
        tbl_name=target,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# Insert or update an ethnicity (iafd_meta_ethnic) record
|
||
def insert_or_update_ethnic(self, data):
    """Upsert one ethnicity row keyed by ``href``; returns ``insert_or_update_common``'s result."""
    table = self.tbl_name_ethnic
    return self.insert_or_update_common(
        data=data,
        tbl_name=table,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# 按条件查询 href 列表
|
||
def query_ethnic_hrefs(self, **filters):
    """Return ethnicity entries matching the given filters (combined with AND).

    Supported filters:
        id:   exact row id.
        href: exact page URL (``url`` is accepted as a legacy alias).
        name: substring match (SQL ``LIKE '%name%'``).

    Returns:
        List of ``{'href': ..., 'name': ...}`` dicts, or ``None`` if the query
        raises ``sqlite3.Error`` (the error is logged).
    """
    try:
        sql = f"SELECT href, name FROM {self.tbl_name_ethnic} WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        # Prefer "href"; still honour "url" for callers using the older key.
        href_key = next((key for key in ("href", "url") if key in filters), None)
        if href_key is not None:
            sql += " AND href = ?"
            params.append(filters[href_key])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        self.cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in self.cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
# 插入或更新发行商 """
|
||
def insert_or_update_distributor(self, data):
    """Upsert one distributor row keyed by ``href``; returns ``insert_or_update_common``'s result."""
    table = self.tbl_name_dist
    return self.insert_or_update_common(
        data=data,
        tbl_name=table,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# 按条件查询 href 列表
|
||
def query_distributor_hrefs(self, **filters):
    """Return lower-cased distributor URLs matching the given filters (AND-ed).

    Supported filters:
        id:   exact row id.
        href: exact page URL (``url`` is accepted as a legacy alias).
        name: substring match (SQL ``LIKE '%name%'``).

    Returns:
        List of hrefs (lower-cased; a NULL href is returned as ``None``), or
        ``None`` if the query raises ``sqlite3.Error`` (the error is logged).
    """
    try:
        sql = f"SELECT href FROM {self.tbl_name_dist} WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        # Prefer "href"; still honour "url" for callers using the older key.
        href_key = next((key for key in ("href", "url") if key in filters), None)
        if href_key is not None:
            sql += " AND href = ?"
            params.append(filters[href_key])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        self.cursor.execute(sql, params)
        # Links are normalised to lower case.
        return [row[0].lower() if isinstance(row[0], str) else row[0] for row in self.cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
# """ 插入或更新制作公司 """
|
||
def insert_or_update_studio(self, data):
    """Upsert one studio row keyed by ``href``; returns ``insert_or_update_common``'s result."""
    table = self.tbl_name_studio
    return self.insert_or_update_common(
        data=data,
        tbl_name=table,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# 按条件查询 href 列表
|
||
def query_studio_hrefs(self, **filters):
    """Return lower-cased studio URLs matching the given filters (AND-ed).

    Supported filters:
        id:   exact row id.
        href: exact page URL (``url`` is also accepted, matching the other
              ``query_*_hrefs`` helpers).
        name: substring match (SQL ``LIKE '%name%'``).

    Returns:
        List of hrefs (lower-cased; a NULL href is returned as ``None``), or
        ``None`` if the query raises ``sqlite3.Error`` (the error is logged).
    """
    try:
        sql = f"SELECT href FROM {self.tbl_name_studio} WHERE 1=1"
        params = []

        if "id" in filters:
            sql += " AND id = ?"
            params.append(filters["id"])
        href_key = next((key for key in ("href", "url") if key in filters), None)
        if href_key is not None:
            sql += " AND href = ?"
            params.append(filters[href_key])
        if "name" in filters:
            sql += " AND name LIKE ?"
            params.append(f"%{filters['name']}%")

        self.cursor.execute(sql, params)
        # Links are normalised to lower case.
        return [row[0].lower() if isinstance(row[0], str) else row[0] for row in self.cursor.fetchall()]

    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
# 检查记录是否存在,不存在就插入
|
||
def check_and_get_id(self, name, href, tbl, uniq_key='href'):
    """Return the id of the row in ``tbl`` whose ``uniq_key`` equals ``href``, creating it if absent.

    A new row gets ``name`` and ``href``; rows created in the performers
    table are additionally flagged ``from_movie_list = 1``.

    Returns:
        The row id, or 0 when ``href`` is not a valid URL or no id could be
        obtained.
    """
    if not is_valid_url(href):
        return 0

    existing_id = self.get_id_by_key(tbl, uniq_key, href)
    if existing_id is not None:
        return existing_id or 0

    stub = {'name': name, 'href': href}
    if tbl == self.tbl_name_performers:
        stub['from_movie_list'] = 1
    created_id = self.insert_or_update_common(data=stub, tbl_name=tbl, uniq_key=uniq_key, exists_do_nothing=True)
    return created_id or 0
|
||
|
||
# """插入或更新电影数据"""
|
||
def insert_or_update_movie(self, movie_data):
    """Insert/update a movie and its director, performer and appears-in links.

    Args:
        movie_data: Movie fields keyed by column name. The keys
            ``Distributor``/``DistributorHref``, ``Studio``/``StudioHref`` and
            ``Director``/``DirectorHref`` are read unconditionally (a missing
            key makes the call fail and return ``None``). The optional lists
            ``Directors`` and ``Performers`` hold dicts with ``name``/``href``
            (performers may also carry ``tags``); ``AppearsIn`` holds dicts
            with ``title``/``href``. The dict is mutated: ``distributor_id``,
            ``studio_id`` and ``director_id`` are added before the write.

    Returns:
        The movie row id, or ``None`` on failure (rolled back and logged).
    """
    try:
        # Resolve referenced entities, creating stub rows when unknown.
        # check_and_get_id returns 0 for an invalid URL or a failed insert.
        movie_data['distributor_id'] = self.check_and_get_id(movie_data['Distributor'], movie_data['DistributorHref'], self.tbl_name_dist, uniq_key='href')
        movie_data['studio_id'] = self.check_and_get_id(movie_data['Studio'], movie_data['StudioHref'], self.tbl_name_studio, uniq_key='href')
        movie_data['director_id'] = self.check_and_get_id(movie_data['Director'], movie_data['DirectorHref'], self.tbl_name_performers, uniq_key='href')

        movie_id = self.insert_or_update_common(data=movie_data, tbl_name=self.tbl_name_movies, uniq_key='href', exists_do_nothing=False)
        if movie_id is None:
            return None
        logging.debug(f"insert one move, id: {movie_id}, title: {movie_data.get('title')}, href: {movie_data.get('href')}")

        # Director links live in the performer-movie table with role 'directoral'.
        if movie_data['director_id']:
            tmp_id = self.insert_performer_movie(movie_data['director_id'], movie_id, 'directoral', '')
            if tmp_id:
                logging.debug(f"insert one perfomer_movie. director_id: {movie_data['director_id']}, movie_id:{movie_id}")

        # `or []` also covers keys that are present but set to None.
        for director in movie_data.get('Directors') or []:
            director_id = self.check_and_get_id(director['name'], director['href'], self.tbl_name_performers, uniq_key='href')
            if director_id:
                tmp_id = self.insert_performer_movie(director_id, movie_id, 'directoral', '')
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. director_id: {director_id}, movie_id:{movie_id}")

        for performer in movie_data.get('Performers') or []:
            performer_id = self.check_and_get_id(performer['name'], performer['href'], self.tbl_name_performers, uniq_key='href')
            if performer_id:
                # Keep the performer's tags, minus the bare name, as the link notes.
                tags = performer.get('tags') or []
                notes = '|'.join(tag for tag in tags if tag != performer['name'])
                tmp_id = self.insert_performer_movie(performer_id, movie_id, 'personal', notes)
                if tmp_id:
                    logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
                else:
                    logging.debug(f"insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}")
            else:
                logging.warning(f"insert perfomer failed. name: {performer['name']}, href: {performer['href']}")

        for appears in movie_data.get("AppearsIn") or []:
            appears_in_id = self.get_id_by_key(self.tbl_name_movies, 'href', appears['href'])
            # Unknown related movie: create a stub first.
            if appears_in_id is None:
                appears_in_id = self.insert_movie_index(appears['title'], appears['href'])
            if appears_in_id:
                tmp_id = self.insert_movie_appears_in(movie_id, appears_in_id)
                if tmp_id:
                    logging.debug(f"insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
                else:
                    logging.warning(f"insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}")
            else:
                logging.warning(f"get appears_in_id failed. title: {appears['title']}, href: {appears['href']}")

        return movie_id

    except Exception as e:
        self.conn.rollback()
        logging.error("Error inserting movie: %s", e)
        return None
|
||
|
||
|
||
# """插入或更新电影数据(异常url的处理,比如404链接)"""
|
||
def insert_or_update_movie_404(self, title, href, is_full_data=1):
    """Upsert a movie row for a page that could not be scraped normally (e.g. 404).

    Only ``href``, ``title`` and ``is_full_data`` are written.

    Returns:
        The result of ``insert_or_update_common``.
    """
    placeholder_row = {'href': href, 'title': title, 'is_full_data': is_full_data}
    target = self.tbl_name_movies
    return self.insert_or_update_common(
        data=placeholder_row,
        tbl_name=target,
        uniq_key='href',
        exists_do_nothing=False,
    )
|
||
|
||
# 按条件查询 href 列表
|
||
def query_performer_hrefs(self, **filters):
    """Alias of ``get_performers``: same filters, same return value."""
    performers = self.get_performers(**filters)
    return performers
|
||
|
||
# 按条件查询 href 列表
|
||
def get_performers(self, **filters):
    """Query performers with optional, AND-combined filters.

    Supported filters:
        id, href, is_full_data: exact match.
        name: substring match (``LIKE '%name%'``).
        start_id: only rows with ``id > start_id``.
        is_full_data_in / is_full_data_not_in: sequences of allowed /
            excluded ``is_full_data`` values; ignored when empty.
        order_by: raw ORDER BY expression, interpolated verbatim (trusted
            input only).
        limit: maximum number of rows.

    Returns:
        List of dicts with ``href``, ``name``, ``id``, ``movies_cnt`` and
        ``is_full_data``, or ``None`` on ``sqlite3.Error`` (logged).
        ``dict(row)`` needs mapping-like rows, presumably ``sqlite3.Row``
        configured by the base handler.
    """
    scalar_filters = (
        ("id", " AND id = ?"),
        ("href", " AND href = ?"),
        ("name", " AND name LIKE ?"),
        ("is_full_data", " AND is_full_data = ?"),
        ("start_id", " AND id > ?"),
    )
    set_filters = (
        ("is_full_data_in", "IN"),
        ("is_full_data_not_in", "NOT IN"),
    )
    try:
        query = f"SELECT href, name, id, movies_cnt, is_full_data FROM {self.tbl_name_performers} WHERE 1=1"
        bound = []

        for key, fragment in scalar_filters:
            if key not in filters:
                continue
            query += fragment
            value = filters[key]
            # "name" is a substring search; every other scalar filter is exact.
            bound.append(f"%{value}%" if key == "name" else value)

        for key, operator in set_filters:
            values = filters.get(key)
            if not values:
                continue
            marks = ", ".join(["?"] * len(values))
            query += f" AND is_full_data {operator} ({marks})"
            bound.extend(values)

        if "order_by" in filters:
            # Identifiers cannot be bound as parameters, so this is interpolated.
            query += f" ORDER BY {filters['order_by']} "

        if 'limit' in filters:
            query += " LIMIT ?"
            bound.append(filters["limit"])

        self.cursor.execute(query, bound)
        return [dict(record) for record in self.cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
# 按条件查询 href 列表
|
||
def query_movie_hrefs(self, **filters):
    """Alias of ``get_movies``: same filters, same return value."""
    movies = self.get_movies(**filters)
    return movies
|
||
|
||
# 按条件查询 href 列表
|
||
def get_movies(self, **filters):
    """Query movies with optional, AND-combined filters.

    Supported filters:
        id, href, is_full_data: exact match.
        title: substring match (``LIKE '%title%'``).
        start_id: only rows with ``id > start_id``.
        is_full_data_in / is_full_data_not_in: sequences of allowed /
            excluded ``is_full_data`` values; ignored when empty.
        order_by: raw ORDER BY expression, interpolated verbatim (trusted
            input only).
        limit: maximum number of rows.

    Returns:
        List of dicts with ``href``, ``title``, ``id`` and ``is_full_data``,
        or ``None`` on ``sqlite3.Error`` (logged). ``dict(row)`` needs
        mapping-like rows, presumably ``sqlite3.Row`` set by the base handler.
    """
    scalar_filters = (
        ("id", " AND id = ?"),
        ("href", " AND href = ?"),
        ("title", " AND title LIKE ?"),
        ("is_full_data", " AND is_full_data = ?"),
        ("start_id", " AND id > ?"),
    )
    set_filters = (
        ("is_full_data_in", "IN"),
        ("is_full_data_not_in", "NOT IN"),
    )
    try:
        query = f"SELECT href, title, id, is_full_data FROM {self.tbl_name_movies} WHERE 1=1"
        bound = []

        for key, fragment in scalar_filters:
            if key not in filters:
                continue
            query += fragment
            value = filters[key]
            # "title" is a substring search; every other scalar filter is exact.
            bound.append(f"%{value}%" if key == "title" else value)

        for key, operator in set_filters:
            values = filters.get(key)
            if not values:
                continue
            marks = ", ".join(["?"] * len(values))
            query += f" AND is_full_data {operator} ({marks})"
            bound.extend(values)

        if "order_by" in filters:
            # Identifiers cannot be bound as parameters, so this is interpolated.
            query += f" ORDER BY {filters['order_by']} "

        if 'limit' in filters:
            query += " LIMIT ?"
            bound.append(filters["limit"])

        self.cursor.execute(query, bound)
        return [dict(record) for record in self.cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
|
||
# 按条件查询 href 列表
|
||
def get_lord_actors(self, **filters):
    """Query actress rows (``pornstar`` exposed as ``name``) with optional filters.

    Supported filters:
        id, href: exact match.
        pornstar: substring match (``LIKE '%pornstar%'``).
        start_id: only rows with ``id > start_id``.
        order_by: raw ORDER BY expression, interpolated verbatim (trusted
            input only).
        limit: maximum number of rows.

    Returns:
        List of dicts with ``href``, ``name`` and ``id``, or ``None`` on
        ``sqlite3.Error`` (logged).

    NOTE(review): ``self.tbl_name_thelordofporn_actress`` is not assigned in
    this class's visible ``__init__``; confirm it is set elsewhere, otherwise
    this raises AttributeError.
    """
    scalar_filters = (
        ("id", " AND id = ?"),
        ("href", " AND href = ?"),
        ("pornstar", " AND pornstar LIKE ?"),
        ("start_id", " AND id > ?"),
    )
    try:
        query = f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1"
        bound = []

        for key, fragment in scalar_filters:
            if key not in filters:
                continue
            query += fragment
            value = filters[key]
            # "pornstar" is a substring search; the others are exact.
            bound.append(f"%{value}%" if key == "pornstar" else value)

        if "order_by" in filters:
            # Identifiers cannot be bound as parameters, so this is interpolated.
            query += f" ORDER BY {filters['order_by']} "

        if 'limit' in filters:
            query += " LIMIT ?"
            bound.append(filters["limit"])

        self.cursor.execute(query, bound)
        return [dict(record) for record in self.cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return None
|
||
|
||
# 按条件查询 href 列表
|
||
def get_iafd_actors(
    self,
    names: List[str],
    tbl = 'stu'
) -> Dict[str, List[Dict[str, str]]]:
    """Group the female performers of the named studios/distributors by company name.

    Works in two steps through a temporary table to keep the working set small:
    1. store (company id, company name, movie id) for the named companies;
    2. join those movies to performers and keep rows with gender 'Woman'.

    Args:
        names: Exact company names to look up.
        tbl: ``'stu'`` (case-insensitive) searches ``iafd_studios`` through
            ``iafd_movies.studio_id``; any other value searches
            ``iafd_distributors`` through ``iafd_movies.distributor_id``.

    Returns:
        Mapping company name -> list of ``{'name', 'href'}`` performer dicts,
        ordered by performer name. A performer is listed once per matching
        movie, so duplicates are possible. ``{}`` for empty ``names`` or on a
        database error (logged). Rows are read by column name, so the
        connection presumably uses ``sqlite3.Row``.
    """
    tbl_name = 'iafd_studios' if tbl.lower() == 'stu' else 'iafd_distributors'
    join_key = 'studio_id' if tbl.lower() == 'stu' else 'distributor_id'
    if not names:
        return {}

    final_result: Dict[str, List[Dict[str, str]]] = {}

    try:
        # Step 1: scratch table with the target companies' movies.
        self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
        self.cursor.execute("""
            CREATE TEMPORARY TABLE temp_distributor_movies (
                distributor_id INTEGER,
                distributor_name TEXT,
                movie_id INTEGER,
                PRIMARY KEY (distributor_id, movie_id)
            )
        """)

        # tbl_name/join_key come from the fixed choices above, never from the
        # caller; the company names themselves are bound as parameters.
        insert_sql = """
            INSERT INTO temp_distributor_movies (distributor_id, distributor_name, movie_id)
            SELECT
                d.id AS distributor_id,
                d.name AS distributor_name,
                m.id AS movie_id
            FROM
                {tbl_name} d
            INNER JOIN
                iafd_movies m ON d.id = m.{join_key}
            WHERE
                d.name IN ({placeholders})
        """.format(
            tbl_name=tbl_name,
            join_key=join_key,
            placeholders=', '.join(['?'] * len(names))
        )
        logging.debug(f'{insert_sql}')
        self.cursor.execute(insert_sql, names)
        self.conn.commit()

        # Step 2: female performers of those movies.
        query_sql = """
            SELECT
                t.distributor_name,
                p.name AS performer_name,
                p.href AS performer_href
            FROM
                temp_distributor_movies t
            INNER JOIN
                iafd_performers_movies pm ON t.movie_id = pm.movie_id
            INNER JOIN
                iafd_performers p ON pm.performer_id = p.id
            WHERE
                p.gender = 'Woman'
            ORDER BY
                t.distributor_name, p.name
        """
        self.cursor.execute(query_sql)
        for row in self.cursor.fetchall():
            final_result.setdefault(row['distributor_name'], []).append({
                'name': row['performer_name'],
                'href': row['performer_href'],
            })

    except sqlite3.Error as e:
        # Log like the rest of this handler instead of printing to stdout.
        logging.error(f"查询失败:{e}")
        return {}
    finally:
        # Always remove the scratch table, also after a failure, so it never
        # lingers on this connection.
        try:
            self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
        except sqlite3.Error as drop_err:
            logging.warning(f"drop temp_distributor_movies failed: {drop_err}")

    return final_result
|
||
|
||
|
||
# 获取 view_iafd_performers_movies 中数据 不匹配的演员信息。
|
||
def get_performers_needed_update(self, limit=None):
    """List performers whose stored and aggregated movie counts disagree.

    Reads ``view_iafd_performers_movies`` and returns rows where
    ``actual_movies_cnt != movies_cnt``.

    Args:
        limit: Optional maximum number of rows; bound as a SQL parameter.

    Returns:
        List of ``{'href': ..., 'name': ...}`` dicts; ``[]`` on
        ``sqlite3.Error`` (logged).
    """
    sql = "SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt"
    params = []
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)

    try:
        self.cursor.execute(sql, params)
        return [{'href': row[0], 'name': row[1]} for row in self.cursor.fetchall()]
    except sqlite3.Error as e:
        logging.error(f"查询 href 失败: {e}")
        return []
|
||
|
||
# 生成一个复杂的演员电影数量的查询视图,来判断从电影列表中聚合出来的演员-影片数量,与从演员列表中抓取到的影片数量,是否相等。
|
||
def check_and_create_stat_table(self, taskid = 0):
    """(Re)build ``iafd_tmp_performers_stat_<taskid>`` with per-performer counts.

    For every performer the table holds ``id``, ``href``, ``name``, the stored
    ``movies_cnt`` and two aggregated counts:
        actor_movie_count:    rows in iafd_performers_movies for the
                              performer (all roles stored there are counted);
        director_movie_count: rows in iafd_movies whose director_id is the
                              performer.
    Supporting indexes are created first if missing. An existing table with
    the same name is dropped and rebuilt. Errors are logged, not raised.

    Args:
        taskid: Suffix that makes the table name unique per task.
    """
    try:
        indexes = [
            ("idx_iafd_performers_movies_performer_id",
             "CREATE INDEX idx_iafd_performers_movies_performer_id ON iafd_performers_movies (performer_id);"),
            ("idx_iafd_movies_director_id",
             "CREATE INDEX idx_iafd_movies_director_id ON iafd_movies (director_id);"),
            ("idx_iafd_performers_id",
             "CREATE INDEX idx_iafd_performers_id ON iafd_performers (id);"),
        ]
        for index_name, create_index_sql in indexes:
            self.cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
            if not self.cursor.fetchone():
                self.cursor.execute(create_index_sql)
                logging.info(f"Index {index_name} created successfully.")
            else:
                logging.info(f"Index {index_name} already exists.")

        view_name = f"iafd_tmp_performers_stat_{taskid}"
        # Table names cannot be bound as SQL parameters ("DROP TABLE ?" is a
        # syntax error), so quote the identifier; doubling embedded quotes
        # keeps an unusual taskid from breaking out of the quoting.
        quoted_name = '"' + view_name.replace('"', '""') + '"'
        self.cursor.execute(f"DROP TABLE IF EXISTS {quoted_name}")
        self.conn.commit()

        create_view_sql = f"""
            CREATE table {quoted_name} AS
            SELECT
                id,
                href,
                name,
                movies_cnt,
                SUM(CASE WHEN role = 'actor' THEN movie_count ELSE 0 END) AS actor_movie_count,
                SUM(CASE WHEN role = 'director' THEN movie_count ELSE 0 END) AS director_movie_count
            FROM (
                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(apm.movie_id) AS movie_count,
                    'actor' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_performers_movies apm ON p.id = apm.performer_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt

                UNION ALL

                SELECT
                    p.id,
                    p.href,
                    p.name,
                    p.movies_cnt,
                    COUNT(im.id) AS movie_count,
                    'director' AS role
                FROM
                    iafd_performers p
                LEFT JOIN
                    iafd_movies im ON p.id = im.director_id
                GROUP BY
                    p.id, p.href, p.name, p.movies_cnt
            ) combined
            GROUP BY
                id, href, name, movies_cnt;
        """
        self.cursor.execute(create_view_sql)
        logging.info(f"table {view_name} created successfully.")

        # Commit the new table (the connection itself stays open).
        self.conn.commit()
    except sqlite3.Error as e:
        logging.warning(f"An error occurred: {e}")
|
||
|
||
|
||
# Recompute each performer's movies_cnt from iafd_performers_movies
|
||
def reset_actor_movies(self, check_and_do = 0):
    """Recompute ``iafd_performers.movies_cnt`` from ``iafd_performers_movies``.

    Each performer gets the number of link rows with its ``performer_id``
    (0 when there are none). The ``movies_cnt`` column is added first if the
    table does not have it yet.

    Args:
        check_and_do: Accepted for interface compatibility; not used.

    Errors from SQLite are logged and the transaction is rolled back; nothing
    is raised.
    """
    try:
        # Add the movies_cnt column on first run.
        self.cursor.execute("PRAGMA table_info(iafd_performers);")
        columns = [row[1] for row in self.cursor.fetchall()]

        if 'movies_cnt' not in columns:
            add_field_sql = """
                ALTER TABLE iafd_performers ADD COLUMN movies_cnt INTEGER DEFAULT 0 NOT NULL;
            """
            self.cursor.execute(add_field_sql)
            logging.info("成功添加movies_cnt字段")
        else:
            logging.info("movies_cnt字段已存在,跳过添加")

        # Make the GROUP BY below cheap.
        self.cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_iafd_performers_movies_performer_id
            ON iafd_performers_movies(performer_id);
        """)

        # Remove a leftover from an earlier failed run on this connection;
        # otherwise CREATE TEMPORARY TABLE would fail with "already exists".
        self.cursor.execute("DROP TABLE IF EXISTS temp.temp_actor_counts;")
        self.cursor.execute("""
            CREATE TEMPORARY TABLE temp_actor_counts AS
            SELECT performer_id, COUNT(movie_id) AS cnt
            FROM iafd_performers_movies
            GROUP BY performer_id;
        """)
        self.cursor.execute("CREATE INDEX idx_temp_performer_id ON temp_actor_counts(performer_id);")

        # COALESCE gives 0 to performers that have no linked movies.
        self.cursor.execute("""
            UPDATE iafd_performers
            SET movies_cnt = COALESCE((
                SELECT cnt FROM temp_actor_counts
                WHERE performer_id = iafd_performers.id
            ), 0);
        """)
        updated_rows = self.cursor.rowcount
        logging.info(f"成功更新{updated_rows}个演员的影片数量")

        # Dropping the temp table also drops its index.
        self.cursor.execute("DROP TABLE IF EXISTS temp.temp_actor_counts;")
        self.conn.commit()
        logging.info("任务执行完成!")

    except sqlite3.Error as e:
        self.conn.rollback()
        logging.error("Error updating actor movie_cnt: %s", e)
|
||
|