"""Simplified PDF structure extractor (pdf_extractor.py).

A rule engine built on PyMuPDF that splits a PDF into a chapter structure by
first-level and second-level headings.  It does not rely on OCR and does not
depend on any core/foundation code.
"""

import re
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import fitz


  10. @dataclass(frozen=True)
  11. class BodyLine:
  12. """一条规范化后的正文行,以及它所在的 PDF 页码。"""
  13. page: int
  14. text: str
  15. class SimplePdfExtractor:
  16. """基于规则的 PDF 正文结构提取器。"""
  17. RULE_LIB = {
  18. "Rule_1_纯数字派": {
  19. "l1": re.compile(
  20. r"^\d{1,2}(?:[\..。])?\s+(?:(?!\d)[\u4e00-\u9fa5A-Za-z].*|[、,,]\s*[\u4e00-\u9fa5A-Za-z0-9].*)"
  21. ),
  22. "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
  23. },
  24. "Rule_2_混合章派": {
  25. "l1": re.compile(r"^第\s*(\d+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  26. "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
  27. },
  28. "Rule_3_中英混血派": {
  29. "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  30. "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
  31. },
  32. "Rule_4_传统公文派": {
  33. "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  34. "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[,、\s]+([\u4e00-\u9fa5]+.*)"),
  35. },
  36. "Rule_5_单边括号派": {
  37. "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  38. "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[))\]]\s*([\u4e00-\u9fa5]+.*)"),
  39. },
  40. "Rule_6_小节派": {
  41. "l1": re.compile(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  42. "l2": re.compile(r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*节\s*[,、]?\s*([\u4e00-\u9fa5]+.*)"),
  43. },
  44. "Rule_7_粗体括号派": {
  45. "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部部分篇]\s*[,、]?\s*(.*)"),
  46. "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】]\s*([\u4e00-\u9fa5]+.*)"),
  47. },
  48. "Rule_8_中文序号章数字小节派": {
  49. "l1": re.compile(r"^([一二三四五六七八九十百零两]+)[,、))\]]\s*([\u4e00-\u9fa5A-Za-z].*)"),
  50. "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5]+.*)"),
  51. },
  52. }
  53. CN_NUM_MAP = {
  54. "零": 0, "〇": 0, "一": 1, "二": 2, "两": 2, "三": 3, "四": 4,
  55. "五": 5, "六": 6, "七": 7, "八": 8, "九": 9,
  56. }
  57. TOC_PATTERN = re.compile(r"\.{3,}|…{2,}|-{3,}|·{3,}|•{3,}")
  58. def __init__(self, clip_top: float = 60, clip_bottom: float = 60):
  59. self.clip_top = clip_top
  60. self.clip_bottom = clip_bottom
  61. def extract(self, file_content: bytes) -> Dict[str, Any]:
  62. """提取章节结构。"""
  63. result: Dict[str, Any] = {
  64. "chapters": {},
  65. "total_pages": 0,
  66. }
  67. doc = fitz.open(stream=file_content, filetype="pdf")
  68. try:
  69. body_lines = self._extract_body_lines(doc)
  70. raw_data, winning_rule, coverage_rate, rule_performance = self._extract_body_with_best_rule(body_lines)
  71. chapters = self._convert_rule_output_to_chapters(raw_data)
  72. result["chapters"] = chapters
  73. result["total_pages"] = len(doc)
  74. result["body_rule"] = winning_rule
  75. result["body_coverage"] = coverage_rate
  76. result["rule_performance"] = rule_performance
  77. return result
  78. finally:
  79. doc.close()
  80. def _extract_body_lines(self, doc: fitz.Document) -> List[BodyLine]:
  81. """读取裁剪后的页面文本,规范化正文行。"""
  82. page_lines_by_page: List[Tuple[int, List[str]]] = []
  83. total_pages = len(doc)
  84. for page_index in range(total_pages):
  85. page = doc.load_page(page_index)
  86. rect = page.rect
  87. clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
  88. text = page.get_text("text", clip=clip_box)
  89. page_lines: List[str] = []
  90. for line in text.splitlines():
  91. stripped = line.strip()
  92. if not stripped or self._is_header_footer(stripped):
  93. continue
  94. page_lines.append(stripped)
  95. page_lines_by_page.append((page_index + 1, page_lines))
  96. # 移除跨页重复的非标题噪声(页眉页脚)
  97. repeated_noise_keys = self._find_repeated_non_heading_lines(page_lines_by_page, total_pages)
  98. body_lines: List[BodyLine] = []
  99. for page, lines in page_lines_by_page:
  100. for line in lines:
  101. if self._normalize_repeated_line_key(line) in repeated_noise_keys:
  102. continue
  103. body_lines.append(BodyLine(page=page, text=line))
  104. return body_lines
  105. def _is_header_footer(self, text: str) -> bool:
  106. """判断是否为页眉页脚。"""
  107. # 纯数字页码
  108. if re.match(r"^\d+$", text):
  109. return True
  110. # 常见页眉格式
  111. if re.match(r"^(四川路桥|专项施工方案|第\s*\d+\s*页|Page\s*\d+)$", text, re.IGNORECASE):
  112. return True
  113. return False
  114. def _normalize_repeated_line_key(self, text: str) -> str:
  115. """归一化行文本,用于检测重复。"""
  116. return text.replace(" ", "").replace("\t", "").replace("\u3000", "")
  117. def _find_repeated_non_heading_lines(self, page_lines_by_page: List[Tuple[int, List[str]]], total_pages: int) -> set:
  118. """找出跨页重复且不像标题的行。"""
  119. line_counts: Dict[str, int] = {}
  120. for _, lines in page_lines_by_page:
  121. for line in lines:
  122. key = self._normalize_repeated_line_key(line)
  123. line_counts[key] = line_counts.get(key, 0) + 1
  124. repeated = set()
  125. for key, count in line_counts.items():
  126. if count >= 2 and count >= total_pages * 0.3:
  127. # 只移除明显不像标题的重复行
  128. sample = next((line for _, lines in page_lines_by_page for line in lines
  129. if self._normalize_repeated_line_key(line) == key), "")
  130. if not self._looks_like_heading(sample):
  131. repeated.add(key)
  132. return repeated
  133. def _looks_like_heading(self, text: str) -> bool:
  134. """判断文本是否像标题。"""
  135. for rule_name, rule in self.RULE_LIB.items():
  136. if rule["l1"].match(text) or rule["l2"].match(text):
  137. return True
  138. return False
  139. def _extract_body_with_best_rule(
  140. self, body_lines: List[BodyLine]
  141. ) -> Tuple[Dict[str, Any], str, float, Dict[str, Any]]:
  142. """用所有规则竞争,选出覆盖率最高的规则。"""
  143. best_result = None
  144. best_rule = ""
  145. best_coverage = 0.0
  146. rule_performance = {}
  147. for rule_name, rule in self.RULE_LIB.items():
  148. try:
  149. result, coverage = self._apply_rule(body_lines, rule["l1"], rule["l2"])
  150. rule_performance[rule_name] = {"coverage": coverage}
  151. if coverage > best_coverage:
  152. best_coverage = coverage
  153. best_result = result
  154. best_rule = rule_name
  155. except Exception:
  156. rule_performance[rule_name] = {"coverage": 0.0, "error": True}
  157. if best_result is None:
  158. best_result = {}
  159. best_rule = "none"
  160. best_coverage = 0.0
  161. return best_result, best_rule, best_coverage, rule_performance
  162. def _apply_rule(
  163. self,
  164. body_lines: List[BodyLine],
  165. l1_pattern: re.Pattern,
  166. l2_pattern: re.Pattern,
  167. ) -> Tuple[Dict[str, Any], float]:
  168. """应用一组规则,提取章节结构。"""
  169. result: Dict[str, Any] = {"chapters": []}
  170. current_chapter = None
  171. current_section = None
  172. current_content_lines: List[str] = []
  173. current_pages: List[int] = []
  174. total_lines = len(body_lines)
  175. heading_lines = 0
  176. def _flush_section():
  177. nonlocal current_chapter, current_section, current_content_lines, current_pages
  178. if current_chapter is None:
  179. return
  180. if current_section is None:
  181. # 章节标题行
  182. chapter_data = result["chapters"][-1] if result["chapters"] else None
  183. if chapter_data:
  184. chapter_data["sections"]["章节标题"]["content"] = "\n".join(current_content_lines).strip()
  185. if current_pages:
  186. chapter_data["sections"]["章节标题"]["page_start"] = min(current_pages)
  187. chapter_data["sections"]["章节标题"]["page_end"] = max(current_pages)
  188. else:
  189. chapter_data = result["chapters"][-1] if result["chapters"] else None
  190. if chapter_data and current_section in chapter_data["sections"]:
  191. chapter_data["sections"][current_section]["content"] = "\n".join(current_content_lines).strip()
  192. if current_pages:
  193. chapter_data["sections"][current_section]["page_start"] = min(current_pages)
  194. chapter_data["sections"][current_section]["page_end"] = max(current_pages)
  195. current_content_lines = []
  196. current_pages = []
  197. for line in body_lines:
  198. text = line.text
  199. page = line.page
  200. l1_match = l1_pattern.match(text)
  201. l2_match = l2_pattern.match(text)
  202. if l1_match and not l2_match:
  203. # 一级标题
  204. _flush_section()
  205. current_chapter = text
  206. current_section = None
  207. result["chapters"].append({
  208. "title": text,
  209. "page_start": page,
  210. "sections": {
  211. "章节标题": {
  212. "content": "",
  213. "page_start": page,
  214. "page_end": page,
  215. }
  216. }
  217. })
  218. current_content_lines = [text]
  219. current_pages = [page]
  220. heading_lines += 1
  221. elif l2_match and current_chapter is not None:
  222. # 二级标题
  223. _flush_section()
  224. current_section = text
  225. chapter_data = result["chapters"][-1]
  226. chapter_data["sections"][text] = {
  227. "content": "",
  228. "page_start": page,
  229. "page_end": page,
  230. }
  231. current_content_lines = [text]
  232. current_pages = [page]
  233. heading_lines += 1
  234. else:
  235. # 正文
  236. current_content_lines.append(text)
  237. current_pages.append(page)
  238. _flush_section()
  239. # 计算覆盖率
  240. coverage = heading_lines / max(total_lines, 1)
  241. return result, coverage
  242. def _convert_rule_output_to_chapters(self, raw_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
  243. """将规则输出转换为以章节标题为键的字典。"""
  244. chapters: Dict[str, Dict[str, Any]] = {}
  245. for chapter in raw_data.get("chapters", []):
  246. title = chapter.get("title", "未命名章节")
  247. sections = {}
  248. for sec_name, sec_data in chapter.get("sections", {}).items():
  249. sections[sec_name] = {
  250. "content": sec_data.get("content", ""),
  251. "page_start": sec_data.get("page_start", 1),
  252. "page_end": sec_data.get("page_end", 1),
  253. }
  254. chapters[title] = sections
  255. return chapters