| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*- coding: utf-8 -*-
- """LLM 输出解析辅助函数。"""
- import json
- import re
- from typing import Any, Dict, Optional
# Matches a Markdown code fence (optionally tagged "json", case-insensitive)
# and captures its content with surrounding whitespace trimmed.
_FENCED_JSON_RE = re.compile(
    r"```(?:json)?\s*([\s\S]*?)\s*```",
    flags=re.IGNORECASE,
)
# Fallback pattern: pulls the value of the "answer" field out of a JSON-like
# structure, for input that is not strictly valid JSON (e.g. it contains
# unescaped control characters). The capture group holds the still-escaped
# string body; DOTALL lets an escape sequence span a raw newline.
_ANSWER_FIELD_RE = re.compile(r'"answer"\s*:\s*"((?:[^"\\]|\\.)*)"', flags=re.DOTALL)
def extract_json_object(text: str) -> Dict[str, Any]:
    """Extract a JSON object from a model response.

    The following strategies are tried in order:

    1. If the response contains a Markdown code fence, only the fenced
       content is considered from then on.
    2. Parse the whole candidate as JSON.
    3. Decode one JSON value starting at the first ``{`` and ignore whatever
       follows it (trailing prose, a second object, stray braces).
    4. Repeat step 3 after escaping raw control characters inside string
       values, which models frequently emit as literal newlines/tabs.

    Args:
        text: Raw model output; may be empty.

    Returns:
        The decoded ``dict``. An empty dict is returned when nothing could
        be parsed or when the parsed value is not a JSON object.
    """
    if not text:
        return {}

    candidate = text.strip()
    fenced = _FENCED_JSON_RE.search(candidate)
    if fenced:
        candidate = fenced.group(1).strip()

    try:
        value = json.loads(candidate)
    except json.JSONDecodeError:
        pass
    else:
        return value if isinstance(value, dict) else {}

    start = candidate.find("{")
    if start < 0:
        return {}

    # raw_decode stops at the end of the first complete value, so text after
    # the object no longer breaks parsing (slicing up to the last "}" did).
    fragment = candidate[start:]
    decoder = json.JSONDecoder()
    try:
        value, _ = decoder.raw_decode(fragment)
    except json.JSONDecodeError:
        # Retry with raw control characters in string values escaped.
        repaired = _repair_control_chars(fragment)
        if repaired == fragment:
            return {}
        try:
            value, _ = decoder.raw_decode(repaired)
        except json.JSONDecodeError:
            return {}
    return value if isinstance(value, dict) else {}
def extract_answer_field(text: str) -> Optional[str]:
    """Best-effort extraction of the ``"answer"`` field from a raw LLM response.

    Intended as a fallback for when ``extract_json_object`` cannot parse the
    response, e.g. streamed output containing unescaped control characters.

    Args:
        text: Raw model output; may be empty.

    Returns:
        The decoded string value of the first ``"answer"`` field, the raw
        (still escaped) capture if its escape sequences are invalid, or
        ``None`` when the text is empty or has no such field.
    """
    if not text:
        return None

    match = _ANSWER_FIELD_RE.search(text)
    if match is None:
        return None

    raw_value = match.group(1)
    try:
        # Decode standard JSON escapes. strict=False accepts raw control
        # characters (literal newlines/tabs) inside the string; without it
        # the decode fails on exactly the inputs this fallback targets and
        # sequences such as \" would be returned undecoded.
        return json.loads(f'"{raw_value}"', strict=False)
    except json.JSONDecodeError:
        return raw_value
def _repair_control_chars(s: str) -> str:
    """Escape raw control characters that appear inside JSON string values.

    Models sometimes emit literal newlines/tabs inside string literals, which
    strict ``json.loads`` rejects. This scans the text once, tracking whether
    the current position is inside a string, and rewrites every raw control
    character (code point below 0x20) found there as a JSON escape sequence.
    Text outside strings, including structural whitespace, is left untouched.

    Args:
        s: JSON-like text, possibly containing raw control characters.

    Returns:
        The text with control characters inside strings escaped: ``\\n``,
        ``\\r`` and ``\\t`` use their short forms, any other control
        character becomes ``\\u00XX``.
    """
    short_escapes = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}
    pieces = []
    in_string = False
    # True when the previous character was an unescaped backslash. Tracking
    # this state (instead of peeking at the previous character) keeps a
    # closing quote after an escaped backslash, as in "x\\", recognised as
    # the end of the string.
    escaped = False

    for ch in s:
        if not in_string:
            in_string = ch == '"'
            pieces.append(ch)
            continue

        if ch < " ":
            # Raw control character inside a string: emit its escape.
            pieces.append(short_escapes.get(ch) or f"\\u{ord(ch):04x}")
            escaped = False
            continue

        pieces.append(ch)
        if escaped:
            escaped = False
        elif ch == "\\":
            escaped = True
        elif ch == '"':
            in_string = False

    return "".join(pieces)
def compact_json(value: Any) -> str:
    """Serialize ``value`` to a JSON string.

    Non-ASCII characters are written as-is rather than as ``\\uXXXX``
    escapes. Note that, despite the function name, the output is
    pretty-printed with a two-space indent.
    """
    rendered = json.dumps(value, indent=2, ensure_ascii=False)
    return rendered
|