import json
import time
import csv
import sys
import argparse
import logging
from functools import partial
from pathlib import Path

import config
import sqlite_utils as db_tools
import iafd_scraper as scraper
import utils

# Add the project root (the directory containing src) to sys.path
root_dir = str(Path(__file__).resolve().parent.parent.parent)
sys.path.append(root_dir)

from src.monitor.scheduler import CommandScheduler
from src.utils.utils import pretty_print_json

config.setup_logging()

debug = False
force = False
skip_local = True


# Fetch the performer list by astrological sign (no pagination)
def fetch_performers_by_astro():
    for astro in scraper.astro_list:
        url = scraper.astr_base_url + astro
        logging.info(f"Fetching data for {astro}, url {url} ...")
        while True:
            soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="astro", attr_type="id"))
            if soup:
                list_data, next_url = scraper.parse_page_astro(soup, astro)
                if list_data:
                    all_updated = True
                    for row in list_data:
                        # Write to the performer index table
                        performer_id = db_tools.insert_performer_index(name=row['person'], href=row.get('href', '').lower(), from_astro_list=1)
                        if performer_id:
                            logging.debug(f"insert performer index to db. performer_id:{performer_id}, name: {row['person']}, href:{row['href']}")
                        else:
                            logging.warning(f"insert performer index failed. name: {row['person']}, href:{row['href']}")
                            all_updated = False
                    # Only move on once every row has been written successfully
                    if all_updated:
                        break
                else:
                    logging.warning(f'fetch astro error. {url} ...')
                    time.sleep(0.5)
            elif status_code and status_code == 404:
                logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
                break
            else:
                logging.warning(f'fetch astro error. {url} ...')
                time.sleep(3)
        # Stop after the first sign in debug mode
        if debug:
            break


# Fetch the performer list by birthday (no pagination)
def fetch_performers_by_birth():
    for month in range(1, 13):    # months 1 to 12
        for day in range(1, 32):  # days 1 to 31
            url = scraper.birth_base_url.format(month=month, day=day)
            logging.info(f"Fetching data for birth, url {url}")
            while True:
                soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-sm-12 col-lg-9", attr_type="class"))
                if soup:
                    list_data, next_url = scraper.parse_page_birth(soup, month, day)
                    if list_data:
                        all_updated = True
                        for row in list_data:
                            # Write to the performer index table
                            performer_id = db_tools.insert_performer_index(name=row['person'], href=row.get('href', '').lower(), from_birth_list=1)
                            if performer_id:
                                logging.debug(f"insert performer index to db. performer_id:{performer_id}, name: {row['person']}, href:{row['href']}")
                            else:
                                logging.warning(f"insert performer index failed. name: {row['person']}, href:{row['href']}")
                                all_updated = False
                        # Only move on once every row has been written successfully
                        if all_updated:
                            break
                    else:
                        logging.warning(f'fetch birth error. {url} ...')
                        time.sleep(1)
                elif status_code and status_code == 404:
                    logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
                    break
                else:
                    logging.warning(f'fetch birth error. {url} ...')
                    time.sleep(3)
            # Stop after the first day in debug mode
            if debug:
                return True


# Refresh the ethnicity list
def fetch_ethnic_list():
    url = scraper.ethnic_list_url
    logging.info(f"Fetching data for performer's ethnic list, url {url} ...")
    soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="ethnicity1", attr_type="id"))
    if soup:
        list_data = scraper.parse_page_ethnic_list(soup, url)
        if list_data:
            for row in list_data:
                ethnic_id = db_tools.insert_or_update_ethnic({'name': row['name'], 'href': row.get('href', '')})
                if ethnic_id:
                    logging.debug(f"insert one record into ethnic table. id:{ethnic_id}, name: {row['name']}, href:{row.get('href', '')}")
        else:
            logging.warning(f'fetch ethnic error. {url} ...')
    elif status_code and status_code == 404:
        logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
    else:
        logging.warning(f'fetch page error. {url} ...')


# Fetch the performer list by ethnicity (paginated)
def fetch_performers_by_ethnic():
    # Refresh the ethnicity list first
    fetch_ethnic_list()
    ethnic_list = db_tools.query_ethnic_hrefs()
    for row in ethnic_list:
        url = row['href']
        ethnic = row['name']
        next_url = url
        count = 0
        pages = 0
        while next_url:
            logging.info(f"Fetching data for {ethnic}, url {next_url} ...")
            soup, status_code = scraper.fetch_page(next_url, partial(scraper.generic_validator, tag="div", identifier="row headshotrow", attr_type="class"), parser="lxml", preprocessor=scraper.preprocess_html)
            if soup:
                list_data, next_page_url = scraper.parse_page_ethnic(soup, ethnic)
                if list_data:
                    all_updated = True
                    for item in list_data:
                        # Write to the performer index table
                        performer_id = db_tools.insert_performer_index(name=item['person'], href=item.get('href', '').lower(), from_ethnic_list=1)
                        if performer_id:
                            count += 1
                            logging.debug(f"insert performer index to db. performer_id:{performer_id}, name: {item['person']}, href:{item['href']}")
                        else:
                            logging.warning(f"insert performer index failed. name: {item['person']}, href:{item['href']}")
                            all_updated = False
                    # Only move on to the next page once every row has been written successfully
                    if all_updated:
                        next_url = next_page_url
                else:
                    logging.warning(f'fetch ethnic error. {next_url} ...')
            elif status_code and status_code == 404:
                logging.warning(f'fetch page error. httpcode: {status_code}, url: {next_url}, Skipping...')
                break
            else:
                logging.warning(f'fetch ethnic error. {next_url} ...')
                time.sleep(3)
            pages += 1
            # Stop after the first page in debug mode
            if debug:
                return True
        logging.info(f"fetched data for {ethnic} total pages: {pages}, total performers: {count}")


# Fetch the distributors list
def fetch_distributors_list():
    url = scraper.distributors_list_url
    logging.info(f"Fetching data for distributors list, url {url} ...")
    soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Distrib", attr_type="name"))
    if soup:
        list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Distrib")
        if list_data:
            for row in list_data:
                dis_url = scraper.distributors_base_url + row['href']
                dist_id = db_tools.insert_or_update_distributor({'name': row['name'], 'href': dis_url})
                if dist_id:
                    logging.debug(f"insert one record into distributors table. id:{dist_id}, name: {row['name']}, href:{dis_url}")
        else:
            logging.warning(f'fetch distributors error. {url} ...')
    elif status_code and status_code == 404:
        logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
    else:
        logging.warning(f'fetch distributors error. {url} ...')


# Fetch the studios list
def fetch_studios_list():
    url = scraper.studios_list_url
    logging.info(f"Fetching data for studios list, url {url} ...")
    soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="select", identifier="Studio", attr_type="name"))
    if soup:
        list_data, next_url = scraper.parse_page_dist_stu_list(soup, "Studio")
        if list_data:
            for row in list_data:
                stu_url = scraper.studios_base_url + row['href']
                stu_id = db_tools.insert_or_update_studio({'name': row['name'], 'href': stu_url})
                if stu_id:
                    logging.debug(f"insert one record into studios table. id:{stu_id}, name: {row['name']}, href:{stu_url}")
        else:
            logging.warning(f'fetch studios error. {url} ...')
    elif status_code and status_code == 404:
        logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
    else:
        logging.warning(f'fetch studios error. {url} ...')

# Update movie info from the distributors list
def fetch_movies_by_dist():
    # Refresh the distributors list first
    fetch_distributors_list()
    url_list = db_tools.query_distributor_hrefs()
    if debug:
        url_list = db_tools.query_distributor_hrefs(name='vixen.com')
    for url in url_list:
        logging.info(f"Fetching data for distributor url {url} ...")
        while True:
            soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="distable", attr_type="id"))
            if soup:
                list_data, next_url = scraper.parse_page_dist_stu(soup, 'distable')
                if list_data:
                    all_updated = True
                    for movie in list_data:
                        tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], release_year=utils.to_number(movie['year']), from_dist_list=1)
                        if tmp_id:
                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
                        else:
                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
                            all_updated = False
                    # Only move on once every row has been written successfully
                    if all_updated:
                        break
                else:
                    logging.warning(f'parse_page_dist_stu error. url: {url}')
                    time.sleep(1)
            elif status_code and status_code in [scraper.http_code_404, scraper.http_code_login, scraper.http_code_url]:
                logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
                break
            else:
                logging.warning(f'fetching page error. {url}')
                time.sleep(3)
        # Stop after the first distributor in debug mode
        if debug:
            break


# Update movie info from the studios list
def fetch_movies_by_stu():
    # Refresh the studios list first
    fetch_studios_list()
    url_list = db_tools.query_studio_hrefs()
    if debug:
        url_list = db_tools.query_studio_hrefs(name='vixen.com')
    for url in url_list:
        logging.info(f"Fetching data for studio url {url} ...")
        while True:
            soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier="studio", attr_type="id"))
            if soup:
                list_data, next_url = scraper.parse_page_dist_stu(soup, 'studio')
                if list_data:
                    all_updated = True
                    for movie in list_data:
                        tmp_id = db_tools.insert_movie_index(title=movie['title'], href=movie['href'], release_year=utils.to_number(movie['year']), from_stu_list=1)
                        if tmp_id:
                            logging.debug(f"insert one movie index to db. movie_id: {tmp_id}, title: {movie['title']}, href: {movie['href']}")
                        else:
                            logging.warning(f"insert movie index failed. title: {movie['title']}, href: {movie['href']}")
                            all_updated = False
                    # Only move on once every row has been written successfully
                    if all_updated:
                        break
                else:
                    logging.warning(f'parse_page_dist_stu error. url: {url}')
                    time.sleep(1)
            elif status_code and status_code in [scraper.http_code_404, scraper.http_code_login, scraper.http_code_url]:
                logging.warning(f'fetch page error. httpcode: {status_code}, url: {url}, Skipping...')
                break
            else:
                logging.warning(f'fetching page error. {url}')
                time.sleep(3)
        # Stop after the first studio in debug mode
        if debug:
            break


# Update performer details for one batch
def fetch_performers_detail_once(performers_list):
    last_performer_id = 0
    for performer in performers_list:
        url = performer['href']
        person = performer['name']
        curr_id = performer['id']
        movies_cnt = performer['movies_cnt']
        logging.debug(f"Fetching data for performer ({person}), url {url} ...")
        soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="headshot", attr_type="id"))
        # Page came from the local cache; skip it
        if skip_local and status_code == scraper.http_code_local:
            last_performer_id = curr_id
            continue
        if soup:
            data = scraper.parse_page_performer(soup, url)
            if data:
                # Check whether the movie count has grown
                page_movies_cnt = int(data.get('movies_cnt', '0'))
                movies_changed = True
                if page_movies_cnt <= movies_cnt:
                    movies_changed = False
                    if not force:
                        logging.info(f"performer already up to date, skipping... person: ({person}), url: {url}")
                        last_performer_id = curr_id
                        continue
                performer_id = db_tools.insert_or_update_performer({
                    'href': url,
                    'person': person,
                    **data
                }, movies_update=movies_changed)
                if performer_id:
                    logging.debug(f'insert one person, id: {performer_id}, person: ({person}), url: {url}')
                    last_performer_id = performer_id
                else:
                    logging.warning(f'insert person: ({person}) {url} failed.')
                # Write to a local json file
                utils.write_person_json(person, url, {'href': url, 'person': person, **data})
            else:
                logging.warning(f'parse_page_performer error. person: ({person}), url: {url}')
        elif status_code and status_code == scraper.http_code_404:
            performer_id = db_tools.insert_or_update_performer_404(name=person, href=url, is_full_data=scraper.http_code_404)
            logging.warning(f'404 page. id: {performer_id}, name: {person}, url: {url}, Skipping...')
        elif status_code and status_code == scraper.http_code_url:
            performer_id = db_tools.insert_or_update_performer_404(name=person, href=url, is_full_data=scraper.http_code_url)
            logging.warning(f'601 page (wrong url). id: {performer_id}, name: {person}, url: {url}, Skipping...')
        else:
            logging.warning(f'fetch_page error. person: ({person}), url: {url}')
        if status_code != 99:
            # Data came from the website; throttle the request rate
            time.sleep(0.5)
    return last_performer_id


# Update performer details
def fetch_performers_detail():
    limit_count = 5 if debug else 100
    performers_list = []
    last_performer_id = 0
    # Fetch the list of new performers, batch by batch
    while True:
        if force:
            # Walk through every performer from the beginning
            performers_list = db_tools.query_performer_hrefs(start_id=last_performer_id, is_full_data_not_in=[scraper.http_code_404, scraper.http_code_url], order_by='id asc', limit=limit_count)
        else:
            # Only fetch new performers
            performers_list = db_tools.query_performer_hrefs(is_full_data=0, limit=limit_count)
        if len(performers_list) < 1:
            logging.info('all new performers fetched.')
            break
        last_performer_id = fetch_performers_detail_once(performers_list)
        logging.info(f'inserted {len(performers_list)} performers. last performer id: {last_performer_id}')
        if debug:
            break


# Update movie details
def fetch_movies_detail():
    limit_count = 10 if debug else 100
    movies_list = []
    last_movie_id = 0
    while True:
        if force:
            # Walk through every movie from the beginning
            movies_list = db_tools.query_movie_hrefs(start_id=last_movie_id, is_full_data_not_in=[scraper.http_code_404, scraper.http_code_url], order_by='id asc', limit=limit_count)
        else:
            # Only fetch new movies
            movies_list = db_tools.query_movie_hrefs(is_full_data=0, limit=limit_count)
        if len(movies_list) < 1:
            logging.info('all movies fetched.')
            break
        succ_count = 0
        for movie in movies_list:
            url = movie['href']
            title = movie['title']
            curr_id = movie['id']
            logging.debug(f"Fetching data for movie: {curr_id}: ({title}), url {url} ...")
            soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="div", identifier="col-xs-12 col-sm-3", attr_type="class"))
            # Page came from the local cache; skip it
            if skip_local and status_code == scraper.http_code_local:
                last_movie_id = curr_id
                succ_count += 1
                continue
            if soup:
                movie_data = scraper.parse_page_movie(soup, url, title)
                if movie_data:
                    # Normalize malformed distributor/studio URLs
                    if movie_data['DistributorHref']:
                        movie_data['DistributorHref'] = utils.dist_stu_href_rewrite(movie_data['DistributorHref'].lower())
                    if movie_data['StudioHref']:
                        movie_data['StudioHref'] = utils.dist_stu_href_rewrite(movie_data['StudioHref'].lower())
                    movie_id = db_tools.insert_or_update_movie(movie_data)
                    if movie_id:
                        logging.debug(f'insert one movie, id: {movie_id}, title: ({title}) url: {url}')
                        last_movie_id = movie_id
                        succ_count += 1
                    else:
                        logging.warning(f'insert movie {url} failed.')
                    # Write to a local json file
                    utils.write_movie_json(url, movie_data)
                else:
                    logging.warning(f'parse_page_movie error. url: {url}')
            elif status_code and status_code == scraper.http_code_404:
                # Mark as handled
                movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_404)
                logging.warning(f'404 page. id: {movie_id}, title: ({title}), url: {url}, Skipping...')
            elif status_code and status_code == scraper.http_code_url:
                # Mark as handled
                movie_id = db_tools.insert_or_update_movie_404(title=title, href=url, is_full_data=scraper.http_code_url)
                logging.warning(f'601 page (wrong url). id: {movie_id}, title: ({title}), url: {url}, Skipping...')
            else:
                logging.warning(f'fetch_page error. url: {url}')
            if status_code != 99:
                # Data came from the website; throttle the request rate
                time.sleep(0.5)
        logging.info(f'total request: {len(movies_list)}, succ: {succ_count}. last movie id: {last_movie_id}')
        # Stop after the first batch in debug mode
        if debug:
            return True


def reset_actor_movie_cnt():
    db_tools.reset_actor_movies()


def check_task_status():
    # Print task statistics
    result = db_tools.get_statics()
    pretty_print_json(result)


# Map command shortcuts to functions
function_map = {
    "astro": fetch_performers_by_astro,
    "birth": fetch_performers_by_birth,
    "ethnic": fetch_performers_by_ethnic,
    "dist": fetch_movies_by_dist,
    "stu": fetch_movies_by_stu,
    "performers": fetch_performers_detail,
    "movies": fetch_movies_detail,
    "reset_mv": reset_actor_movie_cnt,
    "check": check_task_status,
}


# Main entry point
def main(cmd, args_debug, args_force, args_skip_local):
    global debug
    debug = args_debug
    if debug:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
    global force
    force = args_force
    global skip_local
    skip_local = args_skip_local

    if cmd and cmd.lower() == 'check':
        check_task_status()
        return None

    # Start the task
    task_id = db_tools.insert_task_log()
    if task_id is None:
        logging.warning('insert task log error.')
        return None
    logging.info(f'running task. id: {task_id}, debug: {debug}, force: {force}, cmd: {cmd}')

    # Shell command to run periodically (example)
    shell_command = "cd ~/projects/resources/src/monitor; chmod u+x ./run.sh; ./run.sh iafd"

    # Create the command scheduler (runs every 30 minutes)
    scheduler = CommandScheduler(
        command=shell_command,
        interval=10 if debug else 1800
    )
    scheduler.run_periodically()

    # Run the requested functions
    if cmd:
        function_names = cmd.split(",")  # split the comma-separated shortcuts
        for short_name in function_names:
            func = function_map.get(short_name.strip())  # look up the matching function
            if callable(func):
                db_tools.update_task_log(task_id, task_status=f'Running {short_name}')
                func()
            else:
                print(f"Warning: {short_name} is not a valid function shortcut.")
    else:
        # No cmd given: run everything
        for name, func in function_map.items():
            if callable(func):
                db_tools.update_task_log(task_id, task_status=f'Running {name}')
                func()
            else:
                print(f"Warning: {name} is not a valid function shortcut.")

    logging.info('all processes completed!')
    db_tools.finalize_task_log(task_id)
    scheduler.stop()


# TODO:
# 1. The movie count on a performer's page can differ from the count aggregated from the movie list.
#    One cause: a movie can have several directors, and a director may also appear as a performer. For example:
#    https://www.iafd.com/title.rme/id=0f79d81f-25ff-40d1-967a-24b99f03b79a
#    https://www.iafd.com/person.rme/id=37efc86d-fefe-436d-8e3e-2e04b4e6565c
#    The current movie table misses some director info and needs to be adjusted.


if __name__ == "__main__":
    # Command-line argument handling
    keys_str = ",".join(function_map.keys())
    parser = argparse.ArgumentParser(description='fetch iafd data.')
    parser.add_argument("--cmd", type=str, help=f"Comma-separated list of function shortcuts: {keys_str}")
    parser.add_argument('--debug', action='store_true', help='Enable debug mode (limit records)')
    parser.add_argument('--force', action='store_true', help='Force update (rewrite all records)')
    parser.add_argument('--skip_local', action='store_true', help='Skip pages that are already cached locally')
    args = parser.parse_args()
    main(args.cmd, args.debug, args.force, args.skip_local)