feat: 扩展消息类型支持钉钉/企微所有格式
All checks were successful
continuous-integration/drone/push Build is passing

钉钉机器人支持:
- text: 纯文本(支持@人)
- markdown: Markdown格式
- link: 链接消息
- actionCard: 交互卡片(整体跳转/独立跳转按钮)
- feedCard: 信息流卡片

企微机器人支持:
- text: 纯文本(支持@人)
- markdown: Markdown格式
- image: 图片
- news: 图文消息
- template_card: 模板卡片(文本通知/图文展示/按钮交互)

使用方式: result = {'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]}
This commit is contained in:
2026-01-28 17:44:01 +08:00
parent 3cf5451597
commit 97d0aac734

View File

@@ -239,11 +239,25 @@ class SchedulerService:
return success, output, error, result
async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict):
"""发送通知到配置的渠道"""
"""发送通知到配置的渠道
result 格式:
- 简单格式: {'content': '内容', 'title': '标题'}
- 完整格式: {'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]}
支持的 msg_type:
- text: 纯文本
- markdown: Markdown格式默认
- link: 链接消息
- actionCard: 交互卡片(带按钮)
- feedCard: 信息流卡片
- news: 图文消息(企微)
- template_card: 模板卡片(企微)
"""
content = result.get('content', '')
title = result.get('title', task.task_name)
if not content:
if not content and result.get('msg_type') not in ('feedCard', 'news', 'template_card'):
return
# 获取通知渠道配置
@@ -268,7 +282,7 @@ class SchedulerService:
if not channel:
continue
await self._send_to_channel(channel, content, title)
await self._send_to_channel(channel, result)
except Exception as e:
print(f"发送通知到渠道 {channel_id} 失败: {e}")
@@ -276,12 +290,16 @@ class SchedulerService:
# 发送到企微应用
if task.notify_wecom_app_id:
try:
await self._send_to_wecom_app(db, task.notify_wecom_app_id, content, title, task.tenant_id)
await self._send_to_wecom_app(db, task.notify_wecom_app_id, result, task.tenant_id)
except Exception as e:
print(f"发送企微应用消息失败: {e}")
async def _send_to_channel(self, channel: TaskNotifyChannel, content: str, title: str):
"""发送消息到通知渠道"""
async def _send_to_channel(self, channel: TaskNotifyChannel, result: dict):
"""发送消息到通知渠道
钉钉支持: text, markdown, link, actionCard, feedCard
企微支持: text, markdown, image, news, template_card
"""
import time
import hmac
import hashlib
@@ -289,6 +307,9 @@ class SchedulerService:
import urllib.parse
url = channel.webhook_url
msg_type = result.get('msg_type', 'markdown')
title = result.get('title', '通知')
content = result.get('content', '')
if channel.channel_type == 'dingtalk_bot':
# 钉钉加签
@@ -307,28 +328,185 @@ class SchedulerService:
else:
url = f"{url}?timestamp={timestamp}&sign={sign}"
payload = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": content
}
}
payload = self._build_dingtalk_payload(msg_type, title, content, result)
else: # wecom_bot
payload = {
"msgtype": "markdown",
"markdown": {
"content": f"**{title}**\n\n{content}"
}
}
payload = self._build_wecom_payload(msg_type, title, content, result)
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(url, json=payload)
result = response.json()
if result.get('errcode') != 0:
print(f"通知发送失败: {result}")
resp = response.json()
if resp.get('errcode') != 0:
print(f"通知发送失败: {resp}")
async def _send_to_wecom_app(self, db: Session, app_id: int, content: str, title: str, tenant_id: str):
def _build_dingtalk_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict:
"""构建钉钉消息体
支持类型:
- text: 纯文本
- markdown: Markdown
- link: 链接消息
- actionCard: 交互卡片(整体跳转/独立跳转)
- feedCard: 信息流卡片
"""
if msg_type == 'text':
return {
"msgtype": "text",
"text": {"content": content},
"at": result.get('at', {})
}
elif msg_type == 'link':
return {
"msgtype": "link",
"link": {
"title": title,
"text": content,
"messageUrl": result.get('url', ''),
"picUrl": result.get('pic_url', '')
}
}
elif msg_type == 'actionCard':
buttons = result.get('buttons', [])
card = {
"title": title,
"text": content,
"btnOrientation": result.get('btn_orientation', '0') # 0-竖向 1-横向
}
if len(buttons) == 1:
# 整体跳转
card["singleTitle"] = buttons[0].get('title', '查看详情')
card["singleURL"] = buttons[0].get('url', '')
elif len(buttons) > 1:
# 独立跳转
card["btns"] = [
{"title": btn.get('title', ''), "actionURL": btn.get('url', '')}
for btn in buttons
]
return {"msgtype": "actionCard", "actionCard": card}
elif msg_type == 'feedCard':
links = result.get('links', [])
return {
"msgtype": "feedCard",
"feedCard": {
"links": [
{
"title": link.get('title', ''),
"messageURL": link.get('url', ''),
"picURL": link.get('pic_url', '')
}
for link in links
]
}
}
else: # markdown默认
return {
"msgtype": "markdown",
"markdown": {"title": title, "text": content},
"at": result.get('at', {})
}
def _build_wecom_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict:
"""构建企微消息体
支持类型:
- text: 纯文本
- markdown: Markdown
- image: 图片
- news: 图文消息
- template_card: 模板卡片
"""
if msg_type == 'text':
payload = {
"msgtype": "text",
"text": {"content": content}
}
if result.get('mentioned_list'):
payload["text"]["mentioned_list"] = result.get('mentioned_list')
if result.get('mentioned_mobile_list'):
payload["text"]["mentioned_mobile_list"] = result.get('mentioned_mobile_list')
return payload
elif msg_type == 'image':
return {
"msgtype": "image",
"image": {
"base64": result.get('image_base64', ''),
"md5": result.get('image_md5', '')
}
}
elif msg_type == 'news':
articles = result.get('articles', [])
if not articles and content:
articles = [{
"title": title,
"description": content,
"url": result.get('url', ''),
"picurl": result.get('pic_url', '')
}]
return {
"msgtype": "news",
"news": {"articles": articles}
}
elif msg_type == 'template_card':
card_type = result.get('card_type', 'text_notice')
if card_type == 'text_notice':
# 文本通知卡片
return {
"msgtype": "template_card",
"template_card": {
"card_type": "text_notice",
"main_title": {"title": title, "desc": result.get('desc', '')},
"sub_title_text": content,
"horizontal_content_list": result.get('horizontal_list', []),
"jump_list": result.get('jump_list', []),
"card_action": result.get('card_action', {"type": 1, "url": ""})
}
}
elif card_type == 'news_notice':
# 图文展示卡片
return {
"msgtype": "template_card",
"template_card": {
"card_type": "news_notice",
"main_title": {"title": title, "desc": result.get('desc', '')},
"card_image": {"url": result.get('image_url', ''), "aspect_ratio": result.get('aspect_ratio', 1.3)},
"vertical_content_list": result.get('vertical_list', []),
"horizontal_content_list": result.get('horizontal_list', []),
"jump_list": result.get('jump_list', []),
"card_action": result.get('card_action', {"type": 1, "url": ""})
}
}
elif card_type == 'button_interaction':
# 按钮交互卡片
return {
"msgtype": "template_card",
"template_card": {
"card_type": "button_interaction",
"main_title": {"title": title, "desc": result.get('desc', '')},
"sub_title_text": content,
"horizontal_content_list": result.get('horizontal_list', []),
"button_list": result.get('buttons', []),
"card_action": result.get('card_action', {"type": 1, "url": ""})
}
}
else: # markdown默认
return {
"msgtype": "markdown",
"markdown": {"content": f"**{title}**\n\n{content}"}
}
async def _send_to_wecom_app(self, db: Session, app_id: int, result: dict, tenant_id: str):
"""发送消息到企微应用"""
from ..models.tenant_wechat_app import TenantWechatApp
@@ -341,10 +519,13 @@ class SchedulerService:
if not access_token:
return
title = result.get('title', '通知')
content = result.get('content', '')
# 发送消息
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
payload = {
"touser": "@all",
"touser": result.get('touser', '@all'),
"msgtype": "markdown",
"agentid": app.agent_id,
"markdown": {