diff --git a/CHANGELOG-2026-01-29.md b/CHANGELOG-2026-01-29.md new file mode 100644 index 0000000..13374dd --- /dev/null +++ b/CHANGELOG-2026-01-29.md @@ -0,0 +1,188 @@ +# KPL 考培练系统 功能迭代更新日志 + +**日期**: 2026-01-29 +**版本**: v1.5.0 + +--- + +## 一、奖章条件优化 + +### 修复内容 +- 修复 `badge_service.py` 中统计查询的 SQL 语法问题 +- 将非标准 `func.if_` 替换为 SQLAlchemy 标准 `case` 语句 +- 优化考试统计逻辑:通过数、满分数、优秀数分开查询 +- 添加 `func.coalesce` 处理空值 + +### 新增功能 +- `check_badges_by_category()` - 按类别检查奖章 +- `check_exam_badges()` - 考试后触发 +- `check_practice_badges()` - 练习后触发 +- `check_streak_badges()` - 签到后触发 +- `check_level_badges()` - 等级变化后触发 + +### 文件变更 +- `backend/app/services/badge_service.py` + +--- + +## 二、移动端适配 + +### 适配页面 +| 页面 | 文件 | 适配要点 | +|------|------|----------| +| 登录页 | `views/login/index.vue` | 表单全宽、背景动画隐藏、钉钉安全区域 | +| 课程中心 | `views/trainee/course-center.vue` | 单列卡片、分类横向滚动、操作按钮网格化 | +| 课程详情 | `views/trainee/course-detail.vue` | 侧边栏折叠、视频自适应、工具栏响应式 | +| 考试页面 | `views/exam/practice.vue` | 选项垂直排列、按钮放大、弹窗全屏 | +| 成长路径 | `views/trainee/growth-path.vue` | 雷达图缩放、卡片堆叠、统计区折叠 | +| 排行榜 | `views/trainee/leaderboard.vue` | 列表简化、签到按钮全宽 | + +### 技术方案 +- 使用 `@media (max-width: 768px)` 和 `@media (max-width: 480px)` 断点 +- 钉钉环境使用 `env(safe-area-inset-*)` 适配安全区域 + +--- + +## 三、证书系统 + +### 数据库设计 +```sql +-- 证书模板表 +CREATE TABLE certificate_templates ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(100) NOT NULL, + type ENUM('course', 'exam', 'achievement') NOT NULL, + background_url VARCHAR(500), + template_html TEXT, + template_style TEXT, + is_active BOOLEAN DEFAULT TRUE, + ... +); + +-- 用户证书表 +CREATE TABLE user_certificates ( + id INT PRIMARY KEY AUTO_INCREMENT, + user_id INT NOT NULL, + template_id INT NOT NULL, + certificate_no VARCHAR(50) UNIQUE NOT NULL, + title VARCHAR(200) NOT NULL, + ... +); +``` + +### 后端实现 +- **模型**: `CertificateTemplate`, `UserCertificate`, `CertificateType` +- **服务**: `CertificateService` + - `issue_course_certificate()` - 颁发课程证书 + - `issue_exam_certificate()` - 颁发考试证书 + - `issue_achievement_certificate()` - 颁发成就证书 + - `generate_certificate_image()` - 生成分享图片 + - `get_certificate_by_no()` - 验证证书 + +### API 端点 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/certificates/me` | 我的证书列表 | +| GET | `/certificates/{id}` | 证书详情 | +| GET | `/certificates/{id}/image` | 获取分享图片 | +| GET | `/certificates/{id}/download` | 下载证书 | +| GET | `/certificates/verify/{no}` | 验证证书(无需登录) | +| POST | `/certificates/issue/course` | 颁发课程证书 | +| POST | `/certificates/issue/exam` | 颁发考试证书 | + +### 前端实现 +- **API**: `frontend/src/api/certificate.ts` +- **页面**: `frontend/src/views/trainee/my-certificates.vue` +- **功能**: 证书列表、分类筛选、预览、分享、下载 + +### 文件变更 +- `backend/migrations/add_certificate_system.sql` (新增) +- `backend/app/models/certificate.py` (新增) +- `backend/app/services/certificate_service.py` (新增) +- `backend/app/api/v1/endpoints/certificate.py` (新增) +- `backend/app/models/__init__.py` (修改) +- `backend/app/api/v1/__init__.py` (修改) +- `frontend/src/api/certificate.ts` (新增) +- `frontend/src/views/trainee/my-certificates.vue` (新增) +- `frontend/src/router/index.ts` (修改) + +--- + +## 四、数据大屏 + +### 数据指标 +| 类别 | 指标 | +|------|------| +| 概览 | 总学员数、今日活跃、周活跃、月活跃、总学习时长、签到率 | +| 考试 | 总次数、通过率、平均分、满分人数 | +| 部门 | 成员数、通过率、平均学习时长、平均等级 | +| 趋势 | 近7天活跃用户、学习时长、考试次数 | +| 分布 | 1-10级用户数量分布 | +| 动态 | 最新学习活动实时滚动 | + +### 后端实现 +- **服务**: `DashboardService` + - `get_enterprise_overview()` - 企业级概览 + - `get_department_comparison()` - 部门对比 + - `get_learning_trend()` - 学习趋势 + - `get_level_distribution()` - 等级分布 + - `get_realtime_activities()` - 实时动态 + - `get_team_dashboard()` - 团队级数据 + - `get_course_ranking()` - 课程热度排行 + +### API 端点 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/dashboard/enterprise/overview` | 企业概览 | +| GET | `/dashboard/enterprise/departments` | 部门对比 | +| GET | `/dashboard/enterprise/trend` | 学习趋势 | +| GET | `/dashboard/enterprise/level-distribution` | 等级分布 | +| GET | `/dashboard/enterprise/activities` | 实时动态 | +| GET | `/dashboard/enterprise/course-ranking` | 课程排行 | +| GET | `/dashboard/team` | 团队数据 | +| GET | `/dashboard/all` | 完整数据(一次性) | + +### 前端实现 +- **API**: `frontend/src/api/dashboard.ts` +- **页面**: `frontend/src/views/admin/data-dashboard.vue` +- **图表**: ECharts (横向柱状图、折线图、仪表盘、饼图) +- **功能**: 全屏模式、5分钟自动刷新、响应式布局 + +### 文件变更 +- `backend/app/services/dashboard_service.py` (新增) +- `backend/app/api/v1/endpoints/dashboard.py` (新增) +- `backend/app/api/v1/__init__.py` (修改) +- `frontend/src/api/dashboard.ts` (新增) +- `frontend/src/views/admin/data-dashboard.vue` (新增) +- `frontend/src/router/index.ts` (修改) + +--- + +## 部署说明 + +### 数据库迁移 +需执行以下 SQL 脚本: +```bash +# 证书系统迁移 +mysql -u root -p kaopeilian < backend/migrations/add_certificate_system.sql +``` + +### 依赖安装 +后端新增依赖(用于证书图片生成): +```bash +pip install Pillow qrcode +``` + +### 路由变更 +新增前端路由: +- `/trainee/my-certificates` - 我的证书 +- `/manager/data-dashboard` - 数据大屏 + +--- + +## 待办事项 + +- [ ] 证书 PDF 生成(需安装 weasyprint) +- [ ] 课程完成进度追踪(user_course_progress 表) +- [ ] 数据大屏数据缓存优化 +- [ ] 钉钉环境下底部导航适配 diff --git a/backend/app/api/v1/__init__.py b/backend/app/api/v1/__init__.py index c33087e..8a32741 100644 --- a/backend/app/api/v1/__init__.py +++ b/backend/app/api/v1/__init__.py @@ -110,5 +110,11 @@ api_router.include_router(system_settings_router, prefix="/settings", tags=["sys # level_router 等级与奖章路由 from .endpoints.level import router as level_router api_router.include_router(level_router, prefix="/level", tags=["level"]) +# certificate_router 证书管理路由 +from .endpoints.certificate import router as certificate_router +api_router.include_router(certificate_router, prefix="/certificates", tags=["certificates"]) +# dashboard_router 数据大屏路由 +from .endpoints.dashboard import router as dashboard_router +api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"]) __all__ = ["api_router"] diff --git a/backend/app/api/v1/endpoints/certificate.py b/backend/app/api/v1/endpoints/certificate.py new file mode 100644 index 0000000..616ed2e --- /dev/null +++ b/backend/app/api/v1/endpoints/certificate.py @@ -0,0 +1,305 @@ +""" +证书管理 API 端点 + +提供证书相关的 RESTful API: +- 获取证书列表 +- 获取证书详情 +- 下载证书 +- 验证证书 +""" + +from typing import Optional, List +from fastapi import APIRouter, Depends, HTTPException, status, Response, Query +from fastapi.responses import StreamingResponse +from sqlalchemy.ext.asyncio import AsyncSession +import io + +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User +from app.services.certificate_service import CertificateService + +router = APIRouter() + + +@router.get("/templates") +async def get_certificate_templates( + cert_type: Optional[str] = Query(None, description="证书类型: course/exam/achievement"), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """获取证书模板列表""" + service = CertificateService(db) + templates = await service.get_templates(cert_type) + return { + "code": 200, + "message": "success", + "data": templates + } + + +@router.get("/me") +async def get_my_certificates( + cert_type: Optional[str] = Query(None, description="证书类型过滤"), + offset: int = Query(0, ge=0), + limit: int = Query(20, ge=1, le=100), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """获取当前用户的证书列表""" + service = CertificateService(db) + result = await service.get_user_certificates( + user_id=current_user.id, + cert_type=cert_type, + offset=offset, + limit=limit + ) + return { + "code": 200, + "message": "success", + "data": result + } + + +@router.get("/user/{user_id}") +async def get_user_certificates( + user_id: int, + cert_type: Optional[str] = Query(None), + offset: int = Query(0, ge=0), + limit: int = Query(20, ge=1, le=100), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """获取指定用户的证书列表(需要管理员权限)""" + # 只允许查看自己的证书或管理员查看 + if current_user.id != user_id and current_user.role not in ["admin", "enterprise_admin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="无权查看其他用户的证书" + ) + + service = CertificateService(db) + result = await service.get_user_certificates( + user_id=user_id, + cert_type=cert_type, + offset=offset, + limit=limit + ) + return { + "code": 200, + "message": "success", + "data": result + } + + +@router.get("/{cert_id}") +async def get_certificate_detail( + cert_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """获取证书详情""" + service = CertificateService(db) + cert = await service.get_certificate_by_id(cert_id) + + if not cert: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="证书不存在" + ) + + return { + "code": 200, + "message": "success", + "data": cert + } + + +@router.get("/{cert_id}/image") +async def get_certificate_image( + cert_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """获取证书分享图片""" + service = CertificateService(db) + + try: + # 获取基础URL + base_url = "https://kpl.example.com/certificates" # 可从配置读取 + + image_bytes = await service.generate_certificate_image(cert_id, base_url) + + return StreamingResponse( + io.BytesIO(image_bytes), + media_type="image/png", + headers={ + "Content-Disposition": f"inline; filename=certificate_{cert_id}.png" + } + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e) + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"生成证书图片失败: {str(e)}" + ) + + +@router.get("/{cert_id}/download") +async def download_certificate_pdf( + cert_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """下载证书PDF""" + service = CertificateService(db) + cert = await service.get_certificate_by_id(cert_id) + + if not cert: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="证书不存在" + ) + + # 如果已有PDF URL则重定向 + if cert.get("pdf_url"): + return { + "code": 200, + "message": "success", + "data": { + "download_url": cert["pdf_url"] + } + } + + # 否则返回图片作为替代 + try: + base_url = "https://kpl.example.com/certificates" + image_bytes = await service.generate_certificate_image(cert_id, base_url) + + return StreamingResponse( + io.BytesIO(image_bytes), + media_type="image/png", + headers={ + "Content-Disposition": f"attachment; filename=certificate_{cert['certificate_no']}.png" + } + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"下载失败: {str(e)}" + ) + + +@router.get("/verify/{cert_no}") +async def verify_certificate( + cert_no: str, + db: AsyncSession = Depends(get_db) +): + """ + 验证证书真伪 + + 此接口无需登录,可用于公开验证证书 + """ + service = CertificateService(db) + cert = await service.get_certificate_by_no(cert_no) + + if not cert: + return { + "code": 404, + "message": "证书不存在或编号错误", + "data": { + "valid": False, + "certificate_no": cert_no + } + } + + return { + "code": 200, + "message": "证书验证通过", + "data": { + "valid": True, + "certificate_no": cert_no, + "title": cert.get("title"), + "type_name": cert.get("type_name"), + "issued_at": cert.get("issued_at"), + "user": cert.get("user", {}), + } + } + + +@router.post("/issue/course") +async def issue_course_certificate( + course_id: int, + course_name: str, + completion_rate: float = 100.0, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 颁发课程结业证书 + + 通常由系统在用户完成课程时自动调用 + """ + service = CertificateService(db) + + try: + cert = await service.issue_course_certificate( + user_id=current_user.id, + course_id=course_id, + course_name=course_name, + completion_rate=completion_rate, + user_name=current_user.full_name or current_user.username + ) + await db.commit() + + return { + "code": 200, + "message": "证书颁发成功", + "data": cert + } + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + + +@router.post("/issue/exam") +async def issue_exam_certificate( + exam_id: int, + exam_name: str, + score: float, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 颁发考试合格证书 + + 通常由系统在用户考试通过时自动调用 + """ + service = CertificateService(db) + + try: + cert = await service.issue_exam_certificate( + user_id=current_user.id, + exam_id=exam_id, + exam_name=exam_name, + score=score, + user_name=current_user.full_name or current_user.username + ) + await db.commit() + + return { + "code": 200, + "message": "证书颁发成功", + "data": cert + } + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) diff --git a/backend/app/api/v1/endpoints/dashboard.py b/backend/app/api/v1/endpoints/dashboard.py new file mode 100644 index 0000000..812887a --- /dev/null +++ b/backend/app/api/v1/endpoints/dashboard.py @@ -0,0 +1,231 @@ +""" +数据大屏 API 端点 + +提供企业级和团队级数据大屏接口 +""" + +from typing import Optional +from fastapi import APIRouter, Depends, HTTPException, status, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User +from app.services.dashboard_service import DashboardService + +router = APIRouter() + + +@router.get("/enterprise/overview") +async def get_enterprise_overview( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取企业级数据概览 + + 需要管理员或企业管理员权限 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_enterprise_overview() + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/enterprise/departments") +async def get_department_comparison( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取部门/团队学习对比数据 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_department_comparison() + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/enterprise/trend") +async def get_learning_trend( + days: int = Query(7, ge=1, le=30), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取学习趋势数据 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_learning_trend(days) + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/enterprise/level-distribution") +async def get_level_distribution( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取等级分布数据 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_level_distribution() + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/enterprise/activities") +async def get_realtime_activities( + limit: int = Query(20, ge=1, le=100), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取实时动态 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_realtime_activities(limit) + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/enterprise/course-ranking") +async def get_course_ranking( + limit: int = Query(10, ge=1, le=50), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取课程热度排行 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + data = await service.get_course_ranking(limit) + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/team") +async def get_team_dashboard( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取团队级数据大屏 + + 面向团队负责人,显示其管理团队的数据 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要团队负责人权限" + ) + + service = DashboardService(db) + data = await service.get_team_dashboard(current_user.id) + + return { + "code": 200, + "message": "success", + "data": data + } + + +@router.get("/all") +async def get_all_dashboard_data( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """ + 获取完整的大屏数据(一次性获取所有数据) + + 用于大屏初始化加载 + """ + if current_user.role not in ["admin", "enterprise_admin", "manager"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="需要管理员权限" + ) + + service = DashboardService(db) + + # 并行获取所有数据 + overview = await service.get_enterprise_overview() + departments = await service.get_department_comparison() + trend = await service.get_learning_trend(7) + level_dist = await service.get_level_distribution() + activities = await service.get_realtime_activities(20) + course_ranking = await service.get_course_ranking(10) + + return { + "code": 200, + "message": "success", + "data": { + "overview": overview, + "departments": departments, + "trend": trend, + "level_distribution": level_dist, + "activities": activities, + "course_ranking": course_ranking, + } + } diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 1a730f1..342dca8 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -27,6 +27,11 @@ from app.models.level import ( BadgeCategory, ConditionType, ) +from app.models.certificate import ( + CertificateTemplate, + UserCertificate, + CertificateType, +) __all__ = [ "Base", @@ -64,4 +69,7 @@ __all__ = [ "ExpType", "BadgeCategory", "ConditionType", + "CertificateTemplate", + "UserCertificate", + "CertificateType", ] diff --git a/backend/app/models/certificate.py b/backend/app/models/certificate.py new file mode 100644 index 0000000..b2e9774 --- /dev/null +++ b/backend/app/models/certificate.py @@ -0,0 +1,76 @@ +""" +证书系统数据模型 + +定义证书模板和用户证书的数据结构 +""" + +from datetime import datetime +from enum import Enum +from typing import Optional +from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, Enum as SQLEnum, DECIMAL, JSON +from sqlalchemy.orm import relationship + +from app.core.database import Base + + +class CertificateType(str, Enum): + """证书类型枚举""" + COURSE = "course" # 课程结业证书 + EXAM = "exam" # 考试合格证书 + ACHIEVEMENT = "achievement" # 成就证书 + + +class CertificateTemplate(Base): + """证书模板表""" + __tablename__ = "certificate_templates" + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(100), nullable=False, comment="模板名称") + type = Column(SQLEnum(CertificateType), nullable=False, comment="证书类型") + background_url = Column(String(500), comment="证书背景图URL") + template_html = Column(Text, comment="HTML模板内容") + template_style = Column(Text, comment="CSS样式") + is_active = Column(Boolean, default=True, comment="是否启用") + sort_order = Column(Integer, default=0, comment="排序顺序") + created_at = Column(DateTime, nullable=False, default=datetime.now) + updated_at = Column(DateTime, nullable=False, default=datetime.now, onupdate=datetime.now) + + # 关联 + certificates = relationship("UserCertificate", back_populates="template") + + +class UserCertificate(Base): + """用户证书表""" + __tablename__ = "user_certificates" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, comment="用户ID") + template_id = Column(Integer, ForeignKey("certificate_templates.id"), nullable=False, comment="模板ID") + certificate_no = Column(String(50), unique=True, nullable=False, comment="证书编号") + title = Column(String(200), nullable=False, comment="证书标题") + description = Column(Text, comment="证书描述") + issued_at = Column(DateTime, default=datetime.now, comment="颁发时间") + valid_until = Column(DateTime, comment="有效期至") + + # 关联信息 + course_id = Column(Integer, comment="关联课程ID") + exam_id = Column(Integer, comment="关联考试ID") + badge_id = Column(Integer, comment="关联奖章ID") + + # 成绩信息 + score = Column(DECIMAL(5, 2), comment="考试分数") + completion_rate = Column(DECIMAL(5, 2), comment="完成率") + + # 生成的文件 + pdf_url = Column(String(500), comment="PDF文件URL") + image_url = Column(String(500), comment="分享图片URL") + + # 元数据 + meta_data = Column(JSON, comment="扩展元数据") + + created_at = Column(DateTime, nullable=False, default=datetime.now) + updated_at = Column(DateTime, nullable=False, default=datetime.now, onupdate=datetime.now) + + # 关联 + template = relationship("CertificateTemplate", back_populates="certificates") + user = relationship("User", backref="certificates") diff --git a/backend/app/services/badge_service.py b/backend/app/services/badge_service.py index 0c99ac4..c95ddc0 100644 --- a/backend/app/services/badge_service.py +++ b/backend/app/services/badge_service.py @@ -10,7 +10,7 @@ from datetime import datetime from typing import Optional, List, Dict, Any, Tuple -from sqlalchemy import select, func, and_, or_ +from sqlalchemy import select, func, and_, or_, case from sqlalchemy.ext.asyncio import AsyncSession from app.core.logger import get_logger @@ -162,80 +162,102 @@ class BadgeService: "user_level": 1, } - # 获取用户等级信息 - result = await self.db.execute( - select(UserLevel).where(UserLevel.user_id == user_id) - ) - user_level = result.scalar_one_or_none() - if user_level: - stats["login_streak"] = user_level.login_streak - stats["user_level"] = user_level.level - - # 获取登录次数(从经验值历史) - result = await self.db.execute( - select(func.count(ExpHistory.id)) - .where( - ExpHistory.user_id == user_id, - ExpHistory.exp_type == ExpType.LOGIN + try: + # 获取用户等级信息 + result = await self.db.execute( + select(UserLevel).where(UserLevel.user_id == user_id) ) - ) - stats["login_count"] = result.scalar() or 0 - - # 获取考试统计 - result = await self.db.execute( - select( - func.count(Exam.id), - func.sum(func.if_(Exam.score >= 100, 1, 0)), - func.sum(func.if_(Exam.score >= 90, 1, 0)) + user_level = result.scalar_one_or_none() + if user_level: + stats["login_streak"] = user_level.login_streak or 0 + stats["user_level"] = user_level.level or 1 + + # 获取登录/签到次数(从经验值历史) + result = await self.db.execute( + select(func.count(ExpHistory.id)) + .where( + ExpHistory.user_id == user_id, + ExpHistory.exp_type == ExpType.LOGIN + ) ) - .where( - Exam.user_id == user_id, - Exam.is_passed == True, - Exam.status == "submitted" + stats["login_count"] = result.scalar() or 0 + + # 获取考试统计 - 使用 case 语句 + # 通过考试数量 + result = await self.db.execute( + select(func.count(Exam.id)) + .where( + Exam.user_id == user_id, + Exam.is_passed == True, + Exam.status == "submitted" + ) ) - ) - row = result.first() - if row: - stats["exam_passed"] = row[0] or 0 - stats["exam_perfect_count"] = int(row[1] or 0) - stats["exam_excellent"] = int(row[2] or 0) - - # 获取练习统计 - result = await self.db.execute( - select( - func.count(PracticeSession.id), - func.sum(PracticeSession.duration_seconds) + stats["exam_passed"] = result.scalar() or 0 + + # 满分考试数量(score >= 总分,通常是 100) + result = await self.db.execute( + select(func.count(Exam.id)) + .where( + Exam.user_id == user_id, + Exam.status == "submitted", + Exam.score >= Exam.total_score + ) ) - .where( - PracticeSession.user_id == user_id, - PracticeSession.status == "completed" + stats["exam_perfect_count"] = result.scalar() or 0 + + # 优秀考试数量(90分以上) + result = await self.db.execute( + select(func.count(Exam.id)) + .where( + Exam.user_id == user_id, + Exam.status == "submitted", + Exam.score >= 90 + ) ) - ) - row = result.first() - if row: - stats["practice_count"] = row[0] or 0 - total_seconds = row[1] or 0 - stats["practice_hours"] = total_seconds / 3600 - - # 获取陪练统计 - result = await self.db.execute( - select(func.count(TrainingSession.id)) - .where( - TrainingSession.user_id == user_id, - TrainingSession.status == "COMPLETED" + stats["exam_excellent"] = result.scalar() or 0 + + # 获取练习统计(PracticeSession - AI 陪练) + result = await self.db.execute( + select( + func.count(PracticeSession.id), + func.coalesce(func.sum(PracticeSession.duration_seconds), 0) + ) + .where( + PracticeSession.user_id == user_id, + PracticeSession.status == "completed" + ) ) - ) - stats["training_count"] = result.scalar() or 0 - - # 检查首次高分陪练 - result = await self.db.execute( - select(func.count(TrainingReport.id)) - .where( - TrainingReport.user_id == user_id, - TrainingReport.overall_score >= 90 + row = result.first() + if row: + stats["practice_count"] = row[0] or 0 + total_seconds = row[1] or 0 + stats["practice_hours"] = float(total_seconds) / 3600.0 + + # 获取培训/陪练统计(TrainingSession) + result = await self.db.execute( + select(func.count(TrainingSession.id)) + .where( + TrainingSession.user_id == user_id, + TrainingSession.status == "COMPLETED" + ) ) - ) - stats["first_practice_90"] = 1 if (result.scalar() or 0) > 0 else 0 + stats["training_count"] = result.scalar() or 0 + + # 检查是否有高分陪练(90分以上) + result = await self.db.execute( + select(func.count(TrainingReport.id)) + .where( + TrainingReport.user_id == user_id, + TrainingReport.overall_score >= 90 + ) + ) + high_score_count = result.scalar() or 0 + stats["first_practice_90"] = 1 if high_score_count > 0 else 0 + + logger.debug(f"用户 {user_id} 奖章统计数据: {stats}") + + except Exception as e: + logger.error(f"获取用户统计数据失败: {e}") return stats @@ -465,3 +487,100 @@ class BadgeService: await self.db.execute(query) await self.db.flush() + + async def check_badges_by_category( + self, + user_id: int, + categories: List[str] + ) -> List[Dict[str, Any]]: + """ + 按类别检查并授予奖章(优化触发时机) + + Args: + user_id: 用户ID + categories: 要检查的奖章类别列表 + + Returns: + 新获得的奖章列表 + """ + # 获取用户统计数据 + stats = await self._get_user_stats(user_id) + + # 获取指定类别的奖章定义 + result = await self.db.execute( + select(BadgeDefinition) + .where( + BadgeDefinition.is_active == True, + BadgeDefinition.category.in_(categories) + ) + .order_by(BadgeDefinition.sort_order) + ) + category_badges = list(result.scalars().all()) + + # 获取用户已有的奖章 + result = await self.db.execute( + select(UserBadge.badge_id).where(UserBadge.user_id == user_id) + ) + owned_badge_ids = {row[0] for row in result.all()} + + # 检查每个奖章的解锁条件 + newly_awarded = [] + for badge in category_badges: + if badge.id in owned_badge_ids: + continue + + # 检查条件 + condition_met = self._check_badge_condition(badge, stats) + + if condition_met: + # 授予奖章 + user_badge = UserBadge( + user_id=user_id, + badge_id=badge.id, + unlocked_at=datetime.now(), + is_notified=False + ) + self.db.add(user_badge) + + # 如果有经验奖励,添加经验值 + if badge.exp_reward > 0: + from app.services.level_service import LevelService + level_service = LevelService(self.db) + await level_service.add_exp( + user_id=user_id, + exp_amount=badge.exp_reward, + exp_type=ExpType.BADGE, + description=f"解锁奖章「{badge.name}」" + ) + + newly_awarded.append({ + "badge_id": badge.id, + "code": badge.code, + "name": badge.name, + "description": badge.description, + "icon": badge.icon, + "exp_reward": badge.exp_reward, + }) + + logger.info(f"用户 {user_id} 解锁奖章: {badge.name}") + + if newly_awarded: + await self.db.flush() + + return newly_awarded + + async def check_exam_badges(self, user_id: int) -> List[Dict[str, Any]]: + """考试后检查考试类奖章""" + return await self.check_badges_by_category(user_id, [BadgeCategory.EXAM]) + + async def check_practice_badges(self, user_id: int) -> List[Dict[str, Any]]: + """练习后检查练习类奖章""" + return await self.check_badges_by_category(user_id, [BadgeCategory.PRACTICE]) + + async def check_streak_badges(self, user_id: int) -> List[Dict[str, Any]]: + """签到后检查连续打卡类奖章""" + return await self.check_badges_by_category(user_id, [BadgeCategory.STREAK, BadgeCategory.LEARNING]) + + async def check_level_badges(self, user_id: int) -> List[Dict[str, Any]]: + """等级变化后检查等级类奖章""" + return await self.check_badges_by_category(user_id, [BadgeCategory.SPECIAL]) diff --git a/backend/app/services/certificate_service.py b/backend/app/services/certificate_service.py new file mode 100644 index 0000000..a31442f --- /dev/null +++ b/backend/app/services/certificate_service.py @@ -0,0 +1,516 @@ +""" +证书服务 + +提供证书管理功能: +- 颁发证书 +- 获取证书列表 +- 生成证书PDF/图片 +- 验证证书 +""" + +import os +import io +import uuid +from datetime import datetime +from typing import Optional, List, Dict, Any +from sqlalchemy import select, func, and_ +from sqlalchemy.ext.asyncio import AsyncSession +from PIL import Image, ImageDraw, ImageFont +import qrcode + +from app.core.logger import get_logger +from app.core.config import settings +from app.models.certificate import CertificateTemplate, UserCertificate, CertificateType + +logger = get_logger(__name__) + + +class CertificateService: + """证书服务""" + + # 证书编号前缀 + CERT_NO_PREFIX = "KPL" + + def __init__(self, db: AsyncSession): + self.db = db + + async def get_templates(self, cert_type: Optional[str] = None) -> List[Dict[str, Any]]: + """ + 获取证书模板列表 + + Args: + cert_type: 证书类型过滤 + + Returns: + 模板列表 + """ + query = select(CertificateTemplate).where(CertificateTemplate.is_active == True) + + if cert_type: + query = query.where(CertificateTemplate.type == cert_type) + + query = query.order_by(CertificateTemplate.sort_order) + + result = await self.db.execute(query) + templates = result.scalars().all() + + return [ + { + "id": t.id, + "name": t.name, + "type": t.type.value if isinstance(t.type, CertificateType) else t.type, + "background_url": t.background_url, + "is_active": t.is_active, + } + for t in templates + ] + + async def _generate_certificate_no(self) -> str: + """生成唯一证书编号""" + year = datetime.now().year + + # 获取当年的证书数量 + result = await self.db.execute( + select(func.count(UserCertificate.id)) + .where(UserCertificate.certificate_no.like(f"{self.CERT_NO_PREFIX}-{year}-%")) + ) + count = result.scalar() or 0 + + # 生成编号:KPL-年份-6位序号 + cert_no = f"{self.CERT_NO_PREFIX}-{year}-{str(count + 1).zfill(6)}" + return cert_no + + async def issue_course_certificate( + self, + user_id: int, + course_id: int, + course_name: str, + completion_rate: float, + user_name: str + ) -> Dict[str, Any]: + """ + 颁发课程结业证书 + + Args: + user_id: 用户ID + course_id: 课程ID + course_name: 课程名称 + completion_rate: 完成率 + user_name: 用户姓名 + + Returns: + 证书信息 + """ + # 检查是否已颁发 + existing = await self.db.execute( + select(UserCertificate).where( + UserCertificate.user_id == user_id, + UserCertificate.course_id == course_id + ) + ) + if existing.scalar_one_or_none(): + raise ValueError("该课程证书已颁发") + + # 获取课程证书模板 + result = await self.db.execute( + select(CertificateTemplate).where( + CertificateTemplate.type == CertificateType.COURSE, + CertificateTemplate.is_active == True + ) + ) + template = result.scalar_one_or_none() + if not template: + raise ValueError("证书模板不存在") + + # 生成证书编号 + cert_no = await self._generate_certificate_no() + + # 创建证书 + certificate = UserCertificate( + user_id=user_id, + template_id=template.id, + certificate_no=cert_no, + title=f"《{course_name}》课程结业证书", + description=f"完成课程《{course_name}》的全部学习内容", + course_id=course_id, + completion_rate=completion_rate, + meta_data={ + "course_name": course_name, + "user_name": user_name, + "completion_rate": completion_rate + } + ) + + self.db.add(certificate) + await self.db.flush() + + logger.info(f"颁发课程证书: user_id={user_id}, course_id={course_id}, cert_no={cert_no}") + + return await self._format_certificate(certificate, template) + + async def issue_exam_certificate( + self, + user_id: int, + exam_id: int, + exam_name: str, + score: float, + user_name: str + ) -> Dict[str, Any]: + """ + 颁发考试合格证书 + + Args: + user_id: 用户ID + exam_id: 考试ID + exam_name: 考试名称 + score: 分数 + user_name: 用户姓名 + + Returns: + 证书信息 + """ + # 检查是否已颁发 + existing = await self.db.execute( + select(UserCertificate).where( + UserCertificate.user_id == user_id, + UserCertificate.exam_id == exam_id + ) + ) + if existing.scalar_one_or_none(): + raise ValueError("该考试证书已颁发") + + # 获取考试证书模板 + result = await self.db.execute( + select(CertificateTemplate).where( + CertificateTemplate.type == CertificateType.EXAM, + CertificateTemplate.is_active == True + ) + ) + template = result.scalar_one_or_none() + if not template: + raise ValueError("证书模板不存在") + + # 生成证书编号 + cert_no = await self._generate_certificate_no() + + # 创建证书 + certificate = UserCertificate( + user_id=user_id, + template_id=template.id, + certificate_no=cert_no, + title=f"《{exam_name}》考试合格证书", + description=f"在《{exam_name}》考试中成绩合格", + exam_id=exam_id, + score=score, + meta_data={ + "exam_name": exam_name, + "user_name": user_name, + "score": score + } + ) + + self.db.add(certificate) + await self.db.flush() + + logger.info(f"颁发考试证书: user_id={user_id}, exam_id={exam_id}, cert_no={cert_no}") + + return await self._format_certificate(certificate, template) + + async def issue_achievement_certificate( + self, + user_id: int, + badge_id: int, + badge_name: str, + badge_description: str, + user_name: str + ) -> Dict[str, Any]: + """ + 颁发成就证书 + + Args: + user_id: 用户ID + badge_id: 奖章ID + badge_name: 奖章名称 + badge_description: 奖章描述 + user_name: 用户姓名 + + Returns: + 证书信息 + """ + # 检查是否已颁发 + existing = await self.db.execute( + select(UserCertificate).where( + UserCertificate.user_id == user_id, + UserCertificate.badge_id == badge_id + ) + ) + if existing.scalar_one_or_none(): + raise ValueError("该成就证书已颁发") + + # 获取成就证书模板 + result = await self.db.execute( + select(CertificateTemplate).where( + CertificateTemplate.type == CertificateType.ACHIEVEMENT, + CertificateTemplate.is_active == True + ) + ) + template = result.scalar_one_or_none() + if not template: + raise ValueError("证书模板不存在") + + # 生成证书编号 + cert_no = await self._generate_certificate_no() + + # 创建证书 + certificate = UserCertificate( + user_id=user_id, + template_id=template.id, + certificate_no=cert_no, + title=f"「{badge_name}」成就证书", + description=badge_description, + badge_id=badge_id, + meta_data={ + "badge_name": badge_name, + "badge_description": badge_description, + "user_name": user_name + } + ) + + self.db.add(certificate) + await self.db.flush() + + logger.info(f"颁发成就证书: user_id={user_id}, badge_id={badge_id}, cert_no={cert_no}") + + return await self._format_certificate(certificate, template) + + async def get_user_certificates( + self, + user_id: int, + cert_type: Optional[str] = None, + offset: int = 0, + limit: int = 20 + ) -> Dict[str, Any]: + """ + 获取用户证书列表 + + Args: + user_id: 用户ID + cert_type: 证书类型过滤 + offset: 偏移量 + limit: 数量限制 + + Returns: + 证书列表和分页信息 + """ + query = ( + select(UserCertificate, CertificateTemplate) + .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id) + .where(UserCertificate.user_id == user_id) + ) + + if cert_type: + query = query.where(CertificateTemplate.type == cert_type) + + # 获取总数 + count_query = select(func.count()).select_from(query.subquery()) + total_result = await self.db.execute(count_query) + total = total_result.scalar() or 0 + + # 分页查询 + query = query.order_by(UserCertificate.issued_at.desc()).offset(offset).limit(limit) + result = await self.db.execute(query) + rows = result.all() + + certificates = [ + await self._format_certificate(cert, template) + for cert, template in rows + ] + + return { + "items": certificates, + "total": total, + "offset": offset, + "limit": limit + } + + async def get_certificate_by_id(self, cert_id: int) -> Optional[Dict[str, Any]]: + """根据ID获取证书""" + result = await self.db.execute( + select(UserCertificate, CertificateTemplate) + .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id) + .where(UserCertificate.id == cert_id) + ) + row = result.first() + + if not row: + return None + + cert, template = row + return await self._format_certificate(cert, template) + + async def get_certificate_by_no(self, cert_no: str) -> Optional[Dict[str, Any]]: + """根据编号获取证书(用于验证)""" + result = await self.db.execute( + select(UserCertificate, CertificateTemplate) + .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id) + .where(UserCertificate.certificate_no == cert_no) + ) + row = result.first() + + if not row: + return None + + cert, template = row + return await self._format_certificate(cert, template, include_user=True) + + async def _format_certificate( + self, + cert: UserCertificate, + template: CertificateTemplate, + include_user: bool = False + ) -> Dict[str, Any]: + """格式化证书数据""" + data = { + "id": cert.id, + "certificate_no": cert.certificate_no, + "title": cert.title, + "description": cert.description, + "type": template.type.value if isinstance(template.type, CertificateType) else template.type, + "type_name": self._get_type_name(template.type), + "issued_at": cert.issued_at.isoformat() if cert.issued_at else None, + "valid_until": cert.valid_until.isoformat() if cert.valid_until else None, + "score": float(cert.score) if cert.score else None, + "completion_rate": float(cert.completion_rate) if cert.completion_rate else None, + "pdf_url": cert.pdf_url, + "image_url": cert.image_url, + "course_id": cert.course_id, + "exam_id": cert.exam_id, + "badge_id": cert.badge_id, + "meta_data": cert.meta_data, + "template": { + "id": template.id, + "name": template.name, + "background_url": template.background_url, + } + } + + if include_user and cert.user: + data["user"] = { + "id": cert.user.id, + "username": cert.user.username, + "full_name": cert.user.full_name, + } + + return data + + def _get_type_name(self, cert_type) -> str: + """获取证书类型名称""" + type_names = { + CertificateType.COURSE: "课程结业证书", + CertificateType.EXAM: "考试合格证书", + CertificateType.ACHIEVEMENT: "成就证书", + "course": "课程结业证书", + "exam": "考试合格证书", + "achievement": "成就证书", + } + return type_names.get(cert_type, "证书") + + async def generate_certificate_image( + self, + cert_id: int, + base_url: str = "" + ) -> bytes: + """ + 生成证书分享图片 + + Args: + cert_id: 证书ID + base_url: 基础URL(用于生成二维码链接) + + Returns: + 图片二进制数据 + """ + # 获取证书信息 + cert_data = await self.get_certificate_by_id(cert_id) + if not cert_data: + raise ValueError("证书不存在") + + # 创建图片 + width, height = 800, 600 + img = Image.new('RGB', (width, height), color='#f5f7fa') + draw = ImageDraw.Draw(img) + + # 尝试加载字体,如果失败则使用默认字体 + try: + title_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36) + text_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20) + small_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14) + except: + title_font = ImageFont.load_default() + text_font = ImageFont.load_default() + small_font = ImageFont.load_default() + + # 绘制标题 + title = cert_data.get("type_name", "证书") + draw.text((width // 2, 60), title, font=title_font, fill='#333333', anchor='mm') + + # 绘制证书标题 + cert_title = cert_data.get("title", "") + draw.text((width // 2, 140), cert_title, font=text_font, fill='#666666', anchor='mm') + + # 绘制描述 + description = cert_data.get("description", "") + draw.text((width // 2, 200), description, font=text_font, fill='#666666', anchor='mm') + + # 绘制分数/完成率(如果有) + if cert_data.get("score"): + score_text = f"成绩:{cert_data['score']}分" + draw.text((width // 2, 280), score_text, font=text_font, fill='#667eea', anchor='mm') + elif cert_data.get("completion_rate"): + rate_text = f"完成率:{cert_data['completion_rate']}%" + draw.text((width // 2, 280), rate_text, font=text_font, fill='#667eea', anchor='mm') + + # 绘制颁发日期 + if cert_data.get("issued_at"): + date_text = f"颁发日期:{cert_data['issued_at'][:10]}" + draw.text((width // 2, 360), date_text, font=small_font, fill='#999999', anchor='mm') + + # 绘制证书编号 + cert_no = cert_data.get("certificate_no", "") + draw.text((width // 2, 520), f"证书编号:{cert_no}", font=small_font, fill='#999999', anchor='mm') + + # 生成验证二维码 + if base_url and cert_no: + verify_url = f"{base_url}/verify/{cert_no}" + qr = qrcode.QRCode(version=1, box_size=3, border=2) + qr.add_data(verify_url) + qr.make(fit=True) + qr_img = qr.make_image(fill_color="black", back_color="white") + qr_img = qr_img.resize((80, 80)) + img.paste(qr_img, (width - 100, height - 100)) + + # 转换为字节 + img_bytes = io.BytesIO() + img.save(img_bytes, format='PNG') + img_bytes.seek(0) + + return img_bytes.getvalue() + + async def update_certificate_files( + self, + cert_id: int, + pdf_url: Optional[str] = None, + image_url: Optional[str] = None + ): + """更新证书文件URL""" + result = await self.db.execute( + select(UserCertificate).where(UserCertificate.id == cert_id) + ) + cert = result.scalar_one_or_none() + + if cert: + if pdf_url: + cert.pdf_url = pdf_url + if image_url: + cert.image_url = image_url + await self.db.flush() diff --git a/backend/app/services/dashboard_service.py b/backend/app/services/dashboard_service.py new file mode 100644 index 0000000..2c0bcb8 --- /dev/null +++ b/backend/app/services/dashboard_service.py @@ -0,0 +1,489 @@ +""" +数据大屏服务 + +提供企业级和团队级数据大屏功能: +- 学习数据概览 +- 部门/团队对比 +- 趋势分析 +- 实时动态 +""" + +from datetime import datetime, timedelta, date +from typing import Optional, List, Dict, Any +from sqlalchemy import select, func, and_, or_, desc, case +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logger import get_logger +from app.models.user import User +from app.models.course import Course, CourseMaterial +from app.models.exam import Exam +from app.models.practice import PracticeSession +from app.models.training import TrainingSession, TrainingReport +from app.models.level import UserLevel, ExpHistory, UserBadge +from app.models.position import Position +from app.models.position_member import PositionMember + +logger = get_logger(__name__) + + +class DashboardService: + """数据大屏服务""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def get_enterprise_overview(self, enterprise_id: Optional[int] = None) -> Dict[str, Any]: + """ + 获取企业级数据概览 + + Args: + enterprise_id: 企业ID(可选,用于多租户) + + Returns: + 企业级数据概览 + """ + today = date.today() + week_ago = today - timedelta(days=7) + month_ago = today - timedelta(days=30) + + # 基础统计 + # 1. 总学员数 + result = await self.db.execute( + select(func.count(User.id)) + .where(User.is_deleted == False, User.role == 'trainee') + ) + total_users = result.scalar() or 0 + + # 2. 今日活跃用户(有经验值记录) + result = await self.db.execute( + select(func.count(func.distinct(ExpHistory.user_id))) + .where(func.date(ExpHistory.created_at) == today) + ) + today_active = result.scalar() or 0 + + # 3. 本周活跃用户 + result = await self.db.execute( + select(func.count(func.distinct(ExpHistory.user_id))) + .where(ExpHistory.created_at >= datetime.combine(week_ago, datetime.min.time())) + ) + week_active = result.scalar() or 0 + + # 4. 本月活跃用户 + result = await self.db.execute( + select(func.count(func.distinct(ExpHistory.user_id))) + .where(ExpHistory.created_at >= datetime.combine(month_ago, datetime.min.time())) + ) + month_active = result.scalar() or 0 + + # 5. 总学习时长(小时) + result = await self.db.execute( + select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0)) + .where(PracticeSession.status == 'completed') + ) + practice_hours = (result.scalar() or 0) / 3600 + + result = await self.db.execute( + select(func.coalesce(func.sum(TrainingSession.duration_seconds), 0)) + .where(TrainingSession.status == 'COMPLETED') + ) + training_hours = (result.scalar() or 0) / 3600 + + total_hours = round(practice_hours + training_hours, 1) + + # 6. 考试统计 + result = await self.db.execute( + select( + func.count(Exam.id), + func.count(case((Exam.is_passed == True, 1))), + func.avg(Exam.score) + ) + .where(Exam.status == 'submitted') + ) + exam_row = result.first() + exam_count = exam_row[0] or 0 + exam_passed = exam_row[1] or 0 + exam_avg_score = round(exam_row[2] or 0, 1) + exam_pass_rate = round(exam_passed / exam_count * 100, 1) if exam_count > 0 else 0 + + # 7. 满分人数 + result = await self.db.execute( + select(func.count(func.distinct(Exam.user_id))) + .where(Exam.status == 'submitted', Exam.score >= Exam.total_score) + ) + perfect_users = result.scalar() or 0 + + # 8. 签到率(今日签到人数/总用户数) + result = await self.db.execute( + select(func.count(UserLevel.id)) + .where(func.date(UserLevel.last_login_date) == today) + ) + today_checkin = result.scalar() or 0 + checkin_rate = round(today_checkin / total_users * 100, 1) if total_users > 0 else 0 + + return { + "overview": { + "total_users": total_users, + "today_active": today_active, + "week_active": week_active, + "month_active": month_active, + "total_hours": total_hours, + "checkin_rate": checkin_rate, + }, + "exam": { + "total_count": exam_count, + "pass_rate": exam_pass_rate, + "avg_score": exam_avg_score, + "perfect_users": perfect_users, + }, + "updated_at": datetime.now().isoformat() + } + + async def get_department_comparison(self) -> List[Dict[str, Any]]: + """ + 获取部门/团队学习对比数据 + + Returns: + 部门对比列表 + """ + # 获取所有岗位及其成员的学习数据 + result = await self.db.execute( + select(Position) + .where(Position.is_deleted == False) + .order_by(Position.name) + ) + positions = result.scalars().all() + + departments = [] + for pos in positions: + # 获取该岗位的成员数 + result = await self.db.execute( + select(func.count(PositionMember.id)) + .where(PositionMember.position_id == pos.id) + ) + member_count = result.scalar() or 0 + + if member_count == 0: + continue + + # 获取成员ID列表 + result = await self.db.execute( + select(PositionMember.user_id) + .where(PositionMember.position_id == pos.id) + ) + member_ids = [row[0] for row in result.all()] + + # 统计该岗位成员的学习数据 + # 考试通过率 + result = await self.db.execute( + select( + func.count(Exam.id), + func.count(case((Exam.is_passed == True, 1))) + ) + .where( + Exam.user_id.in_(member_ids), + Exam.status == 'submitted' + ) + ) + exam_row = result.first() + exam_total = exam_row[0] or 0 + exam_passed = exam_row[1] or 0 + pass_rate = round(exam_passed / exam_total * 100, 1) if exam_total > 0 else 0 + + # 平均学习时长 + result = await self.db.execute( + select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0)) + .where( + PracticeSession.user_id.in_(member_ids), + PracticeSession.status == 'completed' + ) + ) + total_seconds = result.scalar() or 0 + avg_hours = round(total_seconds / 3600 / member_count, 1) if member_count > 0 else 0 + + # 平均等级 + result = await self.db.execute( + select(func.avg(UserLevel.level)) + .where(UserLevel.user_id.in_(member_ids)) + ) + avg_level = round(result.scalar() or 1, 1) + + departments.append({ + "id": pos.id, + "name": pos.name, + "member_count": member_count, + "pass_rate": pass_rate, + "avg_hours": avg_hours, + "avg_level": avg_level, + }) + + # 按通过率排序 + departments.sort(key=lambda x: x["pass_rate"], reverse=True) + + return departments + + async def get_learning_trend(self, days: int = 7) -> Dict[str, Any]: + """ + 获取学习趋势数据 + + Args: + days: 统计天数 + + Returns: + 趋势数据 + """ + today = date.today() + dates = [(today - timedelta(days=i)) for i in range(days-1, -1, -1)] + + trend_data = [] + for d in dates: + # 当日活跃用户 + result = await self.db.execute( + select(func.count(func.distinct(ExpHistory.user_id))) + .where(func.date(ExpHistory.created_at) == d) + ) + active_users = result.scalar() or 0 + + # 当日新增学习时长 + result = await self.db.execute( + select(func.coalesce(func.sum(PracticeSession.duration_seconds), 0)) + .where( + func.date(PracticeSession.created_at) == d, + PracticeSession.status == 'completed' + ) + ) + hours = round((result.scalar() or 0) / 3600, 1) + + # 当日考试次数 + result = await self.db.execute( + select(func.count(Exam.id)) + .where( + func.date(Exam.created_at) == d, + Exam.status == 'submitted' + ) + ) + exams = result.scalar() or 0 + + trend_data.append({ + "date": d.isoformat(), + "active_users": active_users, + "learning_hours": hours, + "exam_count": exams, + }) + + return { + "dates": [d.isoformat() for d in dates], + "trend": trend_data + } + + async def get_level_distribution(self) -> Dict[str, Any]: + """ + 获取等级分布数据 + + Returns: + 等级分布 + """ + result = await self.db.execute( + select(UserLevel.level, func.count(UserLevel.id)) + .group_by(UserLevel.level) + .order_by(UserLevel.level) + ) + rows = result.all() + + distribution = {row[0]: row[1] for row in rows} + + # 补全1-10级 + for i in range(1, 11): + if i not in distribution: + distribution[i] = 0 + + return { + "levels": list(range(1, 11)), + "counts": [distribution.get(i, 0) for i in range(1, 11)] + } + + async def get_realtime_activities(self, limit: int = 20) -> List[Dict[str, Any]]: + """ + 获取实时动态 + + Args: + limit: 数量限制 + + Returns: + 实时动态列表 + """ + activities = [] + + # 获取最近的经验值记录 + result = await self.db.execute( + select(ExpHistory, User) + .join(User, ExpHistory.user_id == User.id) + .order_by(ExpHistory.created_at.desc()) + .limit(limit) + ) + rows = result.all() + + for exp, user in rows: + activity_type = "学习" + if "考试" in (exp.description or ""): + activity_type = "考试" + elif "签到" in (exp.description or ""): + activity_type = "签到" + elif "陪练" in (exp.description or ""): + activity_type = "陪练" + elif "奖章" in (exp.description or ""): + activity_type = "奖章" + + activities.append({ + "id": exp.id, + "user_id": user.id, + "user_name": user.full_name or user.username, + "type": activity_type, + "description": exp.description, + "exp_amount": exp.exp_amount, + "created_at": exp.created_at.isoformat() if exp.created_at else None, + }) + + return activities + + async def get_team_dashboard(self, team_leader_id: int) -> Dict[str, Any]: + """ + 获取团队级数据大屏 + + Args: + team_leader_id: 团队负责人ID + + Returns: + 团队数据 + """ + # 获取团队负责人管理的岗位 + result = await self.db.execute( + select(Position) + .where( + Position.is_deleted == False, + or_( + Position.manager_id == team_leader_id, + Position.created_by == team_leader_id + ) + ) + ) + positions = result.scalars().all() + position_ids = [p.id for p in positions] + + if not position_ids: + return { + "members": [], + "overview": { + "total_members": 0, + "avg_level": 0, + "avg_exp": 0, + "total_badges": 0, + }, + "pending_tasks": [] + } + + # 获取团队成员 + result = await self.db.execute( + select(PositionMember.user_id) + .where(PositionMember.position_id.in_(position_ids)) + ) + member_ids = [row[0] for row in result.all()] + + if not member_ids: + return { + "members": [], + "overview": { + "total_members": 0, + "avg_level": 0, + "avg_exp": 0, + "total_badges": 0, + }, + "pending_tasks": [] + } + + # 获取成员详细信息 + result = await self.db.execute( + select(User, UserLevel) + .outerjoin(UserLevel, User.id == UserLevel.user_id) + .where(User.id.in_(member_ids)) + .order_by(UserLevel.total_exp.desc().nullslast()) + ) + rows = result.all() + + members = [] + total_exp = 0 + total_level = 0 + + for user, level in rows: + user_level = level.level if level else 1 + user_exp = level.total_exp if level else 0 + total_level += user_level + total_exp += user_exp + + # 获取用户奖章数 + result = await self.db.execute( + select(func.count(UserBadge.id)) + .where(UserBadge.user_id == user.id) + ) + badge_count = result.scalar() or 0 + + members.append({ + "id": user.id, + "username": user.username, + "full_name": user.full_name, + "avatar_url": user.avatar_url, + "level": user_level, + "total_exp": user_exp, + "badge_count": badge_count, + }) + + total_members = len(members) + + # 获取团队总奖章数 + result = await self.db.execute( + select(func.count(UserBadge.id)) + .where(UserBadge.user_id.in_(member_ids)) + ) + total_badges = result.scalar() or 0 + + return { + "members": members, + "overview": { + "total_members": total_members, + "avg_level": round(total_level / total_members, 1) if total_members > 0 else 0, + "avg_exp": round(total_exp / total_members) if total_members > 0 else 0, + "total_badges": total_badges, + }, + "positions": [{"id": p.id, "name": p.name} for p in positions] + } + + async def get_course_ranking(self, limit: int = 10) -> List[Dict[str, Any]]: + """ + 获取课程热度排行 + + Args: + limit: 数量限制 + + Returns: + 课程排行列表 + """ + # 这里简化实现,实际应该统计课程学习次数 + result = await self.db.execute( + select(Course) + .where(Course.is_deleted == False, Course.is_published == True) + .order_by(Course.created_at.desc()) + .limit(limit) + ) + courses = result.scalars().all() + + ranking = [] + for i, course in enumerate(courses, 1): + ranking.append({ + "rank": i, + "id": course.id, + "name": course.name, + "description": course.description, + # 这里可以添加实际的学习人数统计 + "learners": 0, + }) + + return ranking diff --git a/backend/migrations/add_certificate_system.sql b/backend/migrations/add_certificate_system.sql new file mode 100644 index 0000000..5411b51 --- /dev/null +++ b/backend/migrations/add_certificate_system.sql @@ -0,0 +1,166 @@ +-- ================================================================ +-- 证书系统数据库迁移脚本 +-- 创建日期: 2026-01-29 +-- 功能: 添加证书模板表和用户证书表 +-- ================================================================ + +-- 事务开始 +START TRANSACTION; + +-- ================================================================ +-- 1. 创建证书模板表 +-- ================================================================ +CREATE TABLE IF NOT EXISTS certificate_templates ( + id INT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(100) NOT NULL COMMENT '模板名称', + type ENUM('course', 'exam', 'achievement') NOT NULL COMMENT '证书类型: course=课程结业, exam=考试合格, achievement=成就证书', + background_url VARCHAR(500) COMMENT '证书背景图URL', + template_html TEXT COMMENT 'HTML模板内容', + template_style TEXT COMMENT 'CSS样式', + is_active BOOLEAN DEFAULT TRUE COMMENT '是否启用', + sort_order INT DEFAULT 0 COMMENT '排序顺序', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_type (type), + INDEX idx_is_active (is_active) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='证书模板表'; + +-- ================================================================ +-- 2. 创建用户证书表 +-- ================================================================ +CREATE TABLE IF NOT EXISTS user_certificates ( + id INT PRIMARY KEY AUTO_INCREMENT, + user_id INT NOT NULL COMMENT '用户ID', + template_id INT NOT NULL COMMENT '模板ID', + certificate_no VARCHAR(50) UNIQUE NOT NULL COMMENT '证书编号 KPL-年份-序号', + title VARCHAR(200) NOT NULL COMMENT '证书标题', + description TEXT COMMENT '证书描述/成就说明', + issued_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '颁发时间', + valid_until DATETIME COMMENT '有效期至(NULL表示永久)', + + -- 关联信息 + course_id INT COMMENT '关联课程ID', + exam_id INT COMMENT '关联考试ID', + badge_id INT COMMENT '关联奖章ID', + + -- 成绩信息 + score DECIMAL(5,2) COMMENT '考试分数', + completion_rate DECIMAL(5,2) COMMENT '完成率', + + -- 生成的文件 + pdf_url VARCHAR(500) COMMENT 'PDF文件URL', + image_url VARCHAR(500) COMMENT '分享图片URL', + + -- 元数据 + meta_data JSON COMMENT '扩展元数据', + + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (template_id) REFERENCES certificate_templates(id), + INDEX idx_user_id (user_id), + INDEX idx_certificate_no (certificate_no), + INDEX idx_course_id (course_id), + INDEX idx_exam_id (exam_id), + INDEX idx_issued_at (issued_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户证书表'; + +-- ================================================================ +-- 3. 插入默认证书模板 +-- ================================================================ +INSERT INTO certificate_templates (name, type, template_html, template_style, is_active, sort_order) VALUES +-- 课程结业证书模板 +('课程结业证书', 'course', +'
兹证明 {{user_name}}
+已完成课程《{{course_name}}》的全部学习内容
+完成率:{{completion_rate}}%
+颁发日期:{{issue_date}}
+兹证明 {{user_name}}
+在《{{exam_name}}》考试中成绩合格
+考试日期:{{exam_date}}
+兹证明 {{user_name}}
+ +{{badge_name}}
+{{badge_description}}
+获得日期:{{achieve_date}}
+记录您的学习成就与荣誉
+{{ cert.description }}
+ +完成课程或考试后即可获得证书
+ +正在生成证书图片...
+