Files
012-kaopeilian/backend/app/services/dashboard_service.py
yuliang_guo 8bfd5aa3de
All checks were successful
continuous-integration/drone/push Build is passing
fix: 修复TrainingSession状态比较大小写问题
- COMPLETED -> completed (枚举值是小写)
2026-02-02 13:02:19 +08:00

558 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据大屏服务
提供企业级和团队级数据大屏功能:
- 学习数据概览
- 部门/团队对比
- 趋势分析
- 实时动态
"""
from datetime import datetime, timedelta, date
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, and_, or_, desc, case
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import get_logger
from app.models.user import User
from app.models.course import Course, CourseMaterial
from app.models.exam import Exam
from app.models.practice import PracticeSession
from app.models.training import TrainingSession, TrainingReport
from app.models.level import UserLevel, ExpHistory, UserBadge
from app.models.position import Position
from app.models.position_member import PositionMember
# Module-level logger, obtained from the project's logging helper and keyed by this module's name.
logger = get_logger(__name__)
class DashboardService:
"""数据大屏服务"""
def __init__(self, db: AsyncSession):
    """
    Bind the service to a database session.

    Args:
        db: Async SQLAlchemy session that every query in this service
            is executed on.
    """
    self.db = db
async def get_enterprise_overview(self, enterprise_id: Optional[int] = None) -> Dict[str, Any]:
    """
    Build the enterprise-level overview for the dashboard.

    Every metric after the trainee head-count is collected on a best-effort
    basis: when its query fails, a warning is logged and the metric keeps
    its zero default so the rest of the payload can still be returned.

    Args:
        enterprise_id: Optional enterprise ID intended for multi-tenant use.
            It is accepted for interface compatibility but is not applied
            to any query in this method.

    Returns:
        A dict with three keys:
        - "overview": total_users, today_active, week_active, month_active,
          total_hours, checkin_rate
        - "exam": total_count, pass_rate, avg_score, perfect_users
        - "updated_at": ISO-formatted timestamp of when the payload was built
        If anything fails outside the per-metric guards, the same structure
        is returned with every metric set to 0.
    """
    try:
        today = date.today()
        week_ago = today - timedelta(days=7)
        month_ago = today - timedelta(days=30)

        # 1. Total trainees: non-deleted users whose role is 'trainee'.
        result = await self.db.execute(
            select(func.count(User.id))
            .where(User.is_deleted == False, User.role == 'trainee')
        )
        total_users = result.scalar() or 0

        # 2. Users active today (distinct users with an experience record today).
        try:
            result = await self.db.execute(
                select(func.count(func.distinct(ExpHistory.user_id)))
                .where(func.date(ExpHistory.created_at) == today)
            )
            today_active = result.scalar() or 0
        except Exception as e:
            logger.warning(f"获取今日活跃用户失败: {e}")
            today_active = 0

        # 3. Users active since midnight seven days ago.
        try:
            result = await self.db.execute(
                select(func.count(func.distinct(ExpHistory.user_id)))
                .where(ExpHistory.created_at >= datetime.combine(week_ago, datetime.min.time()))
            )
            week_active = result.scalar() or 0
        except Exception as e:
            logger.warning(f"获取本周活跃用户失败: {e}")
            week_active = 0

        # 4. Users active since midnight thirty days ago.
        try:
            result = await self.db.execute(
                select(func.count(func.distinct(ExpHistory.user_id)))
                .where(ExpHistory.created_at >= datetime.combine(month_ago, datetime.min.time()))
            )
            month_active = result.scalar() or 0
        except Exception as e:
            logger.warning(f"获取本月活跃用户失败: {e}")
            month_active = 0

        # 5. Total learning time in hours: completed practice sessions plus
        #    completed training sessions.
        practice_hours = 0
        training_hours = 0
        try:
            result = await self.db.execute(
                select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0))
                .where(PracticeSession.status == 'completed')
            )
            practice_hours = (result.scalar() or 0) / 3600
        except Exception as e:
            logger.warning(f"获取陪练时长失败: {e}")
        try:
            result = await self.db.execute(
                select(func.coalesce(func.sum(TrainingSession.duration_seconds), 0))
                .where(TrainingSession.status == 'completed')  # status is compared in lowercase
            )
            training_hours = (result.scalar() or 0) / 3600
        except Exception as e:
            logger.warning(f"获取培训时长失败: {e}")
        # Round first, then coerce: some drivers return Decimal for SUM/AVG,
        # and the payload should carry plain floats regardless of driver.
        total_hours = float(round(practice_hours + training_hours, 1))

        # 6. Exam statistics over submitted exams.
        exam_count = 0
        exam_passed = 0
        exam_avg_score = 0
        try:
            result = await self.db.execute(
                select(
                    func.count(Exam.id),
                    func.count(case((Exam.is_passed == True, 1))),
                    func.avg(Exam.score)
                )
                .where(Exam.status == 'submitted')
            )
            exam_row = result.first()
            if exam_row:
                exam_count = exam_row[0] or 0
                exam_passed = exam_row[1] or 0
                exam_avg_score = float(round(exam_row[2] or 0, 1))
        except Exception as e:
            logger.warning(f"获取考试统计失败: {e}")
        exam_pass_rate = round(exam_passed / exam_count * 100, 1) if exam_count > 0 else 0

        # 7. Distinct users with at least one submitted exam scored at or
        #    above its total score.
        perfect_users = 0
        try:
            result = await self.db.execute(
                select(func.count(func.distinct(Exam.user_id)))
                .where(
                    Exam.status == 'submitted',
                    Exam.score.isnot(None),
                    Exam.total_score.isnot(None),
                    Exam.score >= Exam.total_score
                )
            )
            perfect_users = result.scalar() or 0
        except Exception as e:
            logger.warning(f"获取满分人数失败: {e}")

        # 8. Check-in rate: UserLevel rows whose last_login_date is today,
        #    as a percentage of total trainees.
        today_checkin = 0
        try:
            result = await self.db.execute(
                select(func.count(UserLevel.id))
                .where(UserLevel.last_login_date == today)
            )
            today_checkin = result.scalar() or 0
        except Exception as e:
            logger.warning(f"获取签到率失败: {e}")
        checkin_rate = round(today_checkin / total_users * 100, 1) if total_users > 0 else 0

        return {
            "overview": {
                "total_users": total_users,
                "today_active": today_active,
                "week_active": week_active,
                "month_active": month_active,
                "total_hours": total_hours,
                "checkin_rate": checkin_rate,
            },
            "exam": {
                "total_count": exam_count,
                "pass_rate": exam_pass_rate,
                "avg_score": exam_avg_score,
                "perfect_users": perfect_users,
            },
            "updated_at": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"获取企业概览失败: {e}")
        # Serve an all-zero payload instead of raising.
        return {
            "overview": {
                "total_users": 0,
                "today_active": 0,
                "week_active": 0,
                "month_active": 0,
                "total_hours": 0,
                "checkin_rate": 0,
            },
            "exam": {
                "total_count": 0,
                "pass_rate": 0,
                "avg_score": 0,
                "perfect_users": 0,
            },
            "updated_at": datetime.now().isoformat()
        }
async def get_department_comparison(self) -> List[Dict[str, Any]]:
    """
    Compare learning metrics across positions (used as departments/teams).

    For every non-deleted position that has at least one member, this
    computes the exam pass rate over submitted exams, the average hours of
    completed practice sessions per member, and the average user level.

    Returns:
        A list of dicts with keys id, name, member_count, pass_rate,
        avg_hours and avg_level, sorted by pass_rate in descending order.
        Positions without members are omitted.
    """
    result = await self.db.execute(
        select(Position)
        .where(Position.is_deleted == False)
        .order_by(Position.name)
    )
    positions = result.scalars().all()

    departments = []
    for pos in positions:
        # One query yields both the member IDs and the member count; a
        # separate COUNT over the same rows would return the same number.
        result = await self.db.execute(
            select(PositionMember.user_id)
            .where(PositionMember.position_id == pos.id)
        )
        member_ids = [row[0] for row in result.all()]
        member_count = len(member_ids)
        if member_count == 0:
            continue

        # Exam pass rate over the members' submitted exams.
        result = await self.db.execute(
            select(
                func.count(Exam.id),
                func.count(case((Exam.is_passed == True, 1)))
            )
            .where(
                Exam.user_id.in_(member_ids),
                Exam.status == 'submitted'
            )
        )
        exam_row = result.first()
        exam_total = exam_row[0] or 0
        exam_passed = exam_row[1] or 0
        pass_rate = round(exam_passed / exam_total * 100, 1) if exam_total > 0 else 0

        # Average hours per member, from completed practice sessions only.
        result = await self.db.execute(
            select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0))
            .where(
                PracticeSession.user_id.in_(member_ids),
                PracticeSession.status == 'completed'
            )
        )
        total_seconds = result.scalar() or 0
        # member_count is non-zero here, so the division is safe. Round
        # first, then coerce, because some drivers return Decimal for SUM.
        avg_hours = float(round(total_seconds / 3600 / member_count, 1))

        # Average level; falls back to 1 when no UserLevel rows exist.
        result = await self.db.execute(
            select(func.avg(UserLevel.level))
            .where(UserLevel.user_id.in_(member_ids))
        )
        avg_level = float(round(result.scalar() or 1, 1))

        departments.append({
            "id": pos.id,
            "name": pos.name,
            "member_count": member_count,
            "pass_rate": pass_rate,
            "avg_hours": avg_hours,
            "avg_level": avg_level,
        })

    # Highest pass rate first.
    departments.sort(key=lambda x: x["pass_rate"], reverse=True)
    return departments
async def get_learning_trend(self, days: int = 7) -> Dict[str, Any]:
    """
    Collect per-day learning activity for the most recent days.

    The window ends today and runs oldest-first. For each day it reports
    the number of distinct users with an experience record, the hours of
    completed practice sessions created that day, and the number of
    submitted exams created that day.

    Args:
        days: Number of days to cover, counting today. A value below 1
            yields empty lists.

    Returns:
        A dict with "dates" (ISO date strings, oldest first) and "trend"
        (one dict per date with date, active_users, learning_hours and
        exam_count).
    """
    today = date.today()
    dates = [(today - timedelta(days=i)) for i in range(days - 1, -1, -1)]

    trend_data = []
    for d in dates:
        # Distinct users with an experience record on this day.
        result = await self.db.execute(
            select(func.count(func.distinct(ExpHistory.user_id)))
            .where(func.date(ExpHistory.created_at) == d)
        )
        active_users = result.scalar() or 0

        # Hours from completed practice sessions created on this day.
        result = await self.db.execute(
            select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0))
            .where(
                func.date(PracticeSession.created_at) == d,
                PracticeSession.status == 'completed'
            )
        )
        # Round first, then coerce: some drivers return Decimal for SUM.
        hours = float(round((result.scalar() or 0) / 3600, 1))

        # Submitted exams created on this day.
        result = await self.db.execute(
            select(func.count(Exam.id))
            .where(
                func.date(Exam.created_at) == d,
                Exam.status == 'submitted'
            )
        )
        exams = result.scalar() or 0

        trend_data.append({
            "date": d.isoformat(),
            "active_users": active_users,
            "learning_hours": hours,
            "exam_count": exams,
        })

    return {
        "dates": [d.isoformat() for d in dates],
        "trend": trend_data
    }
async def get_level_distribution(self, max_level: int = 10) -> Dict[str, Any]:
    """
    Count how many UserLevel rows sit at each level.

    Args:
        max_level: Highest level to report. Defaults to 10, which matches
            the previously hard-coded range.

    Returns:
        A dict with "levels" (1..max_level) and "counts" (the number of
        rows at each of those levels, 0 where none exist). Rows whose
        level falls outside 1..max_level are not included in the output.
    """
    result = await self.db.execute(
        select(UserLevel.level, func.count(UserLevel.id))
        .group_by(UserLevel.level)
        .order_by(UserLevel.level)
    )
    counts_by_level = {level: count for level, count in result.all()}

    # .get(level, 0) fills the gaps, so no separate back-fill pass is needed.
    levels = list(range(1, max_level + 1))
    return {
        "levels": levels,
        "counts": [counts_by_level.get(level, 0) for level in levels]
    }
async def get_realtime_activities(self, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Return the most recent experience records as an activity feed.

    Each record is labelled by the first keyword found in its description,
    checked in a fixed order; records matching none get the default label.

    Args:
        limit: Maximum number of records to fetch.

    Returns:
        A list of activity dicts, newest first, with keys id, user_id,
        user_name, type, description, exp_amount and created_at. On
        failure the error is logged and whatever was collected so far
        (possibly nothing) is returned.
    """
    # Checked in this order; the matched keyword doubles as the type label.
    type_keywords = ("考试", "签到", "陪练", "奖章")
    feed: List[Dict[str, Any]] = []
    try:
        stmt = (
            select(ExpHistory, User)
            .join(User, ExpHistory.user_id == User.id)
            .order_by(ExpHistory.created_at.desc())
            .limit(limit)
        )
        result = await self.db.execute(stmt)
        for record, owner in result.all():
            text = record.description or ""
            label = next((kw for kw in type_keywords if kw in text), "学习")
            happened_at = record.created_at.isoformat() if record.created_at else None
            feed.append({
                "id": record.id,
                "user_id": owner.id,
                "user_name": owner.full_name or owner.username,
                "type": label,
                "description": text,
                # The model field is exp_change; the payload key stays exp_amount.
                "exp_amount": record.exp_change,
                "created_at": happened_at,
            })
    except Exception as e:
        # Log and fall through: the feed is returned instead of raising.
        logger.error(f"获取实时动态失败: {e}")
    return feed
async def get_team_dashboard(self, team_leader_id: int) -> Dict[str, Any]:
    """
    Build the team-level dashboard for one team leader.

    The team is every member of the non-deleted positions that the leader
    either manages or created.

    Args:
        team_leader_id: User ID of the team leader.

    Returns:
        A dict that always carries the same four keys:
        - "members": per-member dicts (id, username, full_name, avatar_url,
          level, total_exp, badge_count), ordered by total experience
          descending
        - "overview": total_members, avg_level, avg_exp, total_badges
        - "positions": id and name of each position the leader is
          responsible for
        - "pending_tasks": always an empty list in this implementation
        When the leader has no positions or the positions have no members,
        "members" is empty and the overview is all zeros.
    """
    def _empty(position_items: List[Dict[str, Any]]) -> Dict[str, Any]:
        # Same shape as the populated payload, so callers can rely on the keys.
        return {
            "members": [],
            "overview": {
                "total_members": 0,
                "avg_level": 0,
                "avg_exp": 0,
                "total_badges": 0,
            },
            "positions": position_items,
            "pending_tasks": []
        }

    # Positions the leader manages or created.
    result = await self.db.execute(
        select(Position)
        .where(
            Position.is_deleted == False,
            or_(
                Position.manager_id == team_leader_id,
                Position.created_by == team_leader_id
            )
        )
    )
    positions = result.scalars().all()
    position_ids = [p.id for p in positions]
    if not position_ids:
        return _empty([])
    position_items = [{"id": p.id, "name": p.name} for p in positions]

    # Members of those positions.
    result = await self.db.execute(
        select(PositionMember.user_id)
        .where(PositionMember.position_id.in_(position_ids))
    )
    member_ids = [row[0] for row in result.all()]
    if not member_ids:
        return _empty(position_items)

    # Badge counts for the whole team in one grouped query, instead of one
    # COUNT per member plus a separate team-wide COUNT.
    result = await self.db.execute(
        select(UserBadge.user_id, func.count(UserBadge.id))
        .where(UserBadge.user_id.in_(member_ids))
        .group_by(UserBadge.user_id)
    )
    badge_counts = {user_id: count for user_id, count in result.all()}
    total_badges = sum(badge_counts.values())

    # Member details; users without a UserLevel row are kept by the outer join.
    # NOTE(review): NULLS LAST is not accepted by every SQL dialect — confirm
    # the target database supports it.
    result = await self.db.execute(
        select(User, UserLevel)
        .outerjoin(UserLevel, User.id == UserLevel.user_id)
        .where(User.id.in_(member_ids))
        .order_by(UserLevel.total_exp.desc().nullslast())
    )
    rows = result.all()

    members = []
    total_exp = 0
    total_level = 0
    for user, level in rows:
        # Users without a level record count as level 1 with 0 experience.
        user_level = level.level if level else 1
        user_exp = level.total_exp if level else 0
        total_level += user_level
        total_exp += user_exp
        members.append({
            "id": user.id,
            "username": user.username,
            "full_name": user.full_name,
            "avatar_url": user.avatar_url,
            "level": user_level,
            "total_exp": user_exp,
            "badge_count": badge_counts.get(user.id, 0),
        })

    total_members = len(members)
    return {
        "members": members,
        "overview": {
            "total_members": total_members,
            "avg_level": round(total_level / total_members, 1) if total_members > 0 else 0,
            "avg_exp": round(total_exp / total_members) if total_members > 0 else 0,
            "total_badges": total_badges,
        },
        "positions": position_items,
        "pending_tasks": []
    }
async def get_course_ranking(self, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Return a course ranking list.

    This is a simplified implementation: courses are ordered by creation
    time (newest first) rather than by actual learning activity, and the
    learner count is a placeholder.

    Args:
        limit: Maximum number of courses to return.

    Returns:
        A list of dicts with rank (starting at 1), id, name, description
        and learners (always 0 here).
    """
    stmt = (
        select(Course)
        .where(Course.is_deleted == False, Course.is_published == True)
        .order_by(Course.created_at.desc())
        .limit(limit)
    )
    published = (await self.db.execute(stmt)).scalars().all()
    return [
        {
            "rank": position,
            "id": item.id,
            "name": item.name,
            "description": item.description,
            # Placeholder until real learner statistics are wired in.
            "learners": 0,
        }
        for position, item in enumerate(published, 1)
    ]