All checks were successful
continuous-integration/drone/push Build is passing
- 通知渠道增加 sign_secret 字段存储加签密钥 - 发送钉钉消息时自动计算签名 - 前端增加加签密钥输入框(仅钉钉机器人显示)
429 lines
16 KiB
Python
429 lines
16 KiB
Python
"""定时任务调度服务"""
|
|
import json
|
|
import httpx
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..database import SessionLocal
|
|
from ..models.scheduled_task import ScheduledTask, TaskLog
|
|
from ..models.notification_channel import TaskNotifyChannel
|
|
from .script_executor import ScriptExecutor
|
|
|
|
|
|
class SchedulerService:
    """Scheduling service: registers scheduled tasks with APScheduler and runs them.

    The class is a process-wide singleton: every ``SchedulerService()`` call
    returns the same instance, which wraps one ``AsyncIOScheduler`` fixed to
    the ``Asia/Shanghai`` timezone.

    Job id convention:
        * cron tasks use a single job ``task_<id>``;
        * simple (time-point) tasks use one job per time point,
          ``task_<id>_<index>``.
    """

    _instance: Optional['SchedulerService'] = None
    _scheduler: Optional[AsyncIOScheduler] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Create the underlying scheduler once.

        ``__init__`` runs on every ``SchedulerService()`` call, so the guard
        keeps the already-created scheduler (and its jobs) in place.
        """
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai')

    @property
    def scheduler(self) -> AsyncIOScheduler:
        """The wrapped ``AsyncIOScheduler``."""
        return self._scheduler

    def start(self):
        """Start the scheduler and load every enabled task from the database.

        Does nothing when the scheduler is already running.
        """
        if not self._scheduler.running:
            self._scheduler.start()
            self._load_all_tasks()
            print("调度器已启动")

    def shutdown(self):
        """Shut the scheduler down if it is running."""
        if self._scheduler.running:
            self._scheduler.shutdown()
            print("调度器已关闭")

    def _load_all_tasks(self):
        """Register every enabled task found in the database."""
        db = SessionLocal()
        try:
            tasks = db.query(ScheduledTask).filter(ScheduledTask.is_enabled == True).all()  # noqa: E712 (SQL expression)
            for task in tasks:
                self._add_task_to_scheduler(task)
            print(f"已加载 {len(tasks)} 个定时任务")
        finally:
            db.close()

    def _remove_task_jobs(self, job_id: str) -> None:
        """Remove the main job ``job_id`` and all of its ``job_id_<index>`` sub-jobs.

        The scheduler's actual job list is scanned instead of probing a fixed
        range of indexes, so no sub-job is left behind regardless of how many
        time points the task used to have.
        """
        sub_job_prefix = f"{job_id}_"
        for job in self._scheduler.get_jobs():
            # The trailing "_" in the prefix keeps task_1 from matching task_10_0.
            if job.id == job_id or job.id.startswith(sub_job_prefix):
                self._scheduler.remove_job(job.id)

    def _add_task_to_scheduler(self, task: ScheduledTask):
        """Register ``task`` with the scheduler, replacing its previous jobs.

        Cron tasks get one job built from ``task.cron_expression``; simple
        tasks get one daily job per ``HH:MM`` entry in ``task.time_points``
        (a list, or a JSON-encoded list). Parse failures are printed and
        swallowed so one bad task cannot stop the others from loading.
        """
        job_id = f"task_{task.id}"

        # Drop every job previously registered for this task (main job and
        # sub-jobs), otherwise stale time points keep firing after an update.
        self._remove_task_jobs(job_id)

        if task.schedule_type == 'cron' and task.cron_expression:
            # Cron mode
            try:
                trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai')
                self._scheduler.add_job(
                    self._execute_task,
                    trigger,
                    id=job_id,
                    args=[task.id],
                    replace_existing=True
                )
            except Exception as e:
                print(f"任务 {task.id} CRON表达式解析失败: {e}")

        elif task.schedule_type == 'simple' and task.time_points:
            # Simple mode: one job per time point
            try:
                time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points)
                for i, time_point in enumerate(time_points):
                    hour, minute = map(int, time_point.split(':'))
                    sub_job_id = f"{job_id}_{i}"
                    self._scheduler.add_job(
                        self._execute_task,
                        CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'),
                        id=sub_job_id,
                        args=[task.id],
                        replace_existing=True
                    )
            except Exception as e:
                print(f"任务 {task.id} 时间点解析失败: {e}")

    def add_task(self, task_id: int):
        """Add or refresh the schedule of task ``task_id``.

        When the task no longer exists or is disabled, any jobs still
        registered for it are removed so that it stops firing.
        """
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if task and task.is_enabled:
                self._add_task_to_scheduler(task)
            else:
                self.remove_task(task_id)
        finally:
            db.close()

    def remove_task(self, task_id: int):
        """Remove every scheduler job (main job and sub-jobs) of task ``task_id``."""
        self._remove_task_jobs(f"task_{task_id}")

    async def _execute_task(self, task_id: int):
        """Run task ``task_id`` with retries; this is the scheduler job entry point.

        The task is attempted up to ``retry_count + 1`` times, sleeping
        ``retry_interval`` seconds (default 60) between attempts. After the
        final failed attempt an alert is sent when the task has
        ``alert_on_failure`` and ``alert_webhook`` set.
        """
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if not task:
                return

            max_retries = task.retry_count or 0
            retry_interval = task.retry_interval or 60

            for attempt in range(max_retries + 1):
                success, output, error = await self._execute_task_once(db, task)

                if success:
                    return

                if attempt < max_retries:
                    # Retries remain: wait, then try again.
                    print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})")
                    await asyncio.sleep(retry_interval)
                else:
                    # Final attempt failed: raise the alert.
                    if task.alert_on_failure and task.alert_webhook:
                        await self._send_alert(task, error)
        finally:
            db.close()

    async def _execute_task_once(self, db: Session, task: ScheduledTask):
        """Run ``task`` a single time and record the outcome.

        A ``TaskLog`` row is committed with status ``running`` before the
        task runs, then updated with the final status, timing, output
        (truncated to 10000 chars) and error (truncated to 5000 chars).
        ``task.last_run_at`` / ``task.last_run_status`` are updated as well.

        Returns:
            ``(success, output, error)``.
        """
        trace_id = f"{int(datetime.now().timestamp())}-{task.id}"
        started_at = datetime.now()

        # Create the log record up front so a running task is visible.
        log = TaskLog(
            task_id=task.id,
            tenant_id=task.tenant_id,
            trace_id=trace_id,
            status='running',
            started_at=started_at
        )
        db.add(log)
        db.commit()
        db.refresh(log)

        success = False
        output = ''
        error = ''
        result = None

        try:
            # Only dict-shaped input parameters are forwarded; anything else becomes {}.
            params = task.input_params if isinstance(task.input_params, dict) else {}

            if task.execution_type == 'webhook':
                success, output, error = await self._execute_webhook(task)
            else:
                success, output, error, result = await self._execute_script(db, task, trace_id, params)

            # Notify only when a script succeeded and returned content
            # (``result`` stays None for webhook tasks).
            if success and result and result.get('content'):
                await self._send_notifications(db, task, result)

        except Exception as e:
            error = str(e)

        # Update the log record.
        finished_at = datetime.now()
        duration_ms = int((finished_at - started_at).total_seconds() * 1000)

        log.status = 'success' if success else 'failed'
        log.finished_at = finished_at
        log.duration_ms = duration_ms
        log.output = output[:10000] if output else None  # cap stored length
        log.error = error[:5000] if error else None

        # Update the task's last-run state.
        task.last_run_at = finished_at
        task.last_run_status = 'success' if success else 'failed'

        db.commit()

        return success, output, error

    async def _execute_webhook(self, task: ScheduledTask):
        """POST ``task.input_params`` (when it is a dict, else ``{}``) to ``task.webhook_url``.

        Returns:
            ``(True, response_text[:5000], '')`` on a successful status code,
            otherwise ``(False, '', error_message)``.
        """
        try:
            body = task.input_params if isinstance(task.input_params, dict) else {}

            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(task.webhook_url, json=body)
                response.raise_for_status()
                return True, response.text[:5000], ''

        except Exception as e:
            return False, '', str(e)

    async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict):
        """Run the task's script through ``ScriptExecutor``.

        Returns:
            ``(success, output, error, result)``; an empty script yields
            ``(False, '', '脚本内容为空', None)``.
        """
        if not task.script_content:
            return False, '', '脚本内容为空', None

        executor = ScriptExecutor(db)
        # NOTE(review): execute() is called synchronously inside a coroutine, so
        # it appears to block the event loop for up to `timeout` seconds —
        # confirm against ScriptExecutor before moving it to a worker thread.
        return executor.execute(
            script_content=task.script_content,
            task_id=task.id,
            tenant_id=task.tenant_id,
            trace_id=trace_id,
            params=params,
            timeout=300  # default timeout
        )

    async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict):
        """Send the script result to the task's notification targets.

        ``result['content']`` is the message body and ``result['title']``
        (default: the task name) the title. Targets are the enabled channels
        listed in ``task.notify_channels`` (a list or a JSON-encoded list)
        plus the WeCom app ``task.notify_wecom_app_id`` when set. A failure
        on one target is printed and does not stop the others.
        """
        content = result.get('content', '')
        title = result.get('title', task.task_name)

        if not content:
            return

        # Resolve the configured channel ids.
        channel_ids = task.notify_channels
        if isinstance(channel_ids, str):
            try:
                channel_ids = json.loads(channel_ids)
            except (TypeError, ValueError):
                channel_ids = []

        # Anything that is not a list (None, number, malformed value) means "no channels".
        if not isinstance(channel_ids, (list, tuple)):
            channel_ids = []

        # Notification channels
        for channel_id in channel_ids:
            try:
                channel = db.query(TaskNotifyChannel).filter(
                    TaskNotifyChannel.id == channel_id,
                    TaskNotifyChannel.is_enabled == True  # noqa: E712 (SQL expression)
                ).first()

                if not channel:
                    continue

                await self._send_to_channel(channel, content, title)

            except Exception as e:
                print(f"发送通知到渠道 {channel_id} 失败: {e}")

        # WeCom application
        if task.notify_wecom_app_id:
            try:
                await self._send_to_wecom_app(db, task.notify_wecom_app_id, content, title, task.tenant_id)
            except Exception as e:
                print(f"发送企微应用消息失败: {e}")

    @staticmethod
    def _sign_dingtalk_url(webhook_url: str, sign_secret: str) -> str:
        """Return ``webhook_url`` with DingTalk ``timestamp`` and ``sign`` query parameters.

        The signature is the URL-quoted base64 of
        HMAC-SHA256(key=secret, msg="<timestamp_ms>\\n<secret>").
        """
        import base64
        import hashlib
        import hmac
        import time
        import urllib.parse

        timestamp = str(round(time.time() * 1000))
        string_to_sign = f'{timestamp}\n{sign_secret}'
        digest = hmac.new(
            sign_secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(digest))

        # Append to an existing query string with "&", otherwise start one with "?".
        separator = '&' if '?' in webhook_url else '?'
        return f"{webhook_url}{separator}timestamp={timestamp}&sign={sign}"

    async def _send_to_channel(self, channel: TaskNotifyChannel, content: str, title: str):
        """Send a markdown message to a notification channel webhook.

        ``dingtalk_bot`` channels use the DingTalk payload shape and, when
        ``channel.sign_secret`` is set, a signed URL; every other channel
        type is treated as a WeCom bot. A non-zero ``errcode`` in the
        response is printed.
        """
        url = channel.webhook_url

        if channel.channel_type == 'dingtalk_bot':
            # DingTalk signing
            if channel.sign_secret:
                url = self._sign_dingtalk_url(url, channel.sign_secret)

            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "title": title,
                    "text": content
                }
            }
        else:  # wecom_bot
            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "content": f"**{title}**\n\n{content}"
                }
            }

        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.post(url, json=payload)
            result = response.json()
            if result.get('errcode') != 0:
                print(f"通知发送失败: {result}")

    async def _send_to_wecom_app(self, db: Session, app_id: int, content: str, title: str, tenant_id: str):
        """Send a markdown message to all users of a WeCom application.

        Returns silently when the app record is missing or no access token
        could be obtained. ``tenant_id`` is accepted but not used here.
        """
        from ..models.tenant_wechat_app import TenantWechatApp

        app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first()
        if not app:
            return

        # Fetch the access token.
        access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret)
        if not access_token:
            return

        # Send the message; the token goes through `params` so it is URL-encoded.
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
        payload = {
            "touser": "@all",
            "msgtype": "markdown",
            "agentid": app.agent_id,
            "markdown": {
                "content": f"**{title}**\n\n{content}"
            }
        }

        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.post(url, params={"access_token": access_token}, json=payload)
            result = response.json()
            if result.get('errcode') != 0:
                print(f"企微应用消息发送失败: {result}")

    async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]:
        """Fetch a WeCom access token.

        Returns:
            The token, or ``None`` (after printing the response) when the API
            reports a non-zero ``errcode``.
        """
        url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
        # Credentials are passed as query parameters so they are URL-encoded.
        query = {"corpid": corp_id, "corpsecret": app_secret}

        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(url, params=query)
            result = response.json()
            if result.get('errcode') == 0:
                return result.get('access_token')

            print(f"获取企微 access_token 失败: {result}")
            return None

    async def _send_alert(self, task: ScheduledTask, error: str):
        """Post a failure alert for ``task`` to ``task.alert_webhook``.

        The webhook is treated as DingTalk when its URL contains
        ``dingtalk``, otherwise as WeCom. Delivery errors are printed and
        swallowed so alerting never raises into the scheduler.
        """
        if not task.alert_webhook:
            return

        content = f"""### 定时任务执行失败告警

**任务名称**: {task.task_name}
**任务ID**: {task.id}
**租户**: {task.tenant_id or '全局'}
**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**错误信息**:
```
{error[:500] if error else '未知错误'}
```"""

        try:
            # Pick the payload shape: DingTalk vs WeCom.
            if 'dingtalk' in task.alert_webhook:
                payload = {
                    "msgtype": "markdown",
                    "markdown": {"title": "任务失败告警", "text": content}
                }
            else:
                payload = {
                    "msgtype": "markdown",
                    "markdown": {"content": content}
                }

            async with httpx.AsyncClient(timeout=10) as client:
                await client.post(task.alert_webhook, json=payload)
        except Exception as e:
            print(f"发送告警失败: {e}")

    async def run_task_now(self, task_id: int) -> dict:
        """Run task ``task_id`` once, immediately (manual trigger, no retries).

        Returns:
            ``{"success", "output", "error"}``, or
            ``{"success": False, "error": "任务不存在"}`` when the task is missing.
        """
        db = SessionLocal()
        try:
            task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
            if not task:
                return {"success": False, "error": "任务不存在"}

            success, output, error = await self._execute_task_once(db, task)

            return {
                "success": success,
                "output": output,
                "error": error
            }
        finally:
            db.close()
|
|
|
|
|
|
# 全局调度器实例
|
|
# Created at import time. SchedulerService() always returns the same object,
# so every importer of this module shares one scheduler.
scheduler_service = SchedulerService()
|