From 104487f0828f83bee66e1c6fe52a4120892cfd51 Mon Sep 17 00:00:00 2001 From: Admin Date: Wed, 28 Jan 2026 16:38:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 platform_scheduled_tasks, platform_task_logs, platform_script_vars, platform_secrets 数据库表 - 实现 ScriptSDK 提供 AI/通知/DB/HTTP/变量存储/参数获取等功能 - 实现安全的脚本执行器,支持沙箱环境和禁止危险操作 - 实现 APScheduler 调度服务,支持简单时间点和 CRON 表达式 - 新增定时任务 API 路由,包含 CRUD、执行、日志、密钥管理 - 新增定时任务前端页面,支持脚本编辑、测试运行、日志查看 --- backend/app/main.py | 25 +- backend/app/models/__init__.py | 7 +- backend/app/models/app.py | 5 - backend/app/models/scheduled_task.py | 96 ++ backend/app/models/tenant_app.py | 4 - backend/app/routers/apps.py | 38 - backend/app/routers/scripts.py | 325 ---- backend/app/routers/tasks.py | 1170 +++++-------- backend/app/routers/tenant_apps.py | 40 - backend/app/services/scheduler.py | 679 ++++---- backend/app/services/script_executor.py | 442 +++-- backend/app/services/script_sdk.py | 798 ++++----- frontend/src/components/Layout.vue | 5 +- frontend/src/components/MonacoEditor.vue | 142 -- frontend/src/router/index.js | 20 +- frontend/src/views/app-config/index.vue | 977 +---------- frontend/src/views/apps/index.vue | 191 +-- frontend/src/views/scheduled-tasks/index.vue | 1540 ++++++------------ frontend/src/views/scripts/index.vue | 772 --------- 19 files changed, 1870 insertions(+), 5406 deletions(-) create mode 100644 backend/app/models/scheduled_task.py delete mode 100644 backend/app/routers/scripts.py delete mode 100644 frontend/src/components/MonacoEditor.vue delete mode 100644 frontend/src/views/scripts/index.vue diff --git a/backend/app/main.py b/backend/app/main.py index 0c6d207..18869c7 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -14,12 +14,10 @@ from .routers.wechat import router as wechat_router from .routers.alerts import router as alerts_router from .routers.cost import router as cost_router from .routers.quota import router as quota_router -from .routers.tool_configs import router as tool_configs_router from .routers.tasks import router as tasks_router -from .routers.scripts import router as scripts_router from .middleware import TraceMiddleware, setup_exception_handlers, RequestLoggerMiddleware from .middleware.trace import setup_logging -from .services.scheduler import start_scheduler, shutdown_scheduler +from .services.scheduler import scheduler_service # 配置日志(包含 TraceID) setup_logging(level=logging.INFO, include_trace=True) @@ -70,29 +68,20 @@ app.include_router(wechat_router, prefix="/api") app.include_router(alerts_router, prefix="/api") app.include_router(cost_router, prefix="/api") app.include_router(quota_router, prefix="/api") -app.include_router(tool_configs_router, prefix="/api") -app.include_router(tasks_router, prefix="/api") -app.include_router(scripts_router, prefix="/api") +app.include_router(tasks_router) +# 应用生命周期事件 @app.on_event("startup") async def startup_event(): - """应用启动时初始化调度器""" - try: - start_scheduler() - logging.info("Scheduler started successfully") - except Exception as e: - logging.error(f"Failed to start scheduler: {e}") + """应用启动时启动调度器""" + scheduler_service.start() @app.on_event("shutdown") async def shutdown_event(): - """应用关闭时停止调度器""" - try: - shutdown_scheduler() - logging.info("Scheduler shutdown successfully") - except Exception as e: - logging.error(f"Failed to shutdown scheduler: {e}") + """应用关闭时关闭调度器""" + scheduler_service.shutdown() @app.get("/") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index eeb6bdc..743cbf5 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -8,6 +8,7 @@ from .stats import AICallEvent, TenantUsageDaily from .logs import PlatformLog from .alert import AlertRule, AlertRecord, NotificationChannel from .pricing import ModelPricing, TenantBilling +from .scheduled_task import ScheduledTask, TaskLog, ScriptVar, Secret __all__ = [ "Tenant", @@ -24,5 +25,9 @@ __all__ = [ "AlertRecord", "NotificationChannel", "ModelPricing", - "TenantBilling" + "TenantBilling", + "ScheduledTask", + "TaskLog", + "ScriptVar", + "Secret" ] diff --git a/backend/app/models/app.py b/backend/app/models/app.py index caade74..f388851 100644 --- a/backend/app/models/app.py +++ b/backend/app/models/app.py @@ -18,11 +18,6 @@ class App(Base): # [{"code": "brainstorm", "name": "头脑风暴", "path": "/brainstorm"}, ...] tools = Column(Text) - # 配置项定义(JSON 数组)- 定义租户可配置的参数 - # [{"key": "industry", "label": "行业类型", "type": "radio", "options": [...], "default": "...", "required": false}, ...] - # type: text(文本) | radio(单选) | select(下拉多选) | switch(开关) - config_schema = Column(Text) - # 是否需要企微JS-SDK require_jssdk = Column(SmallInteger, default=0) # 0-不需要 1-需要 diff --git a/backend/app/models/scheduled_task.py b/backend/app/models/scheduled_task.py new file mode 100644 index 0000000..1d610b0 --- /dev/null +++ b/backend/app/models/scheduled_task.py @@ -0,0 +1,96 @@ +"""定时任务相关模型""" +from datetime import datetime +from sqlalchemy import Column, BigInteger, Integer, String, Text, Enum, SmallInteger, TIMESTAMP, DateTime +from ..database import Base + + +class ScheduledTask(Base): + """定时任务表""" + __tablename__ = "platform_scheduled_tasks" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50)) + task_name = Column(String(100), nullable=False) + task_type = Column(Enum('webhook', 'script'), nullable=False, default='script') + + # 调度配置 + schedule_type = Column(Enum('simple', 'cron'), nullable=False, default='simple') + time_points = Column(Text) # JSON数组 ["08:00", "12:00"] + cron_expression = Column(String(100)) + + # Webhook配置 + webhook_url = Column(String(500)) + webhook_method = Column(String(10), default='POST') + webhook_headers = Column(Text) # JSON格式 + + # 脚本配置 + script_content = Column(Text) + script_timeout = Column(Integer, default=300) + + # 输入参数 + input_params = Column(Text) # JSON格式 + + # 重试配置 + retry_count = Column(Integer, default=0) + retry_interval = Column(Integer, default=60) + + # 告警配置 + alert_on_failure = Column(SmallInteger, default=0) + alert_webhook = Column(String(500)) + + # 状态 + status = Column(SmallInteger, default=1) # 0-禁用 1-启用 + last_run_at = Column(DateTime) + last_run_status = Column(String(20)) + + created_at = Column(TIMESTAMP, default=datetime.now) + updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) + + +class TaskLog(Base): + """任务执行日志""" + __tablename__ = "platform_task_logs" + + id = Column(BigInteger, primary_key=True, autoincrement=True) + task_id = Column(Integer, nullable=False) + tenant_id = Column(String(50)) + trace_id = Column(String(100)) + + status = Column(Enum('running', 'success', 'failed'), nullable=False) + started_at = Column(DateTime, nullable=False) + finished_at = Column(DateTime) + duration_ms = Column(Integer) + + output = Column(Text) + error = Column(Text) + retry_count = Column(Integer, default=0) + + created_at = Column(TIMESTAMP, default=datetime.now) + + +class ScriptVar(Base): + """脚本变量存储""" + __tablename__ = "platform_script_vars" + + id = Column(Integer, primary_key=True, autoincrement=True) + task_id = Column(Integer, nullable=False) + tenant_id = Column(String(50)) + var_key = Column(String(100), nullable=False) + var_value = Column(Text) # JSON格式 + + created_at = Column(TIMESTAMP, default=datetime.now) + updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) + + +class Secret(Base): + """密钥管理""" + __tablename__ = "platform_secrets" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50)) # NULL为全局 + secret_key = Column(String(100), nullable=False) + secret_value = Column(Text, nullable=False) + description = Column(String(255)) + + created_at = Column(TIMESTAMP, default=datetime.now) + updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) diff --git a/backend/app/models/tenant_app.py b/backend/app/models/tenant_app.py index 62e6ef5..1e77a81 100644 --- a/backend/app/models/tenant_app.py +++ b/backend/app/models/tenant_app.py @@ -23,10 +23,6 @@ class TenantApp(Base): # 功能权限 allowed_tools = Column(Text) # JSON 数组 - # 自定义配置(JSON 数组) - # [{"key": "industry", "value": "medical_beauty", "remark": "医美行业"}, ...] - custom_configs = Column(Text) - status = Column(SmallInteger, default=1) created_at = Column(TIMESTAMP, default=datetime.now) updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) diff --git a/backend/app/routers/apps.py b/backend/app/routers/apps.py index b1c6dd0..871d809 100644 --- a/backend/app/routers/apps.py +++ b/backend/app/routers/apps.py @@ -23,18 +23,6 @@ class ToolItem(BaseModel): path: str -class ConfigSchemaItem(BaseModel): - """配置项定义""" - key: str # 配置键 - label: str # 显示标签 - type: str # text | radio | select | switch - options: Optional[List[str]] = None # radio/select 的选项值 - option_labels: Optional[dict] = None # 选项显示名称 {"value": "显示名"} - default: Optional[str] = None # 默认值 - placeholder: Optional[str] = None # 输入提示(text类型) - required: bool = False # 是否必填 - - class AppCreate(BaseModel): """创建应用""" app_code: str @@ -42,7 +30,6 @@ class AppCreate(BaseModel): base_url: Optional[str] = None description: Optional[str] = None tools: Optional[List[ToolItem]] = None - config_schema: Optional[List[ConfigSchemaItem]] = None require_jssdk: bool = False @@ -52,7 +39,6 @@ class AppUpdate(BaseModel): base_url: Optional[str] = None description: Optional[str] = None tools: Optional[List[ToolItem]] = None - config_schema: Optional[List[ConfigSchemaItem]] = None require_jssdk: Optional[bool] = None status: Optional[int] = None @@ -133,7 +119,6 @@ async def create_app( base_url=data.base_url, description=data.description, tools=json.dumps([t.model_dump() for t in data.tools], ensure_ascii=False) if data.tools else None, - config_schema=json.dumps([c.model_dump() for c in data.config_schema], ensure_ascii=False) if data.config_schema else None, require_jssdk=1 if data.require_jssdk else 0, status=1 ) @@ -165,13 +150,6 @@ async def update_app( else: update_data['tools'] = None - # 处理 config_schema JSON - if 'config_schema' in update_data: - if update_data['config_schema']: - update_data['config_schema'] = json.dumps([c.model_dump() if hasattr(c, 'model_dump') else c for c in update_data['config_schema']], ensure_ascii=False) - else: - update_data['config_schema'] = None - # 处理 require_jssdk if 'require_jssdk' in update_data: update_data['require_jssdk'] = 1 if update_data['require_jssdk'] else 0 @@ -281,21 +259,6 @@ async def get_app_tools( return tools -@router.get("/{app_code}/config-schema") -async def get_app_config_schema( - app_code: str, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取应用的配置项定义(用于租户订阅时渲染表单)""" - app = db.query(App).filter(App.app_code == app_code).first() - if not app: - raise HTTPException(status_code=404, detail="应用不存在") - - config_schema = json.loads(app.config_schema) if app.config_schema else [] - return config_schema - - def format_app(app: App) -> dict: """格式化应用数据""" return { @@ -305,7 +268,6 @@ def format_app(app: App) -> dict: "base_url": app.base_url, "description": app.description, "tools": json.loads(app.tools) if app.tools else [], - "config_schema": json.loads(app.config_schema) if app.config_schema else [], "require_jssdk": bool(app.require_jssdk), "status": app.status, "created_at": app.created_at, diff --git a/backend/app/routers/scripts.py b/backend/app/routers/scripts.py deleted file mode 100644 index 9accc87..0000000 --- a/backend/app/routers/scripts.py +++ /dev/null @@ -1,325 +0,0 @@ -"""脚本管理路由""" -from fastapi import APIRouter, Depends, HTTPException, Query -from pydantic import BaseModel -from typing import Optional, List -from sqlalchemy.orm import Session -from sqlalchemy import text -from datetime import datetime - -from ..database import get_db -from .auth import get_current_user, require_operator -from ..models.user import User - -router = APIRouter(prefix="/scripts", tags=["脚本管理"]) - - -# Schemas - -class ScriptCreate(BaseModel): - tenant_id: Optional[str] = None - name: str - filename: Optional[str] = None - description: Optional[str] = None - script_content: str - category: Optional[str] = None - is_enabled: bool = True - - -class ScriptUpdate(BaseModel): - name: Optional[str] = None - filename: Optional[str] = None - description: Optional[str] = None - script_content: Optional[str] = None - category: Optional[str] = None - is_enabled: Optional[bool] = None - - -class ScriptRunRequest(BaseModel): - tenant_id: Optional[str] = None # 可指定以哪个租户身份运行 - - -# API Endpoints - -@router.get("") -async def list_scripts( - page: int = Query(1, ge=1), - size: int = Query(50, ge=1, le=200), - tenant_id: Optional[str] = None, - category: Optional[str] = None, - keyword: Optional[str] = None, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取脚本列表""" - where_clauses = [] - params = {} - - if tenant_id: - where_clauses.append("(tenant_id = :tenant_id OR tenant_id IS NULL)") - params["tenant_id"] = tenant_id - if category: - where_clauses.append("category = :category") - params["category"] = category - if keyword: - where_clauses.append("(name LIKE :keyword OR description LIKE :keyword)") - params["keyword"] = f"%{keyword}%" - - where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" - - # 查询总数 - count_result = db.execute( - text(f"SELECT COUNT(*) FROM platform_scripts WHERE {where_sql}"), - params - ) - total = count_result.scalar() - - # 查询列表 - params["offset"] = (page - 1) * size - params["limit"] = size - result = db.execute( - text(f""" - SELECT id, tenant_id, name, filename, description, category, - is_enabled, last_run_at, last_run_status, created_by, created_at, updated_at, - LENGTH(script_content) as content_length - FROM platform_scripts - WHERE {where_sql} - ORDER BY updated_at DESC, id DESC - LIMIT :limit OFFSET :offset - """), - params - ) - scripts = [dict(row) for row in result.mappings().all()] - - # 获取分类列表 - cat_result = db.execute( - text("SELECT DISTINCT category FROM platform_scripts WHERE category IS NOT NULL AND category != ''") - ) - categories = [row[0] for row in cat_result.fetchall()] - - return { - "total": total, - "page": page, - "size": size, - "items": scripts, - "categories": categories - } - - -@router.get("/{script_id}") -async def get_script( - script_id: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取脚本详情(包含内容)""" - result = db.execute( - text("SELECT * FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - script = result.mappings().first() - - if not script: - raise HTTPException(status_code=404, detail="脚本不存在") - - return dict(script) - - -@router.post("") -async def create_script( - data: ScriptCreate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """创建脚本""" - # 自动生成文件名 - filename = data.filename - if not filename and data.name: - # 转换为安全的文件名 - import re - safe_name = re.sub(r'[^\w\u4e00-\u9fff]', '_', data.name) - filename = f"{safe_name}.py" - - db.execute( - text(""" - INSERT INTO platform_scripts - (tenant_id, name, filename, description, script_content, category, is_enabled, created_by) - VALUES (:tenant_id, :name, :filename, :description, :script_content, :category, :is_enabled, :created_by) - """), - { - "tenant_id": data.tenant_id, - "name": data.name, - "filename": filename, - "description": data.description, - "script_content": data.script_content, - "category": data.category, - "is_enabled": 1 if data.is_enabled else 0, - "created_by": user.username if hasattr(user, 'username') else None - } - ) - db.commit() - - result = db.execute(text("SELECT LAST_INSERT_ID() as id")) - script_id = result.scalar() - - return {"id": script_id, "message": "创建成功"} - - -@router.put("/{script_id}") -async def update_script( - script_id: int, - data: ScriptUpdate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """更新脚本""" - # 检查是否存在 - result = db.execute( - text("SELECT id FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - if not result.scalar(): - raise HTTPException(status_code=404, detail="脚本不存在") - - updates = [] - params = {"id": script_id} - - if data.name is not None: - updates.append("name = :name") - params["name"] = data.name - if data.filename is not None: - updates.append("filename = :filename") - params["filename"] = data.filename - if data.description is not None: - updates.append("description = :description") - params["description"] = data.description - if data.script_content is not None: - updates.append("script_content = :script_content") - params["script_content"] = data.script_content - if data.category is not None: - updates.append("category = :category") - params["category"] = data.category - if data.is_enabled is not None: - updates.append("is_enabled = :is_enabled") - params["is_enabled"] = 1 if data.is_enabled else 0 - - if updates: - db.execute( - text(f"UPDATE platform_scripts SET {', '.join(updates)} WHERE id = :id"), - params - ) - db.commit() - - return {"message": "更新成功"} - - -@router.delete("/{script_id}") -async def delete_script( - script_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """删除脚本""" - result = db.execute( - text("SELECT id FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - if not result.scalar(): - raise HTTPException(status_code=404, detail="脚本不存在") - - db.execute( - text("DELETE FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - db.commit() - - return {"message": "删除成功"} - - -@router.post("/{script_id}/run") -async def run_script( - script_id: int, - data: ScriptRunRequest = None, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """执行脚本""" - from ..services.script_executor import test_script as run_test - - # 获取脚本 - result = db.execute( - text("SELECT * FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - script = result.mappings().first() - - if not script: - raise HTTPException(status_code=404, detail="脚本不存在") - - if not script["script_content"]: - raise HTTPException(status_code=400, detail="脚本内容为空") - - # 确定租户ID - tenant_id = (data.tenant_id if data else None) or script["tenant_id"] or "system" - - # 执行脚本 - exec_result = await run_test( - tenant_id=tenant_id, - script_content=script["script_content"] - ) - - # 更新执行状态 - status = "success" if exec_result.success else "failed" - db.execute( - text(""" - UPDATE platform_scripts - SET last_run_at = NOW(), last_run_status = :status - WHERE id = :id - """), - {"id": script_id, "status": status} - ) - db.commit() - - return exec_result.to_dict() - - -@router.post("/{script_id}/copy") -async def copy_script( - script_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """复制脚本""" - # 获取原脚本 - result = db.execute( - text("SELECT * FROM platform_scripts WHERE id = :id"), - {"id": script_id} - ) - script = result.mappings().first() - - if not script: - raise HTTPException(status_code=404, detail="脚本不存在") - - # 创建副本 - new_name = f"{script['name']} - 副本" - db.execute( - text(""" - INSERT INTO platform_scripts - (tenant_id, name, filename, description, script_content, category, is_enabled, created_by) - VALUES (:tenant_id, :name, :filename, :description, :script_content, :category, 1, :created_by) - """), - { - "tenant_id": script["tenant_id"], - "name": new_name, - "filename": None, - "description": script["description"], - "script_content": script["script_content"], - "category": script["category"], - "created_by": user.username if hasattr(user, 'username') else None - } - ) - db.commit() - - result = db.execute(text("SELECT LAST_INSERT_ID() as id")) - new_id = result.scalar() - - return {"id": new_id, "message": "复制成功"} diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index 8e1e4c1..05c2630 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -1,70 +1,64 @@ -"""定时任务管理路由""" -import asyncio +"""定时任务API路由""" +import json +from datetime import datetime +from typing import Optional, List from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks from pydantic import BaseModel -from typing import Optional, List from sqlalchemy.orm import Session -from sqlalchemy import text +from sqlalchemy import desc from ..database import get_db -from .auth import get_current_user, require_operator -from ..models.user import User -from ..services.scheduler import ( - add_task_to_scheduler, - remove_task_from_scheduler, - reload_task, - execute_task -) - -router = APIRouter(prefix="/scheduled-tasks", tags=["定时任务"]) +from ..models.scheduled_task import ScheduledTask, TaskLog, Secret +from ..services.scheduler import scheduler_service +from ..services.script_executor import ScriptExecutor -# Schemas +router = APIRouter(prefix="/api/scheduled-tasks", tags=["scheduled-tasks"]) + + +# ==================== Schemas ==================== class TaskCreate(BaseModel): - tenant_id: str + tenant_id: Optional[str] = None task_name: str - task_desc: Optional[str] = None - schedule_type: str = "simple" # simple | cron - time_points: Optional[List[str]] = None # ["09:00", "14:00"] - cron_expression: Optional[str] = None # "0 9,14 * * *" - execution_type: str = "webhook" # webhook | script + task_type: str = 'script' + schedule_type: str = 'simple' + time_points: Optional[List[str]] = None + cron_expression: Optional[str] = None webhook_url: Optional[str] = None - input_params: Optional[dict] = None + webhook_method: Optional[str] = 'POST' + webhook_headers: Optional[dict] = None script_content: Optional[str] = None - is_enabled: bool = True - # 重试和告警 - retry_count: int = 0 - retry_interval: int = 60 - alert_on_failure: bool = False + script_timeout: Optional[int] = 300 + input_params: Optional[dict] = None + retry_count: Optional[int] = 0 + retry_interval: Optional[int] = 60 + alert_on_failure: Optional[bool] = False alert_webhook: Optional[str] = None class TaskUpdate(BaseModel): + tenant_id: Optional[str] = None task_name: Optional[str] = None - task_desc: Optional[str] = None + task_type: Optional[str] = None schedule_type: Optional[str] = None time_points: Optional[List[str]] = None cron_expression: Optional[str] = None - execution_type: Optional[str] = None webhook_url: Optional[str] = None - input_params: Optional[dict] = None + webhook_method: Optional[str] = None + webhook_headers: Optional[dict] = None script_content: Optional[str] = None - # 重试和告警 + script_timeout: Optional[int] = None + input_params: Optional[dict] = None retry_count: Optional[int] = None retry_interval: Optional[int] = None alert_on_failure: Optional[bool] = None alert_webhook: Optional[str] = None + status: Optional[int] = None -class ScriptTestRequest(BaseModel): - tenant_id: str - script_content: str - - -# 密钥管理 class SecretCreate(BaseModel): - tenant_id: Optional[str] = None # None 表示全局 + tenant_id: Optional[str] = None secret_key: str secret_value: str description: Optional[str] = None @@ -75,872 +69,472 @@ class SecretUpdate(BaseModel): description: Optional[str] = None -# 脚本模板 -class TemplateCreate(BaseModel): - name: str - description: Optional[str] = None - category: Optional[str] = None +class TestScriptRequest(BaseModel): script_content: str - is_public: bool = True + tenant_id: Optional[str] = None + params: Optional[dict] = None -class TemplateUpdate(BaseModel): - name: Optional[str] = None - description: Optional[str] = None - category: Optional[str] = None - script_content: Optional[str] = None - is_public: Optional[bool] = None - - -# API Endpoints +# ==================== Task CRUD ==================== @router.get("") async def list_tasks( + tenant_id: Optional[str] = None, + status: Optional[int] = None, page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), - tenant_id: Optional[str] = None, - user: User = Depends(get_current_user), db: Session = Depends(get_db) ): - """获取定时任务列表""" - # 构建查询 - where_clauses = [] - params = {} + """获取任务列表""" + query = db.query(ScheduledTask) if tenant_id: - where_clauses.append("tenant_id = :tenant_id") - params["tenant_id"] = tenant_id + query = query.filter(ScheduledTask.tenant_id == tenant_id) + if status is not None: + query = query.filter(ScheduledTask.status == status) - where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" - - # 查询总数 - count_result = db.execute( - text(f"SELECT COUNT(*) FROM platform_scheduled_tasks WHERE {where_sql}"), - params - ) - total = count_result.scalar() - - # 查询列表 - params["offset"] = (page - 1) * size - params["limit"] = size - result = db.execute( - text(f""" - SELECT t.*, tn.name as tenant_name - FROM platform_scheduled_tasks t - LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code - WHERE {where_sql} - ORDER BY t.id DESC - LIMIT :limit OFFSET :offset - """), - params - ) - tasks = [dict(row) for row in result.mappings().all()] + total = query.count() + items = query.order_by(desc(ScheduledTask.created_at)).offset((page - 1) * size).limit(size).all() return { "total": total, - "page": page, - "size": size, - "items": tasks + "items": [format_task(t) for t in items] } -@router.post("") -async def create_task( - data: TaskCreate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """创建定时任务""" - # 验证调度配置 - if data.schedule_type == "simple": - if not data.time_points or len(data.time_points) == 0: - raise HTTPException(status_code=400, detail="简单模式需要至少一个时间点") - elif data.schedule_type == "cron": - if not data.cron_expression: - raise HTTPException(status_code=400, detail="CRON模式需要提供表达式") - - # 验证执行配置 - if data.execution_type == "webhook": - if not data.webhook_url: - raise HTTPException(status_code=400, detail="Webhook模式需要提供URL") - elif data.execution_type == "script": - if not data.script_content: - raise HTTPException(status_code=400, detail="脚本模式需要提供脚本内容") - - # 插入数据库 - import json - time_points_json = json.dumps(data.time_points) if data.time_points else None - input_params_json = json.dumps(data.input_params) if data.input_params else None - - db.execute( - text(""" - INSERT INTO platform_scheduled_tasks - (tenant_id, task_name, task_desc, schedule_type, time_points, - cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled, - retry_count, retry_interval, alert_on_failure, alert_webhook) - VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points, - :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled, - :retry_count, :retry_interval, :alert_on_failure, :alert_webhook) - """), - { - "tenant_id": data.tenant_id, - "task_name": data.task_name, - "task_desc": data.task_desc, - "schedule_type": data.schedule_type, - "time_points": time_points_json, - "cron_expression": data.cron_expression, - "execution_type": data.execution_type, - "webhook_url": data.webhook_url, - "input_params": input_params_json, - "script_content": data.script_content, - "is_enabled": 1 if data.is_enabled else 0, - "retry_count": data.retry_count, - "retry_interval": data.retry_interval, - "alert_on_failure": 1 if data.alert_on_failure else 0, - "alert_webhook": data.alert_webhook - } - ) - db.commit() - - # 获取新插入的ID - result = db.execute(text("SELECT LAST_INSERT_ID() as id")) - task_id = result.scalar() - - # 如果启用,添加到调度器 - if data.is_enabled: - reload_task(task_id) - - return {"id": task_id, "message": "创建成功"} - - @router.get("/{task_id}") -async def get_task( - task_id: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): +async def get_task(task_id: int, db: Session = Depends(get_db)): """获取任务详情""" - result = db.execute( - text(""" - SELECT t.*, tn.name as tenant_name - FROM platform_scheduled_tasks t - LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code - WHERE t.id = :id - """), - {"id": task_id} - ) - task = result.mappings().first() - + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="任务不存在") + return format_task(task, include_content=True) + + +@router.post("") +async def create_task(data: TaskCreate, db: Session = Depends(get_db)): + """创建任务""" + task = ScheduledTask( + tenant_id=data.tenant_id, + task_name=data.task_name, + task_type=data.task_type, + schedule_type=data.schedule_type, + time_points=json.dumps(data.time_points) if data.time_points else None, + cron_expression=data.cron_expression, + webhook_url=data.webhook_url, + webhook_method=data.webhook_method, + webhook_headers=json.dumps(data.webhook_headers) if data.webhook_headers else None, + script_content=data.script_content, + script_timeout=data.script_timeout, + input_params=json.dumps(data.input_params) if data.input_params else None, + retry_count=data.retry_count, + retry_interval=data.retry_interval, + alert_on_failure=1 if data.alert_on_failure else 0, + alert_webhook=data.alert_webhook, + status=1 + ) - return dict(task) + db.add(task) + db.commit() + db.refresh(task) + + # 添加到调度器 + scheduler_service.add_task(task.id) + + return {"success": True, "id": task.id} @router.put("/{task_id}") -async def update_task( - task_id: int, - data: TaskUpdate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """更新定时任务""" - # 检查任务是否存在 - result = db.execute( - text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - task = result.mappings().first() +async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_db)): + """更新任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="任务不存在") - # 构建更新语句 - import json - updates = [] - params = {"id": task_id} - + # 更新字段 + if data.tenant_id is not None: + task.tenant_id = data.tenant_id if data.task_name is not None: - updates.append("task_name = :task_name") - params["task_name"] = data.task_name - if data.task_desc is not None: - updates.append("task_desc = :task_desc") - params["task_desc"] = data.task_desc + task.task_name = data.task_name + if data.task_type is not None: + task.task_type = data.task_type if data.schedule_type is not None: - updates.append("schedule_type = :schedule_type") - params["schedule_type"] = data.schedule_type + task.schedule_type = data.schedule_type if data.time_points is not None: - updates.append("time_points = :time_points") - params["time_points"] = json.dumps(data.time_points) + task.time_points = json.dumps(data.time_points) if data.cron_expression is not None: - updates.append("cron_expression = :cron_expression") - params["cron_expression"] = data.cron_expression - if data.execution_type is not None: - updates.append("execution_type = :execution_type") - params["execution_type"] = data.execution_type + task.cron_expression = data.cron_expression if data.webhook_url is not None: - updates.append("webhook_url = :webhook_url") - params["webhook_url"] = data.webhook_url + task.webhook_url = data.webhook_url + if data.webhook_method is not None: + task.webhook_method = data.webhook_method + if data.webhook_headers is not None: + task.webhook_headers = json.dumps(data.webhook_headers) if data.script_content is not None: - updates.append("script_content = :script_content") - params["script_content"] = data.script_content + task.script_content = data.script_content + if data.script_timeout is not None: + task.script_timeout = data.script_timeout if data.input_params is not None: - updates.append("input_params = :input_params") - params["input_params"] = json.dumps(data.input_params) + task.input_params = json.dumps(data.input_params) if data.retry_count is not None: - updates.append("retry_count = :retry_count") - params["retry_count"] = data.retry_count + task.retry_count = data.retry_count if data.retry_interval is not None: - updates.append("retry_interval = :retry_interval") - params["retry_interval"] = data.retry_interval + task.retry_interval = data.retry_interval if data.alert_on_failure is not None: - updates.append("alert_on_failure = :alert_on_failure") - params["alert_on_failure"] = 1 if data.alert_on_failure else 0 + task.alert_on_failure = 1 if data.alert_on_failure else 0 if data.alert_webhook is not None: - updates.append("alert_webhook = :alert_webhook") - params["alert_webhook"] = data.alert_webhook + task.alert_webhook = data.alert_webhook + if data.status is not None: + task.status = data.status - # 如果更新了脚本内容,自动保存版本 - if data.script_content is not None and data.script_content.strip(): - # 获取当前最大版本号 - version_result = db.execute( - text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"), - {"task_id": task_id} - ) - max_version = version_result.scalar() or 0 - new_version = max_version + 1 - - # 插入新版本 - db.execute( - text(""" - INSERT INTO platform_script_versions (task_id, version, script_content, created_by) - VALUES (:task_id, :version, :script_content, :created_by) - """), - { - "task_id": task_id, - "version": new_version, - "script_content": data.script_content, - "created_by": user.username if hasattr(user, 'username') else None - } - ) - - if updates: - db.execute( - text(f"UPDATE platform_scheduled_tasks SET {', '.join(updates)} WHERE id = :id"), - params - ) - db.commit() - - # 重新加载调度器中的任务 - reload_task(task_id) - - return {"message": "更新成功"} - - -@router.delete("/{task_id}") -async def delete_task( - task_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """删除定时任务""" - # 检查任务是否存在 - result = db.execute( - text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - if not result.scalar(): - raise HTTPException(status_code=404, detail="任务不存在") - - # 从调度器移除 - remove_task_from_scheduler(task_id) - - # 删除日志 - db.execute( - text("DELETE FROM platform_task_logs WHERE task_id = :id"), - {"id": task_id} - ) - - # 删除任务 - db.execute( - text("DELETE FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - db.commit() - - return {"message": "删除成功"} - - -@router.post("/{task_id}/toggle") -async def toggle_task( - task_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """启用/禁用任务""" - # 获取当前状态 - result = db.execute( - text("SELECT is_enabled FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - row = result.first() - if not row: - raise HTTPException(status_code=404, detail="任务不存在") - - current_enabled = row[0] - new_enabled = 0 if current_enabled else 1 - - # 更新状态 - db.execute( - text("UPDATE platform_scheduled_tasks SET is_enabled = :enabled WHERE id = :id"), - {"id": task_id, "enabled": new_enabled} - ) db.commit() # 更新调度器 - reload_task(task_id) + if task.status == 1: + scheduler_service.add_task(task.id) + else: + scheduler_service.remove_task(task.id) - return { - "is_enabled": bool(new_enabled), - "message": "已启用" if new_enabled else "已禁用" - } + return {"success": True} + + +@router.delete("/{task_id}") +async def delete_task(task_id: int, db: Session = Depends(get_db)): + """删除任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + # 从调度器移除 + scheduler_service.remove_task(task_id) + + # 删除相关日志 + db.query(TaskLog).filter(TaskLog.task_id == task_id).delete() + + db.delete(task) + db.commit() + + return {"success": True} + + +@router.post("/{task_id}/toggle") +async def toggle_task(task_id: int, db: Session = Depends(get_db)): + """启用/禁用任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + task.status = 0 if task.status == 1 else 1 + db.commit() + + if task.status == 1: + scheduler_service.add_task(task.id) + else: + scheduler_service.remove_task(task.id) + + return {"success": True, "status": task.status} @router.post("/{task_id}/run") -async def run_task_now( - task_id: int, - background_tasks: BackgroundTasks, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """手动执行任务""" - # 检查任务是否存在 - result = db.execute( - text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - if not result.scalar(): +async def run_task(task_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db)): + """立即执行任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: raise HTTPException(status_code=404, detail="任务不存在") - # 在后台执行任务 - background_tasks.add_task(asyncio.create_task, execute_task(task_id)) - - return {"message": "任务已触发执行"} + result = await scheduler_service.run_task_now(task_id) + return result +# ==================== Task Logs ==================== + @router.get("/{task_id}/logs") async def get_task_logs( task_id: int, + status: Optional[str] = None, page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), - user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """获取任务执行日志""" - # 查询总数 - count_result = db.execute( - text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"), - {"task_id": task_id} - ) - total = count_result.scalar() + query = db.query(TaskLog).filter(TaskLog.task_id == task_id) - # 查询日志 - result = db.execute( - text(""" - SELECT * FROM platform_task_logs - WHERE task_id = :task_id - ORDER BY id DESC - LIMIT :limit OFFSET :offset - """), - {"task_id": task_id, "limit": size, "offset": (page - 1) * size} - ) - logs = [dict(row) for row in result.mappings().all()] + if status: + query = query.filter(TaskLog.status == status) + + total = query.count() + items = query.order_by(desc(TaskLog.started_at)).offset((page - 1) * size).limit(size).all() return { "total": total, - "page": page, - "size": size, - "items": logs + "items": [format_log(log) for log in items] } -@router.post("/test-script") -async def test_script( - data: ScriptTestRequest, - user: User = Depends(require_operator) -): - """测试执行脚本(不记录日志)""" - from ..services.script_executor import test_script as run_test - - if not data.script_content or not data.script_content.strip(): - raise HTTPException(status_code=400, detail="脚本内容不能为空") - - result = await run_test( - tenant_id=data.tenant_id, - script_content=data.script_content - ) - - return result.to_dict() +# ==================== Script Testing ==================== +@router.post("/test-script") +async def test_script(data: TestScriptRequest, db: Session = Depends(get_db)): + """测试脚本执行""" + executor = ScriptExecutor(db) + result = executor.test_script( + script_content=data.script_content, + task_id=0, + tenant_id=data.tenant_id, + params=data.params + ) + return result + + +# ==================== SDK Documentation ==================== @router.get("/sdk-docs") async def get_sdk_docs(): - """获取 SDK 文档""" + """获取SDK文档""" return { - "description": "脚本执行 SDK 文档", - "methods": [ + "functions": [ { - "name": "ai(prompt, system=None, model='gemini-2.5-flash')", - "description": "调用大模型生成内容", - "example": "result = ai('帮我写一段营销文案')" - }, - { - "name": "dingtalk(webhook, content, msg_type='text', at_mobiles=None)", - "description": "发送钉钉群消息", - "example": "dingtalk('https://oapi.dingtalk.com/robot/send?access_token=xxx', '消息内容')" - }, - { - "name": "wecom(webhook, content, msg_type='text')", - "description": "发送企业微信群消息", - "example": "wecom('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx', '消息内容')" - }, - { - "name": "db(sql, params=None)", - "description": "执行 SQL 查询(仅支持 SELECT)", - "example": "rows = db('SELECT * FROM users WHERE id = :id', {'id': 1})" - }, - { - "name": "http_get(url, headers=None, params=None)", - "description": "发送 HTTP GET 请求", - "example": "data = http_get('https://api.example.com/data')" - }, - { - "name": "http_post(url, data=None, json_data=None, headers=None)", - "description": "发送 HTTP POST 请求", - "example": "data = http_post('https://api.example.com/submit', json_data={'key': 'value'})" - }, - { - "name": "get_var(key, default=None)", - "description": "获取存储的变量(跨执行持久化)", - "example": "count = get_var('run_count', 0)" - }, - { - "name": "set_var(key, value)", - "description": "存储变量(跨执行持久化)", - "example": "set_var('run_count', count + 1)" - }, - { - "name": "log(message, level='INFO')", + "name": "log", + "signature": "log(message: str, level: str = 'INFO')", "description": "记录日志", - "example": "log('任务执行完成')" + "example": "log('处理完成', 'INFO')" }, { - "name": "get_tenants(app_code=None)", - "description": "获取租户列表(多租户任务遍历)", - "example": "tenants = get_tenants('review-generator')" + "name": "print", + "signature": "print(*args)", + "description": "打印输出", + "example": "print('Hello', 'World')" }, { - "name": "get_tenant_config(tenant_id, app_code, key=None)", - "description": "获取指定租户的应用配置", - "example": "webhook = get_tenant_config('tenant1', 'my-app', 'dingtalk_webhook')" + "name": "ai", + "signature": "ai(prompt: str, system: str = None, model: str = None, temperature: float = 0.7)", + "description": "调用AI模型", + "example": "result = ai('生成一段问候语', system='你是友善的助手')" }, { - "name": "get_all_tenant_configs(app_code)", - "description": "批量获取所有租户的应用配置", - "example": "all_configs = get_all_tenant_configs('my-app')" + "name": "dingtalk", + "signature": "dingtalk(webhook: str, content: str, title: str = None, at_all: bool = False)", + "description": "发送钉钉消息", + "example": "dingtalk(webhook_url, '# 标题\\n内容')" }, { - "name": "get_secret(key)", - "description": "获取密钥(优先租户级,其次全局)", - "example": "api_key = get_secret('openai_api_key')" + "name": "wecom", + "signature": "wecom(webhook: str, content: str, msg_type: str = 'markdown')", + "description": "发送企微消息", + "example": "wecom(webhook_url, '消息内容')" + }, + { + "name": "http_get", + "signature": "http_get(url: str, headers: dict = None, params: dict = None)", + "description": "发起GET请求", + "example": "resp = http_get('https://api.example.com/data')" + }, + { + "name": "http_post", + "signature": "http_post(url: str, data: any = None, headers: dict = None)", + "description": "发起POST请求", + "example": "resp = http_post('https://api.example.com/submit', {'key': 'value'})" + }, + { + "name": "db_query", + "signature": "db_query(sql: str, params: dict = None)", + "description": "执行只读SQL查询", + "example": "rows = db_query('SELECT * FROM users WHERE status = :status', {'status': 1})" + }, + { + "name": "get_var", + "signature": "get_var(key: str, default: any = None)", + "description": "获取持久化变量", + "example": "counter = get_var('counter', 0)" + }, + { + "name": "set_var", + "signature": "set_var(key: str, value: any)", + "description": "设置持久化变量", + "example": "set_var('counter', counter + 1)" + }, + { + "name": "del_var", + "signature": "del_var(key: str)", + "description": "删除持久化变量", + "example": "del_var('temp_data')" + }, + { + "name": "get_param", + "signature": "get_param(key: str, default: any = None)", + "description": "获取任务参数", + "example": "prompt = get_param('prompt', '默认提示词')" + }, + { + "name": "get_params", + "signature": "get_params()", + "description": "获取所有任务参数", + "example": "params = get_params()" + }, + { + "name": "get_tenants", + "signature": "get_tenants(app_code: str = None)", + "description": "获取租户列表", + "example": "tenants = get_tenants('notification-service')" + }, + { + "name": "get_tenant_config", + "signature": "get_tenant_config(tenant_id: str, app_code: str, key: str = None)", + "description": "获取租户的应用配置", + "example": "webhook = get_tenant_config('tenant1', 'notification-service', 'dingtalk_webhook')" + }, + { + "name": "get_all_tenant_configs", + "signature": "get_all_tenant_configs(app_code: str)", + "description": "获取所有租户的应用配置", + "example": "configs = get_all_tenant_configs('notification-service')" + }, + { + "name": "get_secret", + "signature": "get_secret(key: str)", + "description": "获取密钥(优先租户级)", + "example": "api_key = get_secret('api_key')" } ], - "example_script": '''# 示例:多租户批量推送 -import json - -# 获取所有订阅了某应用的租户配置 -tenants = get_all_tenant_configs('daily-report') - -for tenant in tenants: - tenant_id = tenant['tenant_id'] - tenant_name = tenant['tenant_name'] - configs = tenant['configs'] - - # 获取该租户的钉钉 Webhook - webhook = configs.get('dingtalk_webhook') - if not webhook: - log(f"租户 {tenant_name} 未配置 webhook,跳过") - continue - - # 调用 AI 生成内容 - content = ai(f"为 {tenant_name} 生成今日报告", system="你是报告生成专家") - - # 发送 - dingtalk(webhook=webhook, content=content) - log(f"已发送给租户: {tenant_name}") -''' + "variables": [ + {"name": "task_id", "description": "当前任务ID"}, + {"name": "tenant_id", "description": "当前租户ID"}, + {"name": "trace_id", "description": "当前执行追踪ID"} + ], + "libraries": [ + {"name": "json", "description": "JSON处理"}, + {"name": "re", "description": "正则表达式"}, + {"name": "math", "description": "数学函数"}, + {"name": "random", "description": "随机数"}, + {"name": "hashlib", "description": "哈希函数"}, + {"name": "base64", "description": "Base64编解码"}, + {"name": "datetime", "description": "日期时间处理"}, + {"name": "timedelta", "description": "时间差"}, + {"name": "urlencode/quote/unquote", "description": "URL编码"} + ] } -# ============ 密钥管理 ============ +# ==================== Secrets ==================== @router.get("/secrets") async def list_secrets( tenant_id: Optional[str] = None, - user: User = Depends(require_operator), db: Session = Depends(get_db) ): """获取密钥列表""" + query = db.query(Secret) if tenant_id: - result = db.execute( - text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets WHERE tenant_id = :tenant_id OR tenant_id IS NULL ORDER BY tenant_id, secret_key"), - {"tenant_id": tenant_id} - ) - else: - result = db.execute( - text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets ORDER BY tenant_id, secret_key") - ) - secrets = [dict(row) for row in result.mappings().all()] - return {"items": secrets} + query = query.filter(Secret.tenant_id == tenant_id) + + items = query.order_by(desc(Secret.created_at)).all() + + return { + "items": [ + { + "id": s.id, + "tenant_id": s.tenant_id, + "secret_key": s.secret_key, + "description": s.description, + "created_at": s.created_at, + "updated_at": s.updated_at + } + for s in items + ] + } @router.post("/secrets") -async def create_secret( - data: SecretCreate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): +async def create_secret(data: SecretCreate, db: Session = Depends(get_db)): """创建密钥""" - # 检查是否已存在 - result = db.execute( - text("SELECT id FROM platform_secrets WHERE tenant_id <=> :tenant_id AND secret_key = :key"), - {"tenant_id": data.tenant_id, "key": data.secret_key} - ) - if result.scalar(): - raise HTTPException(status_code=400, detail="密钥已存在") - - db.execute( - text(""" - INSERT INTO platform_secrets (tenant_id, secret_key, secret_value, description) - VALUES (:tenant_id, :key, :value, :desc) - """), - { - "tenant_id": data.tenant_id, - "key": data.secret_key, - "value": data.secret_value, - "desc": data.description - } + secret = Secret( + tenant_id=data.tenant_id, + secret_key=data.secret_key, + secret_value=data.secret_value, + description=data.description ) + db.add(secret) db.commit() - return {"message": "创建成功"} + db.refresh(secret) + + return {"success": True, "id": secret.id} @router.put("/secrets/{secret_id}") -async def update_secret( - secret_id: int, - data: SecretUpdate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): +async def update_secret(secret_id: int, data: SecretUpdate, db: Session = Depends(get_db)): """更新密钥""" - updates = [] - params = {"id": secret_id} + secret = db.query(Secret).filter(Secret.id == secret_id).first() + if not secret: + raise HTTPException(status_code=404, detail="密钥不存在") if data.secret_value is not None: - updates.append("secret_value = :value") - params["value"] = data.secret_value + secret.secret_value = data.secret_value if data.description is not None: - updates.append("description = :desc") - params["desc"] = data.description + secret.description = data.description - if updates: - db.execute( - text(f"UPDATE platform_secrets SET {', '.join(updates)} WHERE id = :id"), - params - ) - db.commit() - - return {"message": "更新成功"} + db.commit() + return {"success": True} @router.delete("/secrets/{secret_id}") -async def delete_secret( - secret_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): +async def delete_secret(secret_id: int, db: Session = Depends(get_db)): """删除密钥""" - db.execute(text("DELETE FROM platform_secrets WHERE id = :id"), {"id": secret_id}) + secret = db.query(Secret).filter(Secret.id == secret_id).first() + if not secret: + raise HTTPException(status_code=404, detail="密钥不存在") + + db.delete(secret) db.commit() - return {"message": "删除成功"} + return {"success": True} -# ============ 脚本模板 ============ +# ==================== Helpers ==================== -@router.get("/templates") -async def list_templates( - category: Optional[str] = None, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取模板列表""" - if category: - result = db.execute( - text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates WHERE category = :category ORDER BY id DESC"), - {"category": category} - ) - else: - result = db.execute( - text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates ORDER BY id DESC") - ) - templates = [dict(row) for row in result.mappings().all()] - return {"items": templates} - - -@router.get("/templates/{template_id}") -async def get_template( - template_id: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取模板详情""" - result = db.execute( - text("SELECT * FROM platform_script_templates WHERE id = :id"), - {"id": template_id} - ) - template = result.mappings().first() - if not template: - raise HTTPException(status_code=404, detail="模板不存在") - return dict(template) - - -@router.post("/templates") -async def create_template( - data: TemplateCreate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """创建模板""" - db.execute( - text(""" - INSERT INTO platform_script_templates (name, description, category, script_content, is_public, created_by) - VALUES (:name, :desc, :category, :content, :is_public, :created_by) - """), - { - "name": data.name, - "desc": data.description, - "category": data.category, - "content": data.script_content, - "is_public": 1 if data.is_public else 0, - "created_by": user.username if hasattr(user, 'username') else None - } - ) - db.commit() - - result = db.execute(text("SELECT LAST_INSERT_ID() as id")) - template_id = result.scalar() - - return {"id": template_id, "message": "创建成功"} - - -@router.put("/templates/{template_id}") -async def update_template( - template_id: int, - data: TemplateUpdate, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """更新模板""" - updates = [] - params = {"id": template_id} - - if data.name is not None: - updates.append("name = :name") - params["name"] = data.name - if data.description is not None: - updates.append("description = :desc") - params["desc"] = data.description - if data.category is not None: - updates.append("category = :category") - params["category"] = data.category - if data.script_content is not None: - updates.append("script_content = :content") - params["content"] = data.script_content - if data.is_public is not None: - updates.append("is_public = :is_public") - params["is_public"] = 1 if data.is_public else 0 - - if updates: - db.execute( - text(f"UPDATE platform_script_templates SET {', '.join(updates)} WHERE id = :id"), - params - ) - db.commit() - - return {"message": "更新成功"} - - -@router.delete("/templates/{template_id}") -async def delete_template( - template_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """删除模板""" - db.execute(text("DELETE FROM platform_script_templates WHERE id = :id"), {"id": template_id}) - db.commit() - return {"message": "删除成功"} - - -# ============ 脚本版本管理 ============ - -@router.get("/{task_id}/versions") -async def list_versions( - task_id: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取任务的脚本版本列表""" - result = db.execute( - text("SELECT id, version, change_note, created_by, created_at FROM platform_script_versions WHERE task_id = :task_id ORDER BY version DESC"), - {"task_id": task_id} - ) - versions = [dict(row) for row in result.mappings().all()] - return {"items": versions} - - -@router.get("/{task_id}/versions/{version}") -async def get_version( - task_id: int, - version: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取指定版本的脚本内容""" - result = db.execute( - text("SELECT * FROM platform_script_versions WHERE task_id = :task_id AND version = :version"), - {"task_id": task_id, "version": version} - ) - ver = result.mappings().first() - if not ver: - raise HTTPException(status_code=404, detail="版本不存在") - return dict(ver) - - -@router.post("/{task_id}/versions/{version}/rollback") -async def rollback_version( - task_id: int, - version: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """回滚到指定版本""" - # 获取指定版本的脚本内容 - result = db.execute( - text("SELECT script_content FROM platform_script_versions WHERE task_id = :task_id AND version = :version"), - {"task_id": task_id, "version": version} - ) - ver = result.first() - if not ver: - raise HTTPException(status_code=404, detail="版本不存在") - - script_content = ver[0] - - # 更新任务脚本 - db.execute( - text("UPDATE platform_scheduled_tasks SET script_content = :content WHERE id = :id"), - {"id": task_id, "content": script_content} - ) - - # 创建新版本记录 - version_result = db.execute( - text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"), - {"task_id": task_id} - ) - max_version = version_result.scalar() or 0 - new_version = max_version + 1 - - db.execute( - text(""" - INSERT INTO platform_script_versions (task_id, version, script_content, change_note, created_by) - VALUES (:task_id, :version, :content, :note, :created_by) - """), - { - "task_id": task_id, - "version": new_version, - "content": script_content, - "note": f"回滚到版本 {version}", - "created_by": user.username if hasattr(user, 'username') else None - } - ) - db.commit() - - # 重新加载任务 - reload_task(task_id) - - return {"message": f"已回滚到版本 {version},当前版本号 {new_version}"} - - -# ============ 统计数据 ============ - -@router.get("/{task_id}/stats") -async def get_task_stats( - task_id: int, - user: User = Depends(get_current_user), - db: Session = Depends(get_db) -): - """获取任务执行统计""" - # 总执行次数 - total_result = db.execute( - text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"), - {"task_id": task_id} - ) - total = total_result.scalar() or 0 - - # 成功次数 - success_result = db.execute( - text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id AND status = 'success'"), - {"task_id": task_id} - ) - success = success_result.scalar() or 0 - - # 失败次数 - failed = total - success - - # 成功率 - success_rate = round(success / total * 100, 1) if total > 0 else 0 - - # 平均耗时(成功的任务) - avg_result = db.execute( - text(""" - SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, finished_at)) - FROM platform_task_logs - WHERE task_id = :task_id AND status = 'success' AND finished_at IS NOT NULL - """), - {"task_id": task_id} - ) - avg_duration = avg_result.scalar() - avg_duration = round(float(avg_duration), 1) if avg_duration else 0 - - # 最近7天趋势 - trend_result = db.execute( - text(""" - SELECT DATE(started_at) as date, - SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed - FROM platform_task_logs - WHERE task_id = :task_id AND started_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY) - GROUP BY DATE(started_at) - ORDER BY date - """), - {"task_id": task_id} - ) - trend = [dict(row) for row in trend_result.mappings().all()] - - return { - "total": total, - "success": success, - "failed": failed, - "success_rate": success_rate, - "avg_duration": avg_duration, - "trend": trend +def format_task(task: ScheduledTask, include_content: bool = False) -> dict: + """格式化任务数据""" + data = { + "id": task.id, + "tenant_id": task.tenant_id, + "task_name": task.task_name, + "task_type": task.task_type, + "schedule_type": task.schedule_type, + "time_points": json.loads(task.time_points) if task.time_points else [], + "cron_expression": task.cron_expression, + "status": task.status, + "last_run_at": task.last_run_at, + "last_run_status": task.last_run_status, + "retry_count": task.retry_count, + "retry_interval": task.retry_interval, + "alert_on_failure": bool(task.alert_on_failure), + "alert_webhook": task.alert_webhook, + "created_at": task.created_at, + "updated_at": task.updated_at + } + + if include_content: + data["webhook_url"] = task.webhook_url + data["webhook_method"] = task.webhook_method + data["webhook_headers"] = json.loads(task.webhook_headers) if task.webhook_headers else None + data["script_content"] = task.script_content + data["script_timeout"] = task.script_timeout + data["input_params"] = json.loads(task.input_params) if task.input_params else None + + return data + + +def format_log(log: TaskLog) -> dict: + """格式化日志数据""" + return { + "id": log.id, + "task_id": log.task_id, + "tenant_id": log.tenant_id, + "trace_id": log.trace_id, + "status": log.status, + "started_at": log.started_at, + "finished_at": log.finished_at, + "duration_ms": log.duration_ms, + "output": log.output, + "error": log.error, + "retry_count": log.retry_count, + "created_at": log.created_at } diff --git a/backend/app/routers/tenant_apps.py b/backend/app/routers/tenant_apps.py index 29be095..db31e79 100644 --- a/backend/app/routers/tenant_apps.py +++ b/backend/app/routers/tenant_apps.py @@ -17,13 +17,6 @@ router = APIRouter(prefix="/tenant-apps", tags=["应用配置"]) # Schemas -class CustomConfigItem(BaseModel): - """自定义配置项""" - key: str # 配置键 - value: str # 配置值 - remark: Optional[str] = None # 备注说明 - - class TenantAppCreate(BaseModel): tenant_id: str app_code: str = "tools" @@ -32,7 +25,6 @@ class TenantAppCreate(BaseModel): access_token: Optional[str] = None # 如果不传则自动生成 allowed_origins: Optional[List[str]] = None allowed_tools: Optional[List[str]] = None - custom_configs: Optional[List[CustomConfigItem]] = None # 自定义配置 class TenantAppUpdate(BaseModel): @@ -41,7 +33,6 @@ class TenantAppUpdate(BaseModel): access_token: Optional[str] = None allowed_origins: Optional[List[str]] = None allowed_tools: Optional[List[str]] = None - custom_configs: Optional[List[CustomConfigItem]] = None # 自定义配置 status: Optional[int] = None @@ -120,7 +111,6 @@ async def create_tenant_app( access_token=access_token, allowed_origins=json.dumps(data.allowed_origins) if data.allowed_origins else None, allowed_tools=json.dumps(data.allowed_tools) if data.allowed_tools else None, - custom_configs=json.dumps([c.model_dump() for c in data.custom_configs], ensure_ascii=False) if data.custom_configs else None, status=1 ) db.add(app) @@ -149,14 +139,6 @@ async def update_tenant_app( update_data['allowed_origins'] = json.dumps(update_data['allowed_origins']) if update_data['allowed_origins'] else None if 'allowed_tools' in update_data: update_data['allowed_tools'] = json.dumps(update_data['allowed_tools']) if update_data['allowed_tools'] else None - if 'custom_configs' in update_data: - if update_data['custom_configs']: - update_data['custom_configs'] = json.dumps( - [c.model_dump() if hasattr(c, 'model_dump') else c for c in update_data['custom_configs']], - ensure_ascii=False - ) - else: - update_data['custom_configs'] = None for key, value in update_data.items(): setattr(app, key, value) @@ -182,27 +164,6 @@ async def delete_tenant_app( return {"success": True} -@router.get("/{app_id}/token") -async def get_token( - app_id: int, - user: User = Depends(require_operator), - db: Session = Depends(get_db) -): - """获取真实的 access_token(仅管理员可用)""" - app = db.query(TenantApp).filter(TenantApp.id == app_id).first() - if not app: - raise HTTPException(status_code=404, detail="应用配置不存在") - - # 获取应用的 base_url - app_info = db.query(App).filter(App.app_code == app.app_code).first() - base_url = app_info.base_url if app_info else "" - - return { - "access_token": app.access_token, - "base_url": base_url - } - - @router.post("/{app_id}/regenerate-token") async def regenerate_token( app_id: int, @@ -246,7 +207,6 @@ def format_tenant_app(app: TenantApp, mask_secret: bool = True, db: Session = No "access_token": "******" if mask_secret and app.access_token else app.access_token, "allowed_origins": json.loads(app.allowed_origins) if app.allowed_origins else [], "allowed_tools": json.loads(app.allowed_tools) if app.allowed_tools else [], - "custom_configs": json.loads(app.custom_configs) if app.custom_configs else [], "status": app.status, "created_at": app.created_at, "updated_at": app.updated_at diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 91da8c5..a491cbc 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -1,399 +1,308 @@ -"""定时任务调度器服务""" -import asyncio -import logging -from datetime import datetime -from typing import Optional, List, Dict, Any - +"""定时任务调度服务""" +import json import httpx +import asyncio +from datetime import datetime +from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger -from sqlalchemy import text from sqlalchemy.orm import Session from ..database import SessionLocal - -logger = logging.getLogger(__name__) - -# 全局调度器实例 -scheduler: Optional[AsyncIOScheduler] = None +from ..models.scheduled_task import ScheduledTask, TaskLog +from .script_executor import ScriptExecutor -def get_scheduler() -> AsyncIOScheduler: - """获取调度器实例""" - global scheduler - if scheduler is None: - scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") - return scheduler - - -def get_db_session() -> Session: - """获取数据库会话""" - return SessionLocal() - - -async def send_alert(webhook: str, task_name: str, error_message: str): - """发送失败告警通知""" - try: - # 自动判断钉钉或企微 - if "dingtalk" in webhook or "oapi.dingtalk.com" in webhook: - data = { - "msgtype": "markdown", - "markdown": { - "title": "定时任务执行失败", - "text": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - } - } - else: - # 企微格式 - data = { - "msgtype": "markdown", - "markdown": { - "content": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - } - } - - async with httpx.AsyncClient(timeout=30.0) as client: - await client.post(webhook, json=data) - logger.info(f"Alert sent for task {task_name}") - except Exception as e: - logger.warning(f"Failed to send alert: {e}") - - -async def execute_task_with_retry(task_id: int, retry_count: int = 0, max_retries: int = 0, retry_interval: int = 60): - """带重试的任务执行""" - success = await execute_task_once(task_id) +class SchedulerService: + """调度服务 - 管理定时任务的调度和执行""" - if not success and retry_count < max_retries: - logger.info(f"Task {task_id} failed, scheduling retry {retry_count + 1}/{max_retries} in {retry_interval}s") - await asyncio.sleep(retry_interval) - await execute_task_with_retry(task_id, retry_count + 1, max_retries, retry_interval) - elif not success: - # 所有重试都失败,发送告警 - db = get_db_session() + _instance: Optional['SchedulerService'] = None + _scheduler: Optional[AsyncIOScheduler] = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if self._scheduler is None: + self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') + + @property + def scheduler(self) -> AsyncIOScheduler: + return self._scheduler + + def start(self): + """启动调度器并加载所有任务""" + if not self._scheduler.running: + self._scheduler.start() + self._load_all_tasks() + print("调度器已启动") + + def shutdown(self): + """关闭调度器""" + if self._scheduler.running: + self._scheduler.shutdown() + print("调度器已关闭") + + def _load_all_tasks(self): + """从数据库加载所有启用的任务""" + db = SessionLocal() try: - result = db.execute( - text("SELECT task_name, alert_on_failure, alert_webhook, last_run_message FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - task = result.mappings().first() - if task and task["alert_on_failure"] and task["alert_webhook"]: - await send_alert(task["alert_webhook"], task["task_name"], task["last_run_message"] or "未知错误") + tasks = db.query(ScheduledTask).filter(ScheduledTask.status == 1).all() + for task in tasks: + self._add_task_to_scheduler(task) + print(f"已加载 {len(tasks)} 个定时任务") + finally: + db.close() + + def _add_task_to_scheduler(self, task: ScheduledTask): + """将任务添加到调度器""" + job_id = f"task_{task.id}" + + # 移除已存在的任务 + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + if task.schedule_type == 'cron' and task.cron_expression: + # CRON模式 + try: + trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai') + self._scheduler.add_job( + self._execute_task, + trigger, + id=job_id, + args=[task.id], + replace_existing=True + ) + except Exception as e: + print(f"任务 {task.id} CRON表达式解析失败: {e}") + + elif task.schedule_type == 'simple' and task.time_points: + # 简单模式 - 多个时间点 + try: + time_points = json.loads(task.time_points) + for i, time_point in enumerate(time_points): + hour, minute = map(int, time_point.split(':')) + sub_job_id = f"{job_id}_{i}" + self._scheduler.add_job( + self._execute_task, + CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'), + id=sub_job_id, + args=[task.id], + replace_existing=True + ) + except Exception as e: + print(f"任务 {task.id} 时间点解析失败: {e}") + + def add_task(self, task_id: int): + """添加或更新任务调度""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if task and task.status == 1: + self._add_task_to_scheduler(task) + finally: + db.close() + + def remove_task(self, task_id: int): + """移除任务调度""" + job_id = f"task_{task_id}" + + # 移除主任务 + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + # 移除简单模式的子任务 + for i in range(24): # 最多24个时间点 + sub_job_id = f"{job_id}_{i}" + if self._scheduler.get_job(sub_job_id): + self._scheduler.remove_job(sub_job_id) + + async def _execute_task(self, task_id: int): + """执行任务(带重试)""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + return + + max_retries = task.retry_count or 0 + retry_interval = task.retry_interval or 60 + + for attempt in range(max_retries + 1): + success, output, error = await self._execute_task_once(db, task) + + if success: + return + + # 如果还有重试机会 + if attempt < max_retries: + print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})") + await asyncio.sleep(retry_interval) + else: + # 最后一次失败,发送告警 + if task.alert_on_failure and task.alert_webhook: + await self._send_alert(task, error) + finally: + db.close() + + async def _execute_task_once(self, db: Session, task: ScheduledTask): + """执行一次任务""" + trace_id = f"{int(datetime.now().timestamp())}-{task.id}" + started_at = datetime.now() + + # 创建日志记录 + log = TaskLog( + task_id=task.id, + tenant_id=task.tenant_id, + trace_id=trace_id, + status='running', + started_at=started_at + ) + db.add(log) + db.commit() + db.refresh(log) + + success = False + output = '' + error = '' + + try: + # 解析输入参数 + params = {} + if task.input_params: + try: + params = json.loads(task.input_params) + except: + pass + + if task.task_type == 'webhook': + success, output, error = await self._execute_webhook(task) + else: + success, output, error = await self._execute_script(db, task, trace_id, params) + + except Exception as e: + error = str(e) + + # 更新日志 + finished_at = datetime.now() + duration_ms = int((finished_at - started_at).total_seconds() * 1000) + + log.status = 'success' if success else 'failed' + log.finished_at = finished_at + log.duration_ms = duration_ms + log.output = output[:10000] if output else None # 限制长度 + log.error = error[:5000] if error else None + + # 更新任务状态 + task.last_run_at = finished_at + task.last_run_status = 'success' if success else 'failed' + + db.commit() + + return success, output, error + + async def _execute_webhook(self, task: ScheduledTask): + """执行Webhook任务""" + try: + headers = {} + if task.webhook_headers: + headers = json.loads(task.webhook_headers) + + body = {} + if task.input_params: + body = json.loads(task.input_params) + + async with httpx.AsyncClient(timeout=30) as client: + if task.webhook_method.upper() == 'GET': + response = await client.get(task.webhook_url, headers=headers, params=body) + else: + response = await client.post(task.webhook_url, headers=headers, json=body) + + response.raise_for_status() + return True, response.text[:5000], '' + + except Exception as e: + return False, '', str(e) + + async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict): + """执行脚本任务""" + if not task.script_content: + return False, '', '脚本内容为空' + + executor = ScriptExecutor(db) + success, output, error = executor.execute( + script_content=task.script_content, + task_id=task.id, + tenant_id=task.tenant_id, + trace_id=trace_id, + params=params, + timeout=task.script_timeout or 300 + ) + + return success, output, error + + async def _send_alert(self, task: ScheduledTask, error: str): + """发送失败告警""" + if not task.alert_webhook: + return + + content = f"""### 定时任务执行失败告警 + +**任务名称**: {task.task_name} +**任务ID**: {task.id} +**租户**: {task.tenant_id or '全局'} +**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +**错误信息**: +``` +{error[:500] if error else '未知错误'} +```""" + + try: + # 判断是钉钉还是企微 + if 'dingtalk' in task.alert_webhook or 'oapi.dingtalk.com' in task.alert_webhook: + payload = { + "msgtype": "markdown", + "markdown": {"title": "任务失败告警", "text": content} + } + else: + payload = { + "msgtype": "markdown", + "markdown": {"content": content} + } + + async with httpx.AsyncClient(timeout=10) as client: + await client.post(task.alert_webhook, json=payload) + except Exception as e: + print(f"发送告警失败: {e}") + + async def run_task_now(self, task_id: int) -> dict: + """立即执行任务(手动触发)""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + return {"success": False, "error": "任务不存在"} + + # 解析参数 + params = {} + if task.input_params: + try: + params = json.loads(task.input_params) + except: + pass + + success, output, error = await self._execute_task_once(db, task) + + return { + "success": success, + "output": output, + "error": error + } finally: db.close() -async def execute_task(task_id: int): - """执行定时任务入口(处理重试配置)""" - db = get_db_session() - try: - result = db.execute( - text("SELECT retry_count, retry_interval FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - task = result.mappings().first() - if task: - max_retries = task.get("retry_count", 0) or 0 - retry_interval = task.get("retry_interval", 60) or 60 - await execute_task_with_retry(task_id, 0, max_retries, retry_interval) - else: - await execute_task_once(task_id) - finally: - db.close() - - -async def execute_task_once(task_id: int) -> bool: - """执行一次定时任务,返回是否成功""" - db = get_db_session() - log_id = None - success = False - - try: - # 1. 查询任务配置 - result = db.execute( - text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"), - {"id": task_id} - ) - task = result.mappings().first() - - if not task: - logger.warning(f"Task {task_id} not found or disabled") - return True # 不需要重试 - - # 2. 更新任务状态为运行中 - db.execute( - text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"), - {"id": task_id} - ) - db.commit() - - # 3. 创建执行日志 - db.execute( - text(""" - INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status) - VALUES (:task_id, :tenant_id, NOW(), 'running') - """), - {"task_id": task_id, "tenant_id": task["tenant_id"]} - ) - db.commit() - - # 获取刚插入的日志ID - result = db.execute(text("SELECT LAST_INSERT_ID() as id")) - log_id = result.scalar() - - # 生成 trace_id - trace_id = f"task_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}" - - # 4. 根据执行类型分发 - execution_type = task.get("execution_type", "webhook") - - if execution_type == "script": - # 脚本执行模式 - from .script_executor import execute_script as run_script - - script_content = task.get("script_content", "") - if not script_content: - status = "failed" - error_message = "脚本内容为空" - response_code = None - response_body = "" - else: - script_result = await run_script( - task_id=task_id, - tenant_id=task["tenant_id"], - script_content=script_content, - trace_id=trace_id - ) - - if script_result.success: - status = "success" - error_message = None - else: - status = "failed" - error_message = script_result.error - - response_code = None - response_body = script_result.output[:5000] if script_result.output else "" - - # 添加日志到响应体 - if script_result.logs: - response_body += "\n\n--- 执行日志 ---\n" + "\n".join(script_result.logs[-20:]) - else: - # Webhook 执行模式 - webhook_url = task["webhook_url"] - input_params = task["input_params"] or {} - - async with httpx.AsyncClient(timeout=300.0) as client: - response = await client.post( - webhook_url, - json=input_params, - headers={"Content-Type": "application/json"} - ) - - response_code = response.status_code - response_body = response.text[:5000] if response.text else "" # 限制存储长度 - - if response.is_success: - status = "success" - error_message = None - else: - status = "failed" - error_message = f"HTTP {response_code}" - - # 5. 更新执行日志 - db.execute( - text(""" - UPDATE platform_task_logs - SET finished_at = NOW(), status = :status, response_code = :code, - response_body = :body, error_message = :error - WHERE id = :id - """), - { - "id": log_id, - "status": status, - "code": response_code, - "body": response_body, - "error": error_message - } - ) - - # 6. 更新任务状态 - db.execute( - text(""" - UPDATE platform_scheduled_tasks - SET last_run_status = :status, last_run_message = :message - WHERE id = :id - """), - { - "id": task_id, - "status": status, - "message": error_message or "执行成功" - } - ) - db.commit() - - logger.info(f"Task {task_id} executed with status: {status}") - success = (status == "success") - - except Exception as e: - logger.error(f"Task {task_id} execution error: {str(e)}") - success = False - - # 更新失败状态 - try: - if log_id: - db.execute( - text(""" - UPDATE platform_task_logs - SET finished_at = NOW(), status = 'failed', error_message = :error - WHERE id = :id - """), - {"id": log_id, "error": str(e)[:1000]} - ) - - db.execute( - text(""" - UPDATE platform_scheduled_tasks - SET last_run_status = 'failed', last_run_message = :message - WHERE id = :id - """), - {"id": task_id, "message": str(e)[:500]} - ) - db.commit() - except Exception: - pass - finally: - db.close() - - return success - - -def add_task_to_scheduler(task: Dict[str, Any]): - """将任务添加到调度器""" - global scheduler - if scheduler is None: - return - - task_id = task["id"] - schedule_type = task["schedule_type"] - - # 先移除已有的任务(如果存在) - remove_task_from_scheduler(task_id) - - if schedule_type == "cron": - # CRON 模式 - cron_expr = task["cron_expression"] - if cron_expr: - try: - trigger = CronTrigger.from_crontab(cron_expr, timezone="Asia/Shanghai") - scheduler.add_job( - execute_task, - trigger, - args=[task_id], - id=f"task_{task_id}_cron", - replace_existing=True - ) - logger.info(f"Added cron task {task_id}: {cron_expr}") - except Exception as e: - logger.error(f"Failed to add cron task {task_id}: {e}") - else: - # 简单模式 - 多个时间点 - time_points = task.get("time_points") or [] - if isinstance(time_points, str): - import json - time_points = json.loads(time_points) - - for i, time_point in enumerate(time_points): - try: - hour, minute = time_point.split(":") - trigger = CronTrigger( - hour=int(hour), - minute=int(minute), - timezone="Asia/Shanghai" - ) - scheduler.add_job( - execute_task, - trigger, - args=[task_id], - id=f"task_{task_id}_time_{i}", - replace_existing=True - ) - logger.info(f"Added simple task {task_id} at {time_point}") - except Exception as e: - logger.error(f"Failed to add time point {time_point} for task {task_id}: {e}") - - -def remove_task_from_scheduler(task_id: int): - """从调度器移除任务""" - global scheduler - if scheduler is None: - return - - # 移除所有相关的 job - jobs_to_remove = [] - for job in scheduler.get_jobs(): - if job.id.startswith(f"task_{task_id}_"): - jobs_to_remove.append(job.id) - - for job_id in jobs_to_remove: - try: - scheduler.remove_job(job_id) - logger.info(f"Removed job {job_id}") - except Exception as e: - logger.warning(f"Failed to remove job {job_id}: {e}") - - -def load_all_tasks(): - """从数据库加载所有启用的任务""" - db = get_db_session() - try: - result = db.execute( - text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1") - ) - tasks = result.mappings().all() - - for task in tasks: - add_task_to_scheduler(dict(task)) - - logger.info(f"Loaded {len(tasks)} scheduled tasks") - finally: - db.close() - - -def start_scheduler(): - """启动调度器""" - global scheduler - scheduler = get_scheduler() - - # 加载所有任务 - load_all_tasks() - - # 启动调度器 - if not scheduler.running: - scheduler.start() - logger.info("Scheduler started") - - -def shutdown_scheduler(): - """关闭调度器""" - global scheduler - if scheduler and scheduler.running: - scheduler.shutdown(wait=False) - logger.info("Scheduler shutdown") - - -def reload_task(task_id: int): - """重新加载单个任务""" - db = get_db_session() - try: - result = db.execute( - text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"), - {"id": task_id} - ) - task = result.mappings().first() - - if task and task["is_enabled"]: - add_task_to_scheduler(dict(task)) - else: - remove_task_from_scheduler(task_id) - finally: - db.close() +# 全局调度器实例 +scheduler_service = SchedulerService() diff --git a/backend/app/services/script_executor.py b/backend/app/services/script_executor.py index 464135d..db217df 100644 --- a/backend/app/services/script_executor.py +++ b/backend/app/services/script_executor.py @@ -1,262 +1,246 @@ -"""脚本执行器 - 安全执行 Python 脚本""" -import asyncio -import io -import logging +"""脚本执行器 - 安全执行Python脚本""" import sys import traceback -from contextlib import redirect_stdout, redirect_stderr +from io import StringIO +from typing import Any, Dict, Optional, Tuple from datetime import datetime -from typing import Any, Dict +from sqlalchemy.orm import Session from .script_sdk import ScriptSDK -logger = logging.getLogger(__name__) - -# 执行超时时间(秒) -SCRIPT_TIMEOUT = 300 # 5 分钟 # 禁止导入的模块 FORBIDDEN_MODULES = { - 'os', 'subprocess', 'sys', 'builtins', '__builtins__', - 'importlib', 'eval', 'exec', 'compile', - 'open', 'file', 'input', - 'socket', 'multiprocessing', 'threading', - 'pickle', 'marshal', 'ctypes', - 'code', 'codeop', 'pty', 'tty', + 'os', 'subprocess', 'shutil', 'pathlib', + 'socket', 'ftplib', 'telnetlib', 'smtplib', + 'pickle', 'shelve', 'marshal', + 'ctypes', 'multiprocessing', + '__builtins__', 'builtins', + 'importlib', 'imp', + 'code', 'codeop', 'compile', +} + +# 允许的内置函数 +ALLOWED_BUILTINS = { + 'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray', 'bytes', + 'callable', 'chr', 'complex', 'dict', 'dir', 'divmod', 'enumerate', + 'filter', 'float', 'format', 'frozenset', 'getattr', 'hasattr', 'hash', + 'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', 'len', 'list', + 'map', 'max', 'min', 'next', 'object', 'oct', 'ord', 'pow', 'print', + 'range', 'repr', 'reversed', 'round', 'set', 'setattr', 'slice', + 'sorted', 'str', 'sum', 'tuple', 'type', 'vars', 'zip', + 'True', 'False', 'None', + 'Exception', 'BaseException', 'ValueError', 'TypeError', 'KeyError', + 'IndexError', 'AttributeError', 'RuntimeError', 'StopIteration', } -class ScriptExecutionResult: - """脚本执行结果""" +class ScriptExecutor: + """脚本执行器""" - def __init__( + def __init__(self, db: Session): + self.db = db + + def execute( self, - success: bool, - output: str = "", - error: str = None, - logs: list = None, - execution_time_ms: int = 0 - ): - self.success = success - self.output = output - self.error = error - self.logs = logs or [] - self.execution_time_ms = execution_time_ms - - def to_dict(self) -> Dict[str, Any]: - return { - "success": self.success, - "output": self.output, - "error": self.error, - "logs": self.logs, - "execution_time_ms": self.execution_time_ms - } - - -def create_safe_builtins() -> Dict[str, Any]: - """创建安全的内置函数集""" - import builtins - - # 允许的内置函数 - allowed = [ - 'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray', 'bytes', - 'callable', 'chr', 'complex', 'dict', 'divmod', 'enumerate', - 'filter', 'float', 'format', 'frozenset', 'getattr', 'hasattr', - 'hash', 'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', - 'len', 'list', 'map', 'max', 'min', 'next', 'object', 'oct', - 'ord', 'pow', 'print', 'range', 'repr', 'reversed', 'round', - 'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip', - 'True', 'False', 'None', - ] - - safe_builtins = {} - for name in allowed: - if hasattr(builtins, name): - safe_builtins[name] = getattr(builtins, name) - - # 添加安全的 import 函数 - def safe_import(name, *args, **kwargs): - """安全的 import 函数,只允许特定模块""" - allowed_modules = { - 'json', 'datetime', 'time', 're', 'math', 'random', - 'collections', 'itertools', 'functools', 'operator', - 'string', 'textwrap', 'unicodedata', - 'hashlib', 'base64', 'urllib.parse', - } + script_content: str, + task_id: int, + tenant_id: Optional[str] = None, + trace_id: Optional[str] = None, + params: Optional[Dict[str, Any]] = None, + timeout: int = 300 + ) -> Tuple[bool, str, str]: + """执行脚本 - if name in FORBIDDEN_MODULES: - raise ImportError(f"禁止导入模块: {name}") + Args: + script_content: Python脚本内容 + task_id: 任务ID + tenant_id: 租户ID + trace_id: 追踪ID + params: 输入参数 + timeout: 超时秒数 + + Returns: + (success, output, error) + """ + # 创建SDK实例 + sdk = ScriptSDK( + db=self.db, + task_id=task_id, + tenant_id=tenant_id, + trace_id=trace_id, + params=params or {} + ) - if name not in allowed_modules and not name.startswith('urllib.parse'): - raise ImportError(f"不允许导入模块: {name},允许的模块: {', '.join(sorted(allowed_modules))}") - - return __builtins__['__import__'](name, *args, **kwargs) - - safe_builtins['__import__'] = safe_import - - return safe_builtins - - -async def execute_script( - task_id: int, - tenant_id: str, - script_content: str, - trace_id: str = None -) -> ScriptExecutionResult: - """ - 执行 Python 脚本 - - Args: - task_id: 任务 ID - tenant_id: 租户 ID - script_content: 脚本内容 - trace_id: 追踪 ID - - Returns: - ScriptExecutionResult: 执行结果 - """ - start_time = datetime.now() - sdk = None - - try: - # 创建 SDK 实例 - sdk = ScriptSDK(tenant_id, task_id, trace_id) + # 检查脚本安全性 + check_result = self._check_script_safety(script_content) + if check_result: + return False, '', f"脚本安全检查失败: {check_result}" # 准备执行环境 - script_globals = { - '__builtins__': create_safe_builtins(), - '__name__': '__script__', - - # SDK 实例 - 'sdk': sdk, - - # 快捷方法(同步包装) - 'ai': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.ai_chat(*args, **kwargs)), - 'dingtalk': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.send_dingtalk(*args, **kwargs)), - 'wecom': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.send_wecom(*args, **kwargs)), - 'http_get': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.http_get(*args, **kwargs)), - 'http_post': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.http_post(*args, **kwargs)), - - # 同步方法 - 'db': sdk.db_query, - 'get_var': sdk.get_var, - 'set_var': sdk.set_var, - 'delete_var': sdk.delete_var, - 'log': sdk.log, - - # 常用模块 - 'json': __import__('json'), - 'datetime': __import__('datetime'), - 're': __import__('re'), - 'math': __import__('math'), - 'random': __import__('random'), - } + safe_globals = self._create_safe_globals(sdk) # 捕获输出 - stdout = io.StringIO() - stderr = io.StringIO() + old_stdout = sys.stdout + old_stderr = sys.stderr + stdout_capture = StringIO() + stderr_capture = StringIO() - sdk.log("脚本开始执行") - - # 编译并执行脚本 try: - # 编译脚本 - code = compile(script_content, ' - - - - diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 418e068..4ec95bd 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -53,19 +53,7 @@ const routes = [ path: 'app-config', name: 'AppConfig', component: () => import('@/views/app-config/index.vue'), - meta: { title: '租户订阅', icon: 'Setting' } - }, - { - path: 'scheduled-tasks', - name: 'ScheduledTasks', - component: () => import('@/views/scheduled-tasks/index.vue'), - meta: { title: '定时任务', icon: 'Clock' } - }, - { - path: 'scripts', - name: 'Scripts', - component: () => import('@/views/scripts/index.vue'), - meta: { title: '脚本管理', icon: 'Document' } + meta: { title: '租户应用配置', icon: 'Setting' } }, { path: 'stats', @@ -84,6 +72,12 @@ const routes = [ name: 'Users', component: () => import('@/views/users/index.vue'), meta: { title: '用户管理', icon: 'User', role: 'admin' } + }, + { + path: 'scheduled-tasks', + name: 'ScheduledTasks', + component: () => import('@/views/scheduled-tasks/index.vue'), + meta: { title: '定时任务', icon: 'Clock' } } ] } diff --git a/frontend/src/views/app-config/index.vue b/frontend/src/views/app-config/index.vue index 0b54c0c..ec10e34 100644 --- a/frontend/src/views/app-config/index.vue +++ b/frontend/src/views/app-config/index.vue @@ -1,7 +1,6 @@ @@ -534,125 +364,86 @@ onMounted(() => {
-
- - 管理定时任务,支持简单时间点和 CRON 表达式两种调度方式,可自动调用 n8n 工作流。 - -
- - -
- 租户筛选: - - {{ tenant.name }} - - - 清除筛选 - + +
+ + + + + + + + + 搜索
- + + - - + - + - + -
+
{
- - - - - - - + + + + + + + + + + + + + + + + + - - - + + + + + Python 脚本 + Webhook + + + + + + + 指定时间点 + CRON 表达式 + + + + - - - - - - - 简单模式(选择时间点) - CRON 表达式 - - - - + -
-
- - {{ time }} - -
-
- - - 添加 - -
-
+ +
多个时间点用逗号分隔,格式: HH:MM
+
+ + +
格式: 分 时 日 月 周,如: 0 9 * * * (每天9点)
- - - -
- 格式:分 时 日 月 周。例如:0 9 * * * 表示每天9点执行 -
-
- - 执行配置 - - - - Webhook(调用 n8n 等) - Python 脚本 - - - - -