Files
012-kaopeilian/backend/app/api/v1/exam.py
111 442ac78b56
Some checks failed
continuous-integration/drone/push Build is failing
sync: 同步服务器最新代码 (2026-01-27)
更新内容:
- 后端 AI 服务优化(能力分析、知识点解析等)
- 前端考试和陪练界面更新
- 修复多个 prompt 和 JSON 解析问题
- 更新 Coze 语音客户端
2026-01-27 10:03:28 +08:00

788 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
考试相关API路由
"""
from typing import List, Optional
import json
from datetime import datetime
from fastapi import APIRouter, Depends, Query, HTTPException, status, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.deps import get_db, get_current_user
from app.core.config import get_settings
from app.core.logger import get_logger
from app.models.user import User
from app.models.exam import Exam
from app.models.exam_mistake import ExamMistake
from app.models.position_member import PositionMember
from app.models.position_course import PositionCourse
from app.schemas.base import ResponseModel
from app.schemas.exam import (
StartExamRequest,
StartExamResponse,
SubmitExamRequest,
SubmitExamResponse,
ExamDetailResponse,
ExamRecordResponse,
GenerateExamRequest,
GenerateExamResponse,
JudgeAnswerRequest,
JudgeAnswerResponse,
RecordMistakeRequest,
RecordMistakeResponse,
GetMistakesResponse,
MistakeRecordItem,
# 新增的Schema
ExamReportResponse,
MistakeListResponse,
MistakesStatisticsResponse,
UpdateRoundScoreRequest,
)
from app.services.exam_report_service import ExamReportService, MistakeService
from app.services.course_statistics_service import course_statistics_service
from app.services.system_log_service import system_log_service
from app.schemas.system_log import SystemLogCreate
# V2 原生服务Python 实现
from app.services.ai import exam_generator_service, ExamGeneratorConfig
from app.services.ai.answer_judge_service import answer_judge_service
from app.core.exceptions import ExternalServiceError
logger = get_logger(__name__)
settings = get_settings()
router = APIRouter(prefix="/exams", tags=["考试"])
@router.post("/start", response_model=ResponseModel[StartExamResponse])
async def start_exam(
request: StartExamRequest,
http_request: Request,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""开始考试"""
exam = await ExamService.start_exam(
db=db,
user_id=current_user.id,
course_id=request.course_id,
question_count=request.count,
)
# 异步更新课程学员数统计
try:
await course_statistics_service.update_course_student_count(db, request.course_id)
except Exception as e:
logger.warning(f"更新课程学员数失败: {str(e)}")
# 不影响主流程,只记录警告
# 记录考试开始日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="INFO",
type="api",
message=f"用户 {current_user.username} 开始考试课程ID: {request.course_id}",
user_id=current_user.id,
user=current_user.username,
ip=http_request.client.host if http_request.client else None,
path="/api/v1/exams/start",
method="POST",
user_agent=http_request.headers.get("user-agent")
)
)
return ResponseModel(code=200, data=StartExamResponse(exam_id=exam.id), message="考试开始")
@router.post("/submit", response_model=ResponseModel[SubmitExamResponse])
async def submit_exam(
request: SubmitExamRequest,
http_request: Request,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""提交考试答案"""
result = await ExamService.submit_exam(
db=db, user_id=current_user.id, exam_id=request.exam_id, answers=request.answers
)
# 获取考试记录以获取course_id
exam_stmt = select(Exam).where(Exam.id == request.exam_id)
exam_result = await db.execute(exam_stmt)
exam = exam_result.scalar_one_or_none()
# 异步更新课程学员数统计
if exam and exam.course_id:
try:
await course_statistics_service.update_course_student_count(db, exam.course_id)
except Exception as e:
logger.warning(f"更新课程学员数失败: {str(e)}")
# 不影响主流程,只记录警告
# 记录考试提交日志
await system_log_service.create_log(
db,
SystemLogCreate(
level="INFO",
type="api",
message=f"用户 {current_user.username} 提交考试考试ID: {request.exam_id},得分: {result.get('score', 0)}",
user_id=current_user.id,
user=current_user.username,
ip=http_request.client.host if http_request.client else None,
path="/api/v1/exams/submit",
method="POST",
user_agent=http_request.headers.get("user-agent")
)
)
return ResponseModel(code=200, data=SubmitExamResponse(**result), message="考试提交成功")
@router.get("/mistakes", response_model=ResponseModel[GetMistakesResponse])
async def get_mistakes(
exam_id: int,
round: int = Query(None, description="获取指定轮次的错题,不传则获取所有轮次"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
获取错题记录
用于第二、三轮考试时获取上一轮的错题记录
返回的数据可直接序列化为JSON字符串作为mistake_records参数传给考试生成接口
参数说明:
- exam_id: 考试ID
- round: 指定轮次1/2/3用于获取特定轮次的错题
第2轮考试时传入round=1获取第1轮的错题
第3轮考试时传入round=2获取第2轮的错题
不传则获取该考试的所有错题
"""
logger.info(f"📋 GET /mistakes 收到请求")
try:
logger.info(f"📋 获取错题记录 - exam_id: {exam_id}, round: {round}, user_id: {current_user.id}")
# 构建查询条件
query = select(ExamMistake).where(
ExamMistake.exam_id == exam_id,
ExamMistake.user_id == current_user.id,
)
# 如果指定了轮次,只获取该轮次的错题
if round is not None:
query = query.where(ExamMistake.round == round)
query = query.order_by(ExamMistake.id)
result = await db.execute(query)
mistakes = result.scalars().all()
logger.info(f"✅ 查询到错题记录数量: {len(mistakes)}")
# 转换为响应格式
mistake_items = [
MistakeRecordItem(
id=m.id,
question_id=m.question_id,
knowledge_point_id=m.knowledge_point_id,
question_content=m.question_content,
correct_answer=m.correct_answer,
user_answer=m.user_answer,
created_at=m.created_at,
)
for m in mistakes
]
logger.info(
f"获取错题记录成功 - user_id: {current_user.id}, exam_id: {exam_id}, "
f"count: {len(mistake_items)}"
)
# 返回统一的ResponseModel格式让Pydantic自动处理序列化
return ResponseModel(
code=200,
message="获取错题记录成功",
data=GetMistakesResponse(
mistakes=mistake_items
)
)
except Exception as e:
logger.error(f"获取错题记录失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取错题记录失败: {str(e)}"
)
@router.get("/{exam_id}", response_model=ResponseModel[ExamDetailResponse])
async def get_exam_detail(
exam_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取考试详情"""
exam_data = await ExamService.get_exam_detail(
db=db, user_id=current_user.id, exam_id=exam_id
)
return ResponseModel(code=200, data=ExamDetailResponse(**exam_data), message="获取成功")
@router.get("/records", response_model=ResponseModel[dict])
async def get_exam_records(
page: int = Query(1, ge=1),
size: int = Query(10, ge=1, le=100),
course_id: Optional[int] = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取考试记录列表"""
records = await ExamService.get_exam_records(
db=db, user_id=current_user.id, page=page, size=size, course_id=course_id
)
return ResponseModel(code=200, data=records, message="获取成功")
@router.get("/statistics/summary", response_model=ResponseModel[dict])
async def get_exam_statistics(
course_id: Optional[int] = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取考试统计信息"""
stats = await ExamService.get_exam_statistics(
db=db, user_id=current_user.id, course_id=course_id
)
return ResponseModel(code=200, data=stats, message="获取成功")
# ==================== 试题生成接口 ====================
@router.post("/generate", response_model=ResponseModel[GenerateExamResponse])
async def generate_exam(
request: GenerateExamRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
生成考试试题
使用 Python 原生 AI 服务实现。
考试轮次说明:
- 第一轮考试mistake_records 传空或不传
- 第二、三轮错题重考mistake_records 传入上一轮错题记录的JSON字符串
"""
from app.models.course import Course
try:
# 验证课程存在性
course = await db.get(Course, request.course_id)
if not course or course.is_deleted:
logger.warning(f"课程不存在或已删除: course_id={request.course_id}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"无法生成试题课程ID {request.course_id} 不存在或已被删除"
)
# 从用户信息中自动获取岗位ID如果未提供
position_id = request.position_id
if not position_id:
# 1. 首先查询用户已分配的岗位
result = await db.execute(
select(PositionMember).where(
PositionMember.user_id == current_user.id,
PositionMember.is_deleted == False
).limit(1)
)
position_member = result.scalar_one_or_none()
if position_member:
position_id = position_member.position_id
else:
# 2. 如果用户没有岗位,从课程关联的岗位中获取第一个
result = await db.execute(
select(PositionCourse.position_id).where(
PositionCourse.course_id == request.course_id,
PositionCourse.is_deleted == False
).limit(1)
)
course_position = result.scalar_one_or_none()
if course_position:
position_id = course_position
logger.info(f"用户 {current_user.id} 没有分配岗位使用课程关联的岗位ID: {position_id}")
else:
# 3. 如果课程也没有关联岗位,抛出错误
logger.warning(f"用户 {current_user.id} 没有分配岗位,且课程 {request.course_id} 未关联任何岗位")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="无法生成试题:用户未分配岗位,且课程未关联任何岗位"
)
# 记录详细的题型设置(用于调试)
logger.info(
f"考试题型设置 - 单选:{request.single_choice_count}, 多选:{request.multiple_choice_count}, "
f"判断:{request.true_false_count}, 填空:{request.fill_blank_count}, 问答:{request.essay_count}, "
f"难度:{request.difficulty_level}"
)
# 调用 Python 原生试题生成服务
logger.info(
f"调用原生试题生成服务 - user_id: {current_user.id}, "
f"course_id: {request.course_id}, position_id: {position_id}"
)
# 构建配置
config = ExamGeneratorConfig(
course_id=request.course_id,
position_id=position_id,
single_choice_count=request.single_choice_count or 0,
multiple_choice_count=request.multiple_choice_count or 0,
true_false_count=request.true_false_count or 0,
fill_blank_count=request.fill_blank_count or 0,
essay_count=request.essay_count or 0,
difficulty_level=request.difficulty_level or 3,
mistake_records=request.mistake_records or "",
)
# 调用原生服务
gen_result = await exam_generator_service.generate_exam(db, config)
if not gen_result.get("success"):
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="试题生成服务返回失败"
)
# 将题目列表转为 JSON 字符串(兼容原有前端格式)
result_data = json.dumps(gen_result.get("questions", []), ensure_ascii=False)
logger.info(
f"试题生成完成 - questions: {gen_result.get('total_count')}, "
f"provider: {gen_result.get('ai_provider')}, latency: {gen_result.get('ai_latency_ms')}ms"
)
if result_data is None or result_data == "":
logger.error(f"试题生成未返回有效结果数据")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="试题生成失败: 未返回结果数据"
)
# 创建或复用考试记录
question_count = sum([
request.single_choice_count or 0,
request.multiple_choice_count or 0,
request.true_false_count or 0,
request.fill_blank_count or 0,
request.essay_count or 0
])
# 第一轮创建新的exam记录
if request.current_round == 1:
exam = Exam(
user_id=current_user.id,
course_id=request.course_id,
exam_name=f"课程{request.course_id}考试",
question_count=question_count,
total_score=100.0,
pass_score=60.0,
duration_minutes=60,
status="started",
start_time=datetime.now(),
questions=None,
answers=None,
)
db.add(exam)
await db.commit()
await db.refresh(exam)
logger.info(f"{request.current_round}轮:创建考试记录成功 - exam_id: {exam.id}")
else:
# 第二、三轮复用已有exam记录
if not request.exam_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"{request.current_round}轮考试必须提供exam_id"
)
exam = await db.get(Exam, request.exam_id)
if not exam:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="考试记录不存在"
)
if exam.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无权访问此考试记录"
)
logger.info(f"{request.current_round}轮:复用考试记录 - exam_id: {exam.id}")
return ResponseModel(
code=200,
message="试题生成成功",
data=GenerateExamResponse(
result=result_data,
workflow_run_id=f"{gen_result.get('ai_provider')}_{gen_result.get('ai_latency_ms')}ms",
task_id=f"native_{request.course_id}",
exam_id=exam.id,
)
)
except HTTPException:
raise
except Exception as e:
logger.error(f"生成考试试题失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"试题生成失败: {str(e)}"
)
@router.post("/judge-answer", response_model=ResponseModel[JudgeAnswerResponse])
async def judge_answer(
request: JudgeAnswerRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
判断主观题答案
适用于填空题和问答题的答案判断。
使用 Python 原生 AI 服务实现。
"""
try:
logger.info(
f"调用原生答案判断服务 - user_id: {current_user.id}, "
f"question: {request.question[:50]}..."
)
result = await answer_judge_service.judge(
question=request.question,
correct_answer=request.correct_answer,
user_answer=request.user_answer,
analysis=request.analysis,
db=db # 传入 db_session 用于记录调用日志
)
logger.info(
f"答案判断完成 - is_correct: {result.is_correct}, "
f"provider: {result.ai_provider}, latency: {result.ai_latency_ms}ms"
)
return ResponseModel(
code=200,
message="答案判断完成",
data=JudgeAnswerResponse(
is_correct=result.is_correct,
correct_answer=request.correct_answer,
feedback=result.raw_response if not result.is_correct else None,
)
)
except Exception as e:
logger.error(f"答案判断失败: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"答案判断失败: {str(e)}"
)
@router.post("/record-mistake", response_model=ResponseModel[RecordMistakeResponse])
async def record_mistake(
request: RecordMistakeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
记录错题
当用户答错题目时,立即调用此接口记录到错题表
"""
try:
# 创建错题记录
# 注意knowledge_point_id暂时设置为None避免外键约束失败
mistake = ExamMistake(
user_id=current_user.id,
exam_id=request.exam_id,
round=request.round, # 记录考试轮次
question_id=request.question_id,
knowledge_point_id=None, # 暂时设为None避免外键约束
question_content=request.question_content,
correct_answer=request.correct_answer,
user_answer=request.user_answer,
question_type=request.question_type, # 新增:记录题型
)
if request.knowledge_point_id:
logger.info(f"原始knowledge_point_id={request.knowledge_point_id}已设置为NULL待同步生产数据")
db.add(mistake)
await db.commit()
await db.refresh(mistake)
logger.info(
f"记录错题成功 - user_id: {current_user.id}, exam_id: {request.exam_id}, "
f"round: {request.round}, mistake_id: {mistake.id}"
)
return ResponseModel(
code=200,
message="错题记录成功",
data=RecordMistakeResponse(
id=mistake.id,
created_at=mistake.created_at,
)
)
except Exception as e:
await db.rollback()
logger.error(f"记录错题失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"记录错题失败: {str(e)}"
)
@router.get("/mistakes-debug")
async def get_mistakes_debug(
exam_id: int,
):
"""调试endpoint - 不需要认证"""
logger.info(f"🔍 调试 - exam_id: {exam_id}, type: {type(exam_id)}")
return {"exam_id": exam_id, "type": str(type(exam_id))}
# ==================== 成绩报告和错题本相关接口 ====================
@router.get("/statistics/report", response_model=ResponseModel[ExamReportResponse])
async def get_exam_report(
start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
获取成绩报告汇总
返回包含概览、趋势、科目分析、最近考试记录的完整报告
"""
try:
report_data = await ExamReportService.get_exam_report(
db=db,
user_id=current_user.id,
start_date=start_date,
end_date=end_date
)
return ResponseModel(code=200, data=report_data, message="获取成绩报告成功")
except Exception as e:
logger.error(f"获取成绩报告失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取成绩报告失败: {str(e)}"
)
@router.get("/mistakes/list", response_model=ResponseModel[MistakeListResponse])
async def get_mistakes_list(
exam_id: Optional[int] = Query(None, description="考试ID"),
course_id: Optional[int] = Query(None, description="课程ID"),
question_type: Optional[str] = Query(None, description="题型(single/multiple/judge/blank/essay)"),
search: Optional[str] = Query(None, description="关键词搜索"),
start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"),
page: int = Query(1, ge=1, description="页码"),
size: int = Query(10, ge=1, le=100, description="每页数量"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
获取错题列表(支持多维度筛选)
- 不传exam_id时返回用户所有错题
- 支持按course_id、question_type、关键词、时间范围筛选
"""
try:
mistakes_data = await MistakeService.get_mistakes_list(
db=db,
user_id=current_user.id,
exam_id=exam_id,
course_id=course_id,
question_type=question_type,
search=search,
start_date=start_date,
end_date=end_date,
page=page,
size=size
)
return ResponseModel(code=200, data=mistakes_data, message="获取错题列表成功")
except Exception as e:
logger.error(f"获取错题列表失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取错题列表失败: {str(e)}"
)
@router.get("/mistakes/statistics", response_model=ResponseModel[MistakesStatisticsResponse])
async def get_mistakes_statistics(
course_id: Optional[int] = Query(None, description="课程ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
获取错题统计数据
返回按课程、题型、时间维度的统计数据
"""
try:
stats_data = await MistakeService.get_mistakes_statistics(
db=db,
user_id=current_user.id,
course_id=course_id
)
return ResponseModel(code=200, data=stats_data, message="获取错题统计成功")
except Exception as e:
logger.error(f"获取错题统计失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取错题统计失败: {str(e)}"
)
@router.put("/{exam_id}/round-score", response_model=ResponseModel[dict])
async def update_round_score(
exam_id: int,
request: UpdateRoundScoreRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
更新某轮的得分
用于前端每轮考试结束后更新对应轮次的得分
"""
try:
# 查询考试记录
exam = await db.get(Exam, exam_id)
if not exam:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="考试记录不存在"
)
# 验证权限
if exam.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无权修改此考试记录"
)
# 更新对应轮次的得分
if request.round == 1:
exam.round1_score = request.score
elif request.round == 2:
exam.round2_score = request.score
elif request.round == 3:
exam.round3_score = request.score
# 第三轮默认就是 final
request.is_final = True
# 如果是最终轮次可能是第1/2轮就全对了更新总分和状态
if request.is_final:
exam.score = request.score
exam.status = "submitted"
# 计算是否通过 (pass_score 为空默认 60)
exam.is_passed = request.score >= (exam.pass_score or 60)
# 更新结束时间
from datetime import datetime
exam.end_time = datetime.now()
await db.commit()
logger.info(f"更新轮次得分成功 - exam_id: {exam_id}, round: {request.round}, score: {request.score}")
return ResponseModel(code=200, data={"exam_id": exam_id}, message="更新得分成功")
except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"更新轮次得分失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新轮次得分失败: {str(e)}"
)
@router.put("/mistakes/{mistake_id}/mastered", response_model=ResponseModel)
async def mark_mistake_mastered(
mistake_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
标记错题为已掌握
Args:
mistake_id: 错题记录ID
db: 数据库会话
current_user: 当前用户
Returns:
ResponseModel: 标记结果
"""
try:
# 查询错题记录
stmt = select(ExamMistake).where(
ExamMistake.id == mistake_id,
ExamMistake.user_id == current_user.id
)
result = await db.execute(stmt)
mistake = result.scalar_one_or_none()
if not mistake:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="错题记录不存在或无权访问"
)
# 更新掌握状态
from datetime import datetime as dt
mistake.mastery_status = 'mastered'
mistake.mastered_at = dt.utcnow()
await db.commit()
logger.info(f"标记错题已掌握成功 - mistake_id: {mistake_id}, user_id: {current_user.id}")
return ResponseModel(
code=200,
message="已标记为掌握",
data={"mistake_id": mistake_id, "mastery_status": "mastered"}
)
except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"标记错题已掌握失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"标记失败: {str(e)}"
)