"""WeCom (WeChat Work) service.

Thin async client around the WeCom HTTP API: access_token and jsapi_ticket
retrieval (with caching), JS-SDK signature generation, OAuth2 link building
and member lookups.
"""
import hashlib
import logging
import secrets
import time
import urllib.parse
from dataclasses import dataclass
from typing import Any, Dict, Optional

import httpx

from ..config import get_settings
from .cache import get_cache
from .crypto import decrypt_config

logger = logging.getLogger(__name__)
settings = get_settings()


@dataclass
class WechatConfig:
    """Credentials of a WeCom application.

    Attributes:
        corp_id: Enterprise (corp) ID.
        agent_id: Application AgentId.
        secret: Application secret.
    """

    corp_id: str
    agent_id: str
    secret: str


class WechatService:
    """WeCom (WeChat Work) API client.

    Provides access_token retrieval, JS-SDK signing, OAuth2 helpers and
    member lookups.

    Example:
        wechat = WechatService(corp_id="wwxxxx", agent_id="1000001", secret="xxx")

        # Fetch an access_token
        token = await wechat.get_access_token()

        # Build a JS-SDK signature
        signature = await wechat.get_jssdk_signature("https://example.com/page")
    """

    # Base URL of the WeCom HTTP API.
    BASE_URL = "https://qyapi.weixin.qq.com"

    # Timeout (seconds) applied to every outgoing API request.
    _REQUEST_TIMEOUT_SECONDS = 10

    # Cached credentials are dropped this many seconds before the expiry
    # reported by the API, so a value that is about to expire is never served.
    _EXPIRY_MARGIN_SECONDS = 200

    def __init__(self, corp_id: str, agent_id: str, secret: str):
        """Initialise the service.

        Args:
            corp_id: Enterprise (corp) ID.
            agent_id: Application AgentId.
            secret: Application secret (plain text).
        """
        self.corp_id = corp_id
        self.agent_id = agent_id
        self.secret = secret
        self._cache = get_cache()

    @classmethod
    def from_wechat_app(cls, wechat_app) -> "WechatService":
        """Build a service instance from a TenantWechatApp model.

        Args:
            wechat_app: Object exposing ``corp_id``, ``agent_id`` and
                ``secret_encrypted`` (the TenantWechatApp database model).

        Returns:
            A WechatService. When the encrypted secret is missing or cannot
            be decrypted, the failure is logged and the instance is created
            with an empty secret.
        """
        secret = ""
        if wechat_app.secret_encrypted:
            try:
                secret = decrypt_config(wechat_app.secret_encrypted)
            except Exception as e:
                # Best effort: keep going with an empty secret; the first API
                # call will then fail and be logged there.
                logger.error(f"Failed to decrypt secret: {e}")

        return cls(
            corp_id=wechat_app.corp_id,
            agent_id=wechat_app.agent_id,
            secret=secret,
        )

    def _cache_key(self, key_type: str) -> str:
        """Return the cache key for ``key_type``, scoped to this corp/agent."""
        return f"wechat:{self.corp_id}:{self.agent_id}:{key_type}"

    async def _get_json(self, path: str, params: Dict[str, Any]) -> Any:
        """GET ``BASE_URL + path`` and return the decoded JSON body.

        Network and decoding errors propagate to the caller, which owns the
        logging and the fallback value.
        """
        async with httpx.AsyncClient(
            timeout=self._REQUEST_TIMEOUT_SECONDS
        ) as client:
            response = await client.get(f"{self.BASE_URL}{path}", params=params)
        return response.json()

    def _cache_credential(
        self, cache_key: str, value: str, expires_in: int, max_ttl: int
    ) -> None:
        """Cache ``value`` until shortly before it expires.

        The TTL is ``expires_in`` minus the safety margin, capped at
        ``max_ttl``. A non-positive TTL is never handed to the cache; the
        value is simply not cached in that case.
        """
        ttl = min(expires_in - self._EXPIRY_MARGIN_SECONDS, max_ttl)
        if ttl > 0:
            self._cache.set(cache_key, value, ttl=ttl)

    async def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
        """Return the application's access_token.

        The token is cached; the API is only called on a cache miss or when
        ``force_refresh`` is set.

        Args:
            force_refresh: Skip the cache and request a new token.

        Returns:
            The access_token, or None when the request fails, the API reports
            an error, or the response carries no token.
        """
        cache_key = self._cache_key("access_token")

        # Serve from cache unless the caller asked for a fresh token.
        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached:
                logger.debug(f"Access token from cache: {cached[:20]}...")
                return cached

        params = {
            "corpid": self.corp_id,
            "corpsecret": self.secret,
        }

        try:
            result = await self._get_json("/cgi-bin/gettoken", params)

            access_token = result.get("access_token")
            # A response without a token is a failure even if errcode is 0;
            # caching None would only hide the problem.
            if result.get("errcode", 0) != 0 or not access_token:
                logger.error(f"Get access_token failed: {result}")
                return None

            self._cache_credential(
                cache_key,
                access_token,
                result.get("expires_in", 7200),
                settings.WECHAT_ACCESS_TOKEN_EXPIRE,
            )

            logger.info(f"Got new access_token for {self.corp_id}")
            return access_token

        except Exception as e:
            logger.error(f"Get access_token error: {e}")
            return None

    async def get_jsapi_ticket(self, force_refresh: bool = False) -> Optional[str]:
        """Return the jsapi_ticket used to sign JS-SDK configurations.

        Args:
            force_refresh: Skip the cache and request a new ticket.

        Returns:
            The jsapi_ticket, or None when no access_token is available, the
            request fails, the API reports an error, or the response carries
            no ticket.
        """
        cache_key = self._cache_key("jsapi_ticket")

        # Serve from cache unless the caller asked for a fresh ticket.
        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached:
                logger.debug(f"JSAPI ticket from cache: {cached[:20]}...")
                return cached

        # The ticket endpoint is authenticated with the access_token.
        access_token = await self.get_access_token()
        if not access_token:
            return None

        params = {"access_token": access_token}

        try:
            result = await self._get_json("/cgi-bin/get_jsapi_ticket", params)

            ticket = result.get("ticket")
            # Same rule as for the access_token: no ticket means failure.
            if result.get("errcode", 0) != 0 or not ticket:
                logger.error(f"Get jsapi_ticket failed: {result}")
                return None

            self._cache_credential(
                cache_key,
                ticket,
                result.get("expires_in", 7200),
                settings.WECHAT_JSAPI_TICKET_EXPIRE,
            )

            logger.info(f"Got new jsapi_ticket for {self.corp_id}")
            return ticket

        except Exception as e:
            logger.error(f"Get jsapi_ticket error: {e}")
            return None

    async def get_jssdk_signature(
        self,
        url: str,
        noncestr: Optional[str] = None,
        timestamp: Optional[int] = None
    ) -> Optional[Dict[str, Any]]:
        """Build the JS-SDK signature for a page.

        Args:
            url: URL of the current page. Anything from ``#`` onwards is
                ignored, because the fragment is not part of the signed URL.
            noncestr: Random string; generated when omitted.
            timestamp: Unix timestamp in seconds; defaults to the current time.

        Returns:
            A dict with ``appId``, ``agentId``, ``timestamp``, ``nonceStr``,
            ``signature`` and ``url`` (the URL that was actually signed), or
            None when no jsapi_ticket could be obtained.
        """
        ticket = await self.get_jsapi_ticket()
        if not ticket:
            return None

        if noncestr is None:
            noncestr = secrets.token_hex(8)

        if timestamp is None:
            timestamp = int(time.time())

        # The signed URL must not contain the fragment.
        url = url.split("#", 1)[0]

        # The fields must appear in exactly this order, joined with "&",
        # with the raw (not URL-encoded) values.
        sign_str = (
            f"jsapi_ticket={ticket}"
            f"&noncestr={noncestr}"
            f"&timestamp={timestamp}"
            f"&url={url}"
        )

        signature = hashlib.sha1(sign_str.encode()).hexdigest()

        return {
            "appId": self.corp_id,
            "agentId": self.agent_id,
            "timestamp": timestamp,
            "nonceStr": noncestr,
            "signature": signature,
            "url": url,
        }

    def get_oauth2_url(
        self,
        redirect_uri: str,
        scope: str = "snsapi_base",
        state: str = ""
    ) -> str:
        """Build the OAuth2 authorisation URL.

        Args:
            redirect_uri: URL the user is redirected to after authorising.
            scope: Authorisation scope.
                - snsapi_base: silent authorisation, basic member info only.
                - snsapi_privateinfo: explicit authorisation, detailed
                  member info.
            state: Opaque value echoed back on the redirect.

        Returns:
            The OAuth2 authorisation URL. Every query value is
            percent-encoded, so a ``state`` or ``redirect_uri`` containing
            ``&``, ``#`` or spaces cannot corrupt the link.
        """
        # quote (not quote_plus) with safe="" encodes "/" and ":" in the
        # redirect URI and renders spaces as %20.
        query = urllib.parse.urlencode(
            [
                ("appid", self.corp_id),
                ("redirect_uri", redirect_uri),
                ("response_type", "code"),
                ("scope", scope),
                ("state", state),
                ("agentid", self.agent_id),
            ],
            quote_via=urllib.parse.quote,
            safe="",
        )
        return (
            "https://open.weixin.qq.com/connect/oauth2/authorize"
            f"?{query}"
            "#wechat_redirect"
        )

    async def get_user_info_by_code(self, code: str) -> Optional[Dict[str, Any]]:
        """Exchange an OAuth2 code for the visiting user's identity.

        Args:
            code: The ``code`` returned on the OAuth2 redirect.

        Returns:
            A dict with ``user_id``, ``device_id``, ``open_id`` and
            ``external_userid`` (each None when absent from the response),
            or None when no access_token is available, the request fails or
            the API reports an error.
        """
        access_token = await self.get_access_token()
        if not access_token:
            return None

        params = {
            "access_token": access_token,
            "code": code,
        }

        try:
            result = await self._get_json("/cgi-bin/auth/getuserinfo", params)

            if result.get("errcode", 0) != 0:
                logger.error(f"Get user info by code failed: {result}")
                return None

            # Both lower-case and CamelCase spellings of the keys are accepted.
            return {
                "user_id": result.get("userid") or result.get("UserId"),
                "device_id": result.get("deviceid") or result.get("DeviceId"),
                "open_id": result.get("openid") or result.get("OpenId"),
                "external_userid": result.get("external_userid"),
            }

        except Exception as e:
            logger.error(f"Get user info by code error: {e}")
            return None

    async def get_user_detail(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a member's profile.

        Args:
            user_id: The member's UserID.

        Returns:
            A dict with ``userid``, ``name``, ``department``, ``position``,
            ``mobile``, ``email``, ``avatar`` and ``status`` (each None when
            absent from the response), or None when no access_token is
            available, the request fails or the API reports an error.
        """
        access_token = await self.get_access_token()
        if not access_token:
            return None

        params = {
            "access_token": access_token,
            "userid": user_id,
        }

        try:
            result = await self._get_json("/cgi-bin/user/get", params)

            if result.get("errcode", 0) != 0:
                logger.error(f"Get user detail failed: {result}")
                return None

            return {
                "userid": result.get("userid"),
                "name": result.get("name"),
                "department": result.get("department"),
                "position": result.get("position"),
                "mobile": result.get("mobile"),
                "email": result.get("email"),
                "avatar": result.get("avatar"),
                "status": result.get("status"),
            }

        except Exception as e:
            logger.error(f"Get user detail error: {e}")
            return None


async def get_wechat_service_by_id(
    wechat_app_id: int,
    db_session
) -> Optional[WechatService]:
    """Return a service instance for the WeCom application with the given ID.

    Args:
        wechat_app_id: ID of the row in the platform_tenant_wechat_apps table.
        db_session: Database session used for the lookup.

    Returns:
        A WechatService, or None when no row with that ID and ``status == 1``
        exists.
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import — confirm.
    from ..models.tenant_wechat_app import TenantWechatApp

    wechat_app = db_session.query(TenantWechatApp).filter(
        TenantWechatApp.id == wechat_app_id,
        TenantWechatApp.status == 1
    ).first()

    if not wechat_app:
        return None

    return WechatService.from_wechat_app(wechat_app)