#!/usr/bin/env python3
"""Fill in Paperless document metadata parsed from the original file name.

The script reads ``DOCUMENT_ID`` and ``DOCUMENT_ORIGINAL_FILENAME`` from the
environment (presumably set by Paperless when it runs this file as a
post-consume script -- confirm against your deployment). The file name is
expected to look like::

    <YYYY-MM-DD>_<report type>_<organisation>_<industry>_<stock>_<title>.pdf

The report type becomes the document type, the organisation becomes the
correspondent, and industry / stock name are stored in two custom fields.
Missing document types, correspondents and custom fields are created through
the REST API.
"""
import logging
import os
import re
import sqlite3
import sys
import time

import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

# Paperless server settings.
# NOTE(review): URL and credentials are hard-coded; consider moving them to
# configuration before using this outside a local test setup.
PAPERLESS_URL = "http://localhost:8000/api"
AUTH = HTTPBasicAuth("admin", "admin")  # HTTP Basic authentication

# Logging configuration.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# Location of the Paperless SQLite database; adjust to your installation
# (another layout that has been used: /usr/src/paperless/db/paperless.sqlite3).
DB_PATH = "/usr/src/paperless/data/db.sqlite3"

# When True, existing IDs are looked up directly in SQLite instead of paging
# through the REST API. It is switched off automatically below if the
# database cannot be opened.
enable_db = True

conn = None
cursor = None
if enable_db:
    # sqlite3.connect() would silently create an empty database for a missing
    # file, so check for the file first and fall back to the REST API instead.
    if os.path.isfile(DB_PATH):
        try:
            conn = sqlite3.connect(DB_PATH)
            cursor = conn.cursor()
        except sqlite3.Error as e:
            logging.warning(f"Cannot open SQLite database {DB_PATH}: {e}; using the REST API for lookups")
            conn = None
            cursor = None
            enable_db = False
    else:
        logging.warning(f"SQLite database {DB_PATH} not found; using the REST API for lookups")
        enable_db = False

# Regular expression that splits the file name into its six components.
FILENAME_PATTERN = re.compile(r"(\d{4}-\d{2}-\d{2})_(.*?)_(.*?)_(.*?)_(.*?)_(.*)\.pdf")


def query_id_from_db(table, column, value):
    """Return the ``id`` of the first row of *table* where *column* equals *value*.

    Args:
        table: Table name. It is interpolated into the SQL text, so it must be
            a trusted, hard-coded identifier.
        column: Column name; interpolated like *table*.
        value: Value to match; bound as a query parameter.

    Returns:
        The ``id`` of the matching row, or ``None`` when there is no match,
        no database connection is available, or the query fails.
    """
    if cursor is None:
        return None
    try:
        cursor.execute(f"SELECT id FROM {table} WHERE {column} = ?", (value,))
        row = cursor.fetchone()
    except sqlite3.Error as e:
        logging.error(f"Error querying data: {e}")
        return None
    if row is None:
        return None
    logging.info(f'query {table} where {column}="{value}" get id={row[0]}')
    return row[0]


def api_request(method, url, data=None, retries=5, timeout=5):
    """Send an authenticated JSON request to the API, retrying on failure.

    Args:
        method: HTTP method name such as ``"GET"``, ``"POST"`` or ``"PATCH"``.
        url: Full request URL.
        data: Optional JSON-serialisable request body.
        retries: Maximum number of attempts.
        timeout: Per-attempt timeout in seconds passed to ``requests``.

    Returns:
        The decoded JSON body for a 200/201 response, otherwise ``None``
        (404, a non-retryable client error, or all attempts exhausted).
    """
    for attempt in range(retries):
        try:
            response = requests.request(method, url, json=data, auth=AUTH, timeout=timeout)
            status = response.status_code
            if status in (200, 201):
                return response.json()
            if status == 404:
                logging.warning(f"API 资源未找到: {method} {url}")
                return None
            logging.error(f"API 请求失败: {method} {url}, 状态码: {status}, 响应: {response.text}")
            # Client errors (bad payload, bad credentials, ...) will not
            # succeed on a retry; only 408 and 429 are worth waiting for.
            if 400 <= status < 500 and status not in (408, 429):
                break
        except (RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            logging.error(f"API 请求异常: {method} {url}, 错误: {e}")

        if attempt < retries - 1:
            logging.warning(f"请求失败,等待 2 秒后重试 ({attempt+1}/{retries})...")
            time.sleep(2)

    logging.error(f"API 请求最终失败: {method} {url}")
    return None


def query_id_from_rest_api(endpoint, name):
    """Look up the ID of the object called *name* by paging through *endpoint*.

    Args:
        endpoint: Path below ``PAPERLESS_URL``, e.g. ``"correspondents/"``.
        name: Exact name to search for.

    Returns:
        The ``id`` of the first item whose ``name`` equals *name*, or ``None``
        when no page contains it or a page could not be fetched.
    """
    url = f"{PAPERLESS_URL}/{endpoint}"
    method = "GET"
    # Follow the "next" links until the name is found or the pages run out.
    while url is not None:
        json_data = api_request(method, url, data=None, retries=5)
        results = json_data.get("results") if isinstance(json_data, dict) else None
        if not isinstance(results, list):
            logging.warning(f"API 返回无数据: {method} {url}")
            return None
        for item in results:
            if isinstance(item, dict) and item.get("name") == name:
                return item.get("id")
        # Not on this page; continue with the next one (None ends the loop).
        url = json_data.get("next")
    return None


def _get_or_create(table, endpoint, name, extra_payload=None):
    """Return the ID of the object called *name*, creating it if it is missing.

    The lookup uses SQLite table *table* when ``enable_db`` is set and the REST
    *endpoint* otherwise; creation always goes through a POST to *endpoint*
    with ``{"name": name}`` plus *extra_payload*.
    """
    if enable_db:
        found_id = query_id_from_db(table, "name", name)
    else:
        found_id = query_id_from_rest_api(endpoint, name)
    if found_id is not None:
        return found_id

    payload = {"name": name}
    if extra_payload:
        payload.update(extra_payload)
    created = api_request("POST", f"{PAPERLESS_URL}/{endpoint}", payload)
    return created.get("id") if isinstance(created, dict) else None


def get_or_create_document_type(name):
    """Return the ID of the document type *name*, creating it when missing.

    Returns ``None`` if the document type could neither be found nor created.
    """
    return _get_or_create("documents_documenttype", "document_types/", name)


def get_or_create_correspondent(name):
    """Return the ID of the correspondent (organisation) *name*, creating it when missing.

    Returns ``None`` if the correspondent could neither be found nor created.
    """
    return _get_or_create("documents_correspondent", "correspondents/", name)


def get_or_create_custom_field(name):
    """Return the ID of the custom field *name*, creating it as a string field when missing.

    Returns ``None`` if the custom field could neither be found nor created.
    """
    return _get_or_create("documents_customfield", "custom_fields/", name, {"data_type": "string"})


def update_document(document_id, title, publish_date, doc_type_id, correspondent_id, custom_fields):
    """PATCH the document with the parsed metadata.

    Args:
        document_id: ID of the document to update.
        title: New document title.
        publish_date: Date string sent as ``created_date``.
        doc_type_id: ID of the document type.
        correspondent_id: ID of the correspondent.
        custom_fields: List of ``{"field": <id>, "value": <value>}`` dicts.

    Returns:
        ``True`` if the API request returned a body, ``False`` otherwise.
    """
    payload = {
        "title": title,
        "created_date": publish_date,
        "document_type": doc_type_id,
        "correspondent": correspondent_id,
        "custom_fields": custom_fields,
    }
    return api_request("PATCH", f"{PAPERLESS_URL}/documents/{document_id}/", payload) is not None


def process_document():
    """Parse the original file name and push the metadata to Paperless.

    Reads ``DOCUMENT_ID`` and ``DOCUMENT_ORIGINAL_FILENAME`` from the
    environment. Every failure is logged and ends processing early; nothing is
    raised and nothing is returned.
    """
    doc_id = os.getenv("DOCUMENT_ID")
    filename = os.getenv("DOCUMENT_ORIGINAL_FILENAME")
    # For manual debugging the two values can be taken from sys.argv[1] and
    # sys.argv[2] instead of the environment.

    if not doc_id or not filename:
        logging.error("❌ 缺少必要的环境变量,无法执行脚本")
        return

    match = FILENAME_PATTERN.match(filename)
    if not match:
        logging.warning(f"⚠️ 文件名格式不匹配,跳过处理: {filename}")
        return

    publish_date, report_type, org_sname, industry_name, stock_name, title = match.groups()
    logging.info(f"✅ 解析成功: 日期={publish_date}, 类型={report_type}, 机构={org_sname}, 行业={industry_name}, 股票={stock_name}, 标题={title}")

    # Resolve (or create) the document type and the correspondent.
    doc_type_id = get_or_create_document_type(report_type)
    correspondent_id = get_or_create_correspondent(org_sname)
    if not doc_type_id or not correspondent_id:
        logging.error("❌ 文档类型/机构创建失败")
        return

    # Resolve (or create) the two custom fields.
    industry_field_id = get_or_create_custom_field("行业")
    stock_field_id = get_or_create_custom_field("股票名称")
    if not industry_field_id or not stock_field_id:
        logging.error("❌ 自定义字段创建失败")
        return

    # Assemble the custom field values for the PATCH payload.
    custom_fields = [
        {"field": industry_field_id, "value": industry_name},
        {"field": stock_field_id, "value": stock_name},
    ]

    # Update the Paperless document.
    success = update_document(doc_id, title, publish_date, doc_type_id, correspondent_id, custom_fields)
    if success:
        logging.info(f"✅ 文档 {doc_id} 更新成功: {title}")
    else:
        logging.error(f"❌ 文档 {doc_id} 更新失败")


if __name__ == "__main__":
    try:
        process_document()
    finally:
        # Release the SQLite connection opened at import time.
        if conn is not None:
            conn.close()