Files
012-kaopeilian/backend/app/services/exam_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

440 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
考试服务层
"""
import json
import random
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc
from app.models.exam import Exam, Question, ExamResult
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.core.exceptions import BusinessException, ErrorCode
class ExamService:
"""考试服务类"""
@staticmethod
async def start_exam(
db: AsyncSession, user_id: int, course_id: int, question_count: int = 10
) -> Exam:
"""
开始考试
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID
question_count: 题目数量
Returns:
Exam: 考试实例
"""
# 检查课程是否存在
course = await db.get(Course, course_id)
if not course:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="课程不存在")
# 获取该课程的所有可用题目
stmt = select(Question).where(
and_(Question.course_id == course_id, Question.is_active == True)
)
result = await db.execute(stmt)
all_questions = result.scalars().all()
if not all_questions:
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="该课程暂无题目"
)
# 随机选择题目
selected_questions = random.sample(
all_questions, min(question_count, len(all_questions))
)
# 构建题目数据
questions_data = []
for q in selected_questions:
question_data = {
"id": str(q.id),
"type": q.question_type,
"title": q.title,
"content": q.content,
"options": q.options,
"score": q.score,
}
questions_data.append(question_data)
# 创建考试记录
exam = Exam(
user_id=user_id,
course_id=course_id,
exam_name=f"{course.name} - 随机测试",
question_count=len(selected_questions),
total_score=sum(q.score for q in selected_questions),
pass_score=sum(q.score for q in selected_questions) * 0.6,
duration_minutes=60,
status="started",
questions={"questions": questions_data},
)
db.add(exam)
await db.commit()
await db.refresh(exam)
return exam
@staticmethod
async def submit_exam(
db: AsyncSession, user_id: int, exam_id: int, answers: List[Dict[str, str]]
) -> Dict[str, Any]:
"""
提交考试答案
Args:
db: 数据库会话
user_id: 用户ID
exam_id: 考试ID
answers: 答案列表
Returns:
Dict: 考试结果
"""
# 获取考试记录
stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id))
result = await db.execute(stmt)
exam = result.scalar_one_or_none()
if not exam:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在")
if exam.status != "started":
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="考试已结束或已提交"
)
# 检查考试是否超时
if datetime.now() > exam.start_time + timedelta(
minutes=exam.duration_minutes
):
exam.status = "timeout"
await db.commit()
raise BusinessException(
error_code=ErrorCode.VALIDATION_ERROR, message="考试已超时"
)
# 处理答案
answers_dict = {ans["question_id"]: ans["answer"] for ans in answers}
total_score = 0.0
correct_count = 0
# 批量获取题目
question_ids = [int(ans["question_id"]) for ans in answers]
stmt = select(Question).where(Question.id.in_(question_ids))
result = await db.execute(stmt)
questions_map = {str(q.id): q for q in result.scalars().all()}
# 创建答题结果记录
for question_data in exam.questions["questions"]:
question_id = question_data["id"]
question = questions_map.get(question_id)
if not question:
continue
user_answer = answers_dict.get(question_id, "")
is_correct = user_answer == question.correct_answer
if is_correct:
total_score += question.score
correct_count += 1
# 创建答题结果记录
exam_result = ExamResult(
exam_id=exam_id,
question_id=int(question_id),
user_answer=user_answer,
is_correct=is_correct,
score=question.score if is_correct else 0.0,
)
db.add(exam_result)
# 更新题目使用统计
question.usage_count += 1
if is_correct:
question.correct_count += 1
# 更新考试记录
exam.end_time = datetime.now()
exam.score = total_score
exam.is_passed = total_score >= exam.pass_score
exam.status = "submitted"
exam.answers = {"answers": answers}
await db.commit()
return {
"exam_id": exam_id,
"total_score": total_score,
"pass_score": exam.pass_score,
"is_passed": exam.is_passed,
"correct_count": correct_count,
"total_count": exam.question_count,
"accuracy": correct_count / exam.question_count
if exam.question_count > 0
else 0,
}
@staticmethod
async def get_exam_detail(
db: AsyncSession, user_id: int, exam_id: int
) -> Dict[str, Any]:
"""
获取考试详情
Args:
db: 数据库会话
user_id: 用户ID
exam_id: 考试ID
Returns:
Dict: 考试详情
"""
# 获取考试记录
stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id))
result = await db.execute(stmt)
exam = result.scalar_one_or_none()
if not exam:
raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在")
# 构建返回数据
exam_data = {
"id": exam.id,
"course_id": exam.course_id,
"exam_name": exam.exam_name,
"question_count": exam.question_count,
"total_score": exam.total_score,
"pass_score": exam.pass_score,
"start_time": exam.start_time.isoformat() if exam.start_time else None,
"end_time": exam.end_time.isoformat() if exam.end_time else None,
"duration_minutes": exam.duration_minutes,
"status": exam.status,
"score": exam.score,
"is_passed": exam.is_passed,
"questions": exam.questions,
}
# 如果考试已提交,获取答题详情
if exam.status == "submitted" and exam.answers:
stmt = select(ExamResult).where(ExamResult.exam_id == exam_id)
result = await db.execute(stmt)
results = result.scalars().all()
results_data = []
for r in results:
results_data.append(
{
"question_id": r.question_id,
"user_answer": r.user_answer,
"is_correct": r.is_correct,
"score": r.score,
}
)
exam_data["results"] = results_data
exam_data["answers"] = exam.answers
return exam_data
@staticmethod
async def get_exam_records(
db: AsyncSession,
user_id: int,
page: int = 1,
size: int = 10,
course_id: Optional[int] = None,
) -> Dict[str, Any]:
"""
获取考试记录列表(包含统计数据)
Args:
db: 数据库会话
user_id: 用户ID
page: 页码
size: 每页数量
course_id: 课程ID可选
Returns:
Dict: 考试记录列表(包含统计信息)
"""
# 构建查询条件
conditions = [Exam.user_id == user_id]
if course_id:
conditions.append(Exam.course_id == course_id)
# 查询总数
count_stmt = select(func.count(Exam.id)).where(and_(*conditions))
total = await db.scalar(count_stmt)
# 查询考试数据JOIN courses表获取课程名称
offset = (page - 1) * size
stmt = (
select(Exam, Course.name.label("course_name"))
.join(Course, Exam.course_id == Course.id)
.where(and_(*conditions))
.order_by(Exam.created_at.desc())
.offset(offset)
.limit(size)
)
result = await db.execute(stmt)
rows = result.all()
# 构建返回数据
items = []
for exam, course_name in rows:
# 1. 计算用时
duration_seconds = None
if exam.start_time and exam.end_time:
duration_seconds = int((exam.end_time - exam.start_time).total_seconds())
# 2. 统计错题数
mistakes_stmt = select(func.count(ExamMistake.id)).where(
ExamMistake.exam_id == exam.id
)
wrong_count = await db.scalar(mistakes_stmt) or 0
# 3. 计算正确数和正确率
correct_count = exam.question_count - wrong_count if exam.question_count else 0
accuracy = None
if exam.question_count and exam.question_count > 0:
accuracy = round((correct_count / exam.question_count) * 100, 1)
# 4. 分题型统计
question_type_stats = []
if exam.questions:
try:
# 解析questions JSON统计每种题型的总数
questions_data = json.loads(exam.questions) if isinstance(exam.questions, str) else exam.questions
type_totals = {}
type_scores = {} # 存储每种题型的总分
for q in questions_data:
q_type = q.get("type", "unknown")
q_score = q.get("score", 0)
type_totals[q_type] = type_totals.get(q_type, 0) + 1
type_scores[q_type] = type_scores.get(q_type, 0) + q_score
# 查询错题按题型统计
mistakes_by_type_stmt = (
select(ExamMistake.question_type, func.count(ExamMistake.id))
.where(ExamMistake.exam_id == exam.id)
.group_by(ExamMistake.question_type)
)
mistakes_by_type_result = await db.execute(mistakes_by_type_stmt)
mistakes_by_type = dict(mistakes_by_type_result.all())
# 题型名称映射
type_name_map = {
"single": "单选题",
"multiple": "多选题",
"judge": "判断题",
"blank": "填空题",
"essay": "问答题"
}
# 组装分题型统计
for q_type, total in type_totals.items():
wrong = mistakes_by_type.get(q_type, 0)
correct = total - wrong
type_accuracy = round((correct / total) * 100, 1) if total > 0 else 0
question_type_stats.append({
"type": type_name_map.get(q_type, q_type),
"type_code": q_type,
"total": total,
"correct": correct,
"wrong": wrong,
"accuracy": type_accuracy,
"total_score": type_scores.get(q_type, 0)
})
except (json.JSONDecodeError, TypeError, KeyError) as e:
# 如果JSON解析失败返回空统计
question_type_stats = []
items.append(
{
"id": exam.id,
"course_id": exam.course_id,
"course_name": course_name,
"exam_name": exam.exam_name,
"question_count": exam.question_count,
"total_score": exam.total_score,
"score": exam.score,
"is_passed": exam.is_passed,
"status": exam.status,
"start_time": exam.start_time.isoformat() if exam.start_time else None,
"end_time": exam.end_time.isoformat() if exam.end_time else None,
"created_at": exam.created_at.isoformat(),
# 统计字段
"accuracy": accuracy,
"correct_count": correct_count,
"wrong_count": wrong_count,
"duration_seconds": duration_seconds,
"question_type_stats": question_type_stats,
}
)
return {
"items": items,
"total": total,
"page": page,
"size": size,
"pages": (total + size - 1) // size,
}
@staticmethod
async def get_exam_statistics(
db: AsyncSession, user_id: int, course_id: Optional[int] = None
) -> Dict[str, Any]:
"""
获取考试统计信息
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
Returns:
Dict: 统计信息
"""
# 构建查询条件
conditions = [Exam.user_id == user_id, Exam.status == "submitted"]
if course_id:
conditions.append(Exam.course_id == course_id)
# 查询统计数据
stmt = select(
func.count(Exam.id).label("total_exams"),
func.count(func.nullif(Exam.is_passed, False)).label("passed_exams"),
func.avg(Exam.score).label("avg_score"),
func.max(Exam.score).label("max_score"),
func.min(Exam.score).label("min_score"),
).where(and_(*conditions))
result = await db.execute(stmt)
stats = result.one()
return {
"total_exams": stats.total_exams or 0,
"passed_exams": stats.passed_exams or 0,
"pass_rate": (stats.passed_exams / stats.total_exams * 100)
if stats.total_exams > 0
else 0,
"avg_score": float(stats.avg_score or 0),
"max_score": float(stats.max_score or 0),
"min_score": float(stats.min_score or 0),
}