| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- """
- 简化版 PDF 结构提取器
- 基于 PyMuPDF 的规则引擎,将 PDF 按一级/二级标题切分为章节结构。
- 不依赖 OCR,不依赖任何 core/foundation 代码。
- """
- import re
- from dataclasses import dataclass
- from typing import Any, Dict, List, Optional, Tuple
- import fitz
@dataclass(frozen=True)
class BodyLine:
    """A single cleaned-up line of body text tagged with its PDF page number.

    Instances are immutable (``frozen=True``), so they compare by value and
    can be hashed.
    """

    # Page number the line was read from.
    page: int
    # Text of the line.
    text: str
class SimplePdfExtractor:
    """Rule-based extractor that splits PDF body text into chapters and sections.

    Each entry of ``RULE_LIB`` pairs a level-1 (chapter) heading regex with a
    level-2 (section) heading regex.  Every rule set is run against the same
    body lines; the one that recognises the largest share of lines as
    headings is used to build the final structure.
    """

    # NOTE(review): ``[章部部分篇]`` is a character class, so it accepts the
    # single characters 章 / 部 / 分 / 篇.  "第一部分" still matches (via 部), but
    # so does a bare "第一分…".  Presumably ``(?:章|部分|部|篇)`` was intended;
    # left unchanged here - confirm against real documents before tightening.
    RULE_LIB = {
        "Rule_1_纯数字派": {
            "l1": re.compile(
                r"^\d{1,2}(?:[\..。])?\s+(?:(?!\d)[\u4e00-\u9fa5A-Za-z].*|[、,,]\s*[\u4e00-\u9fa5A-Za-z0-9].*)"
            ),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_2_混合章派": {
            "l1": re.compile(r"^第\s*(\d+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_3_中英混血派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_4_传统公文派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[,、\s]+([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_5_单边括号派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[))\]]\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_6_小节派": {
            "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*节\s*[,、]?\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_7_粗体括号派": {
            "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
            "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】]\s*([\u4e00-\u9fa5]+.*)"),
        },
        "Rule_8_中文序号章数字小节派": {
            "l1": re.compile(r"^([一二三四五六七八九十百零两]+)[,、))\]]\s*([\u4e00-\u9fa5A-Za-z].*)"),
            "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
        },
    }

    # Chinese numeral characters mapped to their integer value.  No method of
    # this class reads it; kept because it is part of the class namespace.
    CN_NUM_MAP = {
        "零": 0, "〇": 0, "一": 1, "二": 2, "两": 2, "三": 3, "四": 4,
        "五": 5, "六": 6, "七": 7, "八": 8, "九": 9,
    }

    # Leader runs ("....", "……", "---", "···", "•••") that mark a
    # table-of-contents entry.
    TOC_PATTERN = re.compile(r"\.{3,}|…{2,}|-{3,}|·{3,}|•{3,}")

    # Compiled once instead of on every _is_header_footer() call.
    _PAGE_NUMBER_RE = re.compile(r"^\d+$")
    _HEADER_FOOTER_RE = re.compile(
        r"^(四川路桥|专项施工方案|第\s*\d+\s*页|Page\s*\d+)$", re.IGNORECASE
    )

    # Deletes ASCII space, tab and the full-width space in one pass.
    _WHITESPACE_TABLE = str.maketrans("", "", " \t\u3000")

    def __init__(self, clip_top: float = 60, clip_bottom: float = 60):
        """Store the clip margins.

        Args:
            clip_top: Height cut from the top of every page before text is
                read (same unit as the page rectangle).
            clip_bottom: Height cut from the bottom of every page.
        """
        self.clip_top = clip_top
        self.clip_bottom = clip_bottom

    def extract(self, file_content: bytes) -> Dict[str, Any]:
        """Extract the chapter structure from an in-memory PDF.

        Args:
            file_content: Raw bytes of the PDF file.

        Returns:
            A dict with the keys ``chapters`` (chapter title -> sections),
            ``total_pages``, ``body_rule`` (name of the winning rule set, or
            ``"none"``), ``body_coverage`` and ``rule_performance``.
        """
        doc = fitz.open(stream=file_content, filetype="pdf")
        try:
            body_lines = self._extract_body_lines(doc)
            raw_data, winning_rule, coverage_rate, rule_performance = (
                self._extract_body_with_best_rule(body_lines)
            )
            return {
                "chapters": self._convert_rule_output_to_chapters(raw_data),
                "total_pages": len(doc),
                "body_rule": winning_rule,
                "body_coverage": coverage_rate,
                "rule_performance": rule_performance,
            }
        finally:
            # Always release the document, even when extraction fails.
            doc.close()

    def _extract_body_lines(self, doc: "fitz.Document") -> List["BodyLine"]:
        """Read the clipped text of every page and return the cleaned lines.

        Lines are stripped; empty lines, lines recognised by
        ``_is_header_footer`` and lines repeated across pages that do not
        look like headings are dropped.  Page numbers are 1-based.
        """
        page_lines_by_page: List[Tuple[int, List[str]]] = []
        total_pages = len(doc)
        for page_index in range(total_pages):
            page = doc.load_page(page_index)
            rect = page.rect
            # Cut the top/bottom bands where running headers/footers live.
            clip_box = fitz.Rect(
                0, self.clip_top, rect.width, rect.height - self.clip_bottom
            )
            text = page.get_text("text", clip=clip_box)
            stripped_lines = (raw.strip() for raw in text.splitlines())
            kept = [
                candidate
                for candidate in stripped_lines
                if candidate and not self._is_header_footer(candidate)
            ]
            page_lines_by_page.append((page_index + 1, kept))

        # Second pass: drop non-heading lines that recur on many pages.
        noise_keys = self._find_repeated_non_heading_lines(
            page_lines_by_page, total_pages
        )
        return [
            BodyLine(page=page_no, text=line)
            for page_no, lines in page_lines_by_page
            for line in lines
            if self._normalize_repeated_line_key(line) not in noise_keys
        ]

    def _is_header_footer(self, text: str) -> bool:
        """Return True for bare page numbers and the known header/footer strings."""
        if self._PAGE_NUMBER_RE.match(text):
            return True
        return self._HEADER_FOOTER_RE.match(text) is not None

    def _normalize_repeated_line_key(self, text: str) -> str:
        """Return *text* without spaces, tabs and full-width spaces.

        The result is the key used to recognise the same line on several pages.
        """
        return text.translate(self._WHITESPACE_TABLE)

    def _find_repeated_non_heading_lines(
        self, page_lines_by_page: List[Tuple[int, List[str]]], total_pages: int
    ) -> set:
        """Return the normalized keys of lines that recur across pages.

        A key is reported when it occurs on at least two pages, on at least
        30% of ``total_pages``, and its first occurrence does not look like a
        heading.  Each page counts once per key, so a line that is merely
        repeated within one page is not mistaken for a running header/footer.
        """
        pages_with_key: Dict[str, int] = {}
        first_sample: Dict[str, str] = {}
        for _, lines in page_lines_by_page:
            keys_on_page = set()
            for line in lines:
                key = self._normalize_repeated_line_key(line)
                first_sample.setdefault(key, line)
                keys_on_page.add(key)
            for key in keys_on_page:
                pages_with_key[key] = pages_with_key.get(key, 0) + 1

        min_pages = total_pages * 0.3
        return {
            key
            for key, page_count in pages_with_key.items()
            if page_count >= 2
            and page_count >= min_pages
            # Repeated headings are real content; only drop non-headings.
            and not self._looks_like_heading(first_sample[key])
        }

    def _looks_like_heading(self, text: str) -> bool:
        """Return True when any rule set matches *text* as an l1 or l2 heading."""
        return any(
            rule["l1"].match(text) or rule["l2"].match(text)
            for rule in self.RULE_LIB.values()
        )

    def _extract_body_with_best_rule(
        self, body_lines: List["BodyLine"]
    ) -> Tuple[Dict[str, Any], str, float, Dict[str, Any]]:
        """Run every rule set and keep the one with the highest coverage.

        Returns:
            ``(raw_result, rule_name, coverage, rule_performance)``.  When no
            rule recognises a heading the result is ``({}, "none", 0.0, ...)``.
            On a tie the rule that appears first in ``RULE_LIB`` wins.
        """
        best_result: Optional[Dict[str, Any]] = None
        best_rule = ""
        best_coverage = 0.0
        rule_performance: Dict[str, Any] = {}

        for rule_name, rule in self.RULE_LIB.items():
            try:
                result, coverage = self._apply_rule(body_lines, rule["l1"], rule["l2"])
            except Exception as exc:
                # Best effort: one failing rule must not abort the others,
                # but keep the reason so the failure is not invisible.
                rule_performance[rule_name] = {
                    "coverage": 0.0,
                    "error": True,
                    "message": str(exc),
                }
                continue
            rule_performance[rule_name] = {"coverage": coverage}
            if coverage > best_coverage:
                best_result, best_rule, best_coverage = result, rule_name, coverage

        if best_result is None:
            return {}, "none", 0.0, rule_performance
        return best_result, best_rule, best_coverage, rule_performance

    @staticmethod
    def _close_section(
        entry: Optional[Dict[str, Any]], lines: List[str], pages: List[int]
    ) -> None:
        """Store the buffered text and page span in *entry* (no-op for None)."""
        if entry is None:
            return
        entry["content"] = "\n".join(lines).strip()
        if pages:
            entry["page_start"] = min(pages)
            entry["page_end"] = max(pages)

    def _apply_rule(
        self,
        body_lines: List["BodyLine"],
        l1_pattern: re.Pattern,
        l2_pattern: re.Pattern,
    ) -> Tuple[Dict[str, Any], float]:
        """Split *body_lines* into chapters/sections with one rule set.

        A line is a chapter heading when it matches ``l1_pattern`` but not
        ``l2_pattern``; it is a section heading when it matches
        ``l2_pattern`` and a chapter is already open.  Lines containing a
        ``TOC_PATTERN`` leader run are table-of-contents entries and are
        never promoted to headings.  Text before the first chapter is
        discarded.  Each section's content starts with its own heading line;
        text between a chapter heading and its first section is stored under
        the key ``"章节标题"``.

        Returns:
            ``({"chapters": [...]}, coverage)`` where coverage is the number
            of heading lines divided by the number of body lines.
        """
        chapters: List[Dict[str, Any]] = []
        open_section: Optional[Dict[str, Any]] = None
        buffered_text: List[str] = []
        buffered_pages: List[int] = []
        heading_lines = 0

        for line in body_lines:
            text, page = line.text, line.page

            # 0 = body text, 1 = chapter heading, 2 = section heading.
            if self.TOC_PATTERN.search(text):
                level = 0
            elif l2_pattern.match(text):
                # A section heading needs an enclosing chapter.
                level = 2 if chapters else 0
            elif l1_pattern.match(text):
                level = 1
            else:
                level = 0

            if level == 0:
                if open_section is not None:
                    buffered_text.append(text)
                    buffered_pages.append(page)
                continue

            self._close_section(open_section, buffered_text, buffered_pages)
            open_section = {"content": "", "page_start": page, "page_end": page}
            if level == 1:
                chapters.append({
                    "title": text,
                    "page_start": page,
                    "sections": {"章节标题": open_section},
                })
            else:
                chapters[-1]["sections"][text] = open_section
            buffered_text = [text]
            buffered_pages = [page]
            heading_lines += 1

        self._close_section(open_section, buffered_text, buffered_pages)

        coverage = heading_lines / max(len(body_lines), 1)
        return {"chapters": chapters}, coverage

    def _convert_rule_output_to_chapters(
        self, raw_data: Dict[str, Any]
    ) -> Dict[str, Dict[str, Any]]:
        """Convert the rule output into a dict keyed by chapter title.

        Chapters that share a title collapse into one entry; the last one wins.
        """
        chapters: Dict[str, Dict[str, Any]] = {}
        for chapter in raw_data.get("chapters", []):
            title = chapter.get("title", "未命名章节")
            chapters[title] = {
                sec_name: {
                    "content": sec_data.get("content", ""),
                    "page_start": sec_data.get("page_start", 1),
                    "page_end": sec_data.get("page_end", 1),
                }
                for sec_name, sec_data in chapter.get("sections", {}).items()
            }
        return chapters
|