"""
思考过程二次总结器(方案三)
将模型输出中的 ... / Reasoning 等“思考过程”改写为可展示给用户的中文“思考过程摘要”,
避免原始长推理链路直接透传到前端/数据库。
"""
import logging
import json
import re
from typing import Any, List, Tuple
from utils.config import settings
from utils.prompt_loader import load_prompt
logger = logging.getLogger(__name__)
_NOISE_LINE_PATTERNS = [
re.compile(r"^\s*```"),
re.compile(r"^\s*#+\s*"),
re.compile(r"^\s*(thinking process|reasoning)\s*[::]?\s*$", re.IGNORECASE),
re.compile(r"^\s*analyze the request\s*[::]?\s*$", re.IGNORECASE),
re.compile(r"^\s*(role|task|input|output)\b", re.IGNORECASE),
re.compile(r"^\s*(角色|任务|输入|输出要求|开始输出)\b"),
re.compile(r"^\s*(意图分析|意图识别|思路分析|分析结果|思考过程摘要|思考过程|思考要点)\s*[::]?\s*$"),
]
_NOISE_CONTAINS_PATTERNS = [
re.compile(r"\bThinking Process\b", re.IGNORECASE),
re.compile(r"\bAnalyze the Request\b", re.IGNORECASE),
re.compile(r"\bRole\b\s*[::]", re.IGNORECASE),
re.compile(r"\bTask\b\s*[::]", re.IGNORECASE),
re.compile(r"\bInput\b\s*[::]", re.IGNORECASE),
re.compile(r"\bOutput\b\s*[::]", re.IGNORECASE),
re.compile(r"\bFinal Answer\b", re.IGNORECASE),
re.compile(r"思考要点总结器|思考过程展示改写器"),
]
_LEADING_LABEL_RE = re.compile(
r"^\s*(思考过程摘要|思考过程|思考要点|要点|总结|意图分析|意图识别|思路分析|分析结果)\s*[::]\s*"
)
def _try_parse_json_text(text: str) -> bool:
candidate = (text or "").strip()
if not candidate:
return False
if not (candidate.startswith("{") or candidate.startswith("[")):
return False
try:
json.loads(candidate)
return True
except Exception:
return False
def _extract_first_json_object(text: str) -> str:
"""提取文本中第一个平衡的大括号 JSON 片段(不保证一定可解析)。"""
source = text or ""
start_idx = source.find("{")
if start_idx == -1:
return ""
depth = 0
in_string = False
escape = False
for idx in range(start_idx, len(source)):
ch = source[idx]
if escape:
escape = False
continue
if ch == "\\":
escape = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return source[start_idx: idx + 1]
return source[start_idx:]
def _extract_json_from_model_output(text: str) -> Tuple[str, str]:
"""从模型输出中提取 JSON 段,并返回其前面的说明/思考文本。"""
response_text = (text or "").strip()
if not response_text:
return "", ""
code_block = re.search(
r"```(?:json)?\s*\n?(.*?)\n?```",
response_text,
re.DOTALL | re.IGNORECASE,
)
if code_block:
candidate = (code_block.group(1) or "").strip()
if _try_parse_json_text(candidate):
return response_text[: code_block.start()].strip(), candidate
candidate = _extract_first_json_object(response_text).strip()
if candidate and candidate != response_text and _try_parse_json_text(candidate):
start_idx = response_text.find(candidate)
thinking = response_text[:start_idx].strip() if start_idx >= 0 else ""
return thinking, candidate
return "", ""
def split_thinking_and_answer(text: str) -> Tuple[str, str]:
"""
拆分模型输出中的思考过程与正式回答。
- 优先识别 ...
- 其次识别以 Reasoning/Thinking Process/思考过程 开头且包含 Final Answer/Answer 标记的格式
- 未匹配到明确分隔符时,不做拆分
"""
response_text = (text or "").strip()
if not response_text:
return "", ""
think_match = re.search(
r"\s*(.*?)\s*\s*(.*)",
response_text,
re.DOTALL | re.IGNORECASE,
)
if think_match:
return think_match.group(1).strip(), think_match.group(2).strip()
leading_reasoning = re.match(
r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
response_text,
re.IGNORECASE,
)
if not leading_reasoning:
return "", response_text
final_marker = re.search(
r"\n\s*(Final Answer|Final Response|Answer|最终答案|正式回答|最终回复|输出结果)\s*[::]\s*",
response_text,
re.IGNORECASE,
)
if not final_marker:
extracted_thinking, extracted_json = _extract_json_from_model_output(response_text)
if extracted_json:
extracted_thinking = re.sub(
r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
"",
(extracted_thinking or "").strip(),
flags=re.IGNORECASE,
).strip()
return extracted_thinking, extracted_json.strip()
return "", response_text
thinking = response_text[: final_marker.start()].strip()
answer = response_text[final_marker.end() :].strip()
thinking = re.sub(
r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
"",
thinking,
flags=re.IGNORECASE,
).strip()
return thinking, answer
_LIST_PREFIX_RE = re.compile(r"^\s*(?:[-•*]\s+|\d+\s*[.)、]\s+)")
_REQUIRED_PREFIX_RE = re.compile(r"^\s*(?:我们需要理解问题:用户问的是|嗯,用户问的是)")
# 更强的兜底校验:避免回显 prompt 元信息 / 英文标签 / Markdown 列表等
_INVALID_SUMMARY_PATTERNS = [
re.compile(r"\bThinking Process\b", re.IGNORECASE),
re.compile(r"\bAnalyze the Request\b", re.IGNORECASE),
re.compile(r"\bFinal Answer\b", re.IGNORECASE),
re.compile(r"\b(Role|Task|Input|Output)\b", re.IGNORECASE),
re.compile(r"(?:^|\n)\s*(角色|任务|输入|输出要求|输出|开始输出)\s*[::]", re.MULTILINE),
re.compile(r"思考要点总结器|思考过程展示改写器"),
re.compile(r"```"),
re.compile(r"#+\s*\S"),
re.compile(r"-\s+|•\s+"),
re.compile(r"\d+\s*[.)、]\s+"),
]
def _is_summary_acceptable(text: str) -> bool:
summary = (text or "").strip()
if not summary:
return False
if not _REQUIRED_PREFIX_RE.match(summary):
return False
if any(pattern.search(summary) for pattern in _INVALID_SUMMARY_PATTERNS):
return False
return True
def _strip_noise_lines(text: str) -> str:
lines = (text or "").splitlines()
kept: List[str] = []
for raw_line in lines:
line = raw_line.rstrip()
stripped = line.strip()
if not stripped:
kept.append("")
continue
if any(pattern.match(stripped) for pattern in _NOISE_LINE_PATTERNS):
continue
if any(pattern.search(stripped) for pattern in _NOISE_CONTAINS_PATTERNS):
continue
stripped = _LEADING_LABEL_RE.sub("", stripped).strip()
had_list_prefix = bool(_LIST_PREFIX_RE.match(stripped))
stripped = _LIST_PREFIX_RE.sub("", stripped).strip()
if had_list_prefix and stripped and stripped[-1] not in "。!?;!?;":
stripped = stripped.rstrip(",,") + ";"
if stripped:
kept.append(stripped)
cleaned = "\n".join(kept)
cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned).strip()
return cleaned
def _split_paragraphs(text: str) -> List[str]:
chunks = [chunk.strip() for chunk in re.split(r"\n\s*\n+", text or "") if chunk.strip()]
paragraphs: List[str] = []
for chunk in chunks:
paragraph = re.sub(r"\s*\n\s*", "", chunk).strip()
if paragraph:
paragraphs.append(paragraph)
return paragraphs
def normalize_thinking_summary(
text: str,
*,
max_points: int = 5,
max_output_chars: int = 600,
) -> str:
"""将模型输出归一化为中文自然段(最多 max_points 段)。"""
raw_text = (text or "").strip()
if not raw_text:
return ""
cleaned = _strip_noise_lines(raw_text)
if not cleaned:
return ""
paragraphs = _split_paragraphs(cleaned)
if not paragraphs:
return ""
paragraphs = paragraphs[: max(1, int(max_points or 5))]
normalized = "\n\n".join(paragraphs).strip()
if max_output_chars and len(normalized) > int(max_output_chars):
normalized = normalized[: int(max_output_chars)].rstrip()
return normalized
def _build_fallback_summary(
user_question: str,
*,
max_points: int,
max_output_chars: int,
) -> str:
question = (user_question or "").strip()
if not question:
question = "这个问题"
engineering_keywords = (
"施工",
"支架",
"脚手架",
"架桥机",
"桥梁",
"隧道",
"混凝土",
"钢筋",
"模板",
"验算",
"荷载",
"地基",
"扣件",
"碗扣",
)
is_engineering = any(keyword in question for keyword in engineering_keywords)
domain_hint = "工程施工/技术" if is_engineering else "需要结构化说明"
outline_paragraph = (
"我会按常见施工方案结构来组织回答:工程概况 → 编制依据 → 施工准备 → 设计与参数 → 施工工艺 → 检查验收与监测 → 拆除 → 安全与应急 → 计算要点。"
if is_engineering
else "在信息不充分的情况下,我会先按通用结构组织回答:背景/目标与依据 → 核心要点与步骤 → 注意事项与边界条件 → 需要追问的关键信息。"
)
data_paragraph = (
"如果涉及验算或参数,我不会直接给出精确数值结论(缺少荷载、尺寸、工况等数据),而是说明验算项目、取值原则与需要补充的信息。"
if is_engineering
else "如果涉及具体数据或条件(时间/地区/预算/限制等),我会说明我的假设、可选口径,并提示需要你进一步确认的关键信息。"
)
paragraphs = [
f"我们需要理解问题:用户问的是“{question}”。这更像是一个{domain_hint}类问题,我需要先确认用户希望得到的是方案框架、步骤清单,还是关键控制点与注意事项。",
outline_paragraph,
data_paragraph,
"如果你能补充具体场景(适用对象/工况/阶段/关注重点),我可以把回答细化为可执行的流程清单与检查表。",
]
paragraphs = paragraphs[: max(1, int(max_points or 3))]
text = "\n\n".join(paragraphs).strip()
if max_output_chars and len(text) > int(max_output_chars):
text = text[: int(max_output_chars)].rstrip()
return text
async def summarize_thinking_content(
*,
user_question: str,
raw_thinking: str,
final_answer: str = "",
chat_service: Any,
context: str = "",
) -> str:
"""
二次总结原始思考过程,返回可展示的中文“思考过程摘要”(自然段)。
失败兜底:返回空字符串,避免回退 raw_thinking(防止暴露长推理链路)。
"""
cfg = getattr(settings, "thinking_summary", None)
enabled = getattr(cfg, "enabled", True) if cfg else True
if not enabled:
return ""
thinking_text = (raw_thinking or "").strip()
if not thinking_text:
return ""
max_points = int(getattr(cfg, "max_points", 5) if cfg else 5)
max_input_chars = int(getattr(cfg, "max_input_chars", 1500) if cfg else 1500)
max_output_chars = int(getattr(cfg, "max_output_chars", 600) if cfg else 600)
# temperature 预留配置项(当前 qwen_service.chat 未透传该参数)
_ = float(getattr(cfg, "temperature", 0.2) if cfg else 0.2)
if max_input_chars and len(thinking_text) > max_input_chars:
thinking_text = thinking_text[:max_input_chars]
prompt = load_prompt(
"thinking_summary",
userMessage=user_question or "",
thinkingText=thinking_text,
finalAnswer=final_answer or "",
maxPoints=str(max_points),
)
if not (prompt or "").strip():
logger.warning("[thinking_summary] Prompt 为空,已跳过")
return ""
try:
response = await chat_service.chat(
[{"role": "user", "content": prompt}],
)
except Exception as exc:
logger.warning(
f"[thinking_summary] 生成失败({context or 'unknown'}): {type(exc).__name__}",
exc_info=True,
)
return _build_fallback_summary(
user_question=user_question,
max_points=max_points,
max_output_chars=max_output_chars,
)
# 避免总结器自己又输出 段
_, summary_final = split_thinking_and_answer(response or "")
summary_text = (summary_final or response or "").strip()
normalized = normalize_thinking_summary(
summary_text,
max_points=max_points,
max_output_chars=max_output_chars,
)
if not normalized or not _is_summary_acceptable(normalized):
return _build_fallback_summary(
user_question=user_question,
max_points=max_points,
max_output_chars=max_output_chars,
)
logger.info(
f"[thinking_summary] 生成成功({context or 'unknown'}) | raw_len={len(raw_thinking or '')} | out_len={len(normalized)}"
)
return normalized