瀏覽代碼

fix(更换新的pdf_extractor文件)

tangle 3 天之前
父節點
當前提交
b93cd363db
共有 1 個文件被更改,包括 54 次插入 和 210 次删除
  1. 54 210
      core/construction_review/component/minimal_pipeline/pdf_extractor2.py

+ 54 - 210
core/construction_review/component/minimal_pipeline/pdf_extractor2.py

@@ -2,30 +2,18 @@
 PDF 结构提取器 - 同步并发 OCR 版本
 PDF 结构提取器 - 同步并发 OCR 版本
 
 
 基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
 基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
-支持 OCR 增强:检测表格区域并使用 ThreadPoolExecutor 5并发 OCR,其他文本保持 PyMuPDF 提取。
+支持 OCR 增强:表格检测和识别委托给 OcrProcessor,其他文本保持 PyMuPDF 提取。
 输出格式兼容后续分类与组装流程。
 输出格式兼容后续分类与组装流程。
 """
 """
 
 
-import base64
-import io
 import re
 import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from dataclasses import dataclass
 from typing import Dict, Any, List, Optional, Tuple, Set
 from typing import Dict, Any, List, Optional, Tuple, Set
 
 
 import fitz
 import fitz
-import numpy as np
-import requests
 
 
 from foundation.observability.logger.loggering import review_logger as logger
 from foundation.observability.logger.loggering import review_logger as logger
-
-# 尝试导入 RapidLayout
-try:
-    from rapid_layout import RapidLayout
-    RAPID_LAYOUT_AVAILABLE = True
-except ImportError:
-    RAPID_LAYOUT_AVAILABLE = False
-    RapidLayout = None
+from .ocr_processor import OcrProcessor
 
 
 
 
 @dataclass
 @dataclass
@@ -110,12 +98,25 @@ class PdfStructureExtractor:
     ):
     ):
         self.clip_top = clip_top
         self.clip_top = clip_top
         self.clip_bottom = clip_bottom
         self.clip_bottom = clip_bottom
-        self.use_ocr = use_ocr and RAPID_LAYOUT_AVAILABLE
 
 
         # OCR 配置
         # OCR 配置
         self.ocr_api_url = ocr_api_url
         self.ocr_api_url = ocr_api_url
         self.ocr_timeout = ocr_timeout
         self.ocr_timeout = ocr_timeout
         self.ocr_api_key = ocr_api_key
         self.ocr_api_key = ocr_api_key
+        self.ocr_processor: Optional[OcrProcessor] = None
+        self.use_ocr = False
+        if use_ocr:
+            self.ocr_processor = OcrProcessor(
+                ocr_api_url=ocr_api_url,
+                ocr_timeout=ocr_timeout,
+                ocr_api_key=ocr_api_key,
+                max_short_edge=self.MAX_SHORT_EDGE,
+                jpeg_quality=self.JPEG_QUALITY,
+                ocr_dpi=self.OCR_DPI,
+                confidence_threshold=self.OCR_CONFIDENCE_THRESHOLD,
+                concurrent_workers=self.OCR_CONCURRENT_WORKERS,
+            )
+            self.use_ocr = self.ocr_processor.is_available()
         self._layout_engine: Optional[Any] = None
         self._layout_engine: Optional[Any] = None
 
 
         # 目录检测配置
         # 目录检测配置
@@ -123,14 +124,11 @@ class PdfStructureExtractor:
         self.toc_model_path = toc_model_path
         self.toc_model_path = toc_model_path
         self._toc_extractor = None
         self._toc_extractor = None
 
 
-        if use_ocr and not RAPID_LAYOUT_AVAILABLE:
-            logger.warning("RapidLayout 未安装,OCR 功能不可用")
-
     def _get_layout_engine(self) -> Optional[Any]:
     def _get_layout_engine(self) -> Optional[Any]:
-        """延迟初始化 RapidLayout"""
-        if self._layout_engine is None and RAPID_LAYOUT_AVAILABLE:
-            self._layout_engine = RapidLayout()
-        return self._layout_engine
+        """兼容旧调用,实际由 OcrProcessor 管理版面引擎。"""
+        if self.ocr_processor is None:
+            return None
+        return self.ocr_processor._get_layout_engine()
 
 
     def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
     def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
         """
         """
@@ -1409,47 +1407,21 @@ class PdfStructureExtractor:
         return None
         return None
 
 
     def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
     def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
-        """同步并发处理 OCR(使用 ThreadPoolExecutor)"""
-        results: List[OcrResult] = []
-        total = len(regions)
-        completed = 0
-
-        with ThreadPoolExecutor(max_workers=self.OCR_CONCURRENT_WORKERS) as executor:
-            # 提交所有任务
-            future_to_region = {
-                executor.submit(self._ocr_table_region, r.page, r.bbox): r
-                for r in regions
-            }
+        """同步并发处理 OCR,具体实现委托给 OcrProcessor。"""
+        if self.ocr_processor is None:
+            return []
 
 
-            # 处理完成的结果
-            for future in as_completed(future_to_region):
-                region = future_to_region[future]
-                completed += 1
-                try:
-                    text = future.result()
-                    results.append(OcrResult(
-                        page_num=region.page_num,
-                        bbox=region.bbox,
-                        score=region.score,
-                        text=text,
-                        success=True,
-                    ))
-                except Exception as e:
-                    logger.error(f"  第 {region.page_num} 页表格 OCR 失败: {e}")
-                    results.append(OcrResult(
-                        page_num=region.page_num,
-                        bbox=region.bbox,
-                        score=region.score,
-                        text="",
-                        success=False,
-                    ))
+        if not progress_callback:
+            return self.ocr_processor.process_ocr_concurrent(regions)
 
 
-                # 每完成5个或最后一个时推送进度
-                if progress_callback and (completed % 5 == 0 or completed == total):
-                    progress = 35 + int(completed / total * 15)  # OCR执行占15%进度(35-50)
-                    progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")
+        def _progress_adapter(completed: int, total: int):
+            progress = 35 + int(completed / total * 15) if total else 50
+            progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")
 
 
-        return results
+        return self.ocr_processor.process_ocr_concurrent(
+            regions,
+            progress_callback=_progress_adapter,
+        )
 
 
     def _detect_table_regions(
     def _detect_table_regions(
         self,
         self,
@@ -1457,114 +1429,16 @@ class PdfStructureExtractor:
         page_num: int,
         page_num: int,
         clip_box: fitz.Rect
         clip_box: fitz.Rect
     ) -> List[Tuple[Tuple[float, float, float, float], float]]:
     ) -> List[Tuple[Tuple[float, float, float, float], float]]:
-        """检测页面中的表格区域,返回坐标列表"""
-        table_regions: List[Tuple[Tuple[float, float, float, float], float]] = []
-
-        if not RAPID_LAYOUT_AVAILABLE:
-            return table_regions
-
-        layout_engine = self._get_layout_engine()
-        if layout_engine is None:
-            return table_regions
-
-        # 渲染页面(裁剪区域)
-        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=clip_box)
-        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)
-
-        try:
-            layout_output = layout_engine(img)
-
-            # 解析版面结果
-            if hasattr(layout_output, 'boxes') and hasattr(layout_output, 'class_names'):
-                # 获取缩放比例
-                scale_x = clip_box.width / img.shape[1]
-                scale_y = clip_box.height / img.shape[0]
-
-                for box, label, score in zip(layout_output.boxes, layout_output.class_names, layout_output.scores):
-                    if label == "table" and score > self.OCR_CONFIDENCE_THRESHOLD:
-                        # 转换为 PDF 坐标
-                        pdf_x1 = clip_box.x0 + box[0] * scale_x
-                        pdf_y1 = clip_box.y0 + box[1] * scale_y
-                        pdf_x2 = clip_box.x0 + box[2] * scale_x
-                        pdf_y2 = clip_box.y0 + box[3] * scale_y
-
-                        table_regions.append(((pdf_x1, pdf_y1, pdf_x2, pdf_y2), score))
-
-        except Exception as e:
-            logger.warning(f"  第 {page_num} 页: 版面分析失败 ({e})")
-
-        return table_regions
+        """检测页面中的表格区域,具体实现委托给 OcrProcessor。"""
+        if self.ocr_processor is None:
+            return []
+        return self.ocr_processor.detect_table_regions(page, page_num, clip_box)
 
 
     def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
     def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
-        """对指定区域进行 OCR 识别(使用 GLM-OCR),支持指数退避重试"""
-        import time
-
-        # 渲染指定区域
-        rect = fitz.Rect(bbox)
-        pix = page.get_pixmap(dpi=self.OCR_DPI, clip=rect)
-        img_bytes = pix.tobytes("jpeg")
-
-        # 压缩图片
-        compressed = self._compress_image(img_bytes)
-        img_base64 = base64.b64encode(compressed).decode('utf-8')
-
-        # 请求 OCR
-        payload = {
-            "model": "GLM-OCR",
-            "messages": [
-                {
-                    "role": "user",
-                    "content": [
-                        {
-                            "type": "text",
-                            "text": "识别图片中的表格内容,按原文排版输出。"
-                                    "注意:"
-                                    "1. 表格用 Markdown 表格格式"
-                                    "2. 保持换行和列对齐"
-                                    "3. 只输出表格内容,不要其他说明"
-                        },
-                        {
-                            "type": "image_url",
-                            "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
-                        }
-                    ]
-                }
-            ],
-            "max_tokens": 2048,
-            "temperature": 0.1
-        }
-
-        headers = {"Content-Type": "application/json"}
-        if self.ocr_api_key:
-            headers["Authorization"] = f"Bearer {self.ocr_api_key}"
-
-        # 指数退避重试
-        last_error = None
-        for attempt in range(max_retries):
-            try:
-                response = requests.post(
-                    self.ocr_api_url,
-                    headers=headers,
-                    json=payload,
-                    timeout=self.ocr_timeout
-                )
-                response.raise_for_status()
-
-                result = response.json()
-                return self._extract_ocr_content(result)
-
-            except Exception as e:
-                last_error = e
-                if attempt < max_retries - 1:
-                    # 指数退避: 2, 4, 8 秒
-                    wait_time = 2 ** (attempt + 1)
-                    logger.warning(f"  第 {page.number + 1} 页表格 OCR 第 {attempt + 1} 次失败: {e}, {wait_time}秒后重试...")
-                    time.sleep(wait_time)
-                else:
-                    logger.error(f"  第 {page.number + 1} 页表格 OCR 最终失败(已重试{max_retries}次): {e}")
-
-        # 所有重试都失败,抛出最后一个错误
-        raise last_error
+        """对指定区域进行 OCR 识别,具体实现委托给 OcrProcessor。"""
+        if self.ocr_processor is None:
+            raise RuntimeError("OCR processor is not initialized")
+        return self.ocr_processor._ocr_table_region(page, bbox, max_retries=max_retries)
 
 
     def _replace_table_regions(
     def _replace_table_regions(
         self,
         self,
@@ -1573,10 +1447,16 @@ class PdfStructureExtractor:
         ocr_results: List[Dict],
         ocr_results: List[Dict],
         clip_box: fitz.Rect
         clip_box: fitz.Rect
     ) -> str:
     ) -> str:
-        """用 OCR 结果替换原始文本中的表格区域"""
+        """用 OCR 结果替换原始文本中的表格区域。"""
+        if self.ocr_processor is None:
+            return original_text
         if not ocr_results:
         if not ocr_results:
             return original_text
             return original_text
 
 
+        # 这里保留章节提取场景的兼容逻辑:
+        # 1. 标题块不参与表格替换,避免目录/章节标题被表格框误吞;
+        # 2. 仅替换真正落入表格区域的正文块,保留表格前后的普通文本;
+        # 3. OCR 返回空时退回原始 PDF 文本,避免整块内容被清空。
         text_blocks = []
         text_blocks = []
         for block in page.get_text("blocks"):
         for block in page.get_text("blocks"):
             x0, y0, x1, y1, text, _, _ = block
             x0, y0, x1, y1, text, _, _ = block
@@ -1668,52 +1548,16 @@ class PdfStructureExtractor:
         return False
         return False
 
 
     def _compress_image(self, img_bytes: bytes) -> bytes:
     def _compress_image(self, img_bytes: bytes) -> bytes:
-        """压缩图片"""
-        try:
-            from PIL import Image
-            img = Image.open(io.BytesIO(img_bytes))
-
-            if img.mode in ('RGBA', 'LA', 'P'):
-                background = Image.new('RGB', img.size, (255, 255, 255))
-                if img.mode == 'P':
-                    img = img.convert('RGBA')
-                if img.mode in ('RGBA', 'LA'):
-                    background.paste(img, mask=img.split()[-1])
-                img = background
-            elif img.mode != 'RGB':
-                img = img.convert('RGB')
-
-            min_edge = min(img.size)
-            if min_edge > self.MAX_SHORT_EDGE:
-                ratio = self.MAX_SHORT_EDGE / min_edge
-                new_size = (int(img.width * ratio), int(img.height * ratio))
-                img = img.resize(new_size, Image.Resampling.LANCZOS)
-
-            buffer = io.BytesIO()
-            img.save(buffer, format='JPEG', quality=self.JPEG_QUALITY, optimize=True)
-            return buffer.getvalue()
-
-        except Exception as e:
-            logger.warning(f"图片压缩失败,使用原图: {e}")
+        """压缩图片,具体实现委托给 OcrProcessor。"""
+        if self.ocr_processor is None:
             return img_bytes
             return img_bytes
+        return self.ocr_processor._compress_image(img_bytes)
 
 
     def _extract_ocr_content(self, result: Dict) -> str:
     def _extract_ocr_content(self, result: Dict) -> str:
-        """从 OCR 响应提取内容,并将 HTML 表格转换为 Markdown"""
-        content = ""
-        if "choices" in result and isinstance(result["choices"], list):
-            if len(result["choices"]) > 0:
-                message = result["choices"][0].get("message", {})
-                content = message.get("content", "")
-
-        # 如果内容包含 HTML 标签,转换为 Markdown
-        if content and "<" in content and ">" in content:
-            try:
-                from ..doc_worker.pdf_worker.html_to_markdown import convert_html_to_markdown
-                content = convert_html_to_markdown(content)
-            except Exception as e:
-                logger.debug(f"HTML 转 Markdown 失败,保留原始内容: {e}")
-
-        return content
+        """从 OCR 响应提取内容,具体实现委托给 OcrProcessor。"""
+        if self.ocr_processor is None:
+            return ""
+        return self.ocr_processor._extract_ocr_content(result)
 
 
     @staticmethod
     @staticmethod
     def _is_header_footer(line: str) -> bool:
     def _is_header_footer(line: str) -> bool: