Files
012-kaopeilian/backend/app/services/exam_service.py
yuliang_guo 4e817f6eef
All checks were successful
continuous-integration/drone/push Build is passing
fix: 修复exam_service解析questions JSON格式
questions可能是{"questions":[...]}或直接是列表,需要兼容处理
2026-01-31 11:28:00 +08:00

453 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
考试服务层
"""
import json
import random
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc
from app.models.exam import Exam, Question, ExamResult
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.core.exceptions import NotFoundError, ValidationError
from app.utils.score_distributor import ScoreDistributor
class ExamService:
"""考试服务类"""
@staticmethod
async def start_exam(
    db: AsyncSession, user_id: int, course_id: int, question_count: int = 10
) -> int:
    """
    Create a new exam for a user from randomly chosen questions of a course.

    Args:
        db: Database session.
        user_id: ID of the user taking the exam.
        course_id: ID of the course the questions are drawn from.
        question_count: Maximum number of questions to draw; must be >= 1.
            Fewer are used when the course has fewer active questions.

    Returns:
        int: ID of the newly created exam.

    Raises:
        ValidationError: If ``question_count`` is not positive, or the course
            has no active questions.
        NotFoundError: If the course does not exist.
    """
    # Reject non-positive counts up front: random.sample() raises a bare
    # ValueError for a negative size, and a size of 0 would silently create
    # an exam that contains no questions.
    if question_count < 1:
        raise ValidationError("题目数量必须大于0")

    # Make sure the course exists.
    course = await db.get(Course, course_id)
    if not course:
        raise NotFoundError("课程不存在")

    # Load every active question of the course.
    stmt = select(Question).where(
        and_(Question.course_id == course_id, Question.is_active == True)  # noqa: E712
    )
    result = await db.execute(stmt)
    all_questions = result.scalars().all()
    if not all_questions:
        raise ValidationError("该课程暂无题目")

    # Draw a random subset, capped at the number of available questions.
    selected_questions = random.sample(
        all_questions, min(question_count, len(all_questions))
    )

    # The total is fixed at 100 points and split across the drawn questions by
    # ScoreDistributor in "integer" mode (per the original author's note this
    # yields whole-number scores; the helper is defined outside this file).
    total_score = 100.0
    actual_question_count = len(selected_questions)
    distributor = ScoreDistributor(total_score, actual_question_count)
    distributed_scores = distributor.distribute(mode="integer")

    # Snapshot of the questions stored on the exam, each carrying its score.
    questions_data = []
    for i, q in enumerate(selected_questions):
        questions_data.append(
            {
                "id": str(q.id),
                "type": q.question_type,
                "title": q.title,
                "content": q.content,
                "options": q.options,
                "score": distributed_scores[i],
            }
        )

    # Pass mark derived from the total with a 0.6 ratio (the original note
    # says the helper rounds up).
    pass_score = ScoreDistributor.calculate_pass_score(total_score, 0.6)

    # Persist the exam record.
    exam = Exam(
        user_id=user_id,
        course_id=course_id,
        exam_name=f"{course.name} - 随机测试",
        question_count=actual_question_count,
        total_score=total_score,
        pass_score=pass_score,
        duration_minutes=60,
        status="started",
        questions={"questions": questions_data},
    )
    db.add(exam)
    await db.commit()
    await db.refresh(exam)

    # Return only the id rather than the ORM object, to avoid lazy-loading
    # problems on the caller's side (original author's rationale).
    exam_id = exam.id
    return exam_id
@staticmethod
async def submit_exam(
    db: AsyncSession, user_id: int, exam_id: int, answers: List[Dict[str, str]]
) -> Dict[str, Any]:
    """
    Grade a user's submitted answers and persist the exam result.

    Every question stored on the exam is graded. A question for which no
    answer was submitted is recorded as incorrect with an empty answer.

    Args:
        db: Database session.
        user_id: ID of the user submitting the exam.
        exam_id: ID of the exam being submitted.
        answers: Submitted answers; each item is read through its
            ``"question_id"`` and ``"answer"`` keys.

    Returns:
        Dict: ``exam_id``, ``total_score``, ``pass_score``, ``is_passed``,
        ``correct_count``, ``total_count`` and ``accuracy`` (a 0-1 ratio).

    Raises:
        NotFoundError: If no exam with this id belongs to the user.
        ValidationError: If the exam is not in the "started" state, or its
            time limit has passed (the exam is then marked "timeout").
    """
    # Load the exam, scoped to the requesting user.
    stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id))
    result = await db.execute(stmt)
    exam = result.scalar_one_or_none()
    if not exam:
        raise NotFoundError("考试记录不存在")
    if exam.status != "started":
        raise ValidationError("考试已结束或已提交")

    # Enforce the time limit; the timeout status is persisted before raising.
    if datetime.now() > exam.start_time + timedelta(
        minutes=exam.duration_minutes
    ):
        exam.status = "timeout"
        await db.commit()
        raise ValidationError("考试已超时")

    # The stored questions payload may be a JSON string, a
    # {"questions": [...]} dict, or a bare list; normalise it to a list.
    raw_questions = exam.questions
    if isinstance(raw_questions, str):
        raw_questions = json.loads(raw_questions)
    if isinstance(raw_questions, dict):
        exam_questions = raw_questions.get("questions", [])
    else:
        exam_questions = raw_questions or []

    # Index submitted answers by question id. Keys are normalised to str
    # because the ids stored on the exam are strings.
    answers_dict = {str(ans["question_id"]): ans["answer"] for ans in answers}

    # Load the questions that belong to the exam, not the ones named in the
    # submission. Otherwise unanswered questions are never graded, and a
    # malformed submitted id would crash the int() conversion.
    question_ids = [int(question_data["id"]) for question_data in exam_questions]
    questions_map = {}
    if question_ids:
        stmt = select(Question).where(Question.id.in_(question_ids))
        result = await db.execute(stmt)
        questions_map = {str(q.id): q for q in result.scalars().all()}

    total_score = 0.0
    correct_count = 0
    for question_data in exam_questions:
        question_id = str(question_data["id"])
        question = questions_map.get(question_id)
        if not question:
            # The question row no longer exists, so it cannot be graded.
            continue

        # Use the score allocated when the exam was created.
        allocated_score = question_data.get("score", 10.0)
        user_answer = answers_dict.get(question_id, "")
        is_correct = user_answer == question.correct_answer
        if is_correct:
            total_score += allocated_score
            correct_count += 1

        # One result row per graded question.
        exam_result = ExamResult(
            exam_id=exam_id,
            question_id=int(question_id),
            user_answer=user_answer,
            is_correct=is_correct,
            score=allocated_score if is_correct else 0.0,
        )
        db.add(exam_result)

        # Update per-question usage statistics; `or 0` tolerates counters
        # that were never initialised.
        question.usage_count = (question.usage_count or 0) + 1
        if is_correct:
            question.correct_count = (question.correct_count or 0) + 1

    # Finalise the exam record.
    exam.end_time = datetime.now()
    exam.score = total_score
    exam.is_passed = total_score >= exam.pass_score
    exam.status = "submitted"
    exam.answers = {"answers": answers}
    await db.commit()

    return {
        "exam_id": exam_id,
        "total_score": total_score,
        "pass_score": exam.pass_score,
        "is_passed": exam.is_passed,
        "correct_count": correct_count,
        "total_count": exam.question_count,
        "accuracy": correct_count / exam.question_count
        if exam.question_count > 0
        else 0,
    }
@staticmethod
async def get_exam_detail(
    db: AsyncSession, user_id: int, exam_id: int
) -> Dict[str, Any]:
    """
    Return the details of one exam owned by the given user.

    Args:
        db: Database session.
        user_id: ID of the user who owns the exam.
        exam_id: ID of the exam to load.

    Returns:
        Dict: The exam's fields. When the exam has status "submitted" and has
        stored answers, the dict also carries ``results`` (one entry per
        ExamResult row) and ``answers``.

    Raises:
        NotFoundError: If no exam with this id belongs to the user.
    """
    # Look the exam up, scoped to the requesting user.
    exam_query = select(Exam).where(
        and_(Exam.id == exam_id, Exam.user_id == user_id)
    )
    exam = (await db.execute(exam_query)).scalar_one_or_none()
    if not exam:
        raise NotFoundError("考试记录不存在")

    # Timestamps are serialised to ISO strings, or None when unset.
    started_at = exam.start_time.isoformat() if exam.start_time else None
    finished_at = exam.end_time.isoformat() if exam.end_time else None

    detail = {
        "id": exam.id,
        "course_id": exam.course_id,
        "exam_name": exam.exam_name,
        "question_count": exam.question_count,
        "total_score": exam.total_score,
        "pass_score": exam.pass_score,
        "start_time": started_at,
        "end_time": finished_at,
        "duration_minutes": exam.duration_minutes,
        "status": exam.status,
        "score": exam.score,
        "is_passed": exam.is_passed,
        "questions": exam.questions,
    }

    # Per-question results are attached only for submitted exams with answers.
    if not (exam.status == "submitted" and exam.answers):
        return detail

    results_query = select(ExamResult).where(ExamResult.exam_id == exam_id)
    result_rows = (await db.execute(results_query)).scalars().all()
    detail["results"] = [
        {
            "question_id": row.question_id,
            "user_answer": row.user_answer,
            "is_correct": row.is_correct,
            "score": row.score,
        }
        for row in result_rows
    ]
    detail["answers"] = exam.answers
    return detail
@staticmethod
async def get_exam_records(
    db: AsyncSession,
    user_id: int,
    page: int = 1,
    size: int = 10,
    course_id: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Return a page of the user's exam records with per-exam statistics.

    Args:
        db: Database session.
        user_id: ID of the user whose exams are listed.
        page: 1-based page number; values below 1 are treated as 1.
        size: Page size; values below 1 are treated as 1.
        course_id: Optional course ID to filter by.

    Returns:
        Dict: ``items`` (one dict per exam, including accuracy, correct/wrong
        counts, duration and per-question-type statistics), ``total``,
        ``page``, ``size`` and ``pages``.
    """
    # Clamp paging inputs: page < 1 yields a negative OFFSET, and size < 1
    # breaks the page-count division at the end.
    page = max(page, 1)
    size = max(size, 1)

    # Filter conditions shared by the count query and the page query.
    conditions = [Exam.user_id == user_id]
    if course_id:
        conditions.append(Exam.course_id == course_id)

    # Total number of matching exams.
    # NOTE(review): this count does not join Course while the page query
    # below does, so exams whose course row is missing would be counted but
    # never listed - confirm whether that can happen.
    count_stmt = select(func.count(Exam.id)).where(and_(*conditions))
    total = await db.scalar(count_stmt) or 0

    # Page of exams, joined with Course to get the course name.
    offset = (page - 1) * size
    stmt = (
        select(Exam, Course.name.label("course_name"))
        .join(Course, Exam.course_id == Course.id)
        .where(and_(*conditions))
        .order_by(Exam.created_at.desc())
        .offset(offset)
        .limit(size)
    )
    result = await db.execute(stmt)
    rows = result.all()

    # Fetch the mistake counts of every exam on the page in ONE grouped
    # query (exam id -> question type -> count) instead of two per exam.
    mistakes_by_exam: Dict[Any, Dict[Any, int]] = {}
    exam_ids = [exam.id for exam, _ in rows]
    if exam_ids:
        mistakes_stmt = (
            select(
                ExamMistake.exam_id,
                ExamMistake.question_type,
                func.count(ExamMistake.id),
            )
            .where(ExamMistake.exam_id.in_(exam_ids))
            .group_by(ExamMistake.exam_id, ExamMistake.question_type)
        )
        mistakes_result = await db.execute(mistakes_stmt)
        for mistake_exam_id, mistake_type, mistake_count in mistakes_result.all():
            mistakes_by_exam.setdefault(mistake_exam_id, {})[mistake_type] = mistake_count

    # Display names for the question type codes (loop-invariant).
    type_name_map = {
        "single": "单选题",
        "multiple": "多选题",
        "judge": "判断题",
        "blank": "填空题",
        "essay": "问答题"
    }

    items = []
    for exam, course_name in rows:
        # 1. Time taken, available only when both timestamps are set.
        duration_seconds = None
        if exam.start_time and exam.end_time:
            duration_seconds = int((exam.end_time - exam.start_time).total_seconds())

        # 2. Number of recorded mistakes for this exam.
        mistakes_by_type = mistakes_by_exam.get(exam.id, {})
        wrong_count = sum(mistakes_by_type.values())

        # 3. Correct count and accuracy (percentage). Clamped at 0 so that
        #    more recorded mistakes than questions cannot go negative.
        # NOTE(review): an exam with no recorded mistakes (e.g. one that was
        # never submitted) reports 100% here - confirm that is intended.
        correct_count = (
            max(exam.question_count - wrong_count, 0) if exam.question_count else 0
        )
        accuracy = None
        if exam.question_count and exam.question_count > 0:
            accuracy = round((correct_count / exam.question_count) * 100, 1)

        # 4. Per-question-type statistics.
        question_type_stats = []
        if exam.questions:
            try:
                # The payload may be a JSON string, a {"questions": [...]}
                # dict, or a bare list.
                questions_raw = (
                    json.loads(exam.questions)
                    if isinstance(exam.questions, str)
                    else exam.questions
                )
                if isinstance(questions_raw, dict):
                    questions_data = questions_raw.get("questions", [])
                else:
                    questions_data = questions_raw

                type_totals = {}
                type_scores = {}  # summed score per question type
                for q in questions_data:
                    q_type = q.get("type", "unknown")
                    q_score = q.get("score", 0)
                    type_totals[q_type] = type_totals.get(q_type, 0) + 1
                    type_scores[q_type] = type_scores.get(q_type, 0) + q_score

                # The loop variable must not be called `total`: that would
                # overwrite the record count returned in the pagination data.
                for q_type, type_total in type_totals.items():
                    wrong = mistakes_by_type.get(q_type, 0)
                    correct = max(type_total - wrong, 0)
                    type_accuracy = (
                        round((correct / type_total) * 100, 1) if type_total > 0 else 0
                    )
                    question_type_stats.append({
                        "type": type_name_map.get(q_type, q_type),
                        "type_code": q_type,
                        "total": type_total,
                        "correct": correct,
                        "wrong": wrong,
                        "accuracy": type_accuracy,
                        "total_score": type_scores.get(q_type, 0)
                    })
            except (json.JSONDecodeError, TypeError, KeyError, AttributeError):
                # Best effort: a malformed questions payload (including
                # entries that are not dicts) yields empty statistics.
                question_type_stats = []

        items.append(
            {
                "id": exam.id,
                "course_id": exam.course_id,
                "course_name": course_name,
                "exam_name": exam.exam_name,
                "question_count": exam.question_count,
                "total_score": exam.total_score,
                "score": exam.score,
                "is_passed": exam.is_passed,
                "status": exam.status,
                "start_time": exam.start_time.isoformat() if exam.start_time else None,
                "end_time": exam.end_time.isoformat() if exam.end_time else None,
                "created_at": exam.created_at.isoformat() if exam.created_at else None,
                # Statistics fields
                "accuracy": accuracy,
                "correct_count": correct_count,
                "wrong_count": wrong_count,
                "duration_seconds": duration_seconds,
                "question_type_stats": question_type_stats,
            }
        )

    return {
        "items": items,
        "total": total,
        "page": page,
        "size": size,
        "pages": (total + size - 1) // size,
    }
@staticmethod
async def get_exam_statistics(
    db: AsyncSession, user_id: int, course_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Return aggregate statistics over a user's submitted exams.

    Args:
        db: Database session.
        user_id: ID of the user whose exams are aggregated.
        course_id: Optional course ID to restrict the aggregation to.

    Returns:
        Dict: ``total_exams``, ``passed_exams``, ``pass_rate`` (percentage),
        ``avg_score``, ``max_score`` and ``min_score``.
    """
    # Only exams with status "submitted" take part in the statistics.
    filters = [Exam.user_id == user_id, Exam.status == "submitted"]
    if course_id:
        filters.append(Exam.course_id == course_id)

    # A single aggregate query. NULLIF turns is_passed = False into NULL so
    # that COUNT skips those rows and only passed exams are counted.
    aggregate_stmt = select(
        func.count(Exam.id).label("total_exams"),
        func.count(func.nullif(Exam.is_passed, False)).label("passed_exams"),
        func.avg(Exam.score).label("avg_score"),
        func.max(Exam.score).label("max_score"),
        func.min(Exam.score).label("min_score"),
    ).where(and_(*filters))
    row = (await db.execute(aggregate_stmt)).one()

    # Pass rate as a percentage; 0 when there are no submitted exams.
    if row.total_exams > 0:
        pass_rate = row.passed_exams / row.total_exams * 100
    else:
        pass_rate = 0

    return {
        "total_exams": row.total_exams or 0,
        "passed_exams": row.passed_exams or 0,
        "pass_rate": pass_rate,
        "avg_score": float(row.avg_score or 0),
        "max_score": float(row.max_score or 0),
        "min_score": float(row.min_score or 0),
    }