"""Simplified PDF structure extractor.

A rule engine built on PyMuPDF that splits a PDF into a chapter structure
by level-1 / level-2 headings. It does not rely on OCR and does not depend
on any core/foundation code.
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple

import fitz

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class BodyLine:
    """One normalized body line together with the PDF page it came from."""

    # 1-based page number (page index + 1).
    page: int
    # Line text with surrounding whitespace stripped.
    text: str


class SimplePdfExtractor:
    """Rule-based extractor of the chapter/section structure of a PDF body.

    Every entry of ``RULE_LIB`` is a pair of compiled patterns: ``l1``
    recognises level-1 (chapter) headings and ``l2`` recognises level-2
    (section) headings. All rule sets are applied to the same body lines and
    the one with the highest heading coverage wins.
    """

    # NOTE: the rule names are runtime values (reported as ``body_rule`` and
    # used as keys of ``rule_performance``); they must not be renamed.
    RULE_LIB = {
        # Chapters like "1 Title" / "1. Title", sections like "1.1 Title".
        "Rule_1_纯数字派": {
            "l1": re.compile(
                r"^\d{1,2}(?:[\..。])?\s+(?:(?!\d)[\u4e00-\u9fa5A-Za-z].*|[、,,]\s*[\u4e00-\u9fa5A-Za-z0-9].*)"
            ),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第1章 Title", sections like "1.1 Title".
        "Rule_2_混合章派": {
            "l1": re.compile(r"^第\s*(\d+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第一章 Title", sections like "1.1 Title".
        "Rule_3_中英混血派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第一章 Title", sections like "一、Title".
        "Rule_4_传统公文派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[,、\s]+([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第1章"/"第一章", sections like "一)Title".
        "Rule_5_单边括号派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[))\]]\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第1章"/"第一章", sections like "第1节"/"第一节".
        "Rule_6_小节派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*节\s*[,、]?\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "第一章 Title", sections like "【1】Title" / "[1] Title".
        "Rule_7_粗体括号派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】]\s*([\u4e00-\u9fa5]+.*)"),
        },
        # Chapters like "一、Title" / "一)Title", sections like "1.1 Title".
        "Rule_8_中文序号章数字小节派": {
            "l1": re.compile(r"^([一二三四五六七八九十百零两]+)[,、))\]]\s*([\u4e00-\u9fa5A-Za-z].*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
    }

    # Chinese numeral characters mapped to their digit value.
    # Not referenced by the code visible in this file; kept as public API.
    CN_NUM_MAP = {
        "零": 0,
        "〇": 0,
        "一": 1,
        "二": 2,
        "两": 2,
        "三": 3,
        "四": 4,
        "五": 5,
        "六": 6,
        "七": 7,
        "八": 8,
        "九": 9,
    }

    # Runs of dots, ellipses, dashes, middle dots or bullets.
    # Not referenced by the code visible in this file; kept as public API.
    TOC_PATTERN = re.compile(r"\.{3,}|…{2,}|-{3,}|·{3,}|•{3,}")

    # A line consisting only of digits (a bare page number).
    _PAGE_NUMBER_PATTERN = re.compile(r"^\d+$")
    # Fixed running-header strings and "第 N 页" / "Page N" markers.
    _HEADER_FOOTER_PATTERN = re.compile(
        r"^(四川路桥|专项施工方案|第\s*\d+\s*页|Page\s*\d+)$", re.IGNORECASE
    )
    # Key of the pseudo-section that holds a chapter's own heading line and
    # the text between that heading and the chapter's first level-2 heading.
    _CHAPTER_TITLE_KEY = "章节标题"
    # Deletes ASCII space, tab and the ideographic space (U+3000).
    _KEY_WHITESPACE_TABLE = str.maketrans("", "", " \t\u3000")

    def __init__(self, clip_top: float = 60, clip_bottom: float = 60):
        """Create an extractor.

        Args:
            clip_top: Height cut off the top of every page before text is
                read, in the units of the page rectangle.
            clip_bottom: Height cut off the bottom of every page.
        """
        self.clip_top = clip_top
        self.clip_bottom = clip_bottom

    def extract(self, file_content: bytes) -> Dict[str, Any]:
        """Extract the chapter structure from the raw bytes of a PDF.

        Args:
            file_content: The PDF file content.

        Returns:
            A dict with the keys ``chapters`` (chapter title -> sections),
            ``total_pages``, ``body_rule`` (name of the winning rule, or
            ``"none"``), ``body_coverage`` and ``rule_performance``.
        """
        result: Dict[str, Any] = {
            "chapters": {},
            "total_pages": 0,
        }
        doc = fitz.open(stream=file_content, filetype="pdf")
        try:
            body_lines = self._extract_body_lines(doc)
            raw_data, winning_rule, coverage_rate, rule_performance = (
                self._extract_body_with_best_rule(body_lines)
            )
            result["chapters"] = self._convert_rule_output_to_chapters(raw_data)
            result["total_pages"] = len(doc)
            result["body_rule"] = winning_rule
            result["body_coverage"] = coverage_rate
            result["rule_performance"] = rule_performance
            return result
        finally:
            # Always release the document, even when extraction fails.
            doc.close()

    def _extract_body_lines(self, doc: fitz.Document) -> List[BodyLine]:
        """Read the clipped text of every page and return normalized lines.

        Lines are stripped; empty lines, lines recognised by
        ``_is_header_footer`` and lines reported by
        ``_find_repeated_non_heading_lines`` are dropped.
        """
        page_lines_by_page: List[Tuple[int, List[str]]] = []
        total_pages = len(doc)

        for page_index in range(total_pages):
            page = doc.load_page(page_index)
            rect = page.rect
            # Cut a band off the top and the bottom of the page.
            clip_box = fitz.Rect(
                0, self.clip_top, rect.width, rect.height - self.clip_bottom
            )
            text = page.get_text("text", clip=clip_box)

            stripped_lines = (raw.strip() for raw in text.splitlines())
            page_lines = [
                line
                for line in stripped_lines
                if line and not self._is_header_footer(line)
            ]
            page_lines_by_page.append((page_index + 1, page_lines))

        # Drop non-heading noise that repeats across pages (running
        # headers / footers that survived the clipping).
        repeated_noise_keys = self._find_repeated_non_heading_lines(
            page_lines_by_page, total_pages
        )

        return [
            BodyLine(page=page, text=line)
            for page, lines in page_lines_by_page
            for line in lines
            if self._normalize_repeated_line_key(line) not in repeated_noise_keys
        ]

    def _is_header_footer(self, text: str) -> bool:
        """Return True when *text* is a bare page number or a known header."""
        if self._PAGE_NUMBER_PATTERN.match(text):
            return True
        return self._HEADER_FOOTER_PATTERN.match(text) is not None

    def _normalize_repeated_line_key(self, text: str) -> str:
        """Return *text* without spaces, tabs and ideographic spaces.

        The result is used as the key for detecting repeated lines.
        """
        return text.translate(self._KEY_WHITESPACE_TABLE)

    def _find_repeated_non_heading_lines(
        self,
        page_lines_by_page: List[Tuple[int, List[str]]],
        total_pages: int,
    ) -> Set[str]:
        """Find lines that repeat across pages and do not look like headings.

        A key is reported when it occurs on at least two pages and on at
        least 30% of ``total_pages``. Each key is counted once per page, so
        a line that merely repeats inside a single page (for example a
        table cell) is not mistaken for a running header or footer.

        Args:
            page_lines_by_page: ``(page_number, lines)`` pairs.
            total_pages: Number of pages in the document.

        Returns:
            The normalized keys of the lines to remove.
        """
        pages_with_key: Dict[str, int] = {}
        first_sample: Dict[str, str] = {}

        for _, lines in page_lines_by_page:
            seen_on_page: Set[str] = set()
            for line in lines:
                key = self._normalize_repeated_line_key(line)
                if key in seen_on_page:
                    continue
                seen_on_page.add(key)
                pages_with_key[key] = pages_with_key.get(key, 0) + 1
                # Remember the first raw line for the heading check below.
                first_sample.setdefault(key, line)

        min_pages = total_pages * 0.3
        # Only lines that clearly do not look like headings are removed.
        return {
            key
            for key, count in pages_with_key.items()
            if count >= 2
            and count >= min_pages
            and not self._looks_like_heading(first_sample[key])
        }

    def _looks_like_heading(self, text: str) -> bool:
        """Return True when any ``l1`` or ``l2`` pattern matches *text*."""
        return any(
            rule["l1"].match(text) or rule["l2"].match(text)
            for rule in self.RULE_LIB.values()
        )

    def _extract_body_with_best_rule(
        self, body_lines: List[BodyLine]
    ) -> Tuple[Dict[str, Any], str, float, Dict[str, Any]]:
        """Apply every rule set and keep the one with the highest coverage.

        Returns:
            ``(raw_result, rule_name, coverage, rule_performance)``. When no
            rule reaches a coverage above zero the result is ``{}`` and the
            rule name is ``"none"``. ``rule_performance`` maps each rule
            name to ``{"coverage": ...}``; a rule that raised is recorded
            with coverage ``0.0``, ``"error": True`` and ``"error_message"``.
        """
        best_result: Optional[Dict[str, Any]] = None
        best_rule = ""
        best_coverage = 0.0
        rule_performance: Dict[str, Any] = {}

        for rule_name, rule in self.RULE_LIB.items():
            try:
                result, coverage = self._apply_rule(
                    body_lines, rule["l1"], rule["l2"]
                )
            except Exception as exc:
                # Best effort: one failing rule must not abort the contest,
                # but the failure is logged instead of silently dropped.
                logger.exception("Heading rule %s failed", rule_name)
                rule_performance[rule_name] = {
                    "coverage": 0.0,
                    "error": True,
                    "error_message": str(exc),
                }
                continue

            rule_performance[rule_name] = {"coverage": coverage}
            # Strict comparison: on a tie the earlier rule is kept.
            if coverage > best_coverage:
                best_coverage = coverage
                best_result = result
                best_rule = rule_name

        if best_result is None:
            return {}, "none", 0.0, rule_performance
        return best_result, best_rule, best_coverage, rule_performance

    @staticmethod
    def _finalize_section(
        section: Dict[str, Any], lines: List[str], pages: List[int]
    ) -> None:
        """Store the collected text and page range into *section*."""
        section["content"] = "\n".join(lines).strip()
        if pages:
            section["page_start"] = min(pages)
            section["page_end"] = max(pages)

    def _apply_rule(
        self,
        body_lines: List[BodyLine],
        l1_pattern: re.Pattern,
        l2_pattern: re.Pattern,
    ) -> Tuple[Dict[str, Any], float]:
        """Apply one rule set and build the chapter structure.

        A line is a level-1 heading when ``l1_pattern`` matches and
        ``l2_pattern`` does not. It is a level-2 heading when ``l2_pattern``
        matches and a chapter is already open. Every other line is body
        text of the section currently open; body text that precedes the
        first chapter is discarded. The heading line itself is the first
        line of its section's content.

        If a level-2 heading text repeats inside one chapter, the later
        section replaces the earlier one.

        Returns:
            ``({"chapters": [...]}, coverage)`` where coverage is the number
            of heading lines divided by the number of body lines (``0.0``
            for empty input).
        """
        chapters: List[Dict[str, Any]] = []
        open_section: Optional[Dict[str, Any]] = None
        pending_lines: List[str] = []
        pending_pages: List[int] = []
        heading_lines = 0

        for line in body_lines:
            text, page = line.text, line.page
            is_l1 = l1_pattern.match(text) is not None
            is_l2 = l2_pattern.match(text) is not None

            if is_l1 and not is_l2:
                is_chapter = True
            elif is_l2 and chapters:
                is_chapter = False
            else:
                # Body text; without an open section there is nowhere to
                # attach it, so it is dropped.
                if open_section is not None:
                    pending_lines.append(text)
                    pending_pages.append(page)
                continue

            # A heading closes the section that was being collected.
            if open_section is not None:
                self._finalize_section(open_section, pending_lines, pending_pages)

            open_section = {
                "content": "",
                "page_start": page,
                "page_end": page,
            }
            if is_chapter:
                chapters.append(
                    {
                        "title": text,
                        "page_start": page,
                        "sections": {self._CHAPTER_TITLE_KEY: open_section},
                    }
                )
            else:
                chapters[-1]["sections"][text] = open_section

            pending_lines = [text]
            pending_pages = [page]
            heading_lines += 1

        if open_section is not None:
            self._finalize_section(open_section, pending_lines, pending_pages)

        # max(..., 1) avoids a division by zero for empty input.
        coverage = heading_lines / max(len(body_lines), 1)
        return {"chapters": chapters}, coverage

    def _convert_rule_output_to_chapters(
        self, raw_data: Dict[str, Any]
    ) -> Dict[str, Dict[str, Any]]:
        """Convert the rule output into a dict keyed by chapter title.

        Chapters that share a title collapse into one key; the last one
        wins.

        Args:
            raw_data: Output of ``_apply_rule`` (or ``{}``).

        Returns:
            ``{chapter_title: {section_name: {"content", "page_start",
            "page_end"}}}``.
        """
        chapters: Dict[str, Dict[str, Any]] = {}
        for chapter in raw_data.get("chapters", []):
            title = chapter.get("title", "未命名章节")
            chapters[title] = {
                sec_name: {
                    "content": sec_data.get("content", ""),
                    "page_start": sec_data.get("page_start", 1),
                    "page_end": sec_data.get("page_end", 1),
                }
                for sec_name, sec_data in chapter.get("sections", {}).items()
            }
        return chapters