- 从服务器拉取完整代码 - 按框架规范整理项目结构 - 配置 Drone CI 测试环境部署 - 包含后端(FastAPI)、前端(Vue3)、管理端 技术栈: Vue3 + TypeScript + FastAPI + MySQL
487 lines
16 KiB
Python
487 lines
16 KiB
Python
"""
|
||
考试报告和错题统计服务
|
||
"""
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Optional, Dict, Any
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy import select, func, and_, or_, desc, case, text
|
||
from app.models.exam import Exam
|
||
from app.models.exam_mistake import ExamMistake
|
||
from app.models.course import Course, KnowledgePoint
|
||
from app.core.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class ExamReportService:
|
||
"""考试报告服务类"""
|
||
|
||
@staticmethod
|
||
async def get_exam_report(
|
||
db: AsyncSession,
|
||
user_id: int,
|
||
start_date: Optional[str] = None,
|
||
end_date: Optional[str] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取成绩报告汇总数据
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
user_id: 用户ID
|
||
start_date: 开始日期(YYYY-MM-DD)
|
||
end_date: 结束日期(YYYY-MM-DD)
|
||
|
||
Returns:
|
||
Dict: 包含overview、trends、subjects、recent_exams的完整报告数据
|
||
"""
|
||
logger.info(f"获取成绩报告 - user_id: {user_id}, start_date: {start_date}, end_date: {end_date}")
|
||
|
||
# 构建基础查询条件
|
||
conditions = [Exam.user_id == user_id]
|
||
|
||
# 添加时间范围条件
|
||
if start_date:
|
||
conditions.append(Exam.start_time >= start_date)
|
||
if end_date:
|
||
# 结束日期包含当天全部
|
||
end_datetime = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
|
||
conditions.append(Exam.start_time < end_datetime)
|
||
|
||
# 1. 获取概览数据
|
||
overview = await ExamReportService._get_overview(db, conditions)
|
||
|
||
# 2. 获取趋势数据(最近30天)
|
||
trends = await ExamReportService._get_trends(db, user_id, conditions)
|
||
|
||
# 3. 获取科目分析
|
||
subjects = await ExamReportService._get_subjects(db, conditions)
|
||
|
||
# 4. 获取最近考试记录
|
||
recent_exams = await ExamReportService._get_recent_exams(db, conditions)
|
||
|
||
return {
|
||
"overview": overview,
|
||
"trends": trends,
|
||
"subjects": subjects,
|
||
"recent_exams": recent_exams
|
||
}
|
||
|
||
@staticmethod
|
||
async def _get_overview(db: AsyncSession, conditions: List) -> Dict[str, Any]:
|
||
"""获取概览数据"""
|
||
# 查询统计数据(使用round1_score作为主要成绩)
|
||
stmt = select(
|
||
func.count(Exam.id).label("total_exams"),
|
||
func.avg(Exam.round1_score).label("avg_score"),
|
||
func.sum(Exam.question_count).label("total_questions"),
|
||
func.count(case((Exam.is_passed == True, 1))).label("passed_exams")
|
||
).where(
|
||
and_(*conditions, Exam.round1_score.isnot(None))
|
||
)
|
||
|
||
result = await db.execute(stmt)
|
||
stats = result.one()
|
||
|
||
total_exams = stats.total_exams or 0
|
||
passed_exams = stats.passed_exams or 0
|
||
|
||
return {
|
||
"avg_score": round(float(stats.avg_score or 0), 1),
|
||
"total_exams": total_exams,
|
||
"pass_rate": round((passed_exams / total_exams * 100) if total_exams > 0 else 0, 1),
|
||
"total_questions": stats.total_questions or 0
|
||
}
|
||
|
||
@staticmethod
|
||
async def _get_trends(
|
||
db: AsyncSession,
|
||
user_id: int,
|
||
base_conditions: List
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取成绩趋势(最近30天)"""
|
||
# 计算30天前的日期
|
||
thirty_days_ago = datetime.now() - timedelta(days=30)
|
||
|
||
# 查询最近30天的考试数据,按日期分组
|
||
stmt = select(
|
||
func.date(Exam.start_time).label("exam_date"),
|
||
func.avg(Exam.round1_score).label("avg_score")
|
||
).where(
|
||
and_(
|
||
Exam.user_id == user_id,
|
||
Exam.start_time >= thirty_days_ago,
|
||
Exam.round1_score.isnot(None)
|
||
)
|
||
).group_by(
|
||
func.date(Exam.start_time)
|
||
).order_by(
|
||
func.date(Exam.start_time)
|
||
)
|
||
|
||
result = await db.execute(stmt)
|
||
trend_data = result.all()
|
||
|
||
# 转换为前端需要的格式
|
||
trends = []
|
||
for row in trend_data:
|
||
trends.append({
|
||
"date": row.exam_date.strftime("%Y-%m-%d") if row.exam_date else "",
|
||
"avg_score": round(float(row.avg_score or 0), 1)
|
||
})
|
||
|
||
return trends
|
||
|
||
@staticmethod
|
||
async def _get_subjects(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]:
|
||
"""获取科目分析"""
|
||
# 关联course表,按课程分组统计
|
||
stmt = select(
|
||
Exam.course_id,
|
||
Course.name.label("course_name"),
|
||
func.avg(Exam.round1_score).label("avg_score"),
|
||
func.count(Exam.id).label("exam_count"),
|
||
func.max(Exam.round1_score).label("max_score"),
|
||
func.min(Exam.round1_score).label("min_score"),
|
||
func.count(case((Exam.is_passed == True, 1))).label("passed_count")
|
||
).join(
|
||
Course, Exam.course_id == Course.id
|
||
).where(
|
||
and_(*conditions, Exam.round1_score.isnot(None))
|
||
).group_by(
|
||
Exam.course_id, Course.name
|
||
).order_by(
|
||
desc(func.count(Exam.id))
|
||
)
|
||
|
||
result = await db.execute(stmt)
|
||
subject_data = result.all()
|
||
|
||
subjects = []
|
||
for row in subject_data:
|
||
exam_count = row.exam_count or 0
|
||
passed_count = row.passed_count or 0
|
||
|
||
subjects.append({
|
||
"course_id": row.course_id,
|
||
"course_name": row.course_name,
|
||
"avg_score": round(float(row.avg_score or 0), 1),
|
||
"exam_count": exam_count,
|
||
"max_score": round(float(row.max_score or 0), 1),
|
||
"min_score": round(float(row.min_score or 0), 1),
|
||
"pass_rate": round((passed_count / exam_count * 100) if exam_count > 0 else 0, 1)
|
||
})
|
||
|
||
return subjects
|
||
|
||
@staticmethod
|
||
async def _get_recent_exams(db: AsyncSession, conditions: List) -> List[Dict[str, Any]]:
|
||
"""获取最近10次考试记录"""
|
||
# 查询最近10次考试,包含三轮得分
|
||
stmt = select(
|
||
Exam.id,
|
||
Exam.course_id,
|
||
Course.name.label("course_name"),
|
||
Exam.score,
|
||
Exam.total_score,
|
||
Exam.is_passed,
|
||
Exam.start_time,
|
||
Exam.end_time,
|
||
Exam.round1_score,
|
||
Exam.round2_score,
|
||
Exam.round3_score
|
||
).join(
|
||
Course, Exam.course_id == Course.id
|
||
).where(
|
||
and_(*conditions)
|
||
).order_by(
|
||
desc(Exam.created_at) # 改为按创建时间排序,避免start_time为NULL的问题
|
||
).limit(10)
|
||
|
||
result = await db.execute(stmt)
|
||
exam_data = result.all()
|
||
|
||
recent_exams = []
|
||
for row in exam_data:
|
||
# 计算考试用时
|
||
duration_seconds = None
|
||
if row.start_time and row.end_time:
|
||
duration_seconds = int((row.end_time - row.start_time).total_seconds())
|
||
|
||
recent_exams.append({
|
||
"id": row.id,
|
||
"course_id": row.course_id,
|
||
"course_name": row.course_name,
|
||
"score": round(float(row.score), 1) if row.score else None,
|
||
"total_score": round(float(row.total_score or 100), 1),
|
||
"is_passed": row.is_passed,
|
||
"duration_seconds": duration_seconds,
|
||
"start_time": row.start_time.isoformat() if row.start_time else None,
|
||
"end_time": row.end_time.isoformat() if row.end_time else None,
|
||
"round_scores": {
|
||
"round1": round(float(row.round1_score), 1) if row.round1_score else None,
|
||
"round2": round(float(row.round2_score), 1) if row.round2_score else None,
|
||
"round3": round(float(row.round3_score), 1) if row.round3_score else None
|
||
}
|
||
})
|
||
|
||
return recent_exams
|
||
|
||
|
||
class MistakeService:
|
||
"""错题服务类"""
|
||
|
||
@staticmethod
|
||
async def get_mistakes_list(
|
||
db: AsyncSession,
|
||
user_id: int,
|
||
exam_id: Optional[int] = None,
|
||
course_id: Optional[int] = None,
|
||
question_type: Optional[str] = None,
|
||
search: Optional[str] = None,
|
||
start_date: Optional[str] = None,
|
||
end_date: Optional[str] = None,
|
||
page: int = 1,
|
||
size: int = 10
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取错题列表(支持多维度筛选)
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
user_id: 用户ID
|
||
exam_id: 考试ID(可选)
|
||
course_id: 课程ID(可选)
|
||
question_type: 题型(可选)
|
||
search: 关键词搜索(可选)
|
||
start_date: 开始日期(可选)
|
||
end_date: 结束日期(可选)
|
||
page: 页码
|
||
size: 每页数量
|
||
|
||
Returns:
|
||
Dict: 包含items、total、page、size、pages的分页数据
|
||
"""
|
||
logger.info(f"获取错题列表 - user_id: {user_id}, exam_id: {exam_id}, course_id: {course_id}")
|
||
|
||
# 构建查询条件
|
||
conditions = [ExamMistake.user_id == user_id]
|
||
|
||
if exam_id:
|
||
conditions.append(ExamMistake.exam_id == exam_id)
|
||
|
||
if question_type:
|
||
conditions.append(ExamMistake.question_type == question_type)
|
||
|
||
if search:
|
||
conditions.append(ExamMistake.question_content.like(f"%{search}%"))
|
||
|
||
if start_date:
|
||
conditions.append(ExamMistake.created_at >= start_date)
|
||
|
||
if end_date:
|
||
end_datetime = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
|
||
conditions.append(ExamMistake.created_at < end_datetime)
|
||
|
||
# 如果指定了course_id,需要通过exam关联
|
||
if course_id:
|
||
conditions.append(Exam.course_id == course_id)
|
||
|
||
# 查询总数
|
||
count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join(
|
||
Exam, ExamMistake.exam_id == Exam.id
|
||
).where(and_(*conditions))
|
||
|
||
total_result = await db.execute(count_stmt)
|
||
total = total_result.scalar() or 0
|
||
|
||
# 查询分页数据
|
||
offset = (page - 1) * size
|
||
|
||
stmt = select(
|
||
ExamMistake.id,
|
||
ExamMistake.exam_id,
|
||
Exam.course_id,
|
||
Course.name.label("course_name"),
|
||
ExamMistake.question_content,
|
||
ExamMistake.correct_answer,
|
||
ExamMistake.user_answer,
|
||
ExamMistake.question_type,
|
||
ExamMistake.knowledge_point_id,
|
||
KnowledgePoint.name.label("knowledge_point_name"),
|
||
ExamMistake.created_at
|
||
).select_from(ExamMistake).join(
|
||
Exam, ExamMistake.exam_id == Exam.id
|
||
).join(
|
||
Course, Exam.course_id == Course.id
|
||
).outerjoin(
|
||
KnowledgePoint, ExamMistake.knowledge_point_id == KnowledgePoint.id
|
||
).where(
|
||
and_(*conditions)
|
||
).order_by(
|
||
desc(ExamMistake.created_at)
|
||
).offset(offset).limit(size)
|
||
|
||
result = await db.execute(stmt)
|
||
mistakes = result.all()
|
||
|
||
# 构建返回数据
|
||
items = []
|
||
for row in mistakes:
|
||
items.append({
|
||
"id": row.id,
|
||
"exam_id": row.exam_id,
|
||
"course_id": row.course_id,
|
||
"course_name": row.course_name,
|
||
"question_content": row.question_content,
|
||
"correct_answer": row.correct_answer,
|
||
"user_answer": row.user_answer,
|
||
"question_type": row.question_type,
|
||
"knowledge_point_id": row.knowledge_point_id,
|
||
"knowledge_point_name": row.knowledge_point_name,
|
||
"created_at": row.created_at
|
||
})
|
||
|
||
pages = (total + size - 1) // size
|
||
|
||
return {
|
||
"items": items,
|
||
"total": total,
|
||
"page": page,
|
||
"size": size,
|
||
"pages": pages
|
||
}
|
||
|
||
@staticmethod
|
||
async def get_mistakes_statistics(
|
||
db: AsyncSession,
|
||
user_id: int,
|
||
course_id: Optional[int] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取错题统计数据
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
user_id: 用户ID
|
||
course_id: 课程ID(可选)
|
||
|
||
Returns:
|
||
Dict: 包含total、by_course、by_type、by_time的统计数据
|
||
"""
|
||
logger.info(f"获取错题统计 - user_id: {user_id}, course_id: {course_id}")
|
||
|
||
# 基础条件
|
||
base_conditions = [ExamMistake.user_id == user_id]
|
||
if course_id:
|
||
base_conditions.append(Exam.course_id == course_id)
|
||
|
||
# 1. 总数统计
|
||
count_stmt = select(func.count(ExamMistake.id)).select_from(ExamMistake).join(
|
||
Exam, ExamMistake.exam_id == Exam.id
|
||
).where(and_(*base_conditions))
|
||
|
||
total_result = await db.execute(count_stmt)
|
||
total = total_result.scalar() or 0
|
||
|
||
# 2. 按课程统计
|
||
by_course_stmt = select(
|
||
Exam.course_id,
|
||
Course.name.label("course_name"),
|
||
func.count(ExamMistake.id).label("count")
|
||
).select_from(ExamMistake).join(
|
||
Exam, ExamMistake.exam_id == Exam.id
|
||
).join(
|
||
Course, Exam.course_id == Course.id
|
||
).where(
|
||
ExamMistake.user_id == user_id
|
||
).group_by(
|
||
Exam.course_id, Course.name
|
||
).order_by(
|
||
desc(func.count(ExamMistake.id))
|
||
)
|
||
|
||
by_course_result = await db.execute(by_course_stmt)
|
||
by_course_data = by_course_result.all()
|
||
|
||
by_course = [
|
||
{
|
||
"course_id": row.course_id,
|
||
"course_name": row.course_name,
|
||
"count": row.count
|
||
}
|
||
for row in by_course_data
|
||
]
|
||
|
||
# 3. 按题型统计
|
||
by_type_stmt = select(
|
||
ExamMistake.question_type,
|
||
func.count(ExamMistake.id).label("count")
|
||
).where(
|
||
and_(ExamMistake.user_id == user_id, ExamMistake.question_type.isnot(None))
|
||
).group_by(
|
||
ExamMistake.question_type
|
||
)
|
||
|
||
by_type_result = await db.execute(by_type_stmt)
|
||
by_type_data = by_type_result.all()
|
||
|
||
# 题型名称映射
|
||
type_names = {
|
||
"single": "单选题",
|
||
"multiple": "多选题",
|
||
"judge": "判断题",
|
||
"blank": "填空题",
|
||
"essay": "问答题"
|
||
}
|
||
|
||
by_type = [
|
||
{
|
||
"type": row.question_type,
|
||
"type_name": type_names.get(row.question_type, "未知类型"),
|
||
"count": row.count
|
||
}
|
||
for row in by_type_data
|
||
]
|
||
|
||
# 4. 按时间统计
|
||
now = datetime.now()
|
||
week_ago = now - timedelta(days=7)
|
||
month_ago = now - timedelta(days=30)
|
||
quarter_ago = now - timedelta(days=90)
|
||
|
||
# 最近一周
|
||
week_stmt = select(func.count(ExamMistake.id)).where(
|
||
and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= week_ago)
|
||
)
|
||
week_result = await db.execute(week_stmt)
|
||
week_count = week_result.scalar() or 0
|
||
|
||
# 最近一月
|
||
month_stmt = select(func.count(ExamMistake.id)).where(
|
||
and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= month_ago)
|
||
)
|
||
month_result = await db.execute(month_stmt)
|
||
month_count = month_result.scalar() or 0
|
||
|
||
# 最近三月
|
||
quarter_stmt = select(func.count(ExamMistake.id)).where(
|
||
and_(ExamMistake.user_id == user_id, ExamMistake.created_at >= quarter_ago)
|
||
)
|
||
quarter_result = await db.execute(quarter_stmt)
|
||
quarter_count = quarter_result.scalar() or 0
|
||
|
||
by_time = {
|
||
"week": week_count,
|
||
"month": month_count,
|
||
"quarter": quarter_count
|
||
}
|
||
|
||
return {
|
||
"total": total,
|
||
"by_course": by_course,
|
||
"by_type": by_type,
|
||
"by_time": by_time
|
||
}
|
||
|