"""企业微信 API 调用:超时与重试,配置来自环境变量。""" import logging from typing import Any import httpx from app.config import settings logger = logging.getLogger(__name__) TIMEOUT = settings.wecom_api_timeout RETRIES = settings.wecom_api_retries BASE = settings.wecom_api_base.rstrip("/") async def _request(method: str, path: str, **kwargs: Any) -> dict | None: url = f"{BASE}{path}" for attempt in range(RETRIES + 1): try: async with httpx.AsyncClient(timeout=TIMEOUT) as client: r = await client.request(method, url, **kwargs) r.raise_for_status() return r.json() except Exception as e: logger.warning("wecom api attempt %s failed: %s", attempt + 1, e) if attempt == RETRIES: raise return None async def get_access_token() -> str: """获取 corpid + secret 的 access_token。""" r = await _request( "GET", "/cgi-bin/gettoken", params={"corpid": settings.wecom_corp_id, "corpsecret": settings.wecom_secret}, ) if not r or r.get("errcode") != 0: raise RuntimeError(r.get("errmsg", "get token failed")) return r["access_token"] async def send_text_to_external(external_user_id: str, content: str) -> None: """发送文本消息给外部联系人(客户联系-发送消息到客户)。""" token = await get_access_token() body = { "touser": [external_user_id], "sender": settings.wecom_agent_id, "msgtype": "text", "text": {"content": content}, } # 企业微信文档:发送消息到客户 send_message_to_user r = await _request("POST", f"/cgi-bin/externalcontact/send_message_to_user?access_token={token}", json=body) if not r or r.get("errcode") != 0: raise RuntimeError(r.get("errmsg", "send failed"))