Files
012-kaopeilian/backend/app/services/certificate_service.py
yuliang_guo 6f0f2e6363
Some checks failed
continuous-integration/drone/push Build is failing
feat: KPL v1.5.0 功能迭代
1. 奖章条件优化
- 修复统计查询 SQL 语法
- 添加按类别检查奖章方法

2. 移动端适配
- 登录页、课程中心、课程详情
- 考试页面、成长路径、排行榜

3. 证书系统
- 数据库模型和迁移脚本
- 证书颁发/列表/下载/验证 API
- 前端证书列表页面

4. 数据大屏
- 企业级/团队级数据 API
- ECharts 可视化大屏页面
2026-01-29 16:51:17 +08:00

517 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
证书服务
提供证书管理功能:
- 颁发证书
- 获取证书列表
- 生成证书PDF/图片
- 验证证书
"""
import os
import io
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from PIL import Image, ImageDraw, ImageFont
import qrcode
from app.core.logger import get_logger
from app.core.config import settings
from app.models.certificate import CertificateTemplate, UserCertificate, CertificateType
# Module-level logger, obtained from the project's logging helper under this module's name.
logger = get_logger(__name__)
class CertificateService:
    """Certificate service.

    Groups the certificate operations of this module: listing templates,
    issuing certificates, querying them, and rendering a shareable image.
    Every database operation goes through the session supplied at
    construction time.
    """

    # Prefix used as the first segment of every generated certificate number.
    CERT_NO_PREFIX = "KPL"

    def __init__(self, db: AsyncSession):
        """Store the async database session used by all methods.

        Args:
            db: The session the service executes its queries and flushes on.
        """
        self.db = db
async def get_templates(self, cert_type: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the active certificate templates as plain dictionaries.

    Args:
        cert_type: Optional certificate type to filter on. When falsy, no
            type filter is applied.

    Returns:
        One dict per active template (id, name, type, background_url,
        is_active), ordered by the template's ``sort_order``.
    """
    # `== True` is deliberate: it builds a SQL expression, not a Python bool test.
    conditions = [CertificateTemplate.is_active == True]  # noqa: E712
    if cert_type:
        conditions.append(CertificateTemplate.type == cert_type)

    stmt = (
        select(CertificateTemplate)
        .where(*conditions)
        .order_by(CertificateTemplate.sort_order)
    )
    found = (await self.db.execute(stmt)).scalars().all()

    payload = []
    for tpl in found:
        # Expose the enum's underlying value when the column yields an enum member.
        type_value = tpl.type
        if isinstance(type_value, CertificateType):
            type_value = type_value.value
        payload.append(
            {
                "id": tpl.id,
                "name": tpl.name,
                "type": type_value,
                "background_url": tpl.background_url,
                "is_active": tpl.is_active,
            }
        )
    return payload
async def _generate_certificate_no(self) -> str:
    """Build the next certificate number for the current year.

    The number has the form ``<CERT_NO_PREFIX>-<year>-<6-digit sequence>``,
    where the sequence is one more than the count of existing certificates
    whose number starts with this year's prefix.

    Returns:
        The generated certificate number.
    """
    # NOTE(review): the sequence is derived from a row count, so two callers
    # that run this before either flushes compute the same number, and a
    # deleted row makes the count lag behind the highest issued sequence.
    # Confirm that a unique constraint / retry exists at a higher level.
    current_year = datetime.now().year
    series_prefix = f"{self.CERT_NO_PREFIX}-{current_year}-"

    stmt = select(func.count(UserCertificate.id)).where(
        UserCertificate.certificate_no.like(f"{series_prefix}%")
    )
    issued_so_far = (await self.db.execute(stmt)).scalar() or 0

    sequence = str(issued_so_far + 1).zfill(6)
    return f"{series_prefix}{sequence}"
async def issue_course_certificate(
    self,
    user_id: int,
    course_id: int,
    course_name: str,
    completion_rate: float,
    user_name: str
) -> Dict[str, Any]:
    """Issue a course-completion certificate to a user.

    Args:
        user_id: ID of the user receiving the certificate.
        course_id: ID of the completed course.
        course_name: Course name, embedded in the title and description.
        completion_rate: Completion rate stored on the certificate.
        user_name: User's display name, stored in the certificate metadata.

    Returns:
        The formatted certificate dictionary produced by
        ``_format_certificate``.

    Raises:
        ValueError: If the user already holds a certificate for this course,
            or if no active course template exists.
    """
    # Refuse to issue twice. Only existence matters, so fetch at most one id;
    # this also avoids MultipleResultsFound if duplicates already exist.
    existing = await self.db.execute(
        select(UserCertificate.id)
        .where(
            UserCertificate.user_id == user_id,
            UserCertificate.course_id == course_id,
        )
        .limit(1)
    )
    if existing.scalars().first() is not None:
        raise ValueError("该课程证书已颁发")

    # Pick the first active course template by sort order. Several active
    # templates of one type may coexist, which scalar_one_or_none() rejects.
    result = await self.db.execute(
        select(CertificateTemplate)
        .where(
            CertificateTemplate.type == CertificateType.COURSE,
            CertificateTemplate.is_active == True,  # noqa: E712 - SQL expression
        )
        .order_by(CertificateTemplate.sort_order)
        .limit(1)
    )
    template = result.scalars().first()
    if template is None:
        raise ValueError("证书模板不存在")

    cert_no = await self._generate_certificate_no()

    certificate = UserCertificate(
        user_id=user_id,
        template_id=template.id,
        certificate_no=cert_no,
        # The opening book-title mark was missing, leaving an unbalanced "》".
        title=f"《{course_name}》课程结业证书",
        description=f"完成课程《{course_name}》的全部学习内容",
        course_id=course_id,
        completion_rate=completion_rate,
        meta_data={
            "course_name": course_name,
            "user_name": user_name,
            "completion_rate": completion_rate
        }
    )
    self.db.add(certificate)
    # Flush (not commit) so the row is sent to the database within the
    # caller's transaction before it is formatted.
    await self.db.flush()

    logger.info(f"颁发课程证书: user_id={user_id}, course_id={course_id}, cert_no={cert_no}")
    return await self._format_certificate(certificate, template)
async def issue_exam_certificate(
    self,
    user_id: int,
    exam_id: int,
    exam_name: str,
    score: float,
    user_name: str
) -> Dict[str, Any]:
    """Issue an exam-pass certificate to a user.

    Args:
        user_id: ID of the user receiving the certificate.
        exam_id: ID of the passed exam.
        exam_name: Exam name, embedded in the title and description.
        score: Score stored on the certificate.
        user_name: User's display name, stored in the certificate metadata.

    Returns:
        The formatted certificate dictionary produced by
        ``_format_certificate``.

    Raises:
        ValueError: If the user already holds a certificate for this exam,
            or if no active exam template exists.
    """
    # Refuse to issue twice. Only existence matters, so fetch at most one id;
    # this also avoids MultipleResultsFound if duplicates already exist.
    existing = await self.db.execute(
        select(UserCertificate.id)
        .where(
            UserCertificate.user_id == user_id,
            UserCertificate.exam_id == exam_id,
        )
        .limit(1)
    )
    if existing.scalars().first() is not None:
        raise ValueError("该考试证书已颁发")

    # Pick the first active exam template by sort order. Several active
    # templates of one type may coexist, which scalar_one_or_none() rejects.
    result = await self.db.execute(
        select(CertificateTemplate)
        .where(
            CertificateTemplate.type == CertificateType.EXAM,
            CertificateTemplate.is_active == True,  # noqa: E712 - SQL expression
        )
        .order_by(CertificateTemplate.sort_order)
        .limit(1)
    )
    template = result.scalars().first()
    if template is None:
        raise ValueError("证书模板不存在")

    cert_no = await self._generate_certificate_no()

    certificate = UserCertificate(
        user_id=user_id,
        template_id=template.id,
        certificate_no=cert_no,
        # The opening book-title mark was missing, leaving an unbalanced "》".
        title=f"《{exam_name}》考试合格证书",
        description=f"在《{exam_name}》考试中成绩合格",
        exam_id=exam_id,
        score=score,
        meta_data={
            "exam_name": exam_name,
            "user_name": user_name,
            "score": score
        }
    )
    self.db.add(certificate)
    # Flush (not commit) so the row is sent to the database within the
    # caller's transaction before it is formatted.
    await self.db.flush()

    logger.info(f"颁发考试证书: user_id={user_id}, exam_id={exam_id}, cert_no={cert_no}")
    return await self._format_certificate(certificate, template)
async def issue_achievement_certificate(
    self,
    user_id: int,
    badge_id: int,
    badge_name: str,
    badge_description: str,
    user_name: str
) -> Dict[str, Any]:
    """Issue an achievement certificate for a badge.

    Args:
        user_id: ID of the user receiving the certificate.
        badge_id: ID of the badge the certificate is issued for.
        badge_name: Badge name, embedded in the title.
        badge_description: Badge description, used as the certificate
            description and stored in the metadata.
        user_name: User's display name, stored in the certificate metadata.

    Returns:
        The formatted certificate dictionary produced by
        ``_format_certificate``.

    Raises:
        ValueError: If the user already holds a certificate for this badge,
            or if no active achievement template exists.
    """
    # Refuse to issue twice. Only existence matters, so fetch at most one id;
    # this also avoids MultipleResultsFound if duplicates already exist.
    existing = await self.db.execute(
        select(UserCertificate.id)
        .where(
            UserCertificate.user_id == user_id,
            UserCertificate.badge_id == badge_id,
        )
        .limit(1)
    )
    if existing.scalars().first() is not None:
        raise ValueError("该成就证书已颁发")

    # Pick the first active achievement template by sort order. Several active
    # templates of one type may coexist, which scalar_one_or_none() rejects.
    result = await self.db.execute(
        select(CertificateTemplate)
        .where(
            CertificateTemplate.type == CertificateType.ACHIEVEMENT,
            CertificateTemplate.is_active == True,  # noqa: E712 - SQL expression
        )
        .order_by(CertificateTemplate.sort_order)
        .limit(1)
    )
    template = result.scalars().first()
    if template is None:
        raise ValueError("证书模板不存在")

    cert_no = await self._generate_certificate_no()

    certificate = UserCertificate(
        user_id=user_id,
        template_id=template.id,
        certificate_no=cert_no,
        # The opening corner bracket was missing, leaving an unbalanced "」".
        title=f"「{badge_name}」成就证书",
        description=badge_description,
        badge_id=badge_id,
        meta_data={
            "badge_name": badge_name,
            "badge_description": badge_description,
            "user_name": user_name
        }
    )
    self.db.add(certificate)
    # Flush (not commit) so the row is sent to the database within the
    # caller's transaction before it is formatted.
    await self.db.flush()

    logger.info(f"颁发成就证书: user_id={user_id}, badge_id={badge_id}, cert_no={cert_no}")
    return await self._format_certificate(certificate, template)
async def get_user_certificates(
    self,
    user_id: int,
    cert_type: Optional[str] = None,
    offset: int = 0,
    limit: int = 20
) -> Dict[str, Any]:
    """Return one page of a user's certificates plus paging information.

    Args:
        user_id: ID of the user whose certificates are listed.
        cert_type: Optional certificate type filter, matched against the
            template's type. Ignored when falsy.
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        A dict with ``items`` (formatted certificates, newest ``issued_at``
        first), ``total`` (match count before paging), ``offset`` and
        ``limit``.
    """
    base_stmt = (
        select(UserCertificate, CertificateTemplate)
        .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
        .where(UserCertificate.user_id == user_id)
    )
    if cert_type:
        base_stmt = base_stmt.where(CertificateTemplate.type == cert_type)

    # Count every match by wrapping the unpaged statement in a subquery.
    total_stmt = select(func.count()).select_from(base_stmt.subquery())
    total = (await self.db.execute(total_stmt)).scalar() or 0

    # Fetch only the requested page, most recently issued first.
    page_stmt = (
        base_stmt
        .order_by(UserCertificate.issued_at.desc())
        .offset(offset)
        .limit(limit)
    )
    page_rows = (await self.db.execute(page_stmt)).all()

    items = []
    for cert, template in page_rows:
        items.append(await self._format_certificate(cert, template))

    return {
        "items": items,
        "total": total,
        "offset": offset,
        "limit": limit
    }
async def get_certificate_by_id(self, cert_id: int) -> Optional[Dict[str, Any]]:
    """Look up a certificate by primary key.

    Args:
        cert_id: ID of the certificate.

    Returns:
        The formatted certificate dictionary, or ``None`` when no
        certificate with that ID (joined to its template) is found.
    """
    stmt = (
        select(UserCertificate, CertificateTemplate)
        .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
        .where(UserCertificate.id == cert_id)
    )
    found = (await self.db.execute(stmt)).first()
    if found:
        cert, template = found
        return await self._format_certificate(cert, template)
    return None
async def get_certificate_by_no(self, cert_no: str) -> Optional[Dict[str, Any]]:
    """Look up a certificate by its certificate number (used for verification).

    Args:
        cert_no: The certificate number to look up.

    Returns:
        The formatted certificate dictionary, requested with user details
        included, or ``None`` when no certificate carries that number.
    """
    # NOTE(review): the formatter reads `cert.user` here; this query does not
    # load it explicitly, so it presumably relies on the relationship's
    # loading configuration - confirm it is safe under AsyncSession.
    stmt = (
        select(UserCertificate, CertificateTemplate)
        .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
        .where(UserCertificate.certificate_no == cert_no)
    )
    found = (await self.db.execute(stmt)).first()
    if found:
        cert, template = found
        return await self._format_certificate(cert, template, include_user=True)
    return None
async def _format_certificate(
    self,
    cert: UserCertificate,
    template: CertificateTemplate,
    include_user: bool = False
) -> Dict[str, Any]:
    """Convert a certificate and its template into a plain dictionary.

    Args:
        cert: The certificate record to format.
        template: The template the certificate was issued from.
        include_user: When true and ``cert.user`` is truthy, add a ``user``
            entry with the user's id, username and full name.

    Returns:
        A dict of certificate fields with timestamps as ISO-8601 strings,
        ``score`` / ``completion_rate`` as floats (``None`` only when the
        stored value is ``None``), and a nested ``template`` dict.
    """
    template_type = template.type
    # Expose the enum's underlying value when the column yields an enum member.
    type_value = (
        template_type.value
        if isinstance(template_type, CertificateType)
        else template_type
    )

    data = {
        "id": cert.id,
        "certificate_no": cert.certificate_no,
        "title": cert.title,
        "description": cert.description,
        "type": type_value,
        "type_name": self._get_type_name(template_type),
        "issued_at": cert.issued_at.isoformat() if cert.issued_at else None,
        "valid_until": cert.valid_until.isoformat() if cert.valid_until else None,
        # Compare against None explicitly: a stored 0 is a real value and
        # must not be reported as "missing".
        "score": float(cert.score) if cert.score is not None else None,
        "completion_rate": (
            float(cert.completion_rate) if cert.completion_rate is not None else None
        ),
        "pdf_url": cert.pdf_url,
        "image_url": cert.image_url,
        "course_id": cert.course_id,
        "exam_id": cert.exam_id,
        "badge_id": cert.badge_id,
        "meta_data": cert.meta_data,
        "template": {
            "id": template.id,
            "name": template.name,
            "background_url": template.background_url,
        }
    }

    # NOTE(review): `cert.user` is presumably an ORM relationship; how it is
    # loaded is not visible here - confirm it does not trigger a lazy load
    # under AsyncSession.
    if include_user and cert.user:
        data["user"] = {
            "id": cert.user.id,
            "username": cert.user.username,
            "full_name": cert.user.full_name,
        }
    return data
def _get_type_name(self, cert_type) -> str:
    """Return the display name for a certificate type.

    Args:
        cert_type: Either a ``CertificateType`` member or one of the raw
            strings ``"course"``, ``"exam"``, ``"achievement"``.

    Returns:
        The display name for the type, or the generic label ``"证书"`` when
        the type is not recognised.
    """
    labelled_types = (
        (CertificateType.COURSE, "course", "课程结业证书"),
        (CertificateType.EXAM, "exam", "考试合格证书"),
        (CertificateType.ACHIEVEMENT, "achievement", "成就证书"),
    )
    # Register each label under both the enum member and its raw string form.
    lookup = {}
    for member, raw_name, label in labelled_types:
        lookup[member] = label
        lookup[raw_name] = label

    try:
        return lookup[cert_type]
    except KeyError:
        return "证书"
async def generate_certificate_image(
    self,
    cert_id: int,
    base_url: str = ""
) -> bytes:
    """Render a shareable PNG image for a certificate.

    The image is an 800x600 canvas showing the type name, title,
    description, score or completion rate (when present), issue date and
    certificate number. When ``base_url`` is given, a QR code pointing to
    ``<base_url>/verify/<certificate_no>`` is pasted in the bottom-right
    corner.

    Args:
        cert_id: ID of the certificate to render.
        base_url: Base URL used to build the verification link encoded in
            the QR code. No QR code is drawn when it is empty.

    Returns:
        The PNG image as bytes.

    Raises:
        ValueError: If no certificate with ``cert_id`` exists.
    """
    cert_data = await self.get_certificate_by_id(cert_id)
    if not cert_data:
        raise ValueError("证书不存在")

    width, height = 800, 600
    center_x = width // 2
    img = Image.new('RGB', (width, height), color='#f5f7fa')
    draw = ImageDraw.Draw(img)

    # The certificate text is Chinese and DejaVu has no CJK glyphs, so try
    # CJK-capable fonts first. The paths are typical Debian/Ubuntu install
    # locations - adjust for the deployment image. DejaVu stays as fallback.
    bold_candidates = (
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    )
    regular_candidates = (
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    )

    def load_font(candidates, size):
        """Return the first loadable font at `size`, else Pillow's default."""
        for path in candidates:
            try:
                return ImageFont.truetype(path, size)
            except OSError:
                # Font file missing or unreadable: try the next candidate.
                continue
        return ImageFont.load_default()

    title_font = load_font(bold_candidates, 36)
    text_font = load_font(regular_candidates, 20)
    small_font = load_font(regular_candidates, 14)

    # Heading: the certificate type name.
    title = cert_data.get("type_name") or "证书"
    draw.text((center_x, 60), title, font=title_font, fill='#333333', anchor='mm')

    # Title and description may be stored as None; draw an empty string then.
    cert_title = cert_data.get("title") or ""
    draw.text((center_x, 140), cert_title, font=text_font, fill='#666666', anchor='mm')

    description = cert_data.get("description") or ""
    draw.text((center_x, 200), description, font=text_font, fill='#666666', anchor='mm')

    # Score takes precedence over completion rate. Compare against None so
    # that a value of 0 is still rendered.
    score = cert_data.get("score")
    completion_rate = cert_data.get("completion_rate")
    if score is not None:
        score_text = f"成绩：{score}"
        draw.text((center_x, 280), score_text, font=text_font, fill='#667eea', anchor='mm')
    elif completion_rate is not None:
        rate_text = f"完成率：{completion_rate}%"
        draw.text((center_x, 280), rate_text, font=text_font, fill='#667eea', anchor='mm')

    # Issue date: the first 10 characters of the ISO timestamp (YYYY-MM-DD).
    issued_at = cert_data.get("issued_at")
    if issued_at:
        date_text = f"颁发日期：{issued_at[:10]}"
        draw.text((center_x, 360), date_text, font=small_font, fill='#999999', anchor='mm')

    cert_no = cert_data.get("certificate_no") or ""
    draw.text((center_x, 520), f"证书编号：{cert_no}", font=small_font, fill='#999999', anchor='mm')

    # Verification QR code, only when a link can actually be built.
    if base_url and cert_no:
        # Strip a trailing slash so the link never contains "//verify".
        verify_url = f"{base_url.rstrip('/')}/verify/{cert_no}"
        qr = qrcode.QRCode(version=1, box_size=3, border=2)
        qr.add_data(verify_url)
        qr.make(fit=True)
        qr_img = qr.make_image(fill_color="black", back_color="white")
        qr_img = qr_img.resize((80, 80))
        img.paste(qr_img, (width - 100, height - 100))

    # Serialise to PNG in memory.
    buffer = io.BytesIO()
    img.save(buffer, format='PNG')
    return buffer.getvalue()
async def update_certificate_files(
    self,
    cert_id: int,
    pdf_url: Optional[str] = None,
    image_url: Optional[str] = None
):
    """Store generated file URLs on a certificate.

    Only truthy arguments are written; a ``None`` or empty URL leaves the
    stored value untouched. Nothing happens when the certificate does not
    exist.

    Args:
        cert_id: ID of the certificate to update.
        pdf_url: New PDF URL, if any.
        image_url: New image URL, if any.
    """
    stmt = select(UserCertificate).where(UserCertificate.id == cert_id)
    cert = (await self.db.execute(stmt)).scalar_one_or_none()
    if not cert:
        return

    if pdf_url:
        cert.pdf_url = pdf_url
    if image_url:
        cert.image_url = image_url
    # NOTE(review): the flush is assumed to belong to the "certificate
    # found" branch - confirm against the original file's indentation.
    await self.db.flush()