|
|
@@ -3,48 +3,89 @@
|
|
|
实现按目录层级和字符数的智能切分逻辑
|
|
|
"""
|
|
|
|
|
|
-import re
|
|
|
+import io
|
|
|
from pathlib import Path
|
|
|
-from difflib import SequenceMatcher
|
|
|
+from typing import Union
|
|
|
import fitz # PyMuPDF
|
|
|
from docx import Document
|
|
|
|
|
|
try:
|
|
|
from .config_loader import get_config
|
|
|
+ from .title_matcher import TitleMatcher
|
|
|
+ from .text_utils import TextUtils
|
|
|
+ from .chunk_splitter import ChunkSplitter
|
|
|
+ from .chunk_merger import ChunkMerger
|
|
|
+ from .chunk_metadata import ChunkMetadata
|
|
|
+ from .hierarchy_processor import HierarchyProcessor
|
|
|
except ImportError:
|
|
|
from config_loader import get_config
|
|
|
+ from title_matcher import TitleMatcher
|
|
|
+ from text_utils import TextUtils
|
|
|
+ from chunk_splitter import ChunkSplitter
|
|
|
+ from chunk_merger import ChunkMerger
|
|
|
+ from chunk_metadata import ChunkMetadata
|
|
|
+ from hierarchy_processor import HierarchyProcessor
|
|
|
|
|
|
|
|
|
class TextSplitter:
|
|
|
- """文本切分器,支持PDF和Word格式"""
|
|
|
+ """文本切分器,支持PDF和Word格式,支持文件路径和字节流输入"""
|
|
|
|
|
|
def __init__(self):
|
|
|
self.config = get_config()
|
|
|
+ self.title_matcher = TitleMatcher()
|
|
|
+ self.text_utils = TextUtils()
|
|
|
+ self.chunk_splitter = ChunkSplitter()
|
|
|
+ self.chunk_merger = ChunkMerger()
|
|
|
+ self.chunk_metadata = ChunkMetadata()
|
|
|
+ self.hierarchy_processor = HierarchyProcessor()
|
|
|
|
|
|
- def extract_full_text(self, file_path):
|
|
|
+    def extract_full_text(self, file_input: Union[str, Path, bytes], file_type: Union[str, None] = None):
|
|
|
"""
|
|
|
提取文档的全文内容
|
|
|
|
|
|
参数:
|
|
|
- file_path: 文档路径(PDF或Word)
|
|
|
+ file_input: 文档路径(PDF或Word)或字节流
|
|
|
+ file_type: 文件类型('pdf'或'docx'),当file_input为bytes时必需
|
|
|
|
|
|
返回:
|
|
|
list: 每页的文本内容
|
|
|
"""
|
|
|
- file_path = Path(file_path)
|
|
|
- file_ext = file_path.suffix.lower()
|
|
|
-
|
|
|
- if file_ext == '.pdf':
|
|
|
- return self._extract_from_pdf(file_path)
|
|
|
- elif file_ext in ['.docx', '.doc']:
|
|
|
- return self._extract_from_word(file_path)
|
|
|
+ # 判断输入类型
|
|
|
+ if isinstance(file_input, bytes):
|
|
|
+ if not file_type:
|
|
|
+ raise ValueError("当输入为字节流时,必须指定file_type参数('pdf'或'docx')")
|
|
|
+ file_ext = f'.{file_type.lower()}'
|
|
|
+ if file_ext == '.pdf':
|
|
|
+ return self._extract_from_pdf(file_input, is_bytes=True)
|
|
|
+ elif file_ext in ['.docx', '.doc']:
|
|
|
+ return self._extract_from_word(file_input, is_bytes=True)
|
|
|
+ else:
|
|
|
+ raise ValueError(f"不支持的文件格式: {file_ext}")
|
|
|
else:
|
|
|
- raise ValueError(f"不支持的文件格式: {file_ext}")
|
|
|
+ # 文件路径输入(保持向后兼容)
|
|
|
+ file_path = Path(file_input)
|
|
|
+ file_ext = file_path.suffix.lower()
|
|
|
+
|
|
|
+ if file_ext == '.pdf':
|
|
|
+ return self._extract_from_pdf(file_path, is_bytes=False)
|
|
|
+ elif file_ext in ['.docx', '.doc']:
|
|
|
+ return self._extract_from_word(file_path, is_bytes=False)
|
|
|
+ else:
|
|
|
+ raise ValueError(f"不支持的文件格式: {file_ext}")
|
|
|
|
|
|
- def _extract_from_pdf(self, pdf_path):
|
|
|
+ def _extract_from_pdf(self, pdf_input, is_bytes=False):
|
|
|
"""提取PDF的全文内容"""
|
|
|
try:
|
|
|
- doc = fitz.open(pdf_path)
|
|
|
+ if is_bytes:
|
|
|
+ # 从字节流打开
|
|
|
+ bytes_io = io.BytesIO(pdf_input)
|
|
|
+            doc = fitz.open(stream=bytes_io, filetype="pdf")
|
|
|
+ source_file = 'bytes_stream'
|
|
|
+ else:
|
|
|
+ # 从文件路径打开
|
|
|
+ doc = fitz.open(pdf_input)
|
|
|
+ source_file = str(pdf_input)
|
|
|
+
|
|
|
pages_content = []
|
|
|
current_pos = 0
|
|
|
|
|
|
@@ -57,7 +98,7 @@ class TextSplitter:
|
|
|
'text': text,
|
|
|
'start_pos': current_pos,
|
|
|
'end_pos': current_pos + len(text),
|
|
|
- 'source_file': str(pdf_path)
|
|
|
+ 'source_file': source_file
|
|
|
})
|
|
|
|
|
|
current_pos += len(text)
|
|
|
@@ -68,10 +109,19 @@ class TextSplitter:
|
|
|
print(f" 错误: 无法读取PDF全文 - {str(e)}")
|
|
|
return []
|
|
|
|
|
|
- def _extract_from_word(self, word_path):
|
|
|
+ def _extract_from_word(self, word_input, is_bytes=False):
|
|
|
"""提取Word的全文内容(包括段落和表格)"""
|
|
|
try:
|
|
|
- doc = Document(word_path)
|
|
|
+ if is_bytes:
|
|
|
+ # 从字节流打开
|
|
|
+ bytes_io = io.BytesIO(word_input)
|
|
|
+ doc = Document(bytes_io)
|
|
|
+ source_file = 'bytes_stream'
|
|
|
+ else:
|
|
|
+ # 从文件路径打开
|
|
|
+ doc = Document(word_input)
|
|
|
+ source_file = str(word_input)
|
|
|
+
|
|
|
pages_content = []
|
|
|
current_pos = 0
|
|
|
|
|
|
@@ -106,7 +156,7 @@ class TextSplitter:
|
|
|
'text': page_text,
|
|
|
'start_pos': current_pos,
|
|
|
'end_pos': current_pos + len(page_text),
|
|
|
- 'source_file': str(word_path)
|
|
|
+ 'source_file': source_file
|
|
|
})
|
|
|
|
|
|
current_pos += len(page_text)
|
|
|
@@ -129,7 +179,8 @@ class TextSplitter:
|
|
|
return '\n[表格开始]\n' + '\n'.join(table_text) + '\n[表格结束]\n'
|
|
|
|
|
|
def split_by_hierarchy(self, classified_items, pages_content, toc_info,
|
|
|
- target_level=2, max_chunk_size=1000, min_chunk_size=500):
|
|
|
+ target_level=2, max_chunk_size=1000, min_chunk_size=500,
|
|
|
+ use_concurrent=True, max_workers=None):
|
|
|
"""
|
|
|
按目录层级和字符数智能切分文本
|
|
|
|
|
|
@@ -147,6 +198,8 @@ class TextSplitter:
|
|
|
target_level: 目标层级
|
|
|
max_chunk_size: 最大分块字符数
|
|
|
min_chunk_size: 最小分块字符数
|
|
|
+ use_concurrent: 是否使用并发处理(默认True)
|
|
|
+ max_workers: 最大并发线程数(默认None,使用系统默认值)
|
|
|
|
|
|
返回:
|
|
|
list: 带分类信息的文本块列表
|
|
|
@@ -157,7 +210,7 @@ class TextSplitter:
|
|
|
print(f" 目录所在页: {toc_info['toc_pages']}")
|
|
|
|
|
|
# 步骤1: 在正文中定位已分类的标题(跳过目录页)
|
|
|
- located_titles = self._find_title_positions(
|
|
|
+ located_titles = self.title_matcher.find_title_positions(
|
|
|
classified_items,
|
|
|
full_text,
|
|
|
pages_content,
|
|
|
@@ -179,636 +232,30 @@ class TextSplitter:
|
|
|
# 步骤2: 提取所有层级的目录项,用于在正文块中查找子标题
|
|
|
all_toc_items = toc_info['toc_items']
|
|
|
|
|
|
- # 步骤3: 对每个目标层级的标题,提取其正文块并进行智能切分
|
|
|
- all_chunks = []
|
|
|
+ # 步骤2.5: 为每个找到的标题构建完整的层级路径
|
|
|
+ for title_info in found_titles:
|
|
|
+ hierarchy_path = self.chunk_metadata.build_hierarchy_path(title_info['title'], all_toc_items, target_level)
|
|
|
+ title_info['hierarchy_path'] = hierarchy_path
|
|
|
|
|
|
- for i, title_info in enumerate(found_titles):
|
|
|
- start_pos = title_info['position']
|
|
|
-
|
|
|
- # 确定正文块的结束位置(下一个同级标题的位置)
|
|
|
- if i + 1 < len(found_titles):
|
|
|
- end_pos = found_titles[i + 1]['position']
|
|
|
- else:
|
|
|
- end_pos = len(full_text)
|
|
|
-
|
|
|
- # 提取正文块
|
|
|
- content_block = full_text[start_pos:end_pos]
|
|
|
-
|
|
|
- # 在正文块中查找子标题(比目标层级更低的层级)
|
|
|
- sub_chunks = self._split_by_sub_titles(
|
|
|
- content_block,
|
|
|
- all_toc_items,
|
|
|
- title_info,
|
|
|
- target_level,
|
|
|
- max_chunk_size,
|
|
|
- min_chunk_size
|
|
|
+ # 步骤3: 按一级目录分组并并发处理
|
|
|
+ if use_concurrent:
|
|
|
+ all_chunks = self.hierarchy_processor.split_by_hierarchy_concurrent(
|
|
|
+ found_titles, full_text, pages_content, all_toc_items,
|
|
|
+ target_level, max_chunk_size, min_chunk_size, max_workers
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ all_chunks = self.hierarchy_processor.split_by_hierarchy_sequential(
|
|
|
+ found_titles, full_text, pages_content, all_toc_items,
|
|
|
+ target_level, max_chunk_size, min_chunk_size
|
|
|
)
|
|
|
-
|
|
|
- # 为每个子块添加元数据
|
|
|
- for j, sub_chunk in enumerate(sub_chunks, 1):
|
|
|
- # 计算实际页码
|
|
|
- chunk_start_pos = start_pos + sub_chunk['relative_start']
|
|
|
- page_num = self._get_page_number(chunk_start_pos, pages_content)
|
|
|
-
|
|
|
- # 构建section_label(层级路径)
|
|
|
- section_label = self._build_section_label(
|
|
|
- title_info['title'],
|
|
|
- sub_chunk.get('sub_title', '')
|
|
|
- )
|
|
|
-
|
|
|
- # 提取最低层级标题的编号
|
|
|
- sub_title = sub_chunk.get('sub_title', '')
|
|
|
- if sub_title:
|
|
|
- title_number = self._extract_title_number(sub_title)
|
|
|
- else:
|
|
|
- # 如果没有子标题,从父标题提取
|
|
|
- title_number = self._extract_title_number(title_info['title'])
|
|
|
-
|
|
|
- # 构建chunk_id格式:doc_chunk_<serial_number>_<序号>
|
|
|
- # 序号从1开始(如果合并了会从0开始)
|
|
|
- chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
|
|
|
-
|
|
|
- all_chunks.append({
|
|
|
- 'file_name': Path(pages_content[0].get('source_file', 'unknown')).name if pages_content else 'unknown',
|
|
|
- 'chunk_id': chunk_id_str,
|
|
|
- 'section_label': section_label,
|
|
|
- 'project_plan_type': 'bridge_up_part',
|
|
|
- 'element_tag': {
|
|
|
- 'chunk_id': chunk_id_str,
|
|
|
- 'page': page_num,
|
|
|
- 'serial_number': title_number if title_number else str(i + 1)
|
|
|
- },
|
|
|
- 'review_chunk_content': sub_chunk['content'],
|
|
|
- '_title_number': title_number, # 临时存储,用于合并时判断
|
|
|
- '_local_index': j # 临时存储局部索引
|
|
|
- })
|
|
|
|
|
|
# 步骤4: 对小块进行合并
|
|
|
- merged_chunks = self._merge_small_chunks(all_chunks, max_chunk_size, min_chunk_size)
|
|
|
+ merged_chunks = self.chunk_merger.merge_small_chunks(all_chunks, max_chunk_size, min_chunk_size, target_level)
|
|
|
|
|
|
# 步骤5: 生成最终的chunk_id和serial_number
|
|
|
- final_chunks = self._finalize_chunk_ids(merged_chunks)
|
|
|
+ final_chunks = self.chunk_metadata.finalize_chunk_ids(merged_chunks)
|
|
|
|
|
|
print(f" 初始切分: {len(all_chunks)} 个块")
|
|
|
print(f" 合并后: {len(merged_chunks)} 个块")
|
|
|
|
|
|
return final_chunks
|
|
|
-
|
|
|
- def _find_title_positions(self, classified_items, full_text, pages_content, toc_pages):
|
|
|
- """在正文中定位已分类的标题位置(跳过目录页)"""
|
|
|
- # 计算目录页的文本范围
|
|
|
- toc_start_pos = float('inf')
|
|
|
- toc_end_pos = 0
|
|
|
-
|
|
|
- for page in pages_content:
|
|
|
- if page['page_num'] in toc_pages:
|
|
|
- toc_start_pos = min(toc_start_pos, page['start_pos'])
|
|
|
- toc_end_pos = max(toc_end_pos, page['end_pos'])
|
|
|
-
|
|
|
- print(f" 目录页范围: {toc_start_pos} - {toc_end_pos}")
|
|
|
-
|
|
|
- located_titles = []
|
|
|
-
|
|
|
- for item in classified_items:
|
|
|
- title = item['title']
|
|
|
- category = item['category']
|
|
|
- category_code = item.get('category_code', 'other')
|
|
|
-
|
|
|
- # 在全文中查找标题(使用配置的模糊匹配阈值)
|
|
|
- fuzzy_threshold = self.config.fuzzy_threshold
|
|
|
- pos = self._find_title_in_text(title, full_text, fuzzy_threshold=fuzzy_threshold)
|
|
|
-
|
|
|
- # 如果找到的位置在目录页范围内,继续查找下一个出现
|
|
|
- if pos >= 0 and toc_start_pos <= pos < toc_end_pos:
|
|
|
- print(f" [跳过目录] {title} -> 位置: {pos} (在目录页)")
|
|
|
-
|
|
|
- # 尝试在目录页之后继续查找
|
|
|
- search_start = toc_end_pos
|
|
|
- remaining_text = full_text[search_start:]
|
|
|
- pos_in_remaining = self._find_title_in_text(title, remaining_text, fuzzy_threshold=fuzzy_threshold)
|
|
|
-
|
|
|
- if pos_in_remaining >= 0:
|
|
|
- pos = search_start + pos_in_remaining
|
|
|
- print(f" [找到正文] {title} -> 位置: {pos}")
|
|
|
- else:
|
|
|
- pos = -1
|
|
|
- print(f" [未找到] {title} (目录页之后)")
|
|
|
-
|
|
|
- if pos >= 0:
|
|
|
- # 确认位置不在目录页
|
|
|
- if not (toc_start_pos <= pos < toc_end_pos):
|
|
|
- # 找到对应的页码
|
|
|
- page_num = self._get_page_number(pos, pages_content)
|
|
|
-
|
|
|
- located_titles.append({
|
|
|
- 'title': title,
|
|
|
- 'category': category,
|
|
|
- 'category_code': category_code,
|
|
|
- 'position': pos,
|
|
|
- 'toc_page': item.get('page', ''),
|
|
|
- 'actual_page': page_num,
|
|
|
- 'found': True
|
|
|
- })
|
|
|
- print(f" [确认] {title} -> 页码: {page_num}, 位置: {pos}")
|
|
|
- else:
|
|
|
- print(f" [未找到] {title} (只在目录页)")
|
|
|
- located_titles.append({
|
|
|
- 'title': title,
|
|
|
- 'category': category,
|
|
|
- 'category_code': category_code,
|
|
|
- 'position': -1,
|
|
|
- 'toc_page': item.get('page', ''),
|
|
|
- 'found': False
|
|
|
- })
|
|
|
- else:
|
|
|
- print(f" [未找到] {title}")
|
|
|
- located_titles.append({
|
|
|
- 'title': title,
|
|
|
- 'category': category,
|
|
|
- 'category_code': category_code,
|
|
|
- 'position': -1,
|
|
|
- 'toc_page': item.get('page', ''),
|
|
|
- 'found': False
|
|
|
- })
|
|
|
-
|
|
|
- return located_titles
|
|
|
-
|
|
|
- def _find_title_in_text(self, title, text, fuzzy_threshold=0.85):
|
|
|
- """在文本中查找标题的位置"""
|
|
|
- normalized_title = self._normalize_title(title)
|
|
|
-
|
|
|
- # 方法1: 精确匹配
|
|
|
- if normalized_title in text:
|
|
|
- return text.index(normalized_title)
|
|
|
-
|
|
|
- # 方法2: 移除所有空格后匹配
|
|
|
- title_no_space = normalized_title.replace(' ', '')
|
|
|
- text_no_space = text.replace(' ', '')
|
|
|
- if title_no_space in text_no_space:
|
|
|
- pos_no_space = text_no_space.index(title_no_space)
|
|
|
- return pos_no_space
|
|
|
-
|
|
|
- # 方法3: 按行查找,匹配度最高的行
|
|
|
- lines = text.split('\n')
|
|
|
- current_pos = 0
|
|
|
- best_ratio = 0
|
|
|
- best_pos = -1
|
|
|
-
|
|
|
- for line in lines:
|
|
|
- line_stripped = line.strip()
|
|
|
-
|
|
|
- if len(line_stripped) < 3:
|
|
|
- current_pos += len(line) + 1
|
|
|
- continue
|
|
|
-
|
|
|
- # 计算相似度
|
|
|
- ratio = SequenceMatcher(None, normalized_title, line_stripped).ratio()
|
|
|
-
|
|
|
- if ratio > best_ratio:
|
|
|
- best_ratio = ratio
|
|
|
- best_pos = current_pos
|
|
|
-
|
|
|
- current_pos += len(line) + 1
|
|
|
-
|
|
|
- # 如果找到相似度足够高的行
|
|
|
- if best_ratio >= fuzzy_threshold:
|
|
|
- return best_pos
|
|
|
-
|
|
|
- return -1
|
|
|
-
|
|
|
- def _normalize_title(self, title):
|
|
|
- """标准化标题用于匹配"""
|
|
|
- normalized = re.sub(r'\s+', ' ', title)
|
|
|
- normalized = normalized.strip()
|
|
|
- return normalized
|
|
|
-
|
|
|
- def _extract_title_number(self, title):
|
|
|
- """
|
|
|
- 从标题中提取编号部分
|
|
|
-
|
|
|
- 例如:
|
|
|
- "1.5 施工条件" -> "1.5"
|
|
|
- "1.6 风险辨识与分级" -> "1.6"
|
|
|
- "1 工程概况" -> "1"
|
|
|
-
|
|
|
- 参数:
|
|
|
- title: 标题字符串
|
|
|
-
|
|
|
- 返回:
|
|
|
- str: 编号部分,如果未找到则返回空字符串
|
|
|
- """
|
|
|
- # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
|
|
|
- number_match = re.match(r'^(\d+(?:\.\d+)*)', title)
|
|
|
- if number_match:
|
|
|
- return number_match.group(1)
|
|
|
-
|
|
|
- # 匹配中文编号格式(如 一、二、三等)
|
|
|
- chinese_match = re.match(r'^([一二三四五六七八九十]+)[、..]', title)
|
|
|
- if chinese_match:
|
|
|
- return chinese_match.group(1)
|
|
|
-
|
|
|
- return ""
|
|
|
-
|
|
|
- def _get_page_number(self, position, pages_content):
|
|
|
- """根据位置获取页码"""
|
|
|
- for page in pages_content:
|
|
|
- if page['start_pos'] <= position < page['end_pos']:
|
|
|
- return page['page_num']
|
|
|
- return 1
|
|
|
-
|
|
|
- def _split_by_sub_titles(self, content_block, all_toc_items, parent_title_info,
|
|
|
- target_level, max_chunk_size, min_chunk_size):
|
|
|
- """
|
|
|
- 在正文块中按子标题进行切分
|
|
|
-
|
|
|
- 参数:
|
|
|
- content_block: 正文块内容
|
|
|
- all_toc_items: 所有目录项
|
|
|
- parent_title_info: 父标题信息
|
|
|
- target_level: 目标层级
|
|
|
- max_chunk_size: 最大分块字符数
|
|
|
- min_chunk_size: 最小分块字符数
|
|
|
-
|
|
|
- 返回:
|
|
|
- list: 子块列表
|
|
|
- """
|
|
|
- # 查找比目标层级更低的子标题
|
|
|
- sub_titles = []
|
|
|
- fuzzy_threshold = self.config.fuzzy_threshold
|
|
|
- for toc_item in all_toc_items:
|
|
|
- if toc_item['level'] > target_level:
|
|
|
- # 在正文块中查找这个子标题
|
|
|
- pos = self._find_title_in_text(toc_item['title'], content_block, fuzzy_threshold=fuzzy_threshold)
|
|
|
- if pos >= 0:
|
|
|
- sub_titles.append({
|
|
|
- 'title': toc_item['title'],
|
|
|
- 'level': toc_item['level'],
|
|
|
- 'position': pos
|
|
|
- })
|
|
|
-
|
|
|
- # 按位置排序
|
|
|
- sub_titles.sort(key=lambda x: x['position'])
|
|
|
-
|
|
|
- # 如果没有找到子标题,将整个正文块作为一个块
|
|
|
- if not sub_titles:
|
|
|
- # 检查是否需要分割
|
|
|
- if len(content_block) > max_chunk_size:
|
|
|
- return self._split_large_chunk(content_block, max_chunk_size, parent_title_info['title'])
|
|
|
- else:
|
|
|
- return [{
|
|
|
- 'content': content_block,
|
|
|
- 'relative_start': 0,
|
|
|
- 'sub_title': '',
|
|
|
- 'serial_number': ''
|
|
|
- }]
|
|
|
-
|
|
|
- # 按子标题切分
|
|
|
- chunks = []
|
|
|
- for i, sub_title in enumerate(sub_titles):
|
|
|
- start_pos = sub_title['position']
|
|
|
-
|
|
|
- # 确定结束位置
|
|
|
- if i + 1 < len(sub_titles):
|
|
|
- end_pos = sub_titles[i + 1]['position']
|
|
|
- else:
|
|
|
- end_pos = len(content_block)
|
|
|
-
|
|
|
- chunk_content = content_block[start_pos:end_pos]
|
|
|
-
|
|
|
- # 检查是否需要分割
|
|
|
- if len(chunk_content) > max_chunk_size:
|
|
|
- split_chunks = self._split_large_chunk(chunk_content, max_chunk_size, sub_title['title'])
|
|
|
- for j, split_chunk in enumerate(split_chunks):
|
|
|
- split_chunk['relative_start'] = start_pos + split_chunk['relative_start']
|
|
|
- split_chunk['sub_title'] = sub_title['title']
|
|
|
- chunks.append(split_chunk)
|
|
|
- else:
|
|
|
- chunks.append({
|
|
|
- 'content': chunk_content,
|
|
|
- 'relative_start': start_pos,
|
|
|
- 'sub_title': sub_title['title']
|
|
|
- })
|
|
|
-
|
|
|
- return chunks
|
|
|
-
|
|
|
- def _split_large_chunk(self, content, max_chunk_size, title):
|
|
|
- """
|
|
|
- 将超大块按句子级分割(保持语义完整)
|
|
|
-
|
|
|
- 参数:
|
|
|
- content: 内容
|
|
|
- max_chunk_size: 最大分块字符数
|
|
|
- title: 标题
|
|
|
-
|
|
|
- 返回:
|
|
|
- list: 分割后的块列表
|
|
|
- """
|
|
|
- # 按句子分割(中文句号、问号、感叹号)
|
|
|
- sentences = re.split(r'([。!?\n])', content)
|
|
|
-
|
|
|
- # 重新组合句子和标点
|
|
|
- combined_sentences = []
|
|
|
- for i in range(0, len(sentences) - 1, 2):
|
|
|
- if i + 1 < len(sentences):
|
|
|
- combined_sentences.append(sentences[i] + sentences[i + 1])
|
|
|
- else:
|
|
|
- combined_sentences.append(sentences[i])
|
|
|
-
|
|
|
- if not combined_sentences:
|
|
|
- combined_sentences = [content]
|
|
|
-
|
|
|
- # 按max_chunk_size组合句子
|
|
|
- chunks = []
|
|
|
- current_chunk = ""
|
|
|
- current_start = 0
|
|
|
-
|
|
|
- for sentence in combined_sentences:
|
|
|
- if len(current_chunk) + len(sentence) <= max_chunk_size:
|
|
|
- current_chunk += sentence
|
|
|
- else:
|
|
|
- if current_chunk:
|
|
|
- chunks.append({
|
|
|
- 'content': current_chunk,
|
|
|
- 'relative_start': current_start,
|
|
|
- 'is_split': True # 标记为分割块,不参与合并
|
|
|
- })
|
|
|
- current_start += len(current_chunk)
|
|
|
- current_chunk = sentence
|
|
|
-
|
|
|
- # 添加最后一个块
|
|
|
- if current_chunk:
|
|
|
- chunks.append({
|
|
|
- 'content': current_chunk,
|
|
|
- 'relative_start': current_start,
|
|
|
- 'is_split': True
|
|
|
- })
|
|
|
-
|
|
|
- return chunks
|
|
|
-
|
|
|
- def _merge_small_chunks(self, chunks, max_chunk_size, min_chunk_size):
|
|
|
- """
|
|
|
- 合并小于min_chunk_size的块
|
|
|
-
|
|
|
- 参数:
|
|
|
- chunks: 块列表
|
|
|
- max_chunk_size: 最大分块字符数
|
|
|
- min_chunk_size: 最小分块字符数
|
|
|
-
|
|
|
- 返回:
|
|
|
- list: 合并后的块列表
|
|
|
- """
|
|
|
- if not chunks:
|
|
|
- return []
|
|
|
-
|
|
|
- # 先按最低层级标题编号分组处理(在同一标题内合并)
|
|
|
- current_title_number = None
|
|
|
- title_groups = []
|
|
|
- current_group = []
|
|
|
-
|
|
|
- for chunk in chunks:
|
|
|
- title_number = chunk.get('_title_number', '')
|
|
|
-
|
|
|
- if title_number != current_title_number:
|
|
|
- # 保存上一组
|
|
|
- if current_group:
|
|
|
- title_groups.append({
|
|
|
- 'title_number': current_title_number,
|
|
|
- 'chunks': current_group
|
|
|
- })
|
|
|
- # 开始新组
|
|
|
- current_title_number = title_number
|
|
|
- current_group = [chunk]
|
|
|
- else:
|
|
|
- current_group.append(chunk)
|
|
|
-
|
|
|
- # 保存最后一组
|
|
|
- if current_group:
|
|
|
- title_groups.append({
|
|
|
- 'title_number': current_title_number,
|
|
|
- 'chunks': current_group
|
|
|
- })
|
|
|
-
|
|
|
- # 在每个组内合并小块
|
|
|
- merged_groups = []
|
|
|
- for group in title_groups:
|
|
|
- merged_chunks = self._merge_within_title(group['chunks'], max_chunk_size, min_chunk_size)
|
|
|
- merged_groups.append({
|
|
|
- 'title_number': group['title_number'],
|
|
|
- 'chunks': merged_chunks
|
|
|
- })
|
|
|
-
|
|
|
- # 处理跨标题合并:如果上一组的最后一个块与当前组的第一个块都是小块,可以合并
|
|
|
- final_merged = []
|
|
|
- for i, group in enumerate(merged_groups):
|
|
|
- if i == 0:
|
|
|
- final_merged.extend(group['chunks'])
|
|
|
- else:
|
|
|
- # 检查是否可以与上一组的最后一个块合并
|
|
|
- prev_group = merged_groups[i - 1]
|
|
|
- if prev_group['chunks'] and group['chunks']:
|
|
|
- prev_last = prev_group['chunks'][-1]
|
|
|
- curr_first = group['chunks'][0]
|
|
|
-
|
|
|
- prev_content = prev_last['review_chunk_content']
|
|
|
- curr_content = curr_first['review_chunk_content']
|
|
|
-
|
|
|
- # 如果两个块都是小块且不是分割块,可以合并
|
|
|
- if (not prev_last.get('is_split', False) and
|
|
|
- not curr_first.get('is_split', False) and
|
|
|
- len(prev_content) < min_chunk_size and
|
|
|
- len(curr_content) < min_chunk_size and
|
|
|
- len(prev_content) + len(curr_content) <= max_chunk_size):
|
|
|
-
|
|
|
- # 合并
|
|
|
- merged_content = prev_content + '\n\n' + curr_content
|
|
|
- merged_chunk = prev_last.copy()
|
|
|
- merged_chunk['review_chunk_content'] = merged_content
|
|
|
- merged_chunk['section_label'] = self._merge_section_labels(
|
|
|
- prev_last['section_label'],
|
|
|
- curr_first['section_label']
|
|
|
- )
|
|
|
- # 合并标题编号
|
|
|
- prev_title_num = prev_last.get('_title_number', '')
|
|
|
- curr_title_num = curr_first.get('_title_number', '')
|
|
|
- if prev_title_num and curr_title_num and prev_title_num != curr_title_num:
|
|
|
- # chunk_id中使用+号(无空格)
|
|
|
- merged_chunk['_title_number'] = f"{prev_title_num}+{curr_title_num}"
|
|
|
- # serial_number中使用空格(用于显示)
|
|
|
- merged_chunk['_title_number_display'] = f"{prev_title_num} + {curr_title_num}"
|
|
|
- merged_chunk['_is_merged'] = True
|
|
|
-
|
|
|
- # 替换上一组的最后一个块
|
|
|
- final_merged[-1] = merged_chunk
|
|
|
- # 跳过当前组的第一个块
|
|
|
- final_merged.extend(group['chunks'][1:])
|
|
|
- else:
|
|
|
- final_merged.extend(group['chunks'])
|
|
|
- else:
|
|
|
- final_merged.extend(group['chunks'])
|
|
|
-
|
|
|
- return final_merged
|
|
|
-
|
|
|
- def _merge_within_title(self, title_chunks, max_chunk_size, min_chunk_size):
|
|
|
- """在同一个最低层级标题内合并小块"""
|
|
|
- if not title_chunks:
|
|
|
- return []
|
|
|
-
|
|
|
- merged = []
|
|
|
- i = 0
|
|
|
-
|
|
|
- while i < len(title_chunks):
|
|
|
- current_chunk = title_chunks[i]
|
|
|
- current_content = current_chunk['review_chunk_content']
|
|
|
-
|
|
|
- # 如果当前块是分割块,不参与合并
|
|
|
- if current_chunk.get('is_split', False):
|
|
|
- merged.append(current_chunk)
|
|
|
- i += 1
|
|
|
- continue
|
|
|
-
|
|
|
- # 如果当前块小于最小值,尝试与下一个块合并
|
|
|
- if len(current_content) < min_chunk_size and i + 1 < len(title_chunks):
|
|
|
- next_chunk = title_chunks[i + 1]
|
|
|
- next_content = next_chunk['review_chunk_content']
|
|
|
-
|
|
|
- # 检查下一个块是否也是小块且不是分割块
|
|
|
- if (not next_chunk.get('is_split', False) and
|
|
|
- len(current_content) + len(next_content) <= max_chunk_size):
|
|
|
- # 合并
|
|
|
- merged_content = current_content + '\n\n' + next_content
|
|
|
- merged_chunk = current_chunk.copy()
|
|
|
- merged_chunk['review_chunk_content'] = merged_content
|
|
|
- # 使用优化的标签合并函数
|
|
|
- merged_chunk['section_label'] = self._merge_section_labels(
|
|
|
- current_chunk['section_label'],
|
|
|
- next_chunk['section_label']
|
|
|
- )
|
|
|
- merged.append(merged_chunk)
|
|
|
- i += 2 # 跳过下一个块
|
|
|
- continue
|
|
|
-
|
|
|
- # 否则直接添加
|
|
|
- merged.append(current_chunk)
|
|
|
- i += 1
|
|
|
-
|
|
|
- return merged
|
|
|
-
|
|
|
- def _finalize_chunk_ids(self, chunks):
|
|
|
- """
|
|
|
- 生成最终的chunk_id和serial_number
|
|
|
-
|
|
|
- 参数:
|
|
|
- chunks: 合并后的块列表
|
|
|
-
|
|
|
- 返回:
|
|
|
- list: 最终处理后的块列表
|
|
|
- """
|
|
|
- final_chunks = []
|
|
|
- current_title_number = None
|
|
|
- local_index = 1
|
|
|
-
|
|
|
- for i, chunk in enumerate(chunks):
|
|
|
- title_number = chunk.get('_title_number', '')
|
|
|
- is_merged = chunk.get('_is_merged', False)
|
|
|
-
|
|
|
- # 提取标题编号的主要部分(用于判断是否在同一标题内)
|
|
|
- # 如果包含+号,说明是跨标题合并的块
|
|
|
- if '+' in str(title_number):
|
|
|
- # 跨标题合并的块,序号从0开始
|
|
|
- local_index = 0
|
|
|
- # chunk_id中使用+号(无空格),如"1.5+1.6"
|
|
|
- merged_title_number = title_number
|
|
|
- # serial_number中使用空格,如"1.5 + 1.6"
|
|
|
- serial_number_display = chunk.get('_title_number_display', title_number.replace('+', ' + '))
|
|
|
- # 更新current_title_number为合并后的编号,这样下一个块会重新开始
|
|
|
- current_title_number = title_number
|
|
|
- else:
|
|
|
- # 如果标题编号变化,重置索引
|
|
|
- if title_number != current_title_number:
|
|
|
- current_title_number = title_number
|
|
|
- # 如果上一个块是跨标题合并的,说明当前标题的第一个块已经被合并了,序号从1开始
|
|
|
- # 否则序号从1开始
|
|
|
- local_index = 1
|
|
|
- else:
|
|
|
- local_index += 1
|
|
|
- merged_title_number = title_number
|
|
|
- serial_number_display = title_number
|
|
|
-
|
|
|
- # 生成chunk_id(使用无空格的编号)
|
|
|
- if merged_title_number:
|
|
|
- chunk_id_str = f"doc_chunk_{merged_title_number}_{local_index}"
|
|
|
- else:
|
|
|
- chunk_id_str = f"doc_chunk_{local_index}"
|
|
|
-
|
|
|
- # 更新chunk数据
|
|
|
- final_chunk = {
|
|
|
- 'file_name': chunk['file_name'],
|
|
|
- 'chunk_id': chunk_id_str,
|
|
|
- 'section_label': chunk['section_label'],
|
|
|
- 'project_plan_type': 'bridge_up_part',
|
|
|
- 'element_tag': {
|
|
|
- 'chunk_id': chunk_id_str,
|
|
|
- 'page': chunk['element_tag']['page'],
|
|
|
- 'serial_number': serial_number_display if merged_title_number else ''
|
|
|
- },
|
|
|
- 'review_chunk_content': chunk['review_chunk_content']
|
|
|
- }
|
|
|
-
|
|
|
- final_chunks.append(final_chunk)
|
|
|
-
|
|
|
- return final_chunks
|
|
|
-
|
|
|
- def _build_section_label(self, parent_title, sub_title):
|
|
|
- """构建section_label(层级路径)"""
|
|
|
- if sub_title:
|
|
|
- return f"{parent_title}->{sub_title}"
|
|
|
- else:
|
|
|
- return parent_title
|
|
|
-
|
|
|
- def _merge_section_labels(self, label1, label2):
|
|
|
- """
|
|
|
- 合并两个section_label,提取公共前缀
|
|
|
-
|
|
|
- 例如:
|
|
|
- "1 工程概况->1.3 工程地质" + "1 工程概况->1.4 气象水文"
|
|
|
- => "1 工程概况->1.3 工程地质 + 1.4 气象水文"
|
|
|
-
|
|
|
- 参数:
|
|
|
- label1: 第一个标签
|
|
|
- label2: 第二个标签
|
|
|
-
|
|
|
- 返回:
|
|
|
- str: 合并后的标签
|
|
|
- """
|
|
|
- # 按"->"分割标签
|
|
|
- parts1 = label1.split('->')
|
|
|
- parts2 = label2.split('->')
|
|
|
-
|
|
|
- # 找到公共前缀
|
|
|
- common_prefix = []
|
|
|
- for i in range(min(len(parts1), len(parts2))):
|
|
|
- if parts1[i] == parts2[i]:
|
|
|
- common_prefix.append(parts1[i])
|
|
|
- else:
|
|
|
- break
|
|
|
-
|
|
|
- # 如果有公共前缀
|
|
|
- if common_prefix:
|
|
|
- # 获取不同的部分
|
|
|
- diff1 = '->'.join(parts1[len(common_prefix):])
|
|
|
- diff2 = '->'.join(parts2[len(common_prefix):])
|
|
|
-
|
|
|
- # 构建合并后的标签
|
|
|
- prefix = '->'.join(common_prefix)
|
|
|
- if diff1 and diff2:
|
|
|
- return f"{prefix}->{diff1} + {diff2}"
|
|
|
- elif diff1:
|
|
|
- return f"{prefix}->{diff1}"
|
|
|
- elif diff2:
|
|
|
- return f"{prefix}->{diff2}"
|
|
|
- else:
|
|
|
- return prefix
|
|
|
- else:
|
|
|
- # 没有公共前缀,直接用+连接
|
|
|
- return f"{label1} + {label2}"
|
|
|
-
|