import requests import time import json import sys # 企业微信相关信息 CORP_ID = 'ww5d7d350d9b8c0be3' SECRET = 'YhagYQpaNIK9j1ATopgKNQhw3D13mpGZ64YVr23Je-A' AGENT_ID = '1000003' # 获取 access_token def get_access_token(): url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={CORP_ID}&corpsecret={SECRET}' response = requests.get(url) result = response.json() if result.get('errcode') == 0: return result.get('access_token') else: print(f"获取 access_token 失败: {result.get('errmsg')}") return None # 发送消息到企业微信 def send_message(access_token, message, touser=None, toparty=None, totag=None): url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}' data = { "msgtype": "text", "agentid": AGENT_ID, "text": { "content": message }, "safe": 0 } if touser: data["touser"] = touser if toparty: data["toparty"] = toparty if totag: data["totag"] = totag response = requests.post(url, json=data) result = response.json() if result.get('errcode') == 0: print("消息发送成功") else: print(f"消息发送失败: {result.get('errmsg')}") def pretty_print_json(data, n=10, indent=4, sort_keys=False): """ 以美化格式打印数组的前n个元素,其他元素用"..."表示 参数: - data: 要打印的数据(应为数组) - n: 要显示的元素数量 - indent: 缩进空格数 - sort_keys: 是否按键排序 """ try: # 处理非数组数据 if not isinstance(data, list): formatted = json.dumps(data, indent=indent, ensure_ascii=False, sort_keys=sort_keys) return formatted # 复制原始数据,避免修改原数组 data_copy = data.copy() # 切片取前n个元素 first_n_elements = data_copy[:n] # 如果数组长度超过n,添加"..."标记 if len(data) > n: result = first_n_elements + ["... ({} more elements)".format(len(data) - n)] else: result = first_n_elements # 格式化输出 formatted = json.dumps(result, indent=indent, ensure_ascii=False, sort_keys=sort_keys) return formatted except TypeError as e: print(f"错误:无法格式化数据。详情:{e}") return str(data) except Exception as e: print(f"格式化时发生意外错误:{e}") return str(data) def is_json(s): """判断字符串是否可以解析为JSON""" try: json.loads(s) return True except json.JSONDecodeError: return False # 主函数 def main(report_content=None): # 模拟数据报表内容 if report_content is None: report_content = "这是第一行\n这是第二行\n这是第三行" else: # 处理转义字符 report_content = report_content.encode().decode('unicode_escape') # 判断是否为JSON并格式化 if is_json(report_content): try: parsed_data = json.loads(report_content) report_content = pretty_print_json(parsed_data) except Exception as e: print(f"JSON解析或格式化失败: {e}") # 解析失败时保持原始内容 # 获取 access_token access_token = get_access_token() if access_token: # 示例:发送给特定人员 send_message(access_token, report_content, touser='oscar') # 示例:发送给特定部门 # send_message(access_token, report_content, toparty='department1|department2') # 示例:发送给特定标签 # send_message(access_token, report_content, totag='tag1|tag2') if __name__ == "__main__": if len(sys.argv) > 1: message = sys.argv[1] main(message) else: main()