|
|
@@ -0,0 +1,814 @@
|
|
|
+"""
|
|
|
+文本切分模块
|
|
|
+实现按目录层级和字符数的智能切分逻辑
|
|
|
+"""
|
|
|
+
|
|
|
+import re
|
|
|
+from pathlib import Path
|
|
|
+from difflib import SequenceMatcher
|
|
|
+import fitz # PyMuPDF
|
|
|
+from docx import Document
|
|
|
+
|
|
|
+try:
|
|
|
+ from .config_loader import get_config
|
|
|
+except ImportError:
|
|
|
+ from config_loader import get_config
|
|
|
+
|
|
|
+
|
|
|
+class TextSplitter:
|
|
|
+ """文本切分器,支持PDF和Word格式"""
|
|
|
+
|
|
|
    def __init__(self):
        # Load the shared configuration object (supplies e.g. the
        # fuzzy-match threshold used when locating headings).
        self.config = get_config()
|
|
|
+
|
|
|
+ def extract_full_text(self, file_path):
|
|
|
+ """
|
|
|
+ 提取文档的全文内容
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ file_path: 文档路径(PDF或Word)
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ list: 每页的文本内容
|
|
|
+ """
|
|
|
+ file_path = Path(file_path)
|
|
|
+ file_ext = file_path.suffix.lower()
|
|
|
+
|
|
|
+ if file_ext == '.pdf':
|
|
|
+ return self._extract_from_pdf(file_path)
|
|
|
+ elif file_ext in ['.docx', '.doc']:
|
|
|
+ return self._extract_from_word(file_path)
|
|
|
+ else:
|
|
|
+ raise ValueError(f"不支持的文件格式: {file_ext}")
|
|
|
+
|
|
|
+ def _extract_from_pdf(self, pdf_path):
|
|
|
+ """提取PDF的全文内容"""
|
|
|
+ try:
|
|
|
+ doc = fitz.open(pdf_path)
|
|
|
+ pages_content = []
|
|
|
+ current_pos = 0
|
|
|
+
|
|
|
+ for page_num in range(len(doc)):
|
|
|
+ page = doc[page_num]
|
|
|
+ text = page.get_text()
|
|
|
+
|
|
|
+ pages_content.append({
|
|
|
+ 'page_num': page_num + 1,
|
|
|
+ 'text': text,
|
|
|
+ 'start_pos': current_pos,
|
|
|
+ 'end_pos': current_pos + len(text),
|
|
|
+ 'source_file': str(pdf_path)
|
|
|
+ })
|
|
|
+
|
|
|
+ current_pos += len(text)
|
|
|
+
|
|
|
+ doc.close()
|
|
|
+ return pages_content
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 错误: 无法读取PDF全文 - {str(e)}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ def _extract_from_word(self, word_path):
|
|
|
+ """提取Word的全文内容(包括段落和表格)"""
|
|
|
+ try:
|
|
|
+ doc = Document(word_path)
|
|
|
+ pages_content = []
|
|
|
+ current_pos = 0
|
|
|
+
|
|
|
+ # 提取所有内容(段落和表格按文档顺序)
|
|
|
+ all_content = []
|
|
|
+
|
|
|
+ # 遍历文档的所有元素(段落和表格)
|
|
|
+ for element in doc.element.body:
|
|
|
+ # 检查是段落还是表格
|
|
|
+ if element.tag.endswith('p'): # 段落
|
|
|
+ for para in doc.paragraphs:
|
|
|
+ if para._element == element:
|
|
|
+ text = para.text
|
|
|
+ if text.strip():
|
|
|
+ all_content.append(text)
|
|
|
+ break
|
|
|
+ elif element.tag.endswith('tbl'): # 表格
|
|
|
+ for table in doc.tables:
|
|
|
+ if table._element == element:
|
|
|
+ table_text = self._extract_table_text(table)
|
|
|
+ all_content.append(table_text)
|
|
|
+ break
|
|
|
+
|
|
|
+ # 模拟分页:每30个元素作为一"页"
|
|
|
+ elements_per_page = 30
|
|
|
+ for page_num in range(0, len(all_content), elements_per_page):
|
|
|
+ page_elements = all_content[page_num:page_num + elements_per_page]
|
|
|
+ page_text = '\n'.join(page_elements)
|
|
|
+
|
|
|
+ pages_content.append({
|
|
|
+ 'page_num': page_num // elements_per_page + 1,
|
|
|
+ 'text': page_text,
|
|
|
+ 'start_pos': current_pos,
|
|
|
+ 'end_pos': current_pos + len(page_text),
|
|
|
+ 'source_file': str(word_path)
|
|
|
+ })
|
|
|
+
|
|
|
+ current_pos += len(page_text)
|
|
|
+
|
|
|
+ return pages_content
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 错误: 无法读取Word全文 - {str(e)}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ def _extract_table_text(self, table):
|
|
|
+ """提取表格内容为文本格式"""
|
|
|
+ table_text = []
|
|
|
+ for row in table.rows:
|
|
|
+ row_text = []
|
|
|
+ for cell in row.cells:
|
|
|
+ cell_text = cell.text.strip().replace('\n', ' ')
|
|
|
+ row_text.append(cell_text)
|
|
|
+ table_text.append('\t'.join(row_text))
|
|
|
+
|
|
|
+ return '\n[表格开始]\n' + '\n'.join(table_text) + '\n[表格结束]\n'
|
|
|
+
|
|
|
    def split_by_hierarchy(self, classified_items, pages_content, toc_info,
                           target_level=2, max_chunk_size=1000, min_chunk_size=500):
        """
        Split the document text by TOC hierarchy and character count.

        Chunking logic:
        1. Locate each classified target-level heading in the body text
           (occurrences on the TOC pages are skipped).
        2. Inside each heading's body block, split on the deepest TOC
           sub-headings found in that block.
        3. Enforce size limits per chunk:
           - chunks over max_chunk_size are split at sentence boundaries
             (such pieces never take part in merging);
           - chunks under min_chunk_size are merged, but only while the
             merged result stays within max_chunk_size.

        Args:
            classified_items: classified TOC items.
            pages_content: full document text, per page.
            toc_info: TOC metadata ('toc_items' and 'toc_pages').
            target_level: TOC level used for the primary split.
            max_chunk_size: maximum characters per chunk.
            min_chunk_size: minimum characters per chunk.

        Returns:
            list: text chunks annotated with classification metadata.
        """
        full_text = ''.join([page['text'] for page in pages_content])

        print(f" 正在定位{len(classified_items)}个已分类的标题...")
        print(f" 目录所在页: {toc_info['toc_pages']}")

        # Step 1: locate the classified headings in the body (skip TOC pages).
        located_titles = self._find_title_positions(
            classified_items,
            full_text,
            pages_content,
            toc_info['toc_pages']
        )

        # Keep only the headings that were actually found.
        found_titles = [t for t in located_titles if t['found']]

        if not found_titles:
            print(f" 错误: 未能在正文中定位任何标题")
            return []

        print(f" 成功定位 {len(found_titles)}/{len(classified_items)} 个标题")

        # Process headings in document order.
        found_titles.sort(key=lambda x: x['position'])

        # Step 2: all TOC items (every level), used to find sub-headings
        # inside each body block.
        all_toc_items = toc_info['toc_items']

        # Step 3: extract each target-level heading's body block and split it.
        all_chunks = []

        for i, title_info in enumerate(found_titles):
            start_pos = title_info['position']

            # The block ends where the next located heading starts
            # (or at the end of the document for the last heading).
            if i + 1 < len(found_titles):
                end_pos = found_titles[i + 1]['position']
            else:
                end_pos = len(full_text)

            # Extract the body block.
            content_block = full_text[start_pos:end_pos]

            # Split the block on sub-headings deeper than the target level.
            sub_chunks = self._split_by_sub_titles(
                content_block,
                all_toc_items,
                title_info,
                target_level,
                max_chunk_size,
                min_chunk_size
            )

            # Attach metadata to every sub-chunk.
            for j, sub_chunk in enumerate(sub_chunks, 1):
                # Map the chunk's absolute offset back to a page number.
                chunk_start_pos = start_pos + sub_chunk['relative_start']
                page_num = self._get_page_number(chunk_start_pos, pages_content)

                # Build section_label (hierarchy path "parent->sub").
                section_label = self._build_section_label(
                    title_info['title'],
                    sub_chunk.get('sub_title', '')
                )

                # Extract the numbering of the deepest heading.
                sub_title = sub_chunk.get('sub_title', '')
                if sub_title:
                    title_number = self._extract_title_number(sub_title)
                else:
                    # No sub-heading: fall back to the parent heading's number.
                    title_number = self._extract_title_number(title_info['title'])

                # chunk_id format: doc_chunk_<serial_number>_<index>
                # (index starts at 1; cross-heading merged chunks later
                # restart at 0 in _finalize_chunk_ids)
                chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"

                all_chunks.append({
                    'file_name': Path(pages_content[0].get('source_file', 'unknown')).name if pages_content else 'unknown',
                    'chunk_id': chunk_id_str,
                    'section_label': section_label,
                    'project_plan_type': 'bridge_up_part',
                    'element_tag': {
                        'chunk_id': chunk_id_str,
                        'page': page_num,
                        'serial_number': title_number if title_number else str(i + 1)
                    },
                    'review_chunk_content': sub_chunk['content'],
                    '_title_number': title_number,  # temporary: used by the merge pass
                    '_local_index': j  # temporary: local index within the heading
                })

        # Step 4: merge undersized chunks.
        merged_chunks = self._merge_small_chunks(all_chunks, max_chunk_size, min_chunk_size)

        # Step 5: assign final chunk_ids and serial_numbers.
        final_chunks = self._finalize_chunk_ids(merged_chunks)

        print(f" 初始切分: {len(all_chunks)} 个块")
        print(f" 合并后: {len(merged_chunks)} 个块")

        return final_chunks
|
|
|
+
|
|
|
    def _find_title_positions(self, classified_items, full_text, pages_content, toc_pages):
        """Locate each classified heading in the body text, skipping TOC pages.

        Returns one record per item; 'found' flags success and 'position'
        holds the offset into full_text (-1 when the heading was not found
        outside the TOC pages).
        """
        # Compute the character span covered by the TOC pages.
        toc_start_pos = float('inf')
        toc_end_pos = 0

        for page in pages_content:
            if page['page_num'] in toc_pages:
                toc_start_pos = min(toc_start_pos, page['start_pos'])
                toc_end_pos = max(toc_end_pos, page['end_pos'])

        print(f" 目录页范围: {toc_start_pos} - {toc_end_pos}")

        located_titles = []

        for item in classified_items:
            title = item['title']
            category = item['category']
            category_code = item.get('category_code', 'other')

            # Search the full text (fuzzy threshold from configuration).
            fuzzy_threshold = self.config.fuzzy_threshold
            pos = self._find_title_in_text(title, full_text, fuzzy_threshold=fuzzy_threshold)

            # A hit inside the TOC span is the TOC entry itself,
            # not the body heading — retry after the TOC pages.
            if pos >= 0 and toc_start_pos <= pos < toc_end_pos:
                print(f" [跳过目录] {title} -> 位置: {pos} (在目录页)")

                # Continue searching after the TOC pages.
                search_start = toc_end_pos
                remaining_text = full_text[search_start:]
                pos_in_remaining = self._find_title_in_text(title, remaining_text, fuzzy_threshold=fuzzy_threshold)

                if pos_in_remaining >= 0:
                    pos = search_start + pos_in_remaining
                    print(f" [找到正文] {title} -> 位置: {pos}")
                else:
                    pos = -1
                    print(f" [未找到] {title} (目录页之后)")

            if pos >= 0:
                # Double-check the hit is outside the TOC span.
                if not (toc_start_pos <= pos < toc_end_pos):
                    # Resolve the page the hit falls on.
                    page_num = self._get_page_number(pos, pages_content)

                    located_titles.append({
                        'title': title,
                        'category': category,
                        'category_code': category_code,
                        'position': pos,
                        'toc_page': item.get('page', ''),
                        'actual_page': page_num,
                        'found': True
                    })
                    print(f" [确认] {title} -> 页码: {page_num}, 位置: {pos}")
                else:
                    print(f" [未找到] {title} (只在目录页)")
                    located_titles.append({
                        'title': title,
                        'category': category,
                        'category_code': category_code,
                        'position': -1,
                        'toc_page': item.get('page', ''),
                        'found': False
                    })
            else:
                print(f" [未找到] {title}")
                located_titles.append({
                    'title': title,
                    'category': category,
                    'category_code': category_code,
                    'position': -1,
                    'toc_page': item.get('page', ''),
                    'found': False
                })

        return located_titles
|
|
|
+
|
|
|
+ def _find_title_in_text(self, title, text, fuzzy_threshold=0.85):
|
|
|
+ """在文本中查找标题的位置"""
|
|
|
+ normalized_title = self._normalize_title(title)
|
|
|
+
|
|
|
+ # 方法1: 精确匹配
|
|
|
+ if normalized_title in text:
|
|
|
+ return text.index(normalized_title)
|
|
|
+
|
|
|
+ # 方法2: 移除所有空格后匹配
|
|
|
+ title_no_space = normalized_title.replace(' ', '')
|
|
|
+ text_no_space = text.replace(' ', '')
|
|
|
+ if title_no_space in text_no_space:
|
|
|
+ pos_no_space = text_no_space.index(title_no_space)
|
|
|
+ return pos_no_space
|
|
|
+
|
|
|
+ # 方法3: 按行查找,匹配度最高的行
|
|
|
+ lines = text.split('\n')
|
|
|
+ current_pos = 0
|
|
|
+ best_ratio = 0
|
|
|
+ best_pos = -1
|
|
|
+
|
|
|
+ for line in lines:
|
|
|
+ line_stripped = line.strip()
|
|
|
+
|
|
|
+ if len(line_stripped) < 3:
|
|
|
+ current_pos += len(line) + 1
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 计算相似度
|
|
|
+ ratio = SequenceMatcher(None, normalized_title, line_stripped).ratio()
|
|
|
+
|
|
|
+ if ratio > best_ratio:
|
|
|
+ best_ratio = ratio
|
|
|
+ best_pos = current_pos
|
|
|
+
|
|
|
+ current_pos += len(line) + 1
|
|
|
+
|
|
|
+ # 如果找到相似度足够高的行
|
|
|
+ if best_ratio >= fuzzy_threshold:
|
|
|
+ return best_pos
|
|
|
+
|
|
|
+ return -1
|
|
|
+
|
|
|
+ def _normalize_title(self, title):
|
|
|
+ """标准化标题用于匹配"""
|
|
|
+ normalized = re.sub(r'\s+', ' ', title)
|
|
|
+ normalized = normalized.strip()
|
|
|
+ return normalized
|
|
|
+
|
|
|
+ def _extract_title_number(self, title):
|
|
|
+ """
|
|
|
+ 从标题中提取编号部分
|
|
|
+
|
|
|
+ 例如:
|
|
|
+ "1.5 施工条件" -> "1.5"
|
|
|
+ "1.6 风险辨识与分级" -> "1.6"
|
|
|
+ "1 工程概况" -> "1"
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ title: 标题字符串
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ str: 编号部分,如果未找到则返回空字符串
|
|
|
+ """
|
|
|
+ # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
|
|
|
+ number_match = re.match(r'^(\d+(?:\.\d+)*)', title)
|
|
|
+ if number_match:
|
|
|
+ return number_match.group(1)
|
|
|
+
|
|
|
+ # 匹配中文编号格式(如 一、二、三等)
|
|
|
+ chinese_match = re.match(r'^([一二三四五六七八九十]+)[、..]', title)
|
|
|
+ if chinese_match:
|
|
|
+ return chinese_match.group(1)
|
|
|
+
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def _get_page_number(self, position, pages_content):
|
|
|
+ """根据位置获取页码"""
|
|
|
+ for page in pages_content:
|
|
|
+ if page['start_pos'] <= position < page['end_pos']:
|
|
|
+ return page['page_num']
|
|
|
+ return 1
|
|
|
+
|
|
|
    def _split_by_sub_titles(self, content_block, all_toc_items, parent_title_info,
                             target_level, max_chunk_size, min_chunk_size):
        """
        Split a body block on the sub-headings found inside it.

        Args:
            content_block: text of the body block.
            all_toc_items: every TOC item (all levels).
            parent_title_info: info for the block's own heading.
            target_level: primary split level; only deeper headings count.
            max_chunk_size: maximum characters per chunk.
            min_chunk_size: minimum characters per chunk (not used here;
                the later merge pass enforces it).

        Returns:
            list: sub-chunk dicts with 'content', 'relative_start' (offset
            inside content_block) and 'sub_title'.
        """
        # Collect TOC headings deeper than the target level that actually
        # occur inside this block.
        sub_titles = []
        fuzzy_threshold = self.config.fuzzy_threshold
        for toc_item in all_toc_items:
            if toc_item['level'] > target_level:
                # Locate this sub-heading inside the block.
                pos = self._find_title_in_text(toc_item['title'], content_block, fuzzy_threshold=fuzzy_threshold)
                if pos >= 0:
                    sub_titles.append({
                        'title': toc_item['title'],
                        'level': toc_item['level'],
                        'position': pos
                    })

        # Process sub-headings in document order.
        sub_titles.sort(key=lambda x: x['position'])

        # No sub-headings: the whole block becomes a single chunk.
        if not sub_titles:
            # Oversized blocks still get sentence-level splitting.
            if len(content_block) > max_chunk_size:
                return self._split_large_chunk(content_block, max_chunk_size, parent_title_info['title'])
            else:
                return [{
                    'content': content_block,
                    'relative_start': 0,
                    'sub_title': '',
                    'serial_number': ''
                }]

        # Split at each sub-heading.
        # NOTE(review): text before the first sub-heading (the parent
        # heading line and any preamble) is not emitted in any chunk —
        # confirm this is intended.
        chunks = []
        for i, sub_title in enumerate(sub_titles):
            start_pos = sub_title['position']

            # Each piece runs to the next sub-heading (or the block's end).
            if i + 1 < len(sub_titles):
                end_pos = sub_titles[i + 1]['position']
            else:
                end_pos = len(content_block)

            chunk_content = content_block[start_pos:end_pos]

            # Oversized pieces are further split at sentence boundaries.
            if len(chunk_content) > max_chunk_size:
                split_chunks = self._split_large_chunk(chunk_content, max_chunk_size, sub_title['title'])
                for j, split_chunk in enumerate(split_chunks):
                    # Rebase the piece's offset onto the block.
                    split_chunk['relative_start'] = start_pos + split_chunk['relative_start']
                    split_chunk['sub_title'] = sub_title['title']
                    chunks.append(split_chunk)
            else:
                chunks.append({
                    'content': chunk_content,
                    'relative_start': start_pos,
                    'sub_title': sub_title['title']
                })

        return chunks
|
|
|
+
|
|
|
+ def _split_large_chunk(self, content, max_chunk_size, title):
|
|
|
+ """
|
|
|
+ 将超大块按句子级分割(保持语义完整)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ content: 内容
|
|
|
+ max_chunk_size: 最大分块字符数
|
|
|
+ title: 标题
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ list: 分割后的块列表
|
|
|
+ """
|
|
|
+ # 按句子分割(中文句号、问号、感叹号)
|
|
|
+ sentences = re.split(r'([。!?\n])', content)
|
|
|
+
|
|
|
+ # 重新组合句子和标点
|
|
|
+ combined_sentences = []
|
|
|
+ for i in range(0, len(sentences) - 1, 2):
|
|
|
+ if i + 1 < len(sentences):
|
|
|
+ combined_sentences.append(sentences[i] + sentences[i + 1])
|
|
|
+ else:
|
|
|
+ combined_sentences.append(sentences[i])
|
|
|
+
|
|
|
+ if not combined_sentences:
|
|
|
+ combined_sentences = [content]
|
|
|
+
|
|
|
+ # 按max_chunk_size组合句子
|
|
|
+ chunks = []
|
|
|
+ current_chunk = ""
|
|
|
+ current_start = 0
|
|
|
+
|
|
|
+ for sentence in combined_sentences:
|
|
|
+ if len(current_chunk) + len(sentence) <= max_chunk_size:
|
|
|
+ current_chunk += sentence
|
|
|
+ else:
|
|
|
+ if current_chunk:
|
|
|
+ chunks.append({
|
|
|
+ 'content': current_chunk,
|
|
|
+ 'relative_start': current_start,
|
|
|
+ 'is_split': True # 标记为分割块,不参与合并
|
|
|
+ })
|
|
|
+ current_start += len(current_chunk)
|
|
|
+ current_chunk = sentence
|
|
|
+
|
|
|
+ # 添加最后一个块
|
|
|
+ if current_chunk:
|
|
|
+ chunks.append({
|
|
|
+ 'content': current_chunk,
|
|
|
+ 'relative_start': current_start,
|
|
|
+ 'is_split': True
|
|
|
+ })
|
|
|
+
|
|
|
+ return chunks
|
|
|
+
|
|
|
    def _merge_small_chunks(self, chunks, max_chunk_size, min_chunk_size):
        """
        Merge chunks smaller than min_chunk_size.

        Args:
            chunks: chunk list (in document order).
            max_chunk_size: maximum characters a merged chunk may reach.
            min_chunk_size: minimum characters per chunk.

        Returns:
            list: merged chunk list.
        """
        if not chunks:
            return []

        # First group consecutive chunks by their deepest heading number so
        # merging happens within a single heading before crossing headings.
        current_title_number = None
        title_groups = []
        current_group = []

        for chunk in chunks:
            title_number = chunk.get('_title_number', '')

            if title_number != current_title_number:
                # Close the previous group.
                if current_group:
                    title_groups.append({
                        'title_number': current_title_number,
                        'chunks': current_group
                    })
                # Open a new group.
                current_title_number = title_number
                current_group = [chunk]
            else:
                current_group.append(chunk)

        # Close the final group.
        if current_group:
            title_groups.append({
                'title_number': current_title_number,
                'chunks': current_group
            })

        # Merge small chunks inside each group.
        merged_groups = []
        for group in title_groups:
            merged_chunks = self._merge_within_title(group['chunks'], max_chunk_size, min_chunk_size)
            merged_groups.append({
                'title_number': group['title_number'],
                'chunks': merged_chunks
            })

        # Cross-heading merge: when the previous group's last chunk and the
        # current group's first chunk are both small, they may be combined.
        # NOTE(review): the comparison reads prev_group from merged_groups
        # (pre-cross-merge objects) but overwrites final_merged[-1]; when
        # three consecutive groups all qualify, the earlier merged content
        # can be lost — confirm this chain case.
        final_merged = []
        for i, group in enumerate(merged_groups):
            if i == 0:
                final_merged.extend(group['chunks'])
            else:
                # See whether we can merge with the previous group's tail.
                prev_group = merged_groups[i - 1]
                if prev_group['chunks'] and group['chunks']:
                    prev_last = prev_group['chunks'][-1]
                    curr_first = group['chunks'][0]

                    prev_content = prev_last['review_chunk_content']
                    curr_content = curr_first['review_chunk_content']

                    # Both must be small, neither a sentence-split piece,
                    # and the merged result must fit the cap.
                    if (not prev_last.get('is_split', False) and
                        not curr_first.get('is_split', False) and
                        len(prev_content) < min_chunk_size and
                        len(curr_content) < min_chunk_size and
                        len(prev_content) + len(curr_content) <= max_chunk_size):

                        # Merge the pair.
                        merged_content = prev_content + '\n\n' + curr_content
                        merged_chunk = prev_last.copy()
                        merged_chunk['review_chunk_content'] = merged_content
                        merged_chunk['section_label'] = self._merge_section_labels(
                            prev_last['section_label'],
                            curr_first['section_label']
                        )
                        # Combine the heading numbers.
                        prev_title_num = prev_last.get('_title_number', '')
                        curr_title_num = curr_first.get('_title_number', '')
                        if prev_title_num and curr_title_num and prev_title_num != curr_title_num:
                            # chunk_id uses a bare plus (no spaces)...
                            merged_chunk['_title_number'] = f"{prev_title_num}+{curr_title_num}"
                            # ...serial_number keeps spaces for display.
                            merged_chunk['_title_number_display'] = f"{prev_title_num} + {curr_title_num}"
                        merged_chunk['_is_merged'] = True

                        # Replace the previous group's tail chunk...
                        final_merged[-1] = merged_chunk
                        # ...and skip the current group's head chunk.
                        final_merged.extend(group['chunks'][1:])
                    else:
                        final_merged.extend(group['chunks'])
                else:
                    final_merged.extend(group['chunks'])

        return final_merged
|
|
|
+
|
|
|
    def _merge_within_title(self, title_chunks, max_chunk_size, min_chunk_size):
        """Merge undersized chunks that share the same deepest heading."""
        if not title_chunks:
            return []

        merged = []
        i = 0

        while i < len(title_chunks):
            current_chunk = title_chunks[i]
            current_content = current_chunk['review_chunk_content']

            # Sentence-split pieces never take part in merging.
            if current_chunk.get('is_split', False):
                merged.append(current_chunk)
                i += 1
                continue

            # Undersized chunk: try absorbing the next chunk.
            if len(current_content) < min_chunk_size and i + 1 < len(title_chunks):
                next_chunk = title_chunks[i + 1]
                next_content = next_chunk['review_chunk_content']

                # The neighbour must not be a split piece and the pair must
                # fit under max_chunk_size.  NOTE(review): unlike the
                # cross-heading merge, the neighbour is NOT itself required
                # to be under min_chunk_size — confirm the asymmetry is
                # intended.
                if (not next_chunk.get('is_split', False) and
                    len(current_content) + len(next_content) <= max_chunk_size):
                    # Merge the pair.
                    merged_content = current_content + '\n\n' + next_content
                    merged_chunk = current_chunk.copy()
                    merged_chunk['review_chunk_content'] = merged_content
                    # Combine the labels, factoring out the shared prefix.
                    merged_chunk['section_label'] = self._merge_section_labels(
                        current_chunk['section_label'],
                        next_chunk['section_label']
                    )
                    merged.append(merged_chunk)
                    i += 2  # the neighbour was consumed too
                    continue

            # Otherwise keep the chunk as-is.
            merged.append(current_chunk)
            i += 1

        return merged
|
|
|
+
|
|
|
    def _finalize_chunk_ids(self, chunks):
        """
        Produce the final chunk_id and serial_number for every chunk.

        Args:
            chunks: merged chunk list.

        Returns:
            list: finalised chunks with the temporary bookkeeping keys
            (_title_number, _local_index, _is_merged, ...) stripped.
        """
        final_chunks = []
        current_title_number = None
        local_index = 1

        for i, chunk in enumerate(chunks):
            title_number = chunk.get('_title_number', '')
            # NOTE(review): is_merged is read but never used below — confirm
            # whether a branch on it was intended.
            is_merged = chunk.get('_is_merged', False)

            # A '+' in the number marks a cross-heading merged chunk.
            if '+' in str(title_number):
                # Cross-heading merged chunks are numbered from 0.
                local_index = 0
                # chunk_id keeps the bare plus, e.g. "1.5+1.6"...
                merged_title_number = title_number
                # ...while serial_number shows spaces, e.g. "1.5 + 1.6".
                serial_number_display = chunk.get('_title_number_display', title_number.replace('+', ' + '))
                # Track the merged number so the next chunk restarts fresh.
                current_title_number = title_number
            else:
                # New heading: restart the local index.
                if title_number != current_title_number:
                    current_title_number = title_number
                    # Even if this heading's first chunk was consumed by a
                    # cross-heading merge, numbering starts at 1 here.
                    local_index = 1
                else:
                    local_index += 1
                merged_title_number = title_number
                serial_number_display = title_number

            # chunk_id always uses the space-free number.
            if merged_title_number:
                chunk_id_str = f"doc_chunk_{merged_title_number}_{local_index}"
            else:
                chunk_id_str = f"doc_chunk_{local_index}"

            # Rebuild the chunk without the temporary keys.
            final_chunk = {
                'file_name': chunk['file_name'],
                'chunk_id': chunk_id_str,
                'section_label': chunk['section_label'],
                'project_plan_type': 'bridge_up_part',
                'element_tag': {
                    'chunk_id': chunk_id_str,
                    'page': chunk['element_tag']['page'],
                    'serial_number': serial_number_display if merged_title_number else ''
                },
                'review_chunk_content': chunk['review_chunk_content']
            }

            final_chunks.append(final_chunk)

        return final_chunks
|
|
|
+
|
|
|
+ def _build_section_label(self, parent_title, sub_title):
|
|
|
+ """构建section_label(层级路径)"""
|
|
|
+ if sub_title:
|
|
|
+ return f"{parent_title}->{sub_title}"
|
|
|
+ else:
|
|
|
+ return parent_title
|
|
|
+
|
|
|
+ def _merge_section_labels(self, label1, label2):
|
|
|
+ """
|
|
|
+ 合并两个section_label,提取公共前缀
|
|
|
+
|
|
|
+ 例如:
|
|
|
+ "1 工程概况->1.3 工程地质" + "1 工程概况->1.4 气象水文"
|
|
|
+ => "1 工程概况->1.3 工程地质 + 1.4 气象水文"
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ label1: 第一个标签
|
|
|
+ label2: 第二个标签
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ str: 合并后的标签
|
|
|
+ """
|
|
|
+ # 按"->"分割标签
|
|
|
+ parts1 = label1.split('->')
|
|
|
+ parts2 = label2.split('->')
|
|
|
+
|
|
|
+ # 找到公共前缀
|
|
|
+ common_prefix = []
|
|
|
+ for i in range(min(len(parts1), len(parts2))):
|
|
|
+ if parts1[i] == parts2[i]:
|
|
|
+ common_prefix.append(parts1[i])
|
|
|
+ else:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 如果有公共前缀
|
|
|
+ if common_prefix:
|
|
|
+ # 获取不同的部分
|
|
|
+ diff1 = '->'.join(parts1[len(common_prefix):])
|
|
|
+ diff2 = '->'.join(parts2[len(common_prefix):])
|
|
|
+
|
|
|
+ # 构建合并后的标签
|
|
|
+ prefix = '->'.join(common_prefix)
|
|
|
+ if diff1 and diff2:
|
|
|
+ return f"{prefix}->{diff1} + {diff2}"
|
|
|
+ elif diff1:
|
|
|
+ return f"{prefix}->{diff1}"
|
|
|
+ elif diff2:
|
|
|
+ return f"{prefix}->{diff2}"
|
|
|
+ else:
|
|
|
+ return prefix
|
|
|
+ else:
|
|
|
+ # 没有公共前缀,直接用+连接
|
|
|
+ return f"{label1} + {label2}"
|
|
|
+
|