Kaynağa Gözat

fix(功能问题修复)

tangle 4 gün önce
ebeveyn
işleme
39395d5c72

+ 529 - 52
core/construction_review/component/minimal_pipeline/pdf_extractor2.py

@@ -11,7 +11,7 @@ import io
 import re
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
-from typing import Dict, Any, List, Optional, Tuple
+from typing import Dict, Any, List, Optional, Tuple, Set
 
 import fitz
 import numpy as np
@@ -169,6 +169,14 @@ class PdfStructureExtractor:
                     structure.get("chapters", {}),
                     result["catalog"],
                 )
+                rebuilt_chapters = self._rebuild_section_contents_from_catalog(
+                    structure.get("chapters", {}),
+                    result["catalog"],
+                    structure.get("_body_lines", []),
+                )
+                if rebuilt_chapters:
+                    structure["chapters"] = rebuilt_chapters
+            structure.pop("_body_lines", None)
             result["chapters"] = structure.get("chapters", {})
             result["total_pages"] = len(doc)
             return result
@@ -251,6 +259,7 @@ class PdfStructureExtractor:
 
         # === 阶段3: 提取页面文本(应用 OCR 结果)并切分章节 ===
         structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
+        body_lines: List[Dict[str, Any]] = []
         current_chapter = "未分类前言"
         current_section = "默认部分"
         in_body = False
@@ -280,7 +289,14 @@ class PdfStructureExtractor:
             else:
                 text = page.get_text("text", clip=clip_box)
 
-            lines = text.split("\n")
+            lines = self._prepare_page_lines(text)
+            for line in lines:
+                if not line or self._is_header_footer(line):
+                    continue
+                body_lines.append({
+                    "page": page_num + 1,
+                    "text": line,
+                })
 
             for line in lines:
                 line = line.strip()
@@ -358,7 +374,7 @@ class PdfStructureExtractor:
                 structured_data[current_chapter][current_section]["page_end"] = page_num + 1
 
         # 将行列表拼接为文本
-        result: Dict[str, Any] = {"chapters": {}}
+        result: Dict[str, Any] = {"chapters": {}, "_body_lines": body_lines}
         for chap, sections in structured_data.items():
             result["chapters"][chap] = {}
             for sec, data in sections.items():
@@ -376,12 +392,26 @@ class PdfStructureExtractor:
             return {}
 
         normalized = dict(catalog)
+        existing_chapters = self._sanitize_catalog_chapters(catalog.get("chapters", []))
         raw_text = catalog.get("raw_ocr_text", "")
         parsed_chapters = self._parse_catalog_from_raw_text(raw_text) if isinstance(raw_text, str) else []
+        selected_chapters = existing_chapters
+
         if parsed_chapters:
-            normalized["chapters"] = parsed_chapters
-            normalized["total_chapters"] = len(parsed_chapters)
-            normalized["formatted_text"] = self._format_catalog_chapters(parsed_chapters)
+            if self._should_prefer_parsed_catalog(parsed_chapters, existing_chapters):
+                selected_chapters = parsed_chapters
+            elif existing_chapters:
+                logger.info(
+                    "[PDF提取] raw_ocr_text目录解析结果异常,保留原始目录骨架: "
+                    f"parsed={len(parsed_chapters)}, original={len(existing_chapters)}"
+                )
+            else:
+                selected_chapters = parsed_chapters
+
+        if selected_chapters:
+            normalized["chapters"] = selected_chapters
+            normalized["total_chapters"] = len(selected_chapters)
+            normalized["formatted_text"] = self._format_catalog_chapters(selected_chapters)
         return normalized
 
     def _parse_catalog_from_raw_text(self, text: str) -> List[Dict[str, Any]]:
@@ -391,6 +421,7 @@ class PdfStructureExtractor:
         chapters: List[Dict[str, Any]] = []
         current_chapter: Optional[Dict[str, Any]] = None
         active_l2_rule: Optional[str] = None
+        document_l1_rules: Optional[List[str]] = None
 
         for raw_line in text.splitlines():
             title_text, page = self._split_catalog_entry(raw_line)
@@ -401,8 +432,10 @@ class PdfStructureExtractor:
             if compact in {"目录", "目錄"}:
                 continue
 
-            chapter_matches = self._matching_rule_names(title_text, "l1")
+            chapter_matches = self._matching_rule_names(title_text, "l1", document_l1_rules)
             if chapter_matches:
+                if document_l1_rules is None:
+                    document_l1_rules = chapter_matches
                 current_chapter = {
                     "index": len(chapters) + 1,
                     "title": self._clean_chapter_title(title_text),
@@ -419,6 +452,24 @@ class PdfStructureExtractor:
 
             section_matches = self._matching_rule_names(title_text, "l2")
             if not section_matches:
+                numeric_section_title = self._coerce_numeric_catalog_section(
+                    title_text,
+                    document_l1_rules,
+                    active_l2_rule,
+                )
+                if numeric_section_title:
+                    section_key = self._normalize_heading_key(numeric_section_title)
+                    existing_keys = {
+                        self._normalize_heading_key(sub.get("title", ""))
+                        for sub in current_chapter.get("subsections", [])
+                    }
+                    if section_key not in existing_keys:
+                        current_chapter["subsections"].append({
+                            "title": numeric_section_title,
+                            "page": str(page or current_chapter.get("page", 1)),
+                            "level": 2,
+                            "original": raw_line.strip(),
+                        })
                 continue
 
             if active_l2_rule is None:
@@ -444,6 +495,174 @@ class PdfStructureExtractor:
 
         return chapters
 
+    @classmethod
+    def _sanitize_catalog_chapters(cls, chapters: Any) -> List[Dict[str, Any]]:
+        """Return a cleaned, de-duplicated copy of a catalog chapter list.
+
+        Anything that is not a list yields an empty result. Entries that are
+        not dicts, whose cleaned title normalizes to an empty key, or whose
+        key was already seen are dropped. The same filtering is applied to
+        each chapter's subsections. Surviving chapters are re-indexed from 1.
+        """
+        cleaned: List[Dict[str, Any]] = []
+        if not isinstance(chapters, list):
+            return cleaned
+
+        chapter_keys_seen: Set[str] = set()
+        for position, entry in enumerate(chapters, start=1):
+            if not isinstance(entry, dict):
+                continue
+
+            title = cls._clean_chapter_title(str(entry.get("title", "") or ""))
+            key = cls._normalize_heading_key(title)
+            if not key or key in chapter_keys_seen:
+                continue
+            chapter_keys_seen.add(key)
+
+            # A missing/falsy page falls back to the entry's 1-based position
+            # in the incoming list.
+            page = str(entry.get("page") or position)
+
+            children: List[Dict[str, Any]] = []
+            child_keys_seen: Set[str] = set()
+            for child in (entry.get("subsections", []) or []):
+                if not isinstance(child, dict):
+                    continue
+
+                child_title = cls._clean_section_title(str(child.get("title", "") or ""))
+                child_key = cls._normalize_heading_key(child_title)
+                if child_key and child_key not in child_keys_seen:
+                    child_keys_seen.add(child_key)
+                    children.append({
+                        "title": child_title,
+                        # Subsections inherit the chapter page when they have none.
+                        "page": str(child.get("page") or page),
+                        "level": 2,
+                        "original": child.get("original", "") or child_title,
+                    })
+
+            cleaned.append({
+                "index": len(cleaned) + 1,
+                "title": title,
+                "page": page,
+                "original": entry.get("original", "") or title,
+                "subsections": children,
+            })
+
+        return cleaned
+
+    @classmethod
+    def _should_prefer_parsed_catalog(
+        cls,
+        parsed_chapters: List[Dict[str, Any]],
+        existing_chapters: List[Dict[str, Any]],
+    ) -> bool:
+        """Decide whether the re-parsed catalog should replace the existing one.
+
+        Args:
+            parsed_chapters: Chapters parsed from the raw OCR text.
+            existing_chapters: Sanitized chapters already present in the catalog.
+
+        Returns:
+            True when the parsed catalog should be used, False to keep the
+            existing one.
+        """
+        if not parsed_chapters:
+            return False
+
+        # A parsed result that itself looks broken is never preferred.
+        if cls._catalog_has_suspicious_structure(parsed_chapters):
+            return False
+
+        if not existing_chapters:
+            return True
+
+        # A sane parsed catalog always beats a suspicious existing one.
+        if cls._catalog_has_suspicious_structure(existing_chapters):
+            return True
+
+        # From here on both catalogs look sane; the parsed one must carry
+        # strictly more structure (chapters + subsections) to win.
+        if cls._catalog_structure_score(parsed_chapters) <= cls._catalog_structure_score(existing_chapters):
+            return False
+
+        # Reject parsed catalogs whose chapter count diverges wildly from the
+        # existing one in either direction.
+        existing_count = len(existing_chapters)
+        parsed_count = len(parsed_chapters)
+        if parsed_count > max(existing_count * 2, existing_count + 8):
+            return False
+        if existing_count >= 4 and parsed_count < max(2, existing_count // 2):
+            return False
+
+        return True
+
+    @classmethod
+    def _catalog_has_suspicious_structure(cls, chapters: List[Dict[str, Any]]) -> bool:
+        """Heuristically flag a chapter list that looks mis-parsed.
+
+        Only the chapter titles are inspected. The list is reported as
+        suspicious when "第…章"-style titles are mixed with plain numeric
+        titles beyond the thresholds below, when several "第…章" titles have a
+        body that itself starts with a number, or when a chapter number is
+        repeated or (for digit numerals) goes backwards.
+
+        Returns:
+            True if any heuristic fires; False otherwise, including for an
+            empty list.
+        """
+        if not chapters:
+            return False
+
+        titles = [(chapter.get("title", "") or "").strip() for chapter in chapters]
+        # Titles that start with "第<number>" followed by one of 章/节/部/分/篇.
+        chinese_chapter_count = sum(
+            1 for title in titles
+            if re.match(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]", title)
+        )
+        # Titles that start with a 1-2 digit number, optional punctuation,
+        # whitespace and then some text.
+        numeric_heading_count = sum(
+            1 for title in titles
+            if re.match(r"^\d{1,2}(?:[\..。、])?\s+\S+", title)
+        )
+        embedded_numeric_body_count = 0
+        repeated_chapter_no_count = 0
+        reversed_chapter_no_count = 0
+        seen_chapter_numbers: Set[str] = set()
+        previous_numeric_chapter_no: Optional[int] = None
+
+        for title in titles:
+            # group(1) is the chapter number, group(2) the text after the unit.
+            chapter_match = re.match(
+                r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
+                title,
+            )
+            if not chapter_match:
+                continue
+
+            chapter_no = re.sub(r"\s+", "", chapter_match.group(1))
+            chapter_body = (chapter_match.group(2) or "").strip()
+            if chapter_no in seen_chapter_numbers:
+                repeated_chapter_no_count += 1
+            seen_chapter_numbers.add(chapter_no)
+
+            # Ordering is only checked when chapter_no.isdigit() is true.
+            if chapter_no.isdigit():
+                current_numeric_no = int(chapter_no)
+                if previous_numeric_chapter_no is not None and current_numeric_no < previous_numeric_chapter_no:
+                    reversed_chapter_no_count += 1
+                previous_numeric_chapter_no = current_numeric_no
+
+            # Body that begins with a number such as "1", "1.2" or "1.2." —
+            # i.e. a numeric heading glued onto the chapter title.
+            if re.match(r"^\d{1,2}(?:\.\d{1,2})*\.?(?:\s+|$)", chapter_body):
+                embedded_numeric_body_count += 1
+
+        # Mixed "第…章" and numeric top-level titles (absolute thresholds).
+        if chinese_chapter_count >= 2 and numeric_heading_count >= max(3, chinese_chapter_count // 2):
+            return True
+
+        # Same mix, with thresholds relative to the total number of titles.
+        if chinese_chapter_count >= max(2, len(titles) // 3) and numeric_heading_count >= max(2, len(titles) // 6):
+            return True
+
+        if embedded_numeric_body_count >= max(2, len(titles) // 5):
+            return True
+
+        # A single repeated or out-of-order chapter number is enough.
+        if repeated_chapter_no_count > 0 or reversed_chapter_no_count > 0:
+            return True
+
+        return False
+
+    @staticmethod
+    def _catalog_structure_score(chapters: List[Dict[str, Any]]) -> int:
+        """Score a catalog as one point per chapter plus one per subsection."""
+        return sum(
+            1 + len(entry.get("subsections", []) or [])
+            for entry in chapters
+        )
+
+    @classmethod
+    def _coerce_numeric_catalog_section(
+        cls,
+        title_text: str,
+        document_l1_rules: Optional[List[str]],
+        active_l2_rule: Optional[str],
+    ) -> Optional[str]:
+        """Treat a bare "N 标题" catalog line as a level-2 section when allowed.
+
+        Coercion is only attempted while no level-2 rule is active, the
+        document's level-1 rules are known, and those rules do not include the
+        pure-numeric rule. Returns the cleaned section title, or None when the
+        line is not coerced.
+        """
+        if active_l2_rule is not None or not document_l1_rules:
+            return None
+
+        # With pure-numeric chapters a leading number denotes a chapter.
+        if "Rule_1_纯数字派" in document_l1_rules:
+            return None
+
+        numbered = re.match(
+            r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*",
+            title_text.strip(),
+        )
+        return cls._clean_section_title(title_text) if numbered else None
+
     @staticmethod
     def _split_catalog_entry(line: str) -> Tuple[str, Optional[int]]:
         cleaned = line.strip()
@@ -582,6 +801,220 @@ class PdfStructureExtractor:
             "page_end": page_num,
         }
 
+    @classmethod
+    def _prepare_page_lines(cls, text: str) -> List[str]:
+        """Split page text into stripped, non-empty lines, merging split headings.
+
+        At each position `_merge_heading_fragment` is asked for a merged
+        heading; when it returns one, the merged text replaces the lines it
+        consumed. Otherwise the single line is kept as is.
+        """
+        stripped = (candidate.strip() for candidate in text.split("\n"))
+        fragments = [candidate for candidate in stripped if candidate]
+
+        output: List[str] = []
+        cursor = 0
+        total = len(fragments)
+        while cursor < total:
+            combined, used = cls._merge_heading_fragment(fragments, cursor)
+            if not combined:
+                # No merge happened: emit the line itself and advance by one.
+                combined, used = fragments[cursor], 1
+            output.append(combined)
+            cursor += used
+
+        return output
+
+    @classmethod
+    def _merge_heading_fragment(
+        cls,
+        lines: List[str],
+        start_index: int,
+    ) -> Tuple[Optional[str], int]:
+        """Try to join a heading that was split across 2-3 consecutive lines.
+
+        Args:
+            lines: Page lines.
+            start_index: Index of the first line to consider.
+
+        Returns:
+            ``(merged_text, consumed_line_count)`` when the space-joined lines
+            match a level-1 or level-2 heading rule, otherwise ``(None, 1)``.
+        """
+        first_line = lines[start_index].strip()
+        if not first_line:
+            return None, 1
+
+        # A first line that is already a complete heading (matches a rule and
+        # is not a bare number/marker fragment) is never merged. Deciding this
+        # once up front avoids running the rule matching on candidates that
+        # could not be returned anyway.
+        if not cls._is_incomplete_heading_fragment(first_line) and (
+            cls._matching_rule_names(first_line, "l1") or cls._matching_rule_names(first_line, "l2")
+        ):
+            return None, 1
+
+        max_span = min(3, len(lines) - start_index)
+        for span in range(2, max_span + 1):
+            candidate_text = " ".join(
+                fragment.strip() for fragment in lines[start_index:start_index + span]
+            ).strip()
+            # Skip candidates that look like table-of-contents entries.
+            if not candidate_text or cls.TOC_PATTERN.search(candidate_text):
+                continue
+            if cls._matching_rule_names(candidate_text, "l1") or cls._matching_rule_names(candidate_text, "l2"):
+                return candidate_text, span
+
+        return None, 1
+
+    @staticmethod
+    def _is_incomplete_heading_fragment(line: str) -> bool:
+        """Return True when ``line`` is only a heading number/marker with no title.
+
+        All whitespace is removed before matching, so "第 一 章" is treated like
+        "第一章". Empty or whitespace-only input returns False.
+        """
+        clean_line = re.sub(r"\s+", "", line.strip())
+        if not clean_line:
+            return False
+
+        fragment_patterns = (
+            # "第N章" / "第N部分" / "第N篇". The explicit "部分" alternative is
+            # required: inside a character class 部 and 分 are two separate
+            # single characters, so "第一部分" would never reach the "$" anchor.
+            r"^第(?:\d+|[一二三四五六七八九十百零两]+)(?:部分|[章部分篇])$",
+            # "1." / "12、"
+            r"^\d{1,2}(?:[\..。、])$",
+            # "【1】" / "[1]"
+            r"^[【\[]\d+[\]】]$",
+            # "一、" / "二)"
+            r"^[一二三四五六七八九十百零两]+[、)\)\]]$",
+            # "第一节"
+            r"^第[一二三四五六七八九十百零两]+节$",
+            # "1.2" / "1.2." but not the start of "1.2.3"
+            r"^\d+\.\d+(?!\.\d)\.?$",
+        )
+        return any(re.match(pattern, clean_line) for pattern in fragment_patterns)
+
+    def _rebuild_section_contents_from_catalog(
+        self,
+        chapters: Dict[str, Dict[str, Dict[str, Any]]],
+        catalog: Dict[str, Any],
+        body_lines: List[Dict[str, Any]],
+    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
+        """Re-slice section contents using the catalog as the heading skeleton.
+
+        Catalog chapter/section titles are located, in catalog order, inside
+        ``body_lines``; each located section then receives the lines between
+        its heading and the next located heading as its content.
+
+        Args:
+            chapters: Previously extracted ``{chapter: {section: payload}}``.
+            catalog: Catalog dict; only its "chapters" list is read here.
+            body_lines: Page lines as dicts with "page" and "text" keys.
+
+        Returns:
+            The rebuilt mapping, keyed by catalog chapter titles only (chapters
+            of the input that are not in the catalog are not carried over).
+            ``chapters`` is returned unchanged when the catalog or body lines
+            are empty, the catalog has no sections, or no section heading
+            could be located.
+        """
+        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
+        if not catalog_chapters or not body_lines:
+            return chapters
+
+        # Flatten the catalog into an ordered list of headings to look for.
+        expected_items: List[Dict[str, Any]] = []
+        total_sections = 0
+        for chapter in catalog_chapters:
+            chapter_title = (chapter.get("title", "") or "").strip()
+            if not chapter_title:
+                continue
+            chapter_page = self._safe_page_number(chapter.get("page"))
+            # NOTE(review): "page_hint" is stored but not read in this method.
+            expected_items.append({
+                "kind": "chapter",
+                "title": chapter_title,
+                "chapter_title": chapter_title,
+                "section_title": "章节标题",
+                "page_hint": chapter_page,
+                "line_index": None,
+                "page": chapter_page,
+            })
+
+            for subsection in chapter.get("subsections", []):
+                section_title = (subsection.get("title", "") or "").strip()
+                if not section_title:
+                    continue
+                total_sections += 1
+                expected_items.append({
+                    "kind": "section",
+                    "title": section_title,
+                    "chapter_title": chapter_title,
+                    "section_title": section_title,
+                    "page_hint": self._safe_page_number(subsection.get("page"), chapter_page),
+                    "line_index": None,
+                    "page": self._safe_page_number(subsection.get("page"), chapter_page),
+                })
+
+        if not expected_items or total_sections == 0:
+            return chapters
+
+        # Locate each heading; the search only moves forward, so headings are
+        # matched in catalog order.
+        search_start = 0
+        found_sections = 0
+        for item in expected_items:
+            line_index = self._find_heading_line_index(
+                body_lines,
+                item["title"],
+                item["kind"],
+                search_start,
+            )
+            item["line_index"] = line_index
+            if line_index is not None:
+                # Prefer the page where the heading was actually found.
+                item["page"] = body_lines[line_index]["page"]
+                search_start = line_index + 1
+                if item["kind"] == "section":
+                    found_sections += 1
+
+        if found_sections == 0:
+            return chapters
+
+        rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
+        section_title_key = "章节标题"
+
+        # Seed the result with the catalog skeleton, reusing existing payloads
+        # where the chapter/section title already exists in `chapters`.
+        for chapter in catalog_chapters:
+            chapter_title = (chapter.get("title", "") or "").strip()
+            if not chapter_title:
+                continue
+
+            chapter_page = self._safe_page_number(chapter.get("page"))
+            existing_sections = chapters.get(chapter_title, {})
+            rebuilt[chapter_title] = {
+                section_title_key: existing_sections.get(section_title_key, self._empty_section_payload(chapter_page))
+            }
+
+            for subsection in chapter.get("subsections", []):
+                section_title = (subsection.get("title", "") or "").strip()
+                if not section_title:
+                    continue
+                rebuilt[chapter_title][section_title] = existing_sections.get(
+                    section_title,
+                    self._empty_section_payload(self._safe_page_number(subsection.get("page"), chapter_page)),
+                )
+
+        # Fill every located section with the lines up to the next located
+        # heading (chapter or section), or to the end of the body.
+        for idx, item in enumerate(expected_items):
+            if item["kind"] != "section" or item["line_index"] is None:
+                continue
+
+            next_heading_index = len(body_lines)
+            for later in expected_items[idx + 1:]:
+                if later["line_index"] is not None:
+                    next_heading_index = later["line_index"]
+                    break
+
+            content_entries = body_lines[item["line_index"] + 1:next_heading_index]
+            content_text = "\n".join(entry["text"] for entry in content_entries).strip()
+            existing_payload = rebuilt[item["chapter_title"]].get(item["section_title"], {})
+
+            # Do not overwrite existing non-empty content with an empty slice.
+            if not content_text and (existing_payload.get("content") or "").strip():
+                continue
+
+            if content_entries:
+                page_start = content_entries[0]["page"]
+                page_end = content_entries[-1]["page"]
+            else:
+                page_start = item["page"]
+                page_end = item["page"]
+
+            rebuilt[item["chapter_title"]][item["section_title"]] = {
+                "content": content_text,
+                "page_start": page_start,
+                "page_end": page_end,
+            }
+
+        return rebuilt or chapters
+
+    def _find_heading_line_index(
+        self,
+        body_lines: List[Dict[str, Any]],
+        target_title: str,
+        heading_kind: str,
+        start_index: int,
+    ) -> Optional[int]:
+        """Find the first body line at or after ``start_index`` matching a title.
+
+        Args:
+            body_lines: Page lines as dicts; only the "text" key is read.
+            target_title: Catalog title to look for.
+            heading_kind: "chapter" selects chapter-title cleaning; any other
+                value selects section-title cleaning.
+            start_index: First index of ``body_lines`` to examine.
+
+        Returns:
+            The index of the matching line, or None when the normalized title
+            is empty or no line matches.
+        """
+        target_key = self._normalize_heading_key(target_title)
+        if not target_key:
+            return None
+
+        for index in range(start_index, len(body_lines)):
+            candidate_text = (body_lines[index].get("text") or "").strip()
+            # Skip empty lines and lines that match TOC_PATTERN.
+            if not candidate_text or self.TOC_PATTERN.search(candidate_text):
+                continue
+
+            if heading_kind == "chapter":
+                candidate_key = self._normalize_heading_key(self._clean_chapter_title(candidate_text))
+            else:
+                candidate_key = self._normalize_heading_key(self._clean_section_title(candidate_text))
+
+            if candidate_key == target_key:
+                return index
+
+            # Fallback: the uncleaned line ends with the title and whatever
+            # precedes it consists only of digits, ASCII letters and
+            # punctuation (e.g. a numbering prefix).
+            raw_candidate_key = self._normalize_heading_key(candidate_text)
+            if raw_candidate_key.endswith(target_key):
+                prefix = raw_candidate_key[:-len(target_key)]
+                if not prefix or re.fullmatch(
+                    r"[\dA-Za-z\.\-_/|,:;()\[\]\u3001\u3002\uff0c\uff1a\uff1b\uff08\uff09\u3010\u3011]+",
+                    prefix,
+                ):
+                    return index
+
+        return None
+
     def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
         """同步并发处理 OCR(使用 ThreadPoolExecutor)"""
         results: List[OcrResult] = []
@@ -751,78 +1184,96 @@ class PdfStructureExtractor:
         if not ocr_results:
             return original_text
 
-        # 获取页面上的文本块及其坐标
         text_blocks = []
         for block in page.get_text("blocks"):
             x0, y0, x1, y1, text, _, _ = block
-            # 只考虑裁剪区域内的文本
             if y0 >= clip_box.y0 and y1 <= clip_box.y1:
                 text_blocks.append({
                     "bbox": (x0, y0, x1, y1),
                     "text": text.strip(),
                 })
 
-        # 按 Y 坐标排序
         text_blocks.sort(key=lambda b: (b["bbox"][1], b["bbox"][0]))
 
-        # 找出属于表格区域的文本块
+        if not text_blocks:
+            return original_text
+
+        region_entries: List[Dict[str, Any]] = []
         replaced_indices: Set[int] = set()
-        for ocr_result in ocr_results:
-            bbox = ocr_result["bbox"]
-            rx0, ry0, rx1, ry1 = bbox
+
+        for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
+            rx0, ry0, rx1, ry1 = ocr_result["bbox"]
+            current_indices: List[int] = []
 
             for idx, block in enumerate(text_blocks):
                 if idx in replaced_indices:
                     continue
-                bx0, by0, bx1, by1 = block["bbox"]
+                if self._block_contains_heading(block["text"]):
+                    continue
 
-                # 检查重叠
+                bx0, by0, bx1, by1 = block["bbox"]
                 overlap_x = max(0, min(bx1, rx1) - max(bx0, rx0))
                 overlap_y = max(0, min(by1, ry1) - max(by0, ry0))
                 overlap_area = overlap_x * overlap_y
-                block_area = (bx1 - bx0) * (by1 - by0)
+                block_area = max((bx1 - bx0) * (by1 - by0), 1)
 
-                if block_area > 0 and overlap_area / block_area > 0.5:
-                    replaced_indices.add(idx)
+                if overlap_area / block_area > 0.5:
+                    current_indices.append(idx)
 
-        # 构建新文本
-        result_parts: List[str] = []
-        last_idx = 0
+            if not current_indices:
+                continue
 
-        for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
-            bbox = ocr_result["bbox"]
-            rx0, ry0, rx1, ry1 = bbox
+            replaced_indices.update(current_indices)
+            region_entries.append({
+                "start": min(current_indices),
+                "end": max(current_indices),
+                "ocr_text": (ocr_result.get("ocr_text") or "").strip(),
+            })
+
+        if not region_entries:
+            return original_text
+
+        region_by_start = {entry["start"]: entry for entry in region_entries}
+        result_parts: List[str] = []
+        idx = 0
+
+        while idx < len(text_blocks):
+            region = region_by_start.get(idx)
+            if region is not None:
+                if region["ocr_text"]:
+                    result_parts.append(region["ocr_text"])
+                    result_parts.append("\n")
+                else:
+                    for block_idx in range(region["start"], region["end"] + 1):
+                        block_text = text_blocks[block_idx]["text"]
+                        if block_text:
+                            result_parts.append(block_text)
+                            result_parts.append("\n")
+                idx = region["end"] + 1
+                continue
 
-            # 找到该表格区域之前的文本
-            region_start_idx = None
-            for idx, block in enumerate(text_blocks):
-                if idx in replaced_indices:
-                    bx0, by0, bx1, by1 = block["bbox"]
-                    if (bx0 >= rx0 - 5 and bx1 <= rx1 + 5 and
-                        by0 >= ry0 - 5 and by1 <= ry1 + 5):
-                        if region_start_idx is None:
-                            region_start_idx = idx
-                        last_idx = idx + 1
-
-            if region_start_idx is not None:
-                # 添加表格前的非表格文本
-                for idx in range(last_idx - (last_idx - region_start_idx), region_start_idx):
-                    if idx not in replaced_indices and idx < len(text_blocks):
-                        result_parts.append(text_blocks[idx]["text"])
-                        result_parts.append("\n")
-
-                # 添加 OCR 结果
-                result_parts.append(ocr_result["ocr_text"])
-                result_parts.append("\n")
-
-        # 添加剩余文本
-        for idx in range(last_idx, len(text_blocks)):
             if idx not in replaced_indices:
-                result_parts.append(text_blocks[idx]["text"])
-                result_parts.append("\n")
+                block_text = text_blocks[idx]["text"]
+                if block_text:
+                    result_parts.append(block_text)
+                    result_parts.append("\n")
+            idx += 1
 
         return "".join(result_parts).strip() or original_text
 
+    @classmethod
+    def _block_contains_heading(cls, text: str) -> bool:
+        """Tell whether any prepared line of a text block matches a heading rule.
+
+        Empty or whitespace-only text returns False. Otherwise the text is run
+        through `_prepare_page_lines` and each non-empty line is checked
+        against the "l1" and "l2" rules.
+        """
+        if not (text and text.strip()):
+            return False
+
+        candidates = (entry.strip() for entry in cls._prepare_page_lines(text))
+        return any(
+            bool(cls._matching_rule_names(entry, "l1") or cls._matching_rule_names(entry, "l2"))
+            for entry in candidates
+            if entry
+        )
+
     def _compress_image(self, img_bytes: bytes) -> bytes:
         """压缩图片"""
         try:
@@ -873,10 +1324,36 @@ class PdfStructureExtractor:
 
     @staticmethod
     def _is_header_footer(line: str) -> bool:
+        """Return True when ``line`` looks like a page header/footer line.
+
+        Checks run on the line with all whitespace removed, except the two
+        fixed header strings at the end, which are searched in the raw line.
+        """
+        compact_line = re.sub(r"\s+", "", line.strip())
+        if not compact_line:
+            return False
+
+        # Truthy when the line starts with a heading-style numbering prefix;
+        # such lines are exempt from the company-name / plan-title checks.
+        heading_prefix = re.match(
+            r"^(第[\d一二三四五六七八九十百零两]+[章节部分篇]|[\d]+\.\d+|[\d]+[\..。、]?|[一二三四五六七八九十百零两]+[、)\)\]]|第[一二三四五六七八九十百零两]+节|【\d+】)",
+            compact_line,
+        )
+
+        # A line made up only of digits (e.g. a page number).
+        if compact_line.isdigit():
+            return True
+
+        # Lines ending in a company-name suffix.
+        if (
+            compact_line.endswith("有限责任公司")
+            or compact_line.endswith("有限公司")
+            or compact_line.endswith("股份有限公司")
+        ) and not heading_prefix:
+            return True
+
+        # Lines ending in "专项施工方案".
+        if compact_line.endswith("专项施工方案") and not heading_prefix:
+            return True
+
         return (
             "四川路桥建设集团股份有限公司" in line
             or "T梁运输及安装专项施工方案" in line
-            or line.isdigit()
+            or (
+                compact_line.endswith("工程项目")
+                and len(compact_line) >= 8
+                and not compact_line.startswith("第")
+            )
         )
 
     @classmethod

+ 3 - 3
core/construction_review/component/minimal_pipeline/simple_processor.py

@@ -122,9 +122,9 @@ class SimpleDocumentProcessor:
                     pass
 
         structure = self.pdf_extractor.extract(file_content, progress_callback=_extraction_progress)
-        print("-"*50)
-        print(f'{json.dumps(structure, ensure_ascii=False, indent=2)}')
-        print("-"*50)
+        logger.info("-"*50)
+        logger.info(f'{json.dumps(structure, ensure_ascii=False, indent=2)}')
+        logger.info("-"*50)
         catalog = structure.get("catalog")  # 获取YOLO检测+OCR提取的目录
 
         # 对 catalog 进行分类(如果存在)