""" 团队成员管理 API 路由 提供团队统计、成员列表、成员详情、学习报告等功能 """ from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import and_, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import get_current_active_user as get_current_user, get_db from app.core.logger import logger from app.models.course import Course from app.models.exam import Exam from app.models.position import Position from app.models.position_course import PositionCourse from app.models.position_member import PositionMember from app.models.practice import PracticeReport, PracticeSession from app.models.user import User, UserTeam from app.schemas.base import PaginatedResponse, ResponseModel router = APIRouter(prefix="/team/management", tags=["team-management"]) async def get_accessible_team_member_ids( current_user: User, db: AsyncSession ) -> List[int]: """获取用户可访问的团队成员ID列表(只返回未删除的用户)""" if current_user.role in ['admin', 'manager']: # 管理员查看所有团队成员(过滤已删除用户) stmt = select(UserTeam.user_id).join( User, UserTeam.user_id == User.id ).where( and_( User.is_deleted == False, # noqa: E712 User.is_active == True # noqa: E712 ) ).distinct() result = await db.execute(stmt) return [row[0] for row in result.all()] else: # 普通用户只查看自己团队的成员 # 1. 先查询用户所在的团队 stmt = select(UserTeam.team_id).where(UserTeam.user_id == current_user.id) result = await db.execute(stmt) team_ids = [row[0] for row in result.all()] if not team_ids: return [] # 2. 查询这些团队的所有成员(过滤已删除用户) stmt = select(UserTeam.user_id).join( User, UserTeam.user_id == User.id ).where( and_( UserTeam.team_id.in_(team_ids), User.is_deleted == False, # noqa: E712 User.is_active == True # noqa: E712 ) ).distinct() result = await db.execute(stmt) return [row[0] for row in result.all()] def calculate_member_status( last_login: Optional[datetime], last_exam: Optional[datetime], last_practice: Optional[datetime], has_ongoing: bool ) -> str: """ 计算成员活跃状态 Args: last_login: 最后登录时间 last_exam: 最后考试时间 last_practice: 最后陪练时间 has_ongoing: 是否有进行中的活动 Returns: 状态: active(活跃), learning(学习中), rest(休息) """ # 获取最近活跃时间 times = [t for t in [last_login, last_exam, last_practice] if t is not None] if not times: return 'rest' last_active = max(times) thirty_days_ago = datetime.now() - timedelta(days=30) # 判断状态 if last_active >= thirty_days_ago: if has_ongoing: return 'learning' else: return 'active' else: return 'rest' @router.get("/statistics", response_model=ResponseModel) async def get_team_statistics( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取团队统计数据 返回:团队总人数、活跃成员数、平均学习进度、团队平均分 """ try: # 获取可访问的团队成员ID member_ids = await get_accessible_team_member_ids(current_user, db) # 团队总人数 team_count = len(member_ids) if team_count == 0: return ResponseModel( code=200, message="success", data={ "teamCount": 0, "activeMembers": 0, "avgProgress": 0, "avgScore": 0 } ) # 统计活跃成员数(最近30天有活动) thirty_days_ago = datetime.now() - timedelta(days=30) # 统计最近30天有登录或有考试或有陪练的用户 active_users_stmt = select(func.count(func.distinct(User.id))).where( and_( User.id.in_(member_ids), or_( User.last_login_at >= thirty_days_ago, User.id.in_( select(Exam.user_id).where( and_( Exam.user_id.in_(member_ids), Exam.created_at >= thirty_days_ago ) ) ), User.id.in_( select(PracticeSession.user_id).where( and_( PracticeSession.user_id.in_(member_ids), PracticeSession.start_time >= thirty_days_ago ) ) ) ) ) ) result = await db.execute(active_users_stmt) active_members = result.scalar() or 0 # 计算平均学习进度(每个成员的完成课程/应完成课程的平均值) # 统计每个成员的进度,然后计算平均值 total_progress = 0.0 members_with_courses = 0 for member_id in member_ids: # 获取该成员岗位分配的课程数 member_courses_stmt = select( func.count(func.distinct(PositionCourse.course_id)) ).select_from(PositionMember).join( PositionCourse, PositionCourse.position_id == PositionMember.position_id ).where( and_( PositionMember.user_id == member_id, PositionMember.is_deleted == False # noqa: E712 ) ) result = await db.execute(member_courses_stmt) member_total_courses = result.scalar() or 0 if member_total_courses > 0: # 获取该成员已完成(及格)的课程数 member_completed_stmt = select( func.count(func.distinct(Exam.course_id)) ).where( and_( Exam.user_id == member_id, Exam.round1_score >= 60, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(member_completed_stmt) member_completed = result.scalar() or 0 # 计算该成员的进度(最大100%) member_progress = min((member_completed / member_total_courses) * 100, 100) total_progress += member_progress members_with_courses += 1 avg_progress = round(total_progress / members_with_courses, 1) if members_with_courses > 0 else 0.0 # 计算团队平均分(使用round1_score) avg_score_stmt = select(func.avg(Exam.round1_score)).where( and_( Exam.user_id.in_(member_ids), Exam.round1_score.isnot(None), Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(avg_score_stmt) avg_score_value = result.scalar() avg_score = round(float(avg_score_value), 1) if avg_score_value else 0.0 data = { "teamCount": team_count, "activeMembers": active_members, "avgProgress": avg_progress, "avgScore": avg_score } return ResponseModel(code=200, message="success", data=data) except Exception as e: logger.error(f"获取团队统计失败: {e}", exc_info=True) return ResponseModel(code=500, message=f"获取团队统计失败: {str(e)}", data=None) @router.get("/members", response_model=ResponseModel[PaginatedResponse]) async def get_team_members( page: int = Query(1, ge=1, description="页码"), size: int = Query(20, ge=1, le=100, description="每页数量"), search_text: Optional[str] = Query(None, description="搜索姓名、岗位"), status: Optional[str] = Query(None, description="筛选状态: active/learning/rest"), position: Optional[str] = Query(None, description="筛选岗位"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取团队成员列表(带筛选、搜索、分页) 返回成员基本信息、学习进度、成绩、学习时长等 """ try: # 获取可访问的团队成员ID member_ids = await get_accessible_team_member_ids(current_user, db) if not member_ids: return ResponseModel( code=200, message="success", data=PaginatedResponse( items=[], total=0, page=page, page_size=size, pages=0 ) ) # 构建基础查询 stmt = select(User).where( and_( User.id.in_(member_ids), User.is_deleted == False # noqa: E712 ) ) # 搜索条件(姓名) if search_text: like_pattern = f"%{search_text}%" stmt = stmt.where( or_( User.full_name.ilike(like_pattern), User.username.ilike(like_pattern) ) ) # 先获取所有符合条件的用户,然后在Python中过滤状态和岗位 result = await db.execute(stmt) all_users = result.scalars().all() # 为每个用户计算详细信息 member_list = [] thirty_days_ago = datetime.now() - timedelta(days=30) for user in all_users: # 获取用户岗位 position_stmt = select(Position.name).select_from(PositionMember).join( Position, Position.id == PositionMember.position_id ).where( and_( PositionMember.user_id == user.id, PositionMember.is_deleted == False # noqa: E712 ) ).limit(1) result = await db.execute(position_stmt) position_name = result.scalar() # 如果有岗位筛选且不匹配,跳过 if position and position_name != position: continue # 获取最近考试时间 last_exam_stmt = select(func.max(Exam.created_at)).where( Exam.user_id == user.id ) result = await db.execute(last_exam_stmt) last_exam = result.scalar() # 获取最近陪练时间 last_practice_stmt = select(func.max(PracticeSession.start_time)).where( PracticeSession.user_id == user.id ) result = await db.execute(last_practice_stmt) last_practice = result.scalar() # 检查是否有进行中的活动 has_ongoing_stmt = select(func.count(Exam.id)).where( and_( Exam.user_id == user.id, Exam.status == 'started' ) ) result = await db.execute(has_ongoing_stmt) has_ongoing = (result.scalar() or 0) > 0 # 计算状态 member_status = calculate_member_status( user.last_login_at, last_exam, last_practice, has_ongoing ) # 如果有状态筛选且不匹配,跳过 if status and member_status != status: continue # 统计学习进度 # 1. 获取岗位分配的课程总数 total_courses_stmt = select( func.count(func.distinct(PositionCourse.course_id)) ).select_from(PositionMember).join( PositionCourse, PositionCourse.position_id == PositionMember.position_id ).where( and_( PositionMember.user_id == user.id, PositionMember.is_deleted == False # noqa: E712 ) ) result = await db.execute(total_courses_stmt) total_courses = result.scalar() or 0 # 2. 统计已完成的考试(及格) completed_courses_stmt = select( func.count(func.distinct(Exam.course_id)) ).where( and_( Exam.user_id == user.id, Exam.round1_score >= 60, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(completed_courses_stmt) completed_courses = result.scalar() or 0 # 3. 计算进度 progress = 0 if total_courses > 0: progress = int((completed_courses / total_courses) * 100) # 统计平均成绩 avg_score_stmt = select(func.avg(Exam.round1_score)).where( and_( Exam.user_id == user.id, Exam.round1_score.isnot(None), Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(avg_score_stmt) avg_score_value = result.scalar() avg_score = round(float(avg_score_value), 1) if avg_score_value else 0.0 # 统计学习时长(考试时长+陪练时长) exam_time_stmt = select( func.coalesce(func.sum(Exam.duration_minutes), 0) ).where(Exam.user_id == user.id) result = await db.execute(exam_time_stmt) exam_minutes = float(result.scalar() or 0) practice_time_stmt = select( func.coalesce(func.sum(PracticeSession.duration_seconds), 0) ).where( and_( PracticeSession.user_id == user.id, PracticeSession.status == 'completed' ) ) result = await db.execute(practice_time_stmt) practice_seconds = float(result.scalar() or 0) total_hours = round(exam_minutes / 60 + practice_seconds / 3600, 1) # 获取最近活跃时间 active_times = [t for t in [user.last_login_at, last_exam, last_practice] if t is not None] last_active = max(active_times).strftime("%Y-%m-%d %H:%M") if active_times else "-" member_list.append({ "id": user.id, "name": user.full_name or user.username, "avatar": user.avatar_url or "", "position": position_name or "未分配岗位", "status": member_status, "progress": progress, "completedCourses": completed_courses, "totalCourses": total_courses, "avgScore": avg_score, "studyTime": total_hours, "lastActive": last_active, "joinTime": user.created_at.strftime("%Y-%m-%d") if user.created_at else "-", "email": user.email or "", "phone": user.phone or "", "passRate": 100 if completed_courses > 0 else 0 # 简化计算 }) # 分页 total = len(member_list) pages = (total + size - 1) // size if size > 0 else 0 start = (page - 1) * size end = start + size items = member_list[start:end] return ResponseModel( code=200, message="success", data=PaginatedResponse( items=items, total=total, page=page, page_size=size, pages=pages ) ) except Exception as e: logger.error(f"获取团队成员列表失败: {e}", exc_info=True) return ResponseModel( code=500, message=f"获取团队成员列表失败: {str(e)}", data=None ) @router.get("/members/{member_id}/detail", response_model=ResponseModel) async def get_member_detail( member_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取成员详情 返回完整的成员信息和最近学习记录 """ try: # 权限检查:确保member_id在可访问范围内 accessible_ids = await get_accessible_team_member_ids(current_user, db) if member_id not in accessible_ids: return ResponseModel( code=403, message="无权访问该成员信息", data=None ) # 获取用户基本信息 stmt = select(User).where( and_( User.id == member_id, User.is_deleted == False # noqa: E712 ) ) result = await db.execute(stmt) user = result.scalar_one_or_none() if not user: return ResponseModel(code=404, message="成员不存在", data=None) # 获取岗位 position_stmt = select(Position.name).select_from(PositionMember).join( Position, Position.id == PositionMember.position_id ).where( and_( PositionMember.user_id == user.id, PositionMember.is_deleted == False # noqa: E712 ) ).limit(1) result = await db.execute(position_stmt) position_name = result.scalar() or "未分配岗位" # 计算状态 last_exam_stmt = select(func.max(Exam.created_at)).where(Exam.user_id == user.id) result = await db.execute(last_exam_stmt) last_exam = result.scalar() last_practice_stmt = select(func.max(PracticeSession.start_time)).where( PracticeSession.user_id == user.id ) result = await db.execute(last_practice_stmt) last_practice = result.scalar() has_ongoing_stmt = select(func.count(Exam.id)).where( and_( Exam.user_id == user.id, Exam.status == 'started' ) ) result = await db.execute(has_ongoing_stmt) has_ongoing = (result.scalar() or 0) > 0 member_status = calculate_member_status( user.last_login_at, last_exam, last_practice, has_ongoing ) # 统计学习数据 # 学习时长 exam_time_stmt = select(func.coalesce(func.sum(Exam.duration_minutes), 0)).where( Exam.user_id == user.id ) result = await db.execute(exam_time_stmt) exam_minutes = result.scalar() or 0 practice_time_stmt = select( func.coalesce(func.sum(PracticeSession.duration_seconds), 0) ).where( and_( PracticeSession.user_id == user.id, PracticeSession.status == 'completed' ) ) result = await db.execute(practice_time_stmt) practice_seconds = result.scalar() or 0 study_time = round(exam_minutes / 60 + practice_seconds / 3600, 1) # 完成课程数 completed_courses_stmt = select( func.count(func.distinct(Exam.course_id)) ).where( and_( Exam.user_id == user.id, Exam.round1_score >= 60, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(completed_courses_stmt) completed_courses = result.scalar() or 0 # 平均成绩 avg_score_stmt = select(func.avg(Exam.round1_score)).where( and_( Exam.user_id == user.id, Exam.round1_score.isnot(None), Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(avg_score_stmt) avg_score_value = result.scalar() avg_score = round(float(avg_score_value), 1) if avg_score_value else 0.0 # 通过率 total_exams_stmt = select(func.count(Exam.id)).where( and_( Exam.user_id == user.id, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(total_exams_stmt) total_exams = result.scalar() or 0 passed_exams_stmt = select(func.count(Exam.id)).where( and_( Exam.user_id == user.id, Exam.round1_score >= 60, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(passed_exams_stmt) passed_exams = result.scalar() or 0 pass_rate = round((passed_exams / total_exams) * 100) if total_exams > 0 else 0 # 获取最近学习记录(最近10条考试和陪练) recent_records = [] # 考试记录 exam_records_stmt = ( select(Exam, Course.name.label('course_name')) .join(Course, Course.id == Exam.course_id) .where( and_( Exam.user_id == user.id, Exam.status.in_(['completed', 'submitted']) ) ) .order_by(Exam.updated_at.desc()) .limit(10) ) result = await db.execute(exam_records_stmt) exam_records = result.all() for exam, course_name in exam_records: score = exam.round1_score or 0 record_type = "success" if score >= 60 else "danger" recent_records.append({ "id": f"exam_{exam.id}", "time": exam.updated_at.strftime("%Y-%m-%d %H:%M"), "content": f"完成《{course_name}》课程考试,成绩:{int(score)}分", "type": record_type }) # 陪练记录 practice_records_stmt = ( select(PracticeSession) .where( and_( PracticeSession.user_id == user.id, PracticeSession.status == 'completed' ) ) .order_by(PracticeSession.end_time.desc()) .limit(5) ) result = await db.execute(practice_records_stmt) practice_records = result.scalars().all() for session in practice_records: recent_records.append({ "id": f"practice_{session.id}", "time": session.end_time.strftime("%Y-%m-%d %H:%M") if session.end_time else "", "content": "参加AI陪练训练", "type": "primary" }) # 按时间排序 recent_records.sort(key=lambda x: x['time'], reverse=True) recent_records = recent_records[:10] data = { "id": user.id, "name": user.full_name or user.username, "avatar": user.avatar_url or "", "position": position_name, "status": member_status, "joinTime": user.created_at.strftime("%Y-%m-%d") if user.created_at else "-", "email": user.email or "", "phone": user.phone or "", "studyTime": study_time, "completedCourses": completed_courses, "avgScore": avg_score, "passRate": pass_rate, "recentRecords": recent_records } return ResponseModel(code=200, message="success", data=data) except Exception as e: logger.error(f"获取成员详情失败: {e}", exc_info=True) return ResponseModel( code=500, message=f"获取成员详情失败: {str(e)}", data=None ) @router.get("/members/{member_id}/report", response_model=ResponseModel) async def get_member_report( member_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> ResponseModel: """ 获取成员学习报告 返回学习概览、30天进度趋势、能力评估、详细学习记录 """ try: # 权限检查 accessible_ids = await get_accessible_team_member_ids(current_user, db) if member_id not in accessible_ids: return ResponseModel(code=403, message="无权访问该成员信息", data=None) # 获取用户信息 stmt = select(User).where( and_( User.id == member_id, User.is_deleted == False # noqa: E712 ) ) result = await db.execute(stmt) user = result.scalar_one_or_none() if not user: return ResponseModel(code=404, message="成员不存在", data=None) # 1. 报告概览 # 学习总时长 exam_time_stmt = select(func.coalesce(func.sum(Exam.duration_minutes), 0)).where( Exam.user_id == user.id ) result = await db.execute(exam_time_stmt) exam_minutes = result.scalar() or 0 practice_time_stmt = select( func.coalesce(func.sum(PracticeSession.duration_seconds), 0) ).where( and_( PracticeSession.user_id == user.id, PracticeSession.status == 'completed' ) ) result = await db.execute(practice_time_stmt) practice_seconds = result.scalar() or 0 total_hours = round(exam_minutes / 60 + practice_seconds / 3600, 1) # 完成课程数 completed_courses_stmt = select( func.count(func.distinct(Exam.course_id)) ).where( and_( Exam.user_id == user.id, Exam.round1_score >= 60, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(completed_courses_stmt) completed_courses = result.scalar() or 0 # 平均成绩 avg_score_stmt = select(func.avg(Exam.round1_score)).where( and_( Exam.user_id == user.id, Exam.round1_score.isnot(None), Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(avg_score_stmt) avg_score_value = result.scalar() avg_score = round(float(avg_score_value), 1) if avg_score_value else 0.0 # 学习排名(简化:在团队中的排名) # TODO: 实现真实排名计算 ranking = "第5名" overview = [ { "label": "学习总时长", "value": f"{total_hours}小时", "icon": "Clock", "color": "#667eea", "bgColor": "rgba(102, 126, 234, 0.1)" }, { "label": "完成课程", "value": f"{completed_courses}门", "icon": "CircleCheck", "color": "#67c23a", "bgColor": "rgba(103, 194, 58, 0.1)" }, { "label": "平均成绩", "value": f"{avg_score}分", "icon": "Trophy", "color": "#e6a23c", "bgColor": "rgba(230, 162, 60, 0.1)" }, { "label": "学习排名", "value": ranking, "icon": "Medal", "color": "#f56c6c", "bgColor": "rgba(245, 108, 108, 0.1)" } ] # 2. 30天学习进度趋势 thirty_days_ago = datetime.now() - timedelta(days=30) dates = [] progress_data = [] for i in range(30): date = thirty_days_ago + timedelta(days=i) dates.append(date.strftime("%m-%d")) # 统计该日期之前完成的考试数 cumulative_exams_stmt = select(func.count(Exam.id)).where( and_( Exam.user_id == user.id, Exam.created_at <= date, Exam.status.in_(['completed', 'submitted']) ) ) result = await db.execute(cumulative_exams_stmt) cumulative = result.scalar() or 0 # 进度 = 累计考试数 * 10(简化计算) progress = min(cumulative * 10, 100) progress_data.append(progress) # 3. 能力评估(从陪练报告聚合) ability_stmt = select(PracticeReport.ability_dimensions).where( PracticeReport.user_id == user.id ) result = await db.execute(ability_stmt) all_dimensions = result.scalars().all() abilities = [] if all_dimensions: # 聚合能力数据 ability_scores: Dict[str, List[float]] = {} for dimensions in all_dimensions: if dimensions: for dim in dimensions: name = dim.get('name', '') score = dim.get('score', 0) if name: if name not in ability_scores: ability_scores[name] = [] ability_scores[name].append(float(score)) # 计算平均分 for name, scores in ability_scores.items(): avg = sum(scores) / len(scores) description = "表现良好" if avg >= 80 else "需要加强" abilities.append({ "name": name, "score": int(avg), "description": description }) else: # 默认能力评估 default_abilities = [ {"name": "沟通表达", "score": 0, "description": "暂无数据"}, {"name": "需求挖掘", "score": 0, "description": "暂无数据"}, {"name": "产品知识", "score": 0, "description": "暂无数据"}, {"name": "成交技巧", "score": 0, "description": "暂无数据"} ] abilities = default_abilities # 4. 详细学习记录(最近20条) records = [] # 考试记录 exam_records_stmt = ( select(Exam, Course.name.label('course_name')) .join(Course, Course.id == Exam.course_id) .where( and_( Exam.user_id == user.id, Exam.status.in_(['completed', 'submitted']) ) ) .order_by(Exam.updated_at.desc()) .limit(20) ) result = await db.execute(exam_records_stmt) exam_records = result.all() for exam, course_name in exam_records: score = exam.round1_score or 0 records.append({ "date": exam.updated_at.strftime("%Y-%m-%d"), "course": course_name, "duration": exam.duration_minutes or 0, "score": int(score), "status": "completed" }) data = { "overview": overview, "progressTrend": { "dates": dates, "data": progress_data }, "abilities": abilities, "records": records[:20] } return ResponseModel(code=200, message="success", data=data) except Exception as e: logger.error(f"获取成员学习报告失败: {e}", exc_info=True) return ResponseModel( code=500, message=f"获取成员学习报告失败: {str(e)}", data=None )