Parcourir la source

Merge branch 'dev' of http://192.168.0.3:3000/CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

Diamond_ore il y a 2 mois
Parent
commit
b830a3e124
34 fichiers modifiés avec 14146 ajouts et 6135 suppressions
  1. 2 0
      .gitignore
  2. 65 100
      config/config.ini.template
  3. 5 1
      core/base/task_models.py
  4. 2 2
      core/construction_review/component/ai_review_engine.py
  5. 2 0
      core/construction_review/component/doc_worker/__init__.py
  6. 68 121
      core/construction_review/component/doc_worker/classification/hierarchy_classifier.py
  7. 30 0
      core/construction_review/component/doc_worker/config/llm_api.yaml
  8. 53 0
      core/construction_review/component/doc_worker/config/prompt.yaml
  9. 2 0
      core/construction_review/component/doc_worker/config/provider.py
  10. 35 26
      core/construction_review/component/doc_worker/docx_worker/full_text_extractor.py
  11. 89 353
      core/construction_review/component/doc_worker/docx_worker/text_splitter.py
  12. 12 10
      core/construction_review/component/doc_worker/docx_worker/toc_extractor.py
  13. 2 0
      core/construction_review/component/doc_worker/interfaces.py
  14. 23 0
      core/construction_review/component/doc_worker/pdf_worker/__init__.py
  15. 641 0
      core/construction_review/component/doc_worker/pdf_worker/text_splitter.py
  16. 374 0
      core/construction_review/component/doc_worker/utils/llm_client.py
  17. 80 0
      core/construction_review/component/doc_worker/utils/prompt_loader.py
  18. 321 0
      core/construction_review/component/doc_worker/utils/text_split_support.py
  19. 300 4
      core/construction_review/component/doc_worker/utils/title_matcher.py
  20. 5 0
      core/construction_review/component/doc_worker/命令
  21. 13 12
      core/construction_review/component/document_processor.py
  22. 1 1
      core/construction_review/component/reviewers/outline_reviewer.py
  23. 124 13
      core/construction_review/workflows/ai_review_workflow.py
  24. 4544 100
      logs/agent_debug.log.1
  25. 1401 134
      logs/agent_debug.log.2
  26. 0 0
      logs/agent_debug.log.3
  27. 0 164
      logs/agent_debug.log.4
  28. 0 4832
      logs/agent_debug.log.5
  29. 4544 100
      logs/agent_info.log.1
  30. 1401 134
      logs/agent_info.log.2
  31. 0 0
      logs/agent_info.log.3
  32. 0 0
      logs/agent_info.log.4
  33. 0 28
      logs/agent_info.log.5
  34. 7 0
      views/construction_review/launch_review.py

+ 2 - 0
.gitignore

@@ -74,3 +74,5 @@ temp\AI审查结果.json
 mineru_temp/
 config/config.ini
 路桥/
+output/
+命令

+ 65 - 100
config/config.ini.template

@@ -1,72 +1,75 @@
-# LQ Agent Platform 配置文件
+# LQ Agent Platform 配置文件模板
+# 说明: 复制此文件为 config.ini 并根据实际环境修改配置值
 
 
 [model]
-# 模型类型选择: gemini, deepseek, doubao, qwen
-MODEL_TYPE=gemini
+MODEL_TYPE=lq_qwen3_8b
 
 # Embedding模型类型选择: lq_qwen3_8b_emd, siliconflow_embed
 EMBEDDING_MODEL_TYPE=lq_qwen3_8b_emd
 
-# Rerank模型类型选择: bge, qwen3
-RERANK_MODEL_TYPE=bge
+# Rerank模型类型选择: bge_rerank_model, lq_rerank_model, silicoflow_rerank_model
+RERANK_MODEL_TYPE=lq_rerank_model
 
 
 
-# ==================== 模型配置 ====================
-
 [gemini]
-# Gemini 模型配置
 GEMINI_SERVER_URL=https://generativelanguage.googleapis.com/v1beta/openai/
 GEMINI_MODEL_ID=gemini-2.0-flash
 GEMINI_API_KEY=AIzaSyBwcjYoxci4QM1mqIaVcbIf_zmsrN9yuWE
 
 [deepseek]
-# DeepSeek 模型配置
 DEEPSEEK_SERVER_URL=https://api.deepseek.com
 DEEPSEEK_MODEL_ID=deepseek-chat
 DEEPSEEK_API_KEY=sk-9fe722389bac47e9ab30cf45b32eb736
 
 [doubao]
-# 豆包 模型配置
 DOUBAO_SERVER_URL=https://ark.cn-beijing.volces.com/api/v3/
 DOUBAO_MODEL_ID=doubao-seed-1-6-flash-250715
 DOUBAO_API_KEY=c98686df-506f-432c-98de-32e571a8e916
 
+
 [qwen]
-# Qwen 模型配置
 QWEN_SERVER_URL=https://api-inference.modelscope.cn/v1/
 QWEN_MODEL_ID=Qwen/Qwen3-4B
 QWEN_API_KEY=ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 
+[ai_review]
+# 调试模式配置
+MAX_REVIEW_UNITS=1
+REVIEW_MODE=first
+# REVIEW_MODE=all/random/first
 
-# ==================== 本地模型配置 ====================
 
-[lq_qwen3_8b]
-# 本地 Qwen3-8B 模型配置
-QWEN_LOCAL_1_5B_SERVER_URL=http://192.168.91.253:9002/v1
-QWEN_LOCAL_1_5B_MODEL_ID=Qwen3-8B
-QWEN_LOCAL_1_5B_API_KEY=dummy
+[app]
+APP_CODE=lq-agent
+APP_SECRET=sx-73d32556-605e-11f0-9dd8-acde48001122
 
-[lq_qwen3_4b]
-# 本地 Qwen3-4B 模型配置
-QWEN_LOCAL_1_5B_SERVER_URL=http://192.168.91.253:9001/v1
-QWEN_LOCAL_1_5B_MODEL_ID=Qwen3-4B
-QWEN_LOCAL_1_5B_API_KEY=dummy
 
-[lq_qwen3_8B_lora]
-# 本地 Qwen3-8B LoRA 模型配置
-LQ_QWEN3_8B_LQ_LORA_SERVER_URL=http://192.168.91.253:9006/v1
-LQ_QWEN3_8B_LQ_LORA_MODEL_ID=Qwen3-8B-lq-lora
-LQ_QWEN3_8B_LQ_LORA_API_KEY=dummy
+[launch]
+HOST = 0.0.0.0
+LAUNCH_PORT = 8002
+
+[redis]
+REDIS_URL=redis://127.0.0.1:6379/0
+REDIS_HOST=127.0.0.1
+REDIS_PORT=6379
+REDIS_DB=0
+REDIS_PASSWORD=123456
+REDIS_MAX_CONNECTIONS=50
 
+[log]
+LOG_FILE_PATH=logs
+LOG_FILE_MAX_MB=10
+LOG_BACKUP_COUNT=5
+CONSOLE_OUTPUT=True
 
+[user_lists]
+USERS=['user-001']
 
-# ==================== 第三方API配置 ====================
 
 [siliconflow]
-# 硅基流动 API 配置
 SLCF_MODEL_SERVER_URL=https://api.siliconflow.cn/v1
 SLCF_API_KEY=sk-npqfinszhdvnwvensnjmlqtihgevehqiyfwunedxnefkmrud
 SLCF_CHAT_MODEL_ID=test-model
@@ -81,34 +84,42 @@ SLCF_EMBED_API_KEY=sk-lgumiafjofmfzbbjusplckaijbtlcnvmugydteqpljvejsel
 SLCF_EMBED_MODEL_ID=Qwen/Qwen3-Embedding-8B
 SLCF_EMBED_DIMENSIONS=4096
 
+[lq_qwen3_8b]
+QWEN_LOCAL_1_5B_SERVER_URL=http://192.168.91.253:9002/v1
+QWEN_LOCAL_1_5B_MODEL_ID=Qwen3-8B
+QWEN_LOCAL_1_5B_API_KEY=dummy
 
+[lq_qwen3_4b]
+QWEN_LOCAL_1_5B_SERVER_URL=http://192.168.91.253:9001/v1
+QWEN_LOCAL_1_5B_MODEL_ID=Qwen3-4B
+QWEN_LOCAL_1_5B_API_KEY=dummy
 
-# ==================== 应用配置 ====================
-
-[app]
-# 应用代码和密钥
-APP_CODE=lq-agent
-APP_SECRET=sx-73d32556-605e-11f0-9dd8-acde48001122
-
-
-
-# ==================== 启动配置 ====================
-
-[launch]
-HOST=0.0.0.0
-LAUNCH_PORT=8002
-
+# 本地部署的Qwen3-Reranker-8B配置
+[lq_rerank_model]
+LQ_RERANKER_SERVER_URL=http://192.168.91.253:9004/v1/rerank
+LQ_RERANKER_MODEL=Qwen3-Reranker-8B
+LQ_RERANKER_API_KEY=dummy
+LQ_RERANKER_TOP_N=10
+
+# 硅基流动API的Qwen3-Reranker-8B配置
+[silicoflow_rerank_model]
+SILICOFLOW_RERANKER_API_URL=https://api.siliconflow.cn/v1/rerank
+SILICOFLOW_RERANKER_API_KEY=sk-lgumiafjofmfzbbjusplckaijbtlcnvmugydteqpljvejsel
+SILICOFLOW_RERANKER_MODEL=Qwen/Qwen3-Reranker-8B
+
+# BGE Reranker配置
+[bge_rerank_model]
+BGE_RERANKER_SERVER_URL=http://192.168.91.253:9004/rerank
+BGE_RERANKER_MODEL=BAAI/bge-reranker-v2-m3
+BGE_RERANKER_API_KEY=dummy
+BGE_RERANKER_TOP_N=10
 
+[lq_qwen3_8B_lora]
+LQ_QWEN3_8B_LQ_LORA_SERVER_URL=http://192.168.91.253:9006/v1
+LQ_QWEN3_8B_LQ_LORA_MODEL_ID=Qwen3-8B-lq-lora
+LQ_QWEN3_8B_LQ_LORA_API_KEY=dummy
 
-# ==================== 数据库配置 ====================
 
-[redis]
-REDIS_URL=redis://127.0.0.1:6379/0
-REDIS_HOST=127.0.0.1
-REDIS_PORT=6379
-REDIS_DB=0
-REDIS_PASSWORD=123456
-REDIS_MAX_CONNECTIONS=50
 
 [mysql]
 MYSQL_HOST=192.168.92.61
@@ -120,6 +131,7 @@ MYSQL_MIN_SIZE=1
 MYSQL_MAX_SIZE=5
 MYSQL_AUTO_COMMIT=True
 
+
 [pgvector]
 PGVECTOR_HOST=124.223.140.149
 PGVECTOR_PORT=7432
@@ -127,6 +139,7 @@ PGVECTOR_DB=vector_db
 PGVECTOR_USER=vector_user
 PGVECTOR_PASSWORD=pg16@123
 
+
 [milvus]
 MILVUS_HOST=192.168.92.61
 MILVUS_PORT=19530
@@ -136,57 +149,9 @@ MILVUS_USER=
 MILVUS_PASSWORD=
 
 
-
-# ==================== RAG 检索配置 ====================
-
 [hybrid_search]
 # 混合检索权重配置
 DENSE_WEIGHT=0.3
 SPARSE_WEIGHT=0.7
 
-[retrieval]
-# 召回配置
-
-
 
-# ==================== 重排序模型配置 ====================
-
-[rerank_model]
-# BGE Reranker 配置(本地部署)
-BGE_RERANKER_SERVER_RUL=http://192.168.91.253:9005/v1/rerank
-BGE_RERANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
-BGE_RERANKER_API_KEY=dummy
-BGE_RERANKER_TOP_N=10
-
-[rerank_model_qwen]
-# Qwen3-Reranker-8B 配置(硅基流动API)
-QWEN_RERANKER_API_URL=https://api.siliconflow.cn/v1/rerank
-QWEN_RERANKER_API_KEY=sk-npqfinszhdvnwvensnjmlqtihgevehqiyfwunedxnefkmrud
-QWEN_RERANKER_MODEL=Qwen/Qwen3-Reranker-8B
-
-
-
-# ==================== AI审查配置 ====================
-
-[ai_review]
-# 调试模式配置
-MAX_REVIEW_UNITS=1
-REVIEW_MODE=first
-# REVIEW_MODE=all/random/first
-
-
-
-# ==================== 日志配置 ====================
-
-[log]
-LOG_FILE_PATH=logs
-LOG_FILE_MAX_MB=10
-LOG_BACKUP_COUNT=5
-CONSOLE_OUTPUT=True
-
-
-
-# ==================== 用户配置 ====================
-
-[user_lists]
-USERS=['user-001']

+ 5 - 1
core/base/task_models.py

@@ -38,7 +38,7 @@ class TaskFileInfo:
         self.review_config = file_info.get('review_config', [])
         self.project_plan_type = file_info.get('project_plan_type', '')
         self.tendency_review_role = file_info.get('tendency_review_role', '')
-
+        self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')
         # 时间戳信息
         self.launched_at = file_info.get('launched_at', 0)
 
@@ -63,6 +63,10 @@ class TaskFileInfo:
         """获取倾向性审查角色"""
         return self.tendency_review_role
 
+    def get_test_designation_chunk_flag(self) -> str:
+        """获取测试定位标志符"""
+        return self.test_designation_chunk_flag
+
     def has_review_type(self, review_type: str) -> bool:
         """检查是否包含指定的审查类型"""
         return review_type in self.review_config

+ 2 - 2
core/construction_review/component/ai_review_engine.py

@@ -806,7 +806,7 @@ class AIReviewEngine(BaseReviewer):
 
             # 提取关键数据
             review_content = review_data.get('content', '')
-            max_concurrent = review_data.get('max_concurrent', 4)
+            max_concurrent = review_data.get('max_concurrent', 10)
 
             # 添加调试信息
             logger.info(f"提取的编制依据内容长度: {len(review_content)}")
@@ -932,7 +932,7 @@ class AIReviewEngine(BaseReviewer):
 
             # 提取关键数据
             review_content = review_data.get('content', '')
-            max_concurrent = review_data.get('max_concurrent', 4)
+            max_concurrent = review_data.get('max_concurrent', 10)
 
             # 添加调试信息
             logger.info(f"提取的编制依据内容长度: {len(review_content)}")

+ 2 - 0
core/construction_review/component/doc_worker/__init__.py

@@ -40,3 +40,5 @@ __all__ = [
 
 
 
+
+

+ 68 - 121
core/construction_review/component/doc_worker/classification/hierarchy_classifier.py

@@ -1,20 +1,22 @@
 """
-目录分类模块(基于二级目录关键词匹配
+目录分类模块(基于LLM API智能识别
 
-适配 file_parse 的配置系统,通过匹配一级目录下的二级目录关键词来判断一级目录的分类。
+适配 file_parse 的配置系统,通过异步并发调用LLM API来判断一级目录的分类。
 """
 
 from __future__ import annotations
 
-import re
 from collections import Counter
 from typing import Any, Dict, List, Optional
 
+from ..interfaces import HierarchyClassifier as IHierarchyClassifier
 from ..config.provider import default_config_provider
+from ..utils.llm_client import LLMClient
+from ..utils.prompt_loader import PromptLoader
 
 
-class HierarchyClassifier:
-    """基于层级结构的目录分类器(通过二级目录匹配来分类一级目录)"""
+class HierarchyClassifier(IHierarchyClassifier):
+    """基于层级结构的目录分类器(通过LLM API智能识别来分类一级目录)"""
 
     def __init__(self):
         """初始化分类器"""
@@ -22,35 +24,21 @@ class HierarchyClassifier:
         
         # 获取分类配置
         self.category_mapping = self._cfg.get("categories.mapping", {})
-        self.category_keywords = self._cfg.get("categories.keywords", {})
         
-        # 预编译正则表达式模式
-        self._compile_patterns()
-
-    def _compile_patterns(self):
-        """预编译所有类别的正则表达式模式"""
-        self.compiled_patterns = {}
-        
-        for category, rules in self.category_keywords.items():
-            patterns = rules.get("patterns", [])
-            compiled = []
-            for pattern in patterns:
-                try:
-                    compiled.append(re.compile(pattern, re.IGNORECASE))
-                except re.error as e:
-                    print(f"  警告: 类别 '{category}' 的正则表达式 '{pattern}' 编译失败: {e}")
-            self.compiled_patterns[category] = compiled
+        # 初始化LLM客户端和提示词加载器
+        self.llm_client = LLMClient(config_provider=self._cfg)
+        self.prompt_loader = PromptLoader()
 
     def classify(
         self, toc_items: List[Dict[str, Any]], target_level: int = 1
     ) -> Dict[str, Any]:
         """
-        对目录项进行智能分类(基于二级目录关键词匹配
+        对目录项进行智能分类(基于LLM API智能识别)
         
         新逻辑:
         1. 只对一级目录进行分类
-        2. 通过匹配一级目录下的二级目录关键词来判断一级目录的分类
-        3. 使用投票机制:统计二级目录匹配到的类别,票数最多的类别作为一级目录的分类
+        2. 通过异步并发调用LLM API,基于一级目录标题及其下属二级目录来判断分类
+        3. 使用LLM的智能理解能力进行准确分类
         
         参数:
             toc_items: 目录项列表(已经过层级识别)
@@ -65,7 +53,7 @@ class HierarchyClassifier:
                 "category_stats": {...}
             }
         """
-        print(f"\n正在对{target_level}级目录进行智能分类(基于二级目录关键词匹配)...")
+        print(f"\n正在对{target_level}级目录进行智能分类(基于LLM API识别)...")
         
         # 筛选出指定层级的目录项
         level1_items = [item for item in toc_items if item["level"] == target_level]
@@ -106,19 +94,65 @@ class HierarchyClassifier:
                 {"level1_item": level1_item, "level2_children": level2_children}
             )
         
-        print(f"  正在使用二级目录关键词进行匹配分类...")
+        print(f"  正在使用LLM API进行异步并发识别分类...")
         
-        # 对每个一级目录进行分类
+        # 准备LLM API请求
+        llm_requests = []
+        for item_with_children in level1_with_children:
+            level1_item = item_with_children["level1_item"]
+            level2_children = item_with_children["level2_children"]
+            
+            # 准备二级目录标题列表
+            level2_titles = "\n".join([f"- {child['title']}" for child in level2_children])
+            if not level2_titles:
+                level2_titles = "(无二级目录)"
+            
+            # 渲染提示词模板
+            prompt = self.prompt_loader.render(
+                "toc_classification",
+                level1_title=level1_item["title"],
+                level2_titles=level2_titles
+            )
+            
+            # 构建消息列表
+            messages = [
+                {"role": "system", "content": prompt["system"]},
+                {"role": "user", "content": prompt["user"]}
+            ]
+            
+            llm_requests.append(messages)
+        
+        # 批量异步调用LLM API
+        llm_results = self.llm_client.batch_call(llm_requests)
+        
+        # 处理分类结果
         classified_items = []
         category_stats = Counter()
         
-        for item_with_children in level1_with_children:
+        for i, (item_with_children, llm_result) in enumerate(zip(level1_with_children, llm_results)):
             level1_item = item_with_children["level1_item"]
             level2_children = item_with_children["level2_children"]
             
-            # 通过二级目录匹配来判断一级目录的分类
-            category_cn = self._classify_by_children(level1_item["title"], level2_children)
-            category_en = self.category_mapping.get(category_cn, "other")
+            # 解析LLM返回结果
+            if llm_result and isinstance(llm_result, dict):
+                category_cn = llm_result.get("category_cn", "非规范项")
+                category_code = llm_result.get("category_code", "non_standard")
+                confidence = llm_result.get("confidence", 0.0)
+                
+                # 验证类别是否在映射表中,如果不在则使用兜底类别"非规范项"
+                if category_cn not in self.category_mapping:
+                    print(f"  警告: LLM返回的类别 '{category_cn}' 不在标准类别中,使用兜底类别'非规范项'")
+                    category_cn = "非规范项"
+                    category_code = "non_standard"
+                
+                # 确保category_code与mapping一致
+                category_code = self.category_mapping.get(category_cn, "non_standard")
+            else:
+                # LLM调用失败,使用兜底类别"非规范项"
+                print(f"  警告: 一级目录 '{level1_item['title']}' 的LLM分类失败,使用兜底类别'非规范项'")
+                category_cn = "非规范项"
+                category_code = "non_standard"
+                confidence = 0.0
             
             classified_items.append(
                 {
@@ -126,10 +160,11 @@ class HierarchyClassifier:
                     "page": level1_item["page"],
                     "level": level1_item["level"],
                     "category": category_cn,
-                    "category_code": category_en,
+                    "category_code": category_code,
                     "original": level1_item.get("original", ""),
                     "level2_count": len(level2_children),
                     "level2_titles": [child["title"] for child in level2_children],
+                    "confidence": confidence if llm_result else 0.0,
                 }
             )
             
@@ -143,91 +178,3 @@ class HierarchyClassifier:
             "target_level": target_level,
             "category_stats": dict(category_stats),
         }
-
-    def _classify_by_children(
-        self, level1_title: str, level2_children: List[Dict[str, Any]]
-    ) -> str:
-        """
-        通过二级目录关键词匹配来判断一级目录的分类
-        
-        参数:
-            level1_title: 一级目录标题
-            level2_children: 二级目录列表
-            
-        返回:
-            str: 类别名称
-        """
-        if not level2_children:
-            # 如果没有二级目录,直接匹配一级目录标题
-            return self._match_category(level1_title)
-        
-        # 统计每个类别的匹配次数(投票机制)
-        category_votes = Counter()
-        
-        # 遍历所有二级目录,进行关键词匹配
-        for child in level2_children:
-            child_title = child["title"]
-            matched_category = self._match_category(child_title)
-            
-            # 如果匹配到了非"非规范项"的类别,增加投票
-            if matched_category != "非规范项":
-                category_votes[matched_category] += 1
-        
-        # 如果有匹配结果,返回票数最多的类别
-        if category_votes:
-            most_common_category = category_votes.most_common(1)[0][0]
-            return most_common_category
-        
-        # 如果二级目录都没有匹配到,尝试匹配一级目录标题
-        level1_category = self._match_category(level1_title)
-        if level1_category != "非规范项":
-            return level1_category
-        
-        # 默认返回"非规范项"
-        return "非规范项"
-
-    def _match_category(self, title: str) -> str:
-        """
-        使用正则表达式和关键词匹配目录项标题,返回对应的类别
-        
-        参数:
-            title: 目录项标题
-            
-        返回:
-            str: 类别名称,如果未匹配到则返回"非规范项"
-        """
-        # 去掉开头的编号,便于匹配
-        title_clean = self._remove_number_prefix(title)
-        
-        # 优先级1: 使用正则表达式匹配
-        for category, patterns in self.compiled_patterns.items():
-            for pattern in patterns:
-                if pattern.search(title) or pattern.search(title_clean):
-                    return category
-        
-        # 优先级2: 使用关键词匹配
-        for category, rules in self.category_keywords.items():
-            keywords = rules.get("keywords", [])
-            for keyword in keywords:
-                if keyword in title or keyword in title_clean:
-                    return category
-        
-        # 默认返回"非规范项"
-        return "非规范项"
-
-    def _remove_number_prefix(self, title: str) -> str:
-        """
-        去掉标题开头的编号
-        
-        参数:
-            title: 原始标题
-            
-        返回:
-            str: 去掉编号后的标题
-        """
-        # 去掉开头的编号(如 "1 ", "1. ", "第一章 " 等)
-        title_clean = re.sub(r"^[\d一二三四五六七八九十]+[、\.\s]+", "", title)
-        title_clean = re.sub(r"^第[一二三四五六七八九十\d]+[章节条款]\s*", "", title_clean)
-        title_clean = re.sub(r"^【\d+】\s*", "", title_clean)
-        title_clean = re.sub(r"^〖\d+(?:\.\d+)*〗\s*", "", title_clean)
-        return title_clean

+ 30 - 0
core/construction_review/component/doc_worker/config/llm_api.yaml

@@ -0,0 +1,30 @@
+MODEL_TYPE: qwen
+
+gemini:
+  GEMINI_SERVER_URL: https://generativelanguage.googleapis.com/v1beta/openai/
+  GEMINI_MODEL_ID: gemini-2.0-flash
+  GEMINI_API_KEY: YOUR_GEMINI_API_KEY_FOR_RAG_EVAL
+
+deepseek:
+  DEEPSEEK_SERVER_URL: https://api.deepseek.com
+  DEEPSEEK_MODEL_ID: deepseek-chat
+  DEEPSEEK_API_KEY: YOUR_DEEPSEEK_API_KEY_FOR_RAG_EVAL
+
+doubao:
+  DOUBAO_SERVER_URL: https://ark.cn-beijing.volces.com/api/v3/
+  DOUBAO_MODEL_ID: doubao-seed-1-6-flash-250715
+  DOUBAO_API_KEY: YOUR_DOUBAO_API_KEY_FOR_RAG_EVAL
+
+qwen:
+  QWEN_SERVER_URL: https://aqai.shudaodsj.com:22000/v1/
+  QWEN_MODEL_ID: Qwen/Qwen3-30B-A3B-Instruct-2507
+  QWEN_API_KEY: ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
+
+keywords:
+  timeout: 30
+  max_retries: 2
+  concurrent_workers: 20
+  stream: false
+  request_payload:
+    temperature: 0.3
+    max_tokens: 1024

+ 53 - 0
core/construction_review/component/doc_worker/config/prompt.yaml

@@ -0,0 +1,53 @@
+toc_classification:
+  system: |
+    你是一名工程与施工领域的专业文档分类专家,负责对施工方案文档的目录进行分类识别。
+    - 根据一级目录标题及其下属二级目录的内容,准确判断该一级目录应属于哪个标准类别;
+    - 严格依据提供的分类标准进行分类,不能随意创建新类别;
+    - 如果目录项明显不属于任何标准类别,应分类为"其他资料"。
+    - /no_think
+  user_template: |
+    任务:对施工方案文档的目录项进行分类识别。
+
+    一级目录标题:{{ level1_title }}
+
+    二级目录列表:
+    {{ level2_titles }}
+
+    分类标准(一级标题及对应说明):
+    - 一、编制依据:本章包含法律法规、标准规范、文件制度、编制原则、编制范围等五个方面。
+    - 二、工程概况:本章包含设计概况、工程地质与水文气象、周边环境、施工平面及立面布置、施工要求和技术保证条件、风险辨识与分级、参建各方责任主体单位等七个方面。
+    - 三、施工计划:本章包含施工进度计划、施工材料计划、施工设备计划、劳动力计划、安全生产费用使用计划等五个方面。
+    - 四、施工工艺技术:本章包含主要施工方法概述、技术参数、工艺流程、施工准备、施工方法及操作要求、检查要求等六个方面。
+    - 五、安全保证措施:本章包含安全保证体系、组织保证措施、技术保证措施、监测监控措施、应急处置措施等五个方面。
+    - 六、质量保证措施:本章包含质量保证体系、质量目标、工程创优规划、质量控制程序与具体措施等四个方面。
+    - 七、环境保证措施:本章包含环境保证体系、环境保护组织机构、环境保护及文明施工措施等三个方面。
+    - 八、施工管理及作业人员配备与分工:本章包含施工管理人员、专职安全生产管理人员、特种作业人员、其他作业人员等四个方面。
+    - 九、验收要求:本章包含验收标准、验收程序、验收内容、验收时间、验收人员等五个方面。
+    - 十、其他资料:本章包含计算书、相关施工图纸、附图附表、编制及审核人员情况等四个方面。
+
+    输出要求(只输出 JSON):
+    {
+      "category_cn": "类别中文名称",
+      "category_code": "类别英文代码",
+      "confidence": "分类置信度(0-1之间的小数)"
+    }
+
+    类别中文名称与英文代码对应关系:
+    - 编制依据 -> basis
+    - 工程概况 -> overview
+    - 施工计划 -> plan
+    - 施工工艺技术 -> technology
+    - 安全保证措施 -> safety
+    - 质量保证措施 -> quality
+    - 环境保证措施 -> environment
+    - 施工管理及作业人员配备与分工 -> management
+    - 验收要求 -> acceptance
+    - 其他资料 -> other
+
+
+
+
+
+
+
+

+ 2 - 0
core/construction_review/component/doc_worker/config/provider.py

@@ -52,3 +52,5 @@ default_config_provider = YamlConfigProvider()
 
 
 
+
+

+ 35 - 26
core/construction_review/component/doc_worker/docx_worker/full_text_extractor.py

@@ -54,26 +54,43 @@ class DocxFullTextExtractor(FullTextExtractor):
         else:
             raise ValueError("DocumentSource 必须提供 path 或 content")
 
-        # 提取所有段落内容(过滤目录行)
-        all_paragraphs = []
-        for para in doc.paragraphs:
-            text = para.text
-            # 过滤目录行:标题\t页码
-            if text and not re.match(r"^.+\t+\d+\s*$", text):
-                all_paragraphs.append(text)
-
-        # 提取表格内容
-        for table in doc.tables:
-            table_text = self._extract_table_text(table)
-            all_paragraphs.append(table_text)
+        # 按照文档中的实际顺序提取段落和表格
+        # 创建段落和表格的元素到对象的映射
+        para_map = {para._element: para for para in doc.paragraphs}
+        table_map = {table._element: table for table in doc.tables}
+        
+        # 按照文档中的顺序遍历所有元素
+        all_elements = []
+        for element in doc.element.body:
+            if element in para_map:
+                # 段落元素
+                para = para_map[element]
+                text = para.text
+                # 过滤目录行:标题\t页码
+                if text and not re.match(r"^.+\t+\d+\s*$", text):
+                    all_elements.append(text)
+            elif element in table_map:
+                # 表格元素
+                table = table_map[element]
+                table_text = self._extract_table_text(table)
+                all_elements.append(table_text)
 
-        # 模拟分页:每 N 个段落作为一页
+        # 模拟分页:每 N 个元素作为一页
         pages_content = []
         current_pos = 0
         
-        for page_num in range(0, len(all_paragraphs), self.paragraphs_per_page):
-            page_paragraphs = all_paragraphs[page_num:page_num + self.paragraphs_per_page]
-            page_text = "\n".join(page_paragraphs)
+        # 正则表达式:匹配 [表格开始]...任意内容...[表格结束] 模式
+        table_placeholder_pattern = re.compile(
+            r'\n?\[表格开始\]\n.*?\n\[表格结束\]\n?',
+            re.DOTALL
+        )
+        
+        for page_num in range(0, len(all_elements), self.paragraphs_per_page):
+            page_elements = all_elements[page_num:page_num + self.paragraphs_per_page]
+            page_text = "\n".join(page_elements)
+            
+            # 将任何可能存在的 [表格开始]...表格内容...[表格结束] 替换为占位符
+            page_text = table_placeholder_pattern.sub('\n<表格></表格>\n', page_text)
             
             pages_content.append({
                 "page_num": page_num // self.paragraphs_per_page + 1,
@@ -88,13 +105,5 @@ class DocxFullTextExtractor(FullTextExtractor):
         return pages_content
 
     def _extract_table_text(self, table) -> str:
-        """提取表格内容为文本格式"""
-        table_text = []
-        for row in table.rows:
-            row_text = []
-            for cell in row.cells:
-                cell_text = cell.text.strip().replace("\n", " ")
-                row_text.append(cell_text)
-            table_text.append("\t".join(row_text))
-        
-        return "\n[表格开始]\n" + "\n".join(table_text) + "\n[表格结束]\n"
+        """提取表格占位符,不提取实际内容"""
+        return "\n<表格></表格>\n"

+ 89 - 353
core/construction_review/component/doc_worker/docx_worker/text_splitter.py

@@ -9,15 +9,15 @@ DOCX 文本切分实现
 
 from __future__ import annotations
 
-import re
 from typing import Any, Dict, List
 
 from ..config.provider import default_config_provider
 from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
+from ..utils.text_split_support import HierarchicalChunkMixin
 
 
-class DocxTextSplitter(TextSplitter):
+class DocxTextSplitter(TextSplitter, HierarchicalChunkMixin):
     """按目录层级对 DOCX 正文进行智能分块的实现"""
 
     def __init__(self) -> None:
@@ -131,10 +131,15 @@ class DocxTextSplitter(TextSplitter):
         max_chunk_size: int,
         min_chunk_size: int,
     ) -> List[Dict[str, Any]]:
-        """在正文块中按子标题进行切分(与 PDF 逻辑一致)"""
-        # 实现与 PdfTextSplitter._split_by_sub_titles 完全相同
-        # 为简洁起见,这里直接复用相同的逻辑
+        """
+        在正文块中按子标题进行切分(按照toc_items的顺序和层级关系)
         
+        核心逻辑:
+        1. 查找所有层级的子标题(不限于直接子标题)
+        2. 按位置排序后,两个相邻子标题之间的内容作为一个块
+        3. 只有当块超过 max_chunk_size 时才按句子切分
+        """
+        # 找到父标题在toc_items中的位置
         parent_title = parent_title_info["title"]
         parent_idx = -1
         parent_level = target_level
@@ -146,17 +151,20 @@ class DocxTextSplitter(TextSplitter):
                 break
 
         if parent_idx < 0:
+            # 如果找不到父标题,将整个正文块作为一个块
             if len(content_block) > max_chunk_size:
                 return self._split_large_chunk(content_block, max_chunk_size, parent_title, [])
             else:
-                return [{
-                    "content": content_block,
-                    "relative_start": 0,
-                    "sub_title": "",
-                    "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                }]
-
-        # 找到下一个同级或更高级标题的位置
+                return [
+                    {
+                        "content": content_block,
+                        "relative_start": 0,
+                        "sub_title": "",
+                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
+                    }
+                ]
+
+        # 找到下一个同级或更高级标题的位置(确定父标题的范围)
         next_sibling_idx = len(all_toc_items)
         for idx in range(parent_idx + 1, len(all_toc_items)):
             item = all_toc_items[idx]
@@ -164,7 +172,8 @@ class DocxTextSplitter(TextSplitter):
                 next_sibling_idx = idx
                 break
 
-        # 查找所有子标题
+        # 查找所有子标题(所有 level > parent_level 的标题)
+        # 这是关键:不限于直接子标题,而是所有更深层级的标题
         all_sub_titles = []
         fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))
 
@@ -172,44 +181,63 @@ class DocxTextSplitter(TextSplitter):
             toc_item = all_toc_items[idx]
             item_level = toc_item.get("level", 1)
             
+            # 查找所有更深层级的子标题
             if item_level > parent_level:
-                pos = self._title_matcher._find_title_in_text(
+                # 在正文块中查找这个子标题
+                pos = self._find_title_in_block(
                     toc_item["title"], content_block, fuzzy_threshold
                 )
                 if pos >= 0:
-                    all_sub_titles.append({
-                        "title": toc_item["title"],
-                        "level": toc_item["level"],
-                        "position": pos,
-                        "toc_index": idx,
-                        "toc_item": toc_item,
-                    })
+                    # 调试:显示找到的标题及其周围内容
+                    context_start = max(0, pos - 20)
+                    context_end = min(len(content_block), pos + len(toc_item["title"]) + 50)
+                    context = content_block[context_start:context_end].replace("\n", " ")
+                    print(f"        找到子标题: {toc_item['title']} (level={item_level}), 位置={pos}, 上下文: ...{context}...")
+                    
+                    all_sub_titles.append(
+                        {
+                            "title": toc_item["title"],
+                            "level": toc_item["level"],
+                            "position": pos,
+                            "toc_index": idx,
+                            "toc_item": toc_item,
+                        }
+                    )
 
+        # 按位置排序
         all_sub_titles.sort(key=lambda x: x["position"])
 
+        # 如果没有找到任何子标题,将整个正文块作为一个块
         if not all_sub_titles:
             if len(content_block) > max_chunk_size:
                 return self._split_large_chunk(
-                    content_block, max_chunk_size, parent_title,
+                    content_block, max_chunk_size, parent_title, 
                     parent_title_info.get("hierarchy_path", [parent_title])
                 )
             else:
-                return [{
-                    "content": content_block,
-                    "relative_start": 0,
-                    "sub_title": "",
-                    "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                }]
-
-        # 找到最低层级
+                return [
+                    {
+                        "content": content_block,
+                        "relative_start": 0,
+                        "sub_title": "",
+                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
+                    }
+                ]
+
+        # 找到最低层级(用于判断哪些是最底层的标题)
         max_level = max(sub["level"] for sub in all_sub_titles)
+        
+        # 只保留最低层级的标题作为切分点
         lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
+        
+        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")
 
         # 按最低层级标题切分
         chunks = []
         for i, sub_title in enumerate(lowest_level_titles):
             start_pos = sub_title["position"]
 
+            # 确定结束位置(下一个最低层级标题的位置)
             if i + 1 < len(lowest_level_titles):
                 end_pos = lowest_level_titles[i + 1]["position"]
             else:
@@ -217,17 +245,26 @@ class DocxTextSplitter(TextSplitter):
 
             chunk_content = content_block[start_pos:end_pos]
             
+            # 调试信息
+            content_preview = chunk_content[:100].replace("\n", " ")
+            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")
+
+            # 检查子标题是否有实际正文内容
             title_len = len(sub_title["title"])
             content_after_title = chunk_content[title_len:].strip()
 
             if not content_after_title or len(content_after_title) < 10:
+                print(f"        跳过(内容不足)")
                 continue
 
+            # 构建层级路径
             hierarchy_path = self._build_hierarchy_path_for_subtitle(
                 sub_title["toc_item"], all_toc_items, parent_title_info
             )
 
+            # 只有当块超过 max_chunk_size 时才按句子切分
             if len(chunk_content) > max_chunk_size:
+                print(f"        块过大,按句子切分")
                 split_chunks = self._split_large_chunk(
                     chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
                 )
@@ -238,13 +275,17 @@ class DocxTextSplitter(TextSplitter):
                         split_chunk["hierarchy_path"] = hierarchy_path
                     chunks.append(split_chunk)
             else:
-                chunks.append({
-                    "content": chunk_content,
-                    "relative_start": start_pos,
-                    "sub_title": sub_title["title"],
-                    "hierarchy_path": hierarchy_path,
-                })
+                # 直接作为一个块
+                chunks.append(
+                    {
+                        "content": chunk_content,
+                        "relative_start": start_pos,
+                        "sub_title": sub_title["title"],
+                        "hierarchy_path": hierarchy_path,
+                    }
+                )
 
+        # 如果所有子标题都没有正文内容,返回整个正文块
         if not chunks:
             if len(content_block) > max_chunk_size:
                 return self._split_large_chunk(
@@ -252,257 +293,21 @@ class DocxTextSplitter(TextSplitter):
                     parent_title_info.get("hierarchy_path", [parent_title])
                 )
             else:
-                return [{
-                    "content": content_block,
-                    "relative_start": 0,
-                    "sub_title": "",
-                    "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
-                }]
-
-        return chunks
-
-    def _split_large_chunk(
-        self,
-        content: str,
-        max_chunk_size: int,
-        title: str,
-        hierarchy_path: List[str] | None = None,
-    ) -> List[Dict[str, Any]]:
-        """将超大块按句子级分割(保持语义完整)"""
-        sentences = re.split(r"([。!?\n])", content)
-
-        combined_sentences = []
-        for i in range(0, len(sentences) - 1, 2):
-            if i + 1 < len(sentences):
-                combined_sentences.append(sentences[i] + sentences[i + 1])
-            else:
-                combined_sentences.append(sentences[i])
-
-        if not combined_sentences:
-            combined_sentences = [content]
-
-        chunks = []
-        current_chunk = ""
-        current_start = 0
-
-        for sentence in combined_sentences:
-            if len(current_chunk) + len(sentence) <= max_chunk_size:
-                current_chunk += sentence
-            else:
-                if current_chunk:
-                    chunk_data = {
-                        "content": current_chunk,
-                        "relative_start": current_start,
-                        "is_split": True,
+                return [
+                    {
+                        "content": content_block,
+                        "relative_start": 0,
+                        "sub_title": "",
+                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
                     }
-                    if hierarchy_path is not None:
-                        chunk_data["hierarchy_path"] = hierarchy_path
-                    chunks.append(chunk_data)
-                    current_start += len(current_chunk)
-                current_chunk = sentence
-
-        if current_chunk:
-            chunk_data = {
-                "content": current_chunk,
-                "relative_start": current_start,
-                "is_split": True,
-            }
-            if hierarchy_path is not None:
-                chunk_data["hierarchy_path"] = hierarchy_path
-            chunks.append(chunk_data)
+                ]
 
         return chunks
 
-    def _build_hierarchy_path_for_subtitle(
-        self,
-        sub_title_item: Dict[str, Any],
-        all_toc_items: List[Dict[str, Any]],
-        parent_title_info: Dict[str, Any],
-    ) -> List[str]:
-        """为子标题构建完整的层级路径"""
-        hierarchy_path = []
-        sub_title = sub_title_item.get("title", "")
-        sub_title_idx = -1
-        
-        for idx, item in enumerate(all_toc_items):
-            if item.get("title", "") == sub_title:
-                sub_title_idx = idx
-                break
-
-        if sub_title_idx < 0:
-            return [parent_title_info["title"], sub_title]
-
-        level_paths = {}
-        current_level = sub_title_item.get("level", 2)
-
-        for i in range(sub_title_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-
-        if not hierarchy_path:
-            hierarchy_path = [parent_title_info["title"], sub_title]
-
-        return hierarchy_path
-
-    def _build_hierarchy_path(
-        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
-    ) -> List[str]:
-        """构建从1级到当前标题的完整层级路径"""
-        hierarchy_path = []
-        current_item = None
-        current_idx = -1
-        
-        for idx, item in enumerate(all_toc_items):
-            if item["title"] == title:
-                current_item = item
-                current_idx = idx
-                break
-
-        if not current_item:
-            return [title]
-
-        current_level = current_item.get("level", target_level)
-        level_paths = {}
-
-        for i in range(current_idx, -1, -1):
-            item = all_toc_items[i]
-            item_level = item.get("level", 1)
-
-            if item_level <= current_level and item_level not in level_paths:
-                level_paths[item_level] = item["title"]
-                if item_level == 1:
-                    break
-
-        for level in range(1, current_level + 1):
-            if level in level_paths:
-                hierarchy_path.append(level_paths[level])
-            elif level == current_level:
-                hierarchy_path.append(title)
-
-        if not hierarchy_path:
-            hierarchy_path = [title]
-
-        return hierarchy_path
-
-    def _build_chunk_metadata(
-        self,
-        sub_chunk: Dict[str, Any],
-        title_info: Dict[str, Any],
-        start_pos: int,
-        pages_content: List[Dict[str, Any]],
-        i: int,
-        j: int,
-        chapter_classification_map: Dict[str, Dict[str, Any]] = None,
-    ) -> Dict[str, Any]:
-        """构建文本块的元数据"""
-        content = sub_chunk["content"]
-        chunk_start_pos = start_pos + sub_chunk["relative_start"]
-        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
-
-        hierarchy_path = sub_chunk.get("hierarchy_path", [])
-        sub_title = sub_chunk.get("sub_title", "")
-
-        if hierarchy_path:
-            section_label = "->".join(hierarchy_path)
-        elif sub_title:
-            section_label = f"{title_info['title']}->{sub_title}"
-        else:
-            section_label = title_info["title"]
-
-        if hierarchy_path:
-            lowest_title = hierarchy_path[-1]
-            title_number = self._extract_title_number(lowest_title)
-        elif sub_title:
-            title_number = self._extract_title_number(sub_title)
-        else:
-            title_number = self._extract_title_number(title_info["title"])
-
-        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
-
-        # 获取一级目录的分类信息
-        chapter_classification = None
-        if chapter_classification_map:
-            # 从hierarchy_path获取一级目录标题
-            if hierarchy_path and len(hierarchy_path) > 0:
-                chapter_title = hierarchy_path[0]
-                chapter_classification = chapter_classification_map.get(chapter_title)
-            elif not hierarchy_path:
-                # 如果没有hierarchy_path,尝试从title_info获取
-                chapter_title = title_info.get("title", "")
-                chapter_classification = chapter_classification_map.get(chapter_title)
-
-        chunk_data = {
-            "file_name": "",
-            "chunk_id": chunk_id_str,
-            "section_label": section_label,
-            "project_plan_type": title_info.get("category_code", "other"),
-            "chapter_classification": title_info.get("category_code", "other"),
-            "element_tag": {
-                "chunk_id": chunk_id_str,
-                "page": page_num,
-                "serial_number": title_number if title_number else str(i + 1),
-            },
-            "review_chunk_content": content,
-            "_title_number": title_number,
-            "_local_index": j,
-            "_sort_key": chunk_start_pos,
-        }
-
-        # # 如果找到了一级目录的分类信息,添加到chunk中
-        # if chapter_classification:
-        #     chunk_data["chapter_classification"] = chapter_classification
-
-        return chunk_data
-
-    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """生成最终的chunk_id和serial_number"""
-        final_chunks = []
-        section_groups: Dict[str, int] = {}
-
-        for chunk in chunks:
-            section_label = chunk.get("section_label", "")
-            
-            if section_label not in section_groups:
-                section_groups[section_label] = 1
-            else:
-                section_groups[section_label] += 1
-            
-            local_index = section_groups[section_label]
-            title_number_path = self._extract_title_number_path(section_label)
-
-            if title_number_path:
-                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
-            else:
-                chunk_id_str = f"doc_chunk_{local_index}"
-
-            serial_number = self._extract_number_from_section_label(section_label)
-
-            final_chunk = {
-                "file_name": chunk["file_name"],
-                "chunk_id": chunk_id_str,
-                "section_label": chunk["section_label"],
-                "project_plan_type": chunk["project_plan_type"],
-                "chapter_classification": chunk["chapter_classification"],
-                "element_tag": {
-                    "chunk_id": chunk_id_str,
-                    "page": chunk["element_tag"]["page"],
-                    "serial_number": serial_number,
-                },
-                "review_chunk_content": chunk["review_chunk_content"],
-            }
-
-            final_chunks.append(final_chunk)
-
-        return final_chunks
    def _find_title_in_block(self, title: str, block: str, fuzzy_threshold: float) -> int:
        """Locate ``title`` inside ``block`` and return its character offset.

        Delegates to the shared ``TitleMatcher``; callers treat a result
        ``>= 0`` as found, so a negative value means "not found".

        NOTE(review): this reaches into TitleMatcher's private
        ``_find_title_in_text`` — consider exposing a public method there.
        """
        # Delegate directly to the TitleMatcher implementation.
        return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
 
     def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
         """根据位置获取页码"""
@@ -510,72 +315,3 @@ class DocxTextSplitter(TextSplitter):
             if page["start_pos"] <= pos < page["end_pos"]:
                 return int(page["page_num"])
         return 1
-
-    def _extract_title_number(self, title: str) -> str:
-        """从标题中提取编号部分"""
-        if not title:
-            return ""
-        
-        if re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title):
-            return re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title).group(1)
-        
-        if re.match(r"^(【\d+】)", title):
-            return re.match(r"^(【\d+】)", title).group(1)
-        
-        if re.match(r"^(〖\d+(?:\.\d+)*〗)", title):
-            return re.match(r"^(〖\d+(?:\.\d+)*〗)", title).group(1)
-        
-        if re.match(r"^(\d+(?:\.\d+)*)", title):
-            return re.match(r"^(\d+(?:\.\d+)*)", title).group(1)
-        
-        if re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title):
-            return re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title).group(1)
-        
-        if re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title):
-            return re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title).group(1)
-        
-        return ""
-
-    def _extract_title_number_path(self, section_label: str) -> str:
-        """从section_label中提取标题路径的编号路径"""
-        if not section_label:
-            return ""
-
-        parts = section_label.split("->")
-        number_paths = []
-        
-        for part in parts:
-            part = part.strip()
-            if part:
-                number = self._extract_title_number(part)
-                if number:
-                    number_paths.append(number)
-
-        if number_paths:
-            return "->".join(number_paths)
-
-        return ""
-
-    def _extract_number_from_section_label(self, section_label: str) -> str:
-        """从section_label中提取最底层级的编号"""
-        if not section_label:
-            return ""
-
-        if "->" in section_label:
-            last_level_part = section_label.split("->")[-1].strip()
-        else:
-            last_level_part = section_label.strip()
-
-        if " + " in last_level_part:
-            merged_parts = last_level_part.split(" + ")
-            numbers = []
-            for part in merged_parts:
-                part = part.strip()
-                number = self._extract_title_number(part)
-                if number:
-                    numbers.append(number)
-
-            if numbers:
-                return "+".join(numbers)
-
-        return self._extract_title_number(last_level_part)

+ 12 - 10
core/construction_review/component/doc_worker/docx_worker/toc_extractor.py

@@ -13,6 +13,7 @@ from typing import Any, Dict, List
 from docx import Document
 
 from ..interfaces import TOCExtractor, DocumentSource
+from ..utils.toc_level_identifier import TOCLevelIdentifier
 
 
 class DocxTOCExtractor(TOCExtractor):
@@ -21,6 +22,10 @@ class DocxTOCExtractor(TOCExtractor):
     # 目录行模式:标题 + 制表符 + 页码
     TOC_PATTERN = re.compile(r"^(?P<title>.+?)\t+(?P<page>\d+)\s*$")
 
+    def __init__(self) -> None:
+        """初始化 DOCX 目录提取器"""
+        self._level_identifier = TOCLevelIdentifier()
+
     def extract_toc(self, source: DocumentSource) -> Dict[str, Any]:
         """
         提取 DOCX 文档的目录信息
@@ -55,13 +60,10 @@ class DocxTOCExtractor(TOCExtractor):
                 title = match.group("title").strip()
                 page = int(match.group("page"))
                 
-                # 判断层级(简单规则:根据编号格式)
-                level = self._detect_level(title)
-                
+                # 先不设置层级,后续统一识别
                 toc_items.append({
                     "title": title,
                     "page": page,
-                    "level": level,
                     "original": text,
                 })
                 
@@ -75,6 +77,9 @@ class DocxTOCExtractor(TOCExtractor):
         else:
             toc_pages = []
 
+        # 使用 TOCLevelIdentifier 识别层级(与 doc_worker 保持一致)
+        toc_items = self._level_identifier.identify_levels(toc_items)
+
         return {
             "toc_items": toc_items,
             "toc_count": len(toc_items),
@@ -83,13 +88,10 @@ class DocxTOCExtractor(TOCExtractor):
 
     def _detect_level(self, title: str) -> int:
         """
-        根据标题格式检测层级
+        根据标题格式检测层级(已废弃,保留仅用于向后兼容)
         
-        规则:
-        - 第X章 -> level 1
-        - 一)、二)、三) -> level 2
-        - 1、2、3、 -> level 3
-        - (1)、(2)、(3) -> level 4
+        注意:此方法已不再使用,现在使用 TOCLevelIdentifier 统一识别层级。
+        保留此方法仅用于向后兼容和测试。
         """
         # 章节格式
         if re.match(r"^第[一二三四五六七八九十\d]+章", title):

+ 2 - 0
core/construction_review/component/doc_worker/interfaces.py

@@ -227,3 +227,5 @@ class FileParseFacade(ABC):
 
 
 
+
+

+ 23 - 0
core/construction_review/component/doc_worker/pdf_worker/__init__.py

@@ -0,0 +1,23 @@
+"""
+PDF 文档处理模块
+
+提供 PDF 文件的目录提取、全文提取、文本切分等功能。
+"""
+
+from .adapter import PdfWorkerConfig, build_pdf_facade
+from .toc_extractor import PdfTOCExtractor
+from .fulltext_extractor import PdfFullTextExtractor
+from .text_splitter import PdfTextSplitter
+from .classifier import PdfHierarchyClassifier
+from .json_writer import PdfJsonResultWriter
+
+__all__ = [
+    "PdfTOCExtractor",
+    "PdfFullTextExtractor",
+    "PdfTextSplitter",
+    "PdfHierarchyClassifier",
+    "PdfJsonResultWriter",
+    "PdfWorkerConfig",
+    "build_pdf_facade",
+]
+

+ 641 - 0
core/construction_review/component/doc_worker/pdf_worker/text_splitter.py

@@ -10,6 +10,7 @@ PDF 文本切分实现
 
 from __future__ import annotations
 
+import json
 import re
 from typing import Any, Dict, List
 
@@ -18,6 +19,7 @@ from ..interfaces import TextSplitter
 from ..utils.title_matcher import TitleMatcher
 
 
+
 class PdfTextSplitter(TextSplitter):
     """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""
 
@@ -580,6 +582,645 @@ class PdfTextSplitter(TextSplitter):
                 },
                 "review_chunk_content": chunk["review_chunk_content"],
             }
+            print(f"[INFO] 更新chunk数据{chunk_id_str}")
+            with open(rf"temp\document_temp\文档切分预处理结果_final_chunk.json", 'w', encoding='utf-8') as f:
+                json.dump(final_chunk, f, ensure_ascii=False, indent=4)
+            final_chunks.append(final_chunk)
+
+        return final_chunks
+
+    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
+        """根据位置获取页码"""
+        for page in pages_content:
+            if page["start_pos"] <= pos < page["end_pos"]:
+                return int(page["page_num"])
+        return 1
+
+    def _extract_title_number(self, title: str) -> str:
+        """从标题中提取编号部分(支持多种格式)"""
+        if not title:
+            return ""
+        
+        # 匹配章节格式(如 第一章、第1章等)
+        chapter_match = re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title)
+        if chapter_match:
+            return chapter_match.group(1)
+        
+        # 匹配方括号数字格式(如 【1】、【2】等)
+        bracket_match = re.match(r"^(【\d+】)", title)
+        if bracket_match:
+            return bracket_match.group(1)
+        
+        # 匹配双方括号数字格式(如 〖1.1〗、〖2.3〗等)
+        double_bracket_match = re.match(r"^(〖\d+(?:\.\d+)*〗)", title)
+        if double_bracket_match:
+            return double_bracket_match.group(1)
+        
+        # 匹配数字编号格式(如 1.5, 1.6, 1.2.3等)
+        number_match = re.match(r"^(\d+(?:\.\d+)*)", title)
+        if number_match:
+            return number_match.group(1)
+        
+        # 匹配中文编号格式(如 一、二、三等)
+        chinese_match = re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title)
+        if chinese_match:
+            return chinese_match.group(1)
+        
+        # 匹配圆括号编号格式(如 (1)、(一)等)
+        paren_match = re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title)
+        if paren_match:
+            return paren_match.group(1)
+        
+        return ""
+
+    def _extract_title_number_path(self, section_label: str) -> str:
+        """从section_label中提取标题路径的编号路径"""
+        if not section_label:
+            return ""
+
+        # 按"->"分割层级路径
+        parts = section_label.split("->")
+
+        # 提取每一层的编号
+        number_paths = []
+        for part in parts:
+            part = part.strip()
+            if part:
+                number = self._extract_title_number(part)
+                if number:
+                    number_paths.append(number)
+
+        # 用"->"连接编号路径
+        if number_paths:
+            return "->".join(number_paths)
+
+        return ""
+
    def _extract_number_from_section_label(self, section_label: str) -> str:
        """
        Extract the numbering token of the deepest level in a section label.

        Examples (note: the numbering regexes capture only the number itself,
        so trailing separators such as ")" or "、" are not included):
        "第一章 编制依据与说明->一) 编制依据" -> "一"
        "第二章 工程概况->二)周边环境条件及工程地质->1、周边环境条件" -> "1"
        "第四章 施工工艺技术->一)主要部件说明->2、前临时支腿" -> "2"

        A deepest level of the form "A + B" marks merged titles: one number
        is extracted per merged part and the numbers are joined with "+".
        """
        if not section_label:
            return ""

        # The deepest level is the text after the last "->" (or the whole label).
        if "->" in section_label:
            last_level_part = section_label.split("->")[-1].strip()
        else:
            last_level_part = section_label.strip()

        # Merged titles are marked with " + "; extract a number per part.
        if " + " in last_level_part:
            # Split the merged parts.
            merged_parts = last_level_part.split(" + ")
            numbers = []
            for part in merged_parts:
                part = part.strip()
                number = self._extract_title_number(part)
                if number:
                    numbers.append(number)

            if numbers:
                return "+".join(numbers)

        # Not merged: extract the number from the deepest level directly.
        return self._extract_title_number(last_level_part)
+
+
+
    # NOTE(review): the bare string below repeats the class docstring but sits
    # mid-class, so it is evaluated as a no-op expression statement. Together
    # with the re-defined __init__ (and the re-defined methods that follow),
    # this looks like the class body was pasted twice — confirm and remove the
    # duplicated copy in a follow-up.
    """按目录层级对 PDF 正文进行智能分块的实现(复刻 doc_worker 逻辑)。"""

    def __init__(self) -> None:
        # Later definitions win in Python: this __init__ shadows any earlier
        # one in the class. Wires in the default config provider and the
        # shared title matcher used by the splitting methods.
        self._cfg = default_config_provider
        self._title_matcher = TitleMatcher()
+
    def split_by_hierarchy(
        self,
        classification_items: List[Dict[str, Any]],
        pages_content: List[Dict[str, Any]],
        toc_info: Dict[str, Any],
        target_level: int,
        max_chunk_size: int,
        min_chunk_size: int,
    ) -> List[Dict[str, Any]]:
        """
        Split the document body into chunks by TOC hierarchy and size.

        Pipeline:
        1. Locate each classified heading in the body text (skipping TOC
           pages) via the title matcher.
        2. For each located heading, take the text up to the next located
           heading and sub-split it by the deepest-level TOC sub-headings.
        3. Blocks exceeding ``max_chunk_size`` are further split at sentence
           boundaries inside ``_split_by_sub_titles``.

        Args:
            classification_items: Classified TOC headings to locate.
            pages_content: Per-page dicts; their "text" fields are joined to
                form the full body text.
            toc_info: TOC metadata ("toc_items", "toc_pages").
            target_level: TOC level at which the top-level split happens.
            max_chunk_size: Upper bound before sentence-level splitting.
            min_chunk_size: Passed through to ``_split_by_sub_titles``
                (NOTE(review): appears unused there — confirm).

        Returns:
            Finalized chunk dicts with ids, section labels and metadata,
            or [] when no heading could be located.
        """
        toc_pages = toc_info.get("toc_pages", []) or []
        all_toc_items = toc_info.get("toc_items", [])
        
        # Join all page texts into a single body string; positions below are
        # offsets into this string.
        full_text = "".join(p.get("text", "") for p in pages_content)

        print(f"  正在定位{len(classification_items)}个已分类的标题...")
        print(f"  目录所在页: {toc_pages}")

        # Step 1: locate the classified headings in the body (TOC pages skipped).
        located = self._title_matcher.find_title_positions(
            classification_items, full_text, pages_content, toc_pages
        )
        
        # Keep only the headings that were actually found.
        found_titles = [t for t in located if t["found"]]
        if not found_titles:
            print(f"  错误: 未能在正文中定位任何标题")
            return []

        print(f"  成功定位 {len(found_titles)}/{len(classification_items)} 个标题")
        
        # Sort by position so consecutive headings delimit content blocks.
        found_titles.sort(key=lambda x: x["position"])

        # Step 2: attach a full hierarchy path to every located heading.
        for title_info in found_titles:
            hierarchy_path = self._build_hierarchy_path(
                title_info["title"], all_toc_items, target_level
            )
            title_info["hierarchy_path"] = hierarchy_path

        # Step 3: split each heading's content block by its sub-headings.
        all_chunks: List[Dict[str, Any]] = []
        
        for i, title_info in enumerate(found_titles):
            start_pos = title_info["position"]
            
            # The block ends at the next located heading (or end of text).
            if i + 1 < len(found_titles):
                end_pos = found_titles[i + 1]["position"]
            else:
                end_pos = len(full_text)
            
            # Extract the content block for this heading.
            content_block = full_text[start_pos:end_pos]
            
            # Split by the deepest-level sub-headings within the block.
            sub_chunks = self._split_by_sub_titles(
                content_block,
                all_toc_items,
                title_info,
                target_level,
                max_chunk_size,
                min_chunk_size,
            )
            
            # Attach metadata (page, labels, provisional ids) to each sub-chunk.
            for j, sub_chunk in enumerate(sub_chunks, 1):
                chunk_data = self._build_chunk_metadata(
                    sub_chunk, title_info, start_pos, pages_content, i, j
                )
                all_chunks.append(chunk_data)

        # Step 4: assign final chunk_ids and serial numbers.
        final_chunks = self._finalize_chunk_ids(all_chunks)

        print(f"  初始切分: {len(all_chunks)} 个块")
        print(f"  最终块数: {len(final_chunks)} 个块")

        return final_chunks
+
    def _split_by_sub_titles(
        self,
        content_block: str,
        all_toc_items: List[Dict[str, Any]],
        parent_title_info: Dict[str, Any],
        target_level: int,
        max_chunk_size: int,
        min_chunk_size: int,
    ) -> List[Dict[str, Any]]:
        """
        Split a content block by its TOC sub-headings.

        Core logic:
        1. Collect sub-headings of every depth under the parent heading
           (not just direct children), locating each inside the block.
        2. Keep only the deepest level as split points; the text between two
           adjacent split points becomes one chunk.
        3. A chunk is sentence-split only when it exceeds ``max_chunk_size``.

        Returns chunk dicts with "content", "relative_start" (offset within
        ``content_block``), "sub_title" and "hierarchy_path".

        NOTE(review): ``min_chunk_size`` is accepted but never used here —
        confirm whether small-chunk merging was intended.
        """
        # Find the parent heading's index and level in the TOC list.
        parent_title = parent_title_info["title"]
        parent_idx = -1
        parent_level = target_level
        
        for idx, toc_item in enumerate(all_toc_items):
            if toc_item["title"] == parent_title:
                parent_idx = idx
                parent_level = toc_item.get("level", target_level)
                break

        if parent_idx < 0:
            # Parent not found in the TOC: treat the whole block as one chunk.
            if len(content_block) > max_chunk_size:
                return self._split_large_chunk(content_block, max_chunk_size, parent_title, [])
            else:
                return [
                    {
                        "content": content_block,
                        "relative_start": 0,
                        "sub_title": "",
                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
                    }
                ]

        # Find the next heading at the same or higher level: it bounds the
        # parent's range of TOC entries.
        next_sibling_idx = len(all_toc_items)
        for idx in range(parent_idx + 1, len(all_toc_items)):
            item = all_toc_items[idx]
            if item.get("level", 1) <= parent_level:
                next_sibling_idx = idx
                break

        # Collect ALL deeper-level sub-headings (level > parent_level), not
        # just direct children — that is the key to this splitter.
        all_sub_titles = []
        fuzzy_threshold = float(self._cfg.get("text_splitting.fuzzy_threshold", 0.8))

        for idx in range(parent_idx + 1, next_sibling_idx):
            toc_item = all_toc_items[idx]
            item_level = toc_item.get("level", 1)
            
            # Any heading deeper than the parent is a candidate sub-heading.
            if item_level > parent_level:
                # Locate this sub-heading inside the content block.
                pos = self._find_title_in_block(
                    toc_item["title"], content_block, fuzzy_threshold
                )
                if pos >= 0:
                    # Debug: show the located heading with surrounding context.
                    context_start = max(0, pos - 20)
                    context_end = min(len(content_block), pos + len(toc_item["title"]) + 50)
                    context = content_block[context_start:context_end].replace("\n", " ")
                    print(f"        找到子标题: {toc_item['title']} (level={item_level}), 位置={pos}, 上下文: ...{context}...")
                    
                    all_sub_titles.append(
                        {
                            "title": toc_item["title"],
                            "level": toc_item["level"],
                            "position": pos,
                            "toc_index": idx,
                            "toc_item": toc_item,
                        }
                    )

        # Sort located sub-headings by their position in the block.
        all_sub_titles.sort(key=lambda x: x["position"])

        # No sub-heading located: the whole block becomes one chunk.
        if not all_sub_titles:
            if len(content_block) > max_chunk_size:
                return self._split_large_chunk(
                    content_block, max_chunk_size, parent_title, 
                    parent_title_info.get("hierarchy_path", [parent_title])
                )
            else:
                return [
                    {
                        "content": content_block,
                        "relative_start": 0,
                        "sub_title": "",
                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
                    }
                ]

        # Deepest level present among the located sub-headings (numerically
        # largest level value = lowest in the hierarchy).
        max_level = max(sub["level"] for sub in all_sub_titles)
        
        # Only the deepest-level headings act as split points.
        lowest_level_titles = [sub for sub in all_sub_titles if sub["level"] == max_level]
        
        print(f"      父标题: {parent_title}, 找到 {len(all_sub_titles)} 个子标题, 最低层级: {max_level}, 最低层级标题数: {len(lowest_level_titles)}")

        # Split at each deepest-level heading.
        chunks = []
        for i, sub_title in enumerate(lowest_level_titles):
            start_pos = sub_title["position"]

            # The chunk ends at the next deepest-level heading (or block end).
            if i + 1 < len(lowest_level_titles):
                end_pos = lowest_level_titles[i + 1]["position"]
            else:
                end_pos = len(content_block)

            chunk_content = content_block[start_pos:end_pos]
            
            # Debug output for the resulting slice.
            content_preview = chunk_content[:100].replace("\n", " ")
            print(f"        切分块 {i+1}: {sub_title['title']}, 位置: {start_pos}-{end_pos}, 长度: {len(chunk_content)}, 预览: {content_preview}...")

            # Skip headings that carry (almost) no body text of their own.
            title_len = len(sub_title["title"])
            content_after_title = chunk_content[title_len:].strip()

            if not content_after_title or len(content_after_title) < 10:
                print(f"        跳过(内容不足)")
                continue

            # Build the full hierarchy path for this sub-heading.
            hierarchy_path = self._build_hierarchy_path_for_subtitle(
                sub_title["toc_item"], all_toc_items, parent_title_info
            )

            # Sentence-split only when the chunk exceeds max_chunk_size.
            if len(chunk_content) > max_chunk_size:
                print(f"        块过大,按句子切分")
                split_chunks = self._split_large_chunk(
                    chunk_content, max_chunk_size, sub_title["title"], hierarchy_path
                )
                for split_chunk in split_chunks:
                    # Rebase relative_start onto the parent content block.
                    split_chunk["relative_start"] = start_pos + split_chunk["relative_start"]
                    split_chunk["sub_title"] = sub_title["title"]
                    if "hierarchy_path" not in split_chunk:
                        split_chunk["hierarchy_path"] = hierarchy_path
                    chunks.append(split_chunk)
            else:
                # Small enough: keep as a single chunk.
                chunks.append(
                    {
                        "content": chunk_content,
                        "relative_start": start_pos,
                        "sub_title": sub_title["title"],
                        "hierarchy_path": hierarchy_path,
                    }
                )

        # Every sub-heading was content-free: fall back to the whole block.
        if not chunks:
            if len(content_block) > max_chunk_size:
                return self._split_large_chunk(
                    content_block, max_chunk_size, parent_title,
                    parent_title_info.get("hierarchy_path", [parent_title])
                )
            else:
                return [
                    {
                        "content": content_block,
                        "relative_start": 0,
                        "sub_title": "",
                        "hierarchy_path": parent_title_info.get("hierarchy_path", [parent_title]),
                    }
                ]

        return chunks
+
    def _find_title_in_block(self, title: str, block: str, fuzzy_threshold: float) -> int:
        """Locate ``title`` inside ``block`` and return its character offset.

        Callers treat a result ``>= 0`` as found, so a negative value means
        "not found".

        NOTE(review): this reaches into TitleMatcher's private
        ``_find_title_in_text`` — consider exposing a public method there.
        """
        # Delegate directly to the TitleMatcher implementation.
        return self._title_matcher._find_title_in_text(title, block, fuzzy_threshold)
+
+    def _split_large_chunk(
+        self,
+        content: str,
+        max_chunk_size: int,
+        title: str,
+        hierarchy_path: List[str] | None = None,
+    ) -> List[Dict[str, Any]]:
+        """
+        Split an oversized content block at sentence boundaries (keeps semantic units).
+
+        Each returned dict carries ``content``, a cumulative ``relative_start``
+        offset into *content*, ``is_split=True`` and, when supplied, the
+        ``hierarchy_path``. NOTE(review): *title* is currently unused.
+        """
+        # Split on sentence delimiters (Chinese full stop / ! / ? / newline);
+        # the capturing group keeps each delimiter as a separate list element.
+        sentences = re.split(r"([。!?\n])", content)
+
+        # Re-attach each delimiter to the sentence that precedes it.
+        # NOTE(review): re.split with a capture group yields an odd-length list,
+        # and this loop stops at len(sentences) - 1, so any trailing text after
+        # the final delimiter is never appended -- possible silent content loss.
+        combined_sentences = []
+        for i in range(0, len(sentences) - 1, 2):
+            if i + 1 < len(sentences):
+                combined_sentences.append(sentences[i] + sentences[i + 1])
+            else:
+                combined_sentences.append(sentences[i])
+
+        if not combined_sentences:
+            combined_sentences = [content]
+
+        # Greedily pack consecutive sentences up to max_chunk_size per chunk.
+        chunks = []
+        current_chunk = ""
+        current_start = 0
+
+        for sentence in combined_sentences:
+            if len(current_chunk) + len(sentence) <= max_chunk_size:
+                current_chunk += sentence
+            else:
+                if current_chunk:
+                    chunk_data = {
+                        "content": current_chunk,
+                        "relative_start": current_start,
+                        "is_split": True,  # marks a chunk produced by size-splitting
+                    }
+                    if hierarchy_path is not None:
+                        chunk_data["hierarchy_path"] = hierarchy_path
+                    chunks.append(chunk_data)
+                    current_start += len(current_chunk)
+                current_chunk = sentence
+
+        # Flush the final partially-filled chunk.
+        if current_chunk:
+            chunk_data = {
+                "content": current_chunk,
+                "relative_start": current_start,
+                "is_split": True,
+            }
+            if hierarchy_path is not None:
+                chunk_data["hierarchy_path"] = hierarchy_path
+            chunks.append(chunk_data)
+
+        return chunks
+
+    def _build_hierarchy_path_for_subtitle(
+        self,
+        sub_title_item: Dict[str, Any],
+        all_toc_items: List[Dict[str, Any]],
+        parent_title_info: Dict[str, Any],
+    ) -> List[str]:
+        """Build the full level-1..N hierarchy path for a sub-title.
+
+        Falls back to ``[parent title, sub title]`` whenever the sub-title
+        cannot be located in the TOC or no ancestors are found.
+        """
+        hierarchy_path = []
+
+        # Locate the sub-title in the TOC list (first exact title match).
+        sub_title = sub_title_item.get("title", "")
+        sub_title_idx = -1
+        for idx, item in enumerate(all_toc_items):
+            if item.get("title", "") == sub_title:
+                sub_title_idx = idx
+                break
+
+        if sub_title_idx < 0:
+            # Not present in the TOC: return parent -> sub-title.
+            return [parent_title_info["title"], sub_title]
+
+        # Walk backwards from the sub-title, recording the nearest ancestor
+        # title seen for each level at or above the sub-title's level.
+        level_paths = {}  # level -> title of the nearest ancestor at that level
+        current_level = sub_title_item.get("level", 2)
+
+        for i in range(sub_title_idx, -1, -1):
+            item = all_toc_items[i]
+            item_level = item.get("level", 1)
+
+            if item_level <= current_level and item_level not in level_paths:
+                level_paths[item_level] = item["title"]
+                if item_level == 1:
+                    break
+
+        # Assemble the path from level 1 down to the current level.
+        for level in range(1, current_level + 1):
+            if level in level_paths:
+                hierarchy_path.append(level_paths[level])
+
+        # Guarantee a non-empty path.
+        if not hierarchy_path:
+            hierarchy_path = [parent_title_info["title"], sub_title]
+
+        return hierarchy_path
+
+    def _build_hierarchy_path(
+        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
+    ) -> List[str]:
+        """Build the full hierarchy path from level 1 down to *title*.
+
+        *target_level* is only used as a fallback level when the located TOC
+        item has no ``level`` of its own.
+        """
+        hierarchy_path = []
+
+        # Locate the title in the TOC list (first exact match).
+        current_item = None
+        current_idx = -1
+        for idx, item in enumerate(all_toc_items):
+            if item["title"] == title:
+                current_item = item
+                current_idx = idx
+                break
+
+        if not current_item:
+            # Not in the TOC: the path is just the title itself.
+            return [title]
+
+        current_level = current_item.get("level", target_level)
+
+        # Walk backwards, recording the nearest ancestor title for each level.
+        level_paths = {}  # level -> title of the nearest ancestor at that level
+
+        for i in range(current_idx, -1, -1):
+            item = all_toc_items[i]
+            item_level = item.get("level", 1)
+
+            if item_level <= current_level and item_level not in level_paths:
+                level_paths[item_level] = item["title"]
+                if item_level == 1:
+                    break
+
+        # Assemble from level 1 to the current level; ensure the current title
+        # itself appears when its own level had no recorded entry.
+        for level in range(1, current_level + 1):
+            if level in level_paths:
+                hierarchy_path.append(level_paths[level])
+            elif level == current_level:
+                hierarchy_path.append(title)
+
+        # Guarantee a non-empty path.
+        if not hierarchy_path:
+            hierarchy_path = [title]
+
+        return hierarchy_path
+
+    def _build_chunk_metadata(
+        self,
+        sub_chunk: Dict[str, Any],
+        title_info: Dict[str, Any],
+        start_pos: int,
+        pages_content: List[Dict[str, Any]],
+        i: int,
+        j: int,
+    ) -> Dict[str, Any]:
+        """Build the metadata dict for one text chunk.
+
+        ``i`` is the TOC-title index (serial-number fallback) and ``j`` the
+        chunk index within the title. Keys prefixed with ``_`` are transient
+        and stripped later by ``_finalize_chunk_ids``; ``file_name`` is left
+        empty for the caller to fill in.
+        """
+        content = sub_chunk["content"]
+        chunk_start_pos = start_pos + sub_chunk["relative_start"]
+        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
+
+        # section_label: the full "->"-joined hierarchy path when available,
+        # otherwise parent->sub or just the parent title.
+        hierarchy_path = sub_chunk.get("hierarchy_path", [])
+        sub_title = sub_chunk.get("sub_title", "")
+
+        if hierarchy_path:
+            section_label = "->".join(hierarchy_path)
+        elif sub_title:
+            section_label = f"{title_info['title']}->{sub_title}"
+        else:
+            section_label = title_info["title"]
+
+        # Extract the numbering of the deepest title available.
+        if hierarchy_path:
+            lowest_title = hierarchy_path[-1]
+            title_number = self._extract_title_number(lowest_title)
+        elif sub_title:
+            title_number = self._extract_title_number(sub_title)
+        else:
+            title_number = self._extract_title_number(title_info["title"])
+
+        # Provisional chunk id; regenerated later in _finalize_chunk_ids.
+        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
+
+        return {
+            "file_name": "",  # filled in by the caller
+            "chunk_id": chunk_id_str,
+            "section_label": section_label,
+            "project_plan_type": title_info.get("category_code", "other"),
+            "element_tag": {
+                "chunk_id": chunk_id_str,
+                "page": page_num,
+                "serial_number": title_number if title_number else str(i + 1),
+            },
+            "review_chunk_content": content,
+            "_title_number": title_number,
+            "_local_index": j,
+            "_sort_key": chunk_start_pos,
+        }
+
+    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """生成最终的chunk_id和serial_number"""
+        final_chunks = []
+        
+        # 按 section_label 分组,为每组内的块生成递增的序号
+        section_groups: Dict[str, int] = {}  # section_label -> 当前序号
+
+        for chunk in chunks:
+            section_label = chunk.get("section_label", "")
+            
+            # 为当前 section_label 生成序号
+            if section_label not in section_groups:
+                section_groups[section_label] = 1
+            else:
+                section_groups[section_label] += 1
+            
+            local_index = section_groups[section_label]
+
+            # 从section_label中提取标题路径的编号路径
+            title_number_path = self._extract_title_number_path(section_label)
+
+            # 生成chunk_id:doc_chunk_<标题路径的编号路径>_序号
+            if title_number_path:
+                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
+            else:
+                chunk_id_str = f"doc_chunk_{local_index}"
+
+            # 从section_label中提取最底层级的编号(用于 serial_number)
+            serial_number = self._extract_number_from_section_label(section_label)
+
+            # 更新chunk数据
+            final_chunk = {
+                "file_name": chunk["file_name"],
+                "chunk_id": chunk_id_str,
+                "section_label": chunk["section_label"],
+                "project_plan_type": chunk["project_plan_type"],
+                "element_tag": {
+                    "chunk_id": chunk_id_str,
+                    "page": chunk["element_tag"]["page"],
+                    "serial_number": serial_number,
+                },
+                "review_chunk_content": chunk["review_chunk_content"],
+            }
 
             final_chunks.append(final_chunk)
 

+ 374 - 0
core/construction_review/component/doc_worker/utils/llm_client.py

@@ -0,0 +1,374 @@
+"""
+LLM API client utilities.
+
+Supports asynchronous, concurrent execution of multiple LLM API requests,
+with a synchronous ``requests``-based fallback when ``aiohttp`` is missing.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from typing import Any, Dict, List, Optional
+from pathlib import Path
+
+# Optional dependency: aiohttp enables true async concurrency.
+try:
+    import aiohttp
+    HAS_AIOHTTP = True
+except ImportError:
+    HAS_AIOHTTP = False
+
+# Optional dependency: requests is the synchronous fallback transport.
+try:
+    import requests
+    HAS_REQUESTS = True
+except ImportError:
+    HAS_REQUESTS = False
+
+from ..config.provider import default_config_provider
+
+
+class LLMClient:
+    """LLM API client with support for concurrent asynchronous calls."""
+
+    def __init__(self, config_provider=None):
+        """
+        Initialise the LLM client.
+
+        Args:
+            config_provider: configuration provider; the module-wide
+                ``default_config_provider`` is used when None.
+        """
+        self._cfg = config_provider or default_config_provider
+        self._load_config()
+
+    def _load_config(self):
+        """Load the LLM API configuration from ``config/llm_api.yaml``."""
+        # Resolve llm_api.yaml inside this package's config directory.
+        llm_api_path = Path(__file__).parent.parent / "config" / "llm_api.yaml"
+        import yaml
+        
+        with open(llm_api_path, "r", encoding="utf-8") as f:
+            llm_config = yaml.safe_load(f) or {}
+        
+        # Active provider key (qwen / deepseek / doubao / gemini).
+        self.model_type = llm_config.get("MODEL_TYPE", "qwen").lower()
+        
+        # Provider-specific sub-section of the config.
+        model_config = llm_config.get(self.model_type, {})
+        
+        # Map provider -> endpoint URL, model id and API key.
+        # NOTE(review): the four branches are identical except for the key
+        # prefix; a prefix lookup table would remove the duplication.
+        if self.model_type == "qwen":
+            self.api_url = model_config.get("QWEN_SERVER_URL", "").rstrip("/")
+            self.model_id = model_config.get("QWEN_MODEL_ID", "")
+            self.api_key = model_config.get("QWEN_API_KEY", "")
+            self.base_url = f"{self.api_url}/chat/completions"
+        elif self.model_type == "deepseek":
+            self.api_url = model_config.get("DEEPSEEK_SERVER_URL", "").rstrip("/")
+            self.model_id = model_config.get("DEEPSEEK_MODEL_ID", "")
+            self.api_key = model_config.get("DEEPSEEK_API_KEY", "")
+            self.base_url = f"{self.api_url}/chat/completions"
+        elif self.model_type == "doubao":
+            self.api_url = model_config.get("DOUBAO_SERVER_URL", "").rstrip("/")
+            self.model_id = model_config.get("DOUBAO_MODEL_ID", "")
+            self.api_key = model_config.get("DOUBAO_API_KEY", "")
+            self.base_url = f"{self.api_url}/chat/completions"
+        elif self.model_type == "gemini":
+            self.api_url = model_config.get("GEMINI_SERVER_URL", "").rstrip("/")
+            self.model_id = model_config.get("GEMINI_MODEL_ID", "")
+            self.api_key = model_config.get("GEMINI_API_KEY", "")
+            self.base_url = f"{self.api_url}/chat/completions"
+        else:
+            raise ValueError(f"不支持的模型类型: {self.model_type}")
+        
+        # Shared request-tuning parameters.
+        keywords_config = llm_config.get("keywords", {})
+        self.timeout = keywords_config.get("timeout", 30)
+        self.max_retries = keywords_config.get("max_retries", 2)
+        self.concurrent_workers = keywords_config.get("concurrent_workers", 20)
+        self.stream = keywords_config.get("stream", False)
+        
+        request_payload = keywords_config.get("request_payload", {})
+        self.temperature = request_payload.get("temperature", 0.3)
+        self.max_tokens = request_payload.get("max_tokens", 1024)
+
+    async def _call_api_async(self, session: aiohttp.ClientSession, messages: List[Dict[str, str]]) -> Dict[str, Any]:
+        """
+        Call the LLM API asynchronously.
+
+        Args:
+            session: shared aiohttp client session.
+            messages: OpenAI-style chat messages.
+
+        Returns:
+            Parsed JSON response body.
+
+        Raises:
+            Exception: once all retry attempts are exhausted.
+        """
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {self.api_key}"
+        }
+        
+        payload = {
+            "model": self.model_id,
+            "messages": messages,
+            "temperature": self.temperature,
+            "max_tokens": self.max_tokens,
+            "stream": self.stream
+        }
+        
+        # NOTE(review): max_retries is the TOTAL number of attempts here,
+        # not the number of retries after the first try.
+        for attempt in range(self.max_retries):
+            try:
+                async with session.post(
+                    self.base_url,
+                    json=payload,
+                    headers=headers,
+                    timeout=aiohttp.ClientTimeout(total=self.timeout)
+                ) as response:
+                    if response.status == 200:
+                        result = await response.json()
+                        return result
+                    else:
+                        error_text = await response.text()
+                        if attempt < self.max_retries - 1:
+                            await asyncio.sleep(1 * (attempt + 1))  # linear backoff (1s, 2s, ...)
+                            continue
+                        raise Exception(f"API调用失败,状态码: {response.status}, 错误: {error_text}")
+            except asyncio.TimeoutError:
+                if attempt < self.max_retries - 1:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                raise Exception(f"API调用超时(超过{self.timeout}秒)")
+            except Exception as e:
+                # NOTE(review): `e` is unused; bare `raise` re-raises the original.
+                if attempt < self.max_retries - 1:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                raise
+        
+        # Defensive: only reachable when max_retries <= 0.
+        raise Exception("API调用失败,已达到最大重试次数")
+
+    def _call_api_sync(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
+        """
+        Call the LLM API synchronously (fallback used when aiohttp is absent).
+
+        Args:
+            messages: OpenAI-style chat messages.
+
+        Returns:
+            Parsed JSON response body.
+
+        Raises:
+            ImportError: when neither aiohttp nor requests is installed.
+            Exception: once all retry attempts are exhausted.
+        """
+        if not HAS_REQUESTS:
+            raise ImportError("需要安装 aiohttp 或 requests 库才能使用LLM API客户端")
+        
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {self.api_key}"
+        }
+        
+        payload = {
+            "model": self.model_id,
+            "messages": messages,
+            "temperature": self.temperature,
+            "max_tokens": self.max_tokens,
+            "stream": self.stream
+        }
+        
+        # Same linear-backoff retry policy as the async path.
+        for attempt in range(self.max_retries):
+            try:
+                response = requests.post(
+                    self.base_url,
+                    json=payload,
+                    headers=headers,
+                    timeout=self.timeout
+                )
+                if response.status_code == 200:
+                    return response.json()
+                else:
+                    if attempt < self.max_retries - 1:
+                        import time
+                        time.sleep(1 * (attempt + 1))
+                        continue
+                    raise Exception(f"API调用失败,状态码: {response.status_code}, 错误: {response.text}")
+            except requests.Timeout:
+                if attempt < self.max_retries - 1:
+                    import time
+                    time.sleep(1 * (attempt + 1))
+                    continue
+                raise Exception(f"API调用超时(超过{self.timeout}秒)")
+            except Exception as e:
+                if attempt < self.max_retries - 1:
+                    import time
+                    time.sleep(1 * (attempt + 1))
+                    continue
+                raise
+        
+        # Defensive: only reachable when max_retries <= 0.
+        raise Exception("API调用失败,已达到最大重试次数")
+
+    async def _process_single_request(self, session: aiohttp.ClientSession, messages: List[Dict[str, str]]) -> Optional[Dict[str, Any]]:
+        """
+        Handle one request, swallowing failures into a None result.
+
+        Args:
+            session: shared aiohttp session.
+            messages: OpenAI-style chat messages.
+
+        Returns:
+            Parsed JSON dict, ``{"raw_content": ...}`` when the reply is not
+            JSON, or None on failure / empty response.
+        """
+        try:
+            response = await self._call_api_async(session, messages)
+            
+            # Pull the assistant message out of the first choice.
+            if "choices" in response and len(response["choices"]) > 0:
+                content = response["choices"][0].get("message", {}).get("content", "")
+                
+                # Try to parse JSON, unwrapping a markdown code fence first.
+                # NOTE(review): this fence-stripping logic is duplicated in
+                # batch_call_async and _batch_call_sync_fallback -- extract a helper.
+                try:
+                    if "```json" in content:
+                        start = content.find("```json") + 7
+                        end = content.find("```", start)
+                        content = content[start:end].strip()
+                    elif "```" in content:
+                        start = content.find("```") + 3
+                        end = content.find("```", start)
+                        content = content[start:end].strip()
+                    
+                    return json.loads(content)
+                except json.JSONDecodeError:
+                    # Not JSON: hand back the raw text.
+                    return {"raw_content": content}
+            else:
+                return None
+        except Exception as e:
+            print(f"  LLM API调用错误: {e}")
+            return None
+
+    async def batch_call_async(self, requests: List[List[Dict[str, str]]]) -> List[Optional[Dict[str, Any]]]:
+        """
+        Issue a batch of LLM API calls concurrently.
+
+        Args:
+            requests: list of requests, each a list of chat messages.
+                NOTE(review): this parameter shadows the `requests` module.
+
+        Returns:
+            Results in the same order as the input; None for failed entries.
+        """
+        if not HAS_AIOHTTP:
+            # No aiohttp: degrade to sequential synchronous calls.
+            if HAS_REQUESTS:
+                print("  警告: 未安装aiohttp,在异步环境中使用同步调用(性能较差)")
+                results = []
+                for req in requests:
+                    try:
+                        response = self._call_api_sync(req)
+                        if "choices" in response and len(response["choices"]) > 0:
+                            content = response["choices"][0].get("message", {}).get("content", "")
+                            try:
+                                if "```json" in content:
+                                    start = content.find("```json") + 7
+                                    end = content.find("```", start)
+                                    content = content[start:end].strip()
+                                elif "```" in content:
+                                    start = content.find("```") + 3
+                                    end = content.find("```", start)
+                                    content = content[start:end].strip()
+                                results.append(json.loads(content))
+                            except json.JSONDecodeError:
+                                results.append({"raw_content": content})
+                        else:
+                            results.append(None)
+                    except Exception as e:
+                        print(f"  LLM API调用错误: {e}")
+                        results.append(None)
+                return results
+            else:
+                raise ImportError("需要安装 aiohttp 或 requests 库才能使用LLM API客户端")
+        
+        # Cap in-flight requests at concurrent_workers via a semaphore.
+        semaphore = asyncio.Semaphore(self.concurrent_workers)
+        
+        async def bounded_request(session, messages):
+            async with semaphore:
+                return await self._process_single_request(session, messages)
+        
+        async with aiohttp.ClientSession() as session:
+            tasks = [bounded_request(session, req) for req in requests]
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            
+            # Map any raised exceptions to None so the output stays aligned
+            # with the input order.
+            processed_results = []
+            for result in results:
+                if isinstance(result, Exception):
+                    print(f"  LLM API调用异常: {result}")
+                    processed_results.append(None)
+                else:
+                    processed_results.append(result)
+            
+            return processed_results
+
+    def batch_call(self, requests: List[List[Dict[str, str]]]) -> List[Optional[Dict[str, Any]]]:
+        """
+        Synchronous wrapper around :meth:`batch_call_async` (compat interface).
+
+        Args:
+            requests: list of requests, each a list of chat messages.
+
+        Returns:
+            Results in the same order as the input; None for failed entries.
+
+        Note:
+            Relies on the global event loop set up by ``workflow_manager.py``
+            and does not create its own loop.
+        """
+        if HAS_AIOHTTP:
+            # Use the async implementation via the pre-existing global loop.
+            # (An earlier revision created/patched loops with nest_asyncio;
+            # that was removed in favour of workflow_manager's loop.)
+            # NOTE(review): asyncio.get_event_loop is deprecated for this use
+            # since 3.10, and run_until_complete raises RuntimeError when the
+            # loop is already running -- which silently triggers the sync
+            # fallback below.
+            try:
+                # Fetch the global event loop configured by workflow_manager.py.
+                loop = asyncio.get_event_loop()
+                # Use the global loop directly without any (re-)initialisation.
+                return loop.run_until_complete(self.batch_call_async(requests))
+            except RuntimeError:
+                # No usable loop yet: degrade to sequential synchronous calls.
+                return self._batch_call_sync_fallback(requests)
+        else:
+            return self._batch_call_sync_fallback(requests)
+
+    def _batch_call_sync_fallback(self, requests: List[List[Dict[str, str]]]) -> List[Optional[Dict[str, Any]]]:
+        """
+        Sequential synchronous fallback for :meth:`batch_call`.
+        """
+        if not HAS_REQUESTS:
+            raise ImportError("需要安装 requests 库才能使用同步调用模式")
+        
+        results = []
+        for req in requests:
+            try:
+                response = self._call_api_sync(req)
+                if "choices" in response and len(response["choices"]) > 0:
+                    content = response["choices"][0].get("message", {}).get("content", "")
+                    try:
+                        if "```json" in content:
+                            start = content.find("```json") + 7
+                            end = content.find("```", start)
+                            content = content[start:end].strip()
+                        elif "```" in content:
+                            start = content.find("```") + 3
+                            end = content.find("```", start)
+                            content = content[start:end].strip()
+                        results.append(json.loads(content))
+                    except json.JSONDecodeError:
+                        results.append({"raw_content": content})
+                else:
+                    results.append(None)
+            except Exception as e:
+                print(f"  LLM API调用错误: {e}")
+                results.append(None)
+        return results
+
+

+ 80 - 0
core/construction_review/component/doc_worker/utils/prompt_loader.py

@@ -0,0 +1,80 @@
+"""
+Prompt template loader.
+
+Loads prompt templates from prompt.yaml.
+"""
+
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Dict, Any
+import yaml
+
+
+class PromptLoader:
+    """Loader for the prompt templates stored in ``config/prompt.yaml``."""
+
+    def __init__(self, prompt_file: Path | None = None):
+        """
+        Initialise the loader and eagerly read the template file.
+
+        Args:
+            prompt_file: path to the prompt YAML file; defaults to
+                ``<package>/config/prompt.yaml`` when None.
+        """
+        if prompt_file is None:
+            prompt_file = Path(__file__).parent.parent / "config" / "prompt.yaml"
+        self._prompt_file = Path(prompt_file)
+        self._prompts: Dict[str, Any] = {}
+        self._load()
+
+    def _load(self):
+        """Load and parse the prompt YAML file.
+
+        Raises:
+            FileNotFoundError: when the configured file does not exist.
+        """
+        if not self._prompt_file.exists():
+            raise FileNotFoundError(f"提示词文件不存在: {self._prompt_file}")
+        with self._prompt_file.open("r", encoding="utf-8") as f:
+            self._prompts = yaml.safe_load(f) or {}
+
+    def get_template(self, template_name: str) -> Dict[str, str]:
+        """
+        Fetch a raw prompt template.
+
+        Args:
+            template_name: template key (e.g. "toc_classification").
+
+        Returns:
+            Dict with ``system`` and ``user_template`` strings; empty strings
+            when the template or its fields are missing.
+        """
+        template_config = self._prompts.get(template_name, {})
+        return {
+            "system": template_config.get("system", ""),
+            "user_template": template_config.get("user_template", "")
+        }
+
+    def render(self, template_name: str, **kwargs) -> Dict[str, str]:
+        """
+        Render a prompt template with the given variables.
+
+        Args:
+            template_name: template key.
+            **kwargs: values substituted into ``{{ variable }}`` placeholders.
+
+        Returns:
+            Dict with the ``system`` prompt and the rendered ``user`` message.
+        """
+        template = self.get_template(template_name)
+        
+        # Render the user-message template.
+        user_content = template["user_template"]
+        
+        # Substitute {{ variable }} placeholders (word characters only).
+        # Unknown variables are left verbatim rather than raising.
+        def replace_var(match):
+            var_name = match.group(1).strip()
+            return str(kwargs.get(var_name, match.group(0)))
+        
+        user_content = re.sub(r"\{\{\s*(\w+)\s*\}\}", replace_var, user_content)
+        
+        return {
+            "system": template["system"],
+            "user": user_content
+        }
+

+ 321 - 0
core/construction_review/component/doc_worker/utils/text_split_support.py

@@ -10,6 +10,8 @@ from __future__ import annotations
 from dataclasses import dataclass
 from typing import Any, Dict, List
 
+import re
+
 
 @dataclass
 class ChunkMetaBuilder:
@@ -108,9 +110,328 @@ class SimpleChunkSplitter:
         return len(text)
 
 
+class HierarchicalChunkMixin:
+    """
+    分级目录切分的通用工具 Mixin。
+
+    把原先 `PdfTextSplitter` / `DocxTextSplitter` 中完全相同的
+    chunk 元数据构造、层级路径、编号提取等方法抽到这里,
+    便于多种 worker 复用。
+    """
+
+    def _split_large_chunk(
+        self,
+        content: str,
+        max_chunk_size: int,
+        title: str,
+        hierarchy_path: List[str] | None = None,
+    ) -> List[Dict[str, Any]]:
+        """
+        将超大块按句子级分割(保持语义完整)。
+        """
+        sentences = re.split(r"([。!?\n])", content)
+
+        combined_sentences = []
+        for i in range(0, len(sentences) - 1, 2):
+            if i + 1 < len(sentences):
+                combined_sentences.append(sentences[i] + sentences[i + 1])
+            else:
+                combined_sentences.append(sentences[i])
+
+        if not combined_sentences:
+            combined_sentences = [content]
+
+        chunks: List[Dict[str, Any]] = []
+        current_chunk = ""
+        current_start = 0
+
+        for sentence in combined_sentences:
+            if len(current_chunk) + len(sentence) <= max_chunk_size:
+                current_chunk += sentence
+            else:
+                if current_chunk:
+                    chunk_data: Dict[str, Any] = {
+                        "content": current_chunk,
+                        "relative_start": current_start,
+                        "is_split": True,
+                    }
+                    if hierarchy_path is not None:
+                        chunk_data["hierarchy_path"] = hierarchy_path
+                    chunks.append(chunk_data)
+                    current_start += len(current_chunk)
+                current_chunk = sentence
+
+        if current_chunk:
+            chunk_data = {
+                "content": current_chunk,
+                "relative_start": current_start,
+                "is_split": True,
+            }
+            if hierarchy_path is not None:
+                chunk_data["hierarchy_path"] = hierarchy_path
+            chunks.append(chunk_data)
+
+        return chunks
+
+    def _build_hierarchy_path_for_subtitle(
+        self,
+        sub_title_item: Dict[str, Any],
+        all_toc_items: List[Dict[str, Any]],
+        parent_title_info: Dict[str, Any],
+    ) -> List[str]:
+        """为子标题构建完整的层级路径。"""
+        hierarchy_path: List[str] = []
+
+        sub_title = sub_title_item.get("title", "")
+        sub_title_idx = -1
+        for idx, item in enumerate(all_toc_items):
+            if item.get("title", "") == sub_title:
+                sub_title_idx = idx
+                break
+
+        if sub_title_idx < 0:
+            return [parent_title_info["title"], sub_title]
+
+        level_paths: Dict[int, str] = {}
+        current_level = sub_title_item.get("level", 2)
+
+        for i in range(sub_title_idx, -1, -1):
+            item = all_toc_items[i]
+            item_level = item.get("level", 1)
+
+            if item_level <= current_level and item_level not in level_paths:
+                level_paths[item_level] = item["title"]
+                if item_level == 1:
+                    break
+
+        for level in range(1, current_level + 1):
+            if level in level_paths:
+                hierarchy_path.append(level_paths[level])
+
+        if not hierarchy_path:
+            hierarchy_path = [parent_title_info["title"], sub_title]
+
+        return hierarchy_path
+
+    def _build_hierarchy_path(
+        self, title: str, all_toc_items: List[Dict[str, Any]], target_level: int
+    ) -> List[str]:
+        """构建从1级到当前标题的完整层级路径。"""
+        hierarchy_path: List[str] = []
+
+        current_item: Dict[str, Any] | None = None
+        current_idx = -1
+        for idx, item in enumerate(all_toc_items):
+            if item["title"] == title:
+                current_item = item
+                current_idx = idx
+                break
+
+        if not current_item:
+            return [title]
+
+        current_level = current_item.get("level", target_level)
+        level_paths: Dict[int, str] = {}
+
+        for i in range(current_idx, -1, -1):
+            item = all_toc_items[i]
+            item_level = item.get("level", 1)
+
+            if item_level <= current_level and item_level not in level_paths:
+                level_paths[item_level] = item["title"]
+                if item_level == 1:
+                    break
+
+        for level in range(1, current_level + 1):
+            if level in level_paths:
+                hierarchy_path.append(level_paths[level])
+            elif level == current_level:
+                hierarchy_path.append(title)
+
+        if not hierarchy_path:
+            hierarchy_path = [title]
+
+        return hierarchy_path
+
+    def _build_chunk_metadata(
+        self,
+        sub_chunk: Dict[str, Any],
+        title_info: Dict[str, Any],
+        start_pos: int,
+        pages_content: List[Dict[str, Any]],
+        i: int,
+        j: int,
+        chapter_classification_map: Dict[str, Dict[str, Any]] | None = None,
+    ) -> Dict[str, Any]:
+        """构建文本块的元数据。"""
+        content = sub_chunk["content"]
+        chunk_start_pos = start_pos + sub_chunk["relative_start"]
+        page_num = self._get_page_from_pos(chunk_start_pos, pages_content)
+
+        hierarchy_path = sub_chunk.get("hierarchy_path", [])
+        sub_title = sub_chunk.get("sub_title", "")
+
+        if hierarchy_path:
+            section_label = "->".join(hierarchy_path)
+        elif sub_title:
+            section_label = f"{title_info['title']}->{sub_title}"
+        else:
+            section_label = title_info["title"]
+
+        if hierarchy_path:
+            lowest_title = hierarchy_path[-1]
+            title_number = self._extract_title_number(lowest_title)
+        elif sub_title:
+            title_number = self._extract_title_number(sub_title)
+        else:
+            title_number = self._extract_title_number(title_info["title"])
+
+        chunk_id_str = f"doc_chunk_{title_number}_{j}" if title_number else f"doc_chunk_{j}"
+
+        chapter_classification = None
+        if chapter_classification_map:
+            if hierarchy_path and len(hierarchy_path) > 0:
+                chapter_title = hierarchy_path[0]
+                chapter_classification = chapter_classification_map.get(chapter_title)
+            elif not hierarchy_path:
+                chapter_title = title_info.get("title", "")
+                chapter_classification = chapter_classification_map.get(chapter_title)
+
+        chunk_data: Dict[str, Any] = {
+            "file_name": "",
+            "chunk_id": chunk_id_str,
+            "section_label": section_label,
+            "project_plan_type": title_info.get("category_code", "other"),
+            "chapter_classification": title_info.get("category_code", "other"),
+            "element_tag": {
+                "chunk_id": chunk_id_str,
+                "page": page_num,
+                "serial_number": title_number if title_number else str(i + 1),
+            },
+            "review_chunk_content": content,
+            "_title_number": title_number,
+            "_local_index": j,
+            "_sort_key": chunk_start_pos,
+        }
+
+        # if chapter_classification:
+        #     chunk_data["chapter_classification"] = chapter_classification
+
+        return chunk_data
+
+    def _finalize_chunk_ids(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Assign the final chunk_id and serial_number to every chunk.
+
+        Chunks that share the same section_label receive consecutive 1-based
+        local indices, producing ids like ``doc_chunk_<number_path>_<n>`` (or
+        ``doc_chunk_<n>`` when no title number can be extracted). Only the
+        public keys are copied to the output; internal bookkeeping keys such
+        as ``_title_number``/``_local_index``/``_sort_key`` are dropped here.
+        """
+        final_chunks: List[Dict[str, Any]] = []
+        # Running occurrence count per section_label (1-based local index).
+        section_groups: Dict[str, int] = {}
+
+        for chunk in chunks:
+            section_label = chunk.get("section_label", "")
+
+            if section_label not in section_groups:
+                section_groups[section_label] = 1
+            else:
+                section_groups[section_label] += 1
+
+            local_index = section_groups[section_label]
+            title_number_path = self._extract_title_number_path(section_label)
+
+            if title_number_path:
+                chunk_id_str = f"doc_chunk_{title_number_path}_{local_index}"
+            else:
+                chunk_id_str = f"doc_chunk_{local_index}"
+
+            serial_number = self._extract_number_from_section_label(section_label)
+
+            # Rebuild the chunk with only the externally-consumed fields.
+            final_chunk = {
+                "file_name": chunk["file_name"],
+                "chunk_id": chunk_id_str,
+                "section_label": chunk["section_label"],
+                "project_plan_type": chunk["project_plan_type"],
+                "chapter_classification": chunk["chapter_classification"],
+                "element_tag": {
+                    "chunk_id": chunk_id_str,
+                    "page": chunk["element_tag"]["page"],
+                    "serial_number": serial_number,
+                },
+                "review_chunk_content": chunk["review_chunk_content"],
+            }
+
+            final_chunks.append(final_chunk)
+
+        return final_chunks
+
+    def _get_page_from_pos(self, pos: int, pages_content: List[Dict[str, Any]]) -> int:
+        """Return the page number containing character position ``pos``.
+
+        Each entry of ``pages_content`` carries half-open character bounds
+        [start_pos, end_pos). Falls back to page 1 when no page matches.
+        """
+        for page in pages_content:
+            if page["start_pos"] <= pos < page["end_pos"]:
+                return int(page["page_num"])
+        return 1
+
+    def _extract_title_number(self, title: str) -> str:
+        """Extract the leading numbering part of a title.
+
+        Tries, in order: chapter markers (第X章/节/条/款/部分), 【n】 and
+        〖n.n〗 brackets, dotted arabic numbers (1.2.3), Chinese numerals
+        followed by a separator (一、), and parenthesised numbers ((1)/(一)).
+        Returns "" when no pattern matches.
+
+        NOTE(review): every pattern is evaluated twice (once in the ``if``
+        and once for ``.group(1)``) — a walrus assignment would halve the
+        regex work; left as-is to keep the diff byte-identical.
+        """
+        if not title:
+            return ""
+
+        if re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title):
+            return re.match(r"^(第[一二三四五六七八九十\d]+[章节条款部分])", title).group(1)
+
+        if re.match(r"^(【\d+】)", title):
+            return re.match(r"^(【\d+】)", title).group(1)
+
+        if re.match(r"^(〖\d+(?:\.\d+)*〗)", title):
+            return re.match(r"^(〖\d+(?:\.\d+)*〗)", title).group(1)
+
+        if re.match(r"^(\d+(?:\.\d+)*)", title):
+            return re.match(r"^(\d+(?:\.\d+)*)", title).group(1)
+
+        if re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title):
+            return re.match(r"^([一二三四五六七八九十]+)[、..)\)]", title).group(1)
+
+        if re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title):
+            return re.match(r"^([\((][一二三四五六七八九十\d]+[\))])", title).group(1)
+
+        return ""
+
+    def _extract_title_number_path(self, section_label: str) -> str:
+        """Build a "->"-joined numbering path from a section_label.
+
+        Splits the label on "->", extracts each level's title number via
+        :meth:`_extract_title_number`, and joins the non-empty results.
+        Levels without a recognisable number are silently skipped; returns
+        "" when no level yields a number.
+        """
+        if not section_label:
+            return ""
+
+        parts = section_label.split("->")
+        number_paths: List[str] = []
+
+        for part in parts:
+            part = part.strip()
+            if part:
+                number = self._extract_title_number(part)
+                if number:
+                    number_paths.append(number)
+
+        if number_paths:
+            return "->".join(number_paths)
 
+        return ""
 
+    def _extract_number_from_section_label(self, section_label: str) -> str:
+        """Extract the numbering of the deepest level of a section_label.
+
+        Takes the last "->" segment; if that segment is a merged title
+        (" + "-joined), the per-part numbers are joined with "+".
+        Otherwise falls through to a plain title-number extraction.
+        """
+        if not section_label:
+            return ""
 
+        if "->" in section_label:
+            last_level_part = section_label.split("->")[-1].strip()
+        else:
+            last_level_part = section_label.strip()
 
+        if " + " in last_level_part:
+            merged_parts = last_level_part.split(" + ")
+            numbers: List[str] = []
+            for part in merged_parts:
+                part = part.strip()
+                number = self._extract_title_number(part)
+                if number:
+                    numbers.append(number)
 
+            if numbers:
+                return "+".join(numbers)
 
+        return self._extract_title_number(last_level_part)

+ 300 - 4
core/construction_review/component/doc_worker/utils/title_matcher.py

@@ -125,10 +125,119 @@ class TitleMatcher:
         """
         在文本中查找标题的近似位置(返回标题在文本中的精确起始位置)。
         
-        优化逻辑(参考 doc_worker):
-        1. 使用清理后的文本进行精确匹配
-        2. 移除所有空格后进行匹配
-        3. 行级模糊匹配作为最后手段
+        智能匹配策略:
+        1. 先用标题正文部分定位(可能有多个位置)
+        2. 再用编号部分确认
+        3. 如果编号确认不了,就使用定位到的元素所在行只有标题部分,没有其他字符(转义字符除外)的那个
+        4. 否则就直接确认第一个匹配位置
+        """
+        # 提取标题的编号部分和正文部分
+        title_number = self._extract_title_number(title)
+        title_content = self._extract_title_content(title)
+        
+        if not title_content:
+            # 如果没有正文部分,使用原来的逻辑
+            return self._find_title_in_text_legacy(title, text, fuzzy_threshold)
+        
+        # 移除转义字符后的文本
+        text_clean = self._remove_escape_chars(text)
+        title_content_clean = self._remove_escape_chars(title_content)
+        title_content_normalized = self._normalize_title(title_content_clean)
+        
+        if not title_content_normalized:
+            return -1
+        
+        # 查找所有匹配标题正文部分的位置
+        candidate_positions = []
+        
+        # 方法1: 按行查找(更高效)
+        lines = text.split('\n')
+        current_pos = 0
+        
+        for line in lines:
+            line_clean = self._remove_escape_chars(line)
+            line_normalized = self._normalize_title(line_clean)
+            
+            # 检查行中是否包含标题正文
+            if title_content_normalized in line_normalized:
+                # 找到标题在行中的位置
+                pos_in_line = line_normalized.find(title_content_normalized)
+                if pos_in_line >= 0:
+                    # 映射回原始行的位置
+                    line_pos = self._find_pattern_in_line(
+                        title_content_normalized, line, pos_in_line
+                    )
+                    if line_pos >= 0:
+                        candidate_positions.append(current_pos + line_pos)
+            
+            # 方法2: 移除空格后查找
+            title_no_space = title_content_normalized.replace(' ', '')
+            line_no_space = line_normalized.replace(' ', '')
+            if title_no_space and title_no_space in line_no_space:
+                pos_in_line = line_no_space.find(title_no_space)
+                if pos_in_line >= 0:
+                    line_pos = self._find_pattern_in_line(
+                        title_no_space, line, pos_in_line
+                    )
+                    if line_pos >= 0:
+                        pos = current_pos + line_pos
+                        if pos not in candidate_positions:
+                            candidate_positions.append(pos)
+            
+            current_pos += len(line) + 1  # +1 for newline
+        
+        if not candidate_positions:
+            # 如果没有找到任何位置,使用模糊匹配
+            return self._find_title_in_text_legacy(title, text, fuzzy_threshold)
+        
+        # 去重并排序
+        candidate_positions = sorted(set(candidate_positions))
+        
+        # 如果有编号部分,尝试用编号确认
+        if title_number:
+            for pos in candidate_positions:
+                # 检查该位置前后的文本是否包含编号
+                check_range = 50  # 检查前后50个字符
+                start_check = max(0, pos - check_range)
+                end_check = min(len(text), pos + check_range)
+                context = text[start_check:end_check]
+                
+                # 在上下文中查找编号
+                if self._check_number_in_context(title_number, context, pos - start_check):
+                    return pos
+        
+        # 如果编号确认不了,检查每个位置所在的行是否只有标题(没有其他字符)
+        best_pos = -1
+        best_score = -1
+        
+        for pos in candidate_positions:
+            # 找到该位置所在的行
+            line_start = text.rfind('\n', 0, pos) + 1
+            line_end = text.find('\n', pos)
+            if line_end == -1:
+                line_end = len(text)
+            
+            line_text = text[line_start:line_end]
+            line_clean = self._remove_escape_chars(line_text).strip()
+            
+            # 检查该行是否只包含标题(允许前后有少量空白和标点)
+            if self._is_line_only_title(line_clean, title_content_normalized):
+                # 计算匹配度(行越短、越接近标题,分数越高)
+                score = 1000 - len(line_clean)
+                if score > best_score:
+                    best_score = score
+                    best_pos = pos
+        
+        # 如果找到了只包含标题的行,返回该位置
+        if best_pos >= 0:
+            return best_pos
+        
+        # 否则返回第一个匹配位置
+        return candidate_positions[0]
+
+    def _find_title_in_text_legacy(self, title: str, text: str, fuzzy_threshold: float) -> int:
+        """
+        原有的标题查找逻辑(作为回退方案)
         """
         # 移除转义字符后的标题和文本
         title_clean = self._remove_escape_chars(title)
@@ -312,6 +421,46 @@ class TitleMatcher:
         
         return -1
     
+    def _find_pattern_in_line(self, pattern: str, line: str, pattern_pos_in_normalized: int) -> int:
+        """
+        Locate a (normalized) pattern inside the original, un-normalized line.
+
+        Args:
+            pattern: Pattern to find (already normalized).
+            line: Original line text.
+            pattern_pos_in_normalized: Position of the pattern within the
+                normalized form of the line.
+
+        Returns:
+            int: Position of the pattern in the original line, or -1.
+        """
+        # Fast path: the pattern appears verbatim in the raw line.
+        if pattern in line:
+            return line.index(pattern)
+        
+        # 使用标准化后的行来映射位置
+        line_clean = self._remove_escape_chars(line)
+        line_normalized = self._normalize_title(line_clean)
+        
+        if pattern_pos_in_normalized >= len(line_normalized):
+            return -1
+        
+        # Walk the original line, counting how many normalized characters each
+        # original character contributes, until the normalized target offset
+        # is reached.
+        # NOTE(review): this alignment is heuristic — it assumes each original
+        # char normalizes independently and does not re-verify that the
+        # pattern actually starts at the returned offset; confirm on samples.
+        clean_chars = 0
+        original_chars = 0
+        
+        for orig_char in line:
+            if clean_chars >= pattern_pos_in_normalized:
+                break
+            
+            orig_char_clean = self._remove_escape_chars(orig_char)
+            if orig_char_clean:
+                orig_char_normalized = self._normalize_title(orig_char_clean)
+                if orig_char_normalized:
+                    clean_chars += len(orig_char_normalized)
+            original_chars += 1
+        
+        return original_chars if original_chars < len(line) else -1
+
     def _find_pattern_in_original_window(self, pattern_clean: str, original_window: str, window_start_pos: int) -> int:
         """
         在原始窗口中找到清理后模式对应的位置。
@@ -345,6 +494,153 @@ class TitleMatcher:
                 return int(page["page_num"])
         return 1
 
+    def _extract_title_number(self, title: str) -> str:
+        """
+        Extract the numbering prefix from a title.
+
+        Examples:
+        "第一章 编制依据" -> "第一章"
+        "一、工程概况" -> "一"
+        "1. 施工计划" -> "1"
+        """
+        if not title:
+            return ""
+        
+        # Chapter-style markers (第一章, 第1节, ...).
+        chapter_match = re.match(r'^(第[一二三四五六七八九十\d]+[章节条款部分])', title)
+        if chapter_match:
+            return chapter_match.group(1)
+        
+        # Lenticular-bracket numbers (【1】, 【2】, ...).
+        bracket_match = re.match(r'^(【\d+】)', title)
+        if bracket_match:
+            return bracket_match.group(1)
+        
+        # White-lenticular-bracket dotted numbers (〖1.1〗, 〖2.3〗, ...).
+        double_bracket_match = re.match(r'^(〖\d+(?:\.\d+)*〗)', title)
+        if double_bracket_match:
+            return double_bracket_match.group(1)
+        
+        # Dotted arabic numbers (1.5, 1.2.3, ...), optionally followed by a separator.
+        number_match = re.match(r'^(\d+(?:\.\d+)*)[\s、..]?', title)
+        if number_match:
+            return number_match.group(1)
+        
+        # Chinese numerals followed by a separator (一、 二. ...).
+        chinese_match = re.match(r'^([一二三四五六七八九十]+)[、..]', title)
+        if chinese_match:
+            return chinese_match.group(1)
+        
+        # Parenthesised numbers ((1), (一), ...), full- or half-width.
+        paren_match = re.match(r'^([\((][一二三四五六七八九十\d]+[\))])', title)
+        if paren_match:
+            return paren_match.group(1)
+        
+        return ""
+
+    def _extract_title_content(self, title: str) -> str:
+        """
+        Extract the text part of a title (numbering removed).
+
+        Examples:
+        "第一章 编制依据" -> "编制依据"
+        "一、工程概况" -> "工程概况"
+        "1. 施工计划" -> "施工计划"
+        """
+        if not title:
+            return title
+        
+        # Locate the numbering prefix, if any.
+        number = self._extract_title_number(title)
+        if number:
+            # Strip the numbering prefix.
+            content = title[len(number):].strip()
+            # Also strip any separator punctuation left behind (、 . space).
+            content = re.sub(r'^[、..\s]+', '', content)
+            return content
+        
+        # No numbering found: the whole title is the content.
+        return title
+
+    def _check_number_in_context(self, number: str, context: str, title_pos_in_context: int) -> bool:
+        """
+        Check whether the title's numbering appears near the title position.
+
+        Args:
+            number: Numbering string (e.g. "第一章", "1.2").
+            context: Surrounding text window.
+            title_pos_in_context: Title offset within ``context``.
+
+        Returns:
+            bool: True if the numbering is found near the title.
+        """
+        if not number:
+            return False
+        
+        # Look a little before the title (numbering usually precedes it) and
+        # a wider span after it.
+        check_before = max(0, title_pos_in_context - len(number) - 10)
+        check_after = min(len(context), title_pos_in_context + 100)
+        
+        context_around = context[check_before:check_after]
+        
+        # Clean both sides before matching.
+        context_clean = self._remove_escape_chars(context_around)
+        number_clean = self._remove_escape_chars(number)
+        
+        # Direct containment check.
+        if number_clean in context_clean:
+            return True
+        
+        # Retry with all spaces removed (handles spaced-out numbering).
+        context_no_space = context_clean.replace(' ', '')
+        number_no_space = number_clean.replace(' ', '')
+        if number_no_space and number_no_space in context_no_space:
+            return True
+        
+        return False
+
+    def _is_line_only_title(self, line_clean: str, title_content: str) -> bool:
+        """
+        Check whether a line contains only the title (escape chars aside).
+
+        Args:
+            line_clean: Line text with escape characters removed.
+            title_content: Text part of the title (numbering removed).
+
+        Returns:
+            bool: True if the line consists of the title only, possibly
+            followed by trailing punctuation/whitespace.
+        """
+        if not line_clean or not title_content:
+            return False
+        
+        # Normalize both sides for comparison.
+        line_normalized = self._normalize_title(line_clean)
+        title_normalized = self._normalize_title(title_content)
+        
+        # Exact match.
+        if line_normalized == title_normalized:
+            return True
+        
+        # Line starts with the title and only punctuation/whitespace follows.
+        if line_normalized.startswith(title_normalized):
+            remaining = line_normalized[len(title_normalized):].strip()
+            # Accept trailing CJK punctuation or whitespace only.
+            if not remaining or re.match(r'^[,。、;:!?\s]*$', remaining):
+                return True
+        
+        # Retry the same two checks with spaces removed.
+        line_no_space = line_normalized.replace(' ', '')
+        title_no_space = title_normalized.replace(' ', '')
+        if line_no_space == title_no_space:
+            return True
+        
+        if line_no_space.startswith(title_no_space):
+            remaining = line_no_space[len(title_no_space):]
+            if not remaining or re.match(r'^[,。、;:!?]*$', remaining):
+                return True
+        
+        return False
+
 
 
 

+ 5 - 0
core/construction_review/component/doc_worker/命令

@@ -4,3 +4,8 @@ python -m file_parse.docx_worker.cli ".\路桥\47_四川川交路桥有限责任
 
 
 python -m file_parse.pdf_worker.cli "Z:\施工方案及编制依据案例库(第一阶段)1205\施工方案文档列表\44_四川公路桥梁建设集团有限公司镇巴(川陕界)至广安高速公路通广段C合同段C4项目经理部.pdf" -l 1 --max-size 3000 --min-size 50 -o ./output
+
+
+            user_code = first_account.get('userCode')  # NOTE(review): looks accidentally pasted into this notes file — confirm and remove if unintended
+
+

+ 13 - 12
core/construction_review/component/document_processor.py

@@ -11,12 +11,13 @@ import tempfile
 from pathlib import Path
 from typing import Dict, Any, Optional, Callable
 from datetime import datetime
+import asyncio
 
 from foundation.observability.logger.loggering import server_logger as logger
 
 # 引入doc_worker核心组件
 try:
-    from .doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
+    from .doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter
     from .doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
     from .doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
     from .doc_worker.pdf_worker.text_splitter import PdfTextSplitter
@@ -27,7 +28,7 @@ try:
     from .doc_worker.classification.hierarchy_classifier import HierarchyClassifier as DocxHierarchyClassifier
     from .doc_worker.config.provider import default_config_provider
 except ImportError:
-    from core.construction_review.component.doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
+    from core.construction_review.component.doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter
     from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
     from core.construction_review.component.doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
     from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
@@ -148,7 +149,10 @@ class DocumentProcessor:
 
             # 步骤3: 提取文档全文
             logger.info("步骤3: 提取文档全文")
-            pages_content = self.pdf_fulltext_extractor.extract_full_text(source)
+            # 将同步CPU/IO密集操作放入线程池,避免阻塞事件循环
+            pages_content = await asyncio.to_thread(
+                self.pdf_fulltext_extractor.extract_full_text, source
+            )
             
             if not pages_content:
                 logger.warning("无法提取文档全文,使用基础处理模式")
@@ -285,7 +289,10 @@ class DocumentProcessor:
 
             # 步骤3: 提取文档全文
             logger.info("步骤3: 提取文档全文")
-            pages_content = self.docx_fulltext_extractor.extract_full_text(source)
+            # 将同步CPU/IO密集操作放入线程池,避免阻塞事件循环
+            pages_content = await asyncio.to_thread(
+                self.docx_fulltext_extractor.extract_full_text, source
+            )
             
             if not pages_content:
                 logger.warning("无法提取文档全文,使用基础处理模式")
@@ -534,16 +541,10 @@ class DocumentProcessor:
             
             # 如果使用了智能处理,保留额外信息
             if is_smart_processing:
-                # if 'toc_info' in raw_content:
-                #     result['toc_info'] = raw_content['toc_info']
-                # if 'classification' in raw_content:
-                #     result['classification'] = raw_content['classification']
-
-                # 处理原始大纲,按章节层级结构化 - 复用doc_worker的逻辑
                 result['outline'] = self._create_outline_from_toc(raw_content.get('toc_info', {}))
 
-            # with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
-            #     json.dump(result, f, ensure_ascii=False, indent=4)
+            with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
+                json.dump(result, f, ensure_ascii=False, indent=4)
             return result
 
         except Exception as e:

+ 1 - 1
core/construction_review/component/reviewers/outline_reviewer.py

@@ -277,7 +277,7 @@ class OutlineReviewer:
         logger.info(f"开始次级大纲并发审查,有效项目数量: {len(valid_items)}")
 
         # 创建并发审查任务
-        semaphore = asyncio.Semaphore(5)  # 限制并发数为5,避免过载
+        semaphore = asyncio.Semaphore(20)  # 限制并发数为20,避免过载
         tasks = []
 
         for i, outline_item in valid_items:

+ 124 - 13
core/construction_review/workflows/ai_review_workflow.py

@@ -154,12 +154,13 @@ class AIReviewWorkflow:
         workflow.set_entry_point("start")
         workflow.add_edge("start", "initialize_progress")
         workflow.add_edge("initialize_progress", "ai_review")
-        workflow.add_edge("ai_review", "save_results")
+        # 删除默认边,由条件边控制路由
+        # workflow.add_edge("ai_review", "save_results")
         workflow.add_edge("save_results", "complete")
         workflow.add_edge("complete", END)
         workflow.add_edge("error_handler", END)
 
-        # 添加条件边(错误处理)
+        # 添加条件边(错误处理)- 替代默认边
         workflow.add_conditional_edges(
             "ai_review",
             self.inter_tool._check_ai_review_result,
@@ -268,9 +269,23 @@ class AIReviewWorkflow:
         """
         try:
             logger.info(f"AI审查节点开始执行,任务ID: {self.task_info.callback_task_id}")
+            test_designation_chunk_flag = self.task_info.get_test_designation_chunk_flag()
+            logger.info(f"测试定位标志: {test_designation_chunk_flag}")
 
             # 1. 准备审查单元数据
-            review_chunks, total_units, total_all_units = await self.core_fun._prepare_review_units(state)
+            review_chunks, total_units = await self.core_fun._prepare_review_units(state, test_designation_chunk_flag)
+
+            # 检查指定测试章节是否未找到
+            if test_designation_chunk_flag is not None and not review_chunks:
+                error_msg = f"AI审查测试失败:未找到指定审查标志「{test_designation_chunk_flag}」。请修改指定审查标识字段串,建议去除前后符号等(如书名号《》、括号()等),使用更简洁的关键词重新尝试。"
+                logger.error(f"🔴 {error_msg}")
+                return {
+                    "current_stage": "ai_review",
+                    "error_message": error_msg,
+                    "status": "failed",
+                    "messages": [AIMessage(content=error_msg)]
+                }
+
             logger.info(f"准备审查单元完成,总单元数: {total_units}, 实际审查: {len(review_chunks)}")
 
             if not review_chunks:
@@ -318,7 +333,7 @@ class AIReviewWorkflow:
                     # 准备编制依据审查数据
                     prep_basis_review_data = {
                         'content': prep_basis_content,
-                        'max_concurrent': 4
+                        'max_concurrent': 20
                     }
 
                     # 执行编制依据审查
@@ -348,7 +363,7 @@ class AIReviewWorkflow:
                     # 准备编制依据审查数据
                     timeliness_check_data = {
                         'content': prep_basis_content,
-                        'max_concurrent': 4
+                        'max_concurrent': 20
                     }
 
                     # 执行编制依据审查
@@ -534,7 +549,7 @@ class AIReviewWorkflow:
                 stage_name="AI审查",
                 current=50,
                 status="failed",
-                message=f"AI审查失败: {state['error_message']}",
+                message=f"{state['error_message']}",
                 overall_task_status="failed",
                 event_type="error"
             )
@@ -678,7 +693,7 @@ class AIReviewCoreFun:
         
         try:
 
-            semaphore = asyncio.Semaphore(2)  # 并发审查数
+            semaphore = asyncio.Semaphore(3)  # 并发审查数
 
             async def process_unit_and_notify(unit_index, unit_content):
                 """处理单个单元,完成后立即推送通知"""
@@ -729,15 +744,35 @@ class AIReviewCoreFun:
 
 
 
-    async def _prepare_review_units(self, state: AIReviewState) -> tuple:
+    async def _prepare_review_units(self, state: AIReviewState, test_designation_chunk_flag) -> tuple:
         """准备审查单元数据 (增加清理旧进度缓存)"""
         try:
             # 筛选要审查的单元
             all_chunks = state['structured_content']['chunks']
-            review_chunks = self._filter_review_units(all_chunks)
+            # 筛除编制依据章节
+            clearned_chunks = self._remove_basis_chunks(all_chunks)
+
+            # 判断是否需要筛选指定测试章节
+            if test_designation_chunk_flag is not None:
+                # 用户指定了测试章节,进行筛选
+                logger.info(f"开始筛选指定测试章节: {test_designation_chunk_flag}")
+                designation_test_chunk = self._designation_test_chunks(clearned_chunks, test_designation_chunk_flag)
+
+                if not designation_test_chunk:
+                    # 指定了测试章节但未找到,返回空列表
+                    logger.warning(f"未找到包含关键字 '{test_designation_chunk_flag}' 的测试章节,建议去除前后符号(如《》())使用简洁关键词")
+                    review_chunks = []
+                else:
+                    # 找到指定测试章节
+                    logger.info(f"找到 {len(designation_test_chunk)} 个指定测试章节")
+                    review_chunks = designation_test_chunk
+            else:
+                # 未指定测试章节,使用正常筛选流程
+                logger.info(f"未指定测试章节,使用正常筛选流程")
+                review_chunks = self._filter_review_units(clearned_chunks)
 
             total_units = len(review_chunks)
-            total_all_units = len(all_chunks)
+            logger.info(f"最终审查单元数量: {total_units}")
 
             # 【修复 3】: 任务开始前,清理 Redis 中的旧计数,防止进度条计算错误
             try:
@@ -749,13 +784,89 @@ class AIReviewCoreFun:
                     logger.info(f"已清理旧进度缓存: {completed_key}")
             except Exception as e:
                 logger.warning(f"清理进度缓存失败 (不影响主流程): {str(e)}")
-
-            logger.info(f"AI审查开始: 总单元数 {total_all_units}, 实际审查 {total_units} 个单元")
-            return review_chunks, total_units, total_all_units
+            return review_chunks, total_units
         except Exception as e:
             logger.error(f"准备审查单元失败: {str(e)}")
             raise
 
+    def _remove_basis_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """
+        Filter out "preparation basis" (编制依据) chapter chunks.
+
+        Args:
+            chunks: All chapter chunks.
+
+        Returns:
+            List[Dict[str, Any]]: Chunks with basis chapters removed.
+
+        Note:
+            Filters on the ``chapter_classification`` field, dropping chunks
+            whose value is exactly "basis". On unexpected errors the original
+            list is returned unchanged (best-effort behaviour).
+        """
+        try:
+            filtered_chunks = []
+            removed_count = 0
+            logger.info(f"开始筛除编制依据章节")
+            for chunk in chunks:
+                # Classification assigned upstream; missing key counts as kept.
+                chapter_classification = chunk.get('chapter_classification', '')
+                
+                # Keep everything that is not classified as "basis".
+                if chapter_classification != 'basis':
+                    # NOTE(review): info-level log per kept chunk is noisy on
+                    # large documents — consider logger.debug here.
+                    logger.info(f"保留非编制依据章节,当前章节: {chapter_classification}")
+                    filtered_chunks.append(chunk)
+                else:
+                    removed_count += 1
+                    logger.debug(f"筛除编制依据章节: {chunk.get('section_label', '未知章节')}")
+
+            logger.info(f"编制依据章节筛除完成: 共筛除 {removed_count} 个章节, 保留 {len(filtered_chunks)} 个章节")
+
+            return filtered_chunks
+
+        except Exception as e:
+            logger.error(f"筛除编制依据章节失败: {str(e)}")
+            # 出错时返回原始列表
+            return chunks
+    def _designation_test_chunks(self, chunks: List[Dict[str, Any]],test_designation_chunk_flag:str) -> List[Dict[str, Any]]:
+        """筛选设计测试章节
+        
+        Args:
+            chunks: 所有章节chunks列表
+
+        Returns:
+            List[Dict[str, Any]]: 筛选后的chunks列表
+
+        Note:
+            根据 chapter_classification 字段筛选,排除值为 "designation_test" 的章节
+
+        Raises:
+            Exception: 筛选失败
+        
+        """
+        try: 
+            designation_chunks = []
+            filtered_count = 0
+
+            logger.info(f"开始筛选设计测试章节")
+            for chunk in chunks:
+                content = chunk.get('content', '')
+                section_label = chunk.get('section_label', '未知章节')
+                logger.info(f"正在处理章节: {section_label}")
+                if test_designation_chunk_flag in content or test_designation_chunk_flag in section_label:
+                    logger.info(f"已命中指定测试章节: {chunk.get('section_label', '未知章节')}")
+                    designation_chunks.append(chunk)
+                else:
+                    filtered_count += 1
+                    logger.debug(f"跳过章节: {chunk.get('section_label', '未知章节')}")
+                if not designation_chunks:
+                    logger.info(f"未找到指定测试章节,请修改关键字尝试!")
+
+            return designation_chunks
+                     
+        except Exception as e:
+            logger.error(f"筛选设计测试章节失败: {str(e)}")
+            # 出错时返回原始列表
+            return chunks
+
     async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
                                   total_units: int, state: AIReviewState) -> ReviewResult:
         """

Fichier diff supprimé car celui-ci est trop grand
+ 4544 - 100
logs/agent_debug.log.1


Fichier diff supprimé car celui-ci est trop grand
+ 1401 - 134
logs/agent_debug.log.2


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 0
logs/agent_debug.log.3


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 164
logs/agent_debug.log.4


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 4832
logs/agent_debug.log.5


Fichier diff supprimé car celui-ci est trop grand
+ 4544 - 100
logs/agent_info.log.1


Fichier diff supprimé car celui-ci est trop grand
+ 1401 - 134
logs/agent_info.log.2


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 0
logs/agent_info.log.3


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 0
logs/agent_info.log.4


Fichier diff supprimé car celui-ci est trop grand
+ 0 - 28
logs/agent_info.log.5


+ 7 - 0
views/construction_review/launch_review.py

@@ -88,6 +88,11 @@ class LaunchReviewRequest(BaseModel):
         description="工程方案类型: 01_pf_Found_Rotary_Drill(旋挖钻机、冲击钻机成孔桩), 02_pf_Dig_Manual_Pile(人工挖孔桩), 03_bd_Sub_Cyl_Pier(圆柱墩、系梁、盖梁), 04_bd_Sub_Rect_Turn(矩形墩采用翻模工艺、系梁、盖梁), 05_bd_High_Rect_Slide(矩形墩采用爬模工艺、系梁、盖梁), 06_bu_Pre_SS_Beam(简支梁预制、运输及架桥机安装), 07_bu_Erect_Truck_TBeam(汽车式起重机安装T梁), 08_bu_Cast_Col_Support(梁柱式支架), 09_bu_Cast_Full_Support(满堂式支架), 10_bu_Cast_Cant_Trolley(挂篮), 11_se_Elev_Lift_Proj(起重吊装工程), 12_se_Tower_Crane_Proj(起重吊装设备安装), 13_o_Height_Work_Op(高空作业)"
     )
 
+    test_designation_chunk_flag: Optional[str] = Field(  # 标注为可选字符串
+        None,  
+        description="测试定位标志符,用于指定特定审查片段(可选字段)"
+    )
+
     class Config:
         extra = "forbid"  # 禁止额外的字段
 
@@ -189,6 +194,7 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
     review_config = request_data.review_config
     project_plan_type = request_data.project_plan_type
     tendency_review_role = request_data.tendency_review_role
+    test_designation_chunk_flag = request_data.test_designation_chunk_flag
 
     logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}")
 
@@ -274,6 +280,7 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                     'review_config': review_config,
                     'project_plan_type': project_plan_type,
                     'tendency_review_role': tendency_review_role,
+                    'test_designation_chunk_flag': test_designation_chunk_flag,
                     'launched_at': int(time.time()),
                     'callback_task_id': callback_task_id  # 确保使用当前正确的callback_task_id
                 })

Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff