Files
012-kaopeilian/backend/app/services/practice_room_service.py
yuliang_guo c5d460b413
Some checks failed
continuous-integration/drone/push Build is failing
feat: 添加双人对练语音通话功能
- 后端:扩展 SSE 支持 WebRTC 信令消息转发
- 前端:创建 WebRTC 连接管理模块 (webrtc.ts)
- 前端:创建 useVoiceCall 组合式函数
- 前端:在对练房间添加语音通话 UI
- 集成 Web Speech API 实现语音转文字
2026-01-28 15:45:47 +08:00

527 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
双人对练房间服务
功能:
- 房间创建、加入、退出
- 房间状态管理
- 消息广播
- 对练结束处理
"""
import logging
import random
import string
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, and_
from sqlalchemy.orm import selectinload
from app.models.practice_room import PracticeRoom, PracticeRoomMessage
from app.models.practice import PracticeDialogue, PracticeSession
from app.models.user import User
logger = logging.getLogger(__name__)
class PracticeRoomService:
"""双人对练房间服务"""
# 房间状态常量
STATUS_WAITING = "waiting" # 等待加入
STATUS_READY = "ready" # 准备就绪
STATUS_PRACTICING = "practicing" # 对练中
STATUS_COMPLETED = "completed" # 已完成
STATUS_CANCELED = "canceled" # 已取消
# 消息类型常量
MSG_TYPE_CHAT = "chat" # 聊天消息
MSG_TYPE_SYSTEM = "system" # 系统消息
MSG_TYPE_JOIN = "join" # 加入消息
MSG_TYPE_LEAVE = "leave" # 离开消息
MSG_TYPE_START = "start" # 开始消息
MSG_TYPE_END = "end" # 结束消息
def __init__(self, db: AsyncSession):
self.db = db
# ==================== 房间管理 ====================
async def create_room(
self,
host_user_id: int,
scene_id: Optional[int] = None,
scene_name: Optional[str] = None,
scene_type: Optional[str] = None,
scene_background: Optional[str] = None,
role_a_name: str = "销售顾问",
role_b_name: str = "顾客",
role_a_description: Optional[str] = None,
role_b_description: Optional[str] = None,
host_role: str = "A",
room_name: Optional[str] = None
) -> PracticeRoom:
"""
创建对练房间
Args:
host_user_id: 房主用户ID
scene_id: 场景ID可选
scene_name: 场景名称
scene_type: 场景类型
scene_background: 场景背景
role_a_name: 角色A名称
role_b_name: 角色B名称
role_a_description: 角色A描述
role_b_description: 角色B描述
host_role: 房主选择的角色A或B
room_name: 房间名称
Returns:
PracticeRoom: 创建的房间对象
"""
# 生成唯一的6位房间码
room_code = await self._generate_unique_room_code()
# 创建房间
room = PracticeRoom(
room_code=room_code,
room_name=room_name or f"{scene_name or '双人对练'}房间",
scene_id=scene_id,
scene_name=scene_name,
scene_type=scene_type,
scene_background=scene_background,
role_a_name=role_a_name,
role_b_name=role_b_name,
role_a_description=role_a_description,
role_b_description=role_b_description,
host_user_id=host_user_id,
host_role=host_role,
status=self.STATUS_WAITING
)
self.db.add(room)
await self.db.commit()
await self.db.refresh(room)
logger.info(f"创建房间成功: room_code={room_code}, host_user_id={host_user_id}")
return room
async def join_room(
self,
room_code: str,
user_id: int
) -> PracticeRoom:
"""
加入房间
Args:
room_code: 房间码
user_id: 用户ID
Returns:
PracticeRoom: 房间对象
Raises:
ValueError: 房间不存在、已满或状态不允许加入
"""
# 查询房间
room = await self.get_room_by_code(room_code)
if not room:
raise ValueError("房间不存在或已过期")
# 检查是否是房主(房主重新进入)
if room.host_user_id == user_id:
return room
# 检查房间状态
if room.status not in [self.STATUS_WAITING, self.STATUS_READY]:
raise ValueError("房间已开始对练或已结束,无法加入")
# 检查是否已满
if room.guest_user_id and room.guest_user_id != user_id:
raise ValueError("房间已满")
# 加入房间
room.guest_user_id = user_id
room.status = self.STATUS_READY
await self.db.commit()
await self.db.refresh(room)
# 发送系统消息
await self._add_system_message(room.id, f"用户已加入房间", self.MSG_TYPE_JOIN, user_id)
logger.info(f"用户加入房间: room_code={room_code}, user_id={user_id}")
return room
async def leave_room(
self,
room_code: str,
user_id: int
) -> bool:
"""
离开房间
Args:
room_code: 房间码
user_id: 用户ID
Returns:
bool: 是否成功离开
"""
room = await self.get_room_by_code(room_code)
if not room:
return False
# 如果是房主离开,取消房间
if room.host_user_id == user_id:
room.status = self.STATUS_CANCELED
await self._add_system_message(room.id, "房主离开,房间已关闭", self.MSG_TYPE_LEAVE, user_id)
# 如果是嘉宾离开
elif room.guest_user_id == user_id:
room.guest_user_id = None
room.status = self.STATUS_WAITING
await self._add_system_message(room.id, "对方已离开房间", self.MSG_TYPE_LEAVE, user_id)
else:
return False
await self.db.commit()
logger.info(f"用户离开房间: room_code={room_code}, user_id={user_id}")
return True
async def start_practice(
self,
room_code: str,
user_id: int
) -> PracticeRoom:
"""
开始对练(仅房主可操作)
Args:
room_code: 房间码
user_id: 用户ID必须是房主
Returns:
PracticeRoom: 房间对象
"""
room = await self.get_room_by_code(room_code)
if not room:
raise ValueError("房间不存在")
if room.host_user_id != user_id:
raise ValueError("只有房主可以开始对练")
if room.status != self.STATUS_READY:
raise ValueError("房间未就绪,请等待对方加入")
room.status = self.STATUS_PRACTICING
room.started_at = datetime.now()
await self.db.commit()
await self.db.refresh(room)
# 发送开始消息
await self._add_system_message(room.id, "对练开始!", self.MSG_TYPE_START)
logger.info(f"对练开始: room_code={room_code}")
return room
async def end_practice(
self,
room_code: str,
user_id: int
) -> PracticeRoom:
"""
结束对练
Args:
room_code: 房间码
user_id: 用户ID
Returns:
PracticeRoom: 房间对象
"""
room = await self.get_room_by_code(room_code)
if not room:
raise ValueError("房间不存在")
if room.status != self.STATUS_PRACTICING:
raise ValueError("对练未在进行中")
# 计算时长
if room.started_at:
duration = (datetime.now() - room.started_at).total_seconds()
room.duration_seconds = int(duration)
room.status = self.STATUS_COMPLETED
room.ended_at = datetime.now()
await self.db.commit()
await self.db.refresh(room)
# 发送结束消息
await self._add_system_message(room.id, "对练结束!", self.MSG_TYPE_END)
logger.info(f"对练结束: room_code={room_code}, duration={room.duration_seconds}s")
return room
# ==================== 消息管理 ====================
async def send_message(
self,
room_id: int,
user_id: int,
content: Optional[str],
role_name: Optional[str] = None,
message_type: Optional[str] = None,
extra_data: Optional[dict] = None
) -> PracticeRoomMessage:
"""
发送聊天消息或信令消息
Args:
room_id: 房间ID
user_id: 发送者ID
content: 消息内容
role_name: 角色名称
message_type: 消息类型(默认为 chat
extra_data: 额外数据(用于 WebRTC 信令等)
Returns:
PracticeRoomMessage: 消息对象
"""
import json
# 获取当前消息序号
sequence = await self._get_next_sequence(room_id)
# 如果是信令消息,将 extra_data 序列化到 content 中
actual_content = content
if extra_data and not content:
actual_content = json.dumps(extra_data)
message = PracticeRoomMessage(
room_id=room_id,
user_id=user_id,
message_type=message_type or self.MSG_TYPE_CHAT,
content=actual_content,
role_name=role_name,
sequence=sequence
)
self.db.add(message)
# 只有聊天消息才更新房间统计
if (message_type or self.MSG_TYPE_CHAT) == self.MSG_TYPE_CHAT:
room = await self.get_room_by_id(room_id)
if room:
room.total_turns += 1
user_role = room.get_user_role(user_id)
if user_role == "A":
room.role_a_turns += 1
elif user_role == "B":
room.role_b_turns += 1
await self.db.commit()
await self.db.refresh(message)
return message
async def get_messages(
self,
room_id: int,
since_sequence: int = 0,
limit: int = 100
) -> List[PracticeRoomMessage]:
"""
获取房间消息用于SSE轮询
Args:
room_id: 房间ID
since_sequence: 从该序号之后开始获取
limit: 最大数量
Returns:
List[PracticeRoomMessage]: 消息列表
"""
result = await self.db.execute(
select(PracticeRoomMessage)
.where(
and_(
PracticeRoomMessage.room_id == room_id,
PracticeRoomMessage.sequence > since_sequence
)
)
.order_by(PracticeRoomMessage.sequence)
.limit(limit)
)
return list(result.scalars().all())
async def get_all_messages(self, room_id: int) -> List[PracticeRoomMessage]:
"""
获取房间所有消息
Args:
room_id: 房间ID
Returns:
List[PracticeRoomMessage]: 消息列表
"""
result = await self.db.execute(
select(PracticeRoomMessage)
.where(PracticeRoomMessage.room_id == room_id)
.order_by(PracticeRoomMessage.sequence)
)
return list(result.scalars().all())
# ==================== 查询方法 ====================
async def get_room_by_code(self, room_code: str) -> Optional[PracticeRoom]:
"""根据房间码获取房间"""
result = await self.db.execute(
select(PracticeRoom).where(
and_(
PracticeRoom.room_code == room_code,
PracticeRoom.is_deleted == False
)
)
)
return result.scalar_one_or_none()
async def get_room_by_id(self, room_id: int) -> Optional[PracticeRoom]:
"""根据ID获取房间"""
result = await self.db.execute(
select(PracticeRoom).where(
and_(
PracticeRoom.id == room_id,
PracticeRoom.is_deleted == False
)
)
)
return result.scalar_one_or_none()
async def get_user_rooms(
self,
user_id: int,
status: Optional[str] = None,
limit: int = 20
) -> List[PracticeRoom]:
"""获取用户的房间列表"""
query = select(PracticeRoom).where(
and_(
(PracticeRoom.host_user_id == user_id) | (PracticeRoom.guest_user_id == user_id),
PracticeRoom.is_deleted == False
)
)
if status:
query = query.where(PracticeRoom.status == status)
query = query.order_by(PracticeRoom.created_at.desc()).limit(limit)
result = await self.db.execute(query)
return list(result.scalars().all())
async def get_room_with_users(self, room_code: str) -> Optional[Dict[str, Any]]:
"""获取房间详情(包含用户信息)"""
room = await self.get_room_by_code(room_code)
if not room:
return None
# 获取用户信息
host_user = None
guest_user = None
if room.host_user_id:
result = await self.db.execute(
select(User).where(User.id == room.host_user_id)
)
host_user = result.scalar_one_or_none()
if room.guest_user_id:
result = await self.db.execute(
select(User).where(User.id == room.guest_user_id)
)
guest_user = result.scalar_one_or_none()
return {
"room": room,
"host_user": host_user,
"guest_user": guest_user,
"host_role_name": room.get_role_name(room.host_role),
"guest_role_name": room.get_role_name("B" if room.host_role == "A" else "A") if guest_user else None
}
# ==================== 辅助方法 ====================
async def _generate_unique_room_code(self) -> str:
"""生成唯一的6位房间码"""
for _ in range(10): # 最多尝试10次
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
# 排除容易混淆的字符
code = code.replace('0', 'X').replace('O', 'Y').replace('I', 'Z').replace('1', 'W')
# 检查是否已存在
existing = await self.get_room_by_code(code)
if not existing:
return code
raise ValueError("无法生成唯一房间码,请稍后重试")
async def _get_next_sequence(self, room_id: int) -> int:
"""获取下一个消息序号"""
result = await self.db.execute(
select(PracticeRoomMessage.sequence)
.where(PracticeRoomMessage.room_id == room_id)
.order_by(PracticeRoomMessage.sequence.desc())
.limit(1)
)
last_seq = result.scalar_one_or_none()
return (last_seq or 0) + 1
async def _add_system_message(
self,
room_id: int,
content: str,
msg_type: str,
user_id: Optional[int] = None
) -> PracticeRoomMessage:
"""添加系统消息"""
sequence = await self._get_next_sequence(room_id)
message = PracticeRoomMessage(
room_id=room_id,
user_id=user_id,
message_type=msg_type,
content=content,
sequence=sequence
)
self.db.add(message)
await self.db.commit()
await self.db.refresh(message)
return message
# ==================== 便捷函数 ====================
async def create_practice_room(
db: AsyncSession,
host_user_id: int,
**kwargs
) -> PracticeRoom:
"""便捷函数:创建房间"""
service = PracticeRoomService(db)
return await service.create_room(host_user_id, **kwargs)
async def join_practice_room(
db: AsyncSession,
room_code: str,
user_id: int
) -> PracticeRoom:
"""便捷函数:加入房间"""
service = PracticeRoomService(db)
return await service.join_room(room_code, user_id)