"""
Certificate service.

Provides certificate management features:
- issuing certificates
- listing certificates
- rendering certificates as PDF / image
- verifying certificates
"""
import base64
import html
import io
import os
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any

from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from PIL import Image, ImageDraw, ImageFont
import qrcode

from app.core.logger import get_logger
from app.core.config import settings
from app.models.certificate import CertificateTemplate, UserCertificate, CertificateType

logger = get_logger(__name__)


class CertificateService:
    """Issue, query and render user certificates on top of an async DB session."""

    # Prefix of every generated certificate number.
    CERT_NO_PREFIX = "KPL"

    # Display names keyed by both the enum member and its raw string form,
    # because callers pass either (see the isinstance checks on ``type``).
    _TYPE_NAMES = {
        CertificateType.COURSE: "课程结业证书",
        CertificateType.EXAM: "考试合格证书",
        CertificateType.ACHIEVEMENT: "成就证书",
        "course": "课程结业证书",
        "exam": "考试合格证书",
        "achievement": "成就证书",
    }

    # Font files tried in order. CJK-capable fonts come first because every
    # label drawn on the image is Chinese and DejaVu has no CJK glyphs.
    _BOLD_FONT_CANDIDATES = (
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    )
    _REGULAR_FONT_CANDIDATES = (
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    )

    def __init__(self, db: AsyncSession):
        self.db = db

    # ------------------------------------------------------------------
    # Templates
    # ------------------------------------------------------------------

    async def get_templates(self, cert_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return the active certificate templates ordered by ``sort_order``.

        Args:
            cert_type: optional certificate type to filter on

        Returns:
            A list of plain dicts, one per template.
        """
        query = select(CertificateTemplate).where(
            CertificateTemplate.is_active == True  # noqa: E712 - SQL expression
        )
        if cert_type:
            query = query.where(CertificateTemplate.type == cert_type)
        query = query.order_by(CertificateTemplate.sort_order)

        result = await self.db.execute(query)
        return [
            {
                "id": tpl.id,
                "name": tpl.name,
                "type": tpl.type.value if isinstance(tpl.type, CertificateType) else tpl.type,
                "background_url": tpl.background_url,
                "is_active": tpl.is_active,
            }
            for tpl in result.scalars().all()
        ]

    async def _get_active_template(self, cert_type: CertificateType) -> CertificateTemplate:
        """Return the first active template of ``cert_type`` (by ``sort_order``).

        Raises:
            ValueError: when no active template of that type exists.
        """
        # Several active templates of one type are legal (get_templates lists
        # them), so take the first instead of demanding exactly one row.
        result = await self.db.execute(
            select(CertificateTemplate)
            .where(
                CertificateTemplate.type == cert_type,
                CertificateTemplate.is_active == True,  # noqa: E712 - SQL expression
            )
            .order_by(CertificateTemplate.sort_order)
            .limit(1)
        )
        template = result.scalars().first()
        if template is None:
            raise ValueError("证书模板不存在")
        return template

    # ------------------------------------------------------------------
    # Issuing
    # ------------------------------------------------------------------

    async def _generate_certificate_no(self) -> str:
        """Build the next certificate number: ``<prefix>-<year>-<6-digit sequence>``.

        NOTE(review): the sequence is derived from the current row count for the
        year, so it can repeat after deletions or under concurrent issuance;
        uniqueness presumably relies on a DB constraint - confirm.
        """
        year = datetime.now().year
        result = await self.db.execute(
            select(func.count(UserCertificate.id)).where(
                UserCertificate.certificate_no.like(f"{self.CERT_NO_PREFIX}-{year}-%")
            )
        )
        issued_this_year = result.scalar() or 0
        return f"{self.CERT_NO_PREFIX}-{year}-{issued_this_year + 1:06d}"

    async def _create_certificate(
        self,
        *,
        user_id: int,
        source_filter: Any,
        duplicate_message: str,
        template_type: CertificateType,
        fields: Dict[str, Any],
    ) -> tuple[UserCertificate, CertificateTemplate]:
        """Shared issuing flow: duplicate check, template lookup, numbering, insert.

        Args:
            user_id: owner of the certificate
            source_filter: SQL condition identifying the source (course/exam/badge)
            duplicate_message: ValueError text when the certificate already exists
            template_type: which template type to use
            fields: extra column values for the new ``UserCertificate``

        Raises:
            ValueError: certificate already issued, or no active template.
        """
        # Existence probe only; LIMIT 1 keeps it safe if duplicates slipped in.
        existing = await self.db.execute(
            select(UserCertificate.id)
            .where(UserCertificate.user_id == user_id, source_filter)
            .limit(1)
        )
        if existing.first() is not None:
            raise ValueError(duplicate_message)

        template = await self._get_active_template(template_type)
        cert_no = await self._generate_certificate_no()

        certificate = UserCertificate(
            user_id=user_id,
            template_id=template.id,
            certificate_no=cert_no,
            **fields,
        )
        self.db.add(certificate)
        # Flush (not commit) so the row gets its primary key while the caller
        # keeps control of the transaction.
        await self.db.flush()
        return certificate, template

    async def issue_course_certificate(
        self,
        user_id: int,
        course_id: int,
        course_name: str,
        completion_rate: float,
        user_name: str
    ) -> Dict[str, Any]:
        """
        Issue a course-completion certificate.

        Args:
            user_id: user ID
            course_id: course ID
            course_name: course name
            completion_rate: completion rate
            user_name: user's display name

        Returns:
            The formatted certificate dict.

        Raises:
            ValueError: already issued for this user/course, or no template.
        """
        certificate, template = await self._create_certificate(
            user_id=user_id,
            source_filter=UserCertificate.course_id == course_id,
            duplicate_message="该课程证书已颁发",
            template_type=CertificateType.COURSE,
            fields={
                "title": f"《{course_name}》课程结业证书",
                "description": f"完成课程《{course_name}》的全部学习内容",
                "course_id": course_id,
                "completion_rate": completion_rate,
                "meta_data": {
                    "course_name": course_name,
                    "user_name": user_name,
                    "completion_rate": completion_rate
                },
            },
        )
        cert_no = certificate.certificate_no
        logger.info(f"颁发课程证书: user_id={user_id}, course_id={course_id}, cert_no={cert_no}")
        return await self._format_certificate(certificate, template)

    async def issue_exam_certificate(
        self,
        user_id: int,
        exam_id: int,
        exam_name: str,
        score: float,
        user_name: str
    ) -> Dict[str, Any]:
        """
        Issue an exam-pass certificate.

        Args:
            user_id: user ID
            exam_id: exam ID
            exam_name: exam name
            score: achieved score
            user_name: user's display name

        Returns:
            The formatted certificate dict.

        Raises:
            ValueError: already issued for this user/exam, or no template.
        """
        certificate, template = await self._create_certificate(
            user_id=user_id,
            source_filter=UserCertificate.exam_id == exam_id,
            duplicate_message="该考试证书已颁发",
            template_type=CertificateType.EXAM,
            fields={
                "title": f"《{exam_name}》考试合格证书",
                "description": f"在《{exam_name}》考试中成绩合格",
                "exam_id": exam_id,
                "score": score,
                "meta_data": {
                    "exam_name": exam_name,
                    "user_name": user_name,
                    "score": score
                },
            },
        )
        cert_no = certificate.certificate_no
        logger.info(f"颁发考试证书: user_id={user_id}, exam_id={exam_id}, cert_no={cert_no}")
        return await self._format_certificate(certificate, template)

    async def issue_achievement_certificate(
        self,
        user_id: int,
        badge_id: int,
        badge_name: str,
        badge_description: str,
        user_name: str
    ) -> Dict[str, Any]:
        """
        Issue an achievement certificate.

        Args:
            user_id: user ID
            badge_id: badge ID
            badge_name: badge name
            badge_description: badge description
            user_name: user's display name

        Returns:
            The formatted certificate dict.

        Raises:
            ValueError: already issued for this user/badge, or no template.
        """
        certificate, template = await self._create_certificate(
            user_id=user_id,
            source_filter=UserCertificate.badge_id == badge_id,
            duplicate_message="该成就证书已颁发",
            template_type=CertificateType.ACHIEVEMENT,
            fields={
                "title": f"「{badge_name}」成就证书",
                "description": badge_description,
                "badge_id": badge_id,
                "meta_data": {
                    "badge_name": badge_name,
                    "badge_description": badge_description,
                    "user_name": user_name
                },
            },
        )
        cert_no = certificate.certificate_no
        logger.info(f"颁发成就证书: user_id={user_id}, badge_id={badge_id}, cert_no={cert_no}")
        return await self._format_certificate(certificate, template)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    async def get_user_certificates(
        self,
        user_id: int,
        cert_type: Optional[str] = None,
        offset: int = 0,
        limit: int = 20
    ) -> Dict[str, Any]:
        """
        Return one page of a user's certificates, newest first.

        Args:
            user_id: user ID
            cert_type: optional certificate type to filter on
            offset: number of rows to skip
            limit: maximum number of rows to return

        Returns:
            ``{"items": [...], "total": int, "offset": int, "limit": int}``
        """
        join_on = UserCertificate.template_id == CertificateTemplate.id
        filters = [UserCertificate.user_id == user_id]
        if cert_type:
            filters.append(CertificateTemplate.type == cert_type)

        # Count directly over the join; wrapping the two-entity SELECT in a
        # subquery would expose duplicate column names (both tables have ``id``).
        count_query = (
            select(func.count(UserCertificate.id))
            .select_from(UserCertificate)
            .join(CertificateTemplate, join_on)
            .where(*filters)
        )
        total = (await self.db.execute(count_query)).scalar() or 0

        page_query = (
            select(UserCertificate, CertificateTemplate)
            .join(CertificateTemplate, join_on)
            .where(*filters)
            # ``id`` breaks ties so pages stay stable when issued_at is equal.
            .order_by(UserCertificate.issued_at.desc(), UserCertificate.id.desc())
            .offset(offset)
            .limit(limit)
        )
        rows = (await self.db.execute(page_query)).all()

        items = [
            await self._format_certificate(cert, template)
            for cert, template in rows
        ]
        return {
            "items": items,
            "total": total,
            "offset": offset,
            "limit": limit
        }

    async def _fetch_certificate_row(self, condition: Any):
        """Return the first ``(UserCertificate, CertificateTemplate)`` row matching
        ``condition``, or ``None``."""
        result = await self.db.execute(
            select(UserCertificate, CertificateTemplate)
            .join(CertificateTemplate, UserCertificate.template_id == CertificateTemplate.id)
            .where(condition)
        )
        return result.first()

    async def get_certificate_by_id(self, cert_id: int) -> Optional[Dict[str, Any]]:
        """Return the formatted certificate with primary key ``cert_id``, or ``None``."""
        row = await self._fetch_certificate_row(UserCertificate.id == cert_id)
        if not row:
            return None
        cert, template = row
        return await self._format_certificate(cert, template)

    async def get_certificate_by_no(self, cert_no: str) -> Optional[Dict[str, Any]]:
        """Return the certificate with number ``cert_no`` (used for verification),
        including owner info, or ``None`` when the number is unknown."""
        row = await self._fetch_certificate_row(UserCertificate.certificate_no == cert_no)
        if not row:
            return None
        cert, template = row
        return await self._format_certificate(cert, template, include_user=True)

    async def _format_certificate(
        self,
        cert: UserCertificate,
        template: CertificateTemplate,
        include_user: bool = False
    ) -> Dict[str, Any]:
        """Serialise a certificate and its template into a plain dict.

        Args:
            cert: the certificate row
            template: the template the certificate was issued from
            include_user: also embed basic owner info under ``"user"``
        """
        cert_type = template.type
        data = {
            "id": cert.id,
            "certificate_no": cert.certificate_no,
            "title": cert.title,
            "description": cert.description,
            "type": cert_type.value if isinstance(cert_type, CertificateType) else cert_type,
            "type_name": self._get_type_name(cert_type),
            "issued_at": cert.issued_at.isoformat() if cert.issued_at else None,
            "valid_until": cert.valid_until.isoformat() if cert.valid_until else None,
            # ``is not None`` so a genuine 0 is reported as 0.0, not dropped.
            "score": float(cert.score) if cert.score is not None else None,
            "completion_rate": (
                float(cert.completion_rate) if cert.completion_rate is not None else None
            ),
            "pdf_url": cert.pdf_url,
            "image_url": cert.image_url,
            "course_id": cert.course_id,
            "exam_id": cert.exam_id,
            "badge_id": cert.badge_id,
            "meta_data": cert.meta_data,
            "template": {
                "id": template.id,
                "name": template.name,
                "background_url": template.background_url,
            }
        }

        # NOTE(review): ``cert.user`` may trigger a lazy load, which an async
        # session cannot perform implicitly - confirm the relationship is
        # eagerly loaded on the model.
        if include_user and cert.user:
            data["user"] = {
                "id": cert.user.id,
                "username": cert.user.username,
                "full_name": cert.user.full_name,
            }
        return data

    def _get_type_name(self, cert_type) -> str:
        """Return the display name for ``cert_type`` (enum member or raw string)."""
        return self._TYPE_NAMES.get(cert_type, "证书")

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    @staticmethod
    def _load_font(candidates, size: int):
        """Return the first loadable TrueType font from ``candidates`` at ``size``,
        falling back to Pillow's built-in default font."""
        for path in candidates:
            try:
                return ImageFont.truetype(path, size)
            except (OSError, ImportError):
                # Missing/unreadable file, or Pillow built without FreeType.
                continue
        return ImageFont.load_default()

    @staticmethod
    def _build_qr(payload: str):
        """Return a small black-on-white QR image encoding ``payload``."""
        qr = qrcode.QRCode(version=1, box_size=3, border=2)
        qr.add_data(payload)
        qr.make(fit=True)
        return qr.make_image(fill_color="black", back_color="white")

    @staticmethod
    def _escape(value: Any) -> str:
        """HTML-escape ``value``; ``None`` becomes the empty string."""
        return html.escape("" if value is None else str(value))

    async def generate_certificate_image(
        self,
        cert_id: int,
        base_url: str = ""
    ) -> bytes:
        """
        Render a shareable PNG for a certificate.

        Args:
            cert_id: certificate ID
            base_url: base URL used to build the verification link in the QR code

        Returns:
            PNG bytes.

        Raises:
            ValueError: the certificate does not exist.
        """
        cert_data = await self.get_certificate_by_id(cert_id)
        if not cert_data:
            raise ValueError("证书不存在")

        width, height = 800, 600
        center_x = width // 2
        img = Image.new('RGB', (width, height), color='#f5f7fa')
        draw = ImageDraw.Draw(img)

        title_font = self._load_font(self._BOLD_FONT_CANDIDATES, 36)
        text_font = self._load_font(self._REGULAR_FONT_CANDIDATES, 20)
        small_font = self._load_font(self._REGULAR_FONT_CANDIDATES, 14)

        def centered(y: int, text: str, font, fill: str) -> None:
            # anchor 'mm' centres the text on (center_x, y).
            draw.text((center_x, y), text, font=font, fill=fill, anchor='mm')

        # Heading, certificate title and description. The dict keys always
        # exist but may hold None, hence ``or`` rather than a .get default.
        centered(60, cert_data.get("type_name") or "证书", title_font, '#333333')
        centered(140, cert_data.get("title") or "", text_font, '#666666')
        centered(200, cert_data.get("description") or "", text_font, '#666666')

        # Score takes precedence over completion rate; both share one slot.
        if cert_data.get("score") is not None:
            centered(280, f"成绩:{cert_data['score']}分", text_font, '#667eea')
        elif cert_data.get("completion_rate") is not None:
            centered(280, f"完成率:{cert_data['completion_rate']}%", text_font, '#667eea')

        if cert_data.get("issued_at"):
            # issued_at is an ISO string; the first 10 chars are YYYY-MM-DD.
            centered(360, f"颁发日期:{cert_data['issued_at'][:10]}", small_font, '#999999')

        cert_no = cert_data.get("certificate_no") or ""
        centered(520, f"证书编号:{cert_no}", small_font, '#999999')

        # Verification QR code in the bottom-right corner.
        if base_url and cert_no:
            qr_img = self._build_qr(f"{base_url}/verify/{cert_no}").resize((80, 80))
            img.paste(qr_img, (width - 100, height - 100))

        buffer = io.BytesIO()
        img.save(buffer, format='PNG')
        return buffer.getvalue()

    async def update_certificate_files(
        self,
        cert_id: int,
        pdf_url: Optional[str] = None,
        image_url: Optional[str] = None
    ):
        """Store new file URLs on a certificate; falsy arguments leave the
        existing value untouched. Does nothing when the certificate is missing."""
        result = await self.db.execute(
            select(UserCertificate).where(UserCertificate.id == cert_id)
        )
        cert = result.scalar_one_or_none()
        if not cert:
            return

        if pdf_url:
            cert.pdf_url = pdf_url
        if image_url:
            cert.image_url = image_url
        await self.db.flush()

    async def generate_certificate_pdf(
        self,
        cert_id: int,
        base_url: str = ""
    ) -> bytes:
        """
        Render a certificate as PDF from an HTML template.

        When weasyprint is not installed the UTF-8 encoded HTML is returned
        instead, for the front end to handle.

        Args:
            cert_id: certificate ID
            base_url: base URL used to build the verification link in the QR code

        Returns:
            PDF bytes (or HTML bytes in the fallback case).

        Raises:
            ValueError: the certificate does not exist, or PDF rendering failed.
        """
        cert_data = await self.get_certificate_by_id(cert_id)
        if not cert_data:
            raise ValueError("证书不存在")

        # Function-scope import kept as in the original module.
        from app.models.user import User
        user_result = await self.db.execute(
            select(User)
            .join(UserCertificate, UserCertificate.user_id == User.id)
            .where(UserCertificate.id == cert_id)
        )
        user = user_result.scalar_one_or_none()
        user_name = (user.full_name or user.username) if user else "未知用户"

        # Verification QR code as an inline data-URI image.
        qr_block = ""
        cert_no = cert_data.get("certificate_no") or ""
        if cert_no:
            verify_url = f"{base_url}/verify/{cert_no}" if base_url else cert_no
            qr_bytes = io.BytesIO()
            self._build_qr(verify_url).save(qr_bytes, format='PNG')
            qr_base64 = base64.b64encode(qr_bytes.getvalue()).decode('utf-8')
            qr_block = f'<img src="data:image/png;base64,{qr_base64}" alt="verify">\n'

        # Everything interpolated below originates from user/course data, so it
        # is escaped before being placed into the HTML.
        esc = self._escape
        score = cert_data.get('score')
        completion_rate = cert_data.get('completion_rate')
        score_block = f"\n成绩:{esc(score)}分\n" if score is not None else ""
        rate_block = f"\n完成率:{esc(completion_rate)}%\n" if completion_rate is not None else ""

        # HTML template
        html_template = f"""
考培练学习平台
{esc(cert_data.get('type_name') or '证书')}
{esc(user_name)}
{esc(cert_data.get('title'))}
{esc(cert_data.get('description'))}
{score_block} {rate_block}
官方认证
{qr_block}"""

        try:
            from weasyprint import HTML
            return HTML(string=html_template).write_pdf()
        except ImportError:
            logger.warning("weasyprint 未安装,使用备用方案")
            # Fallback: hand the HTML to the front end.
            return html_template.encode('utf-8')
        except Exception as e:
            logger.error(f"生成 PDF 失败: {str(e)}")
            raise ValueError(f"生成 PDF 失败: {str(e)}") from e

    async def download_certificate(
        self,
        cert_id: int,
        format: str = "pdf",
        base_url: str = ""
    ) -> tuple[bytes, str, str]:
        """
        Produce a downloadable certificate file.

        Args:
            cert_id: certificate ID
            format: ``"pdf"`` (case-insensitive); any other value yields a PNG
            base_url: base URL used to build the verification link

        Returns:
            ``(content, filename, mime_type)``

        Raises:
            ValueError: the certificate does not exist, or rendering failed.
        """
        cert_data = await self.get_certificate_by_id(cert_id)
        if not cert_data:
            raise ValueError("证书不存在")

        cert_no = cert_data.get("certificate_no") or "certificate"

        if format.lower() == "pdf":
            content = await self.generate_certificate_pdf(cert_id, base_url)
            if content.startswith(b"%PDF"):
                return content, f"{cert_no}.pdf", "application/pdf"
            # generate_certificate_pdf fell back to HTML (weasyprint missing);
            # label the payload as what it really is.
            return content, f"{cert_no}.html", "text/html; charset=utf-8"

        content = await self.generate_certificate_image(cert_id, base_url)
        return content, f"{cert_no}.png", "image/png"