فهرست منبع

fix(ocr): 改用 ThreadPoolExecutor 避免 asyncio 事件循环问题

- 使用 ThreadPoolExecutor 5并发执行 OCR
- 移除 nest_asyncio 依赖
- 避免事件循环冲突和关闭问题
WangXuMing 1 هفته پیش
والد
کامیت
bec8d41523
1 فایلهای تغییر یافته به همراه 24 افزوده شده و 32 حذف شده
  1. 24 32
      core/construction_review/component/minimal_pipeline/pdf_extractor.py

+ 24 - 32
core/construction_review/component/minimal_pipeline/pdf_extractor.py

@@ -1,17 +1,17 @@
 """
-PDF 结构提取器 - 异步并发 OCR 版本
+PDF 结构提取器 - 同步并发 OCR 版本
 
 基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
-支持 OCR 增强:检测表格区域并使用 asyncio 5并发 OCR,其他文本保持 PyMuPDF 提取。
+支持 OCR 增强:检测表格区域并使用 ThreadPoolExecutor 5并发 OCR,其他文本保持 PyMuPDF 提取。
 输出格式兼容后续分类与组装流程。
 """
 
-import asyncio
 import base64
 import io
 import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
-from typing import Dict, Any, List, Optional, Tuple, Set
+from typing import Dict, Any, List, Optional, Tuple
 
 import fitz
 import numpy as np
@@ -139,13 +139,7 @@ class PdfStructureExtractor:
 
         if table_regions:
             logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
-
-            # 使用 nest_asyncio 允许嵌套事件循环
-            import nest_asyncio
-            nest_asyncio.apply()
-
-            ocr_results = asyncio.run(self._process_ocr_concurrent(table_regions))
-
+            ocr_results = self._process_ocr_concurrent(table_regions)
             success_count = sum(1 for r in ocr_results if r.success)
             logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
 
@@ -261,42 +255,40 @@ class PdfStructureExtractor:
         logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
         return result
 
-    async def _process_ocr_concurrent(self, regions: List[TableRegion]) -> List[OcrResult]:
-        """异步并发处理 OCR"""
-        semaphore = asyncio.Semaphore(self.OCR_CONCURRENT_WORKERS)
+    def _process_ocr_concurrent(self, regions: List[TableRegion]) -> List[OcrResult]:
+        """同步并发处理 OCR(使用 ThreadPoolExecutor)"""
+        results: List[OcrResult] = []
 
-        async def _ocr_single(region: TableRegion) -> OcrResult:
-            async with semaphore:
+        with ThreadPoolExecutor(max_workers=self.OCR_CONCURRENT_WORKERS) as executor:
+            # 提交所有任务
+            future_to_region = {
+                executor.submit(self._ocr_table_region, r.page, r.bbox): r
+                for r in regions
+            }
+
+            # 处理完成的结果
+            for future in as_completed(future_to_region):
+                region = future_to_region[future]
                 try:
-                    text = await self._ocr_region_async(region.page, region.bbox)
-                    return OcrResult(
+                    text = future.result()
+                    results.append(OcrResult(
                         page_num=region.page_num,
                         bbox=region.bbox,
                         score=region.score,
                         text=text,
                         success=True,
-                    )
+                    ))
                 except Exception as e:
                     logger.error(f"  第 {region.page_num} 页表格 OCR 失败: {e}")
-                    return OcrResult(
+                    results.append(OcrResult(
                         page_num=region.page_num,
                         bbox=region.bbox,
                         score=region.score,
                         text="",
                         success=False,
-                    )
-
-        tasks = [_ocr_single(r) for r in regions]
-        return await asyncio.gather(*tasks)
+                    ))
 
-    async def _ocr_region_async(
-        self,
-        page: fitz.Page,
-        bbox: Tuple[float, float, float, float]
-    ) -> str:
-        """异步执行单区域 OCR"""
-        loop = asyncio.get_event_loop()
-        return await loop.run_in_executor(None, self._ocr_table_region, page, bbox)
+        return results
 
     def _detect_table_regions(
         self,