Files
000-platform/backend/app/services/scheduler.py
Admin 9b72e6127f
Some checks failed
continuous-integration/drone/push Build is failing
feat: 脚本执行平台增强功能
- 新增重试和失败告警功能(支持自动重试N次,失败后钉钉/企微通知)
- 新增密钥管理(安全存储API Key等敏感信息)
- 新增脚本模板库(预置常用脚本模板)
- 新增脚本版本管理(自动保存历史版本,支持回滚)
- 新增执行统计(成功率、平均耗时、7日趋势)
- SDK 新增多租户遍历能力(get_tenants/get_tenant_config/get_all_tenant_configs)
- SDK 新增密钥读取方法(get_secret)
2026-01-28 11:59:50 +08:00

400 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""定时任务调度器服务"""
import asyncio
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import text
from sqlalchemy.orm import Session
from ..database import SessionLocal
logger = logging.getLogger(__name__)
# Global scheduler instance, created lazily by get_scheduler().
scheduler: Optional[AsyncIOScheduler] = None
def get_scheduler() -> AsyncIOScheduler:
    """Return the process-wide scheduler, creating it on first use."""
    global scheduler
    if scheduler is not None:
        return scheduler
    scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    return scheduler
def get_db_session() -> Session:
    """Open and return a fresh database session; the caller must close it."""
    session: Session = SessionLocal()
    return session
async def send_alert(webhook: str, task_name: str, error_message: str):
    """Send a task-failure notification to a DingTalk or WeCom webhook.

    The payload shape is chosen from the webhook URL: DingTalk robots expect
    ``markdown.title``/``markdown.text``, WeCom robots expect
    ``markdown.content``. The error text is truncated to 500 characters.
    Any failure is logged at WARNING level and never raised, so a broken
    alert channel cannot break the caller.
    """
    try:
        # The message body is identical for both platforms; build it once.
        body = (
            f"### ⚠️ 定时任务执行失败\n\n**任务名称**{task_name}\n\n"
            f"**错误信息**{error_message[:500]}\n\n"
            f"**时间**{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
        # "dingtalk" already matches "oapi.dingtalk.com", so one check suffices.
        if "dingtalk" in webhook:
            data = {
                "msgtype": "markdown",
                "markdown": {"title": "定时任务执行失败", "text": body},
            }
        else:
            # WeCom (企业微信) markdown format.
            data = {
                "msgtype": "markdown",
                "markdown": {"content": body},
            }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(webhook, json=data)
            # A non-2xx reply means the alert was NOT delivered; raise so we
            # log a warning instead of a false "Alert sent".
            response.raise_for_status()
        logger.info(f"Alert sent for task {task_name}")
    except Exception as e:
        logger.warning(f"Failed to send alert: {e}")
async def execute_task_with_retry(task_id: int, retry_count: int = 0, max_retries: int = 0, retry_interval: int = 60):
    """Run a task, retrying up to ``max_retries`` times on failure.

    ``retry_count`` is the number of attempts already made (kept for
    interface compatibility). After the final failed attempt, a failure
    alert is sent if the task has alerting configured.
    """
    attempt = retry_count
    while True:
        if await execute_task_once(task_id):
            return
        if attempt >= max_retries:
            break
        attempt += 1
        logger.info(f"Task {task_id} failed, scheduling retry {attempt}/{max_retries} in {retry_interval}s")
        await asyncio.sleep(retry_interval)
    # Every attempt failed — fire the configured failure alert, if any.
    db = get_db_session()
    try:
        row = db.execute(
            text("SELECT task_name, alert_on_failure, alert_webhook, last_run_message FROM platform_scheduled_tasks WHERE id = :id"),
            {"id": task_id}
        ).mappings().first()
        if row and row["alert_on_failure"] and row["alert_webhook"]:
            await send_alert(row["alert_webhook"], row["task_name"], row["last_run_message"] or "未知错误")
    finally:
        db.close()
async def execute_task(task_id: int):
    """Scheduler entry point: load the task's retry settings, then run it.

    The DB session is closed BEFORE execution starts — the original held it
    open across the whole run (which can last minutes and sleep between
    retries), needlessly pinning a pooled connection.
    """
    db = get_db_session()
    try:
        row = db.execute(
            text("SELECT retry_count, retry_interval FROM platform_scheduled_tasks WHERE id = :id"),
            {"id": task_id}
        ).mappings().first()
    finally:
        db.close()
    if row is None:
        # Unknown task id — run once with no retry configuration.
        await execute_task_once(task_id)
        return
    # NULL columns fall back to sane defaults (no retries, 60s interval).
    max_retries = row.get("retry_count", 0) or 0
    retry_interval = row.get("retry_interval", 60) or 60
    await execute_task_with_retry(task_id, 0, max_retries, retry_interval)
async def execute_task_once(task_id: int) -> bool:
    """Run a single attempt of a scheduled task and persist the outcome.

    Flow: load the (enabled) task row, mark it running, insert a
    platform_task_logs row, dispatch to script or webhook execution, then
    finalize both the log row and the task's last-run status.

    Returns True on success — or when the task is missing/disabled, so the
    caller does not retry — and False on failure or unexpected exception.
    """
    db = get_db_session()
    log_id = None
    success = False
    try:
        # 1. Load the task configuration (enabled tasks only).
        result = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id AND is_enabled = 1"),
            {"id": task_id}
        )
        task = result.mappings().first()
        if not task:
            logger.warning(f"Task {task_id} not found or disabled")
            return True  # no retry needed
        # 2. Mark the task as running.
        db.execute(
            text("UPDATE platform_scheduled_tasks SET last_run_status = 'running', last_run_at = NOW() WHERE id = :id"),
            {"id": task_id}
        )
        db.commit()
        # 3. Insert an execution-log row for this run.
        db.execute(
            text("""
                INSERT INTO platform_task_logs (task_id, tenant_id, started_at, status)
                VALUES (:task_id, :tenant_id, NOW(), 'running')
            """),
            {"task_id": task_id, "tenant_id": task["tenant_id"]}
        )
        db.commit()
        # Fetch the id of the row just inserted (LAST_INSERT_ID is MySQL-specific,
        # scoped to this connection).
        result = db.execute(text("SELECT LAST_INSERT_ID() as id"))
        log_id = result.scalar()
        # Trace id for correlating this run's output/logs.
        trace_id = f"task_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        # 4. Dispatch on execution type (defaults to webhook).
        execution_type = task.get("execution_type", "webhook")
        if execution_type == "script":
            # Script execution mode.
            from .script_executor import execute_script as run_script
            script_content = task.get("script_content", "")
            if not script_content:
                status = "failed"
                error_message = "脚本内容为空"
                response_code = None
                response_body = ""
            else:
                script_result = await run_script(
                    task_id=task_id,
                    tenant_id=task["tenant_id"],
                    script_content=script_content,
                    trace_id=trace_id
                )
                if script_result.success:
                    status = "success"
                    error_message = None
                else:
                    status = "failed"
                    error_message = script_result.error
                response_code = None
                response_body = script_result.output[:5000] if script_result.output else ""
                # Append the tail of the script's execution log to the stored body.
                if script_result.logs:
                    response_body += "\n\n--- 执行日志 ---\n" + "\n".join(script_result.logs[-20:])
        else:
            # Webhook execution mode: POST the stored params to the URL.
            webhook_url = task["webhook_url"]
            # NOTE(review): input_params is passed to json= as-is; assumes the
            # column is already deserialized to a dict — confirm against schema.
            input_params = task["input_params"] or {}
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    webhook_url,
                    json=input_params,
                    headers={"Content-Type": "application/json"}
                )
                response_code = response.status_code
                response_body = response.text[:5000] if response.text else ""  # cap stored length
                if response.is_success:
                    status = "success"
                    error_message = None
                else:
                    status = "failed"
                    error_message = f"HTTP {response_code}"
        # 5. Finalize the execution-log row with the result.
        db.execute(
            text("""
                UPDATE platform_task_logs
                SET finished_at = NOW(), status = :status, response_code = :code,
                    response_body = :body, error_message = :error
                WHERE id = :id
            """),
            {
                "id": log_id,
                "status": status,
                "code": response_code,
                "body": response_body,
                "error": error_message
            }
        )
        # 6. Update the task's last-run status/message.
        db.execute(
            text("""
                UPDATE platform_scheduled_tasks
                SET last_run_status = :status, last_run_message = :message
                WHERE id = :id
            """),
            {
                "id": task_id,
                "status": status,
                "message": error_message or "执行成功"
            }
        )
        db.commit()
        logger.info(f"Task {task_id} executed with status: {status}")
        success = (status == "success")
    except Exception as e:
        logger.error(f"Task {task_id} execution error: {str(e)}")
        success = False
        # Best-effort: record the failure on the log and task rows; a second
        # failure here is deliberately swallowed.
        try:
            if log_id:
                db.execute(
                    text("""
                        UPDATE platform_task_logs
                        SET finished_at = NOW(), status = 'failed', error_message = :error
                        WHERE id = :id
                    """),
                    {"id": log_id, "error": str(e)[:1000]}
                )
            db.execute(
                text("""
                    UPDATE platform_scheduled_tasks
                    SET last_run_status = 'failed', last_run_message = :message
                    WHERE id = :id
                """),
                {"id": task_id, "message": str(e)[:500]}
            )
            db.commit()
        except Exception:
            pass
    finally:
        db.close()
    return success
def add_task_to_scheduler(task: Dict[str, Any]):
    """Register a task's trigger(s) on the global scheduler.

    Two schedule types are supported: a full crontab expression, or a list
    of daily "HH:MM" time points. Any jobs previously registered for the
    task are removed first.
    """
    global scheduler
    if scheduler is None:
        return
    task_id = task["id"]
    # Clear out whatever was registered for this task before.
    remove_task_from_scheduler(task_id)
    if task["schedule_type"] == "cron":
        # CRON mode: one job driven by the crontab expression.
        expression = task["cron_expression"]
        if not expression:
            return
        try:
            scheduler.add_job(
                execute_task,
                CronTrigger.from_crontab(expression, timezone="Asia/Shanghai"),
                args=[task_id],
                id=f"task_{task_id}_cron",
                replace_existing=True
            )
            logger.info(f"Added cron task {task_id}: {expression}")
        except Exception as e:
            logger.error(f"Failed to add cron task {task_id}: {e}")
        return
    # Simple mode: one daily trigger per "HH:MM" time point.
    points = task.get("time_points") or []
    if isinstance(points, str):
        import json
        points = json.loads(points)
    for index, point in enumerate(points):
        try:
            hour, minute = point.split(":")
            scheduler.add_job(
                execute_task,
                CronTrigger(hour=int(hour), minute=int(minute), timezone="Asia/Shanghai"),
                args=[task_id],
                id=f"task_{task_id}_time_{index}",
                replace_existing=True
            )
            logger.info(f"Added simple task {task_id} at {point}")
        except Exception as e:
            logger.error(f"Failed to add time point {point} for task {task_id}: {e}")
def remove_task_from_scheduler(task_id: int):
    """Remove every scheduler job belonging to the given task."""
    global scheduler
    if scheduler is None:
        return
    # Job ids are namespaced as "task_{id}_..."; collect then remove, so we
    # never mutate the job list while iterating it.
    prefix = f"task_{task_id}_"
    stale_ids = [job.id for job in scheduler.get_jobs() if job.id.startswith(prefix)]
    for job_id in stale_ids:
        try:
            scheduler.remove_job(job_id)
            logger.info(f"Removed job {job_id}")
        except Exception as e:
            logger.warning(f"Failed to remove job {job_id}: {e}")
def load_all_tasks():
    """Load every enabled task from the database into the scheduler."""
    db = get_db_session()
    try:
        rows = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE is_enabled = 1")
        ).mappings().all()
        for row in rows:
            add_task_to_scheduler(dict(row))
        logger.info(f"Loaded {len(rows)} scheduled tasks")
    finally:
        db.close()
def start_scheduler():
    """Create the scheduler, register all enabled tasks, then start it."""
    global scheduler
    scheduler = get_scheduler()
    # Register everything before the first tick.
    load_all_tasks()
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started")
def shutdown_scheduler():
    """Stop the scheduler without waiting for in-flight jobs to finish."""
    global scheduler
    if scheduler is None or not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("Scheduler shutdown")
def reload_task(task_id: int):
    """Re-sync one task with the scheduler after its config changed.

    Enabled tasks are (re-)registered; disabled or deleted tasks have
    their jobs removed.
    """
    db = get_db_session()
    try:
        row = db.execute(
            text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"),
            {"id": task_id}
        ).mappings().first()
        if row and row["is_enabled"]:
            add_task_to_scheduler(dict(row))
        else:
            remove_task_from_scheduler(task_id)
    finally:
        db.close()