Files
012-kaopeilian/backend/app/services/coze_broadcast_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

98 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Coze 播课服务
"""
import logging
from typing import Optional
from cozepy.exception import CozeError
from app.core.config import settings
from app.services.ai.coze.client import get_coze_client
logger = logging.getLogger(__name__)
class CozeBroadcastService:
"""Coze 播课服务"""
def __init__(self):
"""初始化配置"""
self.workflow_id = settings.COZE_BROADCAST_WORKFLOW_ID
self.space_id = settings.COZE_BROADCAST_SPACE_ID
def _get_client(self):
"""获取新的 Coze 客户端每次调用都创建新认证避免token过期"""
return get_coze_client(force_new=True)
async def trigger_workflow(self, course_id: int) -> None:
"""
触发播课生成工作流(不等待结果)
Coze工作流会
1. 生成播课音频
2. 直接将结果写入数据库
Args:
course_id: 课程ID
Raises:
CozeError: Coze API 调用失败
"""
logger.info(
f"触发播课生成工作流",
extra={
"course_id": course_id,
"workflow_id": self.workflow_id,
"bot_id": settings.COZE_BROADCAST_BOT_ID or settings.COZE_PRACTICE_BOT_ID # 关联到同一工作空间的Bot
}
)
try:
# 每次调用都获取新客户端确保OAuth token有效
coze = self._get_client()
# 调用工作流(触发即返回,不等待结果)
# 关键添加bot_id参数关联到OAuth应用下的Bot
import asyncio
result = await asyncio.to_thread(
coze.workflows.runs.create,
workflow_id=self.workflow_id,
parameters={"course_id": str(course_id)},
bot_id=settings.COZE_BROADCAST_BOT_ID or settings.COZE_PRACTICE_BOT_ID # 关联Bot确保OAuth权限
)
logger.info(
f"播课生成工作流已触发",
extra={
"course_id": course_id,
"execute_id": getattr(result, 'execute_id', None),
"debug_url": getattr(result, 'debug_url', None)
}
)
except CozeError as e:
logger.error(
f"触发 Coze 工作流失败",
extra={
"course_id": course_id,
"error": str(e),
"error_code": getattr(e, 'code', None)
}
)
raise
except Exception as e:
logger.error(
f"触发播课生成工作流异常",
extra={
"course_id": course_id,
"error": str(e),
"error_type": type(e).__name__
}
)
raise
# 全局单例
broadcast_service = CozeBroadcastService()