Prechádzať zdrojové kódy

dev:扩展了文件分块的分类字段;

ChenJiSheng 2 mesiacov pred
rodič
commit
d4de1168d6

+ 38 - 4
core/construction_review/component/doc_worker/docx_worker/text_splitter.py

@@ -17,6 +17,7 @@ from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
 
 
+
 class DocxTextSplitter(TextSplitter):
     """按目录层级对 DOCX 正文进行智能分块的实现"""
 
@@ -63,14 +64,26 @@ class DocxTextSplitter(TextSplitter):
         # 按位置排序
         found_titles.sort(key=lambda x: x["position"])
 
-        # 步骤2: 为每个找到的标题构建完整的层级路径
+        # 步骤2: 构建一级目录标题到分类信息的映射
+        chapter_classification_map: Dict[str, Dict[str, Any]] = {}
+        for item in classification_items:
+            if item.get("level") == 1:
+                chapter_title = item.get("title", "")
+                chapter_classification_map[chapter_title] = {
+                    "category": item.get("category", ""),
+                    "category_code": item.get("category_code", "other"),
+                    "page": item.get("page", ""),
+                    "level": item.get("level", 1),
+                }
+
+        # 步骤3: 为每个找到的标题构建完整的层级路径
         for title_info in found_titles:
             hierarchy_path = self._build_hierarchy_path(
                 title_info["title"], all_toc_items, target_level
             )
             title_info["hierarchy_path"] = hierarchy_path
 
-        # 步骤3: 按目录层级处理每个标题块
+        # 步骤4: 按目录层级处理每个标题块
         all_chunks: List[Dict[str, Any]] = []
         
         for i, title_info in enumerate(found_titles):
@@ -98,7 +111,7 @@ class DocxTextSplitter(TextSplitter):
             # 为每个子块添加元数据
             for j, sub_chunk in enumerate(sub_chunks, 1):
                 chunk_data = self._build_chunk_metadata(
-                    sub_chunk, title_info, start_pos, pages_content, i, j
+                    sub_chunk, title_info, start_pos, pages_content, i, j, chapter_classification_map
                 )
                 all_chunks.append(chunk_data)
 
@@ -389,6 +402,7 @@ class DocxTextSplitter(TextSplitter):
         pages_content: List[Dict[str, Any]],
         i: int,
         j: int,
+        chapter_classification_map: Dict[str, Dict[str, Any]] = None,
     ) -> Dict[str, Any]:
         """构建文本块的元数据"""
         content = sub_chunk["content"]
@@ -415,11 +429,24 @@ class DocxTextSplitter(TextSplitter):
 
         chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
 
-        return {
+        # 获取一级目录的分类信息
+        chapter_classification = None
+        if chapter_classification_map:
+            # 从hierarchy_path获取一级目录标题
+            if hierarchy_path and len(hierarchy_path) > 0:
+                chapter_title = hierarchy_path[0]
+                chapter_classification = chapter_classification_map.get(chapter_title)
+            elif not hierarchy_path:
+                # 如果没有hierarchy_path,尝试从title_info获取
+                chapter_title = title_info.get("title", "")
+                chapter_classification = chapter_classification_map.get(chapter_title)
+
+        chunk_data = {
             "file_name": "",
             "chunk_id": chunk_id_str,
             "section_label": section_label,
             "project_plan_type": title_info.get("category_code", "other"),
+            "chapter_classification": title_info.get("category_code", "other"),
             "element_tag": {
                 "chunk_id": chunk_id_str,
                 "page": page_num,
@@ -431,6 +458,12 @@ class DocxTextSplitter(TextSplitter):
             "_sort_key": chunk_start_pos,
         }
 
+        # 如果找到了一级目录的分类信息,添加到chunk中
+        if chapter_classification:
+            chunk_data["chapter_classification"] = chapter_classification
+
+        return chunk_data
+
     def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         """生成最终的chunk_id和serial_number"""
         final_chunks = []
@@ -459,6 +492,7 @@ class DocxTextSplitter(TextSplitter):
                 "chunk_id": chunk_id_str,
                 "section_label": chunk["section_label"],
                 "project_plan_type": chunk["project_plan_type"],
+                "chapter_classification": chunk["chapter_classification"],
                 "element_tag": {
                     "chunk_id": chunk_id_str,
                     "page": chunk["element_tag"]["page"],

+ 671 - 0
core/construction_review/component/doc_worker/pdf_worker/text_splitter.py

@@ -18,7 +18,678 @@ from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
 
 
+
 class PdfTextSplitter(TextSplitter):
+    """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
+
+    def __init__(self) -> None:
+        """Create the splitter with a fresh TitleMatcher and the default config provider."""
+        self._title_matcher = TitleMatcher()
+        self._cfg = default_config_provider
+
+    def split_by_hierarchy(
+        self,
+        classification_items: List[Dict[str, Any]],
+        pages_content: List[Dict[str, Any]],
+        toc_info: Dict[str, Any],
+        target_level: int,
+        max_chunk_size: int,
+        min_chunk_size: int,
+    ) -> List[Dict[str, Any]]:
+        """
+        Split the document body into chunks by TOC hierarchy and character count.
+
+        Chunking logic:
+        1. Skip the table-of-contents pages and locate the classified titles
+           in the body text.
+        2. Inside the body block that belongs to each located title, split
+           first by the lowest-level TOC sub-titles found in that block.
+        3. Check every block against the size limit:
+           - blocks longer than max_chunk_size are split at sentence level
+             (keeping the meaning as intact as possible).
+
+        Args:
+            classification_items: Classified title entries. The keys read here
+                are "level", "title", "category", "category_code" and "page".
+            pages_content: Per-page dicts; "text" is concatenated into the full
+                text, and the list is also forwarded to the title matcher and
+                to the page lookup.
+            toc_info: Dict providing "toc_pages" and "toc_items".
+            target_level: Forwarded to the hierarchy-path and sub-title helpers.
+            max_chunk_size: Forwarded to the sub-title splitter.
+            min_chunk_size: Forwarded to the sub-title splitter.
+
+        Returns:
+            The list produced by _finalize_chunk_ids, or an empty list when no
+            title could be located in the body text.
+        """
+        toc_pages = toc_info.get("toc_pages", []) or []
+        all_toc_items = toc_info.get("toc_items", [])
+
+        # Work on the complete text (all pages concatenated)
+        full_text = "".join(p.get("text", "") for p in pages_content)
+
+        print(f"  正在定位{len(classification_items)}个已分类的标题...")
+        print(f"  目录所在页: {toc_pages}")
+
+        # Step 1: locate the classified titles in the body (TOC pages are passed
+        # to the matcher so it can skip them)
+        located = self._title_matcher.find_title_positions(
+            classification_items, full_text, pages_content, toc_pages
+        )
+
+        # Keep only the titles that were located successfully
+        found_titles = [t for t in located if t["found"]]
+        if not found_titles:
+            print(f"  错误: 未能在正文中定位任何标题")
+            return []
+
+        print(f"  成功定位 {len(found_titles)}/{len(classification_items)} 个标题")
+
+        # Sort by position in the full text
+        found_titles.sort(key=lambda x: x["position"])
+
+        # Step 2: map every level-1 title to its classification info
+        chapter_classification_map: Dict[str, Dict[str, Any]] = {}
+        for item in classification_items:
+            if item.get("level") == 1:
+                chapter_title = item.get("title", "")
+                chapter_classification_map[chapter_title] = {
+                    "category": item.get("category", ""),
+                    "category_code": item.get("category_code", "other"),
+                    "page": item.get("page", ""),
+                    "level": item.get("level", 1),
+                }
+
+        # Step 3: build the full hierarchy path for every located title
+        for title_info in found_titles:
+            hierarchy_path = self._build_hierarchy_path(
+                title_info["title"], all_toc_items, target_level
+            )
+            title_info["hierarchy_path"] = hierarchy_path
+
+        # Step 4: process the body block of every located title
+        all_chunks: List[Dict[str, Any]] = []
+
+        for i, title_info in enumerate(found_titles):
+            start_pos = title_info["position"]
+
+            # The block ends where the next located title starts
+            # (or at the end of the text for the last title)
+            if i + 1 < len(found_titles):
+                end_pos = found_titles[i + 1]["position"]
+            else:
+                end_pos = len(full_text)
+
+            # Extract the body block
+            content_block = full_text[start_pos:end_pos]
+
+            # Look for sub-titles inside the block (split at the lowest level)
+            sub_chunks = self._split_by_sub_titles(
+                content_block,
+                all_toc_items,
+                title_info,
+                target_level,
+                max_chunk_size,
+                min_chunk_size,
+            )
+
+            # Attach metadata to every sub-chunk; j is the 1-based index
+            # of the sub-chunk within this title block
+            for j, sub_chunk in enumerate(sub_chunks, 1):
+                chunk_data = self._build_chunk_metadata(
+                    sub_chunk, title_info, start_pos, pages_content, i, j, chapter_classification_map
+                )
+                all_chunks.append(chunk_data)
+
+        # Step 5: generate the final chunk_id and serial_number values
+        final_chunks = self._finalize_chunk_ids(all_chunks)
+
+        print(f"  初始切分: {len(all_chunks)} 个块")
+        print(f"  最终块数: {len(final_chunks)} 个块")
+
+        return final_chunks
+
+    def _split_by_sub_titles(
+        self,
+        content_block: str,
+        all_toc_items: List[Dict[str, Any]],
+        parent_title_info: Dict[str, Any],
+        target_level: int,
+        max_chunk_size: int,
+        min_chunk_size: int,
+    ) -> List[Dict[str, Any]]:
+        """
+        Split one body block at the sub-titles listed below its parent in the TOC.
+
+        Core logic:
+        1. Collect every TOC entry nested under the parent title (all deeper
+           levels, not only direct children) that can be found in the block.
+        2. Use only the deepest level found as split points; the text between
+           two adjacent split points becomes one chunk.
+        3. Only chunks longer than max_chunk_size are split further, by sentence.
+
+        If the parent is not in the TOC, no sub-title is found, or every
+        sub-title chunk is skipped for having fewer than 10 characters of text
+        after its title, the whole block is returned instead (sentence-split
+        when it exceeds max_chunk_size).
+
+        NOTE(review): text between the start of the block and the first
+        lowest-level sub-title is not part of any returned chunk when
+        sub-title chunks are produced - confirm this is intended.
+
+        Args:
+            content_block: Text of the parent title's body block.
+            all_toc_items: TOC entries in document order ("title", "level").
+            parent_title_info: Dict with "title" and optionally "hierarchy_path".
+            target_level: Level assumed for the parent when its TOC entry has
+                no "level" key.
+            max_chunk_size: Maximum chunk length before sentence splitting.
+            min_chunk_size: Accepted for interface compatibility; not used here.
+
+        Returns:
+            List of dicts with "content", "relative_start" (offset inside
+            content_block), "sub_title" and "hierarchy_path"; sentence-split
+            chunks additionally carry "is_split".
+        """
+        parent_title = parent_title_info["title"]
+        parent_path = parent_title_info.get("hierarchy_path", [parent_title])
+
+        def whole_block_chunks() -> List[Dict[str, Any]]:
+            # Single fallback used by every "no usable sub-title" exit, so that
+            # all of them report the same hierarchy path.
+            if len(content_block) > max_chunk_size:
+                return self._split_large_chunk(
+                    content_block, max_chunk_size, parent_title, parent_path
+                )
+            return [
+                {
+                    "content": content_block,
+                    "relative_start": 0,
+                    "sub_title": "",
+                    "hierarchy_path": parent_path,
+                }
+            ]
+
+        # Locate the parent title in the TOC (first entry with an equal title)
+        parent_idx = next(
+            (
+                idx
+                for idx, toc_item in enumerate(all_toc_items)
+                if toc_item["title"] == parent_title
+            ),
+            -1,
+        )
+        if parent_idx < 0:
+            return whole_block_chunks()
+
+        parent_level = all_toc_items[parent_idx].get("level", target_level)
+
+        # The parent's range ends at the next entry of the same or a higher level
+        next_sibling_idx = len(all_toc_items)
+        for idx in range(parent_idx + 1, len(all_toc_items)):
+            if all_toc_items[idx].get("level", 1) <= parent_level:
+                next_sibling_idx = idx
+                break
+
+        # Every entry inside that range is deeper than the parent by construction,
+        # so all of them are candidate sub-titles.
+        fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))
+        all_sub_titles = []
+
+        for idx in range(parent_idx + 1, next_sibling_idx):
+            toc_item = all_toc_items[idx]
+            item_level = toc_item.get("level", 1)
+
+            pos = self._find_title_in_block(
+                toc_item["title"], content_block, fuzzy_threshold
+            )
+            if pos < 0:
+                continue
+
+            # Debug output: the matched title with some surrounding text
+            context_start = max(0, pos - 20)
+            context_end = min(len(content_block), pos + len(toc_item["title"]) + 50)
+            context = content_block[context_start:context_end].replace("\n", " ")
+            print(f"        找到子标题: {toc_item['title']} (level={item_level}), 位置={pos}, 上下文: ...{context}...")
+
+            all_sub_titles.append(
+                {
+                    "title": toc_item["title"],
+                    # Reuse the defaulted level so an entry without "level"
+                    # cannot raise KeyError here.
+                    "level": item_level,
+                    "position": pos,
+                    "toc_index": idx,
+                    "toc_item": toc_item,
+                }
+            )
+
+        if not all_sub_titles:
+            return whole_block_chunks()
+
+        all_sub_titles.sort(key=lambda x: x["position"])
+
+        # Only the deepest level that was found is used for splitting
+        max_level = max(sub["level"] for sub in all_sub_titles)
+        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+
+        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
+
+        chunks = []
+        for i, sub_title in enumerate(lowest_level_titles):
+            start_pos = sub_title["position"]
+
+            # The chunk ends at the next split point, or at the end of the block
+            if i + 1 < len(lowest_level_titles):
+                end_pos = lowest_level_titles[i + 1]["position"]
+            else:
+                end_pos = len(content_block)
+
+            chunk_content = content_block[start_pos:end_pos]
+
+            # Debug output
+            content_preview = chunk_content[:100].replace("\n", " ")
+            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
+
+            # Skip sub-titles that have (almost) no text after the title itself
+            content_after_title = chunk_content[len(sub_title["title"]):].strip()
+            if len(content_after_title) < 10:
+                print(f"        跳过(内容不足)")
+                continue
+
+            hierarchy_path = self._build_hierarchy_path_for_subtitle(
+                sub_title["toc_item"], all_toc_items, parent_title_info
+            )
+
+            if len(chunk_content) <= max_chunk_size:
+                chunks.append(
+                    {
+                        "content": chunk_content,
+                        "relative_start": start_pos,
+                        "sub_title": sub_title["title"],
+                        "hierarchy_path": hierarchy_path,
+                    }
+                )
+                continue
+
+            # Oversized chunk: split by sentence and rebase the offsets so they
+            # are relative to content_block again
+            print(f"        块过大,按句子切分")
+            for split_chunk in self._split_large_chunk(
+                chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
+            ):
+                split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
+                split_chunk["sub_title"] = sub_title["title"]
+                split_chunk.setdefault("hierarchy_path", hierarchy_path)
+                chunks.append(split_chunk)
+
+        # Every sub-title was skipped: fall back to the whole block
+        if not chunks:
+            return whole_block_chunks()
+
+        return chunks
+
+    def _find_title_in_block(self, title: str, block: str, fuzzy_threshold: float) -> int:
+        """Return the position TitleMatcher reports for ``title`` inside ``block``.
+
+        Thin delegation to TitleMatcher._find_title_in_text; the caller in this
+        class treats a negative result as "not found".
+        """
+        matcher = self._title_matcher
+        position = matcher._find_title_in_text(title, block, fuzzy_threshold)
+        return position
+
+    def _split_large_chunk(
+        self,
+        content: str,
+        max_chunk_size: int,
+        title: str,
+        hierarchy_path: List[str] | None = None,
+    ) -> List[Dict[str, Any]]:
+        """
+        Split an oversized block at sentence boundaries.
+
+        Sentences end at a Chinese full stop, a question or exclamation mark
+        (half- or full-width) or a newline. Consecutive sentences are packed
+        into chunks of at most max_chunk_size characters; a single sentence
+        longer than that is kept whole as its own chunk. The chunk contents
+        concatenate back to ``content`` exactly, including any text after the
+        last terminator.
+
+        Args:
+            content: Text to split.
+            max_chunk_size: Maximum length of a packed chunk.
+            title: Accepted for interface compatibility; not used here.
+            hierarchy_path: Stored on every chunk when it is not None.
+
+        Returns:
+            List of dicts with "content", "relative_start" (offset inside
+            ``content``), "is_split" (always True) and, when given,
+            "hierarchy_path". Empty list for empty content.
+        """
+        # With one capturing group re.split returns
+        # [text, terminator, text, terminator, ..., text]; the final text has no
+        # terminator and must not be forgotten.
+        pieces = re.split(r"([。!?!?\n])", content)
+        sentences = [
+            pieces[k] + (pieces[k + 1] if k + 1 < len(pieces) else "")
+            for k in range(0, len(pieces), 2)
+        ]
+        sentences = [sentence for sentence in sentences if sentence]
+        if not sentences:
+            sentences = [content]
+
+        def make_chunk(text: str, offset: int) -> Dict[str, Any]:
+            chunk_data: Dict[str, Any] = {
+                "content": text,
+                "relative_start": offset,
+                "is_split": True,  # marks a chunk produced by sentence splitting
+            }
+            if hierarchy_path is not None:
+                chunk_data["hierarchy_path"] = hierarchy_path
+            return chunk_data
+
+        chunks: List[Dict[str, Any]] = []
+        current_chunk = ""
+        current_start = 0
+
+        for sentence in sentences:
+            if current_chunk and len(current_chunk) + len(sentence) > max_chunk_size:
+                # The pending chunk is full: emit it and start a new one
+                chunks.append(make_chunk(current_chunk, current_start))
+                current_start += len(current_chunk)
+                current_chunk = sentence
+            else:
+                current_chunk += sentence
+
+        # Emit whatever is left
+        if current_chunk:
+            chunks.append(make_chunk(current_chunk, current_start))
+
+        return chunks
+
+    def _build_hierarchy_path_for_subtitle(
+        self,
+        sub_title_item: Dict[str, Any],
+        all_toc_items: List[Dict[str, Any]],
+        parent_title_info: Dict[str, Any],
+    ) -> List[str]:
+        """Build the title path from the outermost ancestor down to a sub-title.
+
+        The sub-title is located in ``all_toc_items`` by object identity first,
+        so that a title repeated under several chapters resolves to the right
+        entry; only if the object is not in the list is the first entry with an
+        equal title used. Ancestors are found by scanning backwards and taking
+        each entry whose level is strictly lower than the lowest level taken so
+        far, which keeps headings of unrelated earlier branches out of the path
+        even when the TOC skips a level.
+
+        Args:
+            sub_title_item: TOC entry of the sub-title ("title", "level";
+                level defaults to 2 when missing).
+            all_toc_items: TOC entries in document order.
+            parent_title_info: Dict with the parent "title", used only when the
+                sub-title cannot be found in the TOC.
+
+        Returns:
+            Titles ordered from the highest ancestor to the sub-title, or
+            [parent title, sub-title] when the sub-title is not in the TOC.
+        """
+        sub_title = sub_title_item.get("title", "")
+
+        # Prefer the exact object; fall back to the first equal title
+        sub_title_idx = next(
+            (idx for idx, item in enumerate(all_toc_items) if item is sub_title_item),
+            -1,
+        )
+        if sub_title_idx < 0:
+            sub_title_idx = next(
+                (
+                    idx
+                    for idx, item in enumerate(all_toc_items)
+                    if item.get("title", "") == sub_title
+                ),
+                -1,
+            )
+        if sub_title_idx < 0:
+            return [parent_title_info["title"], sub_title]
+
+        # Walk backwards collecting the nearest entry of each strictly higher level
+        reversed_path = [sub_title]
+        nearest_level = sub_title_item.get("level", 2)
+        for item in reversed(all_toc_items[:sub_title_idx]):
+            if nearest_level <= 1:
+                break  # reached the top level, nothing above it
+            item_level = item.get("level", 1)
+            if item_level < nearest_level:
+                reversed_path.append(item["title"])
+                nearest_level = item_level
+
+        reversed_path.reverse()
+        return reversed_path
+
+    def _build_hierarchy_path(
+        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
+    ) -> List[str]:
+        """Build the title path from the outermost ancestor down to ``title``.
+
+        The title is looked up in ``all_toc_items`` by exact equality (first
+        match). Ancestors are found by scanning backwards from that entry and
+        taking each entry whose level is strictly lower than the lowest level
+        taken so far, so headings of unrelated earlier branches never enter the
+        path, even when the TOC skips a level.
+
+        Args:
+            title: Title to resolve.
+            all_toc_items: TOC entries in document order ("title", "level").
+            target_level: Level assumed for the entry when it has no "level" key.
+
+        Returns:
+            Titles ordered from the highest ancestor to ``title``; just
+            [title] when the title is not in the TOC.
+        """
+        current_idx = next(
+            (idx for idx, item in enumerate(all_toc_items) if item["title"] == title),
+            -1,
+        )
+        if current_idx < 0:
+            # Unknown title: the path consists of the title alone
+            return [title]
+
+        # The entry itself is always the last path element; its level (with the
+        # target_level default) is the starting point for the ancestor search.
+        reversed_path = [title]
+        nearest_level = all_toc_items[current_idx].get("level", target_level)
+
+        for item in reversed(all_toc_items[:current_idx]):
+            if nearest_level <= 1:
+                break  # reached the top level, nothing above it
+            item_level = item.get("level", 1)
+            if item_level < nearest_level:
+                reversed_path.append(item["title"])
+                nearest_level = item_level
+
+        reversed_path.reverse()
+        return reversed_path
+
+    def _build_chunk_metadata(
+        self,
+        sub_chunk: Dict[str, Any],
+        title_info: Dict[str, Any],
+        start_pos: int,
+        pages_content: List[Dict[str, Any]],
+        i: int,
+        j: int,
+        chapter_classification_map: Dict[str, Dict[str, Any]] | None = None,
+    ) -> Dict[str, Any]:
+        """Build the intermediate metadata record for one text chunk.
+
+        Args:
+            sub_chunk: Dict with "content" and "relative_start", optionally
+                "hierarchy_path" and "sub_title".
+            title_info: Located title the chunk belongs to ("title",
+                optionally "category_code").
+            start_pos: Offset of the title block in the full text.
+            pages_content: Page dicts used for the page lookup.
+            i: 0-based index of the title block; ``str(i + 1)`` is the serial
+                number when the title has no recognisable number.
+            j: Index of the chunk within its title block, used in the chunk id.
+            chapter_classification_map: Optional map from level-1 title to its
+                classification dict.
+
+        Returns:
+            Dict with the public chunk fields plus the private helpers
+            "_title_number", "_local_index" and "_sort_key".
+
+        NOTE(review): "chapter_classification" is the category code string by
+        default but a dict when the level-1 title is found in the map - confirm
+        that consumers expect both shapes.
+        """
+        content = sub_chunk["content"]
+        chunk_start_pos = start_pos + sub_chunk["relative_start"]
+        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
+
+        hierarchy_path = sub_chunk.get("hierarchy_path", [])
+        sub_title = sub_chunk.get("sub_title", "")
+
+        # One decision drives the label, the title that carries the number and
+        # the level-1 title used for the classification lookup.
+        if hierarchy_path:
+            section_label = "->".join(hierarchy_path)
+            numbered_title = hierarchy_path[-1]
+            chapter_title = hierarchy_path[0]
+        elif sub_title:
+            section_label = f"{title_info['title']}->{sub_title}"
+            numbered_title = sub_title
+            chapter_title = title_info.get("title", "")
+        else:
+            section_label = title_info["title"]
+            numbered_title = title_info["title"]
+            chapter_title = title_info.get("title", "")
+
+        title_number = self._extract_title_number(numbered_title)
+        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
+
+        # Default to the title's own category code; a non-empty entry for the
+        # level-1 title in the map takes precedence.
+        chapter_classification: Any = title_info.get("category_code", "other")
+        if chapter_classification_map:
+            mapped = chapter_classification_map.get(chapter_title)
+            if mapped:
+                chapter_classification = mapped
+
+        return {
+            "file_name": "",  # filled in by the caller
+            "chunk_id": chunk_id_str,
+            "section_label": section_label,
+            "project_plan_type": title_info.get("category_code", "other"),
+            "chapter_classification": chapter_classification,
+            "element_tag": {
+                "chunk_id": chunk_id_str,
+                "page": page_num,
+                "serial_number": title_number if title_number else str(i + 1),
+            },
+            "review_chunk_content": content,
+            "_title_number": title_number,
+            "_local_index": j,
+            "_sort_key": chunk_start_pos,
+        }
+
+    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Assign the final chunk_id and serial_number to every chunk.
+
+        Chunks sharing a section_label are numbered 1, 2, 3, ... in input
+        order. The chunk id is ``doc_chunk_<number path>_<n>`` where the number
+        path comes from the section label, or ``doc_chunk_<n>`` when the label
+        contains no numbers. The private helper keys of the input chunks are
+        not copied into the result.
+        """
+        per_label_count: Dict[str, int] = {}
+        finalized: List[Dict[str, Any]] = []
+
+        for chunk in chunks:
+            label = chunk.get("section_label", "")
+
+            # Running counter per section label
+            local_index = per_label_count.get(label, 0) + 1
+            per_label_count[label] = local_index
+
+            number_path = self._extract_title_number_path(label)
+            if number_path:
+                chunk_id_str = f"doc_chunk_{number_path}_{local_index}"
+            else:
+                chunk_id_str = f"doc_chunk_{local_index}"
+
+            # Number of the lowest level in the label
+            serial_number = self._extract_number_from_section_label(label)
+
+            element_tag = {
+                "chunk_id": chunk_id_str,
+                "page": chunk["element_tag"]["page"],
+                "serial_number": serial_number,
+            }
+            finalized.append(
+                {
+                    "file_name": chunk["file_name"],
+                    "chunk_id": chunk_id_str,
+                    "section_label": chunk["section_label"],
+                    "project_plan_type": chunk["project_plan_type"],
+                    "chapter_classification": chunk["chapter_classification"],
+                    "element_tag": element_tag,
+                    "review_chunk_content": chunk["review_chunk_content"],
+                }
+            )
+
+        return finalized
+
+    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
+        """Return the page number whose [start_pos, end_pos) range contains ``pos``.
+
+        Pages are checked in list order; 1 is returned when no page matches.
+        """
+        containing_page = next(
+            (
+                page
+                for page in pages_content
+                if page["start_pos"] <= pos < page["end_pos"]
+            ),
+            None,
+        )
+        if containing_page is None:
+            return 1
+        return int(containing_page["page_num"])
+
+    def _extract_title_number(self, title: str) -> str:
+        """Extract the leading numbering part of a title.
+
+        The formats are tried in this order and the first match wins:
+        chapter style (第一章, 第1节, ...), 【1】, 〖1.1〗, dotted numbers
+        (1, 1.5, 1.2.3), Chinese numerals followed by a separator (一、 一. 一)),
+        and parenthesised numbers ((1), (一)). Separators and parentheses are
+        accepted in both half-width and full-width form. For the Chinese
+        numeral format only the numeral itself is returned, without the
+        separator.
+
+        Returns:
+            The numbering string, or "" when the title is empty or has no
+            recognised numbering prefix.
+        """
+        if not title:
+            return ""
+
+        numbering_patterns = (
+            # Chapter style: 第一章, 第1章, 第三节, ...
+            r"^(第[一二三四五六七八九十\d]+[章节条款部分])",
+            # Lenticular brackets: 【1】
+            r"^(【\d+】)",
+            # White lenticular brackets: 〖1.1〗
+            r"^(〖\d+(?:\.\d+)*〗)",
+            # Dotted numbers: 1.5, 1.2.3
+            r"^(\d+(?:\.\d+)*)",
+            # Chinese numerals followed by a separator (half- or full-width)
+            r"^([一二三四五六七八九十]+)[、..))]",
+            # Parenthesised numbers (half- or full-width parentheses)
+            r"^([((][一二三四五六七八九十\d]+[))])",
+        )
+        for pattern in numbering_patterns:
+            match = re.match(pattern, title)
+            if match:
+                return match.group(1)
+
+        return ""
+
+    def _extract_title_number_path(self, section_label: str) -> str:
+        """Turn a "->"-joined title path into a "->"-joined path of title numbers.
+
+        Levels without a recognisable number are left out; the result is ""
+        when the label is empty or no level carries a number.
+        """
+        if not section_label:
+            return ""
+
+        level_numbers = (
+            self._extract_title_number(level.strip())
+            for level in section_label.split("->")
+        )
+        return "->".join(number for number in level_numbers if number)
+
+    def _extract_number_from_section_label(self, section_label: str) -> str:
+        """
+        Extract the number of the lowest level in a section label.
+
+        Only the part after the last "->" is looked at. When that part joins
+        several titles with " + ", the numbers of the individual titles are
+        joined with "+"; if none of them has a number, the part is treated as a
+        single title.
+
+        Examples:
+        "第一章 编制依据与说明->一) 编制依据" -> "一"
+        "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
+        "第一章 总则->1.1 范围 + 1.2 依据" -> "1.1+1.2"
+        """
+        if not section_label:
+            return ""
+
+        # Splitting also works without "->": the whole label is then the last part
+        leaf = section_label.split("->")[-1].strip()
+
+        if " + " in leaf:
+            merged_numbers = [
+                self._extract_title_number(piece.strip()) for piece in leaf.split(" + ")
+            ]
+            merged_numbers = [number for number in merged_numbers if number]
+            if merged_numbers:
+                return "+".join(merged_numbers)
+
+        return self._extract_title_number(leaf)
+
+
+
     """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
 
     def __init__(self) -> None: