""" 考试相关API路由 """ from typing import List, Optional import json from datetime import datetime from fastapi import APIRouter, Depends, Query, HTTPException, status, Request from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.deps import get_db, get_current_user from app.core.config import get_settings from app.core.logger import get_logger from app.models.user import User from app.models.exam import Exam from app.models.exam_mistake import ExamMistake from app.models.position_member import PositionMember from app.models.position_course import PositionCourse from app.schemas.base import ResponseModel from app.services.exam_service import ExamService from app.schemas.exam import ( StartExamRequest, StartExamResponse, SubmitExamRequest, SubmitExamResponse, ExamDetailResponse, ExamRecordResponse, GenerateExamRequest, GenerateExamResponse, JudgeAnswerRequest, JudgeAnswerResponse, RecordMistakeRequest, RecordMistakeResponse, GetMistakesResponse, MistakeRecordItem, # 新增的Schema ExamReportResponse, MistakeListResponse, MistakesStatisticsResponse, UpdateRoundScoreRequest, ) from app.services.exam_report_service import ExamReportService, MistakeService from app.services.course_statistics_service import course_statistics_service from app.services.system_log_service import system_log_service from app.schemas.system_log import SystemLogCreate # V2 原生服务:Python 实现 from app.services.ai import exam_generator_service, ExamGeneratorConfig from app.services.ai.answer_judge_service import answer_judge_service from app.core.exceptions import ExternalServiceError logger = get_logger(__name__) settings = get_settings() router = APIRouter(prefix="/exams", tags=["考试"]) @router.post("/start", response_model=ResponseModel[StartExamResponse]) async def start_exam( request: StartExamRequest, http_request: Request, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """开始考试""" # 先提取用户信息,避免后续懒加载问题 user_id = current_user.id username = current_user.username exam = await ExamService.start_exam( db=db, user_id=user_id, course_id=request.course_id, question_count=request.count, ) # 异步更新课程学员数统计 try: await course_statistics_service.update_course_student_count(db, request.course_id) except Exception as e: logger.warning(f"更新课程学员数失败: {str(e)}") # 不影响主流程,只记录警告 # 记录考试开始日志 await system_log_service.create_log( db, SystemLogCreate( level="INFO", type="api", message=f"用户 {username} 开始考试(课程ID: {request.course_id})", user_id=user_id, user=username, ip=http_request.client.host if http_request.client else None, path="/api/v1/exams/start", method="POST", user_agent=http_request.headers.get("user-agent") ) ) return ResponseModel(code=200, data=StartExamResponse(exam_id=exam.id), message="考试开始") @router.post("/submit", response_model=ResponseModel[SubmitExamResponse]) async def submit_exam( request: SubmitExamRequest, http_request: Request, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """提交考试答案""" # 先提取用户信息,避免后续懒加载问题 user_id = current_user.id username = current_user.username result = await ExamService.submit_exam( db=db, user_id=user_id, exam_id=request.exam_id, answers=request.answers ) # 获取考试记录以获取course_id exam_stmt = select(Exam).where(Exam.id == request.exam_id) exam_result = await db.execute(exam_stmt) exam = exam_result.scalar_one_or_none() # 异步更新课程学员数统计 if exam and exam.course_id: try: await course_statistics_service.update_course_student_count(db, exam.course_id) except Exception as e: logger.warning(f"更新课程学员数失败: {str(e)}") # 不影响主流程,只记录警告 # 记录考试提交日志 await system_log_service.create_log( db, SystemLogCreate( level="INFO", type="api", message=f"用户 {username} 提交考试(考试ID: {request.exam_id},得分: {result.get('score', 0)})", user_id=user_id, user=username, ip=http_request.client.host if http_request.client else None, path="/api/v1/exams/submit", method="POST", user_agent=http_request.headers.get("user-agent") ) ) # 考试通过时触发经验值和奖章检查 exp_result = None new_badges = [] if result.get("is_passed"): try: from app.services.level_service import LevelService from app.services.badge_service import BadgeService level_service = LevelService(db) badge_service = BadgeService(db) # 添加考试经验值 exp_result = await level_service.add_exam_exp( user_id=current_user.id, exam_id=request.exam_id, score=result.get("total_score", 0), is_passed=True ) # 检查是否解锁新奖章 new_badges = await badge_service.check_and_award_badges(current_user.id) await db.commit() except Exception as e: logger.warning(f"考试经验值/奖章处理失败: {str(e)}") # 将经验值结果添加到返回数据 response_data = SubmitExamResponse(**result) return ResponseModel( code=200, data={ **response_data.model_dump(), "exp_result": exp_result, "new_badges": new_badges }, message="考试提交成功" ) @router.get("/mistakes", response_model=ResponseModel[GetMistakesResponse]) async def get_mistakes( exam_id: int, round: int = Query(None, description="获取指定轮次的错题,不传则获取所有轮次"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取错题记录 用于第二、三轮考试时获取上一轮的错题记录 返回的数据可直接序列化为JSON字符串作为mistake_records参数传给考试生成接口 参数说明: - exam_id: 考试ID - round: 指定轮次(1/2/3),用于获取特定轮次的错题 第2轮考试时传入round=1,获取第1轮的错题 第3轮考试时传入round=2,获取第2轮的错题 不传则获取该考试的所有错题 """ logger.info(f"📋 GET /mistakes 收到请求") try: logger.info(f"📋 获取错题记录 - exam_id: {exam_id}, round: {round}, user_id: {current_user.id}") # 构建查询条件 query = select(ExamMistake).where( ExamMistake.exam_id == exam_id, ExamMistake.user_id == current_user.id, ) # 如果指定了轮次,只获取该轮次的错题 if round is not None: query = query.where(ExamMistake.round == round) query = query.order_by(ExamMistake.id) result = await db.execute(query) mistakes = result.scalars().all() logger.info(f"✅ 查询到错题记录数量: {len(mistakes)}") # 转换为响应格式 mistake_items = [ MistakeRecordItem( id=m.id, question_id=m.question_id, knowledge_point_id=m.knowledge_point_id, question_content=m.question_content, correct_answer=m.correct_answer, user_answer=m.user_answer, created_at=m.created_at, ) for m in mistakes ] logger.info( f"获取错题记录成功 - user_id: {current_user.id}, exam_id: {exam_id}, " f"count: {len(mistake_items)}" ) # 返回统一的ResponseModel格式,让Pydantic自动处理序列化 return ResponseModel( code=200, message="获取错题记录成功", data=GetMistakesResponse( mistakes=mistake_items ) ) except Exception as e: logger.error(f"获取错题记录失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取错题记录失败: {str(e)}" ) @router.get("/{exam_id}", response_model=ResponseModel[ExamDetailResponse]) async def get_exam_detail( exam_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取考试详情""" exam_data = await ExamService.get_exam_detail( db=db, user_id=current_user.id, exam_id=exam_id ) return ResponseModel(code=200, data=ExamDetailResponse(**exam_data), message="获取成功") @router.get("/records", response_model=ResponseModel[dict]) async def get_exam_records( page: int = Query(1, ge=1), size: int = Query(10, ge=1, le=100), course_id: Optional[int] = Query(None), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取考试记录列表""" records = await ExamService.get_exam_records( db=db, user_id=current_user.id, page=page, size=size, course_id=course_id ) return ResponseModel(code=200, data=records, message="获取成功") @router.get("/statistics/summary", response_model=ResponseModel[dict]) async def get_exam_statistics( course_id: Optional[int] = Query(None), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取考试统计信息""" stats = await ExamService.get_exam_statistics( db=db, user_id=current_user.id, course_id=course_id ) return ResponseModel(code=200, data=stats, message="获取成功") # ==================== 试题生成接口 ==================== @router.post("/generate", response_model=ResponseModel[GenerateExamResponse]) async def generate_exam( request: GenerateExamRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 生成考试试题 使用 Python 原生 AI 服务实现。 考试轮次说明: - 第一轮考试:mistake_records 传空或不传 - 第二、三轮错题重考:mistake_records 传入上一轮错题记录的JSON字符串 """ from app.models.course import Course try: # 验证课程存在性 course = await db.get(Course, request.course_id) if not course or course.is_deleted: logger.warning(f"课程不存在或已删除: course_id={request.course_id}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"无法生成试题:课程ID {request.course_id} 不存在或已被删除" ) # 从用户信息中自动获取岗位ID(如果未提供) position_id = request.position_id if not position_id: # 1. 首先查询用户已分配的岗位 result = await db.execute( select(PositionMember).where( PositionMember.user_id == current_user.id, PositionMember.is_deleted == False ).limit(1) ) position_member = result.scalar_one_or_none() if position_member: position_id = position_member.position_id else: # 2. 如果用户没有岗位,从课程关联的岗位中获取第一个 result = await db.execute( select(PositionCourse.position_id).where( PositionCourse.course_id == request.course_id, PositionCourse.is_deleted == False ).limit(1) ) course_position = result.scalar_one_or_none() if course_position: position_id = course_position logger.info(f"用户 {current_user.id} 没有分配岗位,使用课程关联的岗位ID: {position_id}") else: # 3. 如果课程也没有关联岗位,抛出错误 logger.warning(f"用户 {current_user.id} 没有分配岗位,且课程 {request.course_id} 未关联任何岗位") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="无法生成试题:用户未分配岗位,且课程未关联任何岗位" ) # 记录详细的题型设置(用于调试) logger.info( f"考试题型设置 - 单选:{request.single_choice_count}, 多选:{request.multiple_choice_count}, " f"判断:{request.true_false_count}, 填空:{request.fill_blank_count}, 问答:{request.essay_count}, " f"难度:{request.difficulty_level}" ) # 调用 Python 原生试题生成服务 logger.info( f"调用原生试题生成服务 - user_id: {current_user.id}, " f"course_id: {request.course_id}, position_id: {position_id}" ) # 构建配置 config = ExamGeneratorConfig( course_id=request.course_id, position_id=position_id, single_choice_count=request.single_choice_count or 0, multiple_choice_count=request.multiple_choice_count or 0, true_false_count=request.true_false_count or 0, fill_blank_count=request.fill_blank_count or 0, essay_count=request.essay_count or 0, difficulty_level=request.difficulty_level or 3, mistake_records=request.mistake_records or "", ) # 调用原生服务 gen_result = await exam_generator_service.generate_exam(db, config) if not gen_result.get("success"): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="试题生成服务返回失败" ) # 将题目列表转为 JSON 字符串(兼容原有前端格式) result_data = json.dumps(gen_result.get("questions", []), ensure_ascii=False) logger.info( f"试题生成完成 - questions: {gen_result.get('total_count')}, " f"provider: {gen_result.get('ai_provider')}, latency: {gen_result.get('ai_latency_ms')}ms" ) if result_data is None or result_data == "": logger.error(f"试题生成未返回有效结果数据") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="试题生成失败: 未返回结果数据" ) # 创建或复用考试记录 question_count = sum([ request.single_choice_count or 0, request.multiple_choice_count or 0, request.true_false_count or 0, request.fill_blank_count or 0, request.essay_count or 0 ]) # 第一轮:创建新的exam记录 if request.current_round == 1: exam = Exam( user_id=current_user.id, course_id=request.course_id, exam_name=f"课程{request.course_id}考试", question_count=question_count, total_score=100.0, pass_score=60.0, duration_minutes=60, status="started", start_time=datetime.now(), questions=None, answers=None, ) db.add(exam) await db.commit() await db.refresh(exam) logger.info(f"第{request.current_round}轮:创建考试记录成功 - exam_id: {exam.id}") else: # 第二、三轮:复用已有exam记录 if not request.exam_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"第{request.current_round}轮考试必须提供exam_id" ) exam = await db.get(Exam, request.exam_id) if not exam: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在" ) if exam.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="无权访问此考试记录" ) logger.info(f"第{request.current_round}轮:复用考试记录 - exam_id: {exam.id}") return ResponseModel( code=200, message="试题生成成功", data=GenerateExamResponse( result=result_data, workflow_run_id=f"{gen_result.get('ai_provider')}_{gen_result.get('ai_latency_ms')}ms", task_id=f"native_{request.course_id}", exam_id=exam.id, ) ) except HTTPException: raise except Exception as e: logger.error(f"生成考试试题失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"试题生成失败: {str(e)}" ) @router.post("/judge-answer", response_model=ResponseModel[JudgeAnswerResponse]) async def judge_answer( request: JudgeAnswerRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 判断主观题答案 适用于填空题和问答题的答案判断。 使用 Python 原生 AI 服务实现。 """ try: logger.info( f"调用原生答案判断服务 - user_id: {current_user.id}, " f"question: {request.question[:50]}..." ) result = await answer_judge_service.judge( question=request.question, correct_answer=request.correct_answer, user_answer=request.user_answer, analysis=request.analysis, db=db # 传入 db_session 用于记录调用日志 ) logger.info( f"答案判断完成 - is_correct: {result.is_correct}, " f"provider: {result.ai_provider}, latency: {result.ai_latency_ms}ms" ) return ResponseModel( code=200, message="答案判断完成", data=JudgeAnswerResponse( is_correct=result.is_correct, correct_answer=request.correct_answer, feedback=result.raw_response if not result.is_correct else None, ) ) except Exception as e: logger.error(f"答案判断失败: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"答案判断失败: {str(e)}" ) @router.post("/record-mistake", response_model=ResponseModel[RecordMistakeResponse]) async def record_mistake( request: RecordMistakeRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 记录错题 当用户答错题目时,立即调用此接口记录到错题表 """ try: # 创建错题记录 # 注意:knowledge_point_id暂时设置为None,避免外键约束失败 mistake = ExamMistake( user_id=current_user.id, exam_id=request.exam_id, round=request.round, # 记录考试轮次 question_id=request.question_id, knowledge_point_id=None, # 暂时设为None,避免外键约束 question_content=request.question_content, correct_answer=request.correct_answer, user_answer=request.user_answer, question_type=request.question_type, # 新增:记录题型 ) if request.knowledge_point_id: logger.info(f"原始knowledge_point_id={request.knowledge_point_id},已设置为NULL(待同步生产数据)") db.add(mistake) await db.commit() await db.refresh(mistake) logger.info( f"记录错题成功 - user_id: {current_user.id}, exam_id: {request.exam_id}, " f"round: {request.round}, mistake_id: {mistake.id}" ) return ResponseModel( code=200, message="错题记录成功", data=RecordMistakeResponse( id=mistake.id, created_at=mistake.created_at, ) ) except Exception as e: await db.rollback() logger.error(f"记录错题失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"记录错题失败: {str(e)}" ) @router.get("/mistakes-debug") async def get_mistakes_debug( exam_id: int, ): """调试endpoint - 不需要认证""" logger.info(f"🔍 调试 - exam_id: {exam_id}, type: {type(exam_id)}") return {"exam_id": exam_id, "type": str(type(exam_id))} # ==================== 成绩报告和错题本相关接口 ==================== @router.get("/statistics/report", response_model=ResponseModel[ExamReportResponse]) async def get_exam_report( start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"), end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取成绩报告汇总 返回包含概览、趋势、科目分析、最近考试记录的完整报告 """ try: report_data = await ExamReportService.get_exam_report( db=db, user_id=current_user.id, start_date=start_date, end_date=end_date ) return ResponseModel(code=200, data=report_data, message="获取成绩报告成功") except Exception as e: logger.error(f"获取成绩报告失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取成绩报告失败: {str(e)}" ) @router.get("/mistakes/list", response_model=ResponseModel[MistakeListResponse]) async def get_mistakes_list( exam_id: Optional[int] = Query(None, description="考试ID"), course_id: Optional[int] = Query(None, description="课程ID"), question_type: Optional[str] = Query(None, description="题型(single/multiple/judge/blank/essay)"), search: Optional[str] = Query(None, description="关键词搜索"), start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"), end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"), page: int = Query(1, ge=1, description="页码"), size: int = Query(10, ge=1, le=100, description="每页数量"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取错题列表(支持多维度筛选) - 不传exam_id时返回用户所有错题 - 支持按course_id、question_type、关键词、时间范围筛选 """ try: mistakes_data = await MistakeService.get_mistakes_list( db=db, user_id=current_user.id, exam_id=exam_id, course_id=course_id, question_type=question_type, search=search, start_date=start_date, end_date=end_date, page=page, size=size ) return ResponseModel(code=200, data=mistakes_data, message="获取错题列表成功") except Exception as e: logger.error(f"获取错题列表失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取错题列表失败: {str(e)}" ) @router.get("/mistakes/statistics", response_model=ResponseModel[MistakesStatisticsResponse]) async def get_mistakes_statistics( course_id: Optional[int] = Query(None, description="课程ID"), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取错题统计数据 返回按课程、题型、时间维度的统计数据 """ try: stats_data = await MistakeService.get_mistakes_statistics( db=db, user_id=current_user.id, course_id=course_id ) return ResponseModel(code=200, data=stats_data, message="获取错题统计成功") except Exception as e: logger.error(f"获取错题统计失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取错题统计失败: {str(e)}" ) @router.put("/{exam_id}/round-score", response_model=ResponseModel[dict]) async def update_round_score( exam_id: int, request: UpdateRoundScoreRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 更新某轮的得分 用于前端每轮考试结束后更新对应轮次的得分 """ try: # 查询考试记录 exam = await db.get(Exam, exam_id) if not exam: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在" ) # 验证权限 if exam.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="无权修改此考试记录" ) # 更新对应轮次的得分 if request.round == 1: exam.round1_score = request.score elif request.round == 2: exam.round2_score = request.score elif request.round == 3: exam.round3_score = request.score # 第三轮默认就是 final request.is_final = True # 如果是最终轮次(可能是第1/2轮就全对了),更新总分和状态 if request.is_final: exam.score = request.score exam.status = "submitted" # 计算是否通过 (pass_score 为空默认 60) exam.is_passed = request.score >= (exam.pass_score or 60) # 更新结束时间 from datetime import datetime exam.end_time = datetime.now() await db.commit() logger.info(f"更新轮次得分成功 - exam_id: {exam_id}, round: {request.round}, score: {request.score}") return ResponseModel(code=200, data={"exam_id": exam_id}, message="更新得分成功") except HTTPException: raise except Exception as e: await db.rollback() logger.error(f"更新轮次得分失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"更新轮次得分失败: {str(e)}" ) @router.put("/mistakes/{mistake_id}/mastered", response_model=ResponseModel) async def mark_mistake_mastered( mistake_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 标记错题为已掌握 Args: mistake_id: 错题记录ID db: 数据库会话 current_user: 当前用户 Returns: ResponseModel: 标记结果 """ try: # 查询错题记录 stmt = select(ExamMistake).where( ExamMistake.id == mistake_id, ExamMistake.user_id == current_user.id ) result = await db.execute(stmt) mistake = result.scalar_one_or_none() if not mistake: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="错题记录不存在或无权访问" ) # 更新掌握状态 from datetime import datetime as dt mistake.mastery_status = 'mastered' mistake.mastered_at = dt.utcnow() await db.commit() logger.info(f"标记错题已掌握成功 - mistake_id: {mistake_id}, user_id: {current_user.id}") return ResponseModel( code=200, message="已标记为掌握", data={"mistake_id": mistake_id, "mastery_status": "mastered"} ) except HTTPException: raise except Exception as e: await db.rollback() logger.error(f"标记错题已掌握失败: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"标记失败: {str(e)}" )