"""
Practice-session analysis report service (native Python implementation).

Features:
- Analyze the dialogue history of a practice session
- Produce an overall score and per-dimension ability assessment
- Provide dialogue annotations and improvement suggestions
"""

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from .ai_service import AIService, AIResponse
from .llm_json_parser import parse_with_fallback, clean_llm_output
from .prompts.practice_analysis_prompts import (
    SYSTEM_PROMPT,
    USER_PROMPT,
    PRACTICE_ANALYSIS_SCHEMA,
    SCORE_BREAKDOWN_ITEMS,
    ABILITY_DIMENSIONS,
)

logger = logging.getLogger(__name__)

# Placeholder values used when the AI output omits a required entry.
_DEFAULT_SCORE = 75
_DEFAULT_COMMENT = "暂无详细评价"

# List-valued fields of the "analysis" object that are normalised after parsing.
_ANALYSIS_LIST_KEYS = (
    "score_breakdown",
    "ability_dimensions",
    "dialogue_annotations",
    "suggestions",
)


# ==================== Data structures ====================

@dataclass
class ScoreBreakdownItem:
    """One entry of the score breakdown."""
    name: str
    score: float
    description: str


@dataclass
class AbilityDimensionItem:
    """One ability dimension with its score and feedback."""
    name: str
    score: float
    feedback: str


@dataclass
class DialogueAnnotation:
    """Annotation attached to one dialogue turn (identified by sequence)."""
    sequence: int
    tags: List[str]
    comment: str


@dataclass
class Suggestion:
    """One improvement suggestion."""
    title: str
    content: str
    example: str


@dataclass
class PracticeAnalysisResult:
    """Outcome of a practice analysis, including AI call metadata."""
    success: bool
    total_score: float = 0.0
    score_breakdown: List[ScoreBreakdownItem] = field(default_factory=list)
    ability_dimensions: List[AbilityDimensionItem] = field(default_factory=list)
    dialogue_annotations: List[DialogueAnnotation] = field(default_factory=list)
    suggestions: List[Suggestion] = field(default_factory=list)
    ai_provider: str = ""
    ai_model: str = ""
    ai_tokens: int = 0
    ai_latency_ms: int = 0
    error: str = ""

    def _serialized_sections(self) -> Dict[str, List[Dict[str, Any]]]:
        """Return the four list sections as plain dicts, keyed by field name."""
        return {
            "score_breakdown": [
                {"name": s.name, "score": s.score, "description": s.description}
                for s in self.score_breakdown
            ],
            "ability_dimensions": [
                {"name": d.name, "score": d.score, "feedback": d.feedback}
                for d in self.ability_dimensions
            ],
            "dialogue_annotations": [
                {"sequence": a.sequence, "tags": a.tags, "comment": a.comment}
                for a in self.dialogue_annotations
            ],
            "suggestions": [
                {"title": s.title, "content": s.content, "example": s.example}
                for s in self.suggestions
            ],
        }

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict (compatible with the pre-existing data format).

        Note that ``success`` and ``error`` are not part of this format.
        """
        return {
            "analysis": {
                "total_score": self.total_score,
                **self._serialized_sections(),
            },
            "ai_provider": self.ai_provider,
            "ai_model": self.ai_model,
            "ai_tokens": self.ai_tokens,
            "ai_latency_ms": self.ai_latency_ms,
        }

    def to_db_format(self) -> Dict[str, Any]:
        """Convert to the database storage format (PracticeReport model).

        ``total_score`` is truncated to an int and the annotations are stored
        under the ``dialogue_review`` key.
        """
        sections = self._serialized_sections()
        return {
            "total_score": int(self.total_score),
            "score_breakdown": sections["score_breakdown"],
            "ability_dimensions": sections["ability_dimensions"],
            "dialogue_review": sections["dialogue_annotations"],
            "suggestions": sections["suggestions"],
        }


# ==================== Parsing helpers ====================

def _dict_entries(value: Any) -> List[Dict[str, Any]]:
    """Return the dict entries of *value* if it is a list, else an empty list."""
    if not isinstance(value, list):
        return []
    return [entry for entry in value if isinstance(entry, dict)]


def _fill_missing_items(
    analysis: Dict[str, Any],
    key: str,
    required_names: Any,
    text_field: str,
    label: str,
) -> None:
    """Append a default entry to ``analysis[key]`` for every required name missing from it."""
    items = analysis.setdefault(key, [])
    present = {entry.get("name") for entry in items}
    for name in required_names:
        if name not in present:
            logger.warning(f"缺少{label}: {name},使用默认值")
            items.append({
                "name": name,
                "score": _DEFAULT_SCORE,
                text_field: _DEFAULT_COMMENT,
            })


# ==================== Service class ====================

class PracticeAnalysisService:
    """
    Practice-session analysis report service.

    Usage example:
    ```python
    service = PracticeAnalysisService()
    result = await service.analyze(
        db=db_session,  # pass db_session so the AI call can be logged
        dialogue_history=[
            {"speaker": "user", "content": "Hello, I would like to ask about..."},
            {"speaker": "ai", "content": "Hello! Happy to help..."}
        ]
    )
    print(result.total_score)
    print(result.suggestions)
    ```
    """

    MODULE_CODE = "practice_analysis"

    # Speaker identifiers that are normalised to the two roles in the transcript.
    _EMPLOYEE_SPEAKERS = ('user', 'employee', 'consultant', '员工', '用户')
    _CUSTOMER_SPEAKERS = ('ai', 'customer', 'client', '顾客', '客户', 'AI')

    async def analyze(
        self,
        dialogue_history: List[Dict[str, Any]],
        db: Any = None,  # database session, used to log the AI call
    ) -> PracticeAnalysisResult:
        """
        Analyze a practice dialogue.

        Args:
            dialogue_history: List of dialogue entries; each entry is read for
                its ``speaker`` and ``content`` keys.
            db: Database session handed to AIService for call logging.

        Returns:
            PracticeAnalysisResult. Failures are never raised: any exception is
            logged and reported through ``success=False`` and ``error``.
        """
        try:
            # Count defensively: a None/empty history must reach the validation
            # below instead of raising TypeError inside the log statement.
            turn_count = len(dialogue_history) if dialogue_history else 0
            logger.info(f"开始分析陪练对话 - 对话轮次: {turn_count}")

            # 1. Validate input
            if turn_count < 2:
                return PracticeAnalysisResult(
                    success=False,
                    error="对话记录太少,无法生成分析报告(至少需要2轮对话)"
                )

            # 2. Format the dialogue history
            dialogue_text = self._format_dialogue_history(dialogue_history)

            # 3. Create the AIService instance (db session is used for call logging).
            # NOTE(review): this is stored on the instance because
            # _call_ai_analysis reads it; the module-level singleton therefore
            # keeps a reference to the most recent db session.
            self._ai_service = AIService(module_code=self.MODULE_CODE, db_session=db)

            # 4. Run the AI analysis
            ai_response = await self._call_ai_analysis(dialogue_text)

            logger.info(
                f"AI 分析完成 - provider: {ai_response.provider}, "
                f"tokens: {ai_response.total_tokens}, latency: {ai_response.latency_ms}ms"
            )

            # 5. Parse the JSON result
            analysis_data = self._parse_analysis_result(ai_response.content)

            # 6. Build the result object
            result = PracticeAnalysisResult(
                success=True,
                # "or 0" also covers an explicit null score from the model.
                total_score=analysis_data.get("total_score") or 0,
                score_breakdown=[
                    ScoreBreakdownItem(
                        name=s.get("name", ""),
                        score=s.get("score", 0),
                        description=s.get("description", "")
                    )
                    for s in analysis_data.get("score_breakdown", [])
                ],
                ability_dimensions=[
                    AbilityDimensionItem(
                        name=d.get("name", ""),
                        score=d.get("score", 0),
                        feedback=d.get("feedback", "")
                    )
                    for d in analysis_data.get("ability_dimensions", [])
                ],
                dialogue_annotations=[
                    DialogueAnnotation(
                        sequence=a.get("sequence", 0),
                        tags=a.get("tags", []),
                        comment=a.get("comment", "")
                    )
                    for a in analysis_data.get("dialogue_annotations", [])
                ],
                suggestions=[
                    Suggestion(
                        title=s.get("title", ""),
                        content=s.get("content", ""),
                        example=s.get("example", "")
                    )
                    for s in analysis_data.get("suggestions", [])
                ],
                ai_provider=ai_response.provider,
                ai_model=ai_response.model,
                ai_tokens=ai_response.total_tokens,
                ai_latency_ms=ai_response.latency_ms,
            )

            logger.info(
                f"陪练分析完成 - total_score: {result.total_score}, "
                f"annotations: {len(result.dialogue_annotations)}, "
                f"suggestions: {len(result.suggestions)}"
            )

            return result

        except Exception as e:
            # Top-level boundary: report the failure in the result, never raise.
            logger.error(f"陪练分析失败: {e}", exc_info=True)
            return PracticeAnalysisResult(
                success=False,
                error=str(e)
            )

    def _format_dialogue_history(self, dialogue_history: List[Dict[str, Any]]) -> str:
        """
        Format the dialogue history as numbered text lines.

        Args:
            dialogue_history: List of dialogue entries.

        Returns:
            One ``[n] <speaker>: <content>`` line per entry (numbered from 1),
            joined with newlines. Unrecognised speakers are kept as-is.
        """
        lines = []
        for index, entry in enumerate(dialogue_history, 1):
            speaker = entry.get('speaker', 'unknown')
            content = entry.get('content', '')

            # Normalise the speaker identifier to one of the two roles
            if speaker in self._EMPLOYEE_SPEAKERS:
                speaker_label = '员工'
            elif speaker in self._CUSTOMER_SPEAKERS:
                speaker_label = '顾客'
            else:
                speaker_label = speaker

            lines.append(f"[{index}] {speaker_label}: {content}")

        return '\n'.join(lines)

    async def _call_ai_analysis(self, dialogue_text: str) -> AIResponse:
        """Send the formatted dialogue to the AI service and return its response.

        Reads ``self._ai_service``, which ``analyze`` sets before calling this.
        """
        user_message = USER_PROMPT.format(dialogue_history=dialogue_text)

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]

        response = await self._ai_service.chat(
            messages=messages,
            temperature=0.7,
            prompt_name="practice_analysis"
        )

        return response

    def _parse_analysis_result(self, ai_output: str) -> Dict[str, Any]:
        """
        Parse the analysis JSON produced by the AI.

        The output is cleaned, parsed with schema validation (falling back to
        an empty analysis on error), and then completed with default entries
        for any missing score item, ability dimension, or suggestion.

        Returns:
            The ``analysis`` dict, whose list fields are always lists of dicts.
        """
        # Clean the raw output first
        cleaned_output, rules = clean_llm_output(ai_output)
        if rules:
            logger.debug(f"AI 输出已清洗: {rules}")

        # Parse with schema validation
        parsed = parse_with_fallback(
            cleaned_output,
            schema=PRACTICE_ANALYSIS_SCHEMA,
            default={"analysis": {}},
            validate_schema=True,
            on_error="default"
        )

        # Extract the analysis part; tolerate a null or wrongly-typed value
        analysis = parsed.get("analysis") if isinstance(parsed, dict) else None
        if not isinstance(analysis, dict):
            analysis = {}

        # Drop null fields and non-dict entries so the code below and the
        # caller can safely call .get() on every item.
        for key in _ANALYSIS_LIST_KEYS:
            analysis[key] = _dict_entries(analysis.get(key))

        # Make sure score_breakdown is complete
        _fill_missing_items(
            analysis, "score_breakdown", SCORE_BREAKDOWN_ITEMS, "description", "分数维度"
        )

        # Make sure ability_dimensions is complete
        _fill_missing_items(
            analysis, "ability_dimensions", ABILITY_DIMENSIONS, "feedback", "能力维度"
        )

        # Make sure there is at least one suggestion
        if not analysis.get("suggestions"):
            analysis["suggestions"] = [
                {
                    "title": "持续练习",
                    "content": "建议继续进行陪练练习,提升整体表现",
                    "example": "每周进行2-3次陪练,针对薄弱环节重点练习"
                }
            ]

        return analysis


# ==================== Global instance ====================

practice_analysis_service = PracticeAnalysisService()


# ==================== Convenience function ====================

async def analyze_practice_session(
    dialogue_history: List[Dict[str, Any]],
    db: Any = None,
) -> Dict[str, Any]:
    """
    Convenience wrapper: analyze a practice session with the global service.

    Args:
        dialogue_history: List of dialogue entries.
        db: Optional database session, forwarded so the AI call can be logged.

    Returns:
        The analysis result as a dict (pre-existing format, see
        ``PracticeAnalysisResult.to_dict``).
    """
    result = await practice_analysis_service.analyze(dialogue_history, db=db)
    return result.to_dict()