Files
000-platform/backend/app/services/scheduler.py
Admin ed88099cf0
All checks were successful
continuous-integration/drone/push Build is passing
feat: 定时任务调度功能
- 新增 platform_scheduled_tasks 和 platform_task_logs 数据表
- 实现 APScheduler 调度器服务(支持简单模式和CRON表达式)
- 添加定时任务 CRUD API
- 支持手动触发执行和查看执行日志
- 前端任务管理页面
2026-01-28 11:27:42 +08:00

287 lines
8.8 KiB
Python

"""Scheduled-task scheduler service."""
import asyncio
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import text
from sqlalchemy.orm import Session
from ..database import SessionLocal
logger = logging.getLogger(__name__)
# Module-wide scheduler instance; None until get_scheduler() / start_scheduler() creates it
scheduler: Optional[AsyncIOScheduler] = None
def get_scheduler() -> AsyncIOScheduler:
    """Return the module-wide scheduler, creating it on first call.

    The instance is stored in the module-level ``scheduler`` global and is
    configured with the "Asia/Shanghai" timezone.
    """
    global scheduler
    if scheduler is not None:
        return scheduler
    scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    return scheduler
def get_db_session() -> Session:
    """Create a new database session from ``SessionLocal``.

    Callers in this module close the returned session in a ``finally`` block.
    """
    session = SessionLocal()
    return session
async def execute_task(task_id: int):
    """Run one scheduled task: POST to its webhook and record the outcome.

    Steps:
      1. Load the task row (only if it is enabled); return silently otherwise.
      2. Mark the task as 'running'.
      3. Insert a 'running' row into ``platform_task_logs`` and remember its id.
      4. POST ``input_params`` as JSON to ``webhook_url`` (300 s timeout).
      5. Store the response code / truncated body in the log row.
      6. Store the final status on the task row.

    Any exception is caught, logged with its traceback, and recorded as a
    'failed' status on both the log row (when one was created) and the task
    row. Nothing is re-raised, so a failing task never propagates into the
    scheduler.

    Args:
        task_id: Primary key of the row in ``platform_scheduled_tasks``.
    """
    import json  # local import so this block does not depend on file-level imports

    db = get_db_session()
    log_id = None
    try:
        # 1. Load the task configuration (enabled tasks only).
        task = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"),
            {"id": task_id}
        ).mappings().first()
        if not task:
            logger.warning(f"Task {task_id} not found or disabled")
            return

        # 2. Mark the task as running.
        db.execute(
            text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"),
            {"id": task_id}
        )
        db.commit()

        # 3. Create the execution log row.
        db.execute(
            text("""
                INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status)
                VALUES (:task_id, :tenant_id, NOW(), 'running')
            """),
            {"task_id": task_id, "tenant_id": task["tenant_id"]}
        )
        # LAST_INSERT_ID() is scoped to the DB connection. It must be read
        # BEFORE commit(): committing releases the session's connection back
        # to the pool, and the next statement may run on a different one.
        log_id = db.execute(text("SELECT LAST_INSERT_ID() as id")).scalar()
        db.commit()

        # 4. Call the webhook.
        webhook_url = task["webhook_url"]
        input_params = task["input_params"] or {}
        if isinstance(input_params, str):
            # NOTE(review): raw text() queries presumably return JSON columns
            # as undecoded strings (the scheduler code handles time_points the
            # same way). Decode so the webhook receives a JSON object rather
            # than a JSON-encoded string; keep the raw value if it is not JSON.
            try:
                input_params = json.loads(input_params)
            except ValueError:
                logger.warning(f"Task {task_id}: input_params is not valid JSON, sending as-is")

        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                webhook_url,
                json=input_params,
                headers={"Content-Type": "application/json"}
            )

        response_code = response.status_code
        # Cap the stored body length.
        response_body = response.text[:5000] if response.text else ""
        if response.is_success:
            status = "success"
            error_message = None
        else:
            status = "failed"
            error_message = f"HTTP {response_code}"

        # 5. Finalize the execution log row.
        db.execute(
            text("""
                UPDATE platform_task_logs
                SET finished_at = NOW(), status = :status, response_code = :code,
                    response_body = :body, error_message = :error
                WHERE id = :id
            """),
            {
                "id": log_id,
                "status": status,
                "code": response_code,
                "body": response_body,
                "error": error_message
            }
        )

        # 6. Store the final status on the task itself.
        db.execute(
            text("""
                UPDATE platform_scheduled_tasks
                SET last_run_status = :status, last_run_message = :message
                WHERE id = :id
            """),
            {
                "id": task_id,
                "status": status,
                "message": error_message or "执行成功"
            }
        )
        db.commit()
        logger.info(f"Task {task_id} executed with status: {status}")
    except Exception as e:
        # logger.exception keeps the same message but adds the traceback.
        logger.exception(f"Task {task_id} execution error: {str(e)}")
        # Best-effort: record the failure on the log row and the task row.
        try:
            # Discard any half-finished transaction first; after a DB error
            # the session cannot be used until it has been rolled back.
            db.rollback()
            if log_id:
                db.execute(
                    text("""
                        UPDATE platform_task_logs
                        SET finished_at = NOW(), status = 'failed', error_message = :error
                        WHERE id = :id
                    """),
                    {"id": log_id, "error": str(e)[:1000]}
                )
            db.execute(
                text("""
                    UPDATE platform_scheduled_tasks
                    SET last_run_status = 'failed', last_run_message = :message
                    WHERE id = :id
                """),
                {"id": task_id, "message": str(e)[:500]}
            )
            db.commit()
        except Exception:
            # Still swallowed on purpose (the job must not crash the
            # scheduler), but no longer silent: the task may be left in the
            # 'running' state and an operator needs to know.
            logger.exception(f"Task {task_id}: could not record failure status")
    finally:
        db.close()
def add_task_to_scheduler(task: Dict[str, Any]):
    """Register the scheduler job(s) for one task row.

    Does nothing when the module-level scheduler has not been created yet.
    Any jobs previously registered for the same task id are removed first.

    * ``schedule_type == "cron"``: one job with id ``task_<id>_cron`` built
      from ``cron_expression`` (crontab syntax).
    * otherwise ("simple" mode): one daily job per ``"HH:MM"`` entry in
      ``time_points``, with ids ``task_<id>_time_<index>``. ``time_points``
      may be a list or a JSON-encoded string.

    Invalid configuration (bad cron expression, malformed ``time_points``,
    bad time point) is logged and skipped rather than raised, so one broken
    row cannot prevent other tasks from being loaded.

    Args:
        task: Mapping of a ``platform_scheduled_tasks`` row; must contain
            ``id`` and ``schedule_type``, plus ``cron_expression`` in cron mode.
    """
    global scheduler
    import json  # local import so this block does not depend on file-level imports

    if scheduler is None:
        return

    task_id = task["id"]
    schedule_type = task["schedule_type"]

    # Drop any jobs already registered for this task before re-adding.
    remove_task_from_scheduler(task_id)

    if schedule_type == "cron":
        # CRON mode
        cron_expr = task["cron_expression"]
        if not cron_expr:
            logger.warning(f"Cron task {task_id} has no cron expression; not scheduled")
            return
        try:
            trigger = CronTrigger.from_crontab(cron_expr, timezone="Asia/Shanghai")
            scheduler.add_job(
                execute_task,
                trigger,
                args=[task_id],
                id=f"task_{task_id}_cron",
                replace_existing=True
            )
            logger.info(f"Added cron task {task_id}: {cron_expr}")
        except Exception as e:
            logger.error(f"Failed to add cron task {task_id}: {e}")
        return

    # Simple mode: several fixed times of day.
    time_points = task.get("time_points") or []
    if isinstance(time_points, str):
        try:
            # A JSON "null" decodes to None; treat it like an empty list.
            time_points = json.loads(time_points) or []
        except ValueError as e:
            # Previously this propagated and aborted loading of all other tasks.
            logger.error(f"Invalid time_points for task {task_id}: {e}")
            return
    if not isinstance(time_points, (list, tuple)):
        logger.error(f"Invalid time_points for task {task_id}: expected a list, got {type(time_points).__name__}")
        return

    for i, time_point in enumerate(time_points):
        try:
            hour, minute = time_point.split(":")
            trigger = CronTrigger(
                hour=int(hour),
                minute=int(minute),
                timezone="Asia/Shanghai"
            )
            scheduler.add_job(
                execute_task,
                trigger,
                args=[task_id],
                id=f"task_{task_id}_time_{i}",
                replace_existing=True
            )
            logger.info(f"Added simple task {task_id} at {time_point}")
        except Exception as e:
            logger.error(f"Failed to add time point {time_point} for task {task_id}: {e}")
def remove_task_from_scheduler(task_id: int):
    """Remove every scheduler job that belongs to the given task.

    Jobs are matched by the id prefix ``task_<task_id>_``. A failure to
    remove one job is logged as a warning and does not stop the others.
    No-op when the module-level scheduler has not been created.
    """
    global scheduler
    if scheduler is None:
        return

    prefix = f"task_{task_id}_"
    # Collect the ids first, then remove, so the job list is not mutated
    # while it is being scanned.
    matching_ids = [job.id for job in scheduler.get_jobs() if job.id.startswith(prefix)]

    for job_id in matching_ids:
        try:
            scheduler.remove_job(job_id)
            logger.info(f"Removed job {job_id}")
        except Exception as exc:
            logger.warning(f"Failed to remove job {job_id}: {exc}")
def load_all_tasks():
    """Read every enabled task from the database and register it with the scheduler.

    The session is always closed, even if the query or registration raises.
    """
    session = get_db_session()
    try:
        query = text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1")
        rows = session.execute(query).mappings().all()
        for row in rows:
            # Hand the scheduler a plain dict copy of the row.
            add_task_to_scheduler(dict(row))
        logger.info(f"Loaded {len(rows)} scheduled tasks")
    finally:
        session.close()
def start_scheduler():
    """Create the scheduler if needed, load all enabled tasks, and start it.

    Tasks are registered before the scheduler is started. If the scheduler
    is already running, it is left as is.
    """
    global scheduler
    scheduler = get_scheduler()

    # Register all enabled tasks first.
    load_all_tasks()

    # Already running: nothing more to do.
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started")
def shutdown_scheduler():
    """Shut the scheduler down without waiting for running jobs.

    No-op when the scheduler was never created or is not running.
    """
    global scheduler
    if not scheduler or not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("Scheduler shutdown")
def reload_task(task_id: int):
    """Re-sync the scheduler with the current database state of one task.

    If the task exists and is enabled, its jobs are (re)registered;
    otherwise any jobs registered for it are removed. The session is always
    closed.
    """
    session = get_db_session()
    try:
        row = session.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"),
            {"id": task_id}
        ).mappings().first()

        # Missing or disabled task: make sure nothing stays scheduled.
        if not row or not row["is_enabled"]:
            remove_task_from_scheduler(task_id)
            return

        add_task_to_scheduler(dict(row))
    finally:
        session.close()