All checks were successful
continuous-integration/drone/push Build is passing
- task_type -> execution_type - status -> is_enabled - 移除不存在的字段 webhook_method, webhook_headers, script_timeout - time_points/input_params 适配 JSON 类型
290 lines
10 KiB
Python
290 lines
10 KiB
Python
"""定时任务调度服务"""
|
|
import json
|
|
import httpx
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..database import SessionLocal
|
|
from ..models.scheduled_task import ScheduledTask, TaskLog
|
|
from .script_executor import ScriptExecutor
|
|
|
|
|
|
class SchedulerService:
    """Scheduling service that registers timed tasks with APScheduler and runs them.

    The class is a process-wide singleton: ``__new__`` always returns the same
    instance and ``__init__`` creates the underlying ``AsyncIOScheduler`` only
    once.

    Job ids follow a fixed scheme:

    * ``task_{id}``      - the single job of a ``cron`` task
    * ``task_{id}_{i}``  - the i-th daily time point of a ``simple`` task
    """

    _instance: Optional['SchedulerService'] = None
    _scheduler: Optional['AsyncIOScheduler'] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every SchedulerService() call; only build the
        # scheduler the first time so existing jobs are not thrown away.
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai')

    @property
    def scheduler(self) -> 'AsyncIOScheduler':
        """Return the underlying APScheduler instance."""
        return self._scheduler

    def start(self):
        """Start the scheduler and load every enabled task (no-op if already running)."""
        if not self._scheduler.running:
            self._scheduler.start()
            self._load_all_tasks()
            print("调度器已启动")

    def shutdown(self):
        """Shut the scheduler down (no-op if it is not running)."""
        if self._scheduler.running:
            self._scheduler.shutdown()
            print("调度器已关闭")

    def _load_all_tasks(self):
        """Register every task whose ``is_enabled`` flag is set in the database."""
        db = SessionLocal()
        try:
            # `== True` is intentional: it builds a SQL expression, not a Python comparison.
            tasks = db.query(ScheduledTask).filter(ScheduledTask.is_enabled == True).all()  # noqa: E712
            for task in tasks:
                self._add_task_to_scheduler(task)
            print(f"已加载 {len(tasks)} 个定时任务")
        finally:
            db.close()

    def _remove_task_jobs(self, task_id: int):
        """Remove the main job and every sub-job registered for ``task_id``.

        Jobs are matched by id (``task_{id}`` exactly, or the ``task_{id}_``
        prefix) so the number of time points is not limited and the prefix of
        task 1 never matches the jobs of task 10.
        """
        job_id = f"task_{task_id}"
        sub_job_prefix = f"{job_id}_"
        for job in self._scheduler.get_jobs():
            if job.id == job_id or job.id.startswith(sub_job_prefix):
                self._scheduler.remove_job(job.id)

    @staticmethod
    def _parse_params(raw) -> dict:
        """Normalize a task's ``input_params`` value to a dict.

        A dict is returned as-is. A string is decoded as JSON and used only if
        it decodes to a JSON object. Anything else (``None``, invalid JSON,
        non-object JSON) yields an empty dict.
        """
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, str):
            try:
                decoded = json.loads(raw)
            except ValueError:
                return {}
            if isinstance(decoded, dict):
                return decoded
        return {}

    def _add_task_to_scheduler(self, task: 'ScheduledTask'):
        """Register (or re-register) ``task`` with the scheduler.

        ``cron`` tasks get one job built from ``task.cron_expression``;
        ``simple`` tasks get one daily job per ``"HH:MM"`` entry in
        ``task.time_points`` (a list, or a JSON-encoded list). Parse errors are
        printed and do not propagate.
        """
        job_id = f"task_{task.id}"

        # Drop everything previously registered for this task, including
        # sub-jobs of an earlier "simple" schedule, so no stale job keeps firing
        # after the schedule type or the number of time points changes.
        self._remove_task_jobs(task.id)

        if task.schedule_type == 'cron' and task.cron_expression:
            # CRON mode
            try:
                trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai')
                self._scheduler.add_job(
                    self._execute_task,
                    trigger,
                    id=job_id,
                    args=[task.id],
                    replace_existing=True
                )
            except Exception as e:
                print(f"任务 {task.id} CRON表达式解析失败: {e}")

        elif task.schedule_type == 'simple' and task.time_points:
            # Simple mode - several fixed times per day
            try:
                time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points)
                for i, time_point in enumerate(time_points):
                    hour, minute = map(int, time_point.split(':'))
                    sub_job_id = f"{job_id}_{i}"
                    self._scheduler.add_job(
                        self._execute_task,
                        CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'),
                        id=sub_job_id,
                        args=[task.id],
                        replace_existing=True
                    )
            except Exception as e:
                print(f"任务 {task.id} 时间点解析失败: {e}")

    def add_task(self, task_id: int):
        """Add or refresh the schedule of the task with ``task_id`` if it exists and is enabled."""
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if task and task.is_enabled:
                self._add_task_to_scheduler(task)
        finally:
            db.close()

    def remove_task(self, task_id: int):
        """Remove every scheduled job (main job and all sub-jobs) of ``task_id``."""
        self._remove_task_jobs(task_id)

    async def _execute_task(self, task_id: int):
        """Run a task, retrying on failure and alerting after the final failure.

        The task is attempted ``retry_count + 1`` times, sleeping
        ``retry_interval`` seconds (60 when unset or 0) between attempts.
        """
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if not task:
                return

            max_retries = task.retry_count or 0
            retry_interval = task.retry_interval or 60

            for attempt in range(max_retries + 1):
                success, output, error = await self._execute_task_once(db, task)

                if success:
                    return

                if attempt < max_retries:
                    # Retries remain: wait, then try again.
                    print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})")
                    await asyncio.sleep(retry_interval)
                else:
                    # Final attempt failed: send the alert if one is configured.
                    if task.alert_on_failure and task.alert_webhook:
                        await self._send_alert(task, error)
        finally:
            db.close()

    async def _execute_task_once(self, db: 'Session', task: 'ScheduledTask'):
        """Run ``task`` a single time and record the outcome.

        A ``TaskLog`` row is committed with status ``running`` before the run
        and updated afterwards; ``task.last_run_at`` / ``task.last_run_status``
        are updated in the same commit.

        Returns:
            A ``(success, output, error)`` tuple. Any exception raised by the
            execution itself is captured into ``error``.
        """
        trace_id = f"{int(datetime.now().timestamp())}-{task.id}"
        started_at = datetime.now()

        # Create the log record up front so a run in progress is visible.
        log = TaskLog(
            task_id=task.id,
            tenant_id=task.tenant_id,
            trace_id=trace_id,
            status='running',
            started_at=started_at
        )
        db.add(log)
        db.commit()
        db.refresh(log)

        success = False
        output = ''
        error = ''

        try:
            params = self._parse_params(task.input_params)

            if task.execution_type == 'webhook':
                success, output, error = await self._execute_webhook(task)
            else:
                success, output, error = await self._execute_script(db, task, trace_id, params)

        except Exception as e:
            error = str(e)

        # Update the log record
        finished_at = datetime.now()
        duration_ms = int((finished_at - started_at).total_seconds() * 1000)

        log.status = 'success' if success else 'failed'
        log.finished_at = finished_at
        log.duration_ms = duration_ms
        log.output = output[:10000] if output else None  # cap stored length
        log.error = error[:5000] if error else None

        # Update the task's last-run status
        task.last_run_at = finished_at
        task.last_run_status = 'success' if success else 'failed'

        db.commit()

        return success, output, error

    async def _execute_webhook(self, task: 'ScheduledTask'):
        """POST the task's input params as JSON to ``task.webhook_url``.

        Returns:
            ``(True, <first 5000 chars of the response body>, '')`` on a
            successful status, otherwise ``(False, '', <error text>)``.
        """
        try:
            body = self._parse_params(task.input_params)

            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(task.webhook_url, json=body)
                response.raise_for_status()
                return True, response.text[:5000], ''

        except Exception as e:
            return False, '', str(e)

    async def _execute_script(self, db: 'Session', task: 'ScheduledTask', trace_id: str, params: dict):
        """Run the task's script through ``ScriptExecutor``.

        Returns:
            The ``(success, output, error)`` tuple produced by the executor, or
            a failure tuple when the task has no script content.
        """
        if not task.script_content:
            return False, '', '脚本内容为空'

        executor = ScriptExecutor(db)
        # NOTE(review): execute() is called synchronously (not awaited); if it
        # blocks for long it will hold up the event loop - confirm.
        success, output, error = executor.execute(
            script_content=task.script_content,
            task_id=task.id,
            tenant_id=task.tenant_id,
            trace_id=trace_id,
            params=params,
            timeout=300  # default timeout
        )

        return success, output, error

    async def _send_alert(self, task: 'ScheduledTask', error: str):
        """Post a markdown failure alert to ``task.alert_webhook``.

        The payload shape depends on whether the webhook URL contains
        ``dingtalk``. Delivery problems, including HTTP error statuses, are
        printed and never raised to the caller.
        """
        if not task.alert_webhook:
            return

        content = "\n".join([
            "### 定时任务执行失败告警",
            "",
            f"**任务名称**: {task.task_name}",
            f"**任务ID**: {task.id}",
            f"**租户**: {task.tenant_id or '全局'}",
            f"**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "**错误信息**:",
            "```",
            f"{error[:500] if error else '未知错误'}",
            "```",
        ])

        try:
            # DingTalk expects title/text; other webhooks get the "content" form.
            if 'dingtalk' in task.alert_webhook:
                payload = {
                    "msgtype": "markdown",
                    "markdown": {"title": "任务失败告警", "text": content}
                }
            else:
                payload = {
                    "msgtype": "markdown",
                    "markdown": {"content": content}
                }

            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.post(task.alert_webhook, json=payload)
                # Surface HTTP-level failures in the log instead of dropping them.
                response.raise_for_status()
        except Exception as e:
            print(f"发送告警失败: {e}")

    async def run_task_now(self, task_id: int) -> dict:
        """Run a task immediately (manual trigger), once, without retries or alerts.

        Returns:
            ``{"success", "output", "error"}`` for the run, or
            ``{"success": False, "error": ...}`` when the task does not exist.
        """
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if not task:
                return {"success": False, "error": "任务不存在"}

            success, output, error = await self._execute_task_once(db, task)

            return {
                "success": success,
                "output": output,
                "error": error
            }
        finally:
            db.close()
|
|
|
|
|
|
# 全局调度器实例
|
|
# Shared module-level instance; SchedulerService.__new__ hands back the same object on every call.
scheduler_service = SchedulerService()
|