""" Coze 播课服务 """ import logging from typing import Optional from cozepy.exception import CozeError from app.core.config import settings from app.services.ai.coze.client import get_coze_client logger = logging.getLogger(__name__) class CozeBroadcastService: """Coze 播课服务""" def __init__(self): """初始化配置""" self.workflow_id = settings.COZE_BROADCAST_WORKFLOW_ID self.space_id = settings.COZE_BROADCAST_SPACE_ID def _get_client(self): """获取新的 Coze 客户端(每次调用都创建新认证,避免token过期)""" return get_coze_client(force_new=True) async def trigger_workflow(self, course_id: int) -> None: """ 触发播课生成工作流(不等待结果) Coze工作流会: 1. 生成播课音频 2. 直接将结果写入数据库 Args: course_id: 课程ID Raises: CozeError: Coze API 调用失败 """ logger.info( f"触发播课生成工作流", extra={ "course_id": course_id, "workflow_id": self.workflow_id, "bot_id": settings.COZE_BROADCAST_BOT_ID or settings.COZE_PRACTICE_BOT_ID # 关联到同一工作空间的Bot } ) try: # 每次调用都获取新客户端(确保OAuth token有效) coze = self._get_client() # 调用工作流(触发即返回,不等待结果) # 关键:添加bot_id参数,关联到OAuth应用下的Bot import asyncio result = await asyncio.to_thread( coze.workflows.runs.create, workflow_id=self.workflow_id, parameters={"course_id": str(course_id)}, bot_id=settings.COZE_BROADCAST_BOT_ID or settings.COZE_PRACTICE_BOT_ID # 关联Bot,确保OAuth权限 ) logger.info( f"播课生成工作流已触发", extra={ "course_id": course_id, "execute_id": getattr(result, 'execute_id', None), "debug_url": getattr(result, 'debug_url', None) } ) except CozeError as e: logger.error( f"触发 Coze 工作流失败", extra={ "course_id": course_id, "error": str(e), "error_code": getattr(e, 'code', None) } ) raise except Exception as e: logger.error( f"触发播课生成工作流异常", extra={ "course_id": course_id, "error": str(e), "error_type": type(e).__name__ } ) raise # 全局单例 broadcast_service = CozeBroadcastService()