Files
012-kaopeilian/backend/app/services/yanji_service.py
111 998211c483 feat: 初始化考培练系统项目
- 从服务器拉取完整代码
- 按框架规范整理项目结构
- 配置 Drone CI 测试环境部署
- 包含后端(FastAPI)、前端(Vue3)、管理端

技术栈: Vue3 + TypeScript + FastAPI + MySQL
2026-01-24 19:33:28 +08:00

511 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
言迹智能工牌API服务
"""
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
class YanjiService:
"""言迹智能工牌API服务类"""
def __init__(self):
self.base_url = settings.YANJI_API_BASE
self.client_id = settings.YANJI_CLIENT_ID
self.client_secret = settings.YANJI_CLIENT_SECRET
self.tenant_id = settings.YANJI_TENANT_ID
self.estate_id = int(settings.YANJI_ESTATE_ID)
# Token缓存
self._access_token: Optional[str] = None
self._token_expires_at: Optional[datetime] = None
async def get_access_token(self) -> str:
"""
获取或刷新access_token
Returns:
access_token字符串
"""
# 检查缓存的token是否仍然有效提前5分钟刷新
if self._access_token and self._token_expires_at:
if datetime.now() < self._token_expires_at - timedelta(minutes=5):
return self._access_token
# 获取新的token
url = f"{self.base_url}/oauth/token"
params = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, timeout=30.0)
response.raise_for_status()
data = response.json()
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 3600) # 默认1小时
self._token_expires_at = datetime.now() + timedelta(seconds=expires_in)
logger.info(f"言迹API token获取成功有效期至: {self._token_expires_at}")
return self._access_token
async def _request(
self,
method: str,
path: str,
params: Optional[Dict] = None,
json_data: Optional[Dict] = None,
) -> Dict:
"""
统一的HTTP请求方法
Args:
method: HTTP方法GET/POST等
path: API路径
params: Query参数
json_data: Body参数JSON
Returns:
响应数据data字段
Raises:
Exception: API调用失败
"""
token = await self.get_access_token()
url = f"{self.base_url}{path}"
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient() as client:
response = await client.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers,
timeout=60.0,
)
response.raise_for_status()
result = response.json()
# 言迹API: code='0'或code=0表示成功
code = result.get("code")
if str(code) != '0':
error_msg = result.get("msg", "Unknown error")
logger.error(f"言迹API调用失败: {error_msg}, result={result}")
raise Exception(f"言迹API错误: {error_msg}")
# data可能为None返回空字典或空列表由调用方判断
return result.get("data")
async def get_visit_audios(
self, external_visit_ids: List[str]
) -> List[Dict]:
"""
根据来访单ID获取录音信息
Args:
external_visit_ids: 三方来访ID列表最多10个
Returns:
录音信息列表
"""
if not external_visit_ids:
return []
if len(external_visit_ids) > 10:
logger.warning(f"来访单ID数量超过限制截取前10个")
external_visit_ids = external_visit_ids[:10]
data = await self._request(
method="POST",
path="/api/beauty/v1/visit/audios",
json_data={
"estateId": self.estate_id,
"externalVisitIds": external_visit_ids,
},
)
if data is None:
logger.info(f"获取来访录音信息: 无数据")
return []
records = data.get("records", [])
logger.info(f"获取来访录音信息成功: {len(records)}")
return records
async def get_audio_asr_result(self, audio_id: int) -> Dict:
"""
获取录音的ASR分析结果对话文本
Args:
audio_id: 录音ID
Returns:
ASR分析结果包含对话文本数组
"""
data = await self._request(
method="GET",
path="/api/beauty/v1/audio/asr-analysed",
params={"estateId": self.estate_id, "audioId": audio_id},
)
# 检查data是否为None
if data is None:
logger.warning(f"录音ASR结果为None: audio_id={audio_id}")
return {}
# data是一个数组取第一个元素
if isinstance(data, list) and len(data) > 0:
result = data[0]
logger.info(
f"获取录音ASR结果成功: audio_id={audio_id}, "
f"对话数={len(result.get('result', []))}"
)
return result
else:
logger.warning(f"录音ASR结果为空: audio_id={audio_id}")
return {}
async def get_recent_conversations(
self, consultant_phone: str, limit: int = 10
) -> List[Dict]:
"""
获取员工最近N条对话记录
业务逻辑:
1. 通过员工手机号获取录音列表(目前使用模拟数据)
2. 对每个录音获取ASR分析结果
3. 组合返回完整的对话记录
Args:
consultant_phone: 员工手机号
limit: 获取数量默认10条
Returns:
对话记录列表,格式:
[{
"audio_id": 123,
"visit_id": "xxx",
"start_time": "2025-01-15 10:30:00",
"duration": 120000,
"consultant_name": "张三",
"consultant_phone": "13800138000",
"conversation": [
{"role": "consultant", "text": "您好..."},
{"role": "customer", "text": "你好..."}
]
}]
"""
# TODO: 目前言迹API没有直接通过手机号查询录音的接口
# 需要先获取来访单列表,再获取录音
# 这里暂时返回空列表,后续根据实际业务需求补充
logger.warning(
f"获取员工对话记录功能需要额外的业务逻辑支持 "
f"(consultant_phone={consultant_phone}, limit={limit})"
)
# 返回空列表,表示暂未实现
return []
async def get_conversations_by_visit_ids(
self, external_visit_ids: List[str]
) -> List[Dict]:
"""
根据来访单ID列表获取对话记录
Args:
external_visit_ids: 三方来访ID列表
Returns:
对话记录列表
"""
if not external_visit_ids:
return []
# 1. 获取录音信息
audio_records = await self.get_visit_audios(external_visit_ids)
if not audio_records:
logger.info("没有找到录音记录")
return []
# 2. 对每个录音获取ASR分析结果
conversations = []
for audio in audio_records:
audio_id = audio.get("id")
if not audio_id:
continue
try:
asr_result = await self.get_audio_asr_result(audio_id)
# 解析对话文本
conversation_messages = []
for item in asr_result.get("result", []):
role = "consultant" if item.get("role") == -1 else "customer"
conversation_messages.append({
"role": role,
"text": item.get("text", ""),
"begin_time": item.get("beginTime"),
"end_time": item.get("endTime"),
})
# 组合完整对话记录
conversations.append({
"audio_id": audio_id,
"visit_id": audio.get("externalVisitId", ""),
"start_time": audio.get("startTime", ""),
"duration": audio.get("duration", 0),
"consultant_name": audio.get("consultantName", ""),
"consultant_phone": audio.get("consultantPhone", ""),
"conversation": conversation_messages,
})
except Exception as e:
logger.error(f"获取录音ASR结果失败: audio_id={audio_id}, error={e}")
continue
logger.info(f"成功获取{len(conversations)}条对话记录")
return conversations
async def get_audio_list(self, phone: str) -> List[Dict]:
"""
获取员工的录音列表(模拟)
注意言迹API暂时没有提供通过手机号直接查询录音列表的接口
这里使用模拟数据,返回假想的录音列表
Args:
phone: 员工手机号
Returns:
录音信息列表
"""
logger.info(f"获取员工录音列表(模拟): phone={phone}")
# 模拟返回10条录音记录
mock_audios = []
base_time = datetime.now()
for i in range(10):
# 模拟不同时长的录音
durations = [25000, 45000, 180000, 240000, 120000, 90000, 60000, 300000, 420000, 150000]
mock_audios.append({
"id": f"mock_audio_{i+1}",
"externalVisitId": f"visit_{i+1}",
"startTime": (base_time - timedelta(days=i)).strftime("%Y-%m-%d %H:%M:%S"),
"duration": durations[i], # 毫秒
"consultantName": "模拟员工",
"consultantPhone": phone
})
return mock_audios
async def get_employee_conversations_for_analysis(
self,
phone: str,
limit: int = 10
) -> List[Dict[str, Any]]:
"""
获取员工最近N条录音的模拟对话数据用于能力分析
Args:
phone: 员工手机号
limit: 获取数量默认10条
Returns:
对话数据列表,格式:
[{
"audio_id": "mock_audio_1",
"duration_seconds": 25,
"start_time": "2025-10-15 10:30:00",
"dialogue_history": [
{"speaker": "consultant", "content": "您好..."},
{"speaker": "customer", "content": "你好..."}
]
}]
"""
# 1. 获取录音列表
audios = await self.get_audio_list(phone)
if not audios:
logger.warning(f"未找到员工的录音记录: phone={phone}")
return []
# 2. 筛选前limit条
selected_audios = audios[:limit]
# 3. 为每条录音生成模拟对话
conversations = []
for audio in selected_audios:
conversation = self._generate_mock_conversation(audio)
conversations.append(conversation)
logger.info(f"生成模拟对话数据: phone={phone}, count={len(conversations)}")
return conversations
def _generate_mock_conversation(self, audio: Dict) -> Dict:
"""
为录音生成模拟对话数据
根据录音时长选择不同复杂度的对话模板:
- <30秒: 短对话4-6轮
- 30秒-5分钟: 中等对话8-12轮
- >5分钟: 长对话15-20轮完整销售流程
Args:
audio: 录音信息字典
Returns:
对话数据字典
"""
duration = int(audio.get('duration', 60000)) // 1000 # 转换为秒
# 根据时长选择对话模板
if duration < 30:
dialogue = self._short_conversation_template()
elif duration < 300:
dialogue = self._medium_conversation_template()
else:
dialogue = self._long_conversation_template()
return {
"audio_id": audio.get('id'),
"duration_seconds": duration,
"start_time": audio.get('startTime'),
"dialogue_history": dialogue
}
def _short_conversation_template(self) -> List[Dict]:
"""短对话模板(<30秒- 4-6轮对话"""
templates = [
[
{"speaker": "consultant", "content": "您好,欢迎光临曼尼斐绮,请问有什么可以帮到您?"},
{"speaker": "customer", "content": "你好,我想了解一下面部护理项目"},
{"speaker": "consultant", "content": "好的,我们有多种面部护理方案,请问您主要关注哪方面呢?"},
{"speaker": "customer", "content": "主要是想改善皮肤暗沉"},
{"speaker": "consultant", "content": "明白了,针对皮肤暗沉,我推荐我们的美白焕肤套餐"}
],
[
{"speaker": "consultant", "content": "您好,请问需要什么帮助吗?"},
{"speaker": "customer", "content": "我想咨询一下祛斑项目"},
{"speaker": "consultant", "content": "好的,请问您主要是哪种类型的斑点呢?"},
{"speaker": "customer", "content": "脸颊两侧有些黄褐斑"},
{"speaker": "consultant", "content": "了解,我们有专门针对黄褐斑的光子嫩肤项目,效果很不错"}
],
[
{"speaker": "consultant", "content": "欢迎光临,有什么可以帮您的吗?"},
{"speaker": "customer", "content": "我想预约做个面部护理"},
{"speaker": "consultant", "content": "好的,请问您之前做过我们的项目吗?"},
{"speaker": "customer", "content": "没有,第一次来"},
{"speaker": "consultant", "content": "那我建议您先做个免费的皮肤检测,帮您制定个性化方案"},
{"speaker": "customer", "content": "好的,那现在可以吗?"}
]
]
return random.choice(templates)
def _medium_conversation_template(self) -> List[Dict]:
"""中等对话模板30秒-5分钟- 8-12轮对话"""
templates = [
[
{"speaker": "consultant", "content": "您好,欢迎光临曼尼斐绮,我是美容顾问小王,请问怎么称呼您?"},
{"speaker": "customer", "content": "你好,我姓李"},
{"speaker": "consultant", "content": "李女士您好,请问今天是第一次了解我们的项目吗?"},
{"speaker": "customer", "content": "是的,之前在网上看到你们的介绍"},
{"speaker": "consultant", "content": "好的,您对哪方面的美容项目比较感兴趣呢?"},
{"speaker": "customer", "content": "我想改善面部松弛的问题,最近感觉皮肤没有以前紧致了"},
{"speaker": "consultant", "content": "我理解您的困扰。请问您多大年龄?平时有做面部护理吗?"},
{"speaker": "customer", "content": "我35岁平时就是用护肤品没做过专业护理"},
{"speaker": "consultant", "content": "明白了。35岁开始注重抗衰是很及时的。我们有几种方案比如射频紧肤、超声刀提拉还有胶原蛋白再生项目"},
{"speaker": "customer", "content": "这几种有什么区别吗?"},
{"speaker": "consultant", "content": "射频主要是刺激胶原蛋白增生,效果温和持久。超声刀作用更深层,提拉效果更明显但价格稍高。我建议您先做个皮肤检测,看具体适合哪种"},
{"speaker": "customer", "content": "好的,那先做个检测吧"}
],
[
{"speaker": "consultant", "content": "您好,欢迎光临,我是美容顾问晓雯,请问您是第一次来吗?"},
{"speaker": "customer", "content": "是的,朋友推荐过来看看"},
{"speaker": "consultant", "content": "太好了,请问您朋友是做的什么项目呢?"},
{"speaker": "customer", "content": "她做的好像是什么水光针"},
{"speaker": "consultant", "content": "水光针确实是我们很受欢迎的项目。请问您今天主要想了解哪方面呢?"},
{"speaker": "customer", "content": "我主要是皮肤有点粗糙,毛孔也大"},
{"speaker": "consultant", "content": "嗯,针对毛孔粗大和皮肤粗糙,水光针确实有不错的效果。不过我建议先看看您的具体情况"},
{"speaker": "customer", "content": "需要检查吗?"},
{"speaker": "consultant", "content": "是的,我们有专业的皮肤检测仪,可以看到肉眼看不到的皮肤问题,这样制定方案更精准"},
{"speaker": "customer", "content": "好的,那检查一下吧"},
{"speaker": "consultant", "content": "好的请这边来检查大概需要5分钟"}
]
]
return random.choice(templates)
def _long_conversation_template(self) -> List[Dict]:
"""长对话模板(>5分钟- 15-20轮对话完整销售流程"""
templates = [
[
{"speaker": "consultant", "content": "您好,欢迎光临曼尼斐绮,我是资深美容顾问晓雯,请问怎么称呼您?"},
{"speaker": "customer", "content": "你好,我姓陈"},
{"speaker": "consultant", "content": "陈女士您好,看您气色很好,平时应该很注重保养吧?"},
{"speaker": "customer", "content": "还好吧,基本的护肤品会用"},
{"speaker": "consultant", "content": "这样啊。那今天是专程过来了解我们的项目,还是朋友推荐的呢?"},
{"speaker": "customer", "content": "我闺蜜在你们这做过,说效果不错,所以想来看看"},
{"speaker": "consultant", "content": "太好了,请问您闺蜜做的是什么项目呢?"},
{"speaker": "customer", "content": "好像是什么光子嫩肤"},
{"speaker": "consultant", "content": "明白了,光子嫩肤确实是我们的明星项目。不过每个人的皮肤状况不同,我先帮您做个详细的皮肤检测,看看最适合您的方案好吗?"},
{"speaker": "customer", "content": "好的"},
{"speaker": "consultant", "content": "陈女士通过检测我看到您的皮肤主要有三个问题一是T区毛孔粗大二是两颊有轻微色斑三是皮肤缺水。您平时有感觉到这些问题吗"},
{"speaker": "customer", "content": "对,毛孔确实有点大,色斑是最近才发现的"},
{"speaker": "consultant", "content": "嗯,这些问题如果不及时处理会越来越明显。针对您的情况,我建议做一个综合性的美白嫩肤方案"},
{"speaker": "customer", "content": "具体是怎么做的?"},
{"speaker": "consultant", "content": "我们采用光子嫩肤配合水光针的组合疗程。光子嫩肤主要解决色斑和毛孔问题,水光针补水锁水,效果相辅相成"},
{"speaker": "customer", "content": "听起来不错,大概需要多少钱?"},
{"speaker": "consultant", "content": "我们现在正好有活动光子嫩肤单次原价3800水光针单次2600组合套餐优惠后只要5800相当于打了九折"},
{"speaker": "customer", "content": "嗯...还是有点贵"},
{"speaker": "consultant", "content": "我理解您的顾虑。但是陈女士您想想这个价格是一次性投入效果却能维持3-6个月。平均下来每天不到30块钱换来的是皮肤的明显改善"},
{"speaker": "customer", "content": "这倒也是..."},
{"speaker": "consultant", "content": "而且这个活动就到本月底下个月恢复原价的话就要6400了。您今天如果确定的话我还可以帮您申请赠送一次基础补水护理"},
{"speaker": "customer", "content": "那行吧,今天就定了"},
{"speaker": "consultant", "content": "太好了!陈女士您做了个很明智的决定。我现在帮您预约最近的时间,您看周三下午方便吗?"}
],
[
{"speaker": "consultant", "content": "您好,欢迎光临,我是美容顾问小张,请问您贵姓?"},
{"speaker": "customer", "content": "我姓王"},
{"speaker": "consultant", "content": "王女士您好,请坐。今天想了解什么项目呢?"},
{"speaker": "customer", "content": "我想做个面部提升,感觉脸有点下垂了"},
{"speaker": "consultant", "content": "嗯,我看得出来您平时很注重保养。请问您今年多大年龄?"},
{"speaker": "customer", "content": "我42了"},
{"speaker": "consultant", "content": "42岁这个年龄段确实容易出现轻微松弛。您之前有做过抗衰项目吗"},
{"speaker": "customer", "content": "做过几次普通的面部护理,但感觉效果不明显"},
{"speaker": "consultant", "content": "普通护理主要是表层保养,对于松弛问题作用有限。您的情况需要更深层的治疗"},
{"speaker": "customer", "content": "那有什么好的方案吗?"},
{"speaker": "consultant", "content": "针对您的情况,我推荐热玛吉或者超声刀。这两种都是通过热能刺激深层胶原蛋白重组,达到紧致提升的效果"},
{"speaker": "customer", "content": "这两种有什么区别?"},
{"speaker": "consultant", "content": "热玛吉作用在真皮层,效果更自然持久,适合轻中度松弛。超声刀能到达筋膜层,提拉力度更强,适合松弛比较明显的情况"},
{"speaker": "customer", "content": "我的情况适合哪种?"},
{"speaker": "consultant", "content": "从您的面部状况来看,我建议选择热玛吉。您的松弛程度属于轻度,热玛吉的效果会更自然,恢复期也更短"},
{"speaker": "customer", "content": "费用大概多少?"},
{"speaker": "consultant", "content": "热玛吉全脸的话我们的价格是28800元。不过您今天来的时机很好我们正在做周年庆活动可以优惠到23800"},
{"speaker": "customer", "content": "还是挺贵的啊"},
{"speaker": "consultant", "content": "王女士我理解您的感受。但是热玛吉一次治疗效果可以维持2-3年平均每天只要20多块钱。而且这是一次性投入不需要反复做"},
{"speaker": "customer", "content": "效果真的能维持那么久吗?"},
{"speaker": "consultant", "content": "这是有科学依据的。热玛吉刺激的是您自身的胶原蛋白再生,不是外来填充,所以效果持久自然。我们有很多客户都做过,反馈都很好"},
{"speaker": "customer", "content": "那我考虑一下吧"},
{"speaker": "consultant", "content": "可以的。不过这个活动优惠就到本周日,下周就恢复原价了。而且名额有限,您要是确定的话最好尽快预约"},
{"speaker": "customer", "content": "好吧,那我今天就定下来吧"}
]
]
return random.choice(templates)