Просмотр исходного кода

Merge branch 'dev_sgsc_wxm_fix_chunk_split' of CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

WangXuMing 2 недель назад
Родитель
Сommit
b86945d957

+ 66 - 7
core/construction_review/component/doc_worker/pdf_worker/text_splitter.py

@@ -103,19 +103,37 @@ class PdfTextSplitter(TextSplitter, HierarchicalChunkMixin):
 
         # 步骤4: 按目录层级处理每个标题块
         all_chunks: List[Dict[str, Any]] = []
-        
+
+        # 建立已定位标题的快速查找映射,用于后续 TOC 边界保护
+        found_titles_map = {t["title"]: t["position"] for t in found_titles}
+
         for i, title_info in enumerate(found_titles):
             start_pos = title_info["position"]
-            
-            # 确定正文块的结束位置(下一个同级标题的位置)
+
+            # 基础边界:下一个已定位的同级标题
             if i + 1 < len(found_titles):
                 end_pos = found_titles[i + 1]["position"]
             else:
                 end_pos = len(full_text)
-            
+
+            # TOC 边界保护:防止因标题定位错误导致的跨章节合并。
+            # 问题场景(用户原话描述):
+            # "当时的规则是两个标题之间的内容。但如果说最后一个标题跨章节了,
+            #  它就缺失了,缺失就会把下个章节的第一个标题,然后合并到最后上一个
+            #  章节的最后一个节里面。"
+            # 典型表现:第十章标题被错误定位到目录页(page 6),导致真正的第十章
+            # 没被识别,第九章最后一个二级标题 content_block 的 end_pos 被延长到
+            # len(full_text),将第十章的"计算书"、"相关施工图纸"等全部内容吞进
+            # doc_chunk_第九章->五_1。
+            toc_boundary = self._get_toc_boundary_position(
+                title_info["title"], all_toc_items, target_level, found_titles_map, full_text
+            )
+            if toc_boundary is not None and toc_boundary > start_pos:
+                end_pos = min(end_pos, toc_boundary)
+
             # 提取正文块
             content_block = full_text[start_pos:end_pos]
-            
+
             # 在正文块中查找子标题(按最低层级切分)
             sub_chunks = self._split_by_sub_titles(
                 content_block,
@@ -125,7 +143,7 @@ class PdfTextSplitter(TextSplitter, HierarchicalChunkMixin):
                 max_chunk_size,
                 min_chunk_size,
             )
-            
+
             # 为每个子块添加元数据
             for j, sub_chunk in enumerate(sub_chunks, 1):
                 chunk_data = self._build_chunk_metadata(
@@ -133,13 +151,54 @@ class PdfTextSplitter(TextSplitter, HierarchicalChunkMixin):
                 )
                 all_chunks.append(chunk_data)
 
-        # 步骤4: 生成最终的chunk_id和serial_number
+        # 步骤5: 生成最终的chunk_id和serial_number
         final_chunks = self._finalize_chunk_ids(all_chunks)
 
         print(f"  完成切分: {len(final_chunks)} 个块")
 
         return final_chunks
 
+    def _get_toc_boundary_position(
+        self,
+        title: str,
+        all_toc_items: List[Dict[str, Any]],
+        target_level: int,
+        found_titles_map: Dict[str, int],
+        full_text: str,
+    ) -> int | None:
+        """
+        在 all_toc_items 中找到当前标题的下一个兄弟/更高级标题,
+        并返回其在正文中的边界位置,防止 content_block 跨章节合并。
+        """
+        current_idx = -1
+        for idx, item in enumerate(all_toc_items):
+            if item.get("title") == title and item.get("level", target_level) == target_level:
+                current_idx = idx
+                break
+
+        if current_idx < 0:
+            return None
+
+        for idx in range(current_idx + 1, len(all_toc_items)):
+            item = all_toc_items[idx]
+            if item.get("level", 1) <= target_level:
+                boundary_title = item["title"]
+                # 优先使用已定位的位置
+                if boundary_title in found_titles_map:
+                    return found_titles_map[boundary_title]
+                # 回退:尝试在正文中直接定位
+                if full_text and self._title_matcher:
+                    pos = self._title_matcher._find_title_in_text(
+                        boundary_title,
+                        full_text,
+                        float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8)),
+                    )
+                    if pos >= 0:
+                        return pos
+                return None
+
+        return None
+
     def _split_by_sub_titles(
         self,
         content_block: str,

+ 95 - 46
core/construction_review/component/doc_worker/utils/title_matcher.py

@@ -29,11 +29,14 @@ class TitleMatcher:
     ) -> List[Dict[str, Any]]:
         """
         在正文中定位已分类标题(跳过目录页范围)。
-        
+
         优化逻辑(参考 doc_worker):
         1. 先在全文中查找标题位置
         2. 如果找到的位置在目录页范围内,继续在目录页之后查找
         3. 如果找到的位置不在目录页范围内,直接使用该位置
+
+        修复:支持多位置匹配,结合 toc_page 进行页码择优,
+        避免将目录中的靠前匹配误当作正文标题,导致后续章节内容被错误合并。
         """
         # 计算目录页的文本范围
         toc_start_pos = float("inf")
@@ -47,58 +50,61 @@ class TitleMatcher:
 
         located: List[Dict[str, Any]] = []
         fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))
+        page_tolerance = int(self._cfg.get("text_splitting.page_tolerance", 10))
 
         for item in classified_items:
             title = item["title"]
             category = item.get("category", "")
             category_code = item.get("category_code", "other")
-
-            # 步骤1: 在全文中查找标题位置
-            pos = self._find_title_in_text(title, full_text, fuzzy_threshold)
-            
-            # 步骤2: 如果找到的位置在目录页范围内,继续在目录页之后查找
-            if pos >= 0 and toc_end_pos > 0 and toc_start_pos <= pos < toc_end_pos:
-                # 在目录页之后继续查找
-                if toc_end_pos < len(full_text):
-                    search_start = int(toc_end_pos)
-                    remaining_text = full_text[search_start:]
-                    pos_in_remaining = self._find_title_in_text(title, remaining_text, fuzzy_threshold)
-                    
-                    if pos_in_remaining >= 0:
-                        pos = search_start + pos_in_remaining
-                    else:
-                        pos = -1
+            toc_page = item.get("page", "")
+
+            # 步骤1: 查找所有匹配位置(完整标题 + 正文部分),并排除目录页
+            all_positions = self._find_all_valid_title_positions(
+                title, full_text, fuzzy_threshold, toc_start_pos, toc_end_pos
+            )
+
+            pos = -1
+            if all_positions:
+                # 步骤2: 如果有多个有效位置,根据 toc_page 选择最接近的位置
+                if len(all_positions) > 1 and toc_page:
+                    try:
+                        toc_page_num = int(toc_page)
+                        best_pos = all_positions[0]
+                        best_diff = abs(self._get_page_number(best_pos, pages_content) - toc_page_num)
+                        for candidate_pos in all_positions[1:]:
+                            candidate_page = self._get_page_number(candidate_pos, pages_content)
+                            diff = abs(candidate_page - toc_page_num)
+                            if diff < best_diff:
+                                best_diff = diff
+                                best_pos = candidate_pos
+                        pos = best_pos
+                    except ValueError:
+                        pos = all_positions[0]
                 else:
-                    pos = -1
-            
+                    pos = all_positions[0]
+
             # 步骤3: 确认位置并添加到结果
             if pos >= 0:
-                # 确认位置不在目录页(避免误判)
-                if not (toc_end_pos > 0 and toc_start_pos <= pos < toc_end_pos):
-                    page_num = self._get_page_number(pos, pages_content)
-                    located.append(
-                        {
-                            "title": title,
-                            "category": category,
-                            "category_code": category_code,
-                            "position": pos,
-                            "toc_page": item.get("page", ""),
-                            "actual_page": page_num,
-                            "found": True,
-                        }
-                    )
-                else:
-                    # 位置仍然在目录页内,标记为未找到
-                    located.append(
-                        {
-                            "title": title,
-                            "category": category,
-                            "category_code": category_code,
-                            "position": -1,
-                            "toc_page": item.get("page", ""),
-                            "found": False,
-                        }
-                    )
+                page_num = self._get_page_number(pos, pages_content)
+                # 页码校验:如果实际页码与目录页码差距过大,且存在其他候选,则标记为可疑
+                if toc_page:
+                    try:
+                        toc_page_num = int(toc_page)
+                        if abs(page_num - toc_page_num) > page_tolerance:
+                            print(f"    警告: 标题 '{title}' 匹配位置页码({page_num})与目录页码({toc_page_num})差距过大,可能存在错误匹配")
+                    except ValueError:
+                        pass
+                located.append(
+                    {
+                        "title": title,
+                        "category": category,
+                        "category_code": category_code,
+                        "position": pos,
+                        "toc_page": toc_page,
+                        "actual_page": page_num,
+                        "found": True,
+                    }
+                )
             else:
                 located.append(
                     {
@@ -106,13 +112,56 @@ class TitleMatcher:
                         "category": category,
                         "category_code": category_code,
                         "position": -1,
-                        "toc_page": item.get("page", ""),
+                        "toc_page": toc_page,
                         "found": False,
                     }
                 )
 
         return located
 
+    def _find_all_valid_title_positions(
+        self,
+        title: str,
+        text: str,
+        fuzzy_threshold: float,
+        toc_start_pos: float,
+        toc_end_pos: float,
+    ) -> List[int]:
+        """
+        查找标题在正文中的所有有效位置(排除目录页范围),并按位置排序。
+
+        策略:
+        1. 先找完整标题的所有位置;
+        2. 如果完整标题没找到,再找标题正文部分的所有位置;
+        3. 过滤掉目录页范围内的位置。
+        """
+        positions: List[int] = []
+
+        # 方法1: 完整标题匹配
+        full_positions = self._find_full_title_positions(title, text)
+        if full_positions:
+            positions = full_positions
+        else:
+            # 方法2: 标题正文部分匹配
+            title_content = self._extract_title_content(title)
+            if title_content:
+                content_positions = self._find_content_positions(title_content, text)
+                if content_positions:
+                    positions = content_positions
+            # 如果标题正文也没找到,回退到模糊匹配
+            if not positions:
+                legacy_pos = self._find_title_in_text_legacy(title, text, fuzzy_threshold)
+                if legacy_pos >= 0:
+                    positions = [legacy_pos]
+
+        # 过滤目录页范围
+        valid_positions = [
+            p for p in positions
+            if not (toc_end_pos > 0 and toc_start_pos <= p < toc_end_pos)
+        ]
+
+        return sorted(valid_positions)
+
     def _find_title_in_text(self, title: str, text: str, fuzzy_threshold: float) -> int:
         """
         在文本中查找标题的近似位置(返回标题在文本中的精确起始位置)。

+ 334 - 0
utils_test/Chunk_Split_Test/test_chunk_split_batch.py

@@ -0,0 +1,334 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+文档切分修复批量验证测试
+
+测试目标:批量验证多个 PDF 中最后一章是否被正确提取,无跨章节泄漏。
+"""
+
+import json
+import os
+import sys
+import traceback
+from datetime import datetime
+from pathlib import Path
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from core.construction_review.component.doc_worker.pipeline import PipelineComponents, DefaultDocumentPipeline, DefaultFileParseFacade
+from core.construction_review.component.doc_worker.config.provider import default_config_provider
+from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
+from core.construction_review.component.doc_worker.classification.hierarchy_classifier import HierarchyClassifier
+from core.construction_review.component.doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
+from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
+from core.construction_review.component.doc_worker.pdf_worker.json_writer import PdfJsonResultWriter
+
+
+TEST_DIR = Path("D:/wx_work/sichuan_luqiao/lu_sgsc_testfile")
+
+TEST_FILES = [
+    # 必须包含
+    Path("utils_test/Chunk_Split_Test/标准结构测试文件.pdf").resolve(),
+    # 代表性施工方案(按推荐优先级排序)
+    TEST_DIR / "测试模版-四川路桥专项施工方案框架以及编制说明(2025修订第三版)- v0.2.pdf",
+    TEST_DIR / "成渝扩容桥梁下部结构专项施工方案(正式版)(1).pdf",
+    TEST_DIR / "达州绕西高速西段RX2标段人工挖孔桩施工方案(2).pdf",
+    TEST_DIR / "高处作业安全带、防坠器系挂方案.2026.1.5改.pdf",
+    TEST_DIR / "四川智能建造科技股份有限公司G999线大源至中和高速公路TJ5项目经理部龙泉山左线特大桥T梁安装专项施工方案.pdf",
+    TEST_DIR / "主线天桥现浇箱梁支模体系(满堂支架)安全专项施工方案(1).pdf",
+]
+
+
+def build_test_facade():
+    components = PipelineComponents(
+        config=default_config_provider,
+        toc_extractor=PdfTOCExtractor(),
+        classifier=HierarchyClassifier(),
+        fulltext_extractor=PdfFullTextExtractor(),
+        splitter=PdfTextSplitter(),
+        writers=[PdfJsonResultWriter()],
+        chunk_classifier=None,
+    )
+    pipeline = DefaultDocumentPipeline(components)
+    return DefaultFileParseFacade(pipeline)
+
+
+def locate_existing_files() -> list[Path]:
+    existing = []
+    for p in TEST_FILES:
+        if p.exists():
+            existing.append(p)
+        else:
+            print(f"[SKIP] 文件不存在,跳过: {p}")
+    return existing
+
+
+def run_pipeline(file_path: Path, facade) -> dict:
+    print(f"\n[INFO] 正在处理: {file_path.name}")
+    result = facade.process_file(
+        file_path=file_path,
+        target_level=None,
+        max_chunk_size=None,
+        min_chunk_size=None,
+        output_dir=None,
+    )
+    return result
+
+
+def analyze_file(file_path: Path, result: dict) -> dict:
+    chunks = result.get("chunks") or []
+    toc_info = result.get("toc_info") or {}
+    toc_items = toc_info.get("toc_items") or []
+
+    section_labels = sorted({c.get("section_label", "UNKNOWN") for c in chunks})
+
+    # 一级章节标签:section_label 中不含 "->" 的部分
+    first_level_labels = []
+    for label in section_labels:
+        if "->" in label:
+            first = label.split("->")[0].strip()
+            if first not in first_level_labels:
+                first_level_labels.append(first)
+        else:
+            if label.strip() not in first_level_labels:
+                first_level_labels.append(label.strip())
+
+    # 找目录中 level=1 的最后一个章节
+    level1_items = [item for item in toc_items if item.get("level") == 1]
+    last_level1_item = level1_items[-1] if level1_items else None
+    last_level1_title = last_level1_item.get("title", "").strip() if last_level1_item else ""
+    last_level1_page = last_level1_item.get("page") if last_level1_item else None
+
+    # 判断最后一章是否有对应 chunk(模糊匹配标题)
+    def normalize(t: str) -> str:
+        return t.replace(" ", "").replace("\u3000", "").strip()
+
+    last_chapter_found = False
+    matched_label = None
+    if last_level1_title:
+        norm_target = normalize(last_level1_title)
+        for label in first_level_labels:
+            if norm_target in normalize(label) or normalize(label) in norm_target:
+                last_chapter_found = True
+                matched_label = label
+                break
+
+    # 检查最后一章 page 是否明显大于目录页范围(简单:page > toc_page + 2)
+    toc_page = toc_info.get("toc_page") or 1
+    try:
+        toc_page = int(toc_page)
+    except (ValueError, TypeError):
+        toc_page = 1
+    page_reasonable = False
+    if last_level1_page is not None:
+        try:
+            page_reasonable = int(last_level1_page) > toc_page + 2
+        except (ValueError, TypeError):
+            page_reasonable = False
+
+    # 检查跨章节泄漏
+    leak_detected = False
+    leak_details = []
+    if len(first_level_labels) >= 2 and last_level1_title:
+        # 倒数第二个一级章节
+        prev_first = first_level_labels[-2] if len(first_level_labels) >= 2 else None
+        if prev_first:
+            # 该一级章节下的所有 chunk(包含其二级节)中的最后一个 chunk
+            prev_chunks = [c for c in chunks if c.get("section_label", "").startswith(prev_first)]
+            if prev_chunks:
+                last_prev_chunk = prev_chunks[-1]
+                content = (last_prev_chunk.get("review_chunk_content", "") or "") + (last_prev_chunk.get("content", "") or "")
+                # 用最后一章标题的几个关键词检查是否混入
+                keywords = [k for k in last_level1_title.split() if len(k) >= 2]
+                if not keywords:
+                    keywords = [last_level1_title]
+                for kw in keywords:
+                    if kw in content:
+                        leak_detected = True
+                        leak_details.append({
+                            "chunk_id": last_prev_chunk.get("chunk_id"),
+                            "section_label": last_prev_chunk.get("section_label"),
+                            "keyword": kw,
+                        })
+
+    # 特殊情形:如果完全没有识别出章节标题(只有 fallback 的 "正文" chunk),
+    # 说明 toc_extractor 可能将正文页误判为目录页,导致 title_matcher 过滤掉所有匹配。
+    # 这与本次 "第十章被吞并" 的修复无关,单独标记。
+    if len(chunks) == 1 and len(section_labels) == 1 and section_labels[0] == "正文":
+        return {
+            "filename": file_path.name,
+            "total_chunks": len(chunks),
+            "total_level1": 0,
+            "last_level1_title": last_level1_title,
+            "last_level1_page": last_level1_page,
+            "last_chapter_found": False,
+            "last_chapter_label": None,
+            "page_reasonable": False,
+            "toc_page": toc_page,
+            "leak_detected": False,
+            "leak_details": [],
+            "section_labels": section_labels,
+            "return_code": 1,
+            "reasons": ["未能识别任何章节标题(可能目录页范围误判),无法评估切分修复效果"],
+        }
+
+    # 返回码判定
+    ret = 0
+    reasons = []
+    if not last_chapter_found:
+        ret = 1
+        reasons.append("最后一章未找到对应 chunk")
+    if not page_reasonable:
+        ret = 1
+        reasons.append("最后一章页码可能异常(落在目录页附近)")
+    if leak_detected:
+        ret = 1
+        reasons.append("发现跨章节内容泄漏")
+
+    return {
+        "filename": file_path.name,
+        "total_chunks": len(chunks),
+        "total_level1": len(first_level_labels),
+        "last_level1_title": last_level1_title,
+        "last_level1_page": last_level1_page,
+        "last_chapter_found": last_chapter_found,
+        "last_chapter_label": matched_label,
+        "page_reasonable": page_reasonable,
+        "toc_page": toc_page,
+        "leak_detected": leak_detected,
+        "leak_details": leak_details,
+        "section_labels": section_labels,
+        "return_code": ret,
+        "reasons": reasons,
+    }
+
+
+def print_summary(reports: list[dict]) -> str:
+    lines = []
+    lines.append("\n" + "=" * 80)
+    lines.append("批量切分测试汇总")
+    lines.append("=" * 80)
+
+    passed = 0
+    failed = 0
+    for r in reports:
+        status = "PASS" if r["return_code"] == 0 else "FAIL"
+        if r["return_code"] == 0:
+            passed += 1
+        else:
+            failed += 1
+        lines.append(f"\n文件: {r['filename']}")
+        lines.append(f"  状态: {status}")
+        lines.append(f"  总 chunk 数: {r['total_chunks']}")
+        lines.append(f"  总一级章节数: {r['total_level1']}")
+        lines.append(f"  最后一章标题: {r['last_level1_title']}")
+        lines.append(f"  最后一章页码: {r['last_level1_page']}")
+        lines.append(f"  最后一章提取成功: {r['last_chapter_found']} ({r['last_chapter_label'] or 'N/A'})")
+        lines.append(f"  页码合理: {r['page_reasonable']} (目录页={r['toc_page']})")
+        lines.append(f"  跨章节泄漏: {r['leak_detected']}")
+        if r["leak_details"]:
+            for d in r["leak_details"]:
+                lines.append(f"    -> {d['chunk_id']} ({d['section_label']}) 包含 '{d['keyword']}'")
+        if r["reasons"]:
+            lines.append(f"  不通过原因: {'; '.join(r['reasons'])}")
+
+    lines.append("\n" + "-" * 80)
+    lines.append(f"汇总: {passed} 通过, {failed} 失败 / 总计 {len(reports)} 个文件")
+    lines.append("=" * 80)
+    summary = "\n".join(lines)
+    print(summary)
+    return summary
+
+
+def main() -> int:
+    files = locate_existing_files()
+    if not files:
+        print("[ERROR] 没有可用的测试文件。")
+        return 1
+
+    facade = build_test_facade()
+    reports = []
+    errors = []
+
+    for fp in files:
+        try:
+            result = run_pipeline(fp, facade)
+            report = analyze_file(fp, result)
+            reports.append(report)
+        except Exception as e:
+            print(f"[ERROR] 处理失败: {fp.name} -> {e}")
+            traceback.print_exc()
+            errors.append({"filename": fp.name, "error": str(e)})
+
+    summary = print_summary(reports)
+
+    # 写出报告和中间 JSON
+    out_dir = Path(__file__).parent
+    md_path = out_dir / "batch_test_report.md"
+    json_path = out_dir / "batch_test_result.json"
+
+    with open(json_path, "w", encoding="utf-8") as f:
+        json.dump({
+            "timestamp": datetime.now().isoformat(),
+            "reports": reports,
+            "errors": errors,
+        }, f, ensure_ascii=False, indent=2)
+    print(f"[INFO] JSON 结果已保存: {json_path}")
+
+    md_content = f"""# 文档切分修复批量测试报告
+
+生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
+
+## 测试文件列表
+
+"""
+    for fp in files:
+        md_content += f"- `{fp.name}`\n"
+
+    md_content += "\n## 详细结果\n\n"
+    for r in reports:
+        status = "PASS" if r["return_code"] == 0 else "FAIL"
+        md_content += f"### {r['filename']} — {status}\n\n"
+        md_content += f"- 总 chunk 数: {r['total_chunks']}\n"
+        md_content += f"- 总一级章节数: {r['total_level1']}\n"
+        md_content += f"- 最后一章标题: {r['last_level1_title']}\n"
+        md_content += f"- 最后一章页码: {r['last_level1_page']}\n"
+        md_content += f"- 最后一章提取成功: {'是' if r['last_chapter_found'] else '否'} (`{r['last_chapter_label'] or 'N/A'}`)\n"
+        md_content += f"- 页码合理: {'是' if r['page_reasonable'] else '否'} (目录页={r['toc_page']})\n"
+        md_content += f"- 跨章节泄漏: {'是' if r['leak_detected'] else '否'}\n"
+        if r["leak_details"]:
+            md_content += "  泄漏详情:\n"
+            for d in r["leak_details"]:
+                md_content += f"  - `{d['chunk_id']}` (`{d['section_label']}`) 包含关键词 `{d['keyword']}`\n"
+        if r["reasons"]:
+            md_content += f"- 不通过原因: **{';'.join(r['reasons'])}**\n"
+        md_content += "\n"
+
+    if errors:
+        md_content += "## 运行错误\n\n"
+        for e in errors:
+            md_content += f"- `{e['filename']}`: {e['error']}\n"
+        md_content += "\n"
+
+    total = len(reports)
+    passed = sum(1 for r in reports if r["return_code"] == 0)
+    failed = total - passed
+    md_content += f"""## 汇总
+
+- 通过: {passed}
+- 失败: {failed}
+- 总计: {total}
+- 运行错误: {len(errors)}
+"""
+
+    with open(md_path, "w", encoding="utf-8") as f:
+        f.write(md_content)
+    print(f"[INFO] Markdown 报告已保存: {md_path}")
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 255 - 0
utils_test/Chunk_Split_Test/test_chunk_split_fix.py

@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+文档切分模块修复验证测试
+
+测试目标:验证 "第十章 其他资料" 内容不会被错误合并到 "第九章 验收要求->五、验收人员" 中。
+
+问题根因:
+- `title_matcher.find_title_positions` 只取第一个匹配,导致第十章标题被错误定位到目录页(page 6)。
+- 真正的第十章(page 46)未被发现,第九章成为最后一项,content_block 延伸到全文末尾。
+- "计算书"、"相关施工图纸"、"编制及审核人员情况" 全部被合并进 doc_chunk_第九章->五_1。
+
+修复点:
+1. title_matcher.py:支持多位置匹配,结合 toc_page 页码择优。
+2. text_splitter.py:增加 all_toc_items 硬边界保护,防止 content_block 跨章节溢出。
+
+运行方式:
+  python utils_test/Chunk_Split_Test/test_chunk_split_fix.py
+
+可选环境变量:
+  TEST_PDF_PATH=xxx.pdf  指定自定义 PDF 测试文档
+"""
+
+import json
+import os
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from core.construction_review.component.doc_worker.pipeline import PipelineComponents, DefaultDocumentPipeline, DefaultFileParseFacade
+from core.construction_review.component.doc_worker.config.provider import default_config_provider
+from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
+from core.construction_review.component.doc_worker.classification.hierarchy_classifier import HierarchyClassifier
+from core.construction_review.component.doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
+from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
+from core.construction_review.component.doc_worker.pdf_worker.json_writer import PdfJsonResultWriter
+
+
+# 默认测试文档:四川路桥测试模版 PDF(注意:doc_worker CLI 目前仅支持 PDF)
+DEFAULT_TEST_PDF = Path("D:/wx_work/sichuan_luqiao/lu_sgsc_testfile/测试模版-四川路桥专项施工方案框架以及编制说明(2025修订第三版)- v0.2.pdf")
+ALTERNATIVE_TEST_DOCX = project_root / "utils_test" / "Completeness_Test" / "测试模版-四川路桥专项施工方案框架以及编制说明(2025修订第三版)- v0.2.docx"
+
+
+def build_test_facade():
+    """
+    构建一个轻量级 facade:
+    - 跳过 chunk 分类(避免大量 LLM 调用)
+    - 使用 PyMuPDF 纯本地提取(避免 MinerU OCR 的耗时网络调用)
+    """
+    components = PipelineComponents(
+        config=default_config_provider,
+        toc_extractor=PdfTOCExtractor(),
+        classifier=HierarchyClassifier(),
+        fulltext_extractor=PdfFullTextExtractor(),  # 纯本地,速度远快于 Hybrid/MinerU
+        splitter=PdfTextSplitter(),
+        writers=[PdfJsonResultWriter()],
+        chunk_classifier=None,  # 关键:跳过二级/三级分类
+    )
+    pipeline = DefaultDocumentPipeline(components)
+    return DefaultFileParseFacade(pipeline)
+
+
+def locate_test_file() -> Path | None:
+    """定位可用的测试文档。"""
+    custom = os.environ.get("TEST_PDF_PATH")
+    if custom:
+        p = Path(custom)
+        if p.exists():
+            return p
+        print(f"[WARN] 自定义测试文件不存在: {p}")
+
+    if DEFAULT_TEST_PDF.exists():
+        return DEFAULT_TEST_PDF
+
+    # 如果只有 docx,提示用户
+    if ALTERNATIVE_TEST_DOCX.exists():
+        print(f"[WARN] 找到 docx 版本但 pdf_worker 暂不支持 docx: {ALTERNATIVE_TEST_DOCX}")
+        print(f"[HINT] 请将 docx 另存为 PDF 后放到: {DEFAULT_TEST_PDF}")
+
+    return None
+
+
+def run_pipeline(file_path: Path) -> dict:
+    """运行 doc_worker 管线,返回结果。"""
+    print(f"\n[INFO] 正在处理文档: {file_path}")
+    print("[INFO] 使用测试 facade(仅 TOC + 一级分类 + 切分,跳过 chunk 级 LLM 分类)")
+
+    facade = build_test_facade()
+    result = facade.process_file(
+        file_path=file_path,
+        target_level=None,      # 使用配置默认值
+        max_chunk_size=None,
+        min_chunk_size=None,
+        output_dir=None,
+    )
+    return result
+
+
+def analyze_chunks(result: dict) -> dict:
+    """分析 chunks 结构,提取关键指标。"""
+    chunks = result.get("chunks", []) or []
+    toc_info = result.get("toc_info", {}) or {}
+    classification = result.get("classification", {}) or {}
+
+    # 按 section_label 分组
+    section_to_chunks: dict[str, list[dict]] = {}
+    for chunk in chunks:
+        label = chunk.get("section_label", "UNKNOWN")
+        section_to_chunks.setdefault(label, []).append(chunk)
+
+    # 定位关键 chunk
+    chapter_10_chunks = [c for c in chunks if "第十章" in c.get("section_label", "")]
+    chapter_9_last_chunks = [c for c in chunks if c.get("section_label", "").startswith("第九章")]
+
+    # 找 "第九章->五" 的 chunk(问题原型的重灾区)
+    nine_five_chunks = section_to_chunks.get("第九章 验收要求->五、 验收人员", [])
+
+    # 提取 "计算书" 等关键词是否出现在不该出现的位置
+    leak_keywords = ["计算书", "相关施工图纸", "编制及审核人员情况"]
+    leaks: list[dict] = []
+    for chunk in chunks:
+        label = chunk.get("section_label", "")
+        if "第九章" in label and "验收人员" in label:
+            content = chunk.get("review_chunk_content", "") + chunk.get("content", "")
+            for kw in leak_keywords:
+                if kw in content:
+                    leaks.append({"chunk_id": chunk.get("chunk_id"), "section_label": label, "keyword": kw})
+
+    return {
+        "total_chunks": len(chunks),
+        "toc_count": toc_info.get("toc_count", 0),
+        "target_level": classification.get("target_level"),
+        "section_labels": sorted(section_to_chunks.keys()),
+        "chapter_10_chunks": chapter_10_chunks,
+        "chapter_9_last_chunks": chapter_9_last_chunks,
+        "nine_five_chunks": nine_five_chunks,
+        "leaks": leaks,
+        "chunks": chunks,
+    }
+
+
+def print_report(report: dict) -> None:
+    """打印readable报告。"""
+    print("\n" + "=" * 80)
+    print("文档切分修复验证报告")
+    print("=" * 80)
+    print(f"总 chunk 数: {report['total_chunks']}")
+    print(f"目录项数: {report['toc_count']}")
+    print(f"切分目标层级: {report['target_level']}")
+
+    print("\n[SECTION_LABEL 列表]")
+    for label in report["section_labels"]:
+        print(f"  - {label}")
+
+    print("\n[第十章相关 chunks]")
+    if report["chapter_10_chunks"]:
+        for c in report["chapter_10_chunks"]:
+            print(f"  {c.get('chunk_id')} | {c.get('section_label')} | page={c.get('element_tag', {}).get('page')}")
+    else:
+        print("  (无) —— 严重异常!")
+
+    print("\n[第九章 验收人员 chunks]")
+    if report["nine_five_chunks"]:
+        for c in report["nine_five_chunks"]:
+            print(f"  {c.get('chunk_id')} | {c.get('section_label')} | page={c.get('element_tag', {}).get('page')}")
+    else:
+        print("  (无)")
+
+    print("\n[内容泄漏检查]")
+    if report["leaks"]:
+        print("  FAIL —— 发现第十章关键词出现在第九章 chunk 中!")
+        for leak in report["leaks"]:
+            print(f"    -> {leak['chunk_id']} ({leak['section_label']}) 包含 '{leak['keyword']}'")
+    else:
+        print("  PASS —— 未发现跨章节内容泄漏。")
+
+    print("\n[断言检查]")
+    passed = 0
+    failed = 0
+
+    # 断言1: 必须存在第十章的 chunk
+    labels = report["section_labels"]
+    chapter_10_exists = any("第十章" in l for l in labels)
+    if chapter_10_exists:
+        print("  [PASS] 存在 section_label 包含 '第十章' 的 chunk")
+        passed += 1
+    else:
+        print("  [FAIL] 未找到任何 section_label 包含 '第十章' 的 chunk")
+        failed += 1
+
+    # 断言2: 第九章->五 不应该包含第十章关键词
+    if not report["leaks"]:
+        print("  [PASS] 第九章->五 未包含第十章专属关键词")
+        passed += 1
+    else:
+        print("  [FAIL] 第九章->五 包含第十章专属关键词")
+        failed += 1
+
+    # 断言3: 第十章不应该有 page=6 的异常 chunk
+    abnormal_page_6 = [
+        c for c in report["chapter_10_chunks"]
+        if c.get("element_tag", {}).get("page") == 6
+    ]
+    if not abnormal_page_6:
+        print("  [PASS] 未发现 page=6 的异常第十章 chunk")
+        passed += 1
+    else:
+        print(f"  [FAIL] 发现 {len(abnormal_page_6)} 个 page=6 的异常第十章 chunk")
+        for c in abnormal_page_6:
+            print(f"       {c.get('chunk_id')} | {c.get('section_label')}")
+        failed += 1
+
+    print(f"\n结果: {passed} 通过, {failed} 失败")
+    print("=" * 80)
+
+
+def main() -> int:
+    test_file = locate_test_file()
+    if not test_file:
+        print("[ERROR] 未找到可用的测试 PDF 文档。")
+        print(f"[INFO] 请通过环境变量指定: TEST_PDF_PATH=xxx.pdf python {__file__}")
+        return 1
+
+    result = run_pipeline(test_file)
+    report = analyze_chunks(result)
+    print_report(report)
+
+    # 写出中间结果,方便后续人工排查
+    output_path = Path(__file__).parent / "last_test_result.json"
+    with open(output_path, "w", encoding="utf-8") as f:
+        # 只保留可读的关键字段
+        dump_data = {
+            "source": str(test_file),
+            "section_labels": report["section_labels"],
+            "chunks_summary": [
+                {
+                    "chunk_id": c.get("chunk_id"),
+                    "section_label": c.get("section_label"),
+                    "page": c.get("element_tag", {}).get("page"),
+                    "content_preview": (c.get("review_chunk_content", "") or c.get("content", ""))[:200].replace("\n", " ") + "...",
+                }
+                for c in result.get("chunks", [])
+            ],
+        }
+        json.dump(dump_data, f, ensure_ascii=False, indent=2)
+    print(f"[INFO] 摘要已保存到: {output_path}")
+
+    return 0 if report["leaks"] == [] and any("第十章" in l for l in report["section_labels"]) else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())