Files
012-kaopeilian/backend/app/services/badge_service.py
yuliang_guo 6f0f2e6363
Some checks failed
continuous-integration/drone/push Build is failing
feat: KPL v1.5.0 功能迭代
1. 奖章条件优化
- 修复统计查询 SQL 语法
- 添加按类别检查奖章方法

2. 移动端适配
- 登录页、课程中心、课程详情
- 考试页面、成长路径、排行榜

3. 证书系统
- 数据库模型和迁移脚本
- 证书颁发/列表/下载/验证 API
- 前端证书列表页面

4. 数据大屏
- 企业级/团队级数据 API
- ECharts 可视化大屏页面
2026-01-29 16:51:17 +08:00

587 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
奖章服务
提供奖章管理功能:
- 获取奖章定义
- 检查奖章解锁条件
- 授予奖章
- 获取用户奖章
"""
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy import select, func, and_, or_, case
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import get_logger
from app.models.level import (
BadgeDefinition, UserBadge, UserLevel, ExpHistory, ExpType,
BadgeCategory, ConditionType
)
from app.models.exam import Exam
from app.models.practice import PracticeSession, PracticeReport
from app.models.training import TrainingSession, TrainingReport
from app.models.task import TaskAssignment
logger = get_logger(__name__)
class BadgeService:
    """Badge service.

    Reads badge definitions, evaluates unlock conditions against per-user
    statistics, grants badges (optionally with an experience reward) and
    tracks whether the user has been notified. All database access goes
    through the ``AsyncSession`` supplied at construction time; the methods
    of this class call ``flush()`` but never ``commit()`` themselves.
    """

    def __init__(self, db: AsyncSession):
        self.db = db
        # Per-instance cache of active badge definitions, filled lazily by
        # _get_badge_definitions().
        self._badge_definitions: Optional[List[BadgeDefinition]] = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _iso_or_none(moment: Optional[datetime]) -> Optional[str]:
        """Return ``moment.isoformat()``, or ``None`` when no timestamp is set."""
        return moment.isoformat() if moment else None

    @staticmethod
    def _award_payload(badge: BadgeDefinition) -> Dict[str, Any]:
        """Build the dict returned to callers for a freshly granted badge."""
        return {
            "badge_id": badge.id,
            "code": badge.code,
            "name": badge.name,
            "description": badge.description,
            "icon": badge.icon,
            "exp_reward": badge.exp_reward,
        }

    async def _grant_badge(self, user_id: int, badge: BadgeDefinition) -> Dict[str, Any]:
        """Stage a ``UserBadge`` row for the user and pay out the badge's reward.

        The row is only added to the session; flushing is left to the caller.

        Args:
            user_id: User ID.
            badge: Definition of the badge being granted.

        Returns:
            Summary dict of the granted badge.
        """
        self.db.add(
            UserBadge(
                user_id=user_id,
                badge_id=badge.id,
                unlocked_at=datetime.now(),
                is_notified=False,
            )
        )

        # A missing (None) reward is treated the same as a zero reward.
        if (badge.exp_reward or 0) > 0:
            # Imported here rather than at module level, as in the original
            # code (presumably to avoid a circular import — TODO confirm).
            from app.services.level_service import LevelService

            await LevelService(self.db).add_exp(
                user_id=user_id,
                exp_amount=badge.exp_reward,
                exp_type=ExpType.BADGE,
                description=f"解锁奖章「{badge.name}」",
            )

        return self._award_payload(badge)

    async def _award_eligible(
        self,
        user_id: int,
        candidates: List[BadgeDefinition],
        stats: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Grant every candidate badge the user does not own and now qualifies for.

        Args:
            user_id: User ID.
            candidates: Badge definitions to evaluate, in evaluation order.
            stats: Statistics produced by ``_get_user_stats``.

        Returns:
            Summary dicts of the badges granted by this call.
        """
        owned_result = await self.db.execute(
            select(UserBadge.badge_id).where(UserBadge.user_id == user_id)
        )
        owned_badge_ids = {row[0] for row in owned_result.all()}

        newly_awarded: List[Dict[str, Any]] = []
        for badge in candidates:
            if badge.id in owned_badge_ids:
                continue
            if not self._check_badge_condition(badge, stats):
                continue
            newly_awarded.append(await self._grant_badge(user_id, badge))
            logger.info(f"用户 {user_id} 解锁奖章: {badge.name}")

        # Only touch the database when something was actually staged.
        if newly_awarded:
            await self.db.flush()
        return newly_awarded

    # ------------------------------------------------------------------
    # Badge definitions
    # ------------------------------------------------------------------

    async def _get_badge_definitions(self) -> List[BadgeDefinition]:
        """Return all active badge definitions ordered by ``sort_order``.

        The query runs once per service instance; later calls return the
        cached list.
        """
        if self._badge_definitions is not None:
            return self._badge_definitions

        result = await self.db.execute(
            select(BadgeDefinition)
            .where(BadgeDefinition.is_active == True)
            .order_by(BadgeDefinition.sort_order)
        )
        self._badge_definitions = list(result.scalars().all())
        return self._badge_definitions

    async def get_all_badges(self) -> List[Dict[str, Any]]:
        """Get all active badge definitions.

        Returns:
            List of badge definition dicts.
        """
        badges = await self._get_badge_definitions()
        return [
            {
                "id": b.id,
                "code": b.code,
                "name": b.name,
                "description": b.description,
                "icon": b.icon,
                "category": b.category,
                "condition_type": b.condition_type,
                "condition_value": b.condition_value,
                "exp_reward": b.exp_reward,
            }
            for b in badges
        ]

    # ------------------------------------------------------------------
    # User badges (read side)
    # ------------------------------------------------------------------

    async def get_user_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Get the badges a user has unlocked, most recently unlocked first.

        Args:
            user_id: User ID.

        Returns:
            List of unlocked badge dicts.
        """
        result = await self.db.execute(
            select(UserBadge, BadgeDefinition)
            .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
            .where(UserBadge.user_id == user_id)
            .order_by(UserBadge.unlocked_at.desc())
        )
        return [
            {
                "id": user_badge.id,
                "badge_id": badge.id,
                "code": badge.code,
                "name": badge.name,
                "description": badge.description,
                "icon": badge.icon,
                "category": badge.category,
                "unlocked_at": self._iso_or_none(user_badge.unlocked_at),
                "is_notified": user_badge.is_notified,
            }
            for user_badge, badge in result.all()
        ]

    async def get_user_badges_with_status(self, user_id: int) -> List[Dict[str, Any]]:
        """Get every active badge together with the user's unlock status.

        Args:
            user_id: User ID.

        Returns:
            List of all active badges, each with ``unlocked`` and
            ``unlocked_at`` fields.
        """
        all_badges = await self._get_badge_definitions()

        result = await self.db.execute(
            select(UserBadge).where(UserBadge.user_id == user_id)
        )
        unlocked_by_badge_id = {ub.badge_id: ub for ub in result.scalars().all()}

        badges_with_status = []
        for badge in all_badges:
            user_badge = unlocked_by_badge_id.get(badge.id)
            badges_with_status.append({
                "id": badge.id,
                "code": badge.code,
                "name": badge.name,
                "description": badge.description,
                "icon": badge.icon,
                "category": badge.category,
                "condition_type": badge.condition_type,
                "condition_value": badge.condition_value,
                "exp_reward": badge.exp_reward,
                "unlocked": user_badge is not None,
                # Guard both "not unlocked" and "unlocked without a timestamp".
                "unlocked_at": (
                    self._iso_or_none(user_badge.unlocked_at) if user_badge else None
                ),
            })
        return badges_with_status

    async def get_unnotified_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Get the user's unlocked badges that are not yet marked as notified.

        Args:
            user_id: User ID.

        Returns:
            List of un-notified badge dicts, most recently unlocked first.
        """
        result = await self.db.execute(
            select(UserBadge, BadgeDefinition)
            .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
            .where(
                UserBadge.user_id == user_id,
                UserBadge.is_notified == False
            )
            .order_by(UserBadge.unlocked_at.desc())
        )
        return [
            {
                "user_badge_id": user_badge.id,
                "badge_id": badge.id,
                "code": badge.code,
                "name": badge.name,
                "description": badge.description,
                "icon": badge.icon,
                "exp_reward": badge.exp_reward,
                "unlocked_at": self._iso_or_none(user_badge.unlocked_at),
            }
            for user_badge, badge in result.all()
        ]

    async def mark_badges_notified(
        self,
        user_id: int,
        badge_ids: Optional[List[int]] = None,
    ) -> None:
        """Mark the user's un-notified badges as notified.

        Args:
            user_id: User ID.
            badge_ids: Badge IDs to mark. When ``None`` or empty, all of the
                user's un-notified badges are marked.
        """
        from sqlalchemy import update

        statement = update(UserBadge).where(
            UserBadge.user_id == user_id,
            UserBadge.is_notified == False
        )
        if badge_ids:
            statement = statement.where(UserBadge.badge_id.in_(badge_ids))
        statement = statement.values(
            is_notified=True,
            notified_at=datetime.now()
        )

        await self.db.execute(statement)
        await self.db.flush()

    # ------------------------------------------------------------------
    # Statistics and condition evaluation
    # ------------------------------------------------------------------

    async def _get_user_stats(self, user_id: int) -> Dict[str, Any]:
        """Collect the per-user statistics that badge conditions are checked against.

        This is best-effort: if any query raises, the error is logged and the
        statistics gathered so far (defaults for the rest) are returned.

        Args:
            user_id: User ID.

        Returns:
            Dict of statistics keyed by the names used in
            ``BadgeDefinition.condition_field``.
        """
        stats: Dict[str, Any] = {
            "login_count": 0,
            "login_streak": 0,
            # NOTE: nothing below populates course_completed, so it stays 0.
            "course_completed": 0,
            "exam_passed": 0,
            "exam_perfect_count": 0,
            "exam_excellent": 0,
            "practice_count": 0,
            "practice_hours": 0,
            "training_count": 0,
            "first_practice_90": 0,
            "user_level": 1,
        }

        async def count_rows(statement) -> int:
            # Run a single-value COUNT query; an empty result counts as 0.
            result = await self.db.execute(statement)
            return result.scalar() or 0

        try:
            # Level record: login streak and current level.
            level_result = await self.db.execute(
                select(UserLevel).where(UserLevel.user_id == user_id)
            )
            user_level = level_result.scalar_one_or_none()
            if user_level:
                stats["login_streak"] = user_level.login_streak or 0
                stats["user_level"] = user_level.level or 1

            # Login / check-in count, taken from the experience history.
            stats["login_count"] = await count_rows(
                select(func.count(ExpHistory.id))
                .where(
                    ExpHistory.user_id == user_id,
                    ExpHistory.exp_type == ExpType.LOGIN
                )
            )

            # Submitted exams that were passed.
            stats["exam_passed"] = await count_rows(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.is_passed == True,
                    Exam.status == "submitted"
                )
            )

            # Submitted exams with a full score (score >= total_score).
            stats["exam_perfect_count"] = await count_rows(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.status == "submitted",
                    Exam.score >= Exam.total_score
                )
            )

            # Submitted exams scoring 90 or above.
            stats["exam_excellent"] = await count_rows(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.status == "submitted",
                    Exam.score >= 90
                )
            )

            # Completed practice sessions: count and total duration.
            practice_result = await self.db.execute(
                select(
                    func.count(PracticeSession.id),
                    func.coalesce(func.sum(PracticeSession.duration_seconds), 0)
                )
                .where(
                    PracticeSession.user_id == user_id,
                    PracticeSession.status == "completed"
                )
            )
            practice_row = practice_result.first()
            if practice_row:
                stats["practice_count"] = practice_row[0] or 0
                total_seconds = practice_row[1] or 0
                stats["practice_hours"] = float(total_seconds) / 3600.0

            # Completed training sessions.
            stats["training_count"] = await count_rows(
                select(func.count(TrainingSession.id))
                .where(
                    TrainingSession.user_id == user_id,
                    TrainingSession.status == "COMPLETED"
                )
            )

            # Flag (0/1): at least one training report scored 90 or above.
            high_score_count = await count_rows(
                select(func.count(TrainingReport.id))
                .where(
                    TrainingReport.user_id == user_id,
                    TrainingReport.overall_score >= 90
                )
            )
            stats["first_practice_90"] = 1 if high_score_count > 0 else 0

            logger.debug(f"用户 {user_id} 奖章统计数据: {stats}")
        except Exception as e:
            # Deliberately swallowed: badge checks fall back to whatever
            # statistics were collected before the failure.
            logger.error(f"获取用户统计数据失败: {e}")

        return stats

    def _check_badge_condition(self, badge: BadgeDefinition, stats: Dict[str, Any]) -> bool:
        """Check whether a badge's unlock condition is satisfied.

        Every supported condition type is a threshold: the statistic named by
        ``badge.condition_field`` must be at least ``badge.condition_value``.

        Args:
            badge: Badge definition.
            stats: User statistics.

        Returns:
            ``True`` when the condition is met. ``False`` when it is not, when
            the badge has no condition field or threshold, or when its
            condition type is not one of the supported types.
        """
        condition_field = badge.condition_field
        if not condition_field:
            return False

        condition_value = badge.condition_value
        if condition_value is None:
            # A badge without a threshold can never be met; returning False
            # avoids a TypeError that would abort the whole award loop.
            return False

        threshold_types = (
            ConditionType.COUNT,
            ConditionType.SCORE,
            ConditionType.STREAK,
            ConditionType.LEVEL,
            ConditionType.DURATION,
        )
        if badge.condition_type not in threshold_types:
            return False

        return stats.get(condition_field, 0) >= condition_value

    # ------------------------------------------------------------------
    # Granting badges
    # ------------------------------------------------------------------

    async def check_and_award_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check all active badges and grant those the user now qualifies for.

        Args:
            user_id: User ID.

        Returns:
            List of newly granted badges.
        """
        stats = await self._get_user_stats(user_id)
        all_badges = await self._get_badge_definitions()
        return await self._award_eligible(user_id, all_badges, stats)

    async def award_badge(self, user_id: int, badge_code: str) -> Optional[Dict[str, Any]]:
        """Grant a badge directly, without checking its condition (for special badges).

        Args:
            user_id: User ID.
            badge_code: Badge code.

        Returns:
            Badge info dict on success; ``None`` when the badge code does not
            exist or the user already owns the badge.
        """
        definition_result = await self.db.execute(
            select(BadgeDefinition).where(BadgeDefinition.code == badge_code)
        )
        badge = definition_result.scalar_one_or_none()
        if not badge:
            logger.warning(f"奖章不存在: {badge_code}")
            return None

        owned_result = await self.db.execute(
            select(UserBadge).where(
                UserBadge.user_id == user_id,
                UserBadge.badge_id == badge.id
            )
        )
        if owned_result.scalar_one_or_none():
            logger.info(f"用户 {user_id} 已拥有奖章: {badge_code}")
            return None

        payload = await self._grant_badge(user_id, badge)
        await self.db.flush()
        logger.info(f"用户 {user_id} 获得奖章: {badge.name}")
        return payload

    async def check_badges_by_category(
        self,
        user_id: int,
        categories: List[str]
    ) -> List[Dict[str, Any]]:
        """Check and grant badges restricted to the given categories.

        Unlike ``check_and_award_badges`` this queries the definitions
        directly instead of using the per-instance cache.

        Args:
            user_id: User ID.
            categories: Badge categories to check.

        Returns:
            List of newly granted badges.
        """
        stats = await self._get_user_stats(user_id)

        result = await self.db.execute(
            select(BadgeDefinition)
            .where(
                BadgeDefinition.is_active == True,
                BadgeDefinition.category.in_(categories)
            )
            .order_by(BadgeDefinition.sort_order)
        )
        category_badges = list(result.scalars().all())

        return await self._award_eligible(user_id, category_badges, stats)

    async def check_exam_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check badges in the EXAM category (intended to run after an exam)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.EXAM])

    async def check_practice_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check badges in the PRACTICE category (intended to run after practice)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.PRACTICE])

    async def check_streak_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check badges in the STREAK and LEARNING categories (intended to run after check-in)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.STREAK, BadgeCategory.LEARNING])

    async def check_level_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check badges in the SPECIAL category (intended to run after a level change)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.SPECIAL])