Sfoglia il codice sorgente

refactor(pdf_extractor): 实现精准 OCR 表格回填方案

- 提取文本块时保留坐标信息(使用 get_text dict)

- 章节结构提取与 OCR 分离,确保章节不错位

- 根据 Y 坐标将 OCR 结果精确插入到对应小节

- 新增 _extract_text_blocks_with_position 方法

- 新增 _insert_ocr_blocks_into_chapters 方法

- 修复第三章、第四章、第八章缺失问题
WangXuMing 4 giorni fa
parent
commit
0f7258ce9d

+ 266 - 105
core/construction_review/component/minimal_pipeline/pdf_extractor.py

@@ -50,8 +50,8 @@ class OcrResult:
 class PdfStructureExtractor:
     """PDF 章节结构提取器(支持 OCR 异步并发)"""
 
-    CHAPTER_PATTERN = re.compile(r"^第[一二三四五六七八九十百]+章\s*.*")
-    SECTION_PATTERN = re.compile(r"^[一二三四五六七八九十百]+、\s*.*")
+    CHAPTER_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+章\s*.*')
+    SECTION_PATTERN = re.compile(r'^[一二三四五六七八九十百]+、\s*.*')
     TOC_PATTERN = re.compile(r"\.{3,}|…{2,}")
 
     # OCR 配置
@@ -162,7 +162,16 @@ class PdfStructureExtractor:
         return self._toc_extractor.detect_and_extract(file_content, progress_callback)
 
     def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
-        """提取文档结构(支持 OCR 异步并发)"""
+        """
+        提取文档结构(支持 OCR 异步并发)- 带坐标的精准回填方案。
+
+        流程:
+        1. 提取带坐标的文本块
+        2. 章节标题匹配 + 块归属划分
+        3. 扫描表格区域并 OCR
+        4. 根据表格坐标,将其作为新的块插入到对应小节
+        5. 将每个小节的块列表按顺序拼接成纯文本输出
+        """
 
         def _emit_progress(stage: str, current: int, message: str):
             """发送进度回调"""
@@ -172,83 +181,30 @@ class PdfStructureExtractor:
                 except Exception:
                     pass
 
-        # === 阶段1: 收集所有需要 OCR 的表格区域 ===
-        table_regions: List[TableRegion] = []
-
-        if self.use_ocr:
-            logger.info("[OCR预处理] 扫描所有页面的表格区域...")
-            total_pages = len(doc)
-            for page_num in range(total_pages):
-                page = doc.load_page(page_num)
-                rect = page.rect
-                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
-                regions = self._detect_table_regions(page, page_num + 1, clip_box)
-                for bbox, score in regions:
-                    table_regions.append(TableRegion(
-                        page_num=page_num + 1,
-                        page=page,
-                        bbox=bbox,
-                        score=score
-                    ))
-                # 每5页或最后一页推送一次进度
-                if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
-                    progress = int((page_num + 1) / total_pages * 30)  # OCR预处理占30%进度
-                    _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
-            logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")
-
-        # === 阶段2: 异步并发执行 OCR (5并发) ===
-        ocr_results: List[OcrResult] = []
-
-        if table_regions:
-            logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
-            _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
-            ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
-            success_count = sum(1 for r in ocr_results if r.success)
-            logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
-            _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
+        total_pages = len(doc)
 
-        # 按页码分组 OCR 结果
-        ocr_by_page: Dict[int, List[OcrResult]] = {}
-        for result in ocr_results:
-            if result.success:
-                if result.page_num not in ocr_by_page:
-                    ocr_by_page[result.page_num] = []
-                ocr_by_page[result.page_num].append(result)
+        # ==================== 阶段1: 提取带坐标的文本块并归属到章节/小节====================
+        logger.info("[阶段1] 提取带坐标的文本块并归属章节...")
 
-        # === 阶段3: 提取页面文本(应用 OCR 结果)并切分章节 ===
-        structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
+        # 数据结构: {(chapter_name, section_name): [blocks_with_position]}
+        chapter_blocks: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
         current_chapter = "未分类前言"
         current_section = "默认部分"
         in_body = False
 
-        logger.info("[文本提取] 提取页面内容并切分章节...")
-
-        for page_num in range(len(doc)):
+        for page_num in range(total_pages):
             page = doc.load_page(page_num)
             rect = page.rect
             clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
 
-            # 获取页面文本(应用 OCR 结果)
-            if page_num + 1 in ocr_by_page:
-                original_text = page.get_text("text", clip=clip_box)
-                ocr_results_list = [
-                    {
-                        "region_index": i,
-                        "bbox": r.bbox,
-                        "score": r.score,
-                        "ocr_text": r.text,
-                    }
-                    for i, r in enumerate(ocr_by_page[page_num + 1])
-                ]
-                text = self._replace_table_regions(page, original_text, ocr_results_list, clip_box)
-            else:
-                text = page.get_text("text", clip=clip_box)
+            # 获取带坐标的文本块
+            blocks = self._extract_text_blocks_with_position(page, clip_box)
 
-            lines = text.split("\n")
+            for block in blocks:
+                line = block["text"]
 
-            for line in lines:
-                line = line.strip()
-                if not line:
+                # 跳过空行和页眉页脚
+                if not line.strip():
                     continue
                 if self._is_header_footer(line):
                     continue
@@ -268,53 +224,94 @@ class PdfStructureExtractor:
                 if self.CHAPTER_PATTERN.match(line):
                     current_chapter = self._clean_chapter_title(line)
                     current_section = "章节标题"
-                    if current_chapter not in structured_data:
-                        structured_data[current_chapter] = {}
-                    if current_section not in structured_data[current_chapter]:
-                        structured_data[current_chapter][current_section] = {
-                            "lines": [],
-                            "page_start": page_num + 1,
-                            "page_end": page_num + 1,
-                        }
+                    key = (current_chapter, current_section)
+                    if key not in chapter_blocks:
+                        chapter_blocks[key] = []
+                    chapter_blocks[key].append(block)
                     continue
 
                 # 匹配节标题
                 if self.SECTION_PATTERN.match(line):
                     current_section = line
-                    if current_chapter not in structured_data:
-                        structured_data[current_chapter] = {}
-                    if current_section not in structured_data[current_chapter]:
-                        structured_data[current_chapter][current_section] = {
-                            "lines": [],
-                            "page_start": page_num + 1,
-                            "page_end": page_num + 1,
-                        }
+                    key = (current_chapter, current_section)
+                    if key not in chapter_blocks:
+                        chapter_blocks[key] = []
+                    chapter_blocks[key].append(block)
                     continue
 
-                # 确保结构存在
-                if current_chapter not in structured_data:
-                    structured_data[current_chapter] = {}
-                if current_section not in structured_data[current_chapter]:
-                    structured_data[current_chapter][current_section] = {
-                        "lines": [],
-                        "page_start": page_num + 1,
-                        "page_end": page_num + 1,
-                    }
+                # 普通内容块
+                key = (current_chapter, current_section)
+                if key not in chapter_blocks:
+                    chapter_blocks[key] = []
+                chapter_blocks[key].append(block)
+
+        logger.info(f"[阶段1] 章节结构提取完成,共 {len({k[0] for k in chapter_blocks})} 个章节")
+
+        # ==================== 阶段2: 收集表格区域并OCR(如果启用OCR)====================
+        table_regions: List[TableRegion] = []
+        ocr_results: List[OcrResult] = []
+
+        if self.use_ocr:
+            logger.info("[阶段2] 扫描表格区域...")
+            for page_num in range(total_pages):
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
+                regions = self._detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score in regions:
+                    table_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score
+                    ))
+                # 每5页推送进度
+                if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
+                    progress = int((page_num + 1) / total_pages * 30)
+                    _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
+
+            logger.info(f"[阶段2] 发现 {len(table_regions)} 个表格区域")
+
+            # 执行OCR
+            if table_regions:
+                _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
+                ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
+                success_count = sum(1 for r in ocr_results if r.success)
+                logger.info(f"[阶段2] OCR完成 {success_count}/{len(table_regions)}")
+                _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
 
-                # 添加内容
-                structured_data[current_chapter][current_section]["lines"].append(line)
-                structured_data[current_chapter][current_section]["page_end"] = page_num + 1
+        # ==================== 阶段3: 将OCR结果作为新块插入到对应章节====================
+        if ocr_results:
+            logger.info("[阶段3] 将OCR结果回填到对应章节...")
+            self._insert_ocr_blocks_into_chapters(chapter_blocks, ocr_results)
 
-        # 将行列表拼接为文本
+        # ==================== 阶段4: 生成最终输出(块列表转纯文本)====================
+        logger.info("[阶段4] 生成最终文本输出...")
         result: Dict[str, Any] = {"chapters": {}}
-        for chap, sections in structured_data.items():
-            result["chapters"][chap] = {}
-            for sec, data in sections.items():
-                result["chapters"][chap][sec] = {
-                    "content": "\n".join(data["lines"]),
-                    "page_start": data["page_start"],
-                    "page_end": data["page_end"],
-                }
+
+        for (chap_name, sec_name), blocks in chapter_blocks.items():
+            if chap_name not in result["chapters"]:
+                result["chapters"][chap_name] = {}
+
+            # 按页码和Y坐标排序块
+            blocks.sort(key=lambda b: (b["page"], b["bbox"][1]))
+
+            # 拼接文本
+            lines = []
+            page_start = blocks[0]["page"] if blocks else 1
+            page_end = blocks[-1]["page"] if blocks else 1
+
+            for block in blocks:
+                if block.get("type") == "table":
+                    lines.append(f"\n[表格OCR识别结果]:\n{block['text']}\n[/表格]\n")
+                else:
+                    lines.append(block["text"])
+
+            result["chapters"][chap_name][sec_name] = {
+                "content": "\n".join(lines),
+                "page_start": page_start,
+                "page_end": page_end,
+            }
 
         logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
         return result
@@ -608,6 +605,170 @@ class PdfStructureExtractor:
 
         return content
 
+    def _extract_text_blocks_with_position(
+        self,
+        page: fitz.Page,
+        clip_box: fitz.Rect
+    ) -> List[Dict[str, Any]]:
+        """
+        Return the text blocks of ``page`` inside ``clip_box`` together with their position.
+
+        The page is read through ``page.get_text("dict", clip=clip_box)``. Only entries
+        whose ``type`` is ``0`` are kept; within such an entry the span texts of each
+        line are concatenated, lines that are blank after stripping are dropped, and
+        the remaining lines are joined with ``"\\n"``.
+
+        Returns:
+            A list of dicts with the keys ``text``, ``page`` (``page.number + 1``),
+            ``bbox`` (taken unchanged from the source entry), ``y_center`` (midpoint of
+            ``bbox[1]`` and ``bbox[3]``) and ``type`` (always ``"text"``), ordered by
+            page, then ``bbox[1]``, then ``bbox[0]``.
+        """
+        positioned: List[Dict[str, Any]] = []
+        raw_blocks = page.get_text("dict", clip=clip_box).get("blocks", [])
+
+        for raw in raw_blocks:
+            # Anything that is not type 0 is ignored by this extractor.
+            if raw.get("type") != 0:
+                continue
+
+            box = raw["bbox"]
+            vertical_middle = (box[1] + box[3]) / 2
+
+            # One string per line: the concatenation of all of its span texts.
+            joined_lines = [
+                "".join(span.get("text", "") for span in raw_line.get("spans", []))
+                for raw_line in raw.get("lines", [])
+            ]
+            visible_lines = [text for text in joined_lines if text.strip()]
+            if not visible_lines:
+                continue
+
+            positioned.append({
+                "text": "\n".join(visible_lines),
+                "page": page.number + 1,
+                "bbox": box,
+                "y_center": vertical_middle,
+                "type": "text"
+            })
+
+        # Reading order: top edge first, left edge as the tie-breaker.
+        positioned.sort(key=lambda item: (item["page"], item["bbox"][1], item["bbox"][0]))
+        return positioned
+
+    def _insert_ocr_blocks_into_chapters(
+        self,
+        chapter_blocks: Dict[Tuple[str, str], List[Dict[str, Any]]],
+        ocr_results: List[OcrResult]
+    ) -> None:
+        """
+        Insert successful OCR results into ``chapter_blocks`` as ``"table"`` blocks.
+
+        ``chapter_blocks`` is modified in place. Each inserted block has the keys
+        ``text``, ``page``, ``bbox``, ``y_center`` and ``type`` (``"table"``).
+
+        Placement strategy, per page (pages are handled in ascending order):
+
+        1. Collect every block on the page from *all* sections and order them by
+           their top edge (``bbox[1]``). Looking at the page as a whole, rather than
+           one section at a time, keeps a table from being claimed by an earlier
+           section when a later section starts between that section and the table.
+        2. For each table (top to bottom) the anchor is the last block whose top
+           edge is at or above the table's top edge; the table is inserted right
+           after the anchor, in the anchor's section.
+        3. If the table lies above every block on the page, it is inserted right
+           before the first block of the page.
+        4. If the page has no blocks at all, the table is inserted after the last
+           block of the nearest preceding page instead of being dropped. Only when
+           no such block exists is the table skipped, with a warning.
+
+        Anchors are located by identity (``is``), not equality, so two blocks with
+        identical content cannot be confused with each other.
+
+        Args:
+            chapter_blocks: Mapping of ``(chapter_name, section_name)`` to the
+                section's block list.
+            ocr_results: OCR results; entries whose ``success`` is falsy are ignored.
+        """
+
+        def _top(entry):
+            """Top edge (bbox[1]) of the block stored in an anchor entry."""
+            return entry[0]["bbox"][1]
+
+        def _identity_index(blocks, wanted):
+            """Index of the very object ``wanted`` inside ``blocks``."""
+            return next(i for i, candidate in enumerate(blocks) if candidate is wanted)
+
+        def _last_entry_before(page_num):
+            """Anchor entry for the last block located on a page before ``page_num``."""
+            best = None
+            for key, blocks in chapter_blocks.items():
+                for block in blocks:
+                    # ">=" lets later sections / later blocks win ties on the page.
+                    if block["page"] < page_num and (
+                        best is None or block["page"] >= best[0]["page"]
+                    ):
+                        best = (block, key, blocks)
+            return best
+
+        # Group the successful OCR results by page.
+        ocr_by_page: Dict[int, List[OcrResult]] = {}
+        for result in ocr_results:
+            if result.success:
+                ocr_by_page.setdefault(result.page_num, []).append(result)
+
+        for page_num in sorted(ocr_by_page):
+            # Anchor entries: (block, section key, the section's own list).
+            # The list is the original object so that inserts are visible to callers.
+            entries = [
+                (block, key, blocks)
+                for key, blocks in chapter_blocks.items()
+                for block in blocks
+                if block["page"] == page_num
+            ]
+            # Stable sort: blocks sharing a top edge keep section (reading) order.
+            entries.sort(key=_top)
+
+            carried = None
+            if not entries:
+                carried = _last_entry_before(page_num)
+                if carried is None:
+                    logger.warning(f"[OCR回填] 第{page_num}页没有匹配到任何小节")
+                    continue
+
+            for ocr_result in sorted(ocr_by_page[page_num], key=lambda r: r.bbox[1]):
+                table_y_top = ocr_result.bbox[1]
+                table_y_bottom = ocr_result.bbox[3]
+
+                table_block = {
+                    "text": ocr_result.text,
+                    "page": page_num,
+                    "bbox": ocr_result.bbox,
+                    "y_center": (table_y_top + table_y_bottom) / 2,
+                    "type": "table"
+                }
+
+                preceding = [entry for entry in entries if _top(entry) <= table_y_top]
+                precise = True
+                if preceding:
+                    # Nearest block starting at or above the table: insert after it.
+                    anchor, key, target = preceding[-1]
+                    offset = 1
+                elif entries:
+                    # Table sits above everything on the page: insert before the first block.
+                    anchor, key, target = entries[0]
+                    offset = 0
+                else:
+                    # Page without any text block: continue the preceding section.
+                    anchor, key, target = carried
+                    offset = 1
+                    precise = False
+
+                insert_index = _identity_index(target, anchor) + offset
+                target.insert(insert_index, table_block)
+
+                # Later tables on this page must see this one as a possible anchor.
+                entries.append((table_block, key, target))
+                entries.sort(key=_top)
+
+                if precise:
+                    logger.debug(
+                        f"[OCR回填] 第{page_num}页表格(Y={table_y_top:.0f}) -> "
+                        f"{key[0]}/{key[1]} 位置{insert_index}"
+                    )
+                else:
+                    logger.warning(
+                        f"[OCR回填] 第{page_num}页表格无法精确定位,追加到 {key[0]}/{key[1]}"
+                    )
+
     @staticmethod
     def _is_header_footer(line: str) -> bool:
         return (

+ 15 - 1
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -14,6 +14,7 @@ from collections import defaultdict
 from typing import Dict, Any, Optional, Tuple, List
 
 from foundation.observability.logger.loggering import review_logger as logger
+from foundation.observability.cachefiles import cache, CacheBaseDir
 
 from .pdf_extractor import PdfStructureExtractor
 from .toc_builder import build_toc_items_from_structure
@@ -123,7 +124,7 @@ class SimpleDocumentProcessor:
         structure = self.pdf_extractor.extract(file_content, progress_callback=_extraction_progress)
 
         # 文档提取质量检查
-        self._check_extraction_quality(structure)
+        self._check_extraction_quality(structure, file_name)
 
         catalog = structure.get("catalog")  # 获取YOLO检测+OCR提取的目录
 
@@ -459,6 +460,7 @@ class SimpleDocumentProcessor:
     def _check_extraction_quality(
         self,
         structure: Dict[str, Any],
+        file_name: str = "",
         default_total_chapters: int = 10,
         default_total_subsections: int = 41,
         l1_threshold: float = 0.70,
@@ -538,3 +540,15 @@ class SimpleDocumentProcessor:
 
         # 将质量检查结果添加到 chapters 中
         chapters["quality_check"] = quality_result
+
+        # 保存提取结果到缓存目录,使用真实文件名
+        if file_name:
+            # 去掉扩展名,添加后缀
+            base_name = file_name.rsplit(".", 1)[0] if "." in file_name else file_name
+            cache_filename = f"{base_name}_预处理结果.json"
+            cache.save(
+                structure,
+                subdir="document_temp",
+                filename=cache_filename,
+                base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW,
+            )