|
@@ -0,0 +1,549 @@
|
|
|
|
|
+"""
|
|
|
|
|
+YOLO 目录页检测与 OCR 提取模块
|
|
|
|
|
+
|
|
|
|
|
+用于在文档处理流程早期检测目录页并提取目录内容,
|
|
|
|
|
+输出结构与 outline 保持一致,便于后续进行目录完整性检查。
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import io
|
|
|
|
|
+import os
|
|
|
|
|
+import re
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
|
+from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+
|
|
|
|
|
+import fitz
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+
|
|
|
|
|
+from foundation.observability.logger.loggering import review_logger as logger
|
|
|
|
|
+
|
|
|
|
|
# Optional dependency: ultralytics supplies the YOLO detector used for
# TOC-page detection. Detection is skipped entirely when it is missing.
try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except ImportError:
    YOLO_AVAILABLE = False

# Optional dependency: Pillow converts rendered PDF pages into images
# that can be fed to the YOLO model as numpy arrays.
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@dataclass
class CatalogItem:
    """A single table-of-contents entry (chapter or section).

    NOTE(review): the parser methods in this module build plain dicts rather
    than this dataclass; it appears unused here — confirm whether other
    modules import it before removing.
    """
    index: int            # chapter ordinal, 1-based
    title: str            # chapter/section title text
    page: str             # page number as it appeared in the TOC (kept as a string)
    original: str         # raw TOC line this entry was parsed from
    level: int = 1        # hierarchy level: 1 = chapter, 2 = section
    parent_title: str = ""  # owning chapter title, only meaningful for level 2
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@dataclass
class CatalogSection:
    """A second-level TOC entry (a section inside a chapter).

    NOTE(review): like CatalogItem, this dataclass is not referenced by the
    dict-building parser below — verify external usage before removal.
    """
    title: str     # section title
    page: str      # page number as printed in the TOC
    level: int     # hierarchy level (expected to be 2)
    original: str  # raw TOC line
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@dataclass
class CatalogChapter:
    """A first-level TOC entry (a chapter) together with its sections.

    NOTE(review): not referenced by the dict-building parser below — verify
    external usage before removal.
    """
    index: int                          # chapter ordinal, 1-based
    title: str                          # chapter title
    page: str                           # page number as printed in the TOC
    original: str                       # raw TOC line
    subsections: List[CatalogSection]   # level-2 entries belonging to this chapter
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class TOCCatalogExtractor:
    """
    Table-of-contents page detector and content extractor.

    A YOLO model locates TOC pages near the front of a PDF, GLM-OCR reads
    their text, and the text is parsed into structured data whose shape
    matches the document outline ({"chapters": [...], "total_chapters": N}),
    so downstream TOC-completeness checks can consume either source.
    """

    # --- YOLO detection settings ---
    DEFAULT_MODEL_PATH = "config/yolo/best.pt"  # weights used when no path is supplied
    CONF_THRESHOLD = 0.25   # minimum detection confidence passed to the model
    MAX_CHECK_PAGES = 50    # only the first N pages are scanned for a TOC
    DPI = 150               # render resolution for detection (speed over fidelity)

    # --- OCR settings ---
    OCR_DPI = 200           # render resolution for OCR (higher than detection DPI)
    MAX_SHORT_EDGE = 1024   # images are downscaled so the short edge fits this
    JPEG_QUALITY = 90       # JPEG quality used when uploading pages to the OCR API
|
|
|
|
|
+
|
|
|
|
|
    def __init__(
        self,
        model_path: Optional[str] = None,
        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
        ocr_api_key: str = "",
        ocr_timeout: int = 600,
    ):
        """Configure the extractor; no model loading or I/O happens here.

        Args:
            model_path: Path to the YOLO weights file. Falls back to
                DEFAULT_MODEL_PATH when None or empty.
            ocr_api_url: GLM-OCR chat-completions endpoint.
            ocr_api_key: Bearer token for the OCR service; empty means
                no Authorization header is sent.
            ocr_timeout: Per-request OCR timeout in seconds.
        """
        self.model_path = model_path or self.DEFAULT_MODEL_PATH
        self.ocr_api_url = ocr_api_url
        self.ocr_api_key = ocr_api_key
        self.ocr_timeout = ocr_timeout

        # The YOLO model is loaded lazily on first use by _load_model().
        self._model = None
        # Detection needs both ultralytics (YOLO) and Pillow (page rendering).
        self._yolo_available = YOLO_AVAILABLE and PIL_AVAILABLE
|
|
|
|
|
+
|
|
|
|
|
+ def _load_model(self) -> bool:
|
|
|
|
|
+ """加载 YOLO 模型"""
|
|
|
|
|
+ if not self._yolo_available:
|
|
|
|
|
+ logger.debug("[TOC检测] YOLO库未安装,跳过目录检测")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ if not os.path.exists(self.model_path):
|
|
|
|
|
+ logger.debug(f"[TOC检测] 模型文件不存在: {self.model_path}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ if self._model is None:
|
|
|
|
|
+ try:
|
|
|
|
|
+ logger.info(f"[TOC检测] 正在加载YOLO模型: {self.model_path}")
|
|
|
|
|
+ self._model = YOLO(self.model_path)
|
|
|
|
|
+ return True
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"[TOC检测] 模型加载失败: {e}")
|
|
|
|
|
+ return False
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ def detect_and_extract(
|
|
|
|
|
+ self,
|
|
|
|
|
+ file_content: bytes,
|
|
|
|
|
+ progress_callback=None
|
|
|
|
|
+ ) -> Optional[Dict[str, Any]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 检测目录页并提取目录内容
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ file_content: PDF文件字节流
|
|
|
|
|
+ progress_callback: 进度回调函数
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 目录结构字典,格式与 outline 保持一致:
|
|
|
|
|
+ {
|
|
|
|
|
+ "chapters": [...],
|
|
|
|
|
+ "total_chapters": N
|
|
|
|
|
+ }
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self._load_model():
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ doc = fitz.open(stream=file_content)
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 1. 检测目录页范围
|
|
|
|
|
+ toc_pages = self._detect_toc_pages(doc, progress_callback)
|
|
|
|
|
+ if not toc_pages:
|
|
|
|
|
+ logger.info("[TOC检测] 未检测到目录页")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"[TOC检测] 检测到目录页: 第{toc_pages[0]+1}页 - 第{toc_pages[-1]+1}页")
|
|
|
|
|
+
|
|
|
|
|
+ # 2. OCR 提取目录页内容
|
|
|
|
|
+ if progress_callback:
|
|
|
|
|
+ progress_callback("目录识别", 10, f"检测到{len(toc_pages)}页目录,开始OCR识别...")
|
|
|
|
|
+
|
|
|
|
|
+ toc_text = self._ocr_toc_pages(doc, toc_pages, progress_callback)
|
|
|
|
|
+
|
|
|
|
|
+ if not toc_text:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 3. 解析目录文本为结构化数据
|
|
|
|
|
+ if progress_callback:
|
|
|
|
|
+ progress_callback("目录识别", 80, "解析目录结构...")
|
|
|
|
|
+
|
|
|
|
|
+ catalog = self._parse_toc_text(toc_text)
|
|
|
|
|
+
|
|
|
|
|
+ if progress_callback:
|
|
|
|
|
+ progress_callback("目录识别", 100, f"目录提取完成,共{catalog['total_chapters']}章")
|
|
|
|
|
+
|
|
|
|
|
+ return catalog
|
|
|
|
|
+
|
|
|
|
|
+ finally:
|
|
|
|
|
+ doc.close()
|
|
|
|
|
+
|
|
|
|
|
+ def _detect_toc_pages(
|
|
|
|
|
+ self,
|
|
|
|
|
+ doc: fitz.Document,
|
|
|
|
|
+ progress_callback=None
|
|
|
|
|
+ ) -> List[int]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 使用 YOLO 检测目录页范围
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 目录页索引列表(0-based)
|
|
|
|
|
+ """
|
|
|
|
|
+ toc_pages = []
|
|
|
|
|
+ total_pages = len(doc)
|
|
|
|
|
+ pages_to_check = min(total_pages, self.MAX_CHECK_PAGES)
|
|
|
|
|
+
|
|
|
|
|
+ for page_idx in range(pages_to_check):
|
|
|
|
|
+ page = doc.load_page(page_idx)
|
|
|
|
|
+
|
|
|
|
|
+ # 渲染页面
|
|
|
|
|
+ zoom = self.DPI / 72
|
|
|
|
|
+ mat = fitz.Matrix(zoom, zoom)
|
|
|
|
|
+ pix = page.get_pixmap(matrix=mat)
|
|
|
|
|
+
|
|
|
|
|
+ # 转换为 numpy 数组
|
|
|
|
|
+ img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
|
|
|
|
|
+ img_array = np.array(img)
|
|
|
|
|
+
|
|
|
|
|
+ # YOLO 检测
|
|
|
|
|
+ results = self._model(img_array, conf=self.CONF_THRESHOLD, verbose=False)
|
|
|
|
|
+
|
|
|
|
|
+ # 检查是否检测到 catalogs 类别
|
|
|
|
|
+ has_catalogs = False
|
|
|
|
|
+ for result in results:
|
|
|
|
|
+ if result.boxes is not None:
|
|
|
|
|
+ for box in result.boxes:
|
|
|
|
|
+ cls_id = int(box.cls.item())
|
|
|
|
|
+ class_name = self._model.names.get(cls_id, f"class_{cls_id}")
|
|
|
|
|
+ if class_name == 'catalogs':
|
|
|
|
|
+ has_catalogs = True
|
|
|
|
|
+ break
|
|
|
|
|
+ if has_catalogs:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if has_catalogs:
|
|
|
|
|
+ toc_pages.append(page_idx)
|
|
|
|
|
+ logger.debug(f" 第{page_idx + 1:3d}页: 检测到目录")
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.debug(f" 第{page_idx + 1:3d}页: 未检测到目录")
|
|
|
|
|
+ # 如果已经检测到目录,且现在没有检测到,认为目录结束
|
|
|
|
|
+ if toc_pages:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if progress_callback and (page_idx + 1) % 5 == 0:
|
|
|
|
|
+ progress = int((page_idx + 1) / pages_to_check * 10)
|
|
|
|
|
+ progress_callback("目录识别", progress, f"扫描页面 {page_idx + 1}/{pages_to_check}")
|
|
|
|
|
+
|
|
|
|
|
+ return toc_pages
|
|
|
|
|
+
|
|
|
|
|
    def _ocr_toc_pages(
        self,
        doc: fitz.Document,
        toc_pages: List[int],
        progress_callback=None
    ) -> str:
        """OCR the given TOC pages via the GLM-OCR chat-completions API.

        Each page is rendered at OCR_DPI, compressed to a bounded-size JPEG,
        and sent as a base64 data URL. Requests are retried up to 3 times
        with exponential backoff on exceptions; pages that ultimately fail
        are skipped (best-effort).

        Args:
            doc: Open PyMuPDF document.
            toc_pages: 0-based page indices to OCR.
            progress_callback: Optional callable(stage, percent, message).

        Returns:
            The recognized page texts joined with newlines ("" when every
            page failed).
        """
        import base64
        import io  # NOTE(review): shadows the module-level io import; harmless here
        import requests
        import time

        all_texts = []
        total = len(toc_pages)

        for idx, page_idx in enumerate(toc_pages):
            page = doc.load_page(page_idx)

            try:
                # Render the page at OCR resolution and encode as JPEG.
                pix = page.get_pixmap(dpi=self.OCR_DPI)
                img_bytes = pix.tobytes("jpeg")

                # Downscale/recompress to keep the upload small.
                compressed = self._compress_image(img_bytes)
                img_base64 = base64.b64encode(compressed).decode('utf-8')

                # Build the OCR request (OpenAI-compatible vision payload).
                # NOTE(review): the adjacent string literals below concatenate
                # into one line WITHOUT separators between the numbered
                # instructions — confirm the prompt is intended that way.
                payload = {
                    "model": "GLM-OCR",
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "这是一份施工方案文档的目录页。请识别并提取目录内容,按原文格式输出。"
                                    "注意:"
                                    "1. 保留章节层级关系(章/节)"
                                    "2. 保留页码信息"
                                    "3. 只输出目录内容,不要其他说明"
                                },
                                {
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
                                }
                            ]
                        }
                    ],
                    "max_tokens": 4096,
                    "temperature": 0.1
                }

                headers = {"Content-Type": "application/json"}
                if self.ocr_api_key:
                    headers["Authorization"] = f"Bearer {self.ocr_api_key}"

                # Retry loop: exponential backoff (2s, 4s) on exceptions only.
                # NOTE(review): a successful HTTP response with empty content
                # falls through and re-posts immediately without sleeping —
                # confirm that is the intended behavior.
                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        response = requests.post(
                            self.ocr_api_url,
                            headers=headers,
                            json=payload,
                            timeout=self.ocr_timeout
                        )
                        response.raise_for_status()
                        result = response.json()

                        content = ""
                        if "choices" in result and result["choices"]:
                            content = result["choices"][0].get("message", {}).get("content", "")

                        if content:
                            all_texts.append(content)
                            logger.debug(f" 第{page_idx + 1}页目录OCR成功")
                            break

                    except Exception as e:
                        if attempt < max_retries - 1:
                            wait_time = 2 ** (attempt + 1)
                            logger.warning(f" 第{page_idx + 1}页目录OCR失败,{wait_time}秒后重试...")
                            time.sleep(wait_time)
                        else:
                            # Out of retries: log and move on to the next page.
                            logger.error(f" 第{page_idx + 1}页目录OCR最终失败: {e}")

                # OCR spans the 10%-70% range of the overall progress bar.
                if progress_callback:
                    progress = 10 + int((idx + 1) / total * 60)
                    progress_callback("目录识别", progress, f"OCR识别中 {idx + 1}/{total}")

            except Exception as e:
                # Rendering/compression failures are per-page and non-fatal.
                logger.error(f" 第{page_idx + 1}页OCR处理出错: {e}")

        return "\n".join(all_texts)
|
|
|
|
|
+
|
|
|
|
|
+ def _compress_image(self, img_bytes: bytes) -> bytes:
|
|
|
|
|
+ """压缩图片"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ from PIL import Image
|
|
|
|
|
+ img = Image.open(io.BytesIO(img_bytes))
|
|
|
|
|
+
|
|
|
|
|
+ if img.mode in ('RGBA', 'LA', 'P'):
|
|
|
|
|
+ background = Image.new('RGB', img.size, (255, 255, 255))
|
|
|
|
|
+ if img.mode == 'P':
|
|
|
|
|
+ img = img.convert('RGBA')
|
|
|
|
|
+ if img.mode in ('RGBA', 'LA'):
|
|
|
|
|
+ background.paste(img, mask=img.split()[-1])
|
|
|
|
|
+ img = background
|
|
|
|
|
+ elif img.mode != 'RGB':
|
|
|
|
|
+ img = img.convert('RGB')
|
|
|
|
|
+
|
|
|
|
|
+ min_edge = min(img.size)
|
|
|
|
|
+ if min_edge > self.MAX_SHORT_EDGE:
|
|
|
|
|
+ ratio = self.MAX_SHORT_EDGE / min_edge
|
|
|
|
|
+ new_size = (int(img.width * ratio), int(img.height * ratio))
|
|
|
|
|
+ img = img.resize(new_size, Image.Resampling.LANCZOS)
|
|
|
|
|
+
|
|
|
|
|
+ buffer = io.BytesIO()
|
|
|
|
|
+ img.save(buffer, format='JPEG', quality=self.JPEG_QUALITY, optimize=True)
|
|
|
|
|
+ return buffer.getvalue()
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"[TOC检测] 图片压缩失败,使用原图: {e}")
|
|
|
|
|
+ return img_bytes
|
|
|
|
|
+
|
|
|
|
|
    def _parse_toc_text(self, text: str) -> Dict[str, Any]:
        """Parse OCR'd TOC text into an outline-compatible structure.

        Recognized line formats:
          - "第一章 XXX...................1"  (chapter, level 1)
          - "一、XXX......................2"  (section, level 2)
          - "1. XXX ......................3"  (generic numbered line;
            classified as chapter or section by keyword heuristics)

        Args:
            text: The concatenated OCR output of all TOC pages.

        Returns:
            {"chapters": [...], "total_chapters": N}, where each chapter dict
            has index/title/page/original/subsections keys.
        """
        lines = text.strip().split('\n')
        chapters = []
        current_chapter = None

        # Line-classification patterns. All three anchor the trailing page number.
        # NOTE(review): re.IGNORECASE has no effect on these Chinese patterns.
        chapter_pattern = re.compile(
            r'第\s*([一二三四五六七八九十百0-9]+)\s*章\s*[\s\.]*(.+?)\s*[\.\s]*(\d+)\s*$',
            re.IGNORECASE
        )
        section_pattern = re.compile(
            r'([一二三四五六七八九十]+)\s*[、\.\s]+\s*(.+?)\s*[\.\s]*(\d+)\s*$'
        )
        generic_pattern = re.compile(
            r'([0-9]+)[\.\s]+(.+?)\s*[\.\s]+(\d+)\s*$'
        )

        for line in lines:
            line = line.strip()
            if not line or len(line) < 3:
                continue

            # Strip Markdown table pipes the OCR model sometimes emits.
            line = re.sub(r'^[\|\s]+|[\|\s]+$', '', line)
            line = line.replace('|', ' ')

            # 1) Chapter line ("第X章 ...").
            chapter_match = chapter_pattern.search(line)
            if chapter_match:
                chapter_num = chapter_match.group(1)
                title = chapter_match.group(2).strip()
                page = chapter_match.group(3).strip()

                # Flush the previous chapter before starting a new one.
                if current_chapter:
                    chapters.append(current_chapter)

                current_chapter = {
                    "index": self._chinese_to_number(chapter_num) if not chapter_num.isdigit() else int(chapter_num),
                    "title": f"第{chapter_num}章 {title}",
                    "page": page,
                    "original": line,
                    "subsections": []
                }
                continue

            # 2) Section line ("一、...") — only attached when a chapter is open.
            section_match = section_pattern.search(line)
            if section_match and current_chapter:
                section_num = section_match.group(1)
                title = section_match.group(2).strip()
                page = section_match.group(3).strip()

                current_chapter["subsections"].append({
                    "title": f"{section_num}、{title}",
                    "page": page,
                    "level": 2,
                    "original": line
                })
                continue

            # 3) Generic numbered line ("1. ..."). Classified by keywords.
            # NOTE(review): such lines are silently dropped when no chapter is
            # open yet — confirm a leading keyword line should not start one.
            generic_match = generic_pattern.search(line)
            if generic_match and current_chapter:
                title = generic_match.group(2).strip()
                page = generic_match.group(3).strip()

                # Construction-plan keywords mark chapter-like titles that lack
                # the "第X章" prefix.
                if any(kw in title for kw in ['编制依据', '工程概况', '施工计划', '施工工艺',
                                              '安全保证', '质量保证', '环境保证', '人员配备',
                                              '验收要求']):
                    # Treat as a new chapter (index assigned sequentially).
                    chapters.append(current_chapter)
                    current_chapter = {
                        "index": len(chapters) + 1,
                        "title": title,
                        "page": page,
                        "original": line,
                        "subsections": []
                    }
                else:
                    # Otherwise treat as a section of the open chapter.
                    current_chapter["subsections"].append({
                        "title": title,
                        "page": page,
                        "level": 2,
                        "original": line
                    })

        # Flush the final open chapter.
        if current_chapter:
            chapters.append(current_chapter)

        # No pattern matched anything: fall back to the heuristic parser.
        if not chapters and lines:
            chapters = self._fallback_parse(lines)

        return {
            "chapters": chapters,
            "total_chapters": len(chapters)
        }
|
|
|
|
|
+
|
|
|
|
|
+ def _fallback_parse(self, lines: List[str]) -> List[Dict[str, Any]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 降级解析策略:当正则无法匹配时使用启发式方法
|
|
|
|
|
+ """
|
|
|
|
|
+ chapters = []
|
|
|
|
|
+ idx = 0
|
|
|
|
|
+
|
|
|
|
|
+ for line in lines:
|
|
|
|
|
+ line = line.strip()
|
|
|
|
|
+ if not line:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 检查是否包含页码(行尾数字)
|
|
|
|
|
+ page_match = re.search(r'(\d+)\s*$', line)
|
|
|
|
|
+ if not page_match:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ page = page_match.group(1)
|
|
|
|
|
+ title = re.sub(r'[\.\s]+\d+\s*$', '', line).strip()
|
|
|
|
|
+
|
|
|
|
|
+ # 根据内容特征判断层级
|
|
|
|
|
+ is_chapter = any(kw in title for kw in ['编制依据', '工程概况', '施工计划',
|
|
|
|
|
+ '施工工艺', '安全保证', '质量保证',
|
|
|
|
|
+ '环境保证', '人员配备', '验收'])
|
|
|
|
|
+
|
|
|
|
|
+ if is_chapter or len(chapters) == 0:
|
|
|
|
|
+ idx += 1
|
|
|
|
|
+ chapters.append({
|
|
|
|
|
+ "index": idx,
|
|
|
|
|
+ "title": title,
|
|
|
|
|
+ "page": page,
|
|
|
|
|
+ "original": line,
|
|
|
|
|
+ "subsections": []
|
|
|
|
|
+ })
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 作为上一章的节
|
|
|
|
|
+ if chapters:
|
|
|
|
|
+ chapters[-1]["subsections"].append({
|
|
|
|
|
+ "title": title,
|
|
|
|
|
+ "page": page,
|
|
|
|
|
+ "level": 2,
|
|
|
|
|
+ "original": line
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ return chapters
|
|
|
|
|
+
|
|
|
|
|
+ def _chinese_to_number(self, chinese: str) -> int:
|
|
|
|
|
+ """中文数字转阿拉伯数字"""
|
|
|
|
|
+ chinese_nums = {
|
|
|
|
|
+ '一': 1, '二': 2, '三': 3, '四': 4, '五': 5,
|
|
|
|
|
+ '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
|
|
|
|
|
+ '十一': 11, '十二': 12
|
|
|
|
|
+ }
|
|
|
|
|
+ return chinese_nums.get(chinese, 0)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def extract_catalog_from_pdf(
    file_content: bytes,
    model_path: Optional[str] = None,
    ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
    ocr_api_key: str = "",
    progress_callback=None,
    ocr_timeout: int = 600,
) -> Optional[Dict[str, Any]]:
    """Convenience wrapper: extract the TOC structure from PDF bytes.

    Args:
        file_content: Raw PDF bytes.
        model_path: YOLO weights path; None uses the extractor's default.
        ocr_api_url: GLM-OCR chat-completions endpoint.
        ocr_api_key: Bearer token for the OCR service ("" sends none).
        progress_callback: Optional callable(stage, percent, message).
        ocr_timeout: Per-request OCR timeout in seconds. Previously this
            wrapper could not configure the extractor's timeout; the new
            trailing keyword parameter defaults to the extractor's old
            value, so existing callers are unaffected.

    Returns:
        {"chapters": [...], "total_chapters": N}, or None when no TOC
        was detected or extraction failed.
    """
    extractor = TOCCatalogExtractor(
        model_path=model_path,
        ocr_api_url=ocr_api_url,
        ocr_api_key=ocr_api_key,
        ocr_timeout=ocr_timeout,
    )
    return extractor.detect_and_extract(file_content, progress_callback)
|