modify scripts

This commit is contained in:
oscarz
2025-06-19 16:55:21 +08:00
parent afe3d2e96e
commit 767858f7a4
3 changed files with 277 additions and 10 deletions

View File

@ -27,6 +27,8 @@ target_torrent_dir = f"{config.global_share_data_dir}/u3c3_torrents"
def fetch_list(start_p=1):
p = start_p
total_results = []
# Back up any existing output file
utils.backup_existing_file(target_csv)
while True:
url = f"https://u001.25img.com/?p={p}"
logging.info(f"fetching url {url}")
@ -40,10 +42,15 @@ def fetch_list(start_p=1):
if total_pages:
if p >= total_pages:
url = None
break
else:
p += 1
if p % 10 == 0:
utils.write_to_csv(total_results, target_csv)
#utils.write_to_csv(total_results, target_csv)
lines = utils.append_to_csv(total_results, target_csv)
if lines:
logging.info(f"write to csv file. new lines: {len(total_results)}, total lines: {lines}")
total_results.clear() # clear the buffer
time.sleep(1)
else:
logging.warning(f"fetch_list failed. url: {url} ")
@ -52,13 +59,17 @@ def fetch_list(start_p=1):
else:
logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
if not url:
break
if debug:
break
# Flush any remaining buffered results to the CSV file; append so the batches written earlier in the loop are not overwritten
lines = utils.append_to_csv(total_results, target_csv)
total_results.clear()
if lines:
logging.info(f"write to file succ. total lines: {lines}, file: {target_csv}")
logging.info(f"write to file succ. file: {target_csv}. total lines: {lines}")
logging.info(f"fetch list finished. total pages: {p}")
@ -112,10 +123,86 @@ def down_torrents():
break
time.sleep(1)
# Fetch the sis forum thread list
def fetch_sis_list(url = 'https://sis001.com/forum/forum-25-1.html', target_csv_sis = f"{config.global_share_data_dir}/sis_asia_zt.csv", ident='forum_25'):
total_results = []
cnt = 0
# Back up any existing output file
utils.backup_existing_file(target_csv_sis)
while url:
logging.info(f"fetching url {url}")
soup, status_code = scraper.fetch_page(url, partial(scraper.generic_validator, tag="table", identifier=ident, attr_type="id"))
if soup:
list_data, next_url = scraper.parse_sis_list(soup, url, ident)
if list_data :
total_results.extend(list_data)
else:
logging.warning(f"fetch_list failed. url: {url} ")
if next_url:
url = next_url
cnt += 1
if cnt % 10 == 0:
#utils.write_to_csv(total_results, target_csv_sis)
lines = utils.append_to_csv(total_results, target_csv_sis)
if lines:
logging.info(f"write to csv file. new lines: {len(total_results)}, total lines: {lines}")
total_results.clear()
time.sleep(1)
else:
logging.warning(f"fetch_list failed. url: {url} ")
url = None
else:
logging.warning(f'fetch_page error. url: {url}, status_code: {status_code}')
if debug:
break
# Flush any remaining buffered results to the CSV file; append so the batches written earlier in the loop are not overwritten
lines = utils.append_to_csv(total_results, target_csv_sis)
total_results.clear()
if lines:
logging.info(f"write to file succ. file: {target_csv_sis}, total lines: {lines}")
logging.info(f"fetch list finished. total pages: {cnt}")
def fetch_sis_all():
sections = [
{
'plate' : 'sis_asia_yc',
'url' : 'https://sis001.com/forum/forum-143-1.html',
'ident' : 'forum_143'
},
{
'plate' : 'sis_asia_zt',
'url' : 'https://sis001.com/forum/forum-25-1.html',
'ident' : 'forum_25'
},
{
'plate' : 'sis_oumei_yc',
'url' : 'https://sis001.com/forum/forum-229-1.html',
'ident' : 'forum_229'
},
{
'plate' : 'sis_oumei_zt',
'url' : 'https://sis001.com/forum/forum-77-1.html',
'ident' : 'forum_77'
},
]
for item in sections:
section = item['plate']
url = item['url']
logging.info(f"---------------start fetching {section}, begin url: {url}")
csv_file = f"{config.global_share_data_dir}/{section}.csv"
fetch_sis_list(url=url, target_csv_sis=csv_file, ident=item['ident'])
# Map command names to handler functions
function_map = {
"list": fetch_list,
"down" : down_torrents,
"sis": fetch_sis_list,
"sis_all": fetch_sis_all,
}
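# A sketch (an assumption; the real __main__ block is only partially visible in this
# diff) of how the --cmd value from the usage text below is presumably routed
# through function_map. The helper name and its argument are hypothetical.
def dispatch_cmd(cmd="list"):
    fn = function_map.get(cmd)
    if fn is None:
        logging.error(f"unknown cmd: {cmd}, expected one of {list(function_map)}")
        return
    fn()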
# Main entry point
@ -168,6 +255,8 @@ if __name__ == "__main__":
python3 ./fetch.py # refresh the list and download newly added resources
python3 ./fetch.py --cmd=list # refresh the list
python3 ./fetch.py --cmd=down # download newly added resources
python3 ./fetch.py --cmd=sis # refresh the sis list, Asian repost board
python3 ./fetch.py --cmd=sis_all # refresh the sis lists, all boards
''')
parser = argparse.ArgumentParser(

View File

@ -11,6 +11,7 @@ import random
from bs4 import BeautifulSoup
from requests.exceptions import RequestException
from functools import partial
from urllib.parse import urljoin
import config
import utils
@ -19,6 +20,8 @@ host_url = 'https://u001.25img.com'
list_url_update = f'{host_url}/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=update'
#list_url_wordcount = 'https://aabook.xyz/category.html?pageNum=1&pageSize=30&catId=-1&size=-1&isFinish=-1&updT=-1&orderBy=wordcount'
sis_host_url = 'https://sis001.com'
# List of User-Agent strings
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36",
@ -32,7 +35,7 @@ user_agents = [
def fetch_page(url, validator, parser="html.parser", preprocessor=None, max_retries=3, sleep_time=5, default_timeout=10):
for attempt in range(max_retries):
try:
if '25img.com' not in url.lower():
if '25img.com' not in url.lower() and 'sis001.com' not in url.lower():
logging.error(f'wrong url format: {url}')
return None, None
@ -212,6 +215,121 @@ def download_torrent(torrent_url, target_file):
logging.warning(f"Error downloading {torrent_url}: {str(e)}")
return False
def parse_size_format(size_text: str):
"""解析大小和格式"""
try:
if not size_text:
return 0.0, "未知格式"
# 分割大小和格式
parts = size_text.split('/')
format_part = parts[1].strip() if len(parts) > 1 else "未知格式"
# 解析大小
size_part = parts[0].strip()
match = re.search(r'(\d+\.\d+|\d+)\s*([A-Za-z]+)', size_part)
if not match:
logging.warning(f"无法解析大小: {size_part}")
return 0.0, format_part
value, unit = match.groups()
value = float(value)
if unit.lower() == 'mb' or unit.lower() == 'm':
return round(value / 1024, 2), format_part
elif unit.lower() == 'gb' or unit.lower() == 'g':
return round(value, 2), format_part
else:
logging.warning(f"未知单位: {unit}")
return 0.0, format_part
except Exception as e:
logging.error(f"解析大小格式时出错: {e}")
return 0.0, "未知格式"
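# A quick illustrative check of parse_size_format (the sample inputs below are
# made up, but follow the "<size><unit>/<format>" shape the parser expects):
# sizes are normalized to GB and the text after "/" is returned as the format.
def _demo_parse_size_format():
    assert parse_size_format("1.37GB/MP4") == (1.37, "MP4")
    assert parse_size_format("512MB/AVI") == (0.5, "AVI")  # 512 / 1024, rounded to 2 digits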
def parse_sis_list(soup, curr_url, ident):
"""解析符合条件的表格"""
tables = soup.find_all('table', {'id': ident})
if not tables:
logging.warning(f"cannot found table. url: {curr_url}")
return None, None
main_table = None
for table in tables:
try:
# Check whether the table header contains "版块主题" (board topics); the Chinese literal is matched against page text, so keep it as-is
thead = table.find('thead')
if thead and '版块主题' in thead.get_text():
main_table = table
break
except Exception as e:
logging.warning(f"解析表格时出错: {e} url: {curr_url}")
if not main_table:
logging.warning(f"cannot found table in right topic. url: {curr_url}")
return None, None
results = []
bodies = main_table.find_all('tbody', id=re.compile(r'normalthread_\d+'))
for body in bodies:
try:
rows = body.find_all('tr')
for row in rows:
tds = row.find_all('td')
if len(tds) < 6:
logging.warning(f"跳过不完整的行,列数: {len(tds)}")
continue
# 解析类别和标题
th_lock = row.find('th')
if not th_lock:
logging.warning("未找到th.lock元素")
continue
# 解析类别链接
category_links = th_lock.find_all('a', href=re.compile(r'forumdisplay.php'))
category = category_links[0].text.strip() if category_links else "未知类别"
# 解析标题链接
title_links = th_lock.find_all('a', href=re.compile(r'thread-\d+-\d+-\d+.html'))
title = title_links[0].text.strip() if title_links else "未知标题"
url = title_links[0]['href'] if title_links else ""
url = urljoin(curr_url, url)
# 解析发布日期
author_td = tds[2]
date = author_td.find('em').text.strip() if author_td.find('em') else "未知日期"
# 解析大小和格式
size_td = tds[4]
size_text = size_td.text.strip()
size_gb, file_format = parse_size_format(size_text)
# 添加到结果
results.append({
"category": category,
"title": title,
"url": url,
"date": date,
"size_text": size_text,
"size_gb": size_gb,
"format": file_format
})
except Exception as e:
logging.error(f"解析tbody时出错: {e}")
next_url = None
pages_btns = soup.find('div', class_='pages_btns')
if not pages_btns:
logging.debug("未找到页面导航栏")
else:
next_link = pages_btns.find('a', class_='next')
if next_link:
next_url = urljoin(curr_url, next_link['href'])
return results, next_url
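# A minimal sketch of the markup parse_sis_list expects (the HTML below is
# synthetic and only illustrates the assumed structure): a <table id="forum_XX">
# whose <thead> contains "版块主题", one <tbody id="normalthread_N"> per thread,
# and an optional <div class="pages_btns"> holding the next-page link.
def _demo_parse_sis_list():
    html = """
    <table id="forum_25">
      <thead><tr><td>版块主题</td></tr></thead>
      <tbody id="normalthread_123">
        <tr>
          <th>
            <a href="forumdisplay.php?fid=25">[Sample Category]</a>
            <a href="thread-123-1-1.html">Sample thread title</a>
          </th>
          <td></td><td></td><td><em>2025-06-19</em></td><td></td>
          <td>1.37GB/MP4</td><td></td>
        </tr>
      </tbody>
    </table>
    <div class="pages_btns"><a class="next" href="forum-25-2.html">next</a></div>
    """
    soup = BeautifulSoup(html, "html.parser")
    rows, next_url = parse_sis_list(soup, "https://sis001.com/forum/forum-25-1.html", "forum_25")
    print(rows)      # one dict with category, title, url, date, size_text, size_gb, format
    print(next_url)  # https://sis001.com/forum/forum-25-2.html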
def test_chapter_page(url):
soup, status_code = fetch_page(url, partial(generic_validator, tag="div", identifier="table-responsive", attr_type="class"))
@ -222,7 +340,18 @@ def test_chapter_page(url):
if total_pages :
print(total_pages)
def test_sis_page(url):
soup, status_code = fetch_page(url, partial(generic_validator, tag="table", identifier="forum_25", attr_type="id"))
if soup:
data, next_url = parse_sis_list(soup, url, "forum_25")
if data:
print(data)
if next_url :
print(next_url)
if __name__ == "__main__":
test_chapter_page('https://u001.25img.com/?p=1')
#test_chapter_page('https://u001.25img.com/?p=1')
test_sis_page('https://sis001.com/forum/forum-25-1.html')

View File

@ -1,5 +1,28 @@
import csv
import os
import time
def backup_existing_file(file_path):
"""检查文件是否存在如果存在则添加bak扩展名进行备份"""
if os.path.exists(file_path):
# 获取文件所在目录和文件名
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
# 构建备份文件名,添加时间戳避免覆盖
timestamp = time.strftime("%Y%m%d-%H%M%S")
backup_name = f"{os.path.splitext(base_name)[0]}_{timestamp}.bak"
backup_path = os.path.join(dir_name, backup_name)
try:
# 重命名文件
os.rename(file_path, backup_path)
print(f"已将现有文件备份为: {backup_path}")
except Exception as e:
print(f"备份文件时出错: {e}")
return False
return True
def write_to_csv(data, filename='output.csv'):
"""将资源数据写入CSV文件"""
@ -7,12 +30,8 @@ def write_to_csv(data, filename='output.csv'):
print("没有数据可写入")
return None
# Define the CSV columns
fieldnames = [
'category', 'title', 'url',
'torrent_url', 'magnet_url',
'size_text', 'size_gb', 'update_date'
]
# Derive all field names from the first record
fieldnames = list(data[0].keys()) if data else []
try:
# Write the CSV file
@ -32,6 +51,36 @@ def write_to_csv(data, filename='output.csv'):
print(f"写入CSV文件时出错: {e}")
return None
def append_to_csv(data, filename='output.csv'):
"""将单条数据追加到CSV文件"""
if not data:
print("没有数据可写入")
return None
# 从第一条数据中提取所有可能的字段名
fieldnames = list(data[0].keys()) if data else []
file_exists = os.path.exists(filename)
try:
# 追加模式打开文件
with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 如果文件不存在,先写入表头
if not file_exists:
writer.writeheader()
# 写入数据行
writer.writerows(data) # 使用writerows处理批量数据
# 计算文件的总行数
with open(filename, 'r', encoding='utf-8-sig') as f:
return sum(1 for _ in f)
except Exception as e:
print(f"写入CSV文件时出错: {e}")
return None
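# A small end-to-end sketch of the helpers above (the path and record are made
# up): backup_existing_file moves any previous output aside, append_to_csv then
# creates the file with a header on first write and returns the file's total
# line count, which includes that header row.
def _demo_csv_helpers(path="/tmp/demo_output.csv"):
    rows = [{"title": "example", "url": "https://example.com/t/1", "size_gb": 1.37}]
    backup_existing_file(path)         # renames an older file to *.bak if present
    total = append_to_csv(rows, path)  # header + 1 data row -> returns 2
    print(f"file now has {total} lines (header included)")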
def read_csv_data(csv_file):
"""读取CSV文件并返回数据列表"""