Jelajahi Sumber

feat(ocr): 添加OCR进度推送、重试机制和HTML转Markdown

1. PDF提取器添加进度回调支持:
   - 版面分析阶段每5页推送进度(0-30%)
   - OCR识别阶段每完成5个表格推送进度(35-50%)

2. OCR添加指数退避重试机制:
   - 最多尝试3次(首次请求 + 2次重试)
   - 等待时间: 2s, 4s(第3次失败后不再等待,直接抛出最后一次错误)
   - 有效处理502等偶发网络错误

3. OCR结果自动HTML转Markdown:
   - 使用HTMLToMarkdownConverter转换表格
   - 转换失败保留原始内容

4. 修复to_legacy_dict的outline结构:
   - 添加chapter_classification字段
   - 添加secondary_category_code字段
   - subsections从secondary_classifications构建
WangXuMing 1 Minggu lalu
induk
melakukan
c40d8ec403

+ 33 - 6
core/construction_review/component/doc_worker/models/document_structure.py

@@ -377,6 +377,13 @@ class UnifiedDocumentStructure:
             "processing_timestamp": self.processing_timestamp
         }
 
+    @staticmethod
+    def _number_to_chinese(num: int) -> str:
+        """Convert a chapter number to its Chinese numeral form.
+
+        Integers from 1 to 99 are converted (1 -> "一", 10 -> "十",
+        11 -> "十一", 20 -> "二十", 35 -> "三十五"). Any other value,
+        including non-integers, falls back to ``str(num)``.
+
+        Args:
+            num: The 1-based sequence number to convert.
+
+        Returns:
+            The Chinese numeral, or ``str(num)`` when ``num`` is outside 1-99.
+        """
+        if not isinstance(num, int) or not 1 <= num <= 99:
+            return str(num)
+
+        digits = "一二三四五六七八九"
+        tens, ones = divmod(num, 10)
+        if tens == 0:
+            prefix = ""
+        elif tens == 1:
+            # 10-19 are written "十", "十一", ... without a leading "一".
+            prefix = "十"
+        else:
+            prefix = digits[tens - 1] + "十"
+        suffix = digits[ones - 1] if ones else ""
+        return prefix + suffix
+
     def to_legacy_dict(self) -> Dict[str, Any]:
         """
         转换为旧版字典格式(兼容 AI 审查工作流)
@@ -432,14 +439,34 @@ class UnifiedDocumentStructure:
             chunks.append(chunk)
 
         # 构建 outline 结构(兼容旧格式)
-        outline_chapters = []
-        for item in self.outline.items:
-            outline_chapters.append({
-                "original": item.raw_title or f"{item.first_name}->{item.second_name}",
-                "chapter": item.first_name,
-                "subsections": []
+        # 按一级分类分组构建 chapters
+        chapters_map: Dict[str, Dict[str, Any]] = {}
+
+        for sec in self.secondary_classifications:
+            # 一级code作为key
+            first_code = sec.first_code
+
+            if first_code not in chapters_map:
+                chapters_map[first_code] = {
+                    "index": sec.first_seq,
+                    "title": f"第{self._number_to_chinese(sec.first_seq)}章 {sec.first_name}",
+                    "page": str(sec.page_start or 1),
+                    "original": sec.section_label.split("->")[0] if "->" in sec.section_label else sec.first_name,
+                    "chapter_classification": first_code,
+                    "subsections": []
+                }
+
+            # 添加二级到 subsections
+            chapters_map[first_code]["subsections"].append({
+                "title": sec.section_label.split("->")[-1] if "->" in sec.section_label else sec.second_name,
+                "page": str(sec.page_start or 1),
+                "level": 2,
+                "original": sec.section_label,
+                "secondary_category_code": sec.second_code
             })
 
+        outline_chapters = list(chapters_map.values())
+
         return {
             "document_id": self.document_id,
             "document_name": self.document_name,

+ 77 - 21
core/construction_review/component/minimal_pipeline/pdf_extractor.py

@@ -89,10 +89,14 @@ class PdfStructureExtractor:
             self._layout_engine = RapidLayout()
         return self._layout_engine
 
-    def extract(self, file_content: bytes) -> Dict[str, Any]:
+    def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
         """
         从 PDF 字节流提取章节结构。
 
+        Args:
+            file_content: PDF 文件字节流
+            progress_callback: 进度回调函数,接收 (stage, current, message) 参数
+
         Returns:
             {
                 "chapters": {
@@ -106,21 +110,30 @@ class PdfStructureExtractor:
         """
         doc = fitz.open(stream=file_content)
         try:
-            structure = self._extract_from_doc(doc)
+            structure = self._extract_from_doc(doc, progress_callback)
             structure["total_pages"] = len(doc)
             return structure
         finally:
             doc.close()
 
-    def _extract_from_doc(self, doc: fitz.Document) -> Dict[str, Any]:
+    def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
         """提取文档结构(支持 OCR 异步并发)"""
 
+        def _emit_progress(stage: str, current: int, message: str):
+            """Forward a progress update to ``progress_callback``, if one was given.
+
+            Reporting is best-effort: an exception raised by the callback is
+            logged and suppressed so that it cannot abort the extraction.
+            """
+            if not progress_callback:
+                return
+            try:
+                progress_callback(stage, current, message)
+            except Exception:
+                # Keep going, but leave a trace instead of discarding the error.
+                logger.debug("进度回调执行失败,已忽略", exc_info=True)
+
         # === 阶段1: 收集所有需要 OCR 的表格区域 ===
         table_regions: List[TableRegion] = []
 
         if self.use_ocr:
             logger.info("[OCR预处理] 扫描所有页面的表格区域...")
-            for page_num in range(len(doc)):
+            total_pages = len(doc)
+            for page_num in range(total_pages):
                 page = doc.load_page(page_num)
                 rect = page.rect
                 clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
@@ -132,6 +145,10 @@ class PdfStructureExtractor:
                         bbox=bbox,
                         score=score
                     ))
+                # 每5页或最后一页推送一次进度
+                if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
+                    progress = int((page_num + 1) / total_pages * 30)  # OCR预处理占30%进度
+                    _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
             logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")
 
         # === 阶段2: 异步并发执行 OCR (5并发) ===
@@ -139,9 +156,11 @@ class PdfStructureExtractor:
 
         if table_regions:
             logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
-            ocr_results = self._process_ocr_concurrent(table_regions)
+            _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
+            ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
             success_count = sum(1 for r in ocr_results if r.success)
             logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
+            _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
 
         # 按页码分组 OCR 结果
         ocr_by_page: Dict[int, List[OcrResult]] = {}
@@ -255,9 +274,11 @@ class PdfStructureExtractor:
         logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
         return result
 
-    def _process_ocr_concurrent(self, regions: List[TableRegion]) -> List[OcrResult]:
+    def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
         """同步并发处理 OCR(使用 ThreadPoolExecutor)"""
         results: List[OcrResult] = []
+        total = len(regions)
+        completed = 0
 
         with ThreadPoolExecutor(max_workers=self.OCR_CONCURRENT_WORKERS) as executor:
             # 提交所有任务
@@ -269,6 +290,7 @@ class PdfStructureExtractor:
             # 处理完成的结果
             for future in as_completed(future_to_region):
                 region = future_to_region[future]
+                completed += 1
                 try:
                     text = future.result()
                     results.append(OcrResult(
@@ -288,6 +310,11 @@ class PdfStructureExtractor:
                         success=False,
                     ))
 
+                # 每完成5个或最后一个时推送进度
+                if progress_callback and (completed % 5 == 0 or completed == total):
+                    progress = 35 + int(completed / total * 15)  # OCR执行占15%进度(35-50)
+                    progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")
+
         return results
 
     def _detect_table_regions(
@@ -334,8 +361,10 @@ class PdfStructureExtractor:
 
         return table_regions
 
-    def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float]) -> str:
-        """对指定区域进行 OCR 识别(使用 GLM-OCR)"""
+    def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
+        """对指定区域进行 OCR 识别(使用 GLM-OCR),支持指数退避重试"""
+        import time
+
         # 渲染指定区域
         rect = fitz.Rect(bbox)
         pix = page.get_pixmap(dpi=self.OCR_DPI, clip=rect)
@@ -375,16 +404,33 @@ class PdfStructureExtractor:
         if self.ocr_api_key:
             headers["Authorization"] = f"Bearer {self.ocr_api_key}"
 
-        response = requests.post(
-            self.ocr_api_url,
-            headers=headers,
-            json=payload,
-            timeout=self.ocr_timeout
-        )
-        response.raise_for_status()
-
-        result = response.json()
-        return self._extract_ocr_content(result)
+        # 指数退避重试
+        last_error = None
+        for attempt in range(max_retries):
+            try:
+                response = requests.post(
+                    self.ocr_api_url,
+                    headers=headers,
+                    json=payload,
+                    timeout=self.ocr_timeout
+                )
+                response.raise_for_status()
+
+                result = response.json()
+                return self._extract_ocr_content(result)
+
+            except Exception as e:
+                last_error = e
+                if attempt < max_retries - 1:
+                    # 指数退避: 2, 4, 8 秒
+                    wait_time = 2 ** (attempt + 1)
+                    logger.warning(f"  第 {page.number + 1} 页表格 OCR 第 {attempt + 1} 次失败: {e}, {wait_time}秒后重试...")
+                    time.sleep(wait_time)
+                else:
+                    logger.error(f"  第 {page.number + 1} 页表格 OCR 最终失败(已重试{max_retries}次): {e}")
+
+        # 所有重试都失败,抛出最后一个错误
+        raise last_error
 
     def _replace_table_regions(
         self,
@@ -500,12 +546,22 @@ class PdfStructureExtractor:
             return img_bytes
 
     def _extract_ocr_content(self, result: Dict) -> str:
-        """从 OCR 响应提取内容"""
+        """Extract the message text from an OCR response and convert HTML to Markdown.
+
+        Args:
+            result: Decoded JSON response; the text is read from
+                ``result["choices"][0]["message"]["content"]``.
+
+        Returns:
+            The content string, converted to Markdown when it appears to
+            contain HTML markup. An empty string when the response has no
+            choices or the content is missing or null. If the conversion
+            fails, the unconverted content is returned.
+        """
+        content = ""
         if "choices" in result and isinstance(result["choices"], list):
             if len(result["choices"]) > 0:
                 message = result["choices"][0].get("message", {})
-                return message.get("content", "")
-        return ""
+                # A JSON null content would otherwise be returned as None,
+                # which breaks the declared ``-> str`` contract.
+                content = message.get("content") or ""
+
+        # Cheap heuristic: both angle brackets present means "may contain HTML".
+        if content and "<" in content and ">" in content:
+            try:
+                from ..doc_worker.pdf_worker.html_to_markdown import convert_html_to_markdown
+                content = convert_html_to_markdown(content)
+            except Exception as e:
+                # This also catches a failed import, so log it visibly: a
+                # conversion that never works should not go unnoticed.
+                logger.warning(f"HTML 转 Markdown 失败,保留原始内容: {e}")
+
+        return content
 
     @staticmethod
     def _is_header_footer(line: str) -> bool:

+ 14 - 2
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -104,8 +104,20 @@ class SimpleDocumentProcessor:
         """执行核心流程,返回 (structure, primary_result, secondary_result, chunks)。"""
         logger.info(f"[SimpleProcessor] 开始处理文档: {file_name}")
 
-        # 1. PDF 结构提取
-        structure = self.pdf_extractor.extract(file_content)
+        # 1. PDF structure extraction (with progress callback).
+        # extract() is synchronous and can run for a long time (layout analysis
+        # plus OCR requests with retries). Calling it directly inside this
+        # coroutine would block the event loop, so anything scheduled from the
+        # callback could not run until extraction had already finished. Run it
+        # in a worker thread and hand each update back to the loop instead.
+        loop = asyncio.get_running_loop()
+
+        def _extraction_progress(stage: str, current: int, message: str):
+            # Invoked from the worker thread. The extractor's progress value is
+            # halved and reported under the "文档提取" stage.
+            # NOTE(review): the extractor reports up to 50 (so up to 25 here),
+            # while the statement following this block reports 10 - confirm the
+            # intended progress scale.
+            if not progress_callback:
+                return
+            asyncio.run_coroutine_threadsafe(
+                self._emit_progress(
+                    progress_callback, "文档提取", int(current * 0.5), message
+                ),
+                loop,
+            )
+
+        structure = await loop.run_in_executor(
+            None,
+            lambda: self.pdf_extractor.extract(
+                file_content, progress_callback=_extraction_progress
+            ),
+        )
         await self._emit_progress(progress_callback, "文档提取", 10, "PDF结构提取完成")
 
         # 2. 一级分类