From 9b72e6127f9c7440838270dd00c192fd1fd27829 Mon Sep 17 00:00:00 2001 From: Admin Date: Wed, 28 Jan 2026 11:59:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=84=9A=E6=9C=AC=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增重试和失败告警功能(支持自动重试N次,失败后钉钉/企微通知) - 新增密钥管理(安全存储API Key等敏感信息) - 新增脚本模板库(预置常用脚本模板) - 新增脚本版本管理(自动保存历史版本,支持回滚) - 新增执行统计(成功率、平均耗时、7日趋势) - SDK 新增多租户遍历能力(get_tenants/get_tenant_config/get_all_tenant_configs) - SDK 新增密钥读取方法(get_secret) --- backend/app/routers/tasks.py | 508 ++++++++++++++++++- backend/app/services/scheduler.py | 79 ++- backend/app/services/script_sdk.py | 168 ++++++ frontend/src/views/scheduled-tasks/index.vue | 415 ++++++++++++++- 4 files changed, 1142 insertions(+), 28 deletions(-) diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index e48aede..8e1e4c1 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -33,6 +33,11 @@ class TaskCreate(BaseModel): input_params: Optional[dict] = None script_content: Optional[str] = None is_enabled: bool = True + # 重试和告警 + retry_count: int = 0 + retry_interval: int = 60 + alert_on_failure: bool = False + alert_webhook: Optional[str] = None class TaskUpdate(BaseModel): @@ -45,6 +50,11 @@ class TaskUpdate(BaseModel): webhook_url: Optional[str] = None input_params: Optional[dict] = None script_content: Optional[str] = None + # 重试和告警 + retry_count: Optional[int] = None + retry_interval: Optional[int] = None + alert_on_failure: Optional[bool] = None + alert_webhook: Optional[str] = None class ScriptTestRequest(BaseModel): @@ -52,6 +62,36 @@ class ScriptTestRequest(BaseModel): script_content: str +# 密钥管理 +class SecretCreate(BaseModel): + tenant_id: Optional[str] = None # None 表示全局 + secret_key: str + secret_value: str + description: Optional[str] = None + + +class SecretUpdate(BaseModel): + secret_value: Optional[str] = None + description: Optional[str] = None + + +# 脚本模板 +class TemplateCreate(BaseModel): + name: str + description: Optional[str] = None + category: Optional[str] = None + script_content: str + is_public: bool = True + + +class TemplateUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + script_content: Optional[str] = None + is_public: Optional[bool] = None + + # API Endpoints @router.get("") @@ -136,9 +176,11 @@ async def create_task( text(""" INSERT INTO platform_scheduled_tasks (tenant_id, task_name, task_desc, schedule_type, time_points, - cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled) + cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled, + retry_count, retry_interval, alert_on_failure, alert_webhook) VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points, - :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled) + :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled, + :retry_count, :retry_interval, :alert_on_failure, :alert_webhook) """), { "tenant_id": data.tenant_id, @@ -151,7 +193,11 @@ async def create_task( "webhook_url": data.webhook_url, "input_params": input_params_json, "script_content": data.script_content, - "is_enabled": 1 if data.is_enabled else 0 + "is_enabled": 1 if data.is_enabled else 0, + "retry_count": data.retry_count, + "retry_interval": data.retry_interval, + "alert_on_failure": 1 if data.alert_on_failure else 0, + "alert_webhook": data.alert_webhook } ) db.commit() @@ -240,6 +286,42 @@ async def update_task( if data.input_params is not None: updates.append("input_params = :input_params") params["input_params"] = json.dumps(data.input_params) + if data.retry_count is not None: + updates.append("retry_count = :retry_count") + params["retry_count"] = data.retry_count + if data.retry_interval is not None: + updates.append("retry_interval = :retry_interval") + params["retry_interval"] = data.retry_interval + if data.alert_on_failure is not None: + updates.append("alert_on_failure = :alert_on_failure") + params["alert_on_failure"] = 1 if data.alert_on_failure else 0 + if data.alert_webhook is not None: + updates.append("alert_webhook = :alert_webhook") + params["alert_webhook"] = data.alert_webhook + + # 如果更新了脚本内容,自动保存版本 + if data.script_content is not None and data.script_content.strip(): + # 获取当前最大版本号 + version_result = db.execute( + text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"), + {"task_id": task_id} + ) + max_version = version_result.scalar() or 0 + new_version = max_version + 1 + + # 插入新版本 + db.execute( + text(""" + INSERT INTO platform_script_versions (task_id, version, script_content, created_by) + VALUES (:task_id, :version, :script_content, :created_by) + """), + { + "task_id": task_id, + "version": new_version, + "script_content": data.script_content, + "created_by": user.username if hasattr(user, 'username') else None + } + ) if updates: db.execute( @@ -450,25 +532,415 @@ async def get_sdk_docs(): "name": "log(message, level='INFO')", "description": "记录日志", "example": "log('任务执行完成')" + }, + { + "name": "get_tenants(app_code=None)", + "description": "获取租户列表(多租户任务遍历)", + "example": "tenants = get_tenants('review-generator')" + }, + { + "name": "get_tenant_config(tenant_id, app_code, key=None)", + "description": "获取指定租户的应用配置", + "example": "webhook = get_tenant_config('tenant1', 'my-app', 'dingtalk_webhook')" + }, + { + "name": "get_all_tenant_configs(app_code)", + "description": "批量获取所有租户的应用配置", + "example": "all_configs = get_all_tenant_configs('my-app')" + }, + { + "name": "get_secret(key)", + "description": "获取密钥(优先租户级,其次全局)", + "example": "api_key = get_secret('openai_api_key')" } ], - "example_script": '''# 示例:每日推送 AI 生成的内容到钉钉 + "example_script": '''# 示例:多租户批量推送 import json -# 获取历史数据 -history = get_var('history', []) +# 获取所有订阅了某应用的租户配置 +tenants = get_all_tenant_configs('daily-report') -# 调用 AI 生成内容 -prompt = f"根据以下信息生成今日营销文案:{json.dumps(history[-5:], ensure_ascii=False)}" -content = ai(prompt, system="你是一个专业的营销文案专家") - -# 发送到钉钉 -dingtalk( - webhook="你的钉钉机器人Webhook", - content=content -) - -# 记录日志 -log(f"已发送: {content[:50]}...") +for tenant in tenants: + tenant_id = tenant['tenant_id'] + tenant_name = tenant['tenant_name'] + configs = tenant['configs'] + + # 获取该租户的钉钉 Webhook + webhook = configs.get('dingtalk_webhook') + if not webhook: + log(f"租户 {tenant_name} 未配置 webhook,跳过") + continue + + # 调用 AI 生成内容 + content = ai(f"为 {tenant_name} 生成今日报告", system="你是报告生成专家") + + # 发送 + dingtalk(webhook=webhook, content=content) + log(f"已发送给租户: {tenant_name}") ''' } + + +# ============ 密钥管理 ============ + +@router.get("/secrets") +async def list_secrets( + tenant_id: Optional[str] = None, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """获取密钥列表""" + if tenant_id: + result = db.execute( + text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets WHERE tenant_id = :tenant_id OR tenant_id IS NULL ORDER BY tenant_id, secret_key"), + {"tenant_id": tenant_id} + ) + else: + result = db.execute( + text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets ORDER BY tenant_id, secret_key") + ) + secrets = [dict(row) for row in result.mappings().all()] + return {"items": secrets} + + +@router.post("/secrets") +async def create_secret( + data: SecretCreate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """创建密钥""" + # 检查是否已存在 + result = db.execute( + text("SELECT id FROM platform_secrets WHERE tenant_id <=> :tenant_id AND secret_key = :key"), + {"tenant_id": data.tenant_id, "key": data.secret_key} + ) + if result.scalar(): + raise HTTPException(status_code=400, detail="密钥已存在") + + db.execute( + text(""" + INSERT INTO platform_secrets (tenant_id, secret_key, secret_value, description) + VALUES (:tenant_id, :key, :value, :desc) + """), + { + "tenant_id": data.tenant_id, + "key": data.secret_key, + "value": data.secret_value, + "desc": data.description + } + ) + db.commit() + return {"message": "创建成功"} + + +@router.put("/secrets/{secret_id}") +async def update_secret( + secret_id: int, + data: SecretUpdate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """更新密钥""" + updates = [] + params = {"id": secret_id} + + if data.secret_value is not None: + updates.append("secret_value = :value") + params["value"] = data.secret_value + if data.description is not None: + updates.append("description = :desc") + params["desc"] = data.description + + if updates: + db.execute( + text(f"UPDATE platform_secrets SET {', '.join(updates)} WHERE id = :id"), + params + ) + db.commit() + + return {"message": "更新成功"} + + +@router.delete("/secrets/{secret_id}") +async def delete_secret( + secret_id: int, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """删除密钥""" + db.execute(text("DELETE FROM platform_secrets WHERE id = :id"), {"id": secret_id}) + db.commit() + return {"message": "删除成功"} + + +# ============ 脚本模板 ============ + +@router.get("/templates") +async def list_templates( + category: Optional[str] = None, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取模板列表""" + if category: + result = db.execute( + text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates WHERE category = :category ORDER BY id DESC"), + {"category": category} + ) + else: + result = db.execute( + text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates ORDER BY id DESC") + ) + templates = [dict(row) for row in result.mappings().all()] + return {"items": templates} + + +@router.get("/templates/{template_id}") +async def get_template( + template_id: int, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取模板详情""" + result = db.execute( + text("SELECT * FROM platform_script_templates WHERE id = :id"), + {"id": template_id} + ) + template = result.mappings().first() + if not template: + raise HTTPException(status_code=404, detail="模板不存在") + return dict(template) + + +@router.post("/templates") +async def create_template( + data: TemplateCreate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """创建模板""" + db.execute( + text(""" + INSERT INTO platform_script_templates (name, description, category, script_content, is_public, created_by) + VALUES (:name, :desc, :category, :content, :is_public, :created_by) + """), + { + "name": data.name, + "desc": data.description, + "category": data.category, + "content": data.script_content, + "is_public": 1 if data.is_public else 0, + "created_by": user.username if hasattr(user, 'username') else None + } + ) + db.commit() + + result = db.execute(text("SELECT LAST_INSERT_ID() as id")) + template_id = result.scalar() + + return {"id": template_id, "message": "创建成功"} + + +@router.put("/templates/{template_id}") +async def update_template( + template_id: int, + data: TemplateUpdate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """更新模板""" + updates = [] + params = {"id": template_id} + + if data.name is not None: + updates.append("name = :name") + params["name"] = data.name + if data.description is not None: + updates.append("description = :desc") + params["desc"] = data.description + if data.category is not None: + updates.append("category = :category") + params["category"] = data.category + if data.script_content is not None: + updates.append("script_content = :content") + params["content"] = data.script_content + if data.is_public is not None: + updates.append("is_public = :is_public") + params["is_public"] = 1 if data.is_public else 0 + + if updates: + db.execute( + text(f"UPDATE platform_script_templates SET {', '.join(updates)} WHERE id = :id"), + params + ) + db.commit() + + return {"message": "更新成功"} + + +@router.delete("/templates/{template_id}") +async def delete_template( + template_id: int, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """删除模板""" + db.execute(text("DELETE FROM platform_script_templates WHERE id = :id"), {"id": template_id}) + db.commit() + return {"message": "删除成功"} + + +# ============ 脚本版本管理 ============ + +@router.get("/{task_id}/versions") +async def list_versions( + task_id: int, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取任务的脚本版本列表""" + result = db.execute( + text("SELECT id, version, change_note, created_by, created_at FROM platform_script_versions WHERE task_id = :task_id ORDER BY version DESC"), + {"task_id": task_id} + ) + versions = [dict(row) for row in result.mappings().all()] + return {"items": versions} + + +@router.get("/{task_id}/versions/{version}") +async def get_version( + task_id: int, + version: int, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取指定版本的脚本内容""" + result = db.execute( + text("SELECT * FROM platform_script_versions WHERE task_id = :task_id AND version = :version"), + {"task_id": task_id, "version": version} + ) + ver = result.mappings().first() + if not ver: + raise HTTPException(status_code=404, detail="版本不存在") + return dict(ver) + + +@router.post("/{task_id}/versions/{version}/rollback") +async def rollback_version( + task_id: int, + version: int, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """回滚到指定版本""" + # 获取指定版本的脚本内容 + result = db.execute( + text("SELECT script_content FROM platform_script_versions WHERE task_id = :task_id AND version = :version"), + {"task_id": task_id, "version": version} + ) + ver = result.first() + if not ver: + raise HTTPException(status_code=404, detail="版本不存在") + + script_content = ver[0] + + # 更新任务脚本 + db.execute( + text("UPDATE platform_scheduled_tasks SET script_content = :content WHERE id = :id"), + {"id": task_id, "content": script_content} + ) + + # 创建新版本记录 + version_result = db.execute( + text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"), + {"task_id": task_id} + ) + max_version = version_result.scalar() or 0 + new_version = max_version + 1 + + db.execute( + text(""" + INSERT INTO platform_script_versions (task_id, version, script_content, change_note, created_by) + VALUES (:task_id, :version, :content, :note, :created_by) + """), + { + "task_id": task_id, + "version": new_version, + "content": script_content, + "note": f"回滚到版本 {version}", + "created_by": user.username if hasattr(user, 'username') else None + } + ) + db.commit() + + # 重新加载任务 + reload_task(task_id) + + return {"message": f"已回滚到版本 {version},当前版本号 {new_version}"} + + +# ============ 统计数据 ============ + +@router.get("/{task_id}/stats") +async def get_task_stats( + task_id: int, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取任务执行统计""" + # 总执行次数 + total_result = db.execute( + text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"), + {"task_id": task_id} + ) + total = total_result.scalar() or 0 + + # 成功次数 + success_result = db.execute( + text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id AND status = 'success'"), + {"task_id": task_id} + ) + success = success_result.scalar() or 0 + + # 失败次数 + failed = total - success + + # 成功率 + success_rate = round(success / total * 100, 1) if total > 0 else 0 + + # 平均耗时(成功的任务) + avg_result = db.execute( + text(""" + SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, finished_at)) + FROM platform_task_logs + WHERE task_id = :task_id AND status = 'success' AND finished_at IS NOT NULL + """), + {"task_id": task_id} + ) + avg_duration = avg_result.scalar() + avg_duration = round(float(avg_duration), 1) if avg_duration else 0 + + # 最近7天趋势 + trend_result = db.execute( + text(""" + SELECT DATE(started_at) as date, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + FROM platform_task_logs + WHERE task_id = :task_id AND started_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY) + GROUP BY DATE(started_at) + ORDER BY date + """), + {"task_id": task_id} + ) + trend = [dict(row) for row in trend_result.mappings().all()] + + return { + "total": total, + "success": success, + "failed": failed, + "success_rate": success_rate, + "avg_duration": avg_duration, + "trend": trend + } diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index f03c059..91da8c5 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -31,10 +31,81 @@ def get_db_session() -> Session: return SessionLocal() +async def send_alert(webhook: str, task_name: str, error_message: str): + """发送失败告警通知""" + try: + # 自动判断钉钉或企微 + if "dingtalk" in webhook or "oapi.dingtalk.com" in webhook: + data = { + "msgtype": "markdown", + "markdown": { + "title": "定时任务执行失败", + "text": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + } + } + else: + # 企微格式 + data = { + "msgtype": "markdown", + "markdown": { + "content": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + } + } + + async with httpx.AsyncClient(timeout=30.0) as client: + await client.post(webhook, json=data) + logger.info(f"Alert sent for task {task_name}") + except Exception as e: + logger.warning(f"Failed to send alert: {e}") + + +async def execute_task_with_retry(task_id: int, retry_count: int = 0, max_retries: int = 0, retry_interval: int = 60): + """带重试的任务执行""" + success = await execute_task_once(task_id) + + if not success and retry_count < max_retries: + logger.info(f"Task {task_id} failed, scheduling retry {retry_count + 1}/{max_retries} in {retry_interval}s") + await asyncio.sleep(retry_interval) + await execute_task_with_retry(task_id, retry_count + 1, max_retries, retry_interval) + elif not success: + # 所有重试都失败,发送告警 + db = get_db_session() + try: + result = db.execute( + text("SELECT task_name, alert_on_failure, alert_webhook, last_run_message FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + task = result.mappings().first() + if task and task["alert_on_failure"] and task["alert_webhook"]: + await send_alert(task["alert_webhook"], task["task_name"], task["last_run_message"] or "未知错误") + finally: + db.close() + + async def execute_task(task_id: int): - """执行定时任务""" + """执行定时任务入口(处理重试配置)""" + db = get_db_session() + try: + result = db.execute( + text("SELECT retry_count, retry_interval FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + task = result.mappings().first() + if task: + max_retries = task.get("retry_count", 0) or 0 + retry_interval = task.get("retry_interval", 60) or 60 + await execute_task_with_retry(task_id, 0, max_retries, retry_interval) + else: + await execute_task_once(task_id) + finally: + db.close() + + +async def execute_task_once(task_id: int) -> bool: + """执行一次定时任务,返回是否成功""" db = get_db_session() log_id = None + success = False try: # 1. 查询任务配置 @@ -46,7 +117,7 @@ async def execute_task(task_id: int): if not task: logger.warning(f"Task {task_id} not found or disabled") - return + return True # 不需要重试 # 2. 更新任务状态为运行中 db.execute( @@ -161,9 +232,11 @@ async def execute_task(task_id: int): db.commit() logger.info(f"Task {task_id} executed with status: {status}") + success = (status == "success") except Exception as e: logger.error(f"Task {task_id} execution error: {str(e)}") + success = False # 更新失败状态 try: @@ -190,6 +263,8 @@ async def execute_task(task_id: int): pass finally: db.close() + + return success def add_task_to_scheduler(task: Dict[str, Any]): diff --git a/backend/app/services/script_sdk.py b/backend/app/services/script_sdk.py index 7e63886..b3e09bb 100644 --- a/backend/app/services/script_sdk.py +++ b/backend/app/services/script_sdk.py @@ -24,6 +24,8 @@ class ScriptSDK: - HTTP 请求 - 变量存储(跨执行持久化) - 日志记录 + - 多租户遍历 + - 密钥管理 """ def __init__(self, tenant_id: str, task_id: int, trace_id: str = None): @@ -32,6 +34,7 @@ class ScriptSDK: self.trace_id = trace_id or f"script_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}" self._logs: List[str] = [] self._db: Optional[Session] = None + self._tenants_cache: Optional[List[Dict]] = None # 租户列表缓存 def _get_db(self) -> Session: """获取数据库会话""" @@ -393,6 +396,171 @@ class ScriptSDK: self.log(f"删除变量失败: {str(e)}", level="ERROR") return False + # ============ 多租户遍历 ============ + + def get_tenants(self, app_code: str = None) -> List[Dict]: + """ + 获取租户列表(用于多租户任务遍历) + + Args: + app_code: 应用代码(可选),筛选订阅了该应用的租户 + + Returns: + 租户列表 [{"code": "xxx", "name": "租户名", "custom_configs": {...}}] + """ + db = self._get_db() + try: + if app_code: + # 筛选订阅了指定应用的租户 + result = db.execute( + text(""" + SELECT DISTINCT t.code, t.name, ta.custom_configs + FROM platform_tenants t + INNER JOIN platform_tenant_apps ta ON t.code = ta.tenant_id + WHERE ta.app_code = :app_code AND t.status = 1 + """), + {"app_code": app_code} + ) + else: + # 获取所有启用的租户 + result = db.execute( + text("SELECT code, name FROM platform_tenants WHERE status = 1") + ) + + tenants = [] + for row in result.mappings().all(): + tenant = dict(row) + # 解析 custom_configs + if "custom_configs" in tenant and tenant["custom_configs"]: + try: + tenant["custom_configs"] = json.loads(tenant["custom_configs"]) + except: + pass + tenants.append(tenant) + + self.log(f"获取租户列表成功,共 {len(tenants)} 个租户") + return tenants + except Exception as e: + self.log(f"获取租户列表失败: {str(e)}", level="ERROR") + return [] + + def get_tenant_config(self, tenant_id: str, app_code: str, key: str = None) -> Any: + """ + 获取指定租户的应用配置 + + Args: + tenant_id: 租户ID + app_code: 应用代码 + key: 配置项键名(可选,不传返回全部配置) + + Returns: + 配置值或配置字典 + """ + db = self._get_db() + try: + result = db.execute( + text(""" + SELECT custom_configs FROM platform_tenant_apps + WHERE tenant_id = :tenant_id AND app_code = :app_code + """), + {"tenant_id": tenant_id, "app_code": app_code} + ) + row = result.first() + + if not row or not row[0]: + return None if key else {} + + try: + configs = json.loads(row[0]) + except: + configs = {} + + if key: + return configs.get(key) + return configs + except Exception as e: + self.log(f"获取租户配置失败: {str(e)}", level="ERROR") + return None if key else {} + + def get_all_tenant_configs(self, app_code: str) -> List[Dict]: + """ + 获取所有租户的应用配置(便捷方法,用于批量操作) + + Args: + app_code: 应用代码 + + Returns: + [{"tenant_id": "xxx", "tenant_name": "租户名", "configs": {...}}] + """ + db = self._get_db() + try: + result = db.execute( + text(""" + SELECT t.code as tenant_id, t.name as tenant_name, ta.custom_configs + FROM platform_tenants t + INNER JOIN platform_tenant_apps ta ON t.code = ta.tenant_id + WHERE ta.app_code = :app_code AND t.status = 1 + """), + {"app_code": app_code} + ) + + tenants = [] + for row in result.mappings().all(): + configs = {} + if row["custom_configs"]: + try: + configs = json.loads(row["custom_configs"]) + except: + pass + tenants.append({ + "tenant_id": row["tenant_id"], + "tenant_name": row["tenant_name"], + "configs": configs + }) + + self.log(f"获取 {app_code} 应用的租户配置,共 {len(tenants)} 个") + return tenants + except Exception as e: + self.log(f"获取租户配置失败: {str(e)}", level="ERROR") + return [] + + # ============ 密钥管理 ============ + + def get_secret(self, key: str) -> Optional[str]: + """ + 获取密钥(优先读取租户级密钥,其次读取全局密钥) + + Args: + key: 密钥名称 + + Returns: + 密钥值(如不存在返回 None) + """ + db = self._get_db() + try: + # 优先查询租户级密钥 + result = db.execute( + text(""" + SELECT secret_value FROM platform_secrets + WHERE (tenant_id = :tenant_id OR tenant_id IS NULL) + AND secret_key = :key + ORDER BY tenant_id DESC + LIMIT 1 + """), + {"tenant_id": self.tenant_id, "key": key} + ) + row = result.first() + + if row: + self.log(f"获取密钥成功: {key}") + return row[0] + + self.log(f"密钥不存在: {key}", level="WARN") + return None + except Exception as e: + self.log(f"获取密钥失败: {str(e)}", level="ERROR") + return None + # ============ 日志 ============ def log(self, message: str, level: str = "INFO"): diff --git a/frontend/src/views/scheduled-tasks/index.vue b/frontend/src/views/scheduled-tasks/index.vue index d1dfc0a..a9efda9 100644 --- a/frontend/src/views/scheduled-tasks/index.vue +++ b/frontend/src/views/scheduled-tasks/index.vue @@ -36,9 +36,30 @@ const form = reactive({ webhook_url: '', input_params: '', script_content: '', - is_enabled: true + is_enabled: true, + // 重试和告警 + retry_count: 0, + retry_interval: 60, + alert_on_failure: false, + alert_webhook: '' }) +// 模板 +const templateList = ref([]) +const templateDialogVisible = ref(false) + +// 版本 +const versionsDialogVisible = ref(false) +const versionsLoading = ref(false) +const versionsList = ref([]) +const currentVersionTaskId = ref(null) + +// 统计 +const statsDialogVisible = ref(false) +const statsLoading = ref(false) +const statsData = ref(null) +const currentStatsTaskId = ref(null) + // 时间选择器 const newTimePoint = ref('') @@ -149,7 +170,11 @@ function handleCreate() { webhook_url: '', input_params: '', script_content: '', - is_enabled: true + is_enabled: true, + retry_count: 0, + retry_interval: 60, + alert_on_failure: false, + alert_webhook: '' }) newTimePoint.value = '' testResult.value = null @@ -170,7 +195,11 @@ function handleEdit(row) { webhook_url: row.webhook_url || '', input_params: row.input_params ? JSON.stringify(row.input_params, null, 2) : '', script_content: row.script_content || '', - is_enabled: row.is_enabled + is_enabled: row.is_enabled, + retry_count: row.retry_count || 0, + retry_interval: row.retry_interval || 60, + alert_on_failure: !!row.alert_on_failure, + alert_webhook: row.alert_webhook || '' }) newTimePoint.value = '' testResult.value = null @@ -237,7 +266,11 @@ async function handleSubmit() { webhook_url: form.execution_type === 'webhook' ? form.webhook_url : null, script_content: form.execution_type === 'script' ? form.script_content : null, input_params: inputParams, - is_enabled: form.is_enabled + is_enabled: form.is_enabled, + retry_count: form.retry_count, + retry_interval: form.retry_interval, + alert_on_failure: form.alert_on_failure, + alert_webhook: form.alert_webhook || null } try { @@ -379,6 +412,86 @@ function selectTenant(code) { handleSearch() } +// ============ 模板功能 ============ +async function fetchTemplates() { + try { + const res = await api.get('/api/scheduled-tasks/templates') + templateList.value = res.data.items || [] + } catch (e) { + console.error('获取模板列表失败:', e) + } +} + +function handleSelectTemplate() { + fetchTemplates() + templateDialogVisible.value = true +} + +function applyTemplate(template) { + form.script_content = template.script_content + templateDialogVisible.value = false + ElMessage.success(`已应用模板:${template.name}`) +} + +// ============ 版本功能 ============ +async function handleViewVersions(row) { + currentVersionTaskId.value = row.id + versionsLoading.value = true + versionsDialogVisible.value = true + + try { + const res = await api.get(`/api/scheduled-tasks/${row.id}/versions`) + versionsList.value = res.data.items || [] + } catch (e) { + console.error(e) + } finally { + versionsLoading.value = false + } +} + +async function handleRollback(version) { + await ElMessageBox.confirm(`确定回滚到版本 ${version} 吗?`, '版本回滚', { + type: 'warning' + }) + + try { + const res = await api.post(`/api/scheduled-tasks/${currentVersionTaskId.value}/versions/${version}/rollback`) + ElMessage.success(res.data.message) + versionsDialogVisible.value = false + fetchList() + } catch (e) { + // 错误已在拦截器处理 + } +} + +async function handleViewVersionContent(version) { + try { + const res = await api.get(`/api/scheduled-tasks/${currentVersionTaskId.value}/versions/${version}`) + ElMessageBox.alert(res.data.script_content, `版本 ${version} 脚本内容`, { + customClass: 'version-content-dialog', + dangerouslyUseHTMLString: false + }) + } catch (e) { + // 错误已在拦截器处理 + } +} + +// ============ 统计功能 ============ +async function handleViewStats(row) { + currentStatsTaskId.value = row.id + statsLoading.value = true + statsDialogVisible.value = true + + try { + const res = await api.get(`/api/scheduled-tasks/${row.id}/stats`) + statsData.value = res.data + } catch (e) { + console.error(e) + } finally { + statsLoading.value = false + } +} + onMounted(() => { fetchTenants() fetchList() @@ -485,9 +598,20 @@ onMounted(() => { {{ row.is_enabled ? '禁用' : '启用' }} - 日志 + + 统计 + + + 版本 + 删除 @@ -613,9 +737,14 @@ onMounted(() => { + 高级配置 + + +
+ 失败后重试 + + 次,间隔 + + +
+
+ + + + + 执行失败时发送通知 + + + + + +
+ 填写钉钉或企微机器人 Webhook 地址,任务执行失败时会自动发送告警通知 +
+
+ @@ -748,6 +906,109 @@ log('执行完成')" 关闭 + + + +
+ 暂无可用模板 +
+
+
+
{{ template.name }}
+
{{ template.description || '暂无描述' }}
+ {{ template.category }} +
+
+ +
+ + + + + + + + + + + + + + + + + + + + + + + +
+ +
+ +
@@ -897,4 +1158,142 @@ log('执行完成')" line-height: 1.5; overflow-x: auto; } + +/* 重试配置 */ +.retry-config { + display: flex; + align-items: center; + gap: 8px; + color: #606266; + font-size: 14px; +} + +/* 模板列表 */ +.template-list { + max-height: 400px; + overflow-y: auto; +} + +.template-item { + padding: 12px 16px; + border: 1px solid #e4e7ed; + border-radius: 8px; + margin-bottom: 8px; + cursor: pointer; + transition: all 0.2s; +} + +.template-item:hover { + border-color: #409eff; + background: #f5f7fa; +} + +.template-name { + font-weight: 600; + color: #303133; + margin-bottom: 4px; +} + +.template-desc { + font-size: 12px; + color: #909399; + margin-bottom: 8px; +} + +.empty-tip { + text-align: center; + color: #909399; + padding: 40px 0; +} + +/* 统计 */ +.stats-grid { + display: grid; + grid-template-columns: repeat(5, 1fr); + gap: 16px; + margin-bottom: 24px; +} + +.stats-item { + text-align: center; + padding: 16px; + background: #f5f7fa; + border-radius: 8px; +} + +.stats-item.success { + background: #f0f9eb; +} + +.stats-item.danger { + background: #fef0f0; +} + +.stats-value { + font-size: 24px; + font-weight: 600; + color: #303133; + margin-bottom: 4px; +} + +.stats-item.success .stats-value { + color: #67c23a; +} + +.stats-item.danger .stats-value { + color: #f56c6c; +} + +.stats-label { + font-size: 12px; + color: #909399; +} + +.stats-trend h4 { + margin: 0 0 12px; + color: #303133; +} + +.trend-chart { + display: flex; + gap: 8px; + align-items: flex-end; + height: 100px; +} + +.trend-day { + flex: 1; + display: flex; + flex-direction: column; + align-items: center; +} + +.trend-bars { + display: flex; + flex-direction: column; + width: 100%; + height: 80px; + background: #f5f7fa; + border-radius: 4px; + overflow: hidden; +} + +.trend-bar { + width: 100%; + transition: height 0.3s; +} + +.trend-bar.success { + background: #67c23a; +} + +.trend-bar.danger { + background: #f56c6c; +} + +.trend-date { + font-size: 10px; + color: #909399; + margin-top: 4px; +}