All checks were successful
continuous-integration/drone/push Build is passing
钉钉机器人支持:
- text: 纯文本(支持@人)
- markdown: Markdown格式
- link: 链接消息
- actionCard: 交互卡片(整体跳转/独立跳转按钮)
- feedCard: 信息流卡片
企微机器人支持:
- text: 纯文本(支持@人)
- markdown: Markdown格式
- image: 图片
- news: 图文消息
- template_card: 模板卡片(文本通知/图文展示/按钮交互)
使用方式: result = {'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]}
610 lines
23 KiB
Python
610 lines
23 KiB
Python
"""定时任务调度服务"""
|
||
import json
|
||
import httpx
|
||
import asyncio
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
from sqlalchemy.orm import Session
|
||
|
||
from ..database import SessionLocal
|
||
from ..models.scheduled_task import ScheduledTask, TaskLog
|
||
from ..models.notification_channel import TaskNotifyChannel
|
||
from .script_executor import ScriptExecutor
|
||
|
||
|
||
class SchedulerService:
|
||
"""调度服务 - 管理定时任务的调度和执行"""
|
||
|
||
_instance: Optional['SchedulerService'] = None
|
||
_scheduler: Optional[AsyncIOScheduler] = None
|
||
|
||
def __new__(cls):
    """Return the shared instance, creating it the first time the class is instantiated."""
    instance = cls._instance
    if instance is None:
        instance = super().__new__(cls)
        # Cache on the class so every later call hands back the same object.
        cls._instance = instance
    return instance
|
||
|
||
def __init__(self):
    """Create the underlying scheduler unless one has already been created."""
    if self._scheduler is not None:
        # __init__ runs on every SchedulerService() call; keep the existing scheduler.
        return
    self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai')
|
||
|
||
@property
def scheduler(self) -> AsyncIOScheduler:
    """Expose the scheduler object held by this service."""
    underlying = self._scheduler
    return underlying
|
||
|
||
def start(self):
    """Start the scheduler, then load tasks via ``_load_all_tasks``.

    Does nothing when the scheduler is already running.
    """
    engine = self._scheduler
    if engine.running:
        return
    engine.start()
    self._load_all_tasks()
    print("调度器已启动")
|
||
|
||
def shutdown(self):
    """Shut the scheduler down; does nothing when it is not running."""
    engine = self._scheduler
    if not engine.running:
        return
    engine.shutdown()
    print("调度器已关闭")
|
||
|
||
def _load_all_tasks(self):
    """Register every enabled task found in the database with the scheduler."""
    session = SessionLocal()
    try:
        enabled_tasks = (
            session.query(ScheduledTask)
            .filter(ScheduledTask.is_enabled == True)  # noqa: E712 - SQL column comparison
            .all()
        )
        for enabled_task in enabled_tasks:
            self._add_task_to_scheduler(enabled_task)
        print(f"已加载 {len(enabled_tasks)} 个定时任务")
    finally:
        # Always release the session, even if scheduling one of the tasks raised.
        session.close()
|
||
|
||
def _add_task_to_scheduler(self, task: ScheduledTask):
    """Register (or re-register) the scheduler jobs that belong to one task.

    A ``cron`` task with a cron expression becomes a single job whose id is
    ``task_<id>``. A ``simple`` task becomes one job per ``HH:MM`` time point,
    with ids ``task_<id>_<index>``. Any other configuration leaves the task
    without jobs. Parsing/scheduling errors are printed, not raised.

    Args:
        task: The task to schedule; ``id``, ``schedule_type``,
            ``cron_expression`` and ``time_points`` are read.
    """
    job_id = f"task_{task.id}"
    sub_job_prefix = f"{job_id}_"

    # Drop every job previously registered for this task, including the
    # per-time-point sub jobs. Removing only ``task_<id>`` would leave stale
    # sub jobs firing after a simple -> cron switch or a shorter time list.
    # The trailing underscore keeps task 1 from matching task 10's jobs.
    for job in self._scheduler.get_jobs():
        if job.id == job_id or job.id.startswith(sub_job_prefix):
            self._scheduler.remove_job(job.id)

    if task.schedule_type == 'cron' and task.cron_expression:
        # Cron mode: a single job driven by the crontab expression.
        try:
            trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai')
            self._scheduler.add_job(
                self._execute_task,
                trigger,
                id=job_id,
                args=[task.id],
                replace_existing=True,
            )
        except Exception as e:
            print(f"任务 {task.id} CRON表达式解析失败: {e}")

    elif task.schedule_type == 'simple' and task.time_points:
        # Simple mode: one job per configured time point.
        try:
            # time_points may already be a list or may be a JSON-encoded string.
            time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points)
            for i, time_point in enumerate(time_points):
                hour, minute = map(int, time_point.split(':'))
                self._scheduler.add_job(
                    self._execute_task,
                    CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'),
                    id=f"{sub_job_prefix}{i}",
                    args=[task.id],
                    replace_existing=True,
                )
        except Exception as e:
            print(f"任务 {task.id} 时间点解析失败: {e}")
|
||
|
||
def add_task(self, task_id: int):
    """Schedule the task with the given id if it exists and is enabled.

    Args:
        task_id: Primary key of the ``ScheduledTask`` row to (re)schedule.
    """
    session = SessionLocal()
    try:
        record = session.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not record or not record.is_enabled:
            # Missing or disabled tasks are left alone here.
            return
        self._add_task_to_scheduler(record)
    finally:
        session.close()
|
||
|
||
def remove_task(self, task_id: int):
    """Remove every scheduler job that belongs to the given task.

    Covers both the main job ``task_<id>`` and all simple-mode sub jobs
    ``task_<id>_<index>``, however many time points were configured.

    Args:
        task_id: Id of the task whose jobs should be unscheduled.
    """
    job_id = f"task_{task_id}"
    # The trailing underscore keeps task 1 from matching task 10's jobs.
    sub_job_prefix = f"{job_id}_"

    # Match on the ids actually registered rather than probing a fixed
    # number of indexes, so sub jobs with a high index are not left behind.
    for job in self._scheduler.get_jobs():
        if job.id == job_id or job.id.startswith(sub_job_prefix):
            self._scheduler.remove_job(job.id)
|
||
|
||
async def _execute_task(self, task_id: int):
    """Run a task, retrying on failure and alerting once retries are exhausted.

    The task's ``retry_count`` (default 0) sets how many extra attempts are
    made and ``retry_interval`` (default 60) the pause in seconds between
    them. After the final failed attempt an alert is sent when the task has
    ``alert_on_failure`` set and an ``alert_webhook`` configured.

    Args:
        task_id: Id of the task to run; unknown ids are ignored.
    """
    session = SessionLocal()
    try:
        task = session.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not task:
            return

        max_retries = task.retry_count or 0
        retry_interval = task.retry_interval or 60

        for attempt in range(max_retries + 1):
            succeeded, _output, failure = await self._execute_task_once(session, task)
            if succeeded:
                return

            if attempt >= max_retries:
                # Final attempt failed: raise the alert if the task asks for one.
                if task.alert_on_failure and task.alert_webhook:
                    await self._send_alert(task, failure)
                continue

            print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})")
            await asyncio.sleep(retry_interval)
    finally:
        session.close()
|
||
|
||
async def _execute_task_once(self, db: Session, task: ScheduledTask):
    """Run the task a single time and record the attempt in a ``TaskLog`` row.

    A log row is committed in ``running`` state before execution. The task is
    then run as a webhook (``execution_type == 'webhook'``) or as a script.
    When a script succeeds and returns a non-empty dict, that dict is handed
    to ``_send_notifications``, which decides whether there is anything to
    send. Finally the log row and the task's ``last_run_at`` /
    ``last_run_status`` are updated and committed.

    Args:
        db: Open database session used for the log row and the task update.
        task: The task to execute.

    Returns:
        A ``(success, output, error)`` tuple.
    """
    started_at = datetime.now()
    trace_id = f"{int(started_at.timestamp())}-{task.id}"

    # Persist the log row first so a run is visible while it is in progress.
    log = TaskLog(
        task_id=task.id,
        tenant_id=task.tenant_id,
        trace_id=trace_id,
        status='running',
        started_at=started_at,
    )
    db.add(log)
    db.commit()
    db.refresh(log)

    success = False
    output = ''
    error = ''
    result = None

    try:
        # Only dict-shaped input parameters are forwarded; anything else becomes {}.
        params = task.input_params if isinstance(task.input_params, dict) else {}

        if task.execution_type == 'webhook':
            success, output, error = await self._execute_webhook(task)
        else:
            success, output, error, result = await self._execute_script(db, task, trace_id, params)

        # Do not require 'content' here: message types such as feedCard carry
        # no content, and _send_notifications already skips empty results.
        # The isinstance check keeps a non-dict script result from raising.
        if success and result and isinstance(result, dict):
            await self._send_notifications(db, task, result)

    except Exception as e:
        # Record the failure text; ``success`` keeps whatever value it had.
        error = str(e)

    finished_at = datetime.now()
    duration_ms = int((finished_at - started_at).total_seconds() * 1000)

    # Update the log row, truncating long text before it is stored.
    log.status = 'success' if success else 'failed'
    log.finished_at = finished_at
    log.duration_ms = duration_ms
    log.output = output[:10000] if output else None
    log.error = error[:5000] if error else None

    # Mirror the outcome onto the task itself.
    task.last_run_at = finished_at
    task.last_run_status = 'success' if success else 'failed'

    db.commit()

    return success, output, error
|
||
|
||
async def _execute_webhook(self, task: ScheduledTask):
    """POST the task's input parameters as JSON to its webhook URL.

    Args:
        task: The task; ``webhook_url`` and ``input_params`` are read.

    Returns:
        ``(True, <first 5000 chars of the response text>, '')`` when the
        request succeeds and ``raise_for_status`` does not raise, otherwise
        ``(False, '', <exception text>)``.
    """
    try:
        raw_params = task.input_params
        # Only a non-empty dict is sent as the body; anything else becomes {}.
        body = raw_params if raw_params and isinstance(raw_params, dict) else {}

        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(task.webhook_url, json=body)
            response.raise_for_status()
            return True, response.text[:5000], ''
    except Exception as exc:
        return False, '', str(exc)
|
||
|
||
async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict):
    """Run the task's script through ``ScriptExecutor``.

    Args:
        db: Database session handed to the executor.
        task: The task whose ``script_content`` is executed.
        trace_id: Identifier of this run, passed through to the executor.
        params: Input parameters passed through to the executor.

    Returns:
        A ``(success, output, error, result)`` tuple. When the task has no
        script content this is ``(False, '', '脚本内容为空', None)``.
    """
    script = task.script_content
    if not script:
        return False, '', '脚本内容为空', None

    # NOTE(review): execute() is called without await, so it presumably runs
    # synchronously inside this coroutine — confirm it cannot stall the event
    # loop for the whole timeout.
    outcome = ScriptExecutor(db).execute(
        script_content=script,
        task_id=task.id,
        tenant_id=task.tenant_id,
        trace_id=trace_id,
        params=params,
        timeout=300,  # default timeout
    )
    success, output, error, result = outcome
    return success, output, error, result
|
||
|
||
async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict):
    """Deliver a script result to every notification target configured on the task.

    ``result`` formats:
    - simple: ``{'content': '...', 'title': '...'}``
    - full: ``{'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]}``

    Supported ``msg_type`` values:
    - text: plain text
    - markdown: Markdown (default)
    - link: link message
    - actionCard: interactive card with buttons
    - feedCard: feed card
    - image: image (WeCom)
    - news: news/article message (WeCom)
    - template_card: template card (WeCom)

    Nothing is sent when ``content`` is empty, unless the message type is one
    that carries its payload in other fields. Failures for one channel or for
    the WeCom app are printed and do not stop the remaining deliveries.

    Args:
        db: Database session used to look up the notification channels.
        task: The task; ``notify_channels``, ``notify_wecom_app_id`` and
            ``tenant_id`` are read.
        result: The dict returned by the script.
    """
    # Message types whose payload does not live in 'content'.
    contentless_types = ('feedCard', 'image', 'news', 'template_card')
    if not result.get('content') and result.get('msg_type') not in contentless_types:
        return

    # notify_channels may be a list already or a JSON-encoded string.
    channel_ids = task.notify_channels
    if isinstance(channel_ids, str):
        try:
            channel_ids = json.loads(channel_ids)
        except ValueError:
            # Malformed JSON (JSONDecodeError is a ValueError): no channels.
            channel_ids = []

    # Anything that is not a sequence of ids (None, a number, a dict, ...) is
    # treated as "no channels" instead of failing inside the loop below.
    if not isinstance(channel_ids, (list, tuple)):
        channel_ids = []

    # Send to each enabled notification channel.
    for channel_id in channel_ids:
        try:
            channel = db.query(TaskNotifyChannel).filter(
                TaskNotifyChannel.id == channel_id,
                TaskNotifyChannel.is_enabled == True,  # noqa: E712 - SQL column comparison
            ).first()

            if not channel:
                continue

            await self._send_to_channel(channel, result)

        except Exception as e:
            print(f"发送通知到渠道 {channel_id} 失败: {e}")

    # Send to the WeCom application, if one is configured.
    if task.notify_wecom_app_id:
        try:
            await self._send_to_wecom_app(db, task.notify_wecom_app_id, result, task.tenant_id)
        except Exception as e:
            print(f"发送企微应用消息失败: {e}")
|
||
|
||
async def _send_to_channel(self, channel: TaskNotifyChannel, result: dict):
    """Send a message to one notification channel (robot webhook).

    DingTalk supports: text, markdown, link, actionCard, feedCard.
    WeCom supports: text, markdown, image, news, template_card.

    A channel whose ``channel_type`` is ``'dingtalk_bot'`` gets a DingTalk
    payload (and a signed URL when ``sign_secret`` is set); every other
    channel type gets a WeCom payload. A response whose ``errcode`` is not 0
    is printed.

    Args:
        channel: The channel; ``webhook_url``, ``channel_type`` and
            ``sign_secret`` are read.
        result: The script result; ``msg_type`` defaults to ``'markdown'``,
            ``title`` to ``'通知'`` and ``content`` to ``''``.
    """
    import time
    import hmac
    import hashlib
    import base64
    import urllib.parse

    url = channel.webhook_url
    msg_type = result.get('msg_type', 'markdown')
    title = result.get('title', '通知')
    content = result.get('content', '')

    if channel.channel_type == 'dingtalk_bot':
        # DingTalk signing: HMAC-SHA256 over "<timestamp>\n<secret>", keyed
        # with the secret, base64-encoded and URL-quoted.
        if channel.sign_secret:
            timestamp = str(round(time.time() * 1000))
            string_to_sign = f'{timestamp}\n{channel.sign_secret}'
            hmac_code = hmac.new(
                channel.sign_secret.encode('utf-8'),
                string_to_sign.encode('utf-8'),
                digestmod=hashlib.sha256,
            ).digest()
            sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

            # Append both parameters with the correct separator. When the URL
            # already has a query string the separator must be '&'; a garbled
            # '×tamp=' here would drop the timestamp parameter entirely.
            separator = '&' if '?' in url else '?'
            url = f"{url}{separator}timestamp={timestamp}&sign={sign}"

        payload = self._build_dingtalk_payload(msg_type, title, content, result)
    else:  # wecom_bot
        payload = self._build_wecom_payload(msg_type, title, content, result)

    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(url, json=payload)
        resp = response.json()
        if resp.get('errcode') != 0:
            print(f"通知发送失败: {resp}")
|
||
|
||
def _build_dingtalk_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict:
    """Build the JSON body of a DingTalk robot message.

    Supported types:
    - text: plain text
    - markdown: Markdown (used for any unrecognised type)
    - link: link message
    - actionCard: interactive card (single jump / independent jump buttons)
    - feedCard: feed card

    Args:
        msg_type: Requested message type.
        title: Message title (link, actionCard, markdown).
        content: Message text.
        result: Script result providing the optional extras
            (``at``, ``url``, ``pic_url``, ``buttons``, ``btn_orientation``, ``links``).

    Returns:
        The payload dict for the requested type.
    """
    if msg_type == 'text':
        text_body = {"content": content}
        return {"msgtype": "text", "text": text_body, "at": result.get('at', {})}

    if msg_type == 'link':
        link_body = {
            "title": title,
            "text": content,
            "messageUrl": result.get('url', ''),
            "picUrl": result.get('pic_url', ''),
        }
        return {"msgtype": "link", "link": link_body}

    if msg_type == 'actionCard':
        buttons = result.get('buttons', [])
        card = {
            "title": title,
            "text": content,
            # 0 - vertical, 1 - horizontal
            "btnOrientation": result.get('btn_orientation', '0'),
        }
        button_count = len(buttons)
        if button_count == 1:
            # A single button: the whole card jumps to one URL.
            only_button = buttons[0]
            card["singleTitle"] = only_button.get('title', '查看详情')
            card["singleURL"] = only_button.get('url', '')
        elif button_count > 1:
            # Several buttons: each one jumps independently.
            independent = []
            for button in buttons:
                independent.append({"title": button.get('title', ''), "actionURL": button.get('url', '')})
            card["btns"] = independent
        return {"msgtype": "actionCard", "actionCard": card}

    if msg_type == 'feedCard':
        feed_links = []
        for entry in result.get('links', []):
            feed_links.append({
                "title": entry.get('title', ''),
                "messageURL": entry.get('url', ''),
                "picURL": entry.get('pic_url', ''),
            })
        return {"msgtype": "feedCard", "feedCard": {"links": feed_links}}

    # Markdown is the default for every other type.
    markdown_body = {"title": title, "text": content}
    return {"msgtype": "markdown", "markdown": markdown_body, "at": result.get('at', {})}
|
||
|
||
def _build_wecom_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict:
    """Build the JSON body of a WeCom robot message.

    Supported types:
    - text: plain text (optionally with mention lists)
    - markdown: Markdown (used for any unrecognised type)
    - image: image
    - news: news/article message
    - template_card: template card (``text_notice``, ``news_notice`` or
      ``button_interaction``, chosen by ``result['card_type']``)

    A ``template_card`` with an unrecognised ``card_type`` falls back to the
    markdown payload, so the function always returns a dict.

    Args:
        msg_type: Requested message type.
        title: Message title.
        content: Message text.
        result: Script result providing the optional extras for each type.

    Returns:
        The payload dict for the requested type.
    """
    if msg_type == 'text':
        payload = {
            "msgtype": "text",
            "text": {"content": content},
        }
        # Mention lists are only included when they are non-empty.
        for key in ('mentioned_list', 'mentioned_mobile_list'):
            if result.get(key):
                payload["text"][key] = result.get(key)
        return payload

    if msg_type == 'image':
        return {
            "msgtype": "image",
            "image": {
                "base64": result.get('image_base64', ''),
                "md5": result.get('image_md5', ''),
            },
        }

    if msg_type == 'news':
        articles = result.get('articles', [])
        if not articles and content:
            # No explicit articles: build a single one from title/content.
            articles = [{
                "title": title,
                "description": content,
                "url": result.get('url', ''),
                "picurl": result.get('pic_url', ''),
            }]
        return {
            "msgtype": "news",
            "news": {"articles": articles},
        }

    if msg_type == 'template_card':
        card_type = result.get('card_type', 'text_notice')
        main_title = {"title": title, "desc": result.get('desc', '')}
        horizontal_list = result.get('horizontal_list', [])
        card_action = result.get('card_action', {"type": 1, "url": ""})

        card = None
        if card_type == 'text_notice':
            # Text notice card.
            card = {
                "card_type": "text_notice",
                "main_title": main_title,
                "sub_title_text": content,
                "horizontal_content_list": horizontal_list,
                "jump_list": result.get('jump_list', []),
                "card_action": card_action,
            }
        elif card_type == 'news_notice':
            # Image + text card.
            card = {
                "card_type": "news_notice",
                "main_title": main_title,
                "card_image": {
                    "url": result.get('image_url', ''),
                    "aspect_ratio": result.get('aspect_ratio', 1.3),
                },
                "vertical_content_list": result.get('vertical_list', []),
                "horizontal_content_list": horizontal_list,
                "jump_list": result.get('jump_list', []),
                "card_action": card_action,
            }
        elif card_type == 'button_interaction':
            # Button interaction card.
            card = {
                "card_type": "button_interaction",
                "main_title": main_title,
                "sub_title_text": content,
                "horizontal_content_list": horizontal_list,
                "button_list": result.get('buttons', []),
                "card_action": card_action,
            }

        if card is not None:
            return {"msgtype": "template_card", "template_card": card}
        # Unknown card_type: fall through to markdown instead of returning
        # None, which would otherwise be posted as an empty body.

    # Markdown is the default for every other type.
    return {
        "msgtype": "markdown",
        "markdown": {"content": f"**{title}**\n\n{content}"},
    }
|
||
|
||
async def _send_to_wecom_app(self, db: Session, app_id: int, result: dict, tenant_id: str):
    """Send the result as a markdown message through a WeCom application.

    Returns silently when the application row is not found or no access
    token could be obtained. A response whose ``errcode`` is not 0 is printed.

    Args:
        db: Database session used to look up the application.
        app_id: Id of the ``TenantWechatApp`` row.
        result: Script result; ``title`` (default ``'通知'``), ``content``
            and ``touser`` (default ``'@all'``) are read.
        tenant_id: Accepted for the caller's convenience; not used in this method.
    """
    from ..models.tenant_wechat_app import TenantWechatApp

    app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first()
    if not app:
        return

    # An access token is required before a message can be sent.
    access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret)
    if not access_token:
        return

    title = result.get('title', '通知')
    content = result.get('content', '')
    markdown_text = f"**{title}**\n\n{content}"

    url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
    payload = {
        "touser": result.get('touser', '@all'),
        "msgtype": "markdown",
        "agentid": app.agent_id,
        "markdown": {"content": markdown_text},
    }

    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(url, json=payload)
        reply = response.json()
        if reply.get('errcode') != 0:
            print(f"企微应用消息发送失败: {reply}")
|
||
|
||
async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]:
    """Request a WeCom access token for the given corp id and app secret.

    Args:
        corp_id: Value sent as the ``corpid`` query parameter.
        app_secret: Value sent as the ``corpsecret`` query parameter.

    Returns:
        The ``access_token`` field of the response when ``errcode`` is 0;
        otherwise ``None`` (the response is printed).
    """
    url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={app_secret}"

    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(url)
        reply = response.json()
        if reply.get('errcode') != 0:
            print(f"获取企微 access_token 失败: {reply}")
            return None
        return reply.get('access_token')
|
||
|
||
async def _send_alert(self, task: ScheduledTask, error: str):
    """Post a failure alert for the task to its alert webhook.

    Does nothing when the task has no ``alert_webhook``. The alert is a
    markdown message; a webhook URL containing ``dingtalk`` gets the
    DingTalk payload shape, any other URL the WeCom one. Errors while
    sending are printed, not raised.

    Args:
        task: The failed task; ``alert_webhook``, ``task_name``, ``id`` and
            ``tenant_id`` are read.
        error: Error text of the failed run; only the first 500 characters
            are included.
    """
    if not task.alert_webhook:
        return

    content = f"""### 定时任务执行失败告警

**任务名称**: {task.task_name}
**任务ID**: {task.id}
**租户**: {task.tenant_id or '全局'}
**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**错误信息**:
```
{error[:500] if error else '未知错误'}
```"""

    try:
        webhook = task.alert_webhook
        # Decide between the DingTalk and the WeCom payload shape from the URL.
        is_dingtalk = any(marker in webhook for marker in ('dingtalk', 'oapi.dingtalk.com'))
        if is_dingtalk:
            markdown_body = {"title": "任务失败告警", "text": content}
        else:
            markdown_body = {"content": content}
        payload = {"msgtype": "markdown", "markdown": markdown_body}

        async with httpx.AsyncClient(timeout=10) as client:
            await client.post(task.alert_webhook, json=payload)
    except Exception as exc:
        print(f"发送告警失败: {exc}")
|
||
|
||
async def run_task_now(self, task_id: int) -> dict:
    """Execute a task immediately (manual trigger), without retries.

    Args:
        task_id: Id of the task to run.

    Returns:
        ``{"success": False, "error": "任务不存在"}`` when the task does not
        exist, otherwise a dict with the ``success``, ``output`` and
        ``error`` values of the single run.
    """
    session = SessionLocal()
    try:
        record = session.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not record:
            return {"success": False, "error": "任务不存在"}

        succeeded, output, error = await self._execute_task_once(session, record)
        outcome = {"success": succeeded}
        outcome["output"] = output
        outcome["error"] = error
        return outcome
    finally:
        session.close()
|
||
|
||
|
||
# Module-level SchedulerService instance, created when this module is imported.
|
||
scheduler_service = SchedulerService()
|