
v0.0.7 - writing module - feature improvements
- Improve the fields of the outline-generation API
- Improve the SSE message push mechanism for outline generation

WangXuMing committed 5 hours ago
commit e6050ae1f0
34 files changed, 578 insertions(+), 2140 deletions(-)
  1. core/base/progress_manager.py (+15, -3)
  2. core/base/workflow_manager.py (+11, -12)
  3. core/construction_review/component/reviewers/reference_basis_reviewer.py (+52, -35)
  4. core/construction_review/component/reviewers/semantic_logic.py (+12, -45)
  5. core/construction_review/component/reviewers/sensitive_word_check.py (+12, -45)
  6. core/construction_review/component/reviewers/timeliness_basis_reviewer.py (+38, -14)
  7. core/construction_review/component/reviewers/utils/ac_automaton.py (+1, -1)
  8. core/construction_review/component/reviewers/utils/directory_extraction.py (+14, -20)
  9. core/construction_review/component/reviewers/utils/punctuation_checker.py (+13, -14)
  10. core/construction_review/component/reviewers/utils/reference_matcher.py (+14, -19)
  11. core/construction_review/component/reviewers/utils/timeliness_determiner.py (+13, -16)
  12. core/construction_review/workflows/ai_review_workflow.py (+3, -8)
  13. core/construction_review/workflows/core_functions/ai_review_core_fun.py (+3, -2)
  14. core/construction_write/component/outline_generator.py (+109, -3)
  15. foundation/ai/__init__.py (+0, -3)
  16. foundation/ai/agent/__init__.py (+0, -11)
  17. foundation/ai/agent/base_agent.py (+0, -161)
  18. foundation/ai/agent/generate/model_generate.py (+174, -40)
  19. foundation/ai/agent/generate/test_intent.py (+0, -105)
  20. foundation/ai/agent/test_agent.py (+0, -252)
  21. foundation/ai/agent/workflow/test_cus_state.py (+0, -21)
  22. foundation/ai/agent/workflow/test_workflow_graph.py (+0, -203)
  23. foundation/ai/agent/workflow/test_workflow_node.py (+0, -119)
  24. foundation/ai/rag/retrieval/entities_enhance.py (+1, -6)
  25. foundation/ai/rag/retrieval/retrieval.py (+0, -2)
  26. foundation/database/base/vector/milvus_vector.py (+41, -24)
  27. foundation/infrastructure/messaging/celery_app.py (+15, -3)
  28. foundation/observability/logger/loggering.py (+3, -2)
  29. foundation/schemas/test_schemas.py (+0, -57)
  30. server/app.py (+4, -3)
  31. views/__init__.py (+0, -1)
  32. views/construction_review/review_results.py (+9, -14)
  33. views/construction_write/outline_views.py (+21, -5)
  34. views/test_views.py (+0, -871)

+ 15 - 3
core/base/progress_manager.py

@@ -154,7 +154,11 @@ class ProgressManager:
                             3600,  # 1小时过期
                             json.dumps(task_progress)
                         )
-                        logger.debug(f"更新进度到Redis: {callback_task_id}, 进度: {current}%")
+                        actual_current = task_progress.get("current")
+                        if current is not None:
+                            logger.debug(f"更新进度到Redis: {callback_task_id}, 进度: {actual_current}%")
+                        else:
+                            logger.debug(f"更新进度到Redis: {callback_task_id}, 进度保持: {actual_current}% (未传入)")
                     except Exception as sync_e:
                         logger.warning(f"同步Redis操作失败: {callback_task_id}, {sync_e}")
                         # 同步操作也失败时,降级到内存存储
@@ -166,7 +170,11 @@ class ProgressManager:
                     if not hasattr(self, 'current_data'):
                         self.current_data = {}
                     self.current_data[callback_task_id] = task_progress
-                    logger.debug(f"更新进度到内存: {callback_task_id}, 进度: {current}%")
+                    actual_current = task_progress.get("current")
+                    if current is not None:
+                        logger.debug(f"更新进度到内存: {callback_task_id}, 进度: {actual_current}%")
+                    else:
+                        logger.debug(f"更新进度到内存: {callback_task_id}, 进度保持: {actual_current}% (未传入)")
             except Exception as e:
                 logger.error(f"保存进度数据异常: {callback_task_id}, {e}")
                 if not hasattr(self, 'current_data'):
@@ -174,7 +182,11 @@ class ProgressManager:
                 self.current_data[callback_task_id] = task_progress
 
             # 进度已保存到 Redis,SSE 由主进程通过轮询获取
-            logger.debug(f"进度已更新到Redis: {callback_task_id}, current={current}%")
+            actual_current = task_progress.get("current")
+            if current is not None:
+                logger.debug(f"进度已更新到Redis: {callback_task_id}, current={actual_current}%")
+            else:
+                logger.debug(f"进度已更新到Redis: {callback_task_id}, current={actual_current}% (保持)")
 
         except Exception as e:
             logger.error(f"更新阶段进度失败: {str(e)}")

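Note on the three hunks above: in every branch the logged value is read back from task_progress rather than from the current argument, so when the caller passes current=None the previously stored percentage is reported as "kept". A minimal sketch of that keep-previous rule (the stored_progress lookup is assumed; only task_progress and current come from the diff):

    # Hedged sketch of the merge rule the new log lines describe.
    previous = stored_progress.get(callback_task_id, {})    # assumed: last saved progress
    task_progress = dict(previous)
    if current is not None:
        task_progress["current"] = current                   # explicit update
    actual_current = task_progress.get("current")            # value that ends up in the log
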
+ 11 - 12
core/base/workflow_manager.py

@@ -884,16 +884,12 @@ class WorkflowManager:
             包含:文档处理结果 + AI审查结果 + 报告生成结果
         """
         try:
-            import json
-            import os
+            from foundation.observability.cachefiles import cache, CacheBaseDir
 
             logger.info(f"开始保存完整结果: {state['callback_task_id']}")
 
-            # 创建 temp 目录
-            temp_dir = os.path.join("temp", "construction_review", "final_result")
-            os.makedirs(temp_dir, exist_ok=True)
-
             # 构建完整结果
+            ai_review_result = state.get("ai_review_result")
             complete_results = {
                 "callback_task_id": state["callback_task_id"],
                 "file_id": state["file_id"],
@@ -902,16 +898,19 @@ class WorkflowManager:
                 "overall_task_status": "processing",  # 此时还在处理中,complete节点才标记completed
                 "stage_status": state["stage_status"],
                 "document_result": state.get("document_result"),
-                "ai_review_result": state.get("ai_review_result"),
-                "issues": state.get("ai_review_result").get("review_results"),
+                "ai_review_result": ai_review_result,
+                "issues": ai_review_result.get("review_results") if ai_review_result else None,
                 "report_result": report_result,
                 "timestamp": datetime.now().isoformat()
             }
 
-            # 保存到文件
-            file_path = os.path.join(temp_dir, f"{state['callback_task_id']}.json")
-            with open(file_path, 'w', encoding='utf-8') as f:
-                json.dump(complete_results, f, ensure_ascii=False, indent=2)
+            # 使用 cache_manager 保存(指定文件名)
+            file_path = cache.save(
+                complete_results,
+                subdir="final_result",
+                filename=f"{state['callback_task_id']}.json",
+                base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
+            )
 
             logger.info(f"完整结果已保存到: {file_path}")
 

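The hunk replaces hand-rolled json/os file handling with the shared cache helper. The real implementation behind foundation.observability.cachefiles is not part of this commit; a rough sketch of a save() compatible with the call above, with every internal detail assumed, might look like:

    import json
    import os
    from enum import Enum

    class CacheBaseDir(Enum):                          # assumed shape of the enum
        CONSTRUCTION_REVIEW = "construction_review"

    def save(data: dict, subdir: str, filename: str, base_cache_dir: CacheBaseDir) -> str:
        """Write data as UTF-8 JSON under <cache_root>/<base_dir>/<subdir>/<filename> and return the path."""
        target_dir = os.path.join("cache", base_cache_dir.value, subdir)   # cache root assumed
        os.makedirs(target_dir, exist_ok=True)
        file_path = os.path.join(target_dir, filename)
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return file_path
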
+ 52 - 35
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -18,13 +18,20 @@ from core.construction_review.component.reviewers.utils.punctuation_result_proce
 from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
 from foundation.observability.logger.loggering import review_logger as logger
 from langchain_core.prompts import ChatPromptTemplate
-from langchain_openai import ChatOpenAI
+from foundation.ai.agent.generate.model_generate import generate_model_client
 
 class BasisSearchEngine:
     """编制依据向量搜索引擎"""
 
+    # 类级别的缓存,避免重复创建 Milvus 实例
+    _vectorstore_cache = {}
+
     def __init__(self):
         self.emdmodel = None
+        self.host = None
+        self.port = None
+        self.user = None
+        self.password = None
         self._initialize()
 
     def _initialize(self):
@@ -43,29 +50,46 @@ class BasisSearchEngine:
         except Exception as e:
             logger.error(f" BasisSearchEngine 初始化失败: {e}")
 
-    def hybrid_search(self, collection_name: str, query_text: str,
-                     top_k: int = 3, ranker_type: str = "weighted",
-                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
-        try:
+    def _get_vectorstore(self, collection_name: str):
+        """获取或创建 Milvus vectorstore 实例(使用缓存)"""
+        cache_key = f"{self.host}:{self.port}:{collection_name}"
 
-            # 连接到现有集合
+        if cache_key not in BasisSearchEngine._vectorstore_cache:
             connection_args = {
                 "uri": f"http://{self.host}:{self.port}",
                 "user": self.user,
                 "db_name": "lq_db"
             }
-
             if self.password:
                 connection_args["password"] = self.password
 
-            vectorstore = Milvus(
-                embedding_function=self.emdmodel,
-                collection_name=collection_name,
-                connection_args=connection_args,
-                consistency_level="Strong",
-                builtin_function=BM25BuiltInFunction(),
-                vector_field=["dense", "sparse"]
-            )
+            # 抑制 AsyncMilvusClient 的警告日志
+            import logging
+            original_level = logging.getLogger('pymilvus').level
+            logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
+            try:
+                vectorstore = Milvus(
+                    embedding_function=self.emdmodel,
+                    collection_name=collection_name,
+                    connection_args=connection_args,
+                    consistency_level="Strong",
+                    builtin_function=BM25BuiltInFunction(),
+                    vector_field=["dense", "sparse"]
+                )
+                BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
+                logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
+            finally:
+                logging.getLogger('pymilvus').setLevel(original_level)
+
+        return BasisSearchEngine._vectorstore_cache[cache_key]
+
+    def hybrid_search(self, collection_name: str, query_text: str,
+                     top_k: int = 3, ranker_type: str = "weighted",
+                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
+        try:
+            # 使用缓存的 vectorstore
+            vectorstore = self._get_vectorstore(collection_name)
 
             # 执行混合搜索
             if ranker_type == "weighted":
@@ -167,34 +191,27 @@ class LLMReviewClient:
     """LLM审查客户端"""
 
     def __init__(self):
-        # 固定使用Qwen3-30B模型
-        self.llm = ChatOpenAI(
-            model="qwen3-30b",
-            base_url="http://192.168.91.253:8003/v1",
-            api_key="sk-123456",
-            temperature=0.7,
-        )
+        """初始化LLM审查客户端,使用通用模型底座"""
+        self.model_client = generate_model_client
 
     async def review_basis(self, Message: str, trace_id: str = None) -> str:
         try:
-        
-            task_prompt_info = {
-                "task_prompt": Message,
-                "task_name": "规范性引用文件识别与状态判断"
-            }
-            logger.info(f" 模型调用准备阶段: {task_prompt_info}")
-
-            # 直接调用Qwen3-30B
-            prompt_template = task_prompt_info["task_prompt"]
-            messages = prompt_template.format_messages()
-            response = await self.llm.ainvoke(messages)
-            return response.content if hasattr(response, "content") else response
+            logger.info(f" 模型调用准备阶段: trace_id={trace_id}")
+
+            # 使用通用模型底座调用
+            messages = Message.format_messages() if hasattr(Message, 'format_messages') else Message
+            response = await self.model_client.get_model_generate_invoke(
+                trace_id=trace_id or "ref_basis_review",
+                messages=messages if isinstance(messages, list) else None,
+                prompt=messages if isinstance(messages, str) else None,
+                model_name="qwen3_30b"
+            )
+            return response
 
         except Exception as e:
             logger.error(f" 模型调用准备阶段失败: {e}")
             # 返回空JSON数组字符串以防解析崩溃
             return "[]"
-        # ==================== 修复结束 ====================
 
 
 class BasisReviewService:

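Both BasisSearchEngine classes (this file and timeliness_basis_reviewer.py below) now keep one Milvus vectorstore per host:port:collection key in a class-level dict. Reduced to its core, the caching pattern looks like this (SlowResource stands in for the Milvus client):

    class SlowResource:
        """Stand-in for an expensive client such as a Milvus connection."""
        def __init__(self, key: str):
            self.key = key

    class CachedEngine:
        # Class-level cache shared by every instance, mirroring _vectorstore_cache above.
        _cache: dict = {}

        def get_resource(self, key: str) -> SlowResource:
            if key not in CachedEngine._cache:
                CachedEngine._cache[key] = SlowResource(key)   # built once per key
            return CachedEngine._cache[key]

As in the diff, the dict is not guarded by a lock, so two concurrent first lookups can each build an instance; the second one simply overwrites the first.
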
+ 12 - 45
core/construction_review/component/reviewers/semantic_logic.py

@@ -1,40 +1,23 @@
 """
 语义逻辑审查模块
-使用自定义OpenAI兼容API进行语义逻辑检查
+使用通用模型底座进行语义逻辑检查
 """
 
 import time
 import asyncio
 from typing import Dict, Any
-from openai import AsyncOpenAI
 from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
+from foundation.ai.agent.generate.model_generate import generate_model_client
 from foundation.observability.logger.loggering import review_logger as logger
 
 
-# 模型配置信息
-# 暂时写死,未来从配置文件读取
-SEMANTIC_LOGIC_MODEL_CONFIG = {
-    "base_url": "http://192.168.91.253:8003/v1",
-    "api_key": "sk-123456",
-    "model": "qwen3-30b",
-    "temperature": 0.7,
-    "max_tokens": 2000
-}
-
-
 class SemanticLogicReviewer:
     """语义逻辑审查器"""
-    
+
     def __init__(self):
         """初始化语义逻辑审查器"""
-        self.client = AsyncOpenAI(
-            base_url=SEMANTIC_LOGIC_MODEL_CONFIG["base_url"],
-            api_key=SEMANTIC_LOGIC_MODEL_CONFIG["api_key"]
-        )
-        self.model = SEMANTIC_LOGIC_MODEL_CONFIG["model"]
-        self.temperature = SEMANTIC_LOGIC_MODEL_CONFIG["temperature"]
-        self.max_tokens = SEMANTIC_LOGIC_MODEL_CONFIG["max_tokens"]
+        self.model_client = generate_model_client
         
     async def check_semantic_logic(
         self,
@@ -74,32 +57,16 @@ class SemanticLogicReviewer:
             
             # 格式化提示词消息
             messages = prompt_template.format_messages()
-            
-            # 转换为OpenAI API格式
-            api_messages = []
-            for msg in messages:
-                if hasattr(msg, 'type'):
-                    role = msg.type if msg.type in ['system', 'user', 'assistant'] else 'user'
-                else:
-                    role = 'user'
-                api_messages.append({
-                    "role": role,
-                    "content": msg.content
-                })
-            
-            logger.info(f"调用语义逻辑检查模型: {self.model}")
-            
-            # 调用OpenAI兼容API
-            response = await self.client.chat.completions.create(
-                model=self.model,
-                messages=api_messages,
-                temperature=self.temperature,
-                max_tokens=self.max_tokens
+
+            logger.info("调用语义逻辑检查模型")
+
+            # 使用通用模型底座调用
+            model_response = await self.model_client.get_model_generate_invoke(
+                trace_id=trace_id,
+                messages=messages,
+                model_name="qwen3_30b"
             )
             
-            # 提取模型响应
-            model_response = response.choices[0].message.content
-            
             logger.info(f"语义逻辑检查模型响应成功,响应长度: {len(model_response)}")
             
             # 计算执行时间

+ 12 - 45
core/construction_review/component/reviewers/sensitive_word_check.py

@@ -1,40 +1,23 @@
 """
 语法检查模块
-使用自定义OpenAI兼容API进行语法检查
+使用通用模型底座进行语法检查
 """
 
 import time
 import asyncio
 from typing import Dict, Any
-from openai import AsyncOpenAI
 from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
+from foundation.ai.agent.generate.model_generate import generate_model_client
 from foundation.observability.logger.loggering import review_logger as logger
 
 
-# 模型配置信息
-# 暂时写死,未来从配置文件读取
-sensitive_word_check_MODEL_CONFIG = {
-    "base_url": "http://192.168.91.253:8003/v1",
-    "api_key": "sk-123456",
-    "model": "qwen3-30b",
-    "temperature": 0.7,
-    "max_tokens": 2000
-}
-
-
 class GrammarCheckReviewer:
     """语法检查审查器"""
-    
+
     def __init__(self):
         """初始化语法检查审查器"""
-        self.client = AsyncOpenAI(
-            base_url=sensitive_word_check_MODEL_CONFIG["base_url"],
-            api_key=sensitive_word_check_MODEL_CONFIG["api_key"]
-        )
-        self.model = sensitive_word_check_MODEL_CONFIG["model"]
-        self.temperature = sensitive_word_check_MODEL_CONFIG["temperature"]
-        self.max_tokens = sensitive_word_check_MODEL_CONFIG["max_tokens"]
+        self.model_client = generate_model_client
         
     async def check_grammar(
         self,
@@ -74,32 +57,16 @@ class GrammarCheckReviewer:
             
             # 格式化提示词消息
             messages = prompt_template.format_messages()
-            
-            # 转换为OpenAI API格式
-            api_messages = []
-            for msg in messages:
-                if hasattr(msg, 'type'):
-                    role = msg.type if msg.type in ['system', 'user', 'assistant'] else 'user'
-                else:
-                    role = 'user'
-                api_messages.append({
-                    "role": role,
-                    "content": msg.content
-                })
-            
-            logger.info(f"调用语法检查模型: {self.model}")
-            
-            # 调用OpenAI兼容API
-            response = await self.client.chat.completions.create(
-                model=self.model,
-                messages=api_messages,
-                temperature=self.temperature,
-                max_tokens=self.max_tokens
+
+            logger.info("调用语法检查模型")
+
+            # 使用通用模型底座调用
+            model_response = await self.model_client.get_model_generate_invoke(
+                trace_id=trace_id,
+                messages=messages,
+                model_name="qwen3_30b"
             )
             
-            # 提取模型响应
-            model_response = response.choices[0].message.content
-            
             logger.info(f"语法检查模型响应成功,响应长度: {len(model_response)}")
             
             # 计算执行时间

+ 38 - 14
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -67,8 +67,15 @@ class StandardizedResponseProcessor:
 class BasisSearchEngine:
     """编制依据向量搜索引擎"""
 
+    # 类级别的缓存,避免重复创建 Milvus 实例
+    _vectorstore_cache = {}
+
     def __init__(self):
         self.emdmodel = None
+        self.host = None
+        self.port = None
+        self.user = None
+        self.password = None
         self._initialize()
 
     def _initialize(self):
@@ -87,29 +94,46 @@ class BasisSearchEngine:
         except Exception as e:
             logger.error(f" BasisSearchEngine 初始化失败: {e}")
 
-    def hybrid_search(self, collection_name: str, query_text: str,
-                     top_k: int = 3, ranker_type: str = "weighted",
-                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
-        try:
+    def _get_vectorstore(self, collection_name: str):
+        """获取或创建 Milvus vectorstore 实例(使用缓存)"""
+        cache_key = f"{self.host}:{self.port}:{collection_name}"
 
-            # 连接到现有集合
+        if cache_key not in BasisSearchEngine._vectorstore_cache:
             connection_args = {
                 "uri": f"http://{self.host}:{self.port}",
                 "user": self.user,
                 "db_name": "lq_db"
             }
-
             if self.password:
                 connection_args["password"] = self.password
 
-            vectorstore = Milvus(
-                embedding_function=self.emdmodel,
-                collection_name=collection_name,
-                connection_args=connection_args,
-                consistency_level="Strong",
-                builtin_function=BM25BuiltInFunction(),
-                vector_field=["dense", "sparse"]
-            )
+            # 抑制 AsyncMilvusClient 的警告日志
+            import logging
+            original_level = logging.getLogger('pymilvus').level
+            logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
+            try:
+                vectorstore = Milvus(
+                    embedding_function=self.emdmodel,
+                    collection_name=collection_name,
+                    connection_args=connection_args,
+                    consistency_level="Strong",
+                    builtin_function=BM25BuiltInFunction(),
+                    vector_field=["dense", "sparse"]
+                )
+                BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
+                logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
+            finally:
+                logging.getLogger('pymilvus').setLevel(original_level)
+
+        return BasisSearchEngine._vectorstore_cache[cache_key]
+
+    def hybrid_search(self, collection_name: str, query_text: str,
+                     top_k: int = 3, ranker_type: str = "weighted",
+                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
+        try:
+            # 使用缓存的 vectorstore
+            vectorstore = self._get_vectorstore(collection_name)
 
             # 执行混合搜索
             if ranker_type == "weighted":

+ 1 - 1
core/construction_review/component/reviewers/utils/ac_automaton.py

@@ -208,7 +208,7 @@ class SensitiveWordDetector:
             except Exception as e:
                 logger.error(f"加载 {file_path.name} 时出错: {e}")
         
-        logger.info(f"构建AC自动机失败指针...")
+        logger.info(f"构建AC自动机指针...")
         self.ac_automaton.build_fail_pointer()
         logger.info(f"AC自动机构建完成!总共加载 {total_words} 个敏感词,来自 {file_count} 个文件")
         

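build_fail_pointer() referenced above is the standard Aho-Corasick construction step. Its implementation sits outside this diff; the textbook BFS construction, shown here only as an illustration, is roughly:

    from collections import deque

    class Node:
        def __init__(self):
            self.children = {}   # char -> Node
            self.fail = None     # longest proper suffix of this path that is also a prefix
            self.output = []     # patterns ending at this node

    def build_fail_pointers(root: Node) -> None:
        """BFS over the trie, wiring each node's fail pointer and propagating pattern outputs."""
        queue = deque()
        for child in root.children.values():
            child.fail = root                # depth-1 nodes always fall back to the root
            queue.append(child)
        while queue:
            node = queue.popleft()
            for ch, child in node.children.items():
                fail = node.fail
                while fail is not root and ch not in fail.children:
                    fail = fail.fail         # follow fail links until a match (or the root)
                child.fail = fail.children.get(ch, root)
                child.output += child.fail.output   # matches ending at the fail target also end here
                queue.append(child)
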
+ 14 - 20
core/construction_review/component/reviewers/utils/directory_extraction.py

@@ -18,9 +18,8 @@ from typing import List
 
 from pydantic import BaseModel, Field, ValidationError  # ✅ 最小修改:新增 ValidationError
 from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser  # ✅ 最小修改:新增 StrOutputParser
-from langchain_openai import ChatOpenAI  # ✅ 新增:OpenAI兼容的API调用
-
+from langchain_core.output_parsers import PydanticOutputParser
+from foundation.ai.agent.generate.model_generate import generate_model_client
 from foundation.observability.logger.loggering import review_logger as logger
 
 
@@ -155,20 +154,14 @@ async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id:st
     text = text.replace("\r\n", "\n").replace("\r", "\n")
 
     try:
-        #✅ 修改:使用 ChatOpenAI 封装Qwen3-8B流式API
-        llm = ChatOpenAI(
-            model="Qwen3-8B",
-            base_url="http://192.168.91.253:9002/v1",
-            api_key="EMPTY",            # 占位
-            streaming=True,             # ✅ 开启流式
-            temperature=0.7,
-        )
-
         # 创建解析器(✅ 保留:仅用于 format_instructions)
         parser = PydanticOutputParser(pydantic_object=BasisItems)
 
-        # 构建链(✅ 最小修改:加 StrOutputParser,直接拿字符串)
-        chain = prompt | llm | StrOutputParser()
+        # 构建消息
+        messages = prompt.format_messages(
+            input_text=text,
+            format_instructions=parser.get_format_instructions()
+        )
 
         logger.info(f"[编制依据提取] 开始使用 LLM 提取,文本长度: {len(text)}")
 
@@ -190,12 +183,13 @@ async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id:st
         # 流式调用模型,每检测到5个}符号推送一次进度
         raw_out = ""
         brace_count = 0
-        
-        # chain.stream() 返回流式迭代器
-        async for chunk in chain.astream({
-            "input_text": text,
-            "format_instructions": parser.get_format_instructions()
-        }):
+
+        # 使用通用模型底座的流式调用
+        for chunk in generate_model_client.get_model_generate_stream(
+            trace_id=callback_task_id or "directory_extract",
+            messages=messages,
+            model_name="lq_qwen3_8b"
+        ):
             raw_out += chunk
             
             # 统计}符号出现次数

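The loop above now consumes generate_model_client.get_model_generate_stream and keeps the idea of pushing progress every five '}' characters; the counting body is truncated in this view. A hedged sketch of one way such a loop can be written (push_progress is an assumed callback, not code from this repository):

    raw_out = ""
    brace_count = 0
    last_pushed = 0

    for chunk in stream:                        # chunks yielded by the streaming client
        raw_out += chunk
        brace_count += chunk.count("}")         # closing braces roughly track finished items
        if brace_count - last_pushed >= 5:      # push progress every 5 closing braces
            last_pushed = brace_count
            push_progress(callback_task_id, items_seen=brace_count)   # assumed helper
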
+ 13 - 14
core/construction_review/component/reviewers/utils/punctuation_checker.py

@@ -7,7 +7,7 @@ from typing import List, Optional
 from pydantic import BaseModel, Field, ValidationError
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from langchain_openai import ChatOpenAI
+from foundation.ai.agent.generate.model_generate import generate_model_client
 
 
 # ===== 1) 定义结构 =====
@@ -89,13 +89,8 @@ prompt = ChatPromptTemplate.from_messages([
     ("human", HUMAN)
 ])
 
-# ===== 5) LLM =====
-llm = ChatOpenAI(
-    model="qwen3-30b",
-    base_url="http://192.168.91.253:8003/v1",
-    api_key="sk-123456",
-    temperature=0.7,
-)
+# ===== 5) LLM Client (通用模型底座) =====
+model_client = generate_model_client
 
 
 # ===== 6) 提取第一个 JSON =====
@@ -215,20 +210,24 @@ async def check_punctuation(items: List[str]) -> str:
     if not llm_inputs:
         return json.dumps(pre_results, ensure_ascii=False, indent=2)
 
-    chain = prompt | llm | StrOutputParser()
     format_instructions = parser.get_format_instructions()
 
-    payload = {
-        "items": json.dumps(llm_inputs, ensure_ascii=False, indent=2),
-        "format_instructions": format_instructions
-    }
+    # 构建消息
+    messages = prompt.format_messages(
+        items=json.dumps(llm_inputs, ensure_ascii=False, indent=2),
+        format_instructions=format_instructions
+    )
 
     last_err = None
 
     llm_result: List[dict] = []
     for _ in range(2):
         try:
-            raw = await chain.ainvoke(payload)
+            raw = await model_client.get_model_generate_invoke(
+                trace_id="punctuation_check",
+                messages=messages,
+                model_name="qwen3_30b"
+            )
             data = extract_first_json(raw)
 
             # 兼容两种格式:带 items 字段或不带 items 字段(单个对象)

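extract_first_json() is called here and in reference_matcher.py / timeliness_determiner.py but defined outside the changed hunks. A common way to pull the first balanced {...} object out of raw model output looks like the sketch below (not necessarily the project's implementation; it ignores braces inside string literals):

    import json

    def extract_first_json(text: str) -> dict:
        """Parse the first balanced {...} object found in text."""
        start = text.find("{")
        if start == -1:
            raise ValueError("no JSON object found in model output")
        depth = 0
        for i, ch in enumerate(text[start:], start):
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return json.loads(text[start:i + 1])
        raise ValueError("unbalanced braces in model output")
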
+ 14 - 19
core/construction_review/component/reviewers/utils/reference_matcher.py

@@ -7,7 +7,7 @@ from typing import List
 from pydantic import BaseModel, Field, ValidationError
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from langchain_openai import ChatOpenAI
+from foundation.ai.agent.generate.model_generate import generate_model_client
 
 
 # ===== 1) 定义结构 =====
@@ -88,16 +88,8 @@ prompt = ChatPromptTemplate.from_messages([
     ("human", HUMAN)
 ])
 
-# ===== 5) LLM =====
-# from foundation.ai.models.model_handler import model_handler
-# llm = model_handler.get_model_by_name("qwen3_30b")
-
-llm = ChatOpenAI(
-    model="qwen3-30b",
-    base_url="http://192.168.91.253:8003/v1",
-    api_key="sk-123456",
-    temperature=0.7,
-)
+# ===== 5) LLM Client (通用模型底座) =====
+model_client = generate_model_client
 
 # ===== 6) 提取第一个 JSON =====
 def extract_first_json(text: str) -> dict:
@@ -131,20 +123,23 @@ async def match_reference_files(reference_text: str, review_text: str) -> str:
     Returns:
         匹配结果的JSON字符串
     """
-    chain = prompt | llm | StrOutputParser()
     format_instructions = parser.get_format_instructions()
 
-    payload = {
-        "reference_text": reference_text,
-        "review_text": review_text,
-        "format_instructions": format_instructions
-    }
+    # 构建消息
+    messages = prompt.format_messages(
+        reference_text=reference_text,
+        review_text=review_text,
+        format_instructions=format_instructions
+    )
 
     last_err = None
-
     for _ in range(2):
         try:
-            raw = await chain.ainvoke(payload)
+            raw = await model_client.get_model_generate_invoke(
+                trace_id="reference_match",
+                messages=messages,
+                model_name="qwen3_30b"
+            )
             print(f"[规范匹配] 模型输出: {raw}...")
             data = extract_first_json(raw)
             findings = MatchResults.model_validate(data)

+ 13 - 16
core/construction_review/component/reviewers/utils/timeliness_determiner.py

@@ -7,7 +7,7 @@ from typing import List, Literal
 from pydantic import BaseModel, Field, ValidationError
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from langchain_openai import ChatOpenAI
+from foundation.ai.agent.generate.model_generate import generate_model_client
 
 
 # ===== 1) 定义结构 =====
@@ -90,15 +90,8 @@ prompt = ChatPromptTemplate.from_messages([
     ("human", HUMAN)
 ])
 
-# ===== 5) LLM =====
-# from foundation.ai.models.model_handler import model_handler
-# llm = model_handler.get_model_by_name("qwen3_30b")
-llm = ChatOpenAI(
-    model="qwen3-30b",
-    base_url="http://192.168.91.253:8003/v1",
-    api_key="sk-123456",
-    temperature=0,
-)
+# ===== 5) LLM Client (通用模型底座) =====
+model_client = generate_model_client
 
 # ===== 6) 提取第一个 JSON =====
 def extract_first_json(text: str) -> dict:
@@ -131,19 +124,23 @@ async def determine_timeliness_issue(match_results: str) -> str:
     Returns:
         时效性判定结果的JSON字符串
     """
-    chain = prompt | llm | StrOutputParser()
     format_instructions = parser.get_format_instructions()
 
-    payload = {
-        "match_results": match_results,
-        "format_instructions": format_instructions
-    }
+    # 构建消息
+    messages = prompt.format_messages(
+        match_results=match_results,
+        format_instructions=format_instructions
+    )
 
     last_err = None
 
     for _ in range(2):
         try:
-            raw = await chain.ainvoke(payload)
+            raw = await model_client.get_model_generate_invoke(
+                trace_id="timeliness_determine",
+                messages=messages,
+                model_name="qwen3_30b"
+            )
             print(f"[时效性判定] 模型输出: {raw}...")
             data = extract_first_json(raw)
             findings = TimelinessResults.model_validate(data)

+ 3 - 8
core/construction_review/workflows/ai_review_workflow.py

@@ -391,18 +391,13 @@ class AIReviewWorkflow:
 
             total_chapters = len(review_item_dict_sorted)
             cache.filtered_chunks(review_item_dict_sorted, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
-            # 如果review_item_dict_sorted中只包含check_completeness,则total_chunks 仅计算chunk中is_complete_field = true的chunk数量
+            # 统计所有 filtered_chunks 作为总块数(与实际处理的块数保持一致)
             all_check_items = []
             for check_list in review_item_dict_sorted.values():
                 all_check_items.extend(check_list)  # 把每个分类的检查项加入总列表
 
-            # 判断:所有检查项是否都只有 "check_completeness"(无其他检查项)
-            if all(item == "check_completeness" for item in all_check_items):
-                # 仅统计 is_complete_field = True 的chunk数量(用生成器表达式省内存)
-                total_chunks = sum(1 for chunk in filtered_chunks if chunk.get("is_complete_field", False))
-            else:
-                # 统计所有 filtered_chunks
-                total_chunks = len(filtered_chunks)
+            # 统计所有 filtered_chunks 作为总块数(与实际处理的块数保持一致)
+            total_chunks = len(filtered_chunks)
 
             # 初始化issues列表
             all_issues = []

+ 3 - 2
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -180,8 +180,9 @@ class AIReviewCoreFun:
             else:
                 logger.warning(f"⚠️ 块{chunk_index}: issues为空,未添加到all_issues")
 
-        logger.info(f"🔍 章节{chapter_code}完成: 总共处理{total_chunks}个块, all_issues最终数量={len(all_issues)}")
-        return total_chunks, all_issues
+        chapter_processed_chunks = len(chapter_content)
+        logger.info(f"🔍 章节{chapter_code}完成: 本章节处理{chapter_processed_chunks}个块, all_issues最终数量={len(all_issues)}")
+        return chapter_processed_chunks, all_issues
 
     def _extract_issues_from_result(self, result: Any) -> List[Dict]:
         """

+ 109 - 3
core/construction_write/component/outline_generator.py

@@ -156,12 +156,32 @@ class OutlineGenerator:
             concurrency_limit = 3  # 默认并发信号数,可根据实际情况调整
             semaphore = asyncio.Semaphore(concurrency_limit)
 
+            # 计算所有叶子节点(末级节点)总数,用于进度计算
+            def count_leaf_nodes(node_list):
+                count = 0
+                for node in node_list:
+                    children = node.get("children", [])
+                    if children:
+                        count += count_leaf_nodes(children)
+                    else:
+                        count += 1
+                return count
+
+            total_leaf_chapters = count_leaf_nodes(filtered_template)
+            logger.info(f"[大纲生成] trace_id: {trace_id}, 叶子节点总数: {total_leaf_chapters}")
+
+            # 使用原子计数器统计已完成叶子节点数(使用列表包装以实现引用传递)
+            completed_count = [0]
+            completed_lock = asyncio.Lock()
+
             async def generate_with_semaphore(idx: int, chapter_node: dict) -> dict:
                 async with semaphore:
                     return await self._generate_single_chapter(
                         trace_id=f"{trace_id}_{idx}",
                         project_info=project_info,
-                        chapter_node=chapter_node
+                        chapter_node=chapter_node,
+                        completed_counter=(completed_count, completed_lock),
+                        total_leaf_chapters=total_leaf_chapters
                     )
 
             # 统一使用异步并发结构,信号数控制实际并发度
@@ -704,7 +724,9 @@ class OutlineGenerator:
         trace_id: str,
         project_info: Dict[str, Any],
         chapter_node: Dict[str, Any],
-        parent_index: str = ""
+        parent_index: str = "",
+        completed_counter: tuple = None,
+        total_leaf_chapters: int = 1
     ) -> Dict[str, Any]:
         """
         生成单个章节(支持内部异步并发生成子章节)
@@ -766,7 +788,9 @@ class OutlineGenerator:
                     trace_id=f"{trace_id}_{idx}",
                     project_info=project_info,
                     chapter_node=child_node,
-                    parent_index=current_index
+                    parent_index=current_index,
+                    completed_counter=completed_counter,
+                    total_leaf_chapters=total_leaf_chapters
                 )
                 for idx, child_node in enumerate(children)
             ]
@@ -799,9 +823,91 @@ class OutlineGenerator:
                 chapter_code=code
             )
 
+            # 末级节点完成时,原子地增加计数并推送进度
+            if completed_counter:
+                completed_count_list, completed_lock = completed_counter
+                async with completed_lock:
+                    completed_count_list[0] += 1
+                    current_completed = completed_count_list[0]
+                await self._send_chapter_progress(
+                    trace_id=trace_id,
+                    chapter_index=current_index,
+                    chapter_title=title,
+                    user_id=project_info.get("user_id"),
+                    completed_count=current_completed,
+                    total_chapters=total_leaf_chapters
+                )
+
         logger.info(f"[章节生成] trace_id: {trace_id}, {current_index} {title} 生成完成")
+
         return result_node
 
+    async def _send_chapter_progress(self, trace_id: str, chapter_index: str, chapter_title: str, user_id: str = None, completed_count: int = 0, total_chapters: int = 1):
+        """
+        发送章节生成进度
+
+        Args:
+            trace_id: 追踪ID(格式: callback_task_id_索引)
+            chapter_index: 章节编号
+            chapter_title: 章节标题
+            user_id: 用户ID
+            completed_count: 已完成章节数
+            total_chapters: 总章节数
+        """
+        try:
+            # 提取 callback_task_id
+            callback_task_id = self._extract_callback_task_id(trace_id)
+
+            progress_manager = ProgressManagerRegistry.get_progress_manager(callback_task_id)
+            logger.info(f"[章节进度调试] progress_manager={progress_manager}, registry_keys={list(ProgressManagerRegistry._registry.keys())}")
+            if progress_manager:
+                logger.info(f"大纲生成单章进度打印: task_id={callback_task_id}, 章节 [{chapter_index}] {chapter_title} 生成完成")
+                # 计算当前进度:基于实际完成数计算,范围 0-100
+                # 公式:(completed_count / total_chapters) * 100
+                current_progress = int((completed_count / max(total_chapters, 1)) * 100)
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    user_id=user_id,
+                    stage_name="大纲生成",
+                    status="processing",
+                    message=f"正在生成章节 [{chapter_index}] {chapter_title}...",
+                    event_type="outline_chapter_progress",
+                    current=current_progress
+                )
+                logger.info(f"[章节进度] task_id={trace_id}, 章节 [{chapter_index}] {chapter_title} 生成完成, 进度={current_progress}%")
+            else:
+                logger.warning(f"[章节进度] 未找到 ProgressManager: callback_task_id={callback_task_id}")
+        except Exception as e:
+            logger.warning(f"[章节进度] 推送进度失败: {str(e)}")
+
+    def _extract_callback_task_id(self, trace_id: str) -> Optional[str]:
+        """
+        从 trace_id 提取 callback_task_id
+
+        trace_id 格式: callback_task_id_索引1_索引2...
+        例如: outline_abc123_0, outline_abc123_0_1, outline_abc123_0_6_0
+
+        Returns:
+            Optional[str]: callback_task_id 或 None
+        """
+        if not trace_id:
+            return None
+
+        # callback_task_id 格式为: outline_{uuid}
+        # 索引部分从第一个纯数字开始
+        # 例如: outline_7da350bcdd094bb9_0_0 -> outline_7da350bcdd094bb9
+        parts = trace_id.split("_")
+        result_parts = []
+        for part in parts:
+            # 遇到纯数字索引,停止拼接
+            if part.isdigit():
+                break
+            result_parts.append(part)
+
+        if result_parts:
+            return "_".join(result_parts)
+        return trace_id
+
     async def _generate_chapter_content(
         self,
         trace_id: str,

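The new progress push relies on a list-wrapped counter protected by an asyncio.Lock, incremented once per finished leaf chapter and converted to a 0-100 percentage. The pattern in isolation, with a dummy workload standing in for the LLM call (all names illustrative):

    import asyncio

    async def run_chapters(total: int = 7, concurrency: int = 3) -> None:
        completed = [0]                      # list wrapper so nested coroutines mutate the same value
        lock = asyncio.Lock()
        sem = asyncio.Semaphore(concurrency)

        async def one_chapter(idx: int) -> None:
            async with sem:                  # cap concurrent generations, as in the generator
                await asyncio.sleep(0.01)    # stand-in for the chapter-generation call
            async with lock:                 # atomic increment across tasks
                completed[0] += 1
                done = completed[0]
            percent = int(done / max(total, 1) * 100)
            print(f"chapter {idx} done, progress {percent}%")

        await asyncio.gather(*(one_chapter(i) for i in range(total)))

    # asyncio.run(run_chapters())
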
+ 0 - 3
foundation/ai/__init__.py

@@ -5,13 +5,10 @@ AI能力模块
 """
 
 from .models import ModelHandler, get_models
-from .agent import BaseAgent
 
 __all__ = [
     # 模型管理
     "ModelHandler",
     "get_models",
     "BaseApiPlatform",
-    # 智能代理
-    "BaseAgent"
 ]

+ 0 - 11
foundation/ai/agent/__init__.py

@@ -1,11 +0,0 @@
-"""
-智能代理模块
-
-提供AI智能代理的基础能力和工作流功能
-"""
-
-from .base_agent import BaseAgent
-
-__all__ = [
-    "BaseAgent"
-]

+ 0 - 161
foundation/ai/agent/base_agent.py

@@ -1,161 +0,0 @@
-
-# !/usr/bin/python
-# -*- coding: utf-8 -*-
-'''
-@Project    : lq-agent-api
-@File       :base_agent.py
-@IDE        :Cursor
-@Author     : 
-@Date       :2025/7/26 05:00
-'''
-from datetime import datetime
-from io import StringIO
-from contextlib import redirect_stdout
-from typing import Dict, List, Optional
-from foundation.observability.logger.loggering import server_logger
-from foundation.utils.redis_utils import get_redis_result_cache_data_and_delete_key
-
-class BaseAgent:
-    """
-     基础智能助手类
-    """
-
-    def __init__(self):
-        pass
-
-
-    def get_pretty_message_str(self, message) -> str:
-        """安全地捕获 pretty_print() 的输出"""
-        captured_output = StringIO()
-        with redirect_stdout(captured_output):
-            message.pretty_print()
-        return captured_output.getvalue()
-
-    
-    def log_stream_pretty_message(self , trace_id , event):
-        """
-            流式打印agent 整个推理过程 pretty_print() 的输出
-        """
-        event_type = event.get('event', '')
-        name = event.get('name', '')
-        data = event.get('data', {})
-        if event_type not in ['on_chain_start', 'on_chain_end', 'on_tool_start', 'on_tool_end', 'on_chat_model_start']:
-            return 
-        
-        server_logger.info(trace_id=trace_id , msg=f"\n================================= {event_type} ({name}) =================================")
-        if 'messages' in event:
-            for msg in event['messages']:
-                #msg.pretty_print()
-                output = self.get_pretty_message_str(msg)
-                server_logger.info(trace_id=trace_id , msg=f"\n{output}")
-        elif 'chunk' in data:
-            chunk = data['chunk']
-            if hasattr(chunk, 'content') and chunk.content:
-                server_logger.info(trace_id=trace_id , msg=f"Content: {chunk.content}")
-            if hasattr(chunk, 'tool_calls') and chunk.tool_calls:
-                server_logger.info(trace_id=trace_id , msg=f"Tool calls: {chunk.tool_calls}")
-        elif 'output' in data:
-            output = data['output']
-            if hasattr(output, 'pretty_print'):
-                #output.pretty_print()
-                output = self.get_pretty_message_str(output)
-                server_logger.info(trace_id=trace_id , msg=f"\n{output}")
-            else:
-                server_logger.info(trace_id=trace_id , msg=f"Output: {output}")
-
-
-
-    def get_input_context(
-            self,
-            trace_id: str,
-            task_prompt_info: dict,
-            input_query: str,
-            context: Optional[str] = None,
-            supplement_info: Optional[str] = None
-    ) -> tuple[str,str]:
-        """构建场景优化的上下文提示"""
-        context = context or "无相关数据"
-        task_prompt_info_str = task_prompt_info["task_prompt"]
-        
-        # 场景优化的上下文模板
-        context_template = """
-        助手会话 [ID: {trace_id}] 
-        时间: {timestamp}
-        任务: {task_prompt_info_str}
-        
-        用户提供上下文信息:
-        {context}
-        用户输入问题:
-        {input}
-        
-        """
-
-        input_context = context_template.format(
-            trace_id=trace_id,
-            task_prompt_info_str=task_prompt_info_str,
-            context=context,
-            input=input_query,
-            supplement_info=supplement_info,
-            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        )
-        
-
-          # 场景优化的上下文模板
-        summary_context_template = """
-        助手会话 [ID: {trace_id}] 
-        上下文信息:
-        {context}
-        用户问题:
-        {input}
-        """
-
-        input_summary_context = summary_context_template.format(
-            trace_id=trace_id,
-            context=context,
-            input=input_query,
-        )
-        return input_context , input_summary_context
-
-
-    def clean_json_output(self, raw_output: str) -> str:
-        """去除开头和结尾的 ```json 和 ```"""
-        cleaned = raw_output.strip()
-        if cleaned.startswith("```json"):
-            cleaned = cleaned[7:]  # 去掉开头的 ```json
-        if cleaned.endswith("```"):
-            cleaned = cleaned[:-3]  # 去掉结尾的 ```
-        return cleaned.strip()
-
-
-    
-    async def get_redis_result_cache_data(self , trace_id: str):
-        """
-            获取redis结果缓存数据
-            @param data_type: 数据类型,
-                基本信息 cattle_info
-                体温信息 cattle_temperature 
-                步数信息 cattle_walk
-                知识库检索溯源信息 retriever_resources
-            @param trace_id: 链路跟踪ID
-        """
-        # 基本信息
-        data_type = "cattle_info"
-        cattle_info = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
-
-        data_type = "cattle_temperature"
-        cattle_temperature = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
-
-        data_type = "cattle_walk"
-        cattle_walk = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
-
-        data_type = "retriever_resources"
-        retriever_resources = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
-        return {
-            "cattle_info": cattle_info,
-            "cattle_temperature": cattle_temperature,
-            "cattle_walk": cattle_walk,
-            "retriever_resources": retriever_resources
-        }
-
-
-

+ 174 - 40
foundation/ai/agent/generate/model_generate.py

@@ -9,11 +9,12 @@
 '''
 
 from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
 from foundation.ai.models.model_handler import model_handler
 from foundation.observability.logger.loggering import review_logger as logger
 import asyncio
 import time
-from typing import Optional, Callable, Any
+from typing import Optional, Callable, Any, List, Union
 
 class GenerateModelClient:
     """
@@ -63,43 +64,85 @@ class GenerateModelClient:
                 logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {str(e)}, {wait_time}秒后重试...")
                 await asyncio.sleep(wait_time)
 
-    async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None, model_name: Optional[str] = None):
-        """
-            模型非流式生成(异步)
-
-            Args:
-                trace_id: 追踪ID
-                task_prompt_info: 任务提示词信息
-                timeout: 超时时间(可选)
-                model_name: 模型名称(可选),支持动态切换模型
-                          支持的模型:doubao, qwen, deepseek, gemini,
-                                      lq_qwen3_8b, lq_qwen3_8b_lq_lora,
-                                      lq_qwen3_4b, qwen_local_14b
-                          如果为None,则使用默认模型
+    async def get_model_generate_invoke(
+        self,
+        trace_id: str,
+        task_prompt_info: Optional[dict] = None,
+        messages: Optional[List[BaseMessage]] = None,
+        system_prompt: Optional[str] = None,
+        user_prompt: Optional[str] = None,
+        prompt: Optional[str] = None,
+        timeout: Optional[int] = None,
+        model_name: Optional[str] = None
+    ) -> str:
+        """模型非流式生成(异步)
+
+        支持多种调用方式(优先级从高到低):
+        1. messages: 直接传入 LangChain Message 对象列表
+        2. system_prompt + user_prompt: 分别传入系统和用户提示词
+        3. prompt: 传入单条用户提示词字符串
+        4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
+
+        Args:
+            trace_id: 追踪ID
+            task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
+            messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
+            system_prompt: 系统提示词字符串
+            user_prompt: 用户提示词字符串
+            prompt: 单条用户提示词字符串(无系统提示时使用)
+            timeout: 超时时间(秒),默认使用构造时的 default_timeout
+            model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
+
+        Returns:
+            str: 模型生成的文本内容
+
+        Raises:
+            ValueError: 参数组合错误
+            TimeoutError: 调用超时
+            Exception: 模型调用异常
+
+        Examples:
+            # 方式1: 使用 Message 列表(推荐)
+            messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
+            result = await client.get_model_generate_invoke("trace-001", messages=messages)
+
+            # 方式2: 分别传入系统和用户提示词
+            result = await client.get_model_generate_invoke(
+                "trace-001",
+                system_prompt="你是专家",
+                user_prompt="请分析..."
+            )
+
+            # 方式3: 传入单条提示词
+            result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
+
+            # 方式4: 兼容旧接口(使用 PromptLoader)
+            task_prompt_info = {"task_prompt": chat_template}
+            result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
         """
         start_time = time.time()
-        #current_timeout = int(timeout) or int(self.default_timeout)
         current_timeout = timeout or self.default_timeout
-        try:
-            # 根据model_name选择对应的模型
-            if model_name:
-                llm_to_use = self.model_handler.get_model_by_name(model_name)
-                logger.info(f"[模型调用] 使用指定模型: {model_name}, trace_id: {trace_id}")
-            else:
-                llm_to_use = self.llm
-                logger.info(f"[模型调用] 使用默认模型, trace_id: {trace_id}")
 
-            logger.info(f"[模型调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
+        try:
+            # 选择模型
+            llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
+            logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
 
-            prompt_template = task_prompt_info["task_prompt"]
-            messages = prompt_template.format_messages()
+            # 构建消息列表(按优先级)
+            final_messages = self._build_messages(
+                messages=messages,
+                system_prompt=system_prompt,
+                user_prompt=user_prompt,
+                prompt=prompt,
+                task_prompt_info=task_prompt_info
+            )
 
-            async def _invoke_model():
-                loop = asyncio.get_event_loop()
-                return await loop.run_in_executor(None, llm_to_use.invoke, messages)
+            # 定义模型调用函数,使用原生 ainvoke
+            async def _invoke():
+                return await llm_to_use.ainvoke(final_messages)
 
-            # 调用带重试机制的方法,超时控制在重试机制内部处理
-            response = await self._retry_with_backoff(_invoke_model, timeout=current_timeout)
+            # 调用带重试机制
+            response = await self._retry_with_backoff(_invoke, timeout=current_timeout)
 
             elapsed_time = time.time() - start_time
             logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
@@ -108,16 +151,101 @@ class GenerateModelClient:
         except asyncio.TimeoutError:
             elapsed_time = time.time() - start_time
             logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
-            raise TimeoutError(f"模型调用超时,trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
+            raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
 
         except Exception as e:
             elapsed_time = time.time() - start_time
-            logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
+            logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
             raise
 
-    def get_model_generate_stream(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
+    def _build_messages(
+        self,
+        messages: Optional[List[BaseMessage]] = None,
+        system_prompt: Optional[str] = None,
+        user_prompt: Optional[str] = None,
+        prompt: Optional[str] = None,
+        task_prompt_info: Optional[dict] = None
+    ) -> List[BaseMessage]:
+        """构建消息列表(内部方法)
+
+        优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
         """
-            模型流式生成(同步生成器)- 带异常处理
+        # 方式1: 直接使用传入的 Message 列表
+        if messages is not None:
+            if not isinstance(messages, list):
+                raise ValueError("messages 必须是列表")
+            if len(messages) == 0:
+                raise ValueError("messages 不能为空列表")
+            logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
+            return messages
+
+        # 方式2: system_prompt + user_prompt
+        if system_prompt is not None and user_prompt is not None:
+            logger.debug("使用 system_prompt + user_prompt 构建消息")
+            return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
+
+        # 方式3: 单独 system_prompt(可能是特殊情况)
+        if system_prompt is not None:
+            logger.debug("使用单独的 system_prompt 构建消息")
+            return [SystemMessage(content=system_prompt)]
+
+        # 方式4: 单条 prompt 字符串
+        if prompt is not None:
+            logger.debug("使用单条 prompt 字符串构建消息")
+            return [HumanMessage(content=prompt)]
+
+        # 方式5: 兼容旧接口 task_prompt_info
+        if task_prompt_info is not None:
+            if "task_prompt" not in task_prompt_info:
+                raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
+            task_prompt = task_prompt_info["task_prompt"]
+            if hasattr(task_prompt, 'format_messages'):
+                logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
+                return task_prompt.format_messages()
+            elif isinstance(task_prompt, str):
+                logger.debug("使用 task_prompt_info 中的字符串构建消息")
+                return [HumanMessage(content=task_prompt)]
+            else:
+                raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
+
+        # 没有提供任何有效参数
+        raise ValueError(
+            "必须提供以下参数之一: "
+            "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
+        )
+
+    def get_model_generate_stream(
+        self,
+        trace_id: str,
+        task_prompt_info: Optional[dict] = None,
+        messages: Optional[List[BaseMessage]] = None,
+        system_prompt: Optional[str] = None,
+        user_prompt: Optional[str] = None,
+        prompt: Optional[str] = None,
+        timeout: Optional[int] = None
+    ):
+        """模型流式生成(同步生成器)
+
+        支持多种调用方式(优先级从高到低):
+        1. messages: 直接传入 LangChain Message 对象列表
+        2. system_prompt + user_prompt: 分别传入系统和用户提示词
+        3. prompt: 传入单条用户提示词字符串
+        4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
+
+        Args:
+            trace_id: 追踪ID
+            task_prompt_info: 任务提示词信息(兼容旧接口)
+            messages: LangChain Message 对象列表
+            system_prompt: 系统提示词字符串
+            user_prompt: 用户提示词字符串
+            prompt: 单条用户提示词字符串
+            timeout: 超时时间(秒)
+
+        Yields:
+            str: 生成的文本块
+
+        Raises:
+            ValueError: 参数组合错误
         """
         start_time = time.time()
         current_timeout = timeout or self.default_timeout
@@ -125,17 +253,23 @@ class GenerateModelClient:
         try:
             logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
 
-            prompt_template = task_prompt_info["task_prompt"]
-            messages = prompt_template.format_messages()
+            # 构建消息列表
+            final_messages = self._build_messages(
+                messages=messages,
+                system_prompt=system_prompt,
+                user_prompt=user_prompt,
+                prompt=prompt,
+                task_prompt_info=task_prompt_info
+            )
 
-            response = self.llm.stream(messages)
+            response = self.llm.stream(final_messages)
 
             chunk_count = 0
             for chunk in response:
                 chunk_count += 1
                 if hasattr(chunk, 'content') and chunk.content:
                     yield chunk.content
-                elif chunk:  # 处理直接返回字符串的情况
+                elif chunk:
                     yield chunk
 
             elapsed_time = time.time() - start_time
@@ -143,7 +277,7 @@ class GenerateModelClient:
 
         except Exception as e:
             elapsed_time = time.time() - start_time
-            logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
+            logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
             raise
 
 generate_model_client = GenerateModelClient(default_timeout=15, max_retries=2, backoff_factor=0.5)

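_retry_with_backoff() is only visible at the top of this diff through its retry warning, while the module-level client is built with default_timeout=15, max_retries=2, backoff_factor=0.5. A sketch of an exponential-backoff wrapper consistent with those parameters, with the internals assumed:

    import asyncio

    async def retry_with_backoff(func, timeout: float, max_retries: int = 2, backoff_factor: float = 0.5):
        """Run an async callable with a per-attempt timeout and exponential backoff between attempts."""
        last_exc = None
        for attempt in range(max_retries + 1):
            try:
                return await asyncio.wait_for(func(), timeout=timeout)
            except Exception as e:                            # includes asyncio.TimeoutError
                last_exc = e
                if attempt == max_retries:
                    break
                wait_time = backoff_factor * (2 ** attempt)   # 0.5s, 1.0s, ...
                await asyncio.sleep(wait_time)
        raise last_exc
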
+ 0 - 105
foundation/ai/agent/generate/test_intent.py

@@ -1,105 +0,0 @@
-# !/usr/bin/ python
-# -*- coding: utf-8 -*-
-'''
-@Project    : xiwu-agent-api
-@File       :intent.py
-@IDE        :PyCharm
-@Author     :LINGMIN
-@Date       :2025/7/14 12:04
-'''
-
-
-import os
-import sys
-sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-
-from foundation.observability.logger.loggering import server_logger
-from foundation.ai.models import get_models
-from langchain_core.prompts import SystemMessagePromptTemplate
-from langchain_core.prompts import HumanMessagePromptTemplate
-from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.prompts import FewShotChatMessagePromptTemplate
-from foundation.utils import yaml_utils
-from foundation.infrastructure.config import config_handler
-
-
-class TestIntentIdentifyClient:
-
-    def __init__(self):
-        """
-            创建意图识别类
-        """
-          # 获取部署的模型列表
-        llm, chat, embed = get_models()
-        self.llm_recognition = chat
-        # 加载 意图识别系统配置信息
-        self.intent_prompt = yaml_utils.get_intent_prompt()
-
-    def recognize_intent(self , trace_id: str , config: dict , input: str):
-        """
-        意图识别
-        输入:用户输入的问题
-        输出:识别出的意图,可选项:
-        """
-        session_id = config["session_id"]
-        history = "无"
-        # 根据历史记录和用户问题进行识别意图
-        return self.recognize_intent_history(input=input , history=history)
-
-
-    def recognize_intent_history(self , input: str , history="无"):
-        """
-        意图识别
-        输入:用户输入的问题
-        输出:识别出的意图,可选项:
-        """
-        # 准备few-shot样例
-        examples = self.intent_prompt["intent_examples"]
-        #server_logger.info(f"加载prompt配置.examples: {examples}")
-        system_prompt = self.intent_prompt["system_prompt"]
-        system_prompt = system_prompt.format(history=history)
-        server_logger.info(f"增加用户历史记录,用于意图识别,prompt配置.system_prompt: {system_prompt}")
-
-        # 定义样本模板
-        examples_prompt = ChatPromptTemplate.from_messages(
-            [
-                ("human", "{inn}"),
-                ("ai", "{out}"),
-            ]
-        )
-        few_shot_prompt = FewShotChatMessagePromptTemplate(example_prompt=examples_prompt,
-                                                           examples=examples)
-        final_prompt = ChatPromptTemplate.from_messages(
-            [
-                ('system', system_prompt),
-                few_shot_prompt,
-                ('human', '{input}'),
-            ]
-        )
-
-        chain = final_prompt | self.llm_recognition
-        server_logger.info(f"意图识别输入input: {input}")
-        result = chain.invoke(input={"input": input})
-        # 容错处理
-        if hasattr(result, 'content'):
-            # 如果 result 有 content 属性,使用它
-            return result.content
-        else:
-            # 否则,直接返回 result
-            return result
-
-
-
-
-
-intent_identify_client = TestIntentIdentifyClient()
-
-
-if __name__ == '__main__':
-   
-    input = "你好"
-    input = "查询课程"
-    input = "操作"
-    result = intent_identify_client.recognize_intent_history(history="" , input=input)
-    server_logger.info(f"result={result}")
-    

+ 0 - 252
foundation/ai/agent/test_agent.py

@@ -1,252 +0,0 @@
-# !/usr/bin/python
-# -*- coding: utf-8 -*-
-'''
-@Project    : lq-agent-api
-@File       :agent_mcp.py
-@IDE        :PyCharm
-@Author     :
-@Date       :2025/7/21 10:12
-'''
-import json
-
-from langgraph.prebuilt import create_react_agent
-from sqlalchemy.sql.functions import user
-from foundation.observability.logger.loggering import server_logger
-from foundation.utils.common import handler_err
-from foundation.ai.models import get_models
-from foundation.utils.yaml_utils import get_system_prompt_config
-
-import threading
-import time
-from typing import Dict, List, Optional, AsyncGenerator, Any, OrderedDict
-from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
-from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
-from langchain_core.runnables import RunnableConfig
-from foundation.ai.agent.base_agent import BaseAgent
-from foundation.schemas.test_schemas import TestForm
-# from foundation.agent.function.test_funciton import test_funtion
-
-
-class TestAgentClient(BaseAgent):
-    """
-    Xiwuzc 智能助手+MCP(带完整会话管理) - 针对场景优化
-    添加会话锁定机制,确保同一时间只有一个客户端可以使用特定会话
-    """
-    # 单例实例和线程锁
-    _instance = None
-    _singleton_lock = threading.Lock()
-
-    def __new__(cls):
-        """线程安全的单例模式实现"""
-        if cls._instance is None:
-            with cls._singleton_lock:
-                if cls._instance is None:
-                    cls._instance = super().__new__(cls)
-                    cls._instance._initialize()
-        return cls._instance
-
-    def _initialize(self):
-        """初始化模型和会话管理"""
-        llm, chat, embed = get_models()
-        self.llm = llm
-        self.chat = chat
-        self.embed = embed
-        self.agent_executor = None
-        self.initialized = False
-        self.psutil_available = True
-
-        # 固定系统提示词
-        self.system_prompt = get_system_prompt_config()["system_prompt"]
-
-        # 清理任务
-        self.cleanup_task = None
-        server_logger.info(" client initialized")
-
-    async def init_agent(self):
-        """初始化agent_executor(只需一次)"""
-        if self.initialized:
-            return
-
-        # 获取部署的模型列表
-        server_logger.info(f"系统提示词 system_prompt:{self.system_prompt}")
-
-        # 创建提示词模板 - 使用固定的系统提示词
-        prompt = ChatPromptTemplate.from_messages([
-            ("system", self.system_prompt),
-            MessagesPlaceholder(variable_name="messages"),
-            ("placeholder", "{agent_scratchpad}")
-        ])
-
-        # # 创建Agent - 不再使用MemorySaver
-        # self.agent_executor = create_react_agent(
-        #     self.llm,
-        #     tools=[test_funtion.query_info , test_funtion.execute , test_funtion.handle] ,  # 专用工具集 + 私有知识库检索工具
-        #     prompt=prompt
-        # )
-        self.initialized = True
-        server_logger.info(" agent initialized")
-
-
-    async def handle_query(self, trace_id: str, task_prompt_info: dict, input_query, context=None,
-                            config_param: TestForm = None):
-        try:
-            # 确保agent已初始化
-            if not self.initialized:
-                await self.init_agent()
-            
-            session_id = config_param.session_id
-           
-
-            try:
-                # 构建输入消息
-                input_message , input_summary_context = self.get_input_context(
-                    trace_id=trace_id,
-                    task_prompt_info=task_prompt_info,
-                    input_query=input_query,
-                    context=context
-                )
-                # 用于模型对话使用
-                input_human_message = HumanMessage(content=input_message)
-                # 用于对话历史记录摘要 
-                input_human_summary_message = HumanMessage(content=input_summary_context)
-                # 获取历史消息
-                history_messages = []
-                # 构造完整的消息列表
-                all_messages = list(history_messages) + [input_human_message]
-
-                # 配置执行上下文
-                config = RunnableConfig(
-                    configurable={"thread_id": session_id},
-                    runnable_kwargs={"recursion_limit": 15}
-                )
-
-                # 执行智能体
-                events = self.agent_executor.astream(
-                    {"messages": all_messages},
-                    config=config,
-                    stream_mode="values"
-                )
-
-                # 处理结果
-                full_response = []
-                async for event in events:
-                    if isinstance(event["messages"][-1], AIMessage):
-                        chunk = event["messages"][-1].content
-                        full_response.append(chunk)
-                    log_content = self.get_pretty_message_str(event["messages"][-1])
-                    server_logger.info("\n" + log_content.strip(), trace_id=trace_id)
-
-                if full_response:
-                    full_text = "".join(full_response)
-                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
-                    full_text = self.clean_json_output(full_text)
-                    return full_text
-            finally:
-                # 确保释放会话锁
-                pass
-        except PermissionError as e:
-            # 处理会话被其他设备锁定的情况
-            return str(e)
-        except Exception as e:
-            handler_err(server_logger, trace_id=trace_id, err=e, err_name='agent/chat')
-            return f"系统错误: {str(e)}"
-
-
-    async def handle_query_stream(
-            self,
-            trace_id: str,
-            task_prompt_info: dict,
-            input_query: str,
-            context: Optional[str] = None,
-            header_info: Optional[Dict] = None,
-            config_param: TestForm = None,
-    ) -> AsyncGenerator[str, None]:
-        """流式处理查询(优化缓冲管理)"""
-        try:
-            # 确保agent已初始化
-            if not self.initialized:
-                await self.init_agent()
-            
-            session_id = config_param.session_id
-        
-            try:
-                # 构建输入消息
-                input_message , input_summary_context = self.get_input_context(
-                    trace_id=trace_id,
-                    task_prompt_info=task_prompt_info,
-                    input_query=input_query,
-                    context=context
-                )
-                server_logger.info(trace_id=trace_id, msg=f"input_context: {input_message}")
-                # 用于模型对话使用
-                input_human_message = HumanMessage(content=input_message)
-                # 用于对话历史记录摘要 
-                input_human_summary_message = HumanMessage(content=input_summary_context)
-                 # 获取历史消息
-                history_messages = []
-                # 构造完整的消息列表
-                all_messages = list(history_messages) + [input_human_message]
-                # 配置执行上下文
-                config = RunnableConfig(
-                    configurable={"thread_id": session_id},
-                    runnable_kwargs={"recursion_limit": 15}
-                )
-
-                # 流式执行
-                events = self.agent_executor.astream_events(
-                    {"messages": all_messages},
-                    config=config,
-                    stream_mode="values"
-                )
-
-                full_response = []
-                buffer = []
-                last_flush_time = time.time()
-
-                # 流式处理事件
-                async for event in events:
-                    # 只在特定事件类型时打印日志
-                    self.log_stream_pretty_message(trace_id=trace_id, event=event)
-                   
-                    if 'chunk' in event['data'] and "on_chat_model_stream" in event['event']:
-                        chunk = event['data']['chunk'].content
-                        full_response.append(chunk)
-
-                        # 缓冲管理策略
-                        buffer.append(chunk)
-                        current_time = time.time()
-
-                        # 满足以下任一条件即刷新缓冲区
-                        if (len(buffer) >= 3 or  # 达到最小块数
-                                (current_time - last_flush_time) > 0.5 or  # 超时
-                                any(chunk.endswith((c, f"{c} ")) for c in
-                                    ['.', '。', '!', '?', '\n', ';', ';'])):  # 自然断点
-
-                            # 合并并发送缓冲内容
-                            combined = ''.join(buffer)
-                            yield combined
-
-                            # 重置缓冲
-                            buffer.clear()
-                            last_flush_time = current_time
-
-                # 处理剩余内容
-                if buffer:
-                    yield ''.join(buffer)
-
-                # 将完整响应添加到历史并进行压缩
-                if full_response:
-                    full_text = "".join(full_response)
-                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
-            finally:
-                # 确保释放会话锁
-                pass
-
-        except PermissionError as e:
-            yield json.dumps({"error": str(e)})
-        except Exception as e:
-            handler_err(server_logger, trace_id=trace_id, err=e, err_name='test_stream')
-            yield json.dumps({"error": f"系统错误: {str(e)}"})
-
-
-test_agent_client = TestAgentClient()
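
The deleted handle_query_stream buffered streamed chunks and flushed them when three pieces had accumulated, 0.5 s had elapsed, or a chunk ended on a sentence boundary. A standalone sketch of that flush policy under hypothetical names (the original implementation is removed by this commit, so the code below is illustrative only):

import time

SENTENCE_ENDINGS = ('.', '。', '!', '?', '!', '?', '\n', ';', ';')


def buffered(chunks, max_chunks=3, max_wait=0.5):
    """Re-group a fine-grained chunk stream into larger, sentence-friendly pieces."""
    buffer = []
    last_flush = time.time()
    for chunk in chunks:
        buffer.append(chunk)
        now = time.time()
        if (len(buffer) >= max_chunks
                or (now - last_flush) > max_wait
                or chunk.endswith(SENTENCE_ENDINGS)):
            yield "".join(buffer)
            buffer.clear()
            last_flush = now
    if buffer:                      # flush whatever remains at end of stream
        yield "".join(buffer)


for piece in buffered(["编制", "依据", "如下。", "1.", " 施工", "规范"]):
    print(repr(piece))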

+ 0 - 21
foundation/ai/agent/workflow/test_cus_state.py

@@ -1,21 +0,0 @@
-
-from itertools import count
-from langgraph.graph import MessagesState
-
-
-
-
-class TestCusState(MessagesState):
-    """
-     第二步:定义状态结构
-    """
-    route_next: str                                  # 下一个节点  
-    
-    session_id: str                                  # 会话id  
-    trace_id: str                                    # 日志链路跟踪id
-    user_input: str                                  # 用户输入问题    
-    context: str                                     # 上下文数据
-    task_prompt_info: str                            # 任务提示
-
-
-

+ 0 - 203
foundation/ai/agent/workflow/test_workflow_graph.py

@@ -1,203 +0,0 @@
-
-# !/usr/bin/python
-# -*- coding: utf-8 -*-
-'''
-@Project    : 
-@File       :workflow_graph.py
-@IDE        :Cursor
-@Author     :LINGMIN
-@Date       :2025/08/10 18:00
-'''
-
-from foundation.ai.agent.workflow.test_cus_state import TestCusState
-from foundation.ai.agent.workflow.test_workflow_node import TestWorkflowNode
-from langgraph.graph import START, StateGraph, END
-from langgraph.checkpoint.memory import MemorySaver
-from foundation.observability.logger.loggering import server_logger
-from typing import AsyncGenerator
-import time
-from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
-from foundation.utils.common import return_json, handler_err
-import json
-from foundation.schemas.test_schemas import TestForm
-
-
-class TestWorkflowGraph:
-    """
-        工作流图
-    """
-    def __init__(self):
-        self.workflow_node = TestWorkflowNode()
-        self.checkpoint_saver = MemorySaver()
-        self.app = self.init_workflow_graph()
-        # 将生成的图片保存到文件(移除自动生成,避免启动失败)
-        # self.write_graph()
-
-
-
-
-    def init_workflow_graph(self):
-        """
-            初始化工作流图
-            使用 graph.get_state 和 get_state_history 检查状态。
-            启用 debug=True 查看详细日志。
-            使用 graph.get_graph().to_dot() 可视化状态图。
-        """
-        # 构建工作流图  创建状态图 , state_update_method="merge"
-        workflow = StateGraph(TestCusState)
-
-
-        ######分支2、代理Agent  supervisor_agent ##################################    
-        # 节点:  代理 agent 节点
-        workflow.add_node("supervisor_agent", self.workflow_node.supervisor_agent)
-        # agent节点1: 纯生成类问题
-        workflow.add_node("chat_box_generate", self.workflow_node.chat_box_generate)
-        # agent节点2:
-        workflow.add_node("common_agent", self.workflow_node.common_agent_node)
-
-
-        ###### 节点分支线条 ##################################    
-        # 固定问题识别
-        workflow.add_edge(START, "supervisor_agent")  
-        # 在图状态中填充 ‘next’字段,路由到具体的某个节点或结束图的运行,从来指定如何执行接下来的任务。
-        workflow.add_conditional_edges(source="supervisor_agent", 
-                path=lambda state: state["route_next"],
-                # 显式映射每个返回值到目标节点
-                path_map={
-                    "chat_box_generate": "chat_box_generate",
-                    "common_agent": "common_agent",
-                
-                }
-        )
-
-        supervisor_members_list = ["chat_box_generate" , "common_agent"] 
-
-         # 每个子代理 在完成后总是向主管 “汇报”
-        for agent_member in supervisor_members_list:
-            workflow.add_edge(agent_member, END) # 直接结束
-            #workflow.add_edge(agent_member, "supervisor_agent") # 回到路由 继续 判断执行
-
-       
-        #编译图
-        app = workflow.compile(checkpointer=self.checkpoint_saver)
-        #print(app.get_graph().draw_ascii())
-        server_logger.info(f"【图工作流构建完成】app={app}")
-        return app
-
-
-
-
-    async def handle_query_stream(self, param: TestForm, trace_id: str)-> AsyncGenerator[str, None]:
-        """
-        根据场景获取智能体反馈 (SSE流式响应)
-        """
-        try:
-
-            # 提取参数
-            user_input = param.input
-            session_id = param.config.session_id
-            context = param.context
-
-            
-            human_messages = [HumanMessage(content=user_input)]
-            # 完整的初始状态
-            initial_state = {
-                "messages": human_messages,
-                "session_id": session_id,                                # 会话id  
-                "trace_id": trace_id,                                  # 日志链路跟踪id
-                "task_prompt_info": {},                                    
-                "context": context ,                                    # 上下文数据
-                "user_input": user_input,
-            }
-            # 唯一的任务 ID(模拟 session_id / thread_id)
-            config = {"configurable": {"thread_id": session_id},
-                    "runnable_kwargs":{"recursion_limit": 50}
-            }
-            server_logger.info("======================== 启动新任务 ===========================")  #, interrupt_before=["user_confirm_task_planning"]
-
-            full_response = []
-            buffer = []
-            last_flush_time = time.time()
-            events = self.app.astream_events(initial_state, 
-                        config=config , 
-                        version="v1",  # 确保使用正确版本
-                        stream_mode="values"  # 或者 "updates"
-            )
-            # 流式处理事件
-            async for event in events:
-                #server_logger.info(trace_id=trace_id, msg=f"→ 事件类型: {event['event']}")
-                #server_logger.info(trace_id=trace_id, msg=f"→ 事件数据: {event['data']}")
-                
-                # 处理聊天模型流式输出
-                if event['event'] == 'on_chat_model_stream':
-                    if 'chunk' in event['data']:
-                        chunk = event['data']['chunk']
-                        if hasattr(chunk, 'content'):
-                            content = chunk.content
-                            full_response.append(content)
-                            
-                            # 缓冲管理策略
-                            buffer.append(content)
-                            current_time = time.time()
-                            
-                            # 刷新条件
-                            should_flush = (
-                                len(buffer) >= 3 or  # 达到最小块数
-                                (current_time - last_flush_time) > 0.5 or  # 超时
-                                any(content.endswith(('.', '。', '!', '?', '\n', ';', ';', '?', '!')) for content in buffer)  # 自然断点
-                            )
-                            
-                            if should_flush:
-                                combined = ''.join(buffer)
-                                yield combined
-                                
-                                buffer.clear()
-                                last_flush_time = current_time
-                
-                # 也可以处理其他类型的事件
-                # elif event['event'] == 'on_chain_stream':
-                #     server_logger.info(trace_id=trace_id, msg=f"链式处理: {event['data']}")
-                
-                # elif event['event'] == 'on_tool_stream':
-                #     server_logger.info(trace_id=trace_id, msg=f"工具调用: {event['data']}")
-            
-            # 处理剩余缓冲内容
-            if buffer:
-                yield ''.join(buffer)
-            
-            # 将完整响应添加到历史并进行压缩
-            if full_response:
-                full_text = "".join(full_response)
-                server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}", log_type="graph/stream")
-            
-        except Exception as e:
-            handler_err(server_logger, trace_id=trace_id, err=e, err_name='graph/stream')
-            yield json.dumps({"error": f"系统错误: {str(e)}"})
-
-
-
-
-    def write_graph(self, max_retries=3, retry_delay=2.0):
-        """
-            将图写入文件(包含错误处理和重试机制)
-        """
-        try:
-            # 使用增加的重试设置
-            graph_png = self.app.get_graph().draw_mermaid_png(
-                max_retries=max_retries,
-                retry_delay=retry_delay
-            )
-            with open("build_graph_app.png", "wb") as f:
-                f.write(graph_png)
-            server_logger.info(f"【图工作流写入文件完成】")
-            return True
-        except Exception as e:
-            server_logger.warning(f"【图工作流生成失败】{str(e)}")
-            server_logger.info("【图工作流】可以使用其他方式生成图表,如:")
-            server_logger.info("1. 使用本地图表渲染器:draw_mermaid_png(draw_method=MermaidDrawMethod.PYPPETEER)")
-            server_logger.info("2. 检查网络连接后重试")
-            return False
-
-
-# 实例化
-test_workflow_graph = TestWorkflowGraph()

+ 0 - 119
foundation/ai/agent/workflow/test_workflow_node.py

@@ -1,119 +0,0 @@
-
-
-# !/usr/bin/python
-# -*- coding: utf-8 -*-
-'''
-@Project    : 
-@File       :workflow_node.py
-@IDE        :Cursor
-@Author     :LINGMIN
-@Date       :2025/08/10 18:00
-'''
-
-
-import json
-import sys
-from foundation.observability.logger.loggering import server_logger
-from foundation.utils.common import handler_err
-from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
-from langchain_core.prompts import ChatPromptTemplate
-from foundation.ai.agent.workflow.test_cus_state import TestCusState
-from foundation.ai.agent.generate.test_intent import intent_identify_client
-from foundation.ai.agent.test_agent import test_agent_client
-from foundation.schemas.test_schemas import TestForm
-from foundation.ai.agent.generate.model_generate import generate_model_client
-from foundation.utils.yaml_utils import get_system_prompt_config
-
-
-
-class TestWorkflowNode:
-    """
-        工作流节点定义
-    """
-    def __init__(self):
-        """初始化模型和会话管理"""
-
-    
-
-    def supervisor_agent(self , state: TestCusState):
-        """
-            每个代理都与一个 Supervisor 代理通信(主管代理)。由  Supervisor 代理决定接下来应调用哪个代理
-            :param state:
-            :return:
-        """
-        session_id = state["session_id"]
-        trace_id = state["trace_id"]
-        user_input = state["user_input"]
-        route_next = state.get("route_next")
-        
-        server_logger.info(trace_id=trace_id, msg=f"\n===================================[Supervisor].begin-route_next:{route_next}=============================")
-        
-        config = {
-            "session_id": session_id
-        }
-        # 格式化输出,智能格式化输出
-        route_next = intent_identify_client.recognize_intent(trace_id=trace_id , config=config , input=user_input)
-        server_logger.info(trace_id=trace_id, msg=f"[Supervisor].intent_identify_client.recognize_intent:{route_next}")
-        if route_next not in ["chat_box_generate" , "common_agent"]:
-            route_next = "chat_box_generate"
-
-        
-        server_logger.info(trace_id=trace_id, msg=f"\n===================================[Supervisor].end-route_next:{route_next}=============================")
-        return {
-            "route_next": route_next
-        }
-
-
-
-    async def common_agent_node(self , state: TestCusState):
-        """
-            通用代理节点
-            :param state:
-            :return:
-        """
-        session_id = state["session_id"]
-        trace_id = state["trace_id"]
-        user_input = state["user_input"]
-        config_param = TestForm(session_id=session_id)
-        task_prompt_info = {"task_prompt": ""}
-        response_content = await test_agent_client.handle_query(trace_id=trace_id , config_param=config_param, 
-                                                                task_prompt_info=task_prompt_info, 
-                                                                input_query=user_input, context=None)
-        messages = [AIMessage(content=response_content, name="common_agent_node")]
-        return {
-            "messages": messages,
-            "previous_agent": "common_agent",
-            "route_next": "FINISH"   # ✅ 直接结束流程
-        }
-    
-
-    async def chat_box_generate(self , state: TestCusState) -> dict:
-        """
-            模型生成节点(纯生成类问题)
-            :param state:
-            :return:
-        """
-        session_id = state["session_id"]
-        trace_id = state["trace_id"]
-        user_input = state["user_input"]
-        task_prompt_info = state["task_prompt_info"]
-        task_prompt_info["task_prompt"] = ""
-
-      # 创建ChatPromptTemplate
-        template = ChatPromptTemplate.from_messages([
-            ("system", get_system_prompt_config()['system_prompt']),
-            ("user", user_input)
-        ])
-
-        task_prompt_info = {"task_prompt": template}
-
-        response_content = await generate_model_client.get_model_generate_invoke(trace_id=trace_id , task_prompt_info=task_prompt_info)
-        messages = [AIMessage(content=response_content , name="chat_box_generate")]
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {response_content}", log_type="chat_box_generate")
-        return {
-            "messages": messages,
-            "route_next": "FINISH"   # ✅ 直接结束流程
-        }
-
-
-

+ 1 - 6
foundation/ai/rag/retrieval/entities_enhance.py

@@ -9,7 +9,6 @@ from foundation.observability.logger.loggering import server_logger
 class EntitiesEnhance():
 
     def __init__(self):
-        self.save_path = "temp\entity_bfp_recall\entity_bfp_recall.json"
         self.bfp_result_lists = []
     @track_execution_time
     def entities_enhance_retrieval(self, query_pairs):
@@ -48,13 +47,9 @@ class EntitiesEnhance():
 
             self.bfp_result_lists.append(bfp_result)
 
-        self.test_file(self.bfp_result_lists, seve=True)
         return self.bfp_result_lists
             
 
-    def test_file(self,bfp_result,seve = False):
-        if seve:
-            with open(self.save_path, "w", encoding="utf-8") as f:
-                json.dump(bfp_result, f, ensure_ascii=False, indent=4)
+
 
 entity_enhance = EntitiesEnhance()

+ 0 - 2
foundation/ai/rag/retrieval/retrieval.py

@@ -490,8 +490,6 @@ class RetrievalManager:
                     'hybrid_similarity': hybrid_similarity
                 })
 
-                # 输出双重评分信息
-                # self.logger.info(f"重排序评分 #{i+1}: 标题='{title}' | 混合搜索相似度={hybrid_similarity:.4f} | BGE重排序评分={rerank_score:.6f}")
 
             return scored_docs
 

+ 41 - 24
foundation/database/base/vector/milvus_vector.py

@@ -86,20 +86,28 @@ class MilvusVectorManager(BaseVectorDB):
             "first_bfp_collection_test"
         ]
 
-        for collection_name in common_collections:
-            try:
-                _get_logger().info(f"预创建vectorstore连接: {collection_name}")
-                self._vectorstore_cache[collection_name] = Milvus(
-                    embedding_function=self.emdmodel,
-                    collection_name=collection_name,
-                    connection_args=self.connection_args,
-                    consistency_level="Strong",
-                    builtin_function=BM25BuiltInFunction(),
-                    vector_field=["dense", "sparse"]
-                )
-                _get_logger().info(f"成功预创建连接: {collection_name}")
-            except Exception as e:
-                _get_logger().error(f"预创建连接失败 {collection_name}: {e}")
+        # 抑制 AsyncMilvusClient 的警告日志
+        import logging
+        original_level = logging.getLogger('pymilvus').level
+        logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
+        try:
+            for collection_name in common_collections:
+                try:
+                    _get_logger().info(f"预创建vectorstore连接: {collection_name}")
+                    self._vectorstore_cache[collection_name] = Milvus(
+                        embedding_function=self.emdmodel,
+                        collection_name=collection_name,
+                        connection_args=self.connection_args,
+                        consistency_level="Strong",
+                        builtin_function=BM25BuiltInFunction(),
+                        vector_field=["dense", "sparse"]
+                    )
+                    _get_logger().info(f"成功预创建连接: {collection_name}")
+                except Exception as e:
+                    _get_logger().error(f"预创建连接失败 {collection_name}: {e}")
+        finally:
+            logging.getLogger('pymilvus').setLevel(original_level)
 
     def text_to_vector(self, text: str) -> List[float]:
         """
@@ -460,16 +468,25 @@ class MilvusVectorManager(BaseVectorDB):
             else:
                 # 如果缓存中没有,创建新连接(降级方案)
                 _get_logger().warning(f"缓存中未找到连接: {collection_name},创建新连接")
-                vectorstore = Milvus(
-                    embedding_function=self.emdmodel,
-                    collection_name=collection_name,
-                    connection_args=self.connection_args,
-                    consistency_level="Strong",
-                    builtin_function=BM25BuiltInFunction(),
-                    vector_field=["dense", "sparse"]
-                )
-                # 缓存新创建的连接
-                self._vectorstore_cache[collection_name] = vectorstore
+
+                # 抑制 AsyncMilvusClient 的警告日志
+                import logging
+                original_level = logging.getLogger('pymilvus').level
+                logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
+                try:
+                    vectorstore = Milvus(
+                        embedding_function=self.emdmodel,
+                        collection_name=collection_name,
+                        connection_args=self.connection_args,
+                        consistency_level="Strong",
+                        builtin_function=BM25BuiltInFunction(),
+                        vector_field=["dense", "sparse"]
+                    )
+                    # 缓存新创建的连接
+                    self._vectorstore_cache[collection_name] = vectorstore
+                finally:
+                    logging.getLogger('pymilvus').setLevel(original_level)
             _get_logger().info(f"混合召回topk: {top_k}")
             # 执行混合搜索,使用 similarity_search_with_score 获取评分
             if ranker_type == "weighted":
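
Both hunks above wrap Milvus vectorstore creation in the same sequence: save the pymilvus logger level, raise it to ERROR, and restore it in a finally block. A sketch of that pattern folded into a reusable context manager; this is illustrative only, the module itself inlines the try/finally:

import logging
from contextlib import contextmanager


@contextmanager
def quiet_logger(name: str, level: int = logging.ERROR):
    """Temporarily raise a named logger's threshold, restoring it afterwards."""
    target = logging.getLogger(name)
    original_level = target.level
    target.setLevel(level)
    try:
        yield target
    finally:
        target.setLevel(original_level)


# Usage mirroring the pre-created connection loop:
with quiet_logger("pymilvus"):
    logging.getLogger("pymilvus").warning("suppressed while connections are built")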

+ 15 - 3
foundation/infrastructure/messaging/celery_app.py

@@ -1,9 +1,19 @@
 """
 Celery应用配置
 负责任务队列管理,不涉及具体业务逻辑
+
+平台适配说明:
+- Windows开发环境: 使用 solo 池(避免 BrokenPipeError)
+- Linux生产环境: 使用 prefork 池(多进程高性能)
 """
 
 import os
+import sys
+import logging
+
+# 抑制 pymilvus 的 AsyncMilvusClient 警告(在多进程环境中没有事件循环)
+logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
 from celery import Celery
 from foundation.infrastructure.config.config import config_handler
 
@@ -43,9 +53,11 @@ app.conf.update(
     worker_prefetch_multiplier=1,  # 每个worker一次只取一个任务
     task_acks_late=True,           # 任务完成后再确认
 
-    # 并发控制 - 至少4个worker进程以保证审查和编写任务并行执行
-    worker_concurrency=4,          # 每个worker进程数(保证审查和编写任务并行)
-    worker_pool='prefork',        # 使用prefork模式支持多进程并发
+    # 并发控制 - 根据平台自动适配
+    # Windows开发环境: 使用 solo 池(单进程,避免 BrokenPipeError)
+    # Linux生产环境: 使用 prefork 池(多进程高性能)
+    worker_pool='solo' if sys.platform == 'win32' else 'prefork',
+    worker_concurrency=1 if sys.platform == 'win32' else 4,
 
     # 网络和连接配置 - 防止30分钟断连
     broker_connection_timeout=30,      # 连接超时30秒
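
The celery_app.py change selects the worker pool and concurrency from the platform at import time. A minimal sketch of the same selection, assuming a placeholder broker URL (the project reads its broker settings from config_handler):

import sys
from celery import Celery

app = Celery("example_app", broker="redis://localhost:6379/0")

IS_WINDOWS = sys.platform == "win32"

app.conf.update(
    # solo on Windows avoids BrokenPipeError from prefork's multiprocessing;
    # prefork on Linux keeps review and write tasks running in parallel.
    worker_pool="solo" if IS_WINDOWS else "prefork",
    worker_concurrency=1 if IS_WINDOWS else 4,
)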

+ 3 - 2
foundation/observability/logger/loggering.py

@@ -158,6 +158,9 @@ server_logger = CompatibleLogger(
 # 添加trace_filter到logger,自动注入system_trace_id
 server_logger.addFilter(trace_filter)
 
+# 抑制第三方库的冗余日志
+logging.getLogger('pymilvus').setLevel(logging.ERROR)  # 抑制 AsyncMilvusClient 警告
+
 # 设置日志级别
 server_logger.info("logging initialized")
 
@@ -208,8 +211,6 @@ class ModuleLogger:
         # 创建控制台处理器(强制为开发环境启用)
         if console_output:
             self._create_console_handler()
-            # 同时添加一个简化的控制台处理器用于子进程
-            self._create_simple_console_handler()
 
         # 添加trace_filter
         self.logger.addFilter(trace_filter)

+ 0 - 57
foundation/schemas/test_schemas.py

@@ -1,57 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-测试模式定义
-
-提供测试相关的数据模型和配置结构
-"""
-
-from typing import Optional, Dict, Any, List
-from pydantic import BaseModel, Field
-
-
-class TestConfig(BaseModel):
-    """测试配置"""
-    session_id: str = Field(description="会话ID")
-    model_type: Optional[str] = Field(default="gemini", description="模型类型")
-    temperature: Optional[float] = Field(default=0.7, description="温度参数")
-    max_tokens: Optional[int] = Field(default=2000, description="最大token数")
-
-
-class TestForm(BaseModel):
-    """测试表单"""
-    input: str = Field(description="输入内容")
-    context: Optional[Dict[str, Any]] = Field(default=None, description="上下文信息")
-    config: TestConfig = Field(description="配置信息")
-
-
-class TestResponse(BaseModel):
-    """测试响应"""
-    output: str = Field(description="输出结果")
-    trace_id: Optional[str] = Field(default=None, description="追踪ID")
-    processing_time: Optional[float] = Field(default=None, description="处理时间(秒)")
-
-
-class StreamEvent(BaseModel):
-    """流式事件"""
-    event: str = Field(description="事件类型")
-    data: Dict[str, Any] = Field(description="事件数据")
-
-
-class TestResult(BaseModel):
-    """测试结果"""
-    success: bool = Field(description="是否成功")
-    message: str = Field(description="消息")
-    data: Optional[Dict[str, Any]] = Field(default=None, description="数据")
-    error: Optional[str] = Field(default=None, description="错误信息")
-
-
-# 导出的类
-__all__ = [
-    "TestConfig",
-    "TestForm",
-    "TestResponse",
-    "StreamEvent",
-    "TestResult"
-]

+ 4 - 3
server/app.py

@@ -1,10 +1,14 @@
 import os
 import sys
+import logging
 
 # Windows 平台 Celery 兼容性设置(必须在导入 celery 之前)
 if sys.platform == 'win32':
     os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
 
+# 抑制 pymilvus 的 AsyncMilvusClient 警告(在多进程环境中没有事件循环)
+logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
 import time
 import redis
 import signal
@@ -27,7 +31,6 @@ from foundation.observability.logger.loggering import server_logger
 from foundation.infrastructure.messaging.celery_app import app as celery_app
 
 # 导入所有路由
-from views.test_views import test_router
 from views.construction_review.file_upload import file_upload_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
@@ -93,7 +96,6 @@ class RouteManager:
 
     def _setup_routes(self):
         """配置所有路由"""
-        self.app.include_router(test_router)
         self.app.include_router(file_upload_router)
         self.app.include_router(review_results_router)
         self.app.include_router(launch_review_router)
@@ -529,7 +531,6 @@ def create_app() -> FastAPI:
     )
 
     # 添加所有路由
-    app.include_router(test_router)
     app.include_router(file_upload_router)
     app.include_router(review_results_router)
     app.include_router(launch_review_router)

+ 0 - 1
views/__init__.py

@@ -42,7 +42,6 @@ async def lifespan(app: FastAPI):
         await async_db_pool.close()
 
 
-test_router = APIRouter(prefix="/test")
 current_operation_id: ContextVar[str] = ContextVar("operation_id", default=str(uuid.uuid4()))
 
 

+ 9 - 14
views/construction_review/review_results.py

@@ -11,6 +11,7 @@ from fastapi import APIRouter, HTTPException, Query
 from pydantic import BaseModel
 from typing import Optional, Dict, Any
 from .schemas.error_schemas import ReviewResultsErrors
+from foundation.observability.cachefiles import cache, CacheBaseDir
 
 
 # 导入文件上传模块的存储
@@ -52,23 +53,17 @@ async def review_results(
         if user not in valid_users:
             raise HTTPException(status_code=403, detail="无效的用户ID")
 
-        # 构建文件路径
-        temp_dir = "temp"
-        file_path = os.path.join(temp_dir, f"{callback_task_id}.json")
+        # 使用 cache_manager 读取结果文件
+        review_results = cache.load(
+            subdir="final_result",
+            filename=f"{callback_task_id}.json",
+            base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW,
+            default=None
+        )
 
-        # 检查文件是否存在
-        if not os.path.exists(file_path):
+        if review_results is None:
             raise HTTPException(status_code=404, detail="审查结果文件不存在")
 
-        # 读取文件内容
-        try:
-            with open(file_path, 'r', encoding='utf-8') as f:
-                review_results = json.load(f)
-        except json.JSONDecodeError:
-            raise HTTPException(status_code=500, detail="审查结果文件格式错误")
-        except Exception as e:
-            raise HTTPException(status_code=500, detail=f"读取文件失败: {str(e)}")
-
         # 验证文件中的用户信息是否匹配
         if review_results.get("user_id") != user:
             raise HTTPException(status_code=403, detail="用户权限验证失败")
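
review_results.py now delegates file handling to cache.load from foundation.observability.cachefiles, whose implementation is not part of this diff. A hedged sketch of what such a loader plausibly does, returning the parsed JSON or a default when the file is missing or unreadable; paths and names below are assumptions, not the project's actual code:

import json
import os


def load_cached_json(base_dir: str, subdir: str, filename: str, default=None):
    """Read <base_dir>/<subdir>/<filename> as JSON, returning default on any failure."""
    path = os.path.join(base_dir, subdir, filename)
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return default


results = load_cached_json("cache/construction_review", "final_result", "task-123.json")
if results is None:
    print("审查结果文件不存在")  # the view raises HTTPException(404) at this point instead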

+ 21 - 5
views/construction_write/outline_views.py

@@ -392,6 +392,8 @@ async def generating_outline(request: OutlineGenerationRequest):
             # 持续监听进度并转发(从 Redis 轮询,支持跨进程)
             last_progress = 10
             last_progress_data = None
+            last_event_type = "processing"
+            last_message = ""
             no_change_count = 0
 
             while True:
@@ -401,12 +403,26 @@ async def generating_outline(request: OutlineGenerationRequest):
 
                     if progress_data:
                         current_progress = progress_data.get("current", last_progress)
-
-                        # 进度有变化或状态变化时推送
-                        if (current_progress != last_progress or
-                            progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status") if last_progress_data else True):
-
+                        current_event_type = progress_data.get("event_type", "processing")
+                        current_message = progress_data.get("message", "")
+
+                        # 进度有变化、event_type变化、message变化或状态变化时推送
+                        should_push = False
+                        if current_progress != last_progress:
+                            should_push = True
+                        elif current_event_type != last_event_type:
+                            should_push = True
+                        elif current_message != last_message:
+                            should_push = True
+                        elif last_progress_data is None:
+                            should_push = True
+                        elif progress_data.get("overall_task_status") != last_progress_data.get("overall_task_status"):
+                            should_push = True
+
+                        if should_push:
                             last_progress = current_progress
+                            last_event_type = current_event_type
+                            last_message = current_message
                             last_progress_data = progress_data
 
                             # 转发进度事件
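
The new push condition replaces the previous hard-to-read ternary-over-or expression with explicit checks and adds event_type and message changes as triggers. A small sketch of the same decision factored into a testable helper; it compares against a single snapshot dict rather than the view's separate last_* variables, so the shape here is illustrative:

from typing import Optional


def should_push(progress_data: dict, last: Optional[dict]) -> bool:
    """Push an SSE event when progress, event_type, message or overall status changed."""
    if last is None:
        return True
    return (
        progress_data.get("current") != last.get("current")
        or progress_data.get("event_type", "processing") != last.get("event_type", "processing")
        or progress_data.get("message", "") != last.get("message", "")
        or progress_data.get("overall_task_status") != last.get("overall_task_status")
    )


assert should_push({"current": 20}, None)
assert not should_push({"current": 20}, {"current": 20})
assert should_push({"current": 20, "message": "章节生成中"}, {"current": 20, "message": ""})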

+ 0 - 871
views/test_views.py

@@ -1,871 +0,0 @@
-# !/usr/bin/ python
-# -*- coding: utf-8 -*-
-'''
-@Project    : lq-agent-api
-@File       :cattle_farm_views.py
-@IDE        :PyCharm
-@Author     :
-@Date       :2025/7/10 17:32
-'''
-import json
-from typing import Optional, List, Dict, Any
-from pydantic import BaseModel, Field
-
-from fastapi import Depends, Response, Header
-from sse_starlette import EventSourceResponse
-from starlette.responses import JSONResponse
-from fastapi import Depends, Request, Response, Header
-
-from foundation.ai.agent.test_agent import test_agent_client
-from foundation.ai.agent.generate.model_generate import generate_model_client
-from foundation.observability.logger.loggering import server_logger
-from foundation.schemas.test_schemas import TestForm
-from foundation.utils.common import return_json, handler_err
-from views import test_router, get_operation_id
-from foundation.ai.agent.workflow.test_workflow_graph import test_workflow_graph
-
-from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
-from foundation.database.base.sql import BasisOfPreparationDAO
-from foundation.utils.tool_utils import DateTimeEncoder
-from langchain_core.prompts import ChatPromptTemplate
-from foundation.utils.yaml_utils import system_prompt_config
-
-from foundation.ai.models.model_handler import model_handler as mh
-from foundation.database.base.vector.pg_vector import PGVectorDB
-from foundation.database.base.vector.milvus_vector import MilvusVectorManager
-
-
-# 响应模型用于Swagger文档
-class StandardResponse(BaseModel):
-    """标准响应模型"""
-    code: int = Field(description="响应状态码 (0:成功, 其他:错误)")
-    msg: str = Field(description="响应消息")
-    data: Optional[Dict[str, Any]] = Field(default=None, description="响应数据")
-    data_type: Optional[str] = Field(default=None, description="数据类型")
-    trace_id: Optional[str] = Field(default=None, description="追踪ID")
-
-
-class UserRecord(BaseModel):
-    """用户记录模型"""
-    user_id: Optional[int] = Field(description="用户ID")
-    name: Optional[str] = Field(description="用户名")
-    email: Optional[str] = Field(description="邮箱")
-    age: Optional[int] = Field(description="年龄")
-    created_at: Optional[str] = Field(description="创建时间")
-
-
-class EmbeddingResponse(BaseModel):
-    """向量嵌入响应模型"""
-    embed_dim: int = Field(description="嵌入向量维度")
-    embedding: List[float] = Field(description="嵌入向量数据")
-
-
-class SearchResult(BaseModel):
-    """搜索结果模型"""
-    text_content: Optional[str] = Field(description="文本内容")
-    score: Optional[float] = Field(description="相关性得分")
-    metadata: Optional[Dict[str, Any]] = Field(default=None, description="元数据")
-
-
-class SSEData(BaseModel):
-    """SSE数据模型"""
-    code: int = Field(description="状态码")
-    output: Optional[str] = Field(default=None, description="输出内容")
-    completed: bool = Field(description="是否完成")
-    trace_id: Optional[str] = Field(default=None, description="追踪ID")
-    message: Optional[str] = Field(default=None, description="消息")
-    dataType: Optional[str] = Field(default="text", description="数据类型")
-
-
-
-@test_router.post("/generate/chat", tags=["模型生成"])
-async def generate_chat_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        生成类模型
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        print(trace_id)
-        # 从字典中获取input
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-        
-            # 创建ChatPromptTemplate
-        template = ChatPromptTemplate.from_messages([
-            ("system", system_prompt_config['system_prompt']),
-            ("user", input_query)
-        ])
-
-        task_prompt_info = {"task_prompt": template}
-        output = await generate_model_client.get_model_generate_invoke(trace_id=trace_id , task_prompt_info=task_prompt_info)
-        # 直接执行
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
-        # 返回字典格式的响应
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-
-@test_router.post("/generate/stream", tags=["模型生成"])
-async def generate_stream_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        生成类模型
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-
-        # 从字典中获取input
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-              # 创建ChatPromptTemplate
-        template = ChatPromptTemplate.from_messages([
-            ("system", system_prompt_config['system_prompt']),
-            ("user", input_query)
-        ])
-
-        task_prompt_info = {"task_prompt": template}
-        # 创建 SSE 流式响应
-        async def event_generator():
-            try:
-                # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
-                for chunk in generate_model_client.get_model_generate_stream(trace_id=trace_id , task_prompt_info=task_prompt_info):
-                    # 发送数据块
-                    yield {
-                        "event": "message",
-                        "data": json.dumps({
-                            "output": chunk,
-                            "completed": False,
-                        }, ensure_ascii=False)
-                    }
-                     # 获取缓存数据
-                result_data = {}
-                # 发送结束事件
-                yield {
-                    "event": "message_end",
-                    "data": json.dumps({
-                        "completed": True,
-                        "message": json.dumps(result_data, ensure_ascii=False),
-                        "code": 0,
-                        "trace_id": trace_id,
-                    }, ensure_ascii=False),
-                }
-            except Exception as e:
-                # 错误处理
-                yield {
-                    "event": "error",
-                    "data": json.dumps({
-                        "trace_id": trace_id,
-                        "message": str(e),
-                        "code": 1
-                    }, ensure_ascii=False)
-                }
-            finally:
-                # 不需要关闭客户端,因为它是单例
-                pass
-        # 返回 SSE 响应
-        return EventSourceResponse(
-            event_generator(),
-            headers={
-                "Cache-Control": "no-cache",
-                "Connection": "keep-alive"
-            }
-        )
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-# 路由
-
-@test_router.post("/agent/chat", tags=["智能体"])
-async def chat_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    根据场景获取智能体反馈
-    """
-
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # 验证参数
-
-        # 从字典中获取input
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-        task_prompt_info = {"task_prompt": ""}
-  
-        # stream 流式执行
-        output = await test_agent_client.handle_query(trace_id  , task_prompt_info, input_data, context, param.config)
-        # 直接执行
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
-        # 返回字典格式的响应
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-
-@test_router.post("/agent/stream", response_class=Response, tags=["智能体"])
-async def chat_agent_stream(param: TestForm,
-                     trace_id: str = Depends(get_operation_id)):
-    """
-    根据场景获取智能体反馈 (SSE流式响应)
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-
-       
-        # 提取参数
-        input_data = param.input
-        context = param.context
-        header_info = {
-          
-        }
-        task_prompt_info = {"task_prompt": ""}
-          # 如果business_scene为None,则使用大模型进行意图识别
-
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # 创建 SSE 流式响应
-        async def event_generator():
-            try:
-                # 流式处理查询
-                async for chunk in test_agent_client.handle_query_stream(
-                        trace_id=trace_id,
-                        config_param=param.config,
-                        task_prompt_info=task_prompt_info,
-                        input_query=input_data,
-                        context=context,
-                        header_info=header_info
-                ):
-                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
-                    # 发送数据块
-                    yield {
-                        "event": "message",
-                        "data": json.dumps({
-                            "code": 0,
-                            "output": chunk,
-                            "completed": False,
-                            "trace_id": trace_id,
-                        }, ensure_ascii=False)
-                    }
-                # 获取缓存数据
-                result_data = {}
-                # 发送结束事件
-                yield {
-                    "event": "message_end",
-                    "data": json.dumps({
-                        "completed": True,
-                        "message": json.dumps(result_data, ensure_ascii=False),
-                        "code": 0,
-                        "trace_id": trace_id,
-                    }, ensure_ascii=False),
-                }
-            except Exception as e:
-                # 错误处理
-                yield {
-                    "event": "error",
-                    "data": json.dumps({
-                        "trace_id": trace_id,
-                        "message": str(e),
-                        "code": 1
-                    }, ensure_ascii=False)
-                }
-            finally:
-                # 不需要关闭客户端,因为它是单例
-                pass
-
-        # 返回 SSE 响应
-        return EventSourceResponse(
-            event_generator(),
-            headers={
-                "Cache-Control": "no-cache",
-                "Connection": "keep-alive"
-            }
-        )
-
-    except Exception as err:
-        # 初始错误处理
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
-        return JSONResponse(
-            return_json(code=1, msg=f"{err}", trace_id=trace_id),
-            status_code=500
-        )
-
-
-
-@test_router.post("/graph/stream", response_class=Response, tags=["智能体"])
-async def chat_graph_stream(param: TestForm,
-                     trace_id: str = Depends(get_operation_id)):
-    """
-        根据场景获取智能体反馈 (SSE流式响应)
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # request_param = {
-        #     "input": param.input,
-        #     "config": param.config,
-        #     "context": param.context
-        # }
-        # 创建 SSE 流式响应 
-        async def event_generator():
-                try:
-                    # 流式处理查询
-                    async for chunk in test_workflow_graph.handle_query_stream(
-                            param=param,
-                            trace_id=trace_id,
-                    ):
-                        server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
-                        # 发送数据块
-                        yield {
-                            "event": "message",
-                            "data": json.dumps({
-                                "code": 0,
-                                "output": chunk,
-                                "completed": False,
-                                "trace_id": trace_id,
-                                "dataType": "text"
-                            }, ensure_ascii=False)
-                        }
-
-                    # 发送结束事件
-                    yield {
-                        "event": "message_end",
-                        "data": json.dumps({
-                            "completed": True,
-                            "message": "Stream completed",
-                            "code": 0,
-                            "trace_id": trace_id,
-                            "dataType": "text"
-                        }, ensure_ascii=False),
-                    }
-                except Exception as e:
-                    # 错误处理
-                    yield {
-                        "event": "error",
-                        "data": json.dumps({
-                            "trace_id": trace_id,
-                            "msg": str(e),
-                            "code": 1,
-                            "dataType": "text"
-                        }, ensure_ascii=False)
-                    }
-                finally:
-                    # 不需要关闭客户端,因为它是单例
-                    pass
-
-        # 返回 SSE 响应
-        return EventSourceResponse(
-            event_generator(),
-            headers={
-                "Cache-Control": "no-cache",
-                "Connection": "keep-alive"
-            }
-        )
-        
-    except Exception as err:
-        # 初始错误处理
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
-        return JSONResponse(
-            return_json(code=1, msg=f"{err}", trace_id=trace_id),
-            status_code=500
-        )
-    
-
-
-
-
-@test_router.post("/redis", tags=["redis"])
-async def test_redis(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    根据MySQL应用
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # 验证参数
-
-        # 从字典中获取input
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-        from foundation.utils.redis_utils import set_redis_result_cache_data , get_redis_result_cache_data
-        output = "success"
-        data_type = "output"
-
-        await set_redis_result_cache_data(data_type=data_type , trace_id=trace_id , value=input_data)
-        server_logger.info(trace_id=trace_id, msg=f"key:{trace_id}:{data_type},value:{input_data} redis 设置成功")
-        output = await get_redis_result_cache_data(data_type=data_type , trace_id=trace_id)
-        # 直接执行
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {output}", log_type="/redis")
-        # 返回字典格式的响应
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/redis")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/redis")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-
-
-
-
-@test_router.post("/mysql/add", tags=["mysql"])
-async def test_mysql_add(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    根据MySQL应用
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # 验证参数
-
-        # 从字典中获取input
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-        # 从app.state中获取数据库连接池
-        async_db_pool = request.app.state.async_db_pool
-        from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
-        test_tab_dao = TestTabDAO(async_db_pool)
-        #  name: str, email: str, age: int
-        name = input_data
-        email = session_id
-        age = 18
-        test_id = await test_tab_dao.insert_user(name=name, email=email, age=age)
-        output = f"【test_id】: {test_id}"
-       
-        # 直接执行
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/add")
-        # 返回字典格式的响应
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-
-
-@test_router.post("/mysql/get", tags=["mysql"])
-async def test_mysql_add(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    根据MySQL应用
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # 验证参数
-
-        # 从字典中获取input
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {
-        }
-        # 从app.state中获取数据库连接池
-        async_db_pool = request.app.state.async_db_pool
-        test_tab_dao = TestTabDAO(async_db_pool)
-        test_id = input_data;
-        data = await test_tab_dao.get_user_by_id(user_id=test_id)
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/get")
-        json_str = json.dumps(data , cls=DateTimeEncoder, ensure_ascii=False, indent=2)
-        output = json_str
-        # 直接执行
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/get")
-        # 返回字典格式的响应
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-@test_router.post("/mysql/list", tags=["mysql"])
-async def test_mysql_list(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    MySQL test endpoint: list all records
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # Validate parameters
-
-        # Extract fields from the request payload
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        # Get the database connection pool from app.state
-        async_db_pool = request.app.state.async_db_pool
-        from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
-        test_tab_dao = TestTabDAO(async_db_pool)
-        data = await test_tab_dao.get_all_users()
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/list")
-        json_str = json.dumps(data, cls=DateTimeEncoder, ensure_ascii=False, indent=2)
-        output = json_str
-        # Log the result
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/list")
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-
-
-
-@test_router.post("/mysql/update", tags=["mysql"])
-async def test_mysql_update(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    MySQL test endpoint: update a record by ID
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # Validate parameters
-
-        # Extract fields from the request payload; session_id doubles as the record ID here
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        # Get the database connection pool from app.state
-        async_db_pool = request.app.state.async_db_pool
-        # Local import, matching the other MySQL handlers in this module
-        from foundation.database.base.sql.async_mysql_base_dao import TestTabDAO
-        test_tab_dao = TestTabDAO(async_db_pool)
-        test_id = session_id
-        updates = {
-            "name": input_data,
-            "email": "test_email——upt",
-            "age": 22
-        }
-        success = await test_tab_dao.update_user(user_id=test_id, **updates)
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {success}", log_type="/mysql/update")
-        output = success
-        # Log the result
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/update")
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-
-
-
-
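-# The /bop endpoints below exercise BasisOfPreparationDAO ("basis of preparation", 编制依据).
-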
-@test_router.post("/bop/get", tags=["mysql"])
-async def test_bop_get(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    Basis-of-preparation test endpoint: fetch a record by ID
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # Validate parameters
-
-        # Extract fields from the request payload
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        # Get the database connection pool from app.state
-        async_db_pool = request.app.state.async_db_pool
-        bop_dao = BasisOfPreparationDAO(async_db_pool)
-        test_id = input_data
-        data = await bop_dao.get_info_by_id(id=test_id)
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/bop/get")
-        json_str = json.dumps(data, cls=DateTimeEncoder, ensure_ascii=False, indent=2)
-        output = json_str
-        # Log the result
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/bop/get")
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/get")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/get")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-@test_router.post("/bop/list", tags=["mysql"])
-async def test_bop_list(
-        request: Request,
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-    Basis-of-preparation test endpoint: list all records
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        # Validate parameters
-
-        # Extract fields from the request payload
-        input_data = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        # Get the database connection pool from app.state
-        async_db_pool = request.app.state.async_db_pool
-        bop_dao = BasisOfPreparationDAO(async_db_pool)
-        data = await bop_dao.get_list()
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/bop/list")
-        json_str = json.dumps(data, cls=DateTimeEncoder, ensure_ascii=False, indent=2)
-        output = json_str
-        # Log the result
-        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/bop/list")
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/list")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/bop/list")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-
-
-
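-# The endpoints below exercise the RAG stack: embedding, PG vector retrieval,
-# retrieval + rerank, and Milvus vector retrieval.
-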
-# RAG - embedding endpoint
-@test_router.post("/embedding", tags=["RAG服务"])
-async def embedding_test_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        Embedding model test
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        print(trace_id)
-        # Extract fields from the request payload
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        task_prompt_info = {"task_prompt": ""}
-        text = input_query
-
-
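-        # Note: the request text is not passed to the embedding helper below; the
-        # response reports only the returned vector and its dimension.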
-        embedding = mh._get_lq_qwen3_8b_emd()
-        embed_dim = len(embedding)
-        server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
-
-        output = f"embed_dim={embed_dim},embedding:{embedding}"
-        #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
-        # The alternative model-invoke path above and the debug log below are disabled
-        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="embedding")
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-
-
-
-
-
-# PG vector search
-@test_router.post("/bfp/search", tags=["RAG服务"])
-async def bfp_search_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        Vector search over basis-of-preparation documents
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        print(trace_id)
-        # Extract fields from the request payload
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        task_prompt_info = {"task_prompt": ""}
-        # session_id doubles as top_k in this test endpoint
-        top_k = int(session_id)
-
-        output = None
-
-        # Retrieval via the vector DB abstraction
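-        # The PG retriever is scoped to the tv_basis_of_preparation table via `param`;
-        # judging from the rerank endpoint below, it returns a list of dicts that each
-        # carry a "text_content" field.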
-        pg_vector_db = PGVectorDB()
-        output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query, top_k=top_k)
-
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-
-
-# PG vector search with reranking
-@test_router.post("/bfp/search/rerank", tags=["RAG服务"])
-async def bfp_search_rerank_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        Retrieve basis-of-preparation documents and rerank them
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        print(trace_id)
-        # Extract fields from the request payload
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        task_prompt_info = {"task_prompt": ""}
-        # session_id doubles as top_k in this test endpoint
-        top_k = int(session_id)
-
-        output = None
-        # Rerank client (requires the SILICONFLOW_API_KEY environment variable);
-        # currently disabled, so the endpoint returns the raw retrieval results.
-        #client = SiliconFlowAPI()
-        # Retrieval via the vector DB abstraction
-        pg_vector_db = PGVectorDB()
-        output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query, top_k=top_k)
-        # Rerank step (disabled): extract the text of each hit and rerank it against the query
-        content_list = [doc["text_content"] for doc in output]
-        #output = client.rerank(input_query=input_query, documents=content_list, top_n=top_k)
-
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search/rerank")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search/rerank")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-    
-
-
-
-# Milvus vector search
-@test_router.post("/data/bfp/milvus/search", tags=["milvus"])
-async def bfp_milvus_search_endpoint(
-        param: TestForm,
-        trace_id: str = Depends(get_operation_id)):
-    """
-        Milvus vector search over basis-of-preparation documents
-    """
-    try:
-        server_logger.info(trace_id=trace_id, msg=f"{param}")
-        print(trace_id)
-        # Extract fields from the request payload
-        input_query = param.input
-        session_id = param.config.session_id
-        context = param.context
-        header_info = {}
-        task_prompt_info = {"task_prompt": ""}
-        # session_id doubles as top_k in this test endpoint
-        top_k = int(session_id)
-
-        output = None
-
-        # Retrieval via the vector DB abstraction
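-        # Unlike the PG retriever above (param={"table_name": ...}), the Milvus
-        # retriever is addressed by collection name.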
-        vector_db = MilvusVectorManager()
-        output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query, top_k=top_k)
-
-        # Return the response as a dict
-        return JSONResponse(
-            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
-
-    except ValueError as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
-    except Exception as err:
-        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
-        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))