Files
devops/docker/paperless/plugins/parse_filename.py
2025-11-03 16:21:46 +08:00

192 lines
6.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import re
import sys
import time
import logging
import requests
import sqlite3
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
# Paperless server settings.
PAPERLESS_URL = "http://localhost:8000/api"
# AUTH = HTTPBasicAuth("admin", "admin")  # Basic Auth credentials used on the Mac
AUTH = HTTPBasicAuth("admin", "paperless")  # Basic Auth credentials used on the NAS

# Logging configuration.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# Lookup mode: False (the standard mode) resolves IDs through the REST API,
# True reads them directly from the SQLite database.
enable_db = False

# SQLite database location.
# DB_PATH = "/usr/src/paperless/db/paperless.sqlite3"  # adjust to your actual database path
DB_PATH = "/usr/src/paperless/data/db.sqlite3"

# Open the database only when the direct-DB mode is enabled. Connecting
# unconditionally would touch (or fail to open) the database file on every run
# even though nothing reads from it in API mode. Both names stay defined so
# existing references keep resolving; they are None while enable_db is False.
conn = sqlite3.connect(DB_PATH) if enable_db else None
cursor = conn.cursor() if conn is not None else None

# Filename parser: a YYYY-MM-DD date followed by five underscore-separated
# fields and a ".pdf" suffix (six capture groups in total). The first four
# fields are matched lazily; the last one takes the rest of the name.
FILENAME_PATTERN = re.compile(r"(\d{4}-\d{2}-\d{2})_(.*?)_(.*?)_(.*?)_(.*?)_(.*)\.pdf")
# Fetch an ID directly from the database.
def query_id_from_db(table, column, value):
    """Return the ``id`` of the first row in *table* where *column* equals *value*.

    The query runs on the module-level ``cursor``. *table* and *column* are
    interpolated into the SQL text (identifiers cannot be bound as parameters),
    so they must be trusted, hard-coded names; *value* is bound as a query
    parameter.

    Args:
        table: Name of the table to search.
        column: Name of the column to compare against.
        value: Value the column must equal.

    Returns:
        The ``id`` of the first matching row, or ``None`` when no row matches
        or the query raises ``sqlite3.Error``.
    """
    try:
        cursor.execute(f"SELECT id FROM {table} WHERE {column} = ?", (value,))
        row = cursor.fetchone()
    except sqlite3.Error as e:
        # This is a read-only lookup, so report it as a query failure rather
        # than as an insert/update problem.
        logging.error(f'Error querying {table} where {column}="{value}": {e}')
        return None

    if row is None:
        return None

    logging.info(f'query {table} where {column}="{value}" get id={row[0]}')
    return row[0]
# API request wrapper (with retries); supports GET, POST, etc.
def api_request(method, url, data=None, retries=5, timeout=5, retry_delay=2):
    """Send an authenticated JSON request and return the decoded response body.

    Transient failures (network errors, undecodable bodies, 5xx responses and
    the 408/429 status codes) are retried. Other 4xx responses are treated as
    permanent: repeating the same request cannot succeed, so the function gives
    up immediately instead of sleeping through every remaining attempt.

    Args:
        method: HTTP method name, e.g. ``"GET"``, ``"POST"`` or ``"PATCH"``.
        url: Full URL to request.
        data: Optional JSON-serialisable request body.
        retries: Maximum number of attempts.
        timeout: Per-request timeout in seconds.
        retry_delay: Seconds to wait between attempts.

    Returns:
        The decoded JSON body of a 200/201 response, or ``None`` on a 404, a
        non-retryable client error, or when every attempt has failed.
    """
    for attempt in range(retries):
        try:
            response = requests.request(method, url, json=data, auth=AUTH, timeout=timeout)
            status = response.status_code
            if status in (200, 201):
                return response.json()
            if status == 404:
                logging.warning(f"API 资源未找到: {method} {url}")
                return None
            logging.error(f"API 请求失败: {method} {url}, 状态码: {status}, 响应: {response.text}")
            # Client errors other than timeout / rate-limit are deterministic.
            if 400 <= status < 500 and status not in (408, 429):
                return None
        except (RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on requests
            # versions whose JSON decode error is not a RequestException.
            logging.error(f"API 请求异常: {method} {url}, 错误: {e}")

        if attempt < retries - 1:
            logging.warning(f"请求失败,等待 {retry_delay} 秒后重试 ({attempt+1}/{retries})...")
            time.sleep(retry_delay)

    logging.error(f"API 请求最终失败: {method} {url}")
    return None
# Look up an ID through the REST API.
def query_id_from_rest_api(endpoint, name):
    """Search a paginated API listing for an entry called *name*.

    Starting at ``PAPERLESS_URL/endpoint``, each page is fetched with
    ``api_request`` and its ``results`` list is scanned; the ``next`` link of
    the response is followed until it is ``None``.

    Args:
        endpoint: Listing path appended to ``PAPERLESS_URL``.
        name: Exact ``name`` value to look for.

    Returns:
        The ``id`` of the first entry whose ``name`` equals *name*, or ``None``
        when no page contains it or a page comes back empty or without a
        ``results`` list.
    """
    method = 'GET'
    page_url = f"{PAPERLESS_URL}/{endpoint}"

    while page_url is not None:  # walk through every page of the listing
        page = api_request(method, page_url, data=None, retries=5)

        usable = bool(page) and "results" in page and isinstance(page["results"], list)
        if not usable:
            logging.warning(f"API 返回无数据: {method} {page_url}")
            break

        # Return as soon as a matching entry turns up.
        for entry in page["results"]:
            if entry["name"] == name:
                return entry["id"]

        # Not on this page; move on to the next one.
        page_url = page.get("next", None)

    return None
# Shared lookup-or-create logic for named Paperless objects.
def _get_or_create(table, endpoint, name, extra_payload=None):
    """Return the ID of the object called *name*, creating it when it is missing.

    The lookup goes to the SQLite table *table* when ``enable_db`` is true and
    to the REST listing *endpoint* otherwise. If nothing is found, a POST to
    the same endpoint creates the object.

    Args:
        table: Database table searched in direct-DB mode.
        endpoint: API path (relative to ``PAPERLESS_URL``) used for both the
            lookup and the creation request.
        name: Name of the object to find or create.
        extra_payload: Optional additional fields for the creation request.

    Returns:
        The existing or newly created ID, or ``None`` if creation failed.
    """
    if enable_db:
        existing_id = query_id_from_db(table, "name", name)
    else:
        existing_id = query_id_from_rest_api(endpoint, name)
    if existing_id:
        return existing_id

    # Nothing found: create the object.
    payload = {"name": name, **(extra_payload or {})}
    created = api_request("POST", f"{PAPERLESS_URL}/{endpoint}", payload)
    return created.get("id") if created else None


# Get or create a document type.
def get_or_create_document_type(name):
    """Return the ID of the document type *name*, creating it if needed.

    Returns ``None`` when the document type can be neither found nor created.
    """
    return _get_or_create("documents_documenttype", "document_types/", name)


# Get or create a correspondent (organisation / sender).
def get_or_create_correspondent(name):
    """Return the ID of the correspondent *name*, creating it if needed.

    Returns ``None`` when the correspondent can be neither found nor created.
    """
    return _get_or_create("documents_correspondent", "correspondents/", name)


# Get or create a custom field.
def get_or_create_custom_field(name):
    """Return the ID of the custom field *name*, creating it if needed.

    New fields are created with ``data_type`` set to ``"string"``. Returns
    ``None`` when the field can be neither found nor created.
    """
    return _get_or_create(
        "documents_customfield", "custom_fields/", name, {"data_type": "string"}
    )
# Update a document's metadata.
def update_document(document_id, title, publish_date, doc_type_id, correspondent_id, custom_fields):
    """PATCH the metadata of one document.

    Args:
        document_id: ID of the document to update.
        title: Value sent as ``title``.
        publish_date: Value sent as ``created_date``.
        doc_type_id: Value sent as ``document_type``.
        correspondent_id: Value sent as ``correspondent``.
        custom_fields: Value sent as ``custom_fields``.

    Returns:
        ``True`` when ``api_request`` returned a response body, ``False`` when
        it returned ``None``.
    """
    target = f"{PAPERLESS_URL}/documents/{document_id}/"
    body = dict(
        title=title,
        created_date=publish_date,
        document_type=doc_type_id,
        correspondent=correspondent_id,
        custom_fields=custom_fields,
    )
    result = api_request("PATCH", target, body)
    return result is not None
# Main routine - parse the filename and update Paperless.
def process_document():
    """Derive document metadata from its original filename and push it to Paperless.

    Reads ``DOCUMENT_ID`` and ``DOCUMENT_ORIGINAL_FILENAME`` from the
    environment, splits the filename with ``FILENAME_PATTERN`` into date,
    report type, organisation, industry, stock name and title, resolves (or
    creates) the matching document type, correspondent and custom fields, and
    finally updates the document. Every failure is logged and ends the function
    early; nothing is returned.
    """
    doc_id = os.getenv("DOCUMENT_ID")
    filename = os.getenv("DOCUMENT_ORIGINAL_FILENAME")
    # Debugging alternative: take both values from the command line.
    # doc_id = sys.argv[1]
    # filename = sys.argv[2]
    if not (doc_id and filename):
        logging.error("❌ 缺少必要的环境变量,无法执行脚本")
        return

    parsed = FILENAME_PATTERN.match(filename)
    if parsed is None:
        logging.warning(f"⚠️ 文件名格式不匹配,跳过处理: {filename}")
        return

    publish_date, report_type, org_sname, industry_name, stock_name, title = parsed.groups()
    logging.info(f"✅ 解析成功: 日期={publish_date}, 类型={report_type}, 机构={org_sname}, 行业={industry_name}, 股票={stock_name}, 标题={title}")

    # Resolve or create the document type and the correspondent.
    doc_type_id = get_or_create_document_type(report_type)
    correspondent_id = get_or_create_correspondent(org_sname)
    if not (doc_type_id and correspondent_id):
        logging.error("❌ 文档类型/机构创建失败")
        return

    # Resolve or create the two custom fields.
    industry_field_id = get_or_create_custom_field("行业")
    stock_field_id = get_or_create_custom_field("股票名称")
    if not (industry_field_id and stock_field_id):
        logging.error("❌ 自定义字段创建失败")
        return

    # Pair each custom field ID with the value parsed from the filename.
    field_values = ((industry_field_id, industry_name), (stock_field_id, stock_name))
    custom_fields = [{"field": field_id, "value": value} for field_id, value in field_values]

    # Push the update to Paperless.
    if not update_document(doc_id, title, publish_date, doc_type_id, correspondent_id, custom_fields):
        logging.error(f"❌ 文档 {doc_id} 更新失败")
        return
    logging.info(f"✅ 文档 {doc_id} 更新成功: {title}")


# Run the update only when executed as a script.
if __name__ == "__main__":
    process_document()