Przeglądaj źródła

dev:优化了文件处理模块的章节定位不准问题与性能卡顿问题;

ChenJiSheng 2 miesiące temu
rodzic
commit
c55917ef25

+ 1 - 0
core/construction_review/component/doc_worker/__init__.py

@@ -39,3 +39,4 @@ __all__ = [
 
 
 
 
 
 
+

+ 1 - 0
core/construction_review/component/doc_worker/config/provider.py

@@ -51,3 +51,4 @@ default_config_provider = YamlConfigProvider()
 
 
 
 
 
 
+

+ 1 - 0
core/construction_review/component/doc_worker/interfaces.py

@@ -226,3 +226,4 @@ class FileParseFacade(ABC):
 
 
 
 
 
 
+

+ 1 - 0
core/construction_review/component/doc_worker/utils/text_split_support.py

@@ -113,3 +113,4 @@ class SimpleChunkSplitter:
 
 
 
 
 
 
+

+ 235 - 77
core/construction_review/component/doc_worker/utils/title_matcher.py

@@ -27,7 +27,14 @@ class TitleMatcher:
         pages_content: List[Dict[str, Any]],
         pages_content: List[Dict[str, Any]],
         toc_pages: List[int],
         toc_pages: List[int],
     ) -> List[Dict[str, Any]]:
     ) -> List[Dict[str, Any]]:
-        """在正文中定位已分类标题(跳过目录页范围)。"""
+        """
+        在正文中定位已分类标题(跳过目录页范围)。
+        
+        优化逻辑(参考 doc_worker):
+        1. 先在全文中查找标题位置
+        2. 如果找到的位置在目录页范围内,继续在目录页之后查找
+        3. 如果找到的位置不在目录页范围内,直接使用该位置
+        """
         # 计算目录页的文本范围
         # 计算目录页的文本范围
         toc_start_pos = float("inf")
         toc_start_pos = float("inf")
         toc_end_pos = 0
         toc_end_pos = 0
@@ -46,47 +53,34 @@ class TitleMatcher:
             category = item.get("category", "")
             category = item.get("category", "")
             category_code = item.get("category_code", "other")
             category_code = item.get("category_code", "other")
 
 
-            # 直接在目录页之后的正文中查找(跳过目录页)
-            if toc_end_pos > 0 and toc_end_pos < len(full_text):
-                # 只在目录页之后的正文中查找
-                search_start = int(toc_end_pos)
-                remaining_text = full_text[search_start:]
-                pos_in_remaining = self._find_title_in_text(title, remaining_text, fuzzy_threshold)
+            # 步骤1: 在全文中查找标题位置
+            pos = self._find_title_in_text(title, full_text, fuzzy_threshold)
+            
+            # 步骤2: 如果找到的位置在目录页范围内,继续在目录页之后查找
+            if pos >= 0 and toc_end_pos > 0 and toc_start_pos <= pos < toc_end_pos:
+                print(f"    [跳过目录] {title} -> 位置: {pos} (在目录页)")
                 
                 
-                if pos_in_remaining >= 0:
-                    pos = search_start + pos_in_remaining
-                    page_num = self._get_page_number(pos, pages_content)
-                    print(f"    [找到正文] {title} -> 页码: {page_num}, 位置: {pos}")
-                    located.append(
-                        {
-                            "title": title,
-                            "category": category,
-                            "category_code": category_code,
-                            "position": pos,
-                            "toc_page": item.get("page", ""),
-                            "actual_page": page_num,
-                            "found": True,
-                        }
-                    )
+                # 在目录页之后继续查找
+                if toc_end_pos < len(full_text):
+                    search_start = int(toc_end_pos)
+                    remaining_text = full_text[search_start:]
+                    pos_in_remaining = self._find_title_in_text(title, remaining_text, fuzzy_threshold)
+                    
+                    if pos_in_remaining >= 0:
+                        pos = search_start + pos_in_remaining
+                        print(f"    [找到正文] {title} -> 位置: {pos}")
+                    else:
+                        pos = -1
+                        print(f"    [未找到] {title} (目录页之后)")
                 else:
                 else:
-                    print(f"    [未找到] {title} (目录页之后)")
-                    located.append(
-                        {
-                            "title": title,
-                            "category": category,
-                            "category_code": category_code,
-                            "position": -1,
-                            "toc_page": item.get("page", ""),
-                            "found": False,
-                        }
-                    )
-            else:
-                # 如果没有目录页信息,在全文中查找
-                pos = self._find_title_in_text(title, full_text, fuzzy_threshold)
-                
-                if pos >= 0:
+                    pos = -1
+                    print(f"    [未找到] {title} (目录页之后无内容)")
+            
+            # 步骤3: 确认位置并添加到结果
+            if pos >= 0:
+                # 确认位置不在目录页(避免误判)
+                if not (toc_end_pos > 0 and toc_start_pos <= pos < toc_end_pos):
                     page_num = self._get_page_number(pos, pages_content)
                     page_num = self._get_page_number(pos, pages_content)
-                    print(f"    [找到] {title} -> 页码: {page_num}, 位置: {pos}")
                     located.append(
                     located.append(
                         {
                         {
                             "title": title,
                             "title": title,
@@ -98,8 +92,10 @@ class TitleMatcher:
                             "found": True,
                             "found": True,
                         }
                         }
                     )
                     )
+                    print(f"    [确认] {title} -> 页码: {page_num}, 位置: {pos}")
                 else:
                 else:
-                    print(f"    [未找到] {title}")
+                    # 位置仍然在目录页内,标记为未找到
+                    print(f"    [未找到] {title} (只在目录页)")
                     located.append(
                     located.append(
                         {
                         {
                             "title": title,
                             "title": title,
@@ -110,58 +106,82 @@ class TitleMatcher:
                             "found": False,
                             "found": False,
                         }
                         }
                     )
                     )
+            else:
+                print(f"    [未找到] {title}")
+                located.append(
+                    {
+                        "title": title,
+                        "category": category,
+                        "category_code": category_code,
+                        "position": -1,
+                        "toc_page": item.get("page", ""),
+                        "found": False,
+                    }
+                )
 
 
         return located
         return located
 
 
     def _find_title_in_text(self, title: str, text: str, fuzzy_threshold: float) -> int:
     def _find_title_in_text(self, title: str, text: str, fuzzy_threshold: float) -> int:
-        """在文本中查找标题的近似位置(返回标题在文本中的精确起始位置)。"""
-        title_norm = self._normalize(title)
-        if not title_norm:
+        """
+        在文本中查找标题的近似位置(返回标题在文本中的精确起始位置)。
+        
+        优化逻辑(参考 doc_worker):
+        1. 使用清理后的文本进行精确匹配
+        2. 移除所有空格后进行匹配
+        3. 行级模糊匹配作为最后手段
+        """
+        # 移除转义字符后的标题和文本
+        title_clean = self._remove_escape_chars(title)
+        text_clean = self._remove_escape_chars(text)
+        
+        # 标准化标题(统一空白字符)
+        normalized_title = self._normalize_title(title_clean)
+        
+        if not normalized_title:
             return -1
             return -1
 
 
-        # 方法1: 直接在原始文本中查找(不标准化)
-        if title in text:
-            return text.find(title)
-
-        # 方法2: 标准化后查找,然后映射回原始位置
-        text_norm = self._normalize(text)
-        idx = text_norm.find(title_norm)
-        if idx >= 0:
-            # 尝试在原始文本中找到对应位置
-            # 简单估算:标准化可能会移除一些字符,所以原始位置可能稍有偏移
-            # 在估算位置附近搜索
-            search_start = max(0, idx - 50)
-            search_end = min(len(text), idx + len(title) + 50)
-            search_window = text[search_start:search_end]
-            
-            if title in search_window:
-                return search_start + search_window.find(title)
+        # 方法1: 在清理后的文本中精确匹配,然后映射回原始位置
+        if normalized_title in text_clean:
+            pos_in_clean = text_clean.index(normalized_title)
+            # 映射回原始文本的位置
+            original_pos = self._map_clean_position_to_original(pos_in_clean, text, text_clean, normalized_title)
+            if original_pos >= 0:
+                return original_pos
 
 
-        # 方法3: 行级模糊匹配(最后的手段)
+        # 方法2: 移除所有空格后匹配
+        title_no_space = normalized_title.replace(' ', '')
+        text_clean_no_space = text_clean.replace(' ', '')
+        if title_no_space and title_no_space in text_clean_no_space:
+            pos_in_clean_no_space = text_clean_no_space.index(title_no_space)
+            # 映射回原始文本的位置
+            original_pos = self._map_clean_position_to_original(pos_in_clean_no_space, text, text_clean_no_space, title_no_space)
+            if original_pos >= 0:
+                return original_pos
+
+        # 方法3: 按行查找,匹配度最高的行
+        lines_original = text.split('\n')
+        current_pos_original = 0
         best_ratio = 0.0
         best_ratio = 0.0
         best_pos = -1
         best_pos = -1
-        best_line_start = -1
-        cur_pos = 0
         
         
-        for line in text.split("\n"):
-            line_norm = self._normalize(line)
-            if len(line_norm) < 3:
-                cur_pos += len(line) + 1
+        for line_original in lines_original:
+            line_clean = self._remove_escape_chars(line_original)
+            line_stripped = line_clean.strip()
+            
+            if len(line_stripped) < 3:
+                current_pos_original += len(line_original) + 1
                 continue
                 continue
             
             
-            ratio = SequenceMatcher(None, title_norm, line_norm).ratio()
+            # 计算相似度
+            ratio = SequenceMatcher(None, normalized_title, line_stripped).ratio()
+            
             if ratio > best_ratio:
             if ratio > best_ratio:
                 best_ratio = ratio
                 best_ratio = ratio
-                best_line_start = cur_pos
-                # 尝试在这一行中找到标题的精确位置
-                if title in line:
-                    best_pos = cur_pos + line.find(title)
-                else:
-                    # 如果找不到精确位置,使用行首
-                    best_pos = cur_pos
+                best_pos = current_pos_original
             
             
-            cur_pos += len(line) + 1
-
+            current_pos_original += len(line_original) + 1
+        
+        # 如果找到相似度足够高的行
         if best_ratio >= fuzzy_threshold:
         if best_ratio >= fuzzy_threshold:
             return best_pos
             return best_pos
         
         
@@ -180,6 +200,144 @@ class TitleMatcher:
         # 合并空白
         # 合并空白
         text = re.sub(r"\s+", " ", text)
         text = re.sub(r"\s+", " ", text)
         return text.strip()
         return text.strip()
+    
+    def _normalize_title(self, title: str) -> str:
+        """标准化标题用于匹配(统一空白字符)。"""
+        normalized = re.sub(r'\s+', ' ', title)
+        normalized = normalized.strip()
+        return normalized
+    
+    def _remove_escape_chars(self, text: str) -> str:
+        """
+        移除文本中可能的各种转义字符和特殊字符。
+        完全不保留任何转义字符(如换行、制表、回车等),只保留普通空格和可见字符。
+        
+        参考 doc_worker 的实现。
+        """
+        if not text:
+            return text
+        
+        # 第一步:移除所有控制字符(包括换行符\n、制表符\t、回车符\r等)
+        # \x00-\x1F: 控制字符(包括\n=0x0A, \r=0x0D, \t=0x09等)
+        # \x7F: DEL字符
+        text = re.sub(r'[\x00-\x1F\x7F]', '', text)
+        
+        # 第二步:移除零宽字符和特殊Unicode空白字符
+        # \u200B-\u200D: 零宽空格、零宽不连字符(ZWNJ)、零宽连字符(ZWJ)
+        # \uFEFF: 零宽无断字符(BOM)
+        # \u2028: 行分隔符
+        # \u2029: 段落分隔符
+        # \u2000-\u200A: 各种Unicode空格字符
+        text = re.sub(r'[\u2000-\u200D\u2028\u2029\uFEFF]', '', text)
+        
+        # 第三步:将全角空格转换为普通空格(保留其他全角字符)
+        text = text.replace('\u3000', ' ')
+        
+        # 第四步:统一处理连续空格(将多个连续空格替换为单个空格)
+        # 注意:这里只处理普通空格(U+0020),不处理其他空白字符(因为已经移除了)
+        text = re.sub(r' +', ' ', text)
+        
+        # 第五步:去除首尾空格
+        text = text.strip()
+        
+        return text
+    
+    def _map_clean_position_to_original(self, clean_pos: int, original_text: str, clean_text: str, search_pattern: str = None) -> int:
+        """
+        将清理后文本的位置映射回原始文本的位置。
+        
+        参数:
+            clean_pos: 清理后文本中的位置
+            original_text: 原始文本
+            clean_text: 清理后的文本
+            search_pattern: 要搜索的模式(用于在原始文本中直接查找)
+            
+        返回:
+            int: 原始文本中的位置,如果未找到则返回-1
+        """
+        if clean_pos >= len(clean_text):
+            return len(original_text)
+        
+        # 如果提供了搜索模式,先在原始文本中直接查找
+        if search_pattern:
+            # 尝试在原始文本中直接查找(移除转义字符后)
+            pattern_clean = self._remove_escape_chars(search_pattern)
+            if not pattern_clean:
+                pattern_clean = search_pattern
+            
+            # 在原始文本中查找匹配的位置
+            # 使用一个滑动窗口,对每个位置清理后进行比较
+            search_window_size = min(len(original_text), len(original_text))
+            step = max(1, len(pattern_clean) // 4)  # 步长,避免太慢
+            
+            for i in range(0, search_window_size, step):
+                if i + len(pattern_clean) * 2 > len(original_text):
+                    break
+                
+                # 取一个窗口,清理后检查是否包含模式
+                window = original_text[i:i + len(pattern_clean) * 3]
+                window_clean = self._remove_escape_chars(window)
+                
+                if pattern_clean in window_clean:
+                    # 找到模式在窗口中的位置
+                    pos_in_window = window_clean.index(pattern_clean)
+                    # 映射回原始窗口的位置
+                    original_window_pos = self._find_pattern_in_original_window(
+                        pattern_clean, window, i
+                    )
+                    if original_window_pos >= 0:
+                        return original_window_pos
+        
+        # 如果直接查找失败,使用基于比例的估算
+        if len(clean_text) > 0:
+            ratio = clean_pos / len(clean_text)
+            estimated_pos = int(ratio * len(original_text))
+            # 在估算位置附近查找
+            search_range = min(100, len(original_text) // 10)
+            start = max(0, estimated_pos - search_range)
+            end = min(len(original_text), estimated_pos + search_range)
+            
+            if search_pattern:
+                # 在估算位置附近查找模式
+                pattern_clean_local = self._remove_escape_chars(search_pattern)
+                for i in range(start, end):
+                    if i + len(search_pattern) > len(original_text):
+                        break
+                    window = original_text[i:i + len(search_pattern) * 2]
+                    window_clean = self._remove_escape_chars(window)
+                    if search_pattern in window_clean or (pattern_clean_local and pattern_clean_local in window_clean):
+                        return i
+            
+            return estimated_pos
+        
+        return -1
+    
+    def _find_pattern_in_original_window(self, pattern_clean: str, original_window: str, window_start_pos: int) -> int:
+        """
+        在原始窗口中找到清理后模式对应的位置。
+        
+        参数:
+            pattern_clean: 清理后的模式
+            original_window: 原始窗口文本
+            window_start_pos: 窗口在原始文本中的起始位置
+            
+        返回:
+            int: 模式在原始文本中的位置,如果未找到则返回-1
+        """
+        # 尝试在原始窗口中直接查找
+        if pattern_clean in original_window:
+            return window_start_pos + original_window.index(pattern_clean)
+        
+        # 如果直接查找失败,使用清理后的窗口
+        window_clean = self._remove_escape_chars(original_window)
+        if pattern_clean in window_clean:
+            pos_in_clean = window_clean.index(pattern_clean)
+            # 映射回原始窗口的位置(近似)
+            if len(window_clean) > 0:
+                ratio = pos_in_clean / len(window_clean)
+                return window_start_pos + int(ratio * len(original_window))
+        
+        return -1
 
 
     def _get_page_number(self, position: int, pages_content: List[Dict[str, Any]]) -> int:
     def _get_page_number(self, position: int, pages_content: List[Dict[str, Any]]) -> int:
         for page in pages_content:
         for page in pages_content:

+ 6 - 2
core/construction_review/component/doc_worker/命令

@@ -1,2 +1,6 @@
-python -m core.construction_review.component.doc_worker.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
-python -m core.construction_review.component.doc_worker.docx_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx" -l 1 --max-size 3000 --min-size 50 -o ./output
+python -m file_parse.pdf_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
+python -m file_parse.docx_worker.cli ".\路桥\47_四川川交路桥有限责任公司会理至禄劝(四川境)高速公路项目土建项目ZCB1-3合同段项目经理部.docx" -l 1 --max-size 3000 --min-size 50 -o ./output
+
+
+
+python -m file_parse.pdf_worker.cli "Z:\施工方案及编制依据案例库(第一阶段)1205\施工方案文档列表\44_四川公路桥梁建设集团有限公司镇巴(川陕界)至广安高速公路通广段C合同段C4项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output