""" 双人对练房间服务 功能: - 房间创建、加入、退出 - 房间状态管理 - 消息广播 - 对练结束处理 """ import logging import random import string from datetime import datetime from typing import Optional, List, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, and_ from sqlalchemy.orm import selectinload from app.models.practice_room import PracticeRoom, PracticeRoomMessage from app.models.practice import PracticeDialogue, PracticeSession from app.models.user import User logger = logging.getLogger(__name__) class PracticeRoomService: """双人对练房间服务""" # 房间状态常量 STATUS_WAITING = "waiting" # 等待加入 STATUS_READY = "ready" # 准备就绪 STATUS_PRACTICING = "practicing" # 对练中 STATUS_COMPLETED = "completed" # 已完成 STATUS_CANCELED = "canceled" # 已取消 # 消息类型常量 MSG_TYPE_CHAT = "chat" # 聊天消息 MSG_TYPE_SYSTEM = "system" # 系统消息 MSG_TYPE_JOIN = "join" # 加入消息 MSG_TYPE_LEAVE = "leave" # 离开消息 MSG_TYPE_START = "start" # 开始消息 MSG_TYPE_END = "end" # 结束消息 def __init__(self, db: AsyncSession): self.db = db # ==================== 房间管理 ==================== async def create_room( self, host_user_id: int, scene_id: Optional[int] = None, scene_name: Optional[str] = None, scene_type: Optional[str] = None, scene_background: Optional[str] = None, role_a_name: str = "销售顾问", role_b_name: str = "顾客", role_a_description: Optional[str] = None, role_b_description: Optional[str] = None, host_role: str = "A", room_name: Optional[str] = None ) -> PracticeRoom: """ 创建对练房间 Args: host_user_id: 房主用户ID scene_id: 场景ID(可选) scene_name: 场景名称 scene_type: 场景类型 scene_background: 场景背景 role_a_name: 角色A名称 role_b_name: 角色B名称 role_a_description: 角色A描述 role_b_description: 角色B描述 host_role: 房主选择的角色(A或B) room_name: 房间名称 Returns: PracticeRoom: 创建的房间对象 """ # 生成唯一的6位房间码 room_code = await self._generate_unique_room_code() # 创建房间 room = PracticeRoom( room_code=room_code, room_name=room_name or f"{scene_name or '双人对练'}房间", scene_id=scene_id, scene_name=scene_name, scene_type=scene_type, scene_background=scene_background, role_a_name=role_a_name, role_b_name=role_b_name, role_a_description=role_a_description, role_b_description=role_b_description, host_user_id=host_user_id, host_role=host_role, status=self.STATUS_WAITING ) self.db.add(room) await self.db.commit() await self.db.refresh(room) logger.info(f"创建房间成功: room_code={room_code}, host_user_id={host_user_id}") return room async def join_room( self, room_code: str, user_id: int ) -> PracticeRoom: """ 加入房间 Args: room_code: 房间码 user_id: 用户ID Returns: PracticeRoom: 房间对象 Raises: ValueError: 房间不存在、已满或状态不允许加入 """ # 查询房间 room = await self.get_room_by_code(room_code) if not room: raise ValueError("房间不存在或已过期") # 检查是否是房主(房主重新进入) if room.host_user_id == user_id: return room # 检查房间状态 if room.status not in [self.STATUS_WAITING, self.STATUS_READY]: raise ValueError("房间已开始对练或已结束,无法加入") # 检查是否已满 if room.guest_user_id and room.guest_user_id != user_id: raise ValueError("房间已满") # 加入房间 room.guest_user_id = user_id room.status = self.STATUS_READY await self.db.commit() await self.db.refresh(room) # 发送系统消息 await self._add_system_message(room.id, f"用户已加入房间", self.MSG_TYPE_JOIN, user_id) logger.info(f"用户加入房间: room_code={room_code}, user_id={user_id}") return room async def leave_room( self, room_code: str, user_id: int ) -> bool: """ 离开房间 Args: room_code: 房间码 user_id: 用户ID Returns: bool: 是否成功离开 """ room = await self.get_room_by_code(room_code) if not room: return False # 如果是房主离开,取消房间 if room.host_user_id == user_id: room.status = self.STATUS_CANCELED await self._add_system_message(room.id, "房主离开,房间已关闭", self.MSG_TYPE_LEAVE, user_id) # 如果是嘉宾离开 elif room.guest_user_id == user_id: room.guest_user_id = None room.status = self.STATUS_WAITING await self._add_system_message(room.id, "对方已离开房间", self.MSG_TYPE_LEAVE, user_id) else: return False await self.db.commit() logger.info(f"用户离开房间: room_code={room_code}, user_id={user_id}") return True async def start_practice( self, room_code: str, user_id: int ) -> PracticeRoom: """ 开始对练(仅房主可操作) Args: room_code: 房间码 user_id: 用户ID(必须是房主) Returns: PracticeRoom: 房间对象 """ room = await self.get_room_by_code(room_code) if not room: raise ValueError("房间不存在") if room.host_user_id != user_id: raise ValueError("只有房主可以开始对练") if room.status != self.STATUS_READY: raise ValueError("房间未就绪,请等待对方加入") room.status = self.STATUS_PRACTICING room.started_at = datetime.now() await self.db.commit() await self.db.refresh(room) # 发送开始消息 await self._add_system_message(room.id, "对练开始!", self.MSG_TYPE_START) logger.info(f"对练开始: room_code={room_code}") return room async def end_practice( self, room_code: str, user_id: int ) -> PracticeRoom: """ 结束对练 Args: room_code: 房间码 user_id: 用户ID Returns: PracticeRoom: 房间对象 """ room = await self.get_room_by_code(room_code) if not room: raise ValueError("房间不存在") if room.status != self.STATUS_PRACTICING: raise ValueError("对练未在进行中") # 计算时长 if room.started_at: duration = (datetime.now() - room.started_at).total_seconds() room.duration_seconds = int(duration) room.status = self.STATUS_COMPLETED room.ended_at = datetime.now() await self.db.commit() await self.db.refresh(room) # 发送结束消息 await self._add_system_message(room.id, "对练结束!", self.MSG_TYPE_END) logger.info(f"对练结束: room_code={room_code}, duration={room.duration_seconds}s") return room # ==================== 消息管理 ==================== async def send_message( self, room_id: int, user_id: int, content: Optional[str], role_name: Optional[str] = None, message_type: Optional[str] = None, extra_data: Optional[dict] = None ) -> PracticeRoomMessage: """ 发送聊天消息或信令消息 Args: room_id: 房间ID user_id: 发送者ID content: 消息内容 role_name: 角色名称 message_type: 消息类型(默认为 chat) extra_data: 额外数据(用于 WebRTC 信令等) Returns: PracticeRoomMessage: 消息对象 """ import json # 获取当前消息序号 sequence = await self._get_next_sequence(room_id) # 如果是信令消息,将 extra_data 序列化到 content 中 actual_content = content if extra_data and not content: actual_content = json.dumps(extra_data) message = PracticeRoomMessage( room_id=room_id, user_id=user_id, message_type=message_type or self.MSG_TYPE_CHAT, content=actual_content, role_name=role_name, sequence=sequence ) self.db.add(message) # 只有聊天消息才更新房间统计 if (message_type or self.MSG_TYPE_CHAT) == self.MSG_TYPE_CHAT: room = await self.get_room_by_id(room_id) if room: room.total_turns += 1 user_role = room.get_user_role(user_id) if user_role == "A": room.role_a_turns += 1 elif user_role == "B": room.role_b_turns += 1 await self.db.commit() await self.db.refresh(message) return message async def get_messages( self, room_id: int, since_sequence: int = 0, limit: int = 100 ) -> List[PracticeRoomMessage]: """ 获取房间消息(用于SSE轮询) Args: room_id: 房间ID since_sequence: 从该序号之后开始获取 limit: 最大数量 Returns: List[PracticeRoomMessage]: 消息列表 """ result = await self.db.execute( select(PracticeRoomMessage) .where( and_( PracticeRoomMessage.room_id == room_id, PracticeRoomMessage.sequence > since_sequence ) ) .order_by(PracticeRoomMessage.sequence) .limit(limit) ) return list(result.scalars().all()) async def get_all_messages(self, room_id: int) -> List[PracticeRoomMessage]: """ 获取房间所有消息 Args: room_id: 房间ID Returns: List[PracticeRoomMessage]: 消息列表 """ result = await self.db.execute( select(PracticeRoomMessage) .where(PracticeRoomMessage.room_id == room_id) .order_by(PracticeRoomMessage.sequence) ) return list(result.scalars().all()) # ==================== 查询方法 ==================== async def get_room_by_code(self, room_code: str) -> Optional[PracticeRoom]: """根据房间码获取房间""" result = await self.db.execute( select(PracticeRoom).where( and_( PracticeRoom.room_code == room_code, PracticeRoom.is_deleted == False ) ) ) return result.scalar_one_or_none() async def get_room_by_id(self, room_id: int) -> Optional[PracticeRoom]: """根据ID获取房间""" result = await self.db.execute( select(PracticeRoom).where( and_( PracticeRoom.id == room_id, PracticeRoom.is_deleted == False ) ) ) return result.scalar_one_or_none() async def get_user_rooms( self, user_id: int, status: Optional[str] = None, limit: int = 20 ) -> List[PracticeRoom]: """获取用户的房间列表""" query = select(PracticeRoom).where( and_( (PracticeRoom.host_user_id == user_id) | (PracticeRoom.guest_user_id == user_id), PracticeRoom.is_deleted == False ) ) if status: query = query.where(PracticeRoom.status == status) query = query.order_by(PracticeRoom.created_at.desc()).limit(limit) result = await self.db.execute(query) return list(result.scalars().all()) async def get_room_with_users(self, room_code: str) -> Optional[Dict[str, Any]]: """获取房间详情(包含用户信息)""" room = await self.get_room_by_code(room_code) if not room: return None # 获取用户信息 host_user = None guest_user = None if room.host_user_id: result = await self.db.execute( select(User).where(User.id == room.host_user_id) ) host_user = result.scalar_one_or_none() if room.guest_user_id: result = await self.db.execute( select(User).where(User.id == room.guest_user_id) ) guest_user = result.scalar_one_or_none() return { "room": room, "host_user": host_user, "guest_user": guest_user, "host_role_name": room.get_role_name(room.host_role), "guest_role_name": room.get_role_name("B" if room.host_role == "A" else "A") if guest_user else None } # ==================== 辅助方法 ==================== async def _generate_unique_room_code(self) -> str: """生成唯一的6位房间码""" for _ in range(10): # 最多尝试10次 code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) # 排除容易混淆的字符 code = code.replace('0', 'X').replace('O', 'Y').replace('I', 'Z').replace('1', 'W') # 检查是否已存在 existing = await self.get_room_by_code(code) if not existing: return code raise ValueError("无法生成唯一房间码,请稍后重试") async def _get_next_sequence(self, room_id: int) -> int: """获取下一个消息序号""" result = await self.db.execute( select(PracticeRoomMessage.sequence) .where(PracticeRoomMessage.room_id == room_id) .order_by(PracticeRoomMessage.sequence.desc()) .limit(1) ) last_seq = result.scalar_one_or_none() return (last_seq or 0) + 1 async def _add_system_message( self, room_id: int, content: str, msg_type: str, user_id: Optional[int] = None ) -> PracticeRoomMessage: """添加系统消息""" sequence = await self._get_next_sequence(room_id) message = PracticeRoomMessage( room_id=room_id, user_id=user_id, message_type=msg_type, content=content, sequence=sequence ) self.db.add(message) await self.db.commit() await self.db.refresh(message) return message # ==================== 便捷函数 ==================== async def create_practice_room( db: AsyncSession, host_user_id: int, **kwargs ) -> PracticeRoom: """便捷函数:创建房间""" service = PracticeRoomService(db) return await service.create_room(host_user_id, **kwargs) async def join_practice_room( db: AsyncSession, room_code: str, user_id: int ) -> PracticeRoom: """便捷函数:加入房间""" service = PracticeRoomService(db) return await service.join_room(room_code, user_id)