""" 考试服务层 """ import json import random from datetime import datetime, timedelta from typing import List, Optional, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, or_, desc from app.models.exam import Exam, Question, ExamResult from app.models.exam_mistake import ExamMistake from app.models.course import Course, KnowledgePoint from app.core.exceptions import BusinessException, ErrorCode from app.utils.score_distributor import ScoreDistributor class ExamService: """考试服务类""" @staticmethod async def start_exam( db: AsyncSession, user_id: int, course_id: int, question_count: int = 10 ) -> Exam: """ 开始考试 Args: db: 数据库会话 user_id: 用户ID course_id: 课程ID question_count: 题目数量 Returns: Exam: 考试实例 """ # 检查课程是否存在 course = await db.get(Course, course_id) if not course: raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="课程不存在") # 获取该课程的所有可用题目 stmt = select(Question).where( and_(Question.course_id == course_id, Question.is_active == True) ) result = await db.execute(stmt) all_questions = result.scalars().all() if not all_questions: raise BusinessException( error_code=ErrorCode.VALIDATION_ERROR, message="该课程暂无题目" ) # 随机选择题目 selected_questions = random.sample( all_questions, min(question_count, len(all_questions)) ) # 使用智能分数分配器,确保总分为整数且分配合理 total_score = 100.0 # 固定总分为100分 actual_question_count = len(selected_questions) # 创建分数分配器 distributor = ScoreDistributor(total_score, actual_question_count) distributed_scores = distributor.distribute(mode="integer") # 构建题目数据,使用分配后的分数 questions_data = [] for i, q in enumerate(selected_questions): question_data = { "id": str(q.id), "type": q.question_type, "title": q.title, "content": q.content, "options": q.options, "score": distributed_scores[i], # 使用智能分配的分数 } questions_data.append(question_data) # 计算及格分数(向上取整,确保公平) pass_score = ScoreDistributor.calculate_pass_score(total_score, 0.6) # 创建考试记录 exam = Exam( user_id=user_id, course_id=course_id, exam_name=f"{course.name} - 随机测试", question_count=actual_question_count, total_score=total_score, pass_score=pass_score, duration_minutes=60, status="started", questions={"questions": questions_data}, ) db.add(exam) await db.commit() await db.refresh(exam) return exam @staticmethod async def submit_exam( db: AsyncSession, user_id: int, exam_id: int, answers: List[Dict[str, str]] ) -> Dict[str, Any]: """ 提交考试答案 Args: db: 数据库会话 user_id: 用户ID exam_id: 考试ID answers: 答案列表 Returns: Dict: 考试结果 """ # 获取考试记录 stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id)) result = await db.execute(stmt) exam = result.scalar_one_or_none() if not exam: raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在") if exam.status != "started": raise BusinessException( error_code=ErrorCode.VALIDATION_ERROR, message="考试已结束或已提交" ) # 检查考试是否超时 if datetime.now() > exam.start_time + timedelta( minutes=exam.duration_minutes ): exam.status = "timeout" await db.commit() raise BusinessException( error_code=ErrorCode.VALIDATION_ERROR, message="考试已超时" ) # 处理答案 answers_dict = {ans["question_id"]: ans["answer"] for ans in answers} total_score = 0.0 correct_count = 0 # 批量获取题目 question_ids = [int(ans["question_id"]) for ans in answers] stmt = select(Question).where(Question.id.in_(question_ids)) result = await db.execute(stmt) questions_map = {str(q.id): q for q in result.scalars().all()} # 创建答题结果记录 for question_data in exam.questions["questions"]: question_id = question_data["id"] question = questions_map.get(question_id) # 使用考试创建时分配的分数(智能分配后的整数分数) allocated_score = question_data.get("score", 10.0) if not question: continue user_answer = answers_dict.get(question_id, "") is_correct = user_answer == question.correct_answer if is_correct: total_score += allocated_score # 使用分配的分数 correct_count += 1 # 创建答题结果记录 exam_result = ExamResult( exam_id=exam_id, question_id=int(question_id), user_answer=user_answer, is_correct=is_correct, score=allocated_score if is_correct else 0.0, # 使用分配的分数 ) db.add(exam_result) # 更新题目使用统计 question.usage_count += 1 if is_correct: question.correct_count += 1 # 更新考试记录 exam.end_time = datetime.now() exam.score = total_score exam.is_passed = total_score >= exam.pass_score exam.status = "submitted" exam.answers = {"answers": answers} await db.commit() return { "exam_id": exam_id, "total_score": total_score, "pass_score": exam.pass_score, "is_passed": exam.is_passed, "correct_count": correct_count, "total_count": exam.question_count, "accuracy": correct_count / exam.question_count if exam.question_count > 0 else 0, } @staticmethod async def get_exam_detail( db: AsyncSession, user_id: int, exam_id: int ) -> Dict[str, Any]: """ 获取考试详情 Args: db: 数据库会话 user_id: 用户ID exam_id: 考试ID Returns: Dict: 考试详情 """ # 获取考试记录 stmt = select(Exam).where(and_(Exam.id == exam_id, Exam.user_id == user_id)) result = await db.execute(stmt) exam = result.scalar_one_or_none() if not exam: raise BusinessException(error_code=ErrorCode.NOT_FOUND, message="考试记录不存在") # 构建返回数据 exam_data = { "id": exam.id, "course_id": exam.course_id, "exam_name": exam.exam_name, "question_count": exam.question_count, "total_score": exam.total_score, "pass_score": exam.pass_score, "start_time": exam.start_time.isoformat() if exam.start_time else None, "end_time": exam.end_time.isoformat() if exam.end_time else None, "duration_minutes": exam.duration_minutes, "status": exam.status, "score": exam.score, "is_passed": exam.is_passed, "questions": exam.questions, } # 如果考试已提交,获取答题详情 if exam.status == "submitted" and exam.answers: stmt = select(ExamResult).where(ExamResult.exam_id == exam_id) result = await db.execute(stmt) results = result.scalars().all() results_data = [] for r in results: results_data.append( { "question_id": r.question_id, "user_answer": r.user_answer, "is_correct": r.is_correct, "score": r.score, } ) exam_data["results"] = results_data exam_data["answers"] = exam.answers return exam_data @staticmethod async def get_exam_records( db: AsyncSession, user_id: int, page: int = 1, size: int = 10, course_id: Optional[int] = None, ) -> Dict[str, Any]: """ 获取考试记录列表(包含统计数据) Args: db: 数据库会话 user_id: 用户ID page: 页码 size: 每页数量 course_id: 课程ID(可选) Returns: Dict: 考试记录列表(包含统计信息) """ # 构建查询条件 conditions = [Exam.user_id == user_id] if course_id: conditions.append(Exam.course_id == course_id) # 查询总数 count_stmt = select(func.count(Exam.id)).where(and_(*conditions)) total = await db.scalar(count_stmt) # 查询考试数据(JOIN courses表获取课程名称) offset = (page - 1) * size stmt = ( select(Exam, Course.name.label("course_name")) .join(Course, Exam.course_id == Course.id) .where(and_(*conditions)) .order_by(Exam.created_at.desc()) .offset(offset) .limit(size) ) result = await db.execute(stmt) rows = result.all() # 构建返回数据 items = [] for exam, course_name in rows: # 1. 计算用时 duration_seconds = None if exam.start_time and exam.end_time: duration_seconds = int((exam.end_time - exam.start_time).total_seconds()) # 2. 统计错题数 mistakes_stmt = select(func.count(ExamMistake.id)).where( ExamMistake.exam_id == exam.id ) wrong_count = await db.scalar(mistakes_stmt) or 0 # 3. 计算正确数和正确率 correct_count = exam.question_count - wrong_count if exam.question_count else 0 accuracy = None if exam.question_count and exam.question_count > 0: accuracy = round((correct_count / exam.question_count) * 100, 1) # 4. 分题型统计 question_type_stats = [] if exam.questions: try: # 解析questions JSON,统计每种题型的总数 questions_data = json.loads(exam.questions) if isinstance(exam.questions, str) else exam.questions type_totals = {} type_scores = {} # 存储每种题型的总分 for q in questions_data: q_type = q.get("type", "unknown") q_score = q.get("score", 0) type_totals[q_type] = type_totals.get(q_type, 0) + 1 type_scores[q_type] = type_scores.get(q_type, 0) + q_score # 查询错题按题型统计 mistakes_by_type_stmt = ( select(ExamMistake.question_type, func.count(ExamMistake.id)) .where(ExamMistake.exam_id == exam.id) .group_by(ExamMistake.question_type) ) mistakes_by_type_result = await db.execute(mistakes_by_type_stmt) mistakes_by_type = dict(mistakes_by_type_result.all()) # 题型名称映射 type_name_map = { "single": "单选题", "multiple": "多选题", "judge": "判断题", "blank": "填空题", "essay": "问答题" } # 组装分题型统计 for q_type, total in type_totals.items(): wrong = mistakes_by_type.get(q_type, 0) correct = total - wrong type_accuracy = round((correct / total) * 100, 1) if total > 0 else 0 question_type_stats.append({ "type": type_name_map.get(q_type, q_type), "type_code": q_type, "total": total, "correct": correct, "wrong": wrong, "accuracy": type_accuracy, "total_score": type_scores.get(q_type, 0) }) except (json.JSONDecodeError, TypeError, KeyError) as e: # 如果JSON解析失败,返回空统计 question_type_stats = [] items.append( { "id": exam.id, "course_id": exam.course_id, "course_name": course_name, "exam_name": exam.exam_name, "question_count": exam.question_count, "total_score": exam.total_score, "score": exam.score, "is_passed": exam.is_passed, "status": exam.status, "start_time": exam.start_time.isoformat() if exam.start_time else None, "end_time": exam.end_time.isoformat() if exam.end_time else None, "created_at": exam.created_at.isoformat(), # 统计字段 "accuracy": accuracy, "correct_count": correct_count, "wrong_count": wrong_count, "duration_seconds": duration_seconds, "question_type_stats": question_type_stats, } ) return { "items": items, "total": total, "page": page, "size": size, "pages": (total + size - 1) // size, } @staticmethod async def get_exam_statistics( db: AsyncSession, user_id: int, course_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取考试统计信息 Args: db: 数据库会话 user_id: 用户ID course_id: 课程ID(可选) Returns: Dict: 统计信息 """ # 构建查询条件 conditions = [Exam.user_id == user_id, Exam.status == "submitted"] if course_id: conditions.append(Exam.course_id == course_id) # 查询统计数据 stmt = select( func.count(Exam.id).label("total_exams"), func.count(func.nullif(Exam.is_passed, False)).label("passed_exams"), func.avg(Exam.score).label("avg_score"), func.max(Exam.score).label("max_score"), func.min(Exam.score).label("min_score"), ).where(and_(*conditions)) result = await db.execute(stmt) stats = result.one() return { "total_exams": stats.total_exams or 0, "passed_exams": stats.passed_exams or 0, "pass_rate": (stats.passed_exams / stats.total_exams * 100) if stats.total_exams > 0 else 0, "avg_score": float(stats.avg_score or 0), "max_score": float(stats.max_score or 0), "min_score": float(stats.min_score or 0), }