Преглед изворни кода

Merge branch 'dev-new-rules' of CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

tangle пре 3 дана
родитељ
комит
96e57a3723

+ 1865 - 0
core/construction_review/component/minimal_pipeline/pdf_extractor2.py

@@ -0,0 +1,1865 @@
+"""
+PDF 结构提取器 - 同步并发 OCR 版本
+
+基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
+支持 OCR 增强:检测表格区域并使用 ThreadPoolExecutor 5并发 OCR,其他文本保持 PyMuPDF 提取。
+输出格式兼容后续分类与组装流程。
+"""
+
+import base64
+import io
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass
+from typing import Dict, Any, List, Optional, Tuple, Set
+
+import fitz
+import numpy as np
+import requests
+
+from foundation.observability.logger.loggering import review_logger as logger
+
# Optional dependency: RapidLayout powers table-region detection for OCR.
# When it is missing, OCR enhancement is disabled (see __init__) but plain
# PyMuPDF text extraction still works.
try:
    from rapid_layout import RapidLayout
    RAPID_LAYOUT_AVAILABLE = True
except ImportError:
    RAPID_LAYOUT_AVAILABLE = False
    RapidLayout = None
+
+
@dataclass
class TableRegion:
    """A table region detected on a PDF page, queued for OCR.

    ``page`` keeps a live reference to the fitz.Page so the OCR worker can
    render the region without reopening the document (the caller must keep
    the document open while regions are processed).
    """
    page_num: int  # 1-based page number
    page: fitz.Page  # source page object
    bbox: Tuple[float, float, float, float]  # (x0, y0, x1, y1) in page coordinates
    score: float  # layout-detection confidence for this region
+
+
@dataclass
class OcrResult:
    """Outcome of one OCR call for a single table region."""
    page_num: int  # 1-based page number the region came from
    bbox: Tuple[float, float, float, float]  # region box, echoed from TableRegion
    score: float  # layout-detection confidence, echoed from TableRegion
    text: str  # recognized text (empty/undefined when success is False)
    success: bool  # True when the OCR request completed successfully
+
+
class PdfStructureExtractor:
    """PDF chapter-structure extractor (supports concurrent OCR)."""

    # Heading-rule library: each named style pairs a level-1 (chapter) regex
    # with a level-2 (section) regex.  A document is assumed to follow one
    # style consistently; matching first collects candidate styles and then
    # converges on a single rule (see _extract_from_doc).
    RULE_LIB = {
        "Rule_1_纯数字派": {
            # l1: "1 标题" / "1、标题"   l2: "1.2 标题"
            "l1": re.compile(r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_2_混合章派": {
            # l1: "第1章 标题"   l2: "1.2 标题"
            "l1": re.compile(r"^第\s*(\d+)\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_3_中英混血派": {
            # l1: "第一章 标题"   l2: "1.2 标题"
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_4_传统公文派": {
            # l1: "第一章 标题"   l2: "一、标题"
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[、\s]+([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_5_单边括号派": {
            # l1: "第一章 标题"   l2: "一)标题"
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[)\)\]][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_6_小节派": {
            # l1: "第一章 标题"   l2: "第一节 标题"
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^第\s*([一二三四五六七八九十百零两]+)\s*节[\s、]*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_7_粗体括号派": {
            # l1: "第一章 标题"   l2: "【1】标题"
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
            "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
        },
        "Rule_8_cn_list_l1_numeric_l2": {
            # l1: "一、标题"   l2: "1、标题"
            "l1": re.compile(
                r"^(?:[一二三四五六七八九十百零两]+)[、\)\]\uFF09]\s*[\u4e00-\u9fa5A-Za-z].*"
            ),
            "l2": re.compile(
                r"^\d{1,2}(?:[、\.\uFF0E\u3002\)\]\uFF09])\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*"
            ),
        },
    }
    # Dot / ellipsis leaders that identify table-of-contents lines.
    TOC_PATTERN = re.compile(r"\.{3,}|…{2,}")

    # OCR configuration.  Some of these constants are consumed by helpers
    # outside this chunk; comments on those are inferred from the names.
    MAX_SHORT_EDGE = 1024  # presumably caps the short edge of rendered images — confirm in the render helper
    JPEG_QUALITY = 90  # presumably JPEG quality for OCR uploads — confirm in the encode helper
    OCR_DPI = 200  # presumably the render resolution for OCR — confirm in the render helper
    OCR_CONFIDENCE_THRESHOLD = 0.5  # presumably the minimum layout score to OCR a region — confirm in _detect_table_regions
    OCR_CONCURRENT_WORKERS = 5  # ThreadPoolExecutor size for OCR (see _extract_from_doc)
+
    def __init__(
        self,
        clip_top: float = 60,
        clip_bottom: float = 60,
        use_ocr: bool = False,
        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
        ocr_timeout: int = 600,
        ocr_api_key: str = "",
        detect_toc: bool = True,
        toc_model_path: str = "config/yolo/best.pt",
    ):
        """Configure the extractor.

        Args:
            clip_top: Top margin (in points) clipped off every page.
            clip_bottom: Bottom margin (in points) clipped off every page.
            use_ocr: Request OCR of table regions; effective only when
                RapidLayout is installed.
            ocr_api_url: OCR service endpoint.
                NOTE(review): the default hard-codes a public IP — consider
                moving it to deployment configuration.
            ocr_timeout: OCR request timeout (seconds, passed to the TOC
                extractor as well).
            ocr_api_key: API key for the OCR service.
            detect_toc: Run YOLO-based table-of-contents detection.
            toc_model_path: Path to the YOLO weights used for TOC detection.
        """
        self.clip_top = clip_top
        self.clip_bottom = clip_bottom
        # OCR only turns on when both requested and the dependency imports.
        self.use_ocr = use_ocr and RAPID_LAYOUT_AVAILABLE

        # OCR configuration
        self.ocr_api_url = ocr_api_url
        self.ocr_timeout = ocr_timeout
        self.ocr_api_key = ocr_api_key
        self._layout_engine: Optional[Any] = None  # built lazily by _get_layout_engine

        # TOC detection configuration
        self.detect_toc = detect_toc
        self.toc_model_path = toc_model_path
        self._toc_extractor = None  # built lazily by _extract_catalog

        if use_ocr and not RAPID_LAYOUT_AVAILABLE:
            logger.warning("RapidLayout 未安装,OCR 功能不可用")
+
+    def _get_layout_engine(self) -> Optional[Any]:
+        """延迟初始化 RapidLayout"""
+        if self._layout_engine is None and RAPID_LAYOUT_AVAILABLE:
+            self._layout_engine = RapidLayout()
+        return self._layout_engine
+
    def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
        """Extract the chapter structure from a PDF byte stream.

        Args:
            file_content: Raw PDF bytes.
            progress_callback: Optional callable receiving
                (stage, current, message) progress updates.

        Returns:
            {
                "chapters": {
                    "第一章 xxx": {
                        "章节标题": {"content": "...", "page_start": 1, "page_end": 1},
                        "一、xxx": {"content": "...", "page_start": 2, "page_end": 3},
                    }
                },
                "total_pages": N,
                "catalog": {  # table of contents (YOLO detection + OCR)
                    "chapters": [...],
                    "total_chapters": N
                }
            }
        """
        result = {"chapters": {}, "total_pages": 0, "catalog": None}

        # === Stage 0: TOC page detection & extraction (when enabled) ===
        if self.detect_toc:
            try:
                catalog = self._extract_catalog(file_content, progress_callback)
                if catalog:
                    catalog = self._normalize_catalog(catalog)
                    result["catalog"] = catalog
                    logger.info(f"[PDF提取] 目录提取完成: {catalog.get('total_chapters', 0)} 章")
            except Exception as e:
                # TOC extraction is best-effort: a failure must never block
                # body extraction below.
                logger.warning(f"[PDF提取] 目录提取失败: {e}")

        # === Stages 1-3: document structure extraction ===
        doc = fitz.open(stream=file_content)
        try:
            structure = self._extract_from_doc(doc, progress_callback)
            if result.get("catalog"):
                # Body extraction and TOC detection are independent pipelines:
                # 1. body extraction is better at capturing contiguous content
                # 2. TOC detection is better at preserving order and hierarchy
                # Align the body against the TOC skeleton first, then rebuild
                # content along heading boundaries, minimizing the structural
                # loss caused by missed headings.
                structure["chapters"] = self._reconcile_structure_with_catalog(
                    structure.get("chapters", {}),
                    result["catalog"],
                )
                rebuilt_chapters = self._rebuild_section_contents_from_catalog(
                    structure.get("chapters", {}),
                    result["catalog"],
                    structure.get("_body_lines", []),
                )
                if rebuilt_chapters:
                    structure["chapters"] = rebuilt_chapters
                enriched_catalog = self._enrich_catalog_with_structure(
                    result["catalog"],
                    structure.get("chapters", {}),
                )
                if enriched_catalog:
                    result["catalog"] = enriched_catalog
            # _body_lines is an internal transfer field; drop it before returning.
            structure.pop("_body_lines", None)
            result["chapters"] = structure.get("chapters", {})
            result["total_pages"] = len(doc)
            return result
        finally:
            doc.close()
+
    def _extract_catalog(self, file_content: bytes, progress_callback=None) -> Optional[Dict[str, Any]]:
        """Extract the catalog (table of contents) via YOLO detection + OCR.

        Returns:
            {"chapters": [...], "total_chapters": N} or None when no TOC is found.
        """
        # Deferred import avoids a circular dependency (the YOLO dependency
        # must be installed; otherwise this raises).
        from .toc_detector import TOCCatalogExtractor

        if self._toc_extractor is None:
            # Build the TOC extractor once and cache it with our OCR settings.
            self._toc_extractor = TOCCatalogExtractor(
                model_path=self.toc_model_path,
                ocr_api_url=self.ocr_api_url,
                ocr_api_key=self.ocr_api_key,
                ocr_timeout=self.ocr_timeout,
            )

        return self._toc_extractor.detect_and_extract(file_content, progress_callback)
+
    def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
        """Extract the document structure (with concurrent OCR support).

        Three phases:
        1. Scan every page and collect table regions that need OCR replacement.
        2. Run OCR concurrently and group successful results back by page.
        3. Re-walk the page text and cut chapter / section structure using
           the heading rules.

        Returns a dict with ``chapters`` (chapter -> section -> content/pages)
        and the internal ``_body_lines`` list used by catalog back-filling.
        """

        def _emit_progress(stage: str, current: int, message: str):
            """Forward progress to the callback, swallowing callback errors."""
            if progress_callback:
                try:
                    progress_callback(stage, current, message)
                except Exception:
                    # A broken callback must never abort extraction.
                    pass

        # === Phase 1: collect every table region that needs OCR ===
        table_regions: List[TableRegion] = []

        if self.use_ocr:
            logger.info("[OCR预处理] 扫描所有页面的表格区域...")
            total_pages = len(doc)
            for page_num in range(total_pages):
                page = doc.load_page(page_num)
                rect = page.rect
                # Clip off the configured header/footer margins.
                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
                regions = self._detect_table_regions(page, page_num + 1, clip_box)
                for bbox, score in regions:
                    table_regions.append(TableRegion(
                        page_num=page_num + 1,
                        page=page,
                        bbox=bbox,
                        score=score
                    ))
                # Push progress every 5 pages, or on the last page.
                if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
                    progress = int((page_num + 1) / total_pages * 30)  # preprocessing accounts for 30% of overall progress
                    _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
            logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")

        # === Phase 2: run OCR concurrently (OCR_CONCURRENT_WORKERS workers) ===
        ocr_results: List[OcrResult] = []

        if table_regions:
            logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
            _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
            ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
            success_count = sum(1 for r in ocr_results if r.success)
            logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
            _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")

        # Group successful OCR results by page number.
        # NOTE(review): the loop variable `result` is later shadowed by the
        # return dict of the same name; harmless here, but easy to trip over.
        ocr_by_page: Dict[int, List[OcrResult]] = {}
        for result in ocr_results:
            if result.success:
                if result.page_num not in ocr_by_page:
                    ocr_by_page[result.page_num] = []
                ocr_by_page[result.page_num].append(result)

        # === Phase 3: extract page text (applying OCR results) and split chapters ===
        structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
        # body_lines keeps the linear body text after header/footer filtering;
        # catalog back-filling later re-cuts it along heading boundaries.
        body_lines: List[Dict[str, Any]] = []
        current_chapter = "未分类前言"
        current_section = "默认部分"
        in_body = False  # stays False until the first genuine level-1 heading
        candidate_rule_names: Optional[List[str]] = None
        active_rule_name: Optional[str] = None

        logger.info("[文本提取] 提取页面内容并切分章节...")

        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            rect = page.rect
            clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)

            # Get the page text, splicing in OCR output where available.
            if page_num + 1 in ocr_by_page:
                original_text = page.get_text("text", clip=clip_box)
                ocr_results_list = [
                    {
                        "region_index": i,
                        "bbox": r.bbox,
                        "score": r.score,
                        "ocr_text": r.text,
                    }
                    for i, r in enumerate(ocr_by_page[page_num + 1])
                ]
                text = self._replace_table_regions(page, original_text, ocr_results_list, clip_box)
            else:
                text = page.get_text("text", clip=clip_box)

            lines = self._prepare_page_lines(text)
            # First pass: record the raw body lines (header/footer filtered).
            for line in lines:
                if not line or self._is_header_footer(line):
                    continue
                body_lines.append({
                    "page": page_num + 1,
                    "text": line,
                })

            # Second pass: heading matching and structure building.
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                if self._is_header_footer(line):
                    continue

                # Skip the TOC phase of the document.
                if not in_body:
                    # Only a first genuine level-1 heading marks the start of
                    # the body.  This prevents TOC pages — whose lines also
                    # match heading rules — from being mistaken for content.
                    matched_rules = self._matching_rule_names(line, "l1")
                    if matched_rules and not self.TOC_PATTERN.search(line):
                        in_body = True
                        candidate_rule_names = matched_rules
                    else:
                        continue

                # Skip leftover TOC-formatted lines inside the body.
                if self.TOC_PATTERN.search(line):
                    continue

                # candidate_rule_names = "heading styles this document might use";
                # active_rule_name = "the level-2 rule confirmed in use".
                # Being permissive first and converging later reduces
                # cross-matching in documents with mixed numbering.
                active_scope = [active_rule_name] if active_rule_name else candidate_rule_names

                # Chapter (level-1) heading?
                matched_chapter_rules = self._matching_rule_names(line, "l1", active_scope)
                if matched_chapter_rules:
                    if active_rule_name is None:
                        candidate_rule_names = matched_chapter_rules
                    current_chapter = self._clean_chapter_title(line)
                    current_section = "章节标题"
                    if current_chapter not in structured_data:
                        structured_data[current_chapter] = {}
                    if current_section not in structured_data[current_chapter]:
                        structured_data[current_chapter][current_section] = {
                            "lines": [],
                            "page_start": page_num + 1,
                            "page_end": page_num + 1,
                        }
                    continue

                # Section (level-2) heading?
                matched_section_rules = self._matching_rule_names(line, "l2", active_scope)
                if matched_section_rules:
                    if active_rule_name is None:
                        # Prefer a rule that was already a chapter candidate.
                        if candidate_rule_names:
                            for rule_name in candidate_rule_names:
                                if rule_name in matched_section_rules:
                                    active_rule_name = rule_name
                                    break
                        if active_rule_name is None:
                            active_rule_name = matched_section_rules[0]
                    current_section = self._clean_section_title(line)
                    if current_chapter not in structured_data:
                        structured_data[current_chapter] = {}
                    if current_section not in structured_data[current_chapter]:
                        structured_data[current_chapter][current_section] = {
                            "lines": [],
                            "page_start": page_num + 1,
                            "page_end": page_num + 1,
                        }
                    continue

                # Plain content: make sure the current slot exists.
                if current_chapter not in structured_data:
                    structured_data[current_chapter] = {}
                if current_section not in structured_data[current_chapter]:
                    structured_data[current_chapter][current_section] = {
                        "lines": [],
                        "page_start": page_num + 1,
                        "page_end": page_num + 1,
                    }

                # Append the content line and extend the section's page range.
                structured_data[current_chapter][current_section]["lines"].append(line)
                structured_data[current_chapter][current_section]["page_end"] = page_num + 1

        # Join the collected lines into section content strings.
        result: Dict[str, Any] = {"chapters": {}, "_body_lines": body_lines}
        for chap, sections in structured_data.items():
            result["chapters"][chap] = {}
            for sec, data in sections.items():
                result["chapters"][chap][sec] = {
                    "content": "\n".join(data["lines"]),
                    "page_start": data["page_start"],
                    "page_end": data["page_end"],
                }

        logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
        return result
+
+    def _normalize_catalog(self, catalog: Dict[str, Any]) -> Dict[str, Any]:
+        """统一目录来源并择优合并。
+
+        目录检测器输出的 chapters 更像“骨架”,raw_ocr_text 更接近页面原文。
+        这里会分别解析两份结果,判断谁更可信,再做一次合并补齐。
+        """
+        if not catalog:
+            return {}
+
+        normalized = dict(catalog)
+        existing_chapters = self._sanitize_catalog_chapters(catalog.get("chapters", []))
+        raw_text = catalog.get("raw_ocr_text", "")
+        parsed_chapters = self._parse_catalog_from_raw_text(raw_text) if isinstance(raw_text, str) else []
+        selected_chapters = existing_chapters
+
+        if parsed_chapters:
+            if self._should_prefer_parsed_catalog(parsed_chapters, existing_chapters):
+                selected_chapters = parsed_chapters
+            elif existing_chapters:
+                logger.info(
+                    "[PDF提取] raw_ocr_text目录解析结果异常,保留原始目录骨架: "
+                    f"parsed={len(parsed_chapters)}, original={len(existing_chapters)}"
+                )
+            else:
+                selected_chapters = parsed_chapters
+
+        if selected_chapters:
+            selected_chapters = self._merge_catalog_chapters(
+                selected_chapters,
+                parsed_chapters,
+            )
+            normalized["chapters"] = selected_chapters
+            normalized["total_chapters"] = len(selected_chapters)
+            normalized["formatted_text"] = self._format_catalog_chapters(selected_chapters)
+        return normalized
+
    def _parse_catalog_from_raw_text(self, text: str) -> List[Dict[str, Any]]:
        """Parse the raw OCR text of TOC pages into a chapter tree.

        The first level-1 hits pin the document's heading style
        (``document_l1_rules``); level-2 headings then converge onto a single
        rule (``active_l2_rule``) so that different numbering systems on the
        same page do not contaminate each other.
        """
        if not text or not text.strip():
            return []

        chapters: List[Dict[str, Any]] = []
        current_chapter: Optional[Dict[str, Any]] = None
        active_l2_rule: Optional[str] = None  # confirmed level-2 rule, reset per chapter
        document_l1_rules: Optional[List[str]] = None  # pinned level-1 style candidates

        for raw_line in self._prepare_catalog_raw_lines(text):
            title_text, page = self._split_catalog_entry(raw_line)
            if not title_text:
                continue

            # Skip the TOC header itself ("目录").
            compact = re.sub(r"\s+", "", title_text)
            if compact in {"目录", "目錄"}:
                continue

            # Chapter (level-1) heading?
            chapter_matches = self._matching_rule_names(title_text, "l1", document_l1_rules)
            if chapter_matches:
                if document_l1_rules is None:
                    # First chapter hit pins the document's heading style.
                    document_l1_rules = chapter_matches
                current_chapter = {
                    "index": len(chapters) + 1,
                    "title": self._clean_chapter_title(title_text),
                    "page": str(page or 1),
                    "original": raw_line.strip(),
                    "subsections": [],
                }
                chapters.append(current_chapter)
                # Each new chapter resets the confirmed level-2 rule.
                active_l2_rule = None
                continue

            # Sections appearing before any chapter have no parent: skip.
            if current_chapter is None:
                continue

            section_matches = self._matching_rule_names(title_text, "l2")
            if not section_matches:
                # Not a regular section heading: try to coerce a bare numeric
                # entry into a section of the current chapter.
                numeric_section_title = self._coerce_numeric_catalog_section(
                    title_text,
                    document_l1_rules,
                    active_l2_rule,
                )
                if numeric_section_title:
                    # Deduplicate by normalized heading key.
                    section_key = self._normalize_heading_key(numeric_section_title)
                    existing_keys = {
                        self._normalize_heading_key(sub.get("title", ""))
                        for sub in current_chapter.get("subsections", [])
                    }
                    if section_key not in existing_keys:
                        current_chapter["subsections"].append({
                            "title": numeric_section_title,
                            "page": str(page or current_chapter.get("page", 1)),
                            "level": 2,
                            "original": raw_line.strip(),
                        })
                continue

            # Converge onto one level-2 rule; ignore lines matching a
            # different numbering scheme.
            if active_l2_rule is None:
                active_l2_rule = section_matches[0]
            if active_l2_rule not in section_matches:
                continue

            section_title = self._clean_section_title(title_text)
            section_key = self._normalize_heading_key(section_title)
            existing_keys = {
                self._normalize_heading_key(sub.get("title", ""))
                for sub in current_chapter.get("subsections", [])
            }
            # Deduplicate sections within the chapter by normalized key.
            if section_key in existing_keys:
                continue

            current_chapter["subsections"].append({
                "title": section_title,
                "page": str(page or current_chapter.get("page", 1)),
                "level": 2,
                "original": raw_line.strip(),
            })

        return chapters
+
+    @classmethod
+    def _sanitize_catalog_chapters(cls, chapters: Any) -> List[Dict[str, Any]]:
+        if not isinstance(chapters, list):
+            return []
+
+        sanitized: List[Dict[str, Any]] = []
+        seen_chapter_keys: Set[str] = set()
+
+        for idx, chapter in enumerate(chapters, 1):
+            if not isinstance(chapter, dict):
+                continue
+
+            chapter_title = cls._clean_chapter_title(str(chapter.get("title", "") or ""))
+            chapter_key = cls._normalize_heading_key(chapter_title)
+            if not chapter_key or chapter_key in seen_chapter_keys:
+                continue
+
+            seen_chapter_keys.add(chapter_key)
+            chapter_page = str(chapter.get("page") or idx)
+            subsections: List[Dict[str, Any]] = []
+            seen_section_keys: Set[str] = set()
+
+            for subsection in chapter.get("subsections", []) or []:
+                if not isinstance(subsection, dict):
+                    continue
+
+                section_title = cls._clean_section_title(str(subsection.get("title", "") or ""))
+                section_key = cls._normalize_heading_key(section_title)
+                if not section_key or section_key in seen_section_keys:
+                    continue
+
+                seen_section_keys.add(section_key)
+                subsections.append({
+                    "title": section_title,
+                    "page": str(subsection.get("page") or chapter_page),
+                    "level": 2,
+                    "original": subsection.get("original", "") or section_title,
+                })
+
+            sanitized.append({
+                "index": len(sanitized) + 1,
+                "title": chapter_title,
+                "page": chapter_page,
+                "original": chapter.get("original", "") or chapter_title,
+                "subsections": subsections,
+            })
+
+        return sanitized
+
    @classmethod
    def _prepare_catalog_raw_lines(cls, text: str) -> List[str]:
        """Pre-process raw OCR catalog text into logical lines.

        Two repairs are applied while scanning with a one-line lookahead:
        - a "目"/"录" header split across two lines is rejoined into "目录";
        - a heading fragment cut off mid-line is merged with the next line
          when the merged candidate matches a heading rule or carries a
          trailing page number.
        """
        raw_lines = [line.strip() for line in text.splitlines() if line.strip()]
        prepared: List[str] = []
        index = 0

        while index < len(raw_lines):
            current = raw_lines[index].strip()
            compact_current = re.sub(r"\s+", "", current)

            # Rejoin a TOC header split across two lines.
            if compact_current in {"目", "錄", "录"} and index + 1 < len(raw_lines):
                next_compact = re.sub(r"\s+", "", raw_lines[index + 1].strip())
                if compact_current + next_compact in {"目录", "目錄"}:
                    prepared.append(compact_current + next_compact)
                    index += 2  # consumed both lines
                    continue

            # Merge a truncated heading with the following line when the
            # combination looks like a complete TOC entry.
            if cls._is_incomplete_heading_fragment(current) and index + 1 < len(raw_lines):
                next_line = raw_lines[index + 1].strip()
                candidate = f"{current} {next_line}".strip()
                _, candidate_page = cls._split_catalog_entry(candidate)
                if (
                    cls._matching_rule_names(candidate, "l1")
                    or cls._matching_rule_names(candidate, "l2")
                    or candidate_page is not None
                ):
                    prepared.append(candidate)
                    index += 2  # consumed both lines
                    continue

            prepared.append(current)
            index += 1

        return prepared
+
+    @classmethod
+    def _should_prefer_parsed_catalog(
+        cls,
+        parsed_chapters: List[Dict[str, Any]],
+        existing_chapters: List[Dict[str, Any]],
+    ) -> bool:
+        if not parsed_chapters:
+            return False
+
+        parsed_is_suspicious = cls._catalog_has_suspicious_structure(parsed_chapters)
+        existing_is_suspicious = cls._catalog_has_suspicious_structure(existing_chapters)
+
+        if parsed_is_suspicious:
+            if not existing_chapters or not existing_is_suspicious:
+                return False
+
+            parsed_score = cls._catalog_structure_score(parsed_chapters)
+            existing_score = cls._catalog_structure_score(existing_chapters)
+            overlap_ratio = cls._catalog_chapter_overlap_ratio(parsed_chapters, existing_chapters)
+            return overlap_ratio >= 0.6 and parsed_score > existing_score
+
+        if not existing_chapters:
+            return True
+
+        if existing_is_suspicious:
+            return True
+
+        if cls._should_prefer_single_level_parsed_catalog(parsed_chapters, existing_chapters):
+            return True
+
+        parsed_score = cls._catalog_structure_score(parsed_chapters)
+        existing_score = cls._catalog_structure_score(existing_chapters)
+        if parsed_score <= existing_score:
+            return False
+
+        if not cls._catalog_has_suspicious_structure(existing_chapters):
+            existing_count = len(existing_chapters)
+            parsed_count = len(parsed_chapters)
+            if parsed_count > max(existing_count * 2, existing_count + 8):
+                return False
+            if existing_count >= 4 and parsed_count < max(2, existing_count // 2):
+                return False
+
+        return True
+
+    @classmethod
+    def _should_prefer_single_level_parsed_catalog(
+        cls,
+        parsed_chapters: List[Dict[str, Any]],
+        existing_chapters: List[Dict[str, Any]],
+    ) -> bool:
+        """特判“单层目录被误识别成一章多节”的场景。"""
+        if len(parsed_chapters) < 2 or len(existing_chapters) != 1:
+            return False
+
+        if any(chapter.get("subsections") for chapter in parsed_chapters):
+            return False
+
+        existing_subsections = existing_chapters[0].get("subsections", []) or []
+        if len(existing_subsections) < len(parsed_chapters) - 1:
+            return False
+
+        parsed_pages = [
+            cls._safe_page_number(chapter.get("page"), 1)
+            for chapter in parsed_chapters
+        ]
+        return parsed_pages == sorted(parsed_pages)
+
    @classmethod
    def _catalog_has_suspicious_structure(cls, chapters: List[Dict[str, Any]]) -> bool:
        """Heuristically detect a malformed chapter list.

        A list is "suspicious" when chapter titles mix numbering systems
        ("第N章" style vs bare numerics), embed numeric sub-headings inside
        chapter bodies, repeat a chapter number, or run numeric chapter
        numbers backwards — all signs that TOC parsing merged levels.
        """
        if not chapters:
            return False

        titles = [(chapter.get("title", "") or "").strip() for chapter in chapters]
        # Count "第N章/节/部分/篇"-style titles.
        chinese_chapter_count = sum(
            1 for title in titles
            if re.match(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]", title)
        )
        # Count bare-numeric headings ("12 标题"), typically level-2 strays.
        numeric_heading_count = sum(
            1 for title in titles
            if re.match(r"^\d{1,2}(?:[\..。、])?\s+\S+", title)
        )
        embedded_numeric_body_count = 0  # "第N章 1.2 ..." — section glued onto chapter
        repeated_chapter_no_count = 0    # same chapter number seen more than once
        reversed_chapter_no_count = 0    # numeric chapter numbers decreasing
        seen_chapter_numbers: Set[str] = set()
        previous_numeric_chapter_no: Optional[int] = None

        for title in titles:
            chapter_match = re.match(
                r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
                title,
            )
            if not chapter_match:
                continue

            chapter_no = re.sub(r"\s+", "", chapter_match.group(1))
            chapter_body = (chapter_match.group(2) or "").strip()
            if chapter_no in seen_chapter_numbers:
                repeated_chapter_no_count += 1
            seen_chapter_numbers.add(chapter_no)

            # Track ordering only for arabic-numbered chapters.
            if chapter_no.isdigit():
                current_numeric_no = int(chapter_no)
                if previous_numeric_chapter_no is not None and current_numeric_no < previous_numeric_chapter_no:
                    reversed_chapter_no_count += 1
                previous_numeric_chapter_no = current_numeric_no

            # A chapter body starting with "1.2"-style numbering suggests a
            # section heading was glued onto the chapter title.
            if re.match(r"^\d{1,2}(?:\.\d{1,2})*\.?(?:\s+|$)", chapter_body):
                embedded_numeric_body_count += 1

        # Thresholds below are empirical; tuned against observed TOC parses.
        if chinese_chapter_count >= 2 and numeric_heading_count >= max(3, chinese_chapter_count // 2):
            return True

        if chinese_chapter_count >= max(2, len(titles) // 3) and numeric_heading_count >= max(2, len(titles) // 6):
            return True

        if embedded_numeric_body_count >= max(2, len(titles) // 5):
            return True

        if repeated_chapter_no_count > 0 or reversed_chapter_no_count > 0:
            return True

        return False
+
+    @staticmethod
+    def _catalog_structure_score(chapters: List[Dict[str, Any]]) -> int:
+        score = 0
+        for chapter in chapters:
+            score += 1
+            score += len(chapter.get("subsections", []) or [])
+        return score
+
+    @classmethod
+    def _catalog_chapter_overlap_ratio(
+        cls,
+        chapters_a: List[Dict[str, Any]],
+        chapters_b: List[Dict[str, Any]],
+    ) -> float:
+        if not chapters_a or not chapters_b:
+            return 0.0
+
+        keys_a = {
+            cls._catalog_chapter_identity_key(chapter.get("title", ""))
+            for chapter in chapters_a
+            if chapter.get("title")
+        }
+        keys_b = {
+            cls._catalog_chapter_identity_key(chapter.get("title", ""))
+            for chapter in chapters_b
+            if chapter.get("title")
+        }
+        if not keys_a or not keys_b:
+            return 0.0
+
+        return len(keys_a & keys_b) / max(1, min(len(keys_a), len(keys_b)))
+
+    @classmethod
+    def _catalog_chapter_identity_key(cls, title: str) -> str:
+        cleaned = cls._clean_chapter_title(title)
+        if not cleaned:
+            return ""
+
+        chapter_match = re.match(
+            r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
+            cleaned,
+        )
+        if chapter_match:
+            chapter_body = cls._normalize_heading_key(chapter_match.group(1))
+            if chapter_body:
+                return chapter_body
+
+        numeric_match = re.match(r"^\d{1,2}(?:[\..。、])?\s*(.*)$", cleaned)
+        if numeric_match:
+            numeric_body = cls._normalize_heading_key(numeric_match.group(1))
+            if numeric_body:
+                return numeric_body
+
+        return cls._normalize_heading_key(cleaned)
+
+    @classmethod
+    def _merge_catalog_chapters(
+        cls,
+        base_chapters: List[Dict[str, Any]],
+        supplemental_chapters: List[Dict[str, Any]],
+    ) -> List[Dict[str, Any]]:
+        if not base_chapters:
+            return supplemental_chapters or []
+        if not supplemental_chapters:
+            return base_chapters
+
+        merged: List[Dict[str, Any]] = []
+        supplemental_by_key = {
+            cls._catalog_chapter_identity_key(chapter.get("title", "")): chapter
+            for chapter in supplemental_chapters
+            if chapter.get("title")
+        }
+
+        for index, chapter in enumerate(base_chapters, 1):
+            chapter_copy = {
+                **chapter,
+                "subsections": [dict(sub) for sub in chapter.get("subsections", []) or []],
+            }
+            chapter_key = cls._catalog_chapter_identity_key(chapter_copy.get("title", ""))
+            supplemental = supplemental_by_key.get(chapter_key)
+            if supplemental:
+                merged_subsections = cls._merge_catalog_subsections(
+                    chapter_copy.get("subsections", []),
+                    supplemental.get("subsections", []) or [],
+                )
+                chapter_copy["subsections"] = merged_subsections
+            chapter_copy["index"] = index
+            merged.append(chapter_copy)
+
+        return merged
+
+    @classmethod
+    def _merge_catalog_subsections(
+        cls,
+        base_subsections: List[Dict[str, Any]],
+        supplemental_subsections: List[Dict[str, Any]],
+    ) -> List[Dict[str, Any]]:
+        if not base_subsections:
+            return [dict(sub) for sub in supplemental_subsections]
+        if not supplemental_subsections:
+            return [dict(sub) for sub in base_subsections]
+
+        def _subsection_score(items: List[Dict[str, Any]]) -> int:
+            score = 0
+            for item in items:
+                title = (item.get("title", "") or "").strip()
+                if not title:
+                    continue
+                score += 1
+                if re.match(r"^\d+\.\d+(?!\.\d)\.?\s*", title):
+                    score += 3
+                elif re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)", title):
+                    score += 3
+                elif re.match(r"^([一二三四五六七八九十百零两]+[、)\)\]])", title):
+                    score += 3
+                elif re.match(r"^[【\[]\s*\d+\s*[\]】]", title):
+                    score += 3
+                elif re.match(r"^\d{1,2}[\..。、]\s*", title):
+                    score += 1
+            return score
+
+        base_score = _subsection_score(base_subsections)
+        supplemental_score = _subsection_score(supplemental_subsections)
+        if supplemental_score > base_score:
+            return [dict(sub) for sub in supplemental_subsections]
+
+        merged = [dict(sub) for sub in base_subsections]
+        seen_keys = {
+            cls._normalize_heading_key(sub.get("title", ""))
+            for sub in merged
+            if sub.get("title")
+        }
+        for subsection in supplemental_subsections:
+            subsection_key = cls._normalize_heading_key(subsection.get("title", ""))
+            if not subsection_key or subsection_key in seen_keys:
+                continue
+            merged.append(dict(subsection))
+            seen_keys.add(subsection_key)
+        return merged
+
+    @classmethod
+    def _coerce_numeric_catalog_section(
+        cls,
+        title_text: str,
+        document_l1_rules: Optional[List[str]],
+        active_l2_rule: Optional[str],
+    ) -> Optional[str]:
+        if active_l2_rule is not None:
+            return None
+
+        if not document_l1_rules:
+            return None
+
+        if "Rule_1_纯数字派" in document_l1_rules:
+            return None
+
+        if re.match(r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*", title_text.strip()):
+            return cls._clean_section_title(title_text)
+
+        return None
+
+    @staticmethod
+    def _split_catalog_entry(line: str) -> Tuple[str, Optional[int]]:
+        cleaned = line.strip()
+        if not cleaned:
+            return "", None
+
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+        page_match = re.search(
+            r"(?:[.\u2026\u00b7\u2022·• ]{2,})[-\u2013\u2014 ]*(\d+)\s*[-\u2013\u2014 ]*$",
+            cleaned,
+        )
+        if page_match:
+            title_text = cleaned[:page_match.start()].strip()
+            title_text = re.sub(r"[.\u2026\u00b7\u2022 ]+$", "", title_text).strip()
+            return title_text, int(page_match.group(1))
+
+        return cleaned, None
+
+    @staticmethod
+    def _format_catalog_chapters(chapters: List[Dict[str, Any]]) -> str:
+        lines: List[str] = []
+        for chapter in chapters:
+            title = chapter.get("title", "").strip()
+            if not title:
+                continue
+            lines.append(title)
+            for sub in chapter.get("subsections", []):
+                sub_title = sub.get("title", "").strip()
+                if sub_title:
+                    lines.append(f"  {sub_title}")
+        return "\n".join(lines)
+
    def _enrich_catalog_with_structure(
        self,
        catalog: Dict[str, Any],
        chapters: Dict[str, Dict[str, Dict[str, Any]]],
    ) -> Dict[str, Any]:
        """Merge body-structure content and page spans into the catalog.

        Chapters are matched by normalized identity key. A matched catalog
        chapter adopts the structure's title, content and page range, and its
        subsections are rebuilt from the structure's sections (reusing catalog
        metadata where titles line up). Structure-only chapters are appended
        after the catalog ones. Finally, indices are renumbered and
        ``formatted_text`` regenerated.

        Args:
            catalog: dict with a ``chapters`` list; returned unchanged when it
                is empty or not a dict.
            chapters: mapping chapter-title -> section-title -> payload
                (``content``/``page_start``/``page_end``); the special key
                "章节标题" holds the chapter's own payload.

        Returns:
            A new enriched catalog dict (shallow copy of *catalog*).
        """
        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
        if not catalog_chapters or not chapters:
            return catalog

        enriched = dict(catalog)
        structure_items = list(chapters.items())
        # Index the body structure by identity key for O(1) chapter matching.
        structure_by_key = {
            self._catalog_chapter_identity_key(chapter_title): (chapter_title, sections)
            for chapter_title, sections in structure_items
        }
        used_structure_keys: Set[str] = set()

        enriched_chapters: List[Dict[str, Any]] = []
        for catalog_chapter in catalog_chapters:
            chapter_copy = dict(catalog_chapter)
            chapter_key = self._catalog_chapter_identity_key(chapter_copy.get("title", ""))
            structure_match = structure_by_key.get(chapter_key)
            if structure_match is None:
                # No body structure for this chapter: keep the catalog entry as-is.
                enriched_chapters.append(chapter_copy)
                continue

            structure_title, structure_sections = structure_match
            used_structure_keys.add(chapter_key)
            title_payload = structure_sections.get("章节标题", {})
            # Prefer the structure's title/content/page range over the catalog's.
            chapter_copy["title"] = structure_title
            chapter_copy["content"] = title_payload.get("content", "")
            chapter_copy["page_start"] = title_payload.get("page_start", self._safe_page_number(chapter_copy.get("page")))
            chapter_copy["page_end"] = title_payload.get("page_end", chapter_copy["page_start"])

            structure_subsections = [
                (section_title, payload)
                for section_title, payload in structure_sections.items()
                if section_title != "章节标题"
            ]
            catalog_subsections = chapter_copy.get("subsections", []) or []
            # Look up catalog subsections by normalized title so their metadata
            # (page/level/original) can be reused when titles match.
            subsection_by_key = {
                self._normalize_heading_key(subsection.get("title", "")): subsection
                for subsection in catalog_subsections
                if subsection.get("title")
            }

            enriched_subsections: List[Dict[str, Any]] = []
            for section_title, payload in structure_subsections:
                section_key = self._normalize_heading_key(section_title)
                subsection = dict(subsection_by_key.get(section_key, {}))
                # setdefault keeps catalog metadata when present; content and
                # page span always come from the body structure.
                subsection.setdefault("title", section_title)
                subsection.setdefault("page", str(payload.get("page_start", chapter_copy["page_start"])))
                subsection.setdefault("level", 2)
                subsection.setdefault("original", section_title)
                subsection["content"] = payload.get("content", "")
                subsection["page_start"] = payload.get("page_start", chapter_copy["page_start"])
                subsection["page_end"] = payload.get("page_end", subsection["page_start"])
                enriched_subsections.append(subsection)

            chapter_copy["subsections"] = enriched_subsections
            enriched_chapters.append(chapter_copy)

        existing_catalog_keys = {
            self._catalog_chapter_identity_key(chapter.get("title", ""))
            for chapter in enriched_chapters
            if chapter.get("title")
        }
        # Append chapters that exist only in the body structure.
        for chapter_title, structure_sections in structure_items:
            chapter_key = self._catalog_chapter_identity_key(chapter_title)
            if chapter_key in existing_catalog_keys or chapter_key in used_structure_keys:
                continue

            title_payload = structure_sections.get("章节标题", {})
            new_chapter = {
                "index": len(enriched_chapters) + 1,
                "title": chapter_title,
                "page": str(title_payload.get("page_start", 1)),
                "original": chapter_title,
                "content": title_payload.get("content", ""),
                "page_start": title_payload.get("page_start", 1),
                "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
                "subsections": [],
            }
            for section_title, payload in structure_sections.items():
                if section_title == "章节标题":
                    continue
                new_chapter["subsections"].append({
                    "title": section_title,
                    "page": str(payload.get("page_start", new_chapter["page_start"])),
                    "level": 2,
                    "original": section_title,
                    "content": payload.get("content", ""),
                    "page_start": payload.get("page_start", new_chapter["page_start"]),
                    "page_end": payload.get("page_end", payload.get("page_start", new_chapter["page_start"])),
                })
            enriched_chapters.append(new_chapter)

        # Renumber so indices stay contiguous after appends.
        for index, chapter in enumerate(enriched_chapters, 1):
            chapter["index"] = index

        enriched["chapters"] = enriched_chapters
        enriched["total_chapters"] = len(enriched_chapters)
        enriched["formatted_text"] = self._format_catalog_chapters(enriched_chapters)
        return enriched
+
    def _reconcile_structure_with_catalog(
        self,
        chapters: Dict[str, Dict[str, Dict[str, Any]]],
        catalog: Dict[str, Any],
    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """Re-attach body-extraction results onto the catalog skeleton.

        Body extraction usually yields the more complete ``content`` but may
        miss hierarchy levels; the catalog has the more reliable hierarchy but
        empty or partial content. Titles are normalized and matched in order,
        mapping body content back onto the catalog structure.

        Returns the original *chapters* untouched when the catalog is unusable
        or when nothing at all could be matched (sanity guard against a bad
        catalog wiping out good content).
        """
        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
        if not chapters or not catalog_chapters:
            return chapters

        section_title_key = "章节标题"
        # Split the body structure into two indexes — chapter-title payloads and
        # a flat ordered list of section payloads — so catalog items can be
        # matched one by one in catalog order.
        chapter_title_payloads: Dict[str, List[Dict[str, Any]]] = {}
        flat_sections: List[Tuple[str, Dict[str, Any]]] = []
        matched_chapter_count = 0
        matched_section_count = 0
        total_catalog_sections = 0

        for chapter_title, sections in chapters.items():
            title_key = self._normalize_heading_key(chapter_title)
            title_payload = sections.get(section_title_key)
            if title_payload is not None:
                chapter_title_payloads.setdefault(title_key, []).append({
                    "content": title_payload.get("content", ""),
                    "page_start": title_payload.get("page_start", 1),
                    "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
                })

            for section_title, payload in sections.items():
                if section_title == section_title_key:
                    continue
                flat_sections.append((
                    self._normalize_heading_key(section_title),
                    {
                        "content": payload.get("content", ""),
                        "page_start": payload.get("page_start", 1),
                        "page_end": payload.get("page_end", payload.get("page_start", 1)),
                    },
                ))

        rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
        # Prefer forward, in-order matching; fall back to one global scan so a
        # single out-of-order section does not break everything after it.
        search_start = 0
        used_indices = set()

        for chapter in catalog_chapters:
            chapter_title = (chapter.get("title", "") or "").strip()
            if not chapter_title:
                continue

            chapter_page = self._safe_page_number(chapter.get("page"))
            chapter_key = self._normalize_heading_key(chapter_title)
            title_candidates = chapter_title_payloads.get(chapter_key, [])
            has_title_match = bool(title_candidates)
            # pop(0) consumes one candidate per duplicate chapter title.
            title_payload = title_candidates.pop(0) if title_candidates else self._empty_section_payload(chapter_page)
            if has_title_match:
                matched_chapter_count += 1

            rebuilt[chapter_title] = {
                section_title_key: title_payload,
            }

            for subsection in chapter.get("subsections", []):
                section_title = (subsection.get("title", "") or "").strip()
                if not section_title:
                    continue
                total_catalog_sections += 1

                target_key = self._normalize_heading_key(section_title)
                match_index = None
                # Forward scan from the last match position.
                for idx in range(search_start, len(flat_sections)):
                    if idx in used_indices:
                        continue
                    if flat_sections[idx][0] == target_key:
                        match_index = idx
                        break
                if match_index is None:
                    # Global fallback scan for out-of-order sections.
                    for idx, (section_key, _) in enumerate(flat_sections):
                        if idx in used_indices:
                            continue
                        if section_key == target_key:
                            match_index = idx
                            break

                if match_index is not None:
                    used_indices.add(match_index)
                    search_start = max(search_start, match_index + 1)
                    rebuilt[chapter_title][section_title] = flat_sections[match_index][1]
                    matched_section_count += 1
                else:
                    rebuilt[chapter_title][section_title] = self._empty_section_payload(
                        self._safe_page_number(subsection.get("page"), chapter_page)
                    )

        # If nothing matched, the catalog likely doesn't describe this body —
        # keep the original extraction rather than returning empty payloads.
        if total_catalog_sections > 0 and matched_section_count == 0:
            return chapters

        if matched_chapter_count == 0 and matched_section_count == 0:
            return chapters

        return rebuilt or chapters
+
    @staticmethod
    def _normalize_heading_key(text: str) -> str:
        """Normalize a heading into a whitespace-free comparison key.

        Strips any trailing TOC page suffix, folds fullwidth brackets and
        parentheses to ASCII, unifies period variants, and removes all
        whitespace.
        """
        normalized = PdfStructureExtractor._strip_catalog_page_suffix((text or "").strip())
        normalized = normalized.replace("【", "[").replace("】", "]")
        normalized = normalized.replace("(", "(").replace(")", ")")
        # NOTE(review): the first replace below is presumably fullwidth-period -> ".";
        # if both arguments are actually the same ASCII "." it is a no-op — verify encoding.
        normalized = normalized.replace(".", ".").replace("。", ".")
        normalized = re.sub(r"\s+", "", normalized)
        return normalized
+
+    @staticmethod
+    def _safe_page_number(value: Any, default: int = 1) -> int:
+        try:
+            return max(1, int(str(value).strip()))
+        except Exception:
+            return default
+
+    @staticmethod
+    def _empty_section_payload(page_num: int) -> Dict[str, Any]:
+        return {
+            "content": "",
+            "page_start": page_num,
+            "page_end": page_num,
+        }
+
+    @classmethod
+    def _prepare_page_lines(cls, text: str) -> List[str]:
+        """清洗页面文本行,并尝试把被换行拆开的标题重新合并。"""
+        raw_lines = [line.strip() for line in text.split("\n") if line.strip()]
+        prepared_lines: List[str] = []
+        index = 0
+
+        while index < len(raw_lines):
+            merged_line, consumed = cls._merge_heading_fragment(raw_lines, index)
+            if merged_line:
+                prepared_lines.append(merged_line)
+                index += consumed
+                continue
+
+            prepared_lines.append(raw_lines[index])
+            index += 1
+
+        return prepared_lines
+
    @classmethod
    def _merge_heading_fragment(
        cls,
        lines: List[str],
        start_index: int,
    ) -> Tuple[Optional[str], int]:
        """Try to join the 2-3 lines starting at *start_index* into one heading.

        Returns ``(merged_heading, lines_consumed)`` on success, otherwise
        ``(None, 1)`` meaning the caller should emit the current line as-is.
        """
        first_line = lines[start_index].strip()
        if not first_line:
            return None, 1

        first_is_heading = bool(cls._matching_rule_names(first_line, "l1") or cls._matching_rule_names(first_line, "l2"))
        first_is_incomplete = cls._is_incomplete_heading_fragment(first_line)
        max_span = min(3, len(lines) - start_index)

        for span in range(2, max_span + 1):
            candidate_lines = [lines[start_index + offset].strip() for offset in range(span)]
            candidate_text = " ".join(candidate_lines).strip()
            if not candidate_text or cls.TOC_PATTERN.search(candidate_text):
                continue
            if not (cls._matching_rule_names(candidate_text, "l1") or cls._matching_rule_names(candidate_text, "l2")):
                continue
            # Only swallow following lines when the first line itself looks like
            # a truncated heading, or only the merged text matches a heading
            # rule — otherwise we might eat body text.
            if first_is_incomplete or not first_is_heading:
                return candidate_text, span

        return None, 1
+
+    @staticmethod
+    def _is_incomplete_heading_fragment(line: str) -> bool:
+        clean_line = re.sub(r"\s+", "", line.strip())
+        if not clean_line:
+            return False
+
+        fragment_patterns = (
+            r"^第(?:\d+|[一二三四五六七八九十百零两]+)[章部分篇]$",
+            r"^\d{1,2}(?:[\..。、])$",
+            r"^[【\[]\d+[\]】]$",
+            r"^[一二三四五六七八九十百零两]+[、)\)\]]$",
+            r"^第[一二三四五六七八九十百零两]+节$",
+            r"^\d+\.\d+(?!\.\d)\.?$",
+        )
+        return any(re.match(pattern, clean_line) for pattern in fragment_patterns)
+
    def _rebuild_section_contents_from_catalog(
        self,
        chapters: Dict[str, Dict[str, Dict[str, Any]]],
        catalog: Dict[str, Any],
        body_lines: List[Dict[str, Any]],
    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """Re-slice section contents using catalog order and body line positions.

        When body-structure extraction misses some headings, using its result
        directly can leave content gaps. This flattens the catalog into an
        ordered heading timeline, locates each heading in the linear body text,
        and uses the text between one heading and the next located heading as
        the current section's content.

        Args:
            chapters: previously extracted structure, reused for chapter-title
                payloads and as a fallback for unmatched sections.
            catalog: dict with a ``chapters`` list defining the hierarchy.
            body_lines: ordered entries with ``text`` and ``page`` keys.

        Returns:
            The rebuilt structure, or *chapters* unchanged when the catalog is
            unusable or no section heading could be located in the body.
        """
        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
        if not catalog_chapters or not body_lines:
            return chapters

        # Expand the catalog into an ordered list first so every heading can be
        # located in the body with one uniform forward scan.
        expected_items: List[Dict[str, Any]] = []
        total_sections = 0
        for chapter in catalog_chapters:
            chapter_title = (chapter.get("title", "") or "").strip()
            if not chapter_title:
                continue
            chapter_page = self._safe_page_number(chapter.get("page"))
            expected_items.append({
                "kind": "chapter",
                "title": chapter_title,
                "chapter_title": chapter_title,
                "section_title": "章节标题",
                "page_hint": chapter_page,
                "line_index": None,
                "page": chapter_page,
            })

            for subsection in chapter.get("subsections", []):
                section_title = (subsection.get("title", "") or "").strip()
                if not section_title:
                    continue
                total_sections += 1
                expected_items.append({
                    "kind": "section",
                    "title": section_title,
                    "chapter_title": chapter_title,
                    "section_title": section_title,
                    "page_hint": self._safe_page_number(subsection.get("page"), chapter_page),
                    "line_index": None,
                    "page": self._safe_page_number(subsection.get("page"), chapter_page),
                })

        if not expected_items or total_sections == 0:
            return chapters

        # Locate each expected heading in the body, advancing monotonically.
        search_start = 0
        found_sections = 0
        for item in expected_items:
            line_index = self._find_heading_line_index(
                body_lines,
                item["title"],
                item["kind"],
                search_start,
            )
            item["line_index"] = line_index
            if line_index is not None:
                item["page"] = body_lines[line_index]["page"]
                search_start = line_index + 1
                if item["kind"] == "section":
                    found_sections += 1

        # Nothing located: the timeline is useless, keep the original structure.
        if found_sections == 0:
            return chapters

        rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
        section_title_key = "章节标题"

        # Seed the rebuilt structure with existing payloads (or empty stubs).
        for chapter in catalog_chapters:
            chapter_title = (chapter.get("title", "") or "").strip()
            if not chapter_title:
                continue

            chapter_page = self._safe_page_number(chapter.get("page"))
            existing_sections = chapters.get(chapter_title, {})
            rebuilt[chapter_title] = {
                section_title_key: existing_sections.get(section_title_key, self._empty_section_payload(chapter_page))
            }

            for subsection in chapter.get("subsections", []):
                section_title = (subsection.get("title", "") or "").strip()
                if not section_title:
                    continue
                rebuilt[chapter_title][section_title] = existing_sections.get(
                    section_title,
                    self._empty_section_payload(self._safe_page_number(subsection.get("page"), chapter_page)),
                )

        for idx, item in enumerate(expected_items):
            if item["kind"] != "section" or item["line_index"] is None:
                continue

            # The next located heading is this section's right boundary; with
            # no successor, extend to the end of the document.
            next_heading_index = len(body_lines)
            for later in expected_items[idx + 1:]:
                if later["line_index"] is not None:
                    next_heading_index = later["line_index"]
                    break

            content_entries = body_lines[item["line_index"] + 1:next_heading_index]
            content_text = "\n".join(entry["text"] for entry in content_entries).strip()
            existing_payload = rebuilt[item["chapter_title"]].get(item["section_title"], {})

            # Never overwrite non-empty existing content with an empty slice.
            if not content_text and (existing_payload.get("content") or "").strip():
                continue

            if content_entries:
                page_start = content_entries[0]["page"]
                page_end = content_entries[-1]["page"]
            else:
                page_start = item["page"]
                page_end = item["page"]

            rebuilt[item["chapter_title"]][item["section_title"]] = {
                "content": content_text,
                "page_start": page_start,
                "page_end": page_end,
            }

        return rebuilt or chapters
+
    def _find_heading_line_index(
        self,
        body_lines: List[Dict[str, Any]],
        target_title: str,
        heading_kind: str,
        start_index: int,
    ) -> Optional[int]:
        """Locate the target heading line in the linear body text.

        First tries an exact match after normalization; if OCR/PDF extraction
        prepended noise to the heading, falls back to a looser match where the
        candidate line's suffix equals the target heading.

        Args:
            body_lines: ordered entries with a ``text`` key.
            target_title: heading to find.
            heading_kind: "chapter" or "section" — selects the title cleaner.
            start_index: line index to start scanning from.

        Returns:
            The matching line index, or None when not found.
        """
        target_key = self._normalize_heading_key(target_title)
        if not target_key:
            return None

        for index in range(start_index, len(body_lines)):
            candidate_text = (body_lines[index].get("text") or "").strip()
            if not candidate_text or self.TOC_PATTERN.search(candidate_text):
                continue

            if heading_kind == "chapter":
                candidate_key = self._normalize_heading_key(self._clean_chapter_title(candidate_text))
            else:
                candidate_key = self._normalize_heading_key(self._clean_section_title(candidate_text))

            if candidate_key == target_key:
                return index

            raw_candidate_key = self._normalize_heading_key(candidate_text)
            # Some PDFs glue page numbers, ordinals or stray characters onto
            # the front of a heading; allow a limited noise-only prefix.
            if raw_candidate_key.endswith(target_key):
                prefix = raw_candidate_key[:-len(target_key)]
                if not prefix or re.fullmatch(
                    r"[\dA-Za-z\.\-_/|,:;()\[\]\u3001\u3002\uff0c\uff1a\uff1b\uff08\uff09\u3010\u3011]+",
                    prefix,
                ):
                    return index

        return None
+
    def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
        """Run table-region OCR synchronously but in parallel via ThreadPoolExecutor.

        Args:
            regions: detected table regions to OCR.
            progress_callback: optional ``(stage, percent, message)`` hook,
                invoked every 5 completions and on the last one.

        Returns:
            One OcrResult per region (ordered by completion, not input order);
            failures are recorded with ``success=False`` and empty text rather
            than raised.
        """
        results: List[OcrResult] = []
        total = len(regions)
        completed = 0

        with ThreadPoolExecutor(max_workers=self.OCR_CONCURRENT_WORKERS) as executor:
            # Submit all OCR tasks up front.
            future_to_region = {
                executor.submit(self._ocr_table_region, r.page, r.bbox): r
                for r in regions
            }

            # Collect results as they complete.
            for future in as_completed(future_to_region):
                region = future_to_region[future]
                completed += 1
                try:
                    text = future.result()
                    results.append(OcrResult(
                        page_num=region.page_num,
                        bbox=region.bbox,
                        score=region.score,
                        text=text,
                        success=True,
                    ))
                except Exception as e:
                    logger.error(f"  第 {region.page_num} 页表格 OCR 失败: {e}")
                    results.append(OcrResult(
                        page_num=region.page_num,
                        bbox=region.bbox,
                        score=region.score,
                        text="",
                        success=False,
                    ))

                # Push progress every 5 completions or on the final one.
                if progress_callback and (completed % 5 == 0 or completed == total):
                    progress = 35 + int(completed / total * 15)  # OCR covers 15% of overall progress (35-50)
                    progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")

        return results
+
    def _detect_table_regions(
        self,
        page: fitz.Page,
        page_num: int,
        clip_box: fitz.Rect
    ) -> List[Tuple[Tuple[float, float, float, float], float]]:
        """Detect table regions on a page; return (bbox, score) pairs in PDF coords.

        Returns an empty list when RapidLayout is unavailable, the engine
        cannot be created, or layout analysis fails (failure is logged, not
        raised).
        """
        table_regions: List[Tuple[Tuple[float, float, float, float], float]] = []

        if not RAPID_LAYOUT_AVAILABLE:
            return table_regions

        layout_engine = self._get_layout_engine()
        if layout_engine is None:
            return table_regions

        # Render the clipped page region to an image.
        # NOTE(review): the reshape assumes a 3-channel (no alpha) pixmap —
        # confirm get_pixmap is never configured with alpha here.
        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=clip_box)
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)

        try:
            layout_output = layout_engine(img)

            # Parse the layout-analysis output.
            if hasattr(layout_output, 'boxes') and hasattr(layout_output, 'class_names'):
                # Scale factors from image pixels back to PDF units.
                scale_x = clip_box.width / img.shape[1]
                scale_y = clip_box.height / img.shape[0]

                for box, label, score in zip(layout_output.boxes, layout_output.class_names, layout_output.scores):
                    if label == "table" and score > self.OCR_CONFIDENCE_THRESHOLD:
                        # Convert image-space box to PDF coordinates.
                        pdf_x1 = clip_box.x0 + box[0] * scale_x
                        pdf_y1 = clip_box.y0 + box[1] * scale_y
                        pdf_x2 = clip_box.x0 + box[2] * scale_x
                        pdf_y2 = clip_box.y0 + box[3] * scale_y

                        table_regions.append(((pdf_x1, pdf_y1, pdf_x2, pdf_y2), score))

        except Exception as e:
            logger.warning(f"  第 {page_num} 页: 版面分析失败 ({e})")

        return table_regions
+
    def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
        """OCR the given page region via the GLM-OCR endpoint, with exponential-backoff retries.

        Args:
            page: source page to render.
            bbox: PDF-coordinate rectangle to OCR.
            max_retries: total attempts before giving up.

        Returns:
            The recognized text (Markdown table expected, per the prompt).

        Raises:
            The last request/parse exception when all retries fail.
        """
        import time

        # Render the target region.
        rect = fitz.Rect(bbox)
        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=rect)
        img_bytes = pix.tobytes("jpeg")

        # Compress the image before base64-encoding it into the request.
        compressed = self._compress_image(img_bytes)
        img_base64 = base64.b64encode(compressed).decode('utf-8')

        # Build the OCR chat-completion request.
        payload = {
            "model": "GLM-OCR",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "识别图片中的表格内容,按原文排版输出。"
                                    "注意:"
                                    "1. 表格用 Markdown 表格格式"
                                    "2. 保持换行和列对齐"
                                    "3. 只输出表格内容,不要其他说明"
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
                        }
                    ]
                }
            ],
            "max_tokens": 2048,
            "temperature": 0.1
        }

        headers = {"Content-Type": "application/json"}
        if self.ocr_api_key:
            headers["Authorization"] = f"Bearer {self.ocr_api_key}"

        # Retry with exponential backoff.
        last_error = None
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    self.ocr_api_url,
                    headers=headers,
                    json=payload,
                    timeout=self.ocr_timeout
                )
                response.raise_for_status()

                result = response.json()
                return self._extract_ocr_content(result)

            except Exception as e:
                last_error = e
                if attempt < max_retries - 1:
                    # Exponential backoff: 2, 4, 8 seconds.
                    wait_time = 2 ** (attempt + 1)
                    logger.warning(f"  第 {page.number + 1} 页表格 OCR 第 {attempt + 1} 次失败: {e}, {wait_time}秒后重试...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"  第 {page.number + 1} 页表格 OCR 最终失败(已重试{max_retries}次): {e}")

        # All retries failed — re-raise the last error for the caller to handle.
        raise last_error
+
    def _replace_table_regions(
        self,
        page: fitz.Page,
        original_text: str,
        ocr_results: List[Dict],
        clip_box: fitz.Rect
    ) -> str:
        """Replace table regions inside the page text with their OCR output.

        Text blocks from ``page`` whose area overlaps an OCR'd table bbox by
        more than 50% are swapped for the region's OCR text; all other blocks
        are kept in reading order. Returns ``original_text`` unchanged when
        there is nothing to replace or the result would be empty.
        """
        if not ocr_results:
            return original_text

        # Collect text blocks that fall vertically inside the clip region.
        # NOTE(review): only the y-range is checked against clip_box — this
        # assumes the clip spans the full page width; confirm for cropped pages.
        text_blocks = []
        for block in page.get_text("blocks"):
            x0, y0, x1, y1, text, _, _ = block
            if y0 >= clip_box.y0 and y1 <= clip_box.y1:
                text_blocks.append({
                    "bbox": (x0, y0, x1, y1),
                    "text": text.strip(),
                })

        # Reading order: top-to-bottom, then left-to-right.
        text_blocks.sort(key=lambda b: (b["bbox"][1], b["bbox"][0]))

        if not text_blocks:
            return original_text

        region_entries: List[Dict[str, Any]] = []
        replaced_indices: Set[int] = set()

        # Map each OCR table region to the span of text-block indices it covers.
        for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
            rx0, ry0, rx1, ry1 = ocr_result["bbox"]
            current_indices: List[int] = []

            for idx, block in enumerate(text_blocks):
                if idx in replaced_indices:
                    continue
                # Never swallow a line that looks like a chapter/section heading.
                if self._block_contains_heading(block["text"]):
                    continue

                bx0, by0, bx1, by1 = block["bbox"]
                overlap_x = max(0, min(bx1, rx1) - max(bx0, rx0))
                overlap_y = max(0, min(by1, ry1) - max(by0, ry0))
                overlap_area = overlap_x * overlap_y
                block_area = max((bx1 - bx0) * (by1 - by0), 1)

                # A block belongs to the table if >50% of its area is covered.
                if overlap_area / block_area > 0.5:
                    current_indices.append(idx)

            if not current_indices:
                continue

            replaced_indices.update(current_indices)
            region_entries.append({
                "start": min(current_indices),
                "end": max(current_indices),
                "ocr_text": (ocr_result.get("ocr_text") or "").strip(),
            })

        if not region_entries:
            return original_text

        # Re-assemble the page text, substituting OCR text at each region start.
        region_by_start = {entry["start"]: entry for entry in region_entries}
        result_parts: List[str] = []
        idx = 0

        while idx < len(text_blocks):
            region = region_by_start.get(idx)
            if region is not None:
                if region["ocr_text"]:
                    result_parts.append(region["ocr_text"])
                    result_parts.append("\n")
                else:
                    # OCR produced nothing: fall back to the original block texts.
                    for block_idx in range(region["start"], region["end"] + 1):
                        block_text = text_blocks[block_idx]["text"]
                        if block_text:
                            result_parts.append(block_text)
                            result_parts.append("\n")
                idx = region["end"] + 1
                continue

            if idx not in replaced_indices:
                block_text = text_blocks[idx]["text"]
                if block_text:
                    result_parts.append(block_text)
                    result_parts.append("\n")
            idx += 1

        # Guard against returning an empty string.
        return "".join(result_parts).strip() or original_text
+
+    @classmethod
+    def _block_contains_heading(cls, text: str) -> bool:
+        if not text or not text.strip():
+            return False
+
+        for line in cls._prepare_page_lines(text):
+            stripped = line.strip()
+            if not stripped:
+                continue
+            if cls._matching_rule_names(stripped, "l1") or cls._matching_rule_names(stripped, "l2"):
+                return True
+        return False
+
+    def _compress_image(self, img_bytes: bytes) -> bytes:
+        """压缩图片"""
+        try:
+            from PIL import Image
+            img = Image.open(io.BytesIO(img_bytes))
+
+            if img.mode in ('RGBA', 'LA', 'P'):
+                background = Image.new('RGB', img.size, (255, 255, 255))
+                if img.mode == 'P':
+                    img = img.convert('RGBA')
+                if img.mode in ('RGBA', 'LA'):
+                    background.paste(img, mask=img.split()[-1])
+                img = background
+            elif img.mode != 'RGB':
+                img = img.convert('RGB')
+
+            min_edge = min(img.size)
+            if min_edge > self.MAX_SHORT_EDGE:
+                ratio = self.MAX_SHORT_EDGE / min_edge
+                new_size = (int(img.width * ratio), int(img.height * ratio))
+                img = img.resize(new_size, Image.Resampling.LANCZOS)
+
+            buffer = io.BytesIO()
+            img.save(buffer, format='JPEG', quality=self.JPEG_QUALITY, optimize=True)
+            return buffer.getvalue()
+
+        except Exception as e:
+            logger.warning(f"图片压缩失败,使用原图: {e}")
+            return img_bytes
+
+    def _extract_ocr_content(self, result: Dict) -> str:
+        """从 OCR 响应提取内容,并将 HTML 表格转换为 Markdown"""
+        content = ""
+        if "choices" in result and isinstance(result["choices"], list):
+            if len(result["choices"]) > 0:
+                message = result["choices"][0].get("message", {})
+                content = message.get("content", "")
+
+        # 如果内容包含 HTML 标签,转换为 Markdown
+        if content and "<" in content and ">" in content:
+            try:
+                from ..doc_worker.pdf_worker.html_to_markdown import convert_html_to_markdown
+                content = convert_html_to_markdown(content)
+            except Exception as e:
+                logger.debug(f"HTML 转 Markdown 失败,保留原始内容: {e}")
+
+        return content
+
+    @staticmethod
+    def _is_header_footer(line: str) -> bool:
+        compact_line = re.sub(r"\s+", "", line.strip())
+        if not compact_line:
+            return False
+
+        heading_prefix = re.match(
+            r"^(第[\d一二三四五六七八九十百零两]+[章节部分篇]|[\d]+\.\d+|[\d]+[\..。、]?|[一二三四五六七八九十百零两]+[、)\)\]]|第[一二三四五六七八九十百零两]+节|【\d+】)",
+            compact_line,
+        )
+
+        if compact_line.isdigit():
+            return True
+
+        if (
+            compact_line.endswith("有限责任公司")
+            or compact_line.endswith("有限公司")
+            or compact_line.endswith("股份有限公司")
+        ) and not heading_prefix:
+            return True
+
+        if compact_line.endswith("专项施工方案") and not heading_prefix:
+            return True
+
+        return (
+            "四川路桥建设集团股份有限公司" in line
+            or "T梁运输及安装专项施工方案" in line
+            or (
+                compact_line.endswith("工程项目")
+                and len(compact_line) >= 8
+                and not compact_line.startswith("第")
+            )
+        )
+
+    @classmethod
+    def _matching_rule_names(
+        cls,
+        line: str,
+        level: str,
+        rule_names: Optional[List[str]] = None,
+    ) -> List[str]:
+        clean_line = line.strip()
+        if level == "l1":
+            clean_line = cls._strip_leading_page_number_from_cn_chapter(clean_line)
+        names = rule_names or list(cls.RULE_LIB.keys())
+        return [
+            rule_name
+            for rule_name in names
+            if cls.RULE_LIB[rule_name][level].match(clean_line)
+        ]
+
+    @classmethod
+    def _matches_chapter_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
+        return bool(cls._matching_rule_names(line, "l1", rule_names))
+
+    @classmethod
+    def _matches_section_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
+        return bool(cls._matching_rule_names(line, "l2", rule_names))
+
+    @staticmethod
+    def _strip_leading_page_number_from_cn_chapter(line: str) -> str:
+        cleaned = re.sub(r"\s+", " ", line.strip())
+        if not cleaned:
+            return ""
+
+        return re.sub(
+            r"^\d{1,3}\s+(?=第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部分篇])",
+            "",
+            cleaned,
+            count=1,
+        ).strip()
+
+    @staticmethod
+    def _clean_chapter_title(line: str) -> str:
+        cleaned = PdfStructureExtractor._strip_leading_page_number_from_cn_chapter(line)
+        cleaned = PdfStructureExtractor._strip_catalog_page_suffix(cleaned)
+        cleaned = re.sub(r"\s+\d+\s*$", "", cleaned)
+        cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+
+        cn_chapter_match = re.match(
+            r"^(第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部分篇])[\s、::\.-]*(.*)$",
+            cleaned,
+        )
+        if cn_chapter_match:
+            prefix = re.sub(r"\s+", "", cn_chapter_match.group(1))
+            title = cn_chapter_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        num_chapter_match = re.match(r"^(\d{1,2})(?:[\..。、])?\s*(.*)$", cleaned)
+        if num_chapter_match:
+            prefix = num_chapter_match.group(1)
+            title = num_chapter_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        return cleaned
+
+    @staticmethod
+    def _clean_section_title(line: str) -> str:
+        cleaned = line.strip()
+        cleaned = PdfStructureExtractor._strip_catalog_page_suffix(cleaned)
+        cleaned = re.sub(r"\s+\d+\s*$", "", cleaned)
+        cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+
+        numeric_section_match = re.match(r"^(\d+\.\d+)(?!\.\d)\.?\s*(.*)$", cleaned)
+        if numeric_section_match:
+            prefix = numeric_section_match.group(1)
+            title = numeric_section_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        numeric_list_match = re.match(r"^(\d{1,2})(?:[、\.\uFF0E\u3002\)\]\uFF09])\s*(.*)$", cleaned)
+        if numeric_list_match:
+            prefix = numeric_list_match.group(1)
+            title = numeric_list_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        cn_section_match = re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)[\s、::\.-]*(.*)$", cleaned)
+        if cn_section_match:
+            prefix = re.sub(r"\s+", "", cn_section_match.group(1))
+            title = cn_section_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        cn_list_match = re.match(r"^([一二三四五六七八九十百零两]+[、)\)\]])[\s]*(.*)$", cleaned)
+        if cn_list_match:
+            prefix = cn_list_match.group(1).strip()
+            title = cn_list_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        bracket_match = re.match(r"^([【\[]\s*\d+\s*[\]】])[\s]*(.*)$", cleaned)
+        if bracket_match:
+            prefix = re.sub(r"\s+", "", bracket_match.group(1))
+            title = bracket_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        return cleaned
+
+    @staticmethod
+    def _strip_catalog_page_suffix(text: str) -> str:
+        cleaned = re.sub(r"\s+", " ", (text or "").strip())
+        if not cleaned:
+            return ""
+
+        return re.sub(
+            r"(?:[.\u2026\u00b7\u2022·• ]{2,})[-\u2013\u2014 ]*\d+\s*[-\u2013\u2014 ]*$",
+            "",
+            cleaned,
+        ).strip()

+ 3 - 1
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -9,6 +9,7 @@
 """
 
 import asyncio
+import json
 import uuid
 from collections import defaultdict
 from typing import Dict, Any, Optional, Tuple, List
@@ -16,7 +17,8 @@ from typing import Dict, Any, Optional, Tuple, List
 from foundation.observability.logger.loggering import review_logger as logger
 from foundation.observability.cachefiles import cache, CacheBaseDir
 
-from .pdf_extractor import PdfStructureExtractor
+from .pdf_extractor2 import PdfStructureExtractor
+#from .pdf_extractor import PdfStructureExtractor
 from .toc_builder import build_toc_items_from_structure
 from .chunk_assembler import assemble_chunks
 from ..doc_worker.classification.hierarchy_classifier import HierarchyClassifier