Files
000-platform/backend/app/routers/scripts.py
Admin 2f9d85edb6
Some checks failed
continuous-integration/drone/push Build is failing
feat: 脚本管理页面(类似青龙面板)
- 新增脚本管理页面,左右分栏布局
- 集成 Monaco Editor 代码编辑器(语法高亮、行号、快捷键)
- 支持脚本 CRUD、运行、复制等操作
- 定时任务支持从脚本库导入脚本
- 新增 platform_scripts 表存储脚本
2026-01-28 13:13:08 +08:00

326 lines
9.7 KiB
Python

"""脚本管理路由"""
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional, List
from sqlalchemy.orm import Session
from sqlalchemy import text
from datetime import datetime
from ..database import get_db
from .auth import get_current_user, require_operator
from ..models.user import User
router = APIRouter(prefix="/scripts", tags=["脚本管理"])
# Schemas
class ScriptCreate(BaseModel):
tenant_id: Optional[str] = None
name: str
filename: Optional[str] = None
description: Optional[str] = None
script_content: str
category: Optional[str] = None
is_enabled: bool = True
class ScriptUpdate(BaseModel):
name: Optional[str] = None
filename: Optional[str] = None
description: Optional[str] = None
script_content: Optional[str] = None
category: Optional[str] = None
is_enabled: Optional[bool] = None
class ScriptRunRequest(BaseModel):
tenant_id: Optional[str] = None # 可指定以哪个租户身份运行
# API Endpoints
@router.get("")
async def list_scripts(
page: int = Query(1, ge=1),
size: int = Query(50, ge=1, le=200),
tenant_id: Optional[str] = None,
category: Optional[str] = None,
keyword: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取脚本列表"""
where_clauses = []
params = {}
if tenant_id:
where_clauses.append("(tenant_id = :tenant_id OR tenant_id IS NULL)")
params["tenant_id"] = tenant_id
if category:
where_clauses.append("category = :category")
params["category"] = category
if keyword:
where_clauses.append("(name LIKE :keyword OR description LIKE :keyword)")
params["keyword"] = f"%{keyword}%"
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
# 查询总数
count_result = db.execute(
text(f"SELECT COUNT(*) FROM platform_scripts WHERE {where_sql}"),
params
)
total = count_result.scalar()
# 查询列表
params["offset"] = (page - 1) * size
params["limit"] = size
result = db.execute(
text(f"""
SELECT id, tenant_id, name, filename, description, category,
is_enabled, last_run_at, last_run_status, created_by, created_at, updated_at,
LENGTH(script_content) as content_length
FROM platform_scripts
WHERE {where_sql}
ORDER BY updated_at DESC, id DESC
LIMIT :limit OFFSET :offset
"""),
params
)
scripts = [dict(row) for row in result.mappings().all()]
# 获取分类列表
cat_result = db.execute(
text("SELECT DISTINCT category FROM platform_scripts WHERE category IS NOT NULL AND category != ''")
)
categories = [row[0] for row in cat_result.fetchall()]
return {
"total": total,
"page": page,
"size": size,
"items": scripts,
"categories": categories
}
@router.get("/{script_id}")
async def get_script(
script_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""获取脚本详情(包含内容)"""
result = db.execute(
text("SELECT * FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
script = result.mappings().first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
return dict(script)
@router.post("")
async def create_script(
data: ScriptCreate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""创建脚本"""
# 自动生成文件名
filename = data.filename
if not filename and data.name:
# 转换为安全的文件名
import re
safe_name = re.sub(r'[^\w\u4e00-\u9fff]', '_', data.name)
filename = f"{safe_name}.py"
db.execute(
text("""
INSERT INTO platform_scripts
(tenant_id, name, filename, description, script_content, category, is_enabled, created_by)
VALUES (:tenant_id, :name, :filename, :description, :script_content, :category, :is_enabled, :created_by)
"""),
{
"tenant_id": data.tenant_id,
"name": data.name,
"filename": filename,
"description": data.description,
"script_content": data.script_content,
"category": data.category,
"is_enabled": 1 if data.is_enabled else 0,
"created_by": user.username if hasattr(user, 'username') else None
}
)
db.commit()
result = db.execute(text("SELECT LAST_INSERT_ID() as id"))
script_id = result.scalar()
return {"id": script_id, "message": "创建成功"}
@router.put("/{script_id}")
async def update_script(
script_id: int,
data: ScriptUpdate,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""更新脚本"""
# 检查是否存在
result = db.execute(
text("SELECT id FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
if not result.scalar():
raise HTTPException(status_code=404, detail="脚本不存在")
updates = []
params = {"id": script_id}
if data.name is not None:
updates.append("name = :name")
params["name"] = data.name
if data.filename is not None:
updates.append("filename = :filename")
params["filename"] = data.filename
if data.description is not None:
updates.append("description = :description")
params["description"] = data.description
if data.script_content is not None:
updates.append("script_content = :script_content")
params["script_content"] = data.script_content
if data.category is not None:
updates.append("category = :category")
params["category"] = data.category
if data.is_enabled is not None:
updates.append("is_enabled = :is_enabled")
params["is_enabled"] = 1 if data.is_enabled else 0
if updates:
db.execute(
text(f"UPDATE platform_scripts SET {', '.join(updates)} WHERE id = :id"),
params
)
db.commit()
return {"message": "更新成功"}
@router.delete("/{script_id}")
async def delete_script(
script_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""删除脚本"""
result = db.execute(
text("SELECT id FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
if not result.scalar():
raise HTTPException(status_code=404, detail="脚本不存在")
db.execute(
text("DELETE FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
db.commit()
return {"message": "删除成功"}
@router.post("/{script_id}/run")
async def run_script(
script_id: int,
data: ScriptRunRequest = None,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""执行脚本"""
from ..services.script_executor import test_script as run_test
# 获取脚本
result = db.execute(
text("SELECT * FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
script = result.mappings().first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
if not script["script_content"]:
raise HTTPException(status_code=400, detail="脚本内容为空")
# 确定租户ID
tenant_id = (data.tenant_id if data else None) or script["tenant_id"] or "system"
# 执行脚本
exec_result = await run_test(
tenant_id=tenant_id,
script_content=script["script_content"]
)
# 更新执行状态
status = "success" if exec_result.success else "failed"
db.execute(
text("""
UPDATE platform_scripts
SET last_run_at = NOW(), last_run_status = :status
WHERE id = :id
"""),
{"id": script_id, "status": status}
)
db.commit()
return exec_result.to_dict()
@router.post("/{script_id}/copy")
async def copy_script(
script_id: int,
user: User = Depends(require_operator),
db: Session = Depends(get_db)
):
"""复制脚本"""
# 获取原脚本
result = db.execute(
text("SELECT * FROM platform_scripts WHERE id = :id"),
{"id": script_id}
)
script = result.mappings().first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
# 创建副本
new_name = f"{script['name']} - 副本"
db.execute(
text("""
INSERT INTO platform_scripts
(tenant_id, name, filename, description, script_content, category, is_enabled, created_by)
VALUES (:tenant_id, :name, :filename, :description, :script_content, :category, 1, :created_by)
"""),
{
"tenant_id": script["tenant_id"],
"name": new_name,
"filename": None,
"description": script["description"],
"script_content": script["script_content"],
"category": script["category"],
"created_by": user.username if hasattr(user, 'username') else None
}
)
db.commit()
result = db.execute(text("SELECT LAST_INSERT_ID() as id"))
new_id = result.scalar()
return {"id": new_id, "message": "复制成功"}