""" 考试报告和错题统计服务 """ from datetime import datetime, timedelta from typing import List, Optional, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, or_, desc, case, text from app.models.exam import Exam from app.models.exam_mistake import ExamMistake from app.models.course import Course, KnowledgePoint from app.core.logger import get_logger logger = get_logger(__name__) class ExamReportService: """考试报告服务类""" @staticmethod async def get_exam_report( db: AsyncSession, user_id: int, start_date: Optional[str] = None, end_date: Optional[str] = None ) -> Dict[str, Any]: """ 获取成绩报告汇总数据 Args: db: 数据库会话 user_id: 用户ID start_date: 开始日期(YYYY-MM-DD) end_date: 结束日期(YYYY-MM-DD) Returns: Dict: 包含overview、trends、subjects、recent_exams的完整报告数据 """ logger.info(f"获取成绩报告 - user_id: {user_id}, start_date: {start_date}, end_date: {end_date}") # 构建基础查询条件 conditions = [Exam.user_id == user_id] # 添加时间范围条件 if start_date: conditions.append(Exam.start_time >= start_date) if end_date: # 结束日期包含当天全部 end_datetime = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) conditions.append(Exam.start_time < end_datetime) # 1. 获取概览数据 overview = await ExamReportService._get_overview(db, conditions) # 2. 获取趋势数据(最近30天) trends = await ExamReportService._get_trends(db, user_id, conditions) # 3. 获取科目分析 subjects = await ExamReportService._get_subjects(db, conditions) # 4. 获取最近考试记录 recent_exams = await ExamReportService._get_recent_exams(db, conditions) return { "overview": overview, "trends": trends, "subjects": subjects, "recent_exams": recent_exams } @staticmethod async def _get_overview(db: AsyncSession, conditions: List) -> Dict[str, Any]: """获取概览数据""" # 查询统计数据(使用round1_score作为主要成绩) stmt = select( func.count(Exam.id).label("total_exams"), func.avg(Exam.round1_score).label("avg_score"), func.sum(Exam.question_count).label("total_questions"), func.count(case((Exam.is_passed == True, 1))).label("passed_exams") ).where( and_(*conditions, Exam.round1_score.isnot(None)) ) result = await db.execute(stmt) stats = result.one() total_exams = stats.total_exams or 0 passed_exams = stats.passed_exams or 0 return { "avg_score": round(float(stats.avg_score or 0), 1), "total_exams": total_exams, "pass_rate": round((passed_exams / total_exams * 100) if total_exams > 0 else 0, 1), "total_questions": stats.total_questions or 0 } @staticmethod async def _get_trends( db: AsyncSession, user_id: int, base_conditions: List ) -> List[Dict[str, Any]]: """获取成绩趋势(最近30天)""" # 计算30天前的日期 thirty_days_ago = datetime.now() - timedelta(days=30) # 查询最近30天的考试数据,按日期分组 stmt = select( func.date(Exam.start_time).label("exam_date"), func.avg(Exam.round1_score).label("avg_score") ).where( and_( Exam.user_id == user_id, Exam.start_time >= thirty_days_ago, Exam.round1_score.isnot(None) ) ).group_by( func.date(Exam.start_time) ).order_by( func.date(Exam.start_time) ) result = await db.execute(stmt) trend_data = result.all() # 转换为前端需要的格式 trends = [] for row in trend_data: trends.append({ "date": row.exam_date.strftime("%Y-%m-%d") if row.exam_date else "", "avg_score": round(float(row.avg_score or 0), 1) }) return trends @staticmethod async def _get_subjects(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]: """获取科目分析""" # 关联course表,按课程分组统计 stmt = select( Exam.course_id, Course.name.label("course_name"), func.avg(Exam.round1_score).label("avg_score"), func.count(Exam.id).label("exam_count"), func.max(Exam.round1_score).label("max_score"), func.min(Exam.round1_score).label("min_score"), func.count(case((Exam.is_passed == True, 1))).label("passed_count") ).join( Course, Exam.course_id == Course.id ).where( and_(*conditions, Exam.round1_score.isnot(None)) ).group_by( Exam.course_id, Course.name ).order_by( desc(func.count(Exam.id)) ) result = await db.execute(stmt) subject_data = result.all() subjects = [] for row in subject_data: exam_count = row.exam_count or 0 passed_count = row.passed_count or 0 subjects.append({ "course_id": row.course_id, "course_name": row.course_name, "avg_score": round(float(row.avg_score or 0), 1), "exam_count": exam_count, "max_score": round(float(row.max_score or 0), 1), "min_score": round(float(row.min_score or 0), 1), "pass_rate": round((passed_count / exam_count * 100) if exam_count > 0 else 0, 1) }) return subjects @staticmethod async def _get_recent_exams(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]: """获取最近10次考试记录""" # 查询最近10次考试,包含三轮得分 stmt = select( Exam.id, Exam.course_id, Course.name.label("course_name"), Exam.score, Exam.total_score, Exam.is_passed, Exam.start_time, Exam.end_time, Exam.round1_score, Exam.round2_score, Exam.round3_score ).join( Course, Exam.course_id == Course.id ).where( and_(*conditions) ).order_by( desc(Exam.created_at) # 改为按创建时间排序,避免start_time为NULL的问题 ).limit(10) result = await db.execute(stmt) exam_data = result.all() recent_exams = [] for row in exam_data: # 计算考试用时 duration_seconds = None if row.start_time and row.end_time: duration_seconds = int((row.end_time - row.start_time).total_seconds()) recent_exams.append({ "id": row.id, "course_id": row.course_id, "course_name": row.course_name, "score": round(float(row.score), 1) if row.score else None, "total_score": round(float(row.total_score or 100), 1), "is_passed": row.is_passed, "duration_seconds": duration_seconds, "start_time": row.start_time.isoformat() if row.start_time else None, "end_time": row.end_time.isoformat() if row.end_time else None, "round_scores": { "round1": round(float(row.round1_score), 1) if row.round1_score else None, "round2": round(float(row.round2_score), 1) if row.round2_score else None, "round3": round(float(row.round3_score), 1) if row.round3_score else None } }) return recent_exams class MistakeService: """错题服务类""" @staticmethod async def get_mistakes_list( db: AsyncSession, user_id: int, exam_id: Optional[int] = None, course_id: Optional[int] = None, question_type: Optional[str] = None, search: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, page: int = 1, size: int = 10 ) -> Dict[str, Any]: """ 获取错题列表(支持多维度筛选) Args: db: 数据库会话 user_id: 用户ID exam_id: 考试ID(可选) course_id: 课程ID(可选) question_type: 题型(可选) search: 关键词搜索(可选) start_date: 开始日期(可选) end_date: 结束日期(可选) page: 页码 size: 每页数量 Returns: Dict: 包含items、total、page、size、pages的分页数据 """ logger.info(f"获取错题列表 - user_id: {user_id}, exam_id: {exam_id}, course_id: {course_id}") # 构建查询条件 conditions = [ExamMistake.user_id == user_id] if exam_id: conditions.append(ExamMistake.exam_id == exam_id) if question_type: conditions.append(ExamMistake.question_type == question_type) if search: conditions.append(ExamMistake.question_content.like(f"%{search}%")) if start_date: conditions.append(ExamMistake.created_at >= start_date) if end_date: end_datetime = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) conditions.append(ExamMistake.created_at < end_datetime) # 如果指定了course_id,需要通过exam关联 if course_id: conditions.append(Exam.course_id == course_id) # 查询总数 count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join( Exam, ExamMistake.exam_id == Exam.id ).where(and_(*conditions)) total_result = await db.execute(count_stmt) total = total_result.scalar() or 0 # 查询分页数据 offset = (page - 1) * size stmt = select( ExamMistake.id, ExamMistake.exam_id, Exam.course_id, Course.name.label("course_name"), ExamMistake.question_content, ExamMistake.correct_answer, ExamMistake.user_answer, ExamMistake.question_type, ExamMistake.knowledge_point_id, KnowledgePoint.name.label("knowledge_point_name"), ExamMistake.created_at ).select_from(ExamMistake).join( Exam, ExamMistake.exam_id == Exam.id ).join( Course, Exam.course_id == Course.id ).outerjoin( KnowledgePoint, ExamMistake.knowledge_point_id == KnowledgePoint.id ).where( and_(*conditions) ).order_by( desc(ExamMistake.created_at) ).offset(offset).limit(size) result = await db.execute(stmt) mistakes = result.all() # 构建返回数据 items = [] for row in mistakes: items.append({ "id": row.id, "exam_id": row.exam_id, "course_id": row.course_id, "course_name": row.course_name, "question_content": row.question_content, "correct_answer": row.correct_answer, "user_answer": row.user_answer, "question_type": row.question_type, "knowledge_point_id": row.knowledge_point_id, "knowledge_point_name": row.knowledge_point_name, "created_at": row.created_at }) pages = (total + size - 1) // size return { "items": items, "total": total, "page": page, "size": size, "pages": pages } @staticmethod async def get_mistakes_statistics( db: AsyncSession, user_id: int, course_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取错题统计数据 Args: db: 数据库会话 user_id: 用户ID course_id: 课程ID(可选) Returns: Dict: 包含total、by_course、by_type、by_time的统计数据 """ logger.info(f"获取错题统计 - user_id: {user_id}, course_id: {course_id}") # 基础条件 base_conditions = [ExamMistake.user_id == user_id] if course_id: base_conditions.append(Exam.course_id == course_id) # 1. 总数统计 count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join( Exam, ExamMistake.exam_id == Exam.id ).where(and_(*base_conditions)) total_result = await db.execute(count_stmt) total = total_result.scalar() or 0 # 2. 按课程统计 by_course_stmt = select( Exam.course_id, Course.name.label("course_name"), func.count(ExamMistake.id).label("count") ).select_from(ExamMistake).join( Exam, ExamMistake.exam_id == Exam.id ).join( Course, Exam.course_id == Course.id ).where( ExamMistake.user_id == user_id ).group_by( Exam.course_id, Course.name ).order_by( desc(func.count(ExamMistake.id)) ) by_course_result = await db.execute(by_course_stmt) by_course_data = by_course_result.all() by_course = [ { "course_id": row.course_id, "course_name": row.course_name, "count": row.count } for row in by_course_data ] # 3. 按题型统计 by_type_stmt = select( ExamMistake.question_type, func.count(ExamMistake.id).label("count") ).where( and_(ExamMistake.user_id == user_id, ExamMistake.question_type.isnot(None)) ).group_by( ExamMistake.question_type ) by_type_result = await db.execute(by_type_stmt) by_type_data = by_type_result.all() # 题型名称映射 type_names = { "single": "单选题", "multiple": "多选题", "judge": "判断题", "blank": "填空题", "essay": "问答题" } by_type = [ { "type": row.question_type, "type_name": type_names.get(row.question_type, "未知类型"), "count": row.count } for row in by_type_data ] # 4. 按时间统计 now = datetime.now() week_ago = now - timedelta(days=7) month_ago = now - timedelta(days=30) quarter_ago = now - timedelta(days=90) # 最近一周 week_stmt = select(func.count(ExamMistake.id)).where( and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= week_ago) ) week_result = await db.execute(week_stmt) week_count = week_result.scalar() or 0 # 最近一月 month_stmt = select(func.count(ExamMistake.id)).where( and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= month_ago) ) month_result = await db.execute(month_stmt) month_count = month_result.scalar() or 0 # 最近三月 quarter_stmt = select(func.count(ExamMistake.id)).where( and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= quarter_ago) ) quarter_result = await db.execute(quarter_stmt) quarter_count = quarter_result.scalar() or 0 by_time = { "week": week_count, "month": month_count, "quarter": quarter_count } return { "total": total, "by_course": by_course, "by_type": by_type, "by_time": by_time }