This repository has been archived on 2026-01-07. You can view files and clone it, but cannot push or open issues or pull requests.
Files
resources/scrapy_proj/scrapy_proj/db_wapper/spider_db_handler.py
2025-07-22 16:53:04 +08:00

564 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sqlite3
import json
import logging
from datetime import datetime
from typing import List, Dict
from scrapy_proj.db_wapper.sqlite_base import SQLiteDBHandler, default_dbpath, shared_db_path
import scrapy_proj.comm.comm_def as comm
# Registry of DB handler classes, keyed by lower-cased spider name.
spider_handler_registry = {}


def register_handler(spider_name):
    """Build a class decorator that registers a DB handler for *spider_name*.

    The decorated class is stored in ``spider_handler_registry`` under
    ``spider_name.lower()`` and returned unchanged.
    """
    def _register(handler_cls):
        registry_key = spider_name.lower()
        spider_handler_registry[registry_key] = handler_cls
        return handler_cls
    return _register
@register_handler(comm.SPIDER_NAME_SIS)
class SisDBHandler(SQLiteDBHandler):
    """SQLite handler for items scraped by the SIS spider.

    All rows go to the single table ``sis``, keyed on ``url``.
    """

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        # The only table this handler writes to / counts.
        self.tbl_name_sis = 'sis'

    def insert_item(self, item):
        """Write *item* into ``sis`` keyed on ``url`` with ``exists_do_nothing=True``
        (presumably leaving an already-stored url untouched)."""
        self.insert_or_update_common(
            item,
            tbl_name=self.tbl_name_sis,
            uniq_key='url',
            exists_do_nothing=True,
        )

    def get_stat(self):
        """Return ``{'cnt': <number of rows in sis>}``.

        Returns ``{}`` if the query yields no row or sqlite raises an error.
        """
        stat_sql = f"""
            SELECT
                (SELECT COUNT(*) FROM {self.tbl_name_sis}) AS cnt
        """
        try:
            self.cursor.execute(stat_sql)
            first_row = self.cursor.fetchone()
            if not first_row:
                logging.warning("query no results.")
                return {}
            # Map each result column name (from the cursor description) to its value.
            return {col[0]: value for col, value in zip(self.cursor.description, first_row)}
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3DBHandler(SQLiteDBHandler):
    """SQLite handler for items scraped by the U3C3 spider (table ``u3c3``, keyed on ``url``)."""

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_u3c3 = 'u3c3'

    def insert_item(self, item):
        """Store *item* in ``u3c3`` keyed on ``url`` with ``exists_do_nothing=True``."""
        self.insert_or_update_common(
            item, tbl_name=self.tbl_name_u3c3, uniq_key='url', exists_do_nothing=True
        )

    def get_stat(self):
        """Return ``{'cnt': <rows in u3c3>}``, or ``{}`` on sqlite error / empty result."""
        try:
            self.cursor.execute(
                f"SELECT (SELECT COUNT(*) FROM {self.tbl_name_u3c3}) AS cnt"
            )
            result = self.cursor.fetchone()
            if not result:
                logging.warning("query no results.")
                return {}
            col_names = [d[0] for d in self.cursor.description]
            return dict(zip(col_names, result))
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_CLM)
class ClmDBHandler(SQLiteDBHandler):
    """SQLite handler for the CLM spider.

    Tables used:
      * ``clm_index``          -- index entries, keyed on ``href``
                                  (reported as ``magnets`` by ``get_stat``);
      * ``clm_keywords``       -- search keywords, keyed on ``words``;
      * ``clm_keywords_index`` -- keyword <-> index links, keyed on
                                  ``wid_iid`` = ``"<words_id>_<index_id>"``.
    """

    def __init__(self, db_path=default_dbpath):
        super().__init__(db_path)
        self.tbl_name_clm_index = 'clm_index'
        self.tbl_name_clm_keywords = 'clm_keywords'
        self.tbl_name_words_index = 'clm_keywords_index'

    def insert_item(self, item):
        """Dispatch *item* on ``item['item_type']`` and return it unchanged."""
        kind = item['item_type']
        if kind == comm.ITEM_TYPE_CLM_INDEX:
            self.insert_index(item)
        elif kind == comm.ITEM_TYPE_CLM_KEYWORDS:
            # exists_do_nothing=False: presumably an existing ``words`` row is updated.
            self.insert_or_update_common(item, self.tbl_name_clm_keywords, uniq_key='words', exists_do_nothing=False)
        else:
            logging.error("unkown item.")
        return item

    def insert_index(self, item):
        """Store one ``clm_index`` row and, for new rows, link it to its keyword.

        If ``item['is_update']`` is truthy the row is written with
        ``exists_do_nothing=False`` and no link row is created. Otherwise the
        row is written with ``exists_do_nothing=True``; when that returns an id,
        a ``clm_keywords_index`` row linking ``item['key_words_id']`` to it is
        added, with ``item['key_words']`` stored as ``tags``.
        """
        if item['is_update']:  # update only
            self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=False)
            return

        row_id = self.insert_or_update_common(item, self.tbl_name_clm_index, uniq_key='href', exists_do_nothing=True)
        if not row_id:
            logging.warning(f"insert index error: {item}")
            return

        words_id = item['key_words_id']
        lnk_data = {
            'words_id': words_id,
            'index_id': row_id,
            'wid_iid': f"{words_id}_{row_id}",
            'tags': item['key_words'],
        }
        lnk_id = self.insert_or_update_common(lnk_data, self.tbl_name_words_index, uniq_key='wid_iid', exists_do_nothing=True)
        if lnk_id:
            logging.debug(f"insert one item: {lnk_data}")
        else:
            logging.warning(f"insert item error: {lnk_data}")

    def get_empty_title(self):
        """Return ``id``/``href`` dicts for index rows whose title is ``''``; None on sqlite error."""
        try:
            self.cursor.execute(f"SELECT id, href FROM {self.tbl_name_clm_index} WHERE title='' ")
            fetched = self.cursor.fetchall()
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None
        else:
            return [dict(r) for r in fetched]

    def get_count_by_keywords_id(self, key_words_id):
        """Return how many link rows reference *key_words_id* (sqlite errors propagate)."""
        self.cursor.execute(
            f"SELECT count(*) as cnt from {self.tbl_name_words_index} WHERE words_id = ?",
            (key_words_id,),
        )
        first = self.cursor.fetchone()
        return first[0] if first else None

    def get_key_words(self, **filters):
        """Query ``clm_keywords`` (id, words, groups) with optional filters.

        Filters, applied in this order when present: ``id`` (equality),
        ``words`` / ``groups`` / ``tags`` (``LIKE '%value%'``) and
        ``start_id`` (``id > value``). ``query_str`` is appended verbatim as an
        extra ``AND`` clause and ``order_by`` verbatim after ``ORDER BY``.
        Both are raw SQL and must never carry untrusted input. ``limit`` is
        bound as a parameter.

        Returns a list of row dicts, or None on sqlite error.
        """
        fuzzy_keys = ('words', 'groups', 'tags')
        clause_by_key = (
            ("id", " AND id = ?"),
            ("words", " AND words LIKE ?"),
            ("groups", " AND groups LIKE ?"),
            ("tags", " AND tags LIKE ?"),
            ("start_id", " AND id > ?"),
        )
        try:
            parts = [f"SELECT id, words, groups FROM {self.tbl_name_clm_keywords} WHERE 1=1"]
            args = []
            for key, clause in clause_by_key:
                if key not in filters:
                    continue
                parts.append(clause)
                value = filters[key]
                args.append(f"%{value}%" if key in fuzzy_keys else value)
            if "query_str" in filters:
                parts.append(f" AND {filters['query_str']} ")
            if "order_by" in filters:
                # ORDER BY needs a bare column expression. A placeholder would be
                # bound as a string literal, so the value is interpolated.
                parts.append(f" ORDER BY {filters['order_by']} ")
            if 'limit' in filters:
                parts.append(" LIMIT ?")
                args.append(filters["limit"])
            self.cursor.execute("".join(parts), args)
            return [dict(r) for r in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def get_stat(self):
        """Return row counts ``{'words', 'magnets', 'wd_mag'}``; ``{}`` on error or no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_keywords}) AS words,
                    (SELECT COUNT(*) FROM {self.tbl_name_clm_index}) AS magnets,
                    (SELECT COUNT(*) FROM {self.tbl_name_words_index}) AS wd_mag
            """)
            counts = self.cursor.fetchone()
            if not counts:
                logging.warning("query no results.")
                return {}
            return {col[0]: val for col, val in zip(self.cursor.description, counts)}
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}
@register_handler(comm.SPIDER_NAME_IAFD)
class IAFDDBHandler(SQLiteDBHandler):
    """Query helper for the IAFD tables in the shared database.

    ``insert_item`` is a no-op. The other methods read performers, movies,
    thelordofporn actresses, and the female cast of given studios/distributors.
    """

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        self.tbl_name_performers = 'iafd_performers'
        self.tbl_name_movies = 'iafd_movies'
        self.uniq_key = 'href'
        self.tbl_name_thelordofporn_actress = 'thelordofporn_actress'

    def insert_item(self, item):
        """No-op: this handler does not write items."""
        pass

    def _select_with_filters(self, base_sql, conditions, like_keys, filters,
                             with_full_data_sets=False):
        """Extend *base_sql* with the clauses requested in *filters*, run it, return rows.

        Args:
            base_sql: a SELECT ending in ``WHERE 1=1``.
            conditions: ordered mapping filter-key -> ``" AND <col> <op> ?"``.
            like_keys: filter keys whose value is bound as ``'%value%'``.
            filters: caller keyword filters; ``order_by`` (raw SQL, trusted
                callers only) and ``limit`` are honoured as well.
            with_full_data_sets: also honour ``is_full_data_in`` /
                ``is_full_data_not_in`` (non-empty iterables -> ``IN``/``NOT IN``).

        Returns:
            A list of row dicts, or None if sqlite raises.
        """
        try:
            sql = base_sql
            params = []
            for key, condition in conditions.items():
                if key in filters:
                    sql += condition
                    params.append(f"%{filters[key]}%" if key in like_keys else filters[key])
            if with_full_data_sets:
                for key, operator in (("is_full_data_in", "IN"), ("is_full_data_not_in", "NOT IN")):
                    values = filters.get(key)
                    if values:
                        placeholders = ", ".join(["?"] * len(values))
                        sql += f" AND is_full_data {operator} ({placeholders})"
                        params.extend(values)
            if "order_by" in filters:
                # ORDER BY needs a bare column expression. A placeholder would be
                # bound as a string literal, so the value is interpolated.
                sql += f" ORDER BY {filters['order_by']} "
            if 'limit' in filters:
                sql += " LIMIT ?"
                params.append(filters["limit"])
            self.cursor.execute(sql, params)
            return [dict(row) for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return None

    def get_performers(self, **filters):
        """Query ``iafd_performers`` (href, name, id, movies_cnt).

        Filters: id, href, name (LIKE %v%), is_full_data, start_id (id > v),
        is_full_data_in / is_full_data_not_in, order_by, limit.
        Returns a list of row dicts, or None on sqlite error.
        """
        return self._select_with_filters(
            f"SELECT href, name, id, movies_cnt FROM {self.tbl_name_performers} WHERE 1=1",
            {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "name": " AND name LIKE ?",
                "is_full_data": " AND is_full_data = ?",
                "start_id": " AND id > ?",
            },
            like_keys=("name",),
            filters=filters,
            with_full_data_sets=True,
        )

    def get_movies(self, **filters):
        """Query ``iafd_movies`` (href, title, id).

        Filters: id, href, title (LIKE %v%), is_full_data, start_id (id > v),
        is_full_data_in / is_full_data_not_in, order_by, limit.
        Returns a list of row dicts, or None on sqlite error.
        """
        return self._select_with_filters(
            f"SELECT href, title, id FROM {self.tbl_name_movies} WHERE 1=1",
            {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "title": " AND title LIKE ?",
                "is_full_data": " AND is_full_data = ?",
                "start_id": " AND id > ?",
            },
            like_keys=("title",),
            filters=filters,
            with_full_data_sets=True,
        )

    def get_lord_actors(self, **filters):
        """Query ``thelordofporn_actress`` (href, pornstar AS name, id).

        Filters: id, href, pornstar (LIKE %v%), start_id (id > v), order_by, limit.
        Returns a list of row dicts, or None on sqlite error.
        """
        return self._select_with_filters(
            f"SELECT href, pornstar as name, id FROM {self.tbl_name_thelordofporn_actress} WHERE 1=1",
            {
                "id": " AND id = ?",
                "href": " AND href = ?",
                "pornstar": " AND pornstar LIKE ?",
                "start_id": " AND id > ?",
            },
            like_keys=("pornstar",),
            filters=filters,
        )

    def get_iafd_actors(
        self,
        names: List[str],
        tbl: str = 'stu'
    ) -> Dict[str, List[Dict[str, str]]]:
        """Return the female performers in movies of the named studios/distributors.

        Runs in two steps, using a temporary table to keep the working set small:
          1. copy ``(distributor_id, distributor_name, movie_id)`` for the
             requested names into ``temp_distributor_movies``;
          2. join that table with ``iafd_performers_movies`` and
             ``iafd_performers``, keeping ``gender = 'Woman'``.

        Args:
            names: studio/distributor names, matched exactly against ``name``.
            tbl: ``'stu'`` (case-insensitive) uses ``iafd_studios`` joined on
                ``iafd_movies.studio_id``; any other value uses
                ``iafd_distributors`` joined on ``iafd_movies.distributor_id``.

        Returns:
            ``{name: [{'name': ..., 'href': ...}, ...]}``. Each list has no
            duplicate (name, href) pairs and is ordered by performer name.
            Names with no match are absent. Returns ``{}`` when *names* is
            empty or sqlite raises.
        """
        is_studio = tbl.lower() == 'stu'
        tbl_name = 'iafd_studios' if is_studio else 'iafd_distributors'
        join_key = 'studio_id' if is_studio else 'distributor_id'
        if not names:
            return {}

        final_result: Dict[str, List[Dict[str, str]]] = {}
        try:
            # ---- Step 1: (distributor, movie) pairs into a temporary table ----
            # Drop any leftover copy first to avoid a name clash.
            self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
            # TEMP tables are visible only to this connection and are dropped when it closes.
            self.cursor.execute("""
                CREATE TEMPORARY TABLE temp_distributor_movies (
                    distributor_id INTEGER,
                    distributor_name TEXT,
                    movie_id INTEGER,
                    PRIMARY KEY (distributor_id, movie_id)
                )
            """)
            # tbl_name / join_key come from the fixed choices above, so formatting
            # them in is safe. The caller-supplied names are bound as parameters.
            insert_sql = """
                INSERT INTO temp_distributor_movies (distributor_id, distributor_name, movie_id)
                SELECT
                    d.id AS distributor_id,
                    d.name AS distributor_name,
                    m.id AS movie_id
                FROM
                    {tbl_name} d
                INNER JOIN
                    iafd_movies m ON d.id = m.{join_key}
                WHERE
                    d.name IN ({placeholders})
            """.format(
                tbl_name=tbl_name,
                join_key=join_key,
                placeholders=', '.join(['?'] * len(names))
            )
            logging.debug(f'{insert_sql}')
            self.cursor.execute(insert_sql, names)
            # Commits the temp-table insert (and anything else pending on this connection).
            self.conn.commit()

            # ---- Step 2: join the small temp table with the performer tables ----
            # DISTINCT: without it a performer appears once per movie they made
            # for the same studio/distributor.
            query_sql = """
                SELECT DISTINCT
                    t.distributor_name,
                    p.name AS performer_name,
                    p.href AS performer_href
                FROM
                    temp_distributor_movies t
                INNER JOIN
                    iafd_performers_movies pm ON t.movie_id = pm.movie_id
                INNER JOIN
                    iafd_performers p ON pm.performer_id = p.id
                WHERE
                    p.gender = 'Woman'
                ORDER BY
                    t.distributor_name, p.name
            """
            self.cursor.execute(query_sql)
            # Rows are read by column name, which assumes the connection's
            # row_factory is sqlite3.Row -- NOTE(review): confirm in SQLiteDBHandler.
            for row in self.cursor.fetchall():
                final_result.setdefault(row['distributor_name'], []).append({
                    'name': row['performer_name'],
                    'href': row['performer_href'],
                })

            # Explicit cleanup (the table would also vanish when the connection closes).
            self.cursor.execute("DROP TABLE IF EXISTS temp_distributor_movies")
        except sqlite3.Error as e:
            # Route through logging like every other handler instead of print().
            logging.error(f"查询失败:{e}")
            return {}
        return final_result
@register_handler(comm.SPIDER_NAME_PBOX)
class PboxDBHandler(SQLiteDBHandler):
    """SQLite handler for the PBox spider.

    Covers studios, movies, actors, tags, and link tables (actor<->movie,
    movie<->tag, movie alternates) in the shared database.
    """

    def __init__(self, db_path=shared_db_path):
        super().__init__(db_path)
        # Table creation is not run here; _create_tables is an empty stub.
        self.tbl_studios = 'pbox_studios'
        self.tbl_movies = 'pbox_movies'
        self.tbl_tags = 'pbox_tags'
        self.tbl_mov_tags = 'pbox_movies_tags'
        self.tbl_mov_alts = 'pbox_movies_alts'
        self.tbl_actor = 'pbox_actors'
        self.tbl_actor_alias = 'pbox_actor_aliases'
        self.tbl_actor_mov = 'pbox_actors_movies'

    def _create_tables(self):
        """Empty placeholder; creates nothing."""
        pass

    def insert_item(self, item):
        """Route *item* by ``item_type`` (studio or movie detail) and return it unchanged."""
        item_type = item['item_type']
        if item_type == comm.ITEM_TYPE_STUDIO:
            self.insert_or_update_common(item, self.tbl_studios, uniq_key='href', exists_do_nothing=True)
        elif item_type == comm.ITEM_TYPE_MOVIE_DETAIL:
            self.insert_movie(item)
        else:
            logging.error("unkown item.")
        return item

    def insert_movie(self, item):
        """Write a movie row plus its actor, tag, and alternate-id links.

        The movie is written keyed on ``href`` (``exists_do_nothing=False``).
        If that returns no id, the error is logged and nothing else is written.
        Actors and tags are looked up by ``href`` and inserted when missing,
        then linked to the movie. Each failed link insert is logged; processing
        continues with the next entry.
        """
        mov_id = self.insert_or_update_common(item, self.tbl_movies, uniq_key='href', exists_do_nothing=False)
        if not mov_id:
            logging.error(f"insert movie error. movid: {item['movie_id']}, url: {item['href']}")
            return

        # Actors: reuse the existing id, else insert; then link actor <-> movie.
        for actor in item.get('actor_index_list', []):
            actor_id = (self.get_id_by_key(self.tbl_actor, uniq_key='href', val=actor['href'])
                        or self.insert_or_update_common(actor, self.tbl_actor, uniq_key='href', exists_do_nothing=True))
            if not actor_id:
                continue
            link_row = {'movie_id': mov_id, 'actor_id': actor_id, 'actor_mov': f'{actor_id}_{mov_id}'}
            if not self.insert_or_update_common(link_row, tbl_name=self.tbl_actor_mov, uniq_key='actor_mov', exists_do_nothing=True):
                logging.error(f"insert actor_movie error. actor id: {actor_id}, mov id: {mov_id}")

        # Tags: reuse the existing id, else insert; then link movie <-> tag.
        for tag in item.get('mov_tags_list', []):
            tag_id = (self.get_id_by_key(self.tbl_tags, uniq_key='href', val=tag['href'])
                      or self.insert_or_update_common(tag, self.tbl_tags, uniq_key='href'))
            if not tag_id:
                continue
            link_row = {'movie_id': mov_id, 'tag_id': tag_id, 'movid_tagid': f'{mov_id}_{tag_id}', 'tags': ''}
            if not self.insert_or_update_common(link_row, tbl_name=self.tbl_mov_tags, uniq_key='movid_tagid', exists_do_nothing=True):
                logging.error(f"insert movie_tag error. tag id: {tag_id}, mov id: {mov_id}")

        # Alternates: one row per (min_mov_id, max_mov_id) pair, keyed on "min_max".
        for alt in item.get('mov_alt_list', []):
            lo_id, hi_id = alt['min_mov_id'], alt['max_mov_id']
            alt_row = {'min_mov_id': lo_id, 'max_mov_id': hi_id, 'min_max': f"{lo_id}_{hi_id}"}
            if not self.insert_or_update_common(alt_row, tbl_name=self.tbl_mov_alts, uniq_key='min_max', exists_do_nothing=True):
                logging.error(f"insert movie_alt error. item: {alt}")

    def get_studios(self, **filters):
        """Query ``pbox_studios`` (href, name, id, label_id, scene_count).

        Filters: id, href, name (LIKE %v%), start_id (id > v); ``order_by`` is
        raw SQL (trusted callers only); ``limit`` is bound.
        Returns a list of row dicts, or ``[]`` on sqlite error.
        """
        clause_by_key = (
            ("id", " AND id = ?"),
            ("href", " AND href = ?"),
            ("name", " AND name LIKE ?"),
            ("start_id", " AND id > ?"),
        )
        try:
            parts = [f"SELECT href, name, id, label_id, scene_count FROM {self.tbl_studios} WHERE 1=1"]
            args = []
            for key, clause in clause_by_key:
                if key not in filters:
                    continue
                parts.append(clause)
                args.append(f"%{filters[key]}%" if key == "name" else filters[key])
            if "order_by" in filters:
                # ORDER BY needs a bare column expression. A placeholder would be
                # bound as a string literal, so the value is interpolated.
                parts.append(f" ORDER BY {filters['order_by']} ")
            if 'limit' in filters:
                parts.append(" LIMIT ?")
                args.append(filters["limit"])
            self.cursor.execute("".join(parts), args)
            return [dict(r) for r in self.cursor.fetchall()]
        except sqlite3.Error as e:
            logging.error(f"查询 href 失败: {e}")
            return []

    def get_stu_mov_count(self, stu_id):
        """Return the number of ``pbox_movies`` rows with ``studio_id == stu_id`` (errors propagate)."""
        self.cursor.execute(
            f"SELECT count(*) as cnt from {self.tbl_movies} WHERE studio_id = ?",
            (stu_id,),
        )
        first = self.cursor.fetchone()
        return first[0] if first else None

    def get_stat(self):
        """Return ``{'studios', 'movies', 'actors', 'act_mov'}`` row counts; ``{}`` on error or no row."""
        try:
            self.cursor.execute(f"""
                SELECT
                    (SELECT COUNT(*) FROM {self.tbl_studios}) AS studios,
                    (SELECT COUNT(*) FROM {self.tbl_movies}) AS movies,
                    (SELECT COUNT(*) FROM {self.tbl_actor}) AS actors,
                    (SELECT COUNT(*) FROM {self.tbl_actor_mov}) AS act_mov
            """)
            counts = self.cursor.fetchone()
            if not counts:
                logging.warning("query no results.")
                return {}
            return {col[0]: val for col, val in zip(self.cursor.description, counts)}
        except sqlite3.Error as e:
            logging.error(f"query error: {e}")
            return {}

    def close_spider(self, spider):
        """Close the database connection; *spider* is unused."""
        self.conn.close()