Răsfoiți Sursa

fix(规则优化)

tangle 4 zile în urmă
părinte
comite
6f04793c1a

+ 275 - 4
core/construction_review/component/minimal_pipeline/pdf_extractor2.py

@@ -176,6 +176,12 @@ class PdfStructureExtractor:
                 )
                 if rebuilt_chapters:
                     structure["chapters"] = rebuilt_chapters
+                enriched_catalog = self._enrich_catalog_with_structure(
+                    result["catalog"],
+                    structure.get("chapters", {}),
+                )
+                if enriched_catalog:
+                    result["catalog"] = enriched_catalog
             structure.pop("_body_lines", None)
             result["chapters"] = structure.get("chapters", {})
             result["total_pages"] = len(doc)
@@ -409,6 +415,10 @@ class PdfStructureExtractor:
                 selected_chapters = parsed_chapters
 
         if selected_chapters:
+            selected_chapters = self._merge_catalog_chapters(
+                selected_chapters,
+                parsed_chapters,
+            )
             normalized["chapters"] = selected_chapters
             normalized["total_chapters"] = len(selected_chapters)
             normalized["formatted_text"] = self._format_catalog_chapters(selected_chapters)
@@ -553,13 +563,22 @@ class PdfStructureExtractor:
         if not parsed_chapters:
             return False
 
-        if cls._catalog_has_suspicious_structure(parsed_chapters):
-            return False
+        parsed_is_suspicious = cls._catalog_has_suspicious_structure(parsed_chapters)
+        existing_is_suspicious = cls._catalog_has_suspicious_structure(existing_chapters)
+
+        if parsed_is_suspicious:
+            if not existing_chapters or not existing_is_suspicious:
+                return False
+
+            parsed_score = cls._catalog_structure_score(parsed_chapters)
+            existing_score = cls._catalog_structure_score(existing_chapters)
+            overlap_ratio = cls._catalog_chapter_overlap_ratio(parsed_chapters, existing_chapters)
+            return overlap_ratio >= 0.6 and parsed_score > existing_score
 
         if not existing_chapters:
             return True
 
-        if cls._catalog_has_suspicious_structure(existing_chapters):
+        if existing_is_suspicious:
             return True
 
         parsed_score = cls._catalog_structure_score(parsed_chapters)
@@ -642,6 +661,138 @@ class PdfStructureExtractor:
             score += len(chapter.get("subsections", []) or [])
         return score
 
+    @classmethod
+    def _catalog_chapter_overlap_ratio(
+        cls,
+        chapters_a: List[Dict[str, Any]],
+        chapters_b: List[Dict[str, Any]],
+    ) -> float:
+        if not chapters_a or not chapters_b:
+            return 0.0
+
+        keys_a = {
+            cls._catalog_chapter_identity_key(chapter.get("title", ""))
+            for chapter in chapters_a
+            if chapter.get("title")
+        }
+        keys_b = {
+            cls._catalog_chapter_identity_key(chapter.get("title", ""))
+            for chapter in chapters_b
+            if chapter.get("title")
+        }
+        if not keys_a or not keys_b:
+            return 0.0
+
+        return len(keys_a & keys_b) / max(1, min(len(keys_a), len(keys_b)))
+
+    @classmethod
+    def _catalog_chapter_identity_key(cls, title: str) -> str:
+        cleaned = cls._clean_chapter_title(title)
+        if not cleaned:
+            return ""
+
+        chapter_match = re.match(
+            r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
+            cleaned,
+        )
+        if chapter_match:
+            chapter_body = cls._normalize_heading_key(chapter_match.group(1))
+            if chapter_body:
+                return chapter_body
+
+        numeric_match = re.match(r"^\d{1,2}(?:[\..。、])?\s*(.*)$", cleaned)
+        if numeric_match:
+            numeric_body = cls._normalize_heading_key(numeric_match.group(1))
+            if numeric_body:
+                return numeric_body
+
+        return cls._normalize_heading_key(cleaned)
+
+    @classmethod
+    def _merge_catalog_chapters(
+        cls,
+        base_chapters: List[Dict[str, Any]],
+        supplemental_chapters: List[Dict[str, Any]],
+    ) -> List[Dict[str, Any]]:
+        if not base_chapters:
+            return supplemental_chapters or []
+        if not supplemental_chapters:
+            return base_chapters
+
+        merged: List[Dict[str, Any]] = []
+        supplemental_by_key = {
+            cls._catalog_chapter_identity_key(chapter.get("title", "")): chapter
+            for chapter in supplemental_chapters
+            if chapter.get("title")
+        }
+
+        for index, chapter in enumerate(base_chapters, 1):
+            chapter_copy = {
+                **chapter,
+                "subsections": [dict(sub) for sub in chapter.get("subsections", []) or []],
+            }
+            chapter_key = cls._catalog_chapter_identity_key(chapter_copy.get("title", ""))
+            supplemental = supplemental_by_key.get(chapter_key)
+            if supplemental:
+                merged_subsections = cls._merge_catalog_subsections(
+                    chapter_copy.get("subsections", []),
+                    supplemental.get("subsections", []) or [],
+                )
+                chapter_copy["subsections"] = merged_subsections
+            chapter_copy["index"] = index
+            merged.append(chapter_copy)
+
+        return merged
+
+    @classmethod
+    def _merge_catalog_subsections(
+        cls,
+        base_subsections: List[Dict[str, Any]],
+        supplemental_subsections: List[Dict[str, Any]],
+    ) -> List[Dict[str, Any]]:
+        if not base_subsections:
+            return [dict(sub) for sub in supplemental_subsections]
+        if not supplemental_subsections:
+            return [dict(sub) for sub in base_subsections]
+
+        def _subsection_score(items: List[Dict[str, Any]]) -> int:
+            score = 0
+            for item in items:
+                title = (item.get("title", "") or "").strip()
+                if not title:
+                    continue
+                score += 1
+                if re.match(r"^\d+\.\d+(?!\.\d)\.?\s*", title):
+                    score += 3
+                elif re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)", title):
+                    score += 3
+                elif re.match(r"^([一二三四五六七八九十百零两]+[、)\)\]])", title):
+                    score += 3
+                elif re.match(r"^[【\[]\s*\d+\s*[\]】]", title):
+                    score += 3
+                elif re.match(r"^\d{1,2}[\..。、]\s*", title):
+                    score += 1
+            return score
+
+        base_score = _subsection_score(base_subsections)
+        supplemental_score = _subsection_score(supplemental_subsections)
+        if supplemental_score > base_score:
+            return [dict(sub) for sub in supplemental_subsections]
+
+        merged = [dict(sub) for sub in base_subsections]
+        seen_keys = {
+            cls._normalize_heading_key(sub.get("title", ""))
+            for sub in merged
+            if sub.get("title")
+        }
+        for subsection in supplemental_subsections:
+            subsection_key = cls._normalize_heading_key(subsection.get("title", ""))
+            if not subsection_key or subsection_key in seen_keys:
+                continue
+            merged.append(dict(subsection))
+            seen_keys.add(subsection_key)
+        return merged
+
     @classmethod
     def _coerce_numeric_catalog_section(
         cls,
@@ -692,6 +843,111 @@ class PdfStructureExtractor:
                     lines.append(f"  {sub_title}")
         return "\n".join(lines)
 
    def _enrich_catalog_with_structure(
        self,
        catalog: Dict[str, Any],
        chapters: Dict[str, Dict[str, Dict[str, Any]]],
    ) -> Dict[str, Any]:
        """Merge parsed body structure into the catalog's chapter entries.

        For every catalog chapter whose identity key matches a key built
        from ``chapters`` (title -> sections mapping), the chapter is
        enriched with the structure's title, content, and page range, and
        its subsections are rebuilt from the structure's sections.  Body
        chapters that never matched a catalog entry are appended at the
        end.  Returns the original ``catalog`` untouched when either side
        is empty; otherwise a shallow-copied, re-indexed catalog dict.
        NOTE(review): "章节标题" appears to be the sentinel key for a
        chapter's own title payload within a sections dict — confirm
        against the producer of ``chapters``.
        """
        catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
        if not catalog_chapters or not chapters:
            return catalog

        enriched = dict(catalog)
        structure_items = list(chapters.items())
        # Map identity key -> (original title, its sections) for O(1) matching.
        structure_by_key = {
            self._catalog_chapter_identity_key(chapter_title): (chapter_title, sections)
            for chapter_title, sections in structure_items
        }
        used_structure_keys: Set[str] = set()

        enriched_chapters: List[Dict[str, Any]] = []
        for catalog_chapter in catalog_chapters:
            chapter_copy = dict(catalog_chapter)
            chapter_key = self._catalog_chapter_identity_key(chapter_copy.get("title", ""))
            structure_match = structure_by_key.get(chapter_key)
            if structure_match is None:
                # No body-structure counterpart: keep the catalog entry as-is.
                enriched_chapters.append(chapter_copy)
                continue

            structure_title, structure_sections = structure_match
            used_structure_keys.add(chapter_key)
            # The structure's own title payload carries the chapter-level
            # content and page span; fall back to the catalog page number.
            title_payload = structure_sections.get("章节标题", {})
            chapter_copy["title"] = structure_title
            chapter_copy["content"] = title_payload.get("content", "")
            chapter_copy["page_start"] = title_payload.get("page_start", self._safe_page_number(chapter_copy.get("page")))
            chapter_copy["page_end"] = title_payload.get("page_end", chapter_copy["page_start"])

            # Every section other than the title payload becomes a subsection.
            structure_subsections = [
                (section_title, payload)
                for section_title, payload in structure_sections.items()
                if section_title != "章节标题"
            ]
            catalog_subsections = chapter_copy.get("subsections", []) or []
            # Existing catalog subsections keyed by normalized title so that
            # matching entries keep their catalog metadata (page/level/original).
            subsection_by_key = {
                self._normalize_heading_key(subsection.get("title", "")): subsection
                for subsection in catalog_subsections
                if subsection.get("title")
            }

            enriched_subsections: List[Dict[str, Any]] = []
            for section_title, payload in structure_subsections:
                section_key = self._normalize_heading_key(section_title)
                subsection = dict(subsection_by_key.get(section_key, {}))
                # setdefault preserves catalog-supplied fields; content and
                # page span always come from the parsed body structure.
                subsection.setdefault("title", section_title)
                subsection.setdefault("page", str(payload.get("page_start", chapter_copy["page_start"])))
                subsection.setdefault("level", 2)
                subsection.setdefault("original", section_title)
                subsection["content"] = payload.get("content", "")
                subsection["page_start"] = payload.get("page_start", chapter_copy["page_start"])
                subsection["page_end"] = payload.get("page_end", subsection["page_start"])
                enriched_subsections.append(subsection)

            chapter_copy["subsections"] = enriched_subsections
            enriched_chapters.append(chapter_copy)

        # Append body-structure chapters the catalog never mentioned.
        existing_catalog_keys = {
            self._catalog_chapter_identity_key(chapter.get("title", ""))
            for chapter in enriched_chapters
            if chapter.get("title")
        }
        for chapter_title, structure_sections in structure_items:
            chapter_key = self._catalog_chapter_identity_key(chapter_title)
            if chapter_key in existing_catalog_keys or chapter_key in used_structure_keys:
                continue

            title_payload = structure_sections.get("章节标题", {})
            new_chapter = {
                "index": len(enriched_chapters) + 1,
                "title": chapter_title,
                "page": str(title_payload.get("page_start", 1)),
                "original": chapter_title,
                "content": title_payload.get("content", ""),
                "page_start": title_payload.get("page_start", 1),
                "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
                "subsections": [],
            }
            for section_title, payload in structure_sections.items():
                if section_title == "章节标题":
                    continue
                new_chapter["subsections"].append({
                    "title": section_title,
                    "page": str(payload.get("page_start", new_chapter["page_start"])),
                    "level": 2,
                    "original": section_title,
                    "content": payload.get("content", ""),
                    "page_start": payload.get("page_start", new_chapter["page_start"]),
                    "page_end": payload.get("page_end", payload.get("page_start", new_chapter["page_start"])),
                })
            enriched_chapters.append(new_chapter)

        # Renumber 1-based after any appends so indices stay contiguous.
        for index, chapter in enumerate(enriched_chapters, 1):
            chapter["index"] = index

        enriched["chapters"] = enriched_chapters
        enriched["total_chapters"] = len(enriched_chapters)
        enriched["formatted_text"] = self._format_catalog_chapters(enriched_chapters)
        return enriched
+
     def _reconcile_structure_with_catalog(
         self,
         chapters: Dict[str, Dict[str, Dict[str, Any]]],
@@ -1364,6 +1620,8 @@ class PdfStructureExtractor:
         rule_names: Optional[List[str]] = None,
     ) -> List[str]:
         clean_line = line.strip()
+        if level == "l1":
+            clean_line = cls._strip_leading_page_number_from_cn_chapter(clean_line)
         names = rule_names or list(cls.RULE_LIB.keys())
         return [
             rule_name
@@ -1379,9 +1637,22 @@ class PdfStructureExtractor:
     def _matches_section_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
         return bool(cls._matching_rule_names(line, "l2", rule_names))
 
+    @staticmethod
+    def _strip_leading_page_number_from_cn_chapter(line: str) -> str:
+        cleaned = re.sub(r"\s+", " ", line.strip())
+        if not cleaned:
+            return ""
+
+        return re.sub(
+            r"^\d{1,3}\s+(?=第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部分篇])",
+            "",
+            cleaned,
+            count=1,
+        ).strip()
+
     @staticmethod
     def _clean_chapter_title(line: str) -> str:
-        cleaned = line.strip()
+        cleaned = PdfStructureExtractor._strip_leading_page_number_from_cn_chapter(line)
         cleaned = re.sub(r"\s+\d+\s*$", "", cleaned)
         cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
         cleaned = re.sub(r"\s+", " ", cleaned).strip()