""" 本地 AI 服务 - 遵循瑞小美 AI 接入规范 功能: - 支持 4sapi.com(首选)和 OpenRouter(备选)自动降级 - 统一的请求/响应格式 - 调用日志记录 """ import json import logging import time from dataclasses import dataclass, field from typing import Any, AsyncGenerator, Dict, List, Optional, Union from enum import Enum import httpx logger = logging.getLogger(__name__) class AIProvider(Enum): """AI 服务商""" PRIMARY = "4sapi" # 首选:4sapi.com FALLBACK = "openrouter" # 备选:OpenRouter @dataclass class AIResponse: """AI 响应结果""" content: str # AI 回复内容 model: str = "" # 使用的模型 provider: str = "" # 实际使用的服务商 input_tokens: int = 0 # 输入 token 数 output_tokens: int = 0 # 输出 token 数 total_tokens: int = 0 # 总 token 数 cost: float = 0.0 # 费用(美元) latency_ms: int = 0 # 响应延迟(毫秒) raw_response: Dict[str, Any] = field(default_factory=dict) # 原始响应 images: List[str] = field(default_factory=list) # 图像生成结果 annotations: Dict[str, Any] = field(default_factory=dict) # PDF 解析注释 @dataclass class AIConfig: """AI 服务配置""" primary_api_key: str # 通用 Key(Gemini/DeepSeek 等) anthropic_api_key: str = "" # Claude 专属 Key primary_base_url: str = "https://4sapi.com/v1" fallback_api_key: str = "" fallback_base_url: str = "https://openrouter.ai/api/v1" default_model: str = "claude-opus-4-5-20251101-thinking" # 默认使用最强模型 timeout: float = 120.0 max_retries: int = 2 # Claude 模型列表(需要使用 anthropic_api_key) CLAUDE_MODELS = [ "claude-opus-4-5-20251101-thinking", "claude-opus-4-5-20251101", "claude-sonnet-4-20250514", "claude-3-opus", "claude-3-sonnet", "claude-3-haiku", ] def is_claude_model(model: str) -> bool: """判断是否为 Claude 模型""" model_lower = model.lower() return any(claude in model_lower for claude in ["claude", "anthropic"]) # 模型名称映射:4sapi -> OpenRouter MODEL_MAPPING = { # 4sapi 使用简短名称,OpenRouter 使用完整路径 "gemini-3-flash-preview": "google/gemini-3-flash-preview", "gemini-3-pro-preview": "google/gemini-3-pro-preview", "claude-opus-4-5-20251101-thinking": "anthropic/claude-opus-4.5", "gemini-2.5-flash-image-preview": "google/gemini-2.0-flash-exp:free", } # 反向映射:OpenRouter -> 4sapi MODEL_MAPPING_REVERSE = {v: k for k, v in MODEL_MAPPING.items()} class AIServiceError(Exception): """AI 服务错误""" def __init__(self, message: str, provider: str = "", status_code: int = 0): super().__init__(message) self.provider = provider self.status_code = status_code class AIService: """ 本地 AI 服务 遵循瑞小美 AI 接入规范: - 首选 4sapi.com,失败自动降级到 OpenRouter - 统一的响应格式 - 自动模型名称转换 使用示例: ```python ai = AIService(module_code="knowledge_analysis") response = await ai.chat( messages=[ {"role": "system", "content": "你是助手"}, {"role": "user", "content": "你好"} ], prompt_name="greeting" ) print(response.content) ``` """ def __init__( self, module_code: str = "default", config: Optional[AIConfig] = None, db_session: Any = None ): """ 初始化 AI 服务 配置加载优先级(遵循瑞小美 AI 接入规范): 1. 显式传入的 config 参数 2. 数据库 ai_config 表(推荐) 3. 环境变量(fallback) Args: module_code: 模块标识,用于统计 config: AI 配置,None 则从数据库/环境变量读取 db_session: 数据库会话,用于记录调用日志和读取配置 """ self.module_code = module_code self.db_session = db_session self.config = config or self._load_config(db_session) logger.info(f"AIService 初始化: module={module_code}, primary={self.config.primary_base_url}") def _load_config(self, db_session: Any) -> AIConfig: """ 加载配置 配置加载优先级(遵循瑞小美 AI 接入规范): 1. 管理库 tenant_configs 表(推荐,通过 DynamicConfig) 2. 环境变量(fallback) Args: db_session: 数据库会话(可选,用于日志记录) Returns: AIConfig 配置对象 """ # 优先从管理库加载(同步方式) try: config = self._load_config_from_admin_db() if config: logger.info("✅ AI 配置已从管理库(tenant_configs)加载") return config except Exception as e: logger.debug(f"从管理库加载 AI 配置失败: {e}") # Fallback 到环境变量 logger.info("AI 配置从环境变量加载") return self._load_config_from_env() def _load_config_from_admin_db(self) -> Optional[AIConfig]: """ 从管理库 tenant_configs 表加载配置 使用同步方式直接查询 kaopeilian_admin.tenant_configs 表 Returns: AIConfig 配置对象,如果无数据则返回 None """ import os # 获取当前租户编码 tenant_code = os.getenv("TENANT_CODE", "demo") # 获取管理库连接信息 admin_db_host = os.getenv("ADMIN_DB_HOST", "prod-mysql") admin_db_port = int(os.getenv("ADMIN_DB_PORT", "3306")) admin_db_user = os.getenv("ADMIN_DB_USER", "root") admin_db_password = os.getenv("ADMIN_DB_PASSWORD", "") admin_db_name = os.getenv("ADMIN_DB_NAME", "kaopeilian_admin") if not admin_db_password: logger.debug("ADMIN_DB_PASSWORD 未配置,跳过管理库配置加载") return None try: from sqlalchemy import create_engine, text import urllib.parse # 构建连接 URL encoded_password = urllib.parse.quote_plus(admin_db_password) admin_db_url = f"mysql+pymysql://{admin_db_user}:{encoded_password}@{admin_db_host}:{admin_db_port}/{admin_db_name}?charset=utf8mb4" engine = create_engine(admin_db_url, pool_pre_ping=True) with engine.connect() as conn: # 1. 获取租户 ID result = conn.execute( text("SELECT id FROM tenants WHERE code = :code AND status = 'active'"), {"code": tenant_code} ) row = result.fetchone() if not row: logger.debug(f"租户 {tenant_code} 不存在或未激活") engine.dispose() return None tenant_id = row[0] # 2. 获取 AI 配置 result = conn.execute( text(""" SELECT config_key, config_value FROM tenant_configs WHERE tenant_id = :tenant_id AND config_group = 'ai' """), {"tenant_id": tenant_id} ) rows = result.fetchall() engine.dispose() if not rows: logger.debug(f"租户 {tenant_code} 无 AI 配置") return None # 转换为字典 config_dict = {row[0]: row[1] for row in rows} # 检查必要的配置是否存在 primary_key = config_dict.get("AI_PRIMARY_API_KEY", "") if not primary_key: logger.warning(f"租户 {tenant_code} 的 AI_PRIMARY_API_KEY 为空") return None logger.info(f"✅ 从管理库加载租户 {tenant_code} 的 AI 配置成功") return AIConfig( primary_api_key=primary_key, anthropic_api_key=config_dict.get("AI_ANTHROPIC_API_KEY", ""), primary_base_url=config_dict.get("AI_PRIMARY_BASE_URL", "https://4sapi.com/v1"), fallback_api_key=config_dict.get("AI_FALLBACK_API_KEY", ""), fallback_base_url=config_dict.get("AI_FALLBACK_BASE_URL", "https://openrouter.ai/api/v1"), default_model=config_dict.get("AI_DEFAULT_MODEL", "claude-opus-4-5-20251101-thinking"), timeout=float(config_dict.get("AI_TIMEOUT", "120")), ) except Exception as e: logger.debug(f"从管理库读取 AI 配置异常: {e}") return None def _load_config_from_env(self) -> AIConfig: """ 从环境变量加载配置 ⚠️ 强制要求(遵循瑞小美 AI 接入规范): - 禁止在代码中硬编码 API Key - 必须通过环境变量配置 Key 必须配置的环境变量: - AI_PRIMARY_API_KEY: 通用 Key(用于 Gemini/DeepSeek 等) - AI_ANTHROPIC_API_KEY: Claude 专属 Key """ import os primary_api_key = os.getenv("AI_PRIMARY_API_KEY", "") anthropic_api_key = os.getenv("AI_ANTHROPIC_API_KEY", "") # 检查必要的 Key 是否已配置 if not primary_api_key: logger.warning("⚠️ AI_PRIMARY_API_KEY 未配置,AI 服务可能无法正常工作") if not anthropic_api_key: logger.warning("⚠️ AI_ANTHROPIC_API_KEY 未配置,Claude 模型调用将失败") return AIConfig( # 通用 Key(Gemini/DeepSeek 等非 Anthropic 模型) primary_api_key=primary_api_key, # Claude 专属 Key anthropic_api_key=anthropic_api_key, primary_base_url=os.getenv("AI_PRIMARY_BASE_URL", "https://4sapi.com/v1"), fallback_api_key=os.getenv("AI_FALLBACK_API_KEY", ""), fallback_base_url=os.getenv("AI_FALLBACK_BASE_URL", "https://openrouter.ai/api/v1"), # 默认模型:遵循"优先最强"原则,使用 Claude Opus 4.5 default_model=os.getenv("AI_DEFAULT_MODEL", "claude-opus-4-5-20251101-thinking"), timeout=float(os.getenv("AI_TIMEOUT", "120")), ) def _convert_model_name(self, model: str, provider: AIProvider) -> str: """ 转换模型名称以匹配服务商格式 Args: model: 原始模型名称 provider: 目标服务商 Returns: 转换后的模型名称 """ if provider == AIProvider.FALLBACK: # 4sapi -> OpenRouter return MODEL_MAPPING.get(model, f"google/{model}" if "/" not in model else model) else: # OpenRouter -> 4sapi return MODEL_MAPPING_REVERSE.get(model, model.split("/")[-1] if "/" in model else model) async def chat( self, messages: List[Dict[str, str]], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, prompt_name: str = "default", **kwargs ) -> AIResponse: """ 文本聊天 Args: messages: 消息列表 [{"role": "system/user/assistant", "content": "..."}] model: 模型名称,None 使用默认模型 temperature: 温度参数 max_tokens: 最大输出 token 数 prompt_name: 提示词名称,用于统计 **kwargs: 其他参数 Returns: AIResponse 响应对象 """ model = model or self.config.default_model # 构建请求体 payload = { "model": model, "messages": messages, "temperature": temperature, } if max_tokens: payload["max_tokens"] = max_tokens # 首选服务商 try: return await self._call_provider( provider=AIProvider.PRIMARY, endpoint="/chat/completions", payload=payload, prompt_name=prompt_name ) except AIServiceError as e: logger.warning(f"首选服务商调用失败: {e}, 尝试降级到备选服务商") # 如果没有备选 API Key,直接抛出异常 if not self.config.fallback_api_key: raise # 降级到备选服务商 # 转换模型名称 fallback_model = self._convert_model_name(model, AIProvider.FALLBACK) payload["model"] = fallback_model return await self._call_provider( provider=AIProvider.FALLBACK, endpoint="/chat/completions", payload=payload, prompt_name=prompt_name ) async def chat_stream( self, messages: List[Dict[str, str]], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, prompt_name: str = "default", **kwargs ) -> AsyncGenerator[str, None]: """ 流式文本聊天 Args: messages: 消息列表 [{"role": "system/user/assistant", "content": "..."}] model: 模型名称,None 使用默认模型 temperature: 温度参数 max_tokens: 最大输出 token 数 prompt_name: 提示词名称,用于统计 **kwargs: 其他参数 Yields: str: 文本块(逐字返回) """ model = model or self.config.default_model # 构建请求体 payload = { "model": model, "messages": messages, "temperature": temperature, "stream": True, } if max_tokens: payload["max_tokens"] = max_tokens # 首选服务商 try: async for chunk in self._call_provider_stream( provider=AIProvider.PRIMARY, endpoint="/chat/completions", payload=payload, prompt_name=prompt_name ): yield chunk return except AIServiceError as e: logger.warning(f"首选服务商流式调用失败: {e}, 尝试降级到备选服务商") # 如果没有备选 API Key,直接抛出异常 if not self.config.fallback_api_key: raise # 降级到备选服务商 # 转换模型名称 fallback_model = self._convert_model_name(model, AIProvider.FALLBACK) payload["model"] = fallback_model async for chunk in self._call_provider_stream( provider=AIProvider.FALLBACK, endpoint="/chat/completions", payload=payload, prompt_name=prompt_name ): yield chunk async def _call_provider_stream( self, provider: AIProvider, endpoint: str, payload: Dict[str, Any], prompt_name: str ) -> AsyncGenerator[str, None]: """ 流式调用指定服务商 Args: provider: 服务商 endpoint: API 端点 payload: 请求体 prompt_name: 提示词名称 Yields: str: 文本块 """ # 获取配置 if provider == AIProvider.PRIMARY: base_url = self.config.primary_base_url # 根据模型选择 API Key:Claude 用专属 Key,其他用通用 Key model = payload.get("model", "") if is_claude_model(model) and self.config.anthropic_api_key: api_key = self.config.anthropic_api_key logger.debug(f"[Stream] 使用 Claude 专属 Key 调用模型: {model}") else: api_key = self.config.primary_api_key else: api_key = self.config.fallback_api_key base_url = self.config.fallback_base_url url = f"{base_url.rstrip('/')}{endpoint}" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } # OpenRouter 需要额外的 header if provider == AIProvider.FALLBACK: headers["HTTP-Referer"] = "https://kaopeilian.ireborn.com.cn" headers["X-Title"] = "KaoPeiLian" start_time = time.time() try: timeout = httpx.Timeout(self.config.timeout, connect=10.0) async with httpx.AsyncClient(timeout=timeout) as client: logger.info(f"流式调用 AI 服务: provider={provider.value}, model={payload.get('model')}") async with client.stream("POST", url, json=payload, headers=headers) as response: # 检查响应状态 if response.status_code != 200: error_text = await response.aread() logger.error(f"AI 服务流式返回错误: status={response.status_code}, body={error_text[:500]}") raise AIServiceError( f"API 流式请求失败: HTTP {response.status_code}", provider=provider.value, status_code=response.status_code ) # 处理 SSE 流 async for line in response.aiter_lines(): if not line or not line.strip(): continue # 解析 SSE 数据行 if line.startswith("data: "): data_str = line[6:] # 移除 "data: " 前缀 # 检查是否是结束标记 if data_str.strip() == "[DONE]": logger.info(f"流式响应完成: provider={provider.value}") return try: event_data = json.loads(data_str) # 提取 delta 内容 choices = event_data.get("choices", []) if choices: delta = choices[0].get("delta", {}) content = delta.get("content", "") if content: yield content except json.JSONDecodeError as e: logger.debug(f"解析流式数据失败: {e} - 数据: {data_str[:100]}") continue latency_ms = int((time.time() - start_time) * 1000) logger.info(f"流式调用完成: provider={provider.value}, latency={latency_ms}ms") except httpx.TimeoutException: latency_ms = int((time.time() - start_time) * 1000) logger.error(f"AI 服务流式超时: provider={provider.value}, latency={latency_ms}ms") raise AIServiceError(f"流式请求超时({self.config.timeout}秒)", provider=provider.value) except httpx.RequestError as e: logger.error(f"AI 服务流式网络错误: provider={provider.value}, error={e}") raise AIServiceError(f"流式网络错误: {e}", provider=provider.value) async def _call_provider( self, provider: AIProvider, endpoint: str, payload: Dict[str, Any], prompt_name: str ) -> AIResponse: """ 调用指定服务商 Args: provider: 服务商 endpoint: API 端点 payload: 请求体 prompt_name: 提示词名称 Returns: AIResponse 响应对象 """ # 获取配置 if provider == AIProvider.PRIMARY: base_url = self.config.primary_base_url # 根据模型选择 API Key:Claude 用专属 Key,其他用通用 Key model = payload.get("model", "") if is_claude_model(model) and self.config.anthropic_api_key: api_key = self.config.anthropic_api_key logger.debug(f"使用 Claude 专属 Key 调用模型: {model}") else: api_key = self.config.primary_api_key else: api_key = self.config.fallback_api_key base_url = self.config.fallback_base_url url = f"{base_url.rstrip('/')}{endpoint}" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } # OpenRouter 需要额外的 header if provider == AIProvider.FALLBACK: headers["HTTP-Referer"] = "https://kaopeilian.ireborn.com.cn" headers["X-Title"] = "KaoPeiLian" start_time = time.time() try: async with httpx.AsyncClient(timeout=self.config.timeout) as client: logger.info(f"调用 AI 服务: provider={provider.value}, model={payload.get('model')}") response = await client.post(url, json=payload, headers=headers) latency_ms = int((time.time() - start_time) * 1000) # 检查响应状态 if response.status_code != 200: error_text = response.text logger.error(f"AI 服务返回错误: status={response.status_code}, body={error_text[:500]}") raise AIServiceError( f"API 请求失败: HTTP {response.status_code}", provider=provider.value, status_code=response.status_code ) data = response.json() # 解析响应 ai_response = self._parse_response(data, provider, latency_ms) # 记录日志 logger.info( f"AI 调用成功: provider={provider.value}, model={ai_response.model}, " f"tokens={ai_response.total_tokens}, latency={latency_ms}ms" ) # 保存到数据库(如果有 session) await self._log_call(prompt_name, ai_response) return ai_response except httpx.TimeoutException: latency_ms = int((time.time() - start_time) * 1000) logger.error(f"AI 服务超时: provider={provider.value}, latency={latency_ms}ms") raise AIServiceError(f"请求超时({self.config.timeout}秒)", provider=provider.value) except httpx.RequestError as e: logger.error(f"AI 服务网络错误: provider={provider.value}, error={e}") raise AIServiceError(f"网络错误: {e}", provider=provider.value) def _parse_response( self, data: Dict[str, Any], provider: AIProvider, latency_ms: int ) -> AIResponse: """解析 API 响应""" # 提取内容 choices = data.get("choices", []) if not choices: raise AIServiceError("响应中没有 choices") message = choices[0].get("message", {}) content = message.get("content", "") # 提取 usage usage = data.get("usage", {}) input_tokens = usage.get("prompt_tokens", 0) output_tokens = usage.get("completion_tokens", 0) total_tokens = usage.get("total_tokens", input_tokens + output_tokens) # 提取费用(如果有) cost = usage.get("total_cost", 0.0) return AIResponse( content=content, model=data.get("model", ""), provider=provider.value, input_tokens=input_tokens, output_tokens=output_tokens, total_tokens=total_tokens, cost=cost, latency_ms=latency_ms, raw_response=data ) async def _log_call(self, prompt_name: str, response: AIResponse) -> None: """记录调用日志到数据库""" if not self.db_session: return try: # TODO: 实现调用日志记录 # 可以参考 ai_call_logs 表结构 pass except Exception as e: logger.warning(f"记录 AI 调用日志失败: {e}") async def analyze_document( self, content: str, prompt: str, model: Optional[str] = None, prompt_name: str = "document_analysis" ) -> AIResponse: """ 分析文档内容 Args: content: 文档内容 prompt: 分析提示词 model: 模型名称 prompt_name: 提示词名称 Returns: AIResponse 响应对象 """ messages = [ {"role": "user", "content": f"{prompt}\n\n文档内容:\n{content}"} ] return await self.chat( messages=messages, model=model, temperature=0.1, # 文档分析使用低温度 prompt_name=prompt_name ) # 便捷函数 async def quick_chat( messages: List[Dict[str, str]], model: Optional[str] = None, module_code: str = "quick" ) -> str: """ 快速聊天,返回纯文本 Args: messages: 消息列表 model: 模型名称 module_code: 模块标识 Returns: AI 回复的文本内容 """ ai = AIService(module_code=module_code) response = await ai.chat(messages, model=model) return response.content # 模型常量(遵循瑞小美 AI 接入规范) # 按优先级排序:首选 > 标准 > 快速 MODEL_PRIMARY = "claude-opus-4-5-20251101-thinking" # 🥇 首选:所有任务首先尝试 MODEL_STANDARD = "gemini-3-pro-preview" # 🥈 标准:Claude 失败后降级 MODEL_FAST = "gemini-3-flash-preview" # 🥉 快速:最终保底 MODEL_IMAGE = "gemini-2.5-flash-image-preview" # 🖼️ 图像生成专用 MODEL_VIDEO = "veo3.1-pro" # 🎬 视频生成专用 # 兼容旧代码的别名 DEFAULT_MODEL = MODEL_PRIMARY # 默认使用最强模型 MODEL_ANALYSIS = MODEL_PRIMARY MODEL_CREATIVE = MODEL_STANDARD MODEL_IMAGE_GEN = MODEL_IMAGE