Переглянути джерело

fix(ocr): 使用 nest_asyncio 解决事件循环冲突

- 在已运行的事件循环中使用 nest_asyncio.apply()
- 允许 asyncio.run() 在嵌套事件循环中执行
WangXuMing 1 тиждень тому
батько
коміт
56bf42f85e

+ 3 - 0
config/config.ini

@@ -58,6 +58,9 @@ REDIS_PASSWORD=123456
 REDIS_MAX_CONNECTIONS=50
 
 [ocr]
+# 是否启用 OCR 表格识别(true/false)
+enable = true
+
 # OCR 引擎选择(以下写法都支持):
 # GLM-OCR: glm_ocr | glm-ocr | glmocr
 # MinerU:  mineru | mineru-ocr | mineru_ocr

+ 126 - 56
core/construction_review/component/minimal_pipeline/pdf_extractor.py

@@ -1,14 +1,16 @@
 """
-PDF 结构提取器
+PDF 结构提取器 - 异步并发 OCR 版本
 
 基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
-支持 OCR 增强:检测表格区域并精准 OCR,其他文本保持 PyMuPDF 提取。
+支持 OCR 增强:检测表格区域并使用 asyncio 5并发 OCR,其他文本保持 PyMuPDF 提取。
 输出格式兼容后续分类与组装流程。
 """
 
+import asyncio
 import base64
 import io
 import re
+from dataclasses import dataclass
 from typing import Dict, Any, List, Optional, Tuple, Set
 
 import fitz
@@ -26,8 +28,27 @@ except ImportError:
     RapidLayout = None
 
 
+@dataclass
+class TableRegion:
+    """A detected table region that is queued for OCR."""
+    page_num: int  # page number recorded by the caller (the visible caller passes a 1-based value)
+    page: fitz.Page  # page object the region was detected on
+    bbox: Tuple[float, float, float, float]  # region bounding box; presumably (x0, y0, x1, y1) -- TODO confirm against _detect_table_regions
+    score: float  # detection score returned together with the bbox
+
+
+@dataclass
+class OcrResult:
+    """Outcome of running OCR on a single table region."""
+    page_num: int  # page number copied from the originating region
+    bbox: Tuple[float, float, float, float]  # bounding box copied from the originating region
+    score: float  # detection score copied from the originating region
+    text: str  # recognized text; empty string when the OCR attempt failed
+    success: bool  # False when the OCR call raised an exception
+
+
 class PdfStructureExtractor:
-    """PDF 章节结构提取器(支持 OCR 增强)"""
+    """PDF 章节结构提取器(支持 OCR 异步并发)"""
 
     CHAPTER_PATTERN = re.compile(r"^第[一二三四五六七八九十百]+章\s*.*")
     SECTION_PATTERN = re.compile(r"^[一二三四五六七八九十百]+、\s*.*")
@@ -38,6 +59,7 @@ class PdfStructureExtractor:
     JPEG_QUALITY = 90
     OCR_DPI = 200
     OCR_CONFIDENCE_THRESHOLD = 0.5
+    OCR_CONCURRENT_WORKERS = 5
 
     def __init__(
         self,
@@ -46,6 +68,7 @@ class PdfStructureExtractor:
         use_ocr: bool = False,
         ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
         ocr_timeout: int = 600,
+        ocr_api_key: str = "",
     ):
         self.clip_top = clip_top
         self.clip_bottom = clip_bottom
@@ -54,6 +77,7 @@ class PdfStructureExtractor:
         # OCR 配置
         self.ocr_api_url = ocr_api_url
         self.ocr_timeout = ocr_timeout
+        self.ocr_api_key = ocr_api_key
         self._layout_engine: Optional[Any] = None
 
         if use_ocr and not RAPID_LAYOUT_AVAILABLE:
@@ -89,19 +113,76 @@ class PdfStructureExtractor:
             doc.close()
 
     def _extract_from_doc(self, doc: fitz.Document) -> Dict[str, Any]:
+        """提取文档结构(支持 OCR 异步并发)"""
+
+        # === 阶段1: 收集所有需要 OCR 的表格区域 ===
+        table_regions: List[TableRegion] = []
+
+        if self.use_ocr:
+            logger.info("[OCR预处理] 扫描所有页面的表格区域...")
+            for page_num in range(len(doc)):
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
+                regions = self._detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score in regions:
+                    table_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score
+                    ))
+            logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")
+
+        # === 阶段2: 异步并发执行 OCR (5并发) ===
+        ocr_results: List[OcrResult] = []
+
+        if table_regions:
+            logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
+
+            # 使用 nest_asyncio 允许嵌套事件循环
+            import nest_asyncio
+            nest_asyncio.apply()
+
+            ocr_results = asyncio.run(self._process_ocr_concurrent(table_regions))
+
+            success_count = sum(1 for r in ocr_results if r.success)
+            logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
+
+        # 按页码分组 OCR 结果
+        ocr_by_page: Dict[int, List[OcrResult]] = {}
+        for result in ocr_results:
+            if result.success:
+                if result.page_num not in ocr_by_page:
+                    ocr_by_page[result.page_num] = []
+                ocr_by_page[result.page_num].append(result)
+
+        # === 阶段3: 提取页面文本(应用 OCR 结果)并切分章节 ===
         structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
         current_chapter = "未分类前言"
         current_section = "默认部分"
         in_body = False
 
+        logger.info("[文本提取] 提取页面内容并切分章节...")
+
         for page_num in range(len(doc)):
             page = doc.load_page(page_num)
             rect = page.rect
             clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
 
-            # 获取页面文本(支持 OCR 增强)
-            if self.use_ocr:
-                text = self._extract_page_with_ocr(page, page_num + 1, clip_box)
+            # 获取页面文本(应用 OCR 结果)
+            if page_num + 1 in ocr_by_page:
+                original_text = page.get_text("text", clip=clip_box)
+                ocr_results_list = [
+                    {
+                        "region_index": i,
+                        "bbox": r.bbox,
+                        "score": r.score,
+                        "ocr_text": r.text,
+                    }
+                    for i, r in enumerate(ocr_by_page[page_num + 1])
+                ]
+                text = self._replace_table_regions(page, original_text, ocr_results_list, clip_box)
             else:
                 text = page.get_text("text", clip=clip_box)
 
@@ -180,56 +261,42 @@ class PdfStructureExtractor:
         logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
         return result
 
-    def _extract_page_with_ocr(self, page: fitz.Page, page_num: int, clip_box: fitz.Rect) -> str:
-        """
-        提取页面内容,对表格区域使用 OCR
-
-        流程:
-        1. PyMuPDF 提取全部文本(确保章节格式稳定)
-        2. 检测表格区域坐标
-        3. 只 OCR 表格区域
-        4. 用 OCR 结果替换表格区域内容
-        """
-        import time
-
-        start_time = time.time()
-
-        # 1. PyMuPDF 提取全部文本
-        original_text = page.get_text("text", clip=clip_box)
-
-        # 2. 检测表格区域
-        table_regions = self._detect_table_regions(page, page_num, clip_box)
-
-        if not table_regions:
-            return original_text
-
-        logger.info(f"  第 {page_num} 页: 检测到 {len(table_regions)} 个表格区域")
-
-        # 3. OCR 每个表格区域
-        ocr_results = []
-        for idx, (bbox, score) in enumerate(table_regions):
-            try:
-                ocr_text = self._ocr_table_region(page, bbox)
-                ocr_results.append({
-                    "region_index": idx,
-                    "bbox": bbox,
-                    "score": score,
-                    "ocr_text": ocr_text,
-                })
-                logger.debug(f"    区域 {idx+1}: OCR 完成 ({len(ocr_text)} 字符)")
-            except Exception as e:
-                logger.error(f"    区域 {idx+1}: OCR 失败 ({e})")
-
-        # 4. 替换表格区域内容
-        if ocr_results:
-            updated_text = self._replace_table_regions(
-                page, original_text, ocr_results, clip_box
-            )
-            elapsed = time.time() - start_time
-            logger.info(f"  第 {page_num} 页: OCR 处理完成,耗时 {elapsed:.2f}s")
-            return updated_text
-
-        return original_text
+    async def _process_ocr_concurrent(self, regions: List[TableRegion]) -> List[OcrResult]:
+        """Run OCR over every table region with bounded concurrency.
+
+        At most ``OCR_CONCURRENT_WORKERS`` regions are processed at the same
+        time. A region whose OCR raises is logged and reported as a failed
+        result (empty text, ``success=False``) instead of aborting the batch.
+
+        Args:
+            regions: Table regions to OCR.
+
+        Returns:
+            One ``OcrResult`` per input region, in the same order as ``regions``.
+        """
+        limiter = asyncio.Semaphore(self.OCR_CONCURRENT_WORKERS)
+
+        async def _run_one(region: TableRegion) -> OcrResult:
+            # Defaults describe the failure case; they are overwritten only
+            # when the OCR call returns normally.
+            recognized = ""
+            succeeded = False
+            async with limiter:
+                try:
+                    recognized = await self._ocr_region_async(region.page, region.bbox)
+                    succeeded = True
+                except Exception as e:
+                    logger.error(f"  第 {region.page_num} 页表格 OCR 失败: {e}")
+            return OcrResult(
+                page_num=region.page_num,
+                bbox=region.bbox,
+                score=region.score,
+                text=recognized,
+                success=succeeded,
+            )
+
+        # gather() keeps the result order aligned with the input order.
+        return await asyncio.gather(*(_run_one(item) for item in regions))
+
+    async def _ocr_region_async(
+        self,
+        page: fitz.Page,
+        bbox: Tuple[float, float, float, float]
+    ) -> str:
+        """Run the blocking single-region OCR call without blocking the event loop.
+
+        The synchronous ``_ocr_table_region`` is dispatched to the loop's
+        default executor and awaited.
+
+        Args:
+            page: Page that contains the table region.
+            bbox: Bounding box of the region to OCR.
+
+        Returns:
+            Whatever ``_ocr_table_region`` returns for this region.
+
+        Raises:
+            Any exception raised by ``_ocr_table_region`` propagates to the caller.
+        """
+        # Inside a coroutine a loop is always running; get_running_loop()
+        # returns it directly and never creates a new loop implicitly.
+        loop = asyncio.get_running_loop()
+        # NOTE(review): ``page`` is handed to an executor worker; confirm the
+        # PDF library tolerates use of page objects from several threads.
+        return await loop.run_in_executor(None, self._ocr_table_region, page, bbox)
 
     def _detect_table_regions(
         self,
@@ -313,6 +380,9 @@ class PdfStructureExtractor:
         }
 
         headers = {"Content-Type": "application/json"}
+        if self.ocr_api_key:
+            headers["Authorization"] = f"Bearer {self.ocr_api_key}"
+
         response = requests.post(
             self.ocr_api_url,
             headers=headers,

+ 19 - 1
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -36,7 +36,25 @@ class SimpleDocumentProcessor:
     """最简文档处理器"""
 
     def __init__(self, use_ocr: bool = False):
-        self.pdf_extractor = PdfStructureExtractor(use_ocr=use_ocr)
+        # 从配置读取 OCR 配置
+        ocr_api_url = "http://183.220.37.46:25429/v1/chat/completions"
+        ocr_api_key = ""
+        ocr_timeout = 600
+
+        try:
+            from foundation.infrastructure.config.config import config_handler
+            ocr_api_url = config_handler.get("ocr", "GLM_OCR_API_URL", ocr_api_url)
+            ocr_api_key = config_handler.get("ocr", "GLM_OCR_API_KEY", "")
+            ocr_timeout = int(config_handler.get("ocr", "GLM_OCR_TIMEOUT", str(ocr_timeout)))
+        except Exception:
+            pass
+
+        self.pdf_extractor = PdfStructureExtractor(
+            use_ocr=use_ocr,
+            ocr_api_url=ocr_api_url,
+            ocr_api_key=ocr_api_key,
+            ocr_timeout=ocr_timeout,
+        )
         self.hierarchy_classifier = HierarchyClassifier()
         self.chunk_classifier = ChunkClassifier()
 

+ 7 - 1
core/construction_review/workflows/document_workflow.py

@@ -32,9 +32,15 @@ class DocumentWorkflow:
 
         self.progress_manager = progress_manager
         self.redis_duplicate_checker = redis_duplicate_checker
+        # 从配置读取是否启用 OCR
+        from foundation.infrastructure.config.config import config_handler
+        use_ocr_str = config_handler.get("ocr", "enable", "false")
+        use_ocr = use_ocr_str.lower() in ("true", "1", "yes", "on")
+
         self.document_processor = DocumentProcessor(
             progress_manager=progress_manager,
-            callback_task_id=task_file_info.callback_task_id
+            callback_task_id=task_file_info.callback_task_id,
+            use_ocr=use_ocr
         )
 
     async def _processing_heartbeat(self, progress_state: dict) -> None: