|
|
@@ -0,0 +1,963 @@
|
|
|
+import re
|
|
|
+import json
|
|
|
+import os
|
|
|
+from base_document import DocumentParser,DocumentChunk
|
|
|
+from typing import List, Dict, Any, Tuple
|
|
|
+from PyPDF2 import PdfReader, PdfWriter
|
|
|
+import fitz # PyMuPDF - 更好的PDF解析
|
|
|
+import docx
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class EnhancedPDFTocExtractor:
    """Enhanced PDF table-of-contents (TOC) extractor.

    Extraction is attempted in two stages:

    1. Read the embedded bookmark outline via PyMuPDF (``fitz``).
    2. Fall back to heuristically recognising TOC-style lines in the text
       of the first pages of the document.

    Every TOC entry is a dict with the keys ``title``, ``level``,
    ``page_start``, ``page_end``, ``parent_titles`` and ``source``.
    """

    def __init__(self):
        # Per-level regex patterns for heading numbering styles.
        # NOTE(review): not referenced by the extraction methods below;
        # kept for backward compatibility with any external users.
        self.level_patterns = {
            1: [
                r'^\d+\s+',  # "1 "
                r'^第[一二三四五六七八九十]+\S*\s+',  # "第一章"
                r'^[A-Z]\s+',  # "A "
            ],
            2: [
                r'^\d+\.\d+\s+',  # "1.1 "
                r'^[一二三四五六七八九十]+、\s*',  # "一、"
                r'^\(\d+\)\s+',  # "(1)"
            ],
            3: [
                r'^\d+\.\d+\.\d+\s+',  # "1.1.1"
                r'^\d+\)\s+',  # "1)"
            ]
        }

    def extract_complete_toc(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract the full TOC of *file_path*, trying bookmarks first.

        Returns an empty list when neither strategy produces a TOC that
        passes ``_validate_toc``.
        """
        print(f"正在提取PDF目录: {os.path.basename(file_path)}")

        # Strategy 1: embedded bookmarks via PyMuPDF.
        pymupdf_toc = self._extract_with_pymupdf(file_path)
        if pymupdf_toc and self._validate_toc(pymupdf_toc):
            print(f"PyMuPDF提取到 {len(pymupdf_toc)} 个有效目录项")
            return pymupdf_toc

        # Strategy 2: recognise TOC-looking lines in the leading pages' text.
        text_toc = self._extract_from_text_content(file_path)
        if text_toc and self._validate_toc(text_toc):
            print(f"文本分析提取到 {len(text_toc)} 个有效目录项")
            return text_toc

        print("未提取到有效的目录结构")
        return []

    def _extract_with_pymupdf(self, file_path: str) -> List[Dict[str, Any]]:
        """Read the embedded bookmark outline with PyMuPDF.

        Reconstructs parent/child relationships from bookmark levels with a
        parent stack.  Returns [] when the PDF has no bookmarks or on any
        PyMuPDF error.
        """
        try:
            doc = fitz.open(file_path)

            toc = doc.get_toc()
            if not toc:
                print("PDF没有书签目录")
                doc.close()
                return []

            processed_toc = []
            parent_stack = []  # ancestors of the entry currently processed

            for item in toc:
                level, title, page_num = item

                clean_title = self._clean_title(title)
                if not clean_title or len(clean_title) < 2:
                    continue  # drop empty / one-character noise titles

                # PyMuPDF page numbers are 1-based; clamp defensive values.
                actual_page = max(1, page_num)

                # Pop ancestors that are not strictly shallower than this entry.
                while parent_stack and parent_stack[-1]['level'] >= level:
                    parent_stack.pop()

                parent_titles = [p['title'] for p in parent_stack]

                toc_item = {
                    'title': clean_title,
                    'level': level,
                    'page_start': actual_page,
                    'page_end': actual_page,
                    'parent_titles': parent_titles,
                    'source': 'pymupdf_toc'
                }

                processed_toc.append(toc_item)

                # Push this entry so deeper entries see it as a parent.
                parent_stack.append({
                    'title': clean_title,
                    'level': level,
                    'page': actual_page
                })

            doc.close()
            return processed_toc

        except Exception as e:
            print(f"PyMuPDF提取目录失败: {e}")
            return []

    def _extract_from_text_content(self, file_path: str) -> List[Dict[str, Any]]:
        """Fallback: mine TOC entries from the text of the leading pages."""
        try:
            doc = fitz.open(file_path)
            full_text = ""

            # A printed TOC usually sits near the front of the document.
            max_pages_for_toc = min(20, len(doc))
            page_texts = []

            for page_num in range(max_pages_for_toc):
                page = doc[page_num]
                text = page.get_text().strip()
                if text:
                    page_texts.append((page_num + 1, text))
                    # Page markers let _identify_toc_from_text track which
                    # physical page a TOC line was found on.
                    full_text += f"--- 第{page_num + 1}页 ---\n{text}\n\n"

            doc.close()

            toc_items = self._identify_toc_from_text(full_text, page_texts)

            # Refine heading levels once candidate entries are known.
            if toc_items:
                toc_items = self._analyze_toc_levels(toc_items)

            return toc_items

        except Exception as e:
            print(f"文本内容提取目录失败: {e}")
            return []

    def _identify_toc_from_text(self, full_text: str, page_texts: List[Tuple[int, str]]) -> List[Dict[str, Any]]:
        """Scan *full_text* line by line for TOC-looking entries.

        ``page_texts`` is accepted for interface compatibility but not
        used; page tracking relies on the "--- 第N页 ---" markers that
        ``_extract_from_text_content`` embeds into *full_text*.
        """
        toc_items = []

        # Recognised TOC line shapes: (numbering, title, page number).
        toc_patterns = [
            # dotted leader + page: "1 总则 ........... 1"
            r'^(\d+(?:\.\d+)*)\s+([^\.]{5,50}?)\s*\.{3,}\s*(\d+)\s*$',
            # Chinese chapter numbering: "第一章 总则 ........... 1"
            r'^(第[一二三四五六七八九十百千]+[章节条])\s+([^\.]{5,50}?)\s*\.{3,}\s*(\d+)\s*$',
            # plain: "1 总则 1"
            r'^(\d+(?:\.\d+)*)\s+([^\d\.]{5,50}?)\s+(\d+)\s*$',
        ]

        lines = full_text.split('\n')
        current_page = 1

        for line in lines:
            line = line.strip()
            if not line or len(line) > 200:
                continue  # blank, or far too long to be a TOC entry

            # Keep track of the physical page we are on.
            page_match = re.match(r'^---\s*第(\d+)页\s*---$', line)
            if page_match:
                current_page = int(page_match.group(1))
                continue

            for pattern in toc_patterns:
                match = re.match(pattern, line)
                if match:
                    numbering = match.group(1)
                    title = match.group(2).strip()
                    # Bug fix: the original compared the groups *tuple*
                    # against 3 (`match.groups() >= 3`), which raises
                    # TypeError on Python 3; compare the group count.
                    page_num = int(match.group(3)) if len(match.groups()) >= 3 else current_page

                    title = self._clean_title(title)
                    if not title or len(title) < 2:
                        continue

                    level = self._determine_level_from_numbering(numbering)

                    toc_items.append({
                        'title': title,
                        'level': level,
                        'page_start': page_num,
                        'page_end': page_num,
                        'parent_titles': [],
                        'source': 'text_analysis'
                    })
                    break

        return toc_items

    def _analyze_toc_levels(self, toc_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Re-derive each entry's level from its title, then link parents."""
        if not toc_items:
            return []

        # Re-determine levels from the numbering in front of the title.
        for item in toc_items:
            title = item['title']
            level = self._analyze_title_level(title)
            if level > 0:
                item['level'] = level

        # Build parent/child relationships from the (possibly new) levels.
        return self._build_hierarchy(toc_items)

    def _analyze_title_level(self, title: str) -> int:
        """Guess the heading level of *title* from its leading numbering."""
        # Most specific numeric patterns first ("1.1.1" before "1.1").
        if re.match(r'^\d+\.\d+\.\d+', title):
            return 3
        elif re.match(r'^\d+\.\d+', title):
            return 2
        elif re.match(r'^\d+', title):
            return 1
        elif re.match(r'^第[一二三四五六七八九十]+[章节]', title):
            return 1
        elif re.match(r'^[一二三四五六七八九十]+、', title):
            return 2
        elif re.match(r'^\(\d+\)', title):
            return 3

        return 1  # default level when nothing matches

    def _build_hierarchy(self, toc_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fill in ``parent_titles`` for each entry using a parent stack."""
        if not toc_items:
            return []

        hierarchical_items = []
        parent_stack = []

        for item in toc_items:
            current_level = item['level']

            # Pop entries at the same or deeper level — they cannot be parents.
            while parent_stack and parent_stack[-1]['level'] >= current_level:
                parent_stack.pop()

            parent_titles = [p['title'] for p in parent_stack]
            item['parent_titles'] = parent_titles

            hierarchical_items.append(item)

            # Push the current entry as a potential parent of what follows.
            parent_stack.append({
                'title': item['title'],
                'level': item['level'],
                'page': item['page_start']
            })

        return hierarchical_items

    def _determine_level_from_numbering(self, numbering: str) -> int:
        """Map a numbering prefix ("1", "1.1", "第一章", "一、") to a level."""
        if '.' in numbering:
            # "1.1" -> 2, "1.1.1" -> 3, capped at 3.
            dot_count = numbering.count('.')
            return min(dot_count + 1, 3)
        elif re.match(r'^第[一二三四五六七八九十]+[章节]', numbering):
            return 1
        elif re.match(r'^[一二三四五六七八九十]+、', numbering):
            return 2
        else:
            return 1

    def _clean_title(self, title: str) -> str:
        """Normalise a TOC title: drop suffixes, leaders and extra spaces."""
        if not title:
            return ""

        # Remove a trailing ".pdf" (bookmarks sometimes carry file names).
        title = re.sub(r'\.pdf$', '', title, flags=re.IGNORECASE)

        # Collapse runs of whitespace.
        title = re.sub(r'\s+', ' ', title).strip()

        # Drop dotted leaders and trailing page numbers ("...... 12").
        title = re.sub(r'\.{3,}.*$', '', title)

        # Bug fix: removing the leader can leave a trailing space behind.
        return title.strip()

    def _validate_toc(self, toc_items: List[Dict[str, Any]]) -> bool:
        """Heuristic sanity check on an extracted TOC.

        Accepts the TOC when at least half the entries (and at least two)
        have plausibly-sized titles.  An all-pages-equal-1 TOC only emits a
        warning — very short documents can legitimately look like that.
        """
        if not toc_items:
            return False

        pages = [item['page_start'] for item in toc_items]
        unique_pages = set(pages)

        # All entries pointing at page 1 is suspicious but not fatal.
        if len(unique_pages) == 1 and 1 in unique_pages:
            print("警告:所有目录项的页码都是1,可能提取不准确")

        # Count titles of a plausible length.
        valid_titles = 0
        for item in toc_items:
            title = item['title']
            if len(title) >= 2 and len(title) <= 100:
                valid_titles += 1

        # Require a minimum share of valid titles.
        return valid_titles >= max(2, len(toc_items) * 0.5)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class PDFTocExtractor:
    """PDF TOC extractor built around the document's bookmark outline.

    Offers three independent strategies: PyMuPDF bookmarks (preferred),
    PyPDF2 outline traversal, and heading detection in raw page text.
    """

    def __init__(self):
        # Keyword markers used to guess a heading's level from its text.
        self.level_markers = {
            1: ['chapter', 'part', '篇', '章', 'section'],
            2: ['section', '节', 'subsection'],
            3: ['subsubsection', '小节', 'topic']
        }

    def extract_toc_with_pymupdf(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract the TOC via PyMuPDF bookmarks (usually the most accurate)."""
        try:
            doc = fitz.open(file_path)
            toc = doc.get_toc()
            doc.close()

            return self._process_pymupdf_toc(toc)
        except Exception as e:
            print(f"PyMuPDF提取目录失败: {e}")
            return []

    def _process_pymupdf_toc(self, toc: List) -> List[Dict[str, Any]]:
        """Convert PyMuPDF's ``[level, title, page]`` triples to TOC dicts.

        Bug fix: removed a leftover debug ``print(item)`` that dumped every
        raw bookmark to stdout.
        """
        processed_toc = []

        for item in toc:
            level, title, page_num = item
            # PyMuPDF page numbers are 1-based; clamp defensive values.
            actual_page = max(1, page_num)

            processed_toc.append({
                'title': self._clean_title(title),
                'level': level,
                'page_start': actual_page,
                'page_end': actual_page,
                'parent_titles': [],
                'source': 'pymupdf_toc'
            })

        return processed_toc

    def extract_toc_with_pypdf2(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract the TOC by walking PyPDF2's outline tree."""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PdfReader(file)

                if not hasattr(pdf_reader, 'outline') or not pdf_reader.outline:
                    return []

                return self._extract_toc_from_outline(pdf_reader.outline, pdf_reader)
        except Exception as e:
            print(f"PyPDF2提取目录失败: {e}")
            return []

    def _extract_toc_from_outline(self, outline, pdf_reader, level=1, parent_titles=None, parent_pages=None):
        """Recursively flatten a PyPDF2 outline into a list of TOC dicts.

        *outline* may contain bookmark dicts and nested lists; a bookmark's
        ``/First`` entry links to its first child.
        """
        if parent_titles is None:
            parent_titles = []
        if parent_pages is None:
            parent_pages = []

        toc_items = []

        for item in outline:
            if isinstance(item, dict):
                title = self._extract_title(item)
                if not title:
                    continue

                # Resolve the destination page for this bookmark.
                page_num = self._extract_page_number(item, pdf_reader)

                # Level from title keywords/format, falling back to depth.
                actual_level = self._determine_level(title, level)

                toc_item = {
                    'title': title,
                    'level': actual_level,
                    'page_start': page_num,
                    'page_end': page_num,
                    'parent_titles': parent_titles.copy(),
                    'parent_pages': parent_pages.copy(),
                    'source': 'pypdf2_outline'
                }

                toc_items.append(toc_item)

                # Recurse into children linked via /First.
                if '/First' in item:
                    child_parent_titles = parent_titles + [title]
                    child_parent_pages = parent_pages + [page_num]
                    children = self._extract_toc_from_outline(
                        item['/First'], pdf_reader, actual_level + 1,
                        child_parent_titles, child_parent_pages
                    )
                    toc_items.extend(children)

            elif isinstance(item, list):
                # Nested sibling list at the same depth.
                nested_items = self._extract_toc_from_outline(
                    item, pdf_reader, level, parent_titles, parent_pages
                )
                toc_items.extend(nested_items)

        return toc_items

    def _extract_title(self, item) -> str:
        """Extract and clean a bookmark title; always returns a str.

        Bug fix: the original fell through and implicitly returned None
        when ``/Title`` was not a string, despite the ``-> str`` contract;
        callers test ``if not title``, so "" keeps that behaviour intact.
        """
        title = item.get('/Title', '')
        if not isinstance(title, str):
            return ""

        # Strip a leading numbering prefix (e.g. "1.2 ").
        title = re.sub(r'^\s*[\d\.\s]+\s*', '', title)
        title = title.strip()
        return title if title else "未命名标题"

    def _extract_page_number(self, item, pdf_reader) -> int:
        """Resolve a bookmark's destination page number (1-based).

        Tries, in order: the ``/A`` action's ``/D`` destination, a direct
        ``/Dest`` array, then raw ``/Page`` / ``/P`` references.  Falls
        back to page 1 when nothing resolves.
        """
        try:
            # Attempt 1: /A action with a /D destination array.
            if '/A' in item:
                action = item['/A']
                if '/D' in action:
                    dest = action['/D']
                    if isinstance(dest, list) and len(dest) > 0:
                        page_ref = dest[0]
                        return self._get_page_number_from_ref(page_ref, pdf_reader)

            # Attempt 2: direct /Dest destination array.
            if '/Dest' in item:
                dest = item['/Dest']
                if isinstance(dest, list) and len(dest) > 0:
                    page_ref = dest[0]
                    return self._get_page_number_from_ref(page_ref, pdf_reader)

            # Attempt 3: other page-reference attributes.
            for key in ['/Page', '/P']:
                if key in item:
                    page_ref = item[key]
                    return self._get_page_number_from_ref(page_ref, pdf_reader)

        except Exception as e:
            print(f"提取页码失败: {e}")

        # Nothing resolvable — default to the first page.
        return 1

    def _get_page_number_from_ref(self, page_ref, pdf_reader) -> int:
        """Map a page object/reference to its 1-based index in the reader."""
        try:
            if hasattr(page_ref, 'get_object'):
                page_obj = page_ref.get_object()
            else:
                page_obj = page_ref

            # Linear scan over the reader's pages for a matching object.
            for i, page in enumerate(pdf_reader.pages):
                if hasattr(page, 'get_object'):
                    page_obj2 = page.get_object()
                    if page_obj2 == page_obj:
                        return i + 1  # convert to 1-based page number

            # Last resort: derive *something* from the indirect reference id.
            # NOTE(review): `ref_id % 100` is a rough heuristic, not a real
            # mapping from object id to page number.
            if hasattr(page_obj, 'indirect_ref'):
                ref_id = getattr(page_obj.indirect_ref, 'idnum', 0)
                if ref_id > 0:
                    return min(max(1, ref_id % 100), len(pdf_reader.pages))

        except Exception as e:
            print(f"解析页面引用失败: {e}")

        return 1

    def _determine_level(self, title: str, base_level: int) -> int:
        """Infer a heading level from *title*; fall back to *base_level*."""
        title_lower = title.lower()

        # Keyword markers checked in ascending level order; first hit wins.
        for level, markers in self.level_markers.items():
            for marker in markers:
                if marker in title_lower:
                    return level

        # Structural numbering patterns.
        if re.match(r'^(第[一二三四五六七八九十]+[章节篇])', title):
            return 1
        elif re.match(r'^\d+\.\d+', title):
            dots_count = title.count('.')
            return min(dots_count, 3)
        elif re.match(r'^[一二三四五六七八九十]、', title):
            return 2

        # Nothing matched — use the caller-supplied outline depth.
        return base_level

    def _clean_title(self, title: str) -> str:
        """Collapse whitespace and strip a leading numbering prefix."""
        # Collapse runs of whitespace.
        title = re.sub(r'\s+', ' ', title).strip()

        # Strip leading numbering such as "1.2 ".
        title = re.sub(r'^\s*[\d\.\s]+\s*', '', title)

        return title

    def extract_toc_from_text(self, file_path: str) -> List[Dict[str, Any]]:
        """Build a TOC by scanning every page's text for heading patterns."""
        try:
            doc = fitz.open(file_path)
            toc_items = []

            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()

                # Collect heading-looking lines on this page.
                headings = self._find_headings_in_text(text, page_num + 1)
                toc_items.extend(headings)

            doc.close()
            return toc_items
        except Exception as e:
            print(f"从文本提取目录失败: {e}")
            return []

    def _find_headings_in_text(self, text: str, page_num: int) -> List[Dict[str, Any]]:
        """Match heading patterns against every line of *text*.

        Patterns are tried in order and the first match wins.  A trailing
        duplicate of the ``^(\\d+\\.\\d+)`` pattern (with level 2) was
        removed — it was unreachable dead code because the identical regex
        earlier in the list always matched first.
        """
        headings = []
        lines = text.split('\n')

        heading_patterns = [
            # Chinese heading patterns
            (r'^(第[一二三四五六七八九十零百千]+[章节条款篇])\s+(.+)$', 1),
            (r'^([一二三四五六七八九十]、)\s*(.+)$', 2),
            # numeric heading patterns
            (r'^(\d+)\s+(.+)$', 2),
            (r'^(\d+\.\d+)\s+(.+)$', 3),
            (r'^(\d+\.\d+\.\d+)\s+(.+)$', 4),
            # English heading patterns
            (r'^(Chapter|Section)\s+(\d+)\s+(.+)$', 1),
        ]

        for line in lines:
            line = line.strip()
            if len(line) > 100:  # too long to be a heading
                continue

            for pattern, level in heading_patterns:
                match = re.match(pattern, line)
                if match:
                    # Two-group patterns: title is group 2; the English
                    # pattern has three groups with the title in group 3.
                    if len(match.groups()) == 2:
                        title = match.group(2)
                    else:
                        title = match.group(3) if len(match.groups()) > 2 else line

                    headings.append({
                        'title': title.strip(),
                        'level': level,
                        'page_start': page_num,
                        'page_end': page_num,
                        'parent_titles': [],
                        'source': 'text_analysis'
                    })
                    break

        return headings
|
|
|
+
|
|
|
class PDFParser(DocumentParser):
    """PDF document parser.

    Uses ``EnhancedPDFTocExtractor`` for TOC discovery and PyMuPDF
    (falling back to PyPDF2) for content extraction.  Chunk splitting
    behaviour is inherited from ``DocumentParser``.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 100):
        super().__init__(chunk_size, chunk_overlap)
        self.toc_extractor = EnhancedPDFTocExtractor()

    def extract_toc_structure(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract and sanity-check the PDF's TOC.

        Returns [] when no TOC could be found, signalling the caller to
        fall back to plain chunk-based splitting.
        """
        print(f"正在解析PDF目录: {os.path.basename(file_path)}")

        toc_structure = self.toc_extractor.extract_complete_toc(file_path)

        if not toc_structure:
            print("未提取到目录结构,将使用传统分割方式")
            return []

        # Validate page numbers / titles and repair obviously broken ones.
        toc_structure = self._validate_and_fix_toc(toc_structure, file_path)

        print(f"最终提取到 {len(toc_structure)} 个目录项")
        return toc_structure

    def _validate_and_fix_toc(self, toc_structure: List[Dict[str, Any]], file_path: str) -> List[Dict[str, Any]]:
        """Repair invalid page numbers and noisy titles in *toc_structure*.

        A page number counts as invalid when it is non-positive, far beyond
        the document length, or goes backwards relative to the previous
        entry; such entries get an estimated page and ``source='estimated'``.
        Entries are mutated in place.
        """
        if not toc_structure:
            return []

        doc_length = self._get_pdf_length(file_path)
        fixed_toc = []

        print(f"文档总页数: {doc_length}")
        print(f"开始验证 {len(toc_structure)} 个目录项...")

        invalid_count = 0

        for i, item in enumerate(toc_structure):
            original_page = item['page_start']

            # Invalid page: out of range, or ordered before its predecessor
            # (the predecessor may itself already have been repaired).
            if (original_page <= 0 or
                original_page > doc_length + 5 or  # tolerate slight overshoot
                (i > 0 and original_page < toc_structure[i-1]['page_start'])):

                estimated_page = self._estimate_page_number(item, doc_length, toc_structure)
                item['page_start'] = estimated_page
                item['page_end'] = estimated_page
                item['source'] = 'estimated'
                invalid_count += 1

                print(f"  修复第{i+1}项: '{item['title']}' 页码 {original_page} -> {estimated_page}")

            # Normalise the title (drop file suffixes, page refs, ...).
            original_title = item['title']
            item['title'] = self._clean_title_completely(original_title)
            if original_title != item['title']:
                print(f"  清理标题: '{original_title}' -> '{item['title']}'")

            fixed_toc.append(item)

        if invalid_count > 0:
            print(f"共修复 {invalid_count} 个无效页码")

        return fixed_toc

    def _get_pdf_length(self, file_path: str) -> int:
        """Return the page count of the PDF, or 100 when it can't be read."""
        try:
            doc = fitz.open(file_path)
            length = len(doc)
            doc.close()
            return length
        except Exception:
            # Bug fix: was a bare `except:` that also swallowed
            # SystemExit / KeyboardInterrupt.
            return 100  # conservative default

    def _estimate_page_number(self, item: Dict[str, Any], doc_length: int, toc_structure: List[Dict[str, Any]] = None) -> int:
        """Estimate a plausible page number for a TOC *item*.

        Preference order:
          1. Nearest preceding entry with a trusted (non-estimated) page,
             plus a small offset (each entry assumed to span ~2-3 pages).
          2. Position-based guess: early entries near the front, later ones
             distributed proportionally over the first 80% of the document
             (the last 20% is reserved for appendices etc.).
        """
        if not toc_structure:
            return 1

        try:
            # Bug fix: locate the item by identity.  The original used
            # list.index(item), which compares dicts by equality and could
            # resolve duplicated TOC entries to the wrong position.
            index = -1
            for pos, candidate in enumerate(toc_structure):
                if candidate is item:
                    index = pos
                    break
            if index < 0:
                return 1

            # 1) Walk backwards to the nearest trusted page number.
            for i in range(index - 1, -1, -1):
                prev_item = toc_structure[i]
                if (prev_item['page_start'] > 0 and
                        prev_item['page_start'] <= doc_length and
                        prev_item.get('source') != 'estimated'):
                    estimated = prev_item['page_start'] + 2
                    return min(estimated, doc_length)

            # 2) Position-based estimate (TOC entries are front-loaded).
            if index < 10:  # first few entries live in the opening pages
                return min(index + 1, 20)
            progress = index / len(toc_structure)
            estimated = int(progress * doc_length * 0.8) + 1
            return min(max(1, estimated), doc_length)

        except (ValueError, IndexError):
            return 1

    def _clean_title_completely(self, title: str) -> str:
        """Aggressively normalise a TOC title; never returns an empty string."""
        if not title:
            return "未命名标题"

        # Strip file-name suffixes occasionally embedded in bookmarks.
        title = re.sub(r'\.(pdf|docx?|txt)$', '', title, flags=re.IGNORECASE)

        # Strip inline page references like "(页12)".
        title = re.sub(r'[\(\{\[\<]?页码?\s*\d+[\)\}\ \]\>]?', '', title)

        # Collapse whitespace.
        title = re.sub(r'\s+', ' ', title).strip()

        return title if title else "未命名标题"

    def extract_full_content(self, file_path: str) -> str:
        """Extract the whole document text, page by page.

        Uses PyMuPDF first; falls back to PyPDF2 on failure.  Pages are
        separated by "--- 第N页 ---" markers; empty pages are skipped.
        """
        full_content = ""

        try:
            doc = fitz.open(file_path)

            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()

                if text.strip():
                    full_content += f"--- 第{page_num + 1}页 ---\n{text}\n\n"

            doc.close()

        except Exception as e:
            print(f"PyMuPDF提取内容失败: {e}")
            # Fall back to PyPDF2's text extraction.
            try:
                with open(file_path, 'rb') as file:
                    pdf_reader = PdfReader(file)

                    for page_num, page in enumerate(pdf_reader.pages, 1):
                        page_content = page.extract_text()
                        if page_content.strip():
                            full_content += f"--- 第{page_num}页 ---\n{page_content}\n\n"
            except Exception as e2:
                print(f"PyPDF2提取内容也失败: {e2}")

        return full_content

    def extract_content_by_section(self, file_path: str, section_info: Dict[str, Any]) -> str:
        """Extract the text of the page range described by *section_info*.

        *section_info* must carry 'title' and may carry 'page_start' /
        'page_end' (1-based, both defaulting sensibly).
        """
        content = ""
        start_page = section_info.get('page_start', 1)
        end_page = section_info.get('page_end', start_page)

        print(f"提取章节: {section_info['title']}, 页码: {start_page}-{end_page}")

        try:
            doc = fitz.open(file_path)

            # Clamp the 1-based range into valid 0-based page indices.
            start_idx = max(0, start_page - 1)
            end_idx = min(len(doc) - 1, end_page - 1)

            for page_num in range(start_idx, end_idx + 1):
                page = doc[page_num]
                text = page.get_text()
                if text.strip():
                    content += text + "\n\n"

            doc.close()

        except Exception as e:
            print(f"提取章节内容失败: {e}")

        return content
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class WordParser(DocumentParser):
    """Word (.docx) document parser based on python-docx heading styles."""

    def extract_toc_structure(self, file_path: str) -> List[Dict[str, Any]]:
        """Build a TOC from paragraphs styled 'Heading N'.

        Word files carry no direct page information, so ``page_start`` /
        ``page_end`` are rough estimates derived from the paragraph index
        (assuming ~50 paragraphs per page).
        """
        doc = docx.Document(file_path)
        toc_structure = []
        current_parents = []

        # Bug fix: the original called doc.paragraphs.index(paragraph)
        # inside the loop — O(n^2) per document, and `doc.paragraphs`
        # rebuilds its wrapper objects on each access so index() is
        # unreliable.  enumerate() yields the index directly.
        for para_index, paragraph in enumerate(doc.paragraphs):
            if not paragraph.style.name.startswith('Heading'):
                continue

            level_text = paragraph.style.name.replace('Heading', '').strip()
            if not level_text.isdigit():
                # Skip styles like a bare "Heading" with no numeric level
                # instead of crashing in int().
                continue
            level = int(level_text)
            title = paragraph.text.strip()

            # Pop parents at the same or deeper level.
            while current_parents and current_parents[-1]['level'] >= level:
                current_parents.pop()

            parent_titles = [p['title'] for p in current_parents]

            # Approximate page number: ~50 paragraphs per page.
            estimated_page = para_index // 50 + 1

            toc_structure.append({
                'title': title,
                'level': level,
                'page_start': estimated_page,
                'page_end': estimated_page,
                'parent_titles': parent_titles.copy(),
                'paragraph_index': para_index
            })

            current_parents.append({
                'title': title,
                'level': level,
                'index': len(toc_structure) - 1
            })

        return toc_structure

    def extract_content_by_section(self, file_path: str, section_info: Dict[str, Any]) -> str:
        """Collect the paragraph text of one section.

        The section starts at ``section_info['paragraph_index']`` and runs
        up to (exclusive) the next heading of the same or a higher level.
        """
        doc = docx.Document(file_path)
        content = []
        in_section = False
        current_level = section_info['level']

        start_index = section_info.get('paragraph_index', 0)
        next_section_index = self._find_next_section_index(doc, start_index, current_level)

        for i, paragraph in enumerate(doc.paragraphs):
            if i < start_index:
                continue

            if i == start_index:
                # The section's own heading paragraph.
                in_section = True
                content.append(paragraph.text)
                continue

            if in_section:
                # Stop at the next same-or-higher-level heading.
                if (paragraph.style.name.startswith('Heading') and
                        i >= next_section_index):
                    break

                content.append(paragraph.text)

        return '\n'.join(content)

    def _find_next_section_index(self, doc, start_index: int, current_level: int) -> int:
        """Return the index of the next heading at *current_level* or above
        (smaller level number) after *start_index*; falls back to the
        paragraph count when the section runs to the end of the document.
        """
        for i in range(start_index + 1, len(doc.paragraphs)):
            paragraph = doc.paragraphs[i]
            if paragraph.style.name.startswith('Heading'):
                level_text = paragraph.style.name.replace('Heading', '').strip()
                if not level_text.isdigit():
                    # Consistent with extract_toc_structure: ignore styles
                    # without a numeric level.
                    continue
                if int(level_text) <= current_level:
                    return i
        return len(doc.paragraphs)
|
|
|
+
|
|
|
class DocumentSplitter:
    """文档拆分管理器 — dispatches files to the parser for their extension."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # One parser instance per supported file extension.
        self.parsers = {
            '.pdf': PDFParser(chunk_size, chunk_overlap),
            '.docx': WordParser(chunk_size, chunk_overlap),
        }

    def split_document(self, file_path: str) -> List[DocumentChunk]:
        """Split *file_path* into chunks via the parser registered for its
        extension; raises ValueError for unsupported formats."""
        _, file_ext = os.path.splitext(file_path)
        file_ext = file_ext.lower()

        if file_ext not in self.parsers:
            raise ValueError(f"不支持的文件格式: {file_ext}")

        return self.parsers[file_ext].split_document(file_path)

    def save_chunks_to_json(self, chunks: List[DocumentChunk], output_file: str):
        """Serialise *chunks* (content + metadata) to a UTF-8 JSON file."""
        serialised = [
            {
                'content': chunk.content,
                'metadata': {
                    'title': chunk.metadata.title,
                    'level': chunk.metadata.level,
                    'page_start': chunk.metadata.page_start,
                    'page_end': chunk.metadata.page_end,
                    'parent_titles': chunk.metadata.parent_titles,
                    'content_hash': chunk.metadata.content_hash
                },
                'chunk_index': chunk.chunk_index,
                'total_chunks': chunk.total_chunks
            }
            for chunk in chunks
        ]

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(serialised, f, ensure_ascii=False, indent=2)

    def print_chunk_summary(self, chunks: List[DocumentChunk]):
        """Print a human-readable overview of every chunk."""
        print("\n=== 文档拆分摘要 ===")
        print(f"总片段数: {len(chunks)}")

        for position, chunk in enumerate(chunks, start=1):
            meta = chunk.metadata
            print(f"\n片段 {position}:")
            print(f"  标题: {meta.title}")
            print(f"  层级: {meta.level}")
            print(f"  页码: {meta.page_start}-{meta.page_end}")
            print(f"  父级: {' -> '.join(meta.parent_titles)}")
            print(f"  片段: {chunk.chunk_index + 1}/{chunk.total_chunks}")
            print(f"  内容长度: {len(chunk.content)}")
            print(f"  内容预览: {chunk.content[:100]}...")
|
|
|
+
|
|
|
+# 使用示例
|
|
|
def main():
    """Demo entry point: split a sample PDF into chunks."""
    splitter = DocumentSplitter(chunk_size=800, chunk_overlap=50)

    # Sample input locations used by the original demo.
    file_path = "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/bfp_files/"
    pdf_file = file_path + "公路工程施工安全技术规范.pdf"
    word_file = "example.docx"  # kept from the original demo; not exercised here

    try:
        if os.path.exists(pdf_file):
            print(f"测试PDF目录提取: {pdf_file}")

            # Stand-alone parser instance, as in the original demo flow.
            pdf_parser = PDFParser()

            # Split the whole document (summary printing left disabled).
            chunks = splitter.split_document(pdf_file)

    except Exception as e:
        print(f"处理文档时出错: {e}")
|
|
|
+
|
|
|
# Run the demo only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|