Files
012-kaopeilian/backend/app/services/exam_report_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

487 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
考试报告和错题统计服务
"""
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc, case, text
from app.models.exam import Exam
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.core.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper and keyed by this module's name.
logger = get_logger(__name__)
class ExamReportService:
    """Score-report queries for a single user's exams.

    Every public/private query method is a static coroutine that only issues
    SELECT statements through the supplied ``AsyncSession``. ``round1_score``
    is used as the primary score for all aggregates.
    """

    # Format of the ``start_date`` / ``end_date`` filter strings.
    _DATE_FORMAT = "%Y-%m-%d"

    @staticmethod
    def _rounded(value: Any, default: Optional[float] = None) -> Optional[float]:
        """Return ``value`` as a float rounded to one decimal place.

        ``None`` maps to ``default``. The check is ``is None`` rather than
        truthiness so that a genuine score of 0 is reported as ``0.0`` instead
        of being mistaken for "no score".
        """
        if value is None:
            return default
        return round(float(value), 1)

    @staticmethod
    def _pass_rate(passed: int, total: int) -> float:
        """Return ``passed / total`` as a percentage with one decimal; 0 if ``total`` is 0."""
        return round((passed / total * 100) if total > 0 else 0, 1)

    @staticmethod
    async def get_exam_report(
        db: AsyncSession,
        user_id: int,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Build the complete score report for one user.

        Args:
            db: Database session.
            user_id: ID of the user whose exams are reported.
            start_date: Inclusive lower bound on ``Exam.start_time`` (YYYY-MM-DD).
            end_date: Inclusive upper bound on ``Exam.start_time`` (YYYY-MM-DD);
                the whole of that day is included.

        Returns:
            Dict with the keys ``overview``, ``trends``, ``subjects`` and
            ``recent_exams``.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` is not in YYYY-MM-DD form.
        """
        logger.info(f"获取成绩报告 - user_id: {user_id}, start_date: {start_date}, end_date: {end_date}")

        # Base filter shared by the overview / subjects / recent-exams queries.
        conditions = [Exam.user_id == user_id]

        if start_date:
            # Parse instead of binding the raw string so both bounds are
            # validated the same way and compared as datetimes.
            start_datetime = datetime.strptime(start_date, ExamReportService._DATE_FORMAT)
            conditions.append(Exam.start_time >= start_datetime)
        if end_date:
            # Exclusive bound at the following midnight covers the full end day.
            end_datetime = datetime.strptime(end_date, ExamReportService._DATE_FORMAT) + timedelta(days=1)
            conditions.append(Exam.start_time < end_datetime)

        overview = await ExamReportService._get_overview(db, conditions)
        trends = await ExamReportService._get_trends(db, user_id, conditions)
        subjects = await ExamReportService._get_subjects(db, conditions)
        recent_exams = await ExamReportService._get_recent_exams(db, conditions)

        return {
            "overview": overview,
            "trends": trends,
            "subjects": subjects,
            "recent_exams": recent_exams
        }

    @staticmethod
    async def _get_overview(db: AsyncSession, conditions: List) -> Dict[str, Any]:
        """Return headline statistics over exams that have a round-1 score.

        Args:
            db: Database session.
            conditions: SQLAlchemy filter expressions built by ``get_exam_report``.

        Returns:
            Dict with ``avg_score``, ``total_exams``, ``pass_rate`` (percent)
            and ``total_questions``.
        """
        stmt = select(
            func.count(Exam.id).label("total_exams"),
            func.avg(Exam.round1_score).label("avg_score"),
            func.sum(Exam.question_count).label("total_questions"),
            # CASE without ELSE yields NULL for non-matching rows and COUNT
            # skips NULLs, so this counts passed exams only.
            func.count(case((Exam.is_passed == True, 1))).label("passed_exams")  # noqa: E712
        ).where(
            and_(*conditions, Exam.round1_score.isnot(None))
        )
        stats = (await db.execute(stmt)).one()

        total_exams = stats.total_exams or 0
        passed_exams = stats.passed_exams or 0

        return {
            "avg_score": ExamReportService._rounded(stats.avg_score, 0.0),
            "total_exams": total_exams,
            "pass_rate": ExamReportService._pass_rate(passed_exams, total_exams),
            "total_questions": stats.total_questions or 0
        }

    @staticmethod
    async def _get_trends(
        db: AsyncSession,
        user_id: int,
        base_conditions: List
    ) -> List[Dict[str, Any]]:
        """Return the per-day average round-1 score for the last 30 days.

        NOTE(review): ``base_conditions`` is accepted but not applied; the
        trend window is always "now minus 30 days" for ``user_id``, regardless
        of the report's date filter. Confirm this is intended.

        Args:
            db: Database session.
            user_id: ID of the user whose exams are aggregated.
            base_conditions: Unused (see note above).

        Returns:
            List of ``{"date": "YYYY-MM-DD", "avg_score": float}`` ordered by date.
        """
        thirty_days_ago = datetime.now() - timedelta(days=30)
        exam_day = func.date(Exam.start_time)

        stmt = select(
            exam_day.label("exam_date"),
            func.avg(Exam.round1_score).label("avg_score")
        ).where(
            and_(
                Exam.user_id == user_id,
                Exam.start_time >= thirty_days_ago,
                Exam.round1_score.isnot(None)
            )
        ).group_by(
            exam_day
        ).order_by(
            exam_day
        )
        rows = (await db.execute(stmt)).all()

        trends = []
        for row in rows:
            exam_date = row.exam_date
            if not exam_date:
                date_text = ""
            elif hasattr(exam_date, "strftime"):
                date_text = exam_date.strftime("%Y-%m-%d")
            else:
                # DATE() may come back as a plain string on some backends;
                # pass it through instead of failing on .strftime.
                date_text = str(exam_date)
            trends.append({
                "date": date_text,
                "avg_score": ExamReportService._rounded(row.avg_score, 0.0)
            })
        return trends

    @staticmethod
    async def _get_subjects(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]:
        """Return per-course statistics, most-examined course first.

        Only exams with a round-1 score and an existing ``Course`` row are
        included (inner join).

        Args:
            db: Database session.
            conditions: SQLAlchemy filter expressions built by ``get_exam_report``.

        Returns:
            List of dicts with ``course_id``, ``course_name``, ``avg_score``,
            ``exam_count``, ``max_score``, ``min_score`` and ``pass_rate``.
        """
        stmt = select(
            Exam.course_id,
            Course.name.label("course_name"),
            func.avg(Exam.round1_score).label("avg_score"),
            func.count(Exam.id).label("exam_count"),
            func.max(Exam.round1_score).label("max_score"),
            func.min(Exam.round1_score).label("min_score"),
            func.count(case((Exam.is_passed == True, 1))).label("passed_count")  # noqa: E712
        ).join(
            Course, Exam.course_id == Course.id
        ).where(
            and_(*conditions, Exam.round1_score.isnot(None))
        ).group_by(
            Exam.course_id, Course.name
        ).order_by(
            desc(func.count(Exam.id))
        )
        rows = (await db.execute(stmt)).all()

        subjects = []
        for row in rows:
            exam_count = row.exam_count or 0
            passed_count = row.passed_count or 0
            subjects.append({
                "course_id": row.course_id,
                "course_name": row.course_name,
                "avg_score": ExamReportService._rounded(row.avg_score, 0.0),
                "exam_count": exam_count,
                "max_score": ExamReportService._rounded(row.max_score, 0.0),
                "min_score": ExamReportService._rounded(row.min_score, 0.0),
                "pass_rate": ExamReportService._pass_rate(passed_count, exam_count)
            })
        return subjects

    @staticmethod
    async def _get_recent_exams(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]:
        """Return the 10 most recently created exams, including all three round scores.

        Args:
            db: Database session.
            conditions: SQLAlchemy filter expressions built by ``get_exam_report``.

        Returns:
            List of dicts describing each exam. ``score`` and the entries of
            ``round_scores`` are ``None`` only when the stored value is NULL;
            a stored 0 is returned as ``0.0``. ``duration_seconds`` is ``None``
            unless both ``start_time`` and ``end_time`` are set.
        """
        stmt = select(
            Exam.id,
            Exam.course_id,
            Course.name.label("course_name"),
            Exam.score,
            Exam.total_score,
            Exam.is_passed,
            Exam.start_time,
            Exam.end_time,
            Exam.round1_score,
            Exam.round2_score,
            Exam.round3_score
        ).join(
            Course, Exam.course_id == Course.id
        ).where(
            and_(*conditions)
        ).order_by(
            # Ordered by creation time so rows with a NULL start_time still sort sensibly.
            desc(Exam.created_at)
        ).limit(10)
        rows = (await db.execute(stmt)).all()

        rounded = ExamReportService._rounded
        recent_exams = []
        for row in rows:
            duration_seconds = None
            if row.start_time and row.end_time:
                duration_seconds = int((row.end_time - row.start_time).total_seconds())

            recent_exams.append({
                "id": row.id,
                "course_id": row.course_id,
                "course_name": row.course_name,
                "score": rounded(row.score),
                # A missing (or zero) total falls back to 100.
                "total_score": round(float(row.total_score or 100), 1),
                "is_passed": row.is_passed,
                "duration_seconds": duration_seconds,
                "start_time": row.start_time.isoformat() if row.start_time else None,
                "end_time": row.end_time.isoformat() if row.end_time else None,
                "round_scores": {
                    "round1": rounded(row.round1_score),
                    "round2": rounded(row.round2_score),
                    "round3": rounded(row.round3_score)
                }
            })
        return recent_exams
class MistakeService:
    """Queries over a user's recorded exam mistakes (wrongly answered questions).

    Both methods are static coroutines that only issue SELECT statements
    through the supplied ``AsyncSession``.
    """

    # Format of the ``start_date`` / ``end_date`` filter strings.
    _DATE_FORMAT = "%Y-%m-%d"

    # Display names for the known question-type codes.
    _QUESTION_TYPE_NAMES = {
        "single": "单选题",
        "multiple": "多选题",
        "judge": "判断题",
        "blank": "填空题",
        "essay": "问答题"
    }

    @staticmethod
    async def get_mistakes_list(
        db: AsyncSession,
        user_id: int,
        exam_id: Optional[int] = None,
        course_id: Optional[int] = None,
        question_type: Optional[str] = None,
        search: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        page: int = 1,
        size: int = 10
    ) -> Dict[str, Any]:
        """
        Return one page of a user's mistakes, newest first, with optional filters.

        Args:
            db: Database session.
            user_id: ID of the user whose mistakes are listed.
            exam_id: Restrict to one exam (optional).
            course_id: Restrict to exams of one course (optional).
            question_type: Restrict to one question-type code (optional).
            search: Substring to look for in the question text (optional);
                ``%`` and ``_`` are matched literally.
            start_date: Inclusive lower bound on ``created_at`` (YYYY-MM-DD, optional).
            end_date: Inclusive upper bound on ``created_at`` (YYYY-MM-DD, optional);
                the whole of that day is included.
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size; values below 1 are treated as 1.

        Returns:
            Dict with ``items``, ``total``, ``page``, ``size`` and ``pages``.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` is not in YYYY-MM-DD form.
        """
        logger.info(f"获取错题列表 - user_id: {user_id}, exam_id: {exam_id}, course_id: {course_id}")

        # Guard the pagination maths: size 0 would divide by zero below and
        # page < 1 would produce a negative OFFSET.
        page = max(page, 1)
        size = max(size, 1)

        conditions = [ExamMistake.user_id == user_id]
        if exam_id:
            conditions.append(ExamMistake.exam_id == exam_id)
        if question_type:
            conditions.append(ExamMistake.question_type == question_type)
        if search:
            # autoescape makes LIKE wildcards in user input match literally.
            conditions.append(ExamMistake.question_content.contains(search, autoescape=True))
        if start_date:
            # Parsed the same way as end_date so both bounds are validated
            # and compared as datetimes.
            start_datetime = datetime.strptime(start_date, MistakeService._DATE_FORMAT)
            conditions.append(ExamMistake.created_at >= start_datetime)
        if end_date:
            # Exclusive bound at the following midnight covers the full end day.
            end_datetime = datetime.strptime(end_date, MistakeService._DATE_FORMAT) + timedelta(days=1)
            conditions.append(ExamMistake.created_at < end_datetime)
        if course_id:
            # The course lives on the exam, which both queries below join.
            conditions.append(Exam.course_id == course_id)

        # The count uses the same inner joins as the page query so that
        # ``total`` never includes rows the listing cannot return.
        count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join(
            Exam, ExamMistake.exam_id == Exam.id
        ).join(
            Course, Exam.course_id == Course.id
        ).where(and_(*conditions))
        total = (await db.execute(count_stmt)).scalar() or 0

        offset = (page - 1) * size
        stmt = select(
            ExamMistake.id,
            ExamMistake.exam_id,
            Exam.course_id,
            Course.name.label("course_name"),
            ExamMistake.question_content,
            ExamMistake.correct_answer,
            ExamMistake.user_answer,
            ExamMistake.question_type,
            ExamMistake.knowledge_point_id,
            KnowledgePoint.name.label("knowledge_point_name"),
            ExamMistake.created_at
        ).select_from(ExamMistake).join(
            Exam, ExamMistake.exam_id == Exam.id
        ).join(
            Course, Exam.course_id == Course.id
        ).outerjoin(
            # Outer join: a mistake may have no knowledge point attached.
            KnowledgePoint, ExamMistake.knowledge_point_id == KnowledgePoint.id
        ).where(
            and_(*conditions)
        ).order_by(
            desc(ExamMistake.created_at)
        ).offset(offset).limit(size)
        rows = (await db.execute(stmt)).all()

        items = [
            {
                "id": row.id,
                "exam_id": row.exam_id,
                "course_id": row.course_id,
                "course_name": row.course_name,
                "question_content": row.question_content,
                "correct_answer": row.correct_answer,
                "user_answer": row.user_answer,
                "question_type": row.question_type,
                "knowledge_point_id": row.knowledge_point_id,
                "knowledge_point_name": row.knowledge_point_name,
                "created_at": row.created_at
            }
            for row in rows
        ]

        # Ceiling division.
        pages = (total + size - 1) // size

        return {
            "items": items,
            "total": total,
            "page": page,
            "size": size,
            "pages": pages
        }

    @staticmethod
    async def get_mistakes_statistics(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Return aggregate mistake counts for a user.

        NOTE(review): ``course_id`` narrows only ``total``. The ``by_course``,
        ``by_type`` and ``by_time`` breakdowns always cover all of the user's
        mistakes. Confirm whether the breakdowns should honour it too.

        Args:
            db: Database session.
            user_id: ID of the user whose mistakes are counted.
            course_id: Restrict ``total`` to exams of one course (optional).

        Returns:
            Dict with ``total`` (int), ``by_course`` (list, largest first),
            ``by_type`` (list) and ``by_time`` (dict with ``week``, ``month``
            and ``quarter`` counts for the last 7 / 30 / 90 days).
        """
        logger.info(f"获取错题统计 - user_id: {user_id}, course_id: {course_id}")

        base_conditions = [ExamMistake.user_id == user_id]
        if course_id:
            base_conditions.append(Exam.course_id == course_id)

        # 1. Total.
        count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join(
            Exam, ExamMistake.exam_id == Exam.id
        ).where(and_(*base_conditions))
        total = (await db.execute(count_stmt)).scalar() or 0

        # 2. Per course. The aggregate is labelled ``mistake_count`` rather
        #    than ``count`` to avoid shadowing the row's tuple ``count`` method.
        by_course_stmt = select(
            Exam.course_id,
            Course.name.label("course_name"),
            func.count(ExamMistake.id).label("mistake_count")
        ).select_from(ExamMistake).join(
            Exam, ExamMistake.exam_id == Exam.id
        ).join(
            Course, Exam.course_id == Course.id
        ).where(
            ExamMistake.user_id == user_id
        ).group_by(
            Exam.course_id, Course.name
        ).order_by(
            desc(func.count(ExamMistake.id))
        )
        by_course = [
            {
                "course_id": row.course_id,
                "course_name": row.course_name,
                "count": row.mistake_count
            }
            for row in (await db.execute(by_course_stmt)).all()
        ]

        # 3. Per question type (rows without a type are skipped).
        by_type_stmt = select(
            ExamMistake.question_type,
            func.count(ExamMistake.id).label("mistake_count")
        ).where(
            and_(ExamMistake.user_id == user_id, ExamMistake.question_type.isnot(None))
        ).group_by(
            ExamMistake.question_type
        )
        type_names = MistakeService._QUESTION_TYPE_NAMES
        by_type = [
            {
                "type": row.question_type,
                "type_name": type_names.get(row.question_type, "未知类型"),
                "count": row.mistake_count
            }
            for row in (await db.execute(by_type_stmt)).all()
        ]

        # 4. Per time window. The 7- and 30-day windows are subsets of the
        #    90-day window, so one query filtered on the widest window with
        #    conditional counts replaces three separate round-trips.
        now = datetime.now()
        week_ago = now - timedelta(days=7)
        month_ago = now - timedelta(days=30)
        quarter_ago = now - timedelta(days=90)

        by_time_stmt = select(
            # CASE without ELSE yields NULL, which COUNT ignores.
            func.count(case((ExamMistake.created_at >= week_ago, 1))).label("week_count"),
            func.count(case((ExamMistake.created_at >= month_ago, 1))).label("month_count"),
            func.count(ExamMistake.id).label("quarter_count")
        ).where(
            and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= quarter_ago)
        )
        time_row = (await db.execute(by_time_stmt)).one()

        by_time = {
            "week": time_row.week_count or 0,
            "month": time_row.month_count or 0,
            "quarter": time_row.quarter_count or 0
        }

        return {
            "total": total,
            "by_course": by_course,
            "by_type": by_type,
            "by_time": by_time
        }