feat: 实现定时任务系统
All checks were successful
continuous-integration/drone/push Build is passing

- 新增 platform_scheduled_tasks, platform_task_logs, platform_script_vars, platform_secrets 数据库表
- 实现 ScriptSDK 提供 AI/通知/DB/HTTP/变量存储/参数获取等功能
- 实现安全的脚本执行器,支持沙箱环境和禁止危险操作
- 实现 APScheduler 调度服务,支持简单时间点和 CRON 表达式
- 新增定时任务 API 路由,包含 CRUD、执行、日志、密钥管理
- 新增定时任务前端页面,支持脚本编辑、测试运行、日志查看
This commit is contained in:
2026-01-28 16:38:19 +08:00
parent 7806072b17
commit 104487f082
19 changed files with 1870 additions and 5406 deletions

View File

@@ -1,399 +1,308 @@
"""定时任务调度服务"""
import asyncio
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
"""定时任务调度服务"""
import json
import httpx
import asyncio
from datetime import datetime
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import text
from sqlalchemy.orm import Session
from ..database import SessionLocal
logger = logging.getLogger(__name__)
# 全局调度器实例
scheduler: Optional[AsyncIOScheduler] = None
from ..models.scheduled_task import ScheduledTask, TaskLog
from .script_executor import ScriptExecutor
def get_scheduler() -> AsyncIOScheduler:
    """Lazily create and return the process-wide AsyncIOScheduler singleton."""
    global scheduler
    if scheduler is not None:
        return scheduler
    scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    return scheduler
def get_db_session() -> Session:
    """Open and return a fresh SQLAlchemy session; the caller must close it."""
    session = SessionLocal()
    return session
async def send_alert(webhook: str, task_name: str, error_message: str):
    """Best-effort failure notification to a DingTalk or WeCom webhook.

    The webhook URL decides the payload envelope; any delivery error is
    logged as a warning and swallowed so alerting can never break the
    task pipeline itself.
    """
    try:
        body = f"### ⚠️ 定时任务执行失败\n\n**任务名称**{task_name}\n\n**错误信息**{error_message[:500]}\n\n**时间**{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        # DingTalk expects title+text; WeCom (the fallback) expects content.
        if "dingtalk" in webhook or "oapi.dingtalk.com" in webhook:
            payload = {
                "msgtype": "markdown",
                "markdown": {"title": "定时任务执行失败", "text": body}
            }
        else:
            payload = {
                "msgtype": "markdown",
                "markdown": {"content": body}
            }
        async with httpx.AsyncClient(timeout=30.0) as client:
            await client.post(webhook, json=payload)
        logger.info(f"Alert sent for task {task_name}")
    except Exception as e:
        logger.warning(f"Failed to send alert: {e}")
async def execute_task_with_retry(task_id: int, retry_count: int = 0, max_retries: int = 0, retry_interval: int = 60):
"""带重试的任务执行"""
success = await execute_task_once(task_id)
class SchedulerService:
"""调度服务 - 管理定时任务的调度和执行"""
if not success and retry_count < max_retries:
logger.info(f"Task {task_id} failed, scheduling retry {retry_count + 1}/{max_retries} in {retry_interval}s")
await asyncio.sleep(retry_interval)
await execute_task_with_retry(task_id, retry_count + 1, max_retries, retry_interval)
elif not success:
# 所有重试都失败,发送告警
db = get_db_session()
# Class-level singleton storage: one service and one scheduler per process.
_instance: Optional['SchedulerService'] = None
_scheduler: Optional[AsyncIOScheduler] = None

def __new__(cls):
    # Classic singleton: every SchedulerService() call yields the same object.
    if cls._instance is None:
        cls._instance = super().__new__(cls)
    return cls._instance

def __init__(self):
    # __init__ runs on every construction of the singleton; only build the
    # APScheduler instance the first time through.
    if self._scheduler is None:
        self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai')

@property
def scheduler(self) -> AsyncIOScheduler:
    """The underlying APScheduler instance (read-only accessor)."""
    return self._scheduler
def start(self):
    """Start the scheduler (idempotent) and register every enabled task."""
    if self._scheduler.running:
        return
    self._scheduler.start()
    self._load_all_tasks()
    print("调度器已启动")
def shutdown(self):
    """Stop the scheduler if it is currently running."""
    if not self._scheduler.running:
        return
    self._scheduler.shutdown()
    print("调度器已关闭")
def _load_all_tasks(self):
"""从数据库加载所有启用的任务"""
db = SessionLocal()
try:
result = db.execute(
text("SELECT task_name, alert_on_failure, alert_webhook, last_run_message FROM platform_scheduled_tasks WHERE id = :id"),
{"id": task_id}
)
task = result.mappings().first()
if task and task["alert_on_failure"] and task["alert_webhook"]:
await send_alert(task["alert_webhook"], task["task_name"], task["last_run_message"] or "未知错误")
tasks = db.query(ScheduledTask).filter(ScheduledTask.status == 1).all()
for task in tasks:
self._add_task_to_scheduler(task)
print(f"已加载 {len(tasks)} 个定时任务")
finally:
db.close()
def _add_task_to_scheduler(self, task: ScheduledTask):
    """Register one task's trigger(s) with APScheduler.

    A 'cron' task becomes a single job (id ``task_<id>``); a 'simple'
    task becomes one job per HH:MM time point (id ``task_<id>_<i>``).
    Parsing failures are reported and leave the scheduler unchanged.
    """
    job_id = f"task_{task.id}"
    # Drop a stale registration before re-adding.
    if self._scheduler.get_job(job_id):
        self._scheduler.remove_job(job_id)
    if task.schedule_type == 'cron' and task.cron_expression:
        try:
            cron = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai')
            self._scheduler.add_job(
                self._execute_task,
                cron,
                id=job_id,
                args=[task.id],
                replace_existing=True,
            )
        except Exception as e:
            print(f"任务 {task.id} CRON表达式解析失败: {e}")
    elif task.schedule_type == 'simple' and task.time_points:
        try:
            # time_points is a JSON-encoded list of "HH:MM" strings.
            for index, point in enumerate(json.loads(task.time_points)):
                hour, minute = map(int, point.split(':'))
                self._scheduler.add_job(
                    self._execute_task,
                    CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'),
                    id=f"{job_id}_{index}",
                    args=[task.id],
                    replace_existing=True,
                )
        except Exception as e:
            print(f"任务 {task.id} 时间点解析失败: {e}")
def add_task(self, task_id: int):
    """(Re)register the given task if it exists and is enabled (status == 1)."""
    db = SessionLocal()
    try:
        row = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if row is not None and row.status == 1:
            self._add_task_to_scheduler(row)
    finally:
        db.close()
def remove_task(self, task_id: int):
    """Unschedule a task: the main job plus every simple-mode sub-job.

    Fix: the previous implementation only probed sub-job ids ``_0``..``_23``,
    silently leaking jobs when a task had more than 24 time points. Scan the
    scheduler's actual job list instead (same approach as the module-level
    ``remove_task_from_scheduler``), which removes every matching job
    regardless of count.
    """
    main_id = f"task_{task_id}"
    sub_prefix = f"{main_id}_"
    # Snapshot ids before removing: mutating while iterating get_jobs()
    # output could skip entries.
    doomed = [
        job.id
        for job in self._scheduler.get_jobs()
        if job.id == main_id or job.id.startswith(sub_prefix)
    ]
    for job_id in doomed:
        self._scheduler.remove_job(job_id)
async def _execute_task(self, task_id: int):
    """Scheduler entry point: run a task, retrying per its configuration.

    After the final failed attempt, fire the failure alert if configured.
    """
    db = SessionLocal()
    try:
        task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if task is None:
            return
        retries_allowed = task.retry_count or 0
        wait_seconds = task.retry_interval or 60
        attempt = 0
        while attempt <= retries_allowed:
            ok, _output, err = await self._execute_task_once(db, task)
            if ok:
                return
            if attempt < retries_allowed:
                print(f"任务 {task_id} 执行失败,{wait_seconds}秒后重试 ({attempt + 1}/{retries_allowed})")
                await asyncio.sleep(wait_seconds)
            elif task.alert_on_failure and task.alert_webhook:
                # Retries exhausted: notify.
                await self._send_alert(task, err)
            attempt += 1
    finally:
        db.close()
async def _execute_task_once(self, db: Session, task: ScheduledTask):
    """Run a single attempt of *task* and persist a TaskLog row.

    Creates the log row up front (status 'running') so a crash mid-run still
    leaves a trace, dispatches to the webhook or script executor, then
    finalizes the log and the task's last-run bookkeeping.

    Returns:
        (success, output, error); output/error are truncated (10 KB / 5 KB)
        before being stored to bound row size.
    """
    trace_id = f"{int(datetime.now().timestamp())}-{task.id}"
    started_at = datetime.now()
    # Create the execution log record first.
    log = TaskLog(
        task_id=task.id,
        tenant_id=task.tenant_id,
        trace_id=trace_id,
        status='running',
        started_at=started_at
    )
    db.add(log)
    db.commit()
    db.refresh(log)
    success = False
    output = ''
    error = ''
    try:
        # Parse optional JSON input parameters; malformed JSON falls back to
        # an empty dict. (Fix: was a bare `except:` that also swallowed
        # KeyboardInterrupt/SystemExit — now narrowed to the errors
        # json.loads can actually raise.)
        params = {}
        if task.input_params:
            try:
                params = json.loads(task.input_params)
            except (ValueError, TypeError):
                params = {}
        if task.task_type == 'webhook':
            success, output, error = await self._execute_webhook(task)
        else:
            success, output, error = await self._execute_script(db, task, trace_id, params)
    except Exception as e:
        error = str(e)
    # Finalize the log row.
    finished_at = datetime.now()
    duration_ms = int((finished_at - started_at).total_seconds() * 1000)
    log.status = 'success' if success else 'failed'
    log.finished_at = finished_at
    log.duration_ms = duration_ms
    log.output = output[:10000] if output else None  # cap stored output
    log.error = error[:5000] if error else None
    # Update the task's last-run status.
    task.last_run_at = finished_at
    task.last_run_status = 'success' if success else 'failed'
    db.commit()
    return success, output, error
async def _execute_webhook(self, task: ScheduledTask):
    """Call the task's webhook and report (success, body, error).

    GET sends input_params as query params; anything else POSTs them as
    a JSON body. All failures (bad JSON, network, non-2xx) are folded
    into the error string.
    """
    try:
        headers = json.loads(task.webhook_headers) if task.webhook_headers else {}
        body = json.loads(task.input_params) if task.input_params else {}
        async with httpx.AsyncClient(timeout=30) as client:
            if task.webhook_method.upper() == 'GET':
                response = await client.get(task.webhook_url, headers=headers, params=body)
            else:
                response = await client.post(task.webhook_url, headers=headers, json=body)
            response.raise_for_status()
        return True, response.text[:5000], ''
    except Exception as e:
        return False, '', str(e)
async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict):
    """Run the task's inline script through the sandboxed ScriptExecutor.

    Returns the executor's (success, output, error) tuple; an empty
    script is rejected without invoking the executor.
    """
    if not task.script_content:
        return False, '', '脚本内容为空'
    runner = ScriptExecutor(db)
    return runner.execute(
        script_content=task.script_content,
        task_id=task.id,
        tenant_id=task.tenant_id,
        trace_id=trace_id,
        params=params,
        timeout=task.script_timeout or 300,
    )
async def _send_alert(self, task: ScheduledTask, error: str):
    """Push a failure notice to the task's DingTalk / WeCom webhook (best effort)."""
    if not task.alert_webhook:
        return
    content = f"""### 定时任务执行失败告警
**任务名称**: {task.task_name}
**任务ID**: {task.id}
**租户**: {task.tenant_id or '全局'}
**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**错误信息**:
```
{error[:500] if error else '未知错误'}
```"""
    # DingTalk and WeCom expect slightly different markdown envelopes.
    is_dingtalk = 'dingtalk' in task.alert_webhook or 'oapi.dingtalk.com' in task.alert_webhook
    if is_dingtalk:
        payload = {
            "msgtype": "markdown",
            "markdown": {"title": "任务失败告警", "text": content}
        }
    else:
        payload = {
            "msgtype": "markdown",
            "markdown": {"content": content}
        }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            await client.post(task.alert_webhook, json=payload)
    except Exception as e:
        print(f"发送告警失败: {e}")
async def run_task_now(self, task_id: int) -> dict:
    """Manually trigger a task once, bypassing the schedule (no retries).

    Fix: the previous version parsed ``task.input_params`` into a local
    ``params`` that was never used — ``_execute_task_once`` does its own
    parsing — so that dead code is removed.

    Returns:
        dict with ``success`` and either ``output``/``error`` fields, or
        ``{"success": False, "error": "任务不存在"}`` for an unknown id.
    """
    db = SessionLocal()
    try:
        task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not task:
            return {"success": False, "error": "任务不存在"}
        success, output, error = await self._execute_task_once(db, task)
        return {
            "success": success,
            "output": output,
            "error": error
        }
    finally:
        db.close()
async def execute_task(task_id: int):
    """Module-level task entry: read retry settings, then dispatch.

    A known task runs through the retry wrapper with its configured
    retry_count/retry_interval (defaulting to 0/60); an unknown id falls
    back to a single attempt, which re-validates the task itself.
    """
    db = get_db_session()
    try:
        query = text("SELECT retry_count, retry_interval FROM platform_scheduled_tasks WHERE id = :id")
        row = db.execute(query, {"id": task_id}).mappings().first()
        if row is None:
            await execute_task_once(task_id)
        else:
            await execute_task_with_retry(
                task_id,
                0,
                row.get("retry_count", 0) or 0,
                row.get("retry_interval", 60) or 60,
            )
    finally:
        db.close()
async def execute_task_once(task_id: int) -> bool:
    """Run one execution of a scheduled task and return whether it succeeded.

    Flow: load the enabled task row, mark it 'running', insert a log row,
    dispatch to the script or webhook executor, then finalize both the log
    row and the task's last-run columns. Any exception flips both to
    'failed' on a best-effort basis.

    Returns:
        True on success — and also for a missing/disabled task, so the
        retry wrapper does not keep retrying a task that no longer runs.
    """
    db = get_db_session()
    log_id = None
    success = False
    try:
        # 1. Load the task configuration (enabled tasks only).
        result = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"),
            {"id": task_id}
        )
        task = result.mappings().first()
        if not task:
            logger.warning(f"Task {task_id} not found or disabled")
            return True  # no retry needed
        # 2. Mark the task as running.
        db.execute(
            text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"),
            {"id": task_id}
        )
        db.commit()
        # 3. Create the execution log row.
        db.execute(
            text("""
                INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status)
                VALUES (:task_id, :tenant_id, NOW(), 'running')
            """),
            {"task_id": task_id, "tenant_id": task["tenant_id"]}
        )
        db.commit()
        # Fetch the id of the row just inserted (MySQL-specific;
        # LAST_INSERT_ID() is per-connection).
        result = db.execute(text("SELECT LAST_INSERT_ID() as id"))
        log_id = result.scalar()
        # Generate a trace id for correlating script logs.
        trace_id = f"task_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        # 4. Dispatch on execution type ('webhook' is the default).
        execution_type = task.get("execution_type", "webhook")
        if execution_type == "script":
            # Script mode: run through the sandboxed executor.
            from .script_executor import execute_script as run_script
            script_content = task.get("script_content", "")
            if not script_content:
                status = "failed"
                error_message = "脚本内容为空"
                response_code = None
                response_body = ""
            else:
                script_result = await run_script(
                    task_id=task_id,
                    tenant_id=task["tenant_id"],
                    script_content=script_content,
                    trace_id=trace_id
                )
                if script_result.success:
                    status = "success"
                    error_message = None
                else:
                    status = "failed"
                    error_message = script_result.error
                response_code = None
                response_body = script_result.output[:5000] if script_result.output else ""
                # Append the tail of the execution log to the stored body.
                if script_result.logs:
                    response_body += "\n\n--- 执行日志 ---\n" + "\n".join(script_result.logs[-20:])
        else:
            # Webhook mode.
            webhook_url = task["webhook_url"]
            # NOTE(review): input_params comes straight from the DB row; the
            # class-based path json.loads()es this column, so if it stores a
            # JSON *string* this posts a quoted string body — verify schema.
            input_params = task["input_params"] or {}
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    webhook_url,
                    json=input_params,
                    headers={"Content-Type": "application/json"}
                )
                response_code = response.status_code
                response_body = response.text[:5000] if response.text else ""  # cap stored length
                if response.is_success:
                    status = "success"
                    error_message = None
                else:
                    status = "failed"
                    error_message = f"HTTP {response_code}"
        # 5. Finalize the execution log row.
        db.execute(
            text("""
                UPDATE platform_task_logs
                SET finished_at = NOW(), status = :status, response_code = :code,
                    response_body = :body, error_message = :error
                WHERE id = :id
            """),
            {
                "id": log_id,
                "status": status,
                "code": response_code,
                "body": response_body,
                "error": error_message
            }
        )
        # 6. Update the task's last-run columns.
        db.execute(
            text("""
                UPDATE platform_scheduled_tasks
                SET last_run_status = :status, last_run_message = :message
                WHERE id = :id
            """),
            {
                "id": task_id,
                "status": status,
                "message": error_message or "执行成功"
            }
        )
        db.commit()
        logger.info(f"Task {task_id} executed with status: {status}")
        success = (status == "success")
    except Exception as e:
        logger.error(f"Task {task_id} execution error: {str(e)}")
        success = False
        # Best-effort failure bookkeeping; a second failure here is ignored
        # so the original error still propagates as the return value.
        try:
            if log_id:
                db.execute(
                    text("""
                        UPDATE platform_task_logs
                        SET finished_at = NOW(), status = 'failed', error_message = :error
                        WHERE id = :id
                    """),
                    {"id": log_id, "error": str(e)[:1000]}
                )
            db.execute(
                text("""
                    UPDATE platform_scheduled_tasks
                    SET last_run_status = 'failed', last_run_message = :message
                    WHERE id = :id
                """),
                {"id": task_id, "message": str(e)[:500]}
            )
            db.commit()
        except Exception:
            pass
    finally:
        db.close()
    return success
def add_task_to_scheduler(task: Dict[str, Any]):
    """Register a task dict's trigger(s) on the global scheduler.

    A cron task becomes one job (id ``task_<id>_cron``); a simple task
    becomes one job per "HH:MM" time point (id ``task_<id>_time_<i>``).
    Existing registrations for the task are cleared first. No-op when the
    global scheduler has not been created yet.
    """
    global scheduler
    if scheduler is None:
        return
    task_id = task["id"]
    # Clear any previously-registered jobs for this task.
    remove_task_from_scheduler(task_id)
    if task["schedule_type"] == "cron":
        cron_expr = task["cron_expression"]
        if not cron_expr:
            return
        try:
            scheduler.add_job(
                execute_task,
                CronTrigger.from_crontab(cron_expr, timezone="Asia/Shanghai"),
                args=[task_id],
                id=f"task_{task_id}_cron",
                replace_existing=True,
            )
            logger.info(f"Added cron task {task_id}: {cron_expr}")
        except Exception as e:
            logger.error(f"Failed to add cron task {task_id}: {e}")
        return
    # Simple mode: a list (possibly JSON-encoded) of "HH:MM" strings.
    time_points = task.get("time_points") or []
    if isinstance(time_points, str):
        import json
        time_points = json.loads(time_points)
    for i, time_point in enumerate(time_points):
        try:
            hour, minute = time_point.split(":")
            scheduler.add_job(
                execute_task,
                CronTrigger(hour=int(hour), minute=int(minute), timezone="Asia/Shanghai"),
                args=[task_id],
                id=f"task_{task_id}_time_{i}",
                replace_existing=True,
            )
            logger.info(f"Added simple task {task_id} at {time_point}")
        except Exception as e:
            logger.error(f"Failed to add time point {time_point} for task {task_id}: {e}")
def remove_task_from_scheduler(task_id: int):
    """Remove every job belonging to *task_id* from the global scheduler."""
    global scheduler
    if scheduler is None:
        return
    prefix = f"task_{task_id}_"
    # Snapshot the matching ids first: removing while iterating the job
    # list would mutate it under us.
    doomed = [job.id for job in scheduler.get_jobs() if job.id.startswith(prefix)]
    for job_id in doomed:
        try:
            scheduler.remove_job(job_id)
            logger.info(f"Removed job {job_id}")
        except Exception as e:
            logger.warning(f"Failed to remove job {job_id}: {e}")
def load_all_tasks():
    """Read every enabled task row and register it with the scheduler."""
    db = get_db_session()
    try:
        rows = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1")
        ).mappings().all()
        for row in rows:
            add_task_to_scheduler(dict(row))
        logger.info(f"Loaded {len(rows)} scheduled tasks")
    finally:
        db.close()
def start_scheduler():
    """Create the global scheduler if needed, load all tasks, and start it."""
    global scheduler
    scheduler = get_scheduler()
    # Register tasks before starting so nothing fires against an empty table.
    load_all_tasks()
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started")
def shutdown_scheduler():
    """Stop the global scheduler without waiting for in-flight jobs."""
    global scheduler
    if scheduler is not None and scheduler.running:
        scheduler.shutdown(wait=False)
        logger.info("Scheduler shutdown")
def reload_task(task_id: int):
    """Re-sync a single task's schedule with its current database row.

    Enabled rows are (re)registered; missing or disabled rows have their
    jobs removed from the scheduler.
    """
    db = get_db_session()
    try:
        row = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"),
            {"id": task_id}
        ).mappings().first()
        if row and row["is_enabled"]:
            add_task_to_scheduler(dict(row))
        else:
            remove_task_from_scheduler(task_id)
    finally:
        db.close()
# Module-level singleton instance; importers should use this rather than
# constructing SchedulerService themselves (construction is idempotent anyway).
scheduler_service = SchedulerService()