|
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|
|
|
|
|
|
|
"""
|
|
"""
|
|
|
PDF 结构提取器。
|
|
PDF 结构提取器。
|
|
|
|
|
+不依赖ocr的目录提取,使用基于规则的正文结构切分。
|
|
|
|
|
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
@@ -10,16 +11,9 @@ from dataclasses import dataclass
|
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
|
|
import fitz
|
|
import fitz
|
|
|
|
|
+from foundation.observability.logger.loggering import review_logger as logger
|
|
|
|
|
|
|
|
-try:
|
|
|
|
|
- from .ocr_processor import OcrProcessor, OcrResult, TableRegion
|
|
|
|
|
-except ImportError: # pragma: no cover - direct script-style imports
|
|
|
|
|
- try:
|
|
|
|
|
- from ocr_processor import OcrProcessor, OcrResult, TableRegion # type: ignore
|
|
|
|
|
- except ImportError: # pragma: no cover - OCR dependencies are optional
|
|
|
|
|
- OcrProcessor = None # type: ignore
|
|
|
|
|
- OcrResult = Any # type: ignore
|
|
|
|
|
- TableRegion = Any # type: ignore
|
|
|
|
|
|
|
+from .ocr_processor import OcrProcessor, OcrResult, TableRegion
|
|
|
|
|
|
|
|
|
|
|
|
|
SECTION_TITLE_KEY = "章节标题"
|
|
SECTION_TITLE_KEY = "章节标题"
|
|
@@ -44,7 +38,10 @@ class PdfStructureExtractor:
|
|
|
|
|
|
|
|
RULE_LIB = {
|
|
RULE_LIB = {
|
|
|
"Rule_1_纯数字派": {
|
|
"Rule_1_纯数字派": {
|
|
|
- "l1": re.compile(r"^\d{1,2}(?:[\..。])?\s+(?!\d)[\u4e00-\u9fa5A-Za-z].*"),
|
|
|
|
|
|
|
+ "l1": re.compile(
|
|
|
|
|
+ r"^\d{1,2}(?:[\..。])?\s+"
|
|
|
|
|
+ r"(?:(?!\d)[\u4e00-\u9fa5A-Za-z].*|[、,,]\s*[\u4e00-\u9fa5A-Za-z0-9].*)"
|
|
|
|
|
+ ),
|
|
|
"l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
|
|
"l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
|
|
|
},
|
|
},
|
|
|
"Rule_2_混合章派": {
|
|
"Rule_2_混合章派": {
|
|
@@ -103,7 +100,7 @@ class PdfStructureExtractor:
|
|
|
ocr_timeout: int = 600,
|
|
ocr_timeout: int = 600,
|
|
|
ocr_api_key: str = "",
|
|
ocr_api_key: str = "",
|
|
|
detect_toc: bool = True,
|
|
detect_toc: bool = True,
|
|
|
- toc_model_path: str = "",
|
|
|
|
|
|
|
+ toc_model_path: str = "config/yolo/best.pt",
|
|
|
):
|
|
):
|
|
|
"""初始化提取参数,并在依赖可用时启用 OCR。"""
|
|
"""初始化提取参数,并在依赖可用时启用 OCR。"""
|
|
|
|
|
|
|
@@ -120,11 +117,12 @@ class PdfStructureExtractor:
|
|
|
ocr_api_key=ocr_api_key,
|
|
ocr_api_key=ocr_api_key,
|
|
|
)
|
|
)
|
|
|
self.use_ocr = self.ocr_processor.is_available()
|
|
self.use_ocr = self.ocr_processor.is_available()
|
|
|
- self.detect_toc = False
|
|
|
|
|
|
|
+ self.detect_toc = detect_toc
|
|
|
self.ocr_api_url = ocr_api_url
|
|
self.ocr_api_url = ocr_api_url
|
|
|
self.ocr_timeout = ocr_timeout
|
|
self.ocr_timeout = ocr_timeout
|
|
|
self.ocr_api_key = ocr_api_key
|
|
self.ocr_api_key = ocr_api_key
|
|
|
self.toc_model_path = toc_model_path
|
|
self.toc_model_path = toc_model_path
|
|
|
|
|
+ self._toc_extractor = None
|
|
|
|
|
|
|
|
def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
|
|
def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
|
|
|
"""提取章节、正文派生目录、规则诊断信息,以及可选的表格 OCR 内容。"""
|
|
"""提取章节、正文派生目录、规则诊断信息,以及可选的表格 OCR 内容。"""
|
|
@@ -135,7 +133,7 @@ class PdfStructureExtractor:
|
|
|
"catalog": None,
|
|
"catalog": None,
|
|
|
"body_catalog": None,
|
|
"body_catalog": None,
|
|
|
"ocr_catalog": None,
|
|
"ocr_catalog": None,
|
|
|
- "catalog_mode": "testc_body_only",
|
|
|
|
|
|
|
+ "catalog_mode": "none",
|
|
|
"body_rule": None,
|
|
"body_rule": None,
|
|
|
"body_coverage": 0.0,
|
|
"body_coverage": 0.0,
|
|
|
"rule_performance": {},
|
|
"rule_performance": {},
|
|
@@ -145,6 +143,13 @@ class PdfStructureExtractor:
|
|
|
"ocr_inserted_count": 0,
|
|
"ocr_inserted_count": 0,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ ocr_catalog: Optional[Dict[str, Any]] = None
|
|
|
|
|
+ # if self.detect_toc:
|
|
|
|
|
+ # try:
|
|
|
|
|
+ # ocr_catalog = self._extract_catalog(file_content, progress_callback)
|
|
|
|
|
+ # except Exception as exc:
|
|
|
|
|
+ # logger.warning(f"[PDF提取] OCR目录提取失败: {exc}")
|
|
|
|
|
+
|
|
|
doc = fitz.open(stream=file_content, filetype="pdf")
|
|
doc = fitz.open(stream=file_content, filetype="pdf")
|
|
|
try:
|
|
try:
|
|
|
# 正文切分仍由 PyMuPDF 文本和标题规则驱动,OCR 只在切分后作为小节内容补充。
|
|
# 正文切分仍由 PyMuPDF 文本和标题规则驱动,OCR 只在切分后作为小节内容补充。
|
|
@@ -157,14 +162,21 @@ class PdfStructureExtractor:
|
|
|
|
|
|
|
|
result["chapters"] = chapters
|
|
result["chapters"] = chapters
|
|
|
result["total_pages"] = len(doc)
|
|
result["total_pages"] = len(doc)
|
|
|
- result["catalog"] = body_catalog
|
|
|
|
|
result["body_catalog"] = body_catalog
|
|
result["body_catalog"] = body_catalog
|
|
|
|
|
+ #result["ocr_catalog"] = ocr_catalog
|
|
|
|
|
+ result["catalog"] = body_catalog or ocr_catalog
|
|
|
result["body_rule"] = winning_rule
|
|
result["body_rule"] = winning_rule
|
|
|
result["body_coverage"] = coverage_rate
|
|
result["body_coverage"] = coverage_rate
|
|
|
result["rule_performance"] = rule_performance
|
|
result["rule_performance"] = rule_performance
|
|
|
result["ocr_table_count"] = ocr_stats["table_count"]
|
|
result["ocr_table_count"] = ocr_stats["table_count"]
|
|
|
result["ocr_success_count"] = ocr_stats["success_count"]
|
|
result["ocr_success_count"] = ocr_stats["success_count"]
|
|
|
result["ocr_inserted_count"] = ocr_stats["inserted_count"]
|
|
result["ocr_inserted_count"] = ocr_stats["inserted_count"]
|
|
|
|
|
+ if body_catalog and ocr_catalog:
|
|
|
|
|
+ result["catalog_mode"] = "body_and_ocr"
|
|
|
|
|
+ elif body_catalog:
|
|
|
|
|
+ result["catalog_mode"] = "body_only"
|
|
|
|
|
+ elif ocr_catalog:
|
|
|
|
|
+ result["catalog_mode"] = "ocr_only"
|
|
|
# 记录 OCR 是否实际影响输出,方便批处理统计时判断 OCR 状态。
|
|
# 记录 OCR 是否实际影响输出,方便批处理统计时判断 OCR 状态。
|
|
|
# disabled:默认值,表示本次没有请求 OCR。
|
|
# disabled:默认值,表示本次没有请求 OCR。
|
|
|
# unavailable:请求了 OCR,但依赖不可用,例如 rapid_layout 未安装或检测器不可用。
|
|
# unavailable:请求了 OCR,但依赖不可用,例如 rapid_layout 未安装或检测器不可用。
|
|
@@ -183,6 +195,31 @@ class PdfStructureExtractor:
|
|
|
finally:
|
|
finally:
|
|
|
doc.close()
|
|
doc.close()
|
|
|
|
|
|
|
|
|
|
def _extract_catalog(self, file_content: bytes, progress_callback=None) -> Optional[Dict[str, Any]]:
    """Extract the table-of-contents catalog through the TOC extractor.

    A ``TOCCatalogExtractor`` is created on first use from the instance's
    model path and OCR settings, cached on ``self._toc_extractor`` and reused
    on later calls.

    Args:
        file_content: Raw PDF bytes, forwarded unchanged to the extractor.
        progress_callback: Optional callback forwarded to the extractor.

    Returns:
        A shallow copy of the extractor's result with ``"source"`` defaulted
        to ``"ocr_toc"``, or ``None`` when the extractor returns a falsy value.
        The original docstring describes the payload as
        ``{"chapters": [...], "total_chapters": N}`` -- not verifiable from
        this method alone.
    """
    # Imported here rather than at module level, so loading this module does
    # not pull in the TOC detector.
    from .toc_detector import TOCCatalogExtractor

    extractor = self._toc_extractor
    if extractor is None:
        extractor = TOCCatalogExtractor(
            model_path=self.toc_model_path,
            ocr_api_url=self.ocr_api_url,
            ocr_api_key=self.ocr_api_key,
            ocr_timeout=self.ocr_timeout,
        )
        self._toc_extractor = extractor

    detected = extractor.detect_and_extract(file_content, progress_callback)
    if not detected:
        return None

    # Copy before tagging so the extractor's own result object is not mutated.
    tagged_catalog = dict(detected)
    if "source" not in tagged_catalog:
        tagged_catalog["source"] = "ocr_toc"
    return tagged_catalog
|
|
|
|
|
+
|
|
|
def _extract_table_ocr_results(self, doc: fitz.Document, progress_callback=None) -> List[OcrResult]:
|
|
def _extract_table_ocr_results(self, doc: fitz.Document, progress_callback=None) -> List[OcrResult]:
|
|
|
"""在 OCR 启用时检测 PDF 表格区域,并发执行表格识别。"""
|
|
"""在 OCR 启用时检测 PDF 表格区域,并发执行表格识别。"""
|
|
|
|
|
|
|
@@ -338,6 +375,16 @@ class PdfStructureExtractor:
|
|
|
continue
|
|
continue
|
|
|
page_lines.append(stripped)
|
|
page_lines.append(stripped)
|
|
|
|
|
|
|
|
|
|
+ recovered_headings, clipped_fragment_keys = self._recover_top_clipped_l1_headings(page, page_lines)
|
|
|
|
|
+ if clipped_fragment_keys:
|
|
|
|
|
+ page_lines = [
|
|
|
|
|
+ line
|
|
|
|
|
+ for line in page_lines
|
|
|
|
|
+ if self._normalize_repeated_line_key(line) not in clipped_fragment_keys
|
|
|
|
|
+ ]
|
|
|
|
|
+ if recovered_headings:
|
|
|
|
|
+ page_lines = recovered_headings + page_lines
|
|
|
|
|
+
|
|
|
page_lines_by_page.append((page_index + 1, page_lines))
|
|
page_lines_by_page.append((page_index + 1, page_lines))
|
|
|
|
|
|
|
|
if progress_callback and (page_index + 1 == total_pages or (page_index + 1) % 10 == 0):
|
|
if progress_callback and (page_index + 1 == total_pages or (page_index + 1) % 10 == 0):
|
|
@@ -360,6 +407,127 @@ class PdfStructureExtractor:
|
|
|
body_lines.append(BodyLine(page=page, text=line))
|
|
body_lines.append(BodyLine(page=page, text=line))
|
|
|
return body_lines
|
|
return body_lines
|
|
|
|
|
|
|
|
|
|
def _recover_top_clipped_l1_headings(
    self,
    page: fitz.Page,
    page_lines: List[str],
) -> Tuple[List[str], set[str]]:
    """Recover level-1 headings that were cut apart by the top clip line.

    The page's structured text (``page.get_text("dict")``) is scanned for text
    blocks whose bounding box straddles ``self.clip_top``. When the complete
    block contains a valid level-1 heading that is missing from ``page_lines``
    and the text below the clip line is only a fragment of it, the complete
    heading is recovered.

    Args:
        page: Page to inspect.
        page_lines: Lines already extracted for this page.

    Returns:
        ``(recovered_headings, fragment_keys)``: the complete heading lines to
        add, and the normalized keys of the clipped leftover lines that the
        caller should remove. Both are empty when nothing is recovered or when
        the structured text cannot be read.
    """
    try:
        page_dict = page.get_text("dict")
    except Exception:
        # Recovery is best-effort: if the structured text cannot be read, the
        # page simply keeps the lines it already has.
        return [], set()

    clip_top = self.clip_top
    normalize = self._normalize_repeated_line_key
    recovered_headings: List[str] = []
    fragment_keys: set[str] = set()
    existing_keys = {normalize(line) for line in page_lines}
    # A heading block may extend at most 40 units below the clip line.
    top_band_limit = min(page.rect.height, clip_top + 40)

    # Keep only text blocks (type 0) with a usable bbox *before* sorting, so a
    # block with a missing or null bbox cannot break the sort key.
    text_blocks = [
        text_block
        for text_block in page_dict.get("blocks", [])
        if text_block.get("type") == 0 and len(text_block.get("bbox") or ()) == 4
    ]
    text_blocks.sort(key=lambda item: item["bbox"][1])

    for text_block in text_blocks:
        x0, y0, x1, y1 = text_block["bbox"]
        # The block must straddle the clip line ...
        if not (y0 < clip_top < y1):
            continue
        # ... start no more than 35 units above it ...
        if y0 < max(0.0, clip_top - 35):
            continue
        # ... and end inside the top band.
        if y1 > top_band_limit:
            continue

        full_text = self._extract_text_block_text(text_block)
        if not full_text:
            continue

        full_lines = [line.strip() for line in self._prepare_page_lines(full_text) if line.strip()]
        full_heading = next(
            (
                line
                for line in full_lines
                if self._matches_any_l1_heading(line) and self._is_valid_heading_strict(line, is_l1=True)
            ),
            None,
        )
        if not full_heading:
            continue

        full_key = normalize(full_heading)
        if full_key in existing_keys:
            # The heading survived clipping (or was already recovered).
            continue

        # Re-read only the part of the block that lies below the clip line.
        clipped_rect = fitz.Rect(x0, clip_top, x1, min(y1, page.rect.height))
        clipped_text = page.get_text("text", clip=clipped_rect)
        clipped_lines = [line.strip() for line in self._prepare_page_lines(clipped_text) if line.strip()]
        if any(self._matches_any_l1_heading(line) for line in clipped_lines):
            continue
        if not self._looks_like_clipped_heading_loss(full_heading, clipped_lines):
            continue

        recovered_headings.append(full_heading)
        existing_keys.add(full_key)
        for line in clipped_lines:
            fragment_key = normalize(line)
            # Skip empty keys: the caller removes every page line whose key is
            # in fragment_keys, so an empty key would wipe unrelated lines
            # that happen to normalize to nothing.
            if fragment_key and fragment_key != full_key:
                fragment_keys.add(fragment_key)

    return recovered_headings, fragment_keys
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _extract_text_block_text(cls, block: Dict[str, Any]) -> str:
    """Rebuild a block's text line by line from a PyMuPDF ``dict`` block.

    Args:
        block: Mapping with an optional ``"lines"`` list; each line may carry
            a ``"spans"`` list whose items may carry ``"text"``.

    Returns:
        The non-empty lines joined with ``"\\n"``. Each line is the
        concatenation of its span texts with surrounding whitespace stripped;
        missing or falsy values count as empty.
    """

    def render_line(line_entry: Dict[str, Any]) -> str:
        # Concatenate all span texts of one line; falsy texts become "".
        span_items = line_entry.get("spans", []) or []
        pieces = [str(span.get("text", "") or "") for span in span_items]
        return "".join(pieces).strip()

    rendered = (render_line(entry) for entry in block.get("lines", []) or [])
    return "\n".join(text for text in rendered if text)
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _matches_any_l1_heading(cls, line: str) -> bool:
    """Tell whether ``line`` matches the level-1 pattern of any rule.

    The line is stripped and passed through
    ``_strip_leading_page_number_from_heading`` first. Empty results and
    lines flagged by ``_is_toc_line`` never match.

    Args:
        line: Candidate text; ``None`` or empty is treated as no match.

    Returns:
        ``True`` when the ``"l1"`` pattern of at least one ``RULE_LIB`` entry
        matches at the start of the cleaned line.
    """
    candidate = cls._strip_leading_page_number_from_heading(str(line or "").strip())
    if not candidate:
        return False
    if cls._is_toc_line(candidate):
        return False

    for rule in cls.RULE_LIB.values():
        if rule["l1"].match(candidate):
            return True
    return False
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _looks_like_clipped_heading_loss(cls, full_heading: str, clipped_lines: List[str]) -> bool:
    """Decide whether the clipped text is merely a remnant of the full heading.

    All comparisons use keys produced by ``_normalize_repeated_line_key``.

    Args:
        full_heading: The complete level-1 heading line.
        clipped_lines: Lines extracted from the region below the clip line.

    Returns:
        ``False`` when the heading normalizes to an empty key, when more than
        three non-empty clipped keys exist, or when a clipped key equals the
        heading key (nothing was lost). ``True`` when no non-empty clipped key
        exists, or when every clipped key occurs inside the heading key.
    """
    normalize = cls._normalize_repeated_line_key
    heading_key = normalize(full_heading)
    if not heading_key:
        return False

    remnant_keys = [key for key in map(normalize, clipped_lines) if key]
    if not remnant_keys:
        # Nothing survived below the clip line: the heading was lost entirely.
        return True
    if len(remnant_keys) > 3 or heading_key in remnant_keys:
        return False

    # If the concatenated remnants equal or occur inside the heading key, each
    # individual remnant occurs inside it too, so this single check covers
    # those cases as well.
    return all(key in heading_key for key in remnant_keys)
|
|
|
|
|
+
|
|
|
def _extract_body_with_best_rule(
|
|
def _extract_body_with_best_rule(
|
|
|
self,
|
|
self,
|
|
|
body_lines: List[BodyLine],
|
|
body_lines: List[BodyLine],
|
|
@@ -367,10 +535,12 @@ class PdfStructureExtractor:
|
|
|
"""运行所有候选标题规则,并返回评分最高的正文结构。"""
|
|
"""运行所有候选标题规则,并返回评分最高的正文结构。"""
|
|
|
|
|
|
|
|
total_raw_chars = sum(len(item.text.strip()) for item in body_lines if item.text.strip())
|
|
total_raw_chars = sum(len(item.text.strip()) for item in body_lines if item.text.strip())
|
|
|
|
|
+ preferred_cn_l2_style = self._detect_document_cn_order_l2_style(body_lines)
|
|
|
best_score = -9999
|
|
best_score = -9999
|
|
|
best_rule_name: Optional[str] = None
|
|
best_rule_name: Optional[str] = None
|
|
|
best_data: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
|
|
best_data: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
|
|
|
best_coverage = 0.0
|
|
best_coverage = 0.0
|
|
|
|
|
+ best_rule_style_preference = 0
|
|
|
rule_performance: Dict[str, Any] = {}
|
|
rule_performance: Dict[str, Any] = {}
|
|
|
|
|
|
|
|
for rule_name, rule_set in self.RULE_LIB.items():
|
|
for rule_name, rule_set in self.RULE_LIB.items():
|
|
@@ -381,24 +551,49 @@ class PdfStructureExtractor:
|
|
|
len([key for key in sections.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY])
|
|
len([key for key in sections.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY])
|
|
|
for sections in data.values()
|
|
for sections in data.values()
|
|
|
)
|
|
)
|
|
|
|
|
+ rule_guard_reason: Optional[str] = None
|
|
|
if (
|
|
if (
|
|
|
rule_name == CN_LIST_L1_NUMERIC_L2_RULE
|
|
rule_name == CN_LIST_L1_NUMERIC_L2_RULE
|
|
|
- and not self._is_viable_cn_list_l1_numeric_l2_structure(data, l1_count, l2_count)
|
|
|
|
|
):
|
|
):
|
|
|
- score -= 1500
|
|
|
|
|
|
|
+ is_viable, rule_guard_reason = self._inspect_cn_list_l1_numeric_l2_structure(
|
|
|
|
|
+ body_lines,
|
|
|
|
|
+ data,
|
|
|
|
|
+ l1_count,
|
|
|
|
|
+ l2_count,
|
|
|
|
|
+ )
|
|
|
|
|
+ if not is_viable:
|
|
|
|
|
+ score -= 1500
|
|
|
|
|
+ rule_style_preference = self._score_rule_cn_l2_style_preference(rule_name, preferred_cn_l2_style)
|
|
|
rule_performance[rule_name] = {
|
|
rule_performance[rule_name] = {
|
|
|
"score": score,
|
|
"score": score,
|
|
|
"coverage_rate": f"{coverage_rate * 100:.1f}%",
|
|
"coverage_rate": f"{coverage_rate * 100:.1f}%",
|
|
|
"l1_count": l1_count,
|
|
"l1_count": l1_count,
|
|
|
"l2_count": l2_count,
|
|
"l2_count": l2_count,
|
|
|
}
|
|
}
|
|
|
|
|
+ if rule_guard_reason:
|
|
|
|
|
+ rule_performance[rule_name]["guard_reason"] = rule_guard_reason
|
|
|
|
|
+ if rule_style_preference > 0:
|
|
|
|
|
+ rule_performance[rule_name]["style_preference"] = rule_style_preference
|
|
|
|
|
|
|
|
# 规则选择以综合得分为主,覆盖率保留用于兜底过滤和诊断输出。
|
|
# 规则选择以综合得分为主,覆盖率保留用于兜底过滤和诊断输出。
|
|
|
- if score > best_score:
|
|
|
|
|
|
|
+ if (
|
|
|
|
|
+ score > best_score
|
|
|
|
|
+ or (
|
|
|
|
|
+ score == best_score
|
|
|
|
|
+ and rule_style_preference > best_rule_style_preference
|
|
|
|
|
+ and abs(coverage_rate - best_coverage) <= 0.03
|
|
|
|
|
+ )
|
|
|
|
|
+ or (
|
|
|
|
|
+ score == best_score
|
|
|
|
|
+ and rule_style_preference == best_rule_style_preference
|
|
|
|
|
+ and coverage_rate > best_coverage
|
|
|
|
|
+ )
|
|
|
|
|
+ ):
|
|
|
best_score = score
|
|
best_score = score
|
|
|
best_rule_name = rule_name
|
|
best_rule_name = rule_name
|
|
|
best_data = data
|
|
best_data = data
|
|
|
best_coverage = coverage_rate
|
|
best_coverage = coverage_rate
|
|
|
|
|
+ best_rule_style_preference = rule_style_preference
|
|
|
|
|
|
|
|
if best_score <= 0 or best_coverage < 0.15:
|
|
if best_score <= 0 or best_coverage < 0.15:
|
|
|
return {}, best_rule_name, best_coverage, rule_performance
|
|
return {}, best_rule_name, best_coverage, rule_performance
|
|
@@ -420,6 +615,8 @@ class PdfStructureExtractor:
|
|
|
pending_prefix: Optional[str] = None
|
|
pending_prefix: Optional[str] = None
|
|
|
pending_page: Optional[int] = None
|
|
pending_page: Optional[int] = None
|
|
|
last_l2_sub_num = 0
|
|
last_l2_sub_num = 0
|
|
|
|
|
+ chapter_l2_style_hint: Optional[str] = None
|
|
|
|
|
+ chapter_line_offset = 0
|
|
|
|
|
|
|
|
backup_l1: Optional[str] = None
|
|
backup_l1: Optional[str] = None
|
|
|
backup_l1_num = 0
|
|
backup_l1_num = 0
|
|
@@ -497,6 +694,8 @@ class PdfStructureExtractor:
|
|
|
current_l1_num = l1_candidate_num
|
|
current_l1_num = l1_candidate_num
|
|
|
current_l2 = None
|
|
current_l2 = None
|
|
|
last_l2_sub_num = 0
|
|
last_l2_sub_num = 0
|
|
|
|
|
+ chapter_l2_style_hint = None
|
|
|
|
|
+ chapter_line_offset = 0
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
backup_l1 = current_l1
|
|
backup_l1 = current_l1
|
|
@@ -509,8 +708,20 @@ class PdfStructureExtractor:
|
|
|
structured_data.setdefault(current_l1, {"_chapter_page": page}) # type: ignore[assignment]
|
|
structured_data.setdefault(current_l1, {"_chapter_page": page}) # type: ignore[assignment]
|
|
|
current_l2 = None
|
|
current_l2 = None
|
|
|
last_l2_sub_num = 0
|
|
last_l2_sub_num = 0
|
|
|
|
|
+ chapter_l2_style_hint = None
|
|
|
|
|
+ chapter_line_offset = 0
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
|
|
+ if current_l1 and not has_toc:
|
|
|
|
|
+ chapter_line_offset += 1
|
|
|
|
|
+ if (
|
|
|
|
|
+ chapter_l2_style_hint is None
|
|
|
|
|
+ and chapter_line_offset <= 30
|
|
|
|
|
+ and rule_name in {"Rule_4_传统公文派", "Rule_5_单边括号派"}
|
|
|
|
|
+ and self._is_valid_heading_strict(line, is_l1=False)
|
|
|
|
|
+ ):
|
|
|
|
|
+ chapter_l2_style_hint = self._detect_cn_order_l2_style(line)
|
|
|
|
|
+
|
|
|
match_l2 = rule_set["l2"].match(line)
|
|
match_l2 = rule_set["l2"].match(line)
|
|
|
if current_l1 and match_l2 and not has_toc:
|
|
if current_l1 and match_l2 and not has_toc:
|
|
|
if self._is_valid_heading_strict(line, is_l1=False):
|
|
if self._is_valid_heading_strict(line, is_l1=False):
|
|
@@ -550,14 +761,22 @@ class PdfStructureExtractor:
|
|
|
self._ensure_section_node(structured_data, current_l1, current_l2, page)
|
|
self._ensure_section_node(structured_data, current_l1, current_l2, page)
|
|
|
continue
|
|
continue
|
|
|
else:
|
|
else:
|
|
|
- l2_sub_num = self._extract_non_numeric_l2_number(match_l2.group(1))
|
|
|
|
|
- if l2_sub_num <= last_l2_sub_num:
|
|
|
|
|
|
|
+ candidate_l2_style = self._detect_cn_order_l2_style(line)
|
|
|
|
|
+ if (
|
|
|
|
|
+ chapter_l2_style_hint is not None
|
|
|
|
|
+ and candidate_l2_style is not None
|
|
|
|
|
+ and candidate_l2_style != chapter_l2_style_hint
|
|
|
|
|
+ ):
|
|
|
pass
|
|
pass
|
|
|
else:
|
|
else:
|
|
|
- current_l2 = self._clean_section_title(line)
|
|
|
|
|
- last_l2_sub_num = l2_sub_num
|
|
|
|
|
- self._ensure_section_node(structured_data, current_l1, current_l2, page)
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ l2_sub_num = self._extract_non_numeric_l2_number(match_l2.group(1))
|
|
|
|
|
+ if l2_sub_num <= last_l2_sub_num:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ current_l2 = self._clean_section_title(line)
|
|
|
|
|
+ last_l2_sub_num = l2_sub_num
|
|
|
|
|
+ self._ensure_section_node(structured_data, current_l1, current_l2, page)
|
|
|
|
|
+ continue
|
|
|
|
|
|
|
|
if current_l1 and not has_toc:
|
|
if current_l1 and not has_toc:
|
|
|
target_key = current_l2 or SECTION_TITLE_KEY
|
|
target_key = current_l2 or SECTION_TITLE_KEY
|
|
@@ -613,23 +832,166 @@ class PdfStructureExtractor:
|
|
|
|
|
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
def _inspect_cn_list_l1_numeric_l2_structure(
    self,
    body_lines: List[BodyLine],
    raw_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
    l1_count: int,
    l2_count: int,
) -> Tuple[bool, Optional[str]]:
    """Check whether the Chinese-list-L1 / numeric-L2 rule output is viable.

    Per the original docstring, this limits Rule_8 to acting as a fallback
    when the document truly lacks an explicit chapter structure.

    Args:
        body_lines: Body lines, forwarded to the explicit-chapter check.
        raw_data: Rule output mapping chapter title -> section mapping.
        l1_count: Number of level-1 headings the rule produced.
        l2_count: Number of level-2 headings the rule produced.

    Returns:
        ``(True, None)`` when viable, otherwise ``(False, reason)`` where
        ``reason`` is one of ``"insufficient_structure"``,
        ``"explicit_chapter_structure"``, ``"cn_list_l1_resets"`` or
        ``"too_few_chapters_with_l2"``. Checks run in that order.
    """
    rejection: Optional[str] = None

    if l1_count < 2 or l2_count < 3:
        rejection = "insufficient_structure"
    elif self._has_stable_explicit_chapter_headings(body_lines):
        rejection = "explicit_chapter_structure"
    elif self._has_excessive_cn_list_l1_resets(raw_data):
        rejection = "cn_list_l1_resets"
    else:
        # Count chapters owning at least one real section: keys starting with
        # "_" and the chapter-title key do not count, nor does an empty key.
        chapters_with_l2 = 0
        for sections in raw_data.values():
            for key in sections.keys():
                if not key.startswith("_") and key != SECTION_TITLE_KEY and key:
                    chapters_with_l2 += 1
                    break
        # At least two chapters, and at least half of them (rounded up).
        if chapters_with_l2 < max(2, (l1_count + 1) // 2):
            rejection = "too_few_chapters_with_l2"

    return rejection is None, rejection
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _has_stable_explicit_chapter_headings(cls, body_lines: List[BodyLine]) -> bool:
    """Tell whether the body already has explicit "第X章"-style chapter headings.

    Lines are scanned in order. A line counts when, after
    ``_strip_leading_page_number_from_heading``, it is not a TOC line and
    starts with "第" + an Arabic or Chinese number + one of 章 / 节 / 部 / 篇
    ("部分" is covered by its first character). Consecutive repeats of the
    same number are ignored and scanning stops after four collected numbers.

    Args:
        body_lines: Body lines; only each item's ``text`` is read.

    Returns:
        ``True`` when at least two distinct chapter numbers were collected.
    """
    # The unit class deliberately omits "分": the previous class
    # "[章节部部分篇]" spelled the word "部分" as single characters, so lines
    # such as "第一分项工程" were mistaken for chapter headings.
    chapter_pattern = re.compile(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*[章节部篇]")
    chapter_numbers: List[int] = []

    for item in body_lines:
        line = cls._strip_leading_page_number_from_heading(item.text.strip())
        if not line or cls._is_toc_line(line):
            continue

        chapter_match = chapter_pattern.match(line)
        if not chapter_match:
            continue

        token = chapter_match.group(1)
        chapter_num = int(token) if token.isdigit() else cls._cn_to_int(token)
        if chapter_num <= 0:
            continue
        if chapter_numbers and chapter_numbers[-1] == chapter_num:
            # Same number repeated back to back -- count it once.
            continue

        chapter_numbers.append(chapter_num)
        if len(chapter_numbers) >= 4:
            break

    return len(set(chapter_numbers)) >= 2
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _has_excessive_cn_list_l1_resets(
    cls,
    raw_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
) -> bool:
    """Detect level-1 Chinese ordinals that visibly jump backwards.

    Per the original docstring, this keeps in-chapter list headings from
    being promoted to top-level chapters by Rule_8.

    Args:
        raw_data: Rule output; only the chapter-title keys are read, in order.

    Returns:
        ``False`` when fewer than three titles carry a positive Chinese
        ordinal prefix. Otherwise ``True`` when the ordinal sequence contains
        at least one severe reset (from 3 or more back to 2 or less) or at
        least two backward steps of any size.
    """
    ordinals: List[int] = []
    for title in raw_data.keys():
        prefix = re.match(r"^([一二三四五六七八九十百零两]+)[、))\]]", str(title or "").strip())
        if not prefix:
            continue
        value = cls._cn_to_int(prefix.group(1))
        if value > 0:
            ordinals.append(value)

    if len(ordinals) < 3:
        return False

    steps = list(zip(ordinals, ordinals[1:]))
    backward_jumps = sum(1 for before, after in steps if after < before)
    severe_resets = sum(1 for before, after in steps if before >= 3 and after <= 2)
    return severe_resets >= 1 or backward_jumps >= 2
|
|
|
|
|
+
|
|
|
|
|
@classmethod
def _detect_cn_order_l2_style(cls, line: str) -> Optional[str]:
    """Classify the style of a Chinese-ordinal section heading.

    The line is passed through ``_strip_catalog_page_suffix``, stripped, and
    its whitespace runs are collapsed to single spaces before matching.

    Args:
        line: Candidate heading text.

    Returns:
        ``"bracket"`` for an ordinal followed by a closing bracket (e.g.
        "一)"), ``"plain"`` for an ordinal followed by "、" or whitespace
        (e.g. "一、" / "一 "), and ``None`` otherwise. In both styles the
        title must begin with a CJK character or an ASCII letter.
    """
    without_suffix = cls._strip_catalog_page_suffix(line)
    normalized = re.sub(r"\s+", " ", str(without_suffix or "").strip())
    if not normalized:
        return None

    # Checked in order: the bracket form first, then the plain form.
    style_patterns = (
        ("bracket", r"^[一二三四五六七八九十百零两]+[))\]]\s*[\u4e00-\u9fa5A-Za-z].*"),
        ("plain", r"^[一二三四五六七八九十百零两]+(?:、|\s+)\s*[\u4e00-\u9fa5A-Za-z].*"),
    )
    for style, pattern in style_patterns:
        if re.match(pattern, normalized):
            return style
    return None
|
|
|
|
|
+
|
|
|
|
|
def _detect_document_cn_order_l2_style(self, body_lines: List[BodyLine]) -> Optional[str]:
    """Vote on the document's preferred Chinese-ordinal section style.

    After each explicit chapter heading ("第X章/部/篇", where "部分" is covered
    by "部"), up to 30 following non-empty, non-TOC lines are scanned. The
    first valid section heading with a recognizable style casts one vote for
    that chapter, then scanning pauses until the next chapter heading. Per the
    original docstring, the result is a stable tie-breaker between Rule_4 and
    Rule_5.

    Args:
        body_lines: Body lines; only each item's ``text`` is read.

    Returns:
        ``"plain"`` or ``"bracket"`` for the style with more votes, or
        ``None`` on a tie (including no votes at all).
    """
    # The unit class deliberately omits "分": the previous class "[章部部分篇]"
    # spelled the word "部分" as single characters, so lines such as
    # "第一分项工程" reopened the scan window as if they were chapter headings.
    chapter_pattern = re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部篇]")

    plain_count = 0
    bracket_count = 0
    # -1 means "not inside a scan window"; otherwise lines seen since the chapter.
    lines_since_chapter = -1

    for item in body_lines:
        line = self._strip_leading_page_number_from_heading(item.text.strip())
        if not line or self._is_toc_line(line):
            continue

        if chapter_pattern.match(line):
            lines_since_chapter = 0
            continue

        if lines_since_chapter < 0:
            continue

        lines_since_chapter += 1
        if lines_since_chapter > 30:
            # Window exhausted without a vote; wait for the next chapter.
            lines_since_chapter = -1
            continue

        style = self._detect_cn_order_l2_style(line)
        if style is None or not self._is_valid_heading_strict(line, is_l1=False):
            continue

        if style == "plain":
            plain_count += 1
        elif style == "bracket":
            bracket_count += 1
        # One vote per chapter: close the window after the first styled heading.
        lines_since_chapter = -1

    if plain_count == bracket_count:
        return None
    return "plain" if plain_count > bracket_count else "bracket"
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def _score_rule_cn_l2_style_preference(rule_name: str, preferred_style: Optional[str]) -> int:
    """Map the document-level style preference to a tie-break score for a rule.

    Args:
        rule_name: Name of the rule being scored.
        preferred_style: ``"plain"``, ``"bracket"`` or ``None``.

    Returns:
        ``1`` when ``rule_name`` is the rule favored by ``preferred_style``
        ("plain" favors Rule_4, "bracket" favors Rule_5), otherwise ``0``.
    """
    favored_pairs = (
        ("plain", "Rule_4_传统公文派"),
        ("bracket", "Rule_5_单边括号派"),
    )
    return 1 if (preferred_style, rule_name) in favored_pairs else 0
|
|
|
|
|
|
|
|
def _convert_rule_output_to_chapters(
|
|
def _convert_rule_output_to_chapters(
|
|
|
self,
|
|
self,
|
|
@@ -1064,6 +1426,8 @@ class PdfStructureExtractor:
|
|
|
"设计",
|
|
"设计",
|
|
|
"部署",
|
|
"部署",
|
|
|
"安排",
|
|
"安排",
|
|
|
|
|
+ "方法",
|
|
|
|
|
+ "参数",
|
|
|
)
|
|
)
|
|
|
return not any(keyword in compact for keyword in chapter_keywords)
|
|
return not any(keyword in compact for keyword in chapter_keywords)
|
|
|
|
|
|