Files
000-platform/backend/app/routers/tasks.py
Admin 9b72e6127f
Some checks failed
continuous-integration/drone/push Build is failing
feat: 脚本执行平台增强功能
- 新增重试和失败告警功能(支持自动重试N次,失败后钉钉/企微通知)
- 新增密钥管理(安全存储API Key等敏感信息)
- 新增脚本模板库(预置常用脚本模板)
- 新增脚本版本管理(自动保存历史版本,支持回滚)
- 新增执行统计(成功率、平均耗时、7日趋势)
- SDK 新增多租户遍历能力(get_tenants/get_tenant_config/get_all_tenant_configs)
- SDK 新增密钥读取方法(get_secret)
2026-01-28 11:59:50 +08:00

947 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""定时任务管理路由"""
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..database import get_db
from .auth import get_current_user, require_operator
from ..models.user import User
from ..services.scheduler import (
add_task_to_scheduler,
remove_task_from_scheduler,
reload_task,
execute_task
)
router = APIRouter(prefix="/scheduled-tasks", tags=["定时任务"])
# Schemas
class TaskCreate(BaseModel):
    """Request body for creating a scheduled task."""

    # Owning tenant and descriptive fields.
    tenant_id: str
    task_name: str
    task_desc: Optional[str] = None

    # Scheduling: "simple" mode relies on time_points, "cron" mode on
    # cron_expression (enforced by the create endpoint).
    schedule_type: str = "simple"  # simple | cron
    time_points: Optional[List[str]] = None  # e.g. ["09:00", "14:00"]
    cron_expression: Optional[str] = None  # e.g. "0 9,14 * * *"

    # Execution: "webhook" mode needs webhook_url, "script" mode needs
    # script_content (enforced by the create endpoint).
    execution_type: str = "webhook"  # webhook | script
    webhook_url: Optional[str] = None
    input_params: Optional[dict] = None
    script_content: Optional[str] = None

    is_enabled: bool = True

    # Retry and failure alerting.
    # NOTE(review): retry_interval is presumably in seconds -- confirm
    # against the scheduler service.
    retry_count: int = 0
    retry_interval: int = 60
    alert_on_failure: bool = False
    alert_webhook: Optional[str] = None
class TaskUpdate(BaseModel):
    """Request body for a partial task update.

    Every field is optional; a field left as ``None`` is not written by the
    update endpoint.
    """

    # Descriptive fields.
    task_name: Optional[str] = None
    task_desc: Optional[str] = None

    # Scheduling.
    schedule_type: Optional[str] = None
    time_points: Optional[List[str]] = None
    cron_expression: Optional[str] = None

    # Execution.
    execution_type: Optional[str] = None
    webhook_url: Optional[str] = None
    input_params: Optional[dict] = None
    script_content: Optional[str] = None

    # Retry and failure alerting.
    retry_count: Optional[int] = None
    retry_interval: Optional[int] = None
    alert_on_failure: Optional[bool] = None
    alert_webhook: Optional[str] = None
class ScriptTestRequest(BaseModel):
    """Request body for a one-off script test run."""

    tenant_id: str  # tenant passed to the script executor
    script_content: str  # source of the script to execute
# 密钥管理
class SecretCreate(BaseModel):
    """Request body for creating a secret."""

    tenant_id: Optional[str] = None  # None means a global secret
    secret_key: str
    secret_value: str
    description: Optional[str] = None
class SecretUpdate(BaseModel):
    """Request body for a partial secret update; ``None`` fields are skipped."""

    secret_value: Optional[str] = None
    description: Optional[str] = None
# 脚本模板
class TemplateCreate(BaseModel):
    """Request body for creating a script template."""

    name: str
    description: Optional[str] = None
    category: Optional[str] = None
    script_content: str
    is_public: bool = True  # stored as 1/0 by the create endpoint
class TemplateUpdate(BaseModel):
    """Request body for a partial template update; ``None`` fields are skipped."""

    name: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
    script_content: Optional[str] = None
    is_public: Optional[bool] = None
# API Endpoints
@router.get("")
async def list_tasks(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    tenant_id: Optional[str] = None,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return one page of scheduled tasks, optionally filtered by tenant.

    Args:
        page: 1-based page number.
        size: Page size (1-100).
        tenant_id: When given, only tasks of this tenant are returned.
        user: Authenticated user (authentication only).
        db: Database session.

    Returns:
        A dict with ``total``, ``page``, ``size`` and ``items``; each item is
        a task row plus the joined ``tenant_name``.
    """
    where_clauses = []
    filter_params = {}
    if tenant_id:
        # Qualify the column with the task alias: the list query joins
        # platform_tenants, so a bare "tenant_id" could be ambiguous there.
        where_clauses.append("t.tenant_id = :tenant_id")
        filter_params["tenant_id"] = tenant_id
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    # Total number of matching tasks (same alias so the filter is shared).
    total = db.execute(
        text(f"SELECT COUNT(*) FROM platform_scheduled_tasks t WHERE {where_sql}"),
        filter_params
    ).scalar() or 0

    # Requested page; a separate dict keeps the count parameters untouched.
    page_params = {
        **filter_params,
        "limit": size,
        "offset": (page - 1) * size,
    }
    result = db.execute(
        text(f"""
            SELECT t.*, tn.name as tenant_name
            FROM platform_scheduled_tasks t
            LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code
            WHERE {where_sql}
            ORDER BY t.id DESC
            LIMIT :limit OFFSET :offset
        """),
        page_params
    )
    tasks = [dict(row) for row in result.mappings().all()]

    return {
        "total": total,
        "page": page,
        "size": size,
        "items": tasks
    }
@router.post("")
async def create_task(
    data: TaskCreate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Create a scheduled task and register it with the scheduler if enabled.

    Args:
        data: Task definition.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with the new task ``id`` and a confirmation ``message``.

    Raises:
        HTTPException: 400 when the schedule or execution configuration is
            incomplete for the selected mode.
    """
    import json

    # Validate the schedule configuration.
    if data.schedule_type == "simple":
        if not data.time_points:
            raise HTTPException(status_code=400, detail="简单模式需要至少一个时间点")
    elif data.schedule_type == "cron":
        if not data.cron_expression:
            raise HTTPException(status_code=400, detail="CRON模式需要提供表达式")

    # Validate the execution configuration.
    if data.execution_type == "webhook":
        if not data.webhook_url:
            raise HTTPException(status_code=400, detail="Webhook模式需要提供URL")
    elif data.execution_type == "script":
        if not data.script_content:
            raise HTTPException(status_code=400, detail="脚本模式需要提供脚本内容")

    time_points_json = json.dumps(data.time_points) if data.time_points else None
    input_params_json = json.dumps(data.input_params) if data.input_params else None

    insert_result = db.execute(
        text("""
            INSERT INTO platform_scheduled_tasks
            (tenant_id, task_name, task_desc, schedule_type, time_points,
             cron_expression, execution_type, webhook_url, input_params, script_content, is_enabled,
             retry_count, retry_interval, alert_on_failure, alert_webhook)
            VALUES (:tenant_id, :task_name, :task_desc, :schedule_type, :time_points,
             :cron_expression, :execution_type, :webhook_url, :input_params, :script_content, :is_enabled,
             :retry_count, :retry_interval, :alert_on_failure, :alert_webhook)
        """),
        {
            "tenant_id": data.tenant_id,
            "task_name": data.task_name,
            "task_desc": data.task_desc,
            "schedule_type": data.schedule_type,
            "time_points": time_points_json,
            "cron_expression": data.cron_expression,
            "execution_type": data.execution_type,
            "webhook_url": data.webhook_url,
            "input_params": input_params_json,
            "script_content": data.script_content,
            "is_enabled": 1 if data.is_enabled else 0,
            "retry_count": data.retry_count,
            "retry_interval": data.retry_interval,
            "alert_on_failure": 1 if data.alert_on_failure else 0,
            "alert_webhook": data.alert_webhook
        }
    )
    # Read the generated key from the INSERT's own cursor, before commit.
    # A separate "SELECT LAST_INSERT_ID()" issued after commit may run on a
    # different pooled connection and return 0 or another session's id.
    task_id = insert_result.lastrowid
    db.commit()

    # Register with the scheduler only when the task is enabled.
    if data.is_enabled:
        reload_task(task_id)

    return {"id": task_id, "message": "创建成功"}
# The ":int" convertor restricts this route to numeric path segments.
# Without it, this route (registered first) also captures GET requests for the
# static single-segment paths declared later in this router ("/sdk-docs",
# "/secrets", "/templates"), which then fail integer validation with a 422
# instead of reaching their own handlers.
@router.get("/{task_id:int}")
async def get_task(
    task_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return a single task together with its tenant name.

    Args:
        task_id: Primary key of the task.
        user: Authenticated user (authentication only).
        db: Database session.

    Returns:
        The task row as a dict, including the joined ``tenant_name``.

    Raises:
        HTTPException: 404 when no task has this id.
    """
    result = db.execute(
        text("""
            SELECT t.*, tn.name as tenant_name
            FROM platform_scheduled_tasks t
            LEFT JOIN platform_tenants tn ON t.tenant_id = tn.code
            WHERE t.id = :id
        """),
        {"id": task_id}
    )
    task = result.mappings().first()
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")
    return dict(task)
@router.put("/{task_id}")
async def update_task(
    task_id: int,
    data: TaskUpdate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Apply a partial update to a task and reload it in the scheduler.

    Fields left as ``None`` in ``data`` are not written. When non-blank script
    content is supplied, a new row is also added to the script version
    history. The scheduler reload happens even if no column was changed.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    import json

    # Make sure the task exists before touching anything.
    existing = db.execute(
        text("SELECT * FROM platform_scheduled_tasks WHERE id = :id"),
        {"id": task_id}
    ).mappings().first()
    if not existing:
        raise HTTPException(status_code=404, detail="任务不存在")

    def _keep(value):
        return value

    def _as_flag(value):
        return 1 if value else 0

    # (column, encoder) pairs, in the order the SET clauses are emitted.
    column_encoders = (
        ("task_name", _keep),
        ("task_desc", _keep),
        ("schedule_type", _keep),
        ("time_points", json.dumps),
        ("cron_expression", _keep),
        ("execution_type", _keep),
        ("webhook_url", _keep),
        ("script_content", _keep),
        ("input_params", json.dumps),
        ("retry_count", _keep),
        ("retry_interval", _keep),
        ("alert_on_failure", _as_flag),
        ("alert_webhook", _keep),
    )

    assignments = []
    params = {"id": task_id}
    for column, encode in column_encoders:
        supplied = getattr(data, column)
        if supplied is None:
            continue
        assignments.append(f"{column} = :{column}")
        params[column] = encode(supplied)

    # Supplying non-blank script content records a new version automatically.
    if data.script_content is not None and data.script_content.strip():
        latest = db.execute(
            text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"),
            {"task_id": task_id}
        ).scalar() or 0
        db.execute(
            text(
                "\n"
                "INSERT INTO platform_script_versions (task_id, version, script_content, created_by)\n"
                "VALUES (:task_id, :version, :script_content, :created_by)\n"
            ),
            {
                "task_id": task_id,
                "version": latest + 1,
                "script_content": data.script_content,
                "created_by": getattr(user, 'username', None)
            }
        )

    if assignments:
        set_clause = ', '.join(assignments)
        db.execute(
            text(f"UPDATE platform_scheduled_tasks SET {set_clause} WHERE id = :id"),
            params
        )
        db.commit()

    # Refresh the scheduler's copy of the task.
    reload_task(task_id)
    return {"message": "更新成功"}
@router.delete("/{task_id}")
async def delete_task(
    task_id: int,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Delete a task together with its execution logs and script versions.

    Args:
        task_id: Primary key of the task.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    # Make sure the task exists.
    result = db.execute(
        text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"),
        {"id": task_id}
    )
    if not result.scalar():
        raise HTTPException(status_code=404, detail="任务不存在")

    # Stop scheduling the task before its rows disappear.
    remove_task_from_scheduler(task_id)

    # Remove dependent rows first. Script versions are written by the update
    # and rollback endpoints and would otherwise be left orphaned.
    # NOTE(review): harmless if the schema already cascades -- confirm.
    for dependent_table in ("platform_task_logs", "platform_script_versions"):
        db.execute(
            text(f"DELETE FROM {dependent_table} WHERE task_id = :id"),
            {"id": task_id}
        )

    # Remove the task itself.
    db.execute(
        text("DELETE FROM platform_scheduled_tasks WHERE id = :id"),
        {"id": task_id}
    )
    db.commit()
    return {"message": "删除成功"}
@router.post("/{task_id}/toggle")
async def toggle_task(
    task_id: int,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Flip a task's enabled flag and reload it in the scheduler.

    Returns:
        A dict with the new ``is_enabled`` state and a ``message``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    # Read the current flag.
    row = db.execute(
        text("SELECT is_enabled FROM platform_scheduled_tasks WHERE id = :id"),
        {"id": task_id}
    ).first()
    if not row:
        raise HTTPException(status_code=404, detail="任务不存在")

    # Invert it (stored as 0/1).
    if row[0]:
        new_enabled = 0
    else:
        new_enabled = 1

    db.execute(
        text("UPDATE platform_scheduled_tasks SET is_enabled = :enabled WHERE id = :id"),
        {"id": task_id, "enabled": new_enabled}
    )
    db.commit()

    # Let the scheduler pick up the new state.
    reload_task(task_id)

    message = "已启用" if new_enabled else "已禁用"
    return {
        "is_enabled": bool(new_enabled),
        "message": message
    }
@router.post("/{task_id}/run")
async def run_task_now(
    task_id: int,
    background_tasks: BackgroundTasks,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Trigger an immediate, out-of-schedule run of a task.

    The run is queued as a background task and starts after the response has
    been sent.

    Args:
        task_id: Primary key of the task.
        background_tasks: FastAPI background task queue for this request.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    # Make sure the task exists.
    result = db.execute(
        text("SELECT id FROM platform_scheduled_tasks WHERE id = :id"),
        {"id": task_id}
    )
    if not result.scalar():
        raise HTTPException(status_code=404, detail="任务不存在")

    # Hand the callable itself to the background queue. Passing
    # asyncio.create_task as the callable makes the framework run it in a
    # worker thread (it is a plain function), where there is no running event
    # loop: it raises RuntimeError and the coroutine is never awaited.
    background_tasks.add_task(execute_task, task_id)
    return {"message": "任务已触发执行"}
@router.get("/{task_id}/logs")
async def get_task_logs(
    task_id: int,
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return one page of execution logs for a task, newest first.

    Returns:
        A dict with ``total``, ``page``, ``size`` and ``items``.
    """
    # Total number of log rows for this task.
    total = db.execute(
        text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"),
        {"task_id": task_id}
    ).scalar()

    # Requested page of log rows.
    page_query = text(
        "\n"
        "SELECT * FROM platform_task_logs\n"
        "WHERE task_id = :task_id\n"
        "ORDER BY id DESC\n"
        "LIMIT :limit OFFSET :offset\n"
    )
    page_params = {"task_id": task_id, "limit": size, "offset": (page - 1) * size}
    rows = db.execute(page_query, page_params).mappings().all()

    return {
        "total": total,
        "page": page,
        "size": size,
        "items": [dict(row) for row in rows]
    }
@router.post("/test-script")
async def test_script(
    data: ScriptTestRequest,
    user: User = Depends(require_operator)
):
    """Run a script once through the script executor's test entry point.

    Returns:
        The executor result converted with ``to_dict()``.

    Raises:
        HTTPException: 400 when the script content is empty or blank.
    """
    from ..services.script_executor import test_script as run_test

    # Reject empty or whitespace-only scripts up front.
    if not (data.script_content and data.script_content.strip()):
        raise HTTPException(status_code=400, detail="脚本内容不能为空")

    outcome = await run_test(
        tenant_id=data.tenant_id,
        script_content=data.script_content
    )
    return outcome.to_dict()
@router.get("/sdk-docs")
async def get_sdk_docs():
    """Return the static reference for the script SDK.

    Returns:
        A dict with a ``description``, a ``methods`` list (each entry has
        ``name``, ``description`` and ``example``) and an ``example_script``.
    """
    # (signature, description, usage example) for every documented SDK call.
    sdk_methods = (
        (
            "ai(prompt, system=None, model='gemini-2.5-flash')",
            "调用大模型生成内容",
            "result = ai('帮我写一段营销文案')",
        ),
        (
            "dingtalk(webhook, content, msg_type='text', at_mobiles=None)",
            "发送钉钉群消息",
            "dingtalk('https://oapi.dingtalk.com/robot/send?access_token=xxx', '消息内容')",
        ),
        (
            "wecom(webhook, content, msg_type='text')",
            "发送企业微信群消息",
            "wecom('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx', '消息内容')",
        ),
        (
            "db(sql, params=None)",
            "执行 SQL 查询(仅支持 SELECT",
            "rows = db('SELECT * FROM users WHERE id = :id', {'id': 1})",
        ),
        (
            "http_get(url, headers=None, params=None)",
            "发送 HTTP GET 请求",
            "data = http_get('https://api.example.com/data')",
        ),
        (
            "http_post(url, data=None, json_data=None, headers=None)",
            "发送 HTTP POST 请求",
            "data = http_post('https://api.example.com/submit', json_data={'key': 'value'})",
        ),
        (
            "get_var(key, default=None)",
            "获取存储的变量(跨执行持久化)",
            "count = get_var('run_count', 0)",
        ),
        (
            "set_var(key, value)",
            "存储变量(跨执行持久化)",
            "set_var('run_count', count + 1)",
        ),
        (
            "log(message, level='INFO')",
            "记录日志",
            "log('任务执行完成')",
        ),
        (
            "get_tenants(app_code=None)",
            "获取租户列表(多租户任务遍历)",
            "tenants = get_tenants('review-generator')",
        ),
        (
            "get_tenant_config(tenant_id, app_code, key=None)",
            "获取指定租户的应用配置",
            "webhook = get_tenant_config('tenant1', 'my-app', 'dingtalk_webhook')",
        ),
        (
            "get_all_tenant_configs(app_code)",
            "批量获取所有租户的应用配置",
            "all_configs = get_all_tenant_configs('my-app')",
        ),
        (
            "get_secret(key)",
            "获取密钥(优先租户级,其次全局)",
            "api_key = get_secret('openai_api_key')",
        ),
    )

    # Sample script shown to users; the text is returned verbatim.
    example_script = '''# 示例:多租户批量推送
import json
# 获取所有订阅了某应用的租户配置
tenants = get_all_tenant_configs('daily-report')
for tenant in tenants:
tenant_id = tenant['tenant_id']
tenant_name = tenant['tenant_name']
configs = tenant['configs']
# 获取该租户的钉钉 Webhook
webhook = configs.get('dingtalk_webhook')
if not webhook:
log(f"租户 {tenant_name} 未配置 webhook跳过")
continue
# 调用 AI 生成内容
content = ai(f"{tenant_name} 生成今日报告", system="你是报告生成专家")
# 发送
dingtalk(webhook=webhook, content=content)
log(f"已发送给租户: {tenant_name}")
'''

    methods = [
        {"name": signature, "description": summary, "example": usage}
        for signature, summary, usage in sdk_methods
    ]
    return {
        "description": "脚本执行 SDK 文档",
        "methods": methods,
        "example_script": example_script
    }
# ============ 密钥管理 ============
@router.get("/secrets")
async def list_secrets(
    tenant_id: Optional[str] = None,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """List secret metadata; the secret values are not selected.

    With ``tenant_id`` the result holds that tenant's secrets plus the global
    ones (rows whose tenant_id is NULL); without it, every secret is listed.
    """
    select_part = "SELECT id, tenant_id, secret_key, description, created_at FROM platform_secrets"
    order_part = " ORDER BY tenant_id, secret_key"

    if tenant_id:
        tenant_filter = " WHERE tenant_id = :tenant_id OR tenant_id IS NULL"
        rows = db.execute(
            text(select_part + tenant_filter + order_part),
            {"tenant_id": tenant_id}
        )
    else:
        rows = db.execute(text(select_part + order_part))

    return {"items": [dict(row) for row in rows.mappings().all()]}
@router.post("/secrets")
async def create_secret(
    data: SecretCreate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Create a secret for a tenant, or a global one when tenant_id is None.

    Raises:
        HTTPException: 400 when the same key already exists in that scope.
    """
    # "<=>" is the NULL-safe comparison, so global secrets (tenant_id NULL)
    # are also checked for duplicates.
    duplicate_id = db.execute(
        text("SELECT id FROM platform_secrets WHERE tenant_id <=> :tenant_id AND secret_key = :key"),
        {"tenant_id": data.tenant_id, "key": data.secret_key}
    ).scalar()
    if duplicate_id:
        raise HTTPException(status_code=400, detail="密钥已存在")

    # NOTE(review): the value is inserted exactly as received -- confirm
    # whether encryption at rest is handled elsewhere.
    insert_sql = text(
        "\n"
        "INSERT INTO platform_secrets (tenant_id, secret_key, secret_value, description)\n"
        "VALUES (:tenant_id, :key, :value, :desc)\n"
    )
    row_values = {
        "tenant_id": data.tenant_id,
        "key": data.secret_key,
        "value": data.secret_value,
        "desc": data.description
    }
    db.execute(insert_sql, row_values)
    db.commit()
    return {"message": "创建成功"}
@router.put("/secrets/{secret_id}")
async def update_secret(
    secret_id: int,
    data: SecretUpdate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Update a secret's value and/or description.

    Args:
        secret_id: Primary key of the secret.
        data: Fields to change; ``None`` fields are left untouched.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the secret does not exist.
    """
    # Report a missing secret instead of claiming success, matching the
    # behaviour of the task update endpoint.
    found = db.execute(
        text("SELECT id FROM platform_secrets WHERE id = :id"),
        {"id": secret_id}
    ).scalar()
    if not found:
        raise HTTPException(status_code=404, detail="密钥不存在")

    updates = []
    params = {"id": secret_id}
    if data.secret_value is not None:
        updates.append("secret_value = :value")
        params["value"] = data.secret_value
    if data.description is not None:
        updates.append("description = :desc")
        params["desc"] = data.description

    if updates:
        db.execute(
            text(f"UPDATE platform_secrets SET {', '.join(updates)} WHERE id = :id"),
            params
        )
        db.commit()
    return {"message": "更新成功"}
@router.delete("/secrets/{secret_id}")
async def delete_secret(
    secret_id: int,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Delete a secret.

    Args:
        secret_id: Primary key of the secret.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the secret does not exist.
    """
    # Report a missing secret instead of claiming success, matching the
    # behaviour of the task delete endpoint.
    found = db.execute(
        text("SELECT id FROM platform_secrets WHERE id = :id"),
        {"id": secret_id}
    ).scalar()
    if not found:
        raise HTTPException(status_code=404, detail="密钥不存在")

    db.execute(text("DELETE FROM platform_secrets WHERE id = :id"), {"id": secret_id})
    db.commit()
    return {"message": "删除成功"}
# ============ 脚本模板 ============
@router.get("/templates")
async def list_templates(
    category: Optional[str] = None,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List script templates (without script content), newest first.

    When ``category`` is given, only templates of that category are returned.
    """
    select_part = (
        "SELECT id, name, description, category, is_public, created_by, created_at "
        "FROM platform_script_templates"
    )
    order_part = " ORDER BY id DESC"

    if category:
        rows = db.execute(
            text(select_part + " WHERE category = :category" + order_part),
            {"category": category}
        )
    else:
        rows = db.execute(text(select_part + order_part))

    return {"items": [dict(row) for row in rows.mappings().all()]}
@router.get("/templates/{template_id}")
async def get_template(
    template_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return a full template row, including its script content.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    row = db.execute(
        text("SELECT * FROM platform_script_templates WHERE id = :id"),
        {"id": template_id}
    ).mappings().first()
    if row:
        return dict(row)
    raise HTTPException(status_code=404, detail="模板不存在")
@router.post("/templates")
async def create_template(
    data: TemplateCreate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Create a script template.

    Args:
        data: Template definition.
        user: Authenticated operator; the username is stored as ``created_by``
            when the user object has one.
        db: Database session.

    Returns:
        A dict with the new template ``id`` and a confirmation ``message``.
    """
    insert_result = db.execute(
        text("""
            INSERT INTO platform_script_templates (name, description, category, script_content, is_public, created_by)
            VALUES (:name, :desc, :category, :content, :is_public, :created_by)
        """),
        {
            "name": data.name,
            "desc": data.description,
            "category": data.category,
            "content": data.script_content,
            "is_public": 1 if data.is_public else 0,
            "created_by": user.username if hasattr(user, 'username') else None
        }
    )
    # Read the generated key from the INSERT's own cursor, before commit.
    # A separate "SELECT LAST_INSERT_ID()" issued after commit may run on a
    # different pooled connection and return 0 or another session's id.
    template_id = insert_result.lastrowid
    db.commit()
    return {"id": template_id, "message": "创建成功"}
@router.put("/templates/{template_id}")
async def update_template(
    template_id: int,
    data: TemplateUpdate,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Apply a partial update to a script template.

    Args:
        template_id: Primary key of the template.
        data: Fields to change; ``None`` fields are left untouched.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    # Report a missing template instead of claiming success, matching the
    # template detail endpoint.
    found = db.execute(
        text("SELECT id FROM platform_script_templates WHERE id = :id"),
        {"id": template_id}
    ).scalar()
    if not found:
        raise HTTPException(status_code=404, detail="模板不存在")

    updates = []
    params = {"id": template_id}
    if data.name is not None:
        updates.append("name = :name")
        params["name"] = data.name
    if data.description is not None:
        updates.append("description = :desc")
        params["desc"] = data.description
    if data.category is not None:
        updates.append("category = :category")
        params["category"] = data.category
    if data.script_content is not None:
        updates.append("script_content = :content")
        params["content"] = data.script_content
    if data.is_public is not None:
        updates.append("is_public = :is_public")
        params["is_public"] = 1 if data.is_public else 0

    if updates:
        db.execute(
            text(f"UPDATE platform_script_templates SET {', '.join(updates)} WHERE id = :id"),
            params
        )
        db.commit()
    return {"message": "更新成功"}
@router.delete("/templates/{template_id}")
async def delete_template(
    template_id: int,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Delete a script template.

    Args:
        template_id: Primary key of the template.
        user: Authenticated operator (authorization only).
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    # Report a missing template instead of claiming success, matching the
    # template detail endpoint.
    found = db.execute(
        text("SELECT id FROM platform_script_templates WHERE id = :id"),
        {"id": template_id}
    ).scalar()
    if not found:
        raise HTTPException(status_code=404, detail="模板不存在")

    db.execute(text("DELETE FROM platform_script_templates WHERE id = :id"), {"id": template_id})
    db.commit()
    return {"message": "删除成功"}
# ============ 脚本版本管理 ============
@router.get("/{task_id}/versions")
async def list_versions(
    task_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List the script versions of a task (metadata only), newest first."""
    query = text(
        "SELECT id, version, change_note, created_by, created_at "
        "FROM platform_script_versions WHERE task_id = :task_id ORDER BY version DESC"
    )
    rows = db.execute(query, {"task_id": task_id}).mappings().all()
    return {"items": [dict(row) for row in rows]}
@router.get("/{task_id}/versions/{version}")
async def get_version(
    task_id: int,
    version: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return one script version row of a task, including the script content.

    Raises:
        HTTPException: 404 when the task has no such version.
    """
    row = db.execute(
        text("SELECT * FROM platform_script_versions WHERE task_id = :task_id AND version = :version"),
        {"task_id": task_id, "version": version}
    ).mappings().first()
    if row:
        return dict(row)
    raise HTTPException(status_code=404, detail="版本不存在")
@router.post("/{task_id}/versions/{version}/rollback")
async def rollback_version(
    task_id: int,
    version: int,
    user: User = Depends(require_operator),
    db: Session = Depends(get_db)
):
    """Restore a task's script from a stored version.

    The restored content is written to the task and also recorded as a new
    version, so the history stays append-only. The task is then reloaded in
    the scheduler.

    Raises:
        HTTPException: 404 when the task has no such version.
    """
    # Fetch the script stored under the requested version.
    source_row = db.execute(
        text("SELECT script_content FROM platform_script_versions WHERE task_id = :task_id AND version = :version"),
        {"task_id": task_id, "version": version}
    ).first()
    if not source_row:
        raise HTTPException(status_code=404, detail="版本不存在")
    script_content = source_row[0]

    # Make it the task's current script.
    db.execute(
        text("UPDATE platform_scheduled_tasks SET script_content = :content WHERE id = :id"),
        {"id": task_id, "content": script_content}
    )

    # Record the rollback as the next version number.
    latest = db.execute(
        text("SELECT COALESCE(MAX(version), 0) FROM platform_script_versions WHERE task_id = :task_id"),
        {"task_id": task_id}
    ).scalar() or 0
    new_version = latest + 1

    insert_sql = text(
        "\n"
        "INSERT INTO platform_script_versions (task_id, version, script_content, change_note, created_by)\n"
        "VALUES (:task_id, :version, :content, :note, :created_by)\n"
    )
    db.execute(
        insert_sql,
        {
            "task_id": task_id,
            "version": new_version,
            "content": script_content,
            "note": f"回滚到版本 {version}",
            "created_by": getattr(user, 'username', None)
        }
    )
    db.commit()

    # Refresh the scheduler's copy of the task.
    reload_task(task_id)
    return {"message": f"已回滚到版本 {version},当前版本号 {new_version}"}
# ============ 统计数据 ============
@router.get("/{task_id}/stats")
async def get_task_stats(
    task_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return execution statistics for a task.

    Args:
        task_id: Primary key of the task.
        user: Authenticated user (authentication only).
        db: Database session.

    Returns:
        A dict with ``total`` runs, ``success`` and ``failed`` counts,
        ``success_rate`` (percent, one decimal), ``avg_duration`` of
        successful runs (seconds, one decimal) and a per-day ``trend`` for
        the last seven calendar days including today.
    """
    # Total number of runs.
    total = db.execute(
        text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id"),
        {"task_id": task_id}
    ).scalar() or 0

    # Successful runs.
    success = db.execute(
        text("SELECT COUNT(*) FROM platform_task_logs WHERE task_id = :task_id AND status = 'success'"),
        {"task_id": task_id}
    ).scalar() or 0

    # Every run that is not a success is counted as failed here.
    failed = total - success

    success_rate = round(success / total * 100, 1) if total > 0 else 0

    # Average duration of finished, successful runs.
    avg_duration = db.execute(
        text("""
            SELECT AVG(TIMESTAMPDIFF(SECOND, started_at, finished_at))
            FROM platform_task_logs
            WHERE task_id = :task_id AND status = 'success' AND finished_at IS NOT NULL
        """),
        {"task_id": task_id}
    ).scalar()
    avg_duration = round(float(avg_duration), 1) if avg_duration else 0

    # Trend for the last 7 calendar days. CURDATE() is midnight today, so
    # going back 6 days yields exactly seven dates (today plus six before);
    # INTERVAL 7 DAY would return eight.
    trend_result = db.execute(
        text("""
            SELECT DATE(started_at) as date,
                   SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
                   SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
            FROM platform_task_logs
            WHERE task_id = :task_id AND started_at >= DATE_SUB(CURDATE(), INTERVAL 6 DAY)
            GROUP BY DATE(started_at)
            ORDER BY date
        """),
        {"task_id": task_id}
    )
    trend = [dict(row) for row in trend_result.mappings().all()]

    return {
        "total": total,
        "success": success,
        "failed": failed,
        "success_rate": success_rate,
        "avg_duration": avg_duration,
        "trend": trend
    }