Prechádzať zdrojové kódy

Merge branch 'dev-planWrite' of CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

tangle 2 týždňov pred
rodič
commit
cb9bd97236

+ 195 - 57
core/construction_review/component/minimal_pipeline/pdf_extractor1.py

@@ -7,6 +7,7 @@ PDF 结构提取器。
 """
 
 import re
+from collections import Counter
 from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Set, Tuple
 
@@ -23,7 +24,18 @@ EMPTY_SECTION_PLACEHOLDER = "[本节无纯文本,原文档中可能为纯图
 TABLE_OCR_START = "[表格OCR识别结果]:"
 TABLE_OCR_END = "[/表格]"
 CN_LIST_L1_NUMERIC_L2_RULE = "Rule_8_中文序号章数字小节派"
-
+CATALOG_LIST = {
+    '编制依据':['法律法规','标准范围','文件制度','编制原则','编制范围'],
+    '工程概况':['设计概况','工程地质与水文气象','周边环境','施工平面及立面布置','施工要求和技术保证条件','风险辨别与分级','参建各方责任主体单位'],
+    '施工计划':['施工进度计划','施工材料计划','施工设备计划','劳动力计划','安全生产费用使用计划'],
+    '施工工艺技术':['主要施工方法概述','技术参数','施工准备','工艺流程','施工方法及操作要求','检查要求'],
+    '安全保证措施':['安全保证体系','组织保证措施','技术保证措施','监测监控措施','应急处置措施'],
+    '质量保证措施':['质量保证体系','质量目标','工程创优规划','质量控制程序与具体措施'],
+    '环境保证措施':['环境保证体系','环境保护组织机构','环境保护及文明施工措施'],
+    '施工管理及作业人员配备与分工':['施工管理人员','专职安全生产管理人员','特种作业人员','其它作业人员'],
+    '验收要求':['验收标准','验收程序','验收内容','验收时间','验收人员'],
+    '其它资料':['计算书','相关施工图纸','附图附表','编制及审核人员情况']
+}
 
 @dataclass(frozen=True)
 class BodyLine:
@@ -1269,6 +1281,7 @@ class PdfStructureExtractor:
 
         structured_data: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
         current_l1: Optional[str] = None
+        current_standard_l1: Optional[str] = None
         current_l1_num = 0
         current_l2: Optional[str] = None
         pending_prefix: Optional[str] = None
@@ -1278,6 +1291,7 @@ class PdfStructureExtractor:
         chapter_line_offset = 0
 
         backup_l1: Optional[str] = None
+        backup_standard_l1: Optional[str] = None
         backup_l1_num = 0
         backup_l2: Optional[str] = None
         backup_l2_sub_num = 0
@@ -1310,66 +1324,75 @@ class PdfStructureExtractor:
                 continue
 
             has_toc = self._is_toc_line(line)
+            has_unconfirmed_toc = self._looks_like_toc_candidate(line) and not has_toc
 
             match_l1 = rule_set["l1"].match(line)
-            if match_l1 and not has_toc:
-                core_text = self._blind_strip(line)
-                if len(core_text) < 2:
-                    pending_prefix = line
-                    pending_page = page
-                    continue
-
-                if self._is_valid_heading_strict(line, is_l1=True):
-                    l1_candidate_num = self._extract_l1_number(line, rule_name, match_l1, current_l1_num)
+            if match_l1 and not has_toc and not has_unconfirmed_toc:
+                standard_l1 = self._match_standard_catalog_chapter(line)
+                if not standard_l1:
+                    # 命中标题形态但不在标准目录中,按正文继续处理,避免误切章节。
+                    pass
+                else:
+                    core_text = self._blind_strip(line)
+                    if len(core_text) < 2:
+                        pending_prefix = line
+                        pending_page = page
+                        continue
 
-                    if rule_name == CN_LIST_L1_NUMERIC_L2_RULE:
-                        if not self._has_expected_numeric_l2_ahead(body_lines, index, l1_candidate_num):
-                            continue
+                    if self._is_valid_heading_strict(line, is_l1=True):
+                        l1_candidate_num = self._extract_l1_number(line, rule_name, match_l1, current_l1_num)
 
-                    if rule_name == "Rule_1_纯数字派":
-                        # 纯数字一级标题更容易误中表格行或编号列表,因此需要额外的序号和噪声校验。
-                        if current_l1 is None and l1_candidate_num not in {1, 2}:
-                            continue
-                        if self._looks_like_plain_numeric_l1_noise(line):
-                            continue
+                        if rule_name == CN_LIST_L1_NUMERIC_L2_RULE:
+                            if not self._has_expected_numeric_l2_ahead(body_lines, index, l1_candidate_num):
+                                continue
 
-                    if rule_name in {"Rule_1_纯数字派", "Rule_2_混合章派"} and current_l1 is not None:
-                        if l1_candidate_num < current_l1_num:
-                            continue
-                        if l1_candidate_num - current_l1_num > 2:
-                            continue
-                        if l1_candidate_num == current_l1_num:
-                            # 同编号章节重复出现时,若旧章节尚未出现小节,则把旧章节视作误判并回收内容。
-                            if not self._chapter_has_l2(structured_data.get(current_l1, {})):
-                                old_preface = structured_data[current_l1].get(SECTION_TITLE_KEY, [])
-                                old_page = self._safe_page_number(structured_data[current_l1].get("_chapter_page"), page)
-                                restored = [{"text": current_l1, "page": old_page}] + old_preface
-                                del structured_data[current_l1]
+                        if rule_name == "Rule_1_纯数字派":
+                            # 纯数字一级标题更容易误中表格行或编号列表,因此需要额外的序号和噪声校验。
+                            if current_l1 is None and l1_candidate_num not in {1, 2}:
+                                continue
+                            if self._looks_like_plain_numeric_l1_noise(line):
+                                continue
 
-                                current_l1 = self._clean_chapter_title(line)
-                                structured_data[current_l1] = {"_chapter_page": page}  # type: ignore[assignment]
-                                if restored:
-                                    structured_data[current_l1][SECTION_TITLE_KEY] = restored
-                                current_l1_num = l1_candidate_num
-                                current_l2 = None
-                                last_l2_sub_num = 0
-                                chapter_l2_style_hint = None
-                                chapter_line_offset = 0
-                            continue
+                        if rule_name in {"Rule_1_纯数字派", "Rule_2_混合章派"} and current_l1 is not None:
+                            if l1_candidate_num < current_l1_num:
+                                continue
+                            if l1_candidate_num - current_l1_num > 2:
+                                continue
+                            if l1_candidate_num == current_l1_num:
+                                # 同编号章节重复出现时,若旧章节尚未出现小节,则把旧章节视作误判并回收内容。
+                                if not self._chapter_has_l2(structured_data.get(current_l1, {})):
+                                    old_preface = structured_data[current_l1].get(SECTION_TITLE_KEY, [])
+                                    old_page = self._safe_page_number(structured_data[current_l1].get("_chapter_page"), page)
+                                    restored = [{"text": current_l1, "page": old_page}] + old_preface
+                                    del structured_data[current_l1]
+
+                                    current_l1 = self._clean_chapter_title(line)
+                                    current_standard_l1 = standard_l1
+                                    structured_data[current_l1] = {"_chapter_page": page}  # type: ignore[assignment]
+                                    if restored:
+                                        structured_data[current_l1][SECTION_TITLE_KEY] = restored
+                                    current_l1_num = l1_candidate_num
+                                    current_l2 = None
+                                    last_l2_sub_num = 0
+                                    chapter_l2_style_hint = None
+                                    chapter_line_offset = 0
+                                continue
 
-                    backup_l1 = current_l1
-                    backup_l1_num = current_l1_num
-                    backup_l2 = current_l2
-                    backup_l2_sub_num = last_l2_sub_num
-
-                    current_l1 = self._clean_chapter_title(line)
-                    current_l1_num = l1_candidate_num
-                    structured_data.setdefault(current_l1, {"_chapter_page": page})  # type: ignore[assignment]
-                    current_l2 = None
-                    last_l2_sub_num = 0
-                    chapter_l2_style_hint = None
-                    chapter_line_offset = 0
-                    continue
+                        backup_l1 = current_l1
+                        backup_standard_l1 = current_standard_l1
+                        backup_l1_num = current_l1_num
+                        backup_l2 = current_l2
+                        backup_l2_sub_num = last_l2_sub_num
+
+                        current_l1 = self._clean_chapter_title(line)
+                        current_standard_l1 = standard_l1
+                        current_l1_num = l1_candidate_num
+                        structured_data.setdefault(current_l1, {"_chapter_page": page})  # type: ignore[assignment]
+                        current_l2 = None
+                        last_l2_sub_num = 0
+                        chapter_l2_style_hint = None
+                        chapter_line_offset = 0
+                        continue
 
             if current_l1 and not has_toc:
                 chapter_line_offset += 1
@@ -1382,7 +1405,7 @@ class PdfStructureExtractor:
                     chapter_l2_style_hint = self._detect_cn_order_l2_style(line)
 
             match_l2 = rule_set["l2"].match(line)
-            if current_l1 and match_l2 and not has_toc:
+            if current_l1 and match_l2 and not has_toc and not has_unconfirmed_toc:
                 if self._is_valid_heading_strict(line, is_l1=False):
                     if is_numeric_l2:
                         l2_main_num = int(match_l2.group(1))
@@ -1403,6 +1426,7 @@ class PdfStructureExtractor:
                                 structured_data[backup_l1].setdefault(target_node, []).extend(text_to_restore)
                                 del structured_data[current_l1]
                                 current_l1 = backup_l1
+                                current_standard_l1 = backup_standard_l1
                                 current_l1_num = backup_l1_num
                                 current_l2 = backup_l2
                                 last_l2_sub_num = backup_l2_sub_num
@@ -1414,6 +1438,8 @@ class PdfStructureExtractor:
                         elif self._is_suspicious_numeric_l2_jump(l2_sub_num, last_l2_sub_num):
                             # 大跨度跳号常见于正文引用,例如 1.2 后出现 1.9,不直接当作新小节。
                             pass
+                        elif not self._match_standard_catalog_section(line, current_standard_l1):
+                            pass
                         else:
                             current_l2 = self._clean_section_title(line)
                             last_l2_sub_num = l2_sub_num
@@ -1431,6 +1457,8 @@ class PdfStructureExtractor:
                             l2_sub_num = self._extract_non_numeric_l2_number(match_l2.group(1))
                             if l2_sub_num <= last_l2_sub_num:
                                 pass
+                            elif not self._match_standard_catalog_section(line, current_standard_l1):
+                                pass
                             else:
                                 current_l2 = self._clean_section_title(line)
                                 last_l2_sub_num = l2_sub_num
@@ -2043,9 +2071,119 @@ class PdfStructureExtractor:
         """判断一行文本是否像目录行。"""
 
         clean_line = str(line or "").strip()
-        if cls.TOC_PATTERN.search(clean_line):
+        if not clean_line:
+            return False
+
+        compact = re.sub(r"\s+", "", clean_line)
+        if compact in {"目录", "目", "录"}:
             return True
-        return bool(re.search(r"\s{2,}\d{1,3}$", clean_line))
+
+        if not cls._looks_like_toc_candidate(clean_line):
+            return False
+
+        return cls._is_standard_catalog_name(clean_line)
+
+    @classmethod
+    def _looks_like_toc_candidate(cls, line: str) -> bool:
+        """只判断文本形态是否像目录行,不代表它已通过标准目录校验。"""
+
+        clean_line = str(line or "").strip()
+        if not clean_line:
+            return False
+        return bool(
+            cls.TOC_PATTERN.search(clean_line)
+            or re.search(r"\s{2,}\d{1,3}$", clean_line)
+        )
+
+    @classmethod
+    def _is_standard_catalog_name(cls, line: str) -> bool:
+        """目录候选去掉编号和页码后,必须命中标准目录名才当作目录行。"""
+
+        return bool(
+            cls._match_standard_catalog_chapter(line)
+            or cls._match_standard_catalog_section(line)
+        )
+
+    @classmethod
+    def _match_standard_catalog_chapter(cls, line: str) -> Optional[str]:
+        """返回命中的标准一级目录名;未命中时返回 None。"""
+
+        normalized = cls._normalize_catalog_name(line)
+        if not normalized:
+            return None
+
+        for chapter_title, section_titles in CATALOG_LIST.items():
+            if cls._catalog_name_matches_standard(normalized, chapter_title):
+                return chapter_title
+        return None
+
+    @classmethod
+    def _match_standard_catalog_section(
+        cls,
+        line: str,
+        chapter_title: Optional[str] = None,
+    ) -> Optional[str]:
+        """返回命中的标准二级目录名;传入一级目录时只在该一级下匹配。"""
+
+        normalized = cls._normalize_catalog_name(line)
+        if not normalized:
+            return None
+
+        if chapter_title:
+            chapter_candidates = [chapter_title]
+        else:
+            chapter_candidates = list(CATALOG_LIST.keys())
+
+        for candidate_chapter in chapter_candidates:
+            for section_title in CATALOG_LIST.get(candidate_chapter, []):
+                if cls._catalog_name_matches_standard(normalized, section_title):
+                    return section_title
+        return None
+
+    @classmethod
+    def _catalog_name_matches_standard(cls, extracted_name: str, standard_name: str) -> bool:
+        """按标题长度分段判断提取标题是否命中标准目录名。"""
+
+        extracted = cls._normalize_catalog_name(extracted_name)
+        standard = cls._normalize_catalog_name(standard_name)
+        if not extracted or not standard:
+            return False
+
+        # 标准目录匹配规则:
+        # 1. 提取标题少于 5 个字时,必须与标准目录名完全相等。
+        # 2. 提取标题超过 15 个字时,直接判定为非标准目录标题。
+        # 3. 提取标题 5 到 15 个字时,允许一定 OCR/抽取误差:
+        #    只要提取标题中至少 80% 的字出现在标准目录名中即可,字符顺序不作要求。
+        extracted_len = len(extracted)
+        if extracted_len < 5:
+            return extracted == standard
+        if extracted_len > 15:
+            return False
+
+        overlap_count = sum((Counter(extracted) & Counter(standard)).values())
+        return (overlap_count / max(extracted_len, 1)) >= 0.8
+
+    @classmethod
+    def _normalize_catalog_name(cls, text: str) -> str:
+        """去掉目录编号、页码、点线和空白,只保留用于标准目录比对的标题名。"""
+
+        cleaned = cls._strip_catalog_page_suffix(text)
+        cleaned = re.sub(r"\s+", " ", str(cleaned or "").strip())
+        if not cleaned:
+            return ""
+
+        prefix_patterns = (
+            r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部部分篇][\s、::.-]*",
+            r"^\d+(?:\.\d+)*(?:[\..、))\]]|\s+|(?=[\u4e00-\u9fa5]))",
+            r"^[一二三四五六七八九十百零两]+(?:[、))\]]|\s+|(?=[\u4e00-\u9fa5]))",
+            r"^[((]\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[))]\s*",
+            r"^[【\[]\s*\d+\s*[\]】]\s*",
+        )
+        for pattern in prefix_patterns:
+            cleaned = re.sub(pattern, "", cleaned, count=1).strip()
+
+        cleaned = re.sub(r"\s+", "", cleaned)
+        return cleaned.strip("::、..")
 
     @classmethod
     def _is_header_footer(cls, line: str) -> bool: