Files
012-kaopeilian/backend/app/api/v1/admin.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

510 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
管理员相关API路由
"""
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.core.deps import get_current_active_user as get_current_user, get_db
from app.models.user import User
from app.models.course import Course, CourseStatus
from app.schemas.base import ResponseModel
router = APIRouter(prefix="/admin")
@router.get("/dashboard/stats")
async def get_dashboard_stats(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
) -> ResponseModel:
"""
获取管理员仪表盘统计数据
需要管理员权限
"""
# 权限检查
if current_user.role != "admin":
return ResponseModel(
code=403,
message="权限不足,需要管理员权限"
)
# 用户统计
total_users = await db.scalar(select(func.count(User.id)))
# 计算最近30天的新增用户
thirty_days_ago = datetime.now() - timedelta(days=30)
new_users_count = await db.scalar(
select(func.count(User.id))
.where(User.created_at >= thirty_days_ago)
)
# 计算增长率假设上个月也是30天
sixty_days_ago = datetime.now() - timedelta(days=60)
last_month_users = await db.scalar(
select(func.count(User.id))
.where(User.created_at >= sixty_days_ago)
.where(User.created_at < thirty_days_ago)
)
growth_rate = 0.0
if last_month_users > 0:
growth_rate = ((new_users_count - last_month_users) / last_month_users) * 100
# 课程统计
total_courses = await db.scalar(
select(func.count(Course.id))
.where(Course.status == CourseStatus.PUBLISHED)
)
# TODO: 完成的课程数需要根据用户课程进度表计算
completed_courses = 0 # 暂时设为0
# 考试统计(如果有考试表的话)
total_exams = 0
avg_score = 0.0
pass_rate = "0%"
# 学习时长统计(如果有学习记录表的话)
total_learning_hours = 0
avg_learning_hours = 0.0
active_rate = "0%"
# 构建响应数据
stats = {
"users": {
"total": total_users,
"growth": new_users_count,
"growthRate": f"{growth_rate:.1f}%"
},
"courses": {
"total": total_courses,
"completed": completed_courses,
"completionRate": f"{(completed_courses / total_courses * 100) if total_courses > 0 else 0:.1f}%"
},
"exams": {
"total": total_exams,
"avgScore": avg_score,
"passRate": pass_rate
},
"learning": {
"totalHours": total_learning_hours,
"avgHours": avg_learning_hours,
"activeRate": active_rate
}
}
return ResponseModel(
code=200,
message="获取仪表盘统计数据成功",
data=stats
)
@router.get("/dashboard/user-growth")
async def get_user_growth_data(
days: int = Query(30, description="统计天数", ge=7, le=90),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
) -> ResponseModel:
"""
获取用户增长数据
Args:
days: 统计天数默认30天
需要管理员权限
"""
# 权限检查
if current_user.role != "admin":
return ResponseModel(
code=403,
message="权限不足,需要管理员权限"
)
# 准备日期列表
dates = []
new_users = []
active_users = []
end_date = datetime.now().date()
for i in range(days):
current_date = end_date - timedelta(days=days-1-i)
dates.append(current_date.strftime("%Y-%m-%d"))
# 统计当天新增用户
next_date = current_date + timedelta(days=1)
new_count = await db.scalar(
select(func.count(User.id))
.where(func.date(User.created_at) == current_date)
)
new_users.append(new_count or 0)
# 统计当天活跃用户(有登录记录)
active_count = await db.scalar(
select(func.count(User.id))
.where(func.date(User.last_login_at) == current_date)
)
active_users.append(active_count or 0)
return ResponseModel(
code=200,
message="获取用户增长数据成功",
data={
"dates": dates,
"newUsers": new_users,
"activeUsers": active_users
}
)
@router.get("/dashboard/course-completion")
async def get_course_completion_data(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
) -> ResponseModel:
"""
获取课程完成率数据
需要管理员权限
"""
# 权限检查
if current_user.role != "admin":
return ResponseModel(
code=403,
message="权限不足,需要管理员权限"
)
# 获取所有已发布的课程
courses_result = await db.execute(
select(Course.name, Course.id)
.where(Course.status == CourseStatus.PUBLISHED)
.order_by(Course.sort_order, Course.id)
.limit(10) # 限制显示前10个课程
)
courses = courses_result.all()
course_names = []
completion_rates = []
for course_name, course_id in courses:
course_names.append(course_name)
# TODO: 根据用户课程进度表计算完成率
# 这里暂时生成模拟数据
import random
completion_rate = random.randint(60, 95)
completion_rates.append(completion_rate)
return ResponseModel(
code=200,
message="获取课程完成率数据成功",
data={
"courses": course_names,
"completionRates": completion_rates
}
)
# ===== 岗位管理(最小可用 stub 版本)=====
def _ensure_admin(user: User) -> Optional[ResponseModel]:
if user.role != "admin":
return ResponseModel(code=403, message="权限不足,需要管理员权限")
return None
# 注意positions相关路由已移至positions.py
# _sample_positions函数和所有positions路由已删除避免与positions.py冲突
# ===== 用户批量操作 =====
from pydantic import BaseModel
from app.models.position_member import PositionMember
class BatchUserOperation(BaseModel):
"""批量用户操作请求模型"""
ids: List[int]
action: str # delete, activate, deactivate, change_role, assign_position, assign_team
value: Optional[Any] = None # 角色值、岗位ID、团队ID等
@router.post("/users/batch", response_model=ResponseModel)
async def batch_user_operation(
operation: BatchUserOperation,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
) -> ResponseModel:
"""
批量用户操作
支持的操作类型:
- delete: 批量删除用户(软删除)
- activate: 批量启用用户
- deactivate: 批量禁用用户
- change_role: 批量修改角色(需要 value 参数)
- assign_position: 批量分配岗位(需要 value 参数为岗位ID
- assign_team: 批量分配团队(需要 value 参数为团队ID
权限:需要管理员权限
"""
# 权限检查
if current_user.role != "admin":
return ResponseModel(
code=403,
message="权限不足,需要管理员权限"
)
if not operation.ids:
return ResponseModel(
code=400,
message="请选择要操作的用户"
)
# 不能操作自己
if current_user.id in operation.ids:
return ResponseModel(
code=400,
message="不能对自己执行批量操作"
)
# 获取要操作的用户
result = await db.execute(
select(User).where(User.id.in_(operation.ids), User.is_deleted == False)
)
users = result.scalars().all()
if not users:
return ResponseModel(
code=404,
message="未找到要操作的用户"
)
success_count = 0
failed_count = 0
errors = []
try:
if operation.action == "delete":
# 批量软删除
for user in users:
try:
user.is_deleted = True
user.deleted_at = datetime.now()
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"删除用户 {user.username} 失败: {str(e)}")
await db.commit()
elif operation.action == "activate":
# 批量启用
for user in users:
try:
user.is_active = True
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"启用用户 {user.username} 失败: {str(e)}")
await db.commit()
elif operation.action == "deactivate":
# 批量禁用
for user in users:
try:
user.is_active = False
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"禁用用户 {user.username} 失败: {str(e)}")
await db.commit()
elif operation.action == "change_role":
# 批量修改角色
if not operation.value:
return ResponseModel(
code=400,
message="请指定要修改的角色"
)
valid_roles = ["trainee", "manager", "admin"]
if operation.value not in valid_roles:
return ResponseModel(
code=400,
message=f"无效的角色,可选值: {', '.join(valid_roles)}"
)
for user in users:
try:
user.role = operation.value
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"修改用户 {user.username} 角色失败: {str(e)}")
await db.commit()
elif operation.action == "assign_position":
# 批量分配岗位
if not operation.value:
return ResponseModel(
code=400,
message="请指定要分配的岗位ID"
)
position_id = int(operation.value)
# 获取岗位信息用于通知
from app.models.position import Position
position_result = await db.execute(
select(Position).where(Position.id == position_id)
)
position = position_result.scalar_one_or_none()
position_name = position.name if position else "未知岗位"
# 记录新分配成功的用户ID用于发送通知
newly_assigned_user_ids = []
for user in users:
try:
# 检查是否已有该岗位
existing = await db.execute(
select(PositionMember).where(
PositionMember.user_id == user.id,
PositionMember.position_id == position_id,
PositionMember.is_deleted == False
)
)
if existing.scalar_one_or_none():
# 已有该岗位,跳过
success_count += 1
continue
# 添加岗位关联PositionMember模型没有created_by字段
member = PositionMember(
position_id=position_id,
user_id=user.id,
joined_at=datetime.now()
)
db.add(member)
newly_assigned_user_ids.append(user.id)
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"为用户 {user.username} 分配岗位失败: {str(e)}")
await db.commit()
# 发送岗位分配通知给新分配的用户
if newly_assigned_user_ids:
try:
from app.services.notification_service import notification_service
from app.schemas.notification import NotificationBatchCreate, NotificationType
notification_batch = NotificationBatchCreate(
user_ids=newly_assigned_user_ids,
title="岗位分配通知",
content=f"您已被分配到「{position_name}」岗位,请查看相关培训课程。",
type=NotificationType.POSITION_ASSIGN,
related_id=position_id,
related_type="position",
sender_id=current_user.id
)
await notification_service.batch_create_notifications(
db=db,
batch_in=notification_batch
)
except Exception as e:
# 通知发送失败不影响岗位分配结果
import logging
logging.getLogger(__name__).error(f"发送岗位分配通知失败: {str(e)}")
elif operation.action == "assign_team":
# 批量分配团队
if not operation.value:
return ResponseModel(
code=400,
message="请指定要分配的团队ID"
)
from app.models.user import user_teams
team_id = int(operation.value)
for user in users:
try:
# 检查是否已在该团队
existing = await db.execute(
select(user_teams).where(
user_teams.c.user_id == user.id,
user_teams.c.team_id == team_id
)
)
if existing.first():
# 已在该团队,跳过
success_count += 1
continue
# 添加团队关联
await db.execute(
user_teams.insert().values(
user_id=user.id,
team_id=team_id,
role="member",
joined_at=datetime.now()
)
)
success_count += 1
except Exception as e:
failed_count += 1
errors.append(f"为用户 {user.username} 分配团队失败: {str(e)}")
await db.commit()
else:
return ResponseModel(
code=400,
message=f"不支持的操作类型: {operation.action}"
)
# 返回结果
action_names = {
"delete": "删除",
"activate": "启用",
"deactivate": "禁用",
"change_role": "修改角色",
"assign_position": "分配岗位",
"assign_team": "分配团队"
}
action_name = action_names.get(operation.action, operation.action)
return ResponseModel(
code=200,
message=f"批量{action_name}完成:成功 {success_count} 个,失败 {failed_count}",
data={
"success_count": success_count,
"failed_count": failed_count,
"errors": errors
}
)
except Exception as e:
await db.rollback()
return ResponseModel(
code=500,
message=f"批量操作失败: {str(e)}"
)