feat: KPL v1.5.0 功能迭代
Some checks failed
continuous-integration/drone/push Build is failing

1. 奖章条件优化
- 修复统计查询 SQL 语法
- 添加按类别检查奖章方法

2. 移动端适配
- 登录页、课程中心、课程详情
- 考试页面、成长路径、排行榜

3. 证书系统
- 数据库模型和迁移脚本
- 证书颁发/列表/下载/验证 API
- 前端证书列表页面

4. 数据大屏
- 企业级/团队级数据 API
- ECharts 可视化大屏页面
This commit is contained in:
yuliang_guo
2026-01-29 16:51:17 +08:00
parent 813ba2c295
commit 6f0f2e6363
21 changed files with 4907 additions and 80 deletions

View File

@@ -0,0 +1,516 @@
"""
证书服务
提供证书管理功能:
- 颁发证书
- 获取证书列表
- 生成证书PDF/图片
- 验证证书
"""
import os
import io
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from PIL import Image, ImageDraw, ImageFont
import qrcode
from app.core.logger import get_logger
from app.core.config import settings
from app.models.certificate import CertificateTemplate, UserCertificate, CertificateType
logger = get_logger(__name__)
class CertificateService:
"""证书服务"""
# 证书编号前缀
CERT_NO_PREFIX = "KPL"
def __init__(self, db: AsyncSession):
self.db = db
async def get_templates(self, cert_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取证书模板列表
Args:
cert_type: 证书类型过滤
Returns:
模板列表
"""
query = select(CertificateTemplate).where(CertificateTemplate.is_active == True)
if cert_type:
query = query.where(CertificateTemplate.type == cert_type)
query = query.order_by(CertificateTemplate.sort_order)
result = await self.db.execute(query)
templates = result.scalars().all()
return [
{
"id": t.id,
"name": t.name,
"type": t.type.value if isinstance(t.type, CertificateType) else t.type,
"background_url": t.background_url,
"is_active": t.is_active,
}
for t in templates
]
async def _generate_certificate_no(self) -> str:
"""生成唯一证书编号"""
year = datetime.now().year
# 获取当年的证书数量
result = await self.db.execute(
select(func.count(UserCertificate.id))
.where(UserCertificate.certificate_no.like(f"{self.CERT_NO_PREFIX}-{year}-%"))
)
count = result.scalar() or 0
# 生成编号KPL-年份-6位序号
cert_no = f"{self.CERT_NO_PREFIX}-{year}-{str(count + 1).zfill(6)}"
return cert_no
async def issue_course_certificate(
self,
user_id: int,
course_id: int,
course_name: str,
completion_rate: float,
user_name: str
) -> Dict[str, Any]:
"""
颁发课程结业证书
Args:
user_id: 用户ID
course_id: 课程ID
course_name: 课程名称
completion_rate: 完成率
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.course_id == course_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该课程证书已颁发")
# 获取课程证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.COURSE,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{course_name}》课程结业证书",
description=f"完成课程《{course_name}》的全部学习内容",
course_id=course_id,
completion_rate=completion_rate,
meta_data={
"course_name": course_name,
"user_name": user_name,
"completion_rate": completion_rate
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发课程证书: user_id={user_id}, course_id={course_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def issue_exam_certificate(
self,
user_id: int,
exam_id: int,
exam_name: str,
score: float,
user_name: str
) -> Dict[str, Any]:
"""
颁发考试合格证书
Args:
user_id: 用户ID
exam_id: 考试ID
exam_name: 考试名称
score: 分数
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.exam_id == exam_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该考试证书已颁发")
# 获取考试证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.EXAM,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{exam_name}》考试合格证书",
description=f"在《{exam_name}》考试中成绩合格",
exam_id=exam_id,
score=score,
meta_data={
"exam_name": exam_name,
"user_name": user_name,
"score": score
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发考试证书: user_id={user_id}, exam_id={exam_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def issue_achievement_certificate(
self,
user_id: int,
badge_id: int,
badge_name: str,
badge_description: str,
user_name: str
) -> Dict[str, Any]:
"""
颁发成就证书
Args:
user_id: 用户ID
badge_id: 奖章ID
badge_name: 奖章名称
badge_description: 奖章描述
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.badge_id == badge_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该成就证书已颁发")
# 获取成就证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.ACHIEVEMENT,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{badge_name}」成就证书",
description=badge_description,
badge_id=badge_id,
meta_data={
"badge_name": badge_name,
"badge_description": badge_description,
"user_name": user_name
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发成就证书: user_id={user_id}, badge_id={badge_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def get_user_certificates(
self,
user_id: int,
cert_type: Optional[str] = None,
offset: int = 0,
limit: int = 20
) -> Dict[str, Any]:
"""
获取用户证书列表
Args:
user_id: 用户ID
cert_type: 证书类型过滤
offset: 偏移量
limit: 数量限制
Returns:
证书列表和分页信息
"""
query = (
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.user_id == user_id)
)
if cert_type:
query = query.where(CertificateTemplate.type == cert_type)
# 获取总数
count_query = select(func.count()).select_from(query.subquery())
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 分页查询
query = query.order_by(UserCertificate.issued_at.desc()).offset(offset).limit(limit)
result = await self.db.execute(query)
rows = result.all()
certificates = [
await self._format_certificate(cert, template)
for cert, template in rows
]
return {
"items": certificates,
"total": total,
"offset": offset,
"limit": limit
}
async def get_certificate_by_id(self, cert_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取证书"""
result = await self.db.execute(
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.id == cert_id)
)
row = result.first()
if not row:
return None
cert, template = row
return await self._format_certificate(cert, template)
async def get_certificate_by_no(self, cert_no: str) -> Optional[Dict[str, Any]]:
"""根据编号获取证书(用于验证)"""
result = await self.db.execute(
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.certificate_no == cert_no)
)
row = result.first()
if not row:
return None
cert, template = row
return await self._format_certificate(cert, template, include_user=True)
async def _format_certificate(
self,
cert: UserCertificate,
template: CertificateTemplate,
include_user: bool = False
) -> Dict[str, Any]:
"""格式化证书数据"""
data = {
"id": cert.id,
"certificate_no": cert.certificate_no,
"title": cert.title,
"description": cert.description,
"type": template.type.value if isinstance(template.type, CertificateType) else template.type,
"type_name": self._get_type_name(template.type),
"issued_at": cert.issued_at.isoformat() if cert.issued_at else None,
"valid_until": cert.valid_until.isoformat() if cert.valid_until else None,
"score": float(cert.score) if cert.score else None,
"completion_rate": float(cert.completion_rate) if cert.completion_rate else None,
"pdf_url": cert.pdf_url,
"image_url": cert.image_url,
"course_id": cert.course_id,
"exam_id": cert.exam_id,
"badge_id": cert.badge_id,
"meta_data": cert.meta_data,
"template": {
"id": template.id,
"name": template.name,
"background_url": template.background_url,
}
}
if include_user and cert.user:
data["user"] = {
"id": cert.user.id,
"username": cert.user.username,
"full_name": cert.user.full_name,
}
return data
def _get_type_name(self, cert_type) -> str:
"""获取证书类型名称"""
type_names = {
CertificateType.COURSE: "课程结业证书",
CertificateType.EXAM: "考试合格证书",
CertificateType.ACHIEVEMENT: "成就证书",
"course": "课程结业证书",
"exam": "考试合格证书",
"achievement": "成就证书",
}
return type_names.get(cert_type, "证书")
async def generate_certificate_image(
self,
cert_id: int,
base_url: str = ""
) -> bytes:
"""
生成证书分享图片
Args:
cert_id: 证书ID
base_url: 基础URL用于生成二维码链接
Returns:
图片二进制数据
"""
# 获取证书信息
cert_data = await self.get_certificate_by_id(cert_id)
if not cert_data:
raise ValueError("证书不存在")
# 创建图片
width, height = 800, 600
img = Image.new('RGB', (width, height), color='#f5f7fa')
draw = ImageDraw.Draw(img)
# 尝试加载字体,如果失败则使用默认字体
try:
title_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
text_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
small_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
except:
title_font = ImageFont.load_default()
text_font = ImageFont.load_default()
small_font = ImageFont.load_default()
# 绘制标题
title = cert_data.get("type_name", "证书")
draw.text((width // 2, 60), title, font=title_font, fill='#333333', anchor='mm')
# 绘制证书标题
cert_title = cert_data.get("title", "")
draw.text((width // 2, 140), cert_title, font=text_font, fill='#666666', anchor='mm')
# 绘制描述
description = cert_data.get("description", "")
draw.text((width // 2, 200), description, font=text_font, fill='#666666', anchor='mm')
# 绘制分数/完成率(如果有)
if cert_data.get("score"):
score_text = f"成绩:{cert_data['score']}"
draw.text((width // 2, 280), score_text, font=text_font, fill='#667eea', anchor='mm')
elif cert_data.get("completion_rate"):
rate_text = f"完成率:{cert_data['completion_rate']}%"
draw.text((width // 2, 280), rate_text, font=text_font, fill='#667eea', anchor='mm')
# 绘制颁发日期
if cert_data.get("issued_at"):
date_text = f"颁发日期:{cert_data['issued_at'][:10]}"
draw.text((width // 2, 360), date_text, font=small_font, fill='#999999', anchor='mm')
# 绘制证书编号
cert_no = cert_data.get("certificate_no", "")
draw.text((width // 2, 520), f"证书编号:{cert_no}", font=small_font, fill='#999999', anchor='mm')
# 生成验证二维码
if base_url and cert_no:
verify_url = f"{base_url}/verify/{cert_no}"
qr = qrcode.QRCode(version=1, box_size=3, border=2)
qr.add_data(verify_url)
qr.make(fit=True)
qr_img = qr.make_image(fill_color="black", back_color="white")
qr_img = qr_img.resize((80, 80))
img.paste(qr_img, (width - 100, height - 100))
# 转换为字节
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
img_bytes.seek(0)
return img_bytes.getvalue()
async def update_certificate_files(
self,
cert_id: int,
pdf_url: Optional[str] = None,
image_url: Optional[str] = None
):
"""更新证书文件URL"""
result = await self.db.execute(
select(UserCertificate).where(UserCertificate.id == cert_id)
)
cert = result.scalar_one_or_none()
if cert:
if pdf_url:
cert.pdf_url = pdf_url
if image_url:
cert.image_url = image_url
await self.db.flush()