Files
012-kaopeilian/backend/app/services/statistics_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

709 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
统计分析服务
"""
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, case, desc, distinct
from app.models.exam import Exam, Question
from app.models.exam_mistake import ExamMistake
from app.models.course import Course, KnowledgePoint
from app.models.practice import PracticeSession
from app.models.training import TrainingSession
from app.core.logger import get_logger
logger = get_logger(__name__)
class StatisticsService:
"""统计分析服务类"""
@staticmethod
def _get_date_range(period: str) -> Tuple[datetime, datetime]:
"""
根据period返回开始和结束日期
Args:
period: 时间范围 (week/month/quarter/halfYear/year)
Returns:
Tuple[datetime, datetime]: (开始日期, 结束日期)
"""
end_date = datetime.now()
if period == "week":
start_date = end_date - timedelta(days=7)
elif period == "month":
start_date = end_date - timedelta(days=30)
elif period == "quarter":
start_date = end_date - timedelta(days=90)
elif period == "halfYear":
start_date = end_date - timedelta(days=180)
elif period == "year":
start_date = end_date - timedelta(days=365)
else:
# 默认一个月
start_date = end_date - timedelta(days=30)
return start_date, end_date
@staticmethod
def _calculate_change_rate(current: float, previous: float) -> float:
"""
计算环比变化率
Args:
current: 当前值
previous: 上期值
Returns:
float: 变化率(百分比)
"""
if previous == 0:
return 0 if current == 0 else 100
return round(((current - previous) / previous) * 100, 1)
@staticmethod
async def get_key_metrics(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None,
period: str = "month"
) -> Dict[str, Any]:
"""
获取关键指标
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
period: 时间范围
Returns:
Dict: 包含学习效率、知识覆盖率、平均用时、进步速度的指标数据
"""
logger.info(f"获取关键指标 - user_id: {user_id}, course_id: {course_id}, period: {period}")
start_date, end_date = StatisticsService._get_date_range(period)
# 构建基础查询条件
exam_conditions = [
Exam.user_id == user_id,
Exam.start_time >= start_date,
Exam.start_time <= end_date,
Exam.round1_score.isnot(None)
]
if course_id:
exam_conditions.append(Exam.course_id == course_id)
# 1. 学习效率 = (总题数 - 错题数) / 总题数
# 获取总题数
total_questions_stmt = select(
func.coalesce(func.sum(Exam.question_count), 0)
).where(and_(*exam_conditions))
total_questions = await db.scalar(total_questions_stmt) or 0
# 获取错题数
mistake_conditions = [ExamMistake.user_id == user_id]
if course_id:
mistake_conditions.append(
ExamMistake.exam_id.in_(
select(Exam.id).where(Exam.course_id == course_id)
)
)
mistake_stmt = select(func.count(ExamMistake.id)).where(
and_(*mistake_conditions)
)
mistake_count = await db.scalar(mistake_stmt) or 0
# 计算学习效率
learning_efficiency = 0.0
if total_questions > 0:
correct_questions = total_questions - mistake_count
learning_efficiency = round((correct_questions / total_questions) * 100, 1)
# 计算上期学习效率(用于环比)
prev_start_date = start_date - (end_date - start_date)
prev_exam_conditions = [
Exam.user_id == user_id,
Exam.start_time >= prev_start_date,
Exam.start_time < start_date,
Exam.round1_score.isnot(None)
]
if course_id:
prev_exam_conditions.append(Exam.course_id == course_id)
prev_total_questions = await db.scalar(
select(func.coalesce(func.sum(Exam.question_count), 0)).where(
and_(*prev_exam_conditions)
)
) or 0
prev_mistake_count = await db.scalar(
select(func.count(ExamMistake.id)).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.exam_id.in_(
select(Exam.id).where(and_(*prev_exam_conditions))
)
)
)
) or 0
prev_efficiency = 0.0
if prev_total_questions > 0:
prev_correct = prev_total_questions - prev_mistake_count
prev_efficiency = (prev_correct / prev_total_questions) * 100
efficiency_change = StatisticsService._calculate_change_rate(
learning_efficiency, prev_efficiency
)
# 2. 知识覆盖率 = 已掌握知识点数 / 总知识点数
# 获取总知识点数
kp_conditions = []
if course_id:
kp_conditions.append(KnowledgePoint.course_id == course_id)
total_kp_stmt = select(func.count(KnowledgePoint.id)).where(
and_(KnowledgePoint.is_deleted == False, *kp_conditions)
)
total_kp = await db.scalar(total_kp_stmt) or 0
# 获取错误的知识点数(至少错过一次的)
mistake_kp_stmt = select(
func.count(distinct(ExamMistake.knowledge_point_id))
).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.knowledge_point_id.isnot(None),
*([ExamMistake.exam_id.in_(
select(Exam.id).where(Exam.course_id == course_id)
)] if course_id else [])
)
)
mistake_kp = await db.scalar(mistake_kp_stmt) or 0
# 计算知识覆盖率(掌握的知识点 = 总知识点 - 错误知识点)
knowledge_coverage = 0.0
if total_kp > 0:
mastered_kp = max(0, total_kp - mistake_kp)
knowledge_coverage = round((mastered_kp / total_kp) * 100, 1)
# 上期知识覆盖率(简化:假设知识点总数不变)
prev_mistake_kp = await db.scalar(
select(func.count(distinct(ExamMistake.knowledge_point_id))).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.knowledge_point_id.isnot(None),
ExamMistake.exam_id.in_(
select(Exam.id).where(and_(*prev_exam_conditions))
)
)
)
) or 0
prev_coverage = 0.0
if total_kp > 0:
prev_mastered = max(0, total_kp - prev_mistake_kp)
prev_coverage = (prev_mastered / total_kp) * 100
coverage_change = StatisticsService._calculate_change_rate(
knowledge_coverage, prev_coverage
)
# 3. 平均用时 = 总考试时长 / 总题数
total_duration_stmt = select(
func.coalesce(func.sum(Exam.duration_minutes), 0)
).where(and_(*exam_conditions))
total_duration = await db.scalar(total_duration_stmt) or 0
avg_time_per_question = 0.0
if total_questions > 0:
avg_time_per_question = round((total_duration / total_questions), 1)
# 上期平均用时
prev_total_duration = await db.scalar(
select(func.coalesce(func.sum(Exam.duration_minutes), 0)).where(
and_(*prev_exam_conditions)
)
) or 0
prev_avg_time = 0.0
if prev_total_questions > 0:
prev_avg_time = prev_total_duration / prev_total_questions
# 平均用时的环比是负增长表示好(时间减少)
time_change = StatisticsService._calculate_change_rate(
avg_time_per_question, prev_avg_time
)
# 4. 进步速度 = (本期平均分 - 上期平均分) / 上期平均分
avg_score_stmt = select(func.avg(Exam.round1_score)).where(
and_(*exam_conditions)
)
avg_score = await db.scalar(avg_score_stmt) or 0
prev_avg_score_stmt = select(func.avg(Exam.round1_score)).where(
and_(*prev_exam_conditions)
)
prev_avg_score = await db.scalar(prev_avg_score_stmt) or 0
progress_speed = StatisticsService._calculate_change_rate(
float(avg_score), float(prev_avg_score)
)
return {
"learningEfficiency": {
"value": learning_efficiency,
"unit": "%",
"change": efficiency_change,
"description": "正确题数/总练习题数"
},
"knowledgeCoverage": {
"value": knowledge_coverage,
"unit": "%",
"change": coverage_change,
"description": "已掌握知识点/总知识点"
},
"avgTimePerQuestion": {
"value": avg_time_per_question,
"unit": "分/题",
"change": time_change,
"description": "平均每道题的答题时间"
},
"progressSpeed": {
"value": abs(progress_speed),
"unit": "%",
"change": progress_speed,
"description": "成绩提升速度"
}
}
@staticmethod
async def get_score_distribution(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None,
period: str = "month"
) -> Dict[str, Any]:
"""
获取成绩分布统计
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
period: 时间范围
Returns:
Dict: 成绩分布数据(优秀、良好、中等、及格、不及格)
"""
logger.info(f"获取成绩分布 - user_id: {user_id}, course_id: {course_id}, period: {period}")
start_date, end_date = StatisticsService._get_date_range(period)
# 构建查询条件
conditions = [
Exam.user_id == user_id,
Exam.start_time >= start_date,
Exam.start_time <= end_date,
Exam.round1_score.isnot(None)
]
if course_id:
conditions.append(Exam.course_id == course_id)
# 使用case when统计各分数段的数量
stmt = select(
func.count(case((Exam.round1_score >= 90, 1))).label("excellent"),
func.count(case((and_(Exam.round1_score >= 80, Exam.round1_score < 90), 1))).label("good"),
func.count(case((and_(Exam.round1_score >= 70, Exam.round1_score < 80), 1))).label("medium"),
func.count(case((and_(Exam.round1_score >= 60, Exam.round1_score < 70), 1))).label("pass_count"),
func.count(case((Exam.round1_score < 60, 1))).label("fail")
).where(and_(*conditions))
result = await db.execute(stmt)
row = result.one()
return {
"excellent": row.excellent or 0, # 优秀(90-100)
"good": row.good or 0, # 良好(80-89)
"medium": row.medium or 0, # 中等(70-79)
"pass": row.pass_count or 0, # 及格(60-69)
"fail": row.fail or 0 # 不及格(<60)
}
@staticmethod
async def get_difficulty_analysis(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None,
period: str = "month"
) -> Dict[str, Any]:
"""
获取题目难度分析
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
period: 时间范围
Returns:
Dict: 各难度题目的正确率统计
"""
logger.info(f"获取难度分析 - user_id: {user_id}, course_id: {course_id}, period: {period}")
start_date, end_date = StatisticsService._get_date_range(period)
# 获取用户在时间范围内的考试
exam_conditions = [
Exam.user_id == user_id,
Exam.start_time >= start_date,
Exam.start_time <= end_date
]
if course_id:
exam_conditions.append(Exam.course_id == course_id)
exam_ids_stmt = select(Exam.id).where(and_(*exam_conditions))
result = await db.execute(exam_ids_stmt)
exam_ids = [row[0] for row in result.all()]
if not exam_ids:
# 没有考试数据,返回默认值
return {
"easy": 100.0,
"medium": 100.0,
"hard": 100.0,
"综合题": 100.0,
"应用题": 100.0
}
# 统计各难度的总题数和错题数
difficulty_stats = {}
for difficulty in ["easy", "medium", "hard"]:
# 总题数从exams的questions字段中统计这里简化处理
# 由于questions字段是JSON我们通过question_count估算
# 实际应用中可以解析JSON或通过exam_results表统计
# 错题数从exam_mistakes通过question_id关联查询
mistake_stmt = select(func.count(ExamMistake.id)).select_from(
ExamMistake
).join(
Question, ExamMistake.question_id == Question.id
).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.exam_id.in_(exam_ids),
Question.difficulty == difficulty
)
)
mistake_count = await db.scalar(mistake_stmt) or 0
# 总题数:该难度的题目在用户考试中出现的次数
# 简化处理:假设每次考试平均包含该难度题目的比例
total_questions_stmt = select(
func.coalesce(func.sum(Exam.question_count), 0)
).where(and_(*exam_conditions))
total_count = await db.scalar(total_questions_stmt) or 0
total_count = int(total_count) # 转换为int避免Decimal类型问题
# 简化算法假设easy:medium:hard = 3:2:1
if difficulty == "easy":
estimated_count = int(total_count * 0.5)
elif difficulty == "medium":
estimated_count = int(total_count * 0.3)
else: # hard
estimated_count = int(total_count * 0.2)
# 计算正确率
if estimated_count > 0:
correct_count = max(0, estimated_count - mistake_count)
accuracy = round((correct_count / estimated_count) * 100, 1)
else:
accuracy = 100.0
difficulty_stats[difficulty] = accuracy
# 综合题和应用题使用中等和困难题的平均值
difficulty_stats["综合题"] = round((difficulty_stats["medium"] + difficulty_stats["hard"]) / 2, 1)
difficulty_stats["应用题"] = round((difficulty_stats["medium"] + difficulty_stats["hard"]) / 2, 1)
return {
"简单题": difficulty_stats["easy"],
"中等题": difficulty_stats["medium"],
"困难题": difficulty_stats["hard"],
"综合题": difficulty_stats["综合题"],
"应用题": difficulty_stats["应用题"]
}
@staticmethod
async def get_knowledge_mastery(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
获取知识点掌握度
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
Returns:
List[Dict]: 知识点掌握度列表
"""
logger.info(f"获取知识点掌握度 - user_id: {user_id}, course_id: {course_id}")
# 获取知识点列表
kp_conditions = [KnowledgePoint.is_deleted == False]
if course_id:
kp_conditions.append(KnowledgePoint.course_id == course_id)
kp_stmt = select(KnowledgePoint).where(and_(*kp_conditions)).limit(10)
result = await db.execute(kp_stmt)
knowledge_points = result.scalars().all()
if not knowledge_points:
# 没有知识点数据,返回默认数据
return [
{"name": "基础概念", "mastery": 85.0},
{"name": "核心知识", "mastery": 72.0},
{"name": "实践应用", "mastery": 68.0},
{"name": "综合运用", "mastery": 58.0},
{"name": "高级技巧", "mastery": 75.0},
{"name": "案例分析", "mastery": 62.0}
]
mastery_list = []
for kp in knowledge_points:
# 统计该知识点的错误次数
mistake_stmt = select(func.count(ExamMistake.id)).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.knowledge_point_id == kp.id
)
)
mistake_count = await db.scalar(mistake_stmt) or 0
# 假设每个知识点平均被考查10次简化处理
estimated_total = 10
# 计算掌握度
if estimated_total > 0:
correct_count = max(0, estimated_total - mistake_count)
mastery = round((correct_count / estimated_total) * 100, 1)
else:
mastery = 100.0
mastery_list.append({
"name": kp.name[:10], # 限制名称长度
"mastery": mastery
})
return mastery_list[:6] # 最多返回6个知识点
@staticmethod
async def get_study_time_stats(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None,
period: str = "month"
) -> Dict[str, Any]:
"""
获取学习时长统计
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
period: 时间范围
Returns:
Dict: 学习时长和练习时长的日期分布数据
"""
logger.info(f"获取学习时长统计 - user_id: {user_id}, course_id: {course_id}, period: {period}")
start_date, end_date = StatisticsService._get_date_range(period)
# 获取天数
days = (end_date - start_date).days
if days > 30:
days = 30 # 最多显示30天
# 生成日期列表
date_list = []
for i in range(days):
date = end_date - timedelta(days=days - i - 1)
date_list.append(date.date())
# 初始化数据
study_time_data = {str(d): 0.0 for d in date_list}
practice_time_data = {str(d): 0.0 for d in date_list}
# 统计考试时长(学习时长)
exam_conditions = [
Exam.user_id == user_id,
Exam.start_time >= start_date,
Exam.start_time <= end_date
]
if course_id:
exam_conditions.append(Exam.course_id == course_id)
exam_stmt = select(
func.date(Exam.start_time).label("date"),
func.sum(Exam.duration_minutes).label("total_minutes")
).where(
and_(*exam_conditions)
).group_by(
func.date(Exam.start_time)
)
exam_result = await db.execute(exam_stmt)
for row in exam_result.all():
date_str = str(row.date)
if date_str in study_time_data:
study_time_data[date_str] = round(float(row.total_minutes) / 60, 1)
# 统计陪练时长(练习时长)
practice_conditions = [
PracticeSession.user_id == user_id,
PracticeSession.start_time >= start_date,
PracticeSession.start_time <= end_date,
PracticeSession.status == "completed"
]
practice_stmt = select(
func.date(PracticeSession.start_time).label("date"),
func.sum(PracticeSession.duration_seconds).label("total_seconds")
).where(
and_(*practice_conditions)
).group_by(
func.date(PracticeSession.start_time)
)
practice_result = await db.execute(practice_stmt)
for row in practice_result.all():
date_str = str(row.date)
if date_str in practice_time_data:
practice_time_data[date_str] = round(float(row.total_seconds) / 3600, 1)
# 如果period是week返回星期几标签
if period == "week":
weekday_labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
labels = weekday_labels[:len(date_list)]
else:
# 其他情况返回日期
labels = [d.strftime("%m-%d") for d in date_list]
study_values = [study_time_data[str(d)] for d in date_list]
practice_values = [practice_time_data[str(d)] for d in date_list]
return {
"labels": labels,
"studyTime": study_values,
"practiceTime": practice_values
}
@staticmethod
async def get_detail_data(
db: AsyncSession,
user_id: int,
course_id: Optional[int] = None,
period: str = "month"
) -> List[Dict[str, Any]]:
"""
获取详细统计数据(按日期)
Args:
db: 数据库会话
user_id: 用户ID
course_id: 课程ID可选
period: 时间范围
Returns:
List[Dict]: 每日详细统计数据
"""
logger.info(f"获取详细数据 - user_id: {user_id}, course_id: {course_id}, period: {period}")
start_date, end_date = StatisticsService._get_date_range(period)
# 构建查询条件
exam_conditions = [
Exam.user_id == user_id,
Exam.start_time >= start_date,
Exam.start_time <= end_date,
Exam.round1_score.isnot(None)
]
if course_id:
exam_conditions.append(Exam.course_id == course_id)
# 按日期分组统计
stmt = select(
func.date(Exam.start_time).label("date"),
func.count(Exam.id).label("exam_count"),
func.avg(Exam.round1_score).label("avg_score"),
func.sum(Exam.duration_minutes).label("total_duration"),
func.sum(Exam.question_count).label("total_questions")
).where(
and_(*exam_conditions)
).group_by(
func.date(Exam.start_time)
).order_by(
desc(func.date(Exam.start_time))
).limit(10) # 最多返回10条
result = await db.execute(stmt)
rows = result.all()
detail_list = []
for row in rows:
date_str = row.date.strftime("%Y-%m-%d")
exam_count = row.exam_count or 0
avg_score = round(float(row.avg_score or 0), 1)
study_time = round(float(row.total_duration or 0) / 60, 1)
question_count = row.total_questions or 0
# 统计当天的错题数
mistake_stmt = select(func.count(ExamMistake.id)).where(
and_(
ExamMistake.user_id == user_id,
ExamMistake.exam_id.in_(
select(Exam.id).where(
and_(
Exam.user_id == user_id,
func.date(Exam.start_time) == row.date
)
)
)
)
)
mistake_count = await db.scalar(mistake_stmt) or 0
# 计算正确率
accuracy = 0.0
if question_count > 0:
correct_count = question_count - mistake_count
accuracy = round((correct_count / question_count) * 100, 1)
# 计算进步指数(基于平均分)
improvement = min(100, max(0, int(avg_score)))
detail_list.append({
"date": date_str,
"examCount": exam_count,
"avgScore": avg_score,
"studyTime": study_time,
"questionCount": question_count,
"accuracy": accuracy,
"improvement": improvement
})
return detail_list