Some checks failed
continuous-integration/drone/push Build is failing
更新内容: - 后端 AI 服务优化(能力分析、知识点解析等) - 前端考试和陪练界面更新 - 修复多个 prompt 和 JSON 解析问题 - 更新 Coze 语音客户端
719 lines
20 KiB
Python
719 lines
20 KiB
Python
"""
|
||
LLM JSON Parser - 大模型 JSON 输出解析器
|
||
|
||
功能:
|
||
- 使用 json-repair 库修复 AI 输出的 JSON
|
||
- 处理中文标点、尾部逗号、Python 风格等问题
|
||
- Schema 校验确保数据完整性
|
||
|
||
使用示例:
|
||
```python
|
||
from app.services.ai.llm_json_parser import parse_llm_json, parse_with_fallback
|
||
|
||
# 简单解析
|
||
result = parse_llm_json(ai_response)
|
||
|
||
# 带 Schema 校验和默认值
|
||
result = parse_with_fallback(
|
||
ai_response,
|
||
schema=MY_SCHEMA,
|
||
default=[]
|
||
)
|
||
```
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
import logging
|
||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||
from dataclasses import dataclass, field
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试导入 json-repair
|
||
try:
|
||
from json_repair import loads as json_repair_loads
|
||
from json_repair import repair_json
|
||
HAS_JSON_REPAIR = True
|
||
except ImportError:
|
||
HAS_JSON_REPAIR = False
|
||
logger.warning("json-repair 未安装,将使用内置修复逻辑")
|
||
|
||
# 尝试导入 jsonschema
|
||
try:
|
||
from jsonschema import validate, ValidationError, Draft7Validator
|
||
HAS_JSONSCHEMA = True
|
||
except ImportError:
|
||
HAS_JSONSCHEMA = False
|
||
logger.warning("jsonschema 未安装,将跳过 Schema 校验")
|
||
|
||
|
||
# ==================== 异常类 ====================
|
||
|
||
class JSONParseError(Exception):
    """Base class for JSON parsing errors.

    Attributes:
        raw_text: The text that was being parsed when the error occurred.
        issues: Issue records collected before the failure (empty list
            when none were supplied).
    """

    def __init__(
        self,
        message: str,
        raw_text: str = "",
        issues: Optional[List[dict]] = None,
    ):
        super().__init__(message)
        self.raw_text = raw_text
        # None (or an empty list) is normalised to a fresh empty list so the
        # attribute is always safe to iterate.
        self.issues = issues or []
|
||
|
||
|
||
class JSONUnrecoverableError(JSONParseError):
    """Unrecoverable JSON error."""
|
||
|
||
|
||
# ==================== 解析结果 ====================
|
||
|
||
@dataclass
class ParseResult:
    """Outcome of a single parse attempt."""

    # Whether a JSON value was obtained.
    success: bool
    # The parsed value (None when nothing was parsed).
    data: Any = None
    # Strategy that produced the result:
    # direct / json_repair / preprocessed / fixed / completed / default
    method: str = ""
    # Records describing the repairs applied or the problems detected.
    issues: List[dict] = field(default_factory=list)
    # The text that was handed to the parser.
    raw_text: str = ""
    # Human-readable failure description (empty on success).
    error: str = ""
|
||
|
||
|
||
# ==================== 核心解析函数 ====================
|
||
|
||
def parse_llm_json(
    text: str,
    *,
    strict: bool = False,
    return_result: bool = False
) -> Union[Any, ParseResult]:
    """
    Parse JSON produced by an LLM, repairing it when necessary.

    The input goes through successive layers, returning at the first one
    that yields a value:

    1. ``json.loads`` on the stripped text ("direct").
    2. json-repair on the stripped text, when the library is installed
       ("json_repair").
    3. ``_preprocess_text`` followed by ``json.loads`` ("preprocessed"),
       then json-repair on the preprocessed text.
    4. ``_fix_json_format`` followed by ``json.loads`` ("fixed").
    5. ``_try_complete_json`` followed by ``json.loads`` ("completed").

    Args:
        text: Raw text.
        strict: Strict mode; only the direct parse is attempted.
        return_result: Return a ParseResult instead of the bare data.

    Returns:
        The parsed JSON value, or a ParseResult when return_result=True.

    Raises:
        JSONUnrecoverableError: The input is empty, strict mode failed, or
            every repair attempt failed (only when return_result=False).
    """
    if not text or not text.strip():
        if return_result:
            return ParseResult(success=False, error="Empty input")
        raise JSONUnrecoverableError("Empty input", text)

    text = text.strip()
    issues: List[dict] = []

    def _ok(data: Any, method: str) -> Any:
        """Shape a successful outcome according to ``return_result``."""
        if not return_result:
            return data
        return ParseResult(success=True, data=data, method=method, issues=issues, raw_text=text)

    def _repair(candidate: str) -> Tuple[bool, Any]:
        """Run json-repair on ``candidate``; return (recovered, data)."""
        if not HAS_JSON_REPAIR:
            return False, None
        try:
            data = json_repair_loads(candidate)
        except Exception as exc:
            logger.debug("json-repair 修复失败: %s", exc)
            return False, None
        # json-repair yields an empty string when nothing could be salvaged.
        # A genuine JSON "" is already handled by the direct parse, so an
        # empty string here means failure and the next layers must run.
        if isinstance(data, str) and not data.strip():
            return False, None
        return True, data

    # Layer 1: direct parse.
    try:
        return _ok(json.loads(text), "direct")
    except json.JSONDecodeError:
        pass

    if strict:
        if return_result:
            return ParseResult(success=False, error="Strict mode: direct parse failed", raw_text=text)
        raise JSONUnrecoverableError("Strict mode: direct parse failed", text)

    # Layer 2: json-repair (preferred).
    recovered, data = _repair(text)
    if recovered:
        issues.append({"type": "json_repair", "action": "Auto-repaired by json-repair library"})
        return _ok(data, "json_repair")

    # Layer 3: preprocessing (code-block extraction, surrounding prose removal).
    preprocessed = _preprocess_text(text)
    if preprocessed != text:
        try:
            data = json.loads(preprocessed)
        except json.JSONDecodeError:
            pass
        else:
            issues.append({"type": "preprocessed", "action": "Extracted JSON from text"})
            return _ok(data, "preprocessed")

        # Give json-repair another chance on the cleaned-up text. Retrying on
        # identical text would only repeat layer 2.
        recovered, data = _repair(preprocessed)
        if recovered:
            issues.append({"type": "json_repair_preprocessed", "action": "Repaired after preprocessing"})
            return _ok(data, "json_repair")

    # Layer 4: built-in format fixes.
    fixed, fix_issues = _fix_json_format(preprocessed)
    issues.extend(fix_issues)

    if fixed != preprocessed:
        try:
            return _ok(json.loads(fixed), "fixed")
        except json.JSONDecodeError:
            pass

    # Layer 5: try to complete truncated JSON.
    completed = _try_complete_json(fixed)
    if completed:
        try:
            data = json.loads(completed)
        except json.JSONDecodeError:
            pass
        else:
            issues.append({"type": "completed", "action": "Auto-completed truncated JSON"})
            return _ok(data, "completed")

    # Every attempt failed: attach a diagnosis of the last candidate text.
    diagnosis = diagnose_json_error(fixed)
    if return_result:
        return ParseResult(
            success=False,
            method="failed",
            issues=issues + diagnosis.get("issues", []),
            raw_text=text,
            error=f"All parse attempts failed. Issues: {diagnosis}"
        )
    raise JSONUnrecoverableError(f"All parse attempts failed: {diagnosis}", text, issues)
|
||
|
||
|
||
def parse_with_fallback(
    raw_text: str,
    schema: dict = None,
    default: Any = None,
    *,
    validate_schema: bool = True,
    on_error: str = "default"  # "default" / "raise" / "none"
) -> Any:
    """
    Parse JSON with a fallback value.

    Args:
        raw_text: Raw text.
        schema: JSON Schema (optional).
        default: Value returned on failure when on_error is "default".
        validate_schema: Whether to run schema validation.
        on_error: Failure handling: "default" returns ``default``,
            "none" returns None, "raise" raises.

    Returns:
        The parsed data, or the fallback value.
    """
    def _fallback() -> Any:
        # Non-raising failure modes: "none" yields None, anything else the default.
        return None if on_error == "none" else default

    try:
        outcome = parse_llm_json(raw_text, return_result=True)

        if not outcome.success:
            logger.warning(f"JSON 解析失败: {outcome.error}")
            if on_error == "raise":
                raise JSONUnrecoverableError(outcome.error, raw_text, outcome.issues)
            return _fallback()

        parsed = outcome.data

        # Schema validation
        if validate_schema and schema and HAS_JSONSCHEMA:
            is_valid, errors = validate_json_schema(parsed, schema)
            if not is_valid:
                logger.warning(f"Schema 校验失败: {errors}")
                if on_error == "raise":
                    raise JSONUnrecoverableError(f"Schema validation failed: {errors}", raw_text)
                return _fallback()

        # Record which strategy was needed when it was not a plain parse
        if outcome.method != "direct":
            logger.info(f"JSON 解析成功: method={outcome.method}, issues={outcome.issues}")

        return parsed

    except Exception as e:
        logger.error(f"JSON 解析异常: {e}")
        if on_error == "raise":
            raise
        return _fallback()
|
||
|
||
|
||
# ==================== 预处理函数 ====================
|
||
|
||
def _preprocess_text(text: str) -> str:
    """Preprocess text: normalise punctuation, extract code blocks, trim prose.

    Steps, in order: strip a leading BOM, remove zero-width characters,
    convert fullwidth (Chinese) punctuation to ASCII, unwrap the first
    Markdown code span whose content starts with ``{`` or ``[``, then cut
    the text down to the JSON boundaries.

    Args:
        text: Raw model output.

    Returns:
        The candidate JSON text, stripped of surrounding whitespace.
    """
    # Remove BOM
    text = text.lstrip('\ufeff')

    # Remove zero-width characters
    text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text)

    # IMPORTANT: convert Chinese punctuation before looking for boundaries,
    # otherwise Chinese quotes break the boundary detection. Keys are written
    # as escapes so the fullwidth characters cannot be silently flattened to
    # their ASCII look-alikes. Note the conversion is global, so punctuation
    # inside string values is converted as well.
    cn_punctuation = {
        '\uff0c': ',', '\u3002': '.', '\uff1a': ':', '\uff1b': ';',
        '\u201c': '"', '\u201d': '"', '\u2018': "'", '\u2019': "'",
        '\u3010': '[', '\u3011': ']', '\uff08': '(', '\uff09': ')',
        '\uff5b': '{', '\uff5d': '}',
    }
    # Single pass; every key is one character and no value is itself a key.
    text = text.translate(str.maketrans(cn_punctuation))

    # Extract a Markdown code block / inline code span
    patterns = [
        r'```json\s*([\s\S]*?)\s*```',
        r'```\s*([\s\S]*?)\s*```',
        r'`([^`]+)`',
    ]

    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            extracted = match.group(1).strip()
            # Only accept spans that look like a JSON object or array.
            if extracted.startswith(('{', '[')):
                text = extracted
                break

    # Locate the JSON boundaries
    text = _find_json_boundaries(text)

    return text.strip()
|
||
|
||
|
||
def _find_json_boundaries(text: str) -> str:
|
||
"""找到 JSON 的起止位置"""
|
||
# 找第一个 { 或 [
|
||
start = -1
|
||
for i, c in enumerate(text):
|
||
if c in '{[':
|
||
start = i
|
||
break
|
||
|
||
if start == -1:
|
||
return text
|
||
|
||
# 找最后一个匹配的 } 或 ]
|
||
depth = 0
|
||
end = -1
|
||
in_string = False
|
||
escape = False
|
||
|
||
for i in range(start, len(text)):
|
||
c = text[i]
|
||
|
||
if escape:
|
||
escape = False
|
||
continue
|
||
|
||
if c == '\\':
|
||
escape = True
|
||
continue
|
||
|
||
if c == '"':
|
||
in_string = not in_string
|
||
continue
|
||
|
||
if in_string:
|
||
continue
|
||
|
||
if c in '{[':
|
||
depth += 1
|
||
elif c in '}]':
|
||
depth -= 1
|
||
if depth == 0:
|
||
end = i + 1
|
||
break
|
||
|
||
if end == -1:
|
||
# 找最后一个 } 或 ]
|
||
for i in range(len(text) - 1, start, -1):
|
||
if text[i] in '}]':
|
||
end = i + 1
|
||
break
|
||
|
||
if end > start:
|
||
return text[start:end]
|
||
|
||
return text[start:]
|
||
|
||
|
||
# ==================== 修复函数 ====================
|
||
|
||
def _fix_json_format(text: str) -> Tuple[str, List[dict]]:
|
||
"""修复常见 JSON 格式问题"""
|
||
issues = []
|
||
|
||
# 1. 中文标点转英文
|
||
cn_punctuation = {
|
||
',': ',', '。': '.', ':': ':', ';': ';',
|
||
'"': '"', '"': '"', ''': "'", ''': "'",
|
||
'【': '[', '】': ']', '(': '(', ')': ')',
|
||
'{': '{', '}': '}',
|
||
}
|
||
for cn, en in cn_punctuation.items():
|
||
if cn in text:
|
||
text = text.replace(cn, en)
|
||
issues.append({"type": "chinese_punctuation", "from": cn, "to": en})
|
||
|
||
# 2. 移除注释
|
||
if '//' in text:
|
||
text = re.sub(r'//[^\n]*', '', text)
|
||
issues.append({"type": "removed_comments", "style": "single-line"})
|
||
|
||
if '/*' in text:
|
||
text = re.sub(r'/\*[\s\S]*?\*/', '', text)
|
||
issues.append({"type": "removed_comments", "style": "multi-line"})
|
||
|
||
# 3. Python 风格转 JSON
|
||
python_replacements = [
|
||
(r'\bTrue\b', 'true'),
|
||
(r'\bFalse\b', 'false'),
|
||
(r'\bNone\b', 'null'),
|
||
]
|
||
for pattern, replacement in python_replacements:
|
||
if re.search(pattern, text):
|
||
text = re.sub(pattern, replacement, text)
|
||
issues.append({"type": "python_style", "from": pattern, "to": replacement})
|
||
|
||
# 4. 移除尾部逗号
|
||
trailing_comma_patterns = [
|
||
(r',(\s*})', r'\1'),
|
||
(r',(\s*\])', r'\1'),
|
||
]
|
||
for pattern, replacement in trailing_comma_patterns:
|
||
if re.search(pattern, text):
|
||
text = re.sub(pattern, replacement, text)
|
||
issues.append({"type": "trailing_comma", "action": "removed"})
|
||
|
||
# 5. 修复单引号(谨慎处理)
|
||
if text.count("'") > text.count('"') and re.match(r"^\s*\{?\s*'", text):
|
||
text = re.sub(r"'([^']*)'(\s*:)", r'"\1"\2', text)
|
||
text = re.sub(r":\s*'([^']*)'", r': "\1"', text)
|
||
issues.append({"type": "single_quotes", "action": "replaced"})
|
||
|
||
return text, issues
|
||
|
||
|
||
def _try_complete_json(text: str) -> Optional[str]:
|
||
"""尝试补全截断的 JSON"""
|
||
if not text:
|
||
return None
|
||
|
||
# 统计括号
|
||
stack = []
|
||
in_string = False
|
||
escape = False
|
||
|
||
for c in text:
|
||
if escape:
|
||
escape = False
|
||
continue
|
||
|
||
if c == '\\':
|
||
escape = True
|
||
continue
|
||
|
||
if c == '"':
|
||
in_string = not in_string
|
||
continue
|
||
|
||
if in_string:
|
||
continue
|
||
|
||
if c in '{[':
|
||
stack.append(c)
|
||
elif c == '}':
|
||
if stack and stack[-1] == '{':
|
||
stack.pop()
|
||
elif c == ']':
|
||
if stack and stack[-1] == '[':
|
||
stack.pop()
|
||
|
||
if not stack:
|
||
return None # 已经平衡了
|
||
|
||
# 如果在字符串中,先闭合字符串
|
||
if in_string:
|
||
text += '"'
|
||
|
||
# 补全括号
|
||
completion = ""
|
||
for bracket in reversed(stack):
|
||
if bracket == '{':
|
||
completion += '}'
|
||
elif bracket == '[':
|
||
completion += ']'
|
||
|
||
return text + completion
|
||
|
||
|
||
# ==================== Schema 校验 ====================
|
||
|
||
def validate_json_schema(data: Any, schema: dict) -> Tuple[bool, List[dict]]:
    """
    Check whether ``data`` conforms to ``schema``.

    When jsonschema is unavailable the check is skipped and the data is
    reported as valid. Any exception raised while validating is reported as
    a single error entry.

    Returns:
        (is_valid, errors)
    """
    if not HAS_JSONSCHEMA:
        logger.warning("jsonschema 未安装,跳过校验")
        return True, []

    try:
        violations = [
            {
                "path": list(err.absolute_path),
                "message": err.message,
                "validator": err.validator
            }
            for err in list(Draft7Validator(schema).iter_errors(data))
        ]
    except Exception as e:
        return False, [{"message": str(e)}]

    if violations:
        return False, violations
    return True, []
|
||
|
||
|
||
# ==================== 诊断函数 ====================
|
||
|
||
def diagnose_json_error(text: str) -> dict:
    """Diagnose why a text fails to parse as JSON.

    Heuristic, text-level checks: Chinese punctuation, Markdown fences,
    comments, Python literals, trailing commas, bracket balance and quote
    parity. Bracket counts are raw character counts (brackets inside
    strings are included); the quote parity check ignores backslash-escaped
    quotes.

    Args:
        text: The text to inspect.

    Returns:
        A dict with keys ``issues`` (list of issue dicts), ``issue_count``,
        ``fixable`` (True when every issue is of an auto-fixable type) and
        ``severity`` (highest severity found, "low" when there are none).
    """
    issues = []

    # Empty input: nothing else can be checked.
    if not text or not text.strip():
        issues.append({
            "type": "empty_input",
            "severity": "critical",
            "suggestion": "输入为空"
        })
        # Same key set as the regular return value.
        return {
            "issues": issues,
            "issue_count": len(issues),
            "fixable": False,
            "severity": "critical"
        }

    # Chinese punctuation (written as escapes so the fullwidth characters
    # cannot be flattened into ASCII, which would flag every valid JSON).
    cn_punctuation = [
        '\uff0c', '\u3002', '\uff1a', '\uff1b',
        '\u201c', '\u201d', '\u2018', '\u2019',
    ]
    for p in cn_punctuation:
        if p in text:
            issues.append({
                "type": "chinese_punctuation",
                "char": p,
                "severity": "low",
                "suggestion": f"将 {p} 替换为对应英文标点"
            })

    # Markdown code fence
    if '```' in text:
        issues.append({
            "type": "markdown_wrapped",
            "severity": "low",
            "suggestion": "需要提取代码块内容"
        })

    # Comments
    if '//' in text or '/*' in text:
        issues.append({
            "type": "has_comments",
            "severity": "low",
            "suggestion": "需要移除注释"
        })

    # Python-style literals
    if re.search(r'\b(True|False|None)\b', text):
        issues.append({
            "type": "python_style",
            "severity": "low",
            "suggestion": "将 True/False/None 转为 true/false/null"
        })

    # Trailing commas
    if re.search(r',\s*[}\]]', text):
        issues.append({
            "type": "trailing_comma",
            "severity": "low",
            "suggestion": "移除 } 或 ] 前的逗号"
        })

    # Bracket balance
    open_braces = text.count('{') - text.count('}')
    open_brackets = text.count('[') - text.count(']')

    if open_braces > 0:
        issues.append({
            "type": "unclosed_brace",
            "count": open_braces,
            "severity": "medium",
            "suggestion": f"缺少 {open_braces} 个 }}"
        })
    elif open_braces < 0:
        issues.append({
            "type": "extra_brace",
            "count": -open_braces,
            "severity": "medium",
            "suggestion": f"多余 {-open_braces} 个 }}"
        })

    if open_brackets > 0:
        issues.append({
            "type": "unclosed_bracket",
            "count": open_brackets,
            "severity": "medium",
            "suggestion": f"缺少 {open_brackets} 个 ]"
        })
    elif open_brackets < 0:
        issues.append({
            "type": "extra_bracket",
            "count": -open_brackets,
            "severity": "medium",
            "suggestion": f"多余 {-open_brackets} 个 ]"
        })

    # Quote parity: count only quotes that are not backslash-escaped, so a
    # legitimate \" inside a string is not reported as an open string.
    quote_count = 0
    skip_next = False
    for ch in text:
        if skip_next:
            skip_next = False
        elif ch == '\\':
            skip_next = True
        elif ch == '"':
            quote_count += 1

    if quote_count % 2 != 0:
        issues.append({
            "type": "unbalanced_quotes",
            "severity": "high",
            "suggestion": "引号数量不平衡,可能有未闭合的字符串"
        })

    # Decide whether everything found is auto-fixable
    fixable_types = {
        "chinese_punctuation", "markdown_wrapped", "has_comments",
        "python_style", "trailing_comma", "unclosed_brace", "unclosed_bracket"
    }
    fixable = all(i["type"] in fixable_types for i in issues)

    severity_rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}
    return {
        "issues": issues,
        "issue_count": len(issues),
        "fixable": fixable,
        "severity": max(
            (i.get("severity", "low") for i in issues),
            key=lambda x: severity_rank.get(x, 0),
            default="low"
        )
    }
|
||
|
||
|
||
# ==================== 便捷函数 ====================
|
||
|
||
def safe_json_loads(text: str, default: Any = None) -> Any:
    """Parse ``text`` with parse_llm_json; return ``default`` on any exception."""
    try:
        parsed = parse_llm_json(text)
    except Exception:
        return default
    return parsed
|
||
|
||
|
||
def extract_json_from_text(text: str) -> Optional[str]:
    """Extract a JSON string from free text.

    The text is preprocessed and format-fixed. The result is returned when
    it loads as JSON; otherwise a bracket-completed variant is tried.
    Returns None when neither candidate loads.
    """
    def _loads_cleanly(candidate: str) -> bool:
        try:
            json.loads(candidate)
        except Exception:
            return False
        return True

    fixed, _ = _fix_json_format(_preprocess_text(text))
    if _loads_cleanly(fixed):
        return fixed

    completed = _try_complete_json(fixed)
    if completed and _loads_cleanly(completed):
        return completed

    return None
|
||
|
||
|
||
def clean_llm_output(text: str) -> Tuple[str, List[str]]:
    """
    Clean raw model output and report which cleaning rules were applied.

    Rules, in order: strip a leading BOM, remove ANSI colour sequences,
    trim whitespace, drop one polite lead-in phrase, unwrap a Markdown code
    block whose content starts with ``{`` or ``[``, and remove zero-width
    characters.

    Args:
        text: Raw output text.

    Returns:
        (cleaned_text, applied_rules)
    """
    if not text:
        return "", ["empty_input"]

    applied_rules = []

    # 1. Strip the BOM
    if text.startswith('\ufeff'):
        text = text.lstrip('\ufeff')
        applied_rules.append("removed_bom")

    # 2. Remove ANSI escape sequences
    ansi_pattern = re.compile(r'\x1b\[[0-9;]*m')
    if ansi_pattern.search(text):
        text = ansi_pattern.sub('', text)
        applied_rules.append("removed_ansi")

    # 3. Trim surrounding whitespace
    text = text.strip()

    # 4. Drop a polite lead-in phrase.
    #    \uff0c / \u3002 / \uff1a are the fullwidth comma, full stop and
    #    colon; escapes keep them from being flattened into ASCII.
    #    The open-ended patterns use [^{\[\n]*? instead of .*? so they can
    #    never run into the JSON itself and eat it up to its first colon.
    polite_patterns = [
        r'^好的[,\uff0c\u3002.]?\s*',
        r'^当然[,\uff0c\u3002.]?\s*',
        r'^没问题[,\uff0c\u3002.]?\s*',
        r'^根据您的要求[,\uff0c\u3002.]?\s*',
        r'^以下是[^{\[\n]*?[:\uff1a]\s*',
        r'^分析结果如下[:\uff1a]\s*',
        r'^我来为您[^{\[\n]*?[:\uff1a]\s*',
        r'^这是[^{\[\n]*?结果[:\uff1a]\s*',
    ]
    for pattern in polite_patterns:
        stripped, hits = re.subn(pattern, '', text, count=1, flags=re.IGNORECASE)
        if hits:
            text = stripped
            applied_rules.append("removed_polite_prefix")
            break

    # 5. Unwrap a Markdown JSON code block
    json_block_patterns = [
        r'```json\s*([\s\S]*?)\s*```',
        r'```\s*([\s\S]*?)\s*```',
    ]
    for pattern in json_block_patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            extracted = match.group(1).strip()
            if extracted.startswith(('{', '[')):
                text = extracted
                applied_rules.append("extracted_code_block")
                break

    # 6. Remove zero-width characters
    zero_width = re.compile(r'[\u200b\u200c\u200d\ufeff]')
    if zero_width.search(text):
        text = zero_width.sub('', text)
        applied_rules.append("removed_zero_width")

    return text.strip(), applied_rules
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|