diff --git a/backend/app/main.py b/backend/app/main.py index 18869c7..07a5da5 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,6 +15,7 @@ from .routers.alerts import router as alerts_router from .routers.cost import router as cost_router from .routers.quota import router as quota_router from .routers.tasks import router as tasks_router +from .routers.notification_channels import router as notification_channels_router from .middleware import TraceMiddleware, setup_exception_handlers, RequestLoggerMiddleware from .middleware.trace import setup_logging from .services.scheduler import scheduler_service @@ -69,6 +70,7 @@ app.include_router(alerts_router, prefix="/api") app.include_router(cost_router, prefix="/api") app.include_router(quota_router, prefix="/api") app.include_router(tasks_router) +app.include_router(notification_channels_router) # 应用生命周期事件 diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 743cbf5..343c160 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -9,6 +9,7 @@ from .logs import PlatformLog from .alert import AlertRule, AlertRecord, NotificationChannel from .pricing import ModelPricing, TenantBilling from .scheduled_task import ScheduledTask, TaskLog, ScriptVar, Secret +from .notification_channel import NotificationChannel __all__ = [ "Tenant", @@ -29,5 +30,6 @@ __all__ = [ "ScheduledTask", "TaskLog", "ScriptVar", - "Secret" + "Secret", + "NotificationChannel" ] diff --git a/backend/app/models/notification_channel.py b/backend/app/models/notification_channel.py new file mode 100644 index 0000000..d4f8342 --- /dev/null +++ b/backend/app/models/notification_channel.py @@ -0,0 +1,20 @@ +"""通知渠道模型""" +from datetime import datetime +from sqlalchemy import Column, Integer, String, Enum, Boolean, DateTime +from ..database import Base + + +class NotificationChannel(Base): + """通知渠道表""" + __tablename__ = "platform_notification_channels" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50), nullable=False) + channel_name = Column(String(100), nullable=False) + channel_type = Column(Enum('dingtalk_bot', 'wecom_bot'), nullable=False) + webhook_url = Column(String(500), nullable=False) + description = Column(String(255)) + is_enabled = Column(Boolean, default=True) + + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) diff --git a/backend/app/models/scheduled_task.py b/backend/app/models/scheduled_task.py index 2699978..75668a6 100644 --- a/backend/app/models/scheduled_task.py +++ b/backend/app/models/scheduled_task.py @@ -40,6 +40,10 @@ class ScheduledTask(Base): alert_on_failure = Column(Boolean, default=False) alert_webhook = Column(String(500)) + # 通知配置 + notify_channels = Column(JSON) # 通知渠道ID列表 + notify_wecom_app_id = Column(Integer) # 企微应用ID + # 状态 is_enabled = Column(Boolean, default=True) last_run_at = Column(DateTime) diff --git a/backend/app/routers/notification_channels.py b/backend/app/routers/notification_channels.py new file mode 100644 index 0000000..b8432cc --- /dev/null +++ b/backend/app/routers/notification_channels.py @@ -0,0 +1,181 @@ +"""通知渠道API路由""" +from typing import Optional, List +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session +from sqlalchemy import desc + +from ..database import get_db +from ..models.notification_channel import NotificationChannel + + +router = APIRouter(prefix="/api/notification-channels", tags=["notification-channels"]) + + +# ==================== Schemas ==================== + +class ChannelCreate(BaseModel): + tenant_id: str + channel_name: str + channel_type: str # dingtalk_bot, wecom_bot + webhook_url: str + description: Optional[str] = None + + +class ChannelUpdate(BaseModel): + channel_name: Optional[str] = None + channel_type: Optional[str] = None + webhook_url: Optional[str] = None + description: Optional[str] = None + is_enabled: Optional[bool] = None + + +# ==================== CRUD ==================== + +@router.get("") +async def list_channels( + tenant_id: Optional[str] = None, + channel_type: Optional[str] = None, + is_enabled: Optional[bool] = None, + page: int = Query(1, ge=1), + size: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db) +): + """获取通知渠道列表""" + query = db.query(NotificationChannel) + + if tenant_id: + query = query.filter(NotificationChannel.tenant_id == tenant_id) + if channel_type: + query = query.filter(NotificationChannel.channel_type == channel_type) + if is_enabled is not None: + query = query.filter(NotificationChannel.is_enabled == is_enabled) + + total = query.count() + items = query.order_by(desc(NotificationChannel.created_at)).offset((page - 1) * size).limit(size).all() + + return { + "total": total, + "items": [format_channel(c) for c in items] + } + + +@router.get("/{channel_id}") +async def get_channel(channel_id: int, db: Session = Depends(get_db)): + """获取渠道详情""" + channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + return format_channel(channel) + + +@router.post("") +async def create_channel(data: ChannelCreate, db: Session = Depends(get_db)): + """创建通知渠道""" + channel = NotificationChannel( + tenant_id=data.tenant_id, + channel_name=data.channel_name, + channel_type=data.channel_type, + webhook_url=data.webhook_url, + description=data.description, + is_enabled=True + ) + + db.add(channel) + db.commit() + db.refresh(channel) + + return {"success": True, "id": channel.id} + + +@router.put("/{channel_id}") +async def update_channel(channel_id: int, data: ChannelUpdate, db: Session = Depends(get_db)): + """更新通知渠道""" + channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + if data.channel_name is not None: + channel.channel_name = data.channel_name + if data.channel_type is not None: + channel.channel_type = data.channel_type + if data.webhook_url is not None: + channel.webhook_url = data.webhook_url + if data.description is not None: + channel.description = data.description + if data.is_enabled is not None: + channel.is_enabled = data.is_enabled + + db.commit() + return {"success": True} + + +@router.delete("/{channel_id}") +async def delete_channel(channel_id: int, db: Session = Depends(get_db)): + """删除通知渠道""" + channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + db.delete(channel) + db.commit() + return {"success": True} + + +@router.post("/{channel_id}/test") +async def test_channel(channel_id: int, db: Session = Depends(get_db)): + """测试通知渠道""" + import httpx + + channel = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + test_content = f"**测试消息**\n\n渠道名称: {channel.channel_name}\n发送时间: 测试中..." + + try: + if channel.channel_type == 'dingtalk_bot': + payload = { + "msgtype": "markdown", + "markdown": { + "title": "渠道测试", + "text": test_content + } + } + else: # wecom_bot + payload = { + "msgtype": "markdown", + "markdown": { + "content": test_content + } + } + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(channel.webhook_url, json=payload) + result = response.json() + + # 钉钉返回 errcode=0,企微返回 errcode=0 + if result.get('errcode') == 0: + return {"success": True, "message": "发送成功"} + else: + return {"success": False, "message": f"发送失败: {result}"} + + except Exception as e: + return {"success": False, "message": f"发送失败: {str(e)}"} + + +# ==================== Helpers ==================== + +def format_channel(channel: NotificationChannel) -> dict: + """格式化渠道数据""" + return { + "id": channel.id, + "tenant_id": channel.tenant_id, + "channel_name": channel.channel_name, + "channel_type": channel.channel_type, + "webhook_url": channel.webhook_url, + "description": channel.description, + "is_enabled": channel.is_enabled, + "created_at": channel.created_at, + "updated_at": channel.updated_at + } diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index a7f584f..431025f 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -33,6 +33,8 @@ class TaskCreate(BaseModel): retry_interval: Optional[int] = 60 alert_on_failure: Optional[bool] = False alert_webhook: Optional[str] = None + notify_channels: Optional[List[int]] = None # 通知渠道ID列表 + notify_wecom_app_id: Optional[int] = None # 企微应用ID class TaskUpdate(BaseModel): @@ -51,6 +53,8 @@ class TaskUpdate(BaseModel): alert_on_failure: Optional[bool] = None alert_webhook: Optional[str] = None is_enabled: Optional[bool] = None + notify_channels: Optional[List[int]] = None + notify_wecom_app_id: Optional[int] = None class SecretCreate(BaseModel): @@ -126,6 +130,8 @@ async def create_task(data: TaskCreate, db: Session = Depends(get_db)): retry_interval=data.retry_interval, alert_on_failure=data.alert_on_failure, alert_webhook=data.alert_webhook, + notify_channels=data.notify_channels, + notify_wecom_app_id=data.notify_wecom_app_id, is_enabled=True ) @@ -175,6 +181,10 @@ async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_ task.alert_on_failure = data.alert_on_failure if data.alert_webhook is not None: task.alert_webhook = data.alert_webhook + if data.notify_channels is not None: + task.notify_channels = data.notify_channels + if data.notify_wecom_app_id is not None: + task.notify_wecom_app_id = data.notify_wecom_app_id if data.is_enabled is not None: task.is_enabled = data.is_enabled @@ -490,6 +500,14 @@ def format_task(task: ScheduledTask, include_content: bool = False) -> dict: except: time_points = [] + # 处理 notify_channels + notify_channels = task.notify_channels + if isinstance(notify_channels, str): + try: + notify_channels = json.loads(notify_channels) + except: + notify_channels = [] + data = { "id": task.id, "tenant_id": task.tenant_id, @@ -505,6 +523,8 @@ def format_task(task: ScheduledTask, include_content: bool = False) -> dict: "retry_interval": task.retry_interval, "alert_on_failure": bool(task.alert_on_failure), "alert_webhook": task.alert_webhook, + "notify_channels": notify_channels or [], + "notify_wecom_app_id": task.notify_wecom_app_id, "created_at": task.created_at, "updated_at": task.updated_at } diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 295363b..b8f80a8 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -10,6 +10,7 @@ from sqlalchemy.orm import Session from ..database import SessionLocal from ..models.scheduled_task import ScheduledTask, TaskLog +from ..models.notification_channel import NotificationChannel from .script_executor import ScriptExecutor @@ -167,6 +168,7 @@ class SchedulerService: success = False output = '' error = '' + result = None try: # 解析输入参数 @@ -177,7 +179,11 @@ class SchedulerService: if task.execution_type == 'webhook': success, output, error = await self._execute_webhook(task) else: - success, output, error = await self._execute_script(db, task, trace_id, params) + success, output, error, result = await self._execute_script(db, task, trace_id, params) + + # 如果脚本执行成功且有返回内容,发送通知 + if success and result and result.get('content'): + await self._send_notifications(db, task, result) except Exception as e: error = str(e) @@ -218,10 +224,10 @@ class SchedulerService: async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict): """执行脚本任务""" if not task.script_content: - return False, '', '脚本内容为空' + return False, '', '脚本内容为空', None executor = ScriptExecutor(db) - success, output, error = executor.execute( + success, output, error, result = executor.execute( script_content=task.script_content, task_id=task.id, tenant_id=task.tenant_id, @@ -230,8 +236,117 @@ class SchedulerService: timeout=300 # 默认超时 ) - return success, output, error + return success, output, error, result + async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict): + """发送通知到配置的渠道""" + content = result.get('content', '') + title = result.get('title', task.task_name) + + if not content: + return + + # 获取通知渠道配置 + channel_ids = task.notify_channels + if isinstance(channel_ids, str): + try: + channel_ids = json.loads(channel_ids) + except: + channel_ids = [] + + if not channel_ids: + channel_ids = [] + + # 发送到通知渠道 + for channel_id in channel_ids: + try: + channel = db.query(NotificationChannel).filter( + NotificationChannel.id == channel_id, + NotificationChannel.is_enabled == True + ).first() + + if not channel: + continue + + await self._send_to_channel(channel, content, title) + + except Exception as e: + print(f"发送通知到渠道 {channel_id} 失败: {e}") + + # 发送到企微应用 + if task.notify_wecom_app_id: + try: + await self._send_to_wecom_app(db, task.notify_wecom_app_id, content, title, task.tenant_id) + except Exception as e: + print(f"发送企微应用消息失败: {e}") + + async def _send_to_channel(self, channel: NotificationChannel, content: str, title: str): + """发送消息到通知渠道""" + if channel.channel_type == 'dingtalk_bot': + payload = { + "msgtype": "markdown", + "markdown": { + "title": title, + "text": content + } + } + else: # wecom_bot + payload = { + "msgtype": "markdown", + "markdown": { + "content": f"**{title}**\n\n{content}" + } + } + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(channel.webhook_url, json=payload) + result = response.json() + if result.get('errcode') != 0: + print(f"通知发送失败: {result}") + + async def _send_to_wecom_app(self, db: Session, app_id: int, content: str, title: str, tenant_id: str): + """发送消息到企微应用""" + from ..models.tenant_wechat_app import TenantWechatApp + + app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first() + if not app: + return + + # 获取 access_token + access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret) + if not access_token: + return + + # 发送消息 + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + payload = { + "touser": "@all", + "msgtype": "markdown", + "agentid": app.agent_id, + "markdown": { + "content": f"**{title}**\n\n{content}" + } + } + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(url, json=payload) + result = response.json() + if result.get('errcode') != 0: + print(f"企微应用消息发送失败: {result}") + + async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]: + """获取企微 access_token""" + url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={app_secret}" + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.get(url) + result = response.json() + if result.get('errcode') == 0: + return result.get('access_token') + else: + print(f"获取企微 access_token 失败: {result}") + return None + async def _send_alert(self, task: ScheduledTask, error: str): """发送失败告警""" if not task.alert_webhook: diff --git a/backend/app/services/script_executor.py b/backend/app/services/script_executor.py index db217df..03ad805 100644 --- a/backend/app/services/script_executor.py +++ b/backend/app/services/script_executor.py @@ -49,7 +49,7 @@ class ScriptExecutor: trace_id: Optional[str] = None, params: Optional[Dict[str, Any]] = None, timeout: int = 300 - ) -> Tuple[bool, str, str]: + ) -> Tuple[bool, str, str, Optional[Dict]]: """执行脚本 Args: @@ -61,7 +61,8 @@ class ScriptExecutor: timeout: 超时秒数 Returns: - (success, output, error) + (success, output, error, result) + result: 脚本返回值 {'content': '...', 'title': '...'} """ # 创建SDK实例 sdk = ScriptSDK( @@ -75,7 +76,7 @@ class ScriptExecutor: # 检查脚本安全性 check_result = self._check_script_safety(script_content) if check_result: - return False, '', f"脚本安全检查失败: {check_result}" + return False, '', f"脚本安全检查失败: {check_result}", None # 准备执行环境 safe_globals = self._create_safe_globals(sdk) @@ -101,11 +102,22 @@ class ScriptExecutor: # 合并输出 output = '\n'.join(filter(None, [sdk_output, stdout_output])) - return True, output, '' + # 获取脚本返回值(通过 __result__ 变量) + result = safe_globals.get('__result__') + if result is None and 'result' in safe_globals: + result = safe_globals.get('result') + + # 如果返回的是字符串,包装成字典 + if isinstance(result, str): + result = {'content': result} + elif result is not None and not isinstance(result, dict): + result = {'content': str(result)} + + return True, output, '', result except Exception as e: error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}" - return False, sdk.get_output(), error_msg + return False, sdk.get_output(), error_msg, None finally: sys.stdout = old_stdout @@ -223,12 +235,12 @@ class ScriptExecutor: "output": str, "error": str, "duration_ms": int, - "logs": [...] + "result": dict } """ start_time = datetime.now() - success, output, error = self.execute( + success, output, error, result = self.execute( script_content=script_content, task_id=task_id, tenant_id=tenant_id, @@ -242,5 +254,6 @@ class ScriptExecutor: "success": success, "output": output, "error": error, - "duration_ms": duration_ms + "duration_ms": duration_ms, + "result": result } diff --git a/frontend/src/components/Layout.vue b/frontend/src/components/Layout.vue index 0b10a34..5ef6773 100644 --- a/frontend/src/components/Layout.vue +++ b/frontend/src/components/Layout.vue @@ -18,7 +18,8 @@ const menuItems = computed(() => { { path: '/app-config', title: '租户订阅', icon: 'Setting' }, { path: '/stats', title: '统计分析', icon: 'TrendCharts' }, { path: '/logs', title: '日志查看', icon: 'Document' }, - { path: '/scheduled-tasks', title: '定时任务', icon: 'Clock' } + { path: '/scheduled-tasks', title: '定时任务', icon: 'Clock' }, + { path: '/notification-channels', title: '通知渠道', icon: 'Bell' } ] // 管理员才能看到用户管理 diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 4ec95bd..7b80ada 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -78,6 +78,12 @@ const routes = [ name: 'ScheduledTasks', component: () => import('@/views/scheduled-tasks/index.vue'), meta: { title: '定时任务', icon: 'Clock' } + }, + { + path: 'notification-channels', + name: 'NotificationChannels', + component: () => import('@/views/notification-channels/index.vue'), + meta: { title: '通知渠道', icon: 'Bell' } } ] } diff --git a/frontend/src/views/notification-channels/index.vue b/frontend/src/views/notification-channels/index.vue new file mode 100644 index 0000000..f4e3a7a --- /dev/null +++ b/frontend/src/views/notification-channels/index.vue @@ -0,0 +1,308 @@ + + + + + diff --git a/frontend/src/views/scheduled-tasks/index.vue b/frontend/src/views/scheduled-tasks/index.vue index aba5c62..e094af1 100644 --- a/frontend/src/views/scheduled-tasks/index.vue +++ b/frontend/src/views/scheduled-tasks/index.vue @@ -16,6 +16,10 @@ const query = reactive({ // 租户列表 const tenants = ref([]) +// 通知渠道和企微应用 +const notifyChannels = ref([]) +const wecomApps = ref([]) + // 对话框 const dialogVisible = ref(false) const dialogTitle = ref('') @@ -40,16 +44,19 @@ const form = reactive({ log('任务开始执行') -# 示例:获取参数 +# 获取参数 prompt = get_param('prompt', '默认提示词') -log(f'参数 prompt: {prompt}') +log(f'参数: {prompt}') -# 示例:调用 AI -# result = ai(prompt, system='你是一个助手') -# log(f'AI 返回: {result}') +# 调用 AI 生成内容 +content = ai(prompt, system='你是一个助手') +log(f'生成内容: {content[:50]}...') -# 示例:发送通知 -# dingtalk('webhook_url', '消息内容') +# 设置返回值(会自动发送到配置的通知渠道) +result = { + 'content': content, + 'title': '每日推送' +} log('任务执行完成') `, @@ -58,7 +65,9 @@ log('任务执行完成') retry_count: 0, retry_interval: 60, alert_on_failure: false, - alert_webhook: '' + alert_webhook: '', + notify_channels: [], + notify_wecom_app_id: null }) const rules = { @@ -121,6 +130,26 @@ async function fetchTenants() { } } +async function fetchNotifyChannels(tenantId) { + try { + const params = tenantId ? { tenant_id: tenantId } : {} + const res = await api.get('/api/notification-channels', { params }) + notifyChannels.value = res.data.items || [] + } catch (e) { + console.error(e) + } +} + +async function fetchWecomApps(tenantId) { + try { + const params = tenantId ? { tenant_id: tenantId } : {} + const res = await api.get('/api/tenant-wechat-apps', { params }) + wecomApps.value = res.data.items || [] + } catch (e) { + console.error(e) + } +} + function handleSearch() { query.page = 1 fetchList() @@ -149,7 +178,12 @@ log('任务开始执行') # 获取参数 prompt = get_param('prompt', '默认提示词') -log(f'参数: {prompt}') + +# 调用 AI 生成内容 +content = ai(prompt, system='你是一个助手') + +# 设置返回值(自动发送到通知渠道) +result = {'content': content, 'title': '推送通知'} log('任务执行完成') `, @@ -158,8 +192,12 @@ log('任务执行完成') retry_count: 0, retry_interval: 60, alert_on_failure: false, - alert_webhook: '' + alert_webhook: '', + notify_channels: [], + notify_wecom_app_id: null }) + notifyChannels.value = [] + wecomApps.value = [] dialogVisible.value = true } @@ -186,8 +224,15 @@ async function handleEdit(row) { retry_count: task.retry_count || 0, retry_interval: task.retry_interval || 60, alert_on_failure: task.alert_on_failure || false, - alert_webhook: task.alert_webhook || '' + alert_webhook: task.alert_webhook || '', + notify_channels: task.notify_channels || [], + notify_wecom_app_id: task.notify_wecom_app_id || null }) + // 加载该租户的通知渠道和企微应用 + if (task.tenant_id) { + await fetchNotifyChannels(task.tenant_id) + await fetchWecomApps(task.tenant_id) + } dialogVisible.value = true } catch (e) { console.error(e) @@ -570,6 +615,57 @@ onMounted(() => { + + + 通知配置(脚本设置 result 变量后自动发送) + + +
+ 请先选择租户,再配置通知渠道 +
+ +
+ + +
+ 请先选择租户 +
+ +