All checks were successful
continuous-integration/drone/push Build is passing
- 扩展 ToolConfig 配置类型,新增 external_api 类型
- 实现接口注册表,包含 90+ 睿美云开放接口定义
- 实现 TPOS SHA256WithRSA 签名鉴权
- 实现睿美云 API 客户端,支持多租户配置
- 新增代理路由 /api/ruimeiyun/call/{api_name}
- 支持接口权限控制和健康检查
125 lines
3.3 KiB
Python
125 lines
3.3 KiB
Python
"""
|
||
睿美云 TPOS 鉴权
|
||
|
||
实现睿美云开放接口的身份验证机制:
|
||
- tpos-timestamp: 请求时间戳(秒级)
|
||
- tpos-account: 账号
|
||
- tpos-nonce-str: 随机字符串
|
||
- tpos-sign: SHA256WithRSA 签名
|
||
|
||
签名算法:
|
||
1. 组合待签名字符串: {timestamp}&{nonce_str}
|
||
2. 使用私钥进行 SHA256WithRSA 签名
|
||
3. Base64 编码签名结果
|
||
"""
|
||
|
||
import time
|
||
import uuid
|
||
import base64
|
||
import logging
|
||
from typing import Dict
|
||
|
||
from cryptography.hazmat.primitives import hashes, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import padding
|
||
from cryptography.hazmat.backends import default_backend
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TposAuthError(Exception):
|
||
"""TPOS 鉴权错误"""
|
||
pass
|
||
|
||
|
||
def build_tpos_headers(account: str, private_key_pem: str) -> Dict[str, str]:
|
||
"""
|
||
构建 TPOS 鉴权请求头
|
||
|
||
Args:
|
||
account: TPOS 账号(由睿美云提供)
|
||
private_key_pem: RSA 私钥(PEM 格式)
|
||
|
||
Returns:
|
||
包含鉴权信息的请求头字典
|
||
|
||
Raises:
|
||
TposAuthError: 签名失败时抛出
|
||
"""
|
||
try:
|
||
# 1. 生成时间戳和随机字符串
|
||
timestamp = str(int(time.time()))
|
||
nonce_str = uuid.uuid4().hex
|
||
|
||
# 2. 组合待签名字符串
|
||
sign_content = f"{timestamp}&{nonce_str}"
|
||
|
||
# 3. 加载私钥
|
||
private_key = serialization.load_pem_private_key(
|
||
private_key_pem.encode('utf-8'),
|
||
password=None,
|
||
backend=default_backend()
|
||
)
|
||
|
||
# 4. SHA256WithRSA 签名
|
||
signature = private_key.sign(
|
||
sign_content.encode('utf-8'),
|
||
padding.PKCS1v15(),
|
||
hashes.SHA256()
|
||
)
|
||
|
||
# 5. Base64 编码
|
||
sign_base64 = base64.b64encode(signature).decode('utf-8')
|
||
|
||
return {
|
||
"tpos-timestamp": timestamp,
|
||
"tpos-account": account,
|
||
"tpos-nonce-str": nonce_str,
|
||
"tpos-sign": sign_base64
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"TPOS 签名失败: {e}")
|
||
raise TposAuthError(f"签名失败: {str(e)}")
|
||
|
||
|
||
def validate_private_key(private_key_pem: str) -> bool:
|
||
"""
|
||
验证私钥格式是否正确
|
||
|
||
Args:
|
||
private_key_pem: RSA 私钥(PEM 格式)
|
||
|
||
Returns:
|
||
True 如果私钥有效,否则 False
|
||
"""
|
||
try:
|
||
serialization.load_pem_private_key(
|
||
private_key_pem.encode('utf-8'),
|
||
password=None,
|
||
backend=default_backend()
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
logger.warning(f"私钥验证失败: {e}")
|
||
return False
|
||
|
||
|
||
def mask_private_key(private_key_pem: str, show_chars: int = 50) -> str:
|
||
"""
|
||
对私钥进行脱敏处理,用于日志显示
|
||
|
||
Args:
|
||
private_key_pem: RSA 私钥(PEM 格式)
|
||
show_chars: 显示的字符数
|
||
|
||
Returns:
|
||
脱敏后的字符串
|
||
"""
|
||
if not private_key_pem:
|
||
return ""
|
||
|
||
if len(private_key_pem) <= show_chars * 2:
|
||
return "****"
|
||
|
||
return f"{private_key_pem[:show_chars]}...****...{private_key_pem[-show_chars:]}"
|