Преглед на файлове

Merge branch 'dev' of http://192.168.0.3:3000/CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

ChenJiSheng преди 1 месец
родител
ревизия
48b21241ac

+ 36 - 20
core/construction_review/component/ai_review_engine.py

@@ -69,6 +69,7 @@ from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
 from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
 from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger as logger
+from core.construction_review.component.reviewers.utils.directory_extraction import BasisItems
 
 from pathlib import Path
 import sys
@@ -1161,23 +1162,30 @@ class AIReviewEngine(BaseReviewer):
             logger.info(f"开始编制依据审查,trace_id: {trace_id}")
 
             # 提取关键数据
-            review_content = review_data.get('content', '')
+            basis_items: BasisItems = review_data.get('basis_items')
+            review_content_text = review_data.get('content', '')
             max_concurrent = review_data.get('max_concurrent', 10)
 
-            # 添加调试信息
-            logger.info(f"提取的编制依据内容长度: {len(review_content)}")
-            if review_content:
-                logger.info(f"编制依据内容预览: {review_content[:50]}...")
+            basis_list = []
+            if basis_items and getattr(basis_items, "items", None):
+                basis_list = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
+                review_content_text = review_content_text or "\n".join(basis_list)
+
+            logger.info(f"提取的编制依据条目数: {len(basis_list)}")
+            if basis_list:
+                logger.info(f"编制依据内容预览: {basis_list[0][:50]}...")
+            elif review_content_text:
+                logger.info(f"编制依据内容预览(文本): {review_content_text[:50]}...")
             else:
                 logger.warning("编制依据内容为空,将跳过审查")
 
             # 检查是否有有效的编制依据内容
-            if not review_content or not review_content.strip():
+            if not basis_list:
                 logger.warning("没有可执行的编制依据审查任务")
                 return {
                     "reference_basis_review_results": {
                         "review_results": [],
-                        "review_content": review_content,
+                        "review_content": review_content_text,
                         "total_basis_items": 0,
                         "valid_items": 0,
                         "standard_items": 0,
@@ -1200,7 +1208,7 @@ class AIReviewEngine(BaseReviewer):
                     from core.construction_review.component.reviewers.reference_basis_reviewer import BasisReviewService
                     async with BasisReviewService(max_concurrent=max_concurrent) as service:
                         reference_basis_review_results = await service.review_all(
-                            review_content,
+                            basis_items,
                             collection_name="first_bfp_collection_status",
                             progress_manager=progress_manager,
                             callback_task_id=callback_task_id
@@ -1229,7 +1237,7 @@ class AIReviewEngine(BaseReviewer):
                 return {
                     "reference_basis_review_results": {
                         "review_results": [],
-                        "review_content": review_content,
+                        "review_content": review_content_text,
                         "total_basis_items": 0,
                         "valid_items": 0,
                         "standard_items": 0,
@@ -1242,7 +1250,7 @@ class AIReviewEngine(BaseReviewer):
             return {
                 "reference_basis_review_results": {
                     "review_results": reference_basis_review_results,
-                    "review_content": review_content,
+                    "review_content": review_content_text,
                     "total_basis_items": total_items,
                     "valid_items": valid_items,
                     "standard_items": standard_items,
@@ -1287,23 +1295,31 @@ class AIReviewEngine(BaseReviewer):
             logger.info(f"开始编制依据审查,trace_id: {trace_id}")
 
             # 提取关键数据
-            review_content = review_data.get('content', '')
+            basis_items: BasisItems = review_data.get('basis_items')
+            review_content_text = review_data.get('content', '')
             max_concurrent = review_data.get('max_concurrent', 10)
 
-            # 添加调试信息
-            logger.info(f"提取的编制依据内容长度: {len(review_content)}")
-            if review_content:
-                logger.info(f"编制依据内容预览: {review_content[:50]}...")
+            # 基于BasisItems计算统计信息
+            basis_list = []
+            if basis_items and getattr(basis_items, "items", None):
+                basis_list = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
+                review_content_text = review_content_text or "\n".join(basis_list)
+
+            logger.info(f"提取的编制依据条目数: {len(basis_list)}")
+            if basis_list:
+                logger.info(f"编制依据内容预览: {basis_list[0][:50]}...")
+            elif review_content_text:
+                logger.info(f"编制依据内容预览(文本): {review_content_text[:50]}...")
             else:
                 logger.warning("编制依据内容为空,将跳过审查")
 
             # 检查是否有有效的编制依据内容
-            if not review_content or not review_content.strip():
+            if not basis_list:
                 logger.warning("没有可执行的编制依据审查任务")
                 return {
                     "timeliness_basis_review_results": {
                         "review_results": [],
-                        "review_content": review_content,
+                        "review_content": review_content_text,
                         "total_basis_items": 0,
                         "valid_items": 0,
                         "standard_items": 0,
@@ -1326,7 +1342,7 @@ class AIReviewEngine(BaseReviewer):
                     from core.construction_review.component.reviewers.timeliness_basis_reviewer import BasisReviewService
                     async with BasisReviewService(max_concurrent=max_concurrent) as service:
                         timeliness_basis_review_results = await service.review_all(
-                            review_content,
+                            basis_items,
                             collection_name="first_bfp_collection_status",
                             progress_manager=progress_manager,
                             callback_task_id=callback_task_id
@@ -1355,7 +1371,7 @@ class AIReviewEngine(BaseReviewer):
                 return {
                     "timeliness_basis_review_results": {
                         "review_results": [],
-                        "review_content": review_content,
+                        "review_content": review_content_text,
                         "total_basis_items": 0,
                         "valid_items": 0,
                         "standard_items": 0,
@@ -1368,7 +1384,7 @@ class AIReviewEngine(BaseReviewer):
             return {
                 "timeliness_basis_review_results": {
                     "review_results": timeliness_basis_review_results,
-                    "review_content": review_content,
+                    "review_content": review_content_text,
                     "total_basis_items": total_items,
                     "valid_items": valid_items,
                     "standard_items": standard_items,

+ 26 - 96
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -1,61 +1,22 @@
-import os
-import sys
-import json
-import re
+from __future__ import annotations
+
+import asyncio
 import time
+import yaml
 from typing import Any, Dict, List, Optional
-import asyncio
-
-project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../../'))
-# 将根目录添加到sys.path
-sys.path.append(project_root)
-
-# 导入必要的依赖
-try:
-    from pymilvus import connections, Collection
-    from foundation.infrastructure.config.config import config_handler
-    from foundation.ai.models.model_handler import model_handler as mh
-    from foundation.ai.agent.generate.model_generate import generate_model_client
-    from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
-    from core.construction_review.component.reviewers.utils.inter_tool import InterTool
-    from foundation.observability.logger.loggering import server_logger as logger
-    from langchain_core.prompts import ChatPromptTemplate
-    from langchain_milvus import Milvus, BM25BuiltInFunction
-    from functools import partial
-    
-except ImportError as e:
-    logger.warning(f"Warning: 无法导入依赖: {e}")
-    # 设置默认值,避免程序崩溃
-    mh = None
-    generate_model_client = None
-    prompt_loader = None
-    logger = None
-    InterTool = None
-
-
-class TextProcessor:
-    """文本处理工具类"""
-
-    @staticmethod
-    def extract_basis(text: str) -> List[str]:
-        """从文本中提取编制依据,支持:《》《》(…)《》【…】"""
-        pattern = re.compile(
-            r'《[^》]+》'                 # 《标题》
-            r'(?:([^)]+)|【[^】]+】)?'  # 可选的(…)或【…】
-        )
-        return pattern.findall(text)
-
-
 
+from core.construction_review.component.reviewers.utils.directory_extraction import BasisItem, BasisItems
+from core.construction_review.component.reviewers.utils.inter_tool import InterTool
+from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
+from foundation.ai.agent.generate.model_generate import generate_model_client
+from foundation.observability.logger.loggering import server_logger as logger
+from langchain_core.prompts import ChatPromptTemplate
 
 class StandardizedResponseProcessor:
     """标准化响应处理器 - 统一为outline_reviewer.py格式"""
 
     def __init__(self):
-        if InterTool:
-            self.inter_tool = InterTool()
-        else:
-            self.inter_tool = None
+        self.inter_tool = InterTool()
 
     def process_llm_response(self, response_text: str, check_name: str, chapter_code: str,check_item_code:str) -> List[Dict[str, Any]]:
         """
@@ -105,11 +66,8 @@ class MessageBuilder:
 
     def __init__(self, prompt_loader_instance=None):
         self.prompt_loader = prompt_loader_instance
-
-    
-
+        
     def get_prompt_template(self):
-        import yaml
         with open("core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml", "r", encoding="utf-8") as f:
             data = yaml.safe_load(f)
         return ChatPromptTemplate.from_messages([
@@ -149,10 +107,7 @@ class BasisReviewService:
 
     def __init__(self, max_concurrent: int = 4):
         self.llm_client = LLMReviewClient()
-        self.text_processor = TextProcessor()
-        self.response_processor = StandardizedResponseProcessor()  # 标准化处理器
-        # 确保使用最新的prompt_loader实例
-        from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
+        self.response_processor = StandardizedResponseProcessor()
         fresh_prompt_loader = PromptLoader()
         self.message_builder = MessageBuilder(fresh_prompt_loader)
         self.max_concurrent = max_concurrent
@@ -211,40 +166,13 @@ class BasisReviewService:
                     "risk_info": {"risk_level": "high"}
                 }]
 
-    
-    
-    async def _async_search_basis(
-        self,
-        basis: str,
-        collection_name: str,
-        top_k_each: int
-    ) -> List[dict]:
-        """异步搜索单个编制依据(Hybrid Search)"""
-        try:
-            loop = asyncio.get_running_loop()
-            func = partial(
-                self.search_engine.hybrid_search,
-                collection_name=collection_name,
-                query_text=basis,
-                top_k=top_k_each,
-                ranker_type="weighted",
-                dense_weight=0.3,
-                sparse_weight=0.7
-            )
-            retrieved = await loop.run_in_executor(None, func)
-            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
-            return retrieved or []
-        except Exception as e:
-            logger.error(f" 搜索失败 '{basis}': {e}")
-            return []
-
-    
-    async def review_all(self, text: str, collection_name: str = "first_bfp_collection_status",
+    async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
-        """异步批量审查所有编制依据"""
-        # items = self.text_processor.extract_basis(text)
-        from core.construction_review.component.reviewers.utils.directory_extraction import extract_basis_with_langchain_qwen
-        items = [item.raw for item in extract_basis_with_langchain_qwen(text).items]
+        """异步批量审查所有编制依据(BasisItems 入参)"""
+        if not basis_items or not getattr(basis_items, "items", None):
+            return []
+        
+        items = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
         if not items:
             return []
 
@@ -404,12 +332,14 @@ async def review_basis_batch_async(basis_items: List[str], max_concurrent: int =
         return await service.review_batch(basis_items)
 
 
-async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
-    """异步全部审查便捷函数"""
+async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
+    """异步全部审查便捷函数(BasisItems 入参)"""
     async with BasisReviewService(max_concurrent=max_concurrent) as service:
-        return await service.review_all(text)
+        return await service.review_all(basis_items)
 
 if __name__ == "__main__":
     # 简单测试
-    test_text = "《中华人民共和国特  种设备安全法》(2023)"
-    result = asyncio.run(review_all_basis_async(test_text))
+    test_basis_items = BasisItems(items=[
+        BasisItem(title="中华人民共和国特种设备安全法", suffix="2023", raw="《中华人民共和国特种设备安全法》(2023)")
+    ])
+    result = asyncio.run(review_all_basis_async(test_basis_items))

+ 48 - 144
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -1,61 +1,25 @@
-import os
-import sys
+from __future__ import annotations
+
 import json
-import re
 import time
-from typing import Any, Dict, List, Optional
 import asyncio
-
-project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../../'))
-# 将根目录添加到sys.path
-sys.path.append(project_root)
-
-# 导入必要的依赖
-try:
-    from pymilvus import connections, Collection
-    from foundation.infrastructure.config.config import config_handler
-    from foundation.ai.models.model_handler import model_handler as mh
-    from foundation.ai.agent.generate.model_generate import generate_model_client
-    from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
-    from core.construction_review.component.reviewers.utils.inter_tool import InterTool
-    from foundation.observability.logger.loggering import server_logger as logger
-    from langchain_core.prompts import ChatPromptTemplate
-    from langchain_milvus import Milvus, BM25BuiltInFunction
-    from functools import partial
-    
-except ImportError as e:
-    logger.warning(f"Warning: 无法导入依赖: {e}")
-    # 设置默认值,避免程序崩溃
-    mh = None
-    generate_model_client = None
-    prompt_loader = None
-    logger = None
-    InterTool = None
-
-
-class TextProcessor:
-    """文本处理工具类"""
-
-    @staticmethod
-    def extract_basis(text: str) -> List[str]:
-        """从文本中提取编制依据,支持:《》《》(…)《》【…】"""
-        pattern = re.compile(
-            r'《[^》]+》'                 # 《标题》
-            r'(?:([^)]+)|【[^】]+】)?'  # 可选的(…)或【…】
-        )
-        return pattern.findall(text)
-
-
-
+from typing import Any, Dict, List
+from functools import partial
+
+from langchain_milvus import Milvus, BM25BuiltInFunction
+from foundation.infrastructure.config.config import config_handler
+from foundation.ai.models.model_handler import model_handler as mh
+from core.construction_review.component.reviewers.utils.inter_tool import InterTool
+from core.construction_review.component.reviewers.utils.directory_extraction import BasisItems, BasisItem
+from foundation.observability.logger.loggering import server_logger as logger
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
+from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
 
 class StandardizedResponseProcessor:
-    """标准化响应处理器 - 统一为outline_reviewer.py格式"""
+    """标准化响应处理器"""
 
     def __init__(self):
-        if InterTool:
-            self.inter_tool = InterTool()
-        else:
-            self.inter_tool = None
+        self.inter_tool = InterTool()
 
     def process_llm_response(self, response_text: str, check_name: str , chapter_code: str ,check_item_code:str) -> List[Dict[str, Any]]:
         """
@@ -70,10 +34,6 @@ class StandardizedResponseProcessor:
         Returns:
             List[Dict]: 标准格式的审查结果列表
         """
-        if not self.inter_tool:
-            logger.warning("InterTool未初始化,返回空结果")
-            return []
-
         try:
             json_data = response_text
 
@@ -104,23 +64,6 @@ class StandardizedResponseProcessor:
             }]
 
 
-class MessageBuilder:
-    """消息构建工具类"""
-
-    def __init__(self, prompt_loader_instance=None):
-        self.prompt_loader = prompt_loader_instance
-
-    
-
-    def get_prompt_template(self):
-        import yaml
-        with open("core/construction_review/component/reviewers/prompt/timeliness_basis_reviewer.yaml", "r", encoding="utf-8") as f:
-            data = yaml.safe_load(f)
-        return ChatPromptTemplate.from_messages([
-                ("system", data["timeliness_basis_reviewer"]["system_prompt"]),
-                ("user", data["timeliness_basis_reviewer"]["user_prompt_template"])
-            ])
-    
 class BasisSearchEngine:
     """编制依据向量搜索引擎"""
 
@@ -138,11 +81,8 @@ class BasisSearchEngine:
             self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
 
             # 初始化嵌入模型
-            if mh:
-                self.emdmodel = mh._get_lq_qwen3_8b_emd()
-                logger.info(" 嵌入模型初始化成功")
-            else:
-                raise ImportError("无法获取嵌入模型")
+            self.emdmodel = mh._get_lq_qwen3_8b_emd()
+            logger.info("嵌入模型初始化成功")
 
         except Exception as e:
             logger.error(f" BasisSearchEngine 初始化失败: {e}")
@@ -205,45 +145,12 @@ class BasisSearchEngine:
             logger.error(f" 搜索失败: {e}")
 
 
-class LLMReviewClient:
-    """LLM审查客户端"""
-
-    async def review_basis(self, Message: str, trace_id: str = None) -> str:
-        try:
-        
-            task_prompt_info = {
-                "task_prompt": Message,
-                "task_name": "规范性引用文件识别与状态判断"
-            }
-            logger.info(f" 模型调用准备阶段: {task_prompt_info}")
-
-            # 调用统一模型客户端 - 编制依据审查设置90秒超时
-            response = await generate_model_client.get_model_generate_invoke(
-                trace_id=trace_id,
-                task_prompt_info=task_prompt_info,
-                timeout=90
-            )
-            return response
-
-        except Exception as e:
-            logger.error(f" 模型调用准备阶段失败: {e}")
-            # 返回空JSON数组字符串以防解析崩溃
-            return "[]"
-        # ==================== 修复结束 ====================
-
-
 class BasisReviewService:
     """编制依据审查服务核心类"""
 
     def __init__(self, max_concurrent: int = 4):
         self.search_engine = BasisSearchEngine()
-        self.llm_client = LLMReviewClient()
-        self.text_processor = TextProcessor()
-        self.response_processor = StandardizedResponseProcessor()  # 标准化处理器
-        # 确保使用最新的prompt_loader实例
-        from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
-        fresh_prompt_loader = PromptLoader()
-        self.message_builder = MessageBuilder(fresh_prompt_loader)
+        self.response_processor = StandardizedResponseProcessor()
         self.max_concurrent = max_concurrent
         self._semaphore = None
 
@@ -261,8 +168,6 @@ class BasisReviewService:
         self,
         basis_items: List[str],
         collection_name: str = "first_bfp_collection_status",
-        filters: Optional[Dict[str, Any]] = None,
-        min_score: float = 0.3,
         top_k_each: int = 3,
     ) -> List[Dict[str, Any]]:
         """异步批次审查(通常3条)"""
@@ -292,24 +197,10 @@ class BasisReviewService:
                         # result 是 List[dict],需要遍历
                         texts = [item["text_content"] for item in result if "text_content" in item]
                         grouped_candidates.append(texts)
-                print("搜索结果:\n"+str(grouped_candidates))
-
-                # # 构建提示词模板和用户内容
-                # prompt_template = self.message_builder.get_prompt_template()
-                # message = prompt_template.partial(reference_content=grouped_candidates, check_content=basis_items)
-                # trace_id = f"prep_basis_batch_{int(time.time())}"
-                # llm_out = await self.llm_client.review_basis(message, trace_id)
-                
-
-                from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
-                from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
                 
                 llm_out = await determine_timeliness_issue(await match_reference_files(reference_text=grouped_candidates, review_text=basis_items))
                 
-                
-                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis","basis_timeliness_check")
-                print("标准化处理器处理响应:\n")
-                print(standardized_result)
+                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
                 # 统计问题数量
                 issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
                 logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
@@ -317,7 +208,7 @@ class BasisReviewService:
                 return standardized_result
 
             except Exception as e:
-                logger.error(f" 批次处理失败1: {e}")
+                logger.error(f" 批次处理失败: {e}")
                 return [{
                     "check_item": "timeliness_check",
                     "chapter_code": "basis",
@@ -355,12 +246,13 @@ class BasisReviewService:
             return []
 
     
-    async def review_all(self, text: str, collection_name: str = "first_bfp_collection_status",
+    async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
-        """异步批量审查所有编制依据"""
-        from core.construction_review.component.reviewers.utils.directory_extraction import extract_basis_with_langchain_qwen
-        items = [item.raw for item in extract_basis_with_langchain_qwen(text).items]
-        #items = self.text_processor.extract_basis(text)
+        """异步批量审查所有编制依据(入参为 BasisItems)"""
+        if not basis_items or not getattr(basis_items, "items", None):
+            return []
+
+        items = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
         if not items:
             return []
 
@@ -519,16 +411,28 @@ async def review_basis_batch_async(basis_items: List[str], max_concurrent: int =
         return await service.review_batch(basis_items)
 
 
-async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
-    """异步全部审查便捷函数"""
+async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
+    """异步全部审查便捷函数(BasisItems 入参)"""
     async with BasisReviewService(max_concurrent=max_concurrent) as service:
-        return await service.review_all(text)
+        return await service.review_all(basis_items)
 
 if __name__ == "__main__":
-    # 简单测试
-    test_text = """
-使用要求:按照《坠落防护水平生命线装置》GB 38454 或《电力高处作业防坠
-器》DL/T 1147 中的规定进行现场实验,实验结果符合《坠落防护挂点装置》GB
-30862 中的规定
-    """
-    result = asyncio.run(review_all_basis_async(test_text))
+    # 直接构造 BasisItems 测试 review_all
+    test_basis_items = BasisItems(items=[
+        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+        BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
+        BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
+        BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
+        BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
+    ])
+    
+    print(f"测试 {len(test_basis_items.items)} 项编制依据:")
+    for idx, item in enumerate(test_basis_items.items, 1):
+        print(f"  {idx}. {item.raw}")
+    
+    print("\n开始异步审查...")
+    result = asyncio.run(review_all_basis_async(test_basis_items))
+    
+    print("\n审查结果:")
+    print(json.dumps(result, ensure_ascii=False, indent=2))
+

+ 66 - 14
core/construction_review/component/reviewers/utils/directory_extraction.py

@@ -13,11 +13,13 @@ from __future__ import annotations
 
 import json  # ✅ 最小修改:新增
 import re
+import asyncio
 from typing import List
 
 from pydantic import BaseModel, Field, ValidationError  # ✅ 最小修改:新增 ValidationError
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser  # ✅ 最小修改:新增 StrOutputParser
+from langchain_openai import ChatOpenAI  # ✅ 新增:OpenAI兼容的API调用
 
 from foundation.observability.logger.loggering import server_logger as logger
 
@@ -105,18 +107,22 @@ def extract_first_json(text: str) -> dict:
 
 
 # --------- 5) 主函数:使用 LangChain 抽取(最小改动版) ---------
-def extract_basis_with_langchain_qwen(text: str) -> BasisItems:
+async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id,text: str) -> BasisItems:
     """
-    使用 LangChain + LLM 提取编制依据信息
+    使用 LangChain + LLM 提取编制依据信息(流式输出版本)
     """
     # 标准化文本(无论是否走LLM,都先做)
     text = text.replace("\r\n", "\n").replace("\r", "\n")
 
     try:
-        from foundation.ai.models.model_handler import model_handler
-
-        # 获取模型实例
-        llm = model_handler._get_qwen_model()
+        #✅ 修改:使用 ChatOpenAI 封装Qwen3-8B流式API
+        llm = ChatOpenAI(
+            model="Qwen3-8B",
+            base_url="http://192.168.91.253:9002/v1",
+            api_key="EMPTY",            # 占位
+            streaming=True,             # ✅ 开启流式
+            temperature=0.7,
+        )
 
         # 创建解析器(✅ 保留:仅用于 format_instructions)
         parser = PydanticOutputParser(pydantic_object=BasisItems)
@@ -126,12 +132,54 @@ def extract_basis_with_langchain_qwen(text: str) -> BasisItems:
 
         logger.info(f"[编制依据提取] 开始使用 LLM 提取,文本长度: {len(text)}")
 
-        # 调用模型 -> raw_out 是 str
-        raw_out = chain.invoke({
+        # 推送开始消息
+        if progress_manager and callback_task_id:
+            try:
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    stage_name="AI审查",
+                    current=0,
+                    status="processing",
+                    message=f"开始编制依据提取",
+                    overall_task_status="processing",
+                    event_type="processing"
+                )
+            except Exception as e:
+                logger.error(f"SSE推送开始消息失败: {e}")
+
+        # 流式调用模型,每检测到5个}符号推送一次进度
+        raw_out = ""
+        brace_count = 0
+        
+        # chain.stream() 返回流式迭代器
+        async for chunk in chain.astream({
             "input_text": text,
             "format_instructions": parser.get_format_instructions()
-        })
-        print(raw_out)
+        }):
+            raw_out += chunk
+            
+            # 统计}符号出现次数
+            for char in chunk:
+                if char == "}":
+                    brace_count += 1
+                    
+                    # 每5个}推送一次进度
+                    if brace_count % 5 == 0:
+                        if progress_manager and callback_task_id:
+                            try:
+                                await progress_manager.update_stage_progress(
+                                    callback_task_id=callback_task_id,
+                                    stage_name="AI审查",
+                                    current=0,
+                                    status="processing",
+                                    message=f"编制依据提取中... (已处理 {brace_count} 个结构)",
+                                    overall_task_status="processing",
+                                    event_type="processing"
+                                )
+                            except Exception as e:
+                                logger.error(f"SSE推送进度失败: {e}")
+        
+        logger.info(f"编制依据提取调试:{raw_out}")
 
         # ✅ 最小修改:不再 parser.parse / 不再 _parse_with_retry
         data = extract_first_json(raw_out)
@@ -201,7 +249,11 @@ if __name__ == "__main__":
 (4)近年来,我公司参加类似工程的经验;
 (5)本合同段工程现场踏勘、调查所获得的现场情况、自然环境、人文环境、市场环境等参考资料;
 """
-    result = extract_basis_with_langchain_qwen(demo)
-    print(f"\n提取到 {len(result.items)} 条编制依据:")
-    for idx, item in enumerate(result.items, 1):
-        print(f"\n{idx}. {item.model_dump()}")
+
+    async def _demo_run():
+        result = await extract_basis_with_langchain_qwen(None, None, demo)
+        print(f"\n提取到 {len(result.items)} 条编制依据:")
+        for idx, item in enumerate(result.items, 1):
+            print(f"\n{idx}. {item.model_dump()}")
+
+    asyncio.run(_demo_run())

+ 47 - 24
core/construction_review/workflows/ai_review_workflow.py

@@ -48,6 +48,9 @@ from langgraph.graph import StateGraph, END
 from langgraph.graph.message import add_messages
 from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
 from foundation.observability.logger.loggering import server_logger as logger
+from core.construction_review.component.reviewers.utils.directory_extraction import (
+    extract_basis_with_langchain_qwen,
+)
 from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from ..component import AIReviewEngine
 from ..component.reviewers.utils.inter_tool import InterTool
@@ -535,63 +538,83 @@ class AIReviewWorkflow:
             # # 4. 执行编制依据审查
             # #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
             reference_check = "reference_check" in self.task_info.get_review_config_list()
+            timeliness_check = "timeliness_check" in self.task_info.get_review_config_list()
             reference_check_result = None
+            timeliness_check_result = None
+
+            # 统一提取一次编制依据内容(任一审查开启时才提取)
+            basis_content = ""
+            basis_items = None
+            if reference_check or timeliness_check:
+                prep_basis_content = self._extract_prep_basis_content(state["structured_content"])
+                if prep_basis_content:
+                    try:
+                        basis_items = await extract_basis_with_langchain_qwen(
+                            progress_manager=state.get("progress_manager"),
+                            callback_task_id=state.get("callback_task_id"),
+                            text=prep_basis_content,
+                        )
+                        basis_content = "\n".join(
+                            [
+                                item.raw
+                                for item in getattr(basis_items, "items", [])
+                                if getattr(item, "raw", None)
+                            ]
+                        ).strip()
+                        if not basis_content:
+                            basis_content = prep_basis_content
+                        logger.info(
+                            f"编制依据AI提取完成,条数: {len(getattr(basis_items, 'items', []))}"
+                        )
+                    except Exception as e:
+                        logger.error(f"编制依据AI提取失败,回退原文: {e}", exc_info=True)
+                        basis_content = prep_basis_content
+                else:
+                    logger.warning(f"未找到编制依据内容,跳过编制依据审查准备")
+
             logger.info(f"执行编制依据审查")
             if not reference_check:
                 logger.info(f"跳过执行编制依据审查")
             else:
-                # 从结构化内容中提取编制依据文本
-                prep_basis_content = self._extract_prep_basis_content(state["structured_content"])
-                if prep_basis_content:
+                if basis_content:
                     logger.info(f"开始执行编制依据审查")
 
-                    # 准备编制依据审查数据
                     prep_basis_review_data = {
-                        'content': prep_basis_content,
+                        'content': basis_content,
+                        'basis_items': basis_items,
                         'max_concurrent': self.max_concurrent
                     }
 
-                    # 执行编制依据审查
                     reference_check_result = await self.ai_review_engine.reference_basis_reviewer(
                         review_data=prep_basis_review_data,
                         trace_id=state["callback_task_id"],
                         state=state,
                         stage_name="编制依据审查"
                     )
-
-                    # SSE推送已在prep_basis_reviewer.py中的review_all方法中处理
                 else:
                     logger.warning(f"未找到编制依据内容,跳过编制依据审查")
-            # 4. 执行编制依据审查
-            #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
-            timeliness_check = "timeliness_check" in self.task_info.get_review_config_list()
-            timeliness_check_result = None
-            logger.info(f"执行编制依据审查")
+
+            logger.info(f"执行编制依据审查(时效性)")
             if not timeliness_check:
-                logger.info(f"跳过执行编制依据审查")
+                logger.info(f"跳过执行编制依据审查(时效性)")
             else:
-                # 从结构化内容中提取编制依据文本
-                prep_basis_content = self._extract_prep_basis_content(state["structured_content"])
-                if prep_basis_content:
-                    logger.info(f"开始执行编制依据审查")
+                if basis_content:
+                    logger.info(f"开始执行编制依据审查(时效性)")
 
-                    # 准备编制依据审查数据
                     timeliness_check_data = {
-                        'content': prep_basis_content,
+                        'content': basis_content,
+                        'basis_items': basis_items,
                         'max_concurrent': self.max_concurrent
                     }
 
-                    # 执行编制依据审查
                     timeliness_check_result = await self.ai_review_engine.timeliness_basis_reviewer(
                         review_data=timeliness_check_data,
                         trace_id=state["callback_task_id"],
                         state=state,
                         stage_name="编制依据审查"
                     )
-
-                    # SSE推送已在prep_basis_reviewer.py中的review_all方法中处理
                 else:
-                    logger.warning(f"未找到编制依据内容,跳过编制依据审查")
+                    logger.warning(f"未找到编制依据内容,跳过编制依据审查(时效性)")
 
 
             # 6. 汇总结果