diff --git a/backend/app/main.py b/backend/app/main.py index 07a5da5..8eec4ed 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -16,6 +16,8 @@ from .routers.cost import router as cost_router from .routers.quota import router as quota_router from .routers.tasks import router as tasks_router from .routers.notification_channels import router as notification_channels_router +from .routers.tool_configs import router as tool_configs_router +from .routers.ruimeiyun import router as ruimeiyun_router from .middleware import TraceMiddleware, setup_exception_handlers, RequestLoggerMiddleware from .middleware.trace import setup_logging from .services.scheduler import scheduler_service @@ -71,6 +73,8 @@ app.include_router(cost_router, prefix="/api") app.include_router(quota_router, prefix="/api") app.include_router(tasks_router) app.include_router(notification_channels_router) +app.include_router(tool_configs_router, prefix="/api") +app.include_router(ruimeiyun_router, prefix="/api") # 应用生命周期事件 diff --git a/backend/app/models/notification_channel.py b/backend/app/models/notification_channel.py index 16c5157..518b530 100644 --- a/backend/app/models/notification_channel.py +++ b/backend/app/models/notification_channel.py @@ -1,21 +1,21 @@ -"""任务通知渠道模型""" -from datetime import datetime -from sqlalchemy import Column, Integer, String, Enum, Boolean, DateTime -from ..database import Base - - -class TaskNotifyChannel(Base): - """任务通知渠道表(用于定时任务推送)""" - __tablename__ = "platform_task_notify_channels" - - id = Column(Integer, primary_key=True, autoincrement=True) - tenant_id = Column(String(50), nullable=False) - channel_name = Column(String(100), nullable=False) - channel_type = Column(Enum('dingtalk_bot', 'wecom_bot'), nullable=False) - webhook_url = Column(String(500), nullable=False) - sign_secret = Column(String(200)) # 钉钉加签密钥 - description = Column(String(255)) - is_enabled = Column(Boolean, default=True) - - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) +"""任务通知渠道模型""" +from datetime import datetime +from sqlalchemy import Column, Integer, String, Enum, Boolean, DateTime +from ..database import Base + + +class TaskNotifyChannel(Base): + """任务通知渠道表(用于定时任务推送)""" + __tablename__ = "platform_task_notify_channels" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50), nullable=False) + channel_name = Column(String(100), nullable=False) + channel_type = Column(Enum('dingtalk_bot', 'wecom_bot'), nullable=False) + webhook_url = Column(String(500), nullable=False) + sign_secret = Column(String(200)) # 钉钉加签密钥 + description = Column(String(255)) + is_enabled = Column(Boolean, default=True) + + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) diff --git a/backend/app/models/scheduled_task.py b/backend/app/models/scheduled_task.py index 75668a6..3474972 100644 --- a/backend/app/models/scheduled_task.py +++ b/backend/app/models/scheduled_task.py @@ -1,103 +1,103 @@ -"""定时任务相关模型""" -from datetime import datetime -from sqlalchemy import Column, BigInteger, Integer, String, Text, Enum, SmallInteger, TIMESTAMP, DateTime, JSON, Boolean -from ..database import Base - - -class ScheduledTask(Base): - """定时任务表""" - __tablename__ = "platform_scheduled_tasks" - - id = Column(Integer, primary_key=True, autoincrement=True) - tenant_id = Column(String(50)) - task_name = Column(String(100), nullable=False) - task_desc = Column(String(500)) - - # 调度配置 - schedule_type = Column(Enum('simple', 'cron'), nullable=False, default='simple') - time_points = Column(JSON) # JSON数组 ["08:00", "12:00"] - cron_expression = Column(String(100)) - timezone = Column(String(50), default='Asia/Shanghai') - - # 执行类型 - execution_type = Column(Enum('webhook', 'script'), nullable=False, default='script') - - # Webhook配置 - webhook_url = Column(String(500)) - - # 脚本配置 - script_content = Column(Text) - script_deps = Column(Text) # 脚本依赖 - - # 输入参数 - input_params = Column(JSON) # JSON格式 - - # 重试配置 - retry_count = Column(Integer, default=0) - retry_interval = Column(Integer, default=60) - - # 告警配置 - alert_on_failure = Column(Boolean, default=False) - alert_webhook = Column(String(500)) - - # 通知配置 - notify_channels = Column(JSON) # 通知渠道ID列表 - notify_wecom_app_id = Column(Integer) # 企微应用ID - - # 状态 - is_enabled = Column(Boolean, default=True) - last_run_at = Column(DateTime) - last_run_status = Column(Enum('success', 'failed', 'running')) - last_run_message = Column(Text) - - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) - - -class TaskLog(Base): - """任务执行日志""" - __tablename__ = "platform_task_logs" - - id = Column(BigInteger, primary_key=True, autoincrement=True) - task_id = Column(Integer, nullable=False) - tenant_id = Column(String(50)) - trace_id = Column(String(100)) - - status = Column(Enum('running', 'success', 'failed'), nullable=False) - started_at = Column(DateTime, nullable=False) - finished_at = Column(DateTime) - duration_ms = Column(Integer) - - output = Column(Text) - error = Column(Text) - retry_count = Column(Integer, default=0) - - created_at = Column(TIMESTAMP, default=datetime.now) - - -class ScriptVar(Base): - """脚本变量存储""" - __tablename__ = "platform_script_vars" - - id = Column(Integer, primary_key=True, autoincrement=True) - task_id = Column(Integer, nullable=False) - tenant_id = Column(String(50)) - var_key = Column(String(100), nullable=False) - var_value = Column(Text) # JSON格式 - - created_at = Column(TIMESTAMP, default=datetime.now) - updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) - - -class Secret(Base): - """密钥管理""" - __tablename__ = "platform_secrets" - - id = Column(Integer, primary_key=True, autoincrement=True) - tenant_id = Column(String(50)) # NULL为全局 - secret_key = Column(String(100), nullable=False) - secret_value = Column(Text, nullable=False) - description = Column(String(255)) - - created_at = Column(TIMESTAMP, default=datetime.now) - updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) +"""定时任务相关模型""" +from datetime import datetime +from sqlalchemy import Column, BigInteger, Integer, String, Text, Enum, SmallInteger, TIMESTAMP, DateTime, JSON, Boolean +from ..database import Base + + +class ScheduledTask(Base): + """定时任务表""" + __tablename__ = "platform_scheduled_tasks" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50)) + task_name = Column(String(100), nullable=False) + task_desc = Column(String(500)) + + # 调度配置 + schedule_type = Column(Enum('simple', 'cron'), nullable=False, default='simple') + time_points = Column(JSON) # JSON数组 ["08:00", "12:00"] + cron_expression = Column(String(100)) + timezone = Column(String(50), default='Asia/Shanghai') + + # 执行类型 + execution_type = Column(Enum('webhook', 'script'), nullable=False, default='script') + + # Webhook配置 + webhook_url = Column(String(500)) + + # 脚本配置 + script_content = Column(Text) + script_deps = Column(Text) # 脚本依赖 + + # 输入参数 + input_params = Column(JSON) # JSON格式 + + # 重试配置 + retry_count = Column(Integer, default=0) + retry_interval = Column(Integer, default=60) + + # 告警配置 + alert_on_failure = Column(Boolean, default=False) + alert_webhook = Column(String(500)) + + # 通知配置 + notify_channels = Column(JSON) # 通知渠道ID列表 + notify_wecom_app_id = Column(Integer) # 企微应用ID + + # 状态 + is_enabled = Column(Boolean, default=True) + last_run_at = Column(DateTime) + last_run_status = Column(Enum('success', 'failed', 'running')) + last_run_message = Column(Text) + + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +class TaskLog(Base): + """任务执行日志""" + __tablename__ = "platform_task_logs" + + id = Column(BigInteger, primary_key=True, autoincrement=True) + task_id = Column(Integer, nullable=False) + tenant_id = Column(String(50)) + trace_id = Column(String(100)) + + status = Column(Enum('running', 'success', 'failed'), nullable=False) + started_at = Column(DateTime, nullable=False) + finished_at = Column(DateTime) + duration_ms = Column(Integer) + + output = Column(Text) + error = Column(Text) + retry_count = Column(Integer, default=0) + + created_at = Column(TIMESTAMP, default=datetime.now) + + +class ScriptVar(Base): + """脚本变量存储""" + __tablename__ = "platform_script_vars" + + id = Column(Integer, primary_key=True, autoincrement=True) + task_id = Column(Integer, nullable=False) + tenant_id = Column(String(50)) + var_key = Column(String(100), nullable=False) + var_value = Column(Text) # JSON格式 + + created_at = Column(TIMESTAMP, default=datetime.now) + updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) + + +class Secret(Base): + """密钥管理""" + __tablename__ = "platform_secrets" + + id = Column(Integer, primary_key=True, autoincrement=True) + tenant_id = Column(String(50)) # NULL为全局 + secret_key = Column(String(100), nullable=False) + secret_value = Column(Text, nullable=False) + description = Column(String(255)) + + created_at = Column(TIMESTAMP, default=datetime.now) + updated_at = Column(TIMESTAMP, default=datetime.now, onupdate=datetime.now) diff --git a/backend/app/routers/notification_channels.py b/backend/app/routers/notification_channels.py index 51fb30c..93e860b 100644 --- a/backend/app/routers/notification_channels.py +++ b/backend/app/routers/notification_channels.py @@ -1,211 +1,211 @@ -"""通知渠道API路由""" -from typing import Optional, List -from fastapi import APIRouter, Depends, HTTPException, Query -from pydantic import BaseModel -from sqlalchemy.orm import Session -from sqlalchemy import desc - -from ..database import get_db -from ..models.notification_channel import TaskNotifyChannel - - -router = APIRouter(prefix="/api/notification-channels", tags=["notification-channels"]) - - -# ==================== Schemas ==================== - -class ChannelCreate(BaseModel): - tenant_id: str - channel_name: str - channel_type: str # dingtalk_bot, wecom_bot - webhook_url: str - sign_secret: Optional[str] = None # 钉钉加签密钥 - description: Optional[str] = None - - -class ChannelUpdate(BaseModel): - channel_name: Optional[str] = None - channel_type: Optional[str] = None - webhook_url: Optional[str] = None - sign_secret: Optional[str] = None - description: Optional[str] = None - is_enabled: Optional[bool] = None - - -# ==================== CRUD ==================== - -@router.get("") -async def list_channels( - tenant_id: Optional[str] = None, - channel_type: Optional[str] = None, - is_enabled: Optional[bool] = None, - page: int = Query(1, ge=1), - size: int = Query(50, ge=1, le=100), - db: Session = Depends(get_db) -): - """获取通知渠道列表""" - query = db.query(TaskNotifyChannel) - - if tenant_id: - query = query.filter(TaskNotifyChannel.tenant_id == tenant_id) - if channel_type: - query = query.filter(TaskNotifyChannel.channel_type == channel_type) - if is_enabled is not None: - query = query.filter(TaskNotifyChannel.is_enabled == is_enabled) - - total = query.count() - items = query.order_by(desc(TaskNotifyChannel.created_at)).offset((page - 1) * size).limit(size).all() - - return { - "total": total, - "items": [format_channel(c) for c in items] - } - - -@router.get("/{channel_id}") -async def get_channel(channel_id: int, db: Session = Depends(get_db)): - """获取渠道详情""" - channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() - if not channel: - raise HTTPException(status_code=404, detail="渠道不存在") - return format_channel(channel) - - -@router.post("") -async def create_channel(data: ChannelCreate, db: Session = Depends(get_db)): - """创建通知渠道""" - channel = TaskNotifyChannel( - tenant_id=data.tenant_id, - channel_name=data.channel_name, - channel_type=data.channel_type, - webhook_url=data.webhook_url, - sign_secret=data.sign_secret, - description=data.description, - is_enabled=True - ) - - db.add(channel) - db.commit() - db.refresh(channel) - - return {"success": True, "id": channel.id} - - -@router.put("/{channel_id}") -async def update_channel(channel_id: int, data: ChannelUpdate, db: Session = Depends(get_db)): - """更新通知渠道""" - channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() - if not channel: - raise HTTPException(status_code=404, detail="渠道不存在") - - if data.channel_name is not None: - channel.channel_name = data.channel_name - if data.channel_type is not None: - channel.channel_type = data.channel_type - if data.webhook_url is not None: - channel.webhook_url = data.webhook_url - if data.sign_secret is not None: - channel.sign_secret = data.sign_secret - if data.description is not None: - channel.description = data.description - if data.is_enabled is not None: - channel.is_enabled = data.is_enabled - - db.commit() - return {"success": True} - - -@router.delete("/{channel_id}") -async def delete_channel(channel_id: int, db: Session = Depends(get_db)): - """删除通知渠道""" - channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() - if not channel: - raise HTTPException(status_code=404, detail="渠道不存在") - - db.delete(channel) - db.commit() - return {"success": True} - - -@router.post("/{channel_id}/test") -async def test_channel(channel_id: int, db: Session = Depends(get_db)): - """测试通知渠道""" - import httpx - import time - import hmac - import hashlib - import base64 - import urllib.parse - - channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() - if not channel: - raise HTTPException(status_code=404, detail="渠道不存在") - - test_content = f"**测试消息**\n\n渠道名称: {channel.channel_name}\n发送时间: 测试中..." - - try: - url = channel.webhook_url - - if channel.channel_type == 'dingtalk_bot': - # 钉钉加签 - if channel.sign_secret: - timestamp = str(round(time.time() * 1000)) - string_to_sign = f'{timestamp}\n{channel.sign_secret}' - hmac_code = hmac.new( - channel.sign_secret.encode('utf-8'), - string_to_sign.encode('utf-8'), - digestmod=hashlib.sha256 - ).digest() - sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) - - # 拼接签名参数 - if '?' in url: - url = f"{url}×tamp={timestamp}&sign={sign}" - else: - url = f"{url}?timestamp={timestamp}&sign={sign}" - - payload = { - "msgtype": "markdown", - "markdown": { - "title": "渠道测试", - "text": test_content - } - } - else: # wecom_bot - payload = { - "msgtype": "markdown", - "markdown": { - "content": test_content - } - } - - async with httpx.AsyncClient(timeout=10) as client: - response = await client.post(url, json=payload) - result = response.json() - - # 钉钉返回 errcode=0,企微返回 errcode=0 - if result.get('errcode') == 0: - return {"success": True, "message": "发送成功"} - else: - return {"success": False, "message": f"发送失败: {result}"} - - except Exception as e: - return {"success": False, "message": f"发送失败: {str(e)}"} - - -# ==================== Helpers ==================== - -def format_channel(channel: TaskNotifyChannel) -> dict: - """格式化渠道数据""" - return { - "id": channel.id, - "tenant_id": channel.tenant_id, - "channel_name": channel.channel_name, - "channel_type": channel.channel_type, - "webhook_url": channel.webhook_url, - "sign_secret": channel.sign_secret, - "description": channel.description, - "is_enabled": channel.is_enabled, - "created_at": channel.created_at, - "updated_at": channel.updated_at - } +"""通知渠道API路由""" +from typing import Optional, List +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session +from sqlalchemy import desc + +from ..database import get_db +from ..models.notification_channel import TaskNotifyChannel + + +router = APIRouter(prefix="/api/notification-channels", tags=["notification-channels"]) + + +# ==================== Schemas ==================== + +class ChannelCreate(BaseModel): + tenant_id: str + channel_name: str + channel_type: str # dingtalk_bot, wecom_bot + webhook_url: str + sign_secret: Optional[str] = None # 钉钉加签密钥 + description: Optional[str] = None + + +class ChannelUpdate(BaseModel): + channel_name: Optional[str] = None + channel_type: Optional[str] = None + webhook_url: Optional[str] = None + sign_secret: Optional[str] = None + description: Optional[str] = None + is_enabled: Optional[bool] = None + + +# ==================== CRUD ==================== + +@router.get("") +async def list_channels( + tenant_id: Optional[str] = None, + channel_type: Optional[str] = None, + is_enabled: Optional[bool] = None, + page: int = Query(1, ge=1), + size: int = Query(50, ge=1, le=100), + db: Session = Depends(get_db) +): + """获取通知渠道列表""" + query = db.query(TaskNotifyChannel) + + if tenant_id: + query = query.filter(TaskNotifyChannel.tenant_id == tenant_id) + if channel_type: + query = query.filter(TaskNotifyChannel.channel_type == channel_type) + if is_enabled is not None: + query = query.filter(TaskNotifyChannel.is_enabled == is_enabled) + + total = query.count() + items = query.order_by(desc(TaskNotifyChannel.created_at)).offset((page - 1) * size).limit(size).all() + + return { + "total": total, + "items": [format_channel(c) for c in items] + } + + +@router.get("/{channel_id}") +async def get_channel(channel_id: int, db: Session = Depends(get_db)): + """获取渠道详情""" + channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + return format_channel(channel) + + +@router.post("") +async def create_channel(data: ChannelCreate, db: Session = Depends(get_db)): + """创建通知渠道""" + channel = TaskNotifyChannel( + tenant_id=data.tenant_id, + channel_name=data.channel_name, + channel_type=data.channel_type, + webhook_url=data.webhook_url, + sign_secret=data.sign_secret, + description=data.description, + is_enabled=True + ) + + db.add(channel) + db.commit() + db.refresh(channel) + + return {"success": True, "id": channel.id} + + +@router.put("/{channel_id}") +async def update_channel(channel_id: int, data: ChannelUpdate, db: Session = Depends(get_db)): + """更新通知渠道""" + channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + if data.channel_name is not None: + channel.channel_name = data.channel_name + if data.channel_type is not None: + channel.channel_type = data.channel_type + if data.webhook_url is not None: + channel.webhook_url = data.webhook_url + if data.sign_secret is not None: + channel.sign_secret = data.sign_secret + if data.description is not None: + channel.description = data.description + if data.is_enabled is not None: + channel.is_enabled = data.is_enabled + + db.commit() + return {"success": True} + + +@router.delete("/{channel_id}") +async def delete_channel(channel_id: int, db: Session = Depends(get_db)): + """删除通知渠道""" + channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + db.delete(channel) + db.commit() + return {"success": True} + + +@router.post("/{channel_id}/test") +async def test_channel(channel_id: int, db: Session = Depends(get_db)): + """测试通知渠道""" + import httpx + import time + import hmac + import hashlib + import base64 + import urllib.parse + + channel = db.query(TaskNotifyChannel).filter(TaskNotifyChannel.id == channel_id).first() + if not channel: + raise HTTPException(status_code=404, detail="渠道不存在") + + test_content = f"**测试消息**\n\n渠道名称: {channel.channel_name}\n发送时间: 测试中..." + + try: + url = channel.webhook_url + + if channel.channel_type == 'dingtalk_bot': + # 钉钉加签 + if channel.sign_secret: + timestamp = str(round(time.time() * 1000)) + string_to_sign = f'{timestamp}\n{channel.sign_secret}' + hmac_code = hmac.new( + channel.sign_secret.encode('utf-8'), + string_to_sign.encode('utf-8'), + digestmod=hashlib.sha256 + ).digest() + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) + + # 拼接签名参数 + if '?' in url: + url = f"{url}×tamp={timestamp}&sign={sign}" + else: + url = f"{url}?timestamp={timestamp}&sign={sign}" + + payload = { + "msgtype": "markdown", + "markdown": { + "title": "渠道测试", + "text": test_content + } + } + else: # wecom_bot + payload = { + "msgtype": "markdown", + "markdown": { + "content": test_content + } + } + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(url, json=payload) + result = response.json() + + # 钉钉返回 errcode=0,企微返回 errcode=0 + if result.get('errcode') == 0: + return {"success": True, "message": "发送成功"} + else: + return {"success": False, "message": f"发送失败: {result}"} + + except Exception as e: + return {"success": False, "message": f"发送失败: {str(e)}"} + + +# ==================== Helpers ==================== + +def format_channel(channel: TaskNotifyChannel) -> dict: + """格式化渠道数据""" + return { + "id": channel.id, + "tenant_id": channel.tenant_id, + "channel_name": channel.channel_name, + "channel_type": channel.channel_type, + "webhook_url": channel.webhook_url, + "sign_secret": channel.sign_secret, + "description": channel.description, + "is_enabled": channel.is_enabled, + "created_at": channel.created_at, + "updated_at": channel.updated_at + } diff --git a/backend/app/routers/ruimeiyun.py b/backend/app/routers/ruimeiyun.py new file mode 100644 index 0000000..a1fce56 --- /dev/null +++ b/backend/app/routers/ruimeiyun.py @@ -0,0 +1,287 @@ +""" +睿美云代理路由 + +提供统一的睿美云接口代理能力,支持: +- 多租户配置隔离 +- 接口权限控制 +- 统一日志记录 +- 错误处理 +""" + +import logging +from typing import Optional, Dict, Any, List +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from ..database import get_db +from ..services.ruimeiyun import RuimeiyunClient, RUIMEIYUN_APIS, get_api_definition +from ..services.ruimeiyun.client import RuimeiyunError +from ..services.ruimeiyun.registry import get_all_modules, get_api_list_by_module, get_api_summary +from .auth import get_current_user +from ..models.user import User + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/ruimeiyun", tags=["睿美云代理"]) + + +# ======================================== +# Schemas +# ======================================== + +class RuimeiyunCallRequest(BaseModel): + """睿美云接口调用请求""" + params: Optional[Dict[str, Any]] = None # URL 参数 + body: Optional[Dict[str, Any]] = None # 请求体 + + +class RuimeiyunRawCallRequest(BaseModel): + """睿美云原始接口调用请求""" + method: str # HTTP 方法 + path: str # API 路径 + params: Optional[Dict[str, Any]] = None + body: Optional[Dict[str, Any]] = None + + +# ======================================== +# API Endpoints +# ======================================== + +@router.post("/call/{api_name}") +async def call_ruimeiyun_api( + api_name: str, + request: RuimeiyunCallRequest, + tenant_id: str = Query(..., description="租户ID"), + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """ + 调用睿美云接口(通过接口名称) + + Args: + api_name: 接口名称,如 customer.search, order.list + request: 请求参数 + tenant_id: 租户ID + + Returns: + 睿美云接口返回的数据 + + 示例: + POST /api/ruimeiyun/call/customer.search?tenant_id=xxx + Body: {"params": {"keyword": "13800138000", "page": 1, "size": 20}} + """ + try: + client = RuimeiyunClient(tenant_id, db) + result = await client.call( + api_name=api_name, + params=request.params, + body=request.body + ) + + if result.success: + return { + "success": True, + "data": result.data + } + else: + return { + "success": False, + "error": result.error, + "raw": result.raw_response + } + + except RuimeiyunError as e: + logger.error(f"睿美云调用失败: {api_name}, {e}") + raise HTTPException(status_code=e.status_code, detail=str(e)) + except Exception as e: + logger.exception(f"睿美云调用异常: {api_name}") + raise HTTPException(status_code=500, detail=f"调用失败: {str(e)}") + + +@router.post("/call-raw") +async def call_ruimeiyun_raw( + request: RuimeiyunRawCallRequest, + tenant_id: str = Query(..., description="租户ID"), + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """ + 直接调用睿美云接口(通过路径) + + 用于调用未在注册表中定义的接口 + + Args: + request: 请求参数(包含 method, path, params, body) + tenant_id: 租户ID + + Returns: + 睿美云接口返回的数据 + + 示例: + POST /api/ruimeiyun/call-raw?tenant_id=xxx + Body: { + "method": "GET", + "path": "/api/v1/tpos/customer/customer-search", + "params": {"keyword": "13800138000"} + } + """ + try: + client = RuimeiyunClient(tenant_id, db) + result = await client.call_raw( + method=request.method, + path=request.path, + params=request.params, + body=request.body + ) + + if result.success: + return { + "success": True, + "data": result.data + } + else: + return { + "success": False, + "error": result.error, + "raw": result.raw_response + } + + except RuimeiyunError as e: + logger.error(f"睿美云调用失败: {request.path}, {e}") + raise HTTPException(status_code=e.status_code, detail=str(e)) + except Exception as e: + logger.exception(f"睿美云调用异常: {request.path}") + raise HTTPException(status_code=500, detail=f"调用失败: {str(e)}") + + +# ======================================== +# 接口元数据 +# ======================================== + +@router.get("/apis") +async def list_apis( + module: Optional[str] = None, + user: User = Depends(get_current_user) +): + """ + 获取可用的接口列表 + + Args: + module: 模块名称(可选),如 customer, order + + Returns: + 接口列表 + """ + if module: + apis = get_api_list_by_module(module) + return { + "module": module, + "count": len(apis), + "apis": apis + } + else: + return { + "count": len(RUIMEIYUN_APIS), + "summary": get_api_summary(), + "apis": RUIMEIYUN_APIS + } + + +@router.get("/apis/{api_name}") +async def get_api_info( + api_name: str, + user: User = Depends(get_current_user) +): + """ + 获取接口详情 + + Args: + api_name: 接口名称,如 customer.search + + Returns: + 接口定义 + """ + api_def = get_api_definition(api_name) + if not api_def: + raise HTTPException(status_code=404, detail=f"接口不存在: {api_name}") + + return { + "name": api_name, + **api_def + } + + +@router.get("/modules") +async def list_modules( + user: User = Depends(get_current_user) +): + """ + 获取所有模块列表 + + Returns: + 模块名称列表和每个模块的接口数量 + """ + modules = get_all_modules() + summary = get_api_summary() + + return { + "modules": [ + {"name": m, "count": summary.get(m, 0)} + for m in modules + ] + } + + +# ======================================== +# 健康检查 +# ======================================== + +@router.get("/health/{tenant_id}") +async def check_ruimeiyun_health( + tenant_id: str, + user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """ + 检查租户的睿美云连接状态 + + Args: + tenant_id: 租户ID + + Returns: + 连接状态信息 + """ + try: + client = RuimeiyunClient(tenant_id, db) + + # 调用门店列表接口测试连接 + result = await client.call("tenant.list") + + if result.success: + return { + "status": "connected", + "tenant_id": tenant_id, + "base_url": client.config.base_url, + "account": client.config.account, + "message": "连接正常" + } + else: + return { + "status": "error", + "tenant_id": tenant_id, + "message": result.error + } + + except RuimeiyunError as e: + return { + "status": "error", + "tenant_id": tenant_id, + "message": str(e) + } + except Exception as e: + return { + "status": "error", + "tenant_id": tenant_id, + "message": f"检查失败: {str(e)}" + } diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index 3fee208..f337313 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -1,559 +1,559 @@ -"""定时任务API路由""" -import json -from datetime import datetime -from typing import Optional, List -from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks -from pydantic import BaseModel -from sqlalchemy.orm import Session -from sqlalchemy import desc - -from ..database import get_db -from ..models.scheduled_task import ScheduledTask, TaskLog, Secret -from ..services.scheduler import scheduler_service -from ..services.script_executor import ScriptExecutor - - -router = APIRouter(prefix="/api/scheduled-tasks", tags=["scheduled-tasks"]) - - -# ==================== Schemas ==================== - -class TaskCreate(BaseModel): - tenant_id: Optional[str] = None - task_name: str - task_desc: Optional[str] = None - execution_type: str = 'script' - schedule_type: str = 'simple' - time_points: Optional[List[str]] = None - cron_expression: Optional[str] = None - webhook_url: Optional[str] = None - script_content: Optional[str] = None - input_params: Optional[dict] = None - retry_count: Optional[int] = 0 - retry_interval: Optional[int] = 60 - alert_on_failure: Optional[bool] = False - alert_webhook: Optional[str] = None - notify_channels: Optional[List[int]] = None # 通知渠道ID列表 - notify_wecom_app_id: Optional[int] = None # 企微应用ID - - -class TaskUpdate(BaseModel): - tenant_id: Optional[str] = None - task_name: Optional[str] = None - task_desc: Optional[str] = None - execution_type: Optional[str] = None - schedule_type: Optional[str] = None - time_points: Optional[List[str]] = None - cron_expression: Optional[str] = None - webhook_url: Optional[str] = None - script_content: Optional[str] = None - input_params: Optional[dict] = None - retry_count: Optional[int] = None - retry_interval: Optional[int] = None - alert_on_failure: Optional[bool] = None - alert_webhook: Optional[str] = None - is_enabled: Optional[bool] = None - notify_channels: Optional[List[int]] = None - notify_wecom_app_id: Optional[int] = None - - -class SecretCreate(BaseModel): - tenant_id: Optional[str] = None - secret_key: str - secret_value: str - description: Optional[str] = None - - -class SecretUpdate(BaseModel): - secret_value: Optional[str] = None - description: Optional[str] = None - - -class TestScriptRequest(BaseModel): - script_content: str - tenant_id: Optional[str] = None - params: Optional[dict] = None - - -# ==================== Static Routes (must be before dynamic routes) ==================== - -@router.get("/sdk-docs") -async def get_sdk_docs(): - """获取SDK文档""" - return { - "functions": [ - { - "name": "log", - "signature": "log(message: str, level: str = 'INFO')", - "description": "记录日志", - "example": "log('处理完成', 'INFO')" - }, - { - "name": "print", - "signature": "print(*args)", - "description": "打印输出", - "example": "print('Hello', 'World')" - }, - { - "name": "ai", - "signature": "ai(prompt: str, system: str = None, model: str = None, temperature: float = 0.7)", - "description": "调用AI模型", - "example": "result = ai('生成一段问候语', system='你是友善的助手')" - }, - { - "name": "dingtalk", - "signature": "dingtalk(webhook: str, content: str, title: str = None, at_all: bool = False)", - "description": "发送钉钉消息", - "example": "dingtalk(webhook_url, '# 标题\\n内容')" - }, - { - "name": "wecom", - "signature": "wecom(webhook: str, content: str, msg_type: str = 'markdown')", - "description": "发送企微消息", - "example": "wecom(webhook_url, '消息内容')" - }, - { - "name": "http_get", - "signature": "http_get(url: str, headers: dict = None, params: dict = None)", - "description": "发起GET请求", - "example": "resp = http_get('https://api.example.com/data')" - }, - { - "name": "http_post", - "signature": "http_post(url: str, data: any = None, headers: dict = None)", - "description": "发起POST请求", - "example": "resp = http_post('https://api.example.com/submit', {'key': 'value'})" - }, - { - "name": "db_query", - "signature": "db_query(sql: str, params: dict = None)", - "description": "执行只读SQL查询", - "example": "rows = db_query('SELECT * FROM users WHERE status = :status', {'status': 1})" - }, - { - "name": "get_var", - "signature": "get_var(key: str, default: any = None)", - "description": "获取持久化变量", - "example": "counter = get_var('counter', 0)" - }, - { - "name": "set_var", - "signature": "set_var(key: str, value: any)", - "description": "设置持久化变量", - "example": "set_var('counter', counter + 1)" - }, - { - "name": "del_var", - "signature": "del_var(key: str)", - "description": "删除持久化变量", - "example": "del_var('temp_data')" - }, - { - "name": "get_param", - "signature": "get_param(key: str, default: any = None)", - "description": "获取任务参数", - "example": "prompt = get_param('prompt', '默认提示词')" - }, - { - "name": "get_params", - "signature": "get_params()", - "description": "获取所有任务参数", - "example": "params = get_params()" - }, - { - "name": "get_tenants", - "signature": "get_tenants(app_code: str = None)", - "description": "获取租户列表", - "example": "tenants = get_tenants('notification-service')" - }, - { - "name": "get_tenant_config", - "signature": "get_tenant_config(tenant_id: str, app_code: str, key: str = None)", - "description": "获取租户的应用配置", - "example": "webhook = get_tenant_config('tenant1', 'notification-service', 'dingtalk_webhook')" - }, - { - "name": "get_all_tenant_configs", - "signature": "get_all_tenant_configs(app_code: str)", - "description": "获取所有租户的应用配置", - "example": "configs = get_all_tenant_configs('notification-service')" - }, - { - "name": "get_secret", - "signature": "get_secret(key: str)", - "description": "获取密钥(优先租户级)", - "example": "api_key = get_secret('api_key')" - } - ], - "variables": [ - {"name": "task_id", "description": "当前任务ID"}, - {"name": "tenant_id", "description": "当前租户ID"}, - {"name": "trace_id", "description": "当前执行追踪ID"} - ], - "libraries": [ - {"name": "json", "description": "JSON处理"}, - {"name": "re", "description": "正则表达式"}, - {"name": "math", "description": "数学函数"}, - {"name": "random", "description": "随机数"}, - {"name": "hashlib", "description": "哈希函数"}, - {"name": "base64", "description": "Base64编解码"}, - {"name": "datetime", "description": "日期时间处理"}, - {"name": "timedelta", "description": "时间差"}, - {"name": "urlencode/quote/unquote", "description": "URL编码"} - ] - } - - -@router.post("/test-script") -async def test_script(data: TestScriptRequest, db: Session = Depends(get_db)): - """测试脚本执行""" - executor = ScriptExecutor(db) - result = executor.test_script( - script_content=data.script_content, - task_id=0, - tenant_id=data.tenant_id, - params=data.params - ) - return result - - -@router.get("/secrets") -async def list_secrets( - tenant_id: Optional[str] = None, - db: Session = Depends(get_db) -): - """获取密钥列表""" - query = db.query(Secret) - if tenant_id: - query = query.filter(Secret.tenant_id == tenant_id) - - items = query.order_by(desc(Secret.created_at)).all() - - return { - "items": [ - { - "id": s.id, - "tenant_id": s.tenant_id, - "secret_key": s.secret_key, - "description": s.description, - "created_at": s.created_at, - "updated_at": s.updated_at - } - for s in items - ] - } - - -@router.post("/secrets") -async def create_secret(data: SecretCreate, db: Session = Depends(get_db)): - """创建密钥""" - secret = Secret( - tenant_id=data.tenant_id, - secret_key=data.secret_key, - secret_value=data.secret_value, - description=data.description - ) - db.add(secret) - db.commit() - db.refresh(secret) - - return {"success": True, "id": secret.id} - - -# ==================== Task CRUD ==================== - -@router.get("") -async def list_tasks( - tenant_id: Optional[str] = None, - status: Optional[int] = None, - page: int = Query(1, ge=1), - size: int = Query(20, ge=1, le=100), - db: Session = Depends(get_db) -): - """获取任务列表""" - query = db.query(ScheduledTask) - - if tenant_id: - query = query.filter(ScheduledTask.tenant_id == tenant_id) - if status is not None: - is_enabled = status == 1 - query = query.filter(ScheduledTask.is_enabled == is_enabled) - - total = query.count() - items = query.order_by(desc(ScheduledTask.created_at)).offset((page - 1) * size).limit(size).all() - - return { - "total": total, - "items": [format_task(t) for t in items] - } - - -@router.get("/{task_id}") -async def get_task(task_id: int, db: Session = Depends(get_db)): - """获取任务详情""" - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - raise HTTPException(status_code=404, detail="任务不存在") - return format_task(task, include_content=True) - - -@router.post("") -async def create_task(data: TaskCreate, db: Session = Depends(get_db)): - """创建任务""" - task = ScheduledTask( - tenant_id=data.tenant_id, - task_name=data.task_name, - task_desc=data.task_desc, - execution_type=data.execution_type, - schedule_type=data.schedule_type, - time_points=data.time_points, - cron_expression=data.cron_expression, - webhook_url=data.webhook_url, - script_content=data.script_content, - input_params=data.input_params, - retry_count=data.retry_count, - retry_interval=data.retry_interval, - alert_on_failure=data.alert_on_failure, - alert_webhook=data.alert_webhook, - notify_channels=data.notify_channels, - notify_wecom_app_id=data.notify_wecom_app_id, - is_enabled=True - ) - - db.add(task) - db.commit() - db.refresh(task) - - # 添加到调度器 - scheduler_service.add_task(task.id) - - return {"success": True, "id": task.id} - - -@router.put("/{task_id}") -async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_db)): - """更新任务""" - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - raise HTTPException(status_code=404, detail="任务不存在") - - # 更新字段 - if data.tenant_id is not None: - task.tenant_id = data.tenant_id - if data.task_name is not None: - task.task_name = data.task_name - if data.task_desc is not None: - task.task_desc = data.task_desc - if data.execution_type is not None: - task.execution_type = data.execution_type - if data.schedule_type is not None: - task.schedule_type = data.schedule_type - if data.time_points is not None: - task.time_points = data.time_points - if data.cron_expression is not None: - task.cron_expression = data.cron_expression - if data.webhook_url is not None: - task.webhook_url = data.webhook_url - if data.script_content is not None: - task.script_content = data.script_content - if data.input_params is not None: - task.input_params = data.input_params - if data.retry_count is not None: - task.retry_count = data.retry_count - if data.retry_interval is not None: - task.retry_interval = data.retry_interval - if data.alert_on_failure is not None: - task.alert_on_failure = data.alert_on_failure - if data.alert_webhook is not None: - task.alert_webhook = data.alert_webhook - if data.notify_channels is not None: - task.notify_channels = data.notify_channels - if data.notify_wecom_app_id is not None: - task.notify_wecom_app_id = data.notify_wecom_app_id - if data.is_enabled is not None: - task.is_enabled = data.is_enabled - - db.commit() - - # 更新调度器 - if task.is_enabled: - scheduler_service.add_task(task.id) - else: - scheduler_service.remove_task(task.id) - - return {"success": True} - - -@router.delete("/{task_id}") -async def delete_task(task_id: int, db: Session = Depends(get_db)): - """删除任务""" - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - raise HTTPException(status_code=404, detail="任务不存在") - - # 从调度器移除 - scheduler_service.remove_task(task_id) - - # 删除相关日志 - db.query(TaskLog).filter(TaskLog.task_id == task_id).delete() - - db.delete(task) - db.commit() - - return {"success": True} - - -@router.post("/{task_id}/toggle") -async def toggle_task(task_id: int, db: Session = Depends(get_db)): - """启用/禁用任务""" - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - raise HTTPException(status_code=404, detail="任务不存在") - - task.is_enabled = not task.is_enabled - db.commit() - - if task.is_enabled: - scheduler_service.add_task(task.id) - else: - scheduler_service.remove_task(task.id) - - return {"success": True, "status": 1 if task.is_enabled else 0} - - -@router.post("/{task_id}/run") -async def run_task(task_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db)): - """立即执行任务""" - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - raise HTTPException(status_code=404, detail="任务不存在") - - result = await scheduler_service.run_task_now(task_id) - return result - - -# ==================== Task Logs ==================== - -@router.get("/{task_id}/logs") -async def get_task_logs( - task_id: int, - status: Optional[str] = None, - page: int = Query(1, ge=1), - size: int = Query(20, ge=1, le=100), - db: Session = Depends(get_db) -): - """获取任务执行日志""" - query = db.query(TaskLog).filter(TaskLog.task_id == task_id) - - if status: - query = query.filter(TaskLog.status == status) - - total = query.count() - items = query.order_by(desc(TaskLog.started_at)).offset((page - 1) * size).limit(size).all() - - return { - "total": total, - "items": [format_log(log) for log in items] - } - - -# ==================== Secrets (dynamic routes) ==================== - -@router.put("/secrets/{secret_id}") -async def update_secret(secret_id: int, data: SecretUpdate, db: Session = Depends(get_db)): - """更新密钥""" - secret = db.query(Secret).filter(Secret.id == secret_id).first() - if not secret: - raise HTTPException(status_code=404, detail="密钥不存在") - - if data.secret_value is not None: - secret.secret_value = data.secret_value - if data.description is not None: - secret.description = data.description - - db.commit() - return {"success": True} - - -@router.delete("/secrets/{secret_id}") -async def delete_secret(secret_id: int, db: Session = Depends(get_db)): - """删除密钥""" - secret = db.query(Secret).filter(Secret.id == secret_id).first() - if not secret: - raise HTTPException(status_code=404, detail="密钥不存在") - - db.delete(secret) - db.commit() - return {"success": True} - - -# ==================== Helpers ==================== - -def format_task(task: ScheduledTask, include_content: bool = False) -> dict: - """格式化任务数据""" - time_points = task.time_points - if isinstance(time_points, str): - try: - time_points = json.loads(time_points) - except: - time_points = [] - - # 处理 notify_channels - notify_channels = task.notify_channels - if isinstance(notify_channels, str): - try: - notify_channels = json.loads(notify_channels) - except: - notify_channels = [] - - data = { - "id": task.id, - "tenant_id": task.tenant_id, - "task_name": task.task_name, - "task_type": task.execution_type, # 前端使用 task_type - "schedule_type": task.schedule_type, - "time_points": time_points or [], - "cron_expression": task.cron_expression, - "status": 1 if task.is_enabled else 0, # 前端使用 status - "last_run_at": task.last_run_at, - "last_run_status": task.last_run_status, - "retry_count": task.retry_count, - "retry_interval": task.retry_interval, - "alert_on_failure": bool(task.alert_on_failure), - "alert_webhook": task.alert_webhook, - "notify_channels": notify_channels or [], - "notify_wecom_app_id": task.notify_wecom_app_id, - "created_at": task.created_at, - "updated_at": task.updated_at - } - - if include_content: - data["webhook_url"] = task.webhook_url - data["script_content"] = task.script_content - input_params = task.input_params - if isinstance(input_params, str): - try: - input_params = json.loads(input_params) - except: - input_params = None - data["input_params"] = input_params - - return data - - -def format_log(log: TaskLog) -> dict: - """格式化日志数据""" - return { - "id": log.id, - "task_id": log.task_id, - "tenant_id": log.tenant_id, - "trace_id": log.trace_id, - "status": log.status, - "started_at": log.started_at, - "finished_at": log.finished_at, - "duration_ms": log.duration_ms, - "output": log.output, - "error": log.error, - "retry_count": log.retry_count, - "created_at": log.created_at - } +"""定时任务API路由""" +import json +from datetime import datetime +from typing import Optional, List +from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks +from pydantic import BaseModel +from sqlalchemy.orm import Session +from sqlalchemy import desc + +from ..database import get_db +from ..models.scheduled_task import ScheduledTask, TaskLog, Secret +from ..services.scheduler import scheduler_service +from ..services.script_executor import ScriptExecutor + + +router = APIRouter(prefix="/api/scheduled-tasks", tags=["scheduled-tasks"]) + + +# ==================== Schemas ==================== + +class TaskCreate(BaseModel): + tenant_id: Optional[str] = None + task_name: str + task_desc: Optional[str] = None + execution_type: str = 'script' + schedule_type: str = 'simple' + time_points: Optional[List[str]] = None + cron_expression: Optional[str] = None + webhook_url: Optional[str] = None + script_content: Optional[str] = None + input_params: Optional[dict] = None + retry_count: Optional[int] = 0 + retry_interval: Optional[int] = 60 + alert_on_failure: Optional[bool] = False + alert_webhook: Optional[str] = None + notify_channels: Optional[List[int]] = None # 通知渠道ID列表 + notify_wecom_app_id: Optional[int] = None # 企微应用ID + + +class TaskUpdate(BaseModel): + tenant_id: Optional[str] = None + task_name: Optional[str] = None + task_desc: Optional[str] = None + execution_type: Optional[str] = None + schedule_type: Optional[str] = None + time_points: Optional[List[str]] = None + cron_expression: Optional[str] = None + webhook_url: Optional[str] = None + script_content: Optional[str] = None + input_params: Optional[dict] = None + retry_count: Optional[int] = None + retry_interval: Optional[int] = None + alert_on_failure: Optional[bool] = None + alert_webhook: Optional[str] = None + is_enabled: Optional[bool] = None + notify_channels: Optional[List[int]] = None + notify_wecom_app_id: Optional[int] = None + + +class SecretCreate(BaseModel): + tenant_id: Optional[str] = None + secret_key: str + secret_value: str + description: Optional[str] = None + + +class SecretUpdate(BaseModel): + secret_value: Optional[str] = None + description: Optional[str] = None + + +class TestScriptRequest(BaseModel): + script_content: str + tenant_id: Optional[str] = None + params: Optional[dict] = None + + +# ==================== Static Routes (must be before dynamic routes) ==================== + +@router.get("/sdk-docs") +async def get_sdk_docs(): + """获取SDK文档""" + return { + "functions": [ + { + "name": "log", + "signature": "log(message: str, level: str = 'INFO')", + "description": "记录日志", + "example": "log('处理完成', 'INFO')" + }, + { + "name": "print", + "signature": "print(*args)", + "description": "打印输出", + "example": "print('Hello', 'World')" + }, + { + "name": "ai", + "signature": "ai(prompt: str, system: str = None, model: str = None, temperature: float = 0.7)", + "description": "调用AI模型", + "example": "result = ai('生成一段问候语', system='你是友善的助手')" + }, + { + "name": "dingtalk", + "signature": "dingtalk(webhook: str, content: str, title: str = None, at_all: bool = False)", + "description": "发送钉钉消息", + "example": "dingtalk(webhook_url, '# 标题\\n内容')" + }, + { + "name": "wecom", + "signature": "wecom(webhook: str, content: str, msg_type: str = 'markdown')", + "description": "发送企微消息", + "example": "wecom(webhook_url, '消息内容')" + }, + { + "name": "http_get", + "signature": "http_get(url: str, headers: dict = None, params: dict = None)", + "description": "发起GET请求", + "example": "resp = http_get('https://api.example.com/data')" + }, + { + "name": "http_post", + "signature": "http_post(url: str, data: any = None, headers: dict = None)", + "description": "发起POST请求", + "example": "resp = http_post('https://api.example.com/submit', {'key': 'value'})" + }, + { + "name": "db_query", + "signature": "db_query(sql: str, params: dict = None)", + "description": "执行只读SQL查询", + "example": "rows = db_query('SELECT * FROM users WHERE status = :status', {'status': 1})" + }, + { + "name": "get_var", + "signature": "get_var(key: str, default: any = None)", + "description": "获取持久化变量", + "example": "counter = get_var('counter', 0)" + }, + { + "name": "set_var", + "signature": "set_var(key: str, value: any)", + "description": "设置持久化变量", + "example": "set_var('counter', counter + 1)" + }, + { + "name": "del_var", + "signature": "del_var(key: str)", + "description": "删除持久化变量", + "example": "del_var('temp_data')" + }, + { + "name": "get_param", + "signature": "get_param(key: str, default: any = None)", + "description": "获取任务参数", + "example": "prompt = get_param('prompt', '默认提示词')" + }, + { + "name": "get_params", + "signature": "get_params()", + "description": "获取所有任务参数", + "example": "params = get_params()" + }, + { + "name": "get_tenants", + "signature": "get_tenants(app_code: str = None)", + "description": "获取租户列表", + "example": "tenants = get_tenants('notification-service')" + }, + { + "name": "get_tenant_config", + "signature": "get_tenant_config(tenant_id: str, app_code: str, key: str = None)", + "description": "获取租户的应用配置", + "example": "webhook = get_tenant_config('tenant1', 'notification-service', 'dingtalk_webhook')" + }, + { + "name": "get_all_tenant_configs", + "signature": "get_all_tenant_configs(app_code: str)", + "description": "获取所有租户的应用配置", + "example": "configs = get_all_tenant_configs('notification-service')" + }, + { + "name": "get_secret", + "signature": "get_secret(key: str)", + "description": "获取密钥(优先租户级)", + "example": "api_key = get_secret('api_key')" + } + ], + "variables": [ + {"name": "task_id", "description": "当前任务ID"}, + {"name": "tenant_id", "description": "当前租户ID"}, + {"name": "trace_id", "description": "当前执行追踪ID"} + ], + "libraries": [ + {"name": "json", "description": "JSON处理"}, + {"name": "re", "description": "正则表达式"}, + {"name": "math", "description": "数学函数"}, + {"name": "random", "description": "随机数"}, + {"name": "hashlib", "description": "哈希函数"}, + {"name": "base64", "description": "Base64编解码"}, + {"name": "datetime", "description": "日期时间处理"}, + {"name": "timedelta", "description": "时间差"}, + {"name": "urlencode/quote/unquote", "description": "URL编码"} + ] + } + + +@router.post("/test-script") +async def test_script(data: TestScriptRequest, db: Session = Depends(get_db)): + """测试脚本执行""" + executor = ScriptExecutor(db) + result = executor.test_script( + script_content=data.script_content, + task_id=0, + tenant_id=data.tenant_id, + params=data.params + ) + return result + + +@router.get("/secrets") +async def list_secrets( + tenant_id: Optional[str] = None, + db: Session = Depends(get_db) +): + """获取密钥列表""" + query = db.query(Secret) + if tenant_id: + query = query.filter(Secret.tenant_id == tenant_id) + + items = query.order_by(desc(Secret.created_at)).all() + + return { + "items": [ + { + "id": s.id, + "tenant_id": s.tenant_id, + "secret_key": s.secret_key, + "description": s.description, + "created_at": s.created_at, + "updated_at": s.updated_at + } + for s in items + ] + } + + +@router.post("/secrets") +async def create_secret(data: SecretCreate, db: Session = Depends(get_db)): + """创建密钥""" + secret = Secret( + tenant_id=data.tenant_id, + secret_key=data.secret_key, + secret_value=data.secret_value, + description=data.description + ) + db.add(secret) + db.commit() + db.refresh(secret) + + return {"success": True, "id": secret.id} + + +# ==================== Task CRUD ==================== + +@router.get("") +async def list_tasks( + tenant_id: Optional[str] = None, + status: Optional[int] = None, + page: int = Query(1, ge=1), + size: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db) +): + """获取任务列表""" + query = db.query(ScheduledTask) + + if tenant_id: + query = query.filter(ScheduledTask.tenant_id == tenant_id) + if status is not None: + is_enabled = status == 1 + query = query.filter(ScheduledTask.is_enabled == is_enabled) + + total = query.count() + items = query.order_by(desc(ScheduledTask.created_at)).offset((page - 1) * size).limit(size).all() + + return { + "total": total, + "items": [format_task(t) for t in items] + } + + +@router.get("/{task_id}") +async def get_task(task_id: int, db: Session = Depends(get_db)): + """获取任务详情""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + return format_task(task, include_content=True) + + +@router.post("") +async def create_task(data: TaskCreate, db: Session = Depends(get_db)): + """创建任务""" + task = ScheduledTask( + tenant_id=data.tenant_id, + task_name=data.task_name, + task_desc=data.task_desc, + execution_type=data.execution_type, + schedule_type=data.schedule_type, + time_points=data.time_points, + cron_expression=data.cron_expression, + webhook_url=data.webhook_url, + script_content=data.script_content, + input_params=data.input_params, + retry_count=data.retry_count, + retry_interval=data.retry_interval, + alert_on_failure=data.alert_on_failure, + alert_webhook=data.alert_webhook, + notify_channels=data.notify_channels, + notify_wecom_app_id=data.notify_wecom_app_id, + is_enabled=True + ) + + db.add(task) + db.commit() + db.refresh(task) + + # 添加到调度器 + scheduler_service.add_task(task.id) + + return {"success": True, "id": task.id} + + +@router.put("/{task_id}") +async def update_task(task_id: int, data: TaskUpdate, db: Session = Depends(get_db)): + """更新任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + # 更新字段 + if data.tenant_id is not None: + task.tenant_id = data.tenant_id + if data.task_name is not None: + task.task_name = data.task_name + if data.task_desc is not None: + task.task_desc = data.task_desc + if data.execution_type is not None: + task.execution_type = data.execution_type + if data.schedule_type is not None: + task.schedule_type = data.schedule_type + if data.time_points is not None: + task.time_points = data.time_points + if data.cron_expression is not None: + task.cron_expression = data.cron_expression + if data.webhook_url is not None: + task.webhook_url = data.webhook_url + if data.script_content is not None: + task.script_content = data.script_content + if data.input_params is not None: + task.input_params = data.input_params + if data.retry_count is not None: + task.retry_count = data.retry_count + if data.retry_interval is not None: + task.retry_interval = data.retry_interval + if data.alert_on_failure is not None: + task.alert_on_failure = data.alert_on_failure + if data.alert_webhook is not None: + task.alert_webhook = data.alert_webhook + if data.notify_channels is not None: + task.notify_channels = data.notify_channels + if data.notify_wecom_app_id is not None: + task.notify_wecom_app_id = data.notify_wecom_app_id + if data.is_enabled is not None: + task.is_enabled = data.is_enabled + + db.commit() + + # 更新调度器 + if task.is_enabled: + scheduler_service.add_task(task.id) + else: + scheduler_service.remove_task(task.id) + + return {"success": True} + + +@router.delete("/{task_id}") +async def delete_task(task_id: int, db: Session = Depends(get_db)): + """删除任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + # 从调度器移除 + scheduler_service.remove_task(task_id) + + # 删除相关日志 + db.query(TaskLog).filter(TaskLog.task_id == task_id).delete() + + db.delete(task) + db.commit() + + return {"success": True} + + +@router.post("/{task_id}/toggle") +async def toggle_task(task_id: int, db: Session = Depends(get_db)): + """启用/禁用任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + task.is_enabled = not task.is_enabled + db.commit() + + if task.is_enabled: + scheduler_service.add_task(task.id) + else: + scheduler_service.remove_task(task.id) + + return {"success": True, "status": 1 if task.is_enabled else 0} + + +@router.post("/{task_id}/run") +async def run_task(task_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db)): + """立即执行任务""" + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + result = await scheduler_service.run_task_now(task_id) + return result + + +# ==================== Task Logs ==================== + +@router.get("/{task_id}/logs") +async def get_task_logs( + task_id: int, + status: Optional[str] = None, + page: int = Query(1, ge=1), + size: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db) +): + """获取任务执行日志""" + query = db.query(TaskLog).filter(TaskLog.task_id == task_id) + + if status: + query = query.filter(TaskLog.status == status) + + total = query.count() + items = query.order_by(desc(TaskLog.started_at)).offset((page - 1) * size).limit(size).all() + + return { + "total": total, + "items": [format_log(log) for log in items] + } + + +# ==================== Secrets (dynamic routes) ==================== + +@router.put("/secrets/{secret_id}") +async def update_secret(secret_id: int, data: SecretUpdate, db: Session = Depends(get_db)): + """更新密钥""" + secret = db.query(Secret).filter(Secret.id == secret_id).first() + if not secret: + raise HTTPException(status_code=404, detail="密钥不存在") + + if data.secret_value is not None: + secret.secret_value = data.secret_value + if data.description is not None: + secret.description = data.description + + db.commit() + return {"success": True} + + +@router.delete("/secrets/{secret_id}") +async def delete_secret(secret_id: int, db: Session = Depends(get_db)): + """删除密钥""" + secret = db.query(Secret).filter(Secret.id == secret_id).first() + if not secret: + raise HTTPException(status_code=404, detail="密钥不存在") + + db.delete(secret) + db.commit() + return {"success": True} + + +# ==================== Helpers ==================== + +def format_task(task: ScheduledTask, include_content: bool = False) -> dict: + """格式化任务数据""" + time_points = task.time_points + if isinstance(time_points, str): + try: + time_points = json.loads(time_points) + except: + time_points = [] + + # 处理 notify_channels + notify_channels = task.notify_channels + if isinstance(notify_channels, str): + try: + notify_channels = json.loads(notify_channels) + except: + notify_channels = [] + + data = { + "id": task.id, + "tenant_id": task.tenant_id, + "task_name": task.task_name, + "task_type": task.execution_type, # 前端使用 task_type + "schedule_type": task.schedule_type, + "time_points": time_points or [], + "cron_expression": task.cron_expression, + "status": 1 if task.is_enabled else 0, # 前端使用 status + "last_run_at": task.last_run_at, + "last_run_status": task.last_run_status, + "retry_count": task.retry_count, + "retry_interval": task.retry_interval, + "alert_on_failure": bool(task.alert_on_failure), + "alert_webhook": task.alert_webhook, + "notify_channels": notify_channels or [], + "notify_wecom_app_id": task.notify_wecom_app_id, + "created_at": task.created_at, + "updated_at": task.updated_at + } + + if include_content: + data["webhook_url"] = task.webhook_url + data["script_content"] = task.script_content + input_params = task.input_params + if isinstance(input_params, str): + try: + input_params = json.loads(input_params) + except: + input_params = None + data["input_params"] = input_params + + return data + + +def format_log(log: TaskLog) -> dict: + """格式化日志数据""" + return { + "id": log.id, + "task_id": log.task_id, + "tenant_id": log.tenant_id, + "trace_id": log.trace_id, + "status": log.status, + "started_at": log.started_at, + "finished_at": log.finished_at, + "duration_ms": log.duration_ms, + "output": log.output, + "error": log.error, + "retry_count": log.retry_count, + "created_at": log.created_at + } diff --git a/backend/app/routers/tool_configs.py b/backend/app/routers/tool_configs.py index 485adde..a7fcc72 100644 --- a/backend/app/routers/tool_configs.py +++ b/backend/app/routers/tool_configs.py @@ -354,7 +354,8 @@ async def get_config_types(): {"code": "datasource", "name": "数据源配置", "description": "数据库连接等"}, {"code": "jssdk", "name": "JS-SDK 配置", "description": "企微侧边栏等"}, {"code": "webhook", "name": "Webhook 配置", "description": "n8n 工作流地址等"}, - {"code": "params", "name": "工具参数", "description": "各工具的自定义参数"} + {"code": "params", "name": "工具参数", "description": "各工具的自定义参数"}, + {"code": "external_api", "name": "外部API配置", "description": "睿美云等外部系统对接"} ] } @@ -387,5 +388,11 @@ async def get_config_keys(): {"key": "default_data_tenant_id", "name": "默认数据租户ID", "encrypted": False}, {"key": "enable_deep_thinking", "name": "启用深度思考", "encrypted": False}, {"key": "max_history_rounds", "name": "最大历史轮数", "encrypted": False} + ], + "external_api": [ + {"key": "ruimeiyun_base_url", "name": "睿美云 API 地址", "encrypted": False}, + {"key": "ruimeiyun_account", "name": "睿美云 TPOS 账号", "encrypted": False}, + {"key": "ruimeiyun_private_key", "name": "睿美云 RSA 私钥", "encrypted": True}, + {"key": "ruimeiyun_allowed_apis", "name": "允许的接口列表(JSON)", "encrypted": False} ] } diff --git a/backend/app/services/ruimeiyun/__init__.py b/backend/app/services/ruimeiyun/__init__.py new file mode 100644 index 0000000..d8ed8a5 --- /dev/null +++ b/backend/app/services/ruimeiyun/__init__.py @@ -0,0 +1,10 @@ +""" +睿美云对接服务 + +提供睿美云开放接口的代理调用能力,支持多租户配置。 +""" + +from .client import RuimeiyunClient +from .registry import RUIMEIYUN_APIS, get_api_definition + +__all__ = ["RuimeiyunClient", "RUIMEIYUN_APIS", "get_api_definition"] diff --git a/backend/app/services/ruimeiyun/auth.py b/backend/app/services/ruimeiyun/auth.py new file mode 100644 index 0000000..75b153f --- /dev/null +++ b/backend/app/services/ruimeiyun/auth.py @@ -0,0 +1,124 @@ +""" +睿美云 TPOS 鉴权 + +实现睿美云开放接口的身份验证机制: +- tpos-timestamp: 请求时间戳(秒级) +- tpos-account: 账号 +- tpos-nonce-str: 随机字符串 +- tpos-sign: SHA256WithRSA 签名 + +签名算法: +1. 组合待签名字符串: {timestamp}&{nonce_str} +2. 使用私钥进行 SHA256WithRSA 签名 +3. Base64 编码签名结果 +""" + +import time +import uuid +import base64 +import logging +from typing import Dict + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.backends import default_backend + +logger = logging.getLogger(__name__) + + +class TposAuthError(Exception): + """TPOS 鉴权错误""" + pass + + +def build_tpos_headers(account: str, private_key_pem: str) -> Dict[str, str]: + """ + 构建 TPOS 鉴权请求头 + + Args: + account: TPOS 账号(由睿美云提供) + private_key_pem: RSA 私钥(PEM 格式) + + Returns: + 包含鉴权信息的请求头字典 + + Raises: + TposAuthError: 签名失败时抛出 + """ + try: + # 1. 生成时间戳和随机字符串 + timestamp = str(int(time.time())) + nonce_str = uuid.uuid4().hex + + # 2. 组合待签名字符串 + sign_content = f"{timestamp}&{nonce_str}" + + # 3. 加载私钥 + private_key = serialization.load_pem_private_key( + private_key_pem.encode('utf-8'), + password=None, + backend=default_backend() + ) + + # 4. SHA256WithRSA 签名 + signature = private_key.sign( + sign_content.encode('utf-8'), + padding.PKCS1v15(), + hashes.SHA256() + ) + + # 5. Base64 编码 + sign_base64 = base64.b64encode(signature).decode('utf-8') + + return { + "tpos-timestamp": timestamp, + "tpos-account": account, + "tpos-nonce-str": nonce_str, + "tpos-sign": sign_base64 + } + + except Exception as e: + logger.error(f"TPOS 签名失败: {e}") + raise TposAuthError(f"签名失败: {str(e)}") + + +def validate_private_key(private_key_pem: str) -> bool: + """ + 验证私钥格式是否正确 + + Args: + private_key_pem: RSA 私钥(PEM 格式) + + Returns: + True 如果私钥有效,否则 False + """ + try: + serialization.load_pem_private_key( + private_key_pem.encode('utf-8'), + password=None, + backend=default_backend() + ) + return True + except Exception as e: + logger.warning(f"私钥验证失败: {e}") + return False + + +def mask_private_key(private_key_pem: str, show_chars: int = 50) -> str: + """ + 对私钥进行脱敏处理,用于日志显示 + + Args: + private_key_pem: RSA 私钥(PEM 格式) + show_chars: 显示的字符数 + + Returns: + 脱敏后的字符串 + """ + if not private_key_pem: + return "" + + if len(private_key_pem) <= show_chars * 2: + return "****" + + return f"{private_key_pem[:show_chars]}...****...{private_key_pem[-show_chars:]}" diff --git a/backend/app/services/ruimeiyun/client.py b/backend/app/services/ruimeiyun/client.py new file mode 100644 index 0000000..8213dc5 --- /dev/null +++ b/backend/app/services/ruimeiyun/client.py @@ -0,0 +1,325 @@ +""" +睿美云 API 客户端 + +提供统一的睿美云接口调用能力: +- 自动加载租户配置 +- 自动构建 TPOS 鉴权头 +- 统一错误处理 +- 请求日志记录 +""" + +import json +import logging +from typing import Dict, Any, Optional, List +from dataclasses import dataclass + +import httpx +from sqlalchemy.orm import Session + +from .auth import build_tpos_headers, TposAuthError +from .registry import get_api_definition, RUIMEIYUN_APIS +from ..crypto import decrypt_value + +logger = logging.getLogger(__name__) + +# 请求超时设置 +DEFAULT_TIMEOUT = 30.0 # 秒 +LONG_TIMEOUT = 60.0 # 长时间操作 + + +@dataclass +class RuimeiyunConfig: + """睿美云配置""" + base_url: str + account: str + private_key: str + allowed_apis: Optional[List[str]] = None + + +@dataclass +class RuimeiyunResponse: + """睿美云响应""" + success: bool + data: Optional[Any] = None + error: Optional[str] = None + status_code: int = 200 + raw_response: Optional[Dict] = None + + +class RuimeiyunError(Exception): + """睿美云调用错误""" + def __init__(self, message: str, status_code: int = 500, response: Any = None): + super().__init__(message) + self.status_code = status_code + self.response = response + + +class RuimeiyunClient: + """ + 睿美云 API 客户端 + + 使用方式: + client = RuimeiyunClient(tenant_id, db) + result = await client.call("customer.search", params={"keyword": "13800138000"}) + """ + + def __init__(self, tenant_id: str, db: Session): + """ + 初始化客户端 + + Args: + tenant_id: 租户ID + db: 数据库会话 + """ + self.tenant_id = tenant_id + self.db = db + self.config = self._load_config() + + def _load_config(self) -> RuimeiyunConfig: + """从数据库加载租户的睿美云配置""" + from ...models.tool_config import ToolConfig + + # 查询租户的睿美云配置 + configs = self.db.query(ToolConfig).filter( + ToolConfig.tenant_id == self.tenant_id, + ToolConfig.tool_code == "ruimeiyun", + ToolConfig.config_type == "external_api", + ToolConfig.status == 1 + ).all() + + if not configs: + raise RuimeiyunError( + f"租户 {self.tenant_id} 未配置睿美云连接信息", + status_code=400 + ) + + # 转换为字典 + config_dict = {} + for c in configs: + value = c.config_value + # 解密加密字段 + if c.is_encrypted and value: + try: + value = decrypt_value(value) + except Exception as e: + logger.error(f"解密配置失败: {c.config_key}, {e}") + raise RuimeiyunError(f"配置解密失败: {c.config_key}") + config_dict[c.config_key] = value + + # 验证必填配置 + required = ["ruimeiyun_base_url", "ruimeiyun_account", "ruimeiyun_private_key"] + for key in required: + if not config_dict.get(key): + raise RuimeiyunError(f"缺少必填配置: {key}", status_code=400) + + # 解析允许的接口列表 + allowed_apis = None + if config_dict.get("ruimeiyun_allowed_apis"): + try: + allowed_apis = json.loads(config_dict["ruimeiyun_allowed_apis"]) + except json.JSONDecodeError: + logger.warning(f"解析 allowed_apis 失败: {config_dict.get('ruimeiyun_allowed_apis')}") + + return RuimeiyunConfig( + base_url=config_dict["ruimeiyun_base_url"].rstrip("/"), + account=config_dict["ruimeiyun_account"], + private_key=config_dict["ruimeiyun_private_key"], + allowed_apis=allowed_apis + ) + + def _check_permission(self, api_name: str): + """检查是否有权限调用该接口""" + if self.config.allowed_apis is not None: + if api_name not in self.config.allowed_apis: + raise RuimeiyunError( + f"租户无权调用接口: {api_name}", + status_code=403 + ) + + async def call( + self, + api_name: str, + params: Optional[Dict[str, Any]] = None, + body: Optional[Dict[str, Any]] = None, + timeout: float = DEFAULT_TIMEOUT + ) -> RuimeiyunResponse: + """ + 调用睿美云接口 + + Args: + api_name: 接口名称,如 customer.search + params: URL 查询参数 + body: 请求体(POST 请求) + timeout: 超时时间(秒) + + Returns: + RuimeiyunResponse 对象 + + Raises: + RuimeiyunError: 调用失败时抛出 + """ + # 1. 获取接口定义 + api_def = get_api_definition(api_name) + if not api_def: + raise RuimeiyunError(f"未知接口: {api_name}", status_code=400) + + # 2. 检查权限 + self._check_permission(api_name) + + # 3. 构建请求 + method = api_def["method"] + url = f"{self.config.base_url}{api_def['path']}" + + # 4. 构建鉴权头 + try: + auth_headers = build_tpos_headers( + self.config.account, + self.config.private_key + ) + except TposAuthError as e: + raise RuimeiyunError(str(e), status_code=500) + + headers = { + **auth_headers, + "Content-Type": "application/json" + } + + # 5. 发送请求 + logger.info(f"调用睿美云接口: {api_name} ({method} {api_def['path']})") + + try: + async with httpx.AsyncClient(timeout=timeout) as client: + if method == "GET": + response = await client.get(url, params=params, headers=headers) + else: + response = await client.post(url, params=params, json=body, headers=headers) + + # 6. 处理响应 + status_code = response.status_code + + try: + response_data = response.json() + except Exception: + response_data = {"raw": response.text} + + # 睿美云响应格式通常为: {"code": 0, "data": ..., "msg": "success"} + if status_code == 200: + # 检查业务状态码 + code = response_data.get("code") + if code == 0 or code == "0" or code is None: + return RuimeiyunResponse( + success=True, + data=response_data.get("data", response_data), + status_code=status_code, + raw_response=response_data + ) + else: + return RuimeiyunResponse( + success=False, + error=response_data.get("msg", response_data.get("message", "未知错误")), + status_code=status_code, + raw_response=response_data + ) + else: + return RuimeiyunResponse( + success=False, + error=f"HTTP {status_code}: {response_data}", + status_code=status_code, + raw_response=response_data + ) + + except httpx.TimeoutException: + logger.error(f"睿美云接口超时: {api_name}") + raise RuimeiyunError(f"接口超时: {api_name}", status_code=504) + except httpx.RequestError as e: + logger.error(f"睿美云接口请求错误: {api_name}, {e}") + raise RuimeiyunError(f"请求错误: {str(e)}", status_code=502) + + async def call_raw( + self, + method: str, + path: str, + params: Optional[Dict[str, Any]] = None, + body: Optional[Dict[str, Any]] = None, + timeout: float = DEFAULT_TIMEOUT + ) -> RuimeiyunResponse: + """ + 直接调用睿美云接口(不经过注册表) + + 用于调用未在注册表中定义的接口 + + Args: + method: HTTP 方法 + path: API 路径 + params: URL 查询参数 + body: 请求体 + timeout: 超时时间 + + Returns: + RuimeiyunResponse 对象 + """ + url = f"{self.config.base_url}{path}" + + try: + auth_headers = build_tpos_headers( + self.config.account, + self.config.private_key + ) + except TposAuthError as e: + raise RuimeiyunError(str(e), status_code=500) + + headers = { + **auth_headers, + "Content-Type": "application/json" + } + + logger.info(f"调用睿美云接口(raw): {method} {path}") + + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.request( + method=method, + url=url, + params=params, + json=body if method != "GET" else None, + headers=headers + ) + + try: + response_data = response.json() + except Exception: + response_data = {"raw": response.text} + + if response.status_code == 200: + code = response_data.get("code") + if code == 0 or code == "0" or code is None: + return RuimeiyunResponse( + success=True, + data=response_data.get("data", response_data), + status_code=response.status_code, + raw_response=response_data + ) + else: + return RuimeiyunResponse( + success=False, + error=response_data.get("msg", "未知错误"), + status_code=response.status_code, + raw_response=response_data + ) + else: + return RuimeiyunResponse( + success=False, + error=f"HTTP {response.status_code}", + status_code=response.status_code, + raw_response=response_data + ) + + except httpx.TimeoutException: + raise RuimeiyunError("接口超时", status_code=504) + except httpx.RequestError as e: + raise RuimeiyunError(f"请求错误: {str(e)}", status_code=502) + + @staticmethod + def get_available_apis() -> Dict[str, Any]: + """获取所有可用的接口列表""" + return RUIMEIYUN_APIS diff --git a/backend/app/services/ruimeiyun/registry.py b/backend/app/services/ruimeiyun/registry.py new file mode 100644 index 0000000..210c092 --- /dev/null +++ b/backend/app/services/ruimeiyun/registry.py @@ -0,0 +1,885 @@ +""" +睿美云接口注册表 + +定义所有可用的睿美云开放接口,包括: +- 接口路径 +- 请求方法 +- 参数说明 +- 接口分组 + +接口命名规则: {模块}.{操作} +例如: customer.search, order.list, treat.page +""" + +from typing import Dict, Any, Optional, List +from dataclasses import dataclass + + +@dataclass +class ApiDefinition: + """接口定义""" + method: str # GET / POST + path: str # API 路径 + description: str # 接口描述 + module: str # 所属模块 + params: Optional[List[str]] = None # URL 参数列表 + body_required: bool = False # 是否需要请求体 + + +# 睿美云开放接口注册表 +RUIMEIYUN_APIS: Dict[str, Dict[str, Any]] = { + + # ======================================== + # 客户模块 (customer) + # ======================================== + + "customer.sync": { + "method": "POST", + "path": "/api/v1/tpos/customer/info-sync", + "description": "客户档案新增/编辑", + "module": "customer", + "body_required": True + }, + "customer.search": { + "method": "GET", + "path": "/api/v1/tpos/customer/customer-search", + "description": "获取客户信息列表(支持姓名、电话、档案号模糊查询)", + "module": "customer", + "params": ["keyword", "createDateStart", "createDateEnd", "tenantId", "page", "size", "lastCustomerId"] + }, + "customer.detail": { + "method": "GET", + "path": "/api/v1/tpos/customer/customer-by-id", + "description": "根据客户ID获取详细信息", + "module": "customer", + "params": ["customerId"] + }, + "customer.rebate_list": { + "method": "POST", + "path": "/api/v1/tpos/customer/my-rebate-list", + "description": "获取客户返利列表", + "module": "customer", + "body_required": True + }, + "customer.rebate_detail": { + "method": "GET", + "path": "/api/v1/tpos/customer/my-rebate-detail", + "description": "获取返利详情", + "module": "customer", + "params": ["rebateId"] + }, + "customer.clue_list": { + "method": "POST", + "path": "/api/v1/tpos/customer/customer-clue-list", + "description": "获取客户线索列表", + "module": "customer", + "body_required": True + }, + "customer.label_list": { + "method": "GET", + "path": "/api/v1/tpos/customer/customer-label-list", + "description": "获取客户标签列表", + "module": "customer", + "params": ["tenantId"] + }, + "customer.gold_list": { + "method": "POST", + "path": "/api/v1/tpos/customer/customer-gold-list", + "description": "获取金卡客户列表", + "module": "customer", + "body_required": True + }, + "customer.plan_list": { + "method": "GET", + "path": "/api/v1/tpos/customer/get-all-plan", + "description": "获取所有客户计划", + "module": "customer", + "params": ["tenantId"] + }, + "customer.transfer_pool": { + "method": "POST", + "path": "/api/v1/tpos/customer/transfer-pool", + "description": "客户池转移", + "module": "customer", + "body_required": True + }, + "customer.pool_info": { + "method": "GET", + "path": "/api/v1/tpos/customer/get-pool-info", + "description": "获取客户池信息", + "module": "customer", + "params": ["customerId"] + }, + "customer.qw_info": { + "method": "GET", + "path": "/api/v1/tpos/customer/customer-qw-info", + "description": "获取客户企微信息", + "module": "customer", + "params": ["customerId"] + }, + "customer.sign_search": { + "method": "POST", + "path": "/api/v1/tpos/customer/customer-sign-search", + "description": "客户签到搜索", + "module": "customer", + "body_required": True + }, + + # ======================================== + # 门店模块 (tenant) + # ======================================== + + "tenant.list": { + "method": "GET", + "path": "/api/v1/tpos/common/tenantList", + "description": "获取门店信息列表", + "module": "tenant", + "params": [] + }, + + # ======================================== + # 回访模块 (visit) + # ======================================== + + "visit.type_list": { + "method": "GET", + "path": "/api/v1/tpos/visit/get-visit-type", + "description": "获取回访类型列表", + "module": "visit", + "params": ["tenantId"] + }, + "visit.way_type_list": { + "method": "GET", + "path": "/api/v1/tpos/visit/get-visit-way-type", + "description": "获取回访方式类型列表", + "module": "visit", + "params": ["tenantId"] + }, + "visit.page": { + "method": "POST", + "path": "/api/v1/tpos/visit/get-visit-page", + "description": "分页获取回访记录", + "module": "visit", + "body_required": True + }, + "visit.create": { + "method": "POST", + "path": "/api/v1/tpos/visit/create-visit", + "description": "新增回访记录", + "module": "visit", + "body_required": True + }, + "visit.template_type": { + "method": "GET", + "path": "/api/v1/tpos/visit/visit-template-type", + "description": "获取回访模板类型", + "module": "visit", + "params": ["tenantId"] + }, + + # ======================================== + # 报备模块 (preparation) + # ======================================== + + "preparation.add": { + "method": "POST", + "path": "/api/v1/tpos/preparation/add", + "description": "新增报备", + "module": "preparation", + "body_required": True + }, + "preparation.query": { + "method": "POST", + "path": "/api/v1/tpos/preparation/get-preparation", + "description": "查询报备", + "module": "preparation", + "body_required": True + }, + + # ======================================== + # 员工模块 (user) + # ======================================== + + "user.page": { + "method": "GET", + "path": "/api/v1/tpos/user/get-page", + "description": "分页获取员工列表", + "module": "user", + "params": ["page", "size", "tenantId", "keyword"] + }, + "user.dept_tree": { + "method": "GET", + "path": "/api/v1/tpos/basic/dept-tree", + "description": "获取部门树结构", + "module": "user", + "params": ["tenantId"] + }, + "user.role_list": { + "method": "GET", + "path": "/api/v1/tpos/role/getRoleList", + "description": "获取角色列表", + "module": "user", + "params": ["tenantId"] + }, + + # ======================================== + # 卡券模块 (coupon) + # ======================================== + + "coupon.customer_page": { + "method": "GET", + "path": "/api/v1/tpos/coupon/customer-coupon-page", + "description": "分页获取客户卡券", + "module": "coupon", + "params": ["customerId", "page", "size"] + }, + "coupon.customer_list": { + "method": "GET", + "path": "/api/v1/tpos/coupon/customer-coupon-list", + "description": "获取客户卡券列表", + "module": "coupon", + "params": ["customerId"] + }, + "coupon.use": { + "method": "POST", + "path": "/api/v1/tpos/coupon/use-coupon", + "description": "使用卡券", + "module": "coupon", + "body_required": True + }, + "coupon.page": { + "method": "GET", + "path": "/api/v1/tpos/coupon/coupon-page", + "description": "分页获取卡券信息", + "module": "coupon", + "params": ["page", "size", "tenantId"] + }, + "coupon.send": { + "method": "POST", + "path": "/api/v1/tpos/coupon/send-coupon", + "description": "发送卡券", + "module": "coupon", + "body_required": True + }, + "coupon.gift": { + "method": "POST", + "path": "/api/v1/tpos/coupon/gift-coupon", + "description": "卡券赠送(小程序分享)", + "module": "coupon", + "body_required": True + }, + "coupon.receive": { + "method": "POST", + "path": "/api/v1/tpos/coupon/receive-coupon", + "description": "领取卡券(小程序分享)", + "module": "coupon", + "body_required": True + }, + + # ======================================== + # 营销模块 (marketing) + # ======================================== + + "marketing.appoint_card_page": { + "method": "GET", + "path": "/api/v1/tpos/marketing/appoint-card-page", + "description": "线上预约-名片管理", + "module": "marketing", + "params": ["page", "size", "tenantId"] + }, + "marketing.graphic_message_list": { + "method": "GET", + "path": "/api/v1/tpos/marketing/graphic-message-list", + "description": "内容管理-图文消息", + "module": "marketing", + "params": ["tenantId"] + }, + + # ======================================== + # 积分模块 (integral) + # ======================================== + + "integral.customer": { + "method": "GET", + "path": "/api/v1/tpos/integral/getCusIntegral", + "description": "获取客户积分", + "module": "integral", + "params": ["customerId"] + }, + "integral.score_record_page": { + "method": "GET", + "path": "/api/v1/tpos/integral/score-record-page", + "description": "获取客户积分/成长值分页信息", + "module": "integral", + "params": ["customerId", "page", "size"] + }, + "integral.growth_upgrade": { + "method": "POST", + "path": "/api/v1/tpos/integral/query-customer-growth-upgrade", + "description": "查询客户成长升级信息", + "module": "integral", + "body_required": True + }, + + # ======================================== + # 订单模块 (order) + # ======================================== + + "order.billing_page": { + "method": "GET", + "path": "/api/v1/tpos/order/billing-page", + "description": "获取订单信息列表", + "module": "order", + "params": ["customerId", "page", "size", "startDate", "endDate"] + }, + "order.payment_detail": { + "method": "GET", + "path": "/api/v1/tpos/order/payment-detail", + "description": "获取费用单详细信息", + "module": "order", + "params": ["billingId"] + }, + "order.add_billing": { + "method": "POST", + "path": "/api/v1/tpos/order/add-billing", + "description": "开单", + "module": "order", + "body_required": True + }, + "order.add_billing_review": { + "method": "POST", + "path": "/api/v1/tpos/order/add-billing-review", + "description": "开单审核", + "module": "order", + "body_required": True + }, + "order.enable_billing": { + "method": "GET", + "path": "/api/v1/tpos/order/enable-billing", + "description": "可操作的订单项", + "module": "order", + "params": ["billingId"] + }, + "order.refund": { + "method": "POST", + "path": "/api/v1/tpos/order/refund", + "description": "订单退款", + "module": "order", + "body_required": True + }, + "order.gift_project": { + "method": "POST", + "path": "/api/v1/tpos/order/gift-project", + "description": "项目转赠(小程序分享)", + "module": "order", + "body_required": True + }, + "order.receive_project": { + "method": "POST", + "path": "/api/v1/tpos/order/receive-project", + "description": "领取赠送项目(小程序分享)", + "module": "order", + "body_required": True + }, + "order.equity_card_page": { + "method": "GET", + "path": "/api/v1/tpos/order/billing-equity-card-page", + "description": "获取客户权益卡列表", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.balance_recharge_page": { + "method": "GET", + "path": "/api/v1/tpos/order/balance-recharge-page", + "description": "储值充值记录", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.balance_deduction_page": { + "method": "GET", + "path": "/api/v1/tpos/order/balance-deduction-page", + "description": "储值抵扣记录", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.balance_refund_page": { + "method": "GET", + "path": "/api/v1/tpos/order/balance-refund-page", + "description": "储值退款记录", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.balance_transfer_page": { + "method": "GET", + "path": "/api/v1/tpos/order/balance-transfer-page", + "description": "储值转赠记录", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.integral_mall_page": { + "method": "GET", + "path": "/api/v1/tpos/order/integral-mall-exchange-page", + "description": "获取积分兑换订单信息列表", + "module": "order", + "params": ["customerId", "page", "size"] + }, + "order.add_external": { + "method": "POST", + "path": "/api/v1/tpos/order/add-order-external", + "description": "外部订单创建", + "module": "order", + "body_required": True + }, + "order.refund_external": { + "method": "POST", + "path": "/api/v1/tpos/order/refund-order-external", + "description": "外部订单退款", + "module": "order", + "body_required": True + }, + "order.customer_billing_list": { + "method": "POST", + "path": "/api/v1/tpos/order/get-customer-billing-list", + "description": "获取客户订单列表", + "module": "order", + "body_required": True + }, + "order.cashier_record_list": { + "method": "POST", + "path": "/api/v1/tpos/order/get-cashierRecord-list", + "description": "获取收银记录列表", + "module": "order", + "body_required": True + }, + + # ======================================== + # 治疗模块 (treat) + # ======================================== + + "treat.untreated_page": { + "method": "GET", + "path": "/api/v1/tpos/treat/untreated-page", + "description": "查询客户未治疗记录", + "module": "treat", + "params": ["customerId", "page", "size"] + }, + "treat.already_treated_page": { + "method": "GET", + "path": "/api/v1/tpos/treat/already-treated-page", + "description": "查询客户已治疗记录", + "module": "treat", + "params": ["customerId", "page", "size"] + }, + "treat.page_review": { + "method": "GET", + "path": "/api/v1/tpos/treat/treated-page-review", + "description": "分页获取治疗数据", + "module": "treat", + "params": ["page", "size", "tenantId"] + }, + "treat.operating_room_list": { + "method": "GET", + "path": "/api/v1/tpos/treat/get-operating-room-list", + "description": "获取治疗时查询的手术间信息", + "module": "treat", + "params": ["tenantId"] + }, + "treat.begin_info": { + "method": "GET", + "path": "/api/v1/tpos/treat/get-begin-treat-info", + "description": "获取治疗中和已治疗的数据", + "module": "treat", + "params": ["treatId"] + }, + "treat.deduct_verify": { + "method": "POST", + "path": "/api/v1/tpos/treat/treat-deduct-verify", + "description": "进行核销和划扣", + "module": "treat", + "body_required": True + }, + "treat.cancel_deduct": { + "method": "POST", + "path": "/api/v1/tpos/treat/cancel-deduct", + "description": "取消划扣", + "module": "treat", + "body_required": True + }, + "treat.cancel_verify": { + "method": "POST", + "path": "/api/v1/tpos/treat/cancel-verify", + "description": "取消核销", + "module": "treat", + "body_required": True + }, + "treat.deduct_verify_detail": { + "method": "GET", + "path": "/api/v1/tpos/treat/get-treated-deduct-and-verify-detail", + "description": "已治疗的核销和划扣详情信息", + "module": "treat", + "params": ["treatId"] + }, + "treat.roles": { + "method": "GET", + "path": "/api/v1/tpos/treat/get-treatment-roles", + "description": "获取所有的治疗岗位列表", + "module": "treat", + "params": ["tenantId"] + }, + "treat.scrm_list": { + "method": "POST", + "path": "/api/v1/tpos/treat/scrmTreatList", + "description": "小程序-我的治疗(新版)", + "module": "treat", + "body_required": True + }, + + # ======================================== + # 照片模块 (photo) + # ======================================== + + "photo.add": { + "method": "POST", + "path": "/api/v1/tpos/common/addPhoto", + "description": "外部七牛照片转存至睿美云", + "module": "photo", + "body_required": True + }, + "photo.add_open": { + "method": "POST", + "path": "/api/v1/tpos/common/addPhotoOpen", + "description": "外部照片路径转存至睿美云", + "module": "photo", + "body_required": True + }, + "photo.upload": { + "method": "POST", + "path": "/api/v1/tpos/common/upload_customer_photo", + "description": "上传照片到睿美云", + "module": "photo", + "body_required": True + }, + "photo.page": { + "method": "GET", + "path": "/api/v1/tpos/common/photoPage", + "description": "通过客户id分页查询照片信息", + "module": "photo", + "params": ["customerId", "page", "size"] + }, + "photo.skin_update": { + "method": "POST", + "path": "/api/v1/tpos/skin_image/update_skin_file", + "description": "皮肤检测类图片上传", + "module": "photo", + "body_required": True + }, + + # ======================================== + # 基础数据模块 (basic) + # ======================================== + + "basic.project_page": { + "method": "GET", + "path": "/api/v1/tpos/basic/project-page", + "description": "分页获取项目列表", + "module": "basic", + "params": ["page", "size", "tenantId", "keyword"] + }, + "basic.project_type_tree": { + "method": "GET", + "path": "/api/v1/tpos/basic/project-type-tree", + "description": "获取项目分类树", + "module": "basic", + "params": ["tenantId"] + }, + "basic.project_detail": { + "method": "GET", + "path": "/api/v1/tpos/basic/project-detail", + "description": "获取项目详情", + "module": "basic", + "params": ["projectId"] + }, + "basic.package_page": { + "method": "GET", + "path": "/api/v1/tpos/basic/package-page", + "description": "分页获取套餐列表", + "module": "basic", + "params": ["page", "size", "tenantId"] + }, + "basic.package_type_tree": { + "method": "GET", + "path": "/api/v1/tpos/basic/package-type-tree", + "description": "获取套餐分类树", + "module": "basic", + "params": ["tenantId"] + }, + "basic.package_detail": { + "method": "GET", + "path": "/api/v1/tpos/basic/package-detail", + "description": "获取套餐详情", + "module": "basic", + "params": ["packageId"] + }, + "basic.annual_card_page": { + "method": "GET", + "path": "/api/v1/tpos/basic/annual-card-page", + "description": "分页获取年卡列表", + "module": "basic", + "params": ["page", "size", "tenantId"] + }, + "basic.annual_card_type_tree": { + "method": "GET", + "path": "/api/v1/tpos/basic/annual-card-type-tree", + "description": "获取年卡分类树", + "module": "basic", + "params": ["tenantId"] + }, + "basic.annual_card_detail": { + "method": "GET", + "path": "/api/v1/tpos/basic/annual-card-detail", + "description": "获取年卡详情", + "module": "basic", + "params": ["cardId"] + }, + "basic.time_card_page": { + "method": "GET", + "path": "/api/v1/tpos/basic/time-card-page", + "description": "分页获取次卡列表", + "module": "basic", + "params": ["page", "size", "tenantId"] + }, + "basic.time_card_type_tree": { + "method": "GET", + "path": "/api/v1/tpos/basic/time-card-type-tree", + "description": "获取次卡分类树", + "module": "basic", + "params": ["tenantId"] + }, + "basic.time_card_detail": { + "method": "GET", + "path": "/api/v1/tpos/basic/time-card-detail", + "description": "获取次卡详情", + "module": "basic", + "params": ["cardId"] + }, + + # ======================================== + # 预约模块 (cusbespeak) + # ======================================== + + "appointment.add": { + "method": "POST", + "path": "/api/v1/tpos/cusbespeak/add", + "description": "新增预约", + "module": "appointment", + "body_required": True + }, + "appointment.update": { + "method": "POST", + "path": "/api/v1/tpos/cusbespeak/update", + "description": "修改预约", + "module": "appointment", + "body_required": True + }, + "appointment.confirm": { + "method": "POST", + "path": "/api/v1/tpos/cusbespeak/confirm", + "description": "确认预约", + "module": "appointment", + "body_required": True + }, + "appointment.cancel": { + "method": "POST", + "path": "/api/v1/tpos/cusbespeak/cancel", + "description": "取消预约", + "module": "appointment", + "body_required": True + }, + "appointment.page": { + "method": "POST", + "path": "/api/v1/tpos/cusbespeak/page", + "description": "预约分页查询", + "module": "appointment", + "body_required": True + }, + "appointment.doctor_list": { + "method": "GET", + "path": "/api/v1/tpos/cusbespeak/doctor-list", + "description": "获取可选择的预约医生", + "module": "appointment", + "params": ["tenantId"] + }, + "appointment.schedule": { + "method": "GET", + "path": "/api/v1/tpos/cusbespeak/schedule", + "description": "查询预约专家排班", + "module": "appointment", + "params": ["doctorId", "date"] + }, + + # ======================================== + # 渠道模块 (channel) + # ======================================== + + "channel.type_select": { + "method": "GET", + "path": "/api/v1/tpos/channel/type-select", + "description": "整合渠道类型选择(建档,报备)", + "module": "channel", + "params": ["tenantId"] + }, + "channel.list_by_type": { + "method": "GET", + "path": "/api/v1/tpos/channel/list-by-type", + "description": "通过渠道类型获取渠道列表", + "module": "channel", + "params": ["typeId", "tenantId"] + }, + "channel.info": { + "method": "GET", + "path": "/api/v1/tpos/channel/info", + "description": "查询渠道信息", + "module": "channel", + "params": ["channelId"] + }, + "channel.media_info": { + "method": "GET", + "path": "/api/v1/tpos/channel/media-info", + "description": "查询运营媒体信息", + "module": "channel", + "params": ["mediaId"] + }, + + # ======================================== + # 接待模块 (reception) + # ======================================== + + "reception.triage_list": { + "method": "GET", + "path": "/api/v1/tpos/reception/triage-list", + "description": "可用的接待分诊人列表", + "module": "reception", + "params": ["tenantId"] + }, + "reception.add": { + "method": "POST", + "path": "/api/v1/tpos/reception/add", + "description": "新增接待", + "module": "reception", + "body_required": True + }, + "reception.query": { + "method": "GET", + "path": "/api/v1/tpos/reception/query", + "description": "查询客户接待信息", + "module": "reception", + "params": ["customerId"] + }, + "reception.sign_init": { + "method": "GET", + "path": "/api/v1/tpos/reception/sign-init", + "description": "客户扫码签到初始化数据(小程序)", + "module": "reception", + "params": ["tenantId"] + }, + "reception.sign": { + "method": "POST", + "path": "/api/v1/tpos/reception/sign", + "description": "客户扫码签到(小程序)", + "module": "reception", + "body_required": True + }, + + # ======================================== + # 咨询模块 (consult) + # ======================================== + + "consult.add": { + "method": "POST", + "path": "/api/v1/tpos/consult/add", + "description": "新增咨询", + "module": "consult", + "body_required": True + }, + "consult.update": { + "method": "POST", + "path": "/api/v1/tpos/consult/update", + "description": "修改咨询", + "module": "consult", + "body_required": True + }, + + # ======================================== + # 病历模块 (medical_record) + # ======================================== + + "medical_record.add": { + "method": "POST", + "path": "/api/v1/tpos/medical-record/add", + "description": "新增病历", + "module": "medical_record", + "body_required": True + }, + "medical_record.update": { + "method": "POST", + "path": "/api/v1/tpos/medical-record/update", + "description": "修改病历", + "module": "medical_record", + "body_required": True + }, + "medical_record.delete": { + "method": "POST", + "path": "/api/v1/tpos/medical-record/delete", + "description": "删除病历", + "module": "medical_record", + "body_required": True + }, +} + + +def get_api_definition(api_name: str) -> Optional[Dict[str, Any]]: + """ + 获取接口定义 + + Args: + api_name: 接口名称,如 customer.search + + Returns: + 接口定义字典,不存在则返回 None + """ + return RUIMEIYUN_APIS.get(api_name) + + +def get_api_list_by_module(module: str) -> List[Dict[str, Any]]: + """ + 按模块获取接口列表 + + Args: + module: 模块名称,如 customer, order + + Returns: + 该模块下的接口列表 + """ + result = [] + for name, definition in RUIMEIYUN_APIS.items(): + if definition.get("module") == module: + result.append({"name": name, **definition}) + return result + + +def get_all_modules() -> List[str]: + """获取所有模块名称""" + modules = set() + for definition in RUIMEIYUN_APIS.values(): + if "module" in definition: + modules.add(definition["module"]) + return sorted(list(modules)) + + +def get_api_summary() -> Dict[str, int]: + """获取接口统计""" + summary = {} + for definition in RUIMEIYUN_APIS.values(): + module = definition.get("module", "unknown") + summary[module] = summary.get(module, 0) + 1 + return summary diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 1dcf59a..6058b5a 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -1,609 +1,609 @@ -"""定时任务调度服务""" -import json -import httpx -import asyncio -from datetime import datetime -from typing import Optional -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.cron import CronTrigger -from sqlalchemy.orm import Session - -from ..database import SessionLocal -from ..models.scheduled_task import ScheduledTask, TaskLog -from ..models.notification_channel import TaskNotifyChannel -from .script_executor import ScriptExecutor - - -class SchedulerService: - """调度服务 - 管理定时任务的调度和执行""" - - _instance: Optional['SchedulerService'] = None - _scheduler: Optional[AsyncIOScheduler] = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self): - if self._scheduler is None: - self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') - - @property - def scheduler(self) -> AsyncIOScheduler: - return self._scheduler - - def start(self): - """启动调度器并加载所有任务""" - if not self._scheduler.running: - self._scheduler.start() - self._load_all_tasks() - print("调度器已启动") - - def shutdown(self): - """关闭调度器""" - if self._scheduler.running: - self._scheduler.shutdown() - print("调度器已关闭") - - def _load_all_tasks(self): - """从数据库加载所有启用的任务""" - db = SessionLocal() - try: - tasks = db.query(ScheduledTask).filter(ScheduledTask.is_enabled == True).all() - for task in tasks: - self._add_task_to_scheduler(task) - print(f"已加载 {len(tasks)} 个定时任务") - finally: - db.close() - - def _add_task_to_scheduler(self, task: ScheduledTask): - """将任务添加到调度器""" - job_id = f"task_{task.id}" - - # 移除已存在的任务 - if self._scheduler.get_job(job_id): - self._scheduler.remove_job(job_id) - - if task.schedule_type == 'cron' and task.cron_expression: - # CRON模式 - try: - trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai') - self._scheduler.add_job( - self._execute_task, - trigger, - id=job_id, - args=[task.id], - replace_existing=True - ) - except Exception as e: - print(f"任务 {task.id} CRON表达式解析失败: {e}") - - elif task.schedule_type == 'simple' and task.time_points: - # 简单模式 - 多个时间点 - try: - time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points) - for i, time_point in enumerate(time_points): - hour, minute = map(int, time_point.split(':')) - sub_job_id = f"{job_id}_{i}" - self._scheduler.add_job( - self._execute_task, - CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'), - id=sub_job_id, - args=[task.id], - replace_existing=True - ) - except Exception as e: - print(f"任务 {task.id} 时间点解析失败: {e}") - - def add_task(self, task_id: int): - """添加或更新任务调度""" - db = SessionLocal() - try: - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if task and task.is_enabled: - self._add_task_to_scheduler(task) - finally: - db.close() - - def remove_task(self, task_id: int): - """移除任务调度""" - job_id = f"task_{task_id}" - - # 移除主任务 - if self._scheduler.get_job(job_id): - self._scheduler.remove_job(job_id) - - # 移除简单模式的子任务 - for i in range(24): # 最多24个时间点 - sub_job_id = f"{job_id}_{i}" - if self._scheduler.get_job(sub_job_id): - self._scheduler.remove_job(sub_job_id) - - async def _execute_task(self, task_id: int): - """执行任务(带重试)""" - db = SessionLocal() - try: - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - return - - max_retries = task.retry_count or 0 - retry_interval = task.retry_interval or 60 - - for attempt in range(max_retries + 1): - success, output, error = await self._execute_task_once(db, task) - - if success: - return - - # 如果还有重试机会 - if attempt < max_retries: - print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})") - await asyncio.sleep(retry_interval) - else: - # 最后一次失败,发送告警 - if task.alert_on_failure and task.alert_webhook: - await self._send_alert(task, error) - finally: - db.close() - - async def _execute_task_once(self, db: Session, task: ScheduledTask): - """执行一次任务""" - trace_id = f"{int(datetime.now().timestamp())}-{task.id}" - started_at = datetime.now() - - # 创建日志记录 - log = TaskLog( - task_id=task.id, - tenant_id=task.tenant_id, - trace_id=trace_id, - status='running', - started_at=started_at - ) - db.add(log) - db.commit() - db.refresh(log) - - success = False - output = '' - error = '' - result = None - - try: - # 解析输入参数 - params = {} - if task.input_params: - params = task.input_params if isinstance(task.input_params, dict) else {} - - if task.execution_type == 'webhook': - success, output, error = await self._execute_webhook(task) - else: - success, output, error, result = await self._execute_script(db, task, trace_id, params) - - # 如果脚本执行成功且有返回内容,发送通知 - if success and result and result.get('content'): - await self._send_notifications(db, task, result) - - except Exception as e: - error = str(e) - - # 更新日志 - finished_at = datetime.now() - duration_ms = int((finished_at - started_at).total_seconds() * 1000) - - log.status = 'success' if success else 'failed' - log.finished_at = finished_at - log.duration_ms = duration_ms - log.output = output[:10000] if output else None # 限制长度 - log.error = error[:5000] if error else None - - # 更新任务状态 - task.last_run_at = finished_at - task.last_run_status = 'success' if success else 'failed' - - db.commit() - - return success, output, error - - async def _execute_webhook(self, task: ScheduledTask): - """执行Webhook任务""" - try: - body = {} - if task.input_params: - body = task.input_params if isinstance(task.input_params, dict) else {} - - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(task.webhook_url, json=body) - response.raise_for_status() - return True, response.text[:5000], '' - - except Exception as e: - return False, '', str(e) - - async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict): - """执行脚本任务""" - if not task.script_content: - return False, '', '脚本内容为空', None - - executor = ScriptExecutor(db) - success, output, error, result = executor.execute( - script_content=task.script_content, - task_id=task.id, - tenant_id=task.tenant_id, - trace_id=trace_id, - params=params, - timeout=300 # 默认超时 - ) - - return success, output, error, result - - async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict): - """发送通知到配置的渠道 - - result 格式: - - 简单格式: {'content': '内容', 'title': '标题'} - - 完整格式: {'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]} - - 支持的 msg_type: - - text: 纯文本 - - markdown: Markdown格式(默认) - - link: 链接消息 - - actionCard: 交互卡片(带按钮) - - feedCard: 信息流卡片 - - news: 图文消息(企微) - - template_card: 模板卡片(企微) - """ - content = result.get('content', '') - title = result.get('title', task.task_name) - - if not content and result.get('msg_type') not in ('feedCard', 'news', 'template_card'): - return - - # 获取通知渠道配置 - channel_ids = task.notify_channels - if isinstance(channel_ids, str): - try: - channel_ids = json.loads(channel_ids) - except: - channel_ids = [] - - if not channel_ids: - channel_ids = [] - - # 发送到通知渠道 - for channel_id in channel_ids: - try: - channel = db.query(TaskNotifyChannel).filter( - TaskNotifyChannel.id == channel_id, - TaskNotifyChannel.is_enabled == True - ).first() - - if not channel: - continue - - await self._send_to_channel(channel, result) - - except Exception as e: - print(f"发送通知到渠道 {channel_id} 失败: {e}") - - # 发送到企微应用 - if task.notify_wecom_app_id: - try: - await self._send_to_wecom_app(db, task.notify_wecom_app_id, result, task.tenant_id) - except Exception as e: - print(f"发送企微应用消息失败: {e}") - - async def _send_to_channel(self, channel: TaskNotifyChannel, result: dict): - """发送消息到通知渠道 - - 钉钉支持: text, markdown, link, actionCard, feedCard - 企微支持: text, markdown, image, news, template_card - """ - import time - import hmac - import hashlib - import base64 - import urllib.parse - - url = channel.webhook_url - msg_type = result.get('msg_type', 'markdown') - title = result.get('title', '通知') - content = result.get('content', '') - - if channel.channel_type == 'dingtalk_bot': - # 钉钉加签 - if channel.sign_secret: - timestamp = str(round(time.time() * 1000)) - string_to_sign = f'{timestamp}\n{channel.sign_secret}' - hmac_code = hmac.new( - channel.sign_secret.encode('utf-8'), - string_to_sign.encode('utf-8'), - digestmod=hashlib.sha256 - ).digest() - sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) - - if '?' in url: - url = f"{url}×tamp={timestamp}&sign={sign}" - else: - url = f"{url}?timestamp={timestamp}&sign={sign}" - - payload = self._build_dingtalk_payload(msg_type, title, content, result) - else: # wecom_bot - payload = self._build_wecom_payload(msg_type, title, content, result) - - async with httpx.AsyncClient(timeout=10) as client: - response = await client.post(url, json=payload) - resp = response.json() - if resp.get('errcode') != 0: - print(f"通知发送失败: {resp}") - - def _build_dingtalk_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict: - """构建钉钉消息体 - - 支持类型: - - text: 纯文本 - - markdown: Markdown - - link: 链接消息 - - actionCard: 交互卡片(整体跳转/独立跳转) - - feedCard: 信息流卡片 - """ - if msg_type == 'text': - return { - "msgtype": "text", - "text": {"content": content}, - "at": result.get('at', {}) - } - - elif msg_type == 'link': - return { - "msgtype": "link", - "link": { - "title": title, - "text": content, - "messageUrl": result.get('url', ''), - "picUrl": result.get('pic_url', '') - } - } - - elif msg_type == 'actionCard': - buttons = result.get('buttons', []) - card = { - "title": title, - "text": content, - "btnOrientation": result.get('btn_orientation', '0') # 0-竖向 1-横向 - } - - if len(buttons) == 1: - # 整体跳转 - card["singleTitle"] = buttons[0].get('title', '查看详情') - card["singleURL"] = buttons[0].get('url', '') - elif len(buttons) > 1: - # 独立跳转 - card["btns"] = [ - {"title": btn.get('title', ''), "actionURL": btn.get('url', '')} - for btn in buttons - ] - - return {"msgtype": "actionCard", "actionCard": card} - - elif msg_type == 'feedCard': - links = result.get('links', []) - return { - "msgtype": "feedCard", - "feedCard": { - "links": [ - { - "title": link.get('title', ''), - "messageURL": link.get('url', ''), - "picURL": link.get('pic_url', '') - } - for link in links - ] - } - } - - else: # markdown(默认) - return { - "msgtype": "markdown", - "markdown": {"title": title, "text": content}, - "at": result.get('at', {}) - } - - def _build_wecom_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict: - """构建企微消息体 - - 支持类型: - - text: 纯文本 - - markdown: Markdown - - image: 图片 - - news: 图文消息 - - template_card: 模板卡片 - """ - if msg_type == 'text': - payload = { - "msgtype": "text", - "text": {"content": content} - } - if result.get('mentioned_list'): - payload["text"]["mentioned_list"] = result.get('mentioned_list') - if result.get('mentioned_mobile_list'): - payload["text"]["mentioned_mobile_list"] = result.get('mentioned_mobile_list') - return payload - - elif msg_type == 'image': - return { - "msgtype": "image", - "image": { - "base64": result.get('image_base64', ''), - "md5": result.get('image_md5', '') - } - } - - elif msg_type == 'news': - articles = result.get('articles', []) - if not articles and content: - articles = [{ - "title": title, - "description": content, - "url": result.get('url', ''), - "picurl": result.get('pic_url', '') - }] - return { - "msgtype": "news", - "news": {"articles": articles} - } - - elif msg_type == 'template_card': - card_type = result.get('card_type', 'text_notice') - - if card_type == 'text_notice': - # 文本通知卡片 - return { - "msgtype": "template_card", - "template_card": { - "card_type": "text_notice", - "main_title": {"title": title, "desc": result.get('desc', '')}, - "sub_title_text": content, - "horizontal_content_list": result.get('horizontal_list', []), - "jump_list": result.get('jump_list', []), - "card_action": result.get('card_action', {"type": 1, "url": ""}) - } - } - - elif card_type == 'news_notice': - # 图文展示卡片 - return { - "msgtype": "template_card", - "template_card": { - "card_type": "news_notice", - "main_title": {"title": title, "desc": result.get('desc', '')}, - "card_image": {"url": result.get('image_url', ''), "aspect_ratio": result.get('aspect_ratio', 1.3)}, - "vertical_content_list": result.get('vertical_list', []), - "horizontal_content_list": result.get('horizontal_list', []), - "jump_list": result.get('jump_list', []), - "card_action": result.get('card_action', {"type": 1, "url": ""}) - } - } - - elif card_type == 'button_interaction': - # 按钮交互卡片 - return { - "msgtype": "template_card", - "template_card": { - "card_type": "button_interaction", - "main_title": {"title": title, "desc": result.get('desc', '')}, - "sub_title_text": content, - "horizontal_content_list": result.get('horizontal_list', []), - "button_list": result.get('buttons', []), - "card_action": result.get('card_action', {"type": 1, "url": ""}) - } - } - - else: # markdown(默认) - return { - "msgtype": "markdown", - "markdown": {"content": f"**{title}**\n\n{content}"} - } - - async def _send_to_wecom_app(self, db: Session, app_id: int, result: dict, tenant_id: str): - """发送消息到企微应用""" - from ..models.tenant_wechat_app import TenantWechatApp - - app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first() - if not app: - return - - # 获取 access_token - access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret) - if not access_token: - return - - title = result.get('title', '通知') - content = result.get('content', '') - - # 发送消息 - url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" - payload = { - "touser": result.get('touser', '@all'), - "msgtype": "markdown", - "agentid": app.agent_id, - "markdown": { - "content": f"**{title}**\n\n{content}" - } - } - - async with httpx.AsyncClient(timeout=10) as client: - response = await client.post(url, json=payload) - result = response.json() - if result.get('errcode') != 0: - print(f"企微应用消息发送失败: {result}") - - async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]: - """获取企微 access_token""" - url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={app_secret}" - - async with httpx.AsyncClient(timeout=10) as client: - response = await client.get(url) - result = response.json() - if result.get('errcode') == 0: - return result.get('access_token') - else: - print(f"获取企微 access_token 失败: {result}") - return None - - async def _send_alert(self, task: ScheduledTask, error: str): - """发送失败告警""" - if not task.alert_webhook: - return - - content = f"""### 定时任务执行失败告警 - -**任务名称**: {task.task_name} -**任务ID**: {task.id} -**租户**: {task.tenant_id or '全局'} -**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -**错误信息**: -``` -{error[:500] if error else '未知错误'} -```""" - - try: - # 判断是钉钉还是企微 - if 'dingtalk' in task.alert_webhook or 'oapi.dingtalk.com' in task.alert_webhook: - payload = { - "msgtype": "markdown", - "markdown": {"title": "任务失败告警", "text": content} - } - else: - payload = { - "msgtype": "markdown", - "markdown": {"content": content} - } - - async with httpx.AsyncClient(timeout=10) as client: - await client.post(task.alert_webhook, json=payload) - except Exception as e: - print(f"发送告警失败: {e}") - - async def run_task_now(self, task_id: int) -> dict: - """立即执行任务(手动触发)""" - db = SessionLocal() - try: - task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() - if not task: - return {"success": False, "error": "任务不存在"} - - success, output, error = await self._execute_task_once(db, task) - - return { - "success": success, - "output": output, - "error": error - } - finally: - db.close() - - -# 全局调度器实例 -scheduler_service = SchedulerService() +"""定时任务调度服务""" +import json +import httpx +import asyncio +from datetime import datetime +from typing import Optional +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from sqlalchemy.orm import Session + +from ..database import SessionLocal +from ..models.scheduled_task import ScheduledTask, TaskLog +from ..models.notification_channel import TaskNotifyChannel +from .script_executor import ScriptExecutor + + +class SchedulerService: + """调度服务 - 管理定时任务的调度和执行""" + + _instance: Optional['SchedulerService'] = None + _scheduler: Optional[AsyncIOScheduler] = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if self._scheduler is None: + self._scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') + + @property + def scheduler(self) -> AsyncIOScheduler: + return self._scheduler + + def start(self): + """启动调度器并加载所有任务""" + if not self._scheduler.running: + self._scheduler.start() + self._load_all_tasks() + print("调度器已启动") + + def shutdown(self): + """关闭调度器""" + if self._scheduler.running: + self._scheduler.shutdown() + print("调度器已关闭") + + def _load_all_tasks(self): + """从数据库加载所有启用的任务""" + db = SessionLocal() + try: + tasks = db.query(ScheduledTask).filter(ScheduledTask.is_enabled == True).all() + for task in tasks: + self._add_task_to_scheduler(task) + print(f"已加载 {len(tasks)} 个定时任务") + finally: + db.close() + + def _add_task_to_scheduler(self, task: ScheduledTask): + """将任务添加到调度器""" + job_id = f"task_{task.id}" + + # 移除已存在的任务 + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + if task.schedule_type == 'cron' and task.cron_expression: + # CRON模式 + try: + trigger = CronTrigger.from_crontab(task.cron_expression, timezone='Asia/Shanghai') + self._scheduler.add_job( + self._execute_task, + trigger, + id=job_id, + args=[task.id], + replace_existing=True + ) + except Exception as e: + print(f"任务 {task.id} CRON表达式解析失败: {e}") + + elif task.schedule_type == 'simple' and task.time_points: + # 简单模式 - 多个时间点 + try: + time_points = task.time_points if isinstance(task.time_points, list) else json.loads(task.time_points) + for i, time_point in enumerate(time_points): + hour, minute = map(int, time_point.split(':')) + sub_job_id = f"{job_id}_{i}" + self._scheduler.add_job( + self._execute_task, + CronTrigger(hour=hour, minute=minute, timezone='Asia/Shanghai'), + id=sub_job_id, + args=[task.id], + replace_existing=True + ) + except Exception as e: + print(f"任务 {task.id} 时间点解析失败: {e}") + + def add_task(self, task_id: int): + """添加或更新任务调度""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if task and task.is_enabled: + self._add_task_to_scheduler(task) + finally: + db.close() + + def remove_task(self, task_id: int): + """移除任务调度""" + job_id = f"task_{task_id}" + + # 移除主任务 + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + # 移除简单模式的子任务 + for i in range(24): # 最多24个时间点 + sub_job_id = f"{job_id}_{i}" + if self._scheduler.get_job(sub_job_id): + self._scheduler.remove_job(sub_job_id) + + async def _execute_task(self, task_id: int): + """执行任务(带重试)""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + return + + max_retries = task.retry_count or 0 + retry_interval = task.retry_interval or 60 + + for attempt in range(max_retries + 1): + success, output, error = await self._execute_task_once(db, task) + + if success: + return + + # 如果还有重试机会 + if attempt < max_retries: + print(f"任务 {task_id} 执行失败,{retry_interval}秒后重试 ({attempt + 1}/{max_retries})") + await asyncio.sleep(retry_interval) + else: + # 最后一次失败,发送告警 + if task.alert_on_failure and task.alert_webhook: + await self._send_alert(task, error) + finally: + db.close() + + async def _execute_task_once(self, db: Session, task: ScheduledTask): + """执行一次任务""" + trace_id = f"{int(datetime.now().timestamp())}-{task.id}" + started_at = datetime.now() + + # 创建日志记录 + log = TaskLog( + task_id=task.id, + tenant_id=task.tenant_id, + trace_id=trace_id, + status='running', + started_at=started_at + ) + db.add(log) + db.commit() + db.refresh(log) + + success = False + output = '' + error = '' + result = None + + try: + # 解析输入参数 + params = {} + if task.input_params: + params = task.input_params if isinstance(task.input_params, dict) else {} + + if task.execution_type == 'webhook': + success, output, error = await self._execute_webhook(task) + else: + success, output, error, result = await self._execute_script(db, task, trace_id, params) + + # 如果脚本执行成功且有返回内容,发送通知 + if success and result and result.get('content'): + await self._send_notifications(db, task, result) + + except Exception as e: + error = str(e) + + # 更新日志 + finished_at = datetime.now() + duration_ms = int((finished_at - started_at).total_seconds() * 1000) + + log.status = 'success' if success else 'failed' + log.finished_at = finished_at + log.duration_ms = duration_ms + log.output = output[:10000] if output else None # 限制长度 + log.error = error[:5000] if error else None + + # 更新任务状态 + task.last_run_at = finished_at + task.last_run_status = 'success' if success else 'failed' + + db.commit() + + return success, output, error + + async def _execute_webhook(self, task: ScheduledTask): + """执行Webhook任务""" + try: + body = {} + if task.input_params: + body = task.input_params if isinstance(task.input_params, dict) else {} + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(task.webhook_url, json=body) + response.raise_for_status() + return True, response.text[:5000], '' + + except Exception as e: + return False, '', str(e) + + async def _execute_script(self, db: Session, task: ScheduledTask, trace_id: str, params: dict): + """执行脚本任务""" + if not task.script_content: + return False, '', '脚本内容为空', None + + executor = ScriptExecutor(db) + success, output, error, result = executor.execute( + script_content=task.script_content, + task_id=task.id, + tenant_id=task.tenant_id, + trace_id=trace_id, + params=params, + timeout=300 # 默认超时 + ) + + return success, output, error, result + + async def _send_notifications(self, db: Session, task: ScheduledTask, result: dict): + """发送通知到配置的渠道 + + result 格式: + - 简单格式: {'content': '内容', 'title': '标题'} + - 完整格式: {'msg_type': 'actionCard', 'title': '...', 'content': '...', 'buttons': [...]} + + 支持的 msg_type: + - text: 纯文本 + - markdown: Markdown格式(默认) + - link: 链接消息 + - actionCard: 交互卡片(带按钮) + - feedCard: 信息流卡片 + - news: 图文消息(企微) + - template_card: 模板卡片(企微) + """ + content = result.get('content', '') + title = result.get('title', task.task_name) + + if not content and result.get('msg_type') not in ('feedCard', 'news', 'template_card'): + return + + # 获取通知渠道配置 + channel_ids = task.notify_channels + if isinstance(channel_ids, str): + try: + channel_ids = json.loads(channel_ids) + except: + channel_ids = [] + + if not channel_ids: + channel_ids = [] + + # 发送到通知渠道 + for channel_id in channel_ids: + try: + channel = db.query(TaskNotifyChannel).filter( + TaskNotifyChannel.id == channel_id, + TaskNotifyChannel.is_enabled == True + ).first() + + if not channel: + continue + + await self._send_to_channel(channel, result) + + except Exception as e: + print(f"发送通知到渠道 {channel_id} 失败: {e}") + + # 发送到企微应用 + if task.notify_wecom_app_id: + try: + await self._send_to_wecom_app(db, task.notify_wecom_app_id, result, task.tenant_id) + except Exception as e: + print(f"发送企微应用消息失败: {e}") + + async def _send_to_channel(self, channel: TaskNotifyChannel, result: dict): + """发送消息到通知渠道 + + 钉钉支持: text, markdown, link, actionCard, feedCard + 企微支持: text, markdown, image, news, template_card + """ + import time + import hmac + import hashlib + import base64 + import urllib.parse + + url = channel.webhook_url + msg_type = result.get('msg_type', 'markdown') + title = result.get('title', '通知') + content = result.get('content', '') + + if channel.channel_type == 'dingtalk_bot': + # 钉钉加签 + if channel.sign_secret: + timestamp = str(round(time.time() * 1000)) + string_to_sign = f'{timestamp}\n{channel.sign_secret}' + hmac_code = hmac.new( + channel.sign_secret.encode('utf-8'), + string_to_sign.encode('utf-8'), + digestmod=hashlib.sha256 + ).digest() + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) + + if '?' in url: + url = f"{url}×tamp={timestamp}&sign={sign}" + else: + url = f"{url}?timestamp={timestamp}&sign={sign}" + + payload = self._build_dingtalk_payload(msg_type, title, content, result) + else: # wecom_bot + payload = self._build_wecom_payload(msg_type, title, content, result) + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(url, json=payload) + resp = response.json() + if resp.get('errcode') != 0: + print(f"通知发送失败: {resp}") + + def _build_dingtalk_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict: + """构建钉钉消息体 + + 支持类型: + - text: 纯文本 + - markdown: Markdown + - link: 链接消息 + - actionCard: 交互卡片(整体跳转/独立跳转) + - feedCard: 信息流卡片 + """ + if msg_type == 'text': + return { + "msgtype": "text", + "text": {"content": content}, + "at": result.get('at', {}) + } + + elif msg_type == 'link': + return { + "msgtype": "link", + "link": { + "title": title, + "text": content, + "messageUrl": result.get('url', ''), + "picUrl": result.get('pic_url', '') + } + } + + elif msg_type == 'actionCard': + buttons = result.get('buttons', []) + card = { + "title": title, + "text": content, + "btnOrientation": result.get('btn_orientation', '0') # 0-竖向 1-横向 + } + + if len(buttons) == 1: + # 整体跳转 + card["singleTitle"] = buttons[0].get('title', '查看详情') + card["singleURL"] = buttons[0].get('url', '') + elif len(buttons) > 1: + # 独立跳转 + card["btns"] = [ + {"title": btn.get('title', ''), "actionURL": btn.get('url', '')} + for btn in buttons + ] + + return {"msgtype": "actionCard", "actionCard": card} + + elif msg_type == 'feedCard': + links = result.get('links', []) + return { + "msgtype": "feedCard", + "feedCard": { + "links": [ + { + "title": link.get('title', ''), + "messageURL": link.get('url', ''), + "picURL": link.get('pic_url', '') + } + for link in links + ] + } + } + + else: # markdown(默认) + return { + "msgtype": "markdown", + "markdown": {"title": title, "text": content}, + "at": result.get('at', {}) + } + + def _build_wecom_payload(self, msg_type: str, title: str, content: str, result: dict) -> dict: + """构建企微消息体 + + 支持类型: + - text: 纯文本 + - markdown: Markdown + - image: 图片 + - news: 图文消息 + - template_card: 模板卡片 + """ + if msg_type == 'text': + payload = { + "msgtype": "text", + "text": {"content": content} + } + if result.get('mentioned_list'): + payload["text"]["mentioned_list"] = result.get('mentioned_list') + if result.get('mentioned_mobile_list'): + payload["text"]["mentioned_mobile_list"] = result.get('mentioned_mobile_list') + return payload + + elif msg_type == 'image': + return { + "msgtype": "image", + "image": { + "base64": result.get('image_base64', ''), + "md5": result.get('image_md5', '') + } + } + + elif msg_type == 'news': + articles = result.get('articles', []) + if not articles and content: + articles = [{ + "title": title, + "description": content, + "url": result.get('url', ''), + "picurl": result.get('pic_url', '') + }] + return { + "msgtype": "news", + "news": {"articles": articles} + } + + elif msg_type == 'template_card': + card_type = result.get('card_type', 'text_notice') + + if card_type == 'text_notice': + # 文本通知卡片 + return { + "msgtype": "template_card", + "template_card": { + "card_type": "text_notice", + "main_title": {"title": title, "desc": result.get('desc', '')}, + "sub_title_text": content, + "horizontal_content_list": result.get('horizontal_list', []), + "jump_list": result.get('jump_list', []), + "card_action": result.get('card_action', {"type": 1, "url": ""}) + } + } + + elif card_type == 'news_notice': + # 图文展示卡片 + return { + "msgtype": "template_card", + "template_card": { + "card_type": "news_notice", + "main_title": {"title": title, "desc": result.get('desc', '')}, + "card_image": {"url": result.get('image_url', ''), "aspect_ratio": result.get('aspect_ratio', 1.3)}, + "vertical_content_list": result.get('vertical_list', []), + "horizontal_content_list": result.get('horizontal_list', []), + "jump_list": result.get('jump_list', []), + "card_action": result.get('card_action', {"type": 1, "url": ""}) + } + } + + elif card_type == 'button_interaction': + # 按钮交互卡片 + return { + "msgtype": "template_card", + "template_card": { + "card_type": "button_interaction", + "main_title": {"title": title, "desc": result.get('desc', '')}, + "sub_title_text": content, + "horizontal_content_list": result.get('horizontal_list', []), + "button_list": result.get('buttons', []), + "card_action": result.get('card_action', {"type": 1, "url": ""}) + } + } + + else: # markdown(默认) + return { + "msgtype": "markdown", + "markdown": {"content": f"**{title}**\n\n{content}"} + } + + async def _send_to_wecom_app(self, db: Session, app_id: int, result: dict, tenant_id: str): + """发送消息到企微应用""" + from ..models.tenant_wechat_app import TenantWechatApp + + app = db.query(TenantWechatApp).filter(TenantWechatApp.id == app_id).first() + if not app: + return + + # 获取 access_token + access_token = await self._get_wecom_access_token(app.corp_id, app.app_secret) + if not access_token: + return + + title = result.get('title', '通知') + content = result.get('content', '') + + # 发送消息 + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + payload = { + "touser": result.get('touser', '@all'), + "msgtype": "markdown", + "agentid": app.agent_id, + "markdown": { + "content": f"**{title}**\n\n{content}" + } + } + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.post(url, json=payload) + result = response.json() + if result.get('errcode') != 0: + print(f"企微应用消息发送失败: {result}") + + async def _get_wecom_access_token(self, corp_id: str, app_secret: str) -> Optional[str]: + """获取企微 access_token""" + url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={app_secret}" + + async with httpx.AsyncClient(timeout=10) as client: + response = await client.get(url) + result = response.json() + if result.get('errcode') == 0: + return result.get('access_token') + else: + print(f"获取企微 access_token 失败: {result}") + return None + + async def _send_alert(self, task: ScheduledTask, error: str): + """发送失败告警""" + if not task.alert_webhook: + return + + content = f"""### 定时任务执行失败告警 + +**任务名称**: {task.task_name} +**任务ID**: {task.id} +**租户**: {task.tenant_id or '全局'} +**失败时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +**错误信息**: +``` +{error[:500] if error else '未知错误'} +```""" + + try: + # 判断是钉钉还是企微 + if 'dingtalk' in task.alert_webhook or 'oapi.dingtalk.com' in task.alert_webhook: + payload = { + "msgtype": "markdown", + "markdown": {"title": "任务失败告警", "text": content} + } + else: + payload = { + "msgtype": "markdown", + "markdown": {"content": content} + } + + async with httpx.AsyncClient(timeout=10) as client: + await client.post(task.alert_webhook, json=payload) + except Exception as e: + print(f"发送告警失败: {e}") + + async def run_task_now(self, task_id: int) -> dict: + """立即执行任务(手动触发)""" + db = SessionLocal() + try: + task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() + if not task: + return {"success": False, "error": "任务不存在"} + + success, output, error = await self._execute_task_once(db, task) + + return { + "success": success, + "output": output, + "error": error + } + finally: + db.close() + + +# 全局调度器实例 +scheduler_service = SchedulerService() diff --git a/backend/app/services/script_executor.py b/backend/app/services/script_executor.py index 8fc1279..48b19f3 100644 --- a/backend/app/services/script_executor.py +++ b/backend/app/services/script_executor.py @@ -1,285 +1,285 @@ -"""脚本执行器 - 安全执行Python脚本""" -import sys -import traceback -from io import StringIO -from typing import Any, Dict, Optional, Tuple -from datetime import datetime -from sqlalchemy.orm import Session - -from .script_sdk import ScriptSDK - - -# 禁止导入的模块 -FORBIDDEN_MODULES = { - 'os', 'subprocess', 'shutil', 'pathlib', - 'socket', 'ftplib', 'telnetlib', 'smtplib', - 'pickle', 'shelve', 'marshal', - 'ctypes', 'multiprocessing', - '__builtins__', 'builtins', - 'importlib', 'imp', - 'code', 'codeop', 'compile', -} - -# 允许的内置函数 -ALLOWED_BUILTINS = { - 'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray', 'bytes', - 'callable', 'chr', 'complex', 'dict', 'dir', 'divmod', 'enumerate', - 'filter', 'float', 'format', 'frozenset', 'getattr', 'hasattr', 'hash', - 'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', 'len', 'list', - 'map', 'max', 'min', 'next', 'object', 'oct', 'ord', 'pow', 'print', - 'range', 'repr', 'reversed', 'round', 'set', 'setattr', 'slice', - 'sorted', 'str', 'sum', 'tuple', 'type', 'vars', 'zip', - 'True', 'False', 'None', - 'Exception', 'BaseException', 'ValueError', 'TypeError', 'KeyError', - 'IndexError', 'AttributeError', 'RuntimeError', 'StopIteration', -} - - -class ScriptExecutor: - """脚本执行器""" - - def __init__(self, db: Session): - self.db = db - - def execute( - self, - script_content: str, - task_id: int, - tenant_id: Optional[str] = None, - trace_id: Optional[str] = None, - params: Optional[Dict[str, Any]] = None, - timeout: int = 300 - ) -> Tuple[bool, str, str, Optional[Dict]]: - """执行脚本 - - Args: - script_content: Python脚本内容 - task_id: 任务ID - tenant_id: 租户ID - trace_id: 追踪ID - params: 输入参数 - timeout: 超时秒数 - - Returns: - (success, output, error, result) - result: 脚本返回值 {'content': '...', 'title': '...'} - """ - # 创建SDK实例 - sdk = ScriptSDK( - db=self.db, - task_id=task_id, - tenant_id=tenant_id, - trace_id=trace_id, - params=params or {} - ) - - # 检查脚本安全性 - check_result = self._check_script_safety(script_content) - if check_result: - return False, '', f"脚本安全检查失败: {check_result}", None - - # 准备执行环境 - safe_globals = self._create_safe_globals(sdk) - - # 捕获输出 - old_stdout = sys.stdout - old_stderr = sys.stderr - stdout_capture = StringIO() - stderr_capture = StringIO() - - try: - sys.stdout = stdout_capture - sys.stderr = stderr_capture - - # 编译并执行脚本 - compiled = compile(script_content, ' - - + + + diff --git a/frontend/src/views/error/index.vue b/frontend/src/views/error/index.vue index aca8a85..eb8527e 100644 --- a/frontend/src/views/error/index.vue +++ b/frontend/src/views/error/index.vue @@ -1,223 +1,223 @@ - - - - - + + + + + diff --git a/frontend/src/views/notification-channels/index.vue b/frontend/src/views/notification-channels/index.vue index 1137b28..fbb5b56 100644 --- a/frontend/src/views/notification-channels/index.vue +++ b/frontend/src/views/notification-channels/index.vue @@ -1,317 +1,317 @@ - - - - - + + + + + diff --git a/frontend/src/views/scheduled-tasks/index.vue b/frontend/src/views/scheduled-tasks/index.vue index 2392be1..681ee4f 100644 --- a/frontend/src/views/scheduled-tasks/index.vue +++ b/frontend/src/views/scheduled-tasks/index.vue @@ -1,1187 +1,1187 @@ - - - - - + + + + +