"""定时任务调度器服务""" import asyncio import logging from datetime import datetime from typing import Optional, List, Dict, Any import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from sqlalchemy import text from sqlalchemy.orm import Session from ..database import SessionLocal logger = logging.getLogger(__name__) # 全局调度器实例 scheduler: Optional[AsyncIOScheduler] = None def get_scheduler() -> AsyncIOScheduler: """获取调度器实例""" global scheduler if scheduler is None: scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") return scheduler def get_db_session() -> Session: """获取数据库会话""" return SessionLocal() async def execute_task(task_id: int): """执行定时任务""" db = get_db_session() log_id = None try: # 1. 查询任务配置 result = db.execute( text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"), {"id": task_id} ) task = result.mappings().first() if not task: logger.warning(f"Task {task_id} not found or disabled") return # 2. 更新任务状态为运行中 db.execute( text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"), {"id": task_id} ) db.commit() # 3. 创建执行日志 db.execute( text(""" INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status) VALUES (:task_id, :tenant_id, NOW(), 'running') """), {"task_id": task_id, "tenant_id": task["tenant_id"]} ) db.commit() # 获取刚插入的日志ID result = db.execute(text("SELECT LAST_INSERT_ID() as id")) log_id = result.scalar() # 生成 trace_id trace_id = f"task_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}" # 4. 根据执行类型分发 execution_type = task.get("execution_type", "webhook") if execution_type == "script": # 脚本执行模式 from .script_executor import execute_script as run_script script_content = task.get("script_content", "") if not script_content: status = "failed" error_message = "脚本内容为空" response_code = None response_body = "" else: script_result = await run_script( task_id=task_id, tenant_id=task["tenant_id"], script_content=script_content, trace_id=trace_id ) if script_result.success: status = "success" error_message = None else: status = "failed" error_message = script_result.error response_code = None response_body = script_result.output[:5000] if script_result.output else "" # 添加日志到响应体 if script_result.logs: response_body += "\n\n--- 执行日志 ---\n" + "\n".join(script_result.logs[-20:]) else: # Webhook 执行模式 webhook_url = task["webhook_url"] input_params = task["input_params"] or {} async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post( webhook_url, json=input_params, headers={"Content-Type": "application/json"} ) response_code = response.status_code response_body = response.text[:5000] if response.text else "" # 限制存储长度 if response.is_success: status = "success" error_message = None else: status = "failed" error_message = f"HTTP {response_code}" # 5. 更新执行日志 db.execute( text(""" UPDATE platform_task_logs SET finished_at = NOW(), status = :status, response_code = :code, response_body = :body, error_message = :error WHERE id = :id """), { "id": log_id, "status": status, "code": response_code, "body": response_body, "error": error_message } ) # 6. 更新任务状态 db.execute( text(""" UPDATE platform_scheduled_tasks SET last_run_status = :status, last_run_message = :message WHERE id = :id """), { "id": task_id, "status": status, "message": error_message or "执行成功" } ) db.commit() logger.info(f"Task {task_id} executed with status: {status}") except Exception as e: logger.error(f"Task {task_id} execution error: {str(e)}") # 更新失败状态 try: if log_id: db.execute( text(""" UPDATE platform_task_logs SET finished_at = NOW(), status = 'failed', error_message = :error WHERE id = :id """), {"id": log_id, "error": str(e)[:1000]} ) db.execute( text(""" UPDATE platform_scheduled_tasks SET last_run_status = 'failed', last_run_message = :message WHERE id = :id """), {"id": task_id, "message": str(e)[:500]} ) db.commit() except Exception: pass finally: db.close() def add_task_to_scheduler(task: Dict[str, Any]): """将任务添加到调度器""" global scheduler if scheduler is None: return task_id = task["id"] schedule_type = task["schedule_type"] # 先移除已有的任务(如果存在) remove_task_from_scheduler(task_id) if schedule_type == "cron": # CRON 模式 cron_expr = task["cron_expression"] if cron_expr: try: trigger = CronTrigger.from_crontab(cron_expr, timezone="Asia/Shanghai") scheduler.add_job( execute_task, trigger, args=[task_id], id=f"task_{task_id}_cron", replace_existing=True ) logger.info(f"Added cron task {task_id}: {cron_expr}") except Exception as e: logger.error(f"Failed to add cron task {task_id}: {e}") else: # 简单模式 - 多个时间点 time_points = task.get("time_points") or [] if isinstance(time_points, str): import json time_points = json.loads(time_points) for i, time_point in enumerate(time_points): try: hour, minute = time_point.split(":") trigger = CronTrigger( hour=int(hour), minute=int(minute), timezone="Asia/Shanghai" ) scheduler.add_job( execute_task, trigger, args=[task_id], id=f"task_{task_id}_time_{i}", replace_existing=True ) logger.info(f"Added simple task {task_id} at {time_point}") except Exception as e: logger.error(f"Failed to add time point {time_point} for task {task_id}: {e}") def remove_task_from_scheduler(task_id: int): """从调度器移除任务""" global scheduler if scheduler is None: return # 移除所有相关的 job jobs_to_remove = [] for job in scheduler.get_jobs(): if job.id.startswith(f"task_{task_id}_"): jobs_to_remove.append(job.id) for job_id in jobs_to_remove: try: scheduler.remove_job(job_id) logger.info(f"Removed job {job_id}") except Exception as e: logger.warning(f"Failed to remove job {job_id}: {e}") def load_all_tasks(): """从数据库加载所有启用的任务""" db = get_db_session() try: result = db.execute( text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1") ) tasks = result.mappings().all() for task in tasks: add_task_to_scheduler(dict(task)) logger.info(f"Loaded {len(tasks)} scheduled tasks") finally: db.close() def start_scheduler(): """启动调度器""" global scheduler scheduler = get_scheduler() # 加载所有任务 load_all_tasks() # 启动调度器 if not scheduler.running: scheduler.start() logger.info("Scheduler started") def shutdown_scheduler(): """关闭调度器""" global scheduler if scheduler and scheduler.running: scheduler.shutdown(wait=False) logger.info("Scheduler shutdown") def reload_task(task_id: int): """重新加载单个任务""" db = get_db_session() try: result = db.execute( text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"), {"id": task_id} ) task = result.mappings().first() if task and task["is_enabled"]: add_task_to_scheduler(dict(task)) else: remove_task_from_scheduler(task_id) finally: db.close()