|
|
@@ -10,15 +10,15 @@ PDF 文本切分实现
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import re
|
|
|
from typing import Any, Dict, List
|
|
|
|
|
|
from ..config.provider import default_config_provider
|
|
|
from ..interfaces import TextSplitter
|
|
|
from ..utils.title_matcher import TitleMatcher
|
|
|
+from ..utils.text_split_support import HierarchicalChunkMixin
|
|
|
|
|
|
|
|
|
-class PdfTextSplitter(TextSplitter):
|
|
|
+class PdfTextSplitter(TextSplitter, HierarchicalChunkMixin):
|
|
|
"""按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
@@ -314,377 +314,4 @@ class PdfTextSplitter(TextSplitter):
|
|
|
# 直接使用 TitleMatcher 的方法
|
|
|
return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
|
|
|
|
|
|
- def _split_large_chunk(
|
|
|
- self,
|
|
|
- content: str,
|
|
|
- max_chunk_size: int,
|
|
|
- title: str,
|
|
|
- hierarchy_path: List[str] | None = None,
|
|
|
- ) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 将超大块按句子级分割(保持语义完整)
|
|
|
- """
|
|
|
- # 按句子分割(中文句号、问号、感叹号、换行)
|
|
|
- sentences = re.split(r"([。!?\n])", content)
|
|
|
-
|
|
|
- # 重新组合句子和标点
|
|
|
- combined_sentences = []
|
|
|
- for i in range(0, len(sentences) - 1, 2):
|
|
|
- if i + 1 < len(sentences):
|
|
|
- combined_sentences.append(sentences[i] + sentences[i + 1])
|
|
|
- else:
|
|
|
- combined_sentences.append(sentences[i])
|
|
|
-
|
|
|
- if not combined_sentences:
|
|
|
- combined_sentences = [content]
|
|
|
-
|
|
|
- # 按max_chunk_size组合句子
|
|
|
- chunks = []
|
|
|
- current_chunk = ""
|
|
|
- current_start = 0
|
|
|
-
|
|
|
- for sentence in combined_sentences:
|
|
|
- if len(current_chunk) + len(sentence) <= max_chunk_size:
|
|
|
- current_chunk += sentence
|
|
|
- else:
|
|
|
- if current_chunk:
|
|
|
- chunk_data = {
|
|
|
- "content": current_chunk,
|
|
|
- "relative_start": current_start,
|
|
|
- "is_split": True, # 标记为分割块
|
|
|
- }
|
|
|
- if hierarchy_path is not None:
|
|
|
- chunk_data["hierarchy_path"] = hierarchy_path
|
|
|
- chunks.append(chunk_data)
|
|
|
- current_start += len(current_chunk)
|
|
|
- current_chunk = sentence
|
|
|
-
|
|
|
- # 添加最后一个块
|
|
|
- if current_chunk:
|
|
|
- chunk_data = {
|
|
|
- "content": current_chunk,
|
|
|
- "relative_start": current_start,
|
|
|
- "is_split": True,
|
|
|
- }
|
|
|
- if hierarchy_path is not None:
|
|
|
- chunk_data["hierarchy_path"] = hierarchy_path
|
|
|
- chunks.append(chunk_data)
|
|
|
-
|
|
|
- return chunks
|
|
|
-
|
|
|
- def _build_hierarchy_path_for_subtitle(
|
|
|
- self,
|
|
|
- sub_title_item: Dict[str, Any],
|
|
|
- all_toc_items: List[Dict[str, Any]],
|
|
|
- parent_title_info: Dict[str, Any],
|
|
|
- ) -> List[str]:
|
|
|
- """为子标题构建完整的层级路径"""
|
|
|
- hierarchy_path = []
|
|
|
-
|
|
|
- # 找到子标题在toc_items中的位置
|
|
|
- sub_title = sub_title_item.get("title", "")
|
|
|
- sub_title_idx = -1
|
|
|
- for idx, item in enumerate(all_toc_items):
|
|
|
- if item.get("title", "") == sub_title:
|
|
|
- sub_title_idx = idx
|
|
|
- break
|
|
|
-
|
|
|
- if sub_title_idx < 0:
|
|
|
- # 如果找不到,返回父标题->子标题
|
|
|
- return [parent_title_info["title"], sub_title]
|
|
|
-
|
|
|
- # 从子标题向前查找,找到每个层级的父级标题
|
|
|
- level_paths = {} # 存储每个层级对应的标题
|
|
|
- current_level = sub_title_item.get("level", 2)
|
|
|
-
|
|
|
- for i in range(sub_title_idx, -1, -1):
|
|
|
- item = all_toc_items[i]
|
|
|
- item_level = item.get("level", 1)
|
|
|
-
|
|
|
- if item_level <= current_level and item_level not in level_paths:
|
|
|
- level_paths[item_level] = item["title"]
|
|
|
- if item_level == 1:
|
|
|
- break
|
|
|
-
|
|
|
- # 按层级顺序构建路径(从1级到当前层级)
|
|
|
- for level in range(1, current_level + 1):
|
|
|
- if level in level_paths:
|
|
|
- hierarchy_path.append(level_paths[level])
|
|
|
-
|
|
|
- # 如果路径为空,至少包含父标题和子标题
|
|
|
- if not hierarchy_path:
|
|
|
- hierarchy_path = [parent_title_info["title"], sub_title]
|
|
|
-
|
|
|
- return hierarchy_path
|
|
|
-
|
|
|
- def _build_hierarchy_path(
|
|
|
- self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
|
|
|
- ) -> List[str]:
|
|
|
- """构建从1级到当前标题的完整层级路径"""
|
|
|
- hierarchy_path = []
|
|
|
-
|
|
|
- # 找到当前标题在目录中的位置
|
|
|
- current_item = None
|
|
|
- current_idx = -1
|
|
|
- for idx, item in enumerate(all_toc_items):
|
|
|
- if item["title"] == title:
|
|
|
- current_item = item
|
|
|
- current_idx = idx
|
|
|
- break
|
|
|
-
|
|
|
- if not current_item:
|
|
|
- # 如果找不到,返回只包含当前标题的路径
|
|
|
- return [title]
|
|
|
-
|
|
|
- current_level = current_item.get("level", target_level)
|
|
|
-
|
|
|
- # 从当前项向前查找,找到每个层级的最近父级
|
|
|
- level_paths = {} # 存储每个层级对应的标题
|
|
|
-
|
|
|
- for i in range(current_idx, -1, -1):
|
|
|
- item = all_toc_items[i]
|
|
|
- item_level = item.get("level", 1)
|
|
|
-
|
|
|
- if item_level <= current_level and item_level not in level_paths:
|
|
|
- level_paths[item_level] = item["title"]
|
|
|
- if item_level == 1:
|
|
|
- break
|
|
|
-
|
|
|
- # 按层级顺序构建路径(从1级到当前层级)
|
|
|
- for level in range(1, current_level + 1):
|
|
|
- if level in level_paths:
|
|
|
- hierarchy_path.append(level_paths[level])
|
|
|
- elif level == current_level:
|
|
|
- hierarchy_path.append(title)
|
|
|
-
|
|
|
- # 如果路径为空,至少包含当前标题
|
|
|
- if not hierarchy_path:
|
|
|
- hierarchy_path = [title]
|
|
|
-
|
|
|
- return hierarchy_path
|
|
|
-
|
|
|
- def _build_chunk_metadata(
|
|
|
- self,
|
|
|
- sub_chunk: Dict[str, Any],
|
|
|
- title_info: Dict[str, Any],
|
|
|
- start_pos: int,
|
|
|
- pages_content: List[Dict[str, Any]],
|
|
|
- i: int,
|
|
|
- j: int,
|
|
|
- chapter_classification_map: Dict[str, Dict[str, Any]] = None,
|
|
|
- ) -> Dict[str, Any]:
|
|
|
- """构建文本块的元数据"""
|
|
|
- content = sub_chunk["content"]
|
|
|
- chunk_start_pos = start_pos + sub_chunk["relative_start"]
|
|
|
- page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
|
|
|
-
|
|
|
- # 构建section_label:使用完整的层级路径
|
|
|
- hierarchy_path = sub_chunk.get("hierarchy_path", [])
|
|
|
- sub_title = sub_chunk.get("sub_title", "")
|
|
|
-
|
|
|
- if hierarchy_path:
|
|
|
- section_label = "->".join(hierarchy_path)
|
|
|
- elif sub_title:
|
|
|
- section_label = f"{title_info['title']}->{sub_title}"
|
|
|
- else:
|
|
|
- section_label = title_info["title"]
|
|
|
-
|
|
|
- # 提取最低层级标题的编号
|
|
|
- if hierarchy_path:
|
|
|
- lowest_title = hierarchy_path[-1]
|
|
|
- title_number = self._extract_title_number(lowest_title)
|
|
|
- elif sub_title:
|
|
|
- title_number = self._extract_title_number(sub_title)
|
|
|
- else:
|
|
|
- title_number = self._extract_title_number(title_info["title"])
|
|
|
-
|
|
|
- # 构建chunk_id
|
|
|
- chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
|
|
|
-
|
|
|
- # 获取一级目录的分类信息
|
|
|
- chapter_classification = None
|
|
|
- if chapter_classification_map:
|
|
|
- # 从hierarchy_path获取一级目录标题
|
|
|
- if hierarchy_path and len(hierarchy_path) > 0:
|
|
|
- chapter_title = hierarchy_path[0]
|
|
|
- chapter_classification = chapter_classification_map.get(chapter_title)
|
|
|
- elif not hierarchy_path:
|
|
|
- # 如果没有hierarchy_path,尝试从title_info获取
|
|
|
- chapter_title = title_info.get("title", "")
|
|
|
- chapter_classification = chapter_classification_map.get(chapter_title)
|
|
|
-
|
|
|
- chunk_data = {
|
|
|
- "file_name": "", # 由上层填充
|
|
|
- "chunk_id": chunk_id_str,
|
|
|
- "section_label": section_label,
|
|
|
- "project_plan_type": title_info.get("category_code", "other"),
|
|
|
- "chapter_classification": title_info.get("category_code", "other"),
|
|
|
- "element_tag": {
|
|
|
- "chunk_id": chunk_id_str,
|
|
|
- "page": page_num,
|
|
|
- "serial_number": title_number if title_number else str(i + 1),
|
|
|
- },
|
|
|
- "review_chunk_content": content,
|
|
|
- "_title_number": title_number,
|
|
|
- "_local_index": j,
|
|
|
- "_sort_key": chunk_start_pos,
|
|
|
- }
|
|
|
-
|
|
|
- # # 如果找到了一级目录的分类信息,添加到chunk中
|
|
|
- # if chapter_classification:
|
|
|
- # chunk_data["chapter_classification"] = chapter_classification
|
|
|
-
|
|
|
- return chunk_data
|
|
|
-
|
|
|
- def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """生成最终的chunk_id和serial_number"""
|
|
|
- final_chunks = []
|
|
|
-
|
|
|
- # 按 section_label 分组,为每组内的块生成递增的序号
|
|
|
- section_groups: Dict[str, int] = {} # section_label -> 当前序号
|
|
|
-
|
|
|
- for chunk in chunks:
|
|
|
- section_label = chunk.get("section_label", "")
|
|
|
-
|
|
|
- # 为当前 section_label 生成序号
|
|
|
- if section_label not in section_groups:
|
|
|
- section_groups[section_label] = 1
|
|
|
- else:
|
|
|
- section_groups[section_label] += 1
|
|
|
-
|
|
|
- local_index = section_groups[section_label]
|
|
|
-
|
|
|
- # 从section_label中提取标题路径的编号路径
|
|
|
- title_number_path = self._extract_title_number_path(section_label)
|
|
|
-
|
|
|
- # 生成chunk_id:doc_chunk_<标题路径的编号路径>_序号
|
|
|
- if title_number_path:
|
|
|
- chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
|
|
|
- else:
|
|
|
- chunk_id_str = f"doc_chunk_{local_index}"
|
|
|
-
|
|
|
- # 从section_label中提取最底层级的编号(用于 serial_number)
|
|
|
- serial_number = self._extract_number_from_section_label(section_label)
|
|
|
-
|
|
|
- # 更新chunk数据
|
|
|
- final_chunk = {
|
|
|
- "file_name": chunk["file_name"],
|
|
|
- "chunk_id": chunk_id_str,
|
|
|
- "section_label": chunk["section_label"],
|
|
|
- "project_plan_type": chunk["project_plan_type"],
|
|
|
- "chapter_classification": chunk["chapter_classification"],
|
|
|
- "element_tag": {
|
|
|
- "chunk_id": chunk_id_str,
|
|
|
- "page": chunk["element_tag"]["page"],
|
|
|
- "serial_number": serial_number,
|
|
|
- },
|
|
|
- "review_chunk_content": chunk["review_chunk_content"],
|
|
|
- }
|
|
|
-
|
|
|
- final_chunks.append(final_chunk)
|
|
|
-
|
|
|
- return final_chunks
|
|
|
-
|
|
|
- def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
|
|
|
- """根据位置获取页码"""
|
|
|
- for page in pages_content:
|
|
|
- if page["start_pos"] <= pos < page["end_pos"]:
|
|
|
- return int(page["page_num"])
|
|
|
- return 1
|
|
|
-
|
|
|
- def _extract_title_number(self, title: str) -> str:
|
|
|
- """从标题中提取编号部分(支持多种格式)"""
|
|
|
- if not title:
|
|
|
- return ""
|
|
|
-
|
|
|
- # 匹配章节格式(如 第一章、第1章等)
|
|
|
- chapter_match = re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title)
|
|
|
- if chapter_match:
|
|
|
- return chapter_match.group(1)
|
|
|
-
|
|
|
- # 匹配方括号数字格式(如 【1】、【2】等)
|
|
|
- bracket_match = re.match(r"^(【\d+】)", title)
|
|
|
- if bracket_match:
|
|
|
- return bracket_match.group(1)
|
|
|
-
|
|
|
- # 匹配双方括号数字格式(如 〖1.1〗、〖2.3〗等)
|
|
|
- double_bracket_match = re.match(r"^(〖\d+(?:\.\d+)*〗)", title)
|
|
|
- if double_bracket_match:
|
|
|
- return double_bracket_match.group(1)
|
|
|
-
|
|
|
- # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
|
|
|
- number_match = re.match(r"^(\d+(?:\.\d+)*)", title)
|
|
|
- if number_match:
|
|
|
- return number_match.group(1)
|
|
|
-
|
|
|
- # 匹配中文编号格式(如 一、二、三等)
|
|
|
- chinese_match = re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title)
|
|
|
- if chinese_match:
|
|
|
- return chinese_match.group(1)
|
|
|
-
|
|
|
- # 匹配圆括号编号格式(如 (1)、(一)等)
|
|
|
- paren_match = re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title)
|
|
|
- if paren_match:
|
|
|
- return paren_match.group(1)
|
|
|
-
|
|
|
- return ""
|
|
|
-
|
|
|
- def _extract_title_number_path(self, section_label: str) -> str:
|
|
|
- """从section_label中提取标题路径的编号路径"""
|
|
|
- if not section_label:
|
|
|
- return ""
|
|
|
-
|
|
|
- # 按"->"分割层级路径
|
|
|
- parts = section_label.split("->")
|
|
|
-
|
|
|
- # 提取每一层的编号
|
|
|
- number_paths = []
|
|
|
- for part in parts:
|
|
|
- part = part.strip()
|
|
|
- if part:
|
|
|
- number = self._extract_title_number(part)
|
|
|
- if number:
|
|
|
- number_paths.append(number)
|
|
|
-
|
|
|
- # 用"->"连接编号路径
|
|
|
- if number_paths:
|
|
|
- return "->".join(number_paths)
|
|
|
-
|
|
|
- return ""
|
|
|
-
|
|
|
- def _extract_number_from_section_label(self, section_label: str) -> str:
|
|
|
- """
|
|
|
- 从section_label中提取最底层级的编号
|
|
|
-
|
|
|
- 例如:
|
|
|
- "第一章 编制依据与说明->一) 编制依据" -> "一)"
|
|
|
- "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
|
|
|
- "第四章 施工工艺技术->一)主要部件说明->2、前临时支腿" -> "2"
|
|
|
- """
|
|
|
- if not section_label:
|
|
|
- return ""
|
|
|
-
|
|
|
- # 先找到最低层级部分(最后一个"->"后面的部分)
|
|
|
- if "->" in section_label:
|
|
|
- last_level_part = section_label.split("->")[-1].strip()
|
|
|
- else:
|
|
|
- last_level_part = section_label.strip()
|
|
|
-
|
|
|
- # 检查最低层级部分是否包含合并标记(" + ")
|
|
|
- if " + " in last_level_part:
|
|
|
- # 分割合并的部分
|
|
|
- merged_parts = last_level_part.split(" + ")
|
|
|
- numbers = []
|
|
|
- for part in merged_parts:
|
|
|
- part = part.strip()
|
|
|
- number = self._extract_title_number(part)
|
|
|
- if number:
|
|
|
- numbers.append(number)
|
|
|
-
|
|
|
- if numbers:
|
|
|
- return "+".join(numbers)
|
|
|
-
|
|
|
- # 没有合并的情况,直接提取最低层级的编号
|
|
|
- return self._extract_title_number(last_level_part)
|
|
|
-
|
|
|
|