Browse Source

feat(新的文档结构化规则)

tangle 4 ngày trước cách đây
mục cha
commit
a8bb401516

+ 960 - 0
core/construction_review/component/minimal_pipeline/pdf_extractor2.py

@@ -0,0 +1,960 @@
+"""
+PDF 结构提取器 - 同步并发 OCR 版本
+
+基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
+支持 OCR 增强:检测表格区域并使用 ThreadPoolExecutor 5并发 OCR,其他文本保持 PyMuPDF 提取。
+输出格式兼容后续分类与组装流程。
+"""
+
+import base64
+import io
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass
+from typing import Dict, Any, List, Optional, Tuple
+
+import fitz
+import numpy as np
+import requests
+
+from foundation.observability.logger.loggering import review_logger as logger
+
+# 尝试导入 RapidLayout
+try:
+    from rapid_layout import RapidLayout
+    RAPID_LAYOUT_AVAILABLE = True
+except ImportError:
+    RAPID_LAYOUT_AVAILABLE = False
+    RapidLayout = None
+
+
+@dataclass
+class TableRegion:
+    """A table region detected on a page and queued for OCR."""
+    # Page number of the region (callers in this file pass 1-based values).
+    page_num: int
+    # Page object the region was detected on; later used to render the region.
+    page: fitz.Page
+    # Region rectangle as (x0, y0, x1, y1).
+    bbox: Tuple[float, float, float, float]
+    # Detection score reported for the region.
+    score: float
+
+
+@dataclass
+class OcrResult:
+    """Outcome of running OCR on one table region."""
+    # Page number of the region (copied from the originating TableRegion).
+    page_num: int
+    # Region rectangle as (x0, y0, x1, y1).
+    bbox: Tuple[float, float, float, float]
+    # Detection score copied from the originating TableRegion.
+    score: float
+    # Recognized text; empty string when the OCR call failed.
+    text: str
+    # True when the OCR call returned without raising.
+    success: bool
+
+
+class PdfStructureExtractor:
+    """PDF 章节结构提取器(支持 OCR 异步并发)"""
+
+    RULE_LIB = {
+        "Rule_1_纯数字派": {
+            "l1": re.compile(r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*"),
+            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_2_混合章派": {
+            "l1": re.compile(r"^第\s*(\d+)\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_3_中英混血派": {
+            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_4_传统公文派": {
+            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[、\s]+([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_5_单边括号派": {
+            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[)\)\]][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_6_小节派": {
+            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^第\s*([一二三四五六七八九十百零两]+)\s*节[\s、]*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+        "Rule_7_粗体括号派": {
+            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
+            "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
+        },
+    }
+    TOC_PATTERN = re.compile(r"\.{3,}|…{2,}")
+
+    # OCR 配置
+    MAX_SHORT_EDGE = 1024
+    JPEG_QUALITY = 90
+    OCR_DPI = 200
+    OCR_CONFIDENCE_THRESHOLD = 0.5
+    OCR_CONCURRENT_WORKERS = 5
+
+    def __init__(
+        self,
+        clip_top: float = 60,
+        clip_bottom: float = 60,
+        use_ocr: bool = False,
+        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
+        ocr_timeout: int = 600,
+        ocr_api_key: str = "",
+        detect_toc: bool = True,
+        toc_model_path: str = "config/yolo/best.pt",
+    ):
+        self.clip_top = clip_top
+        self.clip_bottom = clip_bottom
+        self.use_ocr = use_ocr and RAPID_LAYOUT_AVAILABLE
+
+        # OCR 配置
+        self.ocr_api_url = ocr_api_url
+        self.ocr_timeout = ocr_timeout
+        self.ocr_api_key = ocr_api_key
+        self._layout_engine: Optional[Any] = None
+
+        # 目录检测配置
+        self.detect_toc = detect_toc
+        self.toc_model_path = toc_model_path
+        self._toc_extractor = None
+
+        if use_ocr and not RAPID_LAYOUT_AVAILABLE:
+            logger.warning("RapidLayout 未安装,OCR 功能不可用")
+
+    def _get_layout_engine(self) -> Optional[Any]:
+        """延迟初始化 RapidLayout"""
+        if self._layout_engine is None and RAPID_LAYOUT_AVAILABLE:
+            self._layout_engine = RapidLayout()
+        return self._layout_engine
+
+    def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
+        """
+        从 PDF 字节流提取章节结构。
+
+        Args:
+            file_content: PDF 文件字节流
+            progress_callback: 进度回调函数,接收 (stage, current, message) 参数
+
+        Returns:
+            {
+                "chapters": {
+                    "第一章 xxx": {
+                        "章节标题": {"content": "...", "page_start": 1, "page_end": 1},
+                        "一、xxx": {"content": "...", "page_start": 2, "page_end": 3},
+                    }
+                },
+                "total_pages": N,
+                "catalog": {  # 目录结构(YOLO检测+OCR提取)
+                    "chapters": [...],
+                    "total_chapters": N
+                }
+            }
+        """
+        result = {"chapters": {}, "total_pages": 0, "catalog": None}
+
+        # === 阶段0: 目录页检测与提取(如果启用)===
+        if self.detect_toc:
+            try:
+                catalog = self._extract_catalog(file_content, progress_callback)
+                if catalog:
+                    catalog = self._normalize_catalog(catalog)
+                    result["catalog"] = catalog
+                    logger.info(f"[PDF提取] 目录提取完成: {catalog.get('total_chapters', 0)} 章")
+            except Exception as e:
+                logger.warning(f"[PDF提取] 目录提取失败: {e}")
+
+        # === 阶段1-3: 文档结构提取 ===
+        doc = fitz.open(stream=file_content)
+        try:
+            structure = self._extract_from_doc(doc, progress_callback)
+            if result.get("catalog"):
+                structure["chapters"] = self._reconcile_structure_with_catalog(
+                    structure.get("chapters", {}),
+                    result["catalog"],
+                )
+            result["chapters"] = structure.get("chapters", {})
+            result["total_pages"] = len(doc)
+            return result
+        finally:
+            doc.close()
+
+    def _extract_catalog(self, file_content: bytes, progress_callback=None) -> Optional[Dict[str, Any]]:
+        """
+        提取目录结构(YOLO检测 + OCR识别)
+
+        Returns:
+            {"chapters": [...], "total_chapters": N} 或 None
+        """
+        # 延迟导入避免循环依赖(YOLO依赖必须存在,否则报错)
+        from .toc_detector import TOCCatalogExtractor
+
+        if self._toc_extractor is None:
+            self._toc_extractor = TOCCatalogExtractor(
+                model_path=self.toc_model_path,
+                ocr_api_url=self.ocr_api_url,
+                ocr_api_key=self.ocr_api_key,
+                ocr_timeout=self.ocr_timeout,
+            )
+
+        return self._toc_extractor.detect_and_extract(file_content, progress_callback)
+
+    def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
+        """Extract the chapter/section structure from an opened document.
+
+        Runs three stages: (1) when OCR is enabled, scan every page for table
+        regions; (2) OCR those regions concurrently; (3) walk the page text
+        line by line (with OCR text substituted on pages that have successful
+        OCR results) and split it into chapters and sections.
+
+        Args:
+            doc: Opened document to read pages from.
+            progress_callback: Optional callable invoked as
+                ``progress_callback(stage, current, message)``.
+
+        Returns:
+            ``{"chapters": {chapter: {section: {"content", "page_start",
+            "page_end"}}}}`` where page numbers are 1-based.
+        """
+
+        def _emit_progress(stage: str, current: int, message: str):
+            """Forward a progress update; callback errors are ignored."""
+            if progress_callback:
+                try:
+                    progress_callback(stage, current, message)
+                except Exception:
+                    # Best effort: a failing callback must not abort extraction.
+                    pass
+
+        # === Stage 1: collect every table region that needs OCR ===
+        table_regions: List[TableRegion] = []
+
+        if self.use_ocr:
+            logger.info("[OCR预处理] 扫描所有页面的表格区域...")
+            total_pages = len(doc)
+            for page_num in range(total_pages):
+                page = doc.load_page(page_num)
+                rect = page.rect
+                # Drop the header/footer bands before layout analysis.
+                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
+                regions = self._detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score in regions:
+                    table_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score
+                    ))
+                # Report progress every 5 pages and on the last page.
+                if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
+                    progress = int((page_num + 1) / total_pages * 30)  # scanning maps to 0-30
+                    _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
+            logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")
+
+        # === Stage 2: run OCR concurrently over the collected regions ===
+        ocr_results: List[OcrResult] = []
+
+        if table_regions:
+            logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
+            _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
+            ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
+            success_count = sum(1 for r in ocr_results if r.success)
+            logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
+            _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
+
+        # Group the successful OCR results by (1-based) page number.
+        ocr_by_page: Dict[int, List[OcrResult]] = {}
+        for result in ocr_results:
+            if result.success:
+                if result.page_num not in ocr_by_page:
+                    ocr_by_page[result.page_num] = []
+                ocr_by_page[result.page_num].append(result)
+
+        # === Stage 3: read page text (with OCR applied) and split into chapters ===
+        structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
+        # Bucket names used for text that appears before any heading matched.
+        current_chapter = "未分类前言"
+        current_section = "默认部分"
+        # False until the first body chapter heading is seen.
+        in_body = False
+        # Rules that matched the chapter headings seen so far.
+        candidate_rule_names: Optional[List[str]] = None
+        # Set once the first section heading has been matched.
+        active_rule_name: Optional[str] = None
+
+        logger.info("[文本提取] 提取页面内容并切分章节...")
+
+        for page_num in range(len(doc)):
+            page = doc.load_page(page_num)
+            rect = page.rect
+            clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
+
+            # Page text; table regions are swapped for OCR text when available.
+            if page_num + 1 in ocr_by_page:
+                original_text = page.get_text("text", clip=clip_box)
+                ocr_results_list = [
+                    {
+                        "region_index": i,
+                        "bbox": r.bbox,
+                        "score": r.score,
+                        "ocr_text": r.text,
+                    }
+                    for i, r in enumerate(ocr_by_page[page_num + 1])
+                ]
+                text = self._replace_table_regions(page, original_text, ocr_results_list, clip_box)
+            else:
+                text = page.get_text("text", clip=clip_box)
+
+            lines = text.split("\n")
+
+            for line in lines:
+                line = line.strip()
+                if not line:
+                    continue
+                if self._is_header_footer(line):
+                    continue
+
+                # Skip everything until the first line that matches a level-1
+                # rule and does not look like a table-of-contents entry.
+                if not in_body:
+                    matched_rules = self._matching_rule_names(line, "l1")
+                    if matched_rules and not self.TOC_PATTERN.search(line):
+                        in_body = True
+                        candidate_rule_names = matched_rules
+                    else:
+                        continue
+
+                # Skip leftover table-of-contents lines inside the body.
+                if self.TOC_PATTERN.search(line):
+                    continue
+
+                # Third argument passed to the matcher below: the locked-in
+                # rule if there is one, otherwise the current candidates
+                # (presumably a filter on rule names; the helper is not shown
+                # in this part of the file).
+                active_scope = [active_rule_name] if active_rule_name else candidate_rule_names
+
+                # Chapter heading
+                matched_chapter_rules = self._matching_rule_names(line, "l1", active_scope)
+                if matched_chapter_rules:
+                    if active_rule_name is None:
+                        candidate_rule_names = matched_chapter_rules
+                    current_chapter = self._clean_chapter_title(line)
+                    current_section = "章节标题"
+                    if current_chapter not in structured_data:
+                        structured_data[current_chapter] = {}
+                    if current_section not in structured_data[current_chapter]:
+                        structured_data[current_chapter][current_section] = {
+                            "lines": [],
+                            "page_start": page_num + 1,
+                            "page_end": page_num + 1,
+                        }
+                    continue
+
+                # Section heading
+                matched_section_rules = self._matching_rule_names(line, "l2", active_scope)
+                if matched_section_rules:
+                    if active_rule_name is None:
+                        # Prefer a rule that also matched the chapter heading;
+                        # otherwise take the first rule that matched this line.
+                        if candidate_rule_names:
+                            for rule_name in candidate_rule_names:
+                                if rule_name in matched_section_rules:
+                                    active_rule_name = rule_name
+                                    break
+                        if active_rule_name is None:
+                            active_rule_name = matched_section_rules[0]
+                    current_section = self._clean_section_title(line)
+                    if current_chapter not in structured_data:
+                        structured_data[current_chapter] = {}
+                    if current_section not in structured_data[current_chapter]:
+                        structured_data[current_chapter][current_section] = {
+                            "lines": [],
+                            "page_start": page_num + 1,
+                            "page_end": page_num + 1,
+                        }
+                    continue
+
+                # Make sure the current chapter/section buckets exist.
+                if current_chapter not in structured_data:
+                    structured_data[current_chapter] = {}
+                if current_section not in structured_data[current_chapter]:
+                    structured_data[current_chapter][current_section] = {
+                        "lines": [],
+                        "page_start": page_num + 1,
+                        "page_end": page_num + 1,
+                    }
+
+                # Body line: append it and extend the section's page range.
+                structured_data[current_chapter][current_section]["lines"].append(line)
+                structured_data[current_chapter][current_section]["page_end"] = page_num + 1
+
+        # Join each section's collected lines into a single text block.
+        result: Dict[str, Any] = {"chapters": {}}
+        for chap, sections in structured_data.items():
+            result["chapters"][chap] = {}
+            for sec, data in sections.items():
+                result["chapters"][chap][sec] = {
+                    "content": "\n".join(data["lines"]),
+                    "page_start": data["page_start"],
+                    "page_end": data["page_end"],
+                }
+
+        logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
+        return result
+
+    def _normalize_catalog(self, catalog: Dict[str, Any]) -> Dict[str, Any]:
+        if not catalog:
+            return {}
+
+        normalized = dict(catalog)
+        raw_text = catalog.get("raw_ocr_text", "")
+        parsed_chapters = self._parse_catalog_from_raw_text(raw_text) if isinstance(raw_text, str) else []
+        if parsed_chapters:
+            normalized["chapters"] = parsed_chapters
+            normalized["total_chapters"] = len(parsed_chapters)
+            normalized["formatted_text"] = self._format_catalog_chapters(parsed_chapters)
+        return normalized
+
+    def _parse_catalog_from_raw_text(self, text: str) -> List[Dict[str, Any]]:
+        if not text or not text.strip():
+            return []
+
+        chapters: List[Dict[str, Any]] = []
+        current_chapter: Optional[Dict[str, Any]] = None
+        active_l2_rule: Optional[str] = None
+
+        for raw_line in text.splitlines():
+            title_text, page = self._split_catalog_entry(raw_line)
+            if not title_text:
+                continue
+
+            compact = re.sub(r"\s+", "", title_text)
+            if compact in {"目录", "目錄"}:
+                continue
+
+            chapter_matches = self._matching_rule_names(title_text, "l1")
+            if chapter_matches:
+                current_chapter = {
+                    "index": len(chapters) + 1,
+                    "title": self._clean_chapter_title(title_text),
+                    "page": str(page or 1),
+                    "original": raw_line.strip(),
+                    "subsections": [],
+                }
+                chapters.append(current_chapter)
+                active_l2_rule = None
+                continue
+
+            if current_chapter is None:
+                continue
+
+            section_matches = self._matching_rule_names(title_text, "l2")
+            if not section_matches:
+                continue
+
+            if active_l2_rule is None:
+                active_l2_rule = section_matches[0]
+            if active_l2_rule not in section_matches:
+                continue
+
+            section_title = self._clean_section_title(title_text)
+            section_key = self._normalize_heading_key(section_title)
+            existing_keys = {
+                self._normalize_heading_key(sub.get("title", ""))
+                for sub in current_chapter.get("subsections", [])
+            }
+            if section_key in existing_keys:
+                continue
+
+            current_chapter["subsections"].append({
+                "title": section_title,
+                "page": str(page or current_chapter.get("page", 1)),
+                "level": 2,
+                "original": raw_line.strip(),
+            })
+
+        return chapters
+
+    @staticmethod
+    def _split_catalog_entry(line: str) -> Tuple[str, Optional[int]]:
+        cleaned = line.strip()
+        if not cleaned:
+            return "", None
+
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+        page_match = re.search(r"(?:[.\u2026\u00b7\u2022 ]{2,})(\d+)\s*$", cleaned)
+        if page_match:
+            title_text = cleaned[:page_match.start()].strip()
+            title_text = re.sub(r"[.\u2026\u00b7\u2022 ]+$", "", title_text).strip()
+            return title_text, int(page_match.group(1))
+
+        return cleaned, None
+
+    @staticmethod
+    def _format_catalog_chapters(chapters: List[Dict[str, Any]]) -> str:
+        lines: List[str] = []
+        for chapter in chapters:
+            title = chapter.get("title", "").strip()
+            if not title:
+                continue
+            lines.append(title)
+            for sub in chapter.get("subsections", []):
+                sub_title = sub.get("title", "").strip()
+                if sub_title:
+                    lines.append(f"  {sub_title}")
+        return "\n".join(lines)
+
+    def _reconcile_structure_with_catalog(
+        self,
+        chapters: Dict[str, Dict[str, Dict[str, Any]]],
+        catalog: Dict[str, Any],
+    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
+        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
+        if not chapters or not catalog_chapters:
+            return chapters
+
+        section_title_key = "章节标题"
+        chapter_title_payloads: Dict[str, List[Dict[str, Any]]] = {}
+        flat_sections: List[Tuple[str, Dict[str, Any]]] = []
+
+        for chapter_title, sections in chapters.items():
+            title_key = self._normalize_heading_key(chapter_title)
+            title_payload = sections.get(section_title_key)
+            if title_payload is not None:
+                chapter_title_payloads.setdefault(title_key, []).append({
+                    "content": title_payload.get("content", ""),
+                    "page_start": title_payload.get("page_start", 1),
+                    "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
+                })
+
+            for section_title, payload in sections.items():
+                if section_title == section_title_key:
+                    continue
+                flat_sections.append((
+                    self._normalize_heading_key(section_title),
+                    {
+                        "content": payload.get("content", ""),
+                        "page_start": payload.get("page_start", 1),
+                        "page_end": payload.get("page_end", payload.get("page_start", 1)),
+                    },
+                ))
+
+        rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
+        search_start = 0
+        used_indices = set()
+
+        for chapter in catalog_chapters:
+            chapter_title = (chapter.get("title", "") or "").strip()
+            if not chapter_title:
+                continue
+
+            chapter_page = self._safe_page_number(chapter.get("page"))
+            chapter_key = self._normalize_heading_key(chapter_title)
+            title_candidates = chapter_title_payloads.get(chapter_key, [])
+            title_payload = title_candidates.pop(0) if title_candidates else self._empty_section_payload(chapter_page)
+
+            rebuilt[chapter_title] = {
+                section_title_key: title_payload,
+            }
+
+            for subsection in chapter.get("subsections", []):
+                section_title = (subsection.get("title", "") or "").strip()
+                if not section_title:
+                    continue
+
+                target_key = self._normalize_heading_key(section_title)
+                match_index = None
+                for idx in range(search_start, len(flat_sections)):
+                    if idx in used_indices:
+                        continue
+                    if flat_sections[idx][0] == target_key:
+                        match_index = idx
+                        break
+                if match_index is None:
+                    for idx, (section_key, _) in enumerate(flat_sections):
+                        if idx in used_indices:
+                            continue
+                        if section_key == target_key:
+                            match_index = idx
+                            break
+
+                if match_index is not None:
+                    used_indices.add(match_index)
+                    search_start = max(search_start, match_index + 1)
+                    rebuilt[chapter_title][section_title] = flat_sections[match_index][1]
+                else:
+                    rebuilt[chapter_title][section_title] = self._empty_section_payload(
+                        self._safe_page_number(subsection.get("page"), chapter_page)
+                    )
+
+        return rebuilt or chapters
+
+    @staticmethod
+    def _normalize_heading_key(text: str) -> str:
+        normalized = (text or "").strip()
+        normalized = normalized.replace("【", "[").replace("】", "]")
+        normalized = normalized.replace("(", "(").replace(")", ")")
+        normalized = normalized.replace(".", ".").replace("。", ".")
+        normalized = re.sub(r"\s+", "", normalized)
+        return normalized
+
+    @staticmethod
+    def _safe_page_number(value: Any, default: int = 1) -> int:
+        try:
+            return max(1, int(str(value).strip()))
+        except Exception:
+            return default
+
+    @staticmethod
+    def _empty_section_payload(page_num: int) -> Dict[str, Any]:
+        return {
+            "content": "",
+            "page_start": page_num,
+            "page_end": page_num,
+        }
+
+    def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
+        """同步并发处理 OCR(使用 ThreadPoolExecutor)"""
+        results: List[OcrResult] = []
+        total = len(regions)
+        completed = 0
+
+        with ThreadPoolExecutor(max_workers=self.OCR_CONCURRENT_WORKERS) as executor:
+            # 提交所有任务
+            future_to_region = {
+                executor.submit(self._ocr_table_region, r.page, r.bbox): r
+                for r in regions
+            }
+
+            # 处理完成的结果
+            for future in as_completed(future_to_region):
+                region = future_to_region[future]
+                completed += 1
+                try:
+                    text = future.result()
+                    results.append(OcrResult(
+                        page_num=region.page_num,
+                        bbox=region.bbox,
+                        score=region.score,
+                        text=text,
+                        success=True,
+                    ))
+                except Exception as e:
+                    logger.error(f"  第 {region.page_num} 页表格 OCR 失败: {e}")
+                    results.append(OcrResult(
+                        page_num=region.page_num,
+                        bbox=region.bbox,
+                        score=region.score,
+                        text="",
+                        success=False,
+                    ))
+
+                # 每完成5个或最后一个时推送进度
+                if progress_callback and (completed % 5 == 0 or completed == total):
+                    progress = 35 + int(completed / total * 15)  # OCR执行占15%进度(35-50)
+                    progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")
+
+        return results
+
+    def _detect_table_regions(
+        self,
+        page: fitz.Page,
+        page_num: int,
+        clip_box: fitz.Rect
+    ) -> List[Tuple[Tuple[float, float, float, float], float]]:
+        """检测页面中的表格区域,返回坐标列表"""
+        table_regions: List[Tuple[Tuple[float, float, float, float], float]] = []
+
+        if not RAPID_LAYOUT_AVAILABLE:
+            return table_regions
+
+        layout_engine = self._get_layout_engine()
+        if layout_engine is None:
+            return table_regions
+
+        # 渲染页面(裁剪区域)
+        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=clip_box)
+        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)
+
+        try:
+            layout_output = layout_engine(img)
+
+            # 解析版面结果
+            if hasattr(layout_output, 'boxes') and hasattr(layout_output, 'class_names'):
+                # 获取缩放比例
+                scale_x = clip_box.width / img.shape[1]
+                scale_y = clip_box.height / img.shape[0]
+
+                for box, label, score in zip(layout_output.boxes, layout_output.class_names, layout_output.scores):
+                    if label == "table" and score > self.OCR_CONFIDENCE_THRESHOLD:
+                        # 转换为 PDF 坐标
+                        pdf_x1 = clip_box.x0 + box[0] * scale_x
+                        pdf_y1 = clip_box.y0 + box[1] * scale_y
+                        pdf_x2 = clip_box.x0 + box[2] * scale_x
+                        pdf_y2 = clip_box.y0 + box[3] * scale_y
+
+                        table_regions.append(((pdf_x1, pdf_y1, pdf_x2, pdf_y2), score))
+
+        except Exception as e:
+            logger.warning(f"  第 {page_num} 页: 版面分析失败 ({e})")
+
+        return table_regions
+
+    def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
+        """对指定区域进行 OCR 识别(使用 GLM-OCR),支持指数退避重试"""
+        import time
+
+        # 渲染指定区域
+        rect = fitz.Rect(bbox)
+        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=rect)
+        img_bytes = pix.tobytes("jpeg")
+
+        # 压缩图片
+        compressed = self._compress_image(img_bytes)
+        img_base64 = base64.b64encode(compressed).decode('utf-8')
+
+        # 请求 OCR
+        payload = {
+            "model": "GLM-OCR",
+            "messages": [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "text",
+                            "text": "识别图片中的表格内容,按原文排版输出。"
+                                    "注意:"
+                                    "1. 表格用 Markdown 表格格式"
+                                    "2. 保持换行和列对齐"
+                                    "3. 只输出表格内容,不要其他说明"
+                        },
+                        {
+                            "type": "image_url",
+                            "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
+                        }
+                    ]
+                }
+            ],
+            "max_tokens": 2048,
+            "temperature": 0.1
+        }
+
+        headers = {"Content-Type": "application/json"}
+        if self.ocr_api_key:
+            headers["Authorization"] = f"Bearer {self.ocr_api_key}"
+
+        # 指数退避重试
+        last_error = None
+        for attempt in range(max_retries):
+            try:
+                response = requests.post(
+                    self.ocr_api_url,
+                    headers=headers,
+                    json=payload,
+                    timeout=self.ocr_timeout
+                )
+                response.raise_for_status()
+
+                result = response.json()
+                return self._extract_ocr_content(result)
+
+            except Exception as e:
+                last_error = e
+                if attempt < max_retries - 1:
+                    # 指数退避: 2, 4, 8 秒
+                    wait_time = 2 ** (attempt + 1)
+                    logger.warning(f"  第 {page.number + 1} 页表格 OCR 第 {attempt + 1} 次失败: {e}, {wait_time}秒后重试...")
+                    time.sleep(wait_time)
+                else:
+                    logger.error(f"  第 {page.number + 1} 页表格 OCR 最终失败(已重试{max_retries}次): {e}")
+
+        # 所有重试都失败,抛出最后一个错误
+        raise last_error
+
+    def _replace_table_regions(
+        self,
+        page: fitz.Page,
+        original_text: str,
+        ocr_results: List[Dict],
+        clip_box: fitz.Rect
+    ) -> str:
+        """用 OCR 结果替换原始文本中的表格区域"""
+        if not ocr_results:
+            return original_text
+
+        # 获取页面上的文本块及其坐标
+        text_blocks = []
+        for block in page.get_text("blocks"):
+            x0, y0, x1, y1, text, _, _ = block
+            # 只考虑裁剪区域内的文本
+            if y0 >= clip_box.y0 and y1 <= clip_box.y1:
+                text_blocks.append({
+                    "bbox": (x0, y0, x1, y1),
+                    "text": text.strip(),
+                })
+
+        # 按 Y 坐标排序
+        text_blocks.sort(key=lambda b: (b["bbox"][1], b["bbox"][0]))
+
+        # 找出属于表格区域的文本块
+        replaced_indices: Set[int] = set()
+        for ocr_result in ocr_results:
+            bbox = ocr_result["bbox"]
+            rx0, ry0, rx1, ry1 = bbox
+
+            for idx, block in enumerate(text_blocks):
+                if idx in replaced_indices:
+                    continue
+                bx0, by0, bx1, by1 = block["bbox"]
+
+                # 检查重叠
+                overlap_x = max(0, min(bx1, rx1) - max(bx0, rx0))
+                overlap_y = max(0, min(by1, ry1) - max(by0, ry0))
+                overlap_area = overlap_x * overlap_y
+                block_area = (bx1 - bx0) * (by1 - by0)
+
+                if block_area > 0 and overlap_area / block_area > 0.5:
+                    replaced_indices.add(idx)
+
+        # 构建新文本
+        result_parts: List[str] = []
+        last_idx = 0
+
+        for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
+            bbox = ocr_result["bbox"]
+            rx0, ry0, rx1, ry1 = bbox
+
+            # 找到该表格区域之前的文本
+            region_start_idx = None
+            for idx, block in enumerate(text_blocks):
+                if idx in replaced_indices:
+                    bx0, by0, bx1, by1 = block["bbox"]
+                    if (bx0 >= rx0 - 5 and bx1 <= rx1 + 5 and
+                        by0 >= ry0 - 5 and by1 <= ry1 + 5):
+                        if region_start_idx is None:
+                            region_start_idx = idx
+                        last_idx = idx + 1
+
+            if region_start_idx is not None:
+                # 添加表格前的非表格文本
+                for idx in range(last_idx - (last_idx - region_start_idx), region_start_idx):
+                    if idx not in replaced_indices and idx < len(text_blocks):
+                        result_parts.append(text_blocks[idx]["text"])
+                        result_parts.append("\n")
+
+                # 添加 OCR 结果
+                result_parts.append(ocr_result["ocr_text"])
+                result_parts.append("\n")
+
+        # 添加剩余文本
+        for idx in range(last_idx, len(text_blocks)):
+            if idx not in replaced_indices:
+                result_parts.append(text_blocks[idx]["text"])
+                result_parts.append("\n")
+
+        return "".join(result_parts).strip() or original_text
+
+    def _compress_image(self, img_bytes: bytes) -> bytes:
+        """Re-encode an image as JPEG, capping the length of its short edge.
+
+        Images with an alpha channel or a palette are flattened onto a white
+        RGB canvas first; any other non-RGB mode is converted to RGB. When the
+        shorter side exceeds ``self.MAX_SHORT_EDGE`` the image is scaled down
+        proportionally with LANCZOS resampling, then saved with
+        ``self.JPEG_QUALITY``.
+
+        Args:
+            img_bytes: Encoded image data readable by Pillow.
+
+        Returns:
+            The JPEG bytes, or ``img_bytes`` unchanged if any step fails
+            (best effort: a warning is logged and the original is used).
+        """
+        try:
+            from PIL import Image
+
+            image = Image.open(io.BytesIO(img_bytes))
+            source_mode = image.mode
+
+            if source_mode in ('RGBA', 'LA', 'P'):
+                # Palette images are promoted to RGBA so that the last band is
+                # always an alpha channel usable as the paste mask.
+                layered = image.convert('RGBA') if source_mode == 'P' else image
+                canvas = Image.new('RGB', layered.size, (255, 255, 255))
+                canvas.paste(layered, mask=layered.split()[-1])
+                image = canvas
+            elif source_mode != 'RGB':
+                image = image.convert('RGB')
+
+            short_edge = min(image.size)
+            if short_edge > self.MAX_SHORT_EDGE:
+                scale = self.MAX_SHORT_EDGE / short_edge
+                target_size = (int(image.width * scale), int(image.height * scale))
+                image = image.resize(target_size, Image.Resampling.LANCZOS)
+
+            encoded = io.BytesIO()
+            image.save(encoded, format='JPEG', quality=self.JPEG_QUALITY, optimize=True)
+            return encoded.getvalue()
+
+        except Exception as e:
+            logger.warning(f"图片压缩失败,使用原图: {e}")
+            return img_bytes
+
+    def _extract_ocr_content(self, result: Dict) -> str:
+        """Pull the text out of an OCR response and turn HTML tables into Markdown.
+
+        The text is read from ``result["choices"][0]["message"]["content"]``.
+        Every level is validated, so a malformed or partial response yields an
+        empty string instead of raising or leaking ``None`` to the caller.
+
+        Args:
+            result: Decoded OCR response; expected to be a dict carrying a
+                non-empty ``choices`` list.
+
+        Returns:
+            The extracted text. If it appears to contain HTML tags it is passed
+            through ``convert_html_to_markdown``; when that conversion fails the
+            raw text is returned. Missing, null or non-string content gives "".
+        """
+        content = ""
+
+        choices = result.get("choices") if isinstance(result, dict) else None
+        if isinstance(choices, list) and choices:
+            first_choice = choices[0]
+            message = first_choice.get("message") if isinstance(first_choice, dict) else None
+            if isinstance(message, dict):
+                raw_content = message.get("content")
+                # "content" may be present but null; keep the str return contract.
+                if isinstance(raw_content, str):
+                    content = raw_content
+
+        # Cheap heuristic for "contains HTML tags": convert to Markdown.
+        if content and "<" in content and ">" in content:
+            try:
+                from ..doc_worker.pdf_worker.html_to_markdown import convert_html_to_markdown
+                content = convert_html_to_markdown(content)
+            except Exception as e:
+                # Best effort: the raw OCR text is still usable downstream.
+                logger.debug(f"HTML 转 Markdown 失败,保留原始内容: {e}")
+
+        return content
+
+    @staticmethod
+    def _is_header_footer(line: str) -> bool:
+        """Tell whether a line is page header/footer boilerplate.
+
+        A line qualifies when it consists solely of digits (as a bare page
+        number would) or contains one of the two hard-coded marker strings.
+        """
+        if line.isdigit():
+            return True
+        boilerplate_markers = (
+            "四川路桥建设集团股份有限公司",
+            "T梁运输及安装专项施工方案",
+        )
+        return any(marker in line for marker in boilerplate_markers)
+
+    @classmethod
+    def _matching_rule_names(
+        cls,
+        line: str,
+        level: str,
+        rule_names: Optional[List[str]] = None,
+    ) -> List[str]:
+        """List the rule sets whose pattern for ``level`` matches ``line``.
+
+        Args:
+            line: Candidate heading text; surrounding whitespace is ignored.
+            level: Key of the pattern inside each rule set (e.g. "l1", "l2").
+            rule_names: Rule sets to test, in order. ``None`` or an empty list
+                means every rule set in ``RULE_LIB``.
+
+        Returns:
+            Names of the matching rule sets, in the order they were tested.
+        """
+        candidates = rule_names if rule_names else list(cls.RULE_LIB.keys())
+        target = line.strip()
+
+        matched: List[str] = []
+        for name in candidates:
+            if cls.RULE_LIB[name][level].match(target):
+                matched.append(name)
+        return matched
+
+    @classmethod
+    def _matches_chapter_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
+        """Return True when at least one rule set's "l1" pattern matches ``line``."""
+        chapter_rules = cls._matching_rule_names(line, "l1", rule_names)
+        return len(chapter_rules) > 0
+
+    @classmethod
+    def _matches_section_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
+        """Return True when at least one rule set's "l2" pattern matches ``line``."""
+        section_rules = cls._matching_rule_names(line, "l2", rule_names)
+        return len(section_rules) > 0
+
+    @staticmethod
+    def _clean_chapter_title(line: str) -> str:
+        """Normalise a chapter heading to the form "<prefix> <title>".
+
+        Trailing table-of-contents noise (dot/underscore/dash leaders and the
+        page number) is removed, whitespace runs are collapsed, and the
+        separator between the chapter marker and its title becomes one space.
+
+        Recognised prefixes:
+            * "第N章" / "第N部分" / "第N篇" (and the single characters 部 / 分),
+              with N in Arabic or Chinese numerals; spaces inside the prefix
+              are dropped.
+            * a bare one- or two-digit number, optionally followed by a dot or
+              "、".
+
+        Args:
+            line: Raw heading text.
+
+        Returns:
+            The normalised heading, or the de-noised text unchanged when no
+            prefix pattern applies.
+        """
+        cleaned = line.strip()
+        # Drop the trailing page number, whether separated by whitespace or
+        # glued directly onto a leader run ("概述......12").
+        cleaned = re.sub(r"(?:\s+|(?<=[\._\-]{3}))\d+\s*$", "", cleaned)
+        # Drop the leader run itself plus any trailing punctuation.
+        cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+
+        # "部分" must be tried as a whole before the single-character class;
+        # otherwise "第一部分 概述" is split into prefix "第一部" + title "分 概述".
+        cn_chapter_match = re.match(
+            r"^(第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*(?:部分|[章部分篇]))"
+            r"[\s、:\uff1a\.\-]*(.*)$",
+            cleaned,
+        )
+        if cn_chapter_match:
+            prefix = re.sub(r"\s+", "", cn_chapter_match.group(1))
+            title = cn_chapter_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        num_chapter_match = re.match(r"^(\d{1,2})(?:[\.\uff0e。、])?\s*(.*)$", cleaned)
+        if num_chapter_match:
+            prefix = num_chapter_match.group(1)
+            title = num_chapter_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        return cleaned
+
+    @staticmethod
+    def _clean_section_title(line: str) -> str:
+        """Normalise a section heading to the form "<prefix> <title>".
+
+        Trailing table-of-contents noise (dot/underscore/dash leaders and the
+        page number) is removed and whitespace runs are collapsed. The first
+        matching prefix style decides the result:
+
+            1. two-level number      "1.1" / "1.1."
+            2. Chinese section       "第一节"
+            3. Chinese list marker   "一、" / "一)" / "一]"
+            4. bracketed number      "【1】" / "[1]"
+
+        Args:
+            line: Raw heading text.
+
+        Returns:
+            The normalised heading, or the de-noised text unchanged when no
+            prefix pattern applies (deeper numbers such as "1.2.3" included).
+        """
+        cleaned = line.strip()
+        # Drop the trailing page number, whether separated by whitespace or
+        # glued directly onto a leader run ("施工方法......8").
+        cleaned = re.sub(r"(?:\s+|(?<=[\._\-]{3}))\d+\s*$", "", cleaned)
+        # Drop the leader run itself plus any trailing punctuation.
+        cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
+        cleaned = re.sub(r"\s+", " ", cleaned).strip()
+
+        # The lookahead also rejects a following digit so the regex cannot
+        # backtrack into a shorter number: without it "1.12.3 标题" matched as
+        # prefix "1.1" with title "2.3 标题".
+        numeric_section_match = re.match(r"^(\d+\.\d+)(?!\d|\.\d)\.?\s*(.*)$", cleaned)
+        if numeric_section_match:
+            prefix = numeric_section_match.group(1)
+            title = numeric_section_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        cn_section_match = re.match(
+            r"^(第\s*[一二三四五六七八九十百零两]+\s*节)[\s、:\uff1a\.\-]*(.*)$",
+            cleaned,
+        )
+        if cn_section_match:
+            prefix = re.sub(r"\s+", "", cn_section_match.group(1))
+            title = cn_section_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        cn_list_match = re.match(r"^([一二三四五六七八九十百零两]+[、)\uff09\]])[\s]*(.*)$", cleaned)
+        if cn_list_match:
+            prefix = cn_list_match.group(1).strip()
+            title = cn_list_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        bracket_match = re.match(r"^([【\[]\s*\d+\s*[\]】])[\s]*(.*)$", cleaned)
+        if bracket_match:
+            prefix = re.sub(r"\s+", "", bracket_match.group(1))
+            title = bracket_match.group(2).strip()
+            return f"{prefix} {title}".strip()
+
+        return cleaned

+ 5 - 1
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -9,13 +9,14 @@
 """
 """
 
 
 import asyncio
 import asyncio
+import json
 import uuid
 import uuid
 from collections import defaultdict
 from collections import defaultdict
 from typing import Dict, Any, Optional, Tuple, List
 from typing import Dict, Any, Optional, Tuple, List
 
 
 from foundation.observability.logger.loggering import review_logger as logger
 from foundation.observability.logger.loggering import review_logger as logger
 
 
-from .pdf_extractor import PdfStructureExtractor
+from .pdf_extractor2 import PdfStructureExtractor
 from .toc_builder import build_toc_items_from_structure
 from .toc_builder import build_toc_items_from_structure
 from .chunk_assembler import assemble_chunks
 from .chunk_assembler import assemble_chunks
 from ..doc_worker.classification.hierarchy_classifier import HierarchyClassifier
 from ..doc_worker.classification.hierarchy_classifier import HierarchyClassifier
@@ -121,6 +122,9 @@ class SimpleDocumentProcessor:
                     pass
                     pass
 
 
         structure = self.pdf_extractor.extract(file_content, progress_callback=_extraction_progress)
         structure = self.pdf_extractor.extract(file_content, progress_callback=_extraction_progress)
+        print("-"*50)
+        print(f'{json.dumps(structure, ensure_ascii=False, indent=2)}')
+        print("-"*50)
         catalog = structure.get("catalog")  # 获取YOLO检测+OCR提取的目录
         catalog = structure.get("catalog")  # 获取YOLO检测+OCR提取的目录
 
 
         # 对 catalog 进行分类(如果存在)
         # 对 catalog 进行分类(如果存在)