All checks were successful
continuous-integration/drone/push Build is passing
考试开始/提交等API因缺少ExamService导入返回500错误
825 lines
29 KiB
Python
825 lines
29 KiB
Python
"""
|
||
考试相关API路由
|
||
"""
|
||
from typing import List, Optional
|
||
import json
|
||
from datetime import datetime
|
||
from fastapi import APIRouter, Depends, Query, HTTPException, status, Request
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy import select
|
||
from app.core.deps import get_db, get_current_user
|
||
from app.core.config import get_settings
|
||
from app.core.logger import get_logger
|
||
from app.models.user import User
|
||
from app.models.exam import Exam
|
||
from app.models.exam_mistake import ExamMistake
|
||
from app.models.position_member import PositionMember
|
||
from app.models.position_course import PositionCourse
|
||
from app.schemas.base import ResponseModel
|
||
from app.services.exam_service import ExamService
|
||
from app.schemas.exam import (
|
||
StartExamRequest,
|
||
StartExamResponse,
|
||
SubmitExamRequest,
|
||
SubmitExamResponse,
|
||
ExamDetailResponse,
|
||
ExamRecordResponse,
|
||
GenerateExamRequest,
|
||
GenerateExamResponse,
|
||
JudgeAnswerRequest,
|
||
JudgeAnswerResponse,
|
||
RecordMistakeRequest,
|
||
RecordMistakeResponse,
|
||
GetMistakesResponse,
|
||
MistakeRecordItem,
|
||
# 新增的Schema
|
||
ExamReportResponse,
|
||
MistakeListResponse,
|
||
MistakesStatisticsResponse,
|
||
UpdateRoundScoreRequest,
|
||
)
|
||
from app.services.exam_report_service import ExamReportService, MistakeService
|
||
from app.services.course_statistics_service import course_statistics_service
|
||
from app.services.system_log_service import system_log_service
|
||
from app.schemas.system_log import SystemLogCreate
|
||
|
||
# V2 原生服务:Python 实现
|
||
from app.services.ai import exam_generator_service, ExamGeneratorConfig
|
||
from app.services.ai.answer_judge_service import answer_judge_service
|
||
from app.core.exceptions import ExternalServiceError
|
||
|
||
# Logger named after this module; shared by every handler below.
logger = get_logger(__name__)

# Application settings object obtained from the project's config helper.
settings = get_settings()

# All routes declared in this module are registered under the /exams prefix.
router = APIRouter(prefix="/exams", tags=["考试"])
|
||
|
||
|
||
@router.post("/start", response_model=ResponseModel[StartExamResponse])
async def start_exam(
    request: StartExamRequest,
    http_request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Start an exam for the current user.

    Delegates exam creation to ``ExamService.start_exam``, refreshes the
    course's student-count statistic on a best-effort basis, writes a system
    log entry, and returns the id of the new exam.
    """
    course_id = request.course_id

    exam = await ExamService.start_exam(
        db=db,
        user_id=current_user.id,
        course_id=course_id,
        question_count=request.count,
    )

    # Statistics refresh is best-effort: a failure is logged as a warning and
    # does not interrupt the main flow.
    try:
        await course_statistics_service.update_course_student_count(db, course_id)
    except Exception as exc:
        logger.warning(f"更新课程学员数失败: {str(exc)}")

    # Record an audit entry for the exam start.
    client = http_request.client
    log_entry = SystemLogCreate(
        level="INFO",
        type="api",
        message=f"用户 {current_user.username} 开始考试(课程ID: {course_id})",
        user_id=current_user.id,
        user=current_user.username,
        ip=client.host if client else None,
        path="/api/v1/exams/start",
        method="POST",
        user_agent=http_request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, log_entry)

    started = StartExamResponse(exam_id=exam.id)
    return ResponseModel(code=200, data=started, message="考试开始")
|
||
|
||
|
||
@router.post("/submit", response_model=ResponseModel[SubmitExamResponse])
async def submit_exam(
    request: SubmitExamRequest,
    http_request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Submit the answers of an exam.

    Steps:
      1. Grade the submission through ``ExamService.submit_exam``.
      2. Refresh the course's student-count statistic (best-effort).
      3. Write a system log entry.
      4. If the exam was passed, award experience and check for new badges
         (best-effort; failures are only logged).

    The response data is the ``SubmitExamResponse`` dump extended with the
    ``exp_result`` and ``new_badges`` keys.
    """
    outcome = await ExamService.submit_exam(
        db=db, user_id=current_user.id, exam_id=request.exam_id, answers=request.answers
    )

    # Load the exam row only to learn which course it belongs to.
    lookup = await db.execute(select(Exam).where(Exam.id == request.exam_id))
    exam = lookup.scalar_one_or_none()

    if exam and exam.course_id:
        # Best-effort statistics refresh; never blocks the submission.
        try:
            await course_statistics_service.update_course_student_count(db, exam.course_id)
        except Exception as exc:
            logger.warning(f"更新课程学员数失败: {str(exc)}")

    # Audit entry for the submission.
    # NOTE(review): this message reads `score` while the experience award
    # below reads `total_score` — confirm which key the service returns.
    client = http_request.client
    log_entry = SystemLogCreate(
        level="INFO",
        type="api",
        message=f"用户 {current_user.username} 提交考试(考试ID: {request.exam_id},得分: {outcome.get('score', 0)})",
        user_id=current_user.id,
        user=current_user.username,
        ip=client.host if client else None,
        path="/api/v1/exams/submit",
        method="POST",
        user_agent=http_request.headers.get("user-agent"),
    )
    await system_log_service.create_log(db, log_entry)

    # Passing an exam triggers the experience and badge checks.
    exp_result = None
    new_badges = []
    if outcome.get("is_passed"):
        try:
            # Imported lazily inside the try block, so an import failure is
            # handled like any other failure of this optional step.
            from app.services.level_service import LevelService
            from app.services.badge_service import BadgeService

            level_service = LevelService(db)
            badge_service = BadgeService(db)

            exp_result = await level_service.add_exam_exp(
                user_id=current_user.id,
                exam_id=request.exam_id,
                score=outcome.get("total_score", 0),
                is_passed=True,
            )
            new_badges = await badge_service.check_and_award_badges(current_user.id)

            await db.commit()
        except Exception as exc:
            logger.warning(f"考试经验值/奖章处理失败: {str(exc)}")

    # Merge the reward information into the serialized grading result.
    payload = SubmitExamResponse(**outcome).model_dump()
    payload["exp_result"] = exp_result
    payload["new_badges"] = new_badges

    return ResponseModel(code=200, data=payload, message="考试提交成功")
|
||
|
||
|
||
@router.get("/mistakes", response_model=ResponseModel[GetMistakesResponse])
async def get_mistakes(
    exam_id: int,
    round: Optional[int] = Query(None, description="获取指定轮次的错题,不传则获取所有轮次"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the current user's mistake records for one exam.

    Used before the second and third exam rounds to fetch the mistakes of the
    previous round; the returned data is meant to be serialized to a JSON
    string and passed as ``mistake_records`` to the exam generation endpoint.

    Args:
        exam_id: Id of the exam whose mistakes are requested.
        round: Optional round number (1/2/3). Pass ``1`` when preparing
            round 2 and ``2`` when preparing round 3. When omitted, mistakes
            from every round of the exam are returned. The name shadows the
            builtin on purpose: it is the public query-parameter name.
        db: Database session.
        current_user: Authenticated user; only this user's rows are returned.

    Raises:
        HTTPException: 500 if the query or response construction fails.
    """
    logger.info("📋 GET /mistakes 收到请求")
    try:
        logger.info(f"📋 获取错题记录 - exam_id: {exam_id}, round: {round}, user_id: {current_user.id}")

        # Always scope the query to the requesting user.
        query = select(ExamMistake).where(
            ExamMistake.exam_id == exam_id,
            ExamMistake.user_id == current_user.id,
        )

        # Narrow to a single round only when one was requested.
        if round is not None:
            query = query.where(ExamMistake.round == round)

        query = query.order_by(ExamMistake.id)

        result = await db.execute(query)
        mistakes = result.scalars().all()

        logger.info(f"✅ 查询到错题记录数量: {len(mistakes)}")

        # Convert ORM rows into response items.
        mistake_items = [
            MistakeRecordItem(
                id=m.id,
                question_id=m.question_id,
                knowledge_point_id=m.knowledge_point_id,
                question_content=m.question_content,
                correct_answer=m.correct_answer,
                user_answer=m.user_answer,
                created_at=m.created_at,
            )
            for m in mistakes
        ]

        logger.info(
            f"获取错题记录成功 - user_id: {current_user.id}, exam_id: {exam_id}, "
            f"count: {len(mistake_items)}"
        )

        return ResponseModel(
            code=200,
            message="获取错题记录成功",
            data=GetMistakesResponse(mistakes=mistake_items),
        )

    except Exception as e:
        logger.error(f"获取错题记录失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取错题记录失败: {str(e)}"
        )
|
||
|
||
|
||
# The ``:int`` path convertor makes this route match numeric ids only.
# Without it the pattern also captures single-segment static paths that are
# registered later in this module (e.g. ``/records``, ``/mistakes-debug``),
# which then fail integer validation instead of reaching their own handlers.
@router.get("/{exam_id:int}", response_model=ResponseModel[ExamDetailResponse])
async def get_exam_detail(
    exam_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the details of one exam of the current user.

    Args:
        exam_id: Numeric id of the exam, taken from the URL path.
        db: Database session.
        current_user: Authenticated user; passed to the service as ``user_id``.

    Returns:
        ``ResponseModel`` wrapping an ``ExamDetailResponse`` built from the
        mapping returned by ``ExamService.get_exam_detail``.
    """
    exam_data = await ExamService.get_exam_detail(
        db=db, user_id=current_user.id, exam_id=exam_id
    )

    return ResponseModel(code=200, data=ExamDetailResponse(**exam_data), message="获取成功")
|
||
|
||
|
||
@router.get("/records", response_model=ResponseModel[dict])
async def get_exam_records(
    page: int = Query(1, ge=1),
    size: int = Query(10, ge=1, le=100),
    course_id: Optional[int] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a page of the current user's exam records.

    Args:
        page: 1-based page number.
        size: Page size, between 1 and 100.
        course_id: Optional course filter.
        db: Database session.
        current_user: Authenticated user whose records are listed.
    """
    filters = dict(
        db=db,
        user_id=current_user.id,
        page=page,
        size=size,
        course_id=course_id,
    )
    page_data = await ExamService.get_exam_records(**filters)

    return ResponseModel(code=200, data=page_data, message="获取成功")
|
||
|
||
|
||
@router.get("/statistics/summary", response_model=ResponseModel[dict])
async def get_exam_statistics(
    course_id: Optional[int] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return exam statistics for the current user.

    Args:
        course_id: Optional course filter forwarded to the service.
        db: Database session.
        current_user: Authenticated user whose statistics are computed.
    """
    summary = await ExamService.get_exam_statistics(
        db=db,
        user_id=current_user.id,
        course_id=course_id,
    )
    response = ResponseModel(code=200, data=summary, message="获取成功")
    return response
|
||
|
||
|
||
# ==================== 试题生成接口 ====================
|
||
|
||
async def _resolve_position_id(db, user_id, course_id, requested_position_id):
    """Pick the position id used for question generation.

    Resolution order: the explicitly requested id, then a position assigned to
    the user, then a position linked to the course. Raises a 400
    ``HTTPException`` when none of them yields a position.
    """
    if requested_position_id:
        return requested_position_id

    # 1. A position already assigned to the user.
    member_result = await db.execute(
        select(PositionMember).where(
            PositionMember.user_id == user_id,
            PositionMember.is_deleted == False
        ).limit(1)
    )
    position_member = member_result.scalar_one_or_none()
    if position_member:
        return position_member.position_id

    # 2. Otherwise the first position linked to the course.
    course_result = await db.execute(
        select(PositionCourse.position_id).where(
            PositionCourse.course_id == course_id,
            PositionCourse.is_deleted == False
        ).limit(1)
    )
    course_position = course_result.scalar_one_or_none()
    if course_position:
        logger.info(f"用户 {user_id} 没有分配岗位,使用课程关联的岗位ID: {course_position}")
        return course_position

    # 3. Nothing to fall back to.
    logger.warning(f"用户 {user_id} 没有分配岗位,且课程 {course_id} 未关联任何岗位")
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="无法生成试题:用户未分配岗位,且课程未关联任何岗位"
    )


async def _require_owned_exam(db, exam_id, user_id, current_round):
    """Check that a retake round refers to an existing exam owned by the user.

    Raises 400 when ``exam_id`` is missing, 404 when the exam does not exist
    and 403 when it belongs to another user. Returns the validated exam id.
    """
    if not exam_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"第{current_round}轮考试必须提供exam_id"
        )

    exam = await db.get(Exam, exam_id)
    if not exam:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="考试记录不存在"
        )

    if exam.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权访问此考试记录"
        )

    return exam_id


@router.post("/generate", response_model=ResponseModel[GenerateExamResponse])
async def generate_exam(
    request: GenerateExamRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate exam questions with the native Python AI service.

    Rounds:
      - Round 1: ``mistake_records`` is empty or omitted; a new ``Exam`` row
        is created after the questions have been generated.
      - Rounds 2 and 3 (retake of mistakes): ``mistake_records`` carries the
        previous round's mistakes as a JSON string and ``exam_id`` must refer
        to an existing exam owned by the caller, which is reused.

    Returns:
        ``ResponseModel`` wrapping a ``GenerateExamResponse`` whose ``result``
        is the question list serialized as a JSON string.

    Raises:
        HTTPException: 400 for a missing/deleted course, an unresolvable
            position or a retake without ``exam_id``; 403/404 for an exam that
            is foreign or missing; 500 when generation fails or returns no
            questions.
    """
    from app.models.course import Course

    try:
        # The course must exist and must not be soft-deleted.
        course = await db.get(Course, request.course_id)
        if not course or course.is_deleted:
            logger.warning(f"课程不存在或已删除: course_id={request.course_id}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"无法生成试题:课程ID {request.course_id} 不存在或已被删除"
            )

        position_id = await _resolve_position_id(
            db, current_user.id, request.course_id, request.position_id
        )

        # Validate retake rounds BEFORE the AI call so that an invalid request
        # is rejected without paying for question generation.
        is_first_round = request.current_round == 1
        exam_id = None
        if not is_first_round:
            exam_id = await _require_owned_exam(
                db, request.exam_id, current_user.id, request.current_round
            )

        # Detailed question-type settings, logged for debugging.
        logger.info(
            f"考试题型设置 - 单选:{request.single_choice_count}, 多选:{request.multiple_choice_count}, "
            f"判断:{request.true_false_count}, 填空:{request.fill_blank_count}, 问答:{request.essay_count}, "
            f"难度:{request.difficulty_level}"
        )

        logger.info(
            f"调用原生试题生成服务 - user_id: {current_user.id}, "
            f"course_id: {request.course_id}, position_id: {position_id}"
        )

        config = ExamGeneratorConfig(
            course_id=request.course_id,
            position_id=position_id,
            single_choice_count=request.single_choice_count or 0,
            multiple_choice_count=request.multiple_choice_count or 0,
            true_false_count=request.true_false_count or 0,
            fill_blank_count=request.fill_blank_count or 0,
            essay_count=request.essay_count or 0,
            difficulty_level=request.difficulty_level or 3,
            mistake_records=request.mistake_records or "",
        )

        gen_result = await exam_generator_service.generate_exam(db, config)

        if not gen_result.get("success"):
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="试题生成服务返回失败"
            )

        questions = gen_result.get("questions", [])

        logger.info(
            f"试题生成完成 - questions: {gen_result.get('total_count')}, "
            f"provider: {gen_result.get('ai_provider')}, latency: {gen_result.get('ai_latency_ms')}ms"
        )

        # Check the question list itself: json.dumps never returns None or an
        # empty string, so testing the serialized text could never fail.
        if not questions:
            logger.error("试题生成未返回有效结果数据")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="试题生成失败: 未返回结果数据"
            )

        # The question list is returned as a JSON string (the format the
        # existing frontend expects).
        result_data = json.dumps(questions, ensure_ascii=False)

        if is_first_round:
            # Round 1: create a fresh exam record.
            question_count = sum([
                request.single_choice_count or 0,
                request.multiple_choice_count or 0,
                request.true_false_count or 0,
                request.fill_blank_count or 0,
                request.essay_count or 0,
            ])

            exam = Exam(
                user_id=current_user.id,
                course_id=request.course_id,
                exam_name=f"课程{request.course_id}考试",
                question_count=question_count,
                total_score=100.0,
                pass_score=60.0,
                duration_minutes=60,
                status="started",
                start_time=datetime.now(),
                questions=None,
                answers=None,
            )

            db.add(exam)
            await db.commit()
            await db.refresh(exam)
            exam_id = exam.id

            logger.info(f"第{request.current_round}轮:创建考试记录成功 - exam_id: {exam_id}")
        else:
            # Rounds 2/3: reuse the exam validated above.
            logger.info(f"第{request.current_round}轮:复用考试记录 - exam_id: {exam_id}")

        return ResponseModel(
            code=200,
            message="试题生成成功",
            data=GenerateExamResponse(
                result=result_data,
                workflow_run_id=f"{gen_result.get('ai_provider')}_{gen_result.get('ai_latency_ms')}ms",
                task_id=f"native_{request.course_id}",
                exam_id=exam_id,
            )
        )

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"生成考试试题失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"试题生成失败: {str(e)}"
        )
|
||
|
||
|
||
@router.post("/judge-answer", response_model=ResponseModel[JudgeAnswerResponse])
async def judge_answer(
    request: JudgeAnswerRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Judge a subjective answer with the native Python AI service.

    Intended for fill-in-the-blank and essay questions. The response carries
    the correctness flag, the reference answer from the request and, only
    when the answer is wrong, the raw judge response as feedback.

    Raises:
        HTTPException: 500 when the judge service or response building fails.
    """
    try:
        question_preview = request.question[:50]
        logger.info(
            f"调用原生答案判断服务 - user_id: {current_user.id}, "
            f"question: {question_preview}..."
        )

        verdict = await answer_judge_service.judge(
            question=request.question,
            correct_answer=request.correct_answer,
            user_answer=request.user_answer,
            analysis=request.analysis,
            db=db,  # session handed to the service so it can log the call
        )

        logger.info(
            f"答案判断完成 - is_correct: {verdict.is_correct}, "
            f"provider: {verdict.ai_provider}, latency: {verdict.ai_latency_ms}ms"
        )

        # Feedback is only returned for incorrect answers.
        feedback = None
        if not verdict.is_correct:
            feedback = verdict.raw_response

        judged = JudgeAnswerResponse(
            is_correct=verdict.is_correct,
            correct_answer=request.correct_answer,
            feedback=feedback,
        )
        return ResponseModel(code=200, message="答案判断完成", data=judged)

    except Exception as exc:
        logger.error(f"答案判断失败: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"答案判断失败: {str(exc)}"
        )
|
||
|
||
|
||
@router.post("/record-mistake", response_model=ResponseModel[RecordMistakeResponse])
async def record_mistake(
    request: RecordMistakeRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store one wrongly answered question in the mistake table.

    Meant to be called as soon as the user answers a question incorrectly.
    The row is stored with ``knowledge_point_id`` set to NULL regardless of
    the value in the request (to avoid foreign-key failures); the original
    value is only logged.

    Raises:
        HTTPException: 500 when the insert fails; the session is rolled back.
    """
    try:
        mistake_fields = {
            "user_id": current_user.id,
            "exam_id": request.exam_id,
            "round": request.round,  # exam round the mistake belongs to
            "question_id": request.question_id,
            "knowledge_point_id": None,  # kept NULL for now, see docstring
            "question_content": request.question_content,
            "correct_answer": request.correct_answer,
            "user_answer": request.user_answer,
            "question_type": request.question_type,
        }
        mistake = ExamMistake(**mistake_fields)

        if request.knowledge_point_id:
            logger.info(f"原始knowledge_point_id={request.knowledge_point_id},已设置为NULL(待同步生产数据)")

        db.add(mistake)
        await db.commit()
        await db.refresh(mistake)

        logger.info(
            f"记录错题成功 - user_id: {current_user.id}, exam_id: {request.exam_id}, "
            f"round: {request.round}, mistake_id: {mistake.id}"
        )

        recorded = RecordMistakeResponse(id=mistake.id, created_at=mistake.created_at)
        return ResponseModel(code=200, message="错题记录成功", data=recorded)

    except Exception as exc:
        await db.rollback()
        logger.error(f"记录错题失败: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"记录错题失败: {str(exc)}"
        )
|
||
|
||
|
||
@router.get("/mistakes-debug")
async def get_mistakes_debug(
    exam_id: int,
):
    """Debug endpoint that echoes ``exam_id`` and its Python type.

    Declares no authentication dependency.
    """
    received_type = type(exam_id)
    logger.info(f"🔍 调试 - exam_id: {exam_id}, type: {received_type}")

    echo = {"exam_id": exam_id, "type": str(received_type)}
    return echo
|
||
|
||
|
||
# ==================== 成绩报告和错题本相关接口 ====================
|
||
|
||
@router.get("/statistics/report", response_model=ResponseModel[ExamReportResponse])
async def get_exam_report(
    start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the score report summary of the current user.

    According to the endpoint's contract the report contains an overview,
    trends, per-subject analysis and recent exam records; the content is
    produced by ``ExamReportService.get_exam_report``.

    Args:
        start_date: Optional start date (YYYY-MM-DD).
        end_date: Optional end date (YYYY-MM-DD).

    Raises:
        HTTPException: 500 when the report cannot be built.
    """
    try:
        report = await ExamReportService.get_exam_report(
            db=db,
            user_id=current_user.id,
            start_date=start_date,
            end_date=end_date,
        )
        response = ResponseModel(code=200, data=report, message="获取成绩报告成功")
        return response

    except Exception as exc:
        logger.error(f"获取成绩报告失败: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取成绩报告失败: {str(exc)}"
        )
|
||
|
||
|
||
@router.get("/mistakes/list", response_model=ResponseModel[MistakeListResponse])
async def get_mistakes_list(
    exam_id: Optional[int] = Query(None, description="考试ID"),
    course_id: Optional[int] = Query(None, description="课程ID"),
    question_type: Optional[str] = Query(None, description="题型(single/multiple/judge/blank/essay)"),
    search: Optional[str] = Query(None, description="关键词搜索"),
    start_date: Optional[str] = Query(None, description="开始日期(YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="结束日期(YYYY-MM-DD)"),
    page: int = Query(1, ge=1, description="页码"),
    size: int = Query(10, ge=1, le=100, description="每页数量"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a filtered, paginated list of the current user's mistakes.

    - Without ``exam_id`` the user's mistakes across all exams are returned.
    - Supports filtering by course, question type, keyword and date range.

    Raises:
        HTTPException: 500 when the listing fails.
    """
    try:
        # Every filter is forwarded unchanged to the service layer.
        criteria = dict(
            exam_id=exam_id,
            course_id=course_id,
            question_type=question_type,
            search=search,
            start_date=start_date,
            end_date=end_date,
            page=page,
            size=size,
        )
        listing = await MistakeService.get_mistakes_list(
            db=db,
            user_id=current_user.id,
            **criteria,
        )
        return ResponseModel(code=200, data=listing, message="获取错题列表成功")

    except Exception as exc:
        logger.error(f"获取错题列表失败: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取错题列表失败: {str(exc)}"
        )
|
||
|
||
|
||
@router.get("/mistakes/statistics", response_model=ResponseModel[MistakesStatisticsResponse])
async def get_mistakes_statistics(
    course_id: Optional[int] = Query(None, description="课程ID"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return mistake statistics for the current user.

    According to the endpoint's contract the statistics are grouped by
    course, question type and time; they are computed by
    ``MistakeService.get_mistakes_statistics``.

    Args:
        course_id: Optional course filter forwarded to the service.

    Raises:
        HTTPException: 500 when the statistics cannot be computed.
    """
    try:
        statistics = await MistakeService.get_mistakes_statistics(
            db=db,
            user_id=current_user.id,
            course_id=course_id,
        )
        response = ResponseModel(code=200, data=statistics, message="获取错题统计成功")
        return response

    except Exception as exc:
        logger.error(f"获取错题统计失败: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取错题统计失败: {str(exc)}"
        )
|
||
|
||
|
||
@router.put("/{exam_id}/round-score", response_model=ResponseModel[dict])
async def update_round_score(
    exam_id: int,
    request: UpdateRoundScoreRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store the score of one exam round.

    Called by the frontend after each round. Round 3 is always treated as the
    final round; an earlier round is final when ``request.is_final`` is set
    (for example when everything was answered correctly). A final round also
    updates the overall score, status, pass flag and end time of the exam.

    Args:
        exam_id: Id of the exam to update.
        request: Round number (1-3), score and optional final flag.
        db: Database session.
        current_user: Authenticated user; must own the exam.

    Raises:
        HTTPException: 404 when the exam does not exist, 403 when it belongs
            to another user, 400 for a round other than 1, 2 or 3, and 500
            (after a rollback) when the update fails.
    """
    # Column that stores the score of each supported round.
    round_score_fields = {1: "round1_score", 2: "round2_score", 3: "round3_score"}

    try:
        exam = await db.get(Exam, exam_id)
        if not exam:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="考试记录不存在"
            )

        # Only the owner may modify the exam.
        if exam.user_id != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="无权修改此考试记录"
            )

        # Reject unsupported rounds explicitly instead of reporting success
        # without having stored any round score.
        score_field = round_score_fields.get(request.round)
        if score_field is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"不支持的考试轮次: {request.round}"
            )
        setattr(exam, score_field, request.score)

        # Round 3 is final by definition; computed locally so the incoming
        # request object is not mutated.
        is_final = bool(request.is_final) or request.round == 3

        if is_final:
            exam.score = request.score
            exam.status = "submitted"
            # A missing pass_score falls back to 60.
            exam.is_passed = request.score >= (exam.pass_score or 60)
            exam.end_time = datetime.now()

        await db.commit()

        logger.info(f"更新轮次得分成功 - exam_id: {exam_id}, round: {request.round}, score: {request.score}")

        return ResponseModel(code=200, data={"exam_id": exam_id}, message="更新得分成功")

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        await db.rollback()
        logger.error(f"更新轮次得分失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"更新轮次得分失败: {str(e)}"
        )
|
||
|
||
|
||
@router.put("/mistakes/{mistake_id}/mastered", response_model=ResponseModel)
async def mark_mistake_mastered(
    mistake_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark one of the current user's mistakes as mastered.

    Args:
        mistake_id: Id of the mistake record.
        db: Database session.
        current_user: Authenticated user; the record must belong to them.

    Returns:
        ResponseModel: Confirmation containing the mistake id and new status.

    Raises:
        HTTPException: 404 when no matching record of this user exists and
            500 (after a rollback) when the update fails.
    """
    try:
        # Filtering by user id means foreign records look like missing ones.
        lookup = await db.execute(
            select(ExamMistake).where(
                ExamMistake.id == mistake_id,
                ExamMistake.user_id == current_user.id
            )
        )
        record = lookup.scalar_one_or_none()

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="错题记录不存在或无权访问"
            )

        # Flip the mastery state and stamp the time of the change.
        record.mastery_status = 'mastered'
        record.mastered_at = datetime.utcnow()

        await db.commit()

        logger.info(f"标记错题已掌握成功 - mistake_id: {mistake_id}, user_id: {current_user.id}")

        confirmation = {"mistake_id": mistake_id, "mastery_status": "mastered"}
        return ResponseModel(code=200, message="已标记为掌握", data=confirmation)

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as exc:
        await db.rollback()
        logger.error(f"标记错题已掌握失败: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"标记失败: {str(exc)}"
        )
|
||
|
||
|