71 lines
2.1 KiB
Python
71 lines
2.1 KiB
Python
import os
import sys
import time

import requests
|
|
|
|
# 企业微信相关信息
|
|
# WeCom application credentials.
# Each value may be overridden through an environment variable; the original
# literals remain as fallbacks so existing invocations behave as before.
# NOTE(review): the fallback SECRET is a credential committed to source --
# consider rotating it and supplying WECOM_SECRET via the environment only.
CORP_ID = os.environ.get('WECOM_CORP_ID', 'ww5d7d350d9b8c0be3')
SECRET = os.environ.get('WECOM_SECRET', 'YhagYQpaNIK9j1ATopgKNQhw3D13mpGZ64YVr23Je-A')
AGENT_ID = os.environ.get('WECOM_AGENT_ID', '1000003')
|
|
|
|
# 获取 access_token
|
|
def get_access_token():
    """Fetch an access token from the WeCom ``gettoken`` endpoint.

    The request is authenticated with the module-level ``CORP_ID`` and
    ``SECRET`` values.

    Returns:
        The ``access_token`` string when the API answers with
        ``errcode == 0``; otherwise ``None``. ``None`` is also returned when
        the request fails or the response body is not valid JSON. A
        diagnostic line is printed in every failure case.
    """
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    # Let requests build the query string so the values are URL-encoded.
    query = {'corpid': CORP_ID, 'corpsecret': SECRET}

    try:
        # A timeout keeps the script from hanging forever on a stalled connection.
        response = requests.get(url, params=query, timeout=10)
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Print only the exception type: requests' error messages may embed
        # the full URL, which here contains the secret.
        print(f"获取 access_token 失败: {type(exc).__name__}")
        return None

    if result.get('errcode') == 0:
        return result.get('access_token')

    print(f"获取 access_token 失败: {result.get('errmsg')}")
    return None
|
|
|
|
# 发送消息到企业微信
|
|
def send_message(access_token, message, touser=None, toparty=None, totag=None):
    """Send a plain-text message through the WeCom ``message/send`` endpoint.

    Args:
        access_token: Token passed as the ``access_token`` query parameter.
        message: Text placed in the payload's ``text.content`` field.
        touser: Value for the payload's ``touser`` field; omitted when falsy.
        toparty: Value for the payload's ``toparty`` field; omitted when falsy.
        totag: Value for the payload's ``totag`` field; omitted when falsy.

    Returns:
        ``True`` when the API answers with ``errcode == 0``, ``False``
        otherwise (no recipient given, request failure, non-JSON response,
        or an API-level error). A status line is printed in every case.
    """
    # Keep only the recipient selectors the caller actually supplied.
    candidates = (("touser", touser), ("toparty", toparty), ("totag", totag))
    recipients = {field: value for field, value in candidates if value}
    if not recipients:
        # The API rejects a message with no recipient; fail fast without a
        # network round trip.
        print("消息发送失败: 未指定接收人 (touser/toparty/totag)")
        return False

    url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'
    payload = {
        "msgtype": "text",
        "agentid": AGENT_ID,
        "text": {"content": message},
        "safe": 0,
        **recipients,
    }

    try:
        # A timeout keeps the script from hanging forever on a stalled connection.
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=payload,
            timeout=10,
        )
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Print only the exception type: requests' error messages may embed
        # the full URL, which here contains the access token.
        print(f"消息发送失败: {type(exc).__name__}")
        return False

    if result.get('errcode') == 0:
        print("消息发送成功")
        return True

    print(f"消息发送失败: {result.get('errmsg')}")
    return False
|
|
|
|
# 主函数
|
|
def _decode_escapes(text):
    """Convert literal backslash escape sequences in *text* to real characters.

    Non-ASCII characters (for example Chinese text) are preserved. If *text*
    contains a malformed escape sequence, it is returned unchanged.
    """
    try:
        # Encoding with 'backslashreplace' turns every character outside
        # Latin-1 into a \uXXXX escape, which 'unicode_escape' then restores.
        # A plain UTF-8 encode would instead be decoded byte-by-byte as
        # Latin-1 and garble such characters.
        return text.encode('latin-1', 'backslashreplace').decode('unicode_escape')
    except UnicodeDecodeError:
        # Malformed escape (e.g. a trailing backslash): send the text as typed.
        return text


def main(report_content=None):
    """Send a report message to WeCom.

    Args:
        report_content: Message text. Escape sequences typed literally
            (a backslash followed by ``n``, ``t``, ``uXXXX``, ...) are
            converted to the characters they denote. When ``None``, a
            built-in three-line sample message is sent instead.
    """
    if report_content is None:
        # Sample report content.
        report_content = "这是第一行\n这是第二行\n这是第三行"
    else:
        report_content = _decode_escapes(report_content)

    access_token = get_access_token()
    if access_token:
        # Example: send to a specific user.
        send_message(access_token, report_content, touser='oscar')
        # Example: send to specific departments.
        # send_message(access_token, report_content, toparty='department1|department2')
        # Example: send to specific tags.
        # send_message(access_token, report_content, totag='tag1|tag2')
|
|
|
|
if __name__ == "__main__":
    # Forward the first command-line argument, if present, as the message;
    # with no argument the slice is empty and main() runs with its default.
    main(*sys.argv[1:2])
|