modify scripts

2025-11-10 11:35:44 +08:00
parent e81ca8a3a4
commit 1582c140bf
38 changed files with 562 additions and 7 deletions

View File

@ -0,0 +1,146 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires Python >= 3.9 or the backports.zoneinfo library, plus the tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
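
This ini file is normally consumed by the `alembic` CLI, but the same configuration can be driven from Python, which is handy in scripts and tests. A minimal sketch, assuming alembic.ini sits in the working directory and that the URL override below matches your environment:

from alembic.config import Config
from alembic import command

# Load alembic.ini; %(here)s tokens resolve relative to the ini file's directory
alembic_cfg = Config("alembic.ini")
# Optional: override the placeholder URL without editing the file
alembic_cfg.set_main_option("sqlalchemy.url", "mysql+pymysql://root:mysqlpw@testdb:3306/resources")
# Apply all pending migrations, equivalent to `alembic upgrade head`
command.upgrade(alembic_cfg, "head")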

View File

@ -0,0 +1,85 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This sets up the loggers defined in the ini file.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
import os
from alembic import context
from sqlalchemy import create_engine
# Import the Base for the resources database (it carries all the table definitions)
from models.resources import ResourceBase
target_metadata = ResourceBase.metadata  # bind the metadata of the MySQL models
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
#ini_section = config.get_section_option(config.config_ini_section, "ini_section", fallback="dev")
#url = config.get_section_option(ini_section, "sqlalchemy.url")
#connectable = create_engine(url)
url = "mysql+pymysql://root:mysqlpw@testdb:3306/resources"
connectable = create_engine(url)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
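
Hardcoding the test URL in env.py works but ties migrations to one environment. A sketch of an environment-driven alternative for run_migrations_online(), reusing the os import already present and assuming the same MYSQL_RESOURCES_URL variable that mysql_handler.py reads, with the current value as fallback:

# Inside run_migrations_online(), replacing the hardcoded string:
url = os.getenv(
    "MYSQL_RESOURCES_URL",
    "mysql+pymysql://root:mysqlpw@testdb:3306/resources",
)
connectable = create_engine(url)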

View File

@ -0,0 +1,47 @@
"""init
Revision ID: 758b3971a51e
Revises:
Create Date: 2025-11-10 10:01:19.228932
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '758b3971a51e'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
    op.create_table('u3c3',
    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False, comment='Primary key ID'),
    sa.Column('sites', sa.Text(), nullable=True, comment='Site name'),
    sa.Column('category', sa.Text(), nullable=True, comment='Category'),
    sa.Column('title', sa.Text(), nullable=True, comment='Title'),
    sa.Column('url', sa.String(length=512), nullable=True, comment='Resource URL (unique)'),
    sa.Column('torrent_url', sa.Text(), nullable=True, comment='Torrent link'),
    sa.Column('magnet_url', sa.Text(), nullable=True, comment='Magnet link'),
    sa.Column('size_text', sa.Text(), nullable=True, comment='Size as text'),
    sa.Column('size_gb', sa.Float(), nullable=True, comment='Size (GB)'),
    sa.Column('update_date', sa.Text(), nullable=True, comment='Update date'),
    sa.Column('created_at', sa.DateTime(), nullable=True, comment='Creation time (local)'),
    sa.Column('updated_at', sa.DateTime(), nullable=True, comment='Update time (local)'),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('url')
    )
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('u3c3')
# ### end Alembic commands ###
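
Once the migration has been applied, the resulting schema can be checked from Python with SQLAlchemy's inspector. A quick verification sketch, assuming the same connection URL used elsewhere in this commit:

from sqlalchemy import create_engine, inspect

engine = create_engine("mysql+pymysql://root:mysqlpw@testdb:3306/resources")
inspector = inspect(engine)
assert "u3c3" in inspector.get_table_names()
print([col["name"] for col in inspector.get_columns("u3c3")])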

View File

@ -0,0 +1 @@
Generic single-database configuration.

View File

@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1 @@
from .resources import ResourceBase, U3C3

View File

@ -0,0 +1,29 @@
from sqlalchemy import Column, Integer, Text, String, Float, DateTime, func
# declarative_base moved to sqlalchemy.orm in SQLAlchemy 1.4; the ext.declarative path is deprecated
from sqlalchemy.orm import declarative_base
# Declarative base (reuse a global Base here instead if the project already has one)
ResourceBase = declarative_base()
class U3C3(ResourceBase):
"""对应 sqlite 中的 u3c3 表"""
__tablename__ = "u3c3" # 表名与原表保持一致
# 字段定义(严格映射原表结构)
id = Column(Integer, primary_key=True, autoincrement=True, comment="主键ID")
sites = Column(Text, comment="站点名称")
category = Column(Text, comment="分类")
title = Column(Text, comment="标题")
# 关键修改:给 String 加长度(如 512根据实际链接长度调整
url = Column(String(512), unique=True, comment="资源链接(唯一)")
torrent_url = Column(Text, comment="种子链接")
magnet_url = Column(Text, comment="磁力链接")
size_text = Column(Text, comment="大小文本描述")
size_gb = Column(Float, comment="大小GB")
update_date = Column(Text, comment="更新日期")
# 补充MySQL 中建议用 func.now() 替代 func.datetime(...),兼容性更好
created_at = Column(DateTime, default=func.now(), comment="创建时间(本地时间)")
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间(本地时间)")
def __repr__(self):
"""打印实例时显示的信息"""
return f"<U3c3(id={self.id}, title='{self.title[:20]}...')>"
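
A short usage sketch for the model, assuming SQLAlchemy 1.4+ and the same test URL as elsewhere in this commit; it inserts one row and reads it back through a session:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models.resources import ResourceBase, U3C3

engine = create_engine("mysql+pymysql://root:mysqlpw@testdb:3306/resources")
ResourceBase.metadata.create_all(engine)  # dev-only convenience; use Alembic in production
Session = sessionmaker(bind=engine)
with Session() as session:
    session.add(U3C3(sites="u3c3", title="demo", url="https://example.com/1", size_gb=1.5))
    session.commit()
    print(session.query(U3C3).filter_by(url="https://example.com/1").one())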

View File

@ -0,0 +1,164 @@
import os
import sqlite3
import json
import logging
from datetime import datetime
import scrapy_proj.comm.comm_def as comm
import scrapy_proj.items as items_def
from scrapy_proj.utils.utils import pretty_json_simple, is_valid_url
from my_sqlalchemy.models.resources import U3C3, ResourceBase
from typing import List, Dict, Optional
from sqlalchemy import create_engine, func, exists
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# Registry mapping spider names to MySQL handler classes
mysql_handler_registry = {}
# Singleton metaclass
class SingletonMeta(type):
    _instances = {}  # one instance per class
    def __call__(cls, *args, **kwargs):
        # Create the instance only on the first call; reuse it afterwards
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
# MySQL handler base class (built on SQLAlchemy)
class BaseMysqlHandler(metaclass=SingletonMeta):
    def __init__(self, db_url: Optional[str] = None):
        # Default MySQL URL (reading it from an environment variable keeps this flexible)
        self.db_url = db_url or os.getenv(
            "MYSQL_RESOURCES_URL",
            "mysql+pymysql://root:mysqlpw@testdb:3306/resources?charset=utf8mb4"
        )
        # Initialize the engine and session factory
        self.engine = create_engine(self.db_url)
        self.Session = sessionmaker(bind=self.engine)
        # Ensure the tables exist (use Alembic migrations in production; this is only a fallback)
        ResourceBase.metadata.create_all(self.engine)
        self.logger = logging.getLogger(__name__)
    def get_session(self):
        """Get a database session (close it manually or use it as a context manager)."""
        return self.Session()
    def insert_or_update_common(self, item: dict, tbl_model, uniq_key: str, exists_do_nothing: bool = True):
        """
        Generic insert-or-update (tolerates non-DB fields in the item without erroring).
        :param item: spider item (dict-like)
        :param tbl_model: SQLAlchemy model class (e.g. U3C3)
        :param uniq_key: unique-key column name (e.g. 'url')
        :param exists_do_nothing: if the row already exists, skip (True) or update it (False)
        """
        session = self.get_session()
        try:
            # 1. Collect the model's real columns (so non-DB fields can't break the query)
            model_fields = [col.name for col in tbl_model.__table__.columns]
            # 2. Validate the unique key
            uniq_value = item.get(uniq_key)
            if not uniq_value or uniq_key not in model_fields:
                self.logger.warning(f"Unique key {uniq_key} missing or invalid; skipping item: {item}")
                return
            # 3. Check whether a row with this unique key already exists
            exists_query = session.query(
                exists().where(getattr(tbl_model, uniq_key) == uniq_value)
            ).scalar()
            if exists_query:
                if exists_do_nothing:
                    self.logger.debug(f"Unique key {uniq_key}={uniq_value} already exists; skipping insert")
                    return
                # 4. Row exists: update it (keep only non-key columns that exist on the model)
                update_data = {
                    k: v for k, v in item.items()
                    if k != uniq_key and k in model_fields  # double filter: drop the unique key and non-DB fields
                }
                if not update_data:
                    self.logger.debug(f"No valid fields to update; skipping: {item}")
                    return
                session.query(tbl_model).filter(
                    getattr(tbl_model, uniq_key) == uniq_value
                ).update(update_data)
                self.logger.debug(f"Updated row: {update_data}")
            else:
                # 5. No existing row: insert (keep only columns that exist on the model)
                valid_item = {k: v for k, v in item.items() if k in model_fields}
                new_record = tbl_model(**valid_item)
                session.add(new_record)
                self.logger.debug(f"Inserted new row: {valid_item}")
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            self.logger.error(f"Database operation failed: {str(e)}, item: {item}")
        finally:
            session.close()
    def generic_stats_query(self, stats_config: List[Dict]) -> Dict:
        """
        Generic count query.
        :param stats_config: list like [{'table': ModelClass, 'alias': 'result key'}]
        :return: dict mapping each alias to its row count
        """
        session = self.get_session()
        try:
            result = {}
            for config in stats_config:
                tbl_model = config['table']  # SQLAlchemy model class (e.g. U3C3)
                alias = config['alias']      # key name in the result dict
                # Run the count query (counts all rows; adjust per model if soft deletes apply)
                count = session.query(func.count(tbl_model.id)).scalar()
                result[alias] = count
            return result
        except SQLAlchemyError as e:
            self.logger.error(f"Stats query failed: {str(e)}")
            return {}
        finally:
            session.close()
def register_handler(spider_name):
def decorator(cls):
mysql_handler_registry[spider_name.lower()] = cls
return cls
return decorator
@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3MysqlHandler(BaseMysqlHandler):
def __init__(self, db_url: Optional[str] = None):
super().__init__(db_url)
def insert_item(self, item):
self.insert_or_update_common(item, tbl_model=U3C3, uniq_key='url', exists_do_nothing=True)
    # Stats helper
    def get_stat(self):
        stats_config = [
            # Total u3c3 row count (generic_stats_query expects the model class, not a table-name attribute)
            {'table': U3C3, 'alias': 'cnt'},
        ]
return self.generic_stats_query(stats_config)
if __name__ == "__main__":
    item = {
        'sites': "u3c3",
        'category': 'test',
        'title': 'test',
        'url': 'test',
        'torrent_url': 'test',
        'magnet_url': 'test',
        'size_text': 'test',
        'size_gb': 2,
        'update_date': 'test'
    }
testdb = U3C3MysqlHandler()
testdb.insert_item(item)
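
Two details of this module are easy to miss: SingletonMeta guarantees a single handler instance per class, and register_handler is what lets pipelines resolve a handler purely from the spider name. A small illustration, meant to run inside this module (or with the corresponding imports):

# Both calls return the same object thanks to SingletonMeta
a = U3C3MysqlHandler()
b = U3C3MysqlHandler()
assert a is b

# Pipelines look handlers up in the registry by lower-cased spider name
handler_class = mysql_handler_registry.get(comm.SPIDER_NAME_U3C3.lower())
assert handler_class is U3C3MysqlHandler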

View File

@ -35,7 +35,7 @@ class SisDBHandler(SQLiteDBHandler):
]
return self.generic_stats_query(stats_config)
@register_handler(comm.SPIDER_NAME_U3C3)
#@register_handler(comm.SPIDER_NAME_U3C3)
class U3C3DBHandler(SQLiteDBHandler):
def __init__(self, db_path=default_dbpath):
super().__init__(db_path)

View File

@ -19,6 +19,7 @@ class U001Item(scrapy.Item):
category = scrapy.Field()
title = scrapy.Field()
url = scrapy.Field()
sites = scrapy.Field()
torrent_url = scrapy.Field()
magnet_url = scrapy.Field()
size_text = scrapy.Field()

View File

@ -13,6 +13,7 @@ import json
import scrapy
from scrapy_proj.items import U001Item, Sis001Item, PBoxStuItem
from scrapy_proj.db_wapper.spider_db_handler import spider_handler_registry, U3C3DBHandler, SisDBHandler, IAFDDBHandler, PboxDBHandler
from scrapy_proj.db_wapper.mysql_handler import mysql_handler_registry, U3C3MysqlHandler
class SQLitePipeline():
def __init__(self):
@ -22,7 +23,9 @@ class SQLitePipeline():
spider_name = spider.name.lower()
handler_class = spider_handler_registry.get(spider_name)
if not handler_class:
raise ValueError(f"未注册 Spider {spider_name} 的数据库处理类")
#raise ValueError(f"未注册 Spider {spider_name} 的数据库处理类")
spider.logger.warning(f"未注册 Spider {spider_name} 的Sqlite数据库处理类跳过数据库操作")
return
self.db_handlers[spider_name] = handler_class()
@ -42,7 +45,48 @@ class SQLitePipeline():
#spider.logger.debug(f"spider name: {spider_name}, item: {item_json}")
if not handler:
raise ValueError(f"未找到 Spider {spider_name} 的数据库处理器")
spider.logger.debug(f"未找到 Spider {spider_name} 的数据库处理器,跳过数据库操作")
return item
#raise ValueError(f"未找到 Spider {spider_name} 的数据库处理器")
handler.insert_item(item)
return item
class MysqlPipeline():
def __init__(self):
self.db_handlers = {}
def open_spider(self, spider):
spider_name = spider.name.lower()
handler_class = mysql_handler_registry.get(spider_name)
if not handler_class:
            #raise ValueError(f"No database handler class registered for spider {spider_name}")
            spider.logger.warning(f"No database handler class registered for spider {spider_name}; skipping database writes")
return
self.db_handlers[spider_name] = handler_class()
def close_spider(self, spider):
spider_name = spider.name.lower()
handler = self.db_handlers.pop(spider_name, None)
if handler:
pass
            #handler.close()  # don't close here; the stats middleware is responsible for closing
def process_item(self, item, spider):
spider_name = spider.name.lower()
handler = self.db_handlers.get(spider_name)
        # Serialize to single-line JSON for debug logging
#item_json = json.dumps(dict(item), ensure_ascii=False, separators=(',', ':'))
#spider.logger.debug(f"spider name: {spider_name}, item: {item_json}")
if not handler:
spider.logger.warning(f"未找到 Spider {spider_name} 的数据库处理器,跳过数据库操作")
return item
#raise ValueError(f"未找到 Spider {spider_name} 的数据库处理器")
handler.insert_item(item)
return item
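
The lookup-or-skip behavior can be exercised without running a crawl. A minimal sketch, assuming the registered spider name is 'u3c3' and using a stand-in spider object that provides only the name and logger attributes the pipeline actually touches:

import logging
from types import SimpleNamespace
from scrapy_proj.pipelines import MysqlPipeline

# Stand-in for a real Spider; the pipeline reads only .name and .logger
fake_spider = SimpleNamespace(name="u3c3", logger=logging.getLogger("u3c3"))

pipeline = MysqlPipeline()
pipeline.open_spider(fake_spider)         # resolves U3C3MysqlHandler from the registry
item = {'sites': 'u3c3', 'title': 'demo', 'url': 'https://example.com/1'}
pipeline.process_item(item, fake_spider)  # inserts, or skips on duplicate url
pipeline.close_spider(fake_spider)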

View File

@ -40,6 +40,7 @@ DOWNLOAD_DELAY = 0.3
# Enable item pipelines (lower priority numbers run first)
ITEM_PIPELINES = {
'scrapy_proj.pipelines.SQLitePipeline': 300,
'scrapy_proj.pipelines.MysqlPipeline': 400,
}
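
Scrapy runs pipelines in ascending priority order, so each item passes through SQLitePipeline (300) before MysqlPipeline (400). A hypothetical toggle for this settings module, in case the MySQL pipeline should only run when a database URL is configured:

import os

# Hypothetical: drop the MySQL pipeline when no database URL is configured
if not os.getenv("MYSQL_RESOURCES_URL"):
    ITEM_PIPELINES.pop('scrapy_proj.pipelines.MysqlPipeline')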
# User-agent pool

View File

@ -475,7 +475,8 @@ class JavbusCrawler(GenericCrawler):
'duration': ['長度:', '长度:', 'Length:', '収録時間:'],
'studio': ['製作商:', '制作商:', 'Studio:', 'メーカー:'],
'label': ['發行商:', '发行商:', 'Label:', 'レーベル:'],
'series': ['系列:', 'Series:', 'シリーズ:']
'series': ['系列:', 'Series:', 'シリーズ:'],
'director': ['導演:', '导演:', 'Director:', '監督:']
}
        # Walk every <p> tag looking for these labels
@ -491,7 +492,7 @@ class JavbusCrawler(GenericCrawler):
if target_key:
            # Extract the value (handles both plain text and links)
if target_key in ['studio', 'label', 'series']:
if target_key in ['studio', 'label', 'series', 'director']:
                # Fields whose value is wrapped in a link
a_tag = p.find('a')
if a_tag:

View File

@ -9,7 +9,7 @@ class U001Spider(BaseSpider):
name = SPIDER_NAME_U3C3
allowed_domains = ["u001.25img.com", 'u9a9.com']
start_urls = ["https://u001.25img.com/?p=1", 'https://u9a9.com/?type=2&p=1']
#start_urls = ['https://u9a9.com/?type=2&p=1']
#start_urls = ['https://u001.25img.com/?p=1']
def __init__(self, debug='False', begin=None, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -23,7 +23,13 @@ class U001Spider(BaseSpider):
need_next = False
for row in response.css('table.torrent-list tbody tr'):
item = U001Item()
item['category'] = row.css('td:nth-child(1) a::attr(title)').get()
            #item['category'] = row.css('td:nth-child(1) a::attr(title)').get()
            # Prefer the <a> tag's title; fall back to the <img> tag's alt when the title is missing
            item['category'] = row.css(
                'td:nth-child(1) a::attr(title), '  # case 1: the <a> tag's title
                'td:nth-child(1) img::attr(alt)'    # case 2: the <img> tag's alt (the comma acts as "or")
            ).get()
item['title'] = extract_text_from_element(row.css('td:nth-child(2) a'), use_title=True)
item['url'] = response.urljoin(row.css('td:nth-child(2) a::attr(href)').get())
@ -36,6 +42,7 @@ class U001Spider(BaseSpider):
item['size_gb'] = parse_size(size_text)
item['update_date'] = row.css('td:nth-child(5)::text').get(default='').strip()
item['sites'] = 'u9a9' if "u9a9" in response.url else 'u3c3'
            # Decide whether to keep paginating: stop only once every row on the current page is older than the start date
up_date = parse_date_to_datetime(item['update_date'])
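
The comma in the category selector is a CSS group selector: parsel (the library behind response.css) unions both selectors, and .get() returns the first match in document order, so whichever attribute is present wins. A self-contained sketch with a made-up row:

from parsel import Selector

# A row where the <a> has no title but the <img> carries an alt
html = '<table><tr><td><a href="/cat"><img alt="Anime"></a></td></tr></table>'
row = Selector(text=html).css('table tr')[0]
category = row.css(
    'td:nth-child(1) a::attr(title), '
    'td:nth-child(1) img::attr(alt)'
).get()
print(category)  # "Anime"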