feat: 新增睿美云对接模块
All checks were successful
continuous-integration/drone/push Build is passing

- 扩展 ToolConfig 配置类型,新增 external_api 类型
- 实现接口注册表,包含 90+ 睿美云开放接口定义
- 实现 TPOS SHA256WithRSA 签名鉴权
- 实现睿美云 API 客户端,支持多租户配置
- 新增代理路由 /api/ruimeiyun/call/{api_name}
- 支持接口权限控制和健康检查
This commit is contained in:
2026-01-30 17:27:58 +08:00
parent c1ba17f809
commit afcf30b519
23 changed files with 6290 additions and 4648 deletions

View File

@@ -0,0 +1,124 @@
"""
睿美云 TPOS 鉴权
实现睿美云开放接口的身份验证机制:
- tpos-timestamp: 请求时间戳(秒级)
- tpos-account: 账号
- tpos-nonce-str: 随机字符串
- tpos-sign: SHA256WithRSA 签名
签名算法:
1. 组合待签名字符串: {timestamp}&{nonce_str}
2. 使用私钥进行 SHA256WithRSA 签名
3. Base64 编码签名结果
"""
import time
import uuid
import base64
import logging
from typing import Dict
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
logger = logging.getLogger(__name__)
class TposAuthError(Exception):
"""TPOS 鉴权错误"""
pass
def build_tpos_headers(account: str, private_key_pem: str) -> Dict[str, str]:
"""
构建 TPOS 鉴权请求头
Args:
account: TPOS 账号(由睿美云提供)
private_key_pem: RSA 私钥PEM 格式)
Returns:
包含鉴权信息的请求头字典
Raises:
TposAuthError: 签名失败时抛出
"""
try:
# 1. 生成时间戳和随机字符串
timestamp = str(int(time.time()))
nonce_str = uuid.uuid4().hex
# 2. 组合待签名字符串
sign_content = f"{timestamp}&{nonce_str}"
# 3. 加载私钥
private_key = serialization.load_pem_private_key(
private_key_pem.encode('utf-8'),
password=None,
backend=default_backend()
)
# 4. SHA256WithRSA 签名
signature = private_key.sign(
sign_content.encode('utf-8'),
padding.PKCS1v15(),
hashes.SHA256()
)
# 5. Base64 编码
sign_base64 = base64.b64encode(signature).decode('utf-8')
return {
"tpos-timestamp": timestamp,
"tpos-account": account,
"tpos-nonce-str": nonce_str,
"tpos-sign": sign_base64
}
except Exception as e:
logger.error(f"TPOS 签名失败: {e}")
raise TposAuthError(f"签名失败: {str(e)}")
def validate_private_key(private_key_pem: str) -> bool:
"""
验证私钥格式是否正确
Args:
private_key_pem: RSA 私钥PEM 格式)
Returns:
True 如果私钥有效,否则 False
"""
try:
serialization.load_pem_private_key(
private_key_pem.encode('utf-8'),
password=None,
backend=default_backend()
)
return True
except Exception as e:
logger.warning(f"私钥验证失败: {e}")
return False
def mask_private_key(private_key_pem: str, show_chars: int = 50) -> str:
"""
对私钥进行脱敏处理,用于日志显示
Args:
private_key_pem: RSA 私钥PEM 格式)
show_chars: 显示的字符数
Returns:
脱敏后的字符串
"""
if not private_key_pem:
return ""
if len(private_key_pem) <= show_chars * 2:
return "****"
return f"{private_key_pem[:show_chars]}...****...{private_key_pem[-show_chars:]}"