"""企业微信回调加解密与验签(与企微文档一致)。"""
import base64
import hashlib
import struct
import xml.etree.ElementTree as ET
from typing import Tuple
from Crypto.Cipher import AES
from app.config import settings
def _sha1(s: str) -> str:
    """Return the hexadecimal SHA-1 digest of *s*.

    The string is encoded with ``str.encode()``'s default codec (UTF-8)
    before it is hashed.
    """
    hasher = hashlib.sha1()
    hasher.update(s.encode())
    return hasher.hexdigest()
def _check_signature(signature: str, timestamp: str, nonce: str, echostr_or_encrypt: str) -> bool:
    """Check a callback signature against the locally computed one.

    The expected value is the SHA-1 hex digest of the configured token,
    ``timestamp``, ``nonce`` and ``echostr_or_encrypt`` after the four strings
    have been sorted and concatenated.

    Args:
        signature: Signature supplied with the request.
        timestamp: Timestamp supplied with the request.
        nonce: Nonce supplied with the request.
        echostr_or_encrypt: The ``echostr`` value or the ``Encrypt`` node text.

    Returns:
        True when ``signature`` equals the expected digest, False otherwise.
    """
    # Function-scope import so the file-level import block stays untouched.
    import hmac

    parts = sorted([settings.wecom_token, timestamp, nonce, echostr_or_encrypt])
    expected = _sha1("".join(parts))
    if not isinstance(signature, str):
        return False
    # The signature comes from the request, so compare in constant time to
    # avoid leaking how many leading characters matched. Bytes are compared
    # because compare_digest() rejects non-ASCII str arguments; "replace"
    # keeps unencodable input from raising (it can never equal a hex digest).
    return hmac.compare_digest(
        expected.encode("utf-8"),
        signature.encode("utf-8", "replace"),
    )
def _aes_key() -> bytes:
    """Decode the configured EncodingAESKey into raw AES key bytes.

    A single ``=`` padding character is appended to the configured value
    before Base64-decoding, and at most the first 32 bytes of the decoded
    result are returned.
    """
    encoded = settings.wecom_encoding_aes_key + "="
    decoded = base64.b64decode(encoded)
    return decoded[:32]
def decrypt(encrypt: str) -> str:
    """Decrypt a WeCom callback ciphertext (``echostr`` or ``Encrypt`` node text).

    The decrypted buffer is laid out as 16 random bytes, a 4-byte big-endian
    message length, the message itself, and then the corp id (plus padding).
    The message is sliced out by its declared length, so the trailing padding
    never has to be stripped and padding differences do not matter.

    Args:
        encrypt: Base64-encoded ciphertext.

    Returns:
        The message decoded as UTF-8.

    Raises:
        ValueError: If the decrypted buffer is too short to contain the
            20-byte header, or the declared message length runs past the end
            of the buffer. Errors raised by Base64 decoding, the cipher or
            UTF-8 decoding propagate unchanged.
    """
    header_end = 20  # 16 random bytes + 4-byte length field

    key = _aes_key()
    cipher = AES.new(key, AES.MODE_CBC, key[:16])  # IV is the first 16 key bytes
    plain = cipher.decrypt(base64.b64decode(encrypt))

    if len(plain) < header_end:
        raise ValueError("decrypted payload is too short to contain a header")
    (msg_len,) = struct.unpack(">I", plain[16:header_end])

    msg_end = header_end + msg_len
    # Without this check an oversized length would silently yield a
    # truncated message instead of an error.
    if msg_end > len(plain):
        raise ValueError("declared message length exceeds decrypted payload")
    return plain[header_end:msg_end].decode("utf-8")
def encrypt(plain: str) -> str:
    """Encrypt a reply (XML or plain text) for WeCom.

    The plaintext buffer is 16 random bytes, the 4-byte big-endian length of
    the UTF-8 message, the message, and the corp id. It is PKCS#7-padded,
    encrypted with AES-CBC (IV = first 16 bytes of the key) and returned
    Base64-encoded.

    Args:
        plain: Reply content to encrypt.

    Returns:
        The Base64-encoded ciphertext as an ASCII string.
    """
    import os

    from Crypto.Util.Padding import pad

    key = _aes_key()
    # Falls back to a literal placeholder when no corp id is configured.
    receive_id = settings.wecom_corp_id or "placeholder"
    body = plain.encode("utf-8")
    payload = b"".join(
        (
            os.urandom(16),
            struct.pack(">I", len(body)),
            body,
            receive_id.encode("utf-8"),
        )
    )
    # The WeCom scheme pads with PKCS#7 to a multiple of 32 bytes (the key
    # length), not to the 16-byte AES block size.
    padded = pad(payload, 32)
    cipher = AES.new(key, AES.MODE_CBC, key[:16])
    return base64.b64encode(cipher.encrypt(padded)).decode("ascii")
def verify_signature(msg_signature: str, timestamp: str, nonce: str, encrypt: str) -> bool:
    """Verify ``msg_signature`` for a GET ``echostr`` or a POST ``Encrypt`` value.

    Public wrapper that delegates to ``_check_signature``.
    """
    is_valid = _check_signature(
        signature=msg_signature,
        timestamp=timestamp,
        nonce=nonce,
        echostr_or_encrypt=encrypt,
    )
    return is_valid
def verify_and_decrypt_echostr(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str | None:
    """Handle the GET verification: check the signature, then decrypt ``echostr``.

    Returns the decrypted plaintext, or None when the signature does not
    match. Exceptions raised by ``decrypt`` are not caught here.
    """
    signature_ok = verify_signature(msg_signature, timestamp, nonce, echostr)
    return decrypt(echostr) if signature_ok else None
def parse_encrypted_body(body: bytes) -> Tuple[str | None, str | None]:
"""解析 POST 请求体 XML,取 Encrypt;验签用 msg_signature/timestamp/nonce 从 query 传。返回 (encrypt_raw, None) 或 (None, error)。"""
try:
root = ET.fromstring(body)
encrypt_el = root.find("Encrypt")
if encrypt_el is None or encrypt_el.text is None:
return None, "missing Encrypt"
return encrypt_el.text.strip(), None
except Exception as e:
return None, str(e)
def parse_decrypted_xml(plain_xml: str) -> dict | None:
"""解密后的 XML 解析为 dict(ToUserName, FromUserName, MsgType, Content 等)。"""
try:
root = ET.fromstring(plain_xml)
d = {}
for c in root:
if c.text:
d[c.tag] = c.text
return d
except Exception:
return None
def build_reply_xml(to_user: str, from_user: str, content: str) -> str:
    """Build the plaintext XML for a text reply.

    Args:
        to_user: Value for the ``ToUserName`` node.
        from_user: Value for the ``FromUserName`` node.
        content: Text for the ``Content`` node.

    Returns:
        An ``<xml>`` document with ToUserName, FromUserName, CreateTime (the
        current Unix time in whole seconds), MsgType (``text``) and Content.
    """
    # Function-scope import so the file-level import block stays untouched.
    import time

    def cdata(text: str) -> str:
        # A literal "]]>" would close the CDATA section early, so it is
        # split across two adjacent sections.
        return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"

    return (
        "<xml>"
        f"<ToUserName>{cdata(to_user)}</ToUserName>"
        f"<FromUserName>{cdata(from_user)}</FromUserName>"
        f"<CreateTime>{int(time.time())}</CreateTime>"
        "<MsgType><![CDATA[text]]></MsgType>"
        f"<Content>{cdata(content)}</Content>"
        "</xml>"
    )
def make_reply_signature(encrypt: str, timestamp: str, nonce: str) -> str:
    """Compute the signature that accompanies an encrypted reply.

    The configured token, ``timestamp``, ``nonce`` and ``encrypt`` are sorted,
    concatenated, and hashed with SHA-1; the hex digest is returned.
    """
    ordered = sorted((settings.wecom_token, timestamp, nonce, encrypt))
    return _sha1("".join(ordered))
def build_encrypted_response(encrypt: str, signature: str, timestamp: str, nonce: str) -> str:
    """Build the encrypted XML envelope returned for a POST callback.

    Args:
        encrypt: Base64 ciphertext for the ``Encrypt`` node.
        signature: Signature for the ``MsgSignature`` node.
        timestamp: Value for the ``TimeStamp`` node.
        nonce: Value for the ``Nonce`` node.

    Returns:
        An ``<xml>`` document with Encrypt, MsgSignature, TimeStamp and Nonce.
    """
    # Function-scope import so the file-level import block stays untouched.
    from xml.sax.saxutils import escape

    def cdata(text: str) -> str:
        # A literal "]]>" would close the CDATA section early, so it is
        # split across two adjacent sections.
        return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"

    return (
        "<xml>"
        f"<Encrypt>{cdata(encrypt)}</Encrypt>"
        f"<MsgSignature>{cdata(signature)}</MsgSignature>"
        # TimeStamp is not wrapped in CDATA, so markup characters are escaped.
        f"<TimeStamp>{escape(timestamp)}</TimeStamp>"
        f"<Nonce>{cdata(nonce)}</Nonce>"
        "</xml>"
    )