Jelajahi Sumber

fix(纯数字内容切掉bug)

tangle 3 hari lalu
induk
melakukan
49fcb04057

+ 193 - 11
core/construction_review/component/minimal_pipeline/pdf_extractor.py

@@ -206,18 +206,122 @@ class PdfStructureExtractor:
         Returns:
             {"chapters": [...], "total_chapters": N} 或 None
         """
-        # 延迟导入避免循环依赖(YOLO依赖必须存在,否则报错)
-        from .toc_detector import TOCCatalogExtractor
-
-        if self._toc_extractor is None:
-            self._toc_extractor = TOCCatalogExtractor(
-                model_path=self.toc_model_path,
-                ocr_api_url=self.ocr_api_url,
-                ocr_api_key=self.ocr_api_key,
-                ocr_timeout=self.ocr_timeout,
+        catalog: Optional[Dict[str, Any]] = None
+
+        try:
+            # 延迟导入避免循环依赖(YOLO依赖必须存在,否则报错)
+            from .toc_detector import TOCCatalogExtractor
+
+            if self._toc_extractor is None:
+                self._toc_extractor = TOCCatalogExtractor(
+                    model_path=self.toc_model_path,
+                    ocr_api_url=self.ocr_api_url,
+                    ocr_api_key=self.ocr_api_key,
+                    ocr_timeout=self.ocr_timeout,
+                )
+
+            catalog = self._toc_extractor.detect_and_extract(file_content, progress_callback)
+        except Exception as exc:
+            logger.warning(f"[PDF提取] 目录检测器不可用,回退到纯文本目录解析: {exc}")
+
+        if catalog:
+            catalog_chapters = self._sanitize_catalog_chapters(catalog.get("chapters", []))
+            raw_text = (catalog.get("raw_ocr_text") or "").strip()
+            if catalog_chapters or raw_text:
+                return catalog
+
+        fallback_catalog = self._extract_catalog_from_front_pages_text(file_content)
+        if fallback_catalog:
+            logger.info(
+                f"[PDF提取] 使用前几页纯文本目录兜底成功: {fallback_catalog.get('total_chapters', 0)} 章"
             )
+        return fallback_catalog
+
+    def _extract_catalog_from_front_pages_text(
+        self,
+        file_content: bytes,
+        max_pages: int = 12,
+    ) -> Optional[Dict[str, Any]]:
+        """Fallback TOC parser: when the detector-based catalog extraction
+        fails, scan the plain text of the first few pages for a
+        table-of-contents and parse chapters from it.
+
+        Args:
+            file_content: Raw PDF bytes.
+            max_pages: Maximum number of leading pages to scan.
+
+        Returns:
+            A catalog dict with "chapters", "total_chapters",
+            "raw_ocr_text" and "formatted_text", or None when no
+            catalog-like text is found.
+        """
+        doc = fitz.open(stream=file_content)
+        try:
+            catalog_pages: List[str] = []
+            started = False
+            scan_pages = min(max_pages, len(doc))
 
-        return self._toc_extractor.detect_and_extract(file_content, progress_callback)
+            for page_num in range(scan_pages):
+                page = doc.load_page(page_num)
+                rect = page.rect
+                # Clip using self.clip_top / self.clip_bottom — presumably
+                # configured header/footer margins; confirm on the class.
+                clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
+                page_text = page.get_text("text", clip=clip_box)
+                if not page_text or not page_text.strip():
+                    # Blank page: ends the catalog run once it has started;
+                    # before that, blank pages are simply skipped.
+                    if started:
+                        break
+                    continue
+
+                has_marker, toc_like_count, page_suffix_count = self._catalog_text_signals(page_text)
+                if not started:
+                    # Require strong evidence to enter "catalog mode": an
+                    # explicit TOC marker, or enough page-number-suffixed
+                    # entries (optionally combined with many TOC-like lines).
+                    is_catalog_page = (
+                        has_marker
+                        or page_suffix_count >= 2
+                        or (page_suffix_count >= 1 and toc_like_count >= 6)
+                    )
+                    if not is_catalog_page:
+                        continue
+                    started = True
+                else:
+                    # Once started, a weaker signal is enough to continue;
+                    # the first non-catalog page terminates the scan.
+                    is_catalog_page = (
+                        has_marker
+                        or page_suffix_count >= 1
+                    )
+                    if not is_catalog_page:
+                        break
+
+                catalog_pages.append(page_text)
+
+            raw_text = "\n".join(catalog_pages).strip()
+            if not raw_text:
+                return None
+
+            chapters = self._parse_catalog_from_raw_text(raw_text)
+            if not chapters:
+                return None
+
+            return {
+                "chapters": chapters,
+                "total_chapters": len(chapters),
+                "raw_ocr_text": raw_text,
+                "formatted_text": self._format_catalog_chapters(chapters),
+            }
+        finally:
+            # Always release the PyMuPDF document handle.
+            doc.close()
+
+    @classmethod
+    def _catalog_text_signals(cls, text: str) -> Tuple[bool, int, int]:
+        """Score how catalog-like a page's text is.
+
+        Returns:
+            Tuple ``(has_marker, toc_like_count, page_suffix_count)``:
+            ``has_marker`` — text contains 目录/目錄 ("table of contents");
+            ``toc_like_count`` — number of lines resembling TOC entries;
+            ``page_suffix_count`` — number of entries ending in a page number.
+        """
+        compact_text = re.sub(r"\s+", "", text or "")
+        has_marker = "目录" in compact_text or "目錄" in compact_text
+        toc_like_count = 0
+        page_suffix_count = 0
+
+        for raw_line in cls._prepare_catalog_raw_lines(text):
+            title_text, page = cls._split_catalog_entry(raw_line)
+            if not title_text:
+                continue
+
+            compact_title = re.sub(r"\s+", "", title_text)
+            if compact_title in {"目录", "目錄"}:
+                # A standalone "Table of Contents" heading counts as TOC-like.
+                toc_like_count += 1
+                continue
+
+            if page is not None:
+                # Entry ends with a page number — the strongest TOC signal.
+                page_suffix_count += 1
+                toc_like_count += 1
+                continue
+
+            # Lines matching the level-1 or level-2 heading rules also count.
+            if cls._matching_rule_names(title_text, "l1") or cls._matching_rule_names(title_text, "l2"):
+                toc_like_count += 1
+
+        return has_marker, toc_like_count, page_suffix_count
 
     def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
         """提取文档结构(支持 OCR 异步并发)。
@@ -288,6 +392,7 @@ class PdfStructureExtractor:
         in_body = False
         candidate_rule_names: Optional[List[str]] = None
         active_rule_name: Optional[str] = None
+        last_numeric_chapter_no: Optional[int] = None
 
         logger.info("[文本提取] 提取页面内容并切分章节...")
 
@@ -333,7 +438,15 @@ class PdfStructureExtractor:
                     # 只有首次遇到真正的一级标题后,才认为进入正文。
                     # 这样可以避免目录页虽然命中标题规则,却被误当成正文结构。
                     matched_rules = self._matching_rule_names(line, "l1")
-                    if matched_rules and not self.TOC_PATTERN.search(line):
+                    if (
+                        matched_rules
+                        and not self.TOC_PATTERN.search(line)
+                        and not self._is_false_positive_chapter_heading(
+                            line,
+                            matched_rules,
+                            last_numeric_chapter_no,
+                        )
+                    ):
                         in_body = True
                         candidate_rule_names = matched_rules
                     else:
@@ -351,9 +464,18 @@ class PdfStructureExtractor:
                 # 匹配章标题
                 matched_chapter_rules = self._matching_rule_names(line, "l1", active_scope)
                 if matched_chapter_rules:
+                    if self._is_false_positive_chapter_heading(
+                        line,
+                        matched_chapter_rules,
+                        last_numeric_chapter_no,
+                    ):
+                        continue
                     if active_rule_name is None:
                         candidate_rule_names = matched_chapter_rules
                     current_chapter = self._clean_chapter_title(line)
+                    numeric_chapter_no = self._extract_numeric_l1_number(line)
+                    if numeric_chapter_no is not None:
+                        last_numeric_chapter_no = numeric_chapter_no
                     current_section = "章节标题"
                     if current_chapter not in structured_data:
                         structured_data[current_chapter] = {}
@@ -1618,6 +1740,66 @@ class PdfStructureExtractor:
     def _matches_section_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
         return bool(cls._matching_rule_names(line, "l2", rule_names))
 
+    @staticmethod
+    def _extract_numeric_l1_number(line: str) -> Optional[int]:
+        """Extract the leading 1-2 digit chapter number of a numeric
+        level-1 heading, or return None when the line has none.
+
+        NOTE(review): the class ``[\..。、]`` lists ``.`` twice (``\.`` and
+        ``.``) — harmless but redundant; and ``int()`` on a ``\d{1,2}``
+        capture cannot raise, so the except branch looks unreachable.
+        """
+        cleaned = PdfStructureExtractor._strip_leading_page_number_from_cn_chapter(line.strip())
+        match = re.match(r"^(\d{1,2})(?:[\..。、]?)\s*(.*)$", cleaned)
+        if not match:
+            return None
+
+        try:
+            return int(match.group(1))
+        except Exception:
+            return None
+
+    @staticmethod
+    def _looks_like_measurement_heading_body(body: str) -> bool:
+        """Return True when the text after a leading number reads like a
+        measurement unit (e.g. 个月 "months", kV, MPa, m2, ㎡) rather than a
+        chapter title — i.e. the line is data such as "3个月", not a heading.
+
+        An empty body is also treated as measurement-like: a bare number
+        is not a chapter heading.
+        """
+        compact_body = re.sub(r"\s+", "", body or "")
+        if not compact_body:
+            return True
+
+        # Matched case-insensitively (body is lower-cased below); patterns
+        # cover CN duration/count units, electrical/pressure units, lengths,
+        # and square/cubic metres.
+        measurement_patterns = (
+            r"^(?:个月|月|天|年一遇|台|套|吨|千瓦|人|名|%|‰|℃)",
+            r"^(?:kva|kv|mpa|kn)(?:\d|$)",
+            r"^(?:km|cm|mm|m)(?:\d|$)",
+            r"^m[23](?:$|[^A-Za-z])",
+            r"^(?:㎡|㎥)",
+        )
+        lower_body = compact_body.lower()
+        return any(re.match(pattern, lower_body) for pattern in measurement_patterns)
+
+    @classmethod
+    def _is_false_positive_chapter_heading(
+        cls,
+        line: str,
+        matched_rules: Optional[List[str]],
+        previous_numeric_chapter_no: Optional[int],
+    ) -> bool:
+        """Heuristically reject pure-numeric "chapter headings" that are data.
+
+        Only lines matched by the pure-number rule ("Rule_1_纯数字派") are
+        checked. A line is a false positive when its body looks like a
+        measurement, or its chapter number is implausible relative to the
+        previously seen one (non-increasing, or jumping ahead by more
+        than 5; with no previous chapter, numbers above 12 are rejected).
+        """
+        if not matched_rules or "Rule_1_纯数字派" not in matched_rules:
+            return False
+
+        cleaned = cls._strip_leading_page_number_from_cn_chapter(line.strip())
+        match = re.match(r"^(\d{1,2})(?:[\..。、]?)\s*(.*)$", cleaned)
+        if not match:
+            return False
+
+        try:
+            chapter_no = int(match.group(1))
+        except Exception:
+            return False
+
+        body = (match.group(2) or "").strip()
+        if cls._looks_like_measurement_heading_body(body):
+            return True
+
+        if previous_numeric_chapter_no is None:
+            # No chapter seen yet: a first chapter numbered above 12 is unlikely.
+            return chapter_no > 12
+
+        if chapter_no <= previous_numeric_chapter_no:
+            # Chapter numbers must strictly increase within a document.
+            return True
+
+        # A forward jump of more than 5 chapters is treated as noise.
+        return chapter_no > previous_numeric_chapter_no + 5
+
     @staticmethod
     def _strip_leading_page_number_from_cn_chapter(line: str) -> str:
         cleaned = re.sub(r"\s+", " ", line.strip())