|
|
@@ -1,16 +1,10 @@
|
|
|
"""
|
|
|
-PDF 全文提取实现(Celery 安全版)
|
|
|
-- 强制单进程(Celery Worker 层负责多任务并发)
|
|
|
-- 避免多进程嵌套导致的死锁和资源竞争
|
|
|
-- 使用正则表达式优化页眉页脚过滤
|
|
|
+PDF 全文提取实现
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import io
|
|
|
-import os
|
|
|
-import re
|
|
|
-import sys
|
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
|
|
import fitz # PyMuPDF
|
|
|
@@ -18,326 +12,274 @@ import fitz # PyMuPDF
|
|
|
from ..config.provider import default_config_provider
|
|
|
from ..interfaces import DocumentSource, FullTextExtractor
|
|
|
|
|
|
-# 预编译正则表达式缓存
|
|
|
-_SPACE_PATTERN_CACHE: Dict[int, re.Pattern] = {}
|
|
|
-
|
|
|
-
|
|
|
-def _get_space_pattern(threshold: int) -> re.Pattern:
|
|
|
- """获取预编译的空格匹配正则表达式。"""
|
|
|
- if threshold not in _SPACE_PATTERN_CACHE:
|
|
|
- _SPACE_PATTERN_CACHE[threshold] = re.compile(rf" {{{threshold},}}")
|
|
|
- return _SPACE_PATTERN_CACHE[threshold]
|
|
|
-
|
|
|
-
|
|
|
-def _is_running_in_celery() -> bool:
|
|
|
- """
|
|
|
- 检测当前是否在 Celery Worker 进程中运行。
|
|
|
-
|
|
|
- 使用简单可靠的启发式方法,避免导入 celery 模块(会触发初始化)。
|
|
|
-
|
|
|
- Returns:
|
|
|
- True 如果在 Celery worker 进程中,否则 False
|
|
|
- """
|
|
|
- # 1. 检测 Celery worker 特定的环境变量(最可靠的标志)
|
|
|
- # CELERY_WORKER_NAME 和 CELERY_WORKER_HOST 是 Celery worker 启动时设置的环境变量
|
|
|
- if os.environ.get('CELERY_WORKER_NAME') or os.environ.get('CELERY_WORKER_HOST'):
|
|
|
- return True
|
|
|
-
|
|
|
- # 2. 检测进程名特征
|
|
|
- # Celery 进程名通常以 'celery' 开头(如 celery, celery.exe)
|
|
|
- process_name = sys.argv[0] if sys.argv else ''
|
|
|
- base_name = os.path.basename(process_name).lower()
|
|
|
- if base_name.startswith('celery') and not base_name.endswith('.py'):
|
|
|
- return True
|
|
|
-
|
|
|
- # 3. 检测命令行参数
|
|
|
- # Celery worker 启动时命令行包含 'celery' 和 'worker' 或 '-P prefork'
|
|
|
- cmd_line = sys.argv if sys.argv else []
|
|
|
- cmd_str = ' '.join(cmd_line).lower()
|
|
|
- has_celery = 'celery' in cmd_str
|
|
|
- has_worker = 'worker' in cmd_str or 'beat' in cmd_str
|
|
|
- # 排除 Python 脚本直接运行的情况(如 python test_celery_xxx.py)
|
|
|
- is_script = base_name.endswith('.py')
|
|
|
- if has_celery and has_worker and not is_script:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def _should_use_parallel_extraction() -> bool:
|
|
|
- """
|
|
|
- 判断是否可以使用多进程并行提取PDF。
|
|
|
-
|
|
|
- 策略:
|
|
|
- - 所有平台都强制单进程
|
|
|
-
|
|
|
- 原因:
|
|
|
- 1. 系统完全基于 Celery 进行多任务管理,Celery Worker 层已经实现了多进程并发
|
|
|
- 2. PDF 提取层如果再用多进程,会导致多进程嵌套,引发:
|
|
|
- - 死锁风险
|
|
|
- - 数据库连接池耗尽
|
|
|
- - AI 模型重复加载,内存爆炸
|
|
|
- 3. Windows 平台 fork 机制不完善,多进程问题更严重
|
|
|
-
|
|
|
- Returns:
|
|
|
- False 始终使用单进程(Celery 层负责多任务并发)
|
|
|
- """
|
|
|
- # 系统基于 Celery 管理多任务,PDF 提取始终单进程
|
|
|
- # Celery Worker 层已经实现了多进程并发处理多个审查任务
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def _process_page_worker(
|
|
|
- args: Tuple[int, bytes | str, int, int, str]
|
|
|
-) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 处理单个页面的工作函数。
|
|
|
-
|
|
|
- Args:
|
|
|
- args: (page_num, doc_source, doc_is_bytes, header_space_threshold, source_file)
|
|
|
-
|
|
|
- Returns:
|
|
|
- 页面数据字典
|
|
|
- """
|
|
|
- page_num, doc_source, doc_is_bytes, header_space_threshold, source_file = args
|
|
|
-
|
|
|
- try:
|
|
|
- # 打开文档进行处理
|
|
|
- if doc_is_bytes:
|
|
|
- doc = fitz.open(stream=doc_source)
|
|
|
- else:
|
|
|
- doc = fitz.open(doc_source)
|
|
|
-
|
|
|
- try:
|
|
|
- page = doc[page_num]
|
|
|
- # 提取文本(含表格占位符)
|
|
|
- text = _extract_text_with_table_placeholders(page)
|
|
|
- # 过滤页眉页脚
|
|
|
- text = _filter_header_footer(text, header_space_threshold)
|
|
|
-
|
|
|
- return {
|
|
|
- "page_num": page_num + 1,
|
|
|
- "text": text,
|
|
|
- "source_file": source_file,
|
|
|
- }
|
|
|
- finally:
|
|
|
- doc.close()
|
|
|
- except Exception as e:
|
|
|
- print(f" 警告: 处理第 {page_num + 1} 页时出错: {e}")
|
|
|
- return {
|
|
|
- "page_num": page_num + 1,
|
|
|
- "text": "",
|
|
|
- "source_file": source_file,
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def _extract_text_with_table_placeholders(page: fitz.Page) -> str:
|
|
|
- """提取页面文本,将表格部分用 <表格></表格> 标签替换。"""
|
|
|
- # 获取页面中所有表格的边界框
|
|
|
- table_bboxes = _get_table_bboxes(page)
|
|
|
-
|
|
|
- # 如果没有表格,直接使用普通文本提取
|
|
|
- if not table_bboxes:
|
|
|
- return page.get_text()
|
|
|
-
|
|
|
- # 获取带位置信息的文本
|
|
|
- text_dict = page.get_text("dict")
|
|
|
-
|
|
|
- # 收集所有元素(文本块和表格),按 y 坐标排序
|
|
|
- elements = []
|
|
|
-
|
|
|
- # 添加表格标记
|
|
|
- for table_bbox in table_bboxes:
|
|
|
- elements.append({
|
|
|
- "type": "table",
|
|
|
- "y": table_bbox[1],
|
|
|
- "bbox": table_bbox,
|
|
|
- })
|
|
|
-
|
|
|
- # 处理文本块
|
|
|
- for block in text_dict.get("blocks", []):
|
|
|
- if "lines" not in block:
|
|
|
- continue
|
|
|
-
|
|
|
- block_bbox = block["bbox"]
|
|
|
-
|
|
|
- # 检查是否在表格区域内
|
|
|
- if not _is_in_table_region(block_bbox, table_bboxes):
|
|
|
- block_text = ""
|
|
|
- for line in block["lines"]:
|
|
|
- line_text = ""
|
|
|
- for span in line["spans"]:
|
|
|
- line_text += span["text"]
|
|
|
- if line_text.strip():
|
|
|
- block_text += line_text + "\n"
|
|
|
-
|
|
|
- if block_text.strip():
|
|
|
- elements.append({
|
|
|
- "type": "text",
|
|
|
- "y": block_bbox[1],
|
|
|
- "text": block_text.strip(),
|
|
|
- })
|
|
|
-
|
|
|
- # 按 y 坐标排序
|
|
|
- elements.sort(key=lambda x: x["y"])
|
|
|
-
|
|
|
- # 构建页面文本
|
|
|
- page_text_parts = []
|
|
|
- last_was_table = False
|
|
|
-
|
|
|
- for element in elements:
|
|
|
- if element["type"] == "table":
|
|
|
- if not last_was_table:
|
|
|
- page_text_parts.append("<表格></表格>")
|
|
|
- last_was_table = True
|
|
|
- else:
|
|
|
- page_text_parts.append(element["text"])
|
|
|
- last_was_table = False
|
|
|
-
|
|
|
- return "\n".join(page_text_parts).strip()
|
|
|
-
|
|
|
-
|
|
|
-def _get_table_bboxes(page: fitz.Page) -> List[Tuple[float, float, float, float]]:
|
|
|
- """获取页面中所有表格的边界框。"""
|
|
|
- table_bboxes = []
|
|
|
- try:
|
|
|
- tables = page.find_tables()
|
|
|
- for table in tables:
|
|
|
- table_bboxes.append(table.bbox)
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
- return table_bboxes
|
|
|
-
|
|
|
-
|
|
|
-def _is_in_table_region(
|
|
|
- bbox: Tuple[float, float, float, float],
|
|
|
- table_bboxes: List[Tuple[float, float, float, float]],
|
|
|
- overlap_threshold: float = 0.5,
|
|
|
-) -> bool:
|
|
|
- """判断文本块是否在表格区域内。"""
|
|
|
- x0, y0, x1, y1 = bbox
|
|
|
- text_area = (x1 - x0) * (y1 - y0)
|
|
|
-
|
|
|
- for table_bbox in table_bboxes:
|
|
|
- tx0, ty0, tx1, ty1 = table_bbox
|
|
|
-
|
|
|
- overlap_x0 = max(x0, tx0)
|
|
|
- overlap_y0 = max(y0, ty0)
|
|
|
- overlap_x1 = min(x1, tx1)
|
|
|
- overlap_y1 = min(y1, ty1)
|
|
|
-
|
|
|
- if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
|
|
|
- overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
|
|
|
- overlap_ratio = overlap_area / text_area if text_area > 0 else 0
|
|
|
-
|
|
|
- if overlap_ratio >= overlap_threshold:
|
|
|
- return True
|
|
|
-
|
|
|
- center_x = (x0 + x1) / 2
|
|
|
- center_y = (y0 + y1) / 2
|
|
|
- if _point_in_bbox((center_x, center_y), table_bbox):
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def _point_in_bbox(
|
|
|
- point: Tuple[float, float], bbox: Tuple[float, float, float, float]
|
|
|
-) -> bool:
|
|
|
- """判断点是否在边界框内。"""
|
|
|
- x, y = point
|
|
|
- x0, y0, x1, y1 = bbox
|
|
|
- return x0 <= x <= x1 and y0 <= y <= y1
|
|
|
-
|
|
|
-
|
|
|
-def _filter_header_footer(text: str, header_space_threshold: int) -> str:
|
|
|
- """过滤页眉页脚(正则表达式优化版)。"""
|
|
|
- lines = text.split("\n")
|
|
|
-
|
|
|
- if len(lines) <= 1:
|
|
|
- return text
|
|
|
-
|
|
|
- # 使用预编译的正则表达式匹配连续空格
|
|
|
- space_pattern = _get_space_pattern(header_space_threshold)
|
|
|
-
|
|
|
- # 过滤页眉
|
|
|
- filtered_lines = [
|
|
|
- line for line in lines
|
|
|
- if not space_pattern.search(line)
|
|
|
- ]
|
|
|
-
|
|
|
- # 过滤页脚(删除最后一行)
|
|
|
- if len(filtered_lines) > 0:
|
|
|
- filtered_lines.pop()
|
|
|
-
|
|
|
- return "\n".join(filtered_lines)
|
|
|
-
|
|
|
|
|
|
class PdfFullTextExtractor(FullTextExtractor):
    """Extract the full text of a PDF document, one record per page.

    For every page the extractor:

    1. pulls the text, replacing detected table regions with the
       ``<表格></表格>`` placeholder;
    2. filters header lines (lines containing a long run of spaces) and
       the footer line (the last line of the page).
    """

    # Marker emitted in place of every detected table region.
    _TABLE_PLACEHOLDER = "<表格></表格>"

    def __init__(self) -> None:
        self._cfg = default_config_provider

    def extract_full_text(self, source: DocumentSource) -> List[Dict[str, Any]]:
        """Extract the filtered text of every page in ``source``.

        Args:
            source: Document to read. ``source.content`` (raw bytes) takes
                precedence over ``source.path`` when both are set.

        Returns:
            One dict per page, in page order, with the keys ``page_num``
            (1-based), ``text``, ``start_pos``, ``end_pos`` and
            ``source_file``. ``start_pos``/``end_pos`` are cumulative
            character offsets over the page texts concatenated without any
            separator.

        Raises:
            ValueError: If ``source`` has neither ``content`` nor ``path``.
        """
        if source.content is not None:
            doc = fitz.open(stream=io.BytesIO(source.content))
            source_file = "bytes_stream"
        elif source.path is not None:
            doc = fitz.open(source.path)
            source_file = str(source.path)
        else:
            raise ValueError("DocumentSource 既没有 path 也没有 content")

        pages: List[Dict[str, Any]] = []
        current_pos = 0
        try:
            for page_num, page in enumerate(doc, start=1):
                # Extract text with table regions replaced by the placeholder,
                # then strip header/footer lines.
                text = self._extract_text_with_table_placeholders(page)
                text = self._filter_header_footer(text)

                end_pos = current_pos + len(text)
                pages.append(
                    {
                        "page_num": page_num,
                        "text": text,
                        "start_pos": current_pos,
                        "end_pos": end_pos,
                        "source_file": source_file,
                    }
                )
                current_pos = end_pos
        finally:
            # Always release the document handle, even if a page fails.
            doc.close()

        return pages

    def _filter_header_footer(self, text: str) -> str:
        """Remove header and footer lines from one page of text.

        Rules:
            1. Header: any line containing at least
               ``header_footer_filter.header_space_threshold`` consecutive
               spaces is dropped.
            2. Footer: the last remaining non-blank line of the page is
               dropped.

        Text consisting of a single line (or no line break at all) is
        returned unchanged.

        Args:
            text: Raw text of a single page.

        Returns:
            The page text with header and footer lines removed.
        """
        # The config value may arrive as a string; coerce it so the
        # comparison below cannot fail on a type mismatch.
        header_space_threshold = int(
            self._cfg.get("header_footer_filter.header_space_threshold", 20)
        )

        lines = text.split("\n")
        if len(lines) <= 1:
            return text

        # A line contains a run of >= N spaces exactly when the N-space
        # string is a substring of it (for N <= 0 this matches every line,
        # which is what a ">= threshold" check on the run length does too).
        space_run = " " * header_space_threshold
        kept_lines = [line for line in lines if space_run not in line]

        # Plain-text extraction ends with a newline, which leaves an empty
        # trailing element after the split. Discard trailing blank lines so
        # that the pop below removes the real footer line instead of that
        # empty string.
        while kept_lines and not kept_lines[-1].strip():
            kept_lines.pop()

        if kept_lines:
            kept_lines.pop()  # footer: last line of the page

        return "\n".join(kept_lines)

    def _count_chinese_chars(self, text: str) -> int:
        """Count the characters of ``text`` in the range U+4E00..U+9FFF.

        Args:
            text: Text to inspect.

        Returns:
            Number of characters whose code point lies in that range.
        """
        return sum(1 for char in text if "\u4e00" <= char <= "\u9fff")

    def _get_table_bboxes(self, page: fitz.Page) -> List[Tuple[float, float, float, float]]:
        """Return the bounding boxes of all tables detected on ``page``.

        Table detection is best effort: any failure is logged and the boxes
        collected so far (possibly none) are returned, so text extraction
        can continue.

        Args:
            page: PyMuPDF page object.

        Returns:
            List of table bounding boxes, each as ``(x0, y0, x1, y1)``.
        """
        import logging

        logger = logging.getLogger(__name__)
        table_bboxes: List[Tuple[float, float, float, float]] = []

        try:
            for table in page.find_tables():
                table_bboxes.append(table.bbox)
        except AttributeError:
            # Typically means this PyMuPDF build has no ``find_tables``;
            # carry on without table detection.
            logger.debug("Table detection unavailable; skipping", exc_info=True)
        except Exception:
            # Detection failed for this page; keep extracting plain text.
            logger.warning("Table detection failed; continuing without it", exc_info=True)

        return table_bboxes

    def _point_in_bbox(
        self, point: Tuple[float, float], bbox: Tuple[float, float, float, float]
    ) -> bool:
        """Tell whether ``point`` lies inside ``bbox`` (edges included).

        Args:
            point: ``(x, y)`` coordinates.
            bbox: ``(x0, y0, x1, y1)`` bounding box.

        Returns:
            True if the point is inside or on the border of the box.
        """
        x, y = point
        x0, y0, x1, y1 = bbox
        return x0 <= x <= x1 and y0 <= y <= y1

    def _is_in_table_region(
        self,
        bbox: Tuple[float, float, float, float],
        table_bboxes: List[Tuple[float, float, float, float]],
        overlap_threshold: float = 0.5,
    ) -> bool:
        """Tell whether a text block belongs to one of the table regions.

        A block counts as inside a table when it overlaps the table with a
        positive area and either the overlap covers at least
        ``overlap_threshold`` of the block's own area, or the block's centre
        point lies inside the table box.

        Args:
            bbox: Bounding box of the text block, ``(x0, y0, x1, y1)``.
            table_bboxes: Bounding boxes of the tables on the page.
            overlap_threshold: Minimum overlap, as a fraction of the block
                area, for the block to count as part of a table.

        Returns:
            True if the block is considered part of a table.
        """
        x0, y0, x1, y1 = bbox
        text_area = (x1 - x0) * (y1 - y0)
        # The block centre does not depend on the table; compute it once.
        center = ((x0 + x1) / 2, (y0 + y1) / 2)

        for table_bbox in table_bboxes:
            tx0, ty0, tx1, ty1 = table_bbox

            overlap_width = min(x1, tx1) - max(x0, tx0)
            overlap_height = min(y1, ty1) - max(y0, ty0)
            if overlap_width <= 0 or overlap_height <= 0:
                continue  # no overlap with this table

            overlap_area = overlap_width * overlap_height
            overlap_ratio = overlap_area / text_area if text_area > 0 else 0

            if overlap_ratio >= overlap_threshold:
                return True
            if self._point_in_bbox(center, table_bbox):
                return True

        return False

    def _extract_text_with_table_placeholders(self, page: fitz.Page) -> str:
        """Extract page text, replacing table regions with a placeholder.

        When no table is detected the plain ``page.get_text()`` output is
        returned as is. Otherwise text blocks outside the tables and one
        marker per table are ordered by the top (``y0``) coordinate of their
        bounding boxes; adjacent table markers collapse into a single
        placeholder.

        Args:
            page: PyMuPDF page object.

        Returns:
            The page text with tables replaced by ``<表格></表格>``.
        """
        table_bboxes = self._get_table_bboxes(page)
        if not table_bboxes:
            return page.get_text()

        # Table markers go in first so that, with the stable sort below, a
        # table precedes a text block that starts at the same y coordinate.
        elements: List[Dict[str, Any]] = [
            {"type": "table", "y": table_bbox[1], "bbox": table_bbox}
            for table_bbox in table_bboxes
        ]

        text_dict = page.get_text("dict")
        for block in text_dict.get("blocks", []):
            if "lines" not in block:  # non-text block (e.g. an image)
                continue

            block_bbox = block["bbox"]
            if self._is_in_table_region(block_bbox, table_bboxes):
                continue

            # Rebuild each line from its spans and keep only non-blank lines.
            line_texts = (
                "".join(span["text"] for span in line["spans"])
                for line in block["lines"]
            )
            block_text = "\n".join(
                line_text for line_text in line_texts if line_text.strip()
            ).strip()

            if block_text:
                elements.append(
                    {"type": "text", "y": block_bbox[1], "text": block_text}
                )

        elements.sort(key=lambda element: element["y"])

        page_text_parts: List[str] = []
        last_was_table = False
        for element in elements:
            if element["type"] == "table":
                if not last_was_table:
                    page_text_parts.append(self._TABLE_PLACEHOLDER)
                last_was_table = True
            else:
                page_text_parts.append(element["text"])
                last_was_table = False

        return "\n".join(page_text_parts).strip()
|
|
|
+
|
|
|
+
|
|
|
+
|