diff --git a/backend/app/models/scheduled_task.py b/backend/app/models/scheduled_task.py index 1d610b0..2699978 100644 --- a/backend/app/models/scheduled_task.py +++ b/backend/app/models/scheduled_task.py @@ -1,6 +1,6 @@ """定时任务相关模型""" from datetime import datetime -from sqlalchemy import Column, BigInteger, Integer, String, Text, Enum, SmallInteger, TIMESTAMP, DateTime +from sqlalchemy import Column, BigInteger, Integer, String, Text, Enum, SmallInteger, TIMESTAMP, DateTime, JSON, Boolean from ..database import Base @@ -11,40 +11,43 @@ class ScheduledTask(Base): id = Column(Integer, primary_key=True, autoincrement=True) tenant_id = Column(String(50)) task_name = Column(String(100), nullable=False) - task_type = Column(Enum('webhook', 'script'), nullable=False, default='script') + task_desc = Column(String(500)) # 调度配置 schedule_type = Column(Enum('simple', 'cron'), nullable=False, default='simple') - time_points = Column(Text) # JSON数组 ["08:00", "12:00"] + time_points = Column(JSON) # JSON数组 ["08:00", "12:00"] cron_expression = Column(String(100)) + timezone = Column(String(50), default='Asia/Shanghai') + + # 执行类型 + execution_type = Column(Enum('webhook', 'script'), nullable=False, default='script') # Webhook配置 webhook_url = Column(String(500)) - webhook_method = Column(String(10), default='POST') - webhook_headers = Column(Text) # JSON格式 # 脚本配置 script_content = Column(Text) - script_timeout = Column(Integer, default=300) + script_deps = Column(Text) # 脚本依赖 # 输入参数 - input_params = Column(Text) # JSON格式 + input_params = Column(JSON) # JSON格式 # 重试配置 retry_count = Column(Integer, default=0) retry_interval = Column(Integer, default=60) # 告警配置 - alert_on_failure = Column(SmallInteger, default=0) + alert_on_failure = Column(Boolean, default=False) alert_webhook = Column(String(500)) # 状态 - status = Column(SmallInteger, default=1) # 0-禁用 1-启用 + is_enabled = Column(Boolean, default=True) last_run_at = Column(DateTime) - last_run_status = Column(String(20)) + last_run_status = Column(Enum('success', 'failed', 'running')) + last_run_message = Column(Text) - created_at = Column(TIMESTAMP, default=datetime.now) - updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) class TaskLog(Base): diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index 05c2630..a7f584f 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -21,15 +21,13 @@ router = APIRouter(prefix="/api/scheduled-tasks", tags=["scheduled-tasks"]) class TaskCreate(BaseModel): tenant_id: Optional[str] = None task_name: str - task_type: str = 'script' + task_desc: Optional[str] = None + execution_type: str = 'script' schedule_type: str = 'simple' time_points: Optional[List[str]] = None cron_expression: Optional[str] = None webhook_url: Optional[str] = None - webhook_method: Optional[str] = 'POST' - webhook_headers: Optional[dict] = None script_content: Optional[str] = None - script_timeout: Optional[int] = 300 input_params: Optional[dict] = None retry_count: Optional[int] = 0 retry_interval: Optional[int] = 60 @@ -40,21 +38,19 @@ class TaskCreate(BaseModel): class TaskUpdate(BaseModel): tenant_id: Optional[str] = None task_name: Optional[str] = None - task_type: Optional[str] = None + task_desc: Optional[str] = None + execution_type: Optional[str] = None schedule_type: Optional[str] = None time_points: Optional[List[str]] = None cron_expression: Optional[str] = None webhook_url: Optional[str] = None - webhook_method: Optional[str] = None - webhook_headers: Optional[dict] = None script_content: Optional[str] = None - script_timeout: Optional[int] = None input_params: Optional[dict] = None retry_count: Optional[int] = None retry_interval: Optional[int] = None alert_on_failure: Optional[bool] = None alert_webhook: Optional[str] = None - status: Optional[int] = None + is_enabled: Optional[bool] = None class SecretCreate(BaseModel): @@ -91,7 +87,8 @@ async def list_tasks( if tenant_id: query = query.filter(ScheduledTask.tenant_id == tenant_id) if status is not None: - query = query.filter(ScheduledTask.status == status) + is_enabled = status == 1 + query = query.filter(ScheduledTask.is_enabled == is_enabled) total = query.count() items = query.order_by(desc(ScheduledTask.created_at)).offset((page - 1) * size).limit(size).all() @@ -117,21 +114,19 @@ async def create_task(data: TaskCreate, db: Session = Depends(get_db)): task = ScheduledTask( tenant_id=data.tenant_id, task_name=data.task_name, - task_type=data.task_type, + task_desc=data.task_desc, + execution_type=data.execution_type, schedule_type=data.schedule_type, - time_points=json.dumps(data.time_points) if data.time_points else None, + time_points=data.time_points, cron_expression=data.cron_expression, webhook_url=data.webhook_url, - webhook_method=data.webhook_method, - webhook_headers=json.dumps(data.webhook_headers) if data.webhook_headers else None, script_content=data.script_content, - script_timeout=data.script_timeout, - input_params=json.dumps(data.input_params) if data.input_params else None, + input_params=data.input_params, retry_count=data.retry_count, retry_interval=data.retry_interval, - alert_on_failure=1 if data.alert_on_failure else 0, + alert_on_failure=data.alert_on_failure, alert_webhook=data.alert_webhook, - status=1 + is_enabled=True ) db.add(task) @@ -156,41 +151,37 @@ async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_ task.tenant_id = data.tenant_id if data.task_name is not None: task.task_name = data.task_name - if data.task_type is not None: - task.task_type = data.task_type + if data.task_desc is not None: + task.task_desc = data.task_desc + if data.execution_type is not None: + task.execution_type = data.execution_type if data.schedule_type is not None: task.schedule_type = data.schedule_type if data.time_points is not None: - task.time_points = json.dumps(data.time_points) + task.time_points = data.time_points if data.cron_expression is not None: task.cron_expression = data.cron_expression if data.webhook_url is not None: task.webhook_url = data.webhook_url - if data.webhook_method is not None: - task.webhook_method = data.webhook_method - if data.webhook_headers is not None: - task.webhook_headers = json.dumps(data.webhook_headers) if data.script_content is not None: task.script_content = data.script_content - if data.script_timeout is not None: - task.script_timeout = data.script_timeout if data.input_params is not None: - task.input_params = json.dumps(data.input_params) + task.input_params = data.input_params if data.retry_count is not None: task.retry_count = data.retry_count if data.retry_interval is not None: task.retry_interval = data.retry_interval if data.alert_on_failure is not None: - task.alert_on_failure = 1 if data.alert_on_failure else 0 + task.alert_on_failure = data.alert_on_failure if data.alert_webhook is not None: task.alert_webhook = data.alert_webhook - if data.status is not None: - task.status = data.status + if data.is_enabled is not None: + task.is_enabled = data.is_enabled db.commit() # 更新调度器 - if task.status == 1: + if task.is_enabled: scheduler_service.add_task(task.id) else: scheduler_service.remove_task(task.id) @@ -224,15 +215,15 @@ async def toggle_task(task_id: int, db: Session = Depends(get_db)): if not task: raise HTTPException(status_code=404, detail="任务不存在") - task.status = 0 if task.status == 1 else 1 + task.is_enabled = not task.is_enabled db.commit() - if task.status == 1: + if task.is_enabled: scheduler_service.add_task(task.id) else: scheduler_service.remove_task(task.id) - return {"success": True, "status": task.status} + return {"success": True, "status": 1 if task.is_enabled else 0} @router.post("/{task_id}/run") @@ -492,15 +483,22 @@ async def delete_secret(secret_id: int, db: Session = Depends(get_db)): def format_task(task: ScheduledTask, include_content: bool = False) -> dict: """格式化任务数据""" + time_points = task.time_points + if isinstance(time_points, str): + try: + time_points = json.loads(time_points) + except: + time_points = [] + data = { "id": task.id, "tenant_id": task.tenant_id, "task_name": task.task_name, - "task_type": task.task_type, + "task_type": task.execution_type, # 前端使用 task_type "schedule_type": task.schedule_type, - "time_points": json.loads(task.time_points) if task.time_points else [], + "time_points": time_points or [], "cron_expression": task.cron_expression, - "status": task.status, + "status": 1 if task.is_enabled else 0, # 前端使用 status "last_run_at": task.last_run_at, "last_run_status": task.last_run_status, "retry_count": task.retry_count, @@ -513,11 +511,14 @@ def format_task(task: ScheduledTask, include_content: bool = False) -> dict: if include_content: data["webhook_url"] = task.webhook_url - data["webhook_method"] = task.webhook_method - data["webhook_headers"] = json.loads(task.webhook_headers) if task.webhook_headers else None data["script_content"] = task.script_content - data["script_timeout"] = task.script_timeout - data["input_params"] = json.loads(task.input_params) if task.input_params else None + input_params = task.input_params + if isinstance(input_params, str): + try: + input_params = json.loads(input_params) + except: + input_params = None + data["input_params"] = input_params return data diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index a491cbc..295363b 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -49,7 +49,7 @@ class SchedulerService: """从数据库加载所有启用的任务""" db = SessionLocal() try: - tasks = db.query(ScheduledTask).filter(ScheduledTask.status == 1).all() + tasks = db.query(ScheduledTask).filter(ScheduledTask.is_enabled == True).all() for task in tasks: self._add_task_to_scheduler(task) print(f"已加载 {len(tasks)} 个定时任务") @@ -81,7 +81,7 @@ class SchedulerService: elif task.schedule_type == 'simple' and task.time_points: # 简单模式 - 多个时间点 try: - time_points = json.loads(task.time_points) + time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points) for i, time_point in enumerate(time_points): hour, minute = map(int, time_point.split(':')) sub_job_id = f"{job_id}_{i}" @@ -100,7 +100,7 @@ class SchedulerService: db = SessionLocal() try: task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if task and task.status == 1: + if task and task.is_enabled: self._add_task_to_scheduler(task) finally: db.close() @@ -172,12 +172,9 @@ class SchedulerService: # 解析输入参数 params = {} if task.input_params: - try: - params = json.loads(task.input_params) - except: - pass + params = task.input_params if isinstance(task.input_params, dict) else {} - if task.task_type == 'webhook': + if task.execution_type == 'webhook': success, output, error = await self._execute_webhook(task) else: success, output, error = await self._execute_script(db, task, trace_id, params) @@ -206,20 +203,12 @@ class SchedulerService: async def _execute_webhook(self, task: ScheduledTask): """执行Webhook任务""" try: - headers = {} - if task.webhook_headers: - headers = json.loads(task.webhook_headers) - body = {} if task.input_params: - body = json.loads(task.input_params) + body = task.input_params if isinstance(task.input_params, dict) else {} async with httpx.AsyncClient(timeout=30) as client: - if task.webhook_method.upper() == 'GET': - response = await client.get(task.webhook_url, headers=headers, params=body) - else: - response = await client.post(task.webhook_url, headers=headers, json=body) - + response = await client.post(task.webhook_url, json=body) response.raise_for_status() return True, response.text[:5000], '' @@ -238,7 +227,7 @@ class SchedulerService: tenant_id=task.tenant_id, trace_id=trace_id, params=params, - timeout=task.script_timeout or 300 + timeout=300 # 默认超时 ) return success, output, error @@ -285,14 +274,6 @@ class SchedulerService: if not task: return {"success": False, "error": "任务不存在"} - # 解析参数 - params = {} - if task.input_params: - try: - params = json.loads(task.input_params) - except: - pass - success, output, error = await self._execute_task_once(db, task) return {