feat: 租户应用配置支持自定义参数
All checks were successful
continuous-integration/drone/push Build is passing

- 后端: TenantApp 模型添加 custom_configs 字段 (LONGTEXT)
- 后端: tenant_apps API 支持自定义配置的增删改查
- 前端: 应用订阅编辑对话框增加自定义配置编辑区域
- 支持 key-value-备注 三字段结构
- value 使用 textarea 支持超长文本(如提示词)
This commit is contained in:
2026-01-27 17:15:19 +08:00
parent 7134947c0c
commit e37466a7cc
6 changed files with 1273 additions and 1160 deletions

View File

@@ -23,6 +23,10 @@ class TenantApp(Base):
# 功能权限
allowed_tools = Column(Text) # JSON 数组
# 自定义配置JSON 数组)
# [{"key": "industry", "value": "medical_beauty", "remark": "医美行业"}, ...]
custom_configs = Column(Text)
status = Column(SmallInteger, default=1)
created_at = Column(TIMESTAMP, default=datetime.now)
updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now)

View File

@@ -1,21 +1,21 @@
"""租户工具配置模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, SmallInteger, TIMESTAMP
from ..database import Base
class ToolConfig(Base):
"""租户工具配置表"""
__tablename__ = "platform_tool_configs"
id = Column(Integer, primary_key=True, autoincrement=True)
tenant_id = Column(String(50), nullable=False, comment="租户ID")
tool_code = Column(String(50), nullable=True, comment="工具代码NULL 表示租户级共享配置)")
config_type = Column(String(30), nullable=False, comment="配置类型datasource / jssdk / webhook / params")
config_key = Column(String(100), nullable=False, comment="配置键名")
config_value = Column(Text, comment="配置值(明文或加密)")
is_encrypted = Column(SmallInteger, default=0, comment="是否加密存储")
description = Column(String(255), comment="配置说明")
status = Column(SmallInteger, default=1)
created_at = Column(TIMESTAMP, default=datetime.now)
updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now)
"""租户工具配置模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, SmallInteger, TIMESTAMP
from ..database import Base
class ToolConfig(Base):
"""租户工具配置表"""
__tablename__ = "platform_tool_configs"
id = Column(Integer, primary_key=True, autoincrement=True)
tenant_id = Column(String(50), nullable=False, comment="租户ID")
tool_code = Column(String(50), nullable=True, comment="工具代码NULL 表示租户级共享配置)")
config_type = Column(String(30), nullable=False, comment="配置类型datasource / jssdk / webhook / params")
config_key = Column(String(100), nullable=False, comment="配置键名")
config_value = Column(Text, comment="配置值(明文或加密)")
is_encrypted = Column(SmallInteger, default=0, comment="是否加密存储")
description = Column(String(255), comment="配置说明")
status = Column(SmallInteger, default=1)
created_at = Column(TIMESTAMP, default=datetime.now)
updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now)

View File

@@ -1,235 +1,254 @@
"""租户应用配置路由"""
import json
import secrets
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.tenant_app import TenantApp
from ..models.app import App
from .auth import get_current_user, require_operator
from ..models.user import User
router = APIRouter(prefix="/tenant-apps", tags=["应用配置"])
# Schemas
class TenantAppCreate(BaseModel):
tenant_id: str
app_code: str = "tools"
app_name: Optional[str] = None
wechat_app_id: Optional[int] = None # 关联的企微应用ID
access_token: Optional[str] = None # 如果不传则自动生成
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
class TenantAppUpdate(BaseModel):
app_name: Optional[str] = None
wechat_app_id: Optional[int] = None # 关联的企微应用ID
access_token: Optional[str] = None
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
status: Optional[int] = None
# API Endpoints
@router.get("")
async def list_tenant_apps(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=1000),
tenant_id: Optional[str] = None,
app_code: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用配置列表"""
query = db.query(TenantApp)
if tenant_id:
query = query.filter(TenantApp.tenant_id == tenant_id)
if app_code:
query = query.filter(TenantApp.app_code == app_code)
total = query.count()
apps = query.order_by(TenantApp.id.desc()).offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_tenant_app(app, mask_secret=True, db=db) for app in apps]
}
@router.get("/{app_id}")
async def get_tenant_app(
app_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用配置详情"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
return format_tenant_app(app, mask_secret=True, db=db)
@router.post("")
async def create_tenant_app(
data: TenantAppCreate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""创建应用配置"""
# 验证 app_code 是否存在于应用管理中
app_exists = db.query(App).filter(App.app_code == data.app_code, App.status == 1).first()
if not app_exists:
raise HTTPException(status_code=400, detail=f"应用 '{data.app_code}' 不存在,请先在应用管理中创建")
# 检查是否重复
exists = db.query(TenantApp).filter(
TenantApp.tenant_id == data.tenant_id,
TenantApp.app_code == data.app_code
).first()
if exists:
raise HTTPException(status_code=400, detail="该租户应用配置已存在")
# 自动生成 access_token
access_token = data.access_token or secrets.token_hex(32)
app = TenantApp(
tenant_id=data.tenant_id,
app_code=data.app_code,
app_name=data.app_name,
wechat_app_id=data.wechat_app_id,
access_token=access_token,
allowed_origins=json.dumps(data.allowed_origins) if data.allowed_origins else None,
allowed_tools=json.dumps(data.allowed_tools) if data.allowed_tools else None,
status=1
)
db.add(app)
db.commit()
db.refresh(app)
return {"success": True, "id": app.id, "access_token": access_token}
@router.put("/{app_id}")
async def update_tenant_app(
app_id: int,
data: TenantAppUpdate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""更新应用配置"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
update_data = data.model_dump(exclude_unset=True)
# 处理 JSON 字段
if 'allowed_origins' in update_data:
update_data['allowed_origins'] = json.dumps(update_data['allowed_origins']) if update_data['allowed_origins'] else None
if 'allowed_tools' in update_data:
update_data['allowed_tools'] = json.dumps(update_data['allowed_tools']) if update_data['allowed_tools'] else None
for key, value in update_data.items():
setattr(app, key, value)
db.commit()
return {"success": True}
@router.delete("/{app_id}")
async def delete_tenant_app(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""删除应用配置"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
db.delete(app)
db.commit()
return {"success": True}
@router.get("/{app_id}/token")
async def get_token(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""获取真实的 access_token仅管理员可用"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
# 获取应用的 base_url
app_info = db.query(App).filter(App.app_code == app.app_code).first()
base_url = app_info.base_url if app_info else ""
return {
"access_token": app.access_token,
"base_url": base_url
}
@router.post("/{app_id}/regenerate-token")
async def regenerate_token(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""重新生成 access_token"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
new_token = secrets.token_hex(32)
app.access_token = new_token
db.commit()
return {"success": True, "access_token": new_token}
def format_tenant_app(app: TenantApp, mask_secret: bool = True, db: Session = None) -> dict:
"""格式化应用配置"""
# 获取关联的企微应用信息
wechat_app_info = None
if app.wechat_app_id and db:
from ..models.tenant_wechat_app import TenantWechatApp
wechat_app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app.wechat_app_id).first()
if wechat_app:
wechat_app_info = {
"id": wechat_app.id,
"name": wechat_app.name,
"corp_id": wechat_app.corp_id,
"agent_id": wechat_app.agent_id
}
result = {
"id": app.id,
"tenant_id": app.tenant_id,
"app_code": app.app_code,
"app_name": app.app_name,
"wechat_app_id": app.wechat_app_id,
"wechat_app": wechat_app_info,
"access_token": "******" if mask_secret and app.access_token else app.access_token,
"allowed_origins": json.loads(app.allowed_origins) if app.allowed_origins else [],
"allowed_tools": json.loads(app.allowed_tools) if app.allowed_tools else [],
"status": app.status,
"created_at": app.created_at,
"updated_at": app.updated_at
}
return result
"""租户应用配置路由"""
import json
import secrets
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.tenant_app import TenantApp
from ..models.app import App
from .auth import get_current_user, require_operator
from ..models.user import User
router = APIRouter(prefix="/tenant-apps", tags=["应用配置"])
# Schemas
class CustomConfigItem(BaseModel):
"""自定义配置项"""
key: str # 配置键
value: str # 配置值
remark: Optional[str] = None # 备注说明
class TenantAppCreate(BaseModel):
tenant_id: str
app_code: str = "tools"
app_name: Optional[str] = None
wechat_app_id: Optional[int] = None # 关联的企微应用ID
access_token: Optional[str] = None # 如果不传则自动生成
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
custom_configs: Optional[List[CustomConfigItem]] = None # 自定义配置
class TenantAppUpdate(BaseModel):
app_name: Optional[str] = None
wechat_app_id: Optional[int] = None # 关联的企微应用ID
access_token: Optional[str] = None
allowed_origins: Optional[List[str]] = None
allowed_tools: Optional[List[str]] = None
custom_configs: Optional[List[CustomConfigItem]] = None # 自定义配置
status: Optional[int] = None
# API Endpoints
@router.get("")
async def list_tenant_apps(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=1000),
tenant_id: Optional[str] = None,
app_code: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用配置列表"""
query = db.query(TenantApp)
if tenant_id:
query = query.filter(TenantApp.tenant_id == tenant_id)
if app_code:
query = query.filter(TenantApp.app_code == app_code)
total = query.count()
apps = query.order_by(TenantApp.id.desc()).offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_tenant_app(app, mask_secret=True, db=db) for app in apps]
}
@router.get("/{app_id}")
async def get_tenant_app(
app_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取应用配置详情"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
return format_tenant_app(app, mask_secret=True, db=db)
@router.post("")
async def create_tenant_app(
data: TenantAppCreate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""创建应用配置"""
# 验证 app_code 是否存在于应用管理中
app_exists = db.query(App).filter(App.app_code == data.app_code, App.status == 1).first()
if not app_exists:
raise HTTPException(status_code=400, detail=f"应用 '{data.app_code}' 不存在,请先在应用管理中创建")
# 检查是否重复
exists = db.query(TenantApp).filter(
TenantApp.tenant_id == data.tenant_id,
TenantApp.app_code == data.app_code
).first()
if exists:
raise HTTPException(status_code=400, detail="该租户应用配置已存在")
# 自动生成 access_token
access_token = data.access_token or secrets.token_hex(32)
app = TenantApp(
tenant_id=data.tenant_id,
app_code=data.app_code,
app_name=data.app_name,
wechat_app_id=data.wechat_app_id,
access_token=access_token,
allowed_origins=json.dumps(data.allowed_origins) if data.allowed_origins else None,
allowed_tools=json.dumps(data.allowed_tools) if data.allowed_tools else None,
custom_configs=json.dumps([c.model_dump() for c in data.custom_configs], ensure_ascii=False) if data.custom_configs else None,
status=1
)
db.add(app)
db.commit()
db.refresh(app)
return {"success": True, "id": app.id, "access_token": access_token}
@router.put("/{app_id}")
async def update_tenant_app(
app_id: int,
data: TenantAppUpdate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""更新应用配置"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
update_data = data.model_dump(exclude_unset=True)
# 处理 JSON 字段
if 'allowed_origins' in update_data:
update_data['allowed_origins'] = json.dumps(update_data['allowed_origins']) if update_data['allowed_origins'] else None
if 'allowed_tools' in update_data:
update_data['allowed_tools'] = json.dumps(update_data['allowed_tools']) if update_data['allowed_tools'] else None
if 'custom_configs' in update_data:
if update_data['custom_configs']:
update_data['custom_configs'] = json.dumps(
[c.model_dump() if hasattr(c, 'model_dump') else c for c in update_data['custom_configs']],
ensure_ascii=False
)
else:
update_data['custom_configs'] = None
for key, value in update_data.items():
setattr(app, key, value)
db.commit()
return {"success": True}
@router.delete("/{app_id}")
async def delete_tenant_app(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""删除应用配置"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
db.delete(app)
db.commit()
return {"success": True}
@router.get("/{app_id}/token")
async def get_token(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""获取真实的 access_token仅管理员可用"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
# 获取应用的 base_url
app_info = db.query(App).filter(App.app_code == app.app_code).first()
base_url = app_info.base_url if app_info else ""
return {
"access_token": app.access_token,
"base_url": base_url
}
@router.post("/{app_id}/regenerate-token")
async def regenerate_token(
app_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""重新生成 access_token"""
app = db.query(TenantApp).filter(TenantApp.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="应用配置不存在")
new_token = secrets.token_hex(32)
app.access_token = new_token
db.commit()
return {"success": True, "access_token": new_token}
def format_tenant_app(app: TenantApp, mask_secret: bool = True, db: Session = None) -> dict:
"""格式化应用配置"""
# 获取关联的企微应用信息
wechat_app_info = None
if app.wechat_app_id and db:
from ..models.tenant_wechat_app import TenantWechatApp
wechat_app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app.wechat_app_id).first()
if wechat_app:
wechat_app_info = {
"id": wechat_app.id,
"name": wechat_app.name,
"corp_id": wechat_app.corp_id,
"agent_id": wechat_app.agent_id
}
result = {
"id": app.id,
"tenant_id": app.tenant_id,
"app_code": app.app_code,
"app_name": app.app_name,
"wechat_app_id": app.wechat_app_id,
"wechat_app": wechat_app_info,
"access_token": "******" if mask_secret and app.access_token else app.access_token,
"allowed_origins": json.loads(app.allowed_origins) if app.allowed_origins else [],
"allowed_tools": json.loads(app.allowed_tools) if app.allowed_tools else [],
"custom_configs": json.loads(app.custom_configs) if app.custom_configs else [],
"status": app.status,
"created_at": app.created_at,
"updated_at": app.updated_at
}
return result

View File

@@ -1,391 +1,391 @@
"""租户工具配置路由"""
import json
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import or_
from ..database import get_db
from ..models.tool_config import ToolConfig
from .auth import get_current_user
from ..models.user import User
router = APIRouter(prefix="/tool-configs", tags=["工具配置"])
# ========================================
# Schemas
# ========================================
class ToolConfigCreate(BaseModel):
"""创建配置"""
tenant_id: str
tool_code: Optional[str] = None # NULL 表示租户级共享配置
config_type: str # datasource / jssdk / webhook / params
config_key: str
config_value: Optional[str] = None
is_encrypted: int = 0
description: Optional[str] = None
class ToolConfigUpdate(BaseModel):
"""更新配置"""
config_value: Optional[str] = None
is_encrypted: Optional[int] = None
description: Optional[str] = None
status: Optional[int] = None
class ToolConfigBatchCreate(BaseModel):
"""批量创建配置"""
tenant_id: str
tool_code: Optional[str] = None
configs: List[Dict[str, Any]] # [{config_type, config_key, config_value, description}]
# ========================================
# 工具函数
# ========================================
def format_config(config: ToolConfig, mask_secret: bool = True) -> dict:
"""格式化配置输出"""
value = config.config_value
# 如果需要掩码且是加密字段
if mask_secret and config.is_encrypted and value:
value = value[:4] + "****" + value[-4:] if len(value) > 8 else "****"
return {
"id": config.id,
"tenant_id": config.tenant_id,
"tool_code": config.tool_code,
"config_type": config.config_type,
"config_key": config.config_key,
"config_value": value,
"is_encrypted": config.is_encrypted,
"description": config.description,
"status": config.status,
"created_at": config.created_at.isoformat() if config.created_at else None,
"updated_at": config.updated_at.isoformat() if config.updated_at else None
}
# ========================================
# API Endpoints
# ========================================
@router.get("")
async def list_tool_configs(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
tenant_id: Optional[str] = None,
tool_code: Optional[str] = None,
config_type: Optional[str] = None,
keyword: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取配置列表"""
query = db.query(ToolConfig).filter(ToolConfig.status == 1)
if tenant_id:
query = query.filter(ToolConfig.tenant_id == tenant_id)
if tool_code:
if tool_code == "__shared__":
# 特殊标记:只查租户级共享配置
query = query.filter(ToolConfig.tool_code.is_(None))
else:
query = query.filter(ToolConfig.tool_code == tool_code)
if config_type:
query = query.filter(ToolConfig.config_type == config_type)
if keyword:
query = query.filter(
or_(
ToolConfig.config_key.like(f"%{keyword}%"),
ToolConfig.description.like(f"%{keyword}%")
)
)
total = query.count()
configs = query.order_by(ToolConfig.tool_code, ToolConfig.config_type, ToolConfig.config_key)\
.offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_config(c) for c in configs]
}
@router.get("/tenant/{tenant_id}")
async def get_tenant_configs(
tenant_id: str,
tool_code: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取租户完整配置(合并层级)
返回结构:
{
"__shared__": {"config_key": "value", ...}, # 租户级共享配置
"customer-profile": {"config_key": "value", ...}, # 工具级配置
...
}
"""
query = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.status == 1
)
if tool_code:
# 只查指定工具 + 共享配置
query = query.filter(
or_(
ToolConfig.tool_code == tool_code,
ToolConfig.tool_code.is_(None)
)
)
configs = query.all()
result = {"__shared__": {}}
for config in configs:
tool = config.tool_code or "__shared__"
if tool not in result:
result[tool] = {}
result[tool][config.config_key] = {
"value": config.config_value,
"type": config.config_type,
"encrypted": config.is_encrypted == 1,
"description": config.description
}
return result
@router.get("/merged/{tenant_id}/{tool_code}")
async def get_merged_config(
tenant_id: str,
tool_code: str,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取合并后的工具配置(工具级 > 租户级 > 默认值)
返回扁平化的配置字典:
{"config_key": "value", ...}
"""
# 查询租户级配置
shared_configs = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.tool_code.is_(None),
ToolConfig.status == 1
).all()
# 查询工具级配置
tool_configs = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.tool_code == tool_code,
ToolConfig.status == 1
).all()
# 合并:工具级覆盖租户级
result = {}
for config in shared_configs:
result[config.config_key] = config.config_value
for config in tool_configs:
result[config.config_key] = config.config_value
return result
@router.get("/{config_id}")
async def get_tool_config(
config_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取配置详情"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
return format_config(config, mask_secret=False)
@router.post("")
async def create_tool_config(
data: ToolConfigCreate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建配置"""
# 检查是否已存在
existing = db.query(ToolConfig).filter(
ToolConfig.tenant_id == data.tenant_id,
ToolConfig.tool_code == data.tool_code if data.tool_code else ToolConfig.tool_code.is_(None),
ToolConfig.config_key == data.config_key
).first()
if existing:
raise HTTPException(status_code=400, detail="配置已存在")
config = ToolConfig(
tenant_id=data.tenant_id,
tool_code=data.tool_code,
config_type=data.config_type,
config_key=data.config_key,
config_value=data.config_value,
is_encrypted=data.is_encrypted,
description=data.description
)
db.add(config)
db.commit()
db.refresh(config)
return format_config(config)
@router.post("/batch")
async def batch_create_configs(
data: ToolConfigBatchCreate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""批量创建配置"""
created = []
skipped = []
for item in data.configs:
config_key = item.get("config_key")
if not config_key:
continue
# 检查是否已存在
existing = db.query(ToolConfig).filter(
ToolConfig.tenant_id == data.tenant_id,
ToolConfig.tool_code == data.tool_code if data.tool_code else ToolConfig.tool_code.is_(None),
ToolConfig.config_key == config_key
).first()
if existing:
skipped.append(config_key)
continue
config = ToolConfig(
tenant_id=data.tenant_id,
tool_code=data.tool_code,
config_type=item.get("config_type", "params"),
config_key=config_key,
config_value=item.get("config_value"),
is_encrypted=item.get("is_encrypted", 0),
description=item.get("description")
)
db.add(config)
created.append(config_key)
db.commit()
return {
"created": created,
"skipped": skipped,
"message": f"成功创建 {len(created)} 条配置,跳过 {len(skipped)} 条已存在配置"
}
@router.put("/{config_id}")
async def update_tool_config(
config_id: int,
data: ToolConfigUpdate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新配置"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
if data.config_value is not None:
config.config_value = data.config_value
if data.is_encrypted is not None:
config.is_encrypted = data.is_encrypted
if data.description is not None:
config.description = data.description
if data.status is not None:
config.status = data.status
db.commit()
db.refresh(config)
return format_config(config)
@router.delete("/{config_id}")
async def delete_tool_config(
config_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除配置(软删除)"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
config.status = 0
db.commit()
return {"message": "删除成功"}
# ========================================
# 配置类型和键名定义(供前端使用)
# ========================================
@router.get("/schema/types")
async def get_config_types():
"""获取支持的配置类型"""
return {
"types": [
{"code": "datasource", "name": "数据源配置", "description": "数据库连接等"},
{"code": "jssdk", "name": "JS-SDK 配置", "description": "企微侧边栏等"},
{"code": "webhook", "name": "Webhook 配置", "description": "n8n 工作流地址等"},
{"code": "params", "name": "工具参数", "description": "各工具的自定义参数"}
]
}
@router.get("/schema/keys")
async def get_config_keys():
"""获取预定义的配置键(供前端下拉选择)"""
return {
"datasource": [
{"key": "scrm_db_host", "name": "SCRM 数据库地址", "encrypted": False},
{"key": "scrm_db_port", "name": "SCRM 数据库端口", "encrypted": False},
{"key": "scrm_db_user", "name": "SCRM 数据库用户", "encrypted": False},
{"key": "scrm_db_password", "name": "SCRM 数据库密码", "encrypted": True},
{"key": "scrm_db_name", "name": "SCRM 数据库名", "encrypted": False}
],
"jssdk": [
{"key": "corp_id", "name": "企业ID", "encrypted": False},
{"key": "agent_id", "name": "应用ID", "encrypted": False},
{"key": "secret", "name": "应用密钥", "encrypted": True}
],
"webhook": [
{"key": "n8n_base_url", "name": "n8n 基础地址", "encrypted": False},
{"key": "webhook_brainstorm", "name": "头脑风暴 Webhook", "encrypted": False},
{"key": "webhook_high_eq", "name": "高情商回复 Webhook", "encrypted": False},
{"key": "webhook_customer_profile", "name": "客户画像 Webhook", "encrypted": False},
{"key": "webhook_consultation", "name": "面诊方案 Webhook", "encrypted": False},
{"key": "webhook_medical_compliance", "name": "医疗合规 Webhook", "encrypted": False}
],
"params": [
{"key": "default_data_tenant_id", "name": "默认数据租户ID", "encrypted": False},
{"key": "enable_deep_thinking", "name": "启用深度思考", "encrypted": False},
{"key": "max_history_rounds", "name": "最大历史轮数", "encrypted": False}
]
}
"""租户工具配置路由"""
import json
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import or_
from ..database import get_db
from ..models.tool_config import ToolConfig
from .auth import get_current_user
from ..models.user import User
router = APIRouter(prefix="/tool-configs", tags=["工具配置"])
# ========================================
# Schemas
# ========================================
class ToolConfigCreate(BaseModel):
"""创建配置"""
tenant_id: str
tool_code: Optional[str] = None # NULL 表示租户级共享配置
config_type: str # datasource / jssdk / webhook / params
config_key: str
config_value: Optional[str] = None
is_encrypted: int = 0
description: Optional[str] = None
class ToolConfigUpdate(BaseModel):
"""更新配置"""
config_value: Optional[str] = None
is_encrypted: Optional[int] = None
description: Optional[str] = None
status: Optional[int] = None
class ToolConfigBatchCreate(BaseModel):
"""批量创建配置"""
tenant_id: str
tool_code: Optional[str] = None
configs: List[Dict[str, Any]] # [{config_type, config_key, config_value, description}]
# ========================================
# 工具函数
# ========================================
def format_config(config: ToolConfig, mask_secret: bool = True) -> dict:
"""格式化配置输出"""
value = config.config_value
# 如果需要掩码且是加密字段
if mask_secret and config.is_encrypted and value:
value = value[:4] + "****" + value[-4:] if len(value) > 8 else "****"
return {
"id": config.id,
"tenant_id": config.tenant_id,
"tool_code": config.tool_code,
"config_type": config.config_type,
"config_key": config.config_key,
"config_value": value,
"is_encrypted": config.is_encrypted,
"description": config.description,
"status": config.status,
"created_at": config.created_at.isoformat() if config.created_at else None,
"updated_at": config.updated_at.isoformat() if config.updated_at else None
}
# ========================================
# API Endpoints
# ========================================
@router.get("")
async def list_tool_configs(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
tenant_id: Optional[str] = None,
tool_code: Optional[str] = None,
config_type: Optional[str] = None,
keyword: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取配置列表"""
query = db.query(ToolConfig).filter(ToolConfig.status == 1)
if tenant_id:
query = query.filter(ToolConfig.tenant_id == tenant_id)
if tool_code:
if tool_code == "__shared__":
# 特殊标记:只查租户级共享配置
query = query.filter(ToolConfig.tool_code.is_(None))
else:
query = query.filter(ToolConfig.tool_code == tool_code)
if config_type:
query = query.filter(ToolConfig.config_type == config_type)
if keyword:
query = query.filter(
or_(
ToolConfig.config_key.like(f"%{keyword}%"),
ToolConfig.description.like(f"%{keyword}%")
)
)
total = query.count()
configs = query.order_by(ToolConfig.tool_code, ToolConfig.config_type, ToolConfig.config_key)\
.offset((page - 1) * size).limit(size).all()
return {
"total": total,
"page": page,
"size": size,
"items": [format_config(c) for c in configs]
}
@router.get("/tenant/{tenant_id}")
async def get_tenant_configs(
tenant_id: str,
tool_code: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取租户完整配置(合并层级)
返回结构:
{
"__shared__": {"config_key": "value", ...}, # 租户级共享配置
"customer-profile": {"config_key": "value", ...}, # 工具级配置
...
}
"""
query = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.status == 1
)
if tool_code:
# 只查指定工具 + 共享配置
query = query.filter(
or_(
ToolConfig.tool_code == tool_code,
ToolConfig.tool_code.is_(None)
)
)
configs = query.all()
result = {"__shared__": {}}
for config in configs:
tool = config.tool_code or "__shared__"
if tool not in result:
result[tool] = {}
result[tool][config.config_key] = {
"value": config.config_value,
"type": config.config_type,
"encrypted": config.is_encrypted == 1,
"description": config.description
}
return result
@router.get("/merged/{tenant_id}/{tool_code}")
async def get_merged_config(
tenant_id: str,
tool_code: str,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取合并后的工具配置(工具级 > 租户级 > 默认值)
返回扁平化的配置字典:
{"config_key": "value", ...}
"""
# 查询租户级配置
shared_configs = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.tool_code.is_(None),
ToolConfig.status == 1
).all()
# 查询工具级配置
tool_configs = db.query(ToolConfig).filter(
ToolConfig.tenant_id == tenant_id,
ToolConfig.tool_code == tool_code,
ToolConfig.status == 1
).all()
# 合并:工具级覆盖租户级
result = {}
for config in shared_configs:
result[config.config_key] = config.config_value
for config in tool_configs:
result[config.config_key] = config.config_value
return result
@router.get("/{config_id}")
async def get_tool_config(
config_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取配置详情"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
return format_config(config, mask_secret=False)
@router.post("")
async def create_tool_config(
data: ToolConfigCreate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""创建配置"""
# 检查是否已存在
existing = db.query(ToolConfig).filter(
ToolConfig.tenant_id == data.tenant_id,
ToolConfig.tool_code == data.tool_code if data.tool_code else ToolConfig.tool_code.is_(None),
ToolConfig.config_key == data.config_key
).first()
if existing:
raise HTTPException(status_code=400, detail="配置已存在")
config = ToolConfig(
tenant_id=data.tenant_id,
tool_code=data.tool_code,
config_type=data.config_type,
config_key=data.config_key,
config_value=data.config_value,
is_encrypted=data.is_encrypted,
description=data.description
)
db.add(config)
db.commit()
db.refresh(config)
return format_config(config)
@router.post("/batch")
async def batch_create_configs(
data: ToolConfigBatchCreate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""批量创建配置"""
created = []
skipped = []
for item in data.configs:
config_key = item.get("config_key")
if not config_key:
continue
# 检查是否已存在
existing = db.query(ToolConfig).filter(
ToolConfig.tenant_id == data.tenant_id,
ToolConfig.tool_code == data.tool_code if data.tool_code else ToolConfig.tool_code.is_(None),
ToolConfig.config_key == config_key
).first()
if existing:
skipped.append(config_key)
continue
config = ToolConfig(
tenant_id=data.tenant_id,
tool_code=data.tool_code,
config_type=item.get("config_type", "params"),
config_key=config_key,
config_value=item.get("config_value"),
is_encrypted=item.get("is_encrypted", 0),
description=item.get("description")
)
db.add(config)
created.append(config_key)
db.commit()
return {
"created": created,
"skipped": skipped,
"message": f"成功创建 {len(created)} 条配置,跳过 {len(skipped)} 条已存在配置"
}
@router.put("/{config_id}")
async def update_tool_config(
config_id: int,
data: ToolConfigUpdate,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""更新配置"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
if data.config_value is not None:
config.config_value = data.config_value
if data.is_encrypted is not None:
config.is_encrypted = data.is_encrypted
if data.description is not None:
config.description = data.description
if data.status is not None:
config.status = data.status
db.commit()
db.refresh(config)
return format_config(config)
@router.delete("/{config_id}")
async def delete_tool_config(
config_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""删除配置(软删除)"""
config = db.query(ToolConfig).filter(ToolConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="配置不存在")
config.status = 0
db.commit()
return {"message": "删除成功"}
# ========================================
# 配置类型和键名定义(供前端使用)
# ========================================
@router.get("/schema/types")
async def get_config_types():
"""获取支持的配置类型"""
return {
"types": [
{"code": "datasource", "name": "数据源配置", "description": "数据库连接等"},
{"code": "jssdk", "name": "JS-SDK 配置", "description": "企微侧边栏等"},
{"code": "webhook", "name": "Webhook 配置", "description": "n8n 工作流地址等"},
{"code": "params", "name": "工具参数", "description": "各工具的自定义参数"}
]
}
@router.get("/schema/keys")
async def get_config_keys():
"""获取预定义的配置键(供前端下拉选择)"""
return {
"datasource": [
{"key": "scrm_db_host", "name": "SCRM 数据库地址", "encrypted": False},
{"key": "scrm_db_port", "name": "SCRM 数据库端口", "encrypted": False},
{"key": "scrm_db_user", "name": "SCRM 数据库用户", "encrypted": False},
{"key": "scrm_db_password", "name": "SCRM 数据库密码", "encrypted": True},
{"key": "scrm_db_name", "name": "SCRM 数据库名", "encrypted": False}
],
"jssdk": [
{"key": "corp_id", "name": "企业ID", "encrypted": False},
{"key": "agent_id", "name": "应用ID", "encrypted": False},
{"key": "secret", "name": "应用密钥", "encrypted": True}
],
"webhook": [
{"key": "n8n_base_url", "name": "n8n 基础地址", "encrypted": False},
{"key": "webhook_brainstorm", "name": "头脑风暴 Webhook", "encrypted": False},
{"key": "webhook_high_eq", "name": "高情商回复 Webhook", "encrypted": False},
{"key": "webhook_customer_profile", "name": "客户画像 Webhook", "encrypted": False},
{"key": "webhook_consultation", "name": "面诊方案 Webhook", "encrypted": False},
{"key": "webhook_medical_compliance", "name": "医疗合规 Webhook", "encrypted": False}
],
"params": [
{"key": "default_data_tenant_id", "name": "默认数据租户ID", "encrypted": False},
{"key": "enable_deep_thinking", "name": "启用深度思考", "encrypted": False},
{"key": "max_history_rounds", "name": "最大历史轮数", "encrypted": False}
]
}