feat: 实现通知渠道管理功能
All checks were successful
continuous-integration/drone/push Build is passing

- 新增 platform_notification_channels 表管理通知渠道(钉钉/企微机器人)
- 新增通知渠道管理页面,支持创建、编辑、测试、删除
- 定时任务增加通知渠道选择和企微应用选择
- 脚本执行支持返回值(result变量),自动发送到配置的渠道
- 调度器执行脚本后根据配置自动发送通知

使用方式:
1. 在「通知渠道」页面为租户配置钉钉/企微机器人
2. 创建定时任务时选择通知渠道
3. 脚本中设置 result = {'content': '内容', 'title': '标题'}
4. 任务执行后自动发送到配置的渠道
This commit is contained in:
2026-01-28 17:02:20 +08:00
parent d9fa9708ce
commit 2fbba63884
12 changed files with 800 additions and 25 deletions

View File

@@ -15,6 +15,7 @@ from .routers.alerts import router as alerts_router
from .routers.cost import router as cost_router
from .routers.quota import router as quota_router
from .routers.tasks import router as tasks_router
from .routers.notification_channels import router as notification_channels_router
from .middleware import TraceMiddleware, setup_exception_handlers, RequestLoggerMiddleware
from .middleware.trace import setup_logging
from .services.scheduler import scheduler_service
@@ -69,6 +70,7 @@ app.include_router(alerts_router, prefix="/api")
app.include_router(cost_router, prefix="/api")
app.include_router(quota_router, prefix="/api")
app.include_router(tasks_router)
app.include_router(notification_channels_router)
# 应用生命周期事件

View File

@@ -9,6 +9,7 @@ from .logs import PlatformLog
from .alert import AlertRule, AlertRecord, NotificationChannel
from .pricing import ModelPricing, TenantBilling
from .scheduled_task import ScheduledTask, TaskLog, ScriptVar, Secret
from .notification_channel import NotificationChannel
__all__ = [
"Tenant",
@@ -29,5 +30,6 @@ __all__ = [
"ScheduledTask",
"TaskLog",
"ScriptVar",
"Secret"
"Secret",
"NotificationChannel"
]

View File

@@ -0,0 +1,20 @@
"""通知渠道模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Enum, Boolean, DateTime
from ..database import Base
class NotificationChannel(Base):
"""通知渠道表"""
__tablename__ = "platform_notification_channels"
id = Column(Integer, primary_key=True, autoincrement=True)
tenant_id = Column(String(50), nullable=False)
channel_name = Column(String(100), nullable=False)
channel_type = Column(Enum('dingtalk_bot', 'wecom_bot'), nullable=False)
webhook_url = Column(String(500), nullable=False)
description = Column(String(255))
is_enabled = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

View File

@@ -40,6 +40,10 @@ class ScheduledTask(Base):
alert_on_failure = Column(Boolean, default=False)
alert_webhook = Column(String(500))
# 通知配置
notify_channels = Column(JSON) # 通知渠道ID列表
notify_wecom_app_id = Column(Integer) # 企微应用ID
# 状态
is_enabled = Column(Boolean, default=True)
last_run_at = Column(DateTime)

View File

@@ -0,0 +1,181 @@
"""通知渠道API路由"""
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import desc
from ..database import get_db
from ..models.notification_channel import NotificationChannel
router = APIRouter(prefix="/api/notification-channels", tags=["notification-channels"])
# ==================== Schemas ====================
class ChannelCreate(BaseModel):
tenant_id: str
channel_name: str
channel_type: str # dingtalk_bot, wecom_bot
webhook_url: str
description: Optional[str] = None
class ChannelUpdate(BaseModel):
channel_name: Optional[str] = None
channel_type: Optional[str] = None
webhook_url: Optional[str] = None
description: Optional[str] = None
is_enabled: Optional[bool] = None
# ==================== CRUD ====================
@router.get("")
async def list_channels(
tenant_id: Optional[str] = None,
channel_type: Optional[str] = None,
is_enabled: Optional[bool] = None,
page: int = Query(1, ge=1),
size: int = Query(50, ge=1, le=100),
db: Session = Depends(get_db)
):
"""获取通知渠道列表"""
query = db.query(NotificationChannel)
if tenant_id:
query = query.filter(NotificationChannel.tenant_id == tenant_id)
if channel_type:
query = query.filter(NotificationChannel.channel_type == channel_type)
if is_enabled is not None:
query = query.filter(NotificationChannel.is_enabled == is_enabled)
total = query.count()
items = query.order_by(desc(NotificationChannel.created_at)).offset((page - 1) * size).limit(size).all()
return {
"total": total,
"items": [format_channel(c) for c in items]
}
@router.get("/{channel_id}")
async def get_channel(channel_id: int, db: Session = Depends(get_db)):
"""获取渠道详情"""
channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
if not channel:
raise HTTPException(status_code=404, detail="渠道不存在")
return format_channel(channel)
@router.post("")
async def create_channel(data: ChannelCreate, db: Session = Depends(get_db)):
"""创建通知渠道"""
channel = NotificationChannel(
tenant_id=data.tenant_id,
channel_name=data.channel_name,
channel_type=data.channel_type,
webhook_url=data.webhook_url,
description=data.description,
is_enabled=True
)
db.add(channel)
db.commit()
db.refresh(channel)
return {"success": True, "id": channel.id}
@router.put("/{channel_id}")
async def update_channel(channel_id: int, data: ChannelUpdate, db: Session = Depends(get_db)):
"""更新通知渠道"""
channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
if not channel:
raise HTTPException(status_code=404, detail="渠道不存在")
if data.channel_name is not None:
channel.channel_name = data.channel_name
if data.channel_type is not None:
channel.channel_type = data.channel_type
if data.webhook_url is not None:
channel.webhook_url = data.webhook_url
if data.description is not None:
channel.description = data.description
if data.is_enabled is not None:
channel.is_enabled = data.is_enabled
db.commit()
return {"success": True}
@router.delete("/{channel_id}")
async def delete_channel(channel_id: int, db: Session = Depends(get_db)):
"""删除通知渠道"""
channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
if not channel:
raise HTTPException(status_code=404, detail="渠道不存在")
db.delete(channel)
db.commit()
return {"success": True}
@router.post("/{channel_id}/test")
async def test_channel(channel_id: int, db: Session = Depends(get_db)):
"""测试通知渠道"""
import httpx
channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
if not channel:
raise HTTPException(status_code=404, detail="渠道不存在")
test_content = f"**测试消息**\n\n渠道名称: {channel.channel_name}\n发送时间: 测试中..."
try:
if channel.channel_type == 'dingtalk_bot':
payload = {
"msgtype": "markdown",
"markdown": {
"title": "渠道测试",
"text": test_content
}
}
else: # wecom_bot
payload = {
"msgtype": "markdown",
"markdown": {
"content": test_content
}
}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(channel.webhook_url, json=payload)
result = response.json()
# 钉钉返回 errcode=0企微返回 errcode=0
if result.get('errcode') == 0:
return {"success": True, "message": "发送成功"}
else:
return {"success": False, "message": f"发送失败: {result}"}
except Exception as e:
return {"success": False, "message": f"发送失败: {str(e)}"}
# ==================== Helpers ====================
def format_channel(channel: NotificationChannel) -> dict:
"""格式化渠道数据"""
return {
"id": channel.id,
"tenant_id": channel.tenant_id,
"channel_name": channel.channel_name,
"channel_type": channel.channel_type,
"webhook_url": channel.webhook_url,
"description": channel.description,
"is_enabled": channel.is_enabled,
"created_at": channel.created_at,
"updated_at": channel.updated_at
}

View File

@@ -33,6 +33,8 @@ class TaskCreate(BaseModel):
retry_interval: Optional[int] = 60
alert_on_failure: Optional[bool] = False
alert_webhook: Optional[str] = None
notify_channels: Optional[List[int]] = None # 通知渠道ID列表
notify_wecom_app_id: Optional[int] = None # 企微应用ID
class TaskUpdate(BaseModel):
@@ -51,6 +53,8 @@ class TaskUpdate(BaseModel):
alert_on_failure: Optional[bool] = None
alert_webhook: Optional[str] = None
is_enabled: Optional[bool] = None
notify_channels: Optional[List[int]] = None
notify_wecom_app_id: Optional[int] = None
class SecretCreate(BaseModel):
@@ -126,6 +130,8 @@ async def create_task(data: TaskCreate, db: Session = Depends(get_db)):
retry_interval=data.retry_interval,
alert_on_failure=data.alert_on_failure,
alert_webhook=data.alert_webhook,
notify_channels=data.notify_channels,
notify_wecom_app_id=data.notify_wecom_app_id,
is_enabled=True
)
@@ -175,6 +181,10 @@ async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_
task.alert_on_failure = data.alert_on_failure
if data.alert_webhook is not None:
task.alert_webhook = data.alert_webhook
if data.notify_channels is not None:
task.notify_channels = data.notify_channels
if data.notify_wecom_app_id is not None:
task.notify_wecom_app_id = data.notify_wecom_app_id
if data.is_enabled is not None:
task.is_enabled = data.is_enabled
@@ -490,6 +500,14 @@ def format_task(task: ScheduledTask, include_content: bool = False) -> dict:
except:
time_points = []
# 处理 notify_channels
notify_channels = task.notify_channels
if isinstance(notify_channels, str):
try:
notify_channels = json.loads(notify_channels)
except:
notify_channels = []
data = {
"id": task.id,
"tenant_id": task.tenant_id,
@@ -505,6 +523,8 @@ def format_task(task: ScheduledTask, include_content: bool = False) -> dict:
"retry_interval": task.retry_interval,
"alert_on_failure": bool(task.alert_on_failure),
"alert_webhook": task.alert_webhook,
"notify_channels": notify_channels or [],
"notify_wecom_app_id": task.notify_wecom_app_id,
"created_at": task.created_at,
"updated_at": task.updated_at
}

View File

@@ -10,6 +10,7 @@ from sqlalchemy.orm import Session
from ..database import SessionLocal
from ..models.scheduled_task import ScheduledTask, TaskLog
from ..models.notification_channel import NotificationChannel
from .script_executor import ScriptExecutor
@@ -167,6 +168,7 @@ class SchedulerService:
success = False
output = ''
error = ''
result = None
try:
# 解析输入参数
@@ -177,7 +179,11 @@ class SchedulerService:
if task.execution_type == 'webhook':
success, output, error = await self._execute_webhook(task)
else:
success, output, error = await self._execute_script(db, task, trace_id, params)
success, output, error, result = await self._execute_script(db, task, trace_id, params)
# 如果脚本执行成功且有返回内容,发送通知
if success and result and result.get('content'):
await self._send_notifications(db, task, result)
except Exception as e:
error = str(e)
@@ -218,10 +224,10 @@ class SchedulerService:
async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict):
"""执行脚本任务"""
if not task.script_content:
return False, '', '脚本内容为空'
return False, '', '脚本内容为空', None
executor = ScriptExecutor(db)
success, output, error = executor.execute(
success, output, error, result = executor.execute(
script_content=task.script_content,
task_id=task.id,
tenant_id=task.tenant_id,
@@ -230,8 +236,117 @@ class SchedulerService:
timeout=300 # 默认超时
)
return success, output, error
return success, output, error, result
async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict):
"""发送通知到配置的渠道"""
content = result.get('content', '')
title = result.get('title', task.task_name)
if not content:
return
# 获取通知渠道配置
channel_ids = task.notify_channels
if isinstance(channel_ids, str):
try:
channel_ids = json.loads(channel_ids)
except:
channel_ids = []
if not channel_ids:
channel_ids = []
# 发送到通知渠道
for channel_id in channel_ids:
try:
channel = db.query(NotificationChannel).filter(
NotificationChannel.id == channel_id,
NotificationChannel.is_enabled == True
).first()
if not channel:
continue
await self._send_to_channel(channel, content, title)
except Exception as e:
print(f"发送通知到渠道 {channel_id} 失败: {e}")
# 发送到企微应用
if task.notify_wecom_app_id:
try:
await self._send_to_wecom_app(db, task.notify_wecom_app_id, content, title, task.tenant_id)
except Exception as e:
print(f"发送企微应用消息失败: {e}")
async def _send_to_channel(self, channel: NotificationChannel, content: str, title: str):
"""发送消息到通知渠道"""
if channel.channel_type == 'dingtalk_bot':
payload = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": content
}
}
else: # wecom_bot
payload = {
"msgtype": "markdown",
"markdown": {
"content": f"**{title}**\n\n{content}"
}
}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(channel.webhook_url, json=payload)
result = response.json()
if result.get('errcode') != 0:
print(f"通知发送失败: {result}")
async def _send_to_wecom_app(self, db: Session, app_id: int, content: str, title: str, tenant_id: str):
"""发送消息到企微应用"""
from ..models.tenant_wechat_app import TenantWechatApp
app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first()
if not app:
return
# 获取 access_token
access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret)
if not access_token:
return
# 发送消息
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": "@all",
"msgtype": "markdown",
"agentid": app.agent_id,
"markdown": {
"content": f"**{title}**\n\n{content}"
}
}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(url, json=payload)
result = response.json()
if result.get('errcode') != 0:
print(f"企微应用消息发送失败: {result}")
async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]:
"""获取企微 access_token"""
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={app_secret}"
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url)
result = response.json()
if result.get('errcode') == 0:
return result.get('access_token')
else:
print(f"获取企微 access_token 失败: {result}")
return None
async def _send_alert(self, task: ScheduledTask, error: str):
"""发送失败告警"""
if not task.alert_webhook:

View File

@@ -49,7 +49,7 @@ class ScriptExecutor:
trace_id: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
timeout: int = 300
) -> Tuple[bool, str, str]:
) -> Tuple[bool, str, str, Optional[Dict]]:
"""执行脚本
Args:
@@ -61,7 +61,8 @@ class ScriptExecutor:
timeout: 超时秒数
Returns:
(success, output, error)
(success, output, error, result)
result: 脚本返回值 {'content': '...', 'title': '...'}
"""
# 创建SDK实例
sdk = ScriptSDK(
@@ -75,7 +76,7 @@ class ScriptExecutor:
# 检查脚本安全性
check_result = self._check_script_safety(script_content)
if check_result:
return False, '', f"脚本安全检查失败: {check_result}"
return False, '', f"脚本安全检查失败: {check_result}", None
# 准备执行环境
safe_globals = self._create_safe_globals(sdk)
@@ -101,11 +102,22 @@ class ScriptExecutor:
# 合并输出
output = '\n'.join(filter(None, [sdk_output, stdout_output]))
return True, output, ''
# 获取脚本返回值(通过 __result__ 变量)
result = safe_globals.get('__result__')
if result is None and 'result' in safe_globals:
result = safe_globals.get('result')
# 如果返回的是字符串,包装成字典
if isinstance(result, str):
result = {'content': result}
elif result is not None and not isinstance(result, dict):
result = {'content': str(result)}
return True, output, '', result
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
return False, sdk.get_output(), error_msg
return False, sdk.get_output(), error_msg, None
finally:
sys.stdout = old_stdout
@@ -223,12 +235,12 @@ class ScriptExecutor:
"output": str,
"error": str,
"duration_ms": int,
"logs": [...]
"result": dict
}
"""
start_time = datetime.now()
success, output, error = self.execute(
success, output, error, result = self.execute(
script_content=script_content,
task_id=task_id,
tenant_id=tenant_id,
@@ -242,5 +254,6 @@ class ScriptExecutor:
"success": success,
"output": output,
"error": error,
"duration_ms": duration_ms
"duration_ms": duration_ms,
"result": result
}