Эх сурвалжийг харах

v0.0.4-功能优化:
- 扩展了章节切分逻辑,确保没有章节块遗漏;

ChenJiSheng 2 сар өмнө
parent
commit
3f09325910

+ 3 - 2
.gitignore

@@ -70,9 +70,10 @@ todo.md
 temp/
 *.json
 test_rawdata/
-temp\AI审查结果.json
+temp/AI审查结果.json
 mineru_temp/
 config/config.ini
 路桥/
 output/
-命令
+命令
+/core/construction_review/component/doc_worker/utils/llm_client copy.py

+ 2 - 2
core/construction_review/component/doc_worker/config/llm_api.yaml

@@ -16,8 +16,8 @@ doubao:
   DOUBAO_API_KEY: YOUR_DOUBAO_API_KEY_FOR_RAG_EVAL
 
 qwen:
-  QWEN_SERVER_URL: https://aqai.shudaodsj.com:22000/v1/
-  QWEN_MODEL_ID: Qwen/Qwen3-30B-A3B-Instruct-2507
+  QWEN_SERVER_URL: http://192.168.91.253:9002/v1/
+  QWEN_MODEL_ID: Qwen/Qwen3-8B
   QWEN_API_KEY: ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 keywords:

+ 63 - 53
core/construction_review/component/doc_worker/docx_worker/text_splitter.py

@@ -224,67 +224,77 @@ class DocxTextSplitter(TextSplitter, HierarchicalChunkMixin):
                     }
                 ]
 
-        # 找到最低层级(用于判断哪些是最底层的标题)
-        max_level = max(sub["level"] for sub in all_sub_titles)
+        # 找到直接子标题(parent_level + 1)和所有更深层级的标题
+        direct_child_level = parent_level + 1
+        direct_child_titles = [sub for sub in all_sub_titles if sub["level"] == direct_child_level]
         
-        # 只保留最低层级的标题作为切分点
-        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+        # 找到最低层级(用于判断哪些是最底层的标题)
+        max_level = max(sub["level"] for sub in all_sub_titles) if all_sub_titles else parent_level
         
-        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
+        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 直接子标题数: {len(direct_child_titles)}, 最低层级: {max_level}")
+
+        # 如果没有直接子标题,但有更深层级的标题,使用最低层级标题切分(保持向后兼容)
+        if not direct_child_titles and all_sub_titles:
+            lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+            print(f"      没有直接子标题,使用最低层级标题切分: {len(lowest_level_titles)} 个")
+            direct_child_titles = lowest_level_titles
 
-        # 按最低层级标题切分
+        # 按直接子标题切分(如果存在)
         chunks = []
-        for i, sub_title in enumerate(lowest_level_titles):
-            start_pos = sub_title["position"]
+        if direct_child_titles:
+            for i, sub_title in enumerate(direct_child_titles):
+                start_pos = sub_title["position"]
 
-            # 确定结束位置(下一个最低层级标题的位置)
-            if i + 1 < len(lowest_level_titles):
-                end_pos = lowest_level_titles[i + 1]["position"]
-            else:
+                # 确定结束位置(下一个同级或更高级标题的位置)
+                # 在 all_sub_titles 中查找下一个位置大于当前标题,且 level <= direct_child_level 的标题
                 end_pos = len(content_block)
-
-            chunk_content = content_block[start_pos:end_pos]
-            
-            # 调试信息
-            content_preview = chunk_content[:100].replace("\n", " ")
-            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
-
-            # 检查子标题是否有实际正文内容
-            title_len = len(sub_title["title"])
-            content_after_title = chunk_content[title_len:].strip()
-
-            if not content_after_title or len(content_after_title) < 10:
-                print(f"        跳过(内容不足)")
-                continue
-
-            # 构建层级路径
-            hierarchy_path = self._build_hierarchy_path_for_subtitle(
-                sub_title["toc_item"], all_toc_items, parent_title_info
-            )
-
-            # 只有当块超过 max_chunk_size 时才按句子切分
-            if len(chunk_content) > max_chunk_size:
-                print(f"        块过大,按句子切分")
-                split_chunks = self._split_large_chunk(
-                    chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
-                )
-                for split_chunk in split_chunks:
-                    split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
-                    split_chunk["sub_title"] = sub_title["title"]
-                    if "hierarchy_path" not in split_chunk:
-                        split_chunk["hierarchy_path"] = hierarchy_path
-                    chunks.append(split_chunk)
-            else:
-                # 直接作为一个块
-                chunks.append(
-                    {
-                        "content": chunk_content,
-                        "relative_start": start_pos,
-                        "sub_title": sub_title["title"],
-                        "hierarchy_path": hierarchy_path,
-                    }
+                for next_sub in all_sub_titles:
+                    if next_sub["position"] > start_pos and next_sub["level"] <= direct_child_level:
+                        end_pos = next_sub["position"]
+                        break
+
+                chunk_content = content_block[start_pos:end_pos]
+                
+                # 调试信息
+                content_preview = chunk_content[:100].replace("\n", " ")
+                print(f"        切分块 {i+1}: {sub_title['title']} (level={sub_title['level']}), 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
+
+                # 检查子标题是否有实际正文内容
+                title_len = len(sub_title["title"])
+                content_after_title = chunk_content[title_len:].strip()
+
+                if not content_after_title or len(content_after_title) < 10:
+                    print(f"        跳过(内容不足)")
+                    continue
+
+                # 构建层级路径
+                hierarchy_path = self._build_hierarchy_path_for_subtitle(
+                    sub_title["toc_item"], all_toc_items, parent_title_info
                 )
 
+                # 只有当块超过 max_chunk_size 时才按句子切分
+                if len(chunk_content) > max_chunk_size:
+                    print(f"        块过大,按句子切分")
+                    split_chunks = self._split_large_chunk(
+                        chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
+                    )
+                    for split_chunk in split_chunks:
+                        split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
+                        split_chunk["sub_title"] = sub_title["title"]
+                        if "hierarchy_path" not in split_chunk:
+                            split_chunk["hierarchy_path"] = hierarchy_path
+                        chunks.append(split_chunk)
+                else:
+                    # 直接作为一个块
+                    chunks.append(
+                        {
+                            "content": chunk_content,
+                            "relative_start": start_pos,
+                            "sub_title": sub_title["title"],
+                            "hierarchy_path": hierarchy_path,
+                        }
+                    )
+
         # 如果所有子标题都没有正文内容,返回整个正文块
         if not chunks:
             if len(content_block) > max_chunk_size:

+ 61 - 1065
core/construction_review/component/doc_worker/pdf_worker/text_splitter.py

@@ -10,17 +10,15 @@ PDF 文本切分实现
 
 from __future__ import annotations
 
-import json
-import re
 from typing import Any, Dict, List
 
 from ..config.provider import default_config_provider
 from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
+from ..utils.text_split_support import HierarchicalChunkMixin
 
 
-
-class PdfTextSplitter(TextSplitter):
+class PdfTextSplitter(TextSplitter, HierarchicalChunkMixin):
     """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
 
     def __init__(self) -> None:
@@ -231,727 +229,77 @@ class PdfTextSplitter(TextSplitter):
                     }
                 ]
 
-        # 找到最低层级(用于判断哪些是最底层的标题)
-        max_level = max(sub["level"] for sub in all_sub_titles)
+        # 找到直接子标题(parent_level + 1)和所有更深层级的标题
+        direct_child_level = parent_level + 1
+        direct_child_titles = [sub for sub in all_sub_titles if sub["level"] == direct_child_level]
         
-        # 只保留最低层级的标题作为切分点
-        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+        # 找到最低层级(用于判断哪些是最底层的标题)
+        max_level = max(sub["level"] for sub in all_sub_titles) if all_sub_titles else parent_level
         
-        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
-
-        # 按最低层级标题切分
-        chunks = []
-        for i, sub_title in enumerate(lowest_level_titles):
-            start_pos = sub_title["position"]
-
-            # 确定结束位置(下一个最低层级标题的位置)
-            if i + 1 < len(lowest_level_titles):
-                end_pos = lowest_level_titles[i + 1]["position"]
-            else:
-                end_pos = len(content_block)
-
-            chunk_content = content_block[start_pos:end_pos]
-            
-            # 调试信息
-            content_preview = chunk_content[:100].replace("\n", " ")
-            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
-
-            # 检查子标题是否有实际正文内容
-            title_len = len(sub_title["title"])
-            content_after_title = chunk_content[title_len:].strip()
-
-            if not content_after_title or len(content_after_title) < 10:
-                print(f"        跳过(内容不足)")
-                continue
-
-            # 构建层级路径
-            hierarchy_path = self._build_hierarchy_path_for_subtitle(
-                sub_title["toc_item"], all_toc_items, parent_title_info
-            )
-
-            # 只有当块超过 max_chunk_size 时才按句子切分
-            if len(chunk_content) > max_chunk_size:
-                print(f"        块过大,按句子切分")
-                split_chunks = self._split_large_chunk(
-                    chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
-                )
-                for split_chunk in split_chunks:
-                    split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
-                    split_chunk["sub_title"] = sub_title["title"]
-                    if "hierarchy_path" not in split_chunk:
-                        split_chunk["hierarchy_path"] = hierarchy_path
-                    chunks.append(split_chunk)
-            else:
-                # 直接作为一个块
-                chunks.append(
-                    {
-                        "content": chunk_content,
-                        "relative_start": start_pos,
-                        "sub_title": sub_title["title"],
-                        "hierarchy_path": hierarchy_path,
-                    }
-                )
-
-        # 如果所有子标题都没有正文内容,返回整个正文块
-        if not chunks:
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(
-                    content_block, max_chunk_size, parent_title,
-                    parent_title_info.get("hierarchy_path", [parent_title])
-                )
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
+        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 直接子标题数: {len(direct_child_titles)}, 最低层级: {max_level}")
 
-        return chunks
+        # 如果没有直接子标题,但有更深层级的标题,使用最低层级标题切分(保持向后兼容)
+        if not direct_child_titles and all_sub_titles:
+            lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+            print(f"      没有直接子标题,使用最低层级标题切分: {len(lowest_level_titles)} 个")
+            direct_child_titles = lowest_level_titles
 
-    def _find_title_in_block(self, title: str, block: str, fuzzy_threshold: float) -> int:
-        """在文本块中查找标题位置(简化版)"""
-        # 直接使用 TitleMatcher 的方法
-        return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
-
-    def _split_large_chunk(
-        self,
-        content: str,
-        max_chunk_size: int,
-        title: str,
-        hierarchy_path: List[str] | None = None,
-    ) -> List[Dict[str, Any]]:
-        """
-        将超大块按句子级分割(保持语义完整)
-        """
-        # 按句子分割(中文句号、问号、感叹号、换行)
-        sentences = re.split(r"([。!?\n])", content)
-
-        # 重新组合句子和标点
-        combined_sentences = []
-        for i in range(0, len(sentences) - 1, 2):
-            if i + 1 < len(sentences):
-                combined_sentences.append(sentences[i] + sentences[i + 1])
-            else:
-                combined_sentences.append(sentences[i])
-
-        if not combined_sentences:
-            combined_sentences = [content]
-
-        # 按max_chunk_size组合句子
+        # 按直接子标题切分(如果存在)
         chunks = []
-        current_chunk = ""
-        current_start = 0
-
-        for sentence in combined_sentences:
-            if len(current_chunk) + len(sentence) <= max_chunk_size:
-                current_chunk += sentence
-            else:
-                if current_chunk:
-                    chunk_data = {
-                        "content": current_chunk,
-                        "relative_start": current_start,
-                        "is_split": True,  # 标记为分割块
-                    }
-                    if hierarchy_path is not None:
-                        chunk_data["hierarchy_path"] = hierarchy_path
-                    chunks.append(chunk_data)
-                    current_start += len(current_chunk)
-                current_chunk = sentence
-
-        # 添加最后一个块
-        if current_chunk:
-            chunk_data = {
-                "content": current_chunk,
-                "relative_start": current_start,
-                "is_split": True,
-            }
-            if hierarchy_path is not None:
-                chunk_data["hierarchy_path"] = hierarchy_path
-            chunks.append(chunk_data)
-
-        return chunks
-
-    def _build_hierarchy_path_for_subtitle(
-        self,
-        sub_title_item: Dict[str, Any],
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-    ) -> List[str]:
-        """为子标题构建完整的层级路径"""
-        hierarchy_path = []
-
-        # 找到子标题在toc_items中的位置
-        sub_title = sub_title_item.get("title", "")
-        sub_title_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item.get("title", "") == sub_title:
-                sub_title_idx = idx
-                break
-
-        if sub_title_idx < 0:
-            # 如果找不到,返回父标题->子标题
-            return [parent_title_info["title"], sub_title]
-
-        # 从子标题向前查找,找到每个层级的父级标题
-        level_paths = {}  # 存储每个层级对应的标题
-        current_level = sub_title_item.get("level", 2)
-
-        for i in range(sub_title_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-
-        # 如果路径为空,至少包含父标题和子标题
-        if not hierarchy_path:
-            hierarchy_path = [parent_title_info["title"], sub_title]
-
-        return hierarchy_path
-
-    def _build_hierarchy_path(
-        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
-    ) -> List[str]:
-        """构建从1级到当前标题的完整层级路径"""
-        hierarchy_path = []
-
-        # 找到当前标题在目录中的位置
-        current_item = None
-        current_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item["title"] == title:
-                current_item = item
-                current_idx = idx
-                break
-
-        if not current_item:
-            # 如果找不到,返回只包含当前标题的路径
-            return [title]
-
-        current_level = current_item.get("level", target_level)
-
-        # 从当前项向前查找,找到每个层级的最近父级
-        level_paths = {}  # 存储每个层级对应的标题
-
-        for i in range(current_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-            elif level == current_level:
-                hierarchy_path.append(title)
-
-        # 如果路径为空,至少包含当前标题
-        if not hierarchy_path:
-            hierarchy_path = [title]
-
-        return hierarchy_path
-
-    def _build_chunk_metadata(
-        self,
-        sub_chunk: Dict[str, Any],
-        title_info: Dict[str, Any],
-        start_pos: int,
-        pages_content: List[Dict[str, Any]],
-        i: int,
-        j: int,
-        chapter_classification_map: Dict[str, Dict[str, Any]] = None,
-    ) -> Dict[str, Any]:
-        """构建文本块的元数据"""
-        content = sub_chunk["content"]
-        chunk_start_pos = start_pos + sub_chunk["relative_start"]
-        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
-
-        # 构建section_label:使用完整的层级路径
-        hierarchy_path = sub_chunk.get("hierarchy_path", [])
-        sub_title = sub_chunk.get("sub_title", "")
-
-        if hierarchy_path:
-            section_label = "->".join(hierarchy_path)
-        elif sub_title:
-            section_label = f"{title_info['title']}->{sub_title}"
-        else:
-            section_label = title_info["title"]
-
-        # 提取最低层级标题的编号
-        if hierarchy_path:
-            lowest_title = hierarchy_path[-1]
-            title_number = self._extract_title_number(lowest_title)
-        elif sub_title:
-            title_number = self._extract_title_number(sub_title)
-        else:
-            title_number = self._extract_title_number(title_info["title"])
-
-        # 构建chunk_id
-        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
-
-        # 获取一级目录的分类信息
-        chapter_classification = None
-        if chapter_classification_map:
-            # 从hierarchy_path获取一级目录标题
-            if hierarchy_path and len(hierarchy_path) > 0:
-                chapter_title = hierarchy_path[0]
-                chapter_classification = chapter_classification_map.get(chapter_title)
-            elif not hierarchy_path:
-                # 如果没有hierarchy_path,尝试从title_info获取
-                chapter_title = title_info.get("title", "")
-                chapter_classification = chapter_classification_map.get(chapter_title)
-
-        chunk_data = {
-            "file_name": "",  # 由上层填充
-            "chunk_id": chunk_id_str,
-            "section_label": section_label,
-            "project_plan_type": title_info.get("category_code", "other"),
-            "chapter_classification": title_info.get("category_code", "other"),
-            "element_tag": {
-                "chunk_id": chunk_id_str,
-                "page": page_num,
-                "serial_number": title_number if title_number else str(i + 1),
-            },
-            "review_chunk_content": content,
-            "_title_number": title_number,
-            "_local_index": j,
-            "_sort_key": chunk_start_pos,
-        }
-
-        # # 如果找到了一级目录的分类信息,添加到chunk中
-        # if chapter_classification:
-        #     chunk_data["chapter_classification"] = chapter_classification
-
-        return chunk_data
-
-    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """生成最终的chunk_id和serial_number"""
-        final_chunks = []
-        
-        # 按 section_label 分组,为每组内的块生成递增的序号
-        section_groups: Dict[str, int] = {}  # section_label -> 当前序号
-
-        for chunk in chunks:
-            section_label = chunk.get("section_label", "")
-            
-            # 为当前 section_label 生成序号
-            if section_label not in section_groups:
-                section_groups[section_label] = 1
-            else:
-                section_groups[section_label] += 1
-            
-            local_index = section_groups[section_label]
-
-            # 从section_label中提取标题路径的编号路径
-            title_number_path = self._extract_title_number_path(section_label)
-
-            # 生成chunk_id:doc_chunk_<标题路径的编号路径>_序号
-            if title_number_path:
-                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
-            else:
-                chunk_id_str = f"doc_chunk_{local_index}"
-
-            # 从section_label中提取最底层级的编号(用于 serial_number)
-            serial_number = self._extract_number_from_section_label(section_label)
-
-            # 更新chunk数据
-            final_chunk = {
-                "file_name": chunk["file_name"],
-                "chunk_id": chunk_id_str,
-                "section_label": chunk["section_label"],
-                "project_plan_type": chunk["project_plan_type"],
-                "chapter_classification": chunk["chapter_classification"],
-                "element_tag": {
-                    "chunk_id": chunk_id_str,
-                    "page": chunk["element_tag"]["page"],
-                    "serial_number": serial_number,
-                },
-                "review_chunk_content": chunk["review_chunk_content"],
-            }
-            print(f"[INFO] 更新chunk数据{chunk_id_str}")
-            with open(rf"temp\document_temp\文档切分预处理结果_final_chunk.json", 'w', encoding='utf-8') as f:
-                json.dump(final_chunk, f, ensure_ascii=False, indent=4)
-            final_chunks.append(final_chunk)
-
-        return final_chunks
-
-    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
-        """根据位置获取页码"""
-        for page in pages_content:
-            if page["start_pos"] <= pos < page["end_pos"]:
-                return int(page["page_num"])
-        return 1
+        if direct_child_titles:
+            for i, sub_title in enumerate(direct_child_titles):
+                start_pos = sub_title["position"]
 
-    def _extract_title_number(self, title: str) -> str:
-        """从标题中提取编号部分(支持多种格式)"""
-        if not title:
-            return ""
-        
-        # 匹配章节格式(如 第一章、第1章等)
-        chapter_match = re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title)
-        if chapter_match:
-            return chapter_match.group(1)
-        
-        # 匹配方括号数字格式(如 【1】、【2】等)
-        bracket_match = re.match(r"^(【\d+】)", title)
-        if bracket_match:
-            return bracket_match.group(1)
-        
-        # 匹配双方括号数字格式(如 〖1.1〗、〖2.3〗等)
-        double_bracket_match = re.match(r"^(〖\d+(?:\.\d+)*〗)", title)
-        if double_bracket_match:
-            return double_bracket_match.group(1)
-        
-        # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
-        number_match = re.match(r"^(\d+(?:\.\d+)*)", title)
-        if number_match:
-            return number_match.group(1)
-        
-        # 匹配中文编号格式(如 一、二、三等)
-        chinese_match = re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title)
-        if chinese_match:
-            return chinese_match.group(1)
-        
-        # 匹配圆括号编号格式(如 (1)、(一)等)
-        paren_match = re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title)
-        if paren_match:
-            return paren_match.group(1)
-        
-        return ""
-
-    def _extract_title_number_path(self, section_label: str) -> str:
-        """从section_label中提取标题路径的编号路径"""
-        if not section_label:
-            return ""
-
-        # 按"->"分割层级路径
-        parts = section_label.split("->")
-
-        # 提取每一层的编号
-        number_paths = []
-        for part in parts:
-            part = part.strip()
-            if part:
-                number = self._extract_title_number(part)
-                if number:
-                    number_paths.append(number)
-
-        # 用"->"连接编号路径
-        if number_paths:
-            return "->".join(number_paths)
-
-        return ""
-
-    def _extract_number_from_section_label(self, section_label: str) -> str:
-        """
-        从section_label中提取最底层级的编号
-        
-        例如:
-        "第一章 编制依据与说明->一) 编制依据" -> "一)"
-        "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
-        "第四章 施工工艺技术->一)主要部件说明->2、前临时支腿" -> "2"
-        """
-        if not section_label:
-            return ""
-
-        # 先找到最低层级部分(最后一个"->"后面的部分)
-        if "->" in section_label:
-            last_level_part = section_label.split("->")[-1].strip()
-        else:
-            last_level_part = section_label.strip()
-
-        # 检查最低层级部分是否包含合并标记(" + ")
-        if " + " in last_level_part:
-            # 分割合并的部分
-            merged_parts = last_level_part.split(" + ")
-            numbers = []
-            for part in merged_parts:
-                part = part.strip()
-                number = self._extract_title_number(part)
-                if number:
-                    numbers.append(number)
-
-            if numbers:
-                return "+".join(numbers)
-
-        # 没有合并的情况,直接提取最低层级的编号
-        return self._extract_title_number(last_level_part)
-
-
-
-    """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
-
-    def __init__(self) -> None:
-        self._cfg = default_config_provider
-        self._title_matcher = TitleMatcher()
-
-    def split_by_hierarchy(
-        self,
-        classification_items: List[Dict[str, Any]],
-        pages_content: List[Dict[str, Any]],
-        toc_info: Dict[str, Any],
-        target_level: int,
-        max_chunk_size: int,
-        min_chunk_size: int,
-    ) -> List[Dict[str, Any]]:
-        """
-        按目录层级和字符数智能切分文本
-        
-        新的分块逻辑:
-        1. 跳过目录页,按目录项定位到指定层级的正文标题
-        2. 在指定层级正文标题所属的正文块中,先按目录项的最低层级子标题进行分块
-        3. 对每个块按字符数判断:
-           - 超过max_chunk_size的进行句子级分割(保持语义尽量完整)
-        """
-        toc_pages = toc_info.get("toc_pages", []) or []
-        all_toc_items = toc_info.get("toc_items", [])
-        
-        # 使用完整全文
-        full_text = "".join(p.get("text", "") for p in pages_content)
-
-        print(f"  正在定位{len(classification_items)}个已分类的标题...")
-        print(f"  目录所在页: {toc_pages}")
-
-        # 步骤1: 在正文中定位已分类的标题(跳过目录页)
-        located = self._title_matcher.find_title_positions(
-            classification_items, full_text, pages_content, toc_pages
-        )
-        
-        # 只保留成功定位的标题
-        found_titles = [t for t in located if t["found"]]
-        if not found_titles:
-            print(f"  错误: 未能在正文中定位任何标题")
-            return []
-
-        print(f"  成功定位 {len(found_titles)}/{len(classification_items)} 个标题")
-        
-        # 按位置排序
-        found_titles.sort(key=lambda x: x["position"])
-
-        # 步骤2: 为每个找到的标题构建完整的层级路径
-        for title_info in found_titles:
-            hierarchy_path = self._build_hierarchy_path(
-                title_info["title"], all_toc_items, target_level
-            )
-            title_info["hierarchy_path"] = hierarchy_path
-
-        # 步骤3: 按目录层级处理每个标题块
-        all_chunks: List[Dict[str, Any]] = []
-        
-        for i, title_info in enumerate(found_titles):
-            start_pos = title_info["position"]
-            
-            # 确定正文块的结束位置(下一个同级标题的位置)
-            if i + 1 < len(found_titles):
-                end_pos = found_titles[i + 1]["position"]
-            else:
-                end_pos = len(full_text)
-            
-            # 提取正文块
-            content_block = full_text[start_pos:end_pos]
-            
-            # 在正文块中查找子标题(按最低层级切分)
-            sub_chunks = self._split_by_sub_titles(
-                content_block,
-                all_toc_items,
-                title_info,
-                target_level,
-                max_chunk_size,
-                min_chunk_size,
-            )
-            
-            # 为每个子块添加元数据
-            for j, sub_chunk in enumerate(sub_chunks, 1):
-                chunk_data = self._build_chunk_metadata(
-                    sub_chunk, title_info, start_pos, pages_content, i, j
+                # 确定结束位置(下一个同级或更高级标题的位置)
+                # 在 all_sub_titles 中查找下一个位置大于当前标题,且 level <= direct_child_level 的标题
+                end_pos = len(content_block)
+                for next_sub in all_sub_titles:
+                    if next_sub["position"] > start_pos and next_sub["level"] <= direct_child_level:
+                        end_pos = next_sub["position"]
+                        break
+
+                chunk_content = content_block[start_pos:end_pos]
+                
+                # 调试信息
+                content_preview = chunk_content[:100].replace("\n", " ")
+                print(f"        切分块 {i+1}: {sub_title['title']} (level={sub_title['level']}), 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
+
+                # 检查子标题是否有实际正文内容
+                title_len = len(sub_title["title"])
+                content_after_title = chunk_content[title_len:].strip()
+
+                if not content_after_title or len(content_after_title) < 10:
+                    print(f"        跳过(内容不足)")
+                    continue
+
+                # 构建层级路径
+                hierarchy_path = self._build_hierarchy_path_for_subtitle(
+                    sub_title["toc_item"], all_toc_items, parent_title_info
                 )
-                all_chunks.append(chunk_data)
 
-        # 步骤4: 生成最终的chunk_id和serial_number
-        final_chunks = self._finalize_chunk_ids(all_chunks)
-
-        print(f"  初始切分: {len(all_chunks)} 个块")
-        print(f"  最终块数: {len(final_chunks)} 个块")
-
-        return final_chunks
-
-    def _split_by_sub_titles(
-        self,
-        content_block: str,
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-        target_level: int,
-        max_chunk_size: int,
-        min_chunk_size: int,
-    ) -> List[Dict[str, Any]]:
-        """
-        在正文块中按子标题进行切分(按照toc_items的顺序和层级关系)
-        
-        核心逻辑:
-        1. 查找所有层级的子标题(不限于直接子标题)
-        2. 按位置排序后,两个相邻子标题之间的内容作为一个块
-        3. 只有当块超过 max_chunk_size 时才按句子切分
-        """
-        # 找到父标题在toc_items中的位置
-        parent_title = parent_title_info["title"]
-        parent_idx = -1
-        parent_level = target_level
-        
-        for idx, toc_item in enumerate(all_toc_items):
-            if toc_item["title"] == parent_title:
-                parent_idx = idx
-                parent_level = toc_item.get("level", target_level)
-                break
-
-        if parent_idx < 0:
-            # 如果找不到父标题,将整个正文块作为一个块
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(content_block, max_chunk_size, parent_title, [])
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
-
-        # 找到下一个同级或更高级标题的位置(确定父标题的范围)
-        next_sibling_idx = len(all_toc_items)
-        for idx in range(parent_idx + 1, len(all_toc_items)):
-            item = all_toc_items[idx]
-            if item.get("level", 1) <= parent_level:
-                next_sibling_idx = idx
-                break
-
-        # 查找所有子标题(所有 level > parent_level 的标题)
-        # 这是关键:不限于直接子标题,而是所有更深层级的标题
-        all_sub_titles = []
-        fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))
-
-        for idx in range(parent_idx + 1, next_sibling_idx):
-            toc_item = all_toc_items[idx]
-            item_level = toc_item.get("level", 1)
-            
-            # 查找所有更深层级的子标题
-            if item_level > parent_level:
-                # 在正文块中查找这个子标题
-                pos = self._find_title_in_block(
-                    toc_item["title"], content_block, fuzzy_threshold
-                )
-                if pos >= 0:
-                    # 调试:显示找到的标题及其周围内容
-                    context_start = max(0, pos - 20)
-                    context_end = min(len(content_block), pos + len(toc_item["title"]) + 50)
-                    context = content_block[context_start:context_end].replace("\n", " ")
-                    print(f"        找到子标题: {toc_item['title']} (level={item_level}), 位置={pos}, 上下文: ...{context}...")
-                    
-                    all_sub_titles.append(
+                # 只有当块超过 max_chunk_size 时才按句子切分
+                if len(chunk_content) > max_chunk_size:
+                    print(f"        块过大,按句子切分")
+                    split_chunks = self._split_large_chunk(
+                        chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
+                    )
+                    for split_chunk in split_chunks:
+                        split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
+                        split_chunk["sub_title"] = sub_title["title"]
+                        if "hierarchy_path" not in split_chunk:
+                            split_chunk["hierarchy_path"] = hierarchy_path
+                        chunks.append(split_chunk)
+                else:
+                    # 直接作为一个块
+                    chunks.append(
                         {
-                            "title": toc_item["title"],
-                            "level": toc_item["level"],
-                            "position": pos,
-                            "toc_index": idx,
-                            "toc_item": toc_item,
+                            "content": chunk_content,
+                            "relative_start": start_pos,
+                            "sub_title": sub_title["title"],
+                            "hierarchy_path": hierarchy_path,
                         }
                     )
 
-        # 按位置排序
-        all_sub_titles.sort(key=lambda x: x["position"])
-
-        # 如果没有找到任何子标题,将整个正文块作为一个块
-        if not all_sub_titles:
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(
-                    content_block, max_chunk_size, parent_title, 
-                    parent_title_info.get("hierarchy_path", [parent_title])
-                )
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
-
-        # 找到最低层级(用于判断哪些是最底层的标题)
-        max_level = max(sub["level"] for sub in all_sub_titles)
-        
-        # 只保留最低层级的标题作为切分点
-        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
-        
-        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
-
-        # 按最低层级标题切分
-        chunks = []
-        for i, sub_title in enumerate(lowest_level_titles):
-            start_pos = sub_title["position"]
-
-            # 确定结束位置(下一个最低层级标题的位置)
-            if i + 1 < len(lowest_level_titles):
-                end_pos = lowest_level_titles[i + 1]["position"]
-            else:
-                end_pos = len(content_block)
-
-            chunk_content = content_block[start_pos:end_pos]
-            
-            # 调试信息
-            content_preview = chunk_content[:100].replace("\n", " ")
-            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
-
-            # 检查子标题是否有实际正文内容
-            title_len = len(sub_title["title"])
-            content_after_title = chunk_content[title_len:].strip()
-
-            if not content_after_title or len(content_after_title) < 10:
-                print(f"        跳过(内容不足)")
-                continue
-
-            # 构建层级路径
-            hierarchy_path = self._build_hierarchy_path_for_subtitle(
-                sub_title["toc_item"], all_toc_items, parent_title_info
-            )
-
-            # 只有当块超过 max_chunk_size 时才按句子切分
-            if len(chunk_content) > max_chunk_size:
-                print(f"        块过大,按句子切分")
-                split_chunks = self._split_large_chunk(
-                    chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
-                )
-                for split_chunk in split_chunks:
-                    split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
-                    split_chunk["sub_title"] = sub_title["title"]
-                    if "hierarchy_path" not in split_chunk:
-                        split_chunk["hierarchy_path"] = hierarchy_path
-                    chunks.append(split_chunk)
-            else:
-                # 直接作为一个块
-                chunks.append(
-                    {
-                        "content": chunk_content,
-                        "relative_start": start_pos,
-                        "sub_title": sub_title["title"],
-                        "hierarchy_path": hierarchy_path,
-                    }
-                )
-
         # 如果所有子标题都没有正文内容,返回整个正文块
         if not chunks:
             if len(content_block) > max_chunk_size:
@@ -976,356 +324,4 @@ class PdfTextSplitter(TextSplitter):
         # 直接使用 TitleMatcher 的方法
         return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
 
-    def _split_large_chunk(
-        self,
-        content: str,
-        max_chunk_size: int,
-        title: str,
-        hierarchy_path: List[str] | None = None,
-    ) -> List[Dict[str, Any]]:
-        """
-        将超大块按句子级分割(保持语义完整)
-        """
-        # 按句子分割(中文句号、问号、感叹号、换行)
-        sentences = re.split(r"([。!?\n])", content)
-
-        # 重新组合句子和标点
-        combined_sentences = []
-        for i in range(0, len(sentences) - 1, 2):
-            if i + 1 < len(sentences):
-                combined_sentences.append(sentences[i] + sentences[i + 1])
-            else:
-                combined_sentences.append(sentences[i])
-
-        if not combined_sentences:
-            combined_sentences = [content]
-
-        # 按max_chunk_size组合句子
-        chunks = []
-        current_chunk = ""
-        current_start = 0
-
-        for sentence in combined_sentences:
-            if len(current_chunk) + len(sentence) <= max_chunk_size:
-                current_chunk += sentence
-            else:
-                if current_chunk:
-                    chunk_data = {
-                        "content": current_chunk,
-                        "relative_start": current_start,
-                        "is_split": True,  # 标记为分割块
-                    }
-                    if hierarchy_path is not None:
-                        chunk_data["hierarchy_path"] = hierarchy_path
-                    chunks.append(chunk_data)
-                    current_start += len(current_chunk)
-                current_chunk = sentence
-
-        # 添加最后一个块
-        if current_chunk:
-            chunk_data = {
-                "content": current_chunk,
-                "relative_start": current_start,
-                "is_split": True,
-            }
-            if hierarchy_path is not None:
-                chunk_data["hierarchy_path"] = hierarchy_path
-            chunks.append(chunk_data)
-
-        return chunks
-
-    def _build_hierarchy_path_for_subtitle(
-        self,
-        sub_title_item: Dict[str, Any],
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-    ) -> List[str]:
-        """为子标题构建完整的层级路径"""
-        hierarchy_path = []
-
-        # 找到子标题在toc_items中的位置
-        sub_title = sub_title_item.get("title", "")
-        sub_title_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item.get("title", "") == sub_title:
-                sub_title_idx = idx
-                break
-
-        if sub_title_idx < 0:
-            # 如果找不到,返回父标题->子标题
-            return [parent_title_info["title"], sub_title]
-
-        # 从子标题向前查找,找到每个层级的父级标题
-        level_paths = {}  # 存储每个层级对应的标题
-        current_level = sub_title_item.get("level", 2)
-
-        for i in range(sub_title_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-
-        # 如果路径为空,至少包含父标题和子标题
-        if not hierarchy_path:
-            hierarchy_path = [parent_title_info["title"], sub_title]
-
-        return hierarchy_path
-
-    def _build_hierarchy_path(
-        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
-    ) -> List[str]:
-        """构建从1级到当前标题的完整层级路径"""
-        hierarchy_path = []
-
-        # 找到当前标题在目录中的位置
-        current_item = None
-        current_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item["title"] == title:
-                current_item = item
-                current_idx = idx
-                break
-
-        if not current_item:
-            # 如果找不到,返回只包含当前标题的路径
-            return [title]
-
-        current_level = current_item.get("level", target_level)
-
-        # 从当前项向前查找,找到每个层级的最近父级
-        level_paths = {}  # 存储每个层级对应的标题
-
-        for i in range(current_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-            elif level == current_level:
-                hierarchy_path.append(title)
-
-        # 如果路径为空,至少包含当前标题
-        if not hierarchy_path:
-            hierarchy_path = [title]
-
-        return hierarchy_path
-
-    def _build_chunk_metadata(
-        self,
-        sub_chunk: Dict[str, Any],
-        title_info: Dict[str, Any],
-        start_pos: int,
-        pages_content: List[Dict[str, Any]],
-        i: int,
-        j: int,
-    ) -> Dict[str, Any]:
-        """构建文本块的元数据"""
-        content = sub_chunk["content"]
-        chunk_start_pos = start_pos + sub_chunk["relative_start"]
-        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
-
-        # 构建section_label:使用完整的层级路径
-        hierarchy_path = sub_chunk.get("hierarchy_path", [])
-        sub_title = sub_chunk.get("sub_title", "")
-
-        if hierarchy_path:
-            section_label = "->".join(hierarchy_path)
-        elif sub_title:
-            section_label = f"{title_info['title']}->{sub_title}"
-        else:
-            section_label = title_info["title"]
-
-        # 提取最低层级标题的编号
-        if hierarchy_path:
-            lowest_title = hierarchy_path[-1]
-            title_number = self._extract_title_number(lowest_title)
-        elif sub_title:
-            title_number = self._extract_title_number(sub_title)
-        else:
-            title_number = self._extract_title_number(title_info["title"])
-
-        # 构建chunk_id
-        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
-
-        return {
-            "file_name": "",  # 由上层填充
-            "chunk_id": chunk_id_str,
-            "section_label": section_label,
-            "project_plan_type": title_info.get("category_code", "other"),
-            "element_tag": {
-                "chunk_id": chunk_id_str,
-                "page": page_num,
-                "serial_number": title_number if title_number else str(i + 1),
-            },
-            "review_chunk_content": content,
-            "_title_number": title_number,
-            "_local_index": j,
-            "_sort_key": chunk_start_pos,
-        }
-
-    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """生成最终的chunk_id和serial_number"""
-        final_chunks = []
-        
-        # 按 section_label 分组,为每组内的块生成递增的序号
-        section_groups: Dict[str, int] = {}  # section_label -> 当前序号
-
-        for chunk in chunks:
-            section_label = chunk.get("section_label", "")
-            
-            # 为当前 section_label 生成序号
-            if section_label not in section_groups:
-                section_groups[section_label] = 1
-            else:
-                section_groups[section_label] += 1
-            
-            local_index = section_groups[section_label]
-
-            # 从section_label中提取标题路径的编号路径
-            title_number_path = self._extract_title_number_path(section_label)
-
-            # 生成chunk_id:doc_chunk_<标题路径的编号路径>_序号
-            if title_number_path:
-                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
-            else:
-                chunk_id_str = f"doc_chunk_{local_index}"
-
-            # 从section_label中提取最底层级的编号(用于 serial_number)
-            serial_number = self._extract_number_from_section_label(section_label)
-
-            # 更新chunk数据
-            final_chunk = {
-                "file_name": chunk["file_name"],
-                "chunk_id": chunk_id_str,
-                "section_label": chunk["section_label"],
-                "project_plan_type": chunk["project_plan_type"],
-                "element_tag": {
-                    "chunk_id": chunk_id_str,
-                    "page": chunk["element_tag"]["page"],
-                    "serial_number": serial_number,
-                },
-                "review_chunk_content": chunk["review_chunk_content"],
-            }
-
-            final_chunks.append(final_chunk)
-
-        return final_chunks
-
-    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
-        """根据位置获取页码"""
-        for page in pages_content:
-            if page["start_pos"] <= pos < page["end_pos"]:
-                return int(page["page_num"])
-        return 1
-
-    def _extract_title_number(self, title: str) -> str:
-        """从标题中提取编号部分(支持多种格式)"""
-        if not title:
-            return ""
-        
-        # 匹配章节格式(如 第一章、第1章等)
-        chapter_match = re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title)
-        if chapter_match:
-            return chapter_match.group(1)
-        
-        # 匹配方括号数字格式(如 【1】、【2】等)
-        bracket_match = re.match(r"^(【\d+】)", title)
-        if bracket_match:
-            return bracket_match.group(1)
-        
-        # 匹配双方括号数字格式(如 〖1.1〗、〖2.3〗等)
-        double_bracket_match = re.match(r"^(〖\d+(?:\.\d+)*〗)", title)
-        if double_bracket_match:
-            return double_bracket_match.group(1)
-        
-        # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
-        number_match = re.match(r"^(\d+(?:\.\d+)*)", title)
-        if number_match:
-            return number_match.group(1)
-        
-        # 匹配中文编号格式(如 一、二、三等)
-        chinese_match = re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title)
-        if chinese_match:
-            return chinese_match.group(1)
-        
-        # 匹配圆括号编号格式(如 (1)、(一)等)
-        paren_match = re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title)
-        if paren_match:
-            return paren_match.group(1)
-        
-        return ""
-
-    def _extract_title_number_path(self, section_label: str) -> str:
-        """从section_label中提取标题路径的编号路径"""
-        if not section_label:
-            return ""
-
-        # 按"->"分割层级路径
-        parts = section_label.split("->")
-
-        # 提取每一层的编号
-        number_paths = []
-        for part in parts:
-            part = part.strip()
-            if part:
-                number = self._extract_title_number(part)
-                if number:
-                    number_paths.append(number)
-
-        # 用"->"连接编号路径
-        if number_paths:
-            return "->".join(number_paths)
-
-        return ""
-
-    def _extract_number_from_section_label(self, section_label: str) -> str:
-        """
-        从section_label中提取最底层级的编号
-        
-        例如:
-        "第一章 编制依据与说明->一) 编制依据" -> "一)"
-        "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
-        "第四章 施工工艺技术->一)主要部件说明->2、前临时支腿" -> "2"
-        """
-        if not section_label:
-            return ""
-
-        # 先找到最低层级部分(最后一个"->"后面的部分)
-        if "->" in section_label:
-            last_level_part = section_label.split("->")[-1].strip()
-        else:
-            last_level_part = section_label.strip()
-
-        # 检查最低层级部分是否包含合并标记(" + ")
-        if " + " in last_level_part:
-            # 分割合并的部分
-            merged_parts = last_level_part.split(" + ")
-            numbers = []
-            for part in merged_parts:
-                part = part.strip()
-                number = self._extract_title_number(part)
-                if number:
-                    numbers.append(number)
-
-            if numbers:
-                return "+".join(numbers)
-
-        # 没有合并的情况,直接提取最低层级的编号
-        return self._extract_title_number(last_level_part)
-
 

+ 128 - 47
core/construction_review/component/doc_worker/utils/title_matcher.py

@@ -125,32 +125,116 @@ class TitleMatcher:
         """
         在文本中查找标题的近似位置(返回标题在文本中的精确起始位置)。
         
-        智能匹配策略:
-        1. 先用标题正文部分定位(可能有多个位置)
-        2. 再用编号部分确认
-        3. 如果编号确认不了,就使用定位到的元素所在行只有标题部分,没有其他字符(转义字符除外)的那个
-        4. 否则就直接确认第一个匹配位置
+        优化后的匹配策略:
+        1. 先用完整标题进行定位
+        2. 如果定位不到,再用标题的正文部分进行定位
+        3. 定位到多个位置的元素,选用元素独占一行的(只有标题正文,没有其他非转义字符)
         """
-        # 提取标题的编号部分和正文部分
-        title_number = self._extract_title_number(title)
+        # 步骤1: 先用完整标题进行定位
+        full_title_positions = self._find_full_title_positions(title, text)
+        
+        if full_title_positions:
+            # 如果找到完整标题的多个位置,优先选择独占一行的
+            best_pos = self._select_best_position(full_title_positions, text, title)
+            if best_pos >= 0:
+                return best_pos
+            # 如果找不到独占一行的,返回第一个位置
+            return full_title_positions[0]
+        
+        # 步骤2: 如果完整标题定位不到,再用标题的正文部分进行定位
         title_content = self._extract_title_content(title)
         
         if not title_content:
             # 如果没有正文部分,使用原来的逻辑
             return self._find_title_in_text_legacy(title, text, fuzzy_threshold)
         
-        # 移除转义字符后的文本
+        # 查找所有匹配标题正文部分的位置
+        content_positions = self._find_content_positions(title_content, text)
+        
+        if not content_positions:
+            # 如果没有找到任何位置,使用模糊匹配
+            return self._find_title_in_text_legacy(title, text, fuzzy_threshold)
+        
+        # 步骤3: 定位到多个位置的元素,选用元素独占一行的
+        best_pos = self._select_best_position(content_positions, text, title_content)
+        if best_pos >= 0:
+            return best_pos
+        
+        # 如果找不到独占一行的,返回第一个位置
+        return content_positions[0]
+    
+    def _find_full_title_positions(self, title: str, text: str) -> List[int]:
+        """
+        查找完整标题在文本中的所有位置
+        
+        返回:
+            List[int]: 所有匹配位置的列表
+        """
+        positions = []
+        
+        # 移除转义字符后的文本和标题
+        text_clean = self._remove_escape_chars(text)
+        title_clean = self._remove_escape_chars(title)
+        title_normalized = self._normalize_title(title_clean)
+        
+        if not title_normalized:
+            return positions
+        
+        # 按行查找(更高效)
+        lines = text.split('\n')
+        current_pos = 0
+        
+        for line in lines:
+            line_clean = self._remove_escape_chars(line)
+            line_normalized = self._normalize_title(line_clean)
+            
+            # 检查行中是否包含完整标题
+            if title_normalized in line_normalized:
+                pos_in_line = line_normalized.find(title_normalized)
+                if pos_in_line >= 0:
+                    line_pos = self._find_pattern_in_line(
+                        title_normalized, line, pos_in_line
+                    )
+                    if line_pos >= 0:
+                        positions.append(current_pos + line_pos)
+            
+            # 移除空格后查找
+            title_no_space = title_normalized.replace(' ', '')
+            line_no_space = line_normalized.replace(' ', '')
+            if title_no_space and title_no_space in line_no_space:
+                pos_in_line = line_no_space.find(title_no_space)
+                if pos_in_line >= 0:
+                    line_pos = self._find_pattern_in_line(
+                        title_no_space, line, pos_in_line
+                    )
+                    if line_pos >= 0:
+                        pos = current_pos + line_pos
+                        if pos not in positions:
+                            positions.append(pos)
+            
+            current_pos += len(line) + 1  # +1 for newline
+        
+        # 去重并排序
+        return sorted(set(positions))
+    
+    def _find_content_positions(self, title_content: str, text: str) -> List[int]:
+        """
+        查找标题正文部分在文本中的所有位置
+        
+        返回:
+            List[int]: 所有匹配位置的列表
+        """
+        positions = []
+        
+        # 移除转义字符后的文本和标题正文
         text_clean = self._remove_escape_chars(text)
         title_content_clean = self._remove_escape_chars(title_content)
         title_content_normalized = self._normalize_title(title_content_clean)
         
         if not title_content_normalized:
-            return -1
-        
-        # 查找所有匹配标题正文部分的位置
-        candidate_positions = []
+            return positions
         
-        # 方法1: 按行查找(更高效)
+        # 按行查找(更高效)
         lines = text.split('\n')
         current_pos = 0
         
@@ -160,17 +244,15 @@ class TitleMatcher:
             
             # 检查行中是否包含标题正文
             if title_content_normalized in line_normalized:
-                # 找到标题在行中的位置
                 pos_in_line = line_normalized.find(title_content_normalized)
                 if pos_in_line >= 0:
-                    # 映射回原始行的位置
                     line_pos = self._find_pattern_in_line(
                         title_content_normalized, line, pos_in_line
                     )
                     if line_pos >= 0:
-                        candidate_positions.append(current_pos + line_pos)
+                        positions.append(current_pos + line_pos)
             
-            # 方法2: 移除空格后查找
+            # 移除空格后查找
             title_no_space = title_content_normalized.replace(' ', '')
             line_no_space = line_normalized.replace(' ', '')
             if title_no_space and title_no_space in line_no_space:
@@ -181,36 +263,40 @@ class TitleMatcher:
                     )
                     if line_pos >= 0:
                         pos = current_pos + line_pos
-                        if pos not in candidate_positions:
-                            candidate_positions.append(pos)
+                        if pos not in positions:
+                            positions.append(pos)
             
             current_pos += len(line) + 1  # +1 for newline
         
-        if not candidate_positions:
-            # 如果没有找到任何位置,使用模糊匹配
-            return self._find_title_in_text_legacy(title, text, fuzzy_threshold)
-        
         # 去重并排序
-        candidate_positions = sorted(set(candidate_positions))
-        
-        # 如果有编号部分,尝试用编号确认
-        if title_number:
-            for pos in candidate_positions:
-                # 检查该位置前后的文本是否包含编号
-                check_range = 50  # 检查前后50个字符
-                start_check = max(0, pos - check_range)
-                end_check = min(len(text), pos + check_range)
-                context = text[start_check:end_check]
-                
-                # 在上下文中查找编号
-                if self._check_number_in_context(title_number, context, pos - start_check):
-                    return pos
+        return sorted(set(positions))
+    
+    def _select_best_position(self, positions: List[int], text: str, title_or_content: str) -> int:
+        """
+        从多个位置中选择最佳位置(优先选择独占一行的)
+        
+        参数:
+            positions: 候选位置列表
+            text: 全文
+            title_or_content: 标题或标题正文部分
+            
+        返回:
+            int: 最佳位置,如果找不到独占一行的则返回-1
+        """
+        if not positions:
+            return -1
+        
+        # 移除转义字符后的标题
+        title_clean = self._remove_escape_chars(title_or_content)
+        title_normalized = self._normalize_title(title_clean)
+        
+        if not title_normalized:
+            return -1
         
-        # 如果编号确认不了,检查每个位置所在的行是否只有标题(没有其他字符)
         best_pos = -1
         best_score = -1
         
-        for pos in candidate_positions:
+        for pos in positions:
             # 找到该位置所在的行
             line_start = text.rfind('\n', 0, pos) + 1
             line_end = text.find('\n', pos)
@@ -220,20 +306,15 @@ class TitleMatcher:
             line_text = text[line_start:line_end]
             line_clean = self._remove_escape_chars(line_text).strip()
             
-            # 检查该行是否只包含标题(允许前后有少量空白和标点
-            if self._is_line_only_title(line_clean, title_content_normalized):
+            # 检查该行是否只包含标题(没有其他非转义字符
+            if self._is_line_only_title(line_clean, title_normalized):
                 # 计算匹配度(行越短、越接近标题,分数越高)
                 score = 1000 - len(line_clean)
                 if score > best_score:
                     best_score = score
                     best_pos = pos
         
-        # 如果找到了只包含标题的行,返回该位置
-        if best_pos >= 0:
-            return best_pos
-        
-        # 否则返回第一个匹配位置
-        return candidate_positions[0]
+        return best_pos
 
     def _find_title_in_text_legacy(self, title: str, text: str, fuzzy_threshold: float) -> int:
         """

+ 1 - 3
core/construction_review/component/doc_worker/命令

@@ -1,11 +1,9 @@
-python -m file_parse.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
 python -m file_parse.docx_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx" -l 1 --max-size 3000 --min-size 50 -o ./output
+python -m file_parse.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
 
 
 
 python -m file_parse.pdf_worker.cli "Z:\施工方案及编制依据案例库(第一阶段)1205\施工方案文档列表\44_四川公路桥梁建设集团有限公司镇巴(川陕界)至广安高速公路通广段C合同段C4项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
 
 
-            user_code = first_account.get('userCode')
-