|
@@ -0,0 +1,1325 @@
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
+
|
|
|
|
|
+"""
|
|
|
|
|
+PDF 结构提取器。
|
|
|
|
|
+
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import re
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
|
+from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
+
|
|
|
|
|
+import fitz
|
|
|
|
|
+
|
|
|
|
|
+try:
|
|
|
|
|
+ from .ocr_processor import OcrProcessor, OcrResult, TableRegion
|
|
|
|
|
+except ImportError: # pragma: no cover - direct script-style imports
|
|
|
|
|
+ try:
|
|
|
|
|
+ from ocr_processor import OcrProcessor, OcrResult, TableRegion # type: ignore
|
|
|
|
|
+ except ImportError: # pragma: no cover - OCR dependencies are optional
|
|
|
|
|
+ OcrProcessor = None # type: ignore
|
|
|
|
|
+ OcrResult = Any # type: ignore
|
|
|
|
|
+ TableRegion = Any # type: ignore
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
# Key under which a chapter's own title/preface text is stored inside each
# chapter's section dict.
SECTION_TITLE_KEY = "章节标题"
# Placeholder content used for sections that yielded no plain text (the page
# region was probably pure images or tables).
EMPTY_SECTION_PLACEHOLDER = "[本节无纯文本,原文档中可能为纯图片或表格]"


# Markers wrapped around table text recognized via OCR when it is appended
# back into a section's content.
TABLE_OCR_START = "[表格OCR识别结果]:"
TABLE_OCR_END = "[/表格]"
# Name of the "Chinese-numbered level-1 + numeric level-2" heading rule (Rule 8).
CN_LIST_L1_NUMERIC_L2_RULE = "Rule_8_中文序号章数字小节派"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@dataclass(frozen=True)
class BodyLine:
    """A single normalized body-text line together with the PDF page it came from."""

    # 1-based PDF page number the line was read from.
    page: int
    # Normalized (stripped) text of the line.
    text: str
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class PdfStructureExtractor:
|
|
|
|
|
+ """基于规则的 PDF 正文结构提取器,可选增强表格 OCR 内容。"""
|
|
|
|
|
+
|
|
|
|
|
    # Candidate heading-rule library. Each rule pairs a level-1 (chapter) regex
    # with a level-2 (section) regex; every rule is run over the body lines and
    # the best-scoring result wins (see _extract_body_with_best_rule).
    # NOTE: the character class [章部部分篇] matches any single one of
    # 章/部/分/篇 (部 is duplicated) — it does not match the word "部分" as a unit.
    RULE_LIB = {
        # Purely numeric chapters ("3 概述") with numeric sections ("3.1 …").
        "Rule_1_纯数字派": {
            "l1": re.compile(r"^\d{1,2}(?:[\..。])?\s+(?!\d)[\u4e00-\u9fa5A-Za-z].*"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # "第N章" chapters (arabic numeral) with numeric "N.M" sections.
        "Rule_2_混合章派": {
            "l1": re.compile(r"^第\s*(\d+)\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # "第N章" chapters (Chinese numeral) with numeric "N.M" sections.
        "Rule_3_中英混血派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Traditional official-document style: Chinese-numeral chapters with
        # "一、" style sections.
        "Rule_4_传统公文派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[、\s]+([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters with "一)" / "一]" half-bracket style sections.
        "Rule_5_单边括号派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[))\]]\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters with "第N节" style sections.
        "Rule_6_小节派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*节\s*[、]?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters with "【1】" bracketed numeric sections.
        "Rule_7_粗体括号派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[、]?\s*(.*)"),
            "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】]\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Rule 8: Chinese list-style level-1 headings ("一、…") with numeric
        # "N.M" level-2 headings. Extra viability checks gate this rule.
        CN_LIST_L1_NUMERIC_L2_RULE: {
            "l1": re.compile(r"^([一二三四五六七八九十百零两]+)[、))\]]\s*([\u4e00-\u9fa5A-Za-z].*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
    }

    # Single Chinese numeral digit -> integer value mapping (compound numerals
    # such as 十二 are presumably handled elsewhere — confirm in the converter).
    CN_NUM_MAP = {
        "零": 0,
        "〇": 0,
        "一": 1,
        "二": 2,
        "两": 2,
        "三": 3,
        "四": 4,
        "五": 5,
        "六": 6,
        "七": 7,
        "八": 8,
        "九": 9,
    }

    # Leader-dot / dash runs that indicate a table-of-contents line.
    TOC_PATTERN = re.compile(r"\.{3,}|…{2,}|-{3,}|·{3,}|•{3,}")
|
|
|
|
|
+
|
|
|
|
|
    def __init__(
        self,
        clip_top: float = 60,
        clip_bottom: float = 60,
        use_ocr: bool = False,
        ocr_api_url: str = "",
        ocr_timeout: int = 600,
        ocr_api_key: str = "",
        detect_toc: bool = True,
        toc_model_path: str = "",
    ):
        """Initialize extraction parameters; enable OCR only when its dependencies exist.

        Args:
            clip_top: Points clipped off the top of every page (header zone).
            clip_bottom: Points clipped off the bottom of every page (footer zone).
            use_ocr: Whether table OCR enhancement was requested.
            ocr_api_url: Endpoint of the OCR service passed to OcrProcessor.
            ocr_timeout: OCR request timeout in seconds.
            ocr_api_key: API key forwarded to OcrProcessor.
            detect_toc: Accepted for API compatibility; see NOTE below.
            toc_model_path: Stored but not used in the visible code.
        """

        self.clip_top = clip_top
        self.clip_bottom = clip_bottom
        # Remember the caller's request separately from actual availability so
        # extract() can report "unavailable" when OCR was asked for but cannot run.
        self.ocr_requested = bool(use_ocr)
        self.ocr_processor = None
        self.use_ocr = False
        # OCR is an optional enhancement: when rapid_layout or the OCR
        # dependencies are missing, rule-based body extraction still runs.
        if use_ocr and OcrProcessor is not None:
            self.ocr_processor = OcrProcessor(
                ocr_api_url=ocr_api_url,
                ocr_timeout=ocr_timeout,
                ocr_api_key=ocr_api_key,
            )
            self.use_ocr = self.ocr_processor.is_available()
        # NOTE(review): the detect_toc argument is unconditionally overridden
        # to False here — TOC detection appears unsupported in this extractor
        # (catalog_mode is "testc_body_only"); confirm this is intentional.
        self.detect_toc = False
        self.ocr_api_url = ocr_api_url
        self.ocr_timeout = ocr_timeout
        self.ocr_api_key = ocr_api_key
        self.toc_model_path = toc_model_path
|
|
|
|
|
+
|
|
|
|
|
    def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
        """Extract chapters, a body-derived catalog, rule diagnostics and optional table OCR.

        Args:
            file_content: Raw PDF bytes.
            progress_callback: Optional callable ``(stage, current, message)``;
                exceptions it raises are swallowed downstream.

        Returns:
            Result dict with chapters, catalogs, winning rule info and OCR stats.
        """

        result: Dict[str, Any] = {
            "chapters": {},
            "total_pages": 0,
            "catalog": None,
            "body_catalog": None,
            "ocr_catalog": None,
            "catalog_mode": "testc_body_only",
            "body_rule": None,
            "body_coverage": 0.0,
            "rule_performance": {},
            "ocr_content_mode": "disabled",
            "ocr_table_count": 0,
            "ocr_success_count": 0,
            "ocr_inserted_count": 0,
        }

        doc = fitz.open(stream=file_content, filetype="pdf")
        try:
            # Body splitting is still driven by PyMuPDF text plus heading
            # rules; OCR only supplements section content after the split.
            body_lines = self._extract_body_lines(doc, progress_callback)
            ocr_results = self._extract_table_ocr_results(doc, progress_callback)
            raw_data, winning_rule, coverage_rate, rule_performance = self._extract_body_with_best_rule(body_lines)
            chapters = self._convert_rule_output_to_chapters(raw_data)
            ocr_stats = self._insert_ocr_results_into_chapters(chapters, ocr_results)
            body_catalog = self._build_body_catalog_from_chapters(chapters)

            result["chapters"] = chapters
            result["total_pages"] = len(doc)
            result["catalog"] = body_catalog
            result["body_catalog"] = body_catalog
            result["body_rule"] = winning_rule
            result["body_coverage"] = coverage_rate
            result["rule_performance"] = rule_performance
            result["ocr_table_count"] = ocr_stats["table_count"]
            result["ocr_success_count"] = ocr_stats["success_count"]
            result["ocr_inserted_count"] = ocr_stats["inserted_count"]
            # Record whether OCR actually affected the output, so batch jobs
            # can report OCR status:
            #   disabled: default — OCR was not requested for this run.
            #   unavailable: OCR requested but dependencies missing (e.g.
            #       rapid_layout not installed or detector unusable).
            #   enabled_no_table: OCR enabled but no table regions detected.
            #   table_regions_inserted: OCR enabled and recognized table text
            #       was successfully merged back into body sections.
            #   enabled_no_insert: OCR enabled but nothing was merged back,
            #       usually OCR failures or no suitable target section.
            if self.ocr_requested and not self.use_ocr:
                result["ocr_content_mode"] = "unavailable"
            elif self.use_ocr and ocr_stats["table_count"] == 0:
                result["ocr_content_mode"] = "enabled_no_table"
            elif self.use_ocr and ocr_stats["inserted_count"] > 0:
                result["ocr_content_mode"] = "table_regions_inserted"
            elif self.use_ocr:
                result["ocr_content_mode"] = "enabled_no_insert"
            return result
        finally:
            doc.close()
|
|
|
|
|
+
|
|
|
|
|
    def _extract_table_ocr_results(self, doc: fitz.Document, progress_callback=None) -> List[OcrResult]:
        """Detect table regions in the PDF and run concurrent table OCR when OCR is enabled.

        Returns an empty list when OCR is disabled/unavailable or no table
        regions are detected.
        """

        if not self.use_ocr or self.ocr_processor is None:
            return []

        def _emit_progress(stage: str, current: int, message: str) -> None:
            """Forward OCR progress while keeping callback exceptions from aborting extraction."""

            if not progress_callback:
                return
            try:
                progress_callback(stage, current, message)
            except Exception:
                pass

        table_regions: List[TableRegion] = []
        total_pages = len(doc)
        for page_index in range(total_pages):
            page = doc.load_page(page_index)
            rect = page.rect
            # Same header/footer clip used by body extraction.
            clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
            regions = self.ocr_processor.detect_table_regions(page, page_index + 1, clip_box)
            # Keep the page object and region coordinates so OcrProcessor can
            # later render the exact table crop.
            for bbox, score in regions:
                table_regions.append(TableRegion(
                    page_num=page_index + 1,
                    page=page,
                    bbox=bbox,
                    score=score,
                ))

            # Report layout-scan progress every 5 pages (scaled to 0..30).
            if page_index + 1 == total_pages or (page_index + 1) % 5 == 0:
                progress = int((page_index + 1) / max(total_pages, 1) * 30)
                _emit_progress("ocr_layout", progress, f"scan tables {page_index + 1}/{total_pages}")

        if not table_regions:
            return []

        _emit_progress("ocr", 35, f"ocr tables 0/{len(table_regions)}")

        def _progress_adapter(completed: int, total: int) -> None:
            """Translate OcrProcessor's completed/total progress into the extractor's format (35..50)."""

            progress = 35 + int(completed / max(total, 1) * 15)
            _emit_progress("ocr", progress, f"ocr tables {completed}/{total}")

        return self.ocr_processor.process_ocr_concurrent(
            table_regions,
            progress_callback=_progress_adapter,
        )
|
|
|
|
|
+
|
|
|
|
|
+ def _insert_ocr_results_into_chapters(
|
|
|
|
|
+ self,
|
|
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
|
|
+ ocr_results: List[OcrResult],
|
|
|
|
|
+ ) -> Dict[str, int]:
|
|
|
|
|
+ """把成功识别的表格 OCR 文本追加到同页最可能的小节正文中。"""
|
|
|
|
|
+
|
|
|
|
|
+ stats = {
|
|
|
|
|
+ "table_count": len(ocr_results),
|
|
|
|
|
+ "success_count": 0,
|
|
|
|
|
+ "inserted_count": 0,
|
|
|
|
|
+ }
|
|
|
|
|
+ if not chapters or not ocr_results:
|
|
|
|
|
+ return stats
|
|
|
|
|
+
|
|
|
|
|
+ successful_results = [
|
|
|
|
|
+ result for result in ocr_results
|
|
|
|
|
+ if getattr(result, "success", False) and str(getattr(result, "text", "") or "").strip()
|
|
|
|
|
+ ]
|
|
|
|
|
+ stats["success_count"] = len(successful_results)
|
|
|
|
|
+
|
|
|
|
|
+ for ocr_result in sorted(successful_results, key=lambda item: (item.page_num, item.bbox[1], item.bbox[0])):
|
|
|
|
|
+ # 轻量提取器在切分后不再保留文本块坐标,因此使用页码范围作为 OCR 回填的稳定定位信号。
|
|
|
|
|
+ target = self._find_ocr_target_section(chapters, ocr_result.page_num)
|
|
|
|
|
+ if target is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ _, _, payload = target
|
|
|
|
|
+ original_content = str(payload.get("content", "") or "").strip()
|
|
|
|
|
+ if original_content == EMPTY_SECTION_PLACEHOLDER:
|
|
|
|
|
+ original_content = ""
|
|
|
|
|
+
|
|
|
|
|
+ ocr_text = str(ocr_result.text or "").strip()
|
|
|
|
|
+ table_text = f"{TABLE_OCR_START}\n{ocr_text}\n{TABLE_OCR_END}"
|
|
|
|
|
+ payload["content"] = f"{original_content}\n\n{table_text}".strip()
|
|
|
|
|
+ payload["page_start"] = min(
|
|
|
|
|
+ self._safe_page_number(payload.get("page_start"), ocr_result.page_num),
|
|
|
|
|
+ ocr_result.page_num,
|
|
|
|
|
+ )
|
|
|
|
|
+ payload["page_end"] = max(
|
|
|
|
|
+ self._safe_page_number(payload.get("page_end"), ocr_result.page_num),
|
|
|
|
|
+ ocr_result.page_num,
|
|
|
|
|
+ )
|
|
|
|
|
+ stats["inserted_count"] += 1
|
|
|
|
|
+
|
|
|
|
|
+ return stats
|
|
|
|
|
+
|
|
|
|
|
+ def _find_ocr_target_section(
|
|
|
|
|
+ self,
|
|
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
|
|
+ page_num: int,
|
|
|
|
|
+ ) -> Optional[Tuple[str, str, Dict[str, Any]]]:
|
|
|
|
|
+ """查找页码范围最能覆盖 OCR 表格所在页的小节。"""
|
|
|
|
|
+
|
|
|
|
|
+ candidates: List[Tuple[int, int, str, str, Dict[str, Any]]] = []
|
|
|
|
|
+ fallback: Optional[Tuple[str, str, Dict[str, Any]]] = None
|
|
|
|
|
+
|
|
|
|
|
+ for chapter_title, sections in chapters.items():
|
|
|
|
|
+ if not isinstance(sections, dict):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ for section_title, payload in sections.items():
|
|
|
|
|
+ if not isinstance(payload, dict):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ page_start = self._safe_page_number(payload.get("page_start"), page_num)
|
|
|
|
|
+ page_end = self._safe_page_number(payload.get("page_end"), page_start)
|
|
|
|
|
+ if section_title == SECTION_TITLE_KEY:
|
|
|
|
|
+ if fallback is None and page_start <= page_num <= page_end:
|
|
|
|
|
+ fallback = (chapter_title, section_title, payload)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 优先选择页码范围最窄的小节,过宽的范围通常是章节级内容外溢。
|
|
|
|
|
+ if page_start <= page_num <= page_end:
|
|
|
|
|
+ span = max(page_end - page_start, 0)
|
|
|
|
|
+ candidates.append((span, -page_start, chapter_title, section_title, payload))
|
|
|
|
|
+ elif page_start <= page_num:
|
|
|
|
|
+ fallback = (chapter_title, section_title, payload)
|
|
|
|
|
+
|
|
|
|
|
+ if candidates:
|
|
|
|
|
+ _, _, chapter_title, section_title, payload = min(candidates, key=lambda item: (item[0], item[1]))
|
|
|
|
|
+ return chapter_title, section_title, payload
|
|
|
|
|
+ return fallback
|
|
|
|
|
+
|
|
|
|
|
    def _extract_body_lines(self, doc: fitz.Document, progress_callback=None) -> List[BodyLine]:
        """Read clipped page text, normalize body lines, and drop repeated non-heading noise.

        Returns the surviving lines as :class:`BodyLine` items in page order.
        """

        page_lines_by_page: List[Tuple[int, List[str]]] = []
        total_pages = len(doc)

        for page_index in range(total_pages):
            page = doc.load_page(page_index)
            rect = page.rect
            # Clip off the configured header/footer margins before reading text.
            clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
            text = page.get_text("text", clip=clip_box)

            page_lines: List[str] = []
            for line in self._prepare_page_lines(text):
                stripped = line.strip()
                if not stripped or self._is_header_footer(stripped):
                    continue
                page_lines.append(stripped)

            page_lines_by_page.append((page_index + 1, page_lines))

            # Report progress every 10 pages (scaled to 0..60); callback
            # failures must never abort extraction.
            if progress_callback and (page_index + 1 == total_pages or (page_index + 1) % 10 == 0):
                try:
                    progress_callback(
                        "正文抽取",
                        int((page_index + 1) / max(total_pages, 1) * 60),
                        f"读取正文页 {page_index + 1}/{total_pages}",
                    )
                except Exception:
                    pass

        # Headers/footers tend to repeat across pages, but genuine headings
        # must not be removed, so only repeated lines that do NOT look like
        # headings are dropped.
        repeated_noise_keys = self._find_repeated_non_heading_lines(page_lines_by_page, total_pages)
        body_lines: List[BodyLine] = []
        for page, lines in page_lines_by_page:
            for line in lines:
                if self._normalize_repeated_line_key(line) in repeated_noise_keys:
                    continue
                body_lines.append(BodyLine(page=page, text=line))
        return body_lines
|
|
|
|
|
+
|
|
|
|
|
+ def _extract_body_with_best_rule(
|
|
|
|
|
+ self,
|
|
|
|
|
+ body_lines: List[BodyLine],
|
|
|
|
|
+ ) -> Tuple[Dict[str, Dict[str, List[Dict[str, Any]]]], Optional[str], float, Dict[str, Any]]:
|
|
|
|
|
+ """运行所有候选标题规则,并返回评分最高的正文结构。"""
|
|
|
|
|
+
|
|
|
|
|
+ total_raw_chars = sum(len(item.text.strip()) for item in body_lines if item.text.strip())
|
|
|
|
|
+ best_score = -9999
|
|
|
|
|
+ best_rule_name: Optional[str] = None
|
|
|
|
|
+ best_data: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
|
|
|
|
|
+ best_coverage = 0.0
|
|
|
|
|
+ rule_performance: Dict[str, Any] = {}
|
|
|
|
|
+
|
|
|
|
|
+ for rule_name, rule_set in self.RULE_LIB.items():
|
|
|
|
|
+ data = self._extract_with_rule(body_lines, rule_name, rule_set)
|
|
|
|
|
+ score, coverage_rate = self._evaluate_extraction(data, total_raw_chars)
|
|
|
|
|
+ l1_count = len(data)
|
|
|
|
|
+ l2_count = sum(
|
|
|
|
|
+ len([key for key in sections.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY])
|
|
|
|
|
+ for sections in data.values()
|
|
|
|
|
+ )
|
|
|
|
|
+ if (
|
|
|
|
|
+ rule_name == CN_LIST_L1_NUMERIC_L2_RULE
|
|
|
|
|
+ and not self._is_viable_cn_list_l1_numeric_l2_structure(data, l1_count, l2_count)
|
|
|
|
|
+ ):
|
|
|
|
|
+ score -= 1500
|
|
|
|
|
+ rule_performance[rule_name] = {
|
|
|
|
|
+ "score": score,
|
|
|
|
|
+ "coverage_rate": f"{coverage_rate * 100:.1f}%",
|
|
|
|
|
+ "l1_count": l1_count,
|
|
|
|
|
+ "l2_count": l2_count,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # 规则选择以综合得分为主,覆盖率保留用于兜底过滤和诊断输出。
|
|
|
|
|
+ if score > best_score:
|
|
|
|
|
+ best_score = score
|
|
|
|
|
+ best_rule_name = rule_name
|
|
|
|
|
+ best_data = data
|
|
|
|
|
+ best_coverage = coverage_rate
|
|
|
|
|
+
|
|
|
|
|
+ if best_score <= 0 or best_coverage < 0.15:
|
|
|
|
|
+ return {}, best_rule_name, best_coverage, rule_performance
|
|
|
|
|
+
|
|
|
|
|
+ return best_data, best_rule_name, best_coverage, rule_performance
|
|
|
|
|
+
|
|
|
|
|
    def _extract_with_rule(
        self,
        body_lines: List[BodyLine],
        rule_name: str,
        rule_set: Dict[str, re.Pattern],
    ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
        """Split body lines into chapter/section buckets using one candidate heading rule.

        Returns ``{chapter_title: {section_title_or_key: [{"text", "page"}, ...],
        "_chapter_page": int}}``; chapters left with only ``_chapter_page`` are
        dropped at the end.
        """

        structured_data: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        current_l1: Optional[str] = None
        current_l1_num = 0
        current_l2: Optional[str] = None
        # A short line that looks like an unfinished heading ("第X章" alone) is
        # held here and prepended to the next line.
        pending_prefix: Optional[str] = None
        pending_page: Optional[int] = None
        last_l2_sub_num = 0

        # Snapshot of the previous chapter state, used to roll back a chapter
        # that later turns out to be a mis-detected heading.
        backup_l1: Optional[str] = None
        backup_l1_num = 0
        backup_l2: Optional[str] = None
        backup_l2_sub_num = 0

        # Rules whose level-2 headings carry "main.sub" numeric numbering.
        is_numeric_l2 = rule_name in {
            "Rule_1_纯数字派",
            "Rule_2_混合章派",
            "Rule_3_中英混血派",
            CN_LIST_L1_NUMERIC_L2_RULE,
        }

        for index, item in enumerate(body_lines):
            # Handle cross-line heading fragments first, then chapter/section
            # matching, so a lone "第X章" line does not lose its title.
            original_line = item.text.strip()
            page = item.page
            if not original_line or original_line.isdigit():
                continue

            line = self._strip_leading_page_number_from_heading(original_line)
            if pending_prefix:
                line = f"{pending_prefix} {line}".strip()
                original_line = line
                page = pending_page or page
                pending_prefix = None
                pending_page = None

            if self._is_incomplete_heading_fragment(line) and len(line) <= 15:
                pending_prefix = line
                pending_page = page
                continue

            has_toc = self._is_toc_line(line)

            match_l1 = rule_set["l1"].match(line)
            if match_l1 and not has_toc:
                core_text = self._blind_strip(line)
                if len(core_text) < 2:
                    # Numbering with almost no title text: treat as a fragment.
                    pending_prefix = line
                    pending_page = page
                    continue

                if self._is_valid_heading_strict(line, is_l1=True):
                    l1_candidate_num = self._extract_l1_number(line, rule_name, match_l1, current_l1_num)

                    if rule_name == CN_LIST_L1_NUMERIC_L2_RULE:
                        # Rule 8 only accepts an L1 when a matching numeric L2
                        # actually follows within a few pages.
                        if not self._has_expected_numeric_l2_ahead(body_lines, index, l1_candidate_num):
                            continue

                    if rule_name == "Rule_1_纯数字派":
                        # Plain numeric L1s easily mis-match table rows or
                        # numbered lists, so require extra ordinal/noise checks.
                        if current_l1 is None and l1_candidate_num not in {1, 2}:
                            continue
                        if self._looks_like_plain_numeric_l1_noise(line):
                            continue

                    if rule_name in {"Rule_1_纯数字派", "Rule_2_混合章派"} and current_l1 is not None:
                        if l1_candidate_num < current_l1_num:
                            continue
                        if l1_candidate_num - current_l1_num > 2:
                            continue
                        if l1_candidate_num == current_l1_num:
                            # When the same chapter number reappears and the old
                            # chapter has no sections yet, treat the old chapter
                            # as a mis-detection and fold its content back in.
                            if not self._chapter_has_l2(structured_data.get(current_l1, {})):
                                old_preface = structured_data[current_l1].get(SECTION_TITLE_KEY, [])
                                old_page = self._safe_page_number(structured_data[current_l1].get("_chapter_page"), page)
                                restored = [{"text": current_l1, "page": old_page}] + old_preface
                                del structured_data[current_l1]

                                current_l1 = self._clean_chapter_title(line)
                                structured_data[current_l1] = {"_chapter_page": page}  # type: ignore[assignment]
                                if restored:
                                    structured_data[current_l1][SECTION_TITLE_KEY] = restored
                                current_l1_num = l1_candidate_num
                                current_l2 = None
                                last_l2_sub_num = 0
                                continue

                    backup_l1 = current_l1
                    backup_l1_num = current_l1_num
                    backup_l2 = current_l2
                    backup_l2_sub_num = last_l2_sub_num

                    current_l1 = self._clean_chapter_title(line)
                    current_l1_num = l1_candidate_num
                    structured_data.setdefault(current_l1, {"_chapter_page": page})  # type: ignore[assignment]
                    current_l2 = None
                    last_l2_sub_num = 0
                    continue

            match_l2 = rule_set["l2"].match(line)
            if current_l1 and match_l2 and not has_toc:
                if self._is_valid_heading_strict(line, is_l1=False):
                    if is_numeric_l2:
                        l2_main_num = int(match_l2.group(1))
                        l2_sub_num = int(match_l2.group(2))

                        if l2_main_num != current_l1_num and l2_main_num == backup_l1_num and backup_l1 is not None:
                            # If the section number points at the previous
                            # chapter, the current chapter is likely a
                            # mis-detected heading — roll back to the backup.
                            has_l2 = self._chapter_has_l2(structured_data.get(current_l1, {}))
                            if not has_l2 and current_l1 in structured_data:
                                fake_preface = structured_data[current_l1].get(SECTION_TITLE_KEY, [])
                                chapter_page = self._safe_page_number(
                                    structured_data[current_l1].get("_chapter_page"),
                                    page,
                                )
                                text_to_restore = [{"text": current_l1, "page": chapter_page}] + fake_preface
                                target_node = backup_l2 or SECTION_TITLE_KEY
                                structured_data.setdefault(backup_l1, {"_chapter_page": chapter_page})  # type: ignore[arg-type]
                                structured_data[backup_l1].setdefault(target_node, []).extend(text_to_restore)
                                del structured_data[current_l1]
                                current_l1 = backup_l1
                                current_l1_num = backup_l1_num
                                current_l2 = backup_l2
                                last_l2_sub_num = backup_l2_sub_num

                        if l2_main_num != current_l1_num:
                            pass
                        elif l2_sub_num <= last_l2_sub_num:
                            pass
                        elif self._is_suspicious_numeric_l2_jump(l2_sub_num, last_l2_sub_num):
                            # Big jumps (e.g. 1.2 followed by 1.9) are usually
                            # in-text references, not new sections.
                            pass
                        else:
                            current_l2 = self._clean_section_title(line)
                            last_l2_sub_num = l2_sub_num
                            self._ensure_section_node(structured_data, current_l1, current_l2, page)
                            continue
                    else:
                        l2_sub_num = self._extract_non_numeric_l2_number(match_l2.group(1))
                        if l2_sub_num <= last_l2_sub_num:
                            pass
                        else:
                            current_l2 = self._clean_section_title(line)
                            last_l2_sub_num = l2_sub_num
                            self._ensure_section_node(structured_data, current_l1, current_l2, page)
                            continue

            # Not a heading: append the original line to the current section
            # (or the chapter-title bucket when no section is open).
            if current_l1 and not has_toc:
                target_key = current_l2 or SECTION_TITLE_KEY
                self._ensure_section_node(structured_data, current_l1, target_key, page)
                structured_data[current_l1][target_key].append({"text": original_line, "page": page})

        # Drop chapters that only carry the bookkeeping page marker.
        for chapter_title in list(structured_data.keys()):
            chapter_sections = structured_data[chapter_title]
            if list(chapter_sections.keys()) == ["_chapter_page"]:
                del structured_data[chapter_title]

        return structured_data
|
|
|
|
|
+
|
|
|
|
|
    def _has_expected_numeric_l2_ahead(
        self,
        body_lines: List[BodyLine],
        current_index: int,
        chapter_number: int,
    ) -> bool:
        """Check that a Chinese-numbered L1 heading is followed by a numeric L2 with the same main number.

        Looks ahead at most 40 lines and 3 pages from ``current_index``.
        """

        if chapter_number <= 0 or current_index >= len(body_lines):
            return False

        start_page = body_lines[current_index].page
        max_index = min(len(body_lines), current_index + 40)
        max_page = start_page + 3
        # e.g. for chapter 2, accept "2.1 标题" / "2.3 Title" style lines.
        expected_pattern = re.compile(
            rf"^{chapter_number}\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"
        )

        for next_index in range(current_index + 1, max_index):
            candidate_item = body_lines[next_index]
            if candidate_item.page > max_page:
                break

            candidate_line = self._strip_leading_page_number_from_heading(candidate_item.text.strip())
            if not candidate_line or self._is_toc_line(candidate_line):
                continue

            if (
                expected_pattern.match(candidate_line)
                and self._is_valid_heading_strict(candidate_line, is_l1=False)
            ):
                return True

            # Stop early once another rule's L1 heading shows up — the window
            # has moved into a different chapter.
            if next_index > current_index + 1 and any(
                rule["l1"].match(candidate_line)
                for name, rule in self.RULE_LIB.items()
                if name != CN_LIST_L1_NUMERIC_L2_RULE
            ):
                break

        return False
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _is_viable_cn_list_l1_numeric_l2_structure(
|
|
|
|
|
+ raw_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
|
|
|
|
|
+ l1_count: int,
|
|
|
|
|
+ l2_count: int,
|
|
|
|
|
+ ) -> bool:
|
|
|
|
|
+ """限制新规则只在真正形成“中文章 + 数字小节”结构时参与竞争。"""
|
|
|
|
|
+
|
|
|
|
|
+ if l1_count < 2 or l2_count < 3:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ chapters_with_l2 = sum(
|
|
|
|
|
+ 1
|
|
|
|
|
+ for sections in raw_data.values()
|
|
|
|
|
+ if any(key for key in sections.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY)
|
|
|
|
|
+ )
|
|
|
|
|
+ return chapters_with_l2 >= max(2, (l1_count + 1) // 2)
|
|
|
|
|
+
|
|
|
|
|
+ def _convert_rule_output_to_chapters(
|
|
|
|
|
+ self,
|
|
|
|
|
+ raw_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
|
|
|
|
|
+ ) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
|
|
|
|
+ """把规则提取出的临时结构转换为最终 chapters JSON 结构。"""
|
|
|
|
|
+
|
|
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
|
|
|
|
+
|
|
|
|
|
+ for chapter_title, sections in raw_data.items():
|
|
|
|
|
+ chapter_page = self._safe_page_number(sections.get("_chapter_page"), 1)
|
|
|
|
|
+ chapter_payloads: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
+
|
|
|
|
|
+ for section_title, entries in sections.items():
|
|
|
|
|
+ if section_title.startswith("_"):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if entries:
|
|
|
|
|
+ page_start = self._safe_page_number(entries[0].get("page"), chapter_page)
|
|
|
|
|
+ page_end = self._safe_page_number(entries[-1].get("page"), page_start)
|
|
|
|
|
+ content = "\n".join(str(entry.get("text", "") or "") for entry in entries).strip()
|
|
|
|
|
+ else:
|
|
|
|
|
+ page_start = chapter_page
|
|
|
|
|
+ page_end = chapter_page
|
|
|
|
|
+ content = ""
|
|
|
|
|
+
|
|
|
|
|
+ chapter_payloads[section_title] = {
|
|
|
|
|
+ "content": content or EMPTY_SECTION_PLACEHOLDER,
|
|
|
|
|
+ "page_start": page_start,
|
|
|
|
|
+ "page_end": page_end,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ chapter_payloads.setdefault(
|
|
|
|
|
+ SECTION_TITLE_KEY,
|
|
|
|
|
+ {"content": "", "page_start": chapter_page, "page_end": chapter_page},
|
|
|
|
|
+ )
|
|
|
|
|
+ chapters[chapter_title] = chapter_payloads
|
|
|
|
|
+
|
|
|
|
|
+ return chapters
|
|
|
|
|
+
|
|
|
|
|
+ def _evaluate_extraction(
|
|
|
|
|
+ self,
|
|
|
|
|
+ raw_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
|
|
|
|
|
+ total_raw_chars: int,
|
|
|
|
|
+ ) -> Tuple[int, float]:
|
|
|
|
|
+ """根据章节数量、小节数量、空章节比例和正文覆盖率评估规则效果。"""
|
|
|
|
|
+
|
|
|
|
|
+ if not raw_data or total_raw_chars == 0:
|
|
|
|
|
+ return 0, 0.0
|
|
|
|
|
+
|
|
|
|
|
+ l1_count = len(raw_data)
|
|
|
|
|
+ l2_total_count = sum(
|
|
|
|
|
+ len([key for key in sections.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY])
|
|
|
|
|
+ for sections in raw_data.values()
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ extracted_chars = 0
|
|
|
|
|
+ empty_l1_count = 0
|
|
|
|
|
+
|
|
|
|
|
+ for chapter_title, sections in raw_data.items():
|
|
|
|
|
+ extracted_chars += len(chapter_title)
|
|
|
|
|
+ chapter_has_content = False
|
|
|
|
|
+
|
|
|
|
|
+ for section_title, entries in sections.items():
|
|
|
|
|
+ if section_title.startswith("_"):
|
|
|
|
|
+ continue
|
|
|
|
|
+ if section_title != SECTION_TITLE_KEY:
|
|
|
|
|
+ extracted_chars += len(section_title)
|
|
|
|
|
+ content = "\n".join(str(entry.get("text", "") or "") for entry in entries).strip()
|
|
|
|
|
+ if content:
|
|
|
|
|
+ extracted_chars += len(content)
|
|
|
|
|
+ chapter_has_content = True
|
|
|
|
|
+
|
|
|
|
|
+ if not chapter_has_content:
|
|
|
|
|
+ empty_l1_count += 1
|
|
|
|
|
+
|
|
|
|
|
+ raw_coverage_rate = extracted_chars / total_raw_chars if total_raw_chars > 0 else 0.0
|
|
|
|
|
+ coverage_rate = min(raw_coverage_rate, 1.0)
|
|
|
|
|
+ score = 0
|
|
|
|
|
+
|
|
|
|
|
+ if 2 <= l1_count <= 25:
|
|
|
|
|
+ score += l1_count * 15
|
|
|
|
|
+ elif l1_count > 25:
|
|
|
|
|
+ score += 100
|
|
|
|
|
+
|
|
|
|
|
+ score += l2_total_count * 5
|
|
|
|
|
+
|
|
|
|
|
+ if l1_count > 0 and (empty_l1_count / l1_count) > 0.8:
|
|
|
|
|
+ score -= 500
|
|
|
|
|
+
|
|
|
|
|
+ if raw_coverage_rate > 0.8:
|
|
|
|
|
+ score += int(min(raw_coverage_rate, 1.0) * 1000)
|
|
|
|
|
+ elif raw_coverage_rate < 0.5:
|
|
|
|
|
+ score -= 1000
|
|
|
|
|
+
|
|
|
|
|
+ return score, coverage_rate
|
|
|
|
|
+
|
|
|
|
|
+ def _build_body_catalog_from_chapters(
|
|
|
|
|
+ self,
|
|
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
|
|
+ ) -> Optional[Dict[str, Any]]:
|
|
|
|
|
+ """从正文切分结果反向生成 body_catalog/catalog 结构。"""
|
|
|
|
|
+
|
|
|
|
|
+ if not chapters:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ catalog_chapters: List[Dict[str, Any]] = []
|
|
|
|
|
+ for chapter_title, sections in chapters.items():
|
|
|
|
|
+ if not isinstance(sections, dict):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ page_start, page_end = self._resolve_chapter_page_span(sections)
|
|
|
|
|
+ title_payload = sections.get(SECTION_TITLE_KEY, {})
|
|
|
|
|
+ catalog_chapter = {
|
|
|
|
|
+ "index": len(catalog_chapters) + 1,
|
|
|
|
|
+ "title": chapter_title,
|
|
|
|
|
+ "page": str(page_start),
|
|
|
|
|
+ "original": chapter_title,
|
|
|
|
|
+ "content": title_payload.get("content", "") if isinstance(title_payload, dict) else "",
|
|
|
|
|
+ "page_start": page_start,
|
|
|
|
|
+ "page_end": page_end,
|
|
|
|
|
+ "subsections": [],
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for section_title, payload in sections.items():
|
|
|
|
|
+ if section_title == SECTION_TITLE_KEY or not isinstance(payload, dict):
|
|
|
|
|
+ continue
|
|
|
|
|
+ subsection_page_start = self._safe_page_number(payload.get("page_start"), page_start)
|
|
|
|
|
+ subsection_page_end = self._safe_page_number(payload.get("page_end"), subsection_page_start)
|
|
|
|
|
+ catalog_chapter["subsections"].append({
|
|
|
|
|
+ "title": section_title,
|
|
|
|
|
+ "page": str(subsection_page_start),
|
|
|
|
|
+ "level": 2,
|
|
|
|
|
+ "original": section_title,
|
|
|
|
|
+ "content": payload.get("content", ""),
|
|
|
|
|
+ "page_start": subsection_page_start,
|
|
|
|
|
+ "page_end": subsection_page_end,
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ catalog_chapters.append(catalog_chapter)
|
|
|
|
|
+
|
|
|
|
|
+ if not catalog_chapters:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "chapters": catalog_chapters,
|
|
|
|
|
+ "total_chapters": len(catalog_chapters),
|
|
|
|
|
+ "source": "body_titles",
|
|
|
|
|
+ "formatted_text": self._format_catalog_chapters(catalog_chapters),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _prepare_page_lines(cls, text: str) -> List[str]:
|
|
|
|
|
+ """把页面原始文本拆成行,并提前合并可能被换行拆开的标题。"""
|
|
|
|
|
+
|
|
|
|
|
+ raw_lines = [line.strip() for line in (text or "").splitlines() if line.strip()]
|
|
|
|
|
+ prepared: List[str] = []
|
|
|
|
|
+ index = 0
|
|
|
|
|
+
|
|
|
|
|
+ while index < len(raw_lines):
|
|
|
|
|
+ merged_line, consumed = cls._merge_heading_fragment(raw_lines, index)
|
|
|
|
|
+ if merged_line:
|
|
|
|
|
+ prepared.append(merged_line)
|
|
|
|
|
+ index += consumed
|
|
|
|
|
+ continue
|
|
|
|
|
+ prepared.append(raw_lines[index])
|
|
|
|
|
+ index += 1
|
|
|
|
|
+
|
|
|
|
|
+ return prepared
|
|
|
|
|
+
|
|
|
|
|
    @classmethod
    def _merge_heading_fragment(cls, lines: List[str], start_index: int) -> Tuple[Optional[str], int]:
        """Try to merge the 2-3 lines starting at ``start_index`` into one complete heading.

        Returns ``(merged_text, consumed_line_count)`` on success, or
        ``(None, 1)`` when no merge applies.
        """

        first_line = lines[start_index].strip()
        if not first_line:
            return None, 1

        # Judge the first line on its own: is it already a full heading, or an
        # incomplete fragment (bare numbering) waiting for its title text?
        first_normalized = cls._strip_leading_page_number_from_heading(first_line)
        first_is_heading = cls._matches_any_heading(first_normalized)
        first_is_incomplete = cls._is_incomplete_heading_fragment(first_normalized)
        max_span = min(3, len(lines) - start_index)

        for span in range(2, max_span + 1):
            candidate_lines = [
                cls._strip_leading_page_number_from_heading(lines[start_index + offset])
                for offset in range(span)
            ]
            candidate_text = " ".join(item for item in candidate_lines if item).strip()
            # Skip TOC-looking merges and spans that read like table cells.
            if not candidate_text or cls.TOC_PATTERN.search(candidate_text):
                continue
            if cls._looks_like_table_fragment(lines, start_index, span):
                continue
            if not cls._matches_any_heading(candidate_text):
                continue
            # Merge only when the first line could not stand alone as a heading;
            # otherwise prefer keeping the original single-line heading.
            if first_is_incomplete or not first_is_heading:
                return candidate_text, span

        return None, 1
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _looks_like_table_fragment(cls, lines: List[str], start_index: int, span: int) -> bool:
|
|
|
|
|
+ """判断候选跨行标题是否更像表格单元格碎片。"""
|
|
|
|
|
+
|
|
|
|
|
+ first_line = lines[start_index].strip()
|
|
|
|
|
+ if not re.fullmatch(r"\d{1,2}(?:\.\d{1,2})?", first_line):
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ next_lines = [lines[idx].strip() for idx in range(start_index + 1, min(len(lines), start_index + 5))]
|
|
|
|
|
+ if next_lines and cls._is_short_table_cell(next_lines[0]):
|
|
|
|
|
+ return True
|
|
|
|
|
+ return sum(1 for item in next_lines if cls._is_short_table_cell(item) or cls._looks_like_quantity_cell(item)) >= 2
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _find_repeated_non_heading_lines(
|
|
|
|
|
+ cls,
|
|
|
|
|
+ page_lines_by_page: List[Tuple[int, List[str]]],
|
|
|
|
|
+ total_pages: int,
|
|
|
|
|
+ ) -> set[str]:
|
|
|
|
|
+ """找出跨页重复出现、且不属于标题的页眉页脚类噪声行。"""
|
|
|
|
|
+
|
|
|
|
|
+ if total_pages < 3:
|
|
|
|
|
+ return set()
|
|
|
|
|
+
|
|
|
|
|
+ pages_by_key: Dict[str, set[int]] = {}
|
|
|
|
|
+ for page, lines in page_lines_by_page:
|
|
|
|
|
+ for line in lines:
|
|
|
|
|
+ key = cls._normalize_repeated_line_key(line)
|
|
|
|
|
+ if not key or not (4 <= len(key) <= 80):
|
|
|
|
|
+ continue
|
|
|
|
|
+ normalized = cls._strip_leading_page_number_from_heading(line)
|
|
|
|
|
+ if cls._matches_any_heading(normalized) or cls._is_toc_line(normalized):
|
|
|
|
|
+ continue
|
|
|
|
|
+ pages_by_key.setdefault(key, set()).add(page)
|
|
|
|
|
+
|
|
|
|
|
+ threshold = max(3, (total_pages + 11) // 12)
|
|
|
|
|
+ return {key for key, pages in pages_by_key.items() if len(pages) >= threshold}
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _normalize_repeated_line_key(line: str) -> str:
|
|
|
|
|
+ """生成重复行检测使用的无空白 key。"""
|
|
|
|
|
+
|
|
|
|
|
+ return re.sub(r"\s+", "", str(line or "").strip())
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _matches_any_heading(cls, line: str) -> bool:
|
|
|
|
|
+ """判断文本是否命中任意一套章/节标题规则。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean_line = line.strip()
|
|
|
|
|
+ return any(rule["l1"].match(clean_line) or rule["l2"].match(clean_line) for rule in cls.RULE_LIB.values())
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _is_incomplete_heading_fragment(cls, line: str) -> bool:
|
|
|
|
|
+ """识别只有编号或标题前缀、需要等待下一行拼接的标题碎片。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean_line = re.sub(r"\s+", "", str(line or "").strip())
|
|
|
|
|
+ if not clean_line:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ fragment_patterns = (
|
|
|
|
|
+ r"^第(?:\d+|[一二三四五六七八九十百零两]+)[章部部分篇]$",
|
|
|
|
|
+ r"^\d{1,2}(?:[\..。、])?$",
|
|
|
|
|
+ r"^\d{1,2}\.\d{1,2}(?!\.\d)\.?$",
|
|
|
|
|
+ r"^[一二三四五六七八九十百零两]+[、))\]]$",
|
|
|
|
|
+ r"^第(?:\d+|[一二三四五六七八九十百零两]+)节$",
|
|
|
|
|
+ r"^[【\[]\d+[\]】]$",
|
|
|
|
|
+ )
|
|
|
|
|
+ return any(re.match(pattern, clean_line) for pattern in fragment_patterns)
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _is_toc_line(cls, line: str) -> bool:
|
|
|
|
|
+ """判断一行文本是否像目录行。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean_line = str(line or "").strip()
|
|
|
|
|
+ if cls.TOC_PATTERN.search(clean_line):
|
|
|
|
|
+ return True
|
|
|
|
|
+ return bool(re.search(r"\s{2,}\d{1,3}$", clean_line))
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _is_header_footer(cls, line: str) -> bool:
|
|
|
|
|
+ """过滤页码、页眉页脚和重复方案名等非正文内容。"""
|
|
|
|
|
+
|
|
|
|
|
+ compact = re.sub(r"\s+", "", str(line or "").strip())
|
|
|
|
|
+ if not compact:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if compact.isdigit():
|
|
|
|
|
+ return True
|
|
|
|
|
+ if re.fullmatch(r"第\d+页(?:共\d+页)?", compact):
|
|
|
|
|
+ return True
|
|
|
|
|
+ if re.fullmatch(r"第\d+页/共\d+页", compact):
|
|
|
|
|
+ return True
|
|
|
|
|
+ if compact.upper() in {"I", "II", "III", "IV", "V", "VI", "VII", "VIII", "IX", "X"}:
|
|
|
|
|
+ return True
|
|
|
|
|
+ if compact in {"目录", "目", "录"}:
|
|
|
|
|
+ return False
|
|
|
|
|
+ normalized = cls._strip_leading_page_number_from_heading(line)
|
|
|
|
|
+ return not cls._matches_any_heading(normalized) and compact in {"专项方案", "施工方案"}
|
|
|
|
|
+
|
|
|
|
|
    @classmethod
    def _is_valid_heading_strict(cls, line: str, is_l1: bool = False) -> bool:
        """Filter likely false-positive headings via length, punctuation, unit
        and quantity-word rules.

        Returns ``True`` when the line still looks like a genuine heading after
        all checks; ``is_l1`` enables the stricter chapter-level checks.
        """

        clean_line = str(line or "").strip()
        # Headings are short: reject anything outside 2..60 characters.
        if not (2 <= len(clean_line) <= 60):
            return False
        # A heading never ends in list/sentence punctuation.
        if clean_line.endswith(("、", ",", "。", ";", ":", ",", ";", ":")):
            return False
        # More than 3 whitespace-separated tokens reads like body text.
        if len(clean_line.split()) > 3:
            return False
        # Trailing latin/symbol parentheticals are typically spec codes.
        if re.search(r"[\((][A-Za-z\*/]+[\))]\s*$", clean_line):
            return False

        # "<number> <unit>" lines (measurements) are table data, not headings.
        unit_pattern = (
            r"(?:版|版本|年一遇|倍|t|m|kg|cm|mm|km|m2|m3|㎡|m³|L|ml|MPa|kPa|kN|Hz|kW|KV|"
            r"千克|公斤|千米|公里|平方米|立方米|平方|立方|分钟|小时|秒|工日|台班|台套|万元|亿元)"
        )
        if re.match(rf"^\d+(?:\.\d+)?\s*{unit_pattern}(?:\s|$|[\u4e00-\u9fa5])", clean_line, re.IGNORECASE):
            return False

        # "<number> <classifier>" lines (e.g. headcounts) are quantity rows.
        quantity_pattern = (
            r"(?:人|名|位|个|组|班|件|项|把|根|台|套|辆|部|架|座|块|片|张|卷|桶|袋|车|"
            r"号|步|天|吨|箱|艘|磅|米|升|斤|两|次|条|孔|跨|排|层)"
        )
        if re.match(rf"^\d+(?:\.\d+)?\s*{quantity_pattern}(?:\s|$)", clean_line, re.IGNORECASE):
            return False

        if is_l1:
            # Chapter numbers never carry a leading zero.
            if re.match(r"^0\d+", clean_line):
                return False
            number_match = re.search(r"^\d+|第\s*(\d+)", clean_line)
            if number_match:
                raw_number = number_match.group(1) or number_match.group(0)
                # A chapter index above 30 is almost certainly not a chapter.
                if raw_number.isdigit() and int(raw_number) > 30:
                    return False
            # Sentence punctuation inside a chapter title marks body text.
            if re.search(r"[,。!!,??;;::]", clean_line):
                return False
            # Chapter titles are shorter than generic headings.
            if len(clean_line) > 35:
                return False
            if cls._looks_like_numbered_quantity_line(clean_line):
                return False

        return True
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _looks_like_numbered_quantity_line(line: str) -> bool:
|
|
|
|
|
+ """判断数字开头的行是否更像数量清单而不是一级标题。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean_line = re.sub(r"\s+", "", str(line or "").strip())
|
|
|
|
|
+ return bool(
|
|
|
|
|
+ re.match(
|
|
|
|
|
+ r"^\d+(?:号|步|天|吨|套|件|箱|把|根|辆|部|艘|块|片|张|卷|桶|袋|车|磅|米|升|斤|两|秒)",
|
|
|
|
|
+ clean_line,
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _looks_like_plain_numeric_l1_noise(line: str) -> bool:
|
|
|
|
|
+ """识别纯数字一级标题规则中常见的图名、规范名和岗位名噪声。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean_line = re.sub(r"\s+", " ", str(line or "").strip())
|
|
|
|
|
+ match = re.match(r"^\d{1,2}(?:[\..。、])?\s+(.+)$", clean_line)
|
|
|
|
|
+ if not match:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ title = match.group(1).strip()
|
|
|
|
|
+ compact = re.sub(r"\s+", "", title)
|
|
|
|
|
+ if not compact:
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ figure_terms = (
|
|
|
|
|
+ "示意图",
|
|
|
|
|
+ "布置图",
|
|
|
|
|
+ "断面图",
|
|
|
|
|
+ "构造图",
|
|
|
|
|
+ "大样图",
|
|
|
|
|
+ "详图",
|
|
|
|
|
+ "平面图",
|
|
|
|
|
+ "立面图",
|
|
|
|
|
+ "剖面图",
|
|
|
|
|
+ "流程图",
|
|
|
|
|
+ "曲线图",
|
|
|
|
|
+ )
|
|
|
|
|
+ if any(term in compact for term in figure_terms):
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ if re.search(r"(规范|标准|规程|指南|办法|条例|规定|导则|手册|文件)$", compact):
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ if re.search(r"(部|室|经理|总工|部长|主任|办公室|试验室)$", compact):
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ if re.search(r"(地震动|峰值加速度|反应谱|特征周期|场地类别|荷载组合|荷载标准值|分项系数)", compact):
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ chapter_keywords = (
|
|
|
|
|
+ "工程",
|
|
|
|
|
+ "编制",
|
|
|
|
|
+ "施工",
|
|
|
|
|
+ "安全",
|
|
|
|
|
+ "质量",
|
|
|
|
|
+ "环保",
|
|
|
|
|
+ "水保",
|
|
|
|
|
+ "文明",
|
|
|
|
|
+ "应急",
|
|
|
|
|
+ "验收",
|
|
|
|
|
+ "计算",
|
|
|
|
|
+ "附件",
|
|
|
|
|
+ "附录",
|
|
|
|
|
+ "总体",
|
|
|
|
|
+ "计划",
|
|
|
|
|
+ "组织",
|
|
|
|
|
+ "管理",
|
|
|
|
|
+ "保证",
|
|
|
|
|
+ "措施",
|
|
|
|
|
+ "方案",
|
|
|
|
|
+ "工艺",
|
|
|
|
|
+ "技术",
|
|
|
|
|
+ "要求",
|
|
|
|
|
+ "概况",
|
|
|
|
|
+ "依据",
|
|
|
|
|
+ "原则",
|
|
|
|
|
+ "资源",
|
|
|
|
|
+ "设备",
|
|
|
|
|
+ "材料",
|
|
|
|
|
+ "人员",
|
|
|
|
|
+ "进度",
|
|
|
|
|
+ "监测",
|
|
|
|
|
+ "风险",
|
|
|
|
|
+ "分析",
|
|
|
|
|
+ "检查",
|
|
|
|
|
+ "图纸",
|
|
|
|
|
+ "设计",
|
|
|
|
|
+ "部署",
|
|
|
|
|
+ "安排",
|
|
|
|
|
+ )
|
|
|
|
|
+ return not any(keyword in compact for keyword in chapter_keywords)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _is_suspicious_numeric_l2_jump(l2_sub_num: int, last_l2_sub_num: int) -> bool:
|
|
|
|
|
+ """判断数字小节编号是否出现过大的可疑跳号。"""
|
|
|
|
|
+
|
|
|
|
|
+ if last_l2_sub_num <= 0:
|
|
|
|
|
+ return False
|
|
|
|
|
+ return l2_sub_num - last_l2_sub_num > 3
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _is_short_table_cell(text: str) -> bool:
|
|
|
|
|
+ """判断文本是否像短表格单元格。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean = str(text or "").strip()
|
|
|
|
|
+ if not clean:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if len(clean) <= 4 and re.fullmatch(r"[\u4e00-\u9fa5A-Za-z]{1,4}", clean):
|
|
|
|
|
+ return True
|
|
|
|
|
+ return bool(re.fullmatch(r"\d+(?:\.\d+)?", clean))
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _looks_like_quantity_cell(text: str) -> bool:
|
|
|
|
|
+ """判断文本是否像数量、单位或状态类表格单元格。"""
|
|
|
|
|
+
|
|
|
|
|
+ clean = str(text or "").strip()
|
|
|
|
|
+ if not clean:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if clean in {"正常", "可使用", "若干", "大量"}:
|
|
|
|
|
+ return True
|
|
|
|
|
+ return bool(
|
|
|
|
|
+ re.match(
|
|
|
|
|
+ r"^\d+(?:\.\d+)?\s*(?:台|套|辆|部|架|座|个|件|人|m|km|cm|mm|kg|t|%)",
|
|
|
|
|
+ clean,
|
|
|
|
|
+ re.IGNORECASE,
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _blind_strip(text: str) -> str:
|
|
|
|
|
+ """粗略剥离标题编号前缀,用于判断剩余标题核心文本长度。"""
|
|
|
|
|
+
|
|
|
|
|
+ return re.sub(
|
|
|
|
|
+ r"^[第的一二三四五六七八九十百零两\d\.\s、))\]】\[((章节部部分篇]+",
|
|
|
|
|
+ "",
|
|
|
|
|
+ str(text or ""),
|
|
|
|
|
+ ).strip()
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _strip_leading_page_number_from_heading(cls, line: str) -> str:
|
|
|
|
|
+ """去掉标题行前方误混入的页码。"""
|
|
|
|
|
+
|
|
|
|
|
+ cleaned = re.sub(r"\s+", " ", str(line or "").strip())
|
|
|
|
|
+ if not cleaned:
|
|
|
|
|
+ return ""
|
|
|
|
|
+
|
|
|
|
|
+ return re.sub(
|
|
|
|
|
+ r"^\d{1,3}\s+(?="
|
|
|
|
|
+ r"(?:第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇])|"
|
|
|
|
|
+ r"(?:\d{1,2}\.\d{1,2}(?!\.\d)\.?\s*[\u4e00-\u9fa5])|"
|
|
|
|
|
+ r"(?:\d{1,2}\s+[\u4e00-\u9fa5])|"
|
|
|
|
|
+ r"(?:[一二三四五六七八九十百零两]+[、))\]]\s*[\u4e00-\u9fa5])|"
|
|
|
|
|
+ r"(?:[【\[]\s*\d+\s*[\]】]\s*[\u4e00-\u9fa5])"
|
|
|
|
|
+ r")",
|
|
|
|
|
+ "",
|
|
|
|
|
+ cleaned,
|
|
|
|
|
+ count=1,
|
|
|
|
|
+ ).strip()
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _extract_l1_number(
|
|
|
|
|
+ cls,
|
|
|
|
|
+ line: str,
|
|
|
|
|
+ rule_name: str,
|
|
|
|
|
+ match_l1: re.Match[str],
|
|
|
|
|
+ current_l1_num: int,
|
|
|
|
|
+ ) -> int:
|
|
|
|
|
+ """从一级标题文本中提取章节序号,提取失败时顺延当前章节号。"""
|
|
|
|
|
+
|
|
|
|
|
+ if rule_name == "Rule_1_纯数字派":
|
|
|
|
|
+ number_match = re.match(r"^(\d+)", line)
|
|
|
|
|
+ return int(number_match.group(1)) if number_match else 999
|
|
|
|
|
+
|
|
|
|
|
+ if rule_name == "Rule_2_混合章派":
|
|
|
|
|
+ return int(match_l1.group(1))
|
|
|
|
|
+
|
|
|
|
|
+ if rule_name == CN_LIST_L1_NUMERIC_L2_RULE:
|
|
|
|
|
+ cn_match = re.match(r"^([一二三四五六七八九十百零两]+)[、))\]]", line)
|
|
|
|
|
+ if cn_match:
|
|
|
|
|
+ return cls._cn_to_int(cn_match.group(1))
|
|
|
|
|
+
|
|
|
|
|
+ chapter_match = re.search(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)", line)
|
|
|
|
|
+ if chapter_match:
|
|
|
|
|
+ chapter_number = chapter_match.group(1)
|
|
|
|
|
+ return int(chapter_number) if chapter_number.isdigit() else cls._cn_to_int(chapter_number)
|
|
|
|
|
+
|
|
|
|
|
+ return current_l1_num + 1
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _extract_non_numeric_l2_number(cls, prefix: str) -> int:
|
|
|
|
|
+ """把非数字小节前缀转换为用于顺序比较的整数。"""
|
|
|
|
|
+
|
|
|
|
|
+ prefix = str(prefix or "").strip()
|
|
|
|
|
+ if prefix.isdigit():
|
|
|
|
|
+ return int(prefix)
|
|
|
|
|
+ return cls._cn_to_int(prefix)
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _cn_to_int(cls, text: str) -> int:
|
|
|
|
|
+ """把中文数字文本转换为整数。"""
|
|
|
|
|
+
|
|
|
|
|
+ normalized = str(text or "").replace("两", "二").strip()
|
|
|
|
|
+ if not normalized:
|
|
|
|
|
+ return 0
|
|
|
|
|
+ if normalized.isdigit():
|
|
|
|
|
+ return int(normalized)
|
|
|
|
|
+ if normalized == "十":
|
|
|
|
|
+ return 10
|
|
|
|
|
+ if "百" in normalized:
|
|
|
|
|
+ left, right = normalized.split("百", 1)
|
|
|
|
|
+ hundreds = cls.CN_NUM_MAP.get(left, 1) if left else 1
|
|
|
|
|
+ return hundreds * 100 + cls._cn_to_int(right)
|
|
|
|
|
+ if "十" in normalized:
|
|
|
|
|
+ left, right = normalized.split("十", 1)
|
|
|
|
|
+ tens = cls.CN_NUM_MAP.get(left, 1) if left else 1
|
|
|
|
|
+ ones = cls.CN_NUM_MAP.get(right, 0) if right else 0
|
|
|
|
|
+ return tens * 10 + ones
|
|
|
|
|
+ return cls.CN_NUM_MAP.get(normalized, 0)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _ensure_section_node(
|
|
|
|
|
+ structured_data: Dict[str, Dict[str, List[Dict[str, Any]]]],
|
|
|
|
|
+ chapter_title: str,
|
|
|
|
|
+ section_title: str,
|
|
|
|
|
+ page: int,
|
|
|
|
|
+ ) -> None:
|
|
|
|
|
+ """确保章节和小节节点存在。"""
|
|
|
|
|
+
|
|
|
|
|
+ structured_data.setdefault(chapter_title, {"_chapter_page": page}) # type: ignore[assignment]
|
|
|
|
|
+ structured_data[chapter_title].setdefault(section_title, [])
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _chapter_has_l2(chapter_data: Dict[str, Any]) -> bool:
|
|
|
|
|
+ """判断章节临时结构中是否已经出现真实二级小节。"""
|
|
|
|
|
+
|
|
|
|
|
+ return any(key for key in chapter_data.keys() if not key.startswith("_") and key != SECTION_TITLE_KEY)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _strip_catalog_page_suffix(text: str) -> str:
|
|
|
|
|
+ """清理目录行尾部的点线和页码。"""
|
|
|
|
|
+
|
|
|
|
|
+ cleaned = re.sub(r"\s+", " ", str(text or "").strip())
|
|
|
|
|
+ if not cleaned:
|
|
|
|
|
+ return ""
|
|
|
|
|
+ cleaned = re.sub(r"(?:[.\u2026\u00b7\u2022]{2,})[-\u2013\u2014 ]*\d+\s*$", "", cleaned).strip()
|
|
|
|
|
+ return re.sub(r"\s+\d{1,3}\s*$", "", cleaned).strip()
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _clean_chapter_title(cls, line: str) -> str:
|
|
|
|
|
+ """规范化一级标题文本,保留编号和标题主体。"""
|
|
|
|
|
+
|
|
|
|
|
+ cleaned = cls._strip_catalog_page_suffix(line)
|
|
|
|
|
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
|
|
|
|
|
+
|
|
|
|
|
+ cn_match = re.match(r"^(第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇])[\s、::.-]*(.*)$", cleaned)
|
|
|
|
|
+ if cn_match:
|
|
|
|
|
+ prefix = re.sub(r"\s+", "", cn_match.group(1))
|
|
|
|
|
+ title = cn_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ cn_list_match = re.match(r"^([一二三四五六七八九十百零两]+[、))\]])\s*(.*)$", cleaned)
|
|
|
|
|
+ if cn_list_match:
|
|
|
|
|
+ prefix = cn_list_match.group(1).strip()
|
|
|
|
|
+ title = cn_list_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ num_match = re.match(r"^(\d{1,2})(?:[\..。、])?\s*(.*)$", cleaned)
|
|
|
|
|
+ if num_match:
|
|
|
|
|
+ prefix = num_match.group(1)
|
|
|
|
|
+ title = num_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ return cleaned
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def _clean_section_title(cls, line: str) -> str:
|
|
|
|
|
+ """规范化二级标题文本,保留小节编号和标题主体。"""
|
|
|
|
|
+
|
|
|
|
|
+ cleaned = cls._strip_catalog_page_suffix(line)
|
|
|
|
|
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
|
|
|
|
|
+
|
|
|
|
|
+ numeric_match = re.match(r"^(\d+\.\d+)(?!\.\d)\.?\s*(.*)$", cleaned)
|
|
|
|
|
+ if numeric_match:
|
|
|
|
|
+ prefix = numeric_match.group(1)
|
|
|
|
|
+ title = numeric_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ cn_section_match = re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)[\s、::.-]*(.*)$", cleaned)
|
|
|
|
|
+ if cn_section_match:
|
|
|
|
|
+ prefix = re.sub(r"\s+", "", cn_section_match.group(1))
|
|
|
|
|
+ title = cn_section_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ cn_list_match = re.match(r"^([一二三四五六七八九十百零两]+[、))\]])\s*(.*)$", cleaned)
|
|
|
|
|
+ if cn_list_match:
|
|
|
|
|
+ prefix = cn_list_match.group(1).strip()
|
|
|
|
|
+ title = cn_list_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ bracket_match = re.match(r"^([【\[]\s*\d+\s*[\]】])\s*(.*)$", cleaned)
|
|
|
|
|
+ if bracket_match:
|
|
|
|
|
+ prefix = re.sub(r"\s+", "", bracket_match.group(1))
|
|
|
|
|
+ title = bracket_match.group(2).strip()
|
|
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
|
|
+
|
|
|
|
|
+ return cleaned
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _resolve_chapter_page_span(sections: Dict[str, Dict[str, Any]]) -> Tuple[int, int]:
|
|
|
|
|
+ """根据章节下所有小节的页码范围,计算章节整体页码范围。"""
|
|
|
|
|
+
|
|
|
|
|
+ page_starts: List[int] = []
|
|
|
|
|
+ page_ends: List[int] = []
|
|
|
|
|
+ for payload in sections.values():
|
|
|
|
|
+ if not isinstance(payload, dict):
|
|
|
|
|
+ continue
|
|
|
|
|
+ page_start = PdfStructureExtractor._safe_page_number(payload.get("page_start"), 1)
|
|
|
|
|
+ page_end = PdfStructureExtractor._safe_page_number(payload.get("page_end"), page_start)
|
|
|
|
|
+ page_starts.append(page_start)
|
|
|
|
|
+ page_ends.append(page_end)
|
|
|
|
|
+ if not page_starts:
|
|
|
|
|
+ return 1, 1
|
|
|
|
|
+ return min(page_starts), max(page_ends)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _format_catalog_chapters(chapters: List[Dict[str, Any]]) -> str:
|
|
|
|
|
+ """把目录章节结构格式化为便于查看的纯文本。"""
|
|
|
|
|
+
|
|
|
|
|
+ lines: List[str] = []
|
|
|
|
|
+ for chapter in chapters:
|
|
|
|
|
+ title = str(chapter.get("title", "") or "").strip()
|
|
|
|
|
+ if not title:
|
|
|
|
|
+ continue
|
|
|
|
|
+ lines.append(title)
|
|
|
|
|
+ for subsection in chapter.get("subsections", []) or []:
|
|
|
|
|
+ sub_title = str(subsection.get("title", "") or "").strip()
|
|
|
|
|
+ if sub_title:
|
|
|
|
|
+ lines.append(f" {sub_title}")
|
|
|
|
|
+ return "\n".join(lines)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _safe_page_number(value: Any, default: int = 1) -> int:
|
|
|
|
|
+ """安全地把页码值转换为不小于 1 的整数。"""
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ return max(1, int(str(value).strip()))
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ return default
|