feat: 应用扁平化与 Token 验证 API
All checks were successful
continuous-integration/drone/push Build is passing

- 新增 /api/auth/verify 接口供外部应用验证 token
- 简化应用管理:移除 tools 字段,每个应用独立存在
- 简化应用配置:移除 allowed_tools,专注于租户订阅
- 优化 Token 展示和复制功能
This commit is contained in:
111
2026-01-24 10:05:24 +08:00
parent c4bd7c8251
commit 6a93e05ec3
3 changed files with 264 additions and 331 deletions

View File

@@ -1,8 +1,9 @@
"""认证路由"""
import hmac
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
from typing import Optional, List
from sqlalchemy.orm import Session
from ..database import get_db
@@ -15,7 +16,10 @@ from ..services.auth import (
TokenData,
UserInfo
)
from ..services.crypto import decrypt_config
from ..models.user import User
from ..models.tenant_app import TenantApp
from ..models.tenant_wechat_app import TenantWechatApp
router = APIRouter(prefix="/auth", tags=["认证"])
security = HTTPBearer()
@@ -221,3 +225,114 @@ async def delete_user(
db.commit()
return {"success": True}
# ============ Token 验证 API供外部应用调用 ============
class VerifyTokenRequest(BaseModel):
"""Token 验证请求"""
token: str
app_code: Optional[str] = None # 可选,用于验证 token 是否属于特定应用
class WechatConfig(BaseModel):
"""企微配置"""
corp_id: Optional[str] = None
agent_id: Optional[str] = None
secret: Optional[str] = None
class VerifyTokenResponse(BaseModel):
"""Token 验证响应"""
valid: bool
tenant_id: Optional[str] = None
app_code: Optional[str] = None
wechat_config: Optional[WechatConfig] = None
error: Optional[str] = None
@router.post("/verify", response_model=VerifyTokenResponse)
async def verify_token(
request: VerifyTokenRequest,
db: Session = Depends(get_db)
):
"""
验证 Token 有效性(供外部应用调用,无需登录)
外部应用收到用户请求后,可调用此接口验证 token
1. 验证 token 是否存在且有效
2. 如传入 app_code验证 token 是否属于该应用
3. 返回租户信息和企微配置
Args:
token: 访问令牌
app_code: 应用代码(可选,用于验证 token 是否属于特定应用)
Returns:
valid: 是否有效
tenant_id: 租户ID
app_code: 应用代码
wechat_config: 企微配置(如有)
"""
if not request.token:
return VerifyTokenResponse(valid=False, error="Token 不能为空")
# 根据 token 查询租户应用配置
query = db.query(TenantApp).filter(
TenantApp.access_token == request.token,
TenantApp.status == 1
)
# 如果指定了 app_code验证 token 是否属于该应用
if request.app_code:
query = query.filter(TenantApp.app_code == request.app_code)
tenant_app = query.first()
if not tenant_app:
return VerifyTokenResponse(valid=False, error="Token 无效或已过期")
# 获取关联的企微配置
wechat_config = None
if tenant_app.wechat_app_id:
wechat_app = db.query(TenantWechatApp).filter(
TenantWechatApp.id == tenant_app.wechat_app_id,
TenantWechatApp.status == 1
).first()
if wechat_app:
# 解密 secret
secret = None
if wechat_app.secret_encrypted:
try:
secret = decrypt_config(wechat_app.secret_encrypted)
except:
pass
wechat_config = WechatConfig(
corp_id=wechat_app.corp_id,
agent_id=wechat_app.agent_id,
secret=secret
)
return VerifyTokenResponse(
valid=True,
tenant_id=tenant_app.tenant_id,
app_code=tenant_app.app_code,
wechat_config=wechat_config
)
@router.get("/verify")
async def verify_token_get(
token: str,
app_code: Optional[str] = None,
db: Session = Depends(get_db)
):
"""
验证 TokenGET 方式,便于简单测试)
"""
return await verify_token(
VerifyTokenRequest(token=token, app_code=app_code),
db
)