| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- """
- 思考过程二次总结器(方案三)
- 将模型输出中的 <think>...</think> / Reasoning 等“思考过程”改写为可展示给用户的中文“思考过程摘要”,
- 避免原始长推理链路直接透传到前端/数据库。
- """
- import logging
- import json
- import re
- from typing import Any, List, Tuple
- from utils.config import settings
- from utils.prompt_loader import load_prompt
- logger = logging.getLogger(__name__)
- _NOISE_LINE_PATTERNS = [
- re.compile(r"^\s*```"),
- re.compile(r"^\s*#+\s*"),
- re.compile(r"^\s*(thinking process|reasoning)\s*[::]?\s*$", re.IGNORECASE),
- re.compile(r"^\s*analyze the request\s*[::]?\s*$", re.IGNORECASE),
- re.compile(r"^\s*(role|task|input|output)\b", re.IGNORECASE),
- re.compile(r"^\s*(角色|任务|输入|输出要求|开始输出)\b"),
- re.compile(r"^\s*(意图分析|意图识别|思路分析|分析结果|思考过程摘要|思考过程|思考要点)\s*[::]?\s*$"),
- ]
- _NOISE_CONTAINS_PATTERNS = [
- re.compile(r"\bThinking Process\b", re.IGNORECASE),
- re.compile(r"\bAnalyze the Request\b", re.IGNORECASE),
- re.compile(r"\bRole\b\s*[::]", re.IGNORECASE),
- re.compile(r"\bTask\b\s*[::]", re.IGNORECASE),
- re.compile(r"\bInput\b\s*[::]", re.IGNORECASE),
- re.compile(r"\bOutput\b\s*[::]", re.IGNORECASE),
- re.compile(r"\bFinal Answer\b", re.IGNORECASE),
- re.compile(r"思考要点总结器|思考过程展示改写器"),
- ]
- _LEADING_LABEL_RE = re.compile(
- r"^\s*(思考过程摘要|思考过程|思考要点|要点|总结|意图分析|意图识别|思路分析|分析结果)\s*[::]\s*"
- )
- def _try_parse_json_text(text: str) -> bool:
- candidate = (text or "").strip()
- if not candidate:
- return False
- if not (candidate.startswith("{") or candidate.startswith("[")):
- return False
- try:
- json.loads(candidate)
- return True
- except Exception:
- return False
- def _extract_first_json_object(text: str) -> str:
- """提取文本中第一个平衡的大括号 JSON 片段(不保证一定可解析)。"""
- source = text or ""
- start_idx = source.find("{")
- if start_idx == -1:
- return ""
- depth = 0
- in_string = False
- escape = False
- for idx in range(start_idx, len(source)):
- ch = source[idx]
- if escape:
- escape = False
- continue
- if ch == "\\":
- escape = True
- continue
- if ch == '"':
- in_string = not in_string
- continue
- if in_string:
- continue
- if ch == "{":
- depth += 1
- elif ch == "}":
- depth -= 1
- if depth == 0:
- return source[start_idx: idx + 1]
- return source[start_idx:]
- def _extract_json_from_model_output(text: str) -> Tuple[str, str]:
- """从模型输出中提取 JSON 段,并返回其前面的说明/思考文本。"""
- response_text = (text or "").strip()
- if not response_text:
- return "", ""
- code_block = re.search(
- r"```(?:json)?\s*\n?(.*?)\n?```",
- response_text,
- re.DOTALL | re.IGNORECASE,
- )
- if code_block:
- candidate = (code_block.group(1) or "").strip()
- if _try_parse_json_text(candidate):
- return response_text[: code_block.start()].strip(), candidate
- candidate = _extract_first_json_object(response_text).strip()
- if candidate and candidate != response_text and _try_parse_json_text(candidate):
- start_idx = response_text.find(candidate)
- thinking = response_text[:start_idx].strip() if start_idx >= 0 else ""
- return thinking, candidate
- return "", ""
- def split_thinking_and_answer(text: str) -> Tuple[str, str]:
- """
- 拆分模型输出中的思考过程与正式回答。
- - 优先识别 <think>...</think>
- - 其次识别以 Reasoning/Thinking Process/思考过程 开头且包含 Final Answer/Answer 标记的格式
- - 未匹配到明确分隔符时,不做拆分
- """
- response_text = (text or "").strip()
- if not response_text:
- return "", ""
- think_match = re.search(
- r"<think>\s*(.*?)\s*</think>\s*(.*)",
- response_text,
- re.DOTALL | re.IGNORECASE,
- )
- if think_match:
- return think_match.group(1).strip(), think_match.group(2).strip()
- leading_reasoning = re.match(
- r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
- response_text,
- re.IGNORECASE,
- )
- if not leading_reasoning:
- return "", response_text
- final_marker = re.search(
- r"\n\s*(Final Answer|Final Response|Answer|最终答案|正式回答|最终回复|输出结果)\s*[::]\s*",
- response_text,
- re.IGNORECASE,
- )
- if not final_marker:
- extracted_thinking, extracted_json = _extract_json_from_model_output(response_text)
- if extracted_json:
- extracted_thinking = re.sub(
- r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
- "",
- (extracted_thinking or "").strip(),
- flags=re.IGNORECASE,
- ).strip()
- return extracted_thinking, extracted_json.strip()
- return "", response_text
- thinking = response_text[: final_marker.start()].strip()
- answer = response_text[final_marker.end() :].strip()
- thinking = re.sub(
- r"^\s*(Thinking Process|Reasoning|思考过程|思维链|推理过程)\s*[::]\s*",
- "",
- thinking,
- flags=re.IGNORECASE,
- ).strip()
- return thinking, answer
- _LIST_PREFIX_RE = re.compile(r"^\s*(?:[-•*]\s+|\d+\s*[.)、]\s+)")
- _REQUIRED_PREFIX_RE = re.compile(r"^\s*(?:用户问的是|嗯,用户问的是)")
- # 更强的兜底校验:避免回显 prompt 元信息 / 英文标签 / Markdown 列表等
- _INVALID_SUMMARY_PATTERNS = [
- re.compile(r"\bThinking Process\b", re.IGNORECASE),
- re.compile(r"\bAnalyze the Request\b", re.IGNORECASE),
- re.compile(r"\bFinal Answer\b", re.IGNORECASE),
- re.compile(r"\b(Role|Task|Input|Output)\b", re.IGNORECASE),
- re.compile(r"(?:^|\n)\s*(角色|任务|输入|输出要求|输出|开始输出)\s*[::]", re.MULTILINE),
- re.compile(r"(?:^|\n)\s*(识别用户明确查询|问题核心主题|需结合公司主体)", re.MULTILINE),
- re.compile(r"我们需要理解问题:"),
- re.compile(r"思考要点总结器|思考过程展示改写器"),
- re.compile(r"```"),
- re.compile(r"#+\s*\S"),
- re.compile(r"-\s+|•\s+"),
- re.compile(r"\d+\s*[.)、]\s+"),
- ]
- def _is_summary_acceptable(text: str) -> bool:
- summary = (text or "").strip()
- if not summary:
- return False
- if not _REQUIRED_PREFIX_RE.match(summary):
- return False
- if any(pattern.search(summary) for pattern in _INVALID_SUMMARY_PATTERNS):
- return False
- return True
- def _strip_noise_lines(text: str) -> str:
- lines = (text or "").splitlines()
- kept: List[str] = []
- for raw_line in lines:
- line = raw_line.rstrip()
- stripped = line.strip()
- if not stripped:
- kept.append("")
- continue
- if any(pattern.match(stripped) for pattern in _NOISE_LINE_PATTERNS):
- continue
- if any(pattern.search(stripped) for pattern in _NOISE_CONTAINS_PATTERNS):
- continue
- stripped = _LEADING_LABEL_RE.sub("", stripped).strip()
- had_list_prefix = bool(_LIST_PREFIX_RE.match(stripped))
- stripped = _LIST_PREFIX_RE.sub("", stripped).strip()
- if had_list_prefix and stripped and stripped[-1] not in "。!?;!?;":
- stripped = stripped.rstrip(",,") + ";"
- if stripped:
- kept.append(stripped)
- cleaned = "\n".join(kept)
- cleaned = re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned).strip()
- return cleaned
- def _split_paragraphs(text: str) -> List[str]:
- chunks = [chunk.strip() for chunk in re.split(r"\n\s*\n+", text or "") if chunk.strip()]
- paragraphs: List[str] = []
- for chunk in chunks:
- paragraph = re.sub(r"\s*\n\s*", "", chunk).strip()
- if paragraph:
- paragraphs.append(paragraph)
- return paragraphs
- def normalize_thinking_summary(
- text: str,
- *,
- max_points: int = 5,
- max_output_chars: int = 600,
- ) -> str:
- """将模型输出归一化为中文自然段(最多 max_points 段)。"""
- raw_text = (text or "").strip()
- if not raw_text:
- return ""
- cleaned = _strip_noise_lines(raw_text)
- if not cleaned:
- return ""
- paragraphs = _split_paragraphs(cleaned)
- if not paragraphs:
- return ""
- paragraphs = paragraphs[: max(1, int(max_points or 5))]
- normalized = "\n\n".join(paragraphs).strip()
- if max_output_chars and len(normalized) > int(max_output_chars):
- normalized = normalized[: int(max_output_chars)].rstrip()
- return normalized
- def _build_fallback_summary(
- user_question: str,
- *,
- max_points: int,
- max_output_chars: int,
- ) -> str:
- question = (user_question or "").strip()
- if not question:
- question = "这个问题"
- engineering_keywords = (
- "施工",
- "支架",
- "脚手架",
- "架桥机",
- "桥梁",
- "隧道",
- "混凝土",
- "钢筋",
- "模板",
- "验算",
- "荷载",
- "地基",
- "扣件",
- "碗扣",
- )
- is_engineering = any(keyword in question for keyword in engineering_keywords)
- domain_hint = "工程施工/技术" if is_engineering else "需要结构化说明"
- outline_paragraph = (
- "我会按常见施工方案结构来组织回答:工程概况 → 编制依据 → 施工准备 → 设计与参数 → 施工工艺 → 检查验收与监测 → 拆除 → 安全与应急 → 计算要点。"
- if is_engineering
- else "在信息不充分的情况下,我会先按通用结构组织回答:背景/目标与依据 → 核心要点与步骤 → 注意事项与边界条件 → 需要追问的关键信息。"
- )
- data_paragraph = (
- "如果涉及验算或参数,我不会直接给出精确数值结论(缺少荷载、尺寸、工况等数据),而是说明验算项目、取值原则与需要补充的信息。"
- if is_engineering
- else "如果涉及具体数据或条件(时间/地区/预算/限制等),我会说明我的假设、可选口径,并提示需要你进一步确认的关键信息。"
- )
- paragraphs = [
- f"用户问的是“{question}”。这更像是一个{domain_hint}类问题,我需要先确认用户希望得到的是方案框架、步骤清单,还是关键控制点与注意事项。",
- outline_paragraph,
- data_paragraph,
- "如果你能补充具体场景(适用对象/工况/阶段/关注重点),我可以把回答细化为可执行的流程清单与检查表。",
- ]
- paragraphs = paragraphs[: max(1, int(max_points or 3))]
- text = "\n\n".join(paragraphs).strip()
- if max_output_chars and len(text) > int(max_output_chars):
- text = text[: int(max_output_chars)].rstrip()
- return text
- async def summarize_thinking_content(
- *,
- user_question: str,
- raw_thinking: str,
- final_answer: str = "",
- chat_service: Any,
- context: str = "",
- ) -> str:
- """
- 二次总结原始思考过程,返回可展示的中文“思考过程摘要”(自然段)。
- 失败兜底:返回空字符串,避免回退 raw_thinking(防止暴露长推理链路)。
- """
- cfg = getattr(settings, "thinking_summary", None)
- enabled = getattr(cfg, "enabled", True) if cfg else True
- if not enabled:
- return ""
- thinking_text = (raw_thinking or "").strip()
- if not thinking_text:
- return ""
- max_points = int(getattr(cfg, "max_points", 5) if cfg else 5)
- max_input_chars = int(getattr(cfg, "max_input_chars", 1500) if cfg else 1500)
- max_output_chars = int(getattr(cfg, "max_output_chars", 600) if cfg else 600)
- # temperature 预留配置项(当前 qwen_service.chat 未透传该参数)
- _ = float(getattr(cfg, "temperature", 0.2) if cfg else 0.2)
- if max_input_chars and len(thinking_text) > max_input_chars:
- thinking_text = thinking_text[:max_input_chars]
- prompt = load_prompt(
- "thinking_summary",
- userMessage=user_question or "",
- thinkingText=thinking_text,
- finalAnswer=final_answer or "",
- maxPoints=str(max_points),
- )
- if not (prompt or "").strip():
- logger.warning("[thinking_summary] Prompt 为空,已跳过")
- return ""
- try:
- response = await chat_service.chat(
- [{"role": "user", "content": prompt}],
- )
- except Exception as exc:
- logger.warning(
- f"[thinking_summary] 生成失败({context or 'unknown'}): {type(exc).__name__}",
- exc_info=True,
- )
- return _build_fallback_summary(
- user_question=user_question,
- max_points=max_points,
- max_output_chars=max_output_chars,
- )
- # 避免总结器自己又输出 <think> 段
- _, summary_final = split_thinking_and_answer(response or "")
- summary_text = (summary_final or response or "").strip()
- normalized = normalize_thinking_summary(
- summary_text,
- max_points=max_points,
- max_output_chars=max_output_chars,
- )
- if not normalized or not _is_summary_acceptable(normalized):
- return _build_fallback_summary(
- user_question=user_question,
- max_points=max_points,
- max_output_chars=max_output_chars,
- )
- logger.info(
- f"[thinking_summary] 生成成功({context or 'unknown'}) | raw_len={len(raw_thinking or '')} | out_len={len(normalized)}"
- )
- return normalized
|