Files
devops/tools/send_to_wecom.py
2025-03-17 11:53:43 +08:00

71 lines
2.1 KiB
Python

import os
import sys
import time

import requests
# WeCom (Enterprise WeChat) application credentials.
# Each value can be overridden through an environment variable so the secret
# does not have to live in the source tree; the literals remain as fallbacks
# so existing invocations keep working unchanged.
# NOTE(review): the fallback secret is committed in plain text — rotate it and
# drop the literal once the environment variables are deployed.
CORP_ID = os.environ.get('WECOM_CORP_ID', 'ww5d7d350d9b8c0be3')
SECRET = os.environ.get('WECOM_SECRET', 'YhagYQpaNIK9j1ATopgKNQhw3D13mpGZ64YVr23Je-A')
AGENT_ID = os.environ.get('WECOM_AGENT_ID', '1000003')
# Fetch an access_token
def get_access_token():
    """Request an access_token from the WeCom ``gettoken`` endpoint.

    Returns:
        The ``access_token`` string when the API answers with ``errcode`` 0,
        otherwise ``None``. Every failure (network error, non-JSON body,
        API-level error) is reported on stdout rather than raised.
    """
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    # Let requests URL-encode the credentials instead of splicing them into
    # the URL by hand.
    params = {'corpid': CORP_ID, 'corpsecret': SECRET}
    try:
        # Without a timeout a stalled connection would hang the script forever.
        response = requests.get(url, params=params, timeout=10)
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Print only the exception class: requests' messages can embed the
        # full URL, which contains the corp secret.
        print(f"获取 access_token 失败: {type(exc).__name__}")
        return None
    if result.get('errcode') == 0:
        return result.get('access_token')
    print(f"获取 access_token 失败: {result.get('errmsg')}")
    return None
# Send a message to WeCom
def send_message(access_token, message, touser=None, toparty=None, totag=None):
    """Post a plain-text message through the WeCom ``message/send`` endpoint.

    Args:
        access_token: Token obtained from the ``gettoken`` endpoint.
        message: Text placed in the ``text.content`` field of the payload.
        touser: Optional recipient user value, forwarded as ``touser``.
        toparty: Optional recipient department value, forwarded as ``toparty``.
        totag: Optional recipient tag value, forwarded as ``totag``.

    Returns:
        None. Success or failure is reported on stdout; network and
        non-JSON failures are reported the same way instead of raising.
    """
    url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'
    data = {
        "msgtype": "text",
        "agentid": AGENT_ID,
        "text": {
            "content": message,
        },
        "safe": 0,
    }
    # Only include the recipient fields the caller actually supplied.
    recipients = {"touser": touser, "toparty": toparty, "totag": totag}
    data.update({field: value for field, value in recipients.items() if value})
    try:
        # Without a timeout a stalled connection would hang the script forever.
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=data,
            timeout=10,
        )
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Print only the exception class: requests' messages can embed the
        # full URL, which contains the access token.
        print(f"消息发送失败: {type(exc).__name__}")
        return
    if result.get('errcode') == 0:
        print("消息发送成功")
    else:
        print(f"消息发送失败: {result.get('errmsg')}")
# Main routine
def _expand_escapes(text):
    """Expand literal backslash escapes in *text*, keeping non-ASCII intact."""
    try:
        # unicode_escape decodes its input bytes as Latin-1, so a UTF-8 round
        # trip turns Chinese text into mojibake. Encoding to Latin-1 with
        # backslashreplace converts every character outside Latin-1 into a
        # \uXXXX escape, which the decoder then restores faithfully.
        return text.encode('latin-1', 'backslashreplace').decode('unicode_escape')
    except UnicodeDecodeError:
        # Malformed escape (for example a trailing backslash): send as typed.
        return text


def main(report_content=None):
    """Send a report to WeCom as a text message.

    Args:
        report_content: Message text. When ``None`` a built-in three-line
            sample is sent. Otherwise escape sequences typed literally
            (a backslash followed by ``n``, as received from a shell
            argument) are expanded before sending.
    """
    if report_content is None:
        # Sample report body used when no content is supplied.
        report_content = "这是第一行\n这是第二行\n这是第三行"
    else:
        report_content = _expand_escapes(report_content)
    access_token = get_access_token()
    if access_token:
        # Example: deliver to a specific user.
        send_message(access_token, report_content, touser='oscar')
        # Example: deliver to specific departments.
        # send_message(access_token, report_content, toparty='department1|department2')
        # Example: deliver to specific tags.
        # send_message(access_token, report_content, totag='tag1|tag2')
if __name__ == "__main__":
    # Forward the first command-line argument, when present, as the message
    # text; with no argument main() falls back to its built-in sample.
    cli_args = sys.argv[1:]
    if cli_args:
        main(cli_args[0])
    else:
        main()