|
|
@@ -2,35 +2,89 @@
|
|
|
PDF 结构提取器 - 同步并发 OCR 版本
|
|
|
|
|
|
基于 splitter_pdf 逻辑,直接提取章节结构并记录页码。
|
|
|
-支持 OCR 增强:检测表格区域并使用 ThreadPoolExecutor 5并发 OCR,其他文本保持 PyMuPDF 提取。
|
|
|
+支持 OCR 增强:表格检测和识别委托给 OcrProcessor,其他文本保持 PyMuPDF 提取。
|
|
|
输出格式兼容后续分类与组装流程。
|
|
|
"""
|
|
|
|
|
|
import re
|
|
|
-from typing import Dict, Any, List, Optional, Tuple
|
|
|
+from dataclasses import dataclass
|
|
|
+from typing import Dict, Any, List, Optional, Tuple, Set
|
|
|
|
|
|
import fitz
|
|
|
|
|
|
from foundation.observability.logger.loggering import review_logger as logger
|
|
|
+from .ocr_processor import OcrProcessor
|
|
|
|
|
|
-from .ocr_processor import OcrProcessor, TableRegion, OcrResult
|
|
|
|
|
|
-# 尝试导入 RapidLayout
|
|
|
-try:
|
|
|
- from rapid_layout import RapidLayout
|
|
|
- RAPID_LAYOUT_AVAILABLE = True
|
|
|
-except ImportError:
|
|
|
- RAPID_LAYOUT_AVAILABLE = False
|
|
|
- RapidLayout = None
|
|
|
+@dataclass
|
|
|
+class TableRegion:
|
|
|
+ """表格区域信息"""
|
|
|
+ page_num: int
|
|
|
+ page: fitz.Page
|
|
|
+ bbox: Tuple[float, float, float, float]
|
|
|
+ score: float
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class OcrResult:
|
|
|
+ """OCR 结果"""
|
|
|
+ page_num: int
|
|
|
+ bbox: Tuple[float, float, float, float]
|
|
|
+ score: float
|
|
|
+ text: str
|
|
|
+ success: bool
|
|
|
|
|
|
|
|
|
class PdfStructureExtractor:
|
|
|
"""PDF 章节结构提取器(支持 OCR 异步并发)"""
|
|
|
|
|
|
- CHAPTER_PATTERN = re.compile(r'^第[一二三四五六七八九十百]+章\s*.*')
|
|
|
- SECTION_PATTERN = re.compile(r'^[一二三四五六七八九十百]+、\s*.*')
|
|
|
+ RULE_LIB = {
|
|
|
+ "Rule_1_纯数字派": {
|
|
|
+ "l1": re.compile(r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*"),
|
|
|
+ "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_2_混合章派": {
|
|
|
+ "l1": re.compile(r"^第\s*(\d+)\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_3_中英混血派": {
|
|
|
+ "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^(\d+)\.(\d+)(?!\.\d)\.?\s*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_4_传统公文派": {
|
|
|
+ "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[、\s]+([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_5_单边括号派": {
|
|
|
+ "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^([一二三四五六七八九十百零两]+)[)\)\]][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_6_小节派": {
|
|
|
+ "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^第\s*([一二三四五六七八九十百零两]+)\s*节[\s、]*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_7_粗体括号派": {
|
|
|
+ "l1": re.compile(r"^第\s*[一二三四五六七八九十百零两]+\s*[章部分篇][\s、]*(.*)"),
|
|
|
+ "l2": re.compile(r"^[【\[]\s*(\d+)\s*[\]】][\s]*([\u4e00-\u9fa5A-Za-z].*)"),
|
|
|
+ },
|
|
|
+ "Rule_8_cn_list_l1_numeric_l2": {
|
|
|
+ "l1": re.compile(
|
|
|
+ r"^(?:[一二三四五六七八九十百零两]+)[、\)\]\uFF09]\s*[\u4e00-\u9fa5A-Za-z].*"
|
|
|
+ ),
|
|
|
+ "l2": re.compile(
|
|
|
+ r"^\d{1,2}(?:[、\.\uFF0E\u3002\)\]\uFF09])\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*"
|
|
|
+ ),
|
|
|
+ },
|
|
|
+ }
|
|
|
TOC_PATTERN = re.compile(r"\.{3,}|…{2,}")
|
|
|
|
|
|
+ # OCR 配置
|
|
|
+ MAX_SHORT_EDGE = 1024
|
|
|
+ JPEG_QUALITY = 90
|
|
|
+ OCR_DPI = 200
|
|
|
+ OCR_CONFIDENCE_THRESHOLD = 0.5
|
|
|
+ OCR_CONCURRENT_WORKERS = 5
|
|
|
+
|
|
|
def __init__(
|
|
|
self,
|
|
|
clip_top: float = 60,
|
|
|
@@ -44,22 +98,37 @@ class PdfStructureExtractor:
|
|
|
):
|
|
|
self.clip_top = clip_top
|
|
|
self.clip_bottom = clip_bottom
|
|
|
- self.use_ocr = use_ocr and RAPID_LAYOUT_AVAILABLE
|
|
|
|
|
|
- # 初始化 OCR 处理器
|
|
|
- self._ocr_processor = OcrProcessor(
|
|
|
- ocr_api_url=ocr_api_url,
|
|
|
- ocr_timeout=ocr_timeout,
|
|
|
- ocr_api_key=ocr_api_key,
|
|
|
- ) if self.use_ocr else None
|
|
|
+ # OCR 配置
|
|
|
+ self.ocr_api_url = ocr_api_url
|
|
|
+ self.ocr_timeout = ocr_timeout
|
|
|
+ self.ocr_api_key = ocr_api_key
|
|
|
+ self.ocr_processor: Optional[OcrProcessor] = None
|
|
|
+ self.use_ocr = False
|
|
|
+ if use_ocr:
|
|
|
+ self.ocr_processor = OcrProcessor(
|
|
|
+ ocr_api_url=ocr_api_url,
|
|
|
+ ocr_timeout=ocr_timeout,
|
|
|
+ ocr_api_key=ocr_api_key,
|
|
|
+ max_short_edge=self.MAX_SHORT_EDGE,
|
|
|
+ jpeg_quality=self.JPEG_QUALITY,
|
|
|
+ ocr_dpi=self.OCR_DPI,
|
|
|
+ confidence_threshold=self.OCR_CONFIDENCE_THRESHOLD,
|
|
|
+ concurrent_workers=self.OCR_CONCURRENT_WORKERS,
|
|
|
+ )
|
|
|
+ self.use_ocr = self.ocr_processor.is_available()
|
|
|
+ self._layout_engine: Optional[Any] = None
|
|
|
|
|
|
# 目录检测配置
|
|
|
self.detect_toc = detect_toc
|
|
|
self.toc_model_path = toc_model_path
|
|
|
self._toc_extractor = None
|
|
|
|
|
|
- if use_ocr and not RAPID_LAYOUT_AVAILABLE:
|
|
|
- logger.warning("RapidLayout 未安装,OCR 功能不可用")
|
|
|
+ def _get_layout_engine(self) -> Optional[Any]:
|
|
|
+ """兼容旧调用,实际由 OcrProcessor 管理版面引擎。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return None
|
|
|
+ return self.ocr_processor._get_layout_engine()
|
|
|
|
|
|
def extract(self, file_content: bytes, progress_callback=None) -> Dict[str, Any]:
|
|
|
"""
|
|
|
@@ -91,6 +160,7 @@ class PdfStructureExtractor:
|
|
|
try:
|
|
|
catalog = self._extract_catalog(file_content, progress_callback)
|
|
|
if catalog:
|
|
|
+ catalog = self._normalize_catalog(catalog)
|
|
|
result["catalog"] = catalog
|
|
|
logger.info(f"[PDF提取] 目录提取完成: {catalog.get('total_chapters', 0)} 章")
|
|
|
except Exception as e:
|
|
|
@@ -100,6 +170,29 @@ class PdfStructureExtractor:
|
|
|
doc = fitz.open(stream=file_content)
|
|
|
try:
|
|
|
structure = self._extract_from_doc(doc, progress_callback)
|
|
|
+ if result.get("catalog"):
|
|
|
+ # 正文抽取和目录检测是两条独立链路:
|
|
|
+ # 1. 正文抽取更容易拿到连续 content
|
|
|
+ # 2. 目录检测更容易保留顺序和层级
|
|
|
+ # 这里先用目录骨架对齐正文,再按标题边界重建内容,尽量减少漏标题造成的结构缺失。
|
|
|
+ structure["chapters"] = self._reconcile_structure_with_catalog(
|
|
|
+ structure.get("chapters", {}),
|
|
|
+ result["catalog"],
|
|
|
+ )
|
|
|
+ rebuilt_chapters = self._rebuild_section_contents_from_catalog(
|
|
|
+ structure.get("chapters", {}),
|
|
|
+ result["catalog"],
|
|
|
+ structure.get("_body_lines", []),
|
|
|
+ )
|
|
|
+ if rebuilt_chapters:
|
|
|
+ structure["chapters"] = rebuilt_chapters
|
|
|
+ enriched_catalog = self._enrich_catalog_with_structure(
|
|
|
+ result["catalog"],
|
|
|
+ structure.get("chapters", {}),
|
|
|
+ )
|
|
|
+ if enriched_catalog:
|
|
|
+ result["catalog"] = enriched_catalog
|
|
|
+ structure.pop("_body_lines", None)
|
|
|
result["chapters"] = structure.get("chapters", {})
|
|
|
result["total_pages"] = len(doc)
|
|
|
return result
|
|
|
@@ -117,31 +210,22 @@ class PdfStructureExtractor:
|
|
|
from .toc_detector import TOCCatalogExtractor
|
|
|
|
|
|
if self._toc_extractor is None:
|
|
|
- # 使用 OCR 处理器的配置(如果已初始化)
|
|
|
- ocr_config = {}
|
|
|
- if self._ocr_processor:
|
|
|
- ocr_config = {
|
|
|
- "ocr_api_url": self._ocr_processor.ocr_api_url,
|
|
|
- "ocr_api_key": self._ocr_processor.ocr_api_key,
|
|
|
- "ocr_timeout": self._ocr_processor.ocr_timeout,
|
|
|
- }
|
|
|
self._toc_extractor = TOCCatalogExtractor(
|
|
|
model_path=self.toc_model_path,
|
|
|
- **ocr_config
|
|
|
+ ocr_api_url=self.ocr_api_url,
|
|
|
+ ocr_api_key=self.ocr_api_key,
|
|
|
+ ocr_timeout=self.ocr_timeout,
|
|
|
)
|
|
|
|
|
|
return self._toc_extractor.detect_and_extract(file_content, progress_callback)
|
|
|
|
|
|
def _extract_from_doc(self, doc: fitz.Document, progress_callback=None) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 提取文档结构(支持 OCR 异步并发)- 带坐标的精准回填方案。
|
|
|
-
|
|
|
- 流程:
|
|
|
- 1. 提取带坐标的文本块
|
|
|
- 2. 章节标题匹配 + 块归属划分
|
|
|
- 3. 扫描表格区域并 OCR
|
|
|
- 4. 根据表格坐标,将其作为新的块插入到对应小节
|
|
|
- 5. 将每个小节的块列表按顺序拼接成纯文本输出
|
|
|
+ """提取文档结构(支持 OCR 异步并发)。
|
|
|
+
|
|
|
+ 整体分三步:
|
|
|
+ 1. 先扫描页面,找出需要 OCR 替换的表格区域
|
|
|
+ 2. 并发执行 OCR,并把识别结果按页回填
|
|
|
+ 3. 重新遍历页面文本,按标题规则切出 chapter / section 结构
|
|
|
"""
|
|
|
|
|
|
def _emit_progress(stage: str, current: int, message: str):
|
|
|
@@ -152,38 +236,106 @@ class PdfStructureExtractor:
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
- total_pages = len(doc)
|
|
|
+ # === 阶段1: 收集所有需要 OCR 的表格区域 ===
|
|
|
+ table_regions: List[TableRegion] = []
|
|
|
+
|
|
|
+ if self.use_ocr:
|
|
|
+ logger.info("[OCR预处理] 扫描所有页面的表格区域...")
|
|
|
+ total_pages = len(doc)
|
|
|
+ for page_num in range(total_pages):
|
|
|
+ page = doc.load_page(page_num)
|
|
|
+ rect = page.rect
|
|
|
+ clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
|
|
|
+ regions = self._detect_table_regions(page, page_num + 1, clip_box)
|
|
|
+ for bbox, score in regions:
|
|
|
+ table_regions.append(TableRegion(
|
|
|
+ page_num=page_num + 1,
|
|
|
+ page=page,
|
|
|
+ bbox=bbox,
|
|
|
+ score=score
|
|
|
+ ))
|
|
|
+ # 每5页或最后一页推送一次进度
|
|
|
+ if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
|
|
|
+ progress = int((page_num + 1) / total_pages * 30) # OCR预处理占30%进度
|
|
|
+ _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
|
|
|
+ logger.info(f"[OCR预处理] 共发现 {len(table_regions)} 个表格区域需要 OCR")
|
|
|
+
|
|
|
+ # === 阶段2: 异步并发执行 OCR (5并发) ===
|
|
|
+ ocr_results: List[OcrResult] = []
|
|
|
|
|
|
- # ==================== 阶段1: 提取带坐标的文本块并归属到章节/小节====================
|
|
|
- logger.info("[阶段1] 提取带坐标的文本块并归属章节...")
|
|
|
+ if table_regions:
|
|
|
+ logger.info(f"[OCR执行] 使用 {self.OCR_CONCURRENT_WORKERS} 并发执行 OCR...")
|
|
|
+ _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
|
|
|
+ ocr_results = self._process_ocr_concurrent(table_regions, progress_callback=_emit_progress)
|
|
|
+ success_count = sum(1 for r in ocr_results if r.success)
|
|
|
+ logger.info(f"[OCR执行] 完成 {success_count}/{len(table_regions)} 个表格 OCR")
|
|
|
+ _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
|
|
|
|
|
|
- # 数据结构: {(chapter_name, section_name): [blocks_with_position]}
|
|
|
- chapter_blocks: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
|
|
|
+ # 按页码分组 OCR 结果
|
|
|
+ ocr_by_page: Dict[int, List[OcrResult]] = {}
|
|
|
+ for result in ocr_results:
|
|
|
+ if result.success:
|
|
|
+ if result.page_num not in ocr_by_page:
|
|
|
+ ocr_by_page[result.page_num] = []
|
|
|
+ ocr_by_page[result.page_num].append(result)
|
|
|
+
|
|
|
+ # === 阶段3: 提取页面文本(应用 OCR 结果)并切分章节 ===
|
|
|
+ structured_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
|
|
+ # body_lines 保留过滤页眉页脚后的线性正文,后续目录回填时会再次按标题边界切段。
|
|
|
+ body_lines: List[Dict[str, Any]] = []
|
|
|
current_chapter = "未分类前言"
|
|
|
current_section = "默认部分"
|
|
|
in_body = False
|
|
|
+ candidate_rule_names: Optional[List[str]] = None
|
|
|
+ active_rule_name: Optional[str] = None
|
|
|
|
|
|
- for page_num in range(total_pages):
|
|
|
+ logger.info("[文本提取] 提取页面内容并切分章节...")
|
|
|
+
|
|
|
+ for page_num in range(len(doc)):
|
|
|
page = doc.load_page(page_num)
|
|
|
rect = page.rect
|
|
|
clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
|
|
|
|
|
|
- # 获取带坐标的文本块
|
|
|
- blocks = self._extract_text_blocks_with_position(page, clip_box)
|
|
|
-
|
|
|
- for block in blocks:
|
|
|
- line = block["text"]
|
|
|
-
|
|
|
- # 跳过空行和页眉页脚
|
|
|
- if not line.strip():
|
|
|
+ # 获取页面文本(应用 OCR 结果)
|
|
|
+ if page_num + 1 in ocr_by_page:
|
|
|
+ original_text = page.get_text("text", clip=clip_box)
|
|
|
+ ocr_results_list = [
|
|
|
+ {
|
|
|
+ "region_index": i,
|
|
|
+ "bbox": r.bbox,
|
|
|
+ "score": r.score,
|
|
|
+ "ocr_text": r.text,
|
|
|
+ }
|
|
|
+ for i, r in enumerate(ocr_by_page[page_num + 1])
|
|
|
+ ]
|
|
|
+ text = self._replace_table_regions(page, original_text, ocr_results_list, clip_box)
|
|
|
+ else:
|
|
|
+ text = page.get_text("text", clip=clip_box)
|
|
|
+
|
|
|
+ lines = self._prepare_page_lines(text)
|
|
|
+ for line in lines:
|
|
|
+ if not line or self._is_header_footer(line):
|
|
|
+ continue
|
|
|
+ body_lines.append({
|
|
|
+ "page": page_num + 1,
|
|
|
+ "text": line,
|
|
|
+ })
|
|
|
+
|
|
|
+ for line in lines:
|
|
|
+ line = line.strip()
|
|
|
+ if not line:
|
|
|
continue
|
|
|
if self._is_header_footer(line):
|
|
|
continue
|
|
|
|
|
|
# 跳过目录阶段
|
|
|
if not in_body:
|
|
|
- if self.CHAPTER_PATTERN.match(line) and not self.TOC_PATTERN.search(line):
|
|
|
+ # 只有首次遇到真正的一级标题后,才认为进入正文。
|
|
|
+ # 这样可以避免目录页虽然命中标题规则,却被误当成正文结构。
|
|
|
+ matched_rules = self._matching_rule_names(line, "l1")
|
|
|
+ if matched_rules and not self.TOC_PATTERN.search(line):
|
|
|
in_body = True
|
|
|
+ candidate_rule_names = matched_rules
|
|
|
else:
|
|
|
continue
|
|
|
|
|
|
@@ -191,291 +343,1367 @@ class PdfStructureExtractor:
|
|
|
if self.TOC_PATTERN.search(line):
|
|
|
continue
|
|
|
|
|
|
+ # candidate_rule_names 表示“这篇文档可能使用的标题体系”;
|
|
|
+ # active_rule_name 表示“已经确认正在使用的二级标题规则”。
|
|
|
+ # 先宽松候选、后收敛到单一规则,可以减少混合编号文档里的串匹配。
|
|
|
+ active_scope = [active_rule_name] if active_rule_name else candidate_rule_names
|
|
|
+
|
|
|
# 匹配章标题
|
|
|
- if self.CHAPTER_PATTERN.match(line):
|
|
|
+ matched_chapter_rules = self._matching_rule_names(line, "l1", active_scope)
|
|
|
+ if matched_chapter_rules:
|
|
|
+ if active_rule_name is None:
|
|
|
+ candidate_rule_names = matched_chapter_rules
|
|
|
current_chapter = self._clean_chapter_title(line)
|
|
|
current_section = "章节标题"
|
|
|
- key = (current_chapter, current_section)
|
|
|
- if key not in chapter_blocks:
|
|
|
- chapter_blocks[key] = []
|
|
|
- chapter_blocks[key].append(block)
|
|
|
+ if current_chapter not in structured_data:
|
|
|
+ structured_data[current_chapter] = {}
|
|
|
+ if current_section not in structured_data[current_chapter]:
|
|
|
+ structured_data[current_chapter][current_section] = {
|
|
|
+ "lines": [],
|
|
|
+ "page_start": page_num + 1,
|
|
|
+ "page_end": page_num + 1,
|
|
|
+ }
|
|
|
continue
|
|
|
|
|
|
# 匹配节标题
|
|
|
- if self.SECTION_PATTERN.match(line):
|
|
|
- current_section = line
|
|
|
- key = (current_chapter, current_section)
|
|
|
- if key not in chapter_blocks:
|
|
|
- chapter_blocks[key] = []
|
|
|
- chapter_blocks[key].append(block)
|
|
|
+ matched_section_rules = self._matching_rule_names(line, "l2", active_scope)
|
|
|
+ if matched_section_rules:
|
|
|
+ if active_rule_name is None:
|
|
|
+ if candidate_rule_names:
|
|
|
+ for rule_name in candidate_rule_names:
|
|
|
+ if rule_name in matched_section_rules:
|
|
|
+ active_rule_name = rule_name
|
|
|
+ break
|
|
|
+ if active_rule_name is None:
|
|
|
+ active_rule_name = matched_section_rules[0]
|
|
|
+ current_section = self._clean_section_title(line)
|
|
|
+ if current_chapter not in structured_data:
|
|
|
+ structured_data[current_chapter] = {}
|
|
|
+ if current_section not in structured_data[current_chapter]:
|
|
|
+ structured_data[current_chapter][current_section] = {
|
|
|
+ "lines": [],
|
|
|
+ "page_start": page_num + 1,
|
|
|
+ "page_end": page_num + 1,
|
|
|
+ }
|
|
|
continue
|
|
|
|
|
|
- # 普通内容块
|
|
|
- key = (current_chapter, current_section)
|
|
|
- if key not in chapter_blocks:
|
|
|
- chapter_blocks[key] = []
|
|
|
- chapter_blocks[key].append(block)
|
|
|
+ # 确保结构存在
|
|
|
+ if current_chapter not in structured_data:
|
|
|
+ structured_data[current_chapter] = {}
|
|
|
+ if current_section not in structured_data[current_chapter]:
|
|
|
+ structured_data[current_chapter][current_section] = {
|
|
|
+ "lines": [],
|
|
|
+ "page_start": page_num + 1,
|
|
|
+ "page_end": page_num + 1,
|
|
|
+ }
|
|
|
|
|
|
- logger.info(f"[阶段1] 章节结构提取完成,共 {len({k[0] for k in chapter_blocks})} 个章节")
|
|
|
+ # 添加内容
|
|
|
+ structured_data[current_chapter][current_section]["lines"].append(line)
|
|
|
+ structured_data[current_chapter][current_section]["page_end"] = page_num + 1
|
|
|
+
|
|
|
+ # 将行列表拼接为文本
|
|
|
+ result: Dict[str, Any] = {"chapters": {}, "_body_lines": body_lines}
|
|
|
+ for chap, sections in structured_data.items():
|
|
|
+ result["chapters"][chap] = {}
|
|
|
+ for sec, data in sections.items():
|
|
|
+ result["chapters"][chap][sec] = {
|
|
|
+ "content": "\n".join(data["lines"]),
|
|
|
+ "page_start": data["page_start"],
|
|
|
+ "page_end": data["page_end"],
|
|
|
+ }
|
|
|
|
|
|
- # ==================== 阶段2: 收集表格区域并OCR(如果启用OCR)====================
|
|
|
- table_regions: List[TableRegion] = []
|
|
|
- ocr_results: List[OcrResult] = []
|
|
|
+ logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
|
|
|
+ return result
|
|
|
|
|
|
- if self.use_ocr and self._ocr_processor:
|
|
|
- logger.info("[阶段2] 扫描表格区域...")
|
|
|
- for page_num in range(total_pages):
|
|
|
- page = doc.load_page(page_num)
|
|
|
- rect = page.rect
|
|
|
- clip_box = fitz.Rect(0, self.clip_top, rect.width, rect.height - self.clip_bottom)
|
|
|
- regions = self._ocr_processor.detect_table_regions(page, page_num + 1, clip_box)
|
|
|
- for bbox, score in regions:
|
|
|
- table_regions.append(TableRegion(
|
|
|
- page_num=page_num + 1,
|
|
|
- page=page,
|
|
|
- bbox=bbox,
|
|
|
- score=score
|
|
|
- ))
|
|
|
- # 每5页推送进度
|
|
|
- if (page_num + 1) % 5 == 0 or page_num == total_pages - 1:
|
|
|
- progress = int((page_num + 1) / total_pages * 30)
|
|
|
- _emit_progress("版面分析", progress, f"扫描页面 {page_num + 1}/{total_pages}")
|
|
|
+ def _normalize_catalog(self, catalog: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
+ """统一目录来源并择优合并。
|
|
|
|
|
|
- logger.info(f"[阶段2] 发现 {len(table_regions)} 个表格区域")
|
|
|
+ 目录检测器输出的 chapters 更像“骨架”,raw_ocr_text 更接近页面原文。
|
|
|
+ 这里会分别解析两份结果,判断谁更可信,再做一次合并补齐。
|
|
|
+ """
|
|
|
+ if not catalog:
|
|
|
+ return {}
|
|
|
+
|
|
|
+ normalized = dict(catalog)
|
|
|
+ existing_chapters = self._sanitize_catalog_chapters(catalog.get("chapters", []))
|
|
|
+ raw_text = catalog.get("raw_ocr_text", "")
|
|
|
+ parsed_chapters = self._parse_catalog_from_raw_text(raw_text) if isinstance(raw_text, str) else []
|
|
|
+ selected_chapters = existing_chapters
|
|
|
+
|
|
|
+ if parsed_chapters:
|
|
|
+ if self._should_prefer_parsed_catalog(parsed_chapters, existing_chapters):
|
|
|
+ selected_chapters = parsed_chapters
|
|
|
+ elif existing_chapters:
|
|
|
+ logger.info(
|
|
|
+ "[PDF提取] raw_ocr_text目录解析结果异常,保留原始目录骨架: "
|
|
|
+ f"parsed={len(parsed_chapters)}, original={len(existing_chapters)}"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ selected_chapters = parsed_chapters
|
|
|
|
|
|
- # 执行OCR
|
|
|
- if table_regions:
|
|
|
- _emit_progress("版面分析", 35, f"发现 {len(table_regions)} 个表格,开始OCR识别...")
|
|
|
- ocr_results = self._ocr_processor.process_ocr_concurrent(
|
|
|
- table_regions,
|
|
|
- progress_callback=lambda completed, total: _emit_progress(
|
|
|
- "版面分析", 35 + int(completed / total * 15), f"OCR识别中 {completed}/{total}"
|
|
|
- )
|
|
|
+ if selected_chapters:
|
|
|
+ selected_chapters = self._merge_catalog_chapters(
|
|
|
+ selected_chapters,
|
|
|
+ parsed_chapters,
|
|
|
+ )
|
|
|
+ normalized["chapters"] = selected_chapters
|
|
|
+ normalized["total_chapters"] = len(selected_chapters)
|
|
|
+ normalized["formatted_text"] = self._format_catalog_chapters(selected_chapters)
|
|
|
+ return normalized
|
|
|
+
|
|
|
+ def _parse_catalog_from_raw_text(self, text: str) -> List[Dict[str, Any]]:
|
|
|
+ """把目录页 OCR 原文解析成章节树。
|
|
|
+
|
|
|
+ 解析时会先根据首批命中的一级标题推断文档的目录样式,
|
|
|
+ 后续再尽量沿用同一套规则收敛二级标题,避免不同编号体系互相污染。
|
|
|
+ """
|
|
|
+ if not text or not text.strip():
|
|
|
+ return []
|
|
|
+
|
|
|
+ chapters: List[Dict[str, Any]] = []
|
|
|
+ current_chapter: Optional[Dict[str, Any]] = None
|
|
|
+ active_l2_rule: Optional[str] = None
|
|
|
+ document_l1_rules: Optional[List[str]] = None
|
|
|
+
|
|
|
+ for raw_line in self._prepare_catalog_raw_lines(text):
|
|
|
+ title_text, page = self._split_catalog_entry(raw_line)
|
|
|
+ if not title_text:
|
|
|
+ continue
|
|
|
+
|
|
|
+ compact = re.sub(r"\s+", "", title_text)
|
|
|
+ if compact in {"目录", "目錄"}:
|
|
|
+ continue
|
|
|
+
|
|
|
+ chapter_matches = self._matching_rule_names(title_text, "l1", document_l1_rules)
|
|
|
+ if chapter_matches:
|
|
|
+ if document_l1_rules is None:
|
|
|
+ document_l1_rules = chapter_matches
|
|
|
+ current_chapter = {
|
|
|
+ "index": len(chapters) + 1,
|
|
|
+ "title": self._clean_chapter_title(title_text),
|
|
|
+ "page": str(page or 1),
|
|
|
+ "original": raw_line.strip(),
|
|
|
+ "subsections": [],
|
|
|
+ }
|
|
|
+ chapters.append(current_chapter)
|
|
|
+ active_l2_rule = None
|
|
|
+ continue
|
|
|
+
|
|
|
+ if current_chapter is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ section_matches = self._matching_rule_names(title_text, "l2")
|
|
|
+ if not section_matches:
|
|
|
+ numeric_section_title = self._coerce_numeric_catalog_section(
|
|
|
+ title_text,
|
|
|
+ document_l1_rules,
|
|
|
+ active_l2_rule,
|
|
|
)
|
|
|
- success_count = sum(1 for r in ocr_results if r.success)
|
|
|
- logger.info(f"[阶段2] OCR完成 {success_count}/{len(table_regions)}")
|
|
|
- _emit_progress("版面分析", 50, f"OCR识别完成 {success_count}/{len(table_regions)}")
|
|
|
-
|
|
|
- # ==================== 阶段3: 将OCR结果作为新块插入到对应章节====================
|
|
|
- if ocr_results:
|
|
|
- logger.info("[阶段3] 将OCR结果回填到对应章节...")
|
|
|
- self._insert_ocr_blocks_into_chapters(chapter_blocks, ocr_results)
|
|
|
-
|
|
|
- # ==================== 阶段4: 生成最终输出(块列表转纯文本)====================
|
|
|
- logger.info("[阶段4] 生成最终文本输出...")
|
|
|
- result: Dict[str, Any] = {"chapters": {}}
|
|
|
-
|
|
|
- for (chap_name, sec_name), blocks in chapter_blocks.items():
|
|
|
- if chap_name not in result["chapters"]:
|
|
|
- result["chapters"][chap_name] = {}
|
|
|
-
|
|
|
- # 按页码和Y坐标排序块
|
|
|
- blocks.sort(key=lambda b: (b["page"], b["bbox"][1]))
|
|
|
-
|
|
|
- # 拼接文本
|
|
|
- lines = []
|
|
|
- page_start = blocks[0]["page"] if blocks else 1
|
|
|
- page_end = blocks[-1]["page"] if blocks else 1
|
|
|
-
|
|
|
- for block in blocks:
|
|
|
- if block.get("type") == "table":
|
|
|
- lines.append(f"\n[表格OCR识别结果]:\n{block['text']}\n[/表格]\n")
|
|
|
- else:
|
|
|
- lines.append(block["text"])
|
|
|
+ if numeric_section_title:
|
|
|
+ section_key = self._normalize_heading_key(numeric_section_title)
|
|
|
+ existing_keys = {
|
|
|
+ self._normalize_heading_key(sub.get("title", ""))
|
|
|
+ for sub in current_chapter.get("subsections", [])
|
|
|
+ }
|
|
|
+ if section_key not in existing_keys:
|
|
|
+ current_chapter["subsections"].append({
|
|
|
+ "title": numeric_section_title,
|
|
|
+ "page": str(page or current_chapter.get("page", 1)),
|
|
|
+ "level": 2,
|
|
|
+ "original": raw_line.strip(),
|
|
|
+ })
|
|
|
+ continue
|
|
|
|
|
|
- result["chapters"][chap_name][sec_name] = {
|
|
|
- "content": "\n".join(lines),
|
|
|
- "page_start": page_start,
|
|
|
- "page_end": page_end,
|
|
|
+ if active_l2_rule is None:
|
|
|
+ active_l2_rule = section_matches[0]
|
|
|
+ if active_l2_rule not in section_matches:
|
|
|
+ continue
|
|
|
+
|
|
|
+ section_title = self._clean_section_title(title_text)
|
|
|
+ section_key = self._normalize_heading_key(section_title)
|
|
|
+ existing_keys = {
|
|
|
+ self._normalize_heading_key(sub.get("title", ""))
|
|
|
+ for sub in current_chapter.get("subsections", [])
|
|
|
}
|
|
|
+ if section_key in existing_keys:
|
|
|
+ continue
|
|
|
|
|
|
- logger.info(f"[PdfExtractor] 提取完成,共 {len(result['chapters'])} 个章节")
|
|
|
- return result
|
|
|
+ current_chapter["subsections"].append({
|
|
|
+ "title": section_title,
|
|
|
+ "page": str(page or current_chapter.get("page", 1)),
|
|
|
+ "level": 2,
|
|
|
+ "original": raw_line.strip(),
|
|
|
+ })
|
|
|
|
|
|
- def _extract_text_blocks_with_position(
|
|
|
- self,
|
|
|
- page: fitz.Page,
|
|
|
- clip_box: fitz.Rect
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _sanitize_catalog_chapters(cls, chapters: Any) -> List[Dict[str, Any]]:
|
|
|
+ if not isinstance(chapters, list):
|
|
|
+ return []
|
|
|
+
|
|
|
+ sanitized: List[Dict[str, Any]] = []
|
|
|
+ seen_chapter_keys: Set[str] = set()
|
|
|
+
|
|
|
+ for idx, chapter in enumerate(chapters, 1):
|
|
|
+ if not isinstance(chapter, dict):
|
|
|
+ continue
|
|
|
+
|
|
|
+ chapter_title = cls._clean_chapter_title(str(chapter.get("title", "") or ""))
|
|
|
+ chapter_key = cls._normalize_heading_key(chapter_title)
|
|
|
+ if not chapter_key or chapter_key in seen_chapter_keys:
|
|
|
+ continue
|
|
|
+
|
|
|
+ seen_chapter_keys.add(chapter_key)
|
|
|
+ chapter_page = str(chapter.get("page") or idx)
|
|
|
+ subsections: List[Dict[str, Any]] = []
|
|
|
+ seen_section_keys: Set[str] = set()
|
|
|
+
|
|
|
+ for subsection in chapter.get("subsections", []) or []:
|
|
|
+ if not isinstance(subsection, dict):
|
|
|
+ continue
|
|
|
+
|
|
|
+ section_title = cls._clean_section_title(str(subsection.get("title", "") or ""))
|
|
|
+ section_key = cls._normalize_heading_key(section_title)
|
|
|
+ if not section_key or section_key in seen_section_keys:
|
|
|
+ continue
|
|
|
+
|
|
|
+ seen_section_keys.add(section_key)
|
|
|
+ subsections.append({
|
|
|
+ "title": section_title,
|
|
|
+ "page": str(subsection.get("page") or chapter_page),
|
|
|
+ "level": 2,
|
|
|
+ "original": subsection.get("original", "") or section_title,
|
|
|
+ })
|
|
|
+
|
|
|
+ sanitized.append({
|
|
|
+ "index": len(sanitized) + 1,
|
|
|
+ "title": chapter_title,
|
|
|
+ "page": chapter_page,
|
|
|
+ "original": chapter.get("original", "") or chapter_title,
|
|
|
+ "subsections": subsections,
|
|
|
+ })
|
|
|
+
|
|
|
+ return sanitized
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _prepare_catalog_raw_lines(cls, text: str) -> List[str]:
|
|
|
+ raw_lines = [line.strip() for line in text.splitlines() if line.strip()]
|
|
|
+ prepared: List[str] = []
|
|
|
+ index = 0
|
|
|
+
|
|
|
+ while index < len(raw_lines):
|
|
|
+ current = raw_lines[index].strip()
|
|
|
+ compact_current = re.sub(r"\s+", "", current)
|
|
|
+
|
|
|
+ if compact_current in {"目", "錄", "录"} and index + 1 < len(raw_lines):
|
|
|
+ next_compact = re.sub(r"\s+", "", raw_lines[index + 1].strip())
|
|
|
+ if compact_current + next_compact in {"目录", "目錄"}:
|
|
|
+ prepared.append(compact_current + next_compact)
|
|
|
+ index += 2
|
|
|
+ continue
|
|
|
+
|
|
|
+ if cls._is_incomplete_heading_fragment(current) and index + 1 < len(raw_lines):
|
|
|
+ next_line = raw_lines[index + 1].strip()
|
|
|
+ candidate = f"{current} {next_line}".strip()
|
|
|
+ _, candidate_page = cls._split_catalog_entry(candidate)
|
|
|
+ if (
|
|
|
+ cls._matching_rule_names(candidate, "l1")
|
|
|
+ or cls._matching_rule_names(candidate, "l2")
|
|
|
+ or candidate_page is not None
|
|
|
+ ):
|
|
|
+ prepared.append(candidate)
|
|
|
+ index += 2
|
|
|
+ continue
|
|
|
+
|
|
|
+ prepared.append(current)
|
|
|
+ index += 1
|
|
|
+
|
|
|
+ return prepared
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _should_prefer_parsed_catalog(
|
|
|
+ cls,
|
|
|
+ parsed_chapters: List[Dict[str, Any]],
|
|
|
+ existing_chapters: List[Dict[str, Any]],
|
|
|
+ ) -> bool:
|
|
|
+ if not parsed_chapters:
|
|
|
+ return False
|
|
|
+
|
|
|
+ parsed_is_suspicious = cls._catalog_has_suspicious_structure(parsed_chapters)
|
|
|
+ existing_is_suspicious = cls._catalog_has_suspicious_structure(existing_chapters)
|
|
|
+
|
|
|
+ if parsed_is_suspicious:
|
|
|
+ if not existing_chapters or not existing_is_suspicious:
|
|
|
+ return False
|
|
|
+
|
|
|
+ parsed_score = cls._catalog_structure_score(parsed_chapters)
|
|
|
+ existing_score = cls._catalog_structure_score(existing_chapters)
|
|
|
+ overlap_ratio = cls._catalog_chapter_overlap_ratio(parsed_chapters, existing_chapters)
|
|
|
+ return overlap_ratio >= 0.6 and parsed_score > existing_score
|
|
|
+
|
|
|
+ if not existing_chapters:
|
|
|
+ return True
|
|
|
+
|
|
|
+ if existing_is_suspicious:
|
|
|
+ return True
|
|
|
+
|
|
|
+ if cls._should_prefer_single_level_parsed_catalog(parsed_chapters, existing_chapters):
|
|
|
+ return True
|
|
|
+
|
|
|
+ parsed_score = cls._catalog_structure_score(parsed_chapters)
|
|
|
+ existing_score = cls._catalog_structure_score(existing_chapters)
|
|
|
+ if parsed_score <= existing_score:
|
|
|
+ return False
|
|
|
+
|
|
|
+ if not cls._catalog_has_suspicious_structure(existing_chapters):
|
|
|
+ existing_count = len(existing_chapters)
|
|
|
+ parsed_count = len(parsed_chapters)
|
|
|
+ if parsed_count > max(existing_count * 2, existing_count + 8):
|
|
|
+ return False
|
|
|
+ if existing_count >= 4 and parsed_count < max(2, existing_count // 2):
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _should_prefer_single_level_parsed_catalog(
|
|
|
+ cls,
|
|
|
+ parsed_chapters: List[Dict[str, Any]],
|
|
|
+ existing_chapters: List[Dict[str, Any]],
|
|
|
+ ) -> bool:
|
|
|
+ """特判“单层目录被误识别成一章多节”的场景。"""
|
|
|
+ if len(parsed_chapters) < 2 or len(existing_chapters) != 1:
|
|
|
+ return False
|
|
|
+
|
|
|
+ if any(chapter.get("subsections") for chapter in parsed_chapters):
|
|
|
+ return False
|
|
|
+
|
|
|
+ existing_subsections = existing_chapters[0].get("subsections", []) or []
|
|
|
+ if len(existing_subsections) < len(parsed_chapters) - 1:
|
|
|
+ return False
|
|
|
+
|
|
|
+ parsed_pages = [
|
|
|
+ cls._safe_page_number(chapter.get("page"), 1)
|
|
|
+ for chapter in parsed_chapters
|
|
|
+ ]
|
|
|
+ return parsed_pages == sorted(parsed_pages)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _catalog_has_suspicious_structure(cls, chapters: List[Dict[str, Any]]) -> bool:
|
|
|
+ if not chapters:
|
|
|
+ return False
|
|
|
+
|
|
|
+ titles = [(chapter.get("title", "") or "").strip() for chapter in chapters]
|
|
|
+ chinese_chapter_count = sum(
|
|
|
+ 1 for title in titles
|
|
|
+ if re.match(r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]", title)
|
|
|
+ )
|
|
|
+ numeric_heading_count = sum(
|
|
|
+ 1 for title in titles
|
|
|
+ if re.match(r"^\d{1,2}(?:[\..。、])?\s+\S+", title)
|
|
|
+ )
|
|
|
+ embedded_numeric_body_count = 0
|
|
|
+ repeated_chapter_no_count = 0
|
|
|
+ reversed_chapter_no_count = 0
|
|
|
+ seen_chapter_numbers: Set[str] = set()
|
|
|
+ previous_numeric_chapter_no: Optional[int] = None
|
|
|
+
|
|
|
+ for title in titles:
|
|
|
+ chapter_match = re.match(
|
|
|
+ r"^第\s*(\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
|
|
|
+ title,
|
|
|
+ )
|
|
|
+ if not chapter_match:
|
|
|
+ continue
|
|
|
+
|
|
|
+ chapter_no = re.sub(r"\s+", "", chapter_match.group(1))
|
|
|
+ chapter_body = (chapter_match.group(2) or "").strip()
|
|
|
+ if chapter_no in seen_chapter_numbers:
|
|
|
+ repeated_chapter_no_count += 1
|
|
|
+ seen_chapter_numbers.add(chapter_no)
|
|
|
+
|
|
|
+ if chapter_no.isdigit():
|
|
|
+ current_numeric_no = int(chapter_no)
|
|
|
+ if previous_numeric_chapter_no is not None and current_numeric_no < previous_numeric_chapter_no:
|
|
|
+ reversed_chapter_no_count += 1
|
|
|
+ previous_numeric_chapter_no = current_numeric_no
|
|
|
+
|
|
|
+ if re.match(r"^\d{1,2}(?:\.\d{1,2})*\.?(?:\s+|$)", chapter_body):
|
|
|
+ embedded_numeric_body_count += 1
|
|
|
+
|
|
|
+ if chinese_chapter_count >= 2 and numeric_heading_count >= max(3, chinese_chapter_count // 2):
|
|
|
+ return True
|
|
|
+
|
|
|
+ if chinese_chapter_count >= max(2, len(titles) // 3) and numeric_heading_count >= max(2, len(titles) // 6):
|
|
|
+ return True
|
|
|
+
|
|
|
+ if embedded_numeric_body_count >= max(2, len(titles) // 5):
|
|
|
+ return True
|
|
|
+
|
|
|
+ if repeated_chapter_no_count > 0 or reversed_chapter_no_count > 0:
|
|
|
+ return True
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _catalog_structure_score(chapters: List[Dict[str, Any]]) -> int:
|
|
|
+ score = 0
|
|
|
+ for chapter in chapters:
|
|
|
+ score += 1
|
|
|
+ score += len(chapter.get("subsections", []) or [])
|
|
|
+ return score
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _catalog_chapter_overlap_ratio(
|
|
|
+ cls,
|
|
|
+ chapters_a: List[Dict[str, Any]],
|
|
|
+ chapters_b: List[Dict[str, Any]],
|
|
|
+ ) -> float:
|
|
|
+ if not chapters_a or not chapters_b:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ keys_a = {
|
|
|
+ cls._catalog_chapter_identity_key(chapter.get("title", ""))
|
|
|
+ for chapter in chapters_a
|
|
|
+ if chapter.get("title")
|
|
|
+ }
|
|
|
+ keys_b = {
|
|
|
+ cls._catalog_chapter_identity_key(chapter.get("title", ""))
|
|
|
+ for chapter in chapters_b
|
|
|
+ if chapter.get("title")
|
|
|
+ }
|
|
|
+ if not keys_a or not keys_b:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ return len(keys_a & keys_b) / max(1, min(len(keys_a), len(keys_b)))
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _catalog_chapter_identity_key(cls, title: str) -> str:
|
|
|
+ cleaned = cls._clean_chapter_title(title)
|
|
|
+ if not cleaned:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ chapter_match = re.match(
|
|
|
+ r"^第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章节部分篇]\s*(.*)$",
|
|
|
+ cleaned,
|
|
|
+ )
|
|
|
+ if chapter_match:
|
|
|
+ chapter_body = cls._normalize_heading_key(chapter_match.group(1))
|
|
|
+ if chapter_body:
|
|
|
+ return chapter_body
|
|
|
+
|
|
|
+ numeric_match = re.match(r"^\d{1,2}(?:[\..。、])?\s*(.*)$", cleaned)
|
|
|
+ if numeric_match:
|
|
|
+ numeric_body = cls._normalize_heading_key(numeric_match.group(1))
|
|
|
+ if numeric_body:
|
|
|
+ return numeric_body
|
|
|
+
|
|
|
+ return cls._normalize_heading_key(cleaned)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _merge_catalog_chapters(
|
|
|
+ cls,
|
|
|
+ base_chapters: List[Dict[str, Any]],
|
|
|
+ supplemental_chapters: List[Dict[str, Any]],
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 提取带坐标的文本块列表。
|
|
|
+ if not base_chapters:
|
|
|
+ return supplemental_chapters or []
|
|
|
+ if not supplemental_chapters:
|
|
|
+ return base_chapters
|
|
|
+
|
|
|
+ merged: List[Dict[str, Any]] = []
|
|
|
+ supplemental_by_key = {
|
|
|
+ cls._catalog_chapter_identity_key(chapter.get("title", "")): chapter
|
|
|
+ for chapter in supplemental_chapters
|
|
|
+ if chapter.get("title")
|
|
|
+ }
|
|
|
+
|
|
|
+ for index, chapter in enumerate(base_chapters, 1):
|
|
|
+ chapter_copy = {
|
|
|
+ **chapter,
|
|
|
+ "subsections": [dict(sub) for sub in chapter.get("subsections", []) or []],
|
|
|
+ }
|
|
|
+ chapter_key = cls._catalog_chapter_identity_key(chapter_copy.get("title", ""))
|
|
|
+ supplemental = supplemental_by_key.get(chapter_key)
|
|
|
+ if supplemental:
|
|
|
+ merged_subsections = cls._merge_catalog_subsections(
|
|
|
+ chapter_copy.get("subsections", []),
|
|
|
+ supplemental.get("subsections", []) or [],
|
|
|
+ )
|
|
|
+ chapter_copy["subsections"] = merged_subsections
|
|
|
+ chapter_copy["index"] = index
|
|
|
+ merged.append(chapter_copy)
|
|
|
|
|
|
- 使用 page.get_text("dict") 获取每个文本块的精确边界框和文本内容。
|
|
|
- """
|
|
|
- blocks = []
|
|
|
- page_dict = page.get_text("dict", clip=clip_box)
|
|
|
-
|
|
|
- for block in page_dict.get("blocks", []):
|
|
|
- if block.get("type") == 0: # 文本块
|
|
|
- bbox = block["bbox"]
|
|
|
- y_center = (bbox[1] + bbox[3]) / 2
|
|
|
-
|
|
|
- # 拼接块内所有文本
|
|
|
- text_lines = []
|
|
|
- for line in block.get("lines", []):
|
|
|
- line_text = ""
|
|
|
- for span in line.get("spans", []):
|
|
|
- line_text += span.get("text", "")
|
|
|
- if line_text.strip():
|
|
|
- text_lines.append(line_text)
|
|
|
-
|
|
|
- if text_lines:
|
|
|
- blocks.append({
|
|
|
- "text": "\n".join(text_lines),
|
|
|
- "page": page.number + 1,
|
|
|
- "bbox": bbox,
|
|
|
- "y_center": y_center,
|
|
|
- "type": "text"
|
|
|
- })
|
|
|
-
|
|
|
- # 按阅读顺序排序(Y坐标为主,X坐标为辅)
|
|
|
- blocks.sort(key=lambda b: (b["page"], b["bbox"][1], b["bbox"][0]))
|
|
|
- return blocks
|
|
|
-
|
|
|
- def _insert_ocr_blocks_into_chapters(
|
|
|
+ return merged
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _merge_catalog_subsections(
|
|
|
+ cls,
|
|
|
+ base_subsections: List[Dict[str, Any]],
|
|
|
+ supplemental_subsections: List[Dict[str, Any]],
|
|
|
+ ) -> List[Dict[str, Any]]:
|
|
|
+ if not base_subsections:
|
|
|
+ return [dict(sub) for sub in supplemental_subsections]
|
|
|
+ if not supplemental_subsections:
|
|
|
+ return [dict(sub) for sub in base_subsections]
|
|
|
+
|
|
|
+ def _subsection_score(items: List[Dict[str, Any]]) -> int:
|
|
|
+ score = 0
|
|
|
+ for item in items:
|
|
|
+ title = (item.get("title", "") or "").strip()
|
|
|
+ if not title:
|
|
|
+ continue
|
|
|
+ score += 1
|
|
|
+ if re.match(r"^\d+\.\d+(?!\.\d)\.?\s*", title):
|
|
|
+ score += 3
|
|
|
+ elif re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)", title):
|
|
|
+ score += 3
|
|
|
+ elif re.match(r"^([一二三四五六七八九十百零两]+[、)\)\]])", title):
|
|
|
+ score += 3
|
|
|
+ elif re.match(r"^[【\[]\s*\d+\s*[\]】]", title):
|
|
|
+ score += 3
|
|
|
+ elif re.match(r"^\d{1,2}[\..。、]\s*", title):
|
|
|
+ score += 1
|
|
|
+ return score
|
|
|
+
|
|
|
+ base_score = _subsection_score(base_subsections)
|
|
|
+ supplemental_score = _subsection_score(supplemental_subsections)
|
|
|
+ if supplemental_score > base_score:
|
|
|
+ return [dict(sub) for sub in supplemental_subsections]
|
|
|
+
|
|
|
+ merged = [dict(sub) for sub in base_subsections]
|
|
|
+ seen_keys = {
|
|
|
+ cls._normalize_heading_key(sub.get("title", ""))
|
|
|
+ for sub in merged
|
|
|
+ if sub.get("title")
|
|
|
+ }
|
|
|
+ for subsection in supplemental_subsections:
|
|
|
+ subsection_key = cls._normalize_heading_key(subsection.get("title", ""))
|
|
|
+ if not subsection_key or subsection_key in seen_keys:
|
|
|
+ continue
|
|
|
+ merged.append(dict(subsection))
|
|
|
+ seen_keys.add(subsection_key)
|
|
|
+ return merged
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _coerce_numeric_catalog_section(
|
|
|
+ cls,
|
|
|
+ title_text: str,
|
|
|
+ document_l1_rules: Optional[List[str]],
|
|
|
+ active_l2_rule: Optional[str],
|
|
|
+ ) -> Optional[str]:
|
|
|
+ if active_l2_rule is not None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if not document_l1_rules:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if "Rule_1_纯数字派" in document_l1_rules:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if re.match(r"^\d{1,2}(?:[\..。、])?\s*(?!\d)[\u4e00-\u9fa5A-Za-z].*", title_text.strip()):
|
|
|
+ return cls._clean_section_title(title_text)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _split_catalog_entry(line: str) -> Tuple[str, Optional[int]]:
|
|
|
+ cleaned = line.strip()
|
|
|
+ if not cleaned:
|
|
|
+ return "", None
|
|
|
+
|
|
|
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
|
|
|
+ page_match = re.search(
|
|
|
+ r"(?:[.\u2026\u00b7\u2022·• ]{2,})[-\u2013\u2014 ]*(\d+)\s*[-\u2013\u2014 ]*$",
|
|
|
+ cleaned,
|
|
|
+ )
|
|
|
+ if page_match:
|
|
|
+ title_text = cleaned[:page_match.start()].strip()
|
|
|
+ title_text = re.sub(r"[.\u2026\u00b7\u2022 ]+$", "", title_text).strip()
|
|
|
+ return title_text, int(page_match.group(1))
|
|
|
+
|
|
|
+ return cleaned, None
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _format_catalog_chapters(chapters: List[Dict[str, Any]]) -> str:
|
|
|
+ lines: List[str] = []
|
|
|
+ for chapter in chapters:
|
|
|
+ title = chapter.get("title", "").strip()
|
|
|
+ if not title:
|
|
|
+ continue
|
|
|
+ lines.append(title)
|
|
|
+ for sub in chapter.get("subsections", []):
|
|
|
+ sub_title = sub.get("title", "").strip()
|
|
|
+ if sub_title:
|
|
|
+ lines.append(f" {sub_title}")
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _enrich_catalog_with_structure(
|
|
|
self,
|
|
|
- chapter_blocks: Dict[Tuple[str, str], List[Dict[str, Any]]],
|
|
|
- ocr_results: List[OcrResult]
|
|
|
- ) -> None:
|
|
|
+ catalog: Dict[str, Any],
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
+ ) -> Dict[str, Any]:
|
|
|
+ catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
|
|
|
+ if not catalog_chapters or not chapters:
|
|
|
+ return catalog
|
|
|
+
|
|
|
+ enriched = dict(catalog)
|
|
|
+ structure_items = list(chapters.items())
|
|
|
+ structure_by_key = {
|
|
|
+ self._catalog_chapter_identity_key(chapter_title): (chapter_title, sections)
|
|
|
+ for chapter_title, sections in structure_items
|
|
|
+ }
|
|
|
+ used_structure_keys: Set[str] = set()
|
|
|
+
|
|
|
+ enriched_chapters: List[Dict[str, Any]] = []
|
|
|
+ for catalog_chapter in catalog_chapters:
|
|
|
+ chapter_copy = dict(catalog_chapter)
|
|
|
+ chapter_key = self._catalog_chapter_identity_key(chapter_copy.get("title", ""))
|
|
|
+ structure_match = structure_by_key.get(chapter_key)
|
|
|
+ if structure_match is None:
|
|
|
+ enriched_chapters.append(chapter_copy)
|
|
|
+ continue
|
|
|
+
|
|
|
+ structure_title, structure_sections = structure_match
|
|
|
+ used_structure_keys.add(chapter_key)
|
|
|
+ title_payload = structure_sections.get("章节标题", {})
|
|
|
+ chapter_copy["title"] = structure_title
|
|
|
+ chapter_copy["content"] = title_payload.get("content", "")
|
|
|
+ chapter_copy["page_start"] = title_payload.get("page_start", self._safe_page_number(chapter_copy.get("page")))
|
|
|
+ chapter_copy["page_end"] = title_payload.get("page_end", chapter_copy["page_start"])
|
|
|
+
|
|
|
+ structure_subsections = [
|
|
|
+ (section_title, payload)
|
|
|
+ for section_title, payload in structure_sections.items()
|
|
|
+ if section_title != "章节标题"
|
|
|
+ ]
|
|
|
+ catalog_subsections = chapter_copy.get("subsections", []) or []
|
|
|
+ subsection_by_key = {
|
|
|
+ self._normalize_heading_key(subsection.get("title", "")): subsection
|
|
|
+ for subsection in catalog_subsections
|
|
|
+ if subsection.get("title")
|
|
|
+ }
|
|
|
+
|
|
|
+ enriched_subsections: List[Dict[str, Any]] = []
|
|
|
+ for section_title, payload in structure_subsections:
|
|
|
+ section_key = self._normalize_heading_key(section_title)
|
|
|
+ subsection = dict(subsection_by_key.get(section_key, {}))
|
|
|
+ subsection.setdefault("title", section_title)
|
|
|
+ subsection.setdefault("page", str(payload.get("page_start", chapter_copy["page_start"])))
|
|
|
+ subsection.setdefault("level", 2)
|
|
|
+ subsection.setdefault("original", section_title)
|
|
|
+ subsection["content"] = payload.get("content", "")
|
|
|
+ subsection["page_start"] = payload.get("page_start", chapter_copy["page_start"])
|
|
|
+ subsection["page_end"] = payload.get("page_end", subsection["page_start"])
|
|
|
+ enriched_subsections.append(subsection)
|
|
|
+
|
|
|
+ chapter_copy["subsections"] = enriched_subsections
|
|
|
+ enriched_chapters.append(chapter_copy)
|
|
|
+
|
|
|
+ existing_catalog_keys = {
|
|
|
+ self._catalog_chapter_identity_key(chapter.get("title", ""))
|
|
|
+ for chapter in enriched_chapters
|
|
|
+ if chapter.get("title")
|
|
|
+ }
|
|
|
+ for chapter_title, structure_sections in structure_items:
|
|
|
+ chapter_key = self._catalog_chapter_identity_key(chapter_title)
|
|
|
+ if chapter_key in existing_catalog_keys or chapter_key in used_structure_keys:
|
|
|
+ continue
|
|
|
+
|
|
|
+ title_payload = structure_sections.get("章节标题", {})
|
|
|
+ new_chapter = {
|
|
|
+ "index": len(enriched_chapters) + 1,
|
|
|
+ "title": chapter_title,
|
|
|
+ "page": str(title_payload.get("page_start", 1)),
|
|
|
+ "original": chapter_title,
|
|
|
+ "content": title_payload.get("content", ""),
|
|
|
+ "page_start": title_payload.get("page_start", 1),
|
|
|
+ "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
|
|
|
+ "subsections": [],
|
|
|
+ }
|
|
|
+ for section_title, payload in structure_sections.items():
|
|
|
+ if section_title == "章节标题":
|
|
|
+ continue
|
|
|
+ new_chapter["subsections"].append({
|
|
|
+ "title": section_title,
|
|
|
+ "page": str(payload.get("page_start", new_chapter["page_start"])),
|
|
|
+ "level": 2,
|
|
|
+ "original": section_title,
|
|
|
+ "content": payload.get("content", ""),
|
|
|
+ "page_start": payload.get("page_start", new_chapter["page_start"]),
|
|
|
+ "page_end": payload.get("page_end", payload.get("page_start", new_chapter["page_start"])),
|
|
|
+ })
|
|
|
+ enriched_chapters.append(new_chapter)
|
|
|
+
|
|
|
+ for index, chapter in enumerate(enriched_chapters, 1):
|
|
|
+ chapter["index"] = index
|
|
|
+
|
|
|
+ enriched["chapters"] = enriched_chapters
|
|
|
+ enriched["total_chapters"] = len(enriched_chapters)
|
|
|
+ enriched["formatted_text"] = self._format_catalog_chapters(enriched_chapters)
|
|
|
+ return enriched
|
|
|
+
|
|
|
+ def _reconcile_structure_with_catalog(
|
|
|
+ self,
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
+ catalog: Dict[str, Any],
|
|
|
+ ) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
|
|
+ """把正文抽取结果挂回目录骨架。
|
|
|
+
|
|
|
+ 正文抽取结果通常 content 更完整,但层级可能漏掉;
|
|
|
+ 目录结果层级更稳,但 content 为空或不完整。
|
|
|
+ 这里按标题归一化后顺序匹配,把正文内容重新映射回目录结构。
|
|
|
"""
|
|
|
- 将OCR结果作为新的块插入到对应章节。
|
|
|
+ catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
|
|
|
+ if not chapters or not catalog_chapters:
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ section_title_key = "章节标题"
|
|
|
+ # 将正文结构拆成“章标题内容”和“所有节标题内容”两条索引,方便后续按目录顺序逐项匹配。
|
|
|
+ chapter_title_payloads: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
+ flat_sections: List[Tuple[str, Dict[str, Any]]] = []
|
|
|
+ matched_chapter_count = 0
|
|
|
+ matched_section_count = 0
|
|
|
+ total_catalog_sections = 0
|
|
|
+
|
|
|
+ for chapter_title, sections in chapters.items():
|
|
|
+ title_key = self._normalize_heading_key(chapter_title)
|
|
|
+ title_payload = sections.get(section_title_key)
|
|
|
+ if title_payload is not None:
|
|
|
+ chapter_title_payloads.setdefault(title_key, []).append({
|
|
|
+ "content": title_payload.get("content", ""),
|
|
|
+ "page_start": title_payload.get("page_start", 1),
|
|
|
+ "page_end": title_payload.get("page_end", title_payload.get("page_start", 1)),
|
|
|
+ })
|
|
|
+
|
|
|
+ for section_title, payload in sections.items():
|
|
|
+ if section_title == section_title_key:
|
|
|
+ continue
|
|
|
+ flat_sections.append((
|
|
|
+ self._normalize_heading_key(section_title),
|
|
|
+ {
|
|
|
+ "content": payload.get("content", ""),
|
|
|
+ "page_start": payload.get("page_start", 1),
|
|
|
+ "page_end": payload.get("page_end", payload.get("page_start", 1)),
|
|
|
+ },
|
|
|
+ ))
|
|
|
+
|
|
|
+ rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
|
|
+ # 优先按顺序向后匹配,找不到时再全局回退一次,兼顾正确率和容错性。
|
|
|
+ search_start = 0
|
|
|
+ used_indices = set()
|
|
|
+
|
|
|
+ for chapter in catalog_chapters:
|
|
|
+ chapter_title = (chapter.get("title", "") or "").strip()
|
|
|
+ if not chapter_title:
|
|
|
+ continue
|
|
|
+
|
|
|
+ chapter_page = self._safe_page_number(chapter.get("page"))
|
|
|
+ chapter_key = self._normalize_heading_key(chapter_title)
|
|
|
+ title_candidates = chapter_title_payloads.get(chapter_key, [])
|
|
|
+ has_title_match = bool(title_candidates)
|
|
|
+ title_payload = title_candidates.pop(0) if title_candidates else self._empty_section_payload(chapter_page)
|
|
|
+ if has_title_match:
|
|
|
+ matched_chapter_count += 1
|
|
|
+
|
|
|
+ rebuilt[chapter_title] = {
|
|
|
+ section_title_key: title_payload,
|
|
|
+ }
|
|
|
+
|
|
|
+ for subsection in chapter.get("subsections", []):
|
|
|
+ section_title = (subsection.get("title", "") or "").strip()
|
|
|
+ if not section_title:
|
|
|
+ continue
|
|
|
+ total_catalog_sections += 1
|
|
|
+
|
|
|
+ target_key = self._normalize_heading_key(section_title)
|
|
|
+ match_index = None
|
|
|
+ for idx in range(search_start, len(flat_sections)):
|
|
|
+ if idx in used_indices:
|
|
|
+ continue
|
|
|
+ if flat_sections[idx][0] == target_key:
|
|
|
+ match_index = idx
|
|
|
+ break
|
|
|
+ if match_index is None:
|
|
|
+ for idx, (section_key, _) in enumerate(flat_sections):
|
|
|
+ if idx in used_indices:
|
|
|
+ continue
|
|
|
+ if section_key == target_key:
|
|
|
+ match_index = idx
|
|
|
+ break
|
|
|
+
|
|
|
+ if match_index is not None:
|
|
|
+ used_indices.add(match_index)
|
|
|
+ search_start = max(search_start, match_index + 1)
|
|
|
+ rebuilt[chapter_title][section_title] = flat_sections[match_index][1]
|
|
|
+ matched_section_count += 1
|
|
|
+ else:
|
|
|
+ rebuilt[chapter_title][section_title] = self._empty_section_payload(
|
|
|
+ self._safe_page_number(subsection.get("page"), chapter_page)
|
|
|
+ )
|
|
|
+
|
|
|
+ if total_catalog_sections > 0 and matched_section_count == 0:
|
|
|
+ return chapters
|
|
|
|
|
|
- 策略:
|
|
|
- 1. 找到表格Y坐标所在的页面
|
|
|
- 2. 在该页面的所有小节中,找到表格Y坐标介于哪两个文本块之间
|
|
|
- 3. 将OCR块插入到正确位置
|
|
|
+ if matched_chapter_count == 0 and matched_section_count == 0:
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ return rebuilt or chapters
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_heading_key(text: str) -> str:
|
|
|
+ normalized = PdfStructureExtractor._strip_catalog_page_suffix((text or "").strip())
|
|
|
+ normalized = normalized.replace("【", "[").replace("】", "]")
|
|
|
+ normalized = normalized.replace("(", "(").replace(")", ")")
|
|
|
+ normalized = normalized.replace(".", ".").replace("。", ".")
|
|
|
+ normalized = re.sub(r"\s+", "", normalized)
|
|
|
+ return normalized
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _safe_page_number(value: Any, default: int = 1) -> int:
|
|
|
+ try:
|
|
|
+ return max(1, int(str(value).strip()))
|
|
|
+ except Exception:
|
|
|
+ return default
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _empty_section_payload(page_num: int) -> Dict[str, Any]:
|
|
|
+ return {
|
|
|
+ "content": "",
|
|
|
+ "page_start": page_num,
|
|
|
+ "page_end": page_num,
|
|
|
+ }
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _prepare_page_lines(cls, text: str) -> List[str]:
|
|
|
+ """清洗页面文本行,并尝试把被换行拆开的标题重新合并。"""
|
|
|
+ raw_lines = [line.strip() for line in text.split("\n") if line.strip()]
|
|
|
+ prepared_lines: List[str] = []
|
|
|
+ index = 0
|
|
|
+
|
|
|
+ while index < len(raw_lines):
|
|
|
+ merged_line, consumed = cls._merge_heading_fragment(raw_lines, index)
|
|
|
+ if merged_line:
|
|
|
+ prepared_lines.append(merged_line)
|
|
|
+ index += consumed
|
|
|
+ continue
|
|
|
+
|
|
|
+ prepared_lines.append(raw_lines[index])
|
|
|
+ index += 1
|
|
|
+
|
|
|
+ return prepared_lines
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _merge_heading_fragment(
|
|
|
+ cls,
|
|
|
+ lines: List[str],
|
|
|
+ start_index: int,
|
|
|
+ ) -> Tuple[Optional[str], int]:
|
|
|
+ """尝试把当前位置开始的 2~3 行拼成完整标题。"""
|
|
|
+ first_line = lines[start_index].strip()
|
|
|
+ if not first_line:
|
|
|
+ return None, 1
|
|
|
+
|
|
|
+ first_is_heading = bool(cls._matching_rule_names(first_line, "l1") or cls._matching_rule_names(first_line, "l2"))
|
|
|
+ first_is_incomplete = cls._is_incomplete_heading_fragment(first_line)
|
|
|
+ max_span = min(3, len(lines) - start_index)
|
|
|
+
|
|
|
+ for span in range(2, max_span + 1):
|
|
|
+ candidate_lines = [lines[start_index + offset].strip() for offset in range(span)]
|
|
|
+ candidate_text = " ".join(candidate_lines).strip()
|
|
|
+ if not candidate_text or cls.TOC_PATTERN.search(candidate_text):
|
|
|
+ continue
|
|
|
+ if not (cls._matching_rule_names(candidate_text, "l1") or cls._matching_rule_names(candidate_text, "l2")):
|
|
|
+ continue
|
|
|
+ # 只有首行本身像“半截标题”,或者合并后明显更像标题时才吞并后续行,避免误吃正文。
|
|
|
+ if first_is_incomplete or not first_is_heading:
|
|
|
+ return candidate_text, span
|
|
|
+
|
|
|
+ return None, 1
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _is_incomplete_heading_fragment(line: str) -> bool:
|
|
|
+ clean_line = re.sub(r"\s+", "", line.strip())
|
|
|
+ if not clean_line:
|
|
|
+ return False
|
|
|
+
|
|
|
+ fragment_patterns = (
|
|
|
+ r"^第(?:\d+|[一二三四五六七八九十百零两]+)[章部分篇]$",
|
|
|
+ r"^\d{1,2}(?:[\..。、])$",
|
|
|
+ r"^[【\[]\d+[\]】]$",
|
|
|
+ r"^[一二三四五六七八九十百零两]+[、)\)\]]$",
|
|
|
+ r"^第[一二三四五六七八九十百零两]+节$",
|
|
|
+ r"^\d+\.\d+(?!\.\d)\.?$",
|
|
|
+ )
|
|
|
+ return any(re.match(pattern, clean_line) for pattern in fragment_patterns)
|
|
|
+
|
|
|
+ def _rebuild_section_contents_from_catalog(
|
|
|
+ self,
|
|
|
+ chapters: Dict[str, Dict[str, Dict[str, Any]]],
|
|
|
+ catalog: Dict[str, Any],
|
|
|
+ body_lines: List[Dict[str, Any]],
|
|
|
+ ) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
|
|
+ """基于目录顺序和正文行号,重新切分 section content。
|
|
|
+
|
|
|
+ 当正文结构抽取漏掉部分标题时,直接使用结构化结果容易出现 content 缺段。
|
|
|
+ 这里把目录拍平成一条标题时间线,再在线性正文里定位这些标题,
|
|
|
+ 用“当前标题到下一个标题”之间的文本作为当前 section 的正文。
|
|
|
"""
|
|
|
- # 按页码分组OCR结果
|
|
|
- ocr_by_page: Dict[int, List[OcrResult]] = {}
|
|
|
- for result in ocr_results:
|
|
|
- if result.success:
|
|
|
- if result.page_num not in ocr_by_page:
|
|
|
- ocr_by_page[result.page_num] = []
|
|
|
- ocr_by_page[result.page_num].append(result)
|
|
|
+ catalog_chapters = catalog.get("chapters", []) if isinstance(catalog, dict) else []
|
|
|
+ if not catalog_chapters or not body_lines:
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ # 先把目录展开成顺序列表,方便统一定位每个标题在正文中的起点。
|
|
|
+ expected_items: List[Dict[str, Any]] = []
|
|
|
+ total_sections = 0
|
|
|
+ for chapter in catalog_chapters:
|
|
|
+ chapter_title = (chapter.get("title", "") or "").strip()
|
|
|
+ if not chapter_title:
|
|
|
+ continue
|
|
|
+ chapter_page = self._safe_page_number(chapter.get("page"))
|
|
|
+ expected_items.append({
|
|
|
+ "kind": "chapter",
|
|
|
+ "title": chapter_title,
|
|
|
+ "chapter_title": chapter_title,
|
|
|
+ "section_title": "章节标题",
|
|
|
+ "page_hint": chapter_page,
|
|
|
+ "line_index": None,
|
|
|
+ "page": chapter_page,
|
|
|
+ })
|
|
|
+
|
|
|
+ for subsection in chapter.get("subsections", []):
|
|
|
+ section_title = (subsection.get("title", "") or "").strip()
|
|
|
+ if not section_title:
|
|
|
+ continue
|
|
|
+ total_sections += 1
|
|
|
+ expected_items.append({
|
|
|
+ "kind": "section",
|
|
|
+ "title": section_title,
|
|
|
+ "chapter_title": chapter_title,
|
|
|
+ "section_title": section_title,
|
|
|
+ "page_hint": self._safe_page_number(subsection.get("page"), chapter_page),
|
|
|
+ "line_index": None,
|
|
|
+ "page": self._safe_page_number(subsection.get("page"), chapter_page),
|
|
|
+ })
|
|
|
+
|
|
|
+ if not expected_items or total_sections == 0:
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ search_start = 0
|
|
|
+ found_sections = 0
|
|
|
+ for item in expected_items:
|
|
|
+ line_index = self._find_heading_line_index(
|
|
|
+ body_lines,
|
|
|
+ item["title"],
|
|
|
+ item["kind"],
|
|
|
+ search_start,
|
|
|
+ )
|
|
|
+ item["line_index"] = line_index
|
|
|
+ if line_index is not None:
|
|
|
+ item["page"] = body_lines[line_index]["page"]
|
|
|
+ search_start = line_index + 1
|
|
|
+ if item["kind"] == "section":
|
|
|
+ found_sections += 1
|
|
|
+
|
|
|
+ if found_sections == 0:
|
|
|
+ return chapters
|
|
|
+
|
|
|
+ rebuilt: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
|
|
+ section_title_key = "章节标题"
|
|
|
+
|
|
|
+ for chapter in catalog_chapters:
|
|
|
+ chapter_title = (chapter.get("title", "") or "").strip()
|
|
|
+ if not chapter_title:
|
|
|
+ continue
|
|
|
+
|
|
|
+ chapter_page = self._safe_page_number(chapter.get("page"))
|
|
|
+ existing_sections = chapters.get(chapter_title, {})
|
|
|
+ rebuilt[chapter_title] = {
|
|
|
+ section_title_key: existing_sections.get(section_title_key, self._empty_section_payload(chapter_page))
|
|
|
+ }
|
|
|
+
|
|
|
+ for subsection in chapter.get("subsections", []):
|
|
|
+ section_title = (subsection.get("title", "") or "").strip()
|
|
|
+ if not section_title:
|
|
|
+ continue
|
|
|
+ rebuilt[chapter_title][section_title] = existing_sections.get(
|
|
|
+ section_title,
|
|
|
+ self._empty_section_payload(self._safe_page_number(subsection.get("page"), chapter_page)),
|
|
|
+ )
|
|
|
|
|
|
- # 处理每个包含表格的页面
|
|
|
- for page_num, ocr_list in ocr_by_page.items():
|
|
|
- # 找到该页面涉及的所有小节
|
|
|
- page_sections = []
|
|
|
- for (chap_name, sec_name), blocks in chapter_blocks.items():
|
|
|
- # 检查该小节是否包含该页面的块
|
|
|
- page_blocks = [b for b in blocks if b["page"] == page_num]
|
|
|
- if page_blocks:
|
|
|
- page_sections.append({
|
|
|
- "chapter": chap_name,
|
|
|
- "section": sec_name,
|
|
|
- "blocks": page_blocks,
|
|
|
- "all_blocks": blocks, # 引用原列表用于插入
|
|
|
- })
|
|
|
-
|
|
|
- if not page_sections:
|
|
|
- logger.warning(f"[OCR回填] 第{page_num}页没有匹配到任何小节")
|
|
|
+ for idx, item in enumerate(expected_items):
|
|
|
+ if item["kind"] != "section" or item["line_index"] is None:
|
|
|
continue
|
|
|
|
|
|
- # 处理每个OCR结果
|
|
|
- for ocr_result in sorted(ocr_list, key=lambda r: r.bbox[1]):
|
|
|
- table_y_top = ocr_result.bbox[1]
|
|
|
- table_y_bottom = ocr_result.bbox[3]
|
|
|
- ocr_text = ocr_result.text
|
|
|
-
|
|
|
- # 构造表格块
|
|
|
- table_block = {
|
|
|
- "text": ocr_text,
|
|
|
- "page": page_num,
|
|
|
- "bbox": ocr_result.bbox,
|
|
|
- "y_center": (table_y_top + table_y_bottom) / 2,
|
|
|
- "type": "table"
|
|
|
- }
|
|
|
+ # 下一个已定位标题就是当前 section 的右边界;没有下一个则取到文末。
|
|
|
+ next_heading_index = len(body_lines)
|
|
|
+ for later in expected_items[idx + 1:]:
|
|
|
+ if later["line_index"] is not None:
|
|
|
+ next_heading_index = later["line_index"]
|
|
|
+ break
|
|
|
|
|
|
- # 找到目标小节
|
|
|
- target_section = None
|
|
|
- insert_index = -1
|
|
|
+ content_entries = body_lines[item["line_index"] + 1:next_heading_index]
|
|
|
+ content_text = "\n".join(entry["text"] for entry in content_entries).strip()
|
|
|
+ existing_payload = rebuilt[item["chapter_title"]].get(item["section_title"], {})
|
|
|
|
|
|
- for ps in page_sections:
|
|
|
- # 获取该小节在该页面的所有块,按Y坐标排序
|
|
|
- page_blocks = sorted(ps["blocks"], key=lambda b: b["bbox"][1])
|
|
|
+ if not content_text and (existing_payload.get("content") or "").strip():
|
|
|
+ continue
|
|
|
|
|
|
- if not page_blocks:
|
|
|
- continue
|
|
|
+ if content_entries:
|
|
|
+ page_start = content_entries[0]["page"]
|
|
|
+ page_end = content_entries[-1]["page"]
|
|
|
+ else:
|
|
|
+ page_start = item["page"]
|
|
|
+ page_end = item["page"]
|
|
|
|
|
|
- # 找到表格应该插入的位置
|
|
|
- # 策略:表格上边界位于哪个块之后
|
|
|
- found = False
|
|
|
- for i, block in enumerate(page_blocks):
|
|
|
- block_y_bottom = block["bbox"][3]
|
|
|
- if i < len(page_blocks) - 1:
|
|
|
- next_y_top = page_blocks[i + 1]["bbox"][1]
|
|
|
- else:
|
|
|
- next_y_top = float('inf')
|
|
|
-
|
|
|
- # 如果表格位于当前块之后,且在下一块之前
|
|
|
- if block_y_bottom <= table_y_top < next_y_top:
|
|
|
- # 找到在原列表中的位置
|
|
|
- try:
|
|
|
- insert_index = ps["all_blocks"].index(block) + 1
|
|
|
- target_section = ps
|
|
|
- found = True
|
|
|
- break
|
|
|
- except ValueError:
|
|
|
- continue
|
|
|
-
|
|
|
- # 如果表格在所有块之前
|
|
|
- if not found and table_y_top < page_blocks[0]["bbox"][1]:
|
|
|
- try:
|
|
|
- insert_index = ps["all_blocks"].index(page_blocks[0])
|
|
|
- target_section = ps
|
|
|
- found = True
|
|
|
- except ValueError:
|
|
|
- continue
|
|
|
+ rebuilt[item["chapter_title"]][item["section_title"]] = {
|
|
|
+ "content": content_text,
|
|
|
+ "page_start": page_start,
|
|
|
+ "page_end": page_end,
|
|
|
+ }
|
|
|
|
|
|
- # 如果表格在所有块之后
|
|
|
- if not found and table_y_bottom > page_blocks[-1]["bbox"][3]:
|
|
|
- try:
|
|
|
- insert_index = ps["all_blocks"].index(page_blocks[-1]) + 1
|
|
|
- target_section = ps
|
|
|
- found = True
|
|
|
- except ValueError:
|
|
|
- continue
|
|
|
+ return rebuilt or chapters
|
|
|
|
|
|
- if found:
|
|
|
- break
|
|
|
+ def _find_heading_line_index(
|
|
|
+ self,
|
|
|
+ body_lines: List[Dict[str, Any]],
|
|
|
+ target_title: str,
|
|
|
+ heading_kind: str,
|
|
|
+ start_index: int,
|
|
|
+ ) -> Optional[int]:
|
|
|
+ """在线性正文中查找目标标题行。
|
|
|
+
|
|
|
+ 先做归一化后的精确匹配;若 OCR / PDF 抽取给标题前面带了噪声前缀,
|
|
|
+ 再退一步做“候选行后缀等于目标标题”的宽松匹配。
|
|
|
+ """
|
|
|
+ target_key = self._normalize_heading_key(target_title)
|
|
|
+ if not target_key:
|
|
|
+ return None
|
|
|
|
|
|
- # 执行插入
|
|
|
- if target_section and insert_index >= 0:
|
|
|
- target_section["all_blocks"].insert(insert_index, table_block)
|
|
|
- logger.debug(
|
|
|
- f"[OCR回填] 第{page_num}页表格(Y={table_y_top:.0f}) -> "
|
|
|
- f"{target_section['chapter']}/{target_section['section']} 位置{insert_index}"
|
|
|
- )
|
|
|
+ for index in range(start_index, len(body_lines)):
|
|
|
+ candidate_text = (body_lines[index].get("text") or "").strip()
|
|
|
+ if not candidate_text or self.TOC_PATTERN.search(candidate_text):
|
|
|
+ continue
|
|
|
+
|
|
|
+ if heading_kind == "chapter":
|
|
|
+ candidate_key = self._normalize_heading_key(self._clean_chapter_title(candidate_text))
|
|
|
+ else:
|
|
|
+ candidate_key = self._normalize_heading_key(self._clean_section_title(candidate_text))
|
|
|
+
|
|
|
+ if candidate_key == target_key:
|
|
|
+ return index
|
|
|
+
|
|
|
+ raw_candidate_key = self._normalize_heading_key(candidate_text)
|
|
|
+ # 某些 PDF 会把页码、序号或残余字符拼到标题前面,这里允许有限前缀噪声。
|
|
|
+ if raw_candidate_key.endswith(target_key):
|
|
|
+ prefix = raw_candidate_key[:-len(target_key)]
|
|
|
+ if not prefix or re.fullmatch(
|
|
|
+ r"[\dA-Za-z\.\-_/|,:;()\[\]\u3001\u3002\uff0c\uff1a\uff1b\uff08\uff09\u3010\u3011]+",
|
|
|
+ prefix,
|
|
|
+ ):
|
|
|
+ return index
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _process_ocr_concurrent(self, regions: List[TableRegion], progress_callback=None) -> List[OcrResult]:
|
|
|
+ """同步并发处理 OCR,具体实现委托给 OcrProcessor。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return []
|
|
|
+
|
|
|
+ if not progress_callback:
|
|
|
+ return self.ocr_processor.process_ocr_concurrent(regions)
|
|
|
+
|
|
|
+ def _progress_adapter(completed: int, total: int):
|
|
|
+ progress = 35 + int(completed / total * 15) if total else 50
|
|
|
+ progress_callback("版面分析", progress, f"OCR识别中 {completed}/{total}")
|
|
|
+
|
|
|
+ return self.ocr_processor.process_ocr_concurrent(
|
|
|
+ regions,
|
|
|
+ progress_callback=_progress_adapter,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _detect_table_regions(
|
|
|
+ self,
|
|
|
+ page: fitz.Page,
|
|
|
+ page_num: int,
|
|
|
+ clip_box: fitz.Rect
|
|
|
+ ) -> List[Tuple[Tuple[float, float, float, float], float]]:
|
|
|
+ """检测页面中的表格区域,具体实现委托给 OcrProcessor。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return []
|
|
|
+ return self.ocr_processor.detect_table_regions(page, page_num, clip_box)
|
|
|
+
|
|
|
+ def _ocr_table_region(self, page: fitz.Page, bbox: Tuple[float, float, float, float], max_retries: int = 3) -> str:
|
|
|
+ """对指定区域进行 OCR 识别,具体实现委托给 OcrProcessor。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ raise RuntimeError("OCR processor is not initialized")
|
|
|
+ return self.ocr_processor._ocr_table_region(page, bbox, max_retries=max_retries)
|
|
|
+
|
|
|
+ def _replace_table_regions(
|
|
|
+ self,
|
|
|
+ page: fitz.Page,
|
|
|
+ original_text: str,
|
|
|
+ ocr_results: List[Dict],
|
|
|
+ clip_box: fitz.Rect
|
|
|
+ ) -> str:
|
|
|
+ """用 OCR 结果替换原始文本中的表格区域。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return original_text
|
|
|
+ if not ocr_results:
|
|
|
+ return original_text
|
|
|
+
|
|
|
+ # 这里保留章节提取场景的兼容逻辑:
|
|
|
+ # 1. 标题块不参与表格替换,避免目录/章节标题被表格框误吞;
|
|
|
+ # 2. 仅替换真正落入表格区域的正文块,保留表格前后的普通文本;
|
|
|
+ # 3. OCR 返回空时退回原始 PDF 文本,避免整块内容被清空。
|
|
|
+ text_blocks = []
|
|
|
+ for block in page.get_text("blocks"):
|
|
|
+ x0, y0, x1, y1, text, _, _ = block
|
|
|
+ if y0 >= clip_box.y0 and y1 <= clip_box.y1:
|
|
|
+ text_blocks.append({
|
|
|
+ "bbox": (x0, y0, x1, y1),
|
|
|
+ "text": text.strip(),
|
|
|
+ })
|
|
|
+
|
|
|
+ text_blocks.sort(key=lambda b: (b["bbox"][1], b["bbox"][0]))
|
|
|
+
|
|
|
+ if not text_blocks:
|
|
|
+ return original_text
|
|
|
+
|
|
|
+ region_entries: List[Dict[str, Any]] = []
|
|
|
+ replaced_indices: Set[int] = set()
|
|
|
+
|
|
|
+ for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
|
|
|
+ rx0, ry0, rx1, ry1 = ocr_result["bbox"]
|
|
|
+ current_indices: List[int] = []
|
|
|
+
|
|
|
+ for idx, block in enumerate(text_blocks):
|
|
|
+ if idx in replaced_indices:
|
|
|
+ continue
|
|
|
+ if self._block_contains_heading(block["text"]):
|
|
|
+ continue
|
|
|
+
|
|
|
+ bx0, by0, bx1, by1 = block["bbox"]
|
|
|
+ overlap_x = max(0, min(bx1, rx1) - max(bx0, rx0))
|
|
|
+ overlap_y = max(0, min(by1, ry1) - max(by0, ry0))
|
|
|
+ overlap_area = overlap_x * overlap_y
|
|
|
+ block_area = max((bx1 - bx0) * (by1 - by0), 1)
|
|
|
+
|
|
|
+ if overlap_area / block_area > 0.5:
|
|
|
+ current_indices.append(idx)
|
|
|
+
|
|
|
+ if not current_indices:
|
|
|
+ continue
|
|
|
+
|
|
|
+ replaced_indices.update(current_indices)
|
|
|
+ region_entries.append({
|
|
|
+ "start": min(current_indices),
|
|
|
+ "end": max(current_indices),
|
|
|
+ "ocr_text": (ocr_result.get("ocr_text") or "").strip(),
|
|
|
+ })
|
|
|
+
|
|
|
+ if not region_entries:
|
|
|
+ return original_text
|
|
|
+
|
|
|
+ region_by_start = {entry["start"]: entry for entry in region_entries}
|
|
|
+ result_parts: List[str] = []
|
|
|
+ idx = 0
|
|
|
+
|
|
|
+ while idx < len(text_blocks):
|
|
|
+ region = region_by_start.get(idx)
|
|
|
+ if region is not None:
|
|
|
+ if region["ocr_text"]:
|
|
|
+ result_parts.append(region["ocr_text"])
|
|
|
+ result_parts.append("\n")
|
|
|
else:
|
|
|
- # 兜底:追加到该页面第一个小节末尾
|
|
|
- if page_sections:
|
|
|
- ps = page_sections[0]
|
|
|
- ps["all_blocks"].append(table_block)
|
|
|
- logger.warning(
|
|
|
- f"[OCR回填] 第{page_num}页表格无法精确定位,追加到 {ps['chapter']}/{ps['section']}"
|
|
|
- )
|
|
|
+ for block_idx in range(region["start"], region["end"] + 1):
|
|
|
+ block_text = text_blocks[block_idx]["text"]
|
|
|
+ if block_text:
|
|
|
+ result_parts.append(block_text)
|
|
|
+ result_parts.append("\n")
|
|
|
+ idx = region["end"] + 1
|
|
|
+ continue
|
|
|
+
|
|
|
+ if idx not in replaced_indices:
|
|
|
+ block_text = text_blocks[idx]["text"]
|
|
|
+ if block_text:
|
|
|
+ result_parts.append(block_text)
|
|
|
+ result_parts.append("\n")
|
|
|
+ idx += 1
|
|
|
+
|
|
|
+ return "".join(result_parts).strip() or original_text
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _block_contains_heading(cls, text: str) -> bool:
|
|
|
+ if not text or not text.strip():
|
|
|
+ return False
|
|
|
+
|
|
|
+ for line in cls._prepare_page_lines(text):
|
|
|
+ stripped = line.strip()
|
|
|
+ if not stripped:
|
|
|
+ continue
|
|
|
+ if cls._matching_rule_names(stripped, "l1") or cls._matching_rule_names(stripped, "l2"):
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _compress_image(self, img_bytes: bytes) -> bytes:
|
|
|
+ """压缩图片,具体实现委托给 OcrProcessor。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return img_bytes
|
|
|
+ return self.ocr_processor._compress_image(img_bytes)
|
|
|
+
|
|
|
+ def _extract_ocr_content(self, result: Dict) -> str:
|
|
|
+ """从 OCR 响应提取内容,具体实现委托给 OcrProcessor。"""
|
|
|
+ if self.ocr_processor is None:
|
|
|
+ return ""
|
|
|
+ return self.ocr_processor._extract_ocr_content(result)
|
|
|
|
|
|
@staticmethod
|
|
|
def _is_header_footer(line: str) -> bool:
|
|
|
+ compact_line = re.sub(r"\s+", "", line.strip())
|
|
|
+ if not compact_line:
|
|
|
+ return False
|
|
|
+
|
|
|
+ heading_prefix = re.match(
|
|
|
+ r"^(第[\d一二三四五六七八九十百零两]+[章节部分篇]|[\d]+\.\d+|[\d]+[\..。、]?|[一二三四五六七八九十百零两]+[、)\)\]]|第[一二三四五六七八九十百零两]+节|【\d+】)",
|
|
|
+ compact_line,
|
|
|
+ )
|
|
|
+
|
|
|
+ if compact_line.isdigit():
|
|
|
+ return True
|
|
|
+
|
|
|
+ if (
|
|
|
+ compact_line.endswith("有限责任公司")
|
|
|
+ or compact_line.endswith("有限公司")
|
|
|
+ or compact_line.endswith("股份有限公司")
|
|
|
+ ) and not heading_prefix:
|
|
|
+ return True
|
|
|
+
|
|
|
+ if compact_line.endswith("专项施工方案") and not heading_prefix:
|
|
|
+ return True
|
|
|
+
|
|
|
return (
|
|
|
"四川路桥建设集团股份有限公司" in line
|
|
|
or "T梁运输及安装专项施工方案" in line
|
|
|
- or line.isdigit()
|
|
|
+ or (
|
|
|
+ compact_line.endswith("工程项目")
|
|
|
+ and len(compact_line) >= 8
|
|
|
+ and not compact_line.startswith("第")
|
|
|
+ )
|
|
|
)
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def _matching_rule_names(
|
|
|
+ cls,
|
|
|
+ line: str,
|
|
|
+ level: str,
|
|
|
+ rule_names: Optional[List[str]] = None,
|
|
|
+ ) -> List[str]:
|
|
|
+ clean_line = line.strip()
|
|
|
+ if level == "l1":
|
|
|
+ clean_line = cls._strip_leading_page_number_from_cn_chapter(clean_line)
|
|
|
+ names = rule_names or list(cls.RULE_LIB.keys())
|
|
|
+ return [
|
|
|
+ rule_name
|
|
|
+ for rule_name in names
|
|
|
+ if cls.RULE_LIB[rule_name][level].match(clean_line)
|
|
|
+ ]
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _matches_chapter_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
|
|
|
+ return bool(cls._matching_rule_names(line, "l1", rule_names))
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _matches_section_heading(cls, line: str, rule_names: Optional[List[str]] = None) -> bool:
|
|
|
+ return bool(cls._matching_rule_names(line, "l2", rule_names))
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _strip_leading_page_number_from_cn_chapter(line: str) -> str:
|
|
|
+ cleaned = re.sub(r"\s+", " ", line.strip())
|
|
|
+ if not cleaned:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ return re.sub(
|
|
|
+ r"^\d{1,3}\s+(?=第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部分篇])",
|
|
|
+ "",
|
|
|
+ cleaned,
|
|
|
+ count=1,
|
|
|
+ ).strip()
|
|
|
+
|
|
|
@staticmethod
|
|
|
def _clean_chapter_title(line: str) -> str:
|
|
|
- chapter_match = re.search(r"第[一二三四五六七八九十百]+章", line)
|
|
|
- if not chapter_match:
|
|
|
- return line.strip()
|
|
|
-
|
|
|
- prefix = chapter_match.group(0)
|
|
|
- remaining = line[chapter_match.end() :].strip()
|
|
|
- remaining = re.sub(r"^[\.\s]+", "", remaining)
|
|
|
- remaining = re.sub(r"\s+\d+\s*$", "", remaining)
|
|
|
- remaining = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*", "", remaining)
|
|
|
-
|
|
|
- if remaining:
|
|
|
- return f"{prefix} {remaining}"
|
|
|
- return prefix
|
|
|
+ cleaned = PdfStructureExtractor._strip_leading_page_number_from_cn_chapter(line)
|
|
|
+ cleaned = PdfStructureExtractor._strip_catalog_page_suffix(cleaned)
|
|
|
+ cleaned = re.sub(r"\s+\d+\s*$", "", cleaned)
|
|
|
+ cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
|
|
|
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
|
|
|
+
|
|
|
+ cn_chapter_match = re.match(
|
|
|
+ r"^(第\s*(?:\d+|[一二三四五六七八九十百零两]+)\s*[章部分篇])[\s、::\.-]*(.*)$",
|
|
|
+ cleaned,
|
|
|
+ )
|
|
|
+ if cn_chapter_match:
|
|
|
+ prefix = re.sub(r"\s+", "", cn_chapter_match.group(1))
|
|
|
+ title = cn_chapter_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ num_chapter_match = re.match(r"^(\d{1,2})(?:[\..。、])?\s*(.*)$", cleaned)
|
|
|
+ if num_chapter_match:
|
|
|
+ prefix = num_chapter_match.group(1)
|
|
|
+ title = num_chapter_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ return cleaned
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _clean_section_title(line: str) -> str:
|
|
|
+ cleaned = line.strip()
|
|
|
+ cleaned = PdfStructureExtractor._strip_catalog_page_suffix(cleaned)
|
|
|
+ cleaned = re.sub(r"\s+\d+\s*$", "", cleaned)
|
|
|
+ cleaned = re.sub(r"[\._\-]{3,}[^\u4e00-\u9fa5a-zA-Z0-9]*$", "", cleaned)
|
|
|
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
|
|
|
+
|
|
|
+ numeric_section_match = re.match(r"^(\d+\.\d+)(?!\.\d)\.?\s*(.*)$", cleaned)
|
|
|
+ if numeric_section_match:
|
|
|
+ prefix = numeric_section_match.group(1)
|
|
|
+ title = numeric_section_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ numeric_list_match = re.match(r"^(\d{1,2})(?:[、\.\uFF0E\u3002\)\]\uFF09])\s*(.*)$", cleaned)
|
|
|
+ if numeric_list_match:
|
|
|
+ prefix = numeric_list_match.group(1)
|
|
|
+ title = numeric_list_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ cn_section_match = re.match(r"^(第\s*[一二三四五六七八九十百零两]+\s*节)[\s、::\.-]*(.*)$", cleaned)
|
|
|
+ if cn_section_match:
|
|
|
+ prefix = re.sub(r"\s+", "", cn_section_match.group(1))
|
|
|
+ title = cn_section_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ cn_list_match = re.match(r"^([一二三四五六七八九十百零两]+[、)\)\]])[\s]*(.*)$", cleaned)
|
|
|
+ if cn_list_match:
|
|
|
+ prefix = cn_list_match.group(1).strip()
|
|
|
+ title = cn_list_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ bracket_match = re.match(r"^([【\[]\s*\d+\s*[\]】])[\s]*(.*)$", cleaned)
|
|
|
+ if bracket_match:
|
|
|
+ prefix = re.sub(r"\s+", "", bracket_match.group(1))
|
|
|
+ title = bracket_match.group(2).strip()
|
|
|
+ return f"{prefix} {title}".strip()
|
|
|
+
|
|
|
+ return cleaned
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _strip_catalog_page_suffix(text: str) -> str:
|
|
|
+ cleaned = re.sub(r"\s+", " ", (text or "").strip())
|
|
|
+ if not cleaned:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ return re.sub(
|
|
|
+ r"(?:[.\u2026\u00b7\u2022·• ]{2,})[-\u2013\u2014 ]*\d+\s*[-\u2013\u2014 ]*$",
|
|
|
+ "",
|
|
|
+ cleaned,
|
|
|
+ ).strip()
|