# stock/scripts/iafd/src/fetch.py

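"""Incremental IAFD fetcher: collects performer, distributor, studio, and movie
records from the site's list pages, upserts them into the local database via
sqlite_utils, and mirrors each record to a local JSON file.
"""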
import logging
from functools import partial
import config
import sqlite_utils as utils
import iafd_scraper as scraper
import utils as func
config.setup_logging()
debug = True  # when True, stop after the first item of each loop for quick test runs
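# Assumption: scraper.generic_validator(page, tag=..., identifier=..., attr_type=...)
# checks that the parsed page contains the expected element, letting fetch_page
# treat a structurally wrong response (e.g. an error page) as a failed fetch.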
# Fetch the performer list by astrological sign; these pages have no pagination.
def fetch_performers_by_astro(existed_performer_hrefs):
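    """Return performers from the per-sign list pages whose hrefs are not yet in the db."""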
    performers = []
    for astro in scraper.astro_list:
        url = scraper.astr_base_url + astro
        logging.info(f"Fetching data for {astro}, url {url} ...")
        soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="astro", attr_type="id"))
        if soup:
            list_data, next_url = scraper.parse_page_astro(soup, astro)
            if list_data:
                for row in list_data:
                    if row['href'] not in existed_performer_hrefs:
                        performers.append({
                            'person': row['person'],
                            'href': row['href']
                        })
            else:
                logging.warning(f'parse astro page error. {url} ...')
        else:
            logging.warning(f'fetch astro page error. {url} ...')
        # break early when debugging
        if debug:
            break
    return performers
# Fetch the performer list by birth date; these pages have no pagination.
def fetch_performers_by_birth(existed_performer_hrefs):
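    """Return performers from the per-birthday list pages whose hrefs are not yet in the db."""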
    performers = []
    for month in range(1, 13):      # months 1-12
        for day in range(1, 32):    # days 1-31
            url = scraper.birth_base_url.format(month=month, day=day)
            logging.info(f"Fetching data for birth, url {url}")
            soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-sm-12 col-lg-9", attr_type="class"))
            if soup:
                list_data, next_url = scraper.parse_page_birth(soup, month, day)
                if list_data:
                    for row in list_data:
                        if row['href'] not in existed_performer_hrefs:
                            performers.append({
                                'person': row['person'],
                                'href': row['href']
                            })
                else:
                    logging.warning(f'parse birth page error. {url} ...')
            else:
                logging.warning(f'fetch birth page error. {url} ...')
            # return early when debugging
            if debug:
                return performers
    return performers
# Encode ethnicity names that contain spaces for use in query URLs.
def format_ethnic(ethnic):
    return ethnic.replace(' ', '+')
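# A minimal standard-library alternative, assuming only spaces need escaping
# in these names (quote_plus also percent-encodes other reserved characters,
# which may or may not match what the site expects):
#
#   from urllib.parse import quote_plus
#   url = scraper.ethnic_url + quote_plus(ethnic)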
# Fetch the performer list by ethnicity; these pages are paginated.
def fetch_performers_by_ethnic(existed_performer_hrefs):
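    """Return performers from the paginated per-ethnicity list pages whose hrefs are not yet in the db."""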
    performers = []
    for ethnic in scraper.ethnic_list:
        next_url = scraper.ethnic_url + format_ethnic(ethnic)
        while next_url:
            # advance to the next page; previously this refetched the first page forever
            url = next_url
            logging.info(f"Fetching data for {ethnic}, url {url} ...")
            soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="row headshotrow", attr_type="class"),
                                      parser="lxml", preprocessor=scraper.preprocess_html)
            if soup:
                list_data, next_url = scraper.parse_page_ethnic(soup, ethnic)
                if list_data:
                    for row in list_data:
                        if row['href'] not in existed_performer_hrefs:
                            performers.append({
                                'person': row['person'],
                                'href': row['href']
                            })
                else:
                    logging.warning(f'parse ethnic page error. {url} ...')
            else:
                # stop paging this ethnicity after a failed fetch to avoid looping forever
                next_url = None
                logging.warning(f'fetch ethnic page error. {url} ...')
            # return early when debugging
            if debug:
                return performers
    return performers
# Fetch the list of distributors.
def fetch_distributors_list(existed_distributors_href):
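    """Return distributors from the sitewide distributor <select> list that are not yet in the db."""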
    url = scraper.distributors_list_url
    distributors_list = []
    logging.info(f"Fetching data for distributors list, url {url} ...")
    soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Distrib", attr_type="name"))
    if soup:
        list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Distrib")
        if list_data:
            for row in list_data:
                dis_url = scraper.distributors_base_url + row['href']
                if dis_url in existed_distributors_href:
                    continue
                distributors_list.append({
                    'name': row['name'],
                    'href': dis_url
                })
        else:
            logging.warning(f'parse distributors list error. {url} ...')
    else:
        logging.warning(f'fetch distributors list error. {url} ...')
    return distributors_list
# Fetch the list of studios.
def fetch_studios_list(existed_studios_href):
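    """Return studios from the sitewide studio <select> list that are not yet in the db."""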
    url = scraper.studios_list_url
    studios_list = []
    logging.info(f"Fetching data for studios list, url {url} ...")
    soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Studio", attr_type="name"))
    if soup:
        list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Studio")
        if list_data:
            for row in list_data:
                stu_url = scraper.studios_base_url + row['href']
                if stu_url in existed_studios_href:
                    continue
                studios_list.append({
                    'name': row['name'],
                    'href': stu_url
                })
        else:
            logging.warning(f'parse studios list error. {url} ...')
    else:
        logging.warning(f'fetch studios list error. {url} ...')
    return studios_list
# Check for updates.
def check_update():
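    """Diff the site's list pages against the database and ingest any new
    performers, distributors, studios, and movies.
    """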
    # Load the existing performer hrefs from the database
    existed_performer_hrefs = utils.query_performer_hrefs()
    if not existed_performer_hrefs:
        logging.warning('get existed performers from db error.')
        return None
    # Collect new performers from the list pages
    new_performers = []
    #new_performers.extend(fetch_performers_by_astro(existed_performer_hrefs))
    #new_performers.extend(fetch_performers_by_birth(existed_performer_hrefs))
    new_performers.extend(fetch_performers_by_ethnic(existed_performer_hrefs))
    # Deduplicate by href (the dict keeps the last entry per href), then fetch
    # each performer's details and write them to the db
    new_performers = list({item["href"]: item for item in new_performers}.values())
    logging.info(f'get new performers count: {len(new_performers)}')
    for performer in new_performers:
        url = performer['href']
        person = performer['person']
        soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="headshot", attr_type="id"))
        if soup:
            data, credits = scraper.parse_page_performer(soup)
            if data:
                performer_id = utils.insert_or_update_performer({
                    'href': url,
                    'person': person,
                    **data
                })
                if performer_id:
                    logging.info(f'insert one person, id: {performer_id}, person: {person}, url: {url}')
                else:
                    logging.warning(f'insert person: {person} {url} failed.')
                # Mirror the full record, including credits, to a local JSON file
                func.write_person_json(person, url, {
                    'href': url,
                    'person': person,
                    **data,
                    'credits': credits if credits else {}
                })
            else:
                logging.warning(f'parse_page_performer error. person: {person}, url: {url}')
        else:
            logging.warning(f'fetch_page error. person: {person}, url: {url}')
        # break early when debugging
        if debug:
            break
    # Load the distributor hrefs from the database
    existed_distributors_href = utils.query_distributor_hrefs()
    if existed_distributors_href is None:
        logging.warning('get existed distributors from db error.')
        return
    new_distributors = fetch_distributors_list(existed_distributors_href)
    for dist in new_distributors:
        dist_id = utils.insert_or_update_distributor(dist)
        if dist_id:
            logging.info(f'insert one distributor record, id: {dist_id}, name: {dist["name"]}, href: {dist["href"]}')
        else:
            logging.warning(f'insert into distributor failed. name: {dist["name"]} href: {dist["href"]}')
    # Load the studio hrefs from the database
    existed_studios_href = utils.query_studio_hrefs()
    if existed_studios_href is None:
        logging.warning('get existed studios from db error.')
        return
    new_studios = fetch_studios_list(existed_studios_href)
    for stu in new_studios:
        stu_id = utils.insert_or_update_studio(stu)
        if stu_id:
            logging.info(f'insert one studio record, id: {stu_id}, name: {stu["name"]}, href: {stu["href"]}')
        else:
            logging.warning(f'insert into studio failed. name: {stu["name"]}, href: {stu["href"]}')
    # Load the existing movie hrefs from the database
    existed_movies = utils.query_movie_hrefs()
    if existed_movies is None:
        logging.warning('load movies from db error')
        return
    new_movies = []
    new_movie_hrefs = []
    # Walk every distributor page to collect its movie list
    # (note: currently restricted to distributors matching 'vixen')
    existed_distributors_href = utils.query_distributor_hrefs(name='vixen')
    if existed_distributors_href is None:
        logging.warning('get existed distributors from db error.')
        return
    for url in existed_distributors_href:
        soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="distable", attr_type="id"))
        if soup:
            list_data, next_url = scraper.parse_page_dist_stu(soup, 'distable')
            if list_data:
                for movie in list_data:
                    if movie['href'] in existed_movies:
                        continue
                    new_movies.append({
                        'title': movie['title'],
                        'href': movie['href']
                    })
                    new_movie_hrefs.append(movie['href'])
            else:
                logging.warning(f'parse distributor movie list error. url: {url}')
        # break early when debugging
        if debug:
            break
    logging.info(f'all new movies found for distributors, now total new {len(new_movies)}')
    # Walk every studio page to collect its movie list
    # (note: currently restricted to studios matching 'vixen')
    existed_studios_href = utils.query_studio_hrefs(name='vixen')
    if existed_studios_href is None:
        logging.warning('get existed studios from db error.')
        return
    for url in existed_studios_href:
        soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="studio", attr_type="id"))
        if soup:
            list_data, next_url = scraper.parse_page_dist_stu(soup, 'studio')
            if list_data:
                for movie in list_data:
                    # skip movies already in the db or already collected from distributors
                    if movie['href'] in existed_movies or movie['href'] in new_movie_hrefs:
                        continue
                    new_movies.append({
                        'title': movie['title'],
                        'href': movie['href']
                    })
                    new_movie_hrefs.append(movie['href'])
            else:
                logging.warning(f'parse studio movie list error. url: {url}')
        # break early when debugging
        if debug:
            break
    logging.info(f'all new movies found for studios, now total new {len(new_movies)}')
    # Deduplicate the merged list by href, then fetch each new movie's detail page
    new_movies = list({item["href"]: item for item in new_movies}.values())
    logging.info(f'get merged new movies, count: {len(new_movies)}')
    for movie in new_movies:
        url = movie['href']
        title = movie['title']
        soup = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-xs-12 col-sm-3", attr_type="class"))
        if soup:
            movie_data = scraper.parse_page_movie(soup, url, title)
            if movie_data:
                movie_id = utils.insert_or_update_movie(movie_data)
                if movie_id:
                    logging.info(f'insert one movie, id: {movie_id}, title: {title} url: {url}')
                else:
                    logging.warning(f'insert movie {url} failed.')
                # Mirror the full record to a local JSON file
                func.write_movie_json(url, movie_data)
            else:
                logging.warning(f'parse_page_movie error. url: {url}')
        else:
            logging.warning(f'fetch_page error. url: {url}')
        # break early when debugging
        if debug:
            break
    logging.info('all processing completed!')
if __name__ == "__main__":
    check_update()