dev: fix a field value passing issue in the file splitting module;

ChenJiSheng 2 weeks ago
parent
commit
b6c6fdd095

+ 2 - 0
core/construction_review/component/doc_worker/__init__.py

@@ -37,3 +37,5 @@ __all__ = [
 
 
 
+
+

+ 2 - 1
core/construction_review/component/doc_worker/config/provider.py

@@ -11,7 +11,6 @@ from pathlib import Path
 from typing import Any, Dict
 
 import yaml
- 
 
 from ..interfaces import ConfigProvider
 
@@ -50,3 +49,5 @@ default_config_provider = YamlConfigProvider()
 
 
 
+
+

+ 1 - 0
core/construction_review/component/doc_worker/docx_worker/full_text_extractor.py

@@ -8,6 +8,7 @@ from __future__ import annotations
 
 import re
 from io import BytesIO
+from pathlib import Path
 from typing import Any, Dict, List
 
 from docx import Document

+ 0 - 1
core/construction_review/component/doc_worker/docx_worker/text_splitter.py

@@ -17,7 +17,6 @@ from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
 
 
-
 class DocxTextSplitter(TextSplitter):
     """按目录层级对 DOCX 正文进行智能分块的实现"""
 

+ 2 - 0
core/construction_review/component/doc_worker/interfaces.py

@@ -224,3 +224,5 @@ class FileParseFacade(ABC):
 
 
 
+
+

+ 5 - 5
core/construction_review/component/doc_worker/pdf_worker/adapter.py

@@ -13,11 +13,11 @@ from typing import List, Optional
 
 from ..config.provider import default_config_provider
 from ..interfaces import DocumentPipeline, FileParseFacade, ResultWriter
-from ..pdf_worker.classifier import PdfHierarchyClassifier
-from ..pdf_worker.fulltext_extractor import PdfFullTextExtractor
-from ..pdf_worker.json_writer import PdfJsonResultWriter
-from ..pdf_worker.text_splitter import PdfTextSplitter
-from ..pdf_worker.toc_extractor import PdfTOCExtractor
+from .classifier import PdfHierarchyClassifier
+from .fulltext_extractor import PdfFullTextExtractor
+from .json_writer import PdfJsonResultWriter
+from .text_splitter import PdfTextSplitter
+from .toc_extractor import PdfTOCExtractor
 from ..pipeline import (
     DefaultDocumentPipeline,
     DefaultFileParseFacade,
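
For context on the import rewrite above: because adapter.py itself lives inside pdf_worker/, both spellings resolve to the same sibling modules; the single-dot form simply stops repeating the pdf_worker package name. A quick equivalence check, assuming the repository root is on sys.path (illustrative only, not part of the diff):

import importlib

# Anchor package of adapter.py; path taken from the file header above.
pkg = "core.construction_review.component.doc_worker.pdf_worker"

new_form = importlib.import_module(".classifier", package=pkg)              # from .classifier import ...
old_form = importlib.import_module("..pdf_worker.classifier", package=pkg)  # from ..pdf_worker.classifier import ...

assert new_form is old_form  # same module object either way, so the rewrite is behavior-neutral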

+ 2 - 0
core/construction_review/component/doc_worker/pdf_worker/classifier.py

@@ -121,3 +121,5 @@ class PdfHierarchyClassifier(HierarchyClassifier):
 
 
 
+
+

+ 0 - 638
core/construction_review/component/doc_worker/pdf_worker/text_splitter.py

@@ -18,7 +18,6 @@ from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
 
 
-
 class PdfTextSplitter(TextSplitter):
     """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
 
@@ -689,640 +688,3 @@ class PdfTextSplitter(TextSplitter):
         return self._extract_title_number(last_level_part)
 
 
-
-    """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
-
-    def __init__(self) -> None:
-        self._cfg = default_config_provider
-        self._title_matcher = TitleMatcher()
-
-    def split_by_hierarchy(
-        self,
-        classification_items: List[Dict[str, Any]],
-        pages_content: List[Dict[str, Any]],
-        toc_info: Dict[str, Any],
-        target_level: int,
-        max_chunk_size: int,
-        min_chunk_size: int,
-    ) -> List[Dict[str, Any]]:
-        """
-        按目录层级和字符数智能切分文本
-        
-        新的分块逻辑:
-        1. 跳过目录页,按目录项定位到指定层级的正文标题
-        2. 在指定层级正文标题所属的正文块中,先按目录项的最低层级子标题进行分块
-        3. 对每个块按字符数判断:
-           - 超过max_chunk_size的进行句子级分割(保持语义尽量完整)
-        """
-        toc_pages = toc_info.get("toc_pages", []) or []
-        all_toc_items = toc_info.get("toc_items", [])
-        
-        # 使用完整全文
-        full_text = "".join(p.get("text", "") for p in pages_content)
-
-        print(f"  正在定位{len(classification_items)}个已分类的标题...")
-        print(f"  目录所在页: {toc_pages}")
-
-        # 步骤1: 在正文中定位已分类的标题(跳过目录页)
-        located = self._title_matcher.find_title_positions(
-            classification_items, full_text, pages_content, toc_pages
-        )
-        
-        # 只保留成功定位的标题
-        found_titles = [t for t in located if t["found"]]
-        if not found_titles:
-            print(f"  错误: 未能在正文中定位任何标题")
-            return []
-
-        print(f"  成功定位 {len(found_titles)}/{len(classification_items)} 个标题")
-        
-        # 按位置排序
-        found_titles.sort(key=lambda x: x["position"])
-
-        # 步骤2: 为每个找到的标题构建完整的层级路径
-        for title_info in found_titles:
-            hierarchy_path = self._build_hierarchy_path(
-                title_info["title"], all_toc_items, target_level
-            )
-            title_info["hierarchy_path"] = hierarchy_path
-
-        # 步骤3: 按目录层级处理每个标题块
-        all_chunks: List[Dict[str, Any]] = []
-        
-        for i, title_info in enumerate(found_titles):
-            start_pos = title_info["position"]
-            
-            # 确定正文块的结束位置(下一个同级标题的位置)
-            if i + 1 < len(found_titles):
-                end_pos = found_titles[i + 1]["position"]
-            else:
-                end_pos = len(full_text)
-            
-            # 提取正文块
-            content_block = full_text[start_pos:end_pos]
-            
-            # 在正文块中查找子标题(按最低层级切分)
-            sub_chunks = self._split_by_sub_titles(
-                content_block,
-                all_toc_items,
-                title_info,
-                target_level,
-                max_chunk_size,
-                min_chunk_size,
-            )
-            
-            # 为每个子块添加元数据
-            for j, sub_chunk in enumerate(sub_chunks, 1):
-                chunk_data = self._build_chunk_metadata(
-                    sub_chunk, title_info, start_pos, pages_content, i, j
-                )
-                all_chunks.append(chunk_data)
-
-        # 步骤4: 生成最终的chunk_id和serial_number
-        final_chunks = self._finalize_chunk_ids(all_chunks)
-
-        print(f"  初始切分: {len(all_chunks)} 个块")
-        print(f"  最终块数: {len(final_chunks)} 个块")
-
-        return final_chunks
-
-    def _split_by_sub_titles(
-        self,
-        content_block: str,
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-        target_level: int,
-        max_chunk_size: int,
-        min_chunk_size: int,
-    ) -> List[Dict[str, Any]]:
-        """
-        在正文块中按子标题进行切分(按照toc_items的顺序和层级关系)
-        
-        核心逻辑:
-        1. 查找所有层级的子标题(不限于直接子标题)
-        2. 按位置排序后,两个相邻子标题之间的内容作为一个块
-        3. 只有当块超过 max_chunk_size 时才按句子切分
-        """
-        # 找到父标题在toc_items中的位置
-        parent_title = parent_title_info["title"]
-        parent_idx = -1
-        parent_level = target_level
-        
-        for idx, toc_item in enumerate(all_toc_items):
-            if toc_item["title"] == parent_title:
-                parent_idx = idx
-                parent_level = toc_item.get("level", target_level)
-                break
-
-        if parent_idx < 0:
-            # 如果找不到父标题,将整个正文块作为一个块
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(content_block, max_chunk_size, parent_title, [])
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
-
-        # 找到下一个同级或更高级标题的位置(确定父标题的范围)
-        next_sibling_idx = len(all_toc_items)
-        for idx in range(parent_idx + 1, len(all_toc_items)):
-            item = all_toc_items[idx]
-            if item.get("level", 1) <= parent_level:
-                next_sibling_idx = idx
-                break
-
-        # 查找所有子标题(所有 level > parent_level 的标题)
-        # 这是关键:不限于直接子标题,而是所有更深层级的标题
-        all_sub_titles = []
-        fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))
-
-        for idx in range(parent_idx + 1, next_sibling_idx):
-            toc_item = all_toc_items[idx]
-            item_level = toc_item.get("level", 1)
-            
-            # 查找所有更深层级的子标题
-            if item_level > parent_level:
-                # 在正文块中查找这个子标题
-                pos = self._find_title_in_block(
-                    toc_item["title"], content_block, fuzzy_threshold
-                )
-                if pos >= 0:
-                    # 调试:显示找到的标题及其周围内容
-                    context_start = max(0, pos - 20)
-                    context_end = min(len(content_block), pos + len(toc_item["title"]) + 50)
-                    context = content_block[context_start:context_end].replace("\n", " ")
-                    print(f"        找到子标题: {toc_item['title']} (level={item_level}), 位置={pos}, 上下文: ...{context}...")
-                    
-                    all_sub_titles.append(
-                        {
-                            "title": toc_item["title"],
-                            "level": toc_item["level"],
-                            "position": pos,
-                            "toc_index": idx,
-                            "toc_item": toc_item,
-                        }
-                    )
-
-        # 按位置排序
-        all_sub_titles.sort(key=lambda x: x["position"])
-
-        # 如果没有找到任何子标题,将整个正文块作为一个块
-        if not all_sub_titles:
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(
-                    content_block, max_chunk_size, parent_title, 
-                    parent_title_info.get("hierarchy_path", [parent_title])
-                )
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
-
-        # 找到最低层级(用于判断哪些是最底层的标题)
-        max_level = max(sub["level"] for sub in all_sub_titles)
-        
-        # 只保留最低层级的标题作为切分点
-        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
-        
-        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
-
-        # 按最低层级标题切分
-        chunks = []
-        for i, sub_title in enumerate(lowest_level_titles):
-            start_pos = sub_title["position"]
-
-            # 确定结束位置(下一个最低层级标题的位置)
-            if i + 1 < len(lowest_level_titles):
-                end_pos = lowest_level_titles[i + 1]["position"]
-            else:
-                end_pos = len(content_block)
-
-            chunk_content = content_block[start_pos:end_pos]
-            
-            # 调试信息
-            content_preview = chunk_content[:100].replace("\n", " ")
-            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
-
-            # 检查子标题是否有实际正文内容
-            title_len = len(sub_title["title"])
-            content_after_title = chunk_content[title_len:].strip()
-
-            if not content_after_title or len(content_after_title) < 10:
-                print(f"        跳过(内容不足)")
-                continue
-
-            # 构建层级路径
-            hierarchy_path = self._build_hierarchy_path_for_subtitle(
-                sub_title["toc_item"], all_toc_items, parent_title_info
-            )
-
-            # 只有当块超过 max_chunk_size 时才按句子切分
-            if len(chunk_content) > max_chunk_size:
-                print(f"        块过大,按句子切分")
-                split_chunks = self._split_large_chunk(
-                    chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
-                )
-                for split_chunk in split_chunks:
-                    split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
-                    split_chunk["sub_title"] = sub_title["title"]
-                    if "hierarchy_path" not in split_chunk:
-                        split_chunk["hierarchy_path"] = hierarchy_path
-                    chunks.append(split_chunk)
-            else:
-                # 直接作为一个块
-                chunks.append(
-                    {
-                        "content": chunk_content,
-                        "relative_start": start_pos,
-                        "sub_title": sub_title["title"],
-                        "hierarchy_path": hierarchy_path,
-                    }
-                )
-
-        # 如果所有子标题都没有正文内容,返回整个正文块
-        if not chunks:
-            if len(content_block) > max_chunk_size:
-                return self._split_large_chunk(
-                    content_block, max_chunk_size, parent_title,
-                    parent_title_info.get("hierarchy_path", [parent_title])
-                )
-            else:
-                return [
-                    {
-                        "content": content_block,
-                        "relative_start": 0,
-                        "sub_title": "",
-                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                    }
-                ]
-
-        return chunks
-
-    def _find_title_in_block(self, title: str, block: str, fuzzy_threshold: float) -> int:
-        """在文本块中查找标题位置(简化版)"""
-        # 直接使用 TitleMatcher 的方法
-        return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
-
-    def _split_large_chunk(
-        self,
-        content: str,
-        max_chunk_size: int,
-        title: str,
-        hierarchy_path: List[str] | None = None,
-    ) -> List[Dict[str, Any]]:
-        """
-        将超大块按句子级分割(保持语义完整)
-        """
-        # 按句子分割(中文句号、问号、感叹号、换行)
-        sentences = re.split(r"([。!?\n])", content)
-
-        # 重新组合句子和标点
-        combined_sentences = []
-        for i in range(0, len(sentences) - 1, 2):
-            if i + 1 < len(sentences):
-                combined_sentences.append(sentences[i] + sentences[i + 1])
-            else:
-                combined_sentences.append(sentences[i])
-
-        if not combined_sentences:
-            combined_sentences = [content]
-
-        # 按max_chunk_size组合句子
-        chunks = []
-        current_chunk = ""
-        current_start = 0
-
-        for sentence in combined_sentences:
-            if len(current_chunk) + len(sentence) <= max_chunk_size:
-                current_chunk += sentence
-            else:
-                if current_chunk:
-                    chunk_data = {
-                        "content": current_chunk,
-                        "relative_start": current_start,
-                        "is_split": True,  # 标记为分割块
-                    }
-                    if hierarchy_path is not None:
-                        chunk_data["hierarchy_path"] = hierarchy_path
-                    chunks.append(chunk_data)
-                    current_start += len(current_chunk)
-                current_chunk = sentence
-
-        # 添加最后一个块
-        if current_chunk:
-            chunk_data = {
-                "content": current_chunk,
-                "relative_start": current_start,
-                "is_split": True,
-            }
-            if hierarchy_path is not None:
-                chunk_data["hierarchy_path"] = hierarchy_path
-            chunks.append(chunk_data)
-
-        return chunks
-
-    def _build_hierarchy_path_for_subtitle(
-        self,
-        sub_title_item: Dict[str, Any],
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-    ) -> List[str]:
-        """为子标题构建完整的层级路径"""
-        hierarchy_path = []
-
-        # 找到子标题在toc_items中的位置
-        sub_title = sub_title_item.get("title", "")
-        sub_title_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item.get("title", "") == sub_title:
-                sub_title_idx = idx
-                break
-
-        if sub_title_idx < 0:
-            # 如果找不到,返回父标题->子标题
-            return [parent_title_info["title"], sub_title]
-
-        # 从子标题向前查找,找到每个层级的父级标题
-        level_paths = {}  # 存储每个层级对应的标题
-        current_level = sub_title_item.get("level", 2)
-
-        for i in range(sub_title_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-
-        # 如果路径为空,至少包含父标题和子标题
-        if not hierarchy_path:
-            hierarchy_path = [parent_title_info["title"], sub_title]
-
-        return hierarchy_path
-
-    def _build_hierarchy_path(
-        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
-    ) -> List[str]:
-        """构建从1级到当前标题的完整层级路径"""
-        hierarchy_path = []
-
-        # 找到当前标题在目录中的位置
-        current_item = None
-        current_idx = -1
-        for idx, item in enumerate(all_toc_items):
-            if item["title"] == title:
-                current_item = item
-                current_idx = idx
-                break
-
-        if not current_item:
-            # 如果找不到,返回只包含当前标题的路径
-            return [title]
-
-        current_level = current_item.get("level", target_level)
-
-        # 从当前项向前查找,找到每个层级的最近父级
-        level_paths = {}  # 存储每个层级对应的标题
-
-        for i in range(current_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        # 按层级顺序构建路径(从1级到当前层级)
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-            elif level == current_level:
-                hierarchy_path.append(title)
-
-        # 如果路径为空,至少包含当前标题
-        if not hierarchy_path:
-            hierarchy_path = [title]
-
-        return hierarchy_path
-
-    def _build_chunk_metadata(
-        self,
-        sub_chunk: Dict[str, Any],
-        title_info: Dict[str, Any],
-        start_pos: int,
-        pages_content: List[Dict[str, Any]],
-        i: int,
-        j: int,
-    ) -> Dict[str, Any]:
-        """构建文本块的元数据"""
-        content = sub_chunk["content"]
-        chunk_start_pos = start_pos + sub_chunk["relative_start"]
-        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
-
-        # 构建section_label:使用完整的层级路径
-        hierarchy_path = sub_chunk.get("hierarchy_path", [])
-        sub_title = sub_chunk.get("sub_title", "")
-
-        if hierarchy_path:
-            section_label = "->".join(hierarchy_path)
-        elif sub_title:
-            section_label = f"{title_info['title']}->{sub_title}"
-        else:
-            section_label = title_info["title"]
-
-        # 提取最低层级标题的编号
-        if hierarchy_path:
-            lowest_title = hierarchy_path[-1]
-            title_number = self._extract_title_number(lowest_title)
-        elif sub_title:
-            title_number = self._extract_title_number(sub_title)
-        else:
-            title_number = self._extract_title_number(title_info["title"])
-
-        # 构建chunk_id
-        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
-
-        return {
-            "file_name": "",  # 由上层填充
-            "chunk_id": chunk_id_str,
-            "section_label": section_label,
-            "project_plan_type": title_info.get("category_code", "other"),
-            "element_tag": {
-                "chunk_id": chunk_id_str,
-                "page": page_num,
-                "serial_number": title_number if title_number else str(i + 1),
-            },
-            "review_chunk_content": content,
-            "_title_number": title_number,
-            "_local_index": j,
-            "_sort_key": chunk_start_pos,
-        }
-
-    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """生成最终的chunk_id和serial_number"""
-        final_chunks = []
-        
-        # 按 section_label 分组,为每组内的块生成递增的序号
-        section_groups: Dict[str, int] = {}  # section_label -> 当前序号
-
-        for chunk in chunks:
-            section_label = chunk.get("section_label", "")
-            
-            # 为当前 section_label 生成序号
-            if section_label not in section_groups:
-                section_groups[section_label] = 1
-            else:
-                section_groups[section_label] += 1
-            
-            local_index = section_groups[section_label]
-
-            # 从section_label中提取标题路径的编号路径
-            title_number_path = self._extract_title_number_path(section_label)
-
-            # 生成chunk_id:doc_chunk_<标题路径的编号路径>_序号
-            if title_number_path:
-                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
-            else:
-                chunk_id_str = f"doc_chunk_{local_index}"
-
-            # 从section_label中提取最底层级的编号(用于 serial_number)
-            serial_number = self._extract_number_from_section_label(section_label)
-
-            # 更新chunk数据
-            final_chunk = {
-                "file_name": chunk["file_name"],
-                "chunk_id": chunk_id_str,
-                "section_label": chunk["section_label"],
-                "project_plan_type": chunk["project_plan_type"],
-                "element_tag": {
-                    "chunk_id": chunk_id_str,
-                    "page": chunk["element_tag"]["page"],
-                    "serial_number": serial_number,
-                },
-                "review_chunk_content": chunk["review_chunk_content"],
-            }
-
-            final_chunks.append(final_chunk)
-
-        return final_chunks
-
-    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
-        """根据位置获取页码"""
-        for page in pages_content:
-            if page["start_pos"] <= pos < page["end_pos"]:
-                return int(page["page_num"])
-        return 1
-
-    def _extract_title_number(self, title: str) -> str:
-        """从标题中提取编号部分(支持多种格式)"""
-        if not title:
-            return ""
-        
-        # 匹配章节格式(如 第一章、第1章等)
-        chapter_match = re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title)
-        if chapter_match:
-            return chapter_match.group(1)
-        
-        # 匹配方括号数字格式(如 【1】、【2】等)
-        bracket_match = re.match(r"^(【\d+】)", title)
-        if bracket_match:
-            return bracket_match.group(1)
-        
-        # 匹配双方括号数字格式(如 〖1.1〗、〖2.3〗等)
-        double_bracket_match = re.match(r"^(〖\d+(?:\.\d+)*〗)", title)
-        if double_bracket_match:
-            return double_bracket_match.group(1)
-        
-        # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
-        number_match = re.match(r"^(\d+(?:\.\d+)*)", title)
-        if number_match:
-            return number_match.group(1)
-        
-        # 匹配中文编号格式(如 一、二、三等)
-        chinese_match = re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title)
-        if chinese_match:
-            return chinese_match.group(1)
-        
-        # 匹配圆括号编号格式(如 (1)、(一)等)
-        paren_match = re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title)
-        if paren_match:
-            return paren_match.group(1)
-        
-        return ""
-
-    def _extract_title_number_path(self, section_label: str) -> str:
-        """从section_label中提取标题路径的编号路径"""
-        if not section_label:
-            return ""
-
-        # 按"->"分割层级路径
-        parts = section_label.split("->")
-
-        # 提取每一层的编号
-        number_paths = []
-        for part in parts:
-            part = part.strip()
-            if part:
-                number = self._extract_title_number(part)
-                if number:
-                    number_paths.append(number)
-
-        # 用"->"连接编号路径
-        if number_paths:
-            return "->".join(number_paths)
-
-        return ""
-
-    def _extract_number_from_section_label(self, section_label: str) -> str:
-        """
-        从section_label中提取最底层级的编号
-        
-        例如:
-        "第一章 编制依据与说明->一) 编制依据" -> "一)"
-        "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
-        "第四章 施工工艺技术->一)主要部件说明->2、前临时支腿" -> "2"
-        """
-        if not section_label:
-            return ""
-
-        # 先找到最低层级部分(最后一个"->"后面的部分)
-        if "->" in section_label:
-            last_level_part = section_label.split("->")[-1].strip()
-        else:
-            last_level_part = section_label.strip()
-
-        # 检查最低层级部分是否包含合并标记(" + ")
-        if " + " in last_level_part:
-            # 分割合并的部分
-            merged_parts = last_level_part.split(" + ")
-            numbers = []
-            for part in merged_parts:
-                part = part.strip()
-                number = self._extract_title_number(part)
-                if number:
-                    numbers.append(number)
-
-            if numbers:
-                return "+".join(numbers)
-
-        # 没有合并的情况,直接提取最低层级的编号
-        return self._extract_title_number(last_level_part)
-
-

+ 4 - 2
core/construction_review/component/doc_worker/pdf_worker/toc_extractor.py

@@ -1,7 +1,7 @@
 """
-PDF 目录提取实现(基于 doc_worker 接口)
+PDF 目录提取实现(基于 file_parse 接口)
 
-只处理 PDF,不依赖上层业务
+只处理 PDF,不依赖 doc_worker
 """
 
 from __future__ import annotations
@@ -79,3 +79,5 @@ class PdfTOCExtractor(TOCExtractor):
 
 
 
+
+

+ 2 - 0
core/construction_review/component/doc_worker/utils/text_split_support.py

@@ -111,3 +111,5 @@ class SimpleChunkSplitter:
 
 
 
+
+

+ 2 - 0
core/construction_review/component/doc_worker/utils/title_matcher.py

@@ -191,3 +191,5 @@ class TitleMatcher:
 
 
 
+
+

+ 2 - 0
core/construction_review/component/doc_worker/utils/toc_level_identifier.py

@@ -130,3 +130,5 @@ class TOCLevelIdentifier:
 
 
 
+
+

+ 1 - 1
core/construction_review/component/doc_worker/utils/toc_pattern_matcher.py

@@ -2,7 +2,7 @@
 目录模式匹配工具(PDF / Word 通用)
 
 该实现与原 doc_worker 中的 TOCPatternMatcher 逻辑等价,
-但独立存在于 doc_worker.utils 中,便于被多种 worker 复用。
+但独立存在于 file_parse.utils 中,便于被多种 worker 复用。
 """
 
 from __future__ import annotations

+ 2 - 2
core/construction_review/component/doc_worker/命令

@@ -1,2 +1,2 @@
-python -m file_parse.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
-python -m file_parse.docx_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx" -l 1 --max-size 3000 --min-size 50 -o ./output
+python -m core.construction_review.component.doc_worker.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
+python -m core.construction_review.component.doc_worker.docx_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx" -l 1 --max-size 3000 --min-size 50 -o ./output
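
If the same runs need to be driven from Python rather than a shell, the updated module paths can be launched with a plain python -m subprocess call. A minimal sketch reusing the flags from the commands above (nothing beyond the standard library is assumed):

import subprocess
import sys

# Same invocation as the updated PDF command above, launched from Python.
pdf_path = r".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf"

subprocess.run(
    [
        sys.executable, "-m",
        "core.construction_review.component.doc_worker.pdf_worker.cli",
        pdf_path,
        "-l", "1",
        "--max-size", "3000",
        "--min-size", "50",
        "-o", "./output",
    ],
    check=True,  # surface a non-zero exit code as an exception
)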

+ 4 - 2
core/construction_review/component/document_processor.py

@@ -190,6 +190,7 @@ class DocumentProcessor:
                             'chunk_id': chunk.get('chunk_id', ''),
                             'section_label': chunk.get('section_label', ''),
                             'project_plan_type': chunk.get('project_plan_type', ''),
+                            'chapter_classification': chunk.get('chapter_classification', ''),
                             'element_tag': chunk.get('element_tag', {})
                         }
                     }
@@ -326,6 +327,7 @@ class DocumentProcessor:
                             'chunk_id': chunk.get('chunk_id', ''),
                             'section_label': chunk.get('section_label', ''),
                             'project_plan_type': chunk.get('project_plan_type', ''),
+                            'chapter_classification': chunk.get('chapter_classification', ''),
                             'element_tag': chunk.get('element_tag', {})
                         }
                     }
@@ -540,8 +542,8 @@ class DocumentProcessor:
                 # 处理原始大纲,按章节层级结构化 - 复用doc_worker的逻辑
                 result['outline'] = self._create_outline_from_toc(raw_content.get('toc_info', {}))
 
-            # with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
-            #     json.dump(result, f, ensure_ascii=False, indent=4)
+            with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
+                json.dump(result, f, ensure_ascii=False, indent=4)
             return result
 
         except Exception as e:
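
For reviewers tracing the commit message ("fix a field value passing issue in the file splitting module"): the two hunks above forward one extra field, chapter_classification, from each chunk into the metadata payload. A minimal sketch of that mapping, using only field names visible in this diff (the helper name is hypothetical):

from typing import Any, Dict

def build_chunk_metadata(chunk: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the dict literals in the hunks above;
    before this commit, chapter_classification was dropped at this point."""
    return {
        "chunk_id": chunk.get("chunk_id", ""),
        "section_label": chunk.get("section_label", ""),
        "project_plan_type": chunk.get("project_plan_type", ""),
        "chapter_classification": chunk.get("chapter_classification", ""),  # the newly forwarded field
        "element_tag": chunk.get("element_tag", {}),
    }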

BIN
路桥/47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx