modify scripts

This commit is contained in:
oscarz
2025-03-17 11:30:35 +08:00
parent e6327fbe73
commit d5dc76b87f
178 changed files with 44 additions and 184447 deletions

iafd/src/config.py Normal file

@@ -0,0 +1,87 @@
import logging
import os
import inspect
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from collections import defaultdict
home_dir = os.path.expanduser("~")
global_host_data_dir = f'{home_dir}/hostdir/scripts_data'
global_share_data_dir = f'{home_dir}/sharedata'
# Track logging frequency
log_count = defaultdict(int) # number of times each message has been logged
last_log_time = defaultdict(float) # timestamp of the last write for each message
class RateLimitFilter(logging.Filter):
"""
Rate-limiting filter:
1. Within a 60-second window the same message is written at most 60 times; anything beyond that is dropped.
2. If the logging rate exceeds 100 messages per second, a warning should be raised (not implemented here).
"""
LOG_LIMIT = 60 # at most 60 identical messages per minute
def filter(self, record):
global log_count, last_log_time
message_key = record.getMessage() # the log message text
# current time
now = time.time()
elapsed = now - last_log_time[message_key]
# Throttle repeated identical messages
if elapsed < 60: # within the last 60 seconds
log_count[message_key] += 1
if log_count[message_key] > self.LOG_LIMIT:
print('reach limit.')
return False # drop the record
else:
log_count[message_key] = 1 # more than 60 seconds have passed, restart the count
last_log_time[message_key] = now
return True # allow the record through
def setup_logging(log_filename=None):
if log_filename is None:
caller_frame = inspect.stack()[1]
caller_filename = os.path.splitext(os.path.basename(caller_frame.filename))[0]
current_date = datetime.now().strftime('%Y%m%d')
log_filename = f'../log/{caller_filename}_{current_date}.log'
max_log_size = 100 * 1024 * 1024 # 100 MB
max_log_files = 10 # keep at most 10 log files
file_handler = RotatingFileHandler(log_filename, maxBytes=max_log_size, backupCount=max_log_files)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] (%(funcName)s) - %(message)s'
))
# Create the root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers = [] # avoid adding duplicate handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Attach the rate-limit filter
rate_limit_filter = RateLimitFilter()
file_handler.addFilter(rate_limit_filter)
console_handler.addFilter(rate_limit_filter)
# Example run
if __name__ == "__main__":
setup_logging()
for i in range(1000):
logging.info("test log message for checking the rate limit")
time.sleep(0.01) # simulate rapid logging
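For reference, a minimal usage sketch of the logging setup above, assuming the module is importable as config and that the target log path is writable (the file name below is illustrative):

import logging
import config

# Initialize the root logger with an explicit file name instead of the caller-derived default.
# The RotatingFileHandler above rotates at 100 MB and keeps up to 10 backups.
config.setup_logging(log_filename='/tmp/example.log')

for i in range(200):
    # Identical messages beyond RateLimitFilter.LOG_LIMIT within 60 seconds are dropped.
    logging.info("same message, rate limited")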

iafd/src/fetch.py Normal file

@@ -0,0 +1,411 @@
import json
import time
import csv
import argparse
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import iafd_scraper as scraper
import utils
config.setup_logging()
debug = False
force = False
# Fetch the performer list by astrological sign (no pagination)
def fetch_performers_by_astro():
for astro in scraper.astro_list:
url = scraper.astr_base_url + astro
logging.info(f"Fetching data for {astro}, url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="astro", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_page_astro(soup, astro)
if list_data:
for row in list_data :
# write to the performers table
perfomer_id = db_tools.insert_performer_index(name=row['person'], href=row.get('href', '').lower(), from_astro_list=1)
if perfomer_id:
logging.debug(f"insert performer index to db. performer_id:{perfomer_id}, name: {row['person']}, href:{row['href']}")
else:
logging.warning(f"insert performer index failed. name: {row['person']}, href:{row['href']}")
else:
logging.warning(f'fetch astro error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetch astro error. {url} ...')
# break early in debug mode
if debug:
break
# Fetch the performer list by birthday (no pagination)
def fetch_performers_by_birth():
for month in range(1, 13): # 遍历1到12月
for day in range(1, 32): # 遍历1到31天
url = scraper.birth_base_url.format(month=month, day=day)
logging.info(f"Fetching data for birth, url {url}")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-sm-12 col-lg-9", attr_type="class"))
if soup:
list_data, next_url = scraper.parse_page_birth(soup, month, day)
if list_data:
for row in list_data :
# write to the performers table
perfomer_id = db_tools.insert_performer_index(name=row['person'], href=row.get('href', '').lower(), from_birth_list=1)
if perfomer_id:
logging.debug(f"insert performer index to db. performer_id:{perfomer_id}, name: {row['person']}, href:{row['href']}")
else:
logging.warning(f"insert performer index failed. name: {row['person']}, href:{row['href']}")
else:
logging.warning(f'fetch birth error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetch birth error. {url} ...')
# break early in debug mode
if debug:
return True
# Refresh the ethnicity list
def fetch_ethic_list():
url = scraper.ethnic_list_url
logging.info(f"Fetching data for performer's ethnic list, url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="ethnicity1", attr_type="id"))
if soup:
list_data = scraper.parse_page_ethnic_list(soup, url)
if list_data:
for row in list_data :
dist_id = db_tools.insert_or_update_ethnic({'name': row['name'], 'href': row.get('href', '')})
if dist_id:
logging.debug(f"insert one record into ethnic table. id:{dist_id}, name: {row['name']}, href:{row.get('href', '')}")
else:
logging.warning(f'fetch ethnic error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetch page error. {url} ...')
# Fetch the performer list by ethnicity (with pagination)
def fetch_performers_by_ethnic():
# refresh the ethnicity list first
fetch_ethic_list()
ethnic_list = db_tools.query_ethnic_hrefs()
for row in ethnic_list:
url = row['href']
ethnic = row['name']
next_url = url
while next_url:
logging.info(f"Fetching data for {ethnic}, url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="row headshotrow", attr_type="class"),
parser="lxml", preprocessor=scraper.preprocess_html)
if soup:
list_data, next_url = scraper.parse_page_ethnic(soup, ethnic)
if list_data:
for row in list_data :
# write to the performers table
perfomer_id = db_tools.insert_performer_index(name=row['person'], href=row.get('href', '').lower(), from_ethnic_list=1)
if perfomer_id:
logging.debug(f"insert performer index to db. performer_id:{perfomer_id}, name: {row['person']}, href:{row['href']}")
else:
logging.warning(f"insert performer index failed. name: {row['person']}, href:{row['href']}")
else:
logging.warning(f'fetch ethnic error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}, Skipping...')
break
else:
logging.warning(f'fetch ethnic error. {url} ...')
# break early in debug mode
if debug:
return True
# Fetch the distributors list
def fetch_distributors_list():
url = scraper.distributors_list_url
logging.info(f"Fetching data for distributors list, url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Distrib", attr_type="name"))
if soup:
list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Distrib")
if list_data:
for row in list_data :
dis_url = scraper.distributors_base_url + row['href']
dist_id = db_tools.insert_or_update_distributor({'name': row['name'], 'href': dis_url})
if dist_id:
logging.debug(f"insert one record into distributors table. id:{dist_id}, name: {row['name']}, href:{dis_url}")
else:
logging.warning(f'fetch distributors error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetch distributors error. {url} ...')
# Fetch the studios list
def fetch_studios_list():
url = scraper.studios_list_url
logging.info(f"Fetching data for studios list, url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Studio", attr_type="name"))
if soup:
list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Studio")
if list_data:
for row in list_data :
stu_url = scraper.studios_base_url + row['href']
stu_id = db_tools.insert_or_update_studio({'name': row['name'], 'href': stu_url})
if stu_id:
logging.debug(f"insert one record into studios table. id:{stu_id}, name: {row['name']}, href:{stu_url}")
else:
logging.warning(f'fetch studios error. {url} ...')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetch studios error. {url} ...')
# Update movie info from the distributors list
def fetch_movies_by_dist():
# refresh the distributors list first
fetch_distributors_list()
url_list = db_tools.query_distributor_hrefs()
if debug:
url_list = db_tools.query_distributor_hrefs(name='vixen.com')
for url in url_list:
logging.info(f"Fetching data for distributor url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="distable", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_page_dist_stu(soup, 'distable')
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], release_year=utils.to_number(movie['year']), from_dist_list=1)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetching page error. {url}')
# break early in debug mode
if debug:
break
# Update movie info from the studios list
def fetch_movies_by_stu():
# refresh the studios list first
fetch_studios_list()
url_list = db_tools.query_studio_hrefs()
if debug:
url_list = db_tools.query_studio_hrefs(name='vixen.com')
for url in url_list:
logging.info(f"Fetching data for studio url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="studio", attr_type="id"))
if soup:
list_data, next_url = scraper.parse_page_dist_stu(soup, 'studio')
if list_data:
for movie in list_data:
tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], release_year=utils.to_number(movie['year']), from_stu_list=1)
if tmp_id:
logging.debug(f'insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}')
else:
logging.warning(f'insert movie index failed. title: {movie['title']}, href: {movie['href']}')
else :
logging.warning(f'parse_page_movie error. url: {url}')
elif status_code and status_code == 404:
logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
else:
logging.warning(f'fetching page error. {url}')
# break early in debug mode
if debug:
break
# Update performer details (one pass over the given list)
def fetch_performers_detail_once(perfomers_list):
last_performer_id = 0
for performer in perfomers_list:
url = performer['href']
person = performer['name']
logging.info(f"Fetching data for performer ({person}), url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="headshot", attr_type="id"))
if soup:
data = scraper.parse_page_performer(soup)
if data:
performer_id = db_tools.insert_or_update_performer({
'href': url,
'person': person,
**data
})
if performer_id:
logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {url}')
last_performer_id = performer_id
else:
logging.warning(f'insert person: ({person}) {url} failed.')
# also write the record to a local JSON file
utils.write_person_json(person, url, {
'href': url,
'person': person,
**data
})
else:
logging.warning(f'parse_page_performer error. person: ({person}), url: {url}')
elif status_code and status_code == 404:
performer_id = db_tools.insert_or_update_performer_404(name=person, href=url)
logging.warning(f'404 page. id: {performer_id}, name: {person}, url: {url}, Skipping...')
else:
logging.warning(f'fetch_page error. person: ({person}), url: {url}')
time.sleep(1)
return last_performer_id
# Update performer details
def fetch_performers_detail():
limit_count = 5 if debug else 100
perfomers_list = []
# fetch the list of new performers
while True:
perfomers_list = db_tools.query_performer_hrefs(is_full_data=0, limit=limit_count)
if len(perfomers_list) < 1:
logging.info(f'all new performers fetched. ')
break
last_perfomer_id = fetch_performers_detail_once(perfomers_list)
logging.info(f'insert {len(perfomers_list)} person. last performer id: {last_perfomer_id}')
if debug:
break
# fetch the list of performers that need updating
while True:
perfomers_list = db_tools.get_performers_needed_update(limit=limit_count)
if len(perfomers_list) < 1:
logging.info(f'all existed performers updated. ')
break
last_perfomer_id = fetch_performers_detail_once(perfomers_list)
logging.info(f'insert {len(perfomers_list)} person. last performer id: {last_perfomer_id}')
if debug:
break
# Update movie details
def fetch_movies_detail():
limit_count = 10 if debug else 100
movies_list = []
while True:
movies_list = db_tools.query_movie_hrefs(is_full_data=0, limit=limit_count)
if len(movies_list) < 1:
logging.info(f'all movies fetched.')
break
last_movie_id = 0
succ_count = 0
for movie in movies_list:
url = movie['href']
title = movie['title']
logging.debug(f"Fetching data for movie ({title}), url {url} ...")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-xs-12 col-sm-3", attr_type="class"))
if soup:
movie_data = scraper.parse_page_movie(soup, url, title)
if movie_data :
# normalize malformed URLs
if movie_data['DistributorHref']:
movie_data['DistributorHref'] = utils.dist_stu_href_rewrite(movie_data['DistributorHref'].lower())
if movie_data['StudioHref']:
movie_data['StudioHref'] = utils.dist_stu_href_rewrite(movie_data['StudioHref'].lower())
movie_id = db_tools.insert_or_update_movie(movie_data)
if movie_id:
logging.debug(f'insert one movie, id: {movie_id}, title: ({title}) url: {url}')
last_movie_id = movie_id
succ_count += 1
else:
logging.warning(f'insert movie {url} failed.')
# also write the record to a local JSON file
utils.write_movie_json(url, movie_data)
else:
logging.warning(f'parse_page_movie error. url: {url}')
elif status_code and status_code == 404:
# mark as processed
movie_id = db_tools.insert_or_update_movie_404(title=title, href=url)
logging.warning(f'404 page. id: {movie_id}, title: ({title}), url: {url}, Skipping...')
else:
logging.warning(f'fetch_page error. url: {url}')
time.sleep(1)
logging.info(f'total request: {len(movies_list)}, succ: {succ_count}. last movie id: {last_movie_id}')
# break early in debug mode
if debug:
return True
# Map shortcut names to their functions
function_map = {
"astro": fetch_performers_by_astro,
"birth": fetch_performers_by_birth,
"ethnic": fetch_performers_by_ethnic,
"dist" : fetch_movies_by_dist,
"stu" : fetch_movies_by_stu,
"performers": fetch_performers_detail,
"movies" : fetch_movies_detail,
}
# Main entry point
def main(cmd, args_debug, args_force):
global debug
debug = args_debug
global force
force = args_force
# start a task-log entry
task_id = db_tools.insert_task_log()
if task_id is None:
logging.warning(f'insert task log error.')
return None
logging.info(f'running task. id: {task_id}, debug: {debug}, force: {force}, cmd: {cmd}')
# run the requested functions
if cmd:
function_names = cmd.split(",") # split the comma-separated input
for short_name in function_names:
func = function_map.get(short_name.strip()) # look up the function in the map
if callable(func):
db_tools.update_task_log(task_id, task_status=f'Running {func}')
func()
else:
print(f"Warning: {short_name} is not a valid function shortcut.")
else: # run everything
for name, func in function_map.items():
if callable(func):
db_tools.update_task_log(task_id, task_status=f'Running {func}')
func()
else:
print(f"Warning: {name} is not a valid function shortcut.")
logging.info(f'all process completed!')
db_tools.finalize_task_log(task_id)
# TODO:
# 1. After movies are updated, set is_full_data = 0 on the related performers and re-fetch them.
# 2. Cross-check the movie lists between distributors and studios.
# 3. To work around inconsistent data, all performers and movies can be imported manually first, then this program fetches new entries incrementally.
if __name__ == "__main__":
# command-line argument handling
keys_str = ",".join(function_map.keys())
parser = argparse.ArgumentParser(description='fetch iafd data.')
parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
parser.add_argument('--force', action='store_true', help='force update (true for rewrite all)')
args = parser.parse_args()
main(args.cmd, args.debug, args.force)
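As a usage note, the same tasks can also be driven programmatically instead of via the CLI; a minimal sketch, assuming the module is importable as fetch and the shared SQLite database is reachable:

import fetch

# Equivalent to: python fetch.py --cmd astro,performers --debug
# Runs only the astrology list crawl and the performer-detail pass, with debug limits applied.
fetch.main("astro,performers", args_debug=True, args_force=False)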

iafd/src/iafd_scraper.py Normal file

@@ -0,0 +1,562 @@
import cloudscraper
import time
import json
import csv
import logging
import signal
import sys
import os
import re
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
import config
# Base URLs and variable parameters
host_url = "https://www.iafd.com"
astr_base_url = f"{host_url}/astrology.rme/sign="
astro_list = ['Aries', 'Taurus', 'Gemini', 'Cancer', 'Leo', 'Virgo', 'Libra', 'Scorpio', 'Sagittarius', 'Capricorn', 'Aquarius', 'Pisces']
birth_base_url = "https://www.iafd.com/calendar.asp?calmonth={month}&calday={day}"
distributors_list_url = f'{host_url}/distrib.asp'
distributors_base_url = f"{host_url}/distrib.rme/distrib="
studios_list_url = f"{host_url}/studio.asp"
studios_base_url = f"{host_url}/studio.rme/studio="
ethnic_list_url = f'{host_url}/advsearch.asp'
# Request headers and the cloudscraper session
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
scraper = cloudscraper.create_scraper()
# Fetch a page with CloudScraper and run the supplied validator; supports alternative parsers and an optional preprocessor
def fetch_page(url, validator, max_retries=3, parser="html.parser", preprocessor=None):
for attempt in range(max_retries):
try:
if host_url not in url.lower():
logging.error(f'wrong url format: {url}')
return None, None
response = scraper.get(url, headers=headers)
# handle HTTP status codes
if response.status_code == 404:
logging.debug(f"Page not found (404): {url}")
return None, 404 # return 404 directly so the caller can skip this page
response.raise_for_status() # raise on other HTTP errors
# treat expired pages the same as a 404
if "invalid or outdated page" in response.text.lower():
logging.debug(f"invalid or outdated page: {url}")
return None, 404 # return 404 directly so the caller can skip this page
# preprocess the HTML if a preprocessor was supplied
html_text = preprocessor(response.text) if preprocessor else response.text
soup = BeautifulSoup(html_text, parser)
if validator(soup): # run the custom page check
return soup, response.status_code
logging.warning(f"Validation failed on attempt {attempt + 1} for {url}")
except cloudscraper.exceptions.CloudflareChallengeError as e:
logging.error(f"Cloudflare Challenge Error on {url}: {e}, Retrying...")
except cloudscraper.exceptions.CloudflareCode1020 as e:
logging.error(f"Access Denied (Error 1020) on {url}: {e}, Retrying...")
except Exception as e:
logging.error(f"Unexpected error on {url}: {e}, Retrying...")
logging.error(f'Fetching failed after max retries. {url}')
return None, None # still failing after the maximum number of retries
# Fix up the HTML: strip stray <br> tags and adjust <a> tags (needed when fetching the ethnicity pages)
def preprocess_html(html):
return html.replace('<br>', '').replace('<a ', '<a target="_blank" ')
# Generic HTML structure validator
def generic_validator(soup, tag, identifier, attr_type="id"):
if attr_type == "id":
return soup.find(tag, id=identifier) is not None
elif attr_type == "class":
return bool(soup.find_all(tag, class_=identifier))
elif attr_type == "name":
return bool(soup.find('select', {'name': identifier}))
return False
# Check whether the movie table exists
def movie_validator(soup, table_id):
return soup.find("table", id=table_id) is not None
# Parse the HTML and extract the required data
def parse_page_ethnic_list(soup, href):
div_root = soup.find("select", id="ethnicity1")
if not div_root:
logging.warning(f"Warning: No 'ethnicity1' select found in {href}")
return None, None
list_data = []
# 提取所有的 <option> 标签
options = div_root.find_all('option')
if options:
# 解析并输出 value 和文本内容
for option in options:
href = option.get('value', None)
text = option.text.strip()
if href and href.lower() == 'none':
continue
list_data.append({
"name": text,
"href": host_url + href if href else ''
})
return list_data
# Parse the HTML and extract the required data
def parse_page_astro(soup, astro):
astro_div = soup.find("div", id="astro")
if not astro_div:
logging.warning(f"Warning: No 'astro' div found in {astro}")
return None, None
flag = False
list_cnt = 0
list_data = []
next_url = None
birth_date = None
for elem in astro_div.find_all(recursive=False):
if elem.name == "h3" and "astroday" in elem.get("class", []):
birth_date = elem.get_text(strip=True)
elif elem.name == "div" and "perficon" in elem.get("class", []):
a_tag = elem.find("a")
if a_tag:
href = host_url + a_tag["href"]
name = a_tag.find("span", class_="perfname")
if name:
list_data.append({
"astrology": astro,
"birth_date": birth_date,
"person": name.get_text(strip=True),
"href": href
})
flag = True
list_cnt = list_cnt +1
if flag:
logging.debug(f"get {list_cnt} persons from this page. total persons: {len(list_data)}")
return list_data, next_url
else:
return None, None
# Parse the page content and collect the birthday entries
def parse_page_birth(soup, month, day):
datarows = soup.find_all('div', class_='col-sm-12 col-lg-9')
if not datarows:
return None, None
flag = False
list_cnt = 0
list_data = []
next_url = None
rows = datarows[0].find_all('div', class_='col-sm-4')
for row in rows:
link_tag = row.find('a')
person = link_tag.text.strip() if link_tag else ''
href = link_tag['href'] if link_tag else ''
href = host_url + href
# 如果 href 已经在 birth_map 中,跳过
flag = True
if any(entry['href'] == href for entry in list_data):
continue
# 将数据添加到 birth_map
list_data.append({
'month': month,
'day': day,
'person': person,
'href': href
})
list_cnt = list_cnt +1
if flag:
logging.debug(f"get {list_cnt} persons from this page. total persons: {len(list_data)}")
return list_data, next_url
else:
return None, None
# Parse the HTML and extract the required data
def parse_page_ethnic(soup, ethnic):
rows = soup.find_all('div', class_='row headshotrow')
flag = False
list_data = []
next_url = None
for row in rows:
for col in row.find_all('div', class_='col-lg-2 col-md-3 col-sm-4 col-xs-6'):
link_tag = col.find('a')
img_tag = col.find('div', class_='pictag')
flag = True
if link_tag and img_tag:
href = host_url + link_tag['href']
person = img_tag.text.strip()
# 将数据存储到 ethnic_map
list_data.append({
'ethnic': ethnic,
'person': person,
'href': href
})
if flag:
logging.debug(f"get {len(list_data)} persons from this page.")
next_page = soup.find('a', rel='next')
if next_page:
next_url = host_url + next_page['href']
logging.debug(f"Found next page: {next_url}")
return list_data, next_url
else:
logging.debug(f"All pages fetched for {ethnic}.")
return list_data, None
else:
return None, None
# Parse the list page
def parse_page_dist_stu_list(soup, select_name):
list_data = []
next_url = None
select_element = soup.find('select', {'name': select_name})
if select_element :
options = select_element.find_all('option')
for option in options:
value = option.get('value') # 获取 value 属性
text = option.text.strip() # 获取文本内容
list_data.append({
'name' : text,
'href' : str(value)
})
return list_data, next_url
else:
return None, None
# Parse the HTML and extract the required data
def parse_page_dist_stu(soup, table_id):
table = soup.find("table", id=table_id)
if not table:
logging.warning(f"Warning: No {table_id} table found ")
return None, None
# find the thead and skip it
thead = table.find('thead')
if thead:
thead.decompose() # the thead does not need to be parsed
# only the tbody part remains now
tbody = table.find('tbody')
rows = tbody.find_all('tr') if tbody else []
list_data = []
next_url = None
for row in rows:
cols = row.find_all('td')
if len(cols) >= 5:
title = cols[0].text.strip()
label = cols[1].text.strip()
year = cols[2].text.strip()
rev = cols[3].text.strip()
a_href = cols[0].find('a')
href = host_url + a_href['href'] if a_href else ''
list_data.append({
'title': title,
'label': label,
'year': year,
'rev': rev,
'href': href
})
return list_data, next_url
# Parse a credits table (covers both performer and directorial credits)
def parse_credits_table(table, distributor_list):
# find the thead and skip it
thead = table.find('thead')
if thead:
thead.decompose() # the thead does not need to be parsed
# only the tbody part remains now
tbody = table.find('tbody')
rows = tbody.find_all('tr') if tbody else []
movies = []
distributor_count = {key: 0 for key in distributor_list} # initialize a counter for each distributor
# rows = table.find_all('tr', class_='we')
for row in rows:
#tr_class = row.get('class', '') # get the class attribute, or an empty string if missing
tr_class = ' '.join(row.get('class', [])) # get the class attribute, or an empty string if missing
cols = row.find_all('td')
if len(cols) >= 6:
title = cols[0].text.strip()
href_a = cols[0].find('a')
href = href_a['href'] if href_a else ''
year = cols[1].text.strip()
distributor = cols[2].text.strip().lower()
href_d = cols[2].find('a')
href_dist = host_url + href_d['href'] if href_d else ''
notes = cols[3].text.strip()
rev = cols[4].text.strip()
formats = cols[5].text.strip()
for key in distributor_list:
if key in distributor:
distributor_count[key] += 1
movies.append({
'title': title,
'href' : href,
'year': year,
'distributor': distributor,
'distributor_href': href_dist,
'notes': notes,
'rev': rev,
'formats': formats,
'tr_class': tr_class
})
return movies, distributor_count
# Parse the performer page and extract the required data
def parse_page_performer(soup):
# extracted data
data = {}
# map our field names to the labels used in the HTML
fields = {
'performer_aka': 'Performer AKA',
'birthday': 'Birthday',
'astrology': 'Astrology',
'birthplace': 'Birthplace',
'gender': 'Gender',
'years_active': 'Years Active',
'ethnicity': 'Ethnicity',
'nationality': 'Nationality',
'hair_colors': 'Hair Colors',
'eye_color': 'Eye Color',
'height': 'Height',
'weight': 'Weight',
'measurements': 'Measurements',
'tattoos': 'Tattoos',
'piercings': 'Piercings'
}
reversed_map = {v: k for k, v in fields.items()}
# Parse the credit tables: performer credits and directorial credits
role_list = ['personal', 'directoral']
distributor_list = ['vixen', 'blacked', 'tushy', 'x-art']
credits_list = {}
# use a dict to keep the per-distributor counts
distributor_count = {key: 0 for key in distributor_list} # initialize a counter for each distributor
for role in role_list:
table = soup.find('table', id=role)
if table :
movies, stat_map = parse_credits_table(table, distributor_list)
credits_list[role] = movies
# update the distributor counts
for distributor in distributor_list:
distributor_count[distributor] += stat_map.get(distributor, 0)
# count the movies
#movies_cnt = sum(len(credits_list[role]) for role in role_list if credits_list[role])
movies_cnt = sum(len(credits_list.get(role, [])) for role in role_list if credits_list.get(role, []))
# nothing found
if len(credits_list) == 0 :
logging.warning("movie table empty.")
# walk every bioheading to collect the metadata
bioheadings = soup.find_all('p', class_='bioheading')
for bio in bioheadings:
heading = bio.text.strip()
biodata = None
# headings containing "Performer" need special handling
if 'Performer' in heading:
heading = 'Performer AKA'
biodata_div = bio.find_next('div', class_='biodata')
if biodata_div:
div_text = biodata_div.get_text(separator='|').strip()
biodata = [b.strip() for b in div_text.split('|') if b.strip()]
else:
biodata = bio.find_next('p', class_='biodata').text.strip() if bio.find_next('p', class_='biodata') else ''
# save the value
if heading in reversed_map:
kkey = reversed_map[heading]
data[kkey] = biodata
# add the aggregate counts to data
data['movies_cnt'] = movies_cnt
data['vixen_cnt'] = distributor_count['vixen']
data['blacked_cnt'] = distributor_count['blacked']
data['tushy_cnt'] = distributor_count['tushy']
data['x_art_cnt'] = distributor_count['x-art']
data['credits'] = credits_list
return data
# Parse the movie page HTML and extract the movie info
def parse_page_movie(soup, href, title):
# parse the basic movie info
movie_data = {}
info_div = soup.find("div", class_="col-xs-12 col-sm-3")
if info_div:
labels = info_div.find_all("p", class_="bioheading")
values = info_div.find_all("p", class_="biodata")
for label, value in zip(labels, values):
key = label.text.strip()
val = value.text.strip()
if key in ["Distributor", "Studio", "Director"]:
link = value.find("a")
if link:
val = link.text.strip()
movie_data[f'{key}Href'] = host_url + link['href']
movie_data[key] = val
else:
return None
# parse the cast info
performers = []
cast_divs = soup.find_all("div", class_="castbox")
for cast in cast_divs:
performer = {}
link = cast.find("a")
if link:
performer["name"] = link.text.strip()
performer["href"] = host_url + link["href"]
performer["tags"] = [
tag.strip() for br in cast.find_all("br")
if (tag := br.next_sibling) and isinstance(tag, str) and tag.strip()
]
#performer["tags"] = [br.next_sibling.strip() for br in cast.find_all("br") if br.next_sibling and (br.next_sibling).strip()]
performers.append(performer)
# parse the scene breakdowns
scene_breakdowns = []
scene_table = soup.find("div", id="sceneinfo")
if scene_table:
rows = scene_table.find_all("tr")
for row in rows:
cols = row.find_all("td")
if len(cols) >= 2:
scene = cols[0].text.strip() # scene number
performer_info = cols[1] # contains the performers and the link info
# take the full HTML before the first <br> (keeps formatting such as <i> tags)
performer_html = str(performer_info) # the raw HTML of the cell
split_html = performer_html.split("<br/>") # split on <br/>
if split_html:
performers_html = split_html[0].strip() # keep the part before the <br/>
else:
split_html = performer_html.split("<br>") # split on <br>
if split_html:
performers_html = split_html[0].strip() # keep the part before the <br>
else:
performers_html = performer_html.strip() # no <br> at all, keep everything
# strip the HTML tags and keep only the text
performers_soup = BeautifulSoup(performers_html, "html.parser")
performers_text = performers_soup.get_text()
# extract the performer names
scene_performers = [p.strip() for p in performers_text.split(",")]
# try to pick up `webscene` and `studio`
links_data = {}
links = performer_info.find_all("a")
if links:
webscene_title = links[0].text.strip() if len(links)>0 else None
webscene = links[0]["href"] if len(links)>0 else None
studio = links[1].text.strip() if len(links)>1 else None
studio_lnk = links[1]["href"] if len(links)>1 else None
links_data = {
"title": webscene_title,
"webscene": webscene,
"studio": studio,
"studio_lnk": studio_lnk,
}
scene_data = {
"scene": scene,
"performers": scene_performers,
**links_data,
}
scene_breakdowns.append(scene_data)
appears_in = []
appears_divs = soup.find("div", id="appearssection")
if appears_divs:
rows = appears_divs.find_all("li")
for row in rows:
lnk = row.find("a")
if lnk:
appears_in.append({'title': lnk.text.strip(), 'href': host_url + lnk['href']})
return {
"href": href,
"title": title,
"Minutes": movie_data.get("Minutes", ""),
"Distributor": movie_data.get("Distributor", ""),
"Studio": movie_data.get("Studio", ""),
"ReleaseDate": movie_data.get("Release Date", ""),
"AddedtoIAFDDate": movie_data.get("Date Added to IAFD", ""),
"All-Girl": movie_data.get("All-Girl", ""),
"All-Male": movie_data.get("All-Male", ""),
"Compilation": movie_data.get("Compilation", ""),
"Webscene": movie_data.get("Webscene", ""),
"Director": movie_data.get("Director", ""),
"DirectorHref": movie_data.get("DirectorHref", ""),
"DistributorHref": movie_data.get("DistributorHref", ""),
"StudioHref": movie_data.get("StudioHref", ""),
"Performers": performers,
"SceneBreakdowns": scene_breakdowns,
"AppearsIn": appears_in,
}
if __name__ == "__main__":
for astro in astro_list:
url = astr_base_url + astro
next_url = url
logging.info(f"Fetching data for {astro}, url {url} ...")
while True:
soup, status_code = fetch_page(next_url, partial(generic_validator, tag="div", identifier="astro", attr_type="id"))
if soup:
list_data, next_url = parse_page_astro(soup, astro)
if list_data:
print(list_data[0] if len(list_data)>0 else 'no data')
break
else:
logging.info(f"Retrying {next_url} ...")
time.sleep(5) # wait before retrying
time.sleep(2) # throttle the request rate
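A minimal sketch of using the scraper helpers directly, mirroring the pattern used in fetch.py (the validator checks that the div with id "astro" is present before parsing):

from functools import partial
import iafd_scraper as scraper

# Fetch and validate one astrology page, then parse the performer entries from it.
url = scraper.astr_base_url + "Aries"
soup, status_code = scraper.fetch_page(
    url,
    partial(scraper.generic_validator, tag="div", identifier="astro", attr_type="id"),
)
if soup:
    people, next_url = scraper.parse_page_astro(soup, "Aries")
    print(len(people or []), "performers parsed")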

iafd/src/load.py Normal file

@@ -0,0 +1,107 @@
import json
import time
import csv
import argparse
import logging
from functools import partial
import config
import sqlite_utils as db_tools
import iafd_scraper as scraper
import utils
config.setup_logging()
res_dir = '/root/hostdir/scripts_data/iafd_202503'
# Performer lists
def load_performer_list(file, **from_fields):
json_data = utils.read_json(file)
if json_data is None:
json_data = []
total_rows = len(json_data)
loaded_rows = 0
succ = 0
for row in json_data:
row_id = db_tools.insert_performer_index(name=row.get('person', ''),
href=row.get('href', ''),
**from_fields
)
if row_id:
logging.debug(f'insert one person, id: {row_id}, person: {row['person']}, url: {row['href']}')
succ += 1
else:
logging.warning(f'insert person failed. {row['person']}, {row['href']} failed.')
loaded_rows += 1
if loaded_rows % 10000 == 0:
logging.info(f'loading file: {file}, total rows: {total_rows}, loaded rows: {loaded_rows}, succ rows: {succ}')
logging.info(f'load data succ. file: {file}, rows: {total_rows}, succ rows: {succ}')
# Movie lists
def load_movie_list(file, **from_fields):
json_data = utils.read_json(file)
if json_data is None:
json_data = []
total_rows = len(json_data)
loaded_rows = 0
succ = 0
for row in json_data:
row_id = db_tools.insert_movie_index(title=row.get('title', ''),
href=row.get('href', ''),
release_year=utils.to_number(row['year']),
**from_fields
)
if row_id:
logging.debug(f'insert one movie, id: {row_id}, title: {row['title']}, url: {row['href']}')
succ += 1
else:
logging.warning(f'insert movie failed: {row['title']}, {row['href']} failed.')
loaded_rows += 1
if loaded_rows % 10000 == 0:
logging.info(f'loading file: {file}, total rows: {total_rows}, loaded rows: {loaded_rows}, succ rows: {succ}')
logging.info(f'load data succ. file: {file}, rows: {len(json_data)}, succ rows: {succ}')
# Performer details
def load_performers(file):
json_data = utils.read_json(file)
if json_data is None:
json_data = []
total_rows = len(json_data)
loaded_rows = 0
succ = 0
for row in json_data:
performer_id = db_tools.insert_or_update_performer(row)
if performer_id:
logging.debug(f'insert one person, id: {performer_id}, person: {row['person']}, url: {row['href']}')
succ += 1
else:
logging.warning(f'insert person failed. {row['person']}, {row['href']} failed.')
loaded_rows += 1
if loaded_rows % 10000 == 0:
logging.info(f'loading file: {file}, total rows: {total_rows}, loaded rows: {loaded_rows}, succ rows: {succ}')
logging.info(f'load data succ. file: {file}, rows: {len(json_data)}, succ rows: {succ}')
if __name__ == "__main__":
load_performer_list(f'{res_dir}/astro.json', from_astro_list=1)
time.sleep(3)
load_performer_list(f'{res_dir}/birth.json', from_birth_list=1)
time.sleep(3)
load_performer_list(f'{res_dir}/ethnic.json', from_ethnic_list=1)
time.sleep(3)
load_movie_list(f'{res_dir}/distributors.json', from_dist_list=1)
time.sleep(3)
load_movie_list(f'{res_dir}/studios.json', from_stu_list=1)
time.sleep(3)
load_performers(f'{res_dir}/performers.json')
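A minimal sketch of re-importing a single exported dump, assuming the JSON file exists at the path below (the from_* keyword flags are forwarded to the index-insert helpers in sqlite_utils):

import load

# Load one movie list exported from the distributor crawl.
load.load_movie_list('/root/hostdir/scripts_data/iafd_202503/distributors.json', from_dist_list=1)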

iafd/src/sqlite_utils.py Normal file

@@ -0,0 +1,848 @@
import sqlite3
import json
import config
import utils
import logging
import sys
from datetime import datetime
# Connect to the SQLite database
DB_PATH = f"{config.global_share_data_dir}/shared.db" # replace with your database file
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get the current time as a string
def get_current_time():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Look up an id by href in the given table
def get_id_by_href(table: str, href: str) -> int:
if href is None:
return None
cursor.execute(f"SELECT id FROM {table} WHERE href = ?", (href,))
row = cursor.fetchone()
return row[0] if row else None
# Insert a performer index entry (from list data)
def insert_performer_index(name, href, from_astro_list=None, from_birth_list=None, from_ethnic_list=None, from_movie_list=None):
try:
# check whether the performer already exists
cursor.execute("""
SELECT id, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list
FROM iafd_performers WHERE href = ?
""", (href,))
existing_performer = cursor.fetchone()
if existing_performer: # the performer already exists
performer_id, existing_name, existing_astro, existing_birth, existing_ethnic, existing_movie = existing_performer
# keep the existing values when no new value was passed in
from_astro_list = from_astro_list if from_astro_list is not None else existing_astro
from_birth_list = from_birth_list if from_birth_list is not None else existing_birth
from_ethnic_list = from_ethnic_list if from_ethnic_list is not None else existing_ethnic
from_movie_list = from_movie_list if from_movie_list is not None else existing_movie
cursor.execute("""
UPDATE iafd_performers
SET name = ?,
from_astro_list = ?,
from_birth_list = ?,
from_ethnic_list = ?,
from_movie_list = ?,
updated_at = datetime('now', 'localtime')
WHERE href = ?
""", (name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list, href))
else: # the performer does not exist yet, insert it
cursor.execute("""
INSERT INTO iafd_performers (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list)
VALUES (?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
""", (href, name, from_astro_list, from_birth_list, from_ethnic_list, from_movie_list))
conn.commit()
performer_id = get_id_by_href('iafd_performers', href)
if performer_id:
logging.debug(f'Inserted/Updated performer index, id: {performer_id}, name: {name}, href: {href}')
return performer_id
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
conn.rollback()
logging.error(f"未知错误: {e}")
return None
# Insert a movie index entry (from list data)
def insert_movie_index(title, href, release_year=0, from_performer_list=None, from_dist_list=None, from_stu_list=None):
try:
# check whether the movie already exists
cursor.execute("""
SELECT id, title, release_year, from_performer_list, from_dist_list, from_stu_list
FROM iafd_movies WHERE href = ?
""", (href,))
existing_movie = cursor.fetchone()
if existing_movie: # the movie already exists
movie_id, existing_title, existing_year, existing_performer, existing_dist, existing_stu = existing_movie
# keep the existing values when no new value was passed in
release_year = release_year if release_year != 0 else existing_year
from_performer_list = from_performer_list if from_performer_list is not None else existing_performer
from_dist_list = from_dist_list if from_dist_list is not None else existing_dist
from_stu_list = from_stu_list if from_stu_list is not None else existing_stu
cursor.execute("""
UPDATE iafd_movies
SET title = ?,
release_year = ?,
from_performer_list = ?,
from_dist_list = ?,
from_stu_list = ?,
updated_at = datetime('now', 'localtime')
WHERE href = ?
""", (title, release_year, from_performer_list, from_dist_list, from_stu_list, href))
else: # the movie does not exist yet, insert it
cursor.execute("""
INSERT INTO iafd_movies (title, href, release_year, from_performer_list, from_dist_list, from_stu_list)
VALUES (?, ?, ?, COALESCE(?, 0), COALESCE(?, 0), COALESCE(?, 0))
""", (title, href, release_year, from_performer_list, from_dist_list, from_stu_list))
conn.commit()
movie_id = get_id_by_href('iafd_movies', href)
if movie_id:
logging.debug(f'Inserted/Updated movie index, id: {movie_id}, title: {title}, href: {href}')
return movie_id
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
conn.rollback()
logging.error(f"未知错误: {e}")
return None
# Insert a performer-movie relation
def insert_performer_movie(performer_id, movie_id, role, notes):
try:
cursor.execute("""
INSERT INTO iafd_performers_movies (performer_id, movie_id, role, notes)
VALUES (?, ?, ?, ?)
ON CONFLICT(movie_id, performer_id) DO UPDATE SET notes=excluded.notes, role=excluded.role
""",
(performer_id, movie_id, role, notes)
)
conn.commit()
#logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}')
return performer_id
except Exception as e:
conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# Insert a movie-to-movie (appears-in) relation
def insert_movie_appears_in(movie_id, appears_in_id, gradation=0, notes=''):
try:
cursor.execute("""
INSERT INTO iafd_movies_appers_in (movie_id, appears_in_id, gradation, notes)
VALUES (?, ?, ?, ?)
ON CONFLICT(movie_id, appears_in_id) DO UPDATE SET notes=excluded.notes, gradation=excluded.gradation
""",
(movie_id, appears_in_id, gradation, notes)
)
conn.commit()
#logging.debug(f'insert one movie_appears_in, movie_id: {movie_id}, appears_in_id: {appears_in_id}')
return movie_id
except Exception as e:
conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# Insert or update a performer record
def insert_or_update_performer(data):
try:
cursor.execute("""
INSERT INTO iafd_performers (href, name, gender, birthday, astrology, birthplace, years_active, ethnicity, nationality, hair_colors,
eye_color, height_str, weight_str, measurements, tattoos, piercings, weight, height, movies_cnt, vixen_cnt,
blacked_cnt, tushy_cnt, x_art_cnt, is_full_data, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
gender = excluded.gender,
birthday = excluded.birthday,
astrology = excluded.astrology,
birthplace = excluded.birthplace,
years_active = excluded.years_active,
ethnicity = excluded.ethnicity,
nationality = excluded.nationality,
hair_colors = excluded.hair_colors,
eye_color = excluded.eye_color,
height_str = excluded.height_str,
weight_str = excluded.weight_str,
measurements = excluded.measurements,
tattoos = excluded.tattoos,
piercings = excluded.piercings,
weight = excluded.weight,
height = excluded.height,
movies_cnt = excluded.movies_cnt,
vixen_cnt = excluded.vixen_cnt,
blacked_cnt = excluded.blacked_cnt,
tushy_cnt = excluded.tushy_cnt,
x_art_cnt = excluded.x_art_cnt,
is_full_data = 1,
updated_at = datetime('now', 'localtime')
""", (
data["href"], data["person"], data.get("gender"), data.get("birthday"), data.get("astrology"), data.get("birthplace"), data.get("years_active"),
data.get("ethnicity"), data.get("nationality"), data.get("hair_colors"), data.get("eye_color"), data.get("height"),
data.get("weight"), data.get("measurements"), data.get("tattoos"), data.get("piercings"), utils.parse_weight(data.get('weight')), utils.parse_height(data.get('height')),
data.get("movies_cnt", 0), data.get("vixen_cnt", 0), data.get("blacked_cnt", 0), data.get("tushy_cnt", 0), data.get("x_art_cnt", 0)
))
# 获取 performer_id
performer_id = get_id_by_href('iafd_performers', data["href"])
if performer_id is None:
return None
logging.debug(f'insert one performer, id: {performer_id}, name: {data['person']}, href: {data['href']}')
# 插入新的 alias
for alias in data.get("performer_aka") or []:
if alias.lower() != "no known aliases":
cursor.execute("INSERT OR IGNORE INTO iafd_performer_aliases (performer_id, alias) VALUES (?, ?) ", (performer_id, alias))
conn.commit()
# insert the credit lists; a performer may appear under both the personal and directoral roles
credits = data.get('credits', {})
for role, movies in credits.items():
if movies:
for movie in movies:
movie_id = get_id_by_href('iafd_movies', movie['href'])
# 影片不存在,先插入
if movie_id is None:
movie_id = insert_movie_index(movie['title'], movie['href'], utils.to_number(movie['year']), from_performer_list=1)
if movie_id:
tmp_id = insert_performer_movie(performer_id, movie_id, role, movie['notes'])
if tmp_id :
logging.debug(f'insert one performer_movie, performer_id: {performer_id}, movie_id: {movie_id}, role: {role}')
else:
logging.warning(f'insert performer_movie failed. performer_id: {performer_id}, moive href: {movie['href']}')
return performer_id
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
conn.rollback()
logging.error(f"未知错误: {e}")
return None
# Insert or update a performer record for an abnormal URL (e.g. a 404 link)
def insert_or_update_performer_404(name, href):
try:
cursor.execute("""
INSERT INTO iafd_performers (href, name, is_full_data, updated_at)
VALUES (?, ?, 1, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
is_full_data = 1,
updated_at = datetime('now', 'localtime')
""", (
href, name
))
# 获取 performer_id
performer_id = get_id_by_href('iafd_performers', href)
if performer_id is None:
return None
logging.debug(f'insert one performer, id: {performer_id}, name: {name}, href: {href}')
return performer_id
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
except Exception as e:
conn.rollback()
logging.error(f"未知错误: {e}")
return None
# Delete a performer by id or href
def delete_performer(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_performers WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_performers WHERE href = ?", (identifier,))
else:
logging.warning("无效的删除参数")
return
conn.commit()
logging.info(f"成功删除演员: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# Query performer info by id, href, or name
def query_performer(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_performers WHERE id = ?", (identifier,))
elif "http" in identifier:
cursor.execute("SELECT * FROM iafd_performers WHERE href = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_performers WHERE name LIKE ?", (f"%{identifier}%",))
performer = cursor.fetchone()
if performer:
cursor.execute("SELECT alias FROM iafd_performer_aliases WHERE performer_id = ?", (performer[0],))
aliases = [row[0] for row in cursor.fetchall()]
result = dict(zip([desc[0] for desc in cursor.description], performer))
result["performer_aka"] = aliases
return result
else:
logging.warning(f"未找到演员: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# Query a list of hrefs by filter conditions
def query_performer_hrefs(**filters):
try:
sql = "SELECT href, name FROM iafd_performers WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
if "is_full_data" in filters:
sql += " AND is_full_data = ?"
params.append(filters["is_full_data"])
if 'limit' in filters:
sql += " limit ?"
params.append(filters["limit"])
cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # 返回小写
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# Insert or update an ethnicity entry
def insert_or_update_ethnic(data):
try:
cursor.execute("""
INSERT INTO iafd_meta_ethnic (name, href)
VALUES (?, ?)
ON CONFLICT(href) DO UPDATE SET
name = excluded.name
""", (data["name"], data["href"]))
conn.commit()
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_meta_ethnic WHERE href = ?", (data["href"],))
dist_id = cursor.fetchone()[0]
if dist_id:
logging.debug(f"成功插入/更新ethnic: {data['name']}")
return dist_id
else:
return None
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
# Query a list of hrefs by filter conditions
def query_ethnic_hrefs(**filters):
try:
sql = "SELECT href, name FROM iafd_meta_ethnic WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# Insert or update a distributor
def insert_or_update_distributor(data):
try:
cursor.execute("""
INSERT INTO iafd_distributors (name, href, updated_at)
VALUES (?, ? , datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
updated_at = datetime('now', 'localtime')
""", (data["name"], data["href"]))
conn.commit()
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_distributors WHERE href = ?", (data["href"],))
dist_id = cursor.fetchone()[0]
if dist_id:
logging.debug(f"成功插入/更新发行商: {data['name']}")
return dist_id
else:
return None
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
# Delete a distributor (by id or name)
def delete_distributor(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_distributors WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_distributors WHERE name = ?", (identifier,))
conn.commit()
logging.info(f"成功删除发行商: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# Query a distributor (by id or name)
def query_distributor(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_distributors WHERE id = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_distributors WHERE name LIKE ?", (f"%{identifier}%",))
distributor = cursor.fetchone()
if distributor:
return dict(zip([desc[0] for desc in cursor.description], distributor))
else:
logging.warning(f"未找到发行商: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# Query a list of hrefs by filter conditions
def query_distributor_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_distributors WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# Insert or update a studio
def insert_or_update_studio(data):
try:
cursor.execute("""
INSERT INTO iafd_studios (name, href, updated_at)
VALUES (?, ?, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
name = excluded.name,
updated_at = datetime('now', 'localtime')
""", (data["name"], data["href"]))
conn.commit()
# 获取 performer_id
cursor.execute("SELECT id FROM iafd_studios WHERE href = ?", (data["href"],))
stu_id = cursor.fetchone()[0]
if stu_id:
logging.debug(f"Inserted/updated studio: {data['name']}")
return stu_id
else:
return None
except sqlite3.Error as e:
conn.rollback()
logging.error(f"数据库错误: {e}")
return None
# Delete a studio (by id or name)
def delete_studio(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_studios WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_studios WHERE name = ?", (identifier,))
conn.commit()
logging.info(f"成功删除制作公司: {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error(f"删除失败: {e}")
# Query a studio (by id or name)
def query_studio(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_studios WHERE id = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_studios WHERE name LIKE ?", (f"%{identifier}%",))
studio = cursor.fetchone()
if studio:
return dict(zip([desc[0] for desc in cursor.description], studio))
else:
logging.warning(f"未找到制作公司: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# Query a list of hrefs by filter conditions
def query_studio_hrefs(**filters):
try:
sql = "SELECT href FROM iafd_studios WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
cursor.execute(sql, params)
return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return None
# Insert or update a movie record
def insert_or_update_movie(movie_data):
try:
# 获取相关 ID
distributor_id = get_id_by_href('iafd_distributors', movie_data['DistributorHref'])
studio_id = get_id_by_href('iafd_studios', movie_data['StudioHref'])
director_id = get_id_by_href('iafd_performers', movie_data['DirectorHref'])
# 导演不存在的话,插入一条
if director_id is None:
director_id = insert_performer_index( movie_data['Director'], movie_data['DirectorHref'], from_movie_list=1)
if studio_id is None:
studio_id = 0
if distributor_id is None:
distributor_id = 0
# 插入或更新电影信息
cursor.execute(
"""
INSERT INTO iafd_movies (title, minutes, distributor_id, studio_id, release_date, added_to_IAFD_date,
all_girl, all_male, compilation, webscene, director_id, href, is_full_data, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
title=excluded.title, minutes=excluded.minutes, distributor_id=excluded.distributor_id,
studio_id=excluded.studio_id, release_date=excluded.release_date,
added_to_IAFD_date=excluded.added_to_IAFD_date, all_girl=excluded.all_girl,
all_male=excluded.all_male, compilation=excluded.compilation, webscene=excluded.webscene,
director_id=excluded.director_id, is_full_data=1, updated_at = datetime('now', 'localtime')
""",
(movie_data['title'], movie_data['Minutes'], distributor_id, studio_id, movie_data['ReleaseDate'],
movie_data['AddedtoIAFDDate'], movie_data['All-Girl'], movie_data['All-Male'],
movie_data['Compilation'], movie_data['Webscene'], director_id, movie_data['href'])
)
conn.commit()
# 获取插入的 movie_id
movie_id = get_id_by_href('iafd_movies', movie_data['href'])
if movie_id is None:
return None
logging.debug(f'insert one move, id: {movie_id}, title: {movie_data['title']}, href: {movie_data['href']}')
# 插入 performers_movies 关系表
for performer in movie_data.get('Performers', []):
performer_id = get_id_by_href('iafd_performers', performer['href'])
# 如果演员不存在,先插入
if performer_id is None:
performer_id = insert_performer_index(performer['name'], performer['href'], from_movie_list=1)
if performer_id:
notes = '|'.join(tag for tag in performer['tags'] if tag != performer['name'])
tmp_id = insert_performer_movie(performer_id, movie_id, 'personal', notes)
if tmp_id:
logging.debug(f"insert one perfomer_movie. perfomer_id: {performer_id}, movie_id:{movie_id}")
else:
logging.debug(f'insert perfomer_movie failed. perfomer_id: {performer_id}, movie_id:{movie_id}')
else:
logging.warning(f'insert perfomer failed. name: {performer['name']}, href: {performer['href']}')
# 插入 movies_appers_in 表
for appears in movie_data.get("AppearsIn", []):
appears_in_id = get_id_by_href('iafd_movies', appears['href'])
# 不存在,先插入
if appears_in_id is None:
appears_in_id = insert_movie_index( appears['title'], appears['href'])
if appears_in_id:
tmp_id = insert_movie_appears_in(movie_id, appears_in_id)
if tmp_id:
logging.debug(f'insert one movie_appears_in record. movie_id: {movie_id}, appears_in_id: {appears_in_id}')
else:
logging.warning(f'insert movie_appears_in failed. movie_id: {movie_id}, appears_in_id: {appears_in_id}')
else:
logging.warning(f'get appears_in_id failed. title: {appears['title']}, href: {appears['href']}')
return movie_id
except Exception as e:
conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# Insert or update a movie record for an abnormal URL (e.g. a 404 link)
def insert_or_update_movie_404(title, href):
try:
# 插入或更新电影信息
cursor.execute(
"""
INSERT INTO iafd_movies (title, href, is_full_data, updated_at)
VALUES (?, ?, 1, datetime('now', 'localtime'))
ON CONFLICT(href) DO UPDATE SET
title=excluded.title, is_full_data=1, updated_at = datetime('now', 'localtime')
""",
(title, href)
)
conn.commit()
# 获取插入的 movie_id
movie_id = get_id_by_href('iafd_movies', href)
if movie_id is None:
return None
return movie_id
except Exception as e:
conn.rollback()
logging.error("Error inserting movie: %s", e)
return None
# Delete a movie record
def delete_movie(identifier):
try:
if isinstance(identifier, int):
cursor.execute("DELETE FROM iafd_movies WHERE id = ?", (identifier,))
elif isinstance(identifier, str):
cursor.execute("DELETE FROM iafd_movies WHERE href = ?", (identifier,))
else:
logging.warning("无效的删除参数")
return
conn.commit()
logging.info(f"Deleted movie with {identifier}")
except sqlite3.Error as e:
conn.rollback()
logging.error("Error deleting movie: %s", e)
# Query movie records
def query_movies(identifier):
try:
if isinstance(identifier, int):
cursor.execute("SELECT * FROM iafd_movies WHERE id = ?", (identifier,))
elif "http" in identifier:
cursor.execute("SELECT * FROM iafd_movies WHERE href = ?", (identifier,))
else:
cursor.execute("SELECT * FROM iafd_movies WHERE title LIKE ?", (f"%{identifier}%",))
movie = cursor.fetchone()
if movie:
result = dict(zip([desc[0] for desc in cursor.description], movie))
cursor.execute("SELECT performer_id FROM iafd_performers_movies WHERE movie_id = ?", (movie[0],))
performers = [row[0] for row in cursor.fetchall()]
result["performers"] = performers
return result
else:
logging.warning(f"find no data: {identifier}")
return None
except sqlite3.Error as e:
logging.error(f"查询失败: {e}")
return None
# Query a list of hrefs by filter conditions
def query_movie_hrefs(**filters):
try:
sql = "SELECT href, title FROM iafd_movies WHERE 1=1"
params = []
if "id" in filters:
sql += " AND id = ?"
params.append(filters["id"])
if "href" in filters:
sql += " AND href = ?"
params.append(filters["href"])
if "title" in filters:
sql += " AND title LIKE ?"
params.append(f"%{filters['title']}%")
if "is_full_data" in filters:
sql += " AND is_full_data = ?"
params.append(filters["is_full_data"])
if 'limit' in filters:
sql += " limit ?"
params.append(filters["limit"])
cursor.execute(sql, params)
#return [row[0].lower() for row in cursor.fetchall()] # 链接使用小写
return [{'href': row[0], 'title': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return []
# Get performers whose row in view_iafd_performers_movies is out of sync (movie counts differ)
def get_performers_needed_update(limit=None):
try:
sql = """
SELECT href, name FROM view_iafd_performers_movies where actual_movies_cnt != movies_cnt
"""
if limit is not None:
sql += f" LIMIT {limit}"
cursor.execute(sql)
return [{'href': row[0], 'name': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
logging.error(f"查询 href 失败: {e}")
return []
# Insert a task-log entry
def insert_task_log():
try:
cursor.execute("""
INSERT INTO iafd_task_log (task_status) VALUES ('Start')
""")
conn.commit()
task_id = cursor.lastrowid
if task_id is None:
return None
update_task_log(task_id=task_id, task_status='Start')
return task_id # 获取插入的 task_id
except sqlite3.Error as e:
logging.error(f"插入任务失败: {e}")
return None
# Update arbitrary fields of a task-log entry
def update_task_log_inner(task_id, **kwargs):
try:
fields = ", ".join(f"{key} = ?" for key in kwargs.keys())
params = list(kwargs.values()) + [task_id]
sql = f"UPDATE iafd_task_log SET {fields}, updated_at = datetime('now', 'localtime') WHERE task_id = ?"
cursor.execute(sql, params)
conn.commit()
except sqlite3.Error as e:
logging.error(f"更新任务 {task_id} 失败: {e}")
# Update a task-log entry with the current table counts
def update_task_log(task_id, task_status):
try:
# get the current row counts of the performers, movies, distributors and studios tables
cursor.execute("SELECT COUNT(*) FROM iafd_performers where is_full_data=1")
full_data_performers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_performers")
total_performers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_movies where is_full_data=1")
full_data_movies = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_movies")
total_movies = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_distributors")
total_distributors = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM iafd_studios")
total_studios = cursor.fetchone()[0]
# 更新 task_log
update_task_log_inner(task_id,
full_data_performers=full_data_performers,
total_performers=total_performers,
full_data_movies=full_data_movies,
total_movies=total_movies,
total_distributors=total_distributors,
total_studios=total_studios,
task_status=task_status)
except sqlite3.Error as e:
logging.error(f"更新任务 {task_id} 失败: {e}")
# Task finished, update the final fields
def finalize_task_log(task_id):
try:
# 更新 task_log
update_task_log(task_id, task_status="Success")
except sqlite3.Error as e:
logging.error(f"任务 {task_id} 结束失败: {e}")
if __name__ == "__main__":
try:
with open('../result/detail.json', 'r') as file:
performers = json.load(file)
for performer in performers:
insert_or_update_performer(performer)
print(query_performer("Kirsten"))
#delete_performer("https://www.iafd.com/person.rme/id=ca699282-1b57-4ce7-9bcc-d7799a292e34")
print(query_performer_hrefs())
except FileNotFoundError:
logging.info("detail.json not found, starting fresh.")
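A minimal query sketch against the helpers above, assuming the shared SQLite database already contains crawled rows:

import sqlite_utils as db_tools

# Performers that still need a detail crawl (is_full_data = 0), limited to 5 rows.
pending = db_tools.query_performer_hrefs(is_full_data=0, limit=5)
for row in pending:
    print(row['name'], row['href'])

# Name lookup uses a LIKE match, as in the __main__ demo above.
print(db_tools.query_performer("Kirsten"))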

iafd/src/utils.py Normal file

@@ -0,0 +1,101 @@
import re
import os
import json
import time
import csv
import logging
import config
# Parse height and weight strings into numbers.
# NOTE: both functions return 0 immediately, so the parsing code below is currently disabled.
def parse_height(height_str):
return 0
try:
return int(height_str.split("(")[-1].replace(" cm)", ""))
except:
return None
def parse_weight(weight_str):
return 0
try:
return int(weight_str.split(" ")[0])
except:
return None
update_dir = f'{config.global_host_data_dir}/iafd'
performers_dir = f'{update_dir}/performers'
movies_dir = f'{update_dir}/movies'
def to_number(value):
"""Convert a string to a number; return 0 if it is invalid"""
try:
return float(value)
except (ValueError, TypeError):
return 0
def dist_stu_href_rewrite(href):
# extract the numeric ID (works for both distrib and studio URLs)
match = re.search(r"(distrib|studio)=(\d+)", href)
if not match:
return None # not a target URL, return None
key, id_number = match.groups()
new_url = f"https://www.iafd.com/{key}.rme/{key}={id_number}"
return new_url
# Create a sub-directory under base_dir
def create_sub_directory(base_dir, name):
# use the first character of the name, lower-cased, as the sub-directory
sub_dir = name[:1].lower()
full_path = os.path.join(base_dir, sub_dir)
if not os.path.exists(full_path):
os.makedirs(full_path)
return full_path
# Extract the id value from a URL such as https://www.iafd.com/person.rme/id=21898a3c-1ddd-4793-8d93-375d6db20586
def extract_id_from_href(href):
"""Extract the id parameter from the href"""
match = re.search(r'id=([a-f0-9\-]+)', href)
return match.group(1) if match else ''
# Write each performer to its own JSON file
def write_person_json(person, href, data):
# pick the target directory
person_dir = create_sub_directory(performers_dir, person)
person_id = extract_id_from_href(href)
person_filename = f"{person.replace(' ', '-')}({person_id}).json" # replace spaces with '-'
full_path = os.path.join(person_dir, person_filename)
try:
with open(full_path, 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"Error writing file {full_path}: {e}")
# Write each movie to its own JSON file
def write_movie_json(href, data):
# pick the target directory
movie_id = extract_id_from_href(href)
person_dir = create_sub_directory(movies_dir, movie_id)
person_filename = f"{movie_id}.json"
full_path = os.path.join(person_dir, person_filename)
try:
with open(full_path, 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"Error writing file {full_path}: {e}")
# Read a JSON file and return its contents
def read_json(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
except json.JSONDecodeError:
print(f"Failed to parse file {file_path}.")
return None
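A short sketch of the URL helpers above; the distributor link in the first call is illustrative, while the performer URL in the second comes from the comment above extract_id_from_href:

import utils

# Rewrite a distributor/studio link to the canonical .rme form (returns None for other URLs).
print(utils.dist_stu_href_rewrite("https://www.iafd.com/distrib.asp?distrib=1576"))

# Extract the GUID from a performer URL (empty string if no id= parameter is present).
print(utils.extract_id_from_href("https://www.iafd.com/person.rme/id=21898a3c-1ddd-4793-8d93-375d6db20586"))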