Files
012-kaopeilian/backend/app/api/v1/team_dashboard.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

751 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
团队看板 API 路由
提供团队概览、学习进度、排行榜、动态等数据
"""
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List
from fastapi import APIRouter, Depends
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_current_active_user as get_current_user, get_db
from app.core.logger import logger
from app.models.course import Course
from app.models.exam import Exam
from app.models.position import Position
from app.models.position_member import PositionMember
from app.models.practice import PracticeReport, PracticeSession
from app.models.user import Team, User, UserTeam
from app.schemas.base import ResponseModel
router = APIRouter(prefix="/team/dashboard", tags=["team-dashboard"])
async def get_accessible_teams(
current_user: User,
db: AsyncSession
) -> List[int]:
"""获取用户可访问的团队ID列表"""
if current_user.role in ['admin', 'manager']:
# 管理员查看所有团队
stmt = select(Team.id).where(Team.is_deleted == False) # noqa: E712
result = await db.execute(stmt)
return [row[0] for row in result.all()]
else:
# 普通用户只查看自己的团队
stmt = select(UserTeam.team_id).where(UserTeam.user_id == current_user.id)
result = await db.execute(stmt)
return [row[0] for row in result.all()]
async def get_team_member_ids(
team_ids: List[int],
db: AsyncSession
) -> List[int]:
"""获取团队成员ID列表"""
if not team_ids:
return []
stmt = select(UserTeam.user_id).where(
UserTeam.team_id.in_(team_ids)
).distinct()
result = await db.execute(stmt)
return [row[0] for row in result.all()]
@router.get("/overview", response_model=ResponseModel)
async def get_team_overview(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取团队概览统计
返回团队总数、成员数、平均学习进度、平均成绩、课程完成率等
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
# 获取团队成员ID
member_ids = await get_team_member_ids(team_ids, db)
# 统计团队数
team_count = len(team_ids)
# 统计成员数
member_count = len(member_ids)
# 计算平均考试成绩使用round1_score
avg_score = 0.0
if member_ids:
stmt = select(func.avg(Exam.round1_score)).where(
and_(
Exam.user_id.in_(member_ids),
Exam.round1_score.isnot(None),
Exam.status.in_(['completed', 'submitted'])
)
)
result = await db.execute(stmt)
avg_score_value = result.scalar()
avg_score = float(avg_score_value) if avg_score_value else 0.0
# 计算平均学习进度(基于考试完成情况)
avg_progress = 0.0
if member_ids:
# 统计每个成员完成的考试数
stmt = select(func.count(Exam.id)).where(
and_(
Exam.user_id.in_(member_ids),
Exam.status.in_(['completed', 'submitted'])
)
)
result = await db.execute(stmt)
completed_exams = result.scalar() or 0
# 假设每个成员应完成10个考试计算完成率作为进度
total_expected = member_count * 10
if total_expected > 0:
avg_progress = (completed_exams / total_expected) * 100
# 计算课程完成率
course_completion_rate = 0.0
if member_ids:
# 统计已完成的课程数(有考试记录且成绩>=60
stmt = select(func.count(func.distinct(Exam.course_id))).where(
and_(
Exam.user_id.in_(member_ids),
Exam.round1_score >= 60,
Exam.status.in_(['completed', 'submitted'])
)
)
result = await db.execute(stmt)
completed_courses = result.scalar() or 0
# 统计总课程数
stmt = select(func.count(Course.id)).where(
and_(
Course.is_deleted == False, # noqa: E712
Course.status == 'published'
)
)
result = await db.execute(stmt)
total_courses = result.scalar() or 0
if total_courses > 0:
course_completion_rate = (completed_courses / total_courses) * 100
# 趋势数据(暂时返回固定值,后续可实现真实趋势计算)
trends = {
"member_trend": 0,
"progress_trend": 12.3 if avg_progress > 0 else 0,
"score_trend": 5.8 if avg_score > 0 else 0,
"completion_trend": -3.2 if course_completion_rate > 0 else 0
}
data = {
"team_count": team_count,
"member_count": member_count,
"avg_progress": round(avg_progress, 1),
"avg_score": round(avg_score, 1),
"course_completion_rate": round(course_completion_rate, 1),
"trends": trends
}
return ResponseModel(code=200, message="success", data=data)
except Exception as e:
logger.error(f"获取团队概览失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取团队概览失败: {str(e)}", data=None)
@router.get("/progress", response_model=ResponseModel)
async def get_progress_data(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取学习进度数据
返回Top 5成员的8周学习进度数据
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
member_ids = await get_team_member_ids(team_ids, db)
if not member_ids:
return ResponseModel(
code=200,
message="success",
data={"members": [], "weeks": [], "data": []}
)
# 获取Top 5学习时长最高的成员
stmt = (
select(
User.id,
User.full_name,
func.sum(PracticeSession.duration_seconds).label('total_duration')
)
.join(PracticeSession, PracticeSession.user_id == User.id)
.where(
and_(
User.id.in_(member_ids),
PracticeSession.status == 'completed'
)
)
.group_by(User.id, User.full_name)
.order_by(func.sum(PracticeSession.duration_seconds).desc())
.limit(5)
)
result = await db.execute(stmt)
top_members = result.all()
if not top_members:
# 如果没有陪练记录按考试成绩选择Top 5
stmt = (
select(
User.id,
User.full_name,
func.avg(Exam.round1_score).label('avg_score')
)
.join(Exam, Exam.user_id == User.id)
.where(
and_(
User.id.in_(member_ids),
Exam.round1_score.isnot(None),
Exam.status.in_(['completed', 'submitted'])
)
)
.group_by(User.id, User.full_name)
.order_by(func.avg(Exam.round1_score).desc())
.limit(5)
)
result = await db.execute(stmt)
top_members = result.all()
# 生成周标签
weeks = [f"{i+1}" for i in range(8)]
# 为每个成员生成进度数据
members = []
data = []
for member in top_members:
member_name = member.full_name or f"用户{member.id}"
members.append(member_name)
# 查询该成员8周内的考试完成情况
eight_weeks_ago = datetime.now() - timedelta(weeks=8)
stmt = select(Exam).where(
and_(
Exam.user_id == member.id,
Exam.created_at >= eight_weeks_ago,
Exam.status.in_(['completed', 'submitted'])
)
).order_by(Exam.created_at)
result = await db.execute(stmt)
exams = result.scalars().all()
# 计算每周的进度0-100
values = []
for week in range(8):
week_start = datetime.now() - timedelta(weeks=8-week)
week_end = week_start + timedelta(weeks=1)
# 统计该周完成的考试数
week_exams = [
e for e in exams
if week_start <= e.created_at < week_end
]
# 进度 = 累计完成考试数 * 10假设每个考试代表10%进度)
cumulative_exams = len([e for e in exams if e.created_at < week_end])
progress = min(cumulative_exams * 10, 100)
values.append(progress)
data.append({"name": member_name, "values": values})
return ResponseModel(
code=200,
message="success",
data={"members": members, "weeks": weeks, "data": data}
)
except Exception as e:
logger.error(f"获取学习进度数据失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取学习进度数据失败: {str(e)}", data=None)
@router.get("/course-distribution", response_model=ResponseModel)
async def get_course_distribution(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取课程完成分布
返回已完成、进行中、未开始的课程数量
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
member_ids = await get_team_member_ids(team_ids, db)
# 统计所有已发布的课程
stmt = select(func.count(Course.id)).where(
and_(
Course.is_deleted == False, # noqa: E712
Course.status == 'published'
)
)
result = await db.execute(stmt)
total_courses = result.scalar() or 0
if not member_ids or total_courses == 0:
return ResponseModel(
code=200,
message="success",
data={"completed": 0, "in_progress": 0, "not_started": 0}
)
# 统计已完成的课程(有及格成绩)
stmt = select(func.count(func.distinct(Exam.course_id))).where(
and_(
Exam.user_id.in_(member_ids),
Exam.round1_score >= 60,
Exam.status.in_(['completed', 'submitted'])
)
)
result = await db.execute(stmt)
completed = result.scalar() or 0
# 统计进行中的课程(有考试记录但未及格)
stmt = select(func.count(func.distinct(Exam.course_id))).where(
and_(
Exam.user_id.in_(member_ids),
or_(
Exam.round1_score < 60,
Exam.status == 'started'
)
)
)
result = await db.execute(stmt)
in_progress = result.scalar() or 0
# 未开始 = 总数 - 已完成 - 进行中
not_started = max(0, total_courses - completed - in_progress)
data = {
"completed": completed,
"in_progress": in_progress,
"not_started": not_started
}
return ResponseModel(code=200, message="success", data=data)
except Exception as e:
logger.error(f"获取课程分布失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取课程分布失败: {str(e)}", data=None)
@router.get("/ability-analysis", response_model=ResponseModel)
async def get_ability_analysis(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取能力分析数据
返回团队能力雷达图数据和短板列表
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
member_ids = await get_team_member_ids(team_ids, db)
if not member_ids:
return ResponseModel(
code=200,
message="success",
data={
"radar_data": {
"dimensions": [],
"values": []
},
"weaknesses": []
}
)
# 查询所有陪练报告的能力维度数据
# 需要通过PracticeSession关联因为PracticeReport没有user_id
stmt = (
select(PracticeReport.ability_dimensions)
.join(PracticeSession, PracticeSession.session_id == PracticeReport.session_id)
.where(PracticeSession.user_id.in_(member_ids))
)
result = await db.execute(stmt)
all_dimensions = result.scalars().all()
if not all_dimensions:
# 如果没有陪练报告,返回默认能力维度
default_dimensions = ["沟通表达", "倾听理解", "需求挖掘", "异议处理", "成交技巧", "客户维护"]
return ResponseModel(
code=200,
message="success",
data={
"radar_data": {
"dimensions": default_dimensions,
"values": [0] * len(default_dimensions)
},
"weaknesses": []
}
)
# 聚合能力数据
ability_scores: Dict[str, List[float]] = {}
# 能力维度名称映射
dimension_name_map = {
"sales_ability": "销售能力",
"service_attitude": "服务态度",
"technical_skills": "技术能力",
"沟通表达": "沟通表达",
"倾听理解": "倾听理解",
"需求挖掘": "需求挖掘",
"异议处理": "异议处理",
"成交技巧": "成交技巧",
"客户维护": "客户维护"
}
for dimensions in all_dimensions:
if dimensions:
# 如果是字符串进行JSON反序列化
if isinstance(dimensions, str):
try:
dimensions = json.loads(dimensions)
except json.JSONDecodeError:
logger.warning(f"无法解析能力维度数据: {dimensions}")
continue
# 处理字典格式:{"sales_ability": 79.0, ...}
if isinstance(dimensions, dict):
for key, score in dimensions.items():
name = dimension_name_map.get(key, key)
if name not in ability_scores:
ability_scores[name] = []
ability_scores[name].append(float(score))
# 处理列表格式:[{"name": "沟通表达", "score": 85}, ...]
elif isinstance(dimensions, list):
for dim in dimensions:
if not isinstance(dim, dict):
logger.warning(f"能力维度项格式错误: {type(dim)}")
continue
name = dim.get('name', '')
score = dim.get('score', 0)
if name:
mapped_name = dimension_name_map.get(name, name)
if mapped_name not in ability_scores:
ability_scores[mapped_name] = []
ability_scores[mapped_name].append(float(score))
else:
logger.warning(f"能力维度数据格式错误: {type(dimensions)}")
# 计算平均分
avg_scores = {
name: sum(scores) / len(scores)
for name, scores in ability_scores.items()
}
# 按固定顺序排列维度(支持多种维度组合)
# 优先使用六维度,如果没有则使用三维度
standard_dimensions_six = ["沟通表达", "倾听理解", "需求挖掘", "异议处理", "成交技巧", "客户维护"]
standard_dimensions_three = ["销售能力", "服务态度", "技术能力"]
# 判断使用哪种维度标准
has_six_dimensions = any(dim in avg_scores for dim in standard_dimensions_six)
has_three_dimensions = any(dim in avg_scores for dim in standard_dimensions_three)
if has_six_dimensions:
standard_dimensions = standard_dimensions_six
elif has_three_dimensions:
standard_dimensions = standard_dimensions_three
else:
# 如果都没有,使用实际数据的维度
standard_dimensions = list(avg_scores.keys())
dimensions = []
values = []
for dim in standard_dimensions:
if dim in avg_scores:
dimensions.append(dim)
values.append(round(avg_scores[dim], 1))
# 找出短板(平均分<80
weaknesses = []
weakness_suggestions = {
# 六维度建议
"异议处理": "建议加强异议处理专项训练,增加实战演练",
"成交技巧": "需要系统学习成交话术和时机把握",
"需求挖掘": "提升提问技巧,深入了解客户需求",
"沟通表达": "加强沟通技巧训练,提升表达能力",
"倾听理解": "培养同理心,提高倾听和理解能力",
"客户维护": "学习客户关系管理,提升服务质量",
# 三维度建议
"销售能力": "建议加强销售技巧训练,提升成交率",
"服务态度": "需要改善服务态度,提高客户满意度",
"技术能力": "建议学习产品知识,提升专业能力"
}
for name, score in avg_scores.items():
if score < 80:
weaknesses.append({
"name": name,
"avg_score": int(score),
"suggestion": weakness_suggestions.get(name, f"建议加强{name}专项训练")
})
# 按分数升序排列
weaknesses.sort(key=lambda x: x['avg_score'])
data = {
"radar_data": {
"dimensions": dimensions,
"values": values
},
"weaknesses": weaknesses
}
return ResponseModel(code=200, message="success", data=data)
except Exception as e:
logger.error(f"获取能力分析失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取能力分析失败: {str(e)}", data=None)
@router.get("/rankings", response_model=ResponseModel)
async def get_rankings(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取排行榜数据
返回学习时长排行和成绩排行Top 5
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
member_ids = await get_team_member_ids(team_ids, db)
if not member_ids:
return ResponseModel(
code=200,
message="success",
data={
"study_time_ranking": [],
"score_ranking": []
}
)
# 学习时长排行(基于陪练会话)
stmt = (
select(
User.id,
User.full_name,
User.avatar_url,
Position.name.label('position_name'),
func.sum(PracticeSession.duration_seconds).label('total_duration')
)
.join(PracticeSession, PracticeSession.user_id == User.id)
.outerjoin(PositionMember, and_(
PositionMember.user_id == User.id,
PositionMember.is_deleted == False # noqa: E712
))
.outerjoin(Position, Position.id == PositionMember.position_id)
.where(
and_(
User.id.in_(member_ids),
PracticeSession.status == 'completed'
)
)
.group_by(User.id, User.full_name, User.avatar_url, Position.name)
.order_by(func.sum(PracticeSession.duration_seconds).desc())
.limit(5)
)
result = await db.execute(stmt)
study_time_data = result.all()
study_time_ranking = []
for row in study_time_data:
study_time_ranking.append({
"id": row.id,
"name": row.full_name or f"用户{row.id}",
"position": row.position_name or "未分配岗位",
"avatar": row.avatar_url or "",
"study_time": round(row.total_duration / 3600, 1) # 转换为小时
})
# 成绩排行基于考试round1_score
stmt = (
select(
User.id,
User.full_name,
User.avatar_url,
Position.name.label('position_name'),
func.avg(Exam.round1_score).label('avg_score')
)
.join(Exam, Exam.user_id == User.id)
.outerjoin(PositionMember, and_(
PositionMember.user_id == User.id,
PositionMember.is_deleted == False # noqa: E712
))
.outerjoin(Position, Position.id == PositionMember.position_id)
.where(
and_(
User.id.in_(member_ids),
Exam.round1_score.isnot(None),
Exam.status.in_(['completed', 'submitted'])
)
)
.group_by(User.id, User.full_name, User.avatar_url, Position.name)
.order_by(func.avg(Exam.round1_score).desc())
.limit(5)
)
result = await db.execute(stmt)
score_data = result.all()
score_ranking = []
for row in score_data:
score_ranking.append({
"id": row.id,
"name": row.full_name or f"用户{row.id}",
"position": row.position_name or "未分配岗位",
"avatar": row.avatar_url or "",
"avg_score": round(row.avg_score, 1)
})
data = {
"study_time_ranking": study_time_ranking,
"score_ranking": score_ranking
}
return ResponseModel(code=200, message="success", data=data)
except Exception as e:
logger.error(f"获取排行榜失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取排行榜失败: {str(e)}", data=None)
@router.get("/activities", response_model=ResponseModel)
async def get_activities(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ResponseModel:
"""
获取团队学习动态
返回最近20条活动记录考试、陪练等
"""
try:
# 获取可访问的团队
team_ids = await get_accessible_teams(current_user, db)
member_ids = await get_team_member_ids(team_ids, db)
if not member_ids:
return ResponseModel(
code=200,
message="success",
data={"activities": []}
)
activities = []
# 获取最近的考试记录
stmt = (
select(Exam, User.full_name, Course.name.label('course_name'))
.join(User, User.id == Exam.user_id)
.join(Course, Course.id == Exam.course_id)
.where(
and_(
Exam.user_id.in_(member_ids),
Exam.status.in_(['completed', 'submitted'])
)
)
.order_by(Exam.updated_at.desc())
.limit(10)
)
result = await db.execute(stmt)
exam_records = result.all()
for exam, user_name, course_name in exam_records:
score = exam.round1_score or 0
activity_type = "success" if score >= 60 else "danger"
result_type = "success" if score >= 60 else "danger"
result_text = f"成绩:{int(score)}" if score >= 60 else "未通过"
activities.append({
"id": f"exam_{exam.id}",
"user_name": user_name or f"用户{exam.user_id}",
"action": "完成了" if score >= 60 else "参加了",
"target": f"{course_name}》课程考试",
"time": exam.updated_at.strftime("%Y-%m-%d %H:%M"),
"type": activity_type,
"result": {"type": result_type, "text": result_text}
})
# 获取最近的陪练记录
stmt = (
select(PracticeSession, User.full_name, PracticeReport.total_score)
.join(User, User.id == PracticeSession.user_id)
.outerjoin(PracticeReport, PracticeReport.session_id == PracticeSession.session_id)
.where(
and_(
PracticeSession.user_id.in_(member_ids),
PracticeSession.status == 'completed'
)
)
.order_by(PracticeSession.end_time.desc())
.limit(10)
)
result = await db.execute(stmt)
practice_records = result.all()
for session, user_name, total_score in practice_records:
activity_type = "primary"
result_data = None
if total_score:
result_data = {"type": "", "text": f"评分:{int(total_score)}"}
activities.append({
"id": f"practice_{session.id}",
"user_name": user_name or f"用户{session.user_id}",
"action": "参加了",
"target": "AI陪练训练",
"time": session.end_time.strftime("%Y-%m-%d %H:%M") if session.end_time else "",
"type": activity_type,
"result": result_data
})
# 按时间倒序排列取前20条
activities.sort(key=lambda x: x['time'], reverse=True)
activities = activities[:20]
return ResponseModel(
code=200,
message="success",
data={"activities": activities}
)
except Exception as e:
logger.error(f"获取团队动态失败: {e}", exc_info=True)
return ResponseModel(code=500, message=f"获取团队动态失败: {str(e)}", data=None)