""" 思考过程二次总结器(方案三) 将模型输出中的 ... / Reasoning 等“思考过程”改写为可展示给用户的中文“思考过程摘要”, 避免原始长推理链路直接透传到前端/数据库。 """ import logging import json import re from typing import Any, List, Tuple from utils.config import settings from utils.prompt_loader import load_prompt logger = logging.getLogger(__name__) _NOISE_LINE_PATTERNS = [ re.compile(r"^\s*```"), re.compile(r"^\s*#+\s*"), re.compile(r"^\s*(thinking process|reasoning)\s*[::]?\s*$", re.IGNORECASE), re.compile(r"^\s*analyze the request\s*[::]?\s*$", re.IGNORECASE), re.compile(r"^\s*(role|task|input|output)\b", re.IGNORECASE), re.compile(r"^\s*(角色|任务|输入|输出要求|开始输出)\b"), re.compile(r"^\s*(意图分析|意图识别|思路分析|分析结果|思考过程摘要|思考过程|思考要点)\s*[::]?\s*$"), ] _NOISE_CONTAINS_PATTERNS = [ re.compile(r"\bThinking Process\b", re.IGNORECASE), re.compile(r"\bAnalyze the Request\b", re.IGNORECASE), re.compile(r"\bRole\b\s*[::]", re.IGNORECASE), re.compile(r"\bTask\b\s*[::]", re.IGNORECASE), re.compile(r"\bInput\b\s*[::]", re.IGNORECASE), re.compile(r"\bOutput\b\s*[::]", re.IGNORECASE), re.compile(r"\bFinal Answer\b", re.IGNORECASE), re.compile(r"思考要点总结器|思考过程展示改写器"), ] _LEADING_LABEL_RE = re.compile( r"^\s*(思考过程摘要|思考过程|思考要点|要点|总结|意图分析|意图识别|思路分析|分析结果)\s*[::]\s*" ) def _try_parse_json_text(text: str) -> bool: candidate = (text or "").strip() if not candidate: return False if not (candidate.startswith("{") or candidate.startswith("[")): return False try: json.loads(candidate) return True except Exception: return False def _extract_first_json_object(text: str) -> str: """提取文本中第一个平衡的大括号 JSON 片段(不保证一定可解析)。""" source = text or "" start_idx = source.find("{") if start_idx == -1: return "" depth = 0 in_string = False escape = False for idx in range(start_idx, len(source)): ch = source[idx] if escape: escape = False continue if ch == "\\": escape = True continue if ch == '"': in_string = not in_string continue if in_string: continue if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: return source[start_idx: idx + 1] return source[start_idx:] def _extract_json_from_model_output(text: str) -> Tuple[str, str]: """从模型输出中提取 JSON 段,并返回其前面的说明/思考文本。""" response_text = (text or "").strip() if not response_text: return "", "" code_block = re.search( r"```(?:json)?\s*\n?(.*?)\n?```", response_text, re.DOTALL | re.IGNORECASE, ) if code_block: candidate = (code_block.group(1) or "").strip() if _try_parse_json_text(candidate): return response_text[: code_block.start()].strip(), candidate candidate = _extract_first_json_object(response_text).strip() if candidate and candidate != response_text and _try_parse_json_text(candidate): start_idx = response_text.find(candidate) thinking = response_text[:start_idx].strip() if start_idx >= 0 else "" return thinking, candidate return "", "" def split_thinking_and_answer(text: str) -> Tuple[str, str]: """ 拆分模型输出中的思考过程与正式回答。 - 优先识别 ... - 其次识别以 Reasoning/Thinking Process/思考过程 开头且包含 Final Answer/Answer 标记的格式 - 未匹配到明确分隔符时,不做拆分 """ response_text = (text or "").strip() if not response_text: return "", "" think_match = re.search( r"\s*(.*?)\s*\s*(.*)", response_text, re.DOTALL | re.IGNORECASE, ) if think_match: return think_match.group(1).strip(), think_match.group(2).strip() leading_reasoning = re.match( r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*", response_text, re.IGNORECASE, ) if not leading_reasoning: return "", response_text final_marker = re.search( r"\n\s*(Final Answer|Final Response|Answer|最终答案|正式回答|最终回复|输出结果)\s*[::]\s*", response_text, re.IGNORECASE, ) if not final_marker: extracted_thinking, extracted_json = _extract_json_from_model_output(response_text) if extracted_json: extracted_thinking = re.sub( r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*", "", (extracted_thinking or "").strip(), flags=re.IGNORECASE, ).strip() return extracted_thinking, extracted_json.strip() return "", response_text thinking = response_text[: final_marker.start()].strip() answer = response_text[final_marker.end() :].strip() thinking = re.sub( r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*", "", thinking, flags=re.IGNORECASE, ).strip() return thinking, answer _LIST_PREFIX_RE = re.compile(r"^\s*(?:[-•*]\s+|\d+\s*[.)、]\s+)") _REQUIRED_PREFIX_RE = re.compile(r"^\s*(?:我们需要理解问题:用户问的是|嗯,用户问的是)") # 更强的兜底校验:避免回显 prompt 元信息 / 英文标签 / Markdown 列表等 _INVALID_SUMMARY_PATTERNS = [ re.compile(r"\bThinking Process\b", re.IGNORECASE), re.compile(r"\bAnalyze the Request\b", re.IGNORECASE), re.compile(r"\bFinal Answer\b", re.IGNORECASE), re.compile(r"\b(Role|Task|Input|Output)\b", re.IGNORECASE), re.compile(r"(?:^|\n)\s*(角色|任务|输入|输出要求|输出|开始输出)\s*[::]", re.MULTILINE), re.compile(r"思考要点总结器|思考过程展示改写器"), re.compile(r"```"), re.compile(r"#+\s*\S"), re.compile(r"-\s+|•\s+"), re.compile(r"\d+\s*[.)、]\s+"), ] def _is_summary_acceptable(text: str) -> bool: summary = (text or "").strip() if not summary: return False if not _REQUIRED_PREFIX_RE.match(summary): return False if any(pattern.search(summary) for pattern in _INVALID_SUMMARY_PATTERNS): return False return True def _strip_noise_lines(text: str) -> str: lines = (text or "").splitlines() kept: List[str] = [] for raw_line in lines: line = raw_line.rstrip() stripped = line.strip() if not stripped: kept.append("") continue if any(pattern.match(stripped) for pattern in _NOISE_LINE_PATTERNS): continue if any(pattern.search(stripped) for pattern in _NOISE_CONTAINS_PATTERNS): continue stripped = _LEADING_LABEL_RE.sub("", stripped).strip() had_list_prefix = bool(_LIST_PREFIX_RE.match(stripped)) stripped = _LIST_PREFIX_RE.sub("", stripped).strip() if had_list_prefix and stripped and stripped[-1] not in "。!?;!?;": stripped = stripped.rstrip(",,") + ";" if stripped: kept.append(stripped) cleaned = "\n".join(kept) cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned).strip() return cleaned def _split_paragraphs(text: str) -> List[str]: chunks = [chunk.strip() for chunk in re.split(r"\n\s*\n+", text or "") if chunk.strip()] paragraphs: List[str] = [] for chunk in chunks: paragraph = re.sub(r"\s*\n\s*", "", chunk).strip() if paragraph: paragraphs.append(paragraph) return paragraphs def normalize_thinking_summary( text: str, *, max_points: int = 5, max_output_chars: int = 600, ) -> str: """将模型输出归一化为中文自然段(最多 max_points 段)。""" raw_text = (text or "").strip() if not raw_text: return "" cleaned = _strip_noise_lines(raw_text) if not cleaned: return "" paragraphs = _split_paragraphs(cleaned) if not paragraphs: return "" paragraphs = paragraphs[: max(1, int(max_points or 5))] normalized = "\n\n".join(paragraphs).strip() if max_output_chars and len(normalized) > int(max_output_chars): normalized = normalized[: int(max_output_chars)].rstrip() return normalized def _build_fallback_summary( user_question: str, *, max_points: int, max_output_chars: int, ) -> str: question = (user_question or "").strip() if not question: question = "这个问题" engineering_keywords = ( "施工", "支架", "脚手架", "架桥机", "桥梁", "隧道", "混凝土", "钢筋", "模板", "验算", "荷载", "地基", "扣件", "碗扣", ) is_engineering = any(keyword in question for keyword in engineering_keywords) domain_hint = "工程施工/技术" if is_engineering else "需要结构化说明" outline_paragraph = ( "我会按常见施工方案结构来组织回答:工程概况 → 编制依据 → 施工准备 → 设计与参数 → 施工工艺 → 检查验收与监测 → 拆除 → 安全与应急 → 计算要点。" if is_engineering else "在信息不充分的情况下,我会先按通用结构组织回答:背景/目标与依据 → 核心要点与步骤 → 注意事项与边界条件 → 需要追问的关键信息。" ) data_paragraph = ( "如果涉及验算或参数,我不会直接给出精确数值结论(缺少荷载、尺寸、工况等数据),而是说明验算项目、取值原则与需要补充的信息。" if is_engineering else "如果涉及具体数据或条件(时间/地区/预算/限制等),我会说明我的假设、可选口径,并提示需要你进一步确认的关键信息。" ) paragraphs = [ f"我们需要理解问题:用户问的是“{question}”。这更像是一个{domain_hint}类问题,我需要先确认用户希望得到的是方案框架、步骤清单,还是关键控制点与注意事项。", outline_paragraph, data_paragraph, "如果你能补充具体场景(适用对象/工况/阶段/关注重点),我可以把回答细化为可执行的流程清单与检查表。", ] paragraphs = paragraphs[: max(1, int(max_points or 3))] text = "\n\n".join(paragraphs).strip() if max_output_chars and len(text) > int(max_output_chars): text = text[: int(max_output_chars)].rstrip() return text async def summarize_thinking_content( *, user_question: str, raw_thinking: str, final_answer: str = "", chat_service: Any, context: str = "", ) -> str: """ 二次总结原始思考过程,返回可展示的中文“思考过程摘要”(自然段)。 失败兜底:返回空字符串,避免回退 raw_thinking(防止暴露长推理链路)。 """ cfg = getattr(settings, "thinking_summary", None) enabled = getattr(cfg, "enabled", True) if cfg else True if not enabled: return "" thinking_text = (raw_thinking or "").strip() if not thinking_text: return "" max_points = int(getattr(cfg, "max_points", 5) if cfg else 5) max_input_chars = int(getattr(cfg, "max_input_chars", 1500) if cfg else 1500) max_output_chars = int(getattr(cfg, "max_output_chars", 600) if cfg else 600) # temperature 预留配置项(当前 qwen_service.chat 未透传该参数) _ = float(getattr(cfg, "temperature", 0.2) if cfg else 0.2) if max_input_chars and len(thinking_text) > max_input_chars: thinking_text = thinking_text[:max_input_chars] prompt = load_prompt( "thinking_summary", userMessage=user_question or "", thinkingText=thinking_text, finalAnswer=final_answer or "", maxPoints=str(max_points), ) if not (prompt or "").strip(): logger.warning("[thinking_summary] Prompt 为空,已跳过") return "" try: response = await chat_service.chat( [{"role": "user", "content": prompt}], ) except Exception as exc: logger.warning( f"[thinking_summary] 生成失败({context or 'unknown'}): {type(exc).__name__}", exc_info=True, ) return _build_fallback_summary( user_question=user_question, max_points=max_points, max_output_chars=max_output_chars, ) # 避免总结器自己又输出 段 _, summary_final = split_thinking_and_answer(response or "") summary_text = (summary_final or response or "").strip() normalized = normalize_thinking_summary( summary_text, max_points=max_points, max_output_chars=max_output_chars, ) if not normalized or not _is_summary_acceptable(normalized): return _build_fallback_summary( user_question=user_question, max_points=max_points, max_output_chars=max_output_chars, ) logger.info( f"[thinking_summary] 生成成功({context or 'unknown'}) | raw_len={len(raw_thinking or '')} | out_len={len(normalized)}" ) return normalized