""" LLM JSON Parser - 大模型 JSON 输出解析器 功能: - 使用 json-repair 库修复 AI 输出的 JSON - 处理中文标点、尾部逗号、Python 风格等问题 - Schema 校验确保数据完整性 使用示例: ```python from app.services.ai.llm_json_parser import parse_llm_json, parse_with_fallback # 简单解析 result = parse_llm_json(ai_response) # 带 Schema 校验和默认值 result = parse_with_fallback( ai_response, schema=MY_SCHEMA, default=[] ) ``` """ import json import re import logging from typing import Any, Dict, List, Optional, Tuple, Union from dataclasses import dataclass, field logger = logging.getLogger(__name__) # 尝试导入 json-repair try: from json_repair import loads as json_repair_loads from json_repair import repair_json HAS_JSON_REPAIR = True except ImportError: HAS_JSON_REPAIR = False logger.warning("json-repair 未安装,将使用内置修复逻辑") # 尝试导入 jsonschema try: from jsonschema import validate, ValidationError, Draft7Validator HAS_JSONSCHEMA = True except ImportError: HAS_JSONSCHEMA = False logger.warning("jsonschema 未安装,将跳过 Schema 校验") # ==================== 异常类 ==================== class JSONParseError(Exception): """JSON 解析错误基类""" def __init__(self, message: str, raw_text: str = "", issues: List[dict] = None): super().__init__(message) self.raw_text = raw_text self.issues = issues or [] class JSONUnrecoverableError(JSONParseError): """不可恢复的 JSON 错误""" pass # ==================== 解析结果 ==================== @dataclass class ParseResult: """解析结果""" success: bool data: Any = None method: str = "" # direct / json_repair / preprocessed / fixed / completed / default issues: List[dict] = field(default_factory=list) raw_text: str = "" error: str = "" # ==================== 核心解析函数 ==================== def parse_llm_json( text: str, *, strict: bool = False, return_result: bool = False ) -> Union[Any, ParseResult]: """ 智能解析 LLM 输出的 JSON Args: text: 原始文本 strict: 严格模式,不进行自动修复 return_result: 返回 ParseResult 对象而非直接数据 Returns: 解析后的 JSON 对象,或 ParseResult(如果 return_result=True) Raises: JSONUnrecoverableError: 所有修复尝试都失败 """ if not text or not text.strip(): if return_result: return ParseResult(success=False, error="Empty input") raise JSONUnrecoverableError("Empty input", text) text = text.strip() issues = [] # 第一层:直接解析 try: data = json.loads(text) result = ParseResult(success=True, data=data, method="direct", raw_text=text) return result if return_result else data except json.JSONDecodeError: pass if strict: if return_result: return ParseResult(success=False, error="Strict mode: direct parse failed", raw_text=text) raise JSONUnrecoverableError("Strict mode: direct parse failed", text) # 第二层:使用 json-repair(推荐) if HAS_JSON_REPAIR: try: data = json_repair_loads(text) issues.append({"type": "json_repair", "action": "Auto-repaired by json-repair library"}) result = ParseResult(success=True, data=data, method="json_repair", issues=issues, raw_text=text) return result if return_result else data except Exception as e: logger.debug(f"json-repair 修复失败: {e}") # 第三层:预处理(提取代码块、清理文字) preprocessed = _preprocess_text(text) if preprocessed != text: try: data = json.loads(preprocessed) issues.append({"type": "preprocessed", "action": "Extracted JSON from text"}) result = ParseResult(success=True, data=data, method="preprocessed", issues=issues, raw_text=text) return result if return_result else data except json.JSONDecodeError: pass # 再次尝试 json-repair if HAS_JSON_REPAIR: try: data = json_repair_loads(preprocessed) issues.append({"type": "json_repair_preprocessed", "action": "Repaired after preprocessing"}) result = ParseResult(success=True, data=data, method="json_repair", issues=issues, raw_text=text) return result if return_result else data except Exception: pass # 第四层:自动修复 fixed, fix_issues = _fix_json_format(preprocessed) issues.extend(fix_issues) if fixed != preprocessed: try: data = json.loads(fixed) result = ParseResult(success=True, data=data, method="fixed", issues=issues, raw_text=text) return result if return_result else data except json.JSONDecodeError: pass # 第五层:尝试补全截断的 JSON completed = _try_complete_json(fixed) if completed: try: data = json.loads(completed) issues.append({"type": "completed", "action": "Auto-completed truncated JSON"}) result = ParseResult(success=True, data=data, method="completed", issues=issues, raw_text=text) return result if return_result else data except json.JSONDecodeError: pass # 所有尝试都失败 diagnosis = diagnose_json_error(fixed) if return_result: return ParseResult( success=False, method="failed", issues=issues + diagnosis.get("issues", []), raw_text=text, error=f"All parse attempts failed. Issues: {diagnosis}" ) raise JSONUnrecoverableError(f"All parse attempts failed: {diagnosis}", text, issues) def parse_with_fallback( raw_text: str, schema: dict = None, default: Any = None, *, validate_schema: bool = True, on_error: str = "default" # "default" / "raise" / "none" ) -> Any: """ 带兜底的 JSON 解析 Args: raw_text: 原始文本 schema: JSON Schema(可选) default: 默认值 validate_schema: 是否进行 Schema 校验 on_error: 错误处理方式 Returns: 解析后的数据或默认值 """ try: result = parse_llm_json(raw_text, return_result=True) if not result.success: logger.warning(f"JSON 解析失败: {result.error}") if on_error == "raise": raise JSONUnrecoverableError(result.error, raw_text, result.issues) elif on_error == "none": return None return default data = result.data # Schema 校验 if validate_schema and schema and HAS_JSONSCHEMA: is_valid, errors = validate_json_schema(data, schema) if not is_valid: logger.warning(f"Schema 校验失败: {errors}") if on_error == "raise": raise JSONUnrecoverableError(f"Schema validation failed: {errors}", raw_text) elif on_error == "none": return None return default # 记录解析方法 if result.method != "direct": logger.info(f"JSON 解析成功: method={result.method}, issues={result.issues}") return data except Exception as e: logger.error(f"JSON 解析异常: {e}") if on_error == "raise": raise elif on_error == "none": return None return default # ==================== 预处理函数 ==================== def _preprocess_text(text: str) -> str: """预处理文本:提取代码块、清理前后文字""" # 移除 BOM text = text.lstrip('\ufeff') # 移除零宽字符 text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text) # 【重要】先替换中文标点为英文标点(在找边界之前做,否则中文引号会破坏边界检测) cn_punctuation = { ',': ',', '。': '.', ':': ':', ';': ';', '"': '"', '"': '"', ''': "'", ''': "'", '【': '[', '】': ']', '(': '(', ')': ')', '{': '{', '}': '}', } for cn, en in cn_punctuation.items(): text = text.replace(cn, en) # 提取 Markdown 代码块 patterns = [ r'```json\s*([\s\S]*?)\s*```', r'```\s*([\s\S]*?)\s*```', r'`([^`]+)`', ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: extracted = match.group(1).strip() if extracted.startswith(('{', '[')): text = extracted break # 找到 JSON 边界 text = _find_json_boundaries(text) return text.strip() def _find_json_boundaries(text: str) -> str: """找到 JSON 的起止位置""" # 找第一个 { 或 [ start = -1 for i, c in enumerate(text): if c in '{[': start = i break if start == -1: return text # 找最后一个匹配的 } 或 ] depth = 0 end = -1 in_string = False escape = False for i in range(start, len(text)): c = text[i] if escape: escape = False continue if c == '\\': escape = True continue if c == '"': in_string = not in_string continue if in_string: continue if c in '{[': depth += 1 elif c in '}]': depth -= 1 if depth == 0: end = i + 1 break if end == -1: # 找最后一个 } 或 ] for i in range(len(text) - 1, start, -1): if text[i] in '}]': end = i + 1 break if end > start: return text[start:end] return text[start:] # ==================== 修复函数 ==================== def _fix_json_format(text: str) -> Tuple[str, List[dict]]: """修复常见 JSON 格式问题""" issues = [] # 1. 中文标点转英文 cn_punctuation = { ',': ',', '。': '.', ':': ':', ';': ';', '"': '"', '"': '"', ''': "'", ''': "'", '【': '[', '】': ']', '(': '(', ')': ')', '{': '{', '}': '}', } for cn, en in cn_punctuation.items(): if cn in text: text = text.replace(cn, en) issues.append({"type": "chinese_punctuation", "from": cn, "to": en}) # 2. 移除注释 if '//' in text: text = re.sub(r'//[^\n]*', '', text) issues.append({"type": "removed_comments", "style": "single-line"}) if '/*' in text: text = re.sub(r'/\*[\s\S]*?\*/', '', text) issues.append({"type": "removed_comments", "style": "multi-line"}) # 3. Python 风格转 JSON python_replacements = [ (r'\bTrue\b', 'true'), (r'\bFalse\b', 'false'), (r'\bNone\b', 'null'), ] for pattern, replacement in python_replacements: if re.search(pattern, text): text = re.sub(pattern, replacement, text) issues.append({"type": "python_style", "from": pattern, "to": replacement}) # 4. 移除尾部逗号 trailing_comma_patterns = [ (r',(\s*})', r'\1'), (r',(\s*\])', r'\1'), ] for pattern, replacement in trailing_comma_patterns: if re.search(pattern, text): text = re.sub(pattern, replacement, text) issues.append({"type": "trailing_comma", "action": "removed"}) # 5. 修复单引号(谨慎处理) if text.count("'") > text.count('"') and re.match(r"^\s*\{?\s*'", text): text = re.sub(r"'([^']*)'(\s*:)", r'"\1"\2', text) text = re.sub(r":\s*'([^']*)'", r': "\1"', text) issues.append({"type": "single_quotes", "action": "replaced"}) return text, issues def _try_complete_json(text: str) -> Optional[str]: """尝试补全截断的 JSON""" if not text: return None # 统计括号 stack = [] in_string = False escape = False for c in text: if escape: escape = False continue if c == '\\': escape = True continue if c == '"': in_string = not in_string continue if in_string: continue if c in '{[': stack.append(c) elif c == '}': if stack and stack[-1] == '{': stack.pop() elif c == ']': if stack and stack[-1] == '[': stack.pop() if not stack: return None # 已经平衡了 # 如果在字符串中,先闭合字符串 if in_string: text += '"' # 补全括号 completion = "" for bracket in reversed(stack): if bracket == '{': completion += '}' elif bracket == '[': completion += ']' return text + completion # ==================== Schema 校验 ==================== def validate_json_schema(data: Any, schema: dict) -> Tuple[bool, List[dict]]: """ 校验 JSON 是否符合 Schema Returns: (is_valid, errors) """ if not HAS_JSONSCHEMA: logger.warning("jsonschema 未安装,跳过校验") return True, [] try: validator = Draft7Validator(schema) errors = list(validator.iter_errors(data)) if errors: error_messages = [ { "path": list(e.absolute_path), "message": e.message, "validator": e.validator } for e in errors ] return False, error_messages return True, [] except Exception as e: return False, [{"message": str(e)}] # ==================== 诊断函数 ==================== def diagnose_json_error(text: str) -> dict: """诊断 JSON 错误""" issues = [] # 检查是否为空 if not text or not text.strip(): issues.append({ "type": "empty_input", "severity": "critical", "suggestion": "输入为空" }) return {"issues": issues, "fixable": False} # 检查中文标点 cn_punctuation = [',', '。', ':', ';', '"', '"', ''', '''] for p in cn_punctuation: if p in text: issues.append({ "type": "chinese_punctuation", "char": p, "severity": "low", "suggestion": f"将 {p} 替换为对应英文标点" }) # 检查代码块包裹 if '```' in text: issues.append({ "type": "markdown_wrapped", "severity": "low", "suggestion": "需要提取代码块内容" }) # 检查注释 if '//' in text or '/*' in text: issues.append({ "type": "has_comments", "severity": "low", "suggestion": "需要移除注释" }) # 检查 Python 风格 if re.search(r'\b(True|False|None)\b', text): issues.append({ "type": "python_style", "severity": "low", "suggestion": "将 True/False/None 转为 true/false/null" }) # 检查尾部逗号 if re.search(r',\s*[}\]]', text): issues.append({ "type": "trailing_comma", "severity": "low", "suggestion": "移除 } 或 ] 前的逗号" }) # 检查括号平衡 open_braces = text.count('{') - text.count('}') open_brackets = text.count('[') - text.count(']') if open_braces > 0: issues.append({ "type": "unclosed_brace", "count": open_braces, "severity": "medium", "suggestion": f"缺少 {open_braces} 个 }}" }) elif open_braces < 0: issues.append({ "type": "extra_brace", "count": -open_braces, "severity": "medium", "suggestion": f"多余 {-open_braces} 个 }}" }) if open_brackets > 0: issues.append({ "type": "unclosed_bracket", "count": open_brackets, "severity": "medium", "suggestion": f"缺少 {open_brackets} 个 ]" }) elif open_brackets < 0: issues.append({ "type": "extra_bracket", "count": -open_brackets, "severity": "medium", "suggestion": f"多余 {-open_brackets} 个 ]" }) # 检查引号平衡 quote_count = text.count('"') if quote_count % 2 != 0: issues.append({ "type": "unbalanced_quotes", "severity": "high", "suggestion": "引号数量不平衡,可能有未闭合的字符串" }) # 判断是否可修复 fixable_types = { "chinese_punctuation", "markdown_wrapped", "has_comments", "python_style", "trailing_comma", "unclosed_brace", "unclosed_bracket" } fixable = all(i["type"] in fixable_types for i in issues) return { "issues": issues, "issue_count": len(issues), "fixable": fixable, "severity": max( (i.get("severity", "low") for i in issues), key=lambda x: {"low": 1, "medium": 2, "high": 3, "critical": 4}.get(x, 0), default="low" ) } # ==================== 便捷函数 ==================== def safe_json_loads(text: str, default: Any = None) -> Any: """安全的 json.loads,失败返回默认值""" try: return parse_llm_json(text) except Exception: return default def extract_json_from_text(text: str) -> Optional[str]: """从文本中提取 JSON 字符串""" preprocessed = _preprocess_text(text) fixed, _ = _fix_json_format(preprocessed) try: json.loads(fixed) return fixed except Exception: completed = _try_complete_json(fixed) if completed: try: json.loads(completed) return completed except Exception: pass return None def clean_llm_output(text: str) -> Tuple[str, List[str]]: """ 清洗大模型输出,返回清洗后的文本和应用的清洗规则 Args: text: 原始输出文本 Returns: (cleaned_text, applied_rules) """ if not text: return "", ["empty_input"] applied_rules = [] original = text # 1. 去除 BOM 头 if text.startswith('\ufeff'): text = text.lstrip('\ufeff') applied_rules.append("removed_bom") # 2. 去除 ANSI 转义序列 ansi_pattern = re.compile(r'\x1b\[[0-9;]*m') if ansi_pattern.search(text): text = ansi_pattern.sub('', text) applied_rules.append("removed_ansi") # 3. 去除首尾空白 text = text.strip() # 4. 去除开头的客套话 polite_patterns = [ r'^好的[,,。.]?\s*', r'^当然[,,。.]?\s*', r'^没问题[,,。.]?\s*', r'^根据您的要求[,,。.]?\s*', r'^以下是.*?[::]\s*', r'^分析结果如下[::]\s*', r'^我来为您.*?[::]\s*', r'^这是.*?结果[::]\s*', ] for pattern in polite_patterns: if re.match(pattern, text, re.IGNORECASE): text = re.sub(pattern, '', text, flags=re.IGNORECASE) applied_rules.append("removed_polite_prefix") break # 5. 提取 Markdown JSON 代码块 json_block_patterns = [ r'```json\s*([\s\S]*?)\s*```', r'```\s*([\s\S]*?)\s*```', ] for pattern in json_block_patterns: match = re.search(pattern, text, re.IGNORECASE) if match: extracted = match.group(1).strip() if extracted.startswith(('{', '[')): text = extracted applied_rules.append("extracted_code_block") break # 6. 处理零宽字符 zero_width = re.compile(r'[\u200b\u200c\u200d\ufeff]') if zero_width.search(text): text = zero_width.sub('', text) applied_rules.append("removed_zero_width") return text.strip(), applied_rules