Files
012-kaopeilian/backend/app/services/exam_service.py
yuliang_guo e3b7bdcfd8
All checks were successful
continuous-integration/drone/push Build is passing
feat: 实现考试分数智能分配
解决题目数量无法整除总分时出现无限小数的问题

后端:
- 新增 ScoreDistributor 分数分配工具类
- 支持整数分配和小数分配两种模式
- 更新 exam_service.py 使用智能分配
- 考试总分固定100分,按题目数量智能分配

前端:
- 新增 scoreFormatter.ts 分数格式化工具
- 提供分数显示、等级判断等辅助函数

示例:100分/6题 = [17,17,17,17,16,16] 总和=100
2026-01-30 14:50:41 +08:00

455 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
考试服务层
"""
import json
import random
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc
from app.models.exam import Exam, Question, ExamResult
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.core.exceptions import BusinessException, ErrorCode
from app.utils.score_distributor import ScoreDistributor
class ExamService:
"""考试服务类"""
@staticmethod
async def start_exam(
db: AsyncSession, user_id: int, course_id: int, question_count: int = 10
) -> Exam:
"""
开始考试
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID
question_count: 题目数量
Returns:
Exam: 考试实例
"""
# 检查课程是否存在
course = await db.get(Course, course_id)
if not course:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="课程不存在")
# 获取该课程的所有可用题目
stmt = select(Question).where(
and_(Question.course_id == course_id, Question.is_active == True)
)
result = await db.execute(stmt)
all_questions = result.scalars().all()
if not all_questions:
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="该课程暂无题目"
)
# 随机选择题目
selected_questions = random.sample(
all_questions, min(question_count, len(all_questions))
)
# 使用智能分数分配器,确保总分为整数且分配合理
total_score = 100.0 # 固定总分为100分
actual_question_count = len(selected_questions)
# 创建分数分配器
distributor = ScoreDistributor(total_score, actual_question_count)
distributed_scores = distributor.distribute(mode="integer")
# 构建题目数据,使用分配后的分数
questions_data = []
for i, q in enumerate(selected_questions):
question_data = {
"id": str(q.id),
"type": q.question_type,
"title": q.title,
"content": q.content,
"options": q.options,
"score": distributed_scores[i], # 使用智能分配的分数
}
questions_data.append(question_data)
# 计算及格分数(向上取整,确保公平)
pass_score = ScoreDistributor.calculate_pass_score(total_score, 0.6)
# 创建考试记录
exam = Exam(
user_id=user_id,
course_id=course_id,
exam_name=f"{course.name} - 随机测试",
question_count=actual_question_count,
total_score=total_score,
pass_score=pass_score,
duration_minutes=60,
status="started",
questions={"questions": questions_data},
)
db.add(exam)
await db.commit()
await db.refresh(exam)
return exam
@staticmethod
async def submit_exam(
db: AsyncSession, user_id: int, exam_id: int, answers: List[Dict[str, str]]
) -> Dict[str, Any]:
"""
提交考试答案
Args:
db: 数据库会话
user_id: 用户ID
exam_id: 考试ID
answers: 答案列表
Returns:
Dict: 考试结果
"""
# 获取考试记录
stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id))
result = await db.execute(stmt)
exam = result.scalar_one_or_none()
if not exam:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在")
if exam.status != "started":
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="考试已结束或已提交"
)
# 检查考试是否超时
if datetime.now() > exam.start_time + timedelta(
minutes=exam.duration_minutes
):
exam.status = "timeout"
await db.commit()
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="考试已超时"
)
# 处理答案
answers_dict = {ans["question_id"]: ans["answer"] for ans in answers}
total_score = 0.0
correct_count = 0
# 批量获取题目
question_ids = [int(ans["question_id"]) for ans in answers]
stmt = select(Question).where(Question.id.in_(question_ids))
result = await db.execute(stmt)
questions_map = {str(q.id): q for q in result.scalars().all()}
# 创建答题结果记录
for question_data in exam.questions["questions"]:
question_id = question_data["id"]
question = questions_map.get(question_id)
# 使用考试创建时分配的分数(智能分配后的整数分数)
allocated_score = question_data.get("score", 10.0)
if not question:
continue
user_answer = answers_dict.get(question_id, "")
is_correct = user_answer == question.correct_answer
if is_correct:
total_score += allocated_score # 使用分配的分数
correct_count += 1
# 创建答题结果记录
exam_result = ExamResult(
exam_id=exam_id,
question_id=int(question_id),
user_answer=user_answer,
is_correct=is_correct,
score=allocated_score if is_correct else 0.0, # 使用分配的分数
)
db.add(exam_result)
# 更新题目使用统计
question.usage_count += 1
if is_correct:
question.correct_count += 1
# 更新考试记录
exam.end_time = datetime.now()
exam.score = total_score
exam.is_passed = total_score >= exam.pass_score
exam.status = "submitted"
exam.answers = {"answers": answers}
await db.commit()
return {
"exam_id": exam_id,
"total_score": total_score,
"pass_score": exam.pass_score,
"is_passed": exam.is_passed,
"correct_count": correct_count,
"total_count": exam.question_count,
"accuracy": correct_count / exam.question_count
if exam.question_count > 0
else 0,
}
@staticmethod
async def get_exam_detail(
db: AsyncSession, user_id: int, exam_id: int
) -> Dict[str, Any]:
"""
获取考试详情
Args:
db: 数据库会话
user_id: 用户ID
exam_id: 考试ID
Returns:
Dict: 考试详情
"""
# 获取考试记录
stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id))
result = await db.execute(stmt)
exam = result.scalar_one_or_none()
if not exam:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在")
# 构建返回数据
exam_data = {
"id": exam.id,
"course_id": exam.course_id,
"exam_name": exam.exam_name,
"question_count": exam.question_count,
"total_score": exam.total_score,
"pass_score": exam.pass_score,
"start_time": exam.start_time.isoformat() if exam.start_time else None,
"end_time": exam.end_time.isoformat() if exam.end_time else None,
"duration_minutes": exam.duration_minutes,
"status": exam.status,
"score": exam.score,
"is_passed": exam.is_passed,
"questions": exam.questions,
}
# 如果考试已提交,获取答题详情
if exam.status == "submitted" and exam.answers:
stmt = select(ExamResult).where(ExamResult.exam_id == exam_id)
result = await db.execute(stmt)
results = result.scalars().all()
results_data = []
for r in results:
results_data.append(
{
"question_id": r.question_id,
"user_answer": r.user_answer,
"is_correct": r.is_correct,
"score": r.score,
}
)
exam_data["results"] = results_data
exam_data["answers"] = exam.answers
return exam_data
@staticmethod
async def get_exam_records(
db: AsyncSession,
user_id: int,
page: int = 1,
size: int = 10,
course_id: Optional[int] = None,
) -> Dict[str, Any]:
"""
获取考试记录列表(包含统计数据)
Args:
db: 数据库会话
user_id: 用户ID
page: 页码
size: 每页数量
course_id: 课程ID可选
Returns:
Dict: 考试记录列表(包含统计信息)
"""
# 构建查询条件
conditions = [Exam.user_id == user_id]
if course_id:
conditions.append(Exam.course_id == course_id)
# 查询总数
count_stmt = select(func.count(Exam.id)).where(and_(*conditions))
total = await db.scalar(count_stmt)
# 查询考试数据JOIN courses表获取课程名称
offset = (page - 1) * size
stmt = (
select(Exam, Course.name.label("course_name"))
.join(Course, Exam.course_id == Course.id)
.where(and_(*conditions))
.order_by(Exam.created_at.desc())
.offset(offset)
.limit(size)
)
result = await db.execute(stmt)
rows = result.all()
# 构建返回数据
items = []
for exam, course_name in rows:
# 1. 计算用时
duration_seconds = None
if exam.start_time and exam.end_time:
duration_seconds = int((exam.end_time - exam.start_time).total_seconds())
# 2. 统计错题数
mistakes_stmt = select(func.count(ExamMistake.id)).where(
ExamMistake.exam_id == exam.id
)
wrong_count = await db.scalar(mistakes_stmt) or 0
# 3. 计算正确数和正确率
correct_count = exam.question_count - wrong_count if exam.question_count else 0
accuracy = None
if exam.question_count and exam.question_count > 0:
accuracy = round((correct_count / exam.question_count) * 100, 1)
# 4. 分题型统计
question_type_stats = []
if exam.questions:
try:
# 解析questions JSON统计每种题型的总数
questions_data = json.loads(exam.questions) if isinstance(exam.questions, str) else exam.questions
type_totals = {}
type_scores = {} # 存储每种题型的总分
for q in questions_data:
q_type = q.get("type", "unknown")
q_score = q.get("score", 0)
type_totals[q_type] = type_totals.get(q_type, 0) + 1
type_scores[q_type] = type_scores.get(q_type, 0) + q_score
# 查询错题按题型统计
mistakes_by_type_stmt = (
select(ExamMistake.question_type, func.count(ExamMistake.id))
.where(ExamMistake.exam_id == exam.id)
.group_by(ExamMistake.question_type)
)
mistakes_by_type_result = await db.execute(mistakes_by_type_stmt)
mistakes_by_type = dict(mistakes_by_type_result.all())
# 题型名称映射
type_name_map = {
"single": "单选题",
"multiple": "多选题",
"judge": "判断题",
"blank": "填空题",
"essay": "问答题"
}
# 组装分题型统计
for q_type, total in type_totals.items():
wrong = mistakes_by_type.get(q_type, 0)
correct = total - wrong
type_accuracy = round((correct / total) * 100, 1) if total > 0 else 0
question_type_stats.append({
"type": type_name_map.get(q_type, q_type),
"type_code": q_type,
"total": total,
"correct": correct,
"wrong": wrong,
"accuracy": type_accuracy,
"total_score": type_scores.get(q_type, 0)
})
except (json.JSONDecodeError, TypeError, KeyError) as e:
# 如果JSON解析失败返回空统计
question_type_stats = []
items.append(
{
"id": exam.id,
"course_id": exam.course_id,
"course_name": course_name,
"exam_name": exam.exam_name,
"question_count": exam.question_count,
"total_score": exam.total_score,
"score": exam.score,
"is_passed": exam.is_passed,
"status": exam.status,
"start_time": exam.start_time.isoformat() if exam.start_time else None,
"end_time": exam.end_time.isoformat() if exam.end_time else None,
"created_at": exam.created_at.isoformat(),
# 统计字段
"accuracy": accuracy,
"correct_count": correct_count,
"wrong_count": wrong_count,
"duration_seconds": duration_seconds,
"question_type_stats": question_type_stats,
}
)
return {
"items": items,
"total": total,
"page": page,
"size": size,
"pages": (total + size - 1) // size,
}
@staticmethod
async def get_exam_statistics(
db: AsyncSession, user_id: int, course_id: Optional[int] = None
) -> Dict[str, Any]:
"""
获取考试统计信息
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
Returns:
Dict: 统计信息
"""
# 构建查询条件
conditions = [Exam.user_id == user_id, Exam.status == "submitted"]
if course_id:
conditions.append(Exam.course_id == course_id)
# 查询统计数据
stmt = select(
func.count(Exam.id).label("total_exams"),
func.count(func.nullif(Exam.is_passed, False)).label("passed_exams"),
func.avg(Exam.score).label("avg_score"),
func.max(Exam.score).label("max_score"),
func.min(Exam.score).label("min_score"),
).where(and_(*conditions))
result = await db.execute(stmt)
stats = result.one()
return {
"total_exams": stats.total_exams or 0,
"passed_exams": stats.passed_exams or 0,
"pass_rate": (stats.passed_exams / stats.total_exams * 100)
if stats.total_exams > 0
else 0,
"avg_score": float(stats.avg_score or 0),
"max_score": float(stats.max_score or 0),
"min_score": float(stats.min_score or 0),
}