ソースを参照

fix(目录规则调整)

tangle 2 週間 前
コミット
5bc0ceac9a

+ 10 - 10
core/construction_review/component/minimal_pipeline/pdf_extractor1.py

@@ -154,14 +154,14 @@ class PdfStructureExtractor:
         }
 
         ocr_catalog: Optional[Dict[str, Any]] = None
-        if self.detect_toc:
-            try:
-                ocr_catalog = self._extract_catalog(file_content, progress_callback)
-                if ocr_catalog:
-                    ocr_catalog = self._normalize_catalog(ocr_catalog)
-                    logger.info(f"[PDF提取] 目录提取完成: {ocr_catalog.get('total_chapters', 0)} 章")
-            except Exception as exc:
-                logger.warning(f"[PDF提取] OCR目录提取失败: {exc}")
+        # if self.detect_toc:
+        #     try:
+        #         ocr_catalog = self._extract_catalog(file_content, progress_callback)
+        #         if ocr_catalog:
+        #             ocr_catalog = self._normalize_catalog(ocr_catalog)
+        #             logger.info(f"[PDF提取] 目录提取完成: {ocr_catalog.get('total_chapters', 0)} 章")
+        #     except Exception as exc:
+        #         logger.warning(f"[PDF提取] OCR目录提取失败: {exc}")
 
         doc = fitz.open(stream=file_content, filetype="pdf")
         try:
@@ -2151,7 +2151,7 @@ class PdfStructureExtractor:
         # 1. 提取标题少于 5 个字时,必须与标准目录名完全相等。
         # 2. 提取标题超过 15 个字时,直接判定为非标准目录标题。
         # 3. 提取标题 5 到 15 个字时,允许一定 OCR/抽取误差:
-        #    只要提取标题中至少 80% 的字出现在标准目录名中即可,字符顺序不作要求。
+        #    只要提取标题中至少 65% 的字出现在标准目录名中即可,字符顺序不作要求。
         extracted_len = len(extracted)
         if extracted_len < 5:
             return extracted == standard
@@ -2159,7 +2159,7 @@ class PdfStructureExtractor:
             return False
 
         overlap_count = sum((Counter(extracted) & Counter(standard)).values())
-        return (overlap_count / max(extracted_len, 1)) >= 0.8
+        return (overlap_count / max(extracted_len, 1)) >= 0.65
 
     @classmethod
     def _normalize_catalog_name(cls, text: str) -> str:

+ 94 - 0
core/construction_review/component/minimal_pipeline/pdf_extractor_batch_runner.py

@@ -37,6 +37,7 @@ if str(REPO_ROOT) not in sys.path:
 
 SPECIAL_SECTION_KEYS = {"章节标题", "默认部分"}
 STAT_FILE_NAME = "static.text"
+UNMATCHED_STANDARD_CATALOG_FILE_NAME = "unmatched_standard_catalog_titles.md"
 TOC_LINE_PATTERN = re.compile(r"(?:[.\u2026·•…]{2,}|-{3,}).{0,30}\d+\s*$")
 TOC_PAGE_SUFFIX_PATTERN = re.compile(
     r"(?:[.\u2026\u00b7\u2022·•…]{2,}|-{3,})[-\u2013\u2014 ]*(?:-\s*)?\d{1,3}(?:\s*-)?\s*$"
@@ -884,6 +885,85 @@ def append_static_record(
         )
 
 
+def collect_unmatched_standard_catalog_titles(
+    pdf_path: Path,
+    extractor: Any,
+    extractor_result: Dict[str, Any],
+) -> List[Dict[str, Any]]:
+    """Collect heading lines that do not match the standard catalog.
+
+    The PDF at ``pdf_path`` is re-opened and its body lines are obtained from
+    ``extractor._extract_body_lines``. Each line that matches the ``l1`` or
+    ``l2`` pattern of the rule set named by ``extractor_result["body_rule"]``
+    is checked against the extractor's standard-catalog matchers; lines that
+    fail the check are reported once per ``(normalized title, level)`` pair.
+
+    Args:
+        pdf_path: Path of the PDF to scan.
+        extractor: Object providing ``RULE_LIB`` and the private heading
+            helpers called below.
+        extractor_result: Extraction result; only ``"body_rule"`` is read.
+
+    Returns:
+        A list of dicts with the keys ``"normalized"``, ``"level"`` and
+        ``"parent_l1"``. The list is empty when no rule set is available or
+        when the PDF cannot be read.
+    """
+    rule_name = extractor_result.get("body_rule")
+    if not rule_name:
+        return []
+    rule_set = getattr(extractor, "RULE_LIB", {}).get(rule_name)
+    if not rule_set:
+        return []
+
+    try:
+        with fitz.open(pdf_path) as doc:
+            body_lines, _ = extractor._extract_body_lines(doc)
+    except Exception as exc:
+        # This report is best-effort and must not abort the batch run, but a
+        # silent return would make a failed scan look like "nothing unmatched".
+        print(
+            f"[WARN] unmatched catalog title scan skipped for {pdf_path}: {exc}",
+            file=sys.stderr,
+        )
+        return []
+
+    records: List[Dict[str, Any]] = []
+    seen: set[Tuple[str, int]] = set()
+    current_l1_title = ""
+
+    for item in body_lines:
+        original = str(getattr(item, "text", "") or "").strip()
+        if not original or original.isdigit():
+            continue
+
+        line = extractor._strip_leading_page_number_from_heading(original)
+        if not line:
+            continue
+
+        # TOC-looking lines skip the strict heading validation below.
+        is_toc_candidate = extractor._looks_like_toc_candidate(line)
+        unmatched_levels: List[int] = []
+        normalized = None  # filled on first use so each line is normalized once
+
+        if rule_set["l1"].match(line) and (
+            is_toc_candidate or extractor._is_valid_heading_strict(line, is_l1=True)
+        ):
+            normalized = extractor._normalize_catalog_name(line)
+            # Remembered as the parent of the level-2 titles that follow.
+            current_l1_title = normalized
+            if not extractor._match_standard_catalog_chapter(line):
+                unmatched_levels.append(1)
+
+        if rule_set["l2"].match(line) and (
+            is_toc_candidate or extractor._is_valid_heading_strict(line, is_l1=False)
+        ):
+            if not extractor._match_standard_catalog_section(line, None):
+                unmatched_levels.append(2)
+
+        if not unmatched_levels:
+            continue
+        if normalized is None:
+            normalized = extractor._normalize_catalog_name(line)
+        if not normalized:
+            continue
+
+        for level in unmatched_levels:
+            key = (normalized, level)
+            # NOTE(review): the parent chapter is not part of the key, so the
+            # same level-2 title under two chapters is reported only once.
+            if key in seen:
+                continue
+            seen.add(key)
+            records.append({
+                "normalized": normalized,
+                "level": "一级" if level == 1 else "二级",
+                "parent_l1": normalized if level == 1 else current_l1_title,
+            })
+
+    return records
+
+
+def append_unmatched_standard_catalog_titles(
+    md_path: Path,
+    records: List[Dict[str, Any]],
+) -> None:
+    """Append unmatched catalog titles to a Markdown table file.
+
+    Records without a ``"normalized"`` title are dropped. If no record
+    remains, the file is neither created nor modified. The table header is
+    written only when the file is missing or empty.
+
+    Args:
+        md_path: Target Markdown file; parent directories are created.
+        records: Dicts whose ``"parent_l1"``, ``"normalized"`` and ``"level"``
+            values fill the three table columns.
+    """
+
+    def _cell(value: Any) -> str:
+        # A raw "|" or a line break inside a cell would split the row, so
+        # line breaks become spaces and pipes are backslash-escaped.
+        text = " ".join(str(value or "").strip().splitlines())
+        return text.replace("|", "\\|")
+
+    rows: List[str] = []
+    for record in records:
+        title = _cell(record.get("normalized", ""))
+        if not title:
+            continue
+        parent_l1 = _cell(record.get("parent_l1", ""))
+        level = _cell(record.get("level", ""))
+        rows.append(f"| {parent_l1} | {title} | {level} |\n")
+
+    # Nothing printable: avoid leaving a header-only file behind.
+    if not rows:
+        return
+
+    md_path.parent.mkdir(parents=True, exist_ok=True)
+    needs_header = not md_path.exists() or md_path.stat().st_size == 0
+    # newline="" keeps the "\n" row endings untranslated on every platform.
+    with md_path.open("a", encoding="utf-8", newline="") as file:
+        if needs_header:
+            file.write("| 一级标题 | 标题名称 | 级别 |\n")
+            file.write("|---|---|---|\n")
+        file.writelines(rows)
+
+
 def sanitize_filename_component(value: str) -> str:
     sanitized = value.strip()
     for char in '<>:"/\\|?*':
@@ -1018,6 +1098,15 @@ def process_pdf(
         catalog_quality_rate_text=catalog_quality_rate_text,
         content_quality_rate_text=content_quality_rate_text,
     )
+    unmatched_records = collect_unmatched_standard_catalog_titles(
+        pdf_path=pdf_path,
+        extractor=extractor,
+        extractor_result=extractor_result,
+    )
+    append_unmatched_standard_catalog_titles(
+        md_path=output_path.parent / UNMATCHED_STANDARD_CATALOG_FILE_NAME,
+        records=unmatched_records,
+    )
     return output_path, quality_rate_text
 
 
@@ -1053,6 +1142,11 @@ def main() -> int:
     print(f"Extractor: {args.extractor}")
     print("=" * 80)
 
+    if output_dir is not None:
+        unmatched_path = output_dir / UNMATCHED_STANDARD_CATALOG_FILE_NAME
+        if unmatched_path.exists():
+            unmatched_path.unlink()
+
     success_count = 0
     for index, pdf_path in enumerate(pdf_files, 1):
         print(f"[{index}/{len(pdf_files)}] Processing: {pdf_path.name}")