Files
devops/tools/send_to_wecom.py
2025-07-02 17:31:18 +08:00

128 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import time
import json
import sys
# 企业微信相关信息
CORP_ID = 'ww5d7d350d9b8c0be3'
SECRET = 'YhagYQpaNIK9j1ATopgKNQhw3D13mpGZ64YVr23Je-A'
AGENT_ID = '1000003'
# 获取 access_token
def get_access_token():
url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={CORP_ID}&corpsecret={SECRET}'
response = requests.get(url)
result = response.json()
if result.get('errcode') == 0:
return result.get('access_token')
else:
print(f"获取 access_token 失败: {result.get('errmsg')}")
return None
# 发送消息到企业微信
def send_message(access_token, message, touser=None, toparty=None, totag=None):
url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
data = {
"msgtype": "text",
"agentid": AGENT_ID,
"text": {
"content": message
},
"safe": 0
}
if touser:
data["touser"] = touser
if toparty:
data["toparty"] = toparty
if totag:
data["totag"] = totag
response = requests.post(url, json=data)
result = response.json()
if result.get('errcode') == 0:
print("消息发送成功")
else:
print(f"消息发送失败: {result.get('errmsg')}")
def pretty_print_json(data, n=10, indent=4, sort_keys=False):
"""
以美化格式打印数组的前n个元素其他元素用"..."表示
参数:
- data: 要打印的数据(应为数组)
- n: 要显示的元素数量
- indent: 缩进空格数
- sort_keys: 是否按键排序
"""
try:
# 处理非数组数据
if not isinstance(data, list):
formatted = json.dumps(data, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
return formatted
# 复制原始数据,避免修改原数组
data_copy = data.copy()
# 切片取前n个元素
first_n_elements = data_copy[:n]
# 如果数组长度超过n添加"..."标记
if len(data) > n:
result = first_n_elements + ["... ({} more elements)".format(len(data) - n)]
else:
result = first_n_elements
# 格式化输出
formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys)
return formatted
except TypeError as e:
print(f"错误:无法格式化数据。详情:{e}")
return str(data)
except Exception as e:
print(f"格式化时发生意外错误:{e}")
return str(data)
def is_json(s):
"""判断字符串是否可以解析为JSON"""
try:
json.loads(s)
return True
except json.JSONDecodeError:
return False
# 主函数
def main(report_content=None):
# 模拟数据报表内容
if report_content is None:
report_content = "这是第一行\n这是第二行\n这是第三行"
else:
# 处理转义字符
report_content = report_content.encode().decode('unicode_escape')
# 判断是否为JSON并格式化
if is_json(report_content):
try:
parsed_data = json.loads(report_content)
report_content = pretty_print_json(parsed_data)
except Exception as e:
print(f"JSON解析或格式化失败: {e}")
# 解析失败时保持原始内容
# 获取 access_token
access_token = get_access_token()
if access_token:
# 示例:发送给特定人员
send_message(access_token, report_content, touser='oscar')
# 示例:发送给特定部门
# send_message(access_token, report_content, toparty='department1|department2')
# 示例:发送给特定标签
# send_message(access_token, report_content, totag='tag1|tag2')
if __name__ == "__main__":
if len(sys.argv) > 1:
message = sys.argv[1]
main(message)
else:
main()