""" 奖章服务 提供奖章管理功能: - 获取奖章定义 - 检查奖章解锁条件 - 授予奖章 - 获取用户奖章 """ from datetime import datetime from typing import Optional, List, Dict, Any, Tuple from sqlalchemy import select, func, and_, or_, case from sqlalchemy.ext.asyncio import AsyncSession from app.core.logger import get_logger from app.models.level import ( BadgeDefinition, UserBadge, UserLevel, ExpHistory, ExpType, BadgeCategory, ConditionType ) from app.models.exam import Exam from app.models.practice import PracticeSession, PracticeReport from app.models.training import TrainingSession, TrainingReport from app.models.task import TaskAssignment logger = get_logger(__name__) class BadgeService: """奖章服务""" def __init__(self, db: AsyncSession): self.db = db self._badge_definitions: Optional[List[BadgeDefinition]] = None async def _get_badge_definitions(self) -> List[BadgeDefinition]: """获取所有奖章定义(带缓存)""" if self._badge_definitions is None: result = await self.db.execute( select(BadgeDefinition) .where(BadgeDefinition.is_active == True) .order_by(BadgeDefinition.sort_order) ) self._badge_definitions = list(result.scalars().all()) return self._badge_definitions async def get_all_badges(self) -> List[Dict[str, Any]]: """ 获取所有奖章定义 Returns: 奖章定义列表 """ badges = await self._get_badge_definitions() return [ { "id": b.id, "code": b.code, "name": b.name, "description": b.description, "icon": b.icon, "category": b.category, "condition_type": b.condition_type, "condition_value": b.condition_value, "exp_reward": b.exp_reward, } for b in badges ] async def get_user_badges(self, user_id: int) -> List[Dict[str, Any]]: """ 获取用户已解锁的奖章 Args: user_id: 用户ID Returns: 用户奖章列表 """ result = await self.db.execute( select(UserBadge, BadgeDefinition) .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id) .where(UserBadge.user_id == user_id) .order_by(UserBadge.unlocked_at.desc()) ) rows = result.all() return [ { "id": user_badge.id, "badge_id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "category": badge.category, "unlocked_at": user_badge.unlocked_at.isoformat() if user_badge.unlocked_at else None, "is_notified": user_badge.is_notified, } for user_badge, badge in rows ] async def get_user_badges_with_status(self, user_id: int) -> List[Dict[str, Any]]: """ 获取所有奖章及用户解锁状态 Args: user_id: 用户ID Returns: 所有奖章列表(含解锁状态) """ # 获取所有奖章定义 all_badges = await self._get_badge_definitions() # 获取用户已解锁的奖章 result = await self.db.execute( select(UserBadge).where(UserBadge.user_id == user_id) ) user_badges = {ub.badge_id: ub for ub in result.scalars().all()} badges_with_status = [] for badge in all_badges: user_badge = user_badges.get(badge.id) badges_with_status.append({ "id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "category": badge.category, "condition_type": badge.condition_type, "condition_value": badge.condition_value, "exp_reward": badge.exp_reward, "unlocked": user_badge is not None, "unlocked_at": user_badge.unlocked_at.isoformat() if user_badge else None, }) return badges_with_status async def _get_user_stats(self, user_id: int) -> Dict[str, Any]: """ 获取用户统计数据(用于检查奖章条件) Args: user_id: 用户ID Returns: 统计数据字典 """ stats = { "login_count": 0, "login_streak": 0, "course_completed": 0, "exam_passed": 0, "exam_perfect_count": 0, "exam_excellent": 0, "practice_count": 0, "practice_hours": 0, "training_count": 0, "first_practice_90": 0, "user_level": 1, } try: # 获取用户等级信息 result = await self.db.execute( select(UserLevel).where(UserLevel.user_id == user_id) ) user_level = result.scalar_one_or_none() if user_level: stats["login_streak"] = user_level.login_streak or 0 stats["user_level"] = user_level.level or 1 # 获取登录/签到次数(从经验值历史) result = await self.db.execute( select(func.count(ExpHistory.id)) .where( ExpHistory.user_id == user_id, ExpHistory.exp_type == ExpType.LOGIN ) ) stats["login_count"] = result.scalar() or 0 # 获取考试统计 - 使用 case 语句 # 通过考试数量 result = await self.db.execute( select(func.count(Exam.id)) .where( Exam.user_id == user_id, Exam.is_passed == True, Exam.status == "submitted" ) ) stats["exam_passed"] = result.scalar() or 0 # 满分考试数量(score >= 总分,通常是 100) result = await self.db.execute( select(func.count(Exam.id)) .where( Exam.user_id == user_id, Exam.status == "submitted", Exam.score >= Exam.total_score ) ) stats["exam_perfect_count"] = result.scalar() or 0 # 优秀考试数量(90分以上) result = await self.db.execute( select(func.count(Exam.id)) .where( Exam.user_id == user_id, Exam.status == "submitted", Exam.score >= 90 ) ) stats["exam_excellent"] = result.scalar() or 0 # 获取练习统计(PracticeSession - AI 陪练) result = await self.db.execute( select( func.count(PracticeSession.id), func.coalesce(func.sum(PracticeSession.duration_seconds), 0) ) .where( PracticeSession.user_id == user_id, PracticeSession.status == "completed" ) ) row = result.first() if row: stats["practice_count"] = row[0] or 0 total_seconds = row[1] or 0 stats["practice_hours"] = float(total_seconds) / 3600.0 # 获取培训/陪练统计(TrainingSession) result = await self.db.execute( select(func.count(TrainingSession.id)) .where( TrainingSession.user_id == user_id, TrainingSession.status == "COMPLETED" ) ) stats["training_count"] = result.scalar() or 0 # 检查是否有高分陪练(90分以上) result = await self.db.execute( select(func.count(TrainingReport.id)) .where( TrainingReport.user_id == user_id, TrainingReport.overall_score >= 90 ) ) high_score_count = result.scalar() or 0 stats["first_practice_90"] = 1 if high_score_count > 0 else 0 logger.debug(f"用户 {user_id} 奖章统计数据: {stats}") except Exception as e: logger.error(f"获取用户统计数据失败: {e}") return stats async def check_and_award_badges(self, user_id: int) -> List[Dict[str, Any]]: """ 检查并授予用户符合条件的奖章 Args: user_id: 用户ID Returns: 新获得的奖章列表 """ # 获取用户统计数据 stats = await self._get_user_stats(user_id) # 获取所有奖章定义 all_badges = await self._get_badge_definitions() # 获取用户已有的奖章 result = await self.db.execute( select(UserBadge.badge_id).where(UserBadge.user_id == user_id) ) owned_badge_ids = {row[0] for row in result.all()} # 检查每个奖章的解锁条件 newly_awarded = [] for badge in all_badges: if badge.id in owned_badge_ids: continue # 检查条件 condition_met = self._check_badge_condition(badge, stats) if condition_met: # 授予奖章 user_badge = UserBadge( user_id=user_id, badge_id=badge.id, unlocked_at=datetime.now(), is_notified=False ) self.db.add(user_badge) # 如果有经验奖励,添加经验值 if badge.exp_reward > 0: from app.services.level_service import LevelService level_service = LevelService(self.db) await level_service.add_exp( user_id=user_id, exp_amount=badge.exp_reward, exp_type=ExpType.BADGE, description=f"解锁奖章「{badge.name}」" ) newly_awarded.append({ "badge_id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "exp_reward": badge.exp_reward, }) logger.info(f"用户 {user_id} 解锁奖章: {badge.name}") if newly_awarded: await self.db.flush() return newly_awarded def _check_badge_condition(self, badge: BadgeDefinition, stats: Dict[str, Any]) -> bool: """ 检查奖章解锁条件 Args: badge: 奖章定义 stats: 用户统计数据 Returns: 是否满足条件 """ condition_field = badge.condition_field condition_value = badge.condition_value condition_type = badge.condition_type if not condition_field: return False current_value = stats.get(condition_field, 0) if condition_type == ConditionType.COUNT: return current_value >= condition_value elif condition_type == ConditionType.SCORE: return current_value >= condition_value elif condition_type == ConditionType.STREAK: return current_value >= condition_value elif condition_type == ConditionType.LEVEL: return current_value >= condition_value elif condition_type == ConditionType.DURATION: return current_value >= condition_value return False async def award_badge(self, user_id: int, badge_code: str) -> Optional[Dict[str, Any]]: """ 直接授予用户奖章(用于特殊奖章) Args: user_id: 用户ID badge_code: 奖章编码 Returns: 奖章信息(如果成功) """ # 获取奖章定义 result = await self.db.execute( select(BadgeDefinition).where(BadgeDefinition.code == badge_code) ) badge = result.scalar_one_or_none() if not badge: logger.warning(f"奖章不存在: {badge_code}") return None # 检查是否已拥有 result = await self.db.execute( select(UserBadge).where( UserBadge.user_id == user_id, UserBadge.badge_id == badge.id ) ) if result.scalar_one_or_none(): logger.info(f"用户 {user_id} 已拥有奖章: {badge_code}") return None # 授予奖章 user_badge = UserBadge( user_id=user_id, badge_id=badge.id, unlocked_at=datetime.now(), is_notified=False ) self.db.add(user_badge) # 添加经验值奖励 if badge.exp_reward > 0: from app.services.level_service import LevelService level_service = LevelService(self.db) await level_service.add_exp( user_id=user_id, exp_amount=badge.exp_reward, exp_type=ExpType.BADGE, description=f"解锁奖章「{badge.name}」" ) await self.db.flush() logger.info(f"用户 {user_id} 获得奖章: {badge.name}") return { "badge_id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "exp_reward": badge.exp_reward, } async def get_unnotified_badges(self, user_id: int) -> List[Dict[str, Any]]: """ 获取用户未通知的新奖章 Args: user_id: 用户ID Returns: 未通知的奖章列表 """ result = await self.db.execute( select(UserBadge, BadgeDefinition) .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id) .where( UserBadge.user_id == user_id, UserBadge.is_notified == False ) .order_by(UserBadge.unlocked_at.desc()) ) rows = result.all() return [ { "user_badge_id": user_badge.id, "badge_id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "exp_reward": badge.exp_reward, "unlocked_at": user_badge.unlocked_at.isoformat() if user_badge.unlocked_at else None, } for user_badge, badge in rows ] async def mark_badges_notified(self, user_id: int, badge_ids: List[int] = None): """ 标记奖章为已通知 Args: user_id: 用户ID badge_ids: 要标记的奖章ID列表(为空则标记全部) """ from sqlalchemy import update query = update(UserBadge).where( UserBadge.user_id == user_id, UserBadge.is_notified == False ) if badge_ids: query = query.where(UserBadge.badge_id.in_(badge_ids)) query = query.values( is_notified=True, notified_at=datetime.now() ) await self.db.execute(query) await self.db.flush() async def check_badges_by_category( self, user_id: int, categories: List[str] ) -> List[Dict[str, Any]]: """ 按类别检查并授予奖章(优化触发时机) Args: user_id: 用户ID categories: 要检查的奖章类别列表 Returns: 新获得的奖章列表 """ # 获取用户统计数据 stats = await self._get_user_stats(user_id) # 获取指定类别的奖章定义 result = await self.db.execute( select(BadgeDefinition) .where( BadgeDefinition.is_active == True, BadgeDefinition.category.in_(categories) ) .order_by(BadgeDefinition.sort_order) ) category_badges = list(result.scalars().all()) # 获取用户已有的奖章 result = await self.db.execute( select(UserBadge.badge_id).where(UserBadge.user_id == user_id) ) owned_badge_ids = {row[0] for row in result.all()} # 检查每个奖章的解锁条件 newly_awarded = [] for badge in category_badges: if badge.id in owned_badge_ids: continue # 检查条件 condition_met = self._check_badge_condition(badge, stats) if condition_met: # 授予奖章 user_badge = UserBadge( user_id=user_id, badge_id=badge.id, unlocked_at=datetime.now(), is_notified=False ) self.db.add(user_badge) # 如果有经验奖励,添加经验值 if badge.exp_reward > 0: from app.services.level_service import LevelService level_service = LevelService(self.db) await level_service.add_exp( user_id=user_id, exp_amount=badge.exp_reward, exp_type=ExpType.BADGE, description=f"解锁奖章「{badge.name}」" ) newly_awarded.append({ "badge_id": badge.id, "code": badge.code, "name": badge.name, "description": badge.description, "icon": badge.icon, "exp_reward": badge.exp_reward, }) logger.info(f"用户 {user_id} 解锁奖章: {badge.name}") if newly_awarded: await self.db.flush() return newly_awarded async def check_exam_badges(self, user_id: int) -> List[Dict[str, Any]]: """考试后检查考试类奖章""" return await self.check_badges_by_category(user_id, [BadgeCategory.EXAM]) async def check_practice_badges(self, user_id: int) -> List[Dict[str, Any]]: """练习后检查练习类奖章""" return await self.check_badges_by_category(user_id, [BadgeCategory.PRACTICE]) async def check_streak_badges(self, user_id: int) -> List[Dict[str, Any]]: """签到后检查连续打卡类奖章""" return await self.check_badges_by_category(user_id, [BadgeCategory.STREAK, BadgeCategory.LEARNING]) async def check_level_badges(self, user_id: int) -> List[Dict[str, Any]]: """等级变化后检查等级类奖章""" return await self.check_badges_by_category(user_id, [BadgeCategory.SPECIAL])