# - 从服务器拉取完整代码
# - 按框架规范整理项目结构
# - 配置 Drone CI 测试环境部署
# - 包含后端(FastAPI)、前端(Vue3)、管理端
# 技术栈: Vue3 + TypeScript + FastAPI + MySQL
# 709 lines · 25 KiB · Python
"""Statistics and analytics service."""

from datetime import datetime, timedelta
from typing import (
    List,
    Optional,
    Dict,
    Any,
    Tuple,
)

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import (
    select,
    func,
    and_,
    or_,
    case,
    desc,
    distinct,
)

from app.models.exam import Exam, Question
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.models.practice import PracticeSession
from app.models.training import TrainingSession
from app.core.logger import get_logger

# Module-level logger shared by every method of the service class below.
logger = get_logger(__name__)
|
||
|
||
|
||
class StatisticsService:
    """Per-user learning statistics built from exam, mistake and practice rows.

    Every method is a ``@staticmethod``; the caller supplies the database
    session. Several figures are explicit approximations (fixed difficulty
    shares, a fixed number of attempts per knowledge point) and are marked
    as such where they are computed.
    """

    # Length, in days, of the look-back window for each ``period`` keyword.
    _PERIOD_DAYS = {
        "week": 7,
        "month": 30,
        "quarter": 90,
        "halfYear": 180,
        "year": 365,
    }
    # Window used when ``period`` is not one of the keywords above.
    _DEFAULT_PERIOD_DAYS = 30
    # Assumed share of all questions that belongs to each difficulty level.
    # This is an estimate: the real per-exam composition is not queried.
    _DIFFICULTY_SHARES = (("easy", 0.5), ("medium", 0.3), ("hard", 0.2))
    # Assumed number of times each knowledge point has been examined.
    _ASSUMED_ATTEMPTS_PER_KP = 10
    # Upper bound on the number of daily buckets in the study-time chart.
    _MAX_CHART_DAYS = 30
    # Labels indexed by ``date.weekday()`` (Monday == 0).
    _WEEKDAY_LABELS = ("周一", "周二", "周三", "周四", "周五", "周六", "周日")

    @staticmethod
    def _get_date_range(period: str) -> Tuple[datetime, datetime]:
        """Return the ``(start, end)`` datetimes of the window ending now.

        Args:
            period: One of ``week``/``month``/``quarter``/``halfYear``/``year``.
                Any other value falls back to the 30-day window.

        Returns:
            Tuple[datetime, datetime]: ``(start_date, end_date)`` where
            ``end_date`` is ``datetime.now()``.
        """
        end_date = datetime.now()
        span_days = StatisticsService._PERIOD_DAYS.get(
            period, StatisticsService._DEFAULT_PERIOD_DAYS
        )
        return end_date - timedelta(days=span_days), end_date

    @staticmethod
    def _calculate_change_rate(current: float, previous: float) -> float:
        """Return the period-over-period change of ``current`` in percent.

        Args:
            current: Value for the current period.
            previous: Value for the previous period.

        Returns:
            float: ``(current - previous) / previous * 100`` rounded to one
            decimal. When ``previous`` is zero the result is ``0.0`` if
            ``current`` is also zero and ``100.0`` otherwise.
        """
        # Database aggregates may arrive as Decimal; normalise both operands
        # so Decimal and float never meet in one expression (a TypeError).
        current = float(current)
        previous = float(previous)
        if previous == 0:
            return 0.0 if current == 0 else 100.0
        return round(((current - previous) / previous) * 100, 1)

    @staticmethod
    def _percent(part: float, whole: float) -> float:
        """Return ``part / whole`` as an unrounded percentage.

        ``part`` is clamped at zero and a non-positive ``whole`` yields 0.0.
        """
        if whole <= 0:
            return 0.0
        return (max(0, part) / whole) * 100

    @staticmethod
    def _exam_filters(
        user_id: int,
        course_id: Optional[int],
        start_date: datetime,
        end_date: datetime,
        *,
        scored_only: bool = True,
        end_inclusive: bool = True,
    ) -> List[Any]:
        """Build the WHERE conditions selecting one user's exams in a window.

        Args:
            user_id: Owner of the exams.
            course_id: Optional course restriction; a falsy value means none.
            start_date: Inclusive lower bound on ``Exam.start_time``.
            end_date: Upper bound on ``Exam.start_time``.
            scored_only: Also require ``Exam.round1_score`` to be non-NULL.
            end_inclusive: Use ``<=`` (True) or ``<`` (False) for ``end_date``.

        Returns:
            List of SQLAlchemy conditions meant to be combined with ``and_``.
        """
        upper_bound = (
            Exam.start_time <= end_date
            if end_inclusive
            else Exam.start_time < end_date
        )
        filters = [
            Exam.user_id == user_id,
            Exam.start_time >= start_date,
            upper_bound,
        ]
        if scored_only:
            filters.append(Exam.round1_score.isnot(None))
        if course_id:
            filters.append(Exam.course_id == course_id)
        return filters

    @staticmethod
    async def _collect_period_totals(
        db: AsyncSession,
        user_id: int,
        exam_filters: List[Any],
    ) -> Dict[str, float]:
        """Aggregate the raw totals of one window for ``get_key_metrics``.

        Mistakes are counted only for exams matched by ``exam_filters`` so
        that they describe the same exams as the question total.

        Returns:
            Dict with ``questions``, ``duration``, ``avg_score``,
            ``mistakes`` and ``mistaken_kps``, all plain ``int``/``float``.
        """
        in_window = and_(*exam_filters)

        exam_row = (
            await db.execute(
                select(
                    func.coalesce(func.sum(Exam.question_count), 0),
                    func.coalesce(func.sum(Exam.duration_minutes), 0),
                    func.avg(Exam.round1_score),
                ).where(in_window)
            )
        ).one()

        # COUNT(DISTINCT col) skips NULLs, so mistakes without a knowledge
        # point do not inflate the distinct-knowledge-point figure.
        mistake_row = (
            await db.execute(
                select(
                    func.count(ExamMistake.id),
                    func.count(distinct(ExamMistake.knowledge_point_id)),
                ).where(
                    and_(
                        ExamMistake.user_id == user_id,
                        ExamMistake.exam_id.in_(
                            select(Exam.id).where(in_window)
                        ),
                    )
                )
            )
        ).one()

        # Coerce to built-in numbers so later arithmetic never mixes
        # Decimal with float.
        return {
            "questions": int(exam_row[0] or 0),
            "duration": float(exam_row[1] or 0),
            "avg_score": float(exam_row[2] or 0),
            "mistakes": int(mistake_row[0] or 0),
            "mistaken_kps": int(mistake_row[1] or 0),
        }

    @staticmethod
    async def get_key_metrics(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None,
        period: str = "month"
    ) -> Dict[str, Any]:
        """Compute the four headline metrics and their change vs. the prior window.

        The prior window has the same length as the current one and ends
        where the current one starts.

        Args:
            db: Database session.
            user_id: User whose data is analysed.
            course_id: Optional course restriction.
            period: Window keyword, see ``_get_date_range``.

        Returns:
            Dict: ``learningEfficiency``, ``knowledgeCoverage``,
            ``avgTimePerQuestion`` and ``progressSpeed``, each a dict with
            ``value``, ``unit``, ``change`` and ``description``.
        """
        logger.info(f"获取关键指标 - user_id: {user_id}, course_id: {course_id}, period: {period}")

        start_date, end_date = StatisticsService._get_date_range(period)
        prev_start_date = start_date - (end_date - start_date)

        current = await StatisticsService._collect_period_totals(
            db,
            user_id,
            StatisticsService._exam_filters(
                user_id, course_id, start_date, end_date
            ),
        )
        previous = await StatisticsService._collect_period_totals(
            db,
            user_id,
            StatisticsService._exam_filters(
                user_id,
                course_id,
                prev_start_date,
                start_date,
                end_inclusive=False,  # the boundary instant belongs to the current window
            ),
        )

        # Total number of live knowledge points (optionally for one course).
        # ``== False`` is intentional: it must build a SQL expression.
        kp_filters = [KnowledgePoint.is_deleted == False]  # noqa: E712
        if course_id:
            kp_filters.append(KnowledgePoint.course_id == course_id)
        total_kp = int(
            await db.scalar(
                select(func.count(KnowledgePoint.id)).where(and_(*kp_filters))
            )
            or 0
        )

        percent = StatisticsService._percent
        change_rate = StatisticsService._calculate_change_rate

        # 1. Learning efficiency = (questions - mistakes) / questions.
        learning_efficiency = round(
            percent(current["questions"] - current["mistakes"], current["questions"]), 1
        )
        prev_efficiency = percent(
            previous["questions"] - previous["mistakes"], previous["questions"]
        )
        efficiency_change = change_rate(learning_efficiency, prev_efficiency)

        # 2. Knowledge coverage = knowledge points without a mistake / all
        #    knowledge points. The previous window reuses today's total.
        knowledge_coverage = round(
            percent(total_kp - current["mistaken_kps"], total_kp), 1
        )
        prev_coverage = percent(total_kp - previous["mistaken_kps"], total_kp)
        coverage_change = change_rate(knowledge_coverage, prev_coverage)

        # 3. Average time per question = total exam minutes / questions.
        avg_time_per_question = 0.0
        if current["questions"] > 0:
            avg_time_per_question = round(
                current["duration"] / current["questions"], 1
            )
        prev_avg_time = 0.0
        if previous["questions"] > 0:
            prev_avg_time = previous["duration"] / previous["questions"]
        # A negative change is the good direction here (less time spent).
        time_change = change_rate(avg_time_per_question, prev_avg_time)

        # 4. Progress speed = relative change of the average first-round score.
        progress_speed = change_rate(current["avg_score"], previous["avg_score"])

        return {
            "learningEfficiency": {
                "value": learning_efficiency,
                "unit": "%",
                "change": efficiency_change,
                "description": "正确题数/总练习题数"
            },
            "knowledgeCoverage": {
                "value": knowledge_coverage,
                "unit": "%",
                "change": coverage_change,
                "description": "已掌握知识点/总知识点"
            },
            "avgTimePerQuestion": {
                "value": avg_time_per_question,
                "unit": "分/题",
                "change": time_change,
                "description": "平均每道题的答题时间"
            },
            "progressSpeed": {
                "value": abs(progress_speed),
                "unit": "%",
                "change": progress_speed,
                "description": "成绩提升速度"
            }
        }

    @staticmethod
    async def get_score_distribution(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None,
        period: str = "month"
    ) -> Dict[str, Any]:
        """Count the user's scored exams in each score band.

        Args:
            db: Database session.
            user_id: User whose exams are counted.
            course_id: Optional course restriction.
            period: Window keyword, see ``_get_date_range``.

        Returns:
            Dict: Exam counts keyed ``excellent`` (>= 90), ``good`` (80-89),
            ``medium`` (70-79), ``pass`` (60-69) and ``fail`` (< 60).
        """
        logger.info(f"获取成绩分布 - user_id: {user_id}, course_id: {course_id}, period: {period}")

        start_date, end_date = StatisticsService._get_date_range(period)
        filters = StatisticsService._exam_filters(
            user_id, course_id, start_date, end_date
        )

        score = Exam.round1_score

        def band_count(condition, label):
            # CASE without ELSE yields NULL, which COUNT ignores, so this
            # counts only the rows matching ``condition``.
            return func.count(case((condition, 1))).label(label)

        stmt = select(
            band_count(score >= 90, "excellent"),
            band_count(and_(score >= 80, score < 90), "good"),
            band_count(and_(score >= 70, score < 80), "medium"),
            band_count(and_(score >= 60, score < 70), "pass_count"),
            band_count(score < 60, "fail"),
        ).where(and_(*filters))

        row = (await db.execute(stmt)).one()

        return {
            "excellent": row.excellent or 0,
            "good": row.good or 0,
            "medium": row.medium or 0,
            "pass": row.pass_count or 0,
            "fail": row.fail or 0
        }

    @staticmethod
    async def get_difficulty_analysis(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None,
        period: str = "month"
    ) -> Dict[str, Any]:
        """Estimate the accuracy per question difficulty.

        Mistakes per difficulty are counted exactly. The number of questions
        asked per difficulty is NOT: it is estimated as a fixed share of all
        questions (see ``_DIFFICULTY_SHARES``).

        Args:
            db: Database session.
            user_id: User whose exams are analysed.
            course_id: Optional course restriction.
            period: Window keyword, see ``_get_date_range``.

        Returns:
            Dict: Accuracy in percent keyed ``简单题``, ``中等题``, ``困难题``,
            ``综合题`` and ``应用题``. Every value is 100.0 when the user has
            no exam in the window.
        """
        logger.info(f"获取难度分析 - user_id: {user_id}, course_id: {course_id}, period: {period}")

        start_date, end_date = StatisticsService._get_date_range(period)
        filters = StatisticsService._exam_filters(
            user_id, course_id, start_date, end_date, scored_only=False
        )

        id_rows = await db.execute(select(Exam.id).where(and_(*filters)))
        exam_ids = [row[0] for row in id_rows.all()]

        if not exam_ids:
            # Same keys as the regular result so callers see one schema.
            return {
                "简单题": 100.0,
                "中等题": 100.0,
                "困难题": 100.0,
                "综合题": 100.0,
                "应用题": 100.0
            }

        # The total does not depend on the difficulty: query it once.
        # int() avoids carrying a Decimal aggregate into the arithmetic.
        total_count = int(
            await db.scalar(
                select(
                    func.coalesce(func.sum(Exam.question_count), 0)
                ).where(and_(*filters))
            )
            or 0
        )

        accuracy_by_level = {}
        for level, share in StatisticsService._DIFFICULTY_SHARES:
            mistake_count = await db.scalar(
                select(func.count(ExamMistake.id))
                .select_from(ExamMistake)
                .join(Question, ExamMistake.question_id == Question.id)
                .where(
                    and_(
                        ExamMistake.user_id == user_id,
                        ExamMistake.exam_id.in_(exam_ids),
                        Question.difficulty == level,
                    )
                )
            ) or 0

            estimated_count = int(total_count * share)
            if estimated_count > 0:
                correct_count = max(0, estimated_count - mistake_count)
                accuracy_by_level[level] = round(
                    (correct_count / estimated_count) * 100, 1
                )
            else:
                accuracy_by_level[level] = 100.0

        # The two composite categories are reported as the mean of the
        # medium and hard accuracies.
        blended = round(
            (accuracy_by_level["medium"] + accuracy_by_level["hard"]) / 2, 1
        )

        return {
            "简单题": accuracy_by_level["easy"],
            "中等题": accuracy_by_level["medium"],
            "困难题": accuracy_by_level["hard"],
            "综合题": blended,
            "应用题": blended
        }

    @staticmethod
    async def get_knowledge_mastery(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Estimate the mastery of up to six knowledge points.

        Mastery assumes every knowledge point was examined
        ``_ASSUMED_ATTEMPTS_PER_KP`` times and subtracts the recorded
        mistakes from that figure.

        Args:
            db: Database session.
            user_id: User whose mistakes are counted.
            course_id: Optional course restriction.

        Returns:
            List[Dict]: Items ``{"name": ..., "mastery": ...}``. When no
            knowledge point exists a fixed placeholder list is returned.
        """
        logger.info(f"获取知识点掌握度 - user_id: {user_id}, course_id: {course_id}")

        # ``== False`` is intentional: it must build a SQL expression.
        kp_filters = [KnowledgePoint.is_deleted == False]  # noqa: E712
        if course_id:
            kp_filters.append(KnowledgePoint.course_id == course_id)

        kp_result = await db.execute(
            select(KnowledgePoint).where(and_(*kp_filters)).limit(10)
        )
        knowledge_points = kp_result.scalars().all()

        if not knowledge_points:
            # NOTE(review): these are hard-coded placeholder figures, not
            # measurements — confirm the caller really wants them.
            return [
                {"name": "基础概念", "mastery": 85.0},
                {"name": "核心知识", "mastery": 72.0},
                {"name": "实践应用", "mastery": 68.0},
                {"name": "综合运用", "mastery": 58.0},
                {"name": "高级技巧", "mastery": 75.0},
                {"name": "案例分析", "mastery": 62.0}
            ]

        # Only the first six are returned, so only those need mistake counts.
        shown = knowledge_points[:6]

        # One grouped query instead of one query per knowledge point.
        count_rows = await db.execute(
            select(
                ExamMistake.knowledge_point_id,
                func.count(ExamMistake.id),
            )
            .where(
                and_(
                    ExamMistake.user_id == user_id,
                    ExamMistake.knowledge_point_id.in_([kp.id for kp in shown]),
                )
            )
            .group_by(ExamMistake.knowledge_point_id)
        )
        mistakes_by_kp = {row[0]: int(row[1] or 0) for row in count_rows.all()}

        attempts = StatisticsService._ASSUMED_ATTEMPTS_PER_KP
        mastery_list = []
        for kp in shown:
            correct_count = max(0, attempts - mistakes_by_kp.get(kp.id, 0))
            mastery_list.append({
                # Names are cut to 10 characters; a missing name becomes "".
                "name": (kp.name or "")[:10],
                "mastery": round((correct_count / attempts) * 100, 1)
            })

        return mastery_list

    @staticmethod
    async def get_study_time_stats(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None,
        period: str = "month"
    ) -> Dict[str, Any]:
        """Return daily study and practice hours for a chart.

        Study time is the summed exam duration; practice time is the summed
        duration of completed practice sessions. At most the last
        ``_MAX_CHART_DAYS`` days of the window are reported. ``course_id``
        is applied to exams only.

        Args:
            db: Database session.
            user_id: User whose activity is summed.
            course_id: Optional course restriction (exams only).
            period: Window keyword, see ``_get_date_range``.

        Returns:
            Dict: ``labels``, ``studyTime`` and ``practiceTime`` as three
            parallel lists ordered oldest to newest, values in hours.
        """
        logger.info(f"获取学习时长统计 - user_id: {user_id}, course_id: {course_id}, period: {period}")

        start_date, end_date = StatisticsService._get_date_range(period)

        days = min((end_date - start_date).days, StatisticsService._MAX_CHART_DAYS)
        # Oldest day first, ending with today.
        date_list = [
            (end_date - timedelta(days=offset)).date()
            for offset in range(days - 1, -1, -1)
        ]

        study_hours = {str(day): 0.0 for day in date_list}
        practice_hours = {str(day): 0.0 for day in date_list}

        # Exam time per calendar day.
        exam_day = func.date(Exam.start_time)
        exam_rows = await db.execute(
            select(
                exam_day.label("date"),
                func.sum(Exam.duration_minutes).label("total_minutes"),
            )
            .where(
                and_(
                    *StatisticsService._exam_filters(
                        user_id, course_id, start_date, end_date, scored_only=False
                    )
                )
            )
            .group_by(exam_day)
        )
        for row in exam_rows.all():
            day_key = str(row.date)
            if day_key in study_hours:
                # SUM is NULL when every duration of the day is NULL.
                study_hours[day_key] = round(float(row.total_minutes or 0) / 60, 1)

        # Completed practice-session time per calendar day.
        practice_day = func.date(PracticeSession.start_time)
        practice_rows = await db.execute(
            select(
                practice_day.label("date"),
                func.sum(PracticeSession.duration_seconds).label("total_seconds"),
            )
            .where(
                and_(
                    PracticeSession.user_id == user_id,
                    PracticeSession.start_time >= start_date,
                    PracticeSession.start_time <= end_date,
                    PracticeSession.status == "completed",
                )
            )
            .group_by(practice_day)
        )
        for row in practice_rows.all():
            day_key = str(row.date)
            if day_key in practice_hours:
                practice_hours[day_key] = round(float(row.total_seconds or 0) / 3600, 1)

        if period == "week":
            # Label each bucket with the weekday of its own date; the window
            # ends today, so it does not necessarily start on a Monday.
            labels = [
                StatisticsService._WEEKDAY_LABELS[day.weekday()] for day in date_list
            ]
        else:
            labels = [day.strftime("%m-%d") for day in date_list]

        return {
            "labels": labels,
            "studyTime": [study_hours[str(day)] for day in date_list],
            "practiceTime": [practice_hours[str(day)] for day in date_list]
        }

    @staticmethod
    async def get_detail_data(
        db: AsyncSession,
        user_id: int,
        course_id: Optional[int] = None,
        period: str = "month"
    ) -> List[Dict[str, Any]]:
        """Return per-day exam statistics for the ten most recent active days.

        Args:
            db: Database session.
            user_id: User whose exams are summarised.
            course_id: Optional course restriction.
            period: Window keyword, see ``_get_date_range``.

        Returns:
            List[Dict]: Newest day first. Each item has ``date``,
            ``examCount``, ``avgScore``, ``studyTime`` (hours),
            ``questionCount``, ``accuracy`` (percent) and ``improvement``
            (the average score truncated and clamped to 0-100).
        """
        logger.info(f"获取详细数据 - user_id: {user_id}, course_id: {course_id}, period: {period}")

        start_date, end_date = StatisticsService._get_date_range(period)
        in_window = and_(
            *StatisticsService._exam_filters(
                user_id, course_id, start_date, end_date
            )
        )
        exam_day = func.date(Exam.start_time)

        daily_result = await db.execute(
            select(
                exam_day.label("date"),
                func.count(Exam.id).label("exam_count"),
                func.avg(Exam.round1_score).label("avg_score"),
                func.sum(Exam.duration_minutes).label("total_duration"),
                func.sum(Exam.question_count).label("total_questions"),
            )
            .where(in_window)
            .group_by(exam_day)
            .order_by(desc(exam_day))
            .limit(10)
        )
        daily_rows = daily_result.all()
        if not daily_rows:
            return []

        # Mistakes per day, restricted to the very same exams that produced
        # the question totals above, fetched in one grouped query.
        mistake_result = await db.execute(
            select(
                exam_day.label("date"),
                func.count(ExamMistake.id).label("mistake_count"),
            )
            .select_from(ExamMistake)
            .join(Exam, ExamMistake.exam_id == Exam.id)
            .where(and_(ExamMistake.user_id == user_id, in_window))
            .group_by(exam_day)
        )
        mistakes_by_day = {
            str(row.date): int(row.mistake_count or 0)
            for row in mistake_result.all()
        }

        detail_list = []
        for row in daily_rows:
            day = row.date
            # Format the day as YYYY-MM-DD; fall back to str() when the
            # value returned for DATE() has no strftime.
            date_str = day.strftime("%Y-%m-%d") if hasattr(day, "strftime") else str(day)
            avg_score = round(float(row.avg_score or 0), 1)
            question_count = int(row.total_questions or 0)
            mistake_count = mistakes_by_day.get(str(day), 0)

            accuracy = 0.0
            if question_count > 0:
                correct_count = max(0, question_count - mistake_count)
                accuracy = round((correct_count / question_count) * 100, 1)

            detail_list.append({
                "date": date_str,
                "examCount": row.exam_count or 0,
                "avgScore": avg_score,
                "studyTime": round(float(row.total_duration or 0) / 60, 1),
                "questionCount": question_count,
                "accuracy": accuracy,
                "improvement": min(100, max(0, int(avg_score)))
            })

        return detail_list
|
||
|