diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py index 02eb734..e48aede 100644 --- a/backend/app/routers/tasks.py +++ b/backend/app/routers/tasks.py @@ -28,8 +28,10 @@ class TaskCreate(BaseModel): schedule_type: str = "simple" # simple | cron time_points: Optional[List[str]] = None # ["09:00", "14:00"] cron_expression: Optional[str] = None # "0 9,14 * * *" - webhook_url: str + execution_type: str = "webhook" # webhook | script + webhook_url: Optional[str] = None input_params: Optional[dict] = None + script_content: Optional[str] = None is_enabled: bool = True @@ -39,8 +41,15 @@ class TaskUpdate(BaseModel): schedule_type: Optional[str] = None time_points: Optional[List[str]] = None cron_expression: Optional[str] = None + execution_type: Optional[str] = None webhook_url: Optional[str] = None input_params: Optional[dict] = None + script_content: Optional[str] = None + + +class ScriptTestRequest(BaseModel): + tenant_id: str + script_content: str # API Endpoints @@ -110,6 +119,14 @@ async def create_task( if not data.cron_expression: raise HTTPException(status_code=400, detail="CRON模式需要提供表达式") + # 验证执行配置 + if data.execution_type == "webhook": + if not data.webhook_url: + raise HTTPException(status_code=400, detail="Webhook模式需要提供URL") + elif data.execution_type == "script": + if not data.script_content: + raise HTTPException(status_code=400, detail="脚本模式需要提供脚本内容") + # 插入数据库 import json time_points_json = json.dumps(data.time_points) if data.time_points else None @@ -119,9 +136,9 @@ async def create_task( text(""" INSERT INTO platform_scheduled_tasks (tenant_id, task_name, task_desc, schedule_type, time_points, - cron_expression, webhook_url, input_params, is_enabled) + cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled) VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points, - :cron_expression, :webhook_url, :input_params, :is_enabled) + :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled) """), { "tenant_id": data.tenant_id, @@ -130,8 +147,10 @@ async def create_task( "schedule_type": data.schedule_type, "time_points": time_points_json, "cron_expression": data.cron_expression, + "execution_type": data.execution_type, "webhook_url": data.webhook_url, "input_params": input_params_json, + "script_content": data.script_content, "is_enabled": 1 if data.is_enabled else 0 } ) @@ -209,9 +228,15 @@ async def update_task( if data.cron_expression is not None: updates.append("cron_expression = :cron_expression") params["cron_expression"] = data.cron_expression + if data.execution_type is not None: + updates.append("execution_type = :execution_type") + params["execution_type"] = data.execution_type if data.webhook_url is not None: updates.append("webhook_url = :webhook_url") params["webhook_url"] = data.webhook_url + if data.script_content is not None: + updates.append("script_content = :script_content") + params["script_content"] = data.script_content if data.input_params is not None: updates.append("input_params = :input_params") params["input_params"] = json.dumps(data.input_params) @@ -354,3 +379,96 @@ async def get_task_logs( "size": size, "items": logs } + + +@router.post("/test-script") +async def test_script( + data: ScriptTestRequest, + user: User = Depends(require_operator) +): + """测试执行脚本(不记录日志)""" + from ..services.script_executor import test_script as run_test + + if not data.script_content or not data.script_content.strip(): + raise HTTPException(status_code=400, detail="脚本内容不能为空") + + result = await run_test( + tenant_id=data.tenant_id, + script_content=data.script_content + ) + + return result.to_dict() + + +@router.get("/sdk-docs") +async def get_sdk_docs(): + """获取 SDK 文档""" + return { + "description": "脚本执行 SDK 文档", + "methods": [ + { + "name": "ai(prompt, system=None, model='gemini-2.5-flash')", + "description": "调用大模型生成内容", + "example": "result = ai('帮我写一段营销文案')" + }, + { + "name": "dingtalk(webhook, content, msg_type='text', at_mobiles=None)", + "description": "发送钉钉群消息", + "example": "dingtalk('https://oapi.dingtalk.com/robot/send?access_token=xxx', '消息内容')" + }, + { + "name": "wecom(webhook, content, msg_type='text')", + "description": "发送企业微信群消息", + "example": "wecom('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx', '消息内容')" + }, + { + "name": "db(sql, params=None)", + "description": "执行 SQL 查询(仅支持 SELECT)", + "example": "rows = db('SELECT * FROM users WHERE id = :id', {'id': 1})" + }, + { + "name": "http_get(url, headers=None, params=None)", + "description": "发送 HTTP GET 请求", + "example": "data = http_get('https://api.example.com/data')" + }, + { + "name": "http_post(url, data=None, json_data=None, headers=None)", + "description": "发送 HTTP POST 请求", + "example": "data = http_post('https://api.example.com/submit', json_data={'key': 'value'})" + }, + { + "name": "get_var(key, default=None)", + "description": "获取存储的变量(跨执行持久化)", + "example": "count = get_var('run_count', 0)" + }, + { + "name": "set_var(key, value)", + "description": "存储变量(跨执行持久化)", + "example": "set_var('run_count', count + 1)" + }, + { + "name": "log(message, level='INFO')", + "description": "记录日志", + "example": "log('任务执行完成')" + } + ], + "example_script": '''# 示例:每日推送 AI 生成的内容到钉钉 +import json + +# 获取历史数据 +history = get_var('history', []) + +# 调用 AI 生成内容 +prompt = f"根据以下信息生成今日营销文案:{json.dumps(history[-5:], ensure_ascii=False)}" +content = ai(prompt, system="你是一个专业的营销文案专家") + +# 发送到钉钉 +dingtalk( + webhook="你的钉钉机器人Webhook", + content=content +) + +# 记录日志 +log(f"已发送: {content[:50]}...") +''' + } diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 976fc2f..f03c059 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -69,26 +69,64 @@ async def execute_task(task_id: int): result = db.execute(text("SELECT LAST_INSERT_ID() as id")) log_id = result.scalar() - # 4. 调用 webhook - webhook_url = task["webhook_url"] - input_params = task["input_params"] or {} + # 生成 trace_id + trace_id = f"task_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}" - async with httpx.AsyncClient(timeout=300.0) as client: - response = await client.post( - webhook_url, - json=input_params, - headers={"Content-Type": "application/json"} - ) + # 4. 根据执行类型分发 + execution_type = task.get("execution_type", "webhook") + + if execution_type == "script": + # 脚本执行模式 + from .script_executor import execute_script as run_script - response_code = response.status_code - response_body = response.text[:5000] if response.text else "" # 限制存储长度 - - if response.is_success: - status = "success" - error_message = None - else: + script_content = task.get("script_content", "") + if not script_content: status = "failed" - error_message = f"HTTP {response_code}" + error_message = "脚本内容为空" + response_code = None + response_body = "" + else: + script_result = await run_script( + task_id=task_id, + tenant_id=task["tenant_id"], + script_content=script_content, + trace_id=trace_id + ) + + if script_result.success: + status = "success" + error_message = None + else: + status = "failed" + error_message = script_result.error + + response_code = None + response_body = script_result.output[:5000] if script_result.output else "" + + # 添加日志到响应体 + if script_result.logs: + response_body += "\n\n--- 执行日志 ---\n" + "\n".join(script_result.logs[-20:]) + else: + # Webhook 执行模式 + webhook_url = task["webhook_url"] + input_params = task["input_params"] or {} + + async with httpx.AsyncClient(timeout=300.0) as client: + response = await client.post( + webhook_url, + json=input_params, + headers={"Content-Type": "application/json"} + ) + + response_code = response.status_code + response_body = response.text[:5000] if response.text else "" # 限制存储长度 + + if response.is_success: + status = "success" + error_message = None + else: + status = "failed" + error_message = f"HTTP {response_code}" # 5. 更新执行日志 db.execute( diff --git a/backend/app/services/script_executor.py b/backend/app/services/script_executor.py new file mode 100644 index 0000000..464135d --- /dev/null +++ b/backend/app/services/script_executor.py @@ -0,0 +1,262 @@ +"""脚本执行器 - 安全执行 Python 脚本""" +import asyncio +import io +import logging +import sys +import traceback +from contextlib import redirect_stdout, redirect_stderr +from datetime import datetime +from typing import Any, Dict + +from .script_sdk import ScriptSDK + +logger = logging.getLogger(__name__) + +# 执行超时时间(秒) +SCRIPT_TIMEOUT = 300 # 5 分钟 + +# 禁止导入的模块 +FORBIDDEN_MODULES = { + 'os', 'subprocess', 'sys', 'builtins', '__builtins__', + 'importlib', 'eval', 'exec', 'compile', + 'open', 'file', 'input', + 'socket', 'multiprocessing', 'threading', + 'pickle', 'marshal', 'ctypes', + 'code', 'codeop', 'pty', 'tty', +} + + +class ScriptExecutionResult: + """脚本执行结果""" + + def __init__( + self, + success: bool, + output: str = "", + error: str = None, + logs: list = None, + execution_time_ms: int = 0 + ): + self.success = success + self.output = output + self.error = error + self.logs = logs or [] + self.execution_time_ms = execution_time_ms + + def to_dict(self) -> Dict[str, Any]: + return { + "success": self.success, + "output": self.output, + "error": self.error, + "logs": self.logs, + "execution_time_ms": self.execution_time_ms + } + + +def create_safe_builtins() -> Dict[str, Any]: + """创建安全的内置函数集""" + import builtins + + # 允许的内置函数 + allowed = [ + 'abs', 'all', 'any', 'ascii', 'bin', 'bool', 'bytearray', 'bytes', + 'callable', 'chr', 'complex', 'dict', 'divmod', 'enumerate', + 'filter', 'float', 'format', 'frozenset', 'getattr', 'hasattr', + 'hash', 'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', + 'len', 'list', 'map', 'max', 'min', 'next', 'object', 'oct', + 'ord', 'pow', 'print', 'range', 'repr', 'reversed', 'round', + 'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip', + 'True', 'False', 'None', + ] + + safe_builtins = {} + for name in allowed: + if hasattr(builtins, name): + safe_builtins[name] = getattr(builtins, name) + + # 添加安全的 import 函数 + def safe_import(name, *args, **kwargs): + """安全的 import 函数,只允许特定模块""" + allowed_modules = { + 'json', 'datetime', 'time', 're', 'math', 'random', + 'collections', 'itertools', 'functools', 'operator', + 'string', 'textwrap', 'unicodedata', + 'hashlib', 'base64', 'urllib.parse', + } + + if name in FORBIDDEN_MODULES: + raise ImportError(f"禁止导入模块: {name}") + + if name not in allowed_modules and not name.startswith('urllib.parse'): + raise ImportError(f"不允许导入模块: {name},允许的模块: {', '.join(sorted(allowed_modules))}") + + return __builtins__['__import__'](name, *args, **kwargs) + + safe_builtins['__import__'] = safe_import + + return safe_builtins + + +async def execute_script( + task_id: int, + tenant_id: str, + script_content: str, + trace_id: str = None +) -> ScriptExecutionResult: + """ + 执行 Python 脚本 + + Args: + task_id: 任务 ID + tenant_id: 租户 ID + script_content: 脚本内容 + trace_id: 追踪 ID + + Returns: + ScriptExecutionResult: 执行结果 + """ + start_time = datetime.now() + sdk = None + + try: + # 创建 SDK 实例 + sdk = ScriptSDK(tenant_id, task_id, trace_id) + + # 准备执行环境 + script_globals = { + '__builtins__': create_safe_builtins(), + '__name__': '__script__', + + # SDK 实例 + 'sdk': sdk, + + # 快捷方法(同步包装) + 'ai': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.ai_chat(*args, **kwargs)), + 'dingtalk': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.send_dingtalk(*args, **kwargs)), + 'wecom': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.send_wecom(*args, **kwargs)), + 'http_get': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.http_get(*args, **kwargs)), + 'http_post': lambda *args, **kwargs: asyncio.get_event_loop().run_until_complete(sdk.http_post(*args, **kwargs)), + + # 同步方法 + 'db': sdk.db_query, + 'get_var': sdk.get_var, + 'set_var': sdk.set_var, + 'delete_var': sdk.delete_var, + 'log': sdk.log, + + # 常用模块 + 'json': __import__('json'), + 'datetime': __import__('datetime'), + 're': __import__('re'), + 'math': __import__('math'), + 'random': __import__('random'), + } + + # 捕获输出 + stdout = io.StringIO() + stderr = io.StringIO() + + sdk.log("脚本开始执行") + + # 编译并执行脚本 + try: + # 编译脚本 + code = compile(script_content, '