Files
012-kaopeilian/backend/app/services/badge_service.py
yuliang_guo 64f5d567fa
Some checks failed
continuous-integration/drone/push Build is failing
feat: 实现 KPL 系统功能改进计划
1. 课程学习进度追踪
   - 新增 UserCourseProgress 和 UserMaterialProgress 模型
   - 新增 /api/v1/progress/* 进度追踪 API
   - 更新 admin.py 使用真实课程完成率数据

2. 路由权限检查完善
   - 新增前端 permissionChecker.ts 权限检查工具
   - 更新 router/guard.ts 实现团队和课程权限验证
   - 新增后端 permission_service.py

3. AI 陪练音频转文本
   - 新增 speech_recognition.py 语音识别服务
   - 新增 /api/v1/speech/* API
   - 更新 ai-practice-coze.vue 支持语音输入

4. 双人对练报告生成
   - 更新 practice_room_service.py 添加报告生成功能
   - 新增 /rooms/{room_code}/report API
   - 更新 duo-practice-report.vue 调用真实 API

5. 学习提醒推送
   - 新增 notification_service.py 通知服务
   - 新增 scheduler_service.py 定时任务服务
   - 支持钉钉、企微、站内消息推送

6. 智能学习推荐
   - 新增 recommendation_service.py 推荐服务
   - 新增 /api/v1/recommendations/* API
   - 支持错题、能力、进度、热门多维度推荐

7. 安全问题修复
   - DEBUG 默认值改为 False
   - 添加 SECRET_KEY 安全警告
   - 新增 check_security_settings() 检查函数

8. 证书 PDF 生成
   - 更新 certificate_service.py 添加 PDF 生成
   - 添加 weasyprint、Pillow、qrcode 依赖
   - 更新下载 API 支持 PDF 和 PNG 格式
2026-01-30 14:22:35 +08:00

587 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
奖章服务
提供奖章管理功能:
- 获取奖章定义
- 检查奖章解锁条件
- 授予奖章
- 获取用户奖章
"""
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy import select, func, and_, or_, case
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logger import get_logger
from app.models.level import (
BadgeDefinition, UserBadge, UserLevel, ExpHistory, ExpType,
BadgeCategory, ConditionType
)
from app.models.exam import Exam
from app.models.practice import PracticeSession, PracticeReport
from app.models.training import TrainingSession, TrainingReport
from app.models.task import TaskAssignment
# Module-level logger, named after this module via the project's get_logger helper.
logger = get_logger(__name__)
class BadgeService:
    """Badge service.

    Lists badge definitions, evaluates unlock conditions against per-user
    statistics, grants badges (with an optional EXP reward) and tracks which
    unlocked badges have already been shown to the user.

    Writes are only flushed to the session (``flush()``); this class never
    commits, so the caller owns the transaction.
    """

    def __init__(self, db: AsyncSession):
        """Create the service.

        Args:
            db: Async SQLAlchemy session used for every query and write.
        """
        self.db = db
        # Per-instance cache of active badge definitions; None means "not loaded yet".
        self._badge_definitions: Optional[List[BadgeDefinition]] = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _iso_or_none(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or None when the timestamp is missing."""
        return value.isoformat() if value else None

    @staticmethod
    def _definition_payload(badge: BadgeDefinition) -> Dict[str, Any]:
        """Serialize a badge definition into the dict returned by the listing APIs."""
        return {
            "id": badge.id,
            "code": badge.code,
            "name": badge.name,
            "description": badge.description,
            "icon": badge.icon,
            "category": badge.category,
            "condition_type": badge.condition_type,
            "condition_value": badge.condition_value,
            "exp_reward": badge.exp_reward,
        }

    @staticmethod
    def _award_payload(badge: BadgeDefinition) -> Dict[str, Any]:
        """Serialize a freshly granted badge into the dict returned to callers."""
        return {
            "badge_id": badge.id,
            "code": badge.code,
            "name": badge.name,
            "description": badge.description,
            "icon": badge.icon,
            "exp_reward": badge.exp_reward,
        }

    async def _get_badge_definitions(self) -> List[BadgeDefinition]:
        """Return all active badge definitions ordered by ``sort_order``.

        The result is loaded once and cached on this service instance.
        """
        if self._badge_definitions is None:
            result = await self.db.execute(
                select(BadgeDefinition)
                # `== True` is intentional: it builds a SQL expression.
                .where(BadgeDefinition.is_active == True)  # noqa: E712
                .order_by(BadgeDefinition.sort_order)
            )
            self._badge_definitions = list(result.scalars().all())
        return self._badge_definitions

    async def _grant_badge(self, user_id: int, badge: BadgeDefinition) -> None:
        """Add a ``UserBadge`` row for the user and credit the badge's EXP reward.

        The new row is added to the session but not flushed here; callers
        flush once they are done granting.

        Args:
            user_id: User ID.
            badge: Definition of the badge being granted.
        """
        self.db.add(
            UserBadge(
                user_id=user_id,
                badge_id=badge.id,
                unlocked_at=datetime.now(),
                is_notified=False,
            )
        )

        # A missing (None) reward is treated the same as no reward.
        if (badge.exp_reward or 0) > 0:
            # Imported lazily, as in the original code (presumably to avoid a
            # circular import between the two services - TODO confirm).
            from app.services.level_service import LevelService

            await LevelService(self.db).add_exp(
                user_id=user_id,
                exp_amount=badge.exp_reward,
                exp_type=ExpType.BADGE,
                description=f"解锁奖章「{badge.name}」",
            )

    async def _award_eligible(
        self,
        user_id: int,
        candidates: List[BadgeDefinition],
        stats: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Grant every candidate badge the user does not own yet and qualifies for.

        Args:
            user_id: User ID.
            candidates: Badge definitions to evaluate, in evaluation order.
            stats: Statistics dict produced by ``_get_user_stats``.

        Returns:
            Payloads of the badges granted by this call (possibly empty).
        """
        result = await self.db.execute(
            select(UserBadge.badge_id).where(UserBadge.user_id == user_id)
        )
        owned_badge_ids = {row[0] for row in result.all()}

        newly_awarded: List[Dict[str, Any]] = []
        for badge in candidates:
            if badge.id in owned_badge_ids:
                continue
            if not self._check_badge_condition(badge, stats):
                continue

            await self._grant_badge(user_id, badge)
            newly_awarded.append(self._award_payload(badge))
            logger.info(f"用户 {user_id} 解锁奖章: {badge.name}")

        # Flush once, and only when something was actually added.
        if newly_awarded:
            await self.db.flush()
        return newly_awarded

    # ------------------------------------------------------------------
    # Read APIs
    # ------------------------------------------------------------------

    async def get_all_badges(self) -> List[Dict[str, Any]]:
        """Return every active badge definition.

        Returns:
            List of badge definition dicts.
        """
        badges = await self._get_badge_definitions()
        return [self._definition_payload(badge) for badge in badges]

    async def get_user_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Return the badges a user has unlocked, most recently unlocked first.

        Args:
            user_id: User ID.

        Returns:
            List of unlocked-badge dicts.
        """
        result = await self.db.execute(
            select(UserBadge, BadgeDefinition)
            .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
            .where(UserBadge.user_id == user_id)
            .order_by(UserBadge.unlocked_at.desc())
        )
        return [
            {
                "id": user_badge.id,
                "badge_id": badge.id,
                "code": badge.code,
                "name": badge.name,
                "description": badge.description,
                "icon": badge.icon,
                "category": badge.category,
                "unlocked_at": self._iso_or_none(user_badge.unlocked_at),
                "is_notified": user_badge.is_notified,
            }
            for user_badge, badge in result.all()
        ]

    async def get_user_badges_with_status(self, user_id: int) -> List[Dict[str, Any]]:
        """Return every active badge together with the user's unlock status.

        Args:
            user_id: User ID.

        Returns:
            One dict per active badge with extra ``unlocked`` and
            ``unlocked_at`` keys.
        """
        all_badges = await self._get_badge_definitions()

        result = await self.db.execute(
            select(UserBadge).where(UserBadge.user_id == user_id)
        )
        unlocked_by_badge_id = {ub.badge_id: ub for ub in result.scalars().all()}

        badges_with_status = []
        for badge in all_badges:
            user_badge = unlocked_by_badge_id.get(badge.id)
            entry = self._definition_payload(badge)
            entry["unlocked"] = user_badge is not None
            # Guard on the timestamp too: a row without unlocked_at must not crash.
            entry["unlocked_at"] = (
                self._iso_or_none(user_badge.unlocked_at) if user_badge else None
            )
            badges_with_status.append(entry)
        return badges_with_status

    async def get_unnotified_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Return the user's unlocked badges that are not yet marked as notified.

        Args:
            user_id: User ID.

        Returns:
            List of badge dicts, most recently unlocked first.
        """
        result = await self.db.execute(
            select(UserBadge, BadgeDefinition)
            .join(BadgeDefinition, UserBadge.badge_id == BadgeDefinition.id)
            .where(
                UserBadge.user_id == user_id,
                UserBadge.is_notified == False,  # noqa: E712 - SQL expression
            )
            .order_by(UserBadge.unlocked_at.desc())
        )
        return [
            {
                "user_badge_id": user_badge.id,
                "badge_id": badge.id,
                "code": badge.code,
                "name": badge.name,
                "description": badge.description,
                "icon": badge.icon,
                "exp_reward": badge.exp_reward,
                "unlocked_at": self._iso_or_none(user_badge.unlocked_at),
            }
            for user_badge, badge in result.all()
        ]

    # ------------------------------------------------------------------
    # Statistics and condition evaluation
    # ------------------------------------------------------------------

    async def _get_user_stats(self, user_id: int) -> Dict[str, Any]:
        """Collect the per-user statistics that badge conditions are checked against.

        This is best-effort: any exception is logged and the statistics
        gathered so far (defaults for the rest) are returned.

        Args:
            user_id: User ID.

        Returns:
            Dict keyed by the names used in ``BadgeDefinition.condition_field``.
        """
        stats = {
            "login_count": 0,
            "login_streak": 0,
            "course_completed": 0,
            "exam_passed": 0,
            "exam_perfect_count": 0,
            "exam_excellent": 0,
            "practice_count": 0,
            "practice_hours": 0,
            "training_count": 0,
            "first_practice_90": 0,
            "user_level": 1,
        }

        try:
            # Level record: login streak and current level.
            result = await self.db.execute(
                select(UserLevel).where(UserLevel.user_id == user_id)
            )
            user_level = result.scalar_one_or_none()
            if user_level:
                stats["login_streak"] = user_level.login_streak or 0
                stats["user_level"] = user_level.level or 1

            # Login/check-in count, taken from LOGIN entries in the EXP history.
            result = await self.db.execute(
                select(func.count(ExpHistory.id))
                .where(
                    ExpHistory.user_id == user_id,
                    ExpHistory.exp_type == ExpType.LOGIN,
                )
            )
            stats["login_count"] = result.scalar() or 0

            # Submitted exams that were passed.
            result = await self.db.execute(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.is_passed == True,  # noqa: E712 - SQL expression
                    Exam.status == "submitted",
                )
            )
            stats["exam_passed"] = result.scalar() or 0

            # Submitted exams with a full score (score >= total_score).
            result = await self.db.execute(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.status == "submitted",
                    Exam.score >= Exam.total_score,
                )
            )
            stats["exam_perfect_count"] = result.scalar() or 0

            # Submitted exams scoring 90 or above.
            result = await self.db.execute(
                select(func.count(Exam.id))
                .where(
                    Exam.user_id == user_id,
                    Exam.status == "submitted",
                    Exam.score >= 90,
                )
            )
            stats["exam_excellent"] = result.scalar() or 0

            # Completed practice sessions: how many, and their total duration.
            result = await self.db.execute(
                select(
                    func.count(PracticeSession.id),
                    func.coalesce(func.sum(PracticeSession.duration_seconds), 0),
                )
                .where(
                    PracticeSession.user_id == user_id,
                    PracticeSession.status == "completed",
                )
            )
            row = result.first()
            if row:
                stats["practice_count"] = row[0] or 0
                total_seconds = row[1] or 0
                stats["practice_hours"] = float(total_seconds) / 3600.0

            # Completed training sessions (note the upper-case status literal).
            result = await self.db.execute(
                select(func.count(TrainingSession.id))
                .where(
                    TrainingSession.user_id == user_id,
                    TrainingSession.status == "COMPLETED",
                )
            )
            stats["training_count"] = result.scalar() or 0

            # Flag (0/1): at least one training report scored 90 or above.
            result = await self.db.execute(
                select(func.count(TrainingReport.id))
                .where(
                    TrainingReport.user_id == user_id,
                    TrainingReport.overall_score >= 90,
                )
            )
            high_score_count = result.scalar() or 0
            stats["first_practice_90"] = 1 if high_score_count > 0 else 0

            logger.debug(f"用户 {user_id} 奖章统计数据: {stats}")
        except Exception as e:
            # Deliberate best-effort: badge checks must not break the caller's flow.
            logger.error(f"获取用户统计数据失败: {e}")

        return stats

    def _check_badge_condition(self, badge: BadgeDefinition, stats: Dict[str, Any]) -> bool:
        """Tell whether the user's statistics satisfy a badge's unlock condition.

        Every supported condition type is a "reach at least N" threshold on the
        statistic named by ``badge.condition_field``.

        Args:
            badge: Badge definition.
            stats: Statistics dict produced by ``_get_user_stats``.

        Returns:
            True when the condition is met. False when it is not, or when the
            badge has no condition field, an unsupported condition type, or no
            threshold value.
        """
        condition_field = badge.condition_field
        if not condition_field:
            return False

        threshold_types = (
            ConditionType.COUNT,
            ConditionType.SCORE,
            ConditionType.STREAK,
            ConditionType.LEVEL,
            ConditionType.DURATION,
        )
        if badge.condition_type not in threshold_types:
            return False

        threshold = badge.condition_value
        if threshold is None:
            # A definition without a threshold can never be satisfied.
            return False

        return stats.get(condition_field, 0) >= threshold

    # ------------------------------------------------------------------
    # Granting
    # ------------------------------------------------------------------

    async def check_and_award_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check all active badges and grant the ones the user now qualifies for.

        Args:
            user_id: User ID.

        Returns:
            List of newly granted badges.
        """
        stats = await self._get_user_stats(user_id)
        all_badges = await self._get_badge_definitions()
        return await self._award_eligible(user_id, all_badges, stats)

    async def award_badge(self, user_id: int, badge_code: str) -> Optional[Dict[str, Any]]:
        """Grant a badge directly, without checking its condition (for special badges).

        Args:
            user_id: User ID.
            badge_code: Badge code.

        Returns:
            The badge info when granted; None when the code is unknown or the
            user already owns the badge.
        """
        result = await self.db.execute(
            select(BadgeDefinition).where(BadgeDefinition.code == badge_code)
        )
        badge = result.scalar_one_or_none()
        if not badge:
            logger.warning(f"奖章不存在: {badge_code}")
            return None

        result = await self.db.execute(
            select(UserBadge).where(
                UserBadge.user_id == user_id,
                UserBadge.badge_id == badge.id,
            )
        )
        if result.scalar_one_or_none():
            logger.info(f"用户 {user_id} 已拥有奖章: {badge_code}")
            return None

        await self._grant_badge(user_id, badge)
        await self.db.flush()

        logger.info(f"用户 {user_id} 获得奖章: {badge.name}")
        return self._award_payload(badge)

    async def mark_badges_notified(self, user_id: int, badge_ids: Optional[List[int]] = None):
        """Mark the user's un-notified badges as notified.

        Args:
            user_id: User ID.
            badge_ids: Badge IDs to mark; when None or empty, every un-notified
                badge of the user is marked.
        """
        from sqlalchemy import update

        query = update(UserBadge).where(
            UserBadge.user_id == user_id,
            UserBadge.is_notified == False,  # noqa: E712 - SQL expression
        )
        if badge_ids:
            query = query.where(UserBadge.badge_id.in_(badge_ids))

        query = query.values(
            is_notified=True,
            notified_at=datetime.now(),
        )
        await self.db.execute(query)
        await self.db.flush()

    async def check_badges_by_category(
        self,
        user_id: int,
        categories: List[str]
    ) -> List[Dict[str, Any]]:
        """Check and grant only the active badges in the given categories.

        Lets callers run a narrower check right after a specific event instead
        of evaluating every badge.

        Args:
            user_id: User ID.
            categories: Badge categories to check.

        Returns:
            List of newly granted badges.
        """
        stats = await self._get_user_stats(user_id)

        result = await self.db.execute(
            select(BadgeDefinition)
            .where(
                BadgeDefinition.is_active == True,  # noqa: E712 - SQL expression
                BadgeDefinition.category.in_(categories),
            )
            .order_by(BadgeDefinition.sort_order)
        )
        category_badges = list(result.scalars().all())

        return await self._award_eligible(user_id, category_badges, stats)

    async def check_exam_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check EXAM-category badges (intended to be called after an exam)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.EXAM])

    async def check_practice_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check PRACTICE-category badges (intended to be called after a practice)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.PRACTICE])

    async def check_streak_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check STREAK- and LEARNING-category badges (intended to be called after a check-in)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.STREAK, BadgeCategory.LEARNING])

    async def check_level_badges(self, user_id: int) -> List[Dict[str, Any]]:
        """Check SPECIAL-category badges (intended to be called after a level change)."""
        return await self.check_badges_by_category(user_id, [BadgeCategory.SPECIAL])