diff --git a/backend/app/routers/tasks.py b/backend/app/routers/tasks.py
index e48aede..8e1e4c1 100644
--- a/backend/app/routers/tasks.py
+++ b/backend/app/routers/tasks.py
@@ -33,6 +33,11 @@ class TaskCreate(BaseModel):
input_params: Optional[dict] = None
script_content: Optional[str] = None
is_enabled: bool = True
+ # 重试和告警
+ retry_count: int = 0
+ retry_interval: int = 60
+ alert_on_failure: bool = False
+ alert_webhook: Optional[str] = None
class TaskUpdate(BaseModel):
@@ -45,6 +50,11 @@ class TaskUpdate(BaseModel):
webhook_url: Optional[str] = None
input_params: Optional[dict] = None
script_content: Optional[str] = None
+ # 重试和告警
+ retry_count: Optional[int] = None
+ retry_interval: Optional[int] = None
+ alert_on_failure: Optional[bool] = None
+ alert_webhook: Optional[str] = None
class ScriptTestRequest(BaseModel):
@@ -52,6 +62,36 @@ class ScriptTestRequest(BaseModel):
script_content: str
+# 密钥管理
+class SecretCreate(BaseModel):
+ tenant_id: Optional[str] = None # None 表示全局
+ secret_key: str
+ secret_value: str
+ description: Optional[str] = None
+
+
+class SecretUpdate(BaseModel):
+ secret_value: Optional[str] = None
+ description: Optional[str] = None
+
+
+# 脚本模板
+class TemplateCreate(BaseModel):
+ name: str
+ description: Optional[str] = None
+ category: Optional[str] = None
+ script_content: str
+ is_public: bool = True
+
+
+class TemplateUpdate(BaseModel):
+ name: Optional[str] = None
+ description: Optional[str] = None
+ category: Optional[str] = None
+ script_content: Optional[str] = None
+ is_public: Optional[bool] = None
+
+
# API Endpoints
@router.get("")
@@ -136,9 +176,11 @@ async def create_task(
text("""
INSERT INTO platform_scheduled_tasks
(tenant_id, task_name, task_desc, schedule_type, time_points,
- cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled)
+ cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled,
+ retry_count, retry_interval, alert_on_failure, alert_webhook)
VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points,
- :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled)
+ :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled,
+ :retry_count, :retry_interval, :alert_on_failure, :alert_webhook)
"""),
{
"tenant_id": data.tenant_id,
@@ -151,7 +193,11 @@ async def create_task(
"webhook_url": data.webhook_url,
"input_params": input_params_json,
"script_content": data.script_content,
- "is_enabled": 1 if data.is_enabled else 0
+ "is_enabled": 1 if data.is_enabled else 0,
+ "retry_count": data.retry_count,
+ "retry_interval": data.retry_interval,
+ "alert_on_failure": 1 if data.alert_on_failure else 0,
+ "alert_webhook": data.alert_webhook
}
)
db.commit()
@@ -240,6 +286,42 @@ async def update_task(
if data.input_params is not None:
updates.append("input_params = :input_params")
params["input_params"] = json.dumps(data.input_params)
+ if data.retry_count is not None:
+ updates.append("retry_count = :retry_count")
+ params["retry_count"] = data.retry_count
+ if data.retry_interval is not None:
+ updates.append("retry_interval = :retry_interval")
+ params["retry_interval"] = data.retry_interval
+ if data.alert_on_failure is not None:
+ updates.append("alert_on_failure = :alert_on_failure")
+ params["alert_on_failure"] = 1 if data.alert_on_failure else 0
+ if data.alert_webhook is not None:
+ updates.append("alert_webhook = :alert_webhook")
+ params["alert_webhook"] = data.alert_webhook
+
+ # 如果更新了脚本内容,自动保存版本
+ if data.script_content is not None and data.script_content.strip():
+ # 获取当前最大版本号
+ version_result = db.execute(
+ text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"),
+ {"task_id": task_id}
+ )
+ max_version = version_result.scalar() or 0
+ new_version = max_version + 1
+
+ # 插入新版本
+ db.execute(
+ text("""
+ INSERT INTO platform_script_versions (task_id, version, script_content, created_by)
+ VALUES (:task_id, :version, :script_content, :created_by)
+ """),
+ {
+ "task_id": task_id,
+ "version": new_version,
+ "script_content": data.script_content,
+ "created_by": user.username if hasattr(user, 'username') else None
+ }
+ )
if updates:
db.execute(
@@ -450,25 +532,415 @@ async def get_sdk_docs():
"name": "log(message, level='INFO')",
"description": "记录日志",
"example": "log('任务执行完成')"
+ },
+ {
+ "name": "get_tenants(app_code=None)",
+ "description": "获取租户列表(多租户任务遍历)",
+ "example": "tenants = get_tenants('review-generator')"
+ },
+ {
+ "name": "get_tenant_config(tenant_id, app_code, key=None)",
+ "description": "获取指定租户的应用配置",
+ "example": "webhook = get_tenant_config('tenant1', 'my-app', 'dingtalk_webhook')"
+ },
+ {
+ "name": "get_all_tenant_configs(app_code)",
+ "description": "批量获取所有租户的应用配置",
+ "example": "all_configs = get_all_tenant_configs('my-app')"
+ },
+ {
+ "name": "get_secret(key)",
+ "description": "获取密钥(优先租户级,其次全局)",
+ "example": "api_key = get_secret('openai_api_key')"
}
],
- "example_script": '''# 示例:每日推送 AI 生成的内容到钉钉
+ "example_script": '''# 示例:多租户批量推送
import json
-# 获取历史数据
-history = get_var('history', [])
+# 获取所有订阅了某应用的租户配置
+tenants = get_all_tenant_configs('daily-report')
-# 调用 AI 生成内容
-prompt = f"根据以下信息生成今日营销文案:{json.dumps(history[-5:], ensure_ascii=False)}"
-content = ai(prompt, system="你是一个专业的营销文案专家")
-
-# 发送到钉钉
-dingtalk(
- webhook="你的钉钉机器人Webhook",
- content=content
-)
-
-# 记录日志
-log(f"已发送: {content[:50]}...")
+for tenant in tenants:
+ tenant_id = tenant['tenant_id']
+ tenant_name = tenant['tenant_name']
+ configs = tenant['configs']
+
+ # 获取该租户的钉钉 Webhook
+ webhook = configs.get('dingtalk_webhook')
+ if not webhook:
+ log(f"租户 {tenant_name} 未配置 webhook,跳过")
+ continue
+
+ # 调用 AI 生成内容
+ content = ai(f"为 {tenant_name} 生成今日报告", system="你是报告生成专家")
+
+ # 发送
+ dingtalk(webhook=webhook, content=content)
+ log(f"已发送给租户: {tenant_name}")
'''
}
+
+
+# ============ 密钥管理 ============
+
+@router.get("/secrets")
+async def list_secrets(
+ tenant_id: Optional[str] = None,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """获取密钥列表"""
+ if tenant_id:
+ result = db.execute(
+ text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets WHERE tenant_id = :tenant_id OR tenant_id IS NULL ORDER BY tenant_id, secret_key"),
+ {"tenant_id": tenant_id}
+ )
+ else:
+ result = db.execute(
+ text("SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets ORDER BY tenant_id, secret_key")
+ )
+ secrets = [dict(row) for row in result.mappings().all()]
+ return {"items": secrets}
+
+
+@router.post("/secrets")
+async def create_secret(
+ data: SecretCreate,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """创建密钥"""
+ # 检查是否已存在
+ result = db.execute(
+ text("SELECT id FROM platform_secrets WHERE tenant_id <=> :tenant_id AND secret_key = :key"),
+ {"tenant_id": data.tenant_id, "key": data.secret_key}
+ )
+ if result.scalar():
+ raise HTTPException(status_code=400, detail="密钥已存在")
+
+ db.execute(
+ text("""
+ INSERT INTO platform_secrets (tenant_id, secret_key, secret_value, description)
+ VALUES (:tenant_id, :key, :value, :desc)
+ """),
+ {
+ "tenant_id": data.tenant_id,
+ "key": data.secret_key,
+ "value": data.secret_value,
+ "desc": data.description
+ }
+ )
+ db.commit()
+ return {"message": "创建成功"}
+
+
+@router.put("/secrets/{secret_id}")
+async def update_secret(
+ secret_id: int,
+ data: SecretUpdate,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """更新密钥"""
+ updates = []
+ params = {"id": secret_id}
+
+ if data.secret_value is not None:
+ updates.append("secret_value = :value")
+ params["value"] = data.secret_value
+ if data.description is not None:
+ updates.append("description = :desc")
+ params["desc"] = data.description
+
+ if updates:
+ db.execute(
+ text(f"UPDATE platform_secrets SET {', '.join(updates)} WHERE id = :id"),
+ params
+ )
+ db.commit()
+
+ return {"message": "更新成功"}
+
+
+@router.delete("/secrets/{secret_id}")
+async def delete_secret(
+ secret_id: int,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """删除密钥"""
+ db.execute(text("DELETE FROM platform_secrets WHERE id = :id"), {"id": secret_id})
+ db.commit()
+ return {"message": "删除成功"}
+
+
+# ============ 脚本模板 ============
+
+@router.get("/templates")
+async def list_templates(
+ category: Optional[str] = None,
+ user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """获取模板列表"""
+ if category:
+ result = db.execute(
+ text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates WHERE category = :category ORDER BY id DESC"),
+ {"category": category}
+ )
+ else:
+ result = db.execute(
+ text("SELECT id, name, description, category, is_public, created_by, created_at FROM platform_script_templates ORDER BY id DESC")
+ )
+ templates = [dict(row) for row in result.mappings().all()]
+ return {"items": templates}
+
+
+@router.get("/templates/{template_id}")
+async def get_template(
+ template_id: int,
+ user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """获取模板详情"""
+ result = db.execute(
+ text("SELECT * FROM platform_script_templates WHERE id = :id"),
+ {"id": template_id}
+ )
+ template = result.mappings().first()
+ if not template:
+ raise HTTPException(status_code=404, detail="模板不存在")
+ return dict(template)
+
+
+@router.post("/templates")
+async def create_template(
+ data: TemplateCreate,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """创建模板"""
+ db.execute(
+ text("""
+ INSERT INTO platform_script_templates (name, description, category, script_content, is_public, created_by)
+ VALUES (:name, :desc, :category, :content, :is_public, :created_by)
+ """),
+ {
+ "name": data.name,
+ "desc": data.description,
+ "category": data.category,
+ "content": data.script_content,
+ "is_public": 1 if data.is_public else 0,
+ "created_by": user.username if hasattr(user, 'username') else None
+ }
+ )
+ db.commit()
+
+ result = db.execute(text("SELECT LAST_INSERT_ID() as id"))
+ template_id = result.scalar()
+
+ return {"id": template_id, "message": "创建成功"}
+
+
+@router.put("/templates/{template_id}")
+async def update_template(
+ template_id: int,
+ data: TemplateUpdate,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """更新模板"""
+ updates = []
+ params = {"id": template_id}
+
+ if data.name is not None:
+ updates.append("name = :name")
+ params["name"] = data.name
+ if data.description is not None:
+ updates.append("description = :desc")
+ params["desc"] = data.description
+ if data.category is not None:
+ updates.append("category = :category")
+ params["category"] = data.category
+ if data.script_content is not None:
+ updates.append("script_content = :content")
+ params["content"] = data.script_content
+ if data.is_public is not None:
+ updates.append("is_public = :is_public")
+ params["is_public"] = 1 if data.is_public else 0
+
+ if updates:
+ db.execute(
+ text(f"UPDATE platform_script_templates SET {', '.join(updates)} WHERE id = :id"),
+ params
+ )
+ db.commit()
+
+ return {"message": "更新成功"}
+
+
+@router.delete("/templates/{template_id}")
+async def delete_template(
+ template_id: int,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """删除模板"""
+ db.execute(text("DELETE FROM platform_script_templates WHERE id = :id"), {"id": template_id})
+ db.commit()
+ return {"message": "删除成功"}
+
+
+# ============ 脚本版本管理 ============
+
+@router.get("/{task_id}/versions")
+async def list_versions(
+ task_id: int,
+ user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """获取任务的脚本版本列表"""
+ result = db.execute(
+ text("SELECT id, version, change_note, created_by, created_at FROM platform_script_versions WHERE task_id = :task_id ORDER BY version DESC"),
+ {"task_id": task_id}
+ )
+ versions = [dict(row) for row in result.mappings().all()]
+ return {"items": versions}
+
+
+@router.get("/{task_id}/versions/{version}")
+async def get_version(
+ task_id: int,
+ version: int,
+ user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """获取指定版本的脚本内容"""
+ result = db.execute(
+ text("SELECT * FROM platform_script_versions WHERE task_id = :task_id AND version = :version"),
+ {"task_id": task_id, "version": version}
+ )
+ ver = result.mappings().first()
+ if not ver:
+ raise HTTPException(status_code=404, detail="版本不存在")
+ return dict(ver)
+
+
+@router.post("/{task_id}/versions/{version}/rollback")
+async def rollback_version(
+ task_id: int,
+ version: int,
+ user: User = Depends(require_operator),
+ db: Session = Depends(get_db)
+):
+ """回滚到指定版本"""
+ # 获取指定版本的脚本内容
+ result = db.execute(
+ text("SELECT script_content FROM platform_script_versions WHERE task_id = :task_id AND version = :version"),
+ {"task_id": task_id, "version": version}
+ )
+ ver = result.first()
+ if not ver:
+ raise HTTPException(status_code=404, detail="版本不存在")
+
+ script_content = ver[0]
+
+ # 更新任务脚本
+ db.execute(
+ text("UPDATE platform_scheduled_tasks SET script_content = :content WHERE id = :id"),
+ {"id": task_id, "content": script_content}
+ )
+
+ # 创建新版本记录
+ version_result = db.execute(
+ text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"),
+ {"task_id": task_id}
+ )
+ max_version = version_result.scalar() or 0
+ new_version = max_version + 1
+
+ db.execute(
+ text("""
+ INSERT INTO platform_script_versions (task_id, version, script_content, change_note, created_by)
+ VALUES (:task_id, :version, :content, :note, :created_by)
+ """),
+ {
+ "task_id": task_id,
+ "version": new_version,
+ "content": script_content,
+ "note": f"回滚到版本 {version}",
+ "created_by": user.username if hasattr(user, 'username') else None
+ }
+ )
+ db.commit()
+
+ # 重新加载任务
+ reload_task(task_id)
+
+ return {"message": f"已回滚到版本 {version},当前版本号 {new_version}"}
+
+
+# ============ 统计数据 ============
+
+@router.get("/{task_id}/stats")
+async def get_task_stats(
+ task_id: int,
+ user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
+):
+ """获取任务执行统计"""
+ # 总执行次数
+ total_result = db.execute(
+ text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"),
+ {"task_id": task_id}
+ )
+ total = total_result.scalar() or 0
+
+ # 成功次数
+ success_result = db.execute(
+ text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id AND status = 'success'"),
+ {"task_id": task_id}
+ )
+ success = success_result.scalar() or 0
+
+ # 失败次数
+ failed = total - success
+
+ # 成功率
+ success_rate = round(success / total * 100, 1) if total > 0 else 0
+
+ # 平均耗时(成功的任务)
+ avg_result = db.execute(
+ text("""
+ SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, finished_at))
+ FROM platform_task_logs
+ WHERE task_id = :task_id AND status = 'success' AND finished_at IS NOT NULL
+ """),
+ {"task_id": task_id}
+ )
+ avg_duration = avg_result.scalar()
+ avg_duration = round(float(avg_duration), 1) if avg_duration else 0
+
+ # 最近7天趋势
+ trend_result = db.execute(
+ text("""
+ SELECT DATE(started_at) as date,
+ SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
+ FROM platform_task_logs
+ WHERE task_id = :task_id AND started_at >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
+ GROUP BY DATE(started_at)
+ ORDER BY date
+ """),
+ {"task_id": task_id}
+ )
+ trend = [dict(row) for row in trend_result.mappings().all()]
+
+ return {
+ "total": total,
+ "success": success,
+ "failed": failed,
+ "success_rate": success_rate,
+ "avg_duration": avg_duration,
+ "trend": trend
+ }
diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py
index f03c059..91da8c5 100644
--- a/backend/app/services/scheduler.py
+++ b/backend/app/services/scheduler.py
@@ -31,10 +31,81 @@ def get_db_session() -> Session:
return SessionLocal()
+async def send_alert(webhook: str, task_name: str, error_message: str):
+ """发送失败告警通知"""
+ try:
+ # 自动判断钉钉或企微
+ if "dingtalk" in webhook or "oapi.dingtalk.com" in webhook:
+ data = {
+ "msgtype": "markdown",
+ "markdown": {
+ "title": "定时任务执行失败",
+ "text": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+ }
+ }
+ else:
+ # 企微格式
+ data = {
+ "msgtype": "markdown",
+ "markdown": {
+ "content": f"### ⚠️ 定时任务执行失败\n\n**任务名称**:{task_name}\n\n**错误信息**:{error_message[:500]}\n\n**时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+ }
+ }
+
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ await client.post(webhook, json=data)
+ logger.info(f"Alert sent for task {task_name}")
+ except Exception as e:
+ logger.warning(f"Failed to send alert: {e}")
+
+
+async def execute_task_with_retry(task_id: int, retry_count: int = 0, max_retries: int = 0, retry_interval: int = 60):
+ """带重试的任务执行"""
+ success = await execute_task_once(task_id)
+
+ if not success and retry_count < max_retries:
+ logger.info(f"Task {task_id} failed, scheduling retry {retry_count + 1}/{max_retries} in {retry_interval}s")
+ await asyncio.sleep(retry_interval)
+ await execute_task_with_retry(task_id, retry_count + 1, max_retries, retry_interval)
+ elif not success:
+ # 所有重试都失败,发送告警
+ db = get_db_session()
+ try:
+ result = db.execute(
+ text("SELECT task_name, alert_on_failure, alert_webhook, last_run_message FROM platform_scheduled_tasks WHERE id = :id"),
+ {"id": task_id}
+ )
+ task = result.mappings().first()
+ if task and task["alert_on_failure"] and task["alert_webhook"]:
+ await send_alert(task["alert_webhook"], task["task_name"], task["last_run_message"] or "未知错误")
+ finally:
+ db.close()
+
+
async def execute_task(task_id: int):
- """执行定时任务"""
+ """执行定时任务入口(处理重试配置)"""
+ db = get_db_session()
+ try:
+ result = db.execute(
+ text("SELECT retry_count, retry_interval FROM platform_scheduled_tasks WHERE id = :id"),
+ {"id": task_id}
+ )
+ task = result.mappings().first()
+ if task:
+ max_retries = task.get("retry_count", 0) or 0
+ retry_interval = task.get("retry_interval", 60) or 60
+ await execute_task_with_retry(task_id, 0, max_retries, retry_interval)
+ else:
+ await execute_task_once(task_id)
+ finally:
+ db.close()
+
+
+async def execute_task_once(task_id: int) -> bool:
+ """执行一次定时任务,返回是否成功"""
db = get_db_session()
log_id = None
+ success = False
try:
# 1. 查询任务配置
@@ -46,7 +117,7 @@ async def execute_task(task_id: int):
if not task:
logger.warning(f"Task {task_id} not found or disabled")
- return
+ return True # 不需要重试
# 2. 更新任务状态为运行中
db.execute(
@@ -161,9 +232,11 @@ async def execute_task(task_id: int):
db.commit()
logger.info(f"Task {task_id} executed with status: {status}")
+ success = (status == "success")
except Exception as e:
logger.error(f"Task {task_id} execution error: {str(e)}")
+ success = False
# 更新失败状态
try:
@@ -190,6 +263,8 @@ async def execute_task(task_id: int):
pass
finally:
db.close()
+
+ return success
def add_task_to_scheduler(task: Dict[str, Any]):
diff --git a/backend/app/services/script_sdk.py b/backend/app/services/script_sdk.py
index 7e63886..b3e09bb 100644
--- a/backend/app/services/script_sdk.py
+++ b/backend/app/services/script_sdk.py
@@ -24,6 +24,8 @@ class ScriptSDK:
- HTTP 请求
- 变量存储(跨执行持久化)
- 日志记录
+ - 多租户遍历
+ - 密钥管理
"""
def __init__(self, tenant_id: str, task_id: int, trace_id: str = None):
@@ -32,6 +34,7 @@ class ScriptSDK:
self.trace_id = trace_id or f"script_{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
self._logs: List[str] = []
self._db: Optional[Session] = None
+ self._tenants_cache: Optional[List[Dict]] = None # 租户列表缓存
def _get_db(self) -> Session:
"""获取数据库会话"""
@@ -393,6 +396,171 @@ class ScriptSDK:
self.log(f"删除变量失败: {str(e)}", level="ERROR")
return False
+ # ============ 多租户遍历 ============
+
+ def get_tenants(self, app_code: str = None) -> List[Dict]:
+ """
+ 获取租户列表(用于多租户任务遍历)
+
+ Args:
+ app_code: 应用代码(可选),筛选订阅了该应用的租户
+
+ Returns:
+ 租户列表 [{"code": "xxx", "name": "租户名", "custom_configs": {...}}]
+ """
+ db = self._get_db()
+ try:
+ if app_code:
+ # 筛选订阅了指定应用的租户
+ result = db.execute(
+ text("""
+ SELECT DISTINCT t.code, t.name, ta.custom_configs
+ FROM platform_tenants t
+ INNER JOIN platform_tenant_apps ta ON t.code = ta.tenant_id
+ WHERE ta.app_code = :app_code AND t.status = 1
+ """),
+ {"app_code": app_code}
+ )
+ else:
+ # 获取所有启用的租户
+ result = db.execute(
+ text("SELECT code, name FROM platform_tenants WHERE status = 1")
+ )
+
+ tenants = []
+ for row in result.mappings().all():
+ tenant = dict(row)
+ # 解析 custom_configs
+ if "custom_configs" in tenant and tenant["custom_configs"]:
+ try:
+ tenant["custom_configs"] = json.loads(tenant["custom_configs"])
+ except:
+ pass
+ tenants.append(tenant)
+
+ self.log(f"获取租户列表成功,共 {len(tenants)} 个租户")
+ return tenants
+ except Exception as e:
+ self.log(f"获取租户列表失败: {str(e)}", level="ERROR")
+ return []
+
+ def get_tenant_config(self, tenant_id: str, app_code: str, key: str = None) -> Any:
+ """
+ 获取指定租户的应用配置
+
+ Args:
+ tenant_id: 租户ID
+ app_code: 应用代码
+ key: 配置项键名(可选,不传返回全部配置)
+
+ Returns:
+ 配置值或配置字典
+ """
+ db = self._get_db()
+ try:
+ result = db.execute(
+ text("""
+ SELECT custom_configs FROM platform_tenant_apps
+ WHERE tenant_id = :tenant_id AND app_code = :app_code
+ """),
+ {"tenant_id": tenant_id, "app_code": app_code}
+ )
+ row = result.first()
+
+ if not row or not row[0]:
+ return None if key else {}
+
+ try:
+ configs = json.loads(row[0])
+ except:
+ configs = {}
+
+ if key:
+ return configs.get(key)
+ return configs
+ except Exception as e:
+ self.log(f"获取租户配置失败: {str(e)}", level="ERROR")
+ return None if key else {}
+
+ def get_all_tenant_configs(self, app_code: str) -> List[Dict]:
+ """
+ 获取所有租户的应用配置(便捷方法,用于批量操作)
+
+ Args:
+ app_code: 应用代码
+
+ Returns:
+ [{"tenant_id": "xxx", "tenant_name": "租户名", "configs": {...}}]
+ """
+ db = self._get_db()
+ try:
+ result = db.execute(
+ text("""
+ SELECT t.code as tenant_id, t.name as tenant_name, ta.custom_configs
+ FROM platform_tenants t
+ INNER JOIN platform_tenant_apps ta ON t.code = ta.tenant_id
+ WHERE ta.app_code = :app_code AND t.status = 1
+ """),
+ {"app_code": app_code}
+ )
+
+ tenants = []
+ for row in result.mappings().all():
+ configs = {}
+ if row["custom_configs"]:
+ try:
+ configs = json.loads(row["custom_configs"])
+ except:
+ pass
+ tenants.append({
+ "tenant_id": row["tenant_id"],
+ "tenant_name": row["tenant_name"],
+ "configs": configs
+ })
+
+ self.log(f"获取 {app_code} 应用的租户配置,共 {len(tenants)} 个")
+ return tenants
+ except Exception as e:
+ self.log(f"获取租户配置失败: {str(e)}", level="ERROR")
+ return []
+
+ # ============ 密钥管理 ============
+
+ def get_secret(self, key: str) -> Optional[str]:
+ """
+ 获取密钥(优先读取租户级密钥,其次读取全局密钥)
+
+ Args:
+ key: 密钥名称
+
+ Returns:
+ 密钥值(如不存在返回 None)
+ """
+ db = self._get_db()
+ try:
+ # 优先查询租户级密钥
+ result = db.execute(
+ text("""
+ SELECT secret_value FROM platform_secrets
+ WHERE (tenant_id = :tenant_id OR tenant_id IS NULL)
+ AND secret_key = :key
+ ORDER BY tenant_id DESC
+ LIMIT 1
+ """),
+ {"tenant_id": self.tenant_id, "key": key}
+ )
+ row = result.first()
+
+ if row:
+ self.log(f"获取密钥成功: {key}")
+ return row[0]
+
+ self.log(f"密钥不存在: {key}", level="WARN")
+ return None
+ except Exception as e:
+ self.log(f"获取密钥失败: {str(e)}", level="ERROR")
+ return None
+
# ============ 日志 ============
def log(self, message: str, level: str = "INFO"):
diff --git a/frontend/src/views/scheduled-tasks/index.vue b/frontend/src/views/scheduled-tasks/index.vue
index d1dfc0a..a9efda9 100644
--- a/frontend/src/views/scheduled-tasks/index.vue
+++ b/frontend/src/views/scheduled-tasks/index.vue
@@ -36,9 +36,30 @@ const form = reactive({
webhook_url: '',
input_params: '',
script_content: '',
- is_enabled: true
+ is_enabled: true,
+ // 重试和告警
+ retry_count: 0,
+ retry_interval: 60,
+ alert_on_failure: false,
+ alert_webhook: ''
})
+// 模板
+const templateList = ref([])
+const templateDialogVisible = ref(false)
+
+// 版本
+const versionsDialogVisible = ref(false)
+const versionsLoading = ref(false)
+const versionsList = ref([])
+const currentVersionTaskId = ref(null)
+
+// 统计
+const statsDialogVisible = ref(false)
+const statsLoading = ref(false)
+const statsData = ref(null)
+const currentStatsTaskId = ref(null)
+
// 时间选择器
const newTimePoint = ref('')
@@ -149,7 +170,11 @@ function handleCreate() {
webhook_url: '',
input_params: '',
script_content: '',
- is_enabled: true
+ is_enabled: true,
+ retry_count: 0,
+ retry_interval: 60,
+ alert_on_failure: false,
+ alert_webhook: ''
})
newTimePoint.value = ''
testResult.value = null
@@ -170,7 +195,11 @@ function handleEdit(row) {
webhook_url: row.webhook_url || '',
input_params: row.input_params ? JSON.stringify(row.input_params, null, 2) : '',
script_content: row.script_content || '',
- is_enabled: row.is_enabled
+ is_enabled: row.is_enabled,
+ retry_count: row.retry_count || 0,
+ retry_interval: row.retry_interval || 60,
+ alert_on_failure: !!row.alert_on_failure,
+ alert_webhook: row.alert_webhook || ''
})
newTimePoint.value = ''
testResult.value = null
@@ -237,7 +266,11 @@ async function handleSubmit() {
webhook_url: form.execution_type === 'webhook' ? form.webhook_url : null,
script_content: form.execution_type === 'script' ? form.script_content : null,
input_params: inputParams,
- is_enabled: form.is_enabled
+ is_enabled: form.is_enabled,
+ retry_count: form.retry_count,
+ retry_interval: form.retry_interval,
+ alert_on_failure: form.alert_on_failure,
+ alert_webhook: form.alert_webhook || null
}
try {
@@ -379,6 +412,86 @@ function selectTenant(code) {
handleSearch()
}
+// ============ 模板功能 ============
+async function fetchTemplates() {
+ try {
+ const res = await api.get('/api/scheduled-tasks/templates')
+ templateList.value = res.data.items || []
+ } catch (e) {
+ console.error('获取模板列表失败:', e)
+ }
+}
+
+function handleSelectTemplate() {
+ fetchTemplates()
+ templateDialogVisible.value = true
+}
+
+function applyTemplate(template) {
+ form.script_content = template.script_content
+ templateDialogVisible.value = false
+ ElMessage.success(`已应用模板:${template.name}`)
+}
+
+// ============ 版本功能 ============
+async function handleViewVersions(row) {
+ currentVersionTaskId.value = row.id
+ versionsLoading.value = true
+ versionsDialogVisible.value = true
+
+ try {
+ const res = await api.get(`/api/scheduled-tasks/${row.id}/versions`)
+ versionsList.value = res.data.items || []
+ } catch (e) {
+ console.error(e)
+ } finally {
+ versionsLoading.value = false
+ }
+}
+
+async function handleRollback(version) {
+ await ElMessageBox.confirm(`确定回滚到版本 ${version} 吗?`, '版本回滚', {
+ type: 'warning'
+ })
+
+ try {
+ const res = await api.post(`/api/scheduled-tasks/${currentVersionTaskId.value}/versions/${version}/rollback`)
+ ElMessage.success(res.data.message)
+ versionsDialogVisible.value = false
+ fetchList()
+ } catch (e) {
+ // 错误已在拦截器处理
+ }
+}
+
+async function handleViewVersionContent(version) {
+ try {
+ const res = await api.get(`/api/scheduled-tasks/${currentVersionTaskId.value}/versions/${version}`)
+ ElMessageBox.alert(res.data.script_content, `版本 ${version} 脚本内容`, {
+ customClass: 'version-content-dialog',
+ dangerouslyUseHTMLString: false
+ })
+ } catch (e) {
+ // 错误已在拦截器处理
+ }
+}
+
+// ============ 统计功能 ============
+async function handleViewStats(row) {
+ currentStatsTaskId.value = row.id
+ statsLoading.value = true
+ statsDialogVisible.value = true
+
+ try {
+ const res = await api.get(`/api/scheduled-tasks/${row.id}/stats`)
+ statsData.value = res.data
+ } catch (e) {
+ console.error(e)
+ } finally {
+ statsLoading.value = false
+ }
+}
+
onMounted(() => {
fetchTenants()
fetchList()
@@ -485,9 +598,20 @@ onMounted(() => {
{{ row.is_enabled ? '禁用' : '启用' }}
-
日志
+
+ 统计
+
+
+ 版本
+
删除
@@ -613,9 +737,14 @@ onMounted(() => {
@@ -897,4 +1158,142 @@ log('执行完成')"
line-height: 1.5;
overflow-x: auto;
}
+
+/* 重试配置 */
+.retry-config {
+ display: flex;
+ align-items: center;
+ gap: 8px;
+ color: #606266;
+ font-size: 14px;
+}
+
+/* 模板列表 */
+.template-list {
+ max-height: 400px;
+ overflow-y: auto;
+}
+
+.template-item {
+ padding: 12px 16px;
+ border: 1px solid #e4e7ed;
+ border-radius: 8px;
+ margin-bottom: 8px;
+ cursor: pointer;
+ transition: all 0.2s;
+}
+
+.template-item:hover {
+ border-color: #409eff;
+ background: #f5f7fa;
+}
+
+.template-name {
+ font-weight: 600;
+ color: #303133;
+ margin-bottom: 4px;
+}
+
+.template-desc {
+ font-size: 12px;
+ color: #909399;
+ margin-bottom: 8px;
+}
+
+.empty-tip {
+ text-align: center;
+ color: #909399;
+ padding: 40px 0;
+}
+
+/* 统计 */
+.stats-grid {
+ display: grid;
+ grid-template-columns: repeat(5, 1fr);
+ gap: 16px;
+ margin-bottom: 24px;
+}
+
+.stats-item {
+ text-align: center;
+ padding: 16px;
+ background: #f5f7fa;
+ border-radius: 8px;
+}
+
+.stats-item.success {
+ background: #f0f9eb;
+}
+
+.stats-item.danger {
+ background: #fef0f0;
+}
+
+.stats-value {
+ font-size: 24px;
+ font-weight: 600;
+ color: #303133;
+ margin-bottom: 4px;
+}
+
+.stats-item.success .stats-value {
+ color: #67c23a;
+}
+
+.stats-item.danger .stats-value {
+ color: #f56c6c;
+}
+
+.stats-label {
+ font-size: 12px;
+ color: #909399;
+}
+
+.stats-trend h4 {
+ margin: 0 0 12px;
+ color: #303133;
+}
+
+.trend-chart {
+ display: flex;
+ gap: 8px;
+ align-items: flex-end;
+ height: 100px;
+}
+
+.trend-day {
+ flex: 1;
+ display: flex;
+ flex-direction: column;
+ align-items: center;
+}
+
+.trend-bars {
+ display: flex;
+ flex-direction: column;
+ width: 100%;
+ height: 80px;
+ background: #f5f7fa;
+ border-radius: 4px;
+ overflow: hidden;
+}
+
+.trend-bar {
+ width: 100%;
+ transition: height 0.3s;
+}
+
+.trend-bar.success {
+ background: #67c23a;
+}
+
+.trend-bar.danger {
+ background: #f56c6c;
+}
+
+.trend-date {
+ font-size: 10px;
+ color: #909399;
+ margin-top: 4px;
+}