From ed88099cf0b771269c3a5c8681a4278f77a89c2a Mon Sep 17 00:00:00 2001 From: Admin Date: Wed, 28 Jan 2026 11:27:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 platform_scheduled_tasks 和 platform_task_logs 数据表 - 实现 APScheduler 调度器服务(支持简单模式和CRON表达式) - 添加定时任务 CRUD API - 支持手动触发执行和查看执行日志 - 前端任务管理页面 --- backend/app/main.py | 23 + backend/app/routers/tasks.py | 356 ++++++++++ backend/app/services/scheduler.py | 286 ++++++++ backend/requirements.txt | 1 + frontend/src/components/Layout.vue | 1 + frontend/src/router/index.js | 8 +- frontend/src/views/scheduled-tasks/index.vue | 645 +++++++++++++++++++ 7 files changed, 1319 insertions(+), 1 deletion(-) create mode 100644 backend/app/routers/tasks.py create mode 100644 backend/app/services/scheduler.py create mode 100644 frontend/src/views/scheduled-tasks/index.vue diff --git a/backend/app/main.py b/backend/app/main.py index 3ef47c0..6219de4 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,8 +15,10 @@ from .routers.alerts import router as alerts_router from .routers.cost import router as cost_router from .routers.quota import router as quota_router from .routers.tool_configs import router as tool_configs_router +from .routers.tasks import router as tasks_router from .middleware import TraceMiddleware, setup_exception_handlers, RequestLoggerMiddleware from .middleware.trace import setup_logging +from .services.scheduler import start_scheduler, shutdown_scheduler # 配置日志(包含 TraceID) setup_logging(level=logging.INFO, include_trace=True) @@ -68,6 +70,27 @@ app.include_router(alerts_router, prefix="/api") app.include_router(cost_router, prefix="/api") app.include_router(quota_router, prefix="/api") app.include_router(tool_configs_router, prefix="/api") +app.include_router(tasks_router, prefix="/api") + + +@app.on_event("startup") +async def startup_event(): + """应用启动时初始化调度器""" + try: + start_scheduler() + logging.info("Scheduler started successfully") + except Exception as e: + logging.error(f"Failed to start scheduler: {e}") + + +@app.on_event("shutdown") +async def shutdown_event(): + """应用关闭时停止调度器""" + try: + shutdown_scheduler() + logging.info("Scheduler shutdown successfully") + except Exception as e: + logging.error(f"Failed to shutdown scheduler: {e}") @app.get("/") diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py new file mode 100644 index 0000000..02eb734 --- /dev/null +++ b/backend/app/routers/tasks.py @@ -0,0 +1,356 @@ +"""定时任务管理路由""" +import asyncio +from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks +from pydantic import BaseModel +from typing import Optional, List +from sqlalchemy.orm import Session +from sqlalchemy import text + +from ..database import get_db +from .auth import get_current_user, require_operator +from ..models.user import User +from ..services.scheduler import ( + add_task_to_scheduler, + remove_task_from_scheduler, + reload_task, + execute_task +) + +router = APIRouter(prefix="/scheduled-tasks", tags=["定时任务"]) + + +# Schemas + +class TaskCreate(BaseModel): + tenant_id: str + task_name: str + task_desc: Optional[str] = None + schedule_type: str = "simple" # simple | cron + time_points: Optional[List[str]] = None # ["09:00", "14:00"] + cron_expression: Optional[str] = None # "0 9,14 * * *" + webhook_url: str + input_params: Optional[dict] = None + is_enabled: bool = True + + +class TaskUpdate(BaseModel): + task_name: Optional[str] = None + task_desc: Optional[str] = None + schedule_type: Optional[str] = None + time_points: Optional[List[str]] = None + cron_expression: Optional[str] = None + webhook_url: Optional[str] = None + input_params: Optional[dict] = None + + +# API Endpoints + +@router.get("") +async def list_tasks( + page: int = Query(1, ge=1), + size: int = Query(20, ge=1, le=100), + tenant_id: Optional[str] = None, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取定时任务列表""" + # 构建查询 + where_clauses = [] + params = {} + + if tenant_id: + where_clauses.append("tenant_id = :tenant_id") + params["tenant_id"] = tenant_id + + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" + + # 查询总数 + count_result = db.execute( + text(f"SELECT COUNT(*) FROM platform_scheduled_tasks WHERE {where_sql}"), + params + ) + total = count_result.scalar() + + # 查询列表 + params["offset"] = (page - 1) * size + params["limit"] = size + result = db.execute( + text(f""" + SELECT t.*, tn.name as tenant_name + FROM platform_scheduled_tasks t + LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code + WHERE {where_sql} + ORDER BY t.id DESC + LIMIT :limit OFFSET :offset + """), + params + ) + tasks = [dict(row) for row in result.mappings().all()] + + return { + "total": total, + "page": page, + "size": size, + "items": tasks + } + + +@router.post("") +async def create_task( + data: TaskCreate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """创建定时任务""" + # 验证调度配置 + if data.schedule_type == "simple": + if not data.time_points or len(data.time_points) == 0: + raise HTTPException(status_code=400, detail="简单模式需要至少一个时间点") + elif data.schedule_type == "cron": + if not data.cron_expression: + raise HTTPException(status_code=400, detail="CRON模式需要提供表达式") + + # 插入数据库 + import json + time_points_json = json.dumps(data.time_points) if data.time_points else None + input_params_json = json.dumps(data.input_params) if data.input_params else None + + db.execute( + text(""" + INSERT INTO platform_scheduled_tasks + (tenant_id, task_name, task_desc, schedule_type, time_points, + cron_expression, webhook_url, input_params, is_enabled) + VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points, + :cron_expression, :webhook_url, :input_params, :is_enabled) + """), + { + "tenant_id": data.tenant_id, + "task_name": data.task_name, + "task_desc": data.task_desc, + "schedule_type": data.schedule_type, + "time_points": time_points_json, + "cron_expression": data.cron_expression, + "webhook_url": data.webhook_url, + "input_params": input_params_json, + "is_enabled": 1 if data.is_enabled else 0 + } + ) + db.commit() + + # 获取新插入的ID + result = db.execute(text("SELECT LAST_INSERT_ID() as id")) + task_id = result.scalar() + + # 如果启用,添加到调度器 + if data.is_enabled: + reload_task(task_id) + + return {"id": task_id, "message": "创建成功"} + + +@router.get("/{task_id}") +async def get_task( + task_id: int, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取任务详情""" + result = db.execute( + text(""" + SELECT t.*, tn.name as tenant_name + FROM platform_scheduled_tasks t + LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code + WHERE t.id = :id + """), + {"id": task_id} + ) + task = result.mappings().first() + + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + return dict(task) + + +@router.put("/{task_id}") +async def update_task( + task_id: int, + data: TaskUpdate, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """更新定时任务""" + # 检查任务是否存在 + result = db.execute( + text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + task = result.mappings().first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + # 构建更新语句 + import json + updates = [] + params = {"id": task_id} + + if data.task_name is not None: + updates.append("task_name = :task_name") + params["task_name"] = data.task_name + if data.task_desc is not None: + updates.append("task_desc = :task_desc") + params["task_desc"] = data.task_desc + if data.schedule_type is not None: + updates.append("schedule_type = :schedule_type") + params["schedule_type"] = data.schedule_type + if data.time_points is not None: + updates.append("time_points = :time_points") + params["time_points"] = json.dumps(data.time_points) + if data.cron_expression is not None: + updates.append("cron_expression = :cron_expression") + params["cron_expression"] = data.cron_expression + if data.webhook_url is not None: + updates.append("webhook_url = :webhook_url") + params["webhook_url"] = data.webhook_url + if data.input_params is not None: + updates.append("input_params = :input_params") + params["input_params"] = json.dumps(data.input_params) + + if updates: + db.execute( + text(f"UPDATE platform_scheduled_tasks SET {', '.join(updates)} WHERE id = :id"), + params + ) + db.commit() + + # 重新加载调度器中的任务 + reload_task(task_id) + + return {"message": "更新成功"} + + +@router.delete("/{task_id}") +async def delete_task( + task_id: int, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """删除定时任务""" + # 检查任务是否存在 + result = db.execute( + text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + if not result.scalar(): + raise HTTPException(status_code=404, detail="任务不存在") + + # 从调度器移除 + remove_task_from_scheduler(task_id) + + # 删除日志 + db.execute( + text("DELETE FROM platform_task_logs WHERE task_id = :id"), + {"id": task_id} + ) + + # 删除任务 + db.execute( + text("DELETE FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + db.commit() + + return {"message": "删除成功"} + + +@router.post("/{task_id}/toggle") +async def toggle_task( + task_id: int, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """启用/禁用任务""" + # 获取当前状态 + result = db.execute( + text("SELECT is_enabled FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + row = result.first() + if not row: + raise HTTPException(status_code=404, detail="任务不存在") + + current_enabled = row[0] + new_enabled = 0 if current_enabled else 1 + + # 更新状态 + db.execute( + text("UPDATE platform_scheduled_tasks SET is_enabled = :enabled WHERE id = :id"), + {"id": task_id, "enabled": new_enabled} + ) + db.commit() + + # 更新调度器 + reload_task(task_id) + + return { + "is_enabled": bool(new_enabled), + "message": "已启用" if new_enabled else "已禁用" + } + + +@router.post("/{task_id}/run") +async def run_task_now( + task_id: int, + background_tasks: BackgroundTasks, + user: User = Depends(require_operator), + db: Session = Depends(get_db) +): + """手动执行任务""" + # 检查任务是否存在 + result = db.execute( + text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + if not result.scalar(): + raise HTTPException(status_code=404, detail="任务不存在") + + # 在后台执行任务 + background_tasks.add_task(asyncio.create_task, execute_task(task_id)) + + return {"message": "任务已触发执行"} + + +@router.get("/{task_id}/logs") +async def get_task_logs( + task_id: int, + page: int = Query(1, ge=1), + size: int = Query(20, ge=1, le=100), + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """获取任务执行日志""" + # 查询总数 + count_result = db.execute( + text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"), + {"task_id": task_id} + ) + total = count_result.scalar() + + # 查询日志 + result = db.execute( + text(""" + SELECT * FROM platform_task_logs + WHERE task_id = :task_id + ORDER BY id DESC + LIMIT :limit OFFSET :offset + """), + {"task_id": task_id, "limit": size, "offset": (page - 1) * size} + ) + logs = [dict(row) for row in result.mappings().all()] + + return { + "total": total, + "page": page, + "size": size, + "items": logs + } diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py new file mode 100644 index 0000000..976fc2f --- /dev/null +++ b/backend/app/services/scheduler.py @@ -0,0 +1,286 @@ +"""定时任务调度器服务""" +import asyncio +import logging +from datetime import datetime +from typing import Optional, List, Dict, Any + +import httpx +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from sqlalchemy import text +from sqlalchemy.orm import Session + +from ..database import SessionLocal + +logger = logging.getLogger(__name__) + +# 全局调度器实例 +scheduler: Optional[AsyncIOScheduler] = None + + +def get_scheduler() -> AsyncIOScheduler: + """获取调度器实例""" + global scheduler + if scheduler is None: + scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") + return scheduler + + +def get_db_session() -> Session: + """获取数据库会话""" + return SessionLocal() + + +async def execute_task(task_id: int): + """执行定时任务""" + db = get_db_session() + log_id = None + + try: + # 1. 查询任务配置 + result = db.execute( + text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"), + {"id": task_id} + ) + task = result.mappings().first() + + if not task: + logger.warning(f"Task {task_id} not found or disabled") + return + + # 2. 更新任务状态为运行中 + db.execute( + text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"), + {"id": task_id} + ) + db.commit() + + # 3. 创建执行日志 + db.execute( + text(""" + INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status) + VALUES (:task_id, :tenant_id, NOW(), 'running') + """), + {"task_id": task_id, "tenant_id": task["tenant_id"]} + ) + db.commit() + + # 获取刚插入的日志ID + result = db.execute(text("SELECT LAST_INSERT_ID() as id")) + log_id = result.scalar() + + # 4. 调用 webhook + webhook_url = task["webhook_url"] + input_params = task["input_params"] or {} + + async with httpx.AsyncClient(timeout=300.0) as client: + response = await client.post( + webhook_url, + json=input_params, + headers={"Content-Type": "application/json"} + ) + + response_code = response.status_code + response_body = response.text[:5000] if response.text else "" # 限制存储长度 + + if response.is_success: + status = "success" + error_message = None + else: + status = "failed" + error_message = f"HTTP {response_code}" + + # 5. 更新执行日志 + db.execute( + text(""" + UPDATE platform_task_logs + SET finished_at = NOW(), status = :status, response_code = :code, + response_body = :body, error_message = :error + WHERE id = :id + """), + { + "id": log_id, + "status": status, + "code": response_code, + "body": response_body, + "error": error_message + } + ) + + # 6. 更新任务状态 + db.execute( + text(""" + UPDATE platform_scheduled_tasks + SET last_run_status = :status, last_run_message = :message + WHERE id = :id + """), + { + "id": task_id, + "status": status, + "message": error_message or "执行成功" + } + ) + db.commit() + + logger.info(f"Task {task_id} executed with status: {status}") + + except Exception as e: + logger.error(f"Task {task_id} execution error: {str(e)}") + + # 更新失败状态 + try: + if log_id: + db.execute( + text(""" + UPDATE platform_task_logs + SET finished_at = NOW(), status = 'failed', error_message = :error + WHERE id = :id + """), + {"id": log_id, "error": str(e)[:1000]} + ) + + db.execute( + text(""" + UPDATE platform_scheduled_tasks + SET last_run_status = 'failed', last_run_message = :message + WHERE id = :id + """), + {"id": task_id, "message": str(e)[:500]} + ) + db.commit() + except Exception: + pass + finally: + db.close() + + +def add_task_to_scheduler(task: Dict[str, Any]): + """将任务添加到调度器""" + global scheduler + if scheduler is None: + return + + task_id = task["id"] + schedule_type = task["schedule_type"] + + # 先移除已有的任务(如果存在) + remove_task_from_scheduler(task_id) + + if schedule_type == "cron": + # CRON 模式 + cron_expr = task["cron_expression"] + if cron_expr: + try: + trigger = CronTrigger.from_crontab(cron_expr, timezone="Asia/Shanghai") + scheduler.add_job( + execute_task, + trigger, + args=[task_id], + id=f"task_{task_id}_cron", + replace_existing=True + ) + logger.info(f"Added cron task {task_id}: {cron_expr}") + except Exception as e: + logger.error(f"Failed to add cron task {task_id}: {e}") + else: + # 简单模式 - 多个时间点 + time_points = task.get("time_points") or [] + if isinstance(time_points, str): + import json + time_points = json.loads(time_points) + + for i, time_point in enumerate(time_points): + try: + hour, minute = time_point.split(":") + trigger = CronTrigger( + hour=int(hour), + minute=int(minute), + timezone="Asia/Shanghai" + ) + scheduler.add_job( + execute_task, + trigger, + args=[task_id], + id=f"task_{task_id}_time_{i}", + replace_existing=True + ) + logger.info(f"Added simple task {task_id} at {time_point}") + except Exception as e: + logger.error(f"Failed to add time point {time_point} for task {task_id}: {e}") + + +def remove_task_from_scheduler(task_id: int): + """从调度器移除任务""" + global scheduler + if scheduler is None: + return + + # 移除所有相关的 job + jobs_to_remove = [] + for job in scheduler.get_jobs(): + if job.id.startswith(f"task_{task_id}_"): + jobs_to_remove.append(job.id) + + for job_id in jobs_to_remove: + try: + scheduler.remove_job(job_id) + logger.info(f"Removed job {job_id}") + except Exception as e: + logger.warning(f"Failed to remove job {job_id}: {e}") + + +def load_all_tasks(): + """从数据库加载所有启用的任务""" + db = get_db_session() + try: + result = db.execute( + text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1") + ) + tasks = result.mappings().all() + + for task in tasks: + add_task_to_scheduler(dict(task)) + + logger.info(f"Loaded {len(tasks)} scheduled tasks") + finally: + db.close() + + +def start_scheduler(): + """启动调度器""" + global scheduler + scheduler = get_scheduler() + + # 加载所有任务 + load_all_tasks() + + # 启动调度器 + if not scheduler.running: + scheduler.start() + logger.info("Scheduler started") + + +def shutdown_scheduler(): + """关闭调度器""" + global scheduler + if scheduler and scheduler.running: + scheduler.shutdown(wait=False) + logger.info("Scheduler shutdown") + + +def reload_task(task_id: int): + """重新加载单个任务""" + db = get_db_session() + try: + result = db.execute( + text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"), + {"id": task_id} + ) + task = result.mappings().first() + + if task and task["is_enabled"]: + add_task_to_scheduler(dict(task)) + else: + remove_task_from_scheduler(task_id) + finally: + db.close() diff --git a/backend/requirements.txt b/backend/requirements.txt index ac816e7..8c4decf 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -12,3 +12,4 @@ python-multipart>=0.0.6 httpx>=0.26.0 redis>=5.0.0 openpyxl>=3.1.0 +apscheduler>=3.10.0 diff --git a/frontend/src/components/Layout.vue b/frontend/src/components/Layout.vue index ce1b488..6e84801 100644 --- a/frontend/src/components/Layout.vue +++ b/frontend/src/components/Layout.vue @@ -16,6 +16,7 @@ const menuItems = computed(() => { { path: '/apps', title: '应用管理', icon: 'Grid' }, { path: '/tenant-wechat-apps', title: '企微应用', icon: 'ChatDotRound' }, { path: '/app-config', title: '租户订阅', icon: 'Setting' }, + { path: '/scheduled-tasks', title: '定时任务', icon: 'Clock' }, { path: '/stats', title: '统计分析', icon: 'TrendCharts' }, { path: '/logs', title: '日志查看', icon: 'Document' } ] diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index bd68d40..7067145 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -53,7 +53,13 @@ const routes = [ path: 'app-config', name: 'AppConfig', component: () => import('@/views/app-config/index.vue'), - meta: { title: '租户应用配置', icon: 'Setting' } + meta: { title: '租户订阅', icon: 'Setting' } + }, + { + path: 'scheduled-tasks', + name: 'ScheduledTasks', + component: () => import('@/views/scheduled-tasks/index.vue'), + meta: { title: '定时任务', icon: 'Clock' } }, { path: 'stats', diff --git a/frontend/src/views/scheduled-tasks/index.vue b/frontend/src/views/scheduled-tasks/index.vue new file mode 100644 index 0000000..ac286cc --- /dev/null +++ b/frontend/src/views/scheduled-tasks/index.vue @@ -0,0 +1,645 @@ + + + + +