Files
012-kaopeilian/backend/app/services/badge_service.py
yuliang_guo 0933b936f9
Some checks failed
continuous-integration/drone/push Build is failing
feat: 新增等级与奖章系统
- 后端: 新增 user_levels, exp_history, badge_definitions, user_badges, level_configs 表
- 后端: 新增 LevelService 和 BadgeService 服务
- 后端: 新增等级/奖章/签到/排行榜 API 端点
- 后端: 考试/练习/陪练完成时触发经验值和奖章检查
- 前端: 新增 LevelBadge, ExpProgress, BadgeCard, LevelUpDialog 组件
- 前端: 新增排行榜页面
- 前端: 成长路径页面集成真实等级数据
- 数据库: 包含迁移脚本和初始数据
2026-01-29 16:19:22 +08:00

468 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
奖章服务
提供奖章管理功能:
- 获取奖章定义
- 检查奖章解锁条件
- 授予奖章
- 获取用户奖章
"""
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy import select, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import get_logger
from app.models.level import (
BadgeDefinition, UserBadge, UserLevel, ExpHistory, ExpType,
BadgeCategory, ConditionType
)
from app.models.exam import Exam
from app.models.practice import PracticeSession, PracticeReport
from app.models.training import TrainingSession, TrainingReport
from app.models.task import TaskAssignment
logger = get_logger(__name__)
class BadgeService:
"""奖章服务"""
def __init__(self, db: AsyncSession):
self.db = db
self._badge_definitions: Optional[List[BadgeDefinition]] = None
async def _get_badge_definitions(self) -> List[BadgeDefinition]:
"""获取所有奖章定义(带缓存)"""
if self._badge_definitions is None:
result = await self.db.execute(
select(BadgeDefinition)
.where(BadgeDefinition.is_active == True)
.order_by(BadgeDefinition.sort_order)
)
self._badge_definitions = list(result.scalars().all())
return self._badge_definitions
async def get_all_badges(self) -> List[Dict[str, Any]]:
"""
获取所有奖章定义
Returns:
奖章定义列表
"""
badges = await self._get_badge_definitions()
return [
{
"id": b.id,
"code": b.code,
"name": b.name,
"description": b.description,
"icon": b.icon,
"category": b.category,
"condition_type": b.condition_type,
"condition_value": b.condition_value,
"exp_reward": b.exp_reward,
}
for b in badges
]
async def get_user_badges(self, user_id: int) -> List[Dict[str, Any]]:
"""
获取用户已解锁的奖章
Args:
user_id: 用户ID
Returns:
用户奖章列表
"""
result = await self.db.execute(
select(UserBadge, BadgeDefinition)
.join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
.where(UserBadge.user_id == user_id)
.order_by(UserBadge.unlocked_at.desc())
)
rows = result.all()
return [
{
"id": user_badge.id,
"badge_id": badge.id,
"code": badge.code,
"name": badge.name,
"description": badge.description,
"icon": badge.icon,
"category": badge.category,
"unlocked_at": user_badge.unlocked_at.isoformat() if user_badge.unlocked_at else None,
"is_notified": user_badge.is_notified,
}
for user_badge, badge in rows
]
async def get_user_badges_with_status(self, user_id: int) -> List[Dict[str, Any]]:
"""
获取所有奖章及用户解锁状态
Args:
user_id: 用户ID
Returns:
所有奖章列表(含解锁状态)
"""
# 获取所有奖章定义
all_badges = await self._get_badge_definitions()
# 获取用户已解锁的奖章
result = await self.db.execute(
select(UserBadge).where(UserBadge.user_id == user_id)
)
user_badges = {ub.badge_id: ub for ub in result.scalars().all()}
badges_with_status = []
for badge in all_badges:
user_badge = user_badges.get(badge.id)
badges_with_status.append({
"id": badge.id,
"code": badge.code,
"name": badge.name,
"description": badge.description,
"icon": badge.icon,
"category": badge.category,
"condition_type": badge.condition_type,
"condition_value": badge.condition_value,
"exp_reward": badge.exp_reward,
"unlocked": user_badge is not None,
"unlocked_at": user_badge.unlocked_at.isoformat() if user_badge else None,
})
return badges_with_status
async def _get_user_stats(self, user_id: int) -> Dict[str, Any]:
"""
获取用户统计数据(用于检查奖章条件)
Args:
user_id: 用户ID
Returns:
统计数据字典
"""
stats = {
"login_count": 0,
"login_streak": 0,
"course_completed": 0,
"exam_passed": 0,
"exam_perfect_count": 0,
"exam_excellent": 0,
"practice_count": 0,
"practice_hours": 0,
"training_count": 0,
"first_practice_90": 0,
"user_level": 1,
}
# 获取用户等级信息
result = await self.db.execute(
select(UserLevel).where(UserLevel.user_id == user_id)
)
user_level = result.scalar_one_or_none()
if user_level:
stats["login_streak"] = user_level.login_streak
stats["user_level"] = user_level.level
# 获取登录次数(从经验值历史)
result = await self.db.execute(
select(func.count(ExpHistory.id))
.where(
ExpHistory.user_id == user_id,
ExpHistory.exp_type == ExpType.LOGIN
)
)
stats["login_count"] = result.scalar() or 0
# 获取考试统计
result = await self.db.execute(
select(
func.count(Exam.id),
func.sum(func.if_(Exam.score >= 100, 1, 0)),
func.sum(func.if_(Exam.score >= 90, 1, 0))
)
.where(
Exam.user_id == user_id,
Exam.is_passed == True,
Exam.status == "submitted"
)
)
row = result.first()
if row:
stats["exam_passed"] = row[0] or 0
stats["exam_perfect_count"] = int(row[1] or 0)
stats["exam_excellent"] = int(row[2] or 0)
# 获取练习统计
result = await self.db.execute(
select(
func.count(PracticeSession.id),
func.sum(PracticeSession.duration_seconds)
)
.where(
PracticeSession.user_id == user_id,
PracticeSession.status == "completed"
)
)
row = result.first()
if row:
stats["practice_count"] = row[0] or 0
total_seconds = row[1] or 0
stats["practice_hours"] = total_seconds / 3600
# 获取陪练统计
result = await self.db.execute(
select(func.count(TrainingSession.id))
.where(
TrainingSession.user_id == user_id,
TrainingSession.status == "COMPLETED"
)
)
stats["training_count"] = result.scalar() or 0
# 检查首次高分陪练
result = await self.db.execute(
select(func.count(TrainingReport.id))
.where(
TrainingReport.user_id == user_id,
TrainingReport.overall_score >= 90
)
)
stats["first_practice_90"] = 1 if (result.scalar() or 0) > 0 else 0
return stats
async def check_and_award_badges(self, user_id: int) -> List[Dict[str, Any]]:
"""
检查并授予用户符合条件的奖章
Args:
user_id: 用户ID
Returns:
新获得的奖章列表
"""
# 获取用户统计数据
stats = await self._get_user_stats(user_id)
# 获取所有奖章定义
all_badges = await self._get_badge_definitions()
# 获取用户已有的奖章
result = await self.db.execute(
select(UserBadge.badge_id).where(UserBadge.user_id == user_id)
)
owned_badge_ids = {row[0] for row in result.all()}
# 检查每个奖章的解锁条件
newly_awarded = []
for badge in all_badges:
if badge.id in owned_badge_ids:
continue
# 检查条件
condition_met = self._check_badge_condition(badge, stats)
if condition_met:
# 授予奖章
user_badge = UserBadge(
user_id=user_id,
badge_id=badge.id,
unlocked_at=datetime.now(),
is_notified=False
)
self.db.add(user_badge)
# 如果有经验奖励,添加经验值
if badge.exp_reward > 0:
from app.services.level_service import LevelService
level_service = LevelService(self.db)
await level_service.add_exp(
user_id=user_id,
exp_amount=badge.exp_reward,
exp_type=ExpType.BADGE,
description=f"解锁奖章「{badge.name}"
)
newly_awarded.append({
"badge_id": badge.id,
"code": badge.code,
"name": badge.name,
"description": badge.description,
"icon": badge.icon,
"exp_reward": badge.exp_reward,
})
logger.info(f"用户 {user_id} 解锁奖章: {badge.name}")
if newly_awarded:
await self.db.flush()
return newly_awarded
def _check_badge_condition(self, badge: BadgeDefinition, stats: Dict[str, Any]) -> bool:
"""
检查奖章解锁条件
Args:
badge: 奖章定义
stats: 用户统计数据
Returns:
是否满足条件
"""
condition_field = badge.condition_field
condition_value = badge.condition_value
condition_type = badge.condition_type
if not condition_field:
return False
current_value = stats.get(condition_field, 0)
if condition_type == ConditionType.COUNT:
return current_value >= condition_value
elif condition_type == ConditionType.SCORE:
return current_value >= condition_value
elif condition_type == ConditionType.STREAK:
return current_value >= condition_value
elif condition_type == ConditionType.LEVEL:
return current_value >= condition_value
elif condition_type == ConditionType.DURATION:
return current_value >= condition_value
return False
async def award_badge(self, user_id: int, badge_code: str) -> Optional[Dict[str, Any]]:
"""
直接授予用户奖章(用于特殊奖章)
Args:
user_id: 用户ID
badge_code: 奖章编码
Returns:
奖章信息(如果成功)
"""
# 获取奖章定义
result = await self.db.execute(
select(BadgeDefinition).where(BadgeDefinition.code == badge_code)
)
badge = result.scalar_one_or_none()
if not badge:
logger.warning(f"奖章不存在: {badge_code}")
return None
# 检查是否已拥有
result = await self.db.execute(
select(UserBadge).where(
UserBadge.user_id == user_id,
UserBadge.badge_id == badge.id
)
)
if result.scalar_one_or_none():
logger.info(f"用户 {user_id} 已拥有奖章: {badge_code}")
return None
# 授予奖章
user_badge = UserBadge(
user_id=user_id,
badge_id=badge.id,
unlocked_at=datetime.now(),
is_notified=False
)
self.db.add(user_badge)
# 添加经验值奖励
if badge.exp_reward > 0:
from app.services.level_service import LevelService
level_service = LevelService(self.db)
await level_service.add_exp(
user_id=user_id,
exp_amount=badge.exp_reward,
exp_type=ExpType.BADGE,
description=f"解锁奖章「{badge.name}"
)
await self.db.flush()
logger.info(f"用户 {user_id} 获得奖章: {badge.name}")
return {
"badge_id": badge.id,
"code": badge.code,
"name": badge.name,
"description": badge.description,
"icon": badge.icon,
"exp_reward": badge.exp_reward,
}
async def get_unnotified_badges(self, user_id: int) -> List[Dict[str, Any]]:
"""
获取用户未通知的新奖章
Args:
user_id: 用户ID
Returns:
未通知的奖章列表
"""
result = await self.db.execute(
select(UserBadge, BadgeDefinition)
.join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
.where(
UserBadge.user_id == user_id,
UserBadge.is_notified == False
)
.order_by(UserBadge.unlocked_at.desc())
)
rows = result.all()
return [
{
"user_badge_id": user_badge.id,
"badge_id": badge.id,
"code": badge.code,
"name": badge.name,
"description": badge.description,
"icon": badge.icon,
"exp_reward": badge.exp_reward,
"unlocked_at": user_badge.unlocked_at.isoformat() if user_badge.unlocked_at else None,
}
for user_badge, badge in rows
]
async def mark_badges_notified(self, user_id: int, badge_ids: List[int] = None):
"""
标记奖章为已通知
Args:
user_id: 用户ID
badge_ids: 要标记的奖章ID列表为空则标记全部
"""
from sqlalchemy import update
query = update(UserBadge).where(
UserBadge.user_id == user_id,
UserBadge.is_notified == False
)
if badge_ids:
query = query.where(UserBadge.badge_id.in_(badge_ids))
query = query.values(
is_notified=True,
notified_at=datetime.now()
)
await self.db.execute(query)
await self.db.flush()