Files
012-kaopeilian/backend/app/services/certificate_service.py
yuliang_guo 64f5d567fa
Some checks failed
continuous-integration/drone/push Build is failing
feat: 实现 KPL 系统功能改进计划
1. 课程学习进度追踪
   - 新增 UserCourseProgress 和 UserMaterialProgress 模型
   - 新增 /api/v1/progress/* 进度追踪 API
   - 更新 admin.py 使用真实课程完成率数据

2. 路由权限检查完善
   - 新增前端 permissionChecker.ts 权限检查工具
   - 更新 router/guard.ts 实现团队和课程权限验证
   - 新增后端 permission_service.py

3. AI 陪练音频转文本
   - 新增 speech_recognition.py 语音识别服务
   - 新增 /api/v1/speech/* API
   - 更新 ai-practice-coze.vue 支持语音输入

4. 双人对练报告生成
   - 更新 practice_room_service.py 添加报告生成功能
   - 新增 /rooms/{room_code}/report API
   - 更新 duo-practice-report.vue 调用真实 API

5. 学习提醒推送
   - 新增 notification_service.py 通知服务
   - 新增 scheduler_service.py 定时任务服务
   - 支持钉钉、企微、站内消息推送

6. 智能学习推荐
   - 新增 recommendation_service.py 推荐服务
   - 新增 /api/v1/recommendations/* API
   - 支持错题、能力、进度、热门多维度推荐

7. 安全问题修复
   - DEBUG 默认值改为 False
   - 添加 SECRET_KEY 安全警告
   - 新增 check_security_settings() 检查函数

8. 证书 PDF 生成
   - 更新 certificate_service.py 添加 PDF 生成
   - 添加 weasyprint、Pillow、qrcode 依赖
   - 更新下载 API 支持 PDF 和 PNG 格式
2026-01-30 14:22:35 +08:00

759 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
证书服务
提供证书管理功能:
- 颁发证书
- 获取证书列表
- 生成证书PDF/图片
- 验证证书
"""
import os
import io
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from PIL import Image, ImageDraw, ImageFont
import qrcode
from app.core.logger import get_logger
from app.core.config import settings
from app.models.certificate import CertificateTemplate, UserCertificate, CertificateType
logger = get_logger(__name__)
class CertificateService:
"""证书服务"""
# 证书编号前缀
CERT_NO_PREFIX = "KPL"
def __init__(self, db: AsyncSession):
self.db = db
async def get_templates(self, cert_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取证书模板列表
Args:
cert_type: 证书类型过滤
Returns:
模板列表
"""
query = select(CertificateTemplate).where(CertificateTemplate.is_active == True)
if cert_type:
query = query.where(CertificateTemplate.type == cert_type)
query = query.order_by(CertificateTemplate.sort_order)
result = await self.db.execute(query)
templates = result.scalars().all()
return [
{
"id": t.id,
"name": t.name,
"type": t.type.value if isinstance(t.type, CertificateType) else t.type,
"background_url": t.background_url,
"is_active": t.is_active,
}
for t in templates
]
async def _generate_certificate_no(self) -> str:
"""生成唯一证书编号"""
year = datetime.now().year
# 获取当年的证书数量
result = await self.db.execute(
select(func.count(UserCertificate.id))
.where(UserCertificate.certificate_no.like(f"{self.CERT_NO_PREFIX}-{year}-%"))
)
count = result.scalar() or 0
# 生成编号KPL-年份-6位序号
cert_no = f"{self.CERT_NO_PREFIX}-{year}-{str(count + 1).zfill(6)}"
return cert_no
async def issue_course_certificate(
self,
user_id: int,
course_id: int,
course_name: str,
completion_rate: float,
user_name: str
) -> Dict[str, Any]:
"""
颁发课程结业证书
Args:
user_id: 用户ID
course_id: 课程ID
course_name: 课程名称
completion_rate: 完成率
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.course_id == course_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该课程证书已颁发")
# 获取课程证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.COURSE,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{course_name}》课程结业证书",
description=f"完成课程《{course_name}》的全部学习内容",
course_id=course_id,
completion_rate=completion_rate,
meta_data={
"course_name": course_name,
"user_name": user_name,
"completion_rate": completion_rate
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发课程证书: user_id={user_id}, course_id={course_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def issue_exam_certificate(
self,
user_id: int,
exam_id: int,
exam_name: str,
score: float,
user_name: str
) -> Dict[str, Any]:
"""
颁发考试合格证书
Args:
user_id: 用户ID
exam_id: 考试ID
exam_name: 考试名称
score: 分数
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.exam_id == exam_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该考试证书已颁发")
# 获取考试证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.EXAM,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{exam_name}》考试合格证书",
description=f"在《{exam_name}》考试中成绩合格",
exam_id=exam_id,
score=score,
meta_data={
"exam_name": exam_name,
"user_name": user_name,
"score": score
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发考试证书: user_id={user_id}, exam_id={exam_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def issue_achievement_certificate(
self,
user_id: int,
badge_id: int,
badge_name: str,
badge_description: str,
user_name: str
) -> Dict[str, Any]:
"""
颁发成就证书
Args:
user_id: 用户ID
badge_id: 奖章ID
badge_name: 奖章名称
badge_description: 奖章描述
user_name: 用户姓名
Returns:
证书信息
"""
# 检查是否已颁发
existing = await self.db.execute(
select(UserCertificate).where(
UserCertificate.user_id == user_id,
UserCertificate.badge_id == badge_id
)
)
if existing.scalar_one_or_none():
raise ValueError("该成就证书已颁发")
# 获取成就证书模板
result = await self.db.execute(
select(CertificateTemplate).where(
CertificateTemplate.type == CertificateType.ACHIEVEMENT,
CertificateTemplate.is_active == True
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError("证书模板不存在")
# 生成证书编号
cert_no = await self._generate_certificate_no()
# 创建证书
certificate = UserCertificate(
user_id=user_id,
template_id=template.id,
certificate_no=cert_no,
title=f"{badge_name}」成就证书",
description=badge_description,
badge_id=badge_id,
meta_data={
"badge_name": badge_name,
"badge_description": badge_description,
"user_name": user_name
}
)
self.db.add(certificate)
await self.db.flush()
logger.info(f"颁发成就证书: user_id={user_id}, badge_id={badge_id}, cert_no={cert_no}")
return await self._format_certificate(certificate, template)
async def get_user_certificates(
self,
user_id: int,
cert_type: Optional[str] = None,
offset: int = 0,
limit: int = 20
) -> Dict[str, Any]:
"""
获取用户证书列表
Args:
user_id: 用户ID
cert_type: 证书类型过滤
offset: 偏移量
limit: 数量限制
Returns:
证书列表和分页信息
"""
query = (
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.user_id == user_id)
)
if cert_type:
query = query.where(CertificateTemplate.type == cert_type)
# 获取总数
count_query = select(func.count()).select_from(query.subquery())
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 分页查询
query = query.order_by(UserCertificate.issued_at.desc()).offset(offset).limit(limit)
result = await self.db.execute(query)
rows = result.all()
certificates = [
await self._format_certificate(cert, template)
for cert, template in rows
]
return {
"items": certificates,
"total": total,
"offset": offset,
"limit": limit
}
async def get_certificate_by_id(self, cert_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取证书"""
result = await self.db.execute(
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.id == cert_id)
)
row = result.first()
if not row:
return None
cert, template = row
return await self._format_certificate(cert, template)
async def get_certificate_by_no(self, cert_no: str) -> Optional[Dict[str, Any]]:
"""根据编号获取证书(用于验证)"""
result = await self.db.execute(
select(UserCertificate, CertificateTemplate)
.join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
.where(UserCertificate.certificate_no == cert_no)
)
row = result.first()
if not row:
return None
cert, template = row
return await self._format_certificate(cert, template, include_user=True)
async def _format_certificate(
self,
cert: UserCertificate,
template: CertificateTemplate,
include_user: bool = False
) -> Dict[str, Any]:
"""格式化证书数据"""
data = {
"id": cert.id,
"certificate_no": cert.certificate_no,
"title": cert.title,
"description": cert.description,
"type": template.type.value if isinstance(template.type, CertificateType) else template.type,
"type_name": self._get_type_name(template.type),
"issued_at": cert.issued_at.isoformat() if cert.issued_at else None,
"valid_until": cert.valid_until.isoformat() if cert.valid_until else None,
"score": float(cert.score) if cert.score else None,
"completion_rate": float(cert.completion_rate) if cert.completion_rate else None,
"pdf_url": cert.pdf_url,
"image_url": cert.image_url,
"course_id": cert.course_id,
"exam_id": cert.exam_id,
"badge_id": cert.badge_id,
"meta_data": cert.meta_data,
"template": {
"id": template.id,
"name": template.name,
"background_url": template.background_url,
}
}
if include_user and cert.user:
data["user"] = {
"id": cert.user.id,
"username": cert.user.username,
"full_name": cert.user.full_name,
}
return data
def _get_type_name(self, cert_type) -> str:
"""获取证书类型名称"""
type_names = {
CertificateType.COURSE: "课程结业证书",
CertificateType.EXAM: "考试合格证书",
CertificateType.ACHIEVEMENT: "成就证书",
"course": "课程结业证书",
"exam": "考试合格证书",
"achievement": "成就证书",
}
return type_names.get(cert_type, "证书")
async def generate_certificate_image(
self,
cert_id: int,
base_url: str = ""
) -> bytes:
"""
生成证书分享图片
Args:
cert_id: 证书ID
base_url: 基础URL用于生成二维码链接
Returns:
图片二进制数据
"""
# 获取证书信息
cert_data = await self.get_certificate_by_id(cert_id)
if not cert_data:
raise ValueError("证书不存在")
# 创建图片
width, height = 800, 600
img = Image.new('RGB', (width, height), color='#f5f7fa')
draw = ImageDraw.Draw(img)
# 尝试加载字体,如果失败则使用默认字体
try:
title_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
text_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
small_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
except:
title_font = ImageFont.load_default()
text_font = ImageFont.load_default()
small_font = ImageFont.load_default()
# 绘制标题
title = cert_data.get("type_name", "证书")
draw.text((width // 2, 60), title, font=title_font, fill='#333333', anchor='mm')
# 绘制证书标题
cert_title = cert_data.get("title", "")
draw.text((width // 2, 140), cert_title, font=text_font, fill='#666666', anchor='mm')
# 绘制描述
description = cert_data.get("description", "")
draw.text((width // 2, 200), description, font=text_font, fill='#666666', anchor='mm')
# 绘制分数/完成率(如果有)
if cert_data.get("score"):
score_text = f"成绩:{cert_data['score']}"
draw.text((width // 2, 280), score_text, font=text_font, fill='#667eea', anchor='mm')
elif cert_data.get("completion_rate"):
rate_text = f"完成率:{cert_data['completion_rate']}%"
draw.text((width // 2, 280), rate_text, font=text_font, fill='#667eea', anchor='mm')
# 绘制颁发日期
if cert_data.get("issued_at"):
date_text = f"颁发日期:{cert_data['issued_at'][:10]}"
draw.text((width // 2, 360), date_text, font=small_font, fill='#999999', anchor='mm')
# 绘制证书编号
cert_no = cert_data.get("certificate_no", "")
draw.text((width // 2, 520), f"证书编号:{cert_no}", font=small_font, fill='#999999', anchor='mm')
# 生成验证二维码
if base_url and cert_no:
verify_url = f"{base_url}/verify/{cert_no}"
qr = qrcode.QRCode(version=1, box_size=3, border=2)
qr.add_data(verify_url)
qr.make(fit=True)
qr_img = qr.make_image(fill_color="black", back_color="white")
qr_img = qr_img.resize((80, 80))
img.paste(qr_img, (width - 100, height - 100))
# 转换为字节
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
img_bytes.seek(0)
return img_bytes.getvalue()
async def update_certificate_files(
self,
cert_id: int,
pdf_url: Optional[str] = None,
image_url: Optional[str] = None
):
"""更新证书文件URL"""
result = await self.db.execute(
select(UserCertificate).where(UserCertificate.id == cert_id)
)
cert = result.scalar_one_or_none()
if cert:
if pdf_url:
cert.pdf_url = pdf_url
if image_url:
cert.image_url = image_url
await self.db.flush()
async def generate_certificate_pdf(
self,
cert_id: int,
base_url: str = ""
) -> bytes:
"""
生成证书 PDF
使用 HTML 模板渲染后转换为 PDF
Args:
cert_id: 证书ID
base_url: 基础URL用于生成二维码链接
Returns:
PDF 二进制数据
"""
# 获取证书信息
cert_data = await self.get_certificate_by_id(cert_id)
if not cert_data:
raise ValueError("证书不存在")
# 获取用户信息
from app.models.user import User
user_result = await self.db.execute(
select(User).join(UserCertificate, UserCertificate.user_id == User.id)
.where(UserCertificate.id == cert_id)
)
user = user_result.scalar_one_or_none()
user_name = user.full_name or user.username if user else "未知用户"
# 生成验证二维码 base64
qr_base64 = ""
cert_no = cert_data.get("certificate_no", "")
if cert_no:
import base64
verify_url = f"{base_url}/verify/{cert_no}" if base_url else cert_no
qr = qrcode.QRCode(version=1, box_size=3, border=2)
qr.add_data(verify_url)
qr.make(fit=True)
qr_img = qr.make_image(fill_color="black", back_color="white")
qr_bytes = io.BytesIO()
qr_img.save(qr_bytes, format='PNG')
qr_bytes.seek(0)
qr_base64 = base64.b64encode(qr_bytes.getvalue()).decode('utf-8')
# HTML 模板
html_template = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
@page {{
size: A4 landscape;
margin: 20mm;
}}
body {{
font-family: "Microsoft YaHei", "SimHei", Arial, sans-serif;
margin: 0;
padding: 40px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
box-sizing: border-box;
}}
.certificate {{
background: white;
border-radius: 20px;
padding: 60px;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
position: relative;
}}
.border-decoration {{
position: absolute;
top: 20px;
left: 20px;
right: 20px;
bottom: 20px;
border: 3px solid #667eea;
border-radius: 15px;
pointer-events: none;
}}
.header {{
color: #667eea;
font-size: 16px;
letter-spacing: 8px;
margin-bottom: 20px;
}}
.type-name {{
color: #333;
font-size: 36px;
font-weight: bold;
margin-bottom: 30px;
}}
.user-name {{
color: #667eea;
font-size: 42px;
font-weight: bold;
margin: 30px 0;
border-bottom: 3px solid #667eea;
display: inline-block;
padding-bottom: 10px;
}}
.title {{
color: #333;
font-size: 24px;
margin: 20px 0;
}}
.description {{
color: #666;
font-size: 18px;
margin: 20px 0;
line-height: 1.6;
}}
.score {{
color: #667eea;
font-size: 28px;
font-weight: bold;
margin: 20px 0;
}}
.footer {{
margin-top: 40px;
display: flex;
justify-content: space-between;
align-items: flex-end;
}}
.date-section {{
text-align: left;
}}
.date-label {{
color: #999;
font-size: 14px;
}}
.date-value {{
color: #333;
font-size: 18px;
margin-top: 5px;
}}
.qr-section {{
text-align: right;
}}
.qr-code {{
width: 80px;
height: 80px;
}}
.cert-no {{
color: #999;
font-size: 12px;
margin-top: 5px;
}}
.seal {{
position: absolute;
right: 100px;
bottom: 120px;
width: 120px;
height: 120px;
border: 4px solid #e74c3c;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
color: #e74c3c;
font-weight: bold;
font-size: 14px;
transform: rotate(-15deg);
opacity: 0.8;
}}
</style>
</head>
<body>
<div class="certificate">
<div class="border-decoration"></div>
<div class="header">考培练学习平台</div>
<div class="type-name">{cert_data.get('type_name', '证书')}</div>
<div class="user-name">{user_name}</div>
<div class="title">{cert_data.get('title', '')}</div>
<div class="description">{cert_data.get('description', '')}</div>
{"<div class='score'>成绩:" + str(cert_data.get('score')) + "分</div>" if cert_data.get('score') else ""}
{"<div class='score'>完成率:" + str(cert_data.get('completion_rate')) + "%</div>" if cert_data.get('completion_rate') else ""}
<div class="footer">
<div class="date-section">
<div class="date-label">颁发日期</div>
<div class="date-value">{cert_data.get('issued_at', '')[:10] if cert_data.get('issued_at') else ''}</div>
</div>
<div class="qr-section">
{"<img class='qr-code' src='data:image/png;base64," + qr_base64 + "' alt='验证二维码'>" if qr_base64 else ""}
<div class="cert-no">证书编号:{cert_no}</div>
</div>
</div>
<div class="seal">官方认证</div>
</div>
</body>
</html>
"""
# 尝试使用 weasyprint 生成 PDF
try:
from weasyprint import HTML
pdf_bytes = HTML(string=html_template).write_pdf()
return pdf_bytes
except ImportError:
logger.warning("weasyprint 未安装,使用备用方案")
# 备用方案:返回 HTML 供前端处理
return html_template.encode('utf-8')
except Exception as e:
logger.error(f"生成 PDF 失败: {str(e)}")
raise ValueError(f"生成 PDF 失败: {str(e)}")
async def download_certificate(
self,
cert_id: int,
format: str = "pdf",
base_url: str = ""
) -> tuple[bytes, str, str]:
"""
下载证书
Args:
cert_id: 证书ID
format: 格式 (pdf/png)
base_url: 基础URL
Returns:
(文件内容, 文件名, MIME类型)
"""
cert_data = await self.get_certificate_by_id(cert_id)
if not cert_data:
raise ValueError("证书不存在")
cert_no = cert_data.get("certificate_no", "certificate")
if format.lower() == "pdf":
content = await self.generate_certificate_pdf(cert_id, base_url)
filename = f"{cert_no}.pdf"
mime_type = "application/pdf"
else:
content = await self.generate_certificate_image(cert_id, base_url)
filename = f"{cert_no}.png"
mime_type = "image/png"
return content, filename, mime_type