
Data governance code

lingmin_package@163.com 2 months ago
parent
commit
6953548b32
44 changed files with 3659 additions and 1 deletion
  1. 63 0
      .gitignore
  2. 28 0
      Dockerfile
  3. 34 1
      README.md
  4. BIN
      config/.DS_Store
  5. 50 0
      config/config.ini
  6. 10 0
      config/prompt/common_model_query.yaml
  7. 22 0
      config/prompt/intent_prompt.yaml
  8. 29 0
      config/prompt/system_prompt.yaml
  9. 137 0
      file_processors/pdf_processor.py
  10. 0 0
      foundation/__init__.py
  11. 11 0
      foundation/agent/__init__.py
  12. 161 0
      foundation/agent/base_agent.py
  13. 41 0
      foundation/agent/function/test_funciton.py
  14. 9 0
      foundation/agent/generate/__init__.py
  15. 137 0
      foundation/agent/generate/model_generate.py
  16. 105 0
      foundation/agent/generate/test_intent.py
  17. 252 0
      foundation/agent/test_agent.py
  18. 21 0
      foundation/agent/workflow/test_cus_state.py
  19. 192 0
      foundation/agent/workflow/test_workflow_graph.py
  20. 110 0
      foundation/agent/workflow/test_workflow_node.py
  21. 71 0
      foundation/base/async_redis_lock.py
  22. 35 0
      foundation/base/config.py
  23. 39 0
      foundation/base/redis_config.py
  24. 212 0
      foundation/base/redis_connection.py
  25. 67 0
      foundation/base/redis_lock.py
  26. 39 0
      foundation/core_enums.py
  27. 145 0
      foundation/logger/loggering.py
  28. 65 0
      foundation/models/base_online_platform.py
  29. 194 0
      foundation/models/silicon_flow.py
  30. 97 0
      foundation/schemas/__init__.py
  31. 22 0
      foundation/schemas/test_schemas.py
  32. 76 0
      foundation/utils/common.py
  33. 63 0
      foundation/utils/redis_utils.py
  34. 41 0
      foundation/utils/tool_utils.py
  35. 205 0
      foundation/utils/utils.py
  36. 93 0
      foundation/utils/yaml_utils.py
  37. 35 0
      gunicorn_config.py
  38. 179 0
      requirements.txt
  39. 64 0
      run.sh
  40. 46 0
      server/app.py
  41. 10 0
      server/cus_middlewares.py
  42. BIN
      test/pdf_files/G4216 线屏山新市至金阳段高速公路项目XJ4 合同段T 梁预制、运输及安装专项施工方案(修编).pdf
  43. 37 0
      views/__init__.py
  44. 412 0
      views/test_views.py

+ 63 - 0
.gitignore

@@ -0,0 +1,63 @@
+# ---> Python
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+todo.md
+.design
+.claude

+ 28 - 0
Dockerfile

@@ -0,0 +1,28 @@
+FROM python:3.13-slim
+
+ENV DEBIAN_FRONTEND=noninteractive \
+    TZ=Asia/Shanghai
+
+# Prepare /tmp and create the virtual environment
+RUN chmod 777 /tmp \
+    && python -m venv /venv
+
+ENV PATH="/venv/bin:$PATH"
+
+# Copy the requirements file first so dependency installation can be cached
+COPY requirements.txt /tmp/
+RUN /venv/bin/pip config set global.index-url https://mirrors.aliyun.com/pypi/simple \
+    && /venv/bin/pip config set install.trusted-host mirrors.aliyun.com \
+    && /venv/bin/pip --default-timeout=1800 install -r /tmp/requirements.txt \
+    && rm -rf /root/.cache
+
+# Set the working directory and copy the project files
+WORKDIR /app
+COPY . /app
+
+EXPOSE 8001
+# Ensure the startup script is executable
+RUN chmod 777 run.sh
+
+# Run the service from the virtual environment
+CMD ["/venv/bin/gunicorn", "-c", "gunicorn_config.py", "server.app:app"]
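A minimal build-and-run sequence for this image (the image tag is an assumption; port 8001 matches the EXPOSE directive above, while the README below starts the service on 8010):

    docker build -t lq-data-governance .
    docker run -p 8001:8001 lq-data-governance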

+ 34 - 1
README.md

@@ -1,2 +1,35 @@
-# LQDataGovernance
 
+
+
+### LQDataGovernance data governance service
+
+
+ #### Environment setup
+   - pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
+ #### Starting the backend service
+    - cd LQDataGovernance
+    - uvicorn server.app:app --port=8010 --host=0.0.0.0
+    - gunicorn -c gunicorn_config.py server.app:app       (multi-process startup)
+
+
+
+
+ 
+ 
### Test endpoints

  #### Generation model endpoint
    - chat
        http://localhost:8010/test/data/governance
     {
      "config": {
          "session_id": "222"
      },
      "input": "This project has three precast yards: yard 1# is located in the LK5+600 fill area of the connector line and precasts 70 T-beams; yard 2# sits on the cut subgrade at ramp A AK0+273 and handles 555 T-beams; yard 3# is in the cut area at ramp E EK0+000 and handles 144 T-beams. Each yard is equipped with gantry cranes, steel formwork pedestals, intelligent sprinkler curing systems, and other equipment to meet the precasting needs of different beam types."
    }

    Sample response (a fine-tuning QA pair generated from the input above):
    {
      "system": "You are an expert in writing construction plan documents!",
      "instruction": "Based on the precast yard information provided, summarize the distribution and main tasks of each yard.",
      "input": "(same text as the request input above)",
      "output": "This project sets up three precast yards:\n1. Yard 1# is located in the LK5+600 fill area of the connector line and is mainly responsible for precasting 70 T-beams;\n2. Yard 2# is located on the cut subgrade at ramp A AK0+273 and is mainly responsible for precasting 555 T-beams;\n3. Yard 3# is located in the cut area at ramp E EK0+000 and is mainly responsible for precasting 144 T-beams.\nEach yard is equipped with gantry cranes, steel formwork pedestals, and intelligent sprinkler curing systems, meeting the precasting needs of different beam types."
    }
+
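A sample invocation of the endpoint above with curl (assuming the service is running locally on port 8010 as started in the previous section; substitute the input text shown above for the "..."):

    curl -X POST http://localhost:8010/test/data/governance \
         -H 'Content-Type: application/json' \
         -d '{"config": {"session_id": "222"}, "input": "..."}'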
+  
+
+  

BIN
config/.DS_Store


+ 50 - 0
config/config.ini

@@ -0,0 +1,50 @@
+
+
+[model]
+MODEL_TYPE=qwen
+
+[deepseek]
+DEEPSEEK_SERVER_URL=https://api.deepseek.com
+DEEPSEEK_MODEL_ID=deepseek-chat
+DEEPSEEK_API_KEY=
+
+
+[qwen]
+MODEL_SERVER_URL=https://api-inference.modelscope.cn/v1/
+CHAT_MODEL_ID=Qwen/Qwen3-30B-A3B
+API_KEY=ms-61bf873e-7536-42a9-b830-b12dca656e1f
+
+
+[api_key]
+DASHSCOPE_API_KEY=sk-9fca4fca37ce4f509ec9ead71ccdd542
+EMBED_MODEL_ID=text-embedding-v4
+
+
+
+[app]
+APP_CODE=lq-agent
+APP_SECRET=sx-73d32556-605e-11f0-9dd8-acde48001122
+
+
+[redis]
+REDIS_URL=redis://:123456@127.0.0.1:6379
+REDIS_HOST=127.0.0.1
+REDIS_PORT=6379
+REDIS_DB=0
+REDIS_PASSWORD=123456
+REDIS_MAX_CONNECTIONS=50
+
+[log]
+LOG_FILE_PATH=logs
+LOG_FILE_MAX_MB=10
+LOG_BACKUP_COUNT=5
+CONSOLE_OUTPUT=True
+
+
+[siliconflow]
+SLCF_MODEL_SERVER_URL=https://api.siliconflow.cn/v1
+SLCF_API_KEY=sk-npqfinszhdvnwvensnjmlqtihgevehqiyfwunedxnefkmrud
+SLCF_CHAT_MODEL_ID=test-model
+SLCF_EMBED_MODEL_ID=netease-youdao/bce-embedding-base_v1
+SLCF_REANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
+SLCF_VL_CHAT_MODEL_ID=THUDM/GLM-4.1V-9B-Thinking

+ 10 - 0
config/prompt/common_model_query.yaml

@@ -0,0 +1,10 @@
+
+# Task prompt
+task_prompt: |
+  You are an intelligent assistant; answer questions based on the information provided.
+
+
+
+# test
+template: |
+  ## Test content

+ 22 - 0
config/prompt/intent_prompt.yaml

@@ -0,0 +1,22 @@
+
+# System prompt
+system_prompt: |
+  Based on the examples provided and the user's recent conversation history, recognize the intent and precisely match the corresponding business-scenario instruction.
+  Always give priority to the recent context semantics and the evolution of the user's intent. If the question matches any business scenario in the examples, return the corresponding instruction; if no defined scenario matches, return chat_box_generate.
+  Strict rule: output only the instruction string, with no explanation, commentary, or extra formatting.
+  The user's current conversation history:
+  {history}
+
+
+
+
+# Intent examples: few-shot samples
+intent_examples: 
+  - inn: Hello; inquiry.
+    out: chat_box_generate
+
+  - inn: execute; operate; query; process;
+    out: common_agent
+
+
+           

+ 29 - 0
config/prompt/system_prompt.yaml

@@ -0,0 +1,29 @@
+
+
+# System prompt
+system_prompt: |
+  You are an AI assistant combining multiple expert roles, providing comprehensive intelligent guidance.
+  Your advice should be practical, economical, and easy to act on, and should offer precise early warnings and concrete solutions based on IoT data.
+
+# Data governance system prompt
+system_data_governance_prompt: |
+  Based on this document, generate a question-answer pair dataset for model fine-tuning, for example:
+    {{
+      "system": "You are an expert in writing construction plan documents!", # system prompt (optional)
+      "instruction": "", # human instruction (required)
+      "input": "",
+      "output": "", # model answer (required)
+    }}
+    
+    
+# Summary prompt for the user's conversation history
+summary_system_prompt: |
+  Please summarize the following conversation, keeping the key information:
+  {history}
+
+
+
+# test
+template: |
+  ## Test content

+ 137 - 0
file_processors/pdf_processor.py

@@ -0,0 +1,137 @@
+import os
+import time
+from tqdm import tqdm
+from langchain_community.document_loaders import PyMuPDFLoader
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from foundation.base.config import config_handler
+from langchain_core.documents import Document
+
+
+class PDFProcessor:
+    def __init__(self, directory, **kwargs):
+        """
+        Initialize the PDF processor
+        :param directory: directory containing the PDF files
+        :param kwargs: other parameters
+        """
+        self.directory = directory  # directory containing the PDF files
+        self.file_group_num = kwargs.get('file_group_num', 20)  # number of files per processing group
+        self.batch_num = kwargs.get('batch_num', 6)  # number of documents per insert batch
+        self.chunksize = kwargs.get('chunksize', 500)  # chunk size for text splitting
+        self.overlap = kwargs.get('overlap', 100)  # chunk overlap for text splitting
+        self.file_suffix_list = kwargs.get('file_suffix_list', ['.pdf', '.docx', '.doc'])
+        server_logger.info(f"""
+                    Initializing the PDF file importer:
+                    Configuration:
+                    - file suffix list: {self.file_suffix_list}
+                    - import directory: {self.directory}
+                    - files per group: {self.file_group_num}
+                    - documents per batch: {self.batch_num}
+                    - chunk size: {self.chunksize}
+                    - chunk overlap: {self.overlap}
+                    """)
+
+    def load_pdf_files(self):
+        """
+        Load all files with supported suffixes from the directory
+        """
+        file_path = os.path.join(self.directory)
+        pdf_path_files = []
+        pdf_file_names = []
+        #server_logger.info(f"file_path: {file_path}")
+        for file_name in os.listdir(file_path):
+            # get the file suffix (with the leading dot)
+            file_suffix = os.path.splitext(file_name)[1] 
+            if file_suffix in self.file_suffix_list:
+                pdf_file_names.append(file_name)
+                pdf_path_files.append(os.path.join(file_path, file_name))
+            else:
+                server_logger.info(f"Skipping {file_name} because its suffix is not supported.")
+
+        server_logger.info(f"Found {len(pdf_file_names)} PDF files.")
+        server_logger.info(f"pdf_path_files: {pdf_path_files},pdf_file_names:{pdf_file_names}")
+        return pdf_path_files , pdf_file_names
+
+    def load_pdf_content(self, pdf_path):
+        """
+        Read the content of a PDF file
+        """
+        pdf_loader = PyMuPDFLoader(file_path=pdf_path)
+        docs = pdf_loader.load()
+        server_logger.info(f" Loading content from {pdf_path}.")
+        return docs
+
+
+    def load_and_process_data(self, file_path):
+        """Read and process data from a plain-text file"""
+        with open(file=file_path, mode="r", encoding="utf8") as f:
+            data = f.read()
+        return data
+    
+    def split_text(self, documents):
+        """
+        Split text into smaller chunks.
+        `documents` holds the document contents of one group of files.
+        """
+        # split the documents
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=self.chunksize,
+            chunk_overlap=self.overlap,
+            length_function=len,
+            add_start_index=True,
+        )
+        docs = text_splitter.split_documents(documents)
+        server_logger.info(f"Split text into smaller chunks with RecursiveCharacterTextSplitter. Total chunks: {len(docs)}")
+        return docs
+
+    def insert_docs(self, docs, insert_function, batch_size=None):
+        """
+        Insert documents into the target database, with progress display
+        :param docs: list of documents to insert
+        :param insert_function: insert callable
+        :param batch_size: batch size
+        """
+        if batch_size is None:
+            batch_size = self.batch_num
+
+        server_logger.info(f"Inserting {len(docs)} documents.")
+        start_time = time.time()
+        total_docs_inserted = 0
+
+        total_batches = (len(docs) + batch_size - 1) // batch_size
+
+        with tqdm(total=total_batches, desc="Inserting batches", unit="batch") as pbar:
+            for i in range(0, len(docs), batch_size):
+                batch = docs[i:i + batch_size]
+                insert_function(batch)  # call the provided insert callable
+
+                total_docs_inserted += len(batch)
+
+                # compute and display the current TPM (items per minute)
+                elapsed_time = time.time() - start_time
+                if elapsed_time > 0:
+                    tpm = (total_docs_inserted / elapsed_time) * 60
+                    pbar.set_postfix({"TPM": f"{tpm:.2f}"})
+
+                pbar.update(1)
+
+
+    def process_pdfs_group(self):
+        # read the PDF file contents
+        pdf_contents = []
+        pdf_path_files , pdf_file_names = self.load_pdf_files()
+
+        for pdf_path_file , pdf_file_name in zip(pdf_path_files , pdf_file_names):
+            # read the PDF file content
+            document_content = self.load_pdf_content(pdf_path_file)
+            # split the text into smaller chunks
+            docs = self.split_text(document_content)
+            pdf_contents.append(docs)
+
+        # TODO: the chunking step could attach metadata to each chunk
+        server_logger.info(f"Processed Documents:{self.directory},docs:{len(pdf_contents)}")
+        return pdf_contents
+        
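A minimal usage sketch for the processor above (the directory matches the test PDF shipped in this commit; the insert callable is a stub standing in for a real vector-store or ES sink):

    from file_processors.pdf_processor import PDFProcessor

    processor = PDFProcessor(directory="test/pdf_files", chunksize=500, overlap=100)
    groups = processor.process_pdfs_group()  # one list of chunks per source file
    for docs in groups:
        processor.insert_docs(docs, insert_function=lambda batch: None)  # stub sink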

+ 0 - 0
foundation/__init__.py


+ 11 - 0
foundation/agent/__init__.py

@@ -0,0 +1,11 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :__init__.py
+@IDE        :PyCharm
+@Author     : 
+@Date       :2025/7/14 15:04
+'''
+
+

+ 161 - 0
foundation/agent/base_agent.py

@@ -0,0 +1,161 @@
+
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :base_agent.py
+@IDE        :Cursor
+@Author     : 
+@Date       :2025/7/26 05:00
+'''
+from datetime import datetime
+from io import StringIO
+from contextlib import redirect_stdout
+from typing import Dict, List, Optional
+from foundation.logger.loggering import server_logger
+from foundation.utils.redis_utils import get_redis_result_cache_data_and_delete_key
+
+class BaseAgent:
+    """
+     Base intelligent-assistant class
+    """
+
+    def __init__(self):
+        pass
+
+
+    def get_pretty_message_str(self, message) -> str:
+        """Safely capture the output of pretty_print()"""
+        captured_output = StringIO()
+        with redirect_stdout(captured_output):
+            message.pretty_print()
+        return captured_output.getvalue()
+
+    
+    def log_stream_pretty_message(self , trace_id , event):
+        """
+            Stream-log the agent's full reasoning trace via pretty_print() output
+        """
+        event_type = event.get('event', '')
+        name = event.get('name', '')
+        data = event.get('data', {})
+        if event_type not in ['on_chain_start', 'on_chain_end', 'on_tool_start', 'on_tool_end', 'on_chat_model_start']:
+            return 
+        
+        server_logger.info(trace_id=trace_id , msg=f"\n================================= {event_type} ({name}) =================================")
+        if 'messages' in event:
+            for msg in event['messages']:
+                #msg.pretty_print()
+                output = self.get_pretty_message_str(msg)
+                server_logger.info(trace_id=trace_id , msg=f"\n{output}")
+        elif 'chunk' in data:
+            chunk = data['chunk']
+            if hasattr(chunk, 'content') and chunk.content:
+                server_logger.info(trace_id=trace_id , msg=f"Content: {chunk.content}")
+            if hasattr(chunk, 'tool_calls') and chunk.tool_calls:
+                server_logger.info(trace_id=trace_id , msg=f"Tool calls: {chunk.tool_calls}")
+        elif 'output' in data:
+            output = data['output']
+            if hasattr(output, 'pretty_print'):
+                #output.pretty_print()
+                output = self.get_pretty_message_str(output)
+                server_logger.info(trace_id=trace_id , msg=f"\n{output}")
+            else:
+                server_logger.info(trace_id=trace_id , msg=f"Output: {output}")
+
+
+
+    def get_input_context(
+            self,
+            trace_id: str,
+            task_prompt_info: dict,
+            input_query: str,
+            context: Optional[str] = None,
+            supplement_info: Optional[str] = None
+    ) -> tuple[str,str]:
+        """Build the scenario-optimized context prompts"""
+        context = context or "No relevant data"
+        task_prompt_info_str = task_prompt_info["task_prompt"]
+        
+        # scenario-optimized context template
+        context_template = """
+        Assistant session [ID: {trace_id}] 
+        Time: {timestamp}
+        Task: {task_prompt_info_str}
+        
+        Context provided by the user:
+        {context}
+        The user's question:
+        {input}
+        
+        """
+
+        input_context = context_template.format(
+            trace_id=trace_id,
+            task_prompt_info_str=task_prompt_info_str,
+            context=context,
+            input=input_query,
+            supplement_info=supplement_info,
+            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        )
+        
+
+          # scenario-optimized summary template
+        summary_context_template = """
+        Assistant session [ID: {trace_id}] 
+        Context:
+        {context}
+        User question:
+        {input}
+        """
+
+        input_summary_context = summary_context_template.format(
+            trace_id=trace_id,
+            context=context,
+            input=input_query,
+        )
+        return input_context , input_summary_context
+
+
+    def clean_json_output(self, raw_output: str) -> str:
+        """Strip leading ```json and trailing ``` fences"""
+        cleaned = raw_output.strip()
+        if cleaned.startswith("```json"):
+            cleaned = cleaned[7:]  # drop the leading ```json
+        if cleaned.endswith("```"):
+            cleaned = cleaned[:-3]  # drop the trailing ```
+        return cleaned.strip()
+
+
+    
+    async def get_redis_result_cache_data(self , trace_id: str):
+        """
+            Fetch cached result data from Redis
+            @param data_type: data type:
+                basic info: cattle_info
+                body temperature: cattle_temperature 
+                step count: cattle_walk
+                knowledge-base retrieval provenance: retriever_resources
+            @param trace_id: trace ID
+        """
+        # basic info
+        data_type = "cattle_info"
+        cattle_info = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
+
+        data_type = "cattle_temperature"
+        cattle_temperature = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
+
+        data_type = "cattle_walk"
+        cattle_walk = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
+
+        data_type = "retriever_resources"
+        retriever_resources = await get_redis_result_cache_data_and_delete_key(data_type=data_type , trace_id=trace_id)
+        return {
+            "cattle_info": cattle_info,
+            "cattle_temperature": cattle_temperature,
+            "cattle_walk": cattle_walk,
+            "retriever_resources": retriever_resources
+        }
+
+
+
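A small illustration of clean_json_output on a fenced model reply (the input string is hypothetical):

    agent = BaseAgent()
    raw = '```json\n{"answer": "ok"}\n```'
    print(agent.clean_json_output(raw))  # -> {"answer": "ok"}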

+ 41 - 0
foundation/agent/function/test_funciton.py

@@ -0,0 +1,41 @@
+
+
+
+class TestFunciton:
+
+
+    def __init__(self):
+        pass
+
+
+
+    def query_info(self , session_id):
+        """
+            Query information
+            session_id: session ID
+        """
+        return "Query result: Xiaohong, session_id:"+session_id
+    
+
+
+    def execute(self , session_id):
+        """
+            Execute a task
+            session_id: session ID
+        """
+        return "Task execution complete, session_id:"+session_id
+
+
+
+
+    def handle(self , session_id):
+        """
+            Handle a task
+            session_id: session ID
+        """
+        return "Handling result: Xiaodong, session_id:"+session_id
+    
+
+
+
+test_funtion = TestFunciton()

+ 9 - 0
foundation/agent/generate/__init__.py

@@ -0,0 +1,9 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :__init__.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/14 14:22
+'''

+ 137 - 0
foundation/agent/generate/model_generate.py

@@ -0,0 +1,137 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :model_generate.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/14 14:22
+'''
+
+from typing import Dict, Optional
+from langchain_core.prompts import HumanMessagePromptTemplate
+from langchain_core.prompts import ChatPromptTemplate
+from foundation.utils.utils import get_models
+from foundation.utils.yaml_utils import system_prompt_config
+
+
+class TestGenerateModelClient:
+    """
+        Generative model client
+    """
+
+    def __init__(self):
+        # fetch the deployed models
+        llm, chat, embed = get_models()
+        self.llm = llm
+        self.chat = chat
+        # fixed system prompts
+        self.system_prompt = system_prompt_config["system_prompt"]
+        self.system_data_governance_prompt = system_prompt_config["system_data_governance_prompt"]
+
+
+    def get_prompt_template(self):
+        """
+            Build a plain prompt template
+        """
+        human_template = """
+            {system_message}
+            The user's question is:
+                {question}  
+            The answer is:
+        """
+        human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
+        chat_prompt_template = ChatPromptTemplate.from_messages([human_message_prompt])
+        return chat_prompt_template
+    
+    
+    def get_model_generate_invoke(self, trace_id, task_prompt_info: dict, input_query, context=None):
+        """
+            Model generation chain (single invocation)
+        """
+        # Step 1: the system prompt was defined in __init__
+
+        # Step 2: build the full prompt template
+        prompt_template = ChatPromptTemplate.from_messages([
+            ("system", self.system_prompt), #task_prompt_info["task_prompt"]
+            ("human", "{input}")
+        ])
+        # Step 3: format the input with the template
+        messages = prompt_template.invoke({"input": input_query})
+        # Step 4: invoke the model
+        response = self.llm.invoke(messages)
+        return response.content
+
+
+
+
+    def get_model_data_governance_invoke(self, trace_id, task_prompt_info: dict, input_query, context=None):
+        """
+            Data-governance generation chain
+        """
+        # Step 1: the data-governance system prompt was defined in __init__
+
+        # Step 2: build the full prompt template
+        prompt_template = ChatPromptTemplate.from_messages([
+            ("system", self.system_data_governance_prompt), #task_prompt_info["task_prompt"]
+            ("human", "{input}")
+        ])
+        # Step 3: format the input with the template
+        messages = prompt_template.invoke({"input": input_query})
+        # Step 4: invoke the model
+        response = self.llm.invoke(messages)
+        return response.content
+
+
+    def get_model_generate_stream(self, trace_id, task_prompt_info: dict, input_query, context=None):
+        """
+            Model generation chain (streaming)
+        """
+        # Step 1: the system prompt was defined in __init__
+
+        # Step 2: build the full prompt template
+        prompt_template = ChatPromptTemplate.from_messages([
+            ("system",  self.system_prompt), #task_prompt_info["task_prompt"]
+            ("human", "{input}")
+        ])
+        # Step 3: format the input with the template
+        messages = prompt_template.invoke({"input": input_query})
+        # Step 4: stream the model call
+        response = self.llm.stream(messages)
+        # Step 5: yield token by token (typewriter effect)
+        for chunk in response:
+            yield chunk.content
+
+
+
+    def get_input_context(
+            self,
+            trace_id: str,
+            task_prompt_info: dict,
+            input_query: str,
+            context: Optional[str] = None
+    ) -> str:
+        """Build the question-and-context prompt"""
+        #server_logger.info(f"task_prompt_info: {task_prompt_info}")
+        context = context or "none"
+        task_prompt_info_str = task_prompt_info["task_prompt"]
+
+        # scenario-optimized context prompt
+        base_context_prompt = """
+            Trace ID: {trace_id}
+            Task info: {task_prompt_info_str}
+            Relevant context data: {context}
+            User question: {input}
+        """
+        return base_context_prompt.format(
+            trace_id=trace_id,
+            task_prompt_info_str=task_prompt_info_str,
+            context=context,
+            input=input_query
+        )
+
+# module-level singleton instance
+test_generate_model_client = TestGenerateModelClient()
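A minimal invocation sketch for the client above (the trace id and the empty task prompt are placeholders):

    from foundation.agent.generate.model_generate import test_generate_model_client

    text = test_generate_model_client.get_model_generate_invoke(
        trace_id="demo-001", task_prompt_info={"task_prompt": ""}, input_query="Hello")
    print(text)
    # streaming variant, token by token
    for token in test_generate_model_client.get_model_generate_stream(
            trace_id="demo-001", task_prompt_info={"task_prompt": ""}, input_query="Hello"):
        print(token, end="", flush=True)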

+ 105 - 0
foundation/agent/generate/test_intent.py

@@ -0,0 +1,105 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : xiwu-agent-api
+@File       :intent.py
+@IDE        :PyCharm
+@Author     :LINGMIN
+@Date       :2025/7/14 12:04
+'''
+
+
+import os
+import sys
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+from foundation.logger.loggering import server_logger
+from foundation.utils.utils import get_models
+from langchain_core.prompts import SystemMessagePromptTemplate
+from langchain_core.prompts import HumanMessagePromptTemplate
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.prompts import FewShotChatMessagePromptTemplate
+from foundation.utils import yaml_utils
+from foundation.base.config import config_handler
+
+
+class TestIntentIdentifyClient:
+
+    def __init__(self):
+        """
+            Create the intent-recognition client
+        """
+        # fetch the deployed models
+        llm, chat, embed = get_models()
+        self.llm_recognition = chat
+        # load the intent-recognition prompt configuration
+        self.intent_prompt = yaml_utils.get_intent_prompt()
+
+    def recognize_intent(self , trace_id: str , config: dict , input: str):
+        """
+        Intent recognition
+        Input: the user's question
+        Output: the recognized intent
+        """
+        session_id = config["session_id"]
+        history = "none"
+        # recognize the intent from the history and the user's question
+        return self.recognize_intent_history(input=input , history=history)
+
+
+    def recognize_intent_history(self , input: str , history="none"):
+        """
+        Intent recognition
+        Input: the user's question
+        Output: the recognized intent
+        """
+        # prepare the few-shot examples
+        examples = self.intent_prompt["intent_examples"]
+        #server_logger.info(f"Loaded prompt examples: {examples}")
+        system_prompt = self.intent_prompt["system_prompt"]
+        system_prompt = system_prompt.format(history=history)
+        server_logger.info(f"System prompt with the user's history injected for intent recognition: {system_prompt}")
+
+        # define the example template
+        examples_prompt = ChatPromptTemplate.from_messages(
+            [
+                ("human", "{inn}"),
+                ("ai", "{out}"),
+            ]
+        )
+        few_shot_prompt = FewShotChatMessagePromptTemplate(example_prompt=examples_prompt,
+                                                           examples=examples)
+        final_prompt = ChatPromptTemplate.from_messages(
+            [
+                ('system', system_prompt),
+                few_shot_prompt,
+                ('human', '{input}'),
+            ]
+        )
+
+        chain = final_prompt | self.llm_recognition
+        server_logger.info(f"Intent recognition input: {input}")
+        result = chain.invoke(input={"input": input})
+        # fault tolerance
+        if hasattr(result, 'content'):
+            # if result has a content attribute, use it
+            return result.content
+        else:
+            # otherwise return result as-is
+            return result
+
+
+
+
+
+intent_identify_client = TestIntentIdentifyClient()
+
+
+if __name__ == '__main__':
+   
+    input = "Hello"
+    input = "query a course"
+    input = "operate"  # only the last assignment takes effect
+    result = intent_identify_client.recognize_intent_history(history="" , input=input)
+    server_logger.info(f"result={result}")
+    

+ 252 - 0
foundation/agent/test_agent.py

@@ -0,0 +1,252 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :agent_mcp.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/21 10:12
+'''
+import json
+
+from langgraph.prebuilt import create_react_agent
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from foundation.utils.utils import get_models
+from foundation.utils.yaml_utils import system_prompt_config
+
+import threading
+import time
+from typing import Dict, List, Optional, AsyncGenerator, Any, OrderedDict
+from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from langchain_core.runnables import RunnableConfig
+from foundation.agent.base_agent import BaseAgent
+from foundation.schemas.test_schemas import FormConfig
+from foundation.agent.function.test_funciton import test_funtion
+
+
+class TestAgentClient(BaseAgent):
+    """
+    Xiwuzc intelligent assistant + MCP (with full session management), optimized for this scenario.
+    Adds a session-lock mechanism so that only one client can use a given session at a time.
+    """
+    # singleton instance and thread lock
+    _instance = None
+    _singleton_lock = threading.Lock()
+
+    def __new__(cls):
+        """Thread-safe singleton implementation"""
+        if cls._instance is None:
+            with cls._singleton_lock:
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+                    cls._instance._initialize()
+        return cls._instance
+
+    def _initialize(self):
+        """Initialize models and session management"""
+        llm, chat, embed = get_models()
+        self.llm = llm
+        self.chat = chat
+        self.embed = embed
+        self.agent_executor = None
+        self.initialized = False
+        self.psutil_available = True
+
+        # fixed system prompt
+        self.system_prompt = system_prompt_config["system_prompt"]
+
+        # cleanup task
+        self.cleanup_task = None
+        server_logger.info(" client initialized")
+
+    async def init_agent(self):
+        """Initialize agent_executor (runs only once)"""
+        if self.initialized:
+            return
+
+        # log the fixed system prompt
+        server_logger.info(f"system_prompt: {self.system_prompt}")
+
+        # build the prompt template with the fixed system prompt
+        prompt = ChatPromptTemplate.from_messages([
+            ("system", self.system_prompt),
+            MessagesPlaceholder(variable_name="messages"),
+            ("placeholder", "{agent_scratchpad}")
+        ])
+
+        # create the agent (MemorySaver is no longer used)
+        self.agent_executor = create_react_agent(
+            self.llm,
+            tools=[test_funtion.query_info , test_funtion.execute , test_funtion.handle] ,  # dedicated toolset + private knowledge-base retrieval tools
+            prompt=prompt
+        )
+        self.initialized = True
+        server_logger.info(" agent initialized")
+
+
+    async def handle_query(self, trace_id: str, task_prompt_info: dict, input_query, context=None,
+                            config_param: FormConfig = None):
+        try:
+            # make sure the agent is initialized
+            if not self.initialized:
+                await self.init_agent()
+            
+            session_id = config_param.session_id
+           
+
+            try:
+                # build the input messages
+                input_message , input_summary_context = self.get_input_context(
+                    trace_id=trace_id,
+                    task_prompt_info=task_prompt_info,
+                    input_query=input_query,
+                    context=context
+                )
+                # used for the model conversation
+                input_human_message = HumanMessage(content=input_message)
+                # used for summarizing the conversation history
+                input_human_summary_message = HumanMessage(content=input_summary_context)
+                # fetch history messages
+                history_messages = []
+                # assemble the full message list
+                all_messages = list(history_messages) + [input_human_message]
+
+                # configure the execution context
+                config = RunnableConfig(
+                    configurable={"thread_id": session_id},
+                    runnable_kwargs={"recursion_limit": 15}
+                )
+
+                # run the agent
+                events = self.agent_executor.astream(
+                    {"messages": all_messages},
+                    config=config,
+                    stream_mode="values"
+                )
+
+                # process the results
+                full_response = []
+                async for event in events:
+                    if isinstance(event["messages"][-1], AIMessage):
+                        chunk = event["messages"][-1].content
+                        full_response.append(chunk)
+                    log_content = self.get_pretty_message_str(event["messages"][-1])
+                    server_logger.info("\n" + log_content.strip(), trace_id=trace_id)
+
+                if full_response:
+                    full_text = "".join(full_response)
+                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
+                    full_text = self.clean_json_output(full_text)
+                    return full_text
+            finally:
+                # make sure the session lock is released
+                pass
+        except PermissionError as e:
+            # the session is locked by another device
+            return str(e)
+        except Exception as e:
+            handler_err(server_logger, trace_id=trace_id, err=e, err_name='agent/chat')
+            return f"System error: {str(e)}"
+
+
+    async def handle_query_stream(
+            self,
+            trace_id: str,
+            task_prompt_info: dict,
+            input_query: str,
+            context: Optional[str] = None,
+            header_info: Optional[Dict] = None,
+            config_param: FormConfig = None,
+    ) -> AsyncGenerator[str, None]:
+        """Handle the query as a stream (with buffered flushing)"""
+        try:
+            # make sure the agent is initialized
+            if not self.initialized:
+                await self.init_agent()
+            
+            session_id = config_param.session_id
+        
+            try:
+                # build the input messages
+                input_message , input_summary_context = self.get_input_context(
+                    trace_id=trace_id,
+                    task_prompt_info=task_prompt_info,
+                    input_query=input_query,
+                    context=context
+                )
+                server_logger.info(trace_id=trace_id, msg=f"input_context: {input_message}")
+                # used for the model conversation
+                input_human_message = HumanMessage(content=input_message)
+                # used for summarizing the conversation history
+                input_human_summary_message = HumanMessage(content=input_summary_context)
+                # fetch history messages
+                history_messages = []
+                # assemble the full message list
+                all_messages = list(history_messages) + [input_human_message]
+                # configure the execution context
+                config = RunnableConfig(
+                    configurable={"thread_id": session_id},
+                    runnable_kwargs={"recursion_limit": 15}
+                )
+
+                # streamed execution
+                events = self.agent_executor.astream_events(
+                    {"messages": all_messages},
+                    config=config,
+                    stream_mode="values"
+                )
+
+                full_response = []
+                buffer = []
+                last_flush_time = time.time()
+
+                # stream the events
+                async for event in events:
+                    # log only for specific event types
+                    self.log_stream_pretty_message(trace_id=trace_id, event=event)
+                   
+                    if 'chunk' in event['data'] and "on_chat_model_stream" in event['event']:
+                        chunk = event['data']['chunk'].content
+                        full_response.append(chunk)
+
+                        # buffering strategy
+                        buffer.append(chunk)
+                        current_time = time.time()
+
+                        # flush the buffer when any of these conditions holds
+                        if (len(buffer) >= 3 or  # minimum chunk count reached
+                                (current_time - last_flush_time) > 0.5 or  # timeout
+                                any(chunk.endswith((c, f"{c} ")) for c in
+                                    ['.', '。', '!', '?', '\n', ';', ';'])):  # natural break
+
+                            # merge and emit the buffered content
+                            combined = ''.join(buffer)
+                            yield combined
+
+                            # reset the buffer
+                            buffer.clear()
+                            last_flush_time = current_time
+
+                # flush any remaining content
+                if buffer:
+                    yield ''.join(buffer)
+
+                # append the full response to history (compression hook)
+                if full_response:
+                    full_text = "".join(full_response)
+                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
+            finally:
+                # make sure the session lock is released
+                pass
+
+        except PermissionError as e:
+            yield json.dumps({"error": str(e)})
+        except Exception as e:
+            handler_err(server_logger, trace_id=trace_id, err=e, err_name='test_stream')
+            yield json.dumps({"error": f"System error: {str(e)}"})
+
+
+test_agent_client = TestAgentClient()
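A minimal driver sketch for the singleton above (the session and trace ids are placeholders; FormConfig comes from foundation/schemas/test_schemas.py in this commit):

    import asyncio
    from foundation.agent.test_agent import test_agent_client
    from foundation.schemas.test_schemas import FormConfig

    async def main():
        reply = await test_agent_client.handle_query(
            trace_id="demo-001",
            task_prompt_info={"task_prompt": ""},
            input_query="query info",
            config_param=FormConfig(session_id="222"),
        )
        print(reply)

    asyncio.run(main())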

+ 21 - 0
foundation/agent/workflow/test_cus_state.py

@@ -0,0 +1,21 @@
+
+from langgraph.graph import MessagesState
+
+
+
+
+class TestCusState(MessagesState):
+    """
+     Step 2: define the state structure
+    """
+    route_next: str                                  # next node
+    
+    session_id: str                                  # session id
+    trace_id: str                                    # trace id for log correlation
+    user_input: str                                  # the user's input question
+    context: str                                     # context data
+    task_prompt_info: dict                           # task prompt info (populated and indexed as a dict by the nodes)
+
+
+

+ 192 - 0
foundation/agent/workflow/test_workflow_graph.py

@@ -0,0 +1,192 @@
+
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : 
+@File       :workflow_graph.py
+@IDE        :Cursor
+@Author     :LINGMIN
+@Date       :2025/08/10 18:00
+'''
+
+from foundation.agent.workflow.test_cus_state import TestCusState
+from foundation.agent.workflow.test_workflow_node import TestWorkflowNode
+from langgraph.graph import START, StateGraph, END
+from langgraph.checkpoint.memory import MemorySaver
+from foundation.logger.loggering import server_logger
+from typing import AsyncGenerator
+import time
+from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
+from foundation.utils.common import return_json, handler_err
+import json
+from foundation.schemas.test_schemas import TestForm, FormConfig
+
+
+class TestWorkflowGraph:
+    """
+        Workflow graph
+    """
+    def __init__(self):
+        self.workflow_node = TestWorkflowNode()
+        self.checkpoint_saver = MemorySaver()
+        self.app = self.init_workflow_graph()
+        # save the rendered graph image to a file
+        #self.write_graph()
+
+
+
+
+    def init_workflow_graph(self):
+        """
+            Initialize the workflow graph.
+            Use graph.get_state and get_state_history to inspect state.
+            Enable debug=True for verbose logging.
+            Use graph.get_graph().to_dot() to visualize the state graph.
+        """
+        # build the workflow graph: create the state graph (state_update_method="merge")
+        workflow = StateGraph(TestCusState)
+
+
+        ###### Branch 2: supervisor agent ##################################    
+        # node: supervisor agent
+        workflow.add_node("supervisor_agent", self.workflow_node.supervisor_agent)
+        # agent node 1: pure generation questions
+        workflow.add_node("chat_box_generate", self.workflow_node.chat_box_generate)
+        # agent node 2: general-purpose agent
+        workflow.add_node("common_agent", self.workflow_node.common_agent_node)
+
+
+        ###### node edges ##################################    
+        # entry: intent recognition
+        workflow.add_edge(START, "supervisor_agent")  
+        # populate the 'route_next' field in the graph state and route to a specific node (or end the run), thereby deciding what executes next.
+        workflow.add_conditional_edges(source="supervisor_agent", 
+                path=lambda state: state["route_next"],
+                # explicitly map each return value to a target node
+                path_map={
+                    "chat_box_generate": "chat_box_generate",
+                    "common_agent": "common_agent",
+                
+                }
+        )
+
+        supervisor_members_list = ["chat_box_generate" , "common_agent"] 
+
+         # each sub-agent always "reports back" to the supervisor when done
+        for agent_member in supervisor_members_list:
+            workflow.add_edge(agent_member, END) # end directly
+            #workflow.add_edge(agent_member, "supervisor_agent") # route back to the supervisor for further decisions
+
+       
+        # compile the graph
+        app = workflow.compile(checkpointer=self.checkpoint_saver)
+        #print(app.get_graph().draw_ascii())
+        server_logger.info(f"[workflow graph built] app={app}")
+        return app
+
+
+
+
+    async def handle_query_stream(self, param: TestForm, trace_id: str)-> AsyncGenerator[str, None]:
+        """
+        Get agent feedback for the scenario (SSE streaming response)
+        """
+        try:
+
+            # extract parameters
+            user_input = param.input
+            session_id = param.config.session_id
+            context = param.context
+
+            
+            human_messages = [HumanMessage(content=user_input)]
+            # full initial state
+            initial_state = {
+                "messages": human_messages,
+                "session_id": session_id,                                # session id
+                "trace_id": trace_id,                                  # trace id for log correlation
+                "task_prompt_info": {},                                    
+                "context": context ,                                    # context data
+                "user_input": user_input,
+            }
+            # unique task ID (acts as session_id / thread_id)
+            config = {"configurable": {"thread_id": session_id},
+                    "runnable_kwargs":{"recursion_limit": 50}
+            }
+            server_logger.info("======================== starting a new task ===========================")  #, interrupt_before=["user_confirm_task_planning"]
+
+            full_response = []
+            buffer = []
+            last_flush_time = time.time()
+            events = self.app.astream_events(initial_state, 
+                        config=config , 
+                        version="v1",  # make sure the correct event-schema version is used
+                        stream_mode="values"  # or "updates"
+            )
+            # stream the events
+            async for event in events:
+                #server_logger.info(trace_id=trace_id, msg=f"→ event type: {event['event']}")
+                #server_logger.info(trace_id=trace_id, msg=f"→ event data: {event['data']}")
+                
+                # handle chat-model streaming output
+                if event['event'] == 'on_chat_model_stream':
+                    if 'chunk' in event['data']:
+                        chunk = event['data']['chunk']
+                        if hasattr(chunk, 'content'):
+                            content = chunk.content
+                            full_response.append(content)
+                            
+                            # buffering strategy
+                            buffer.append(content)
+                            current_time = time.time()
+                            
+                            # flush conditions
+                            should_flush = (
+                                len(buffer) >= 3 or  # minimum chunk count reached
+                                (current_time - last_flush_time) > 0.5 or  # timeout
+                                any(content.endswith(('.', '。', '!', '?', '\n', ';', ';', '?', '!')) for content in buffer)  # natural break
+                            )
+                            
+                            if should_flush:
+                                combined = ''.join(buffer)
+                                yield combined
+                                
+                                buffer.clear()
+                                last_flush_time = current_time
+                
+                # other event types could be handled here as well
+                # elif event['event'] == 'on_chain_stream':
+                #     server_logger.info(trace_id=trace_id, msg=f"chain event: {event['data']}")
+                
+                # elif event['event'] == 'on_tool_stream':
+                #     server_logger.info(trace_id=trace_id, msg=f"tool call: {event['data']}")
+            
+            # flush any remaining buffered content
+            if buffer:
+                yield ''.join(buffer)
+            
+            # append the full response to history (compression hook)
+            if full_response:
+                full_text = "".join(full_response)
+                server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}", log_type="graph/stream")
+            
+        except Exception as e:
+            handler_err(server_logger, trace_id=trace_id, err=e, err_name='graph/stream')
+            yield json.dumps({"error": f"System error: {str(e)}"})
+
+
+
+
+    def write_graph(self):
+        """
+            Write the rendered graph to a file
+        """
+        graph_png = self.app.get_graph().draw_mermaid_png()
+        with open("build_graph_app.png", "wb") as f:
+            f.write(graph_png)
+        server_logger.info("[workflow graph written to file]")
+
+
+# instantiate
+test_workflow_graph = TestWorkflowGraph()
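A minimal consumer sketch for the streaming entry point above (values are placeholders; TestForm and FormConfig come from foundation/schemas/test_schemas.py in this commit, and the constructor fields assumed here mirror how handle_query_stream reads them):

    import asyncio
    from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
    from foundation.schemas.test_schemas import TestForm, FormConfig

    async def main():
        param = TestForm(input="Hello", context="", config=FormConfig(session_id="222"))
        async for chunk in test_workflow_graph.handle_query_stream(param, trace_id="demo-001"):
            print(chunk, end="", flush=True)

    asyncio.run(main())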

+ 110 - 0
foundation/agent/workflow/test_workflow_node.py

@@ -0,0 +1,110 @@
+
+
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : 
+@File       :workflow_node.py
+@IDE        :Cursor
+@Author     :LINGMIN
+@Date       :2025/08/10 18:00
+'''
+
+
+import json
+import sys
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
+from langchain_core.prompts import ChatPromptTemplate
+from foundation.agent.workflow.test_cus_state import TestCusState
+from foundation.agent.generate.test_intent import intent_identify_client
+from foundation.agent.test_agent import test_agent_client
+from foundation.schemas.test_schemas import FormConfig
+from foundation.agent.generate.model_generate import test_generate_model_client
+
+
+
+
+class TestWorkflowNode:
+    """
+        Workflow node definitions
+    """
+    def __init__(self):
+        """Initialize models and session management"""
+
+    
+
+    def supervisor_agent(self , state: TestCusState):
+        """
+            Each agent communicates with a Supervisor agent; the Supervisor decides which agent to invoke next.
+            :param state:
+            :return:
+        """
+        session_id = state["session_id"]
+        trace_id = state["trace_id"]
+        user_input = state["user_input"]
+        route_next = state.get("route_next")
+        
+        server_logger.info(trace_id=trace_id, msg=f"\n===================================[Supervisor].begin-route_next:{route_next}=============================")
+        
+        config = {
+            "session_id": session_id
+        }
+        # run intent recognition to determine the next route
+        route_next = intent_identify_client.recognize_intent(trace_id=trace_id , config=config , input=user_input)
+        server_logger.info(trace_id=trace_id, msg=f"[Supervisor].intent_identify_client.recognize_intent:{route_next}")
+        if route_next not in ["chat_box_generate" , "common_agent"]:
+            route_next = "chat_box_generate"
+
+        
+        server_logger.info(trace_id=trace_id, msg=f"\n===================================[Supervisor].end-route_next:{route_next}=============================")
+        return {
+            "route_next": route_next
+        }
+
+
+
+    async def common_agent_node(self , state: TestCusState):
+        """
+            General-purpose agent node
+            :param state:
+            :return:
+        """
+        session_id = state["session_id"]
+        trace_id = state["trace_id"]
+        user_input = state["user_input"]
+        config_param = FormConfig(session_id=session_id)
+        task_prompt_info = {"task_prompt": ""}
+        response_content = await test_agent_client.handle_query(trace_id=trace_id , config_param=config_param, 
+                                                                task_prompt_info=task_prompt_info, 
+                                                                input_query=user_input, context=None)
+        messages = [AIMessage(content=response_content, name="common_agent_node")]
+        return {
+            "messages": messages,
+            "previous_agent": "common_agent",
+            "route_next": "FINISH"   # end the flow directly
+        }
+    
+
+    def chat_box_generate(self , state: TestCusState) -> dict:
+        """
+            Model generation node (pure generation questions)
+            :param state:
+            :return:
+        """
+        session_id = state["session_id"]
+        trace_id = state["trace_id"]
+        user_input = state["user_input"]
+        task_prompt_info = state["task_prompt_info"]
+        task_prompt_info["task_prompt"] = ""
+        response_content = test_generate_model_client.get_model_generate_invoke(trace_id=trace_id , task_prompt_info=task_prompt_info, input_query=user_input)
+        messages = [AIMessage(content=response_content , name="chat_box_generate")]
+        server_logger.info(trace_id=trace_id, msg=f"[result]: {response_content}", log_type="chat_box_generate")
+        return {
+            "messages": messages,
+            "route_next": "FINISH"   # end the flow directly
+        }
+
+
+

+ 71 - 0
foundation/base/async_redis_lock.py

@@ -0,0 +1,71 @@
+import asyncio
+import time
+import uuid
+from typing import Optional
+from foundation.logger.loggering import server_logger
+
+class AsyncRedisLock:
+    def __init__(self, redis_client, lock_name: str, expire_time: int = 30):
+        """
+        :param redis_client: async Redis client connection
+        :param lock_name: name of the lock
+        :param expire_time: lock expiry time (seconds)
+        """
+        self.redis = redis_client
+        self.lock_name = lock_name
+        self.expire_time = expire_time
+        self.identifier = str(uuid.uuid4())  # unique identifier, used to release the lock safely
+
+    async def acquire(self, timeout: float = 10) -> bool:
+        """
+        Acquire the lock asynchronously
+        :param timeout: timeout for acquiring the lock (seconds)
+        :return: whether the lock was acquired
+        """
+        end = time.time() + timeout
+        while time.time() < end:
+            #server_logger.info(f"Trying to acquire lock: {self.lock_name},{self.identifier},{self.expire_time}")
+            # try to acquire the lock
+            if await self.redis.set(
+                self.lock_name, 
+                self.identifier, 
+                nx=True, 
+                ex=self.expire_time
+            ):
+                return True
+            await asyncio.sleep(0.001)  # brief wait before retrying
+        return False
+
+    async def release(self) -> bool:
+        """
+        Release the lock asynchronously
+        :return: whether the lock was released
+        """
+        # use a Lua script to guarantee atomicity
+        unlock_script = """
+        if redis.call("get", KEYS[1]) == ARGV[1] then
+            return redis.call("del", KEYS[1])
+        else
+            return 0
+        end
+        """
+        try:
+            # note: the argument-passing style differs from the sync version
+            result = await self.redis.eval(
+                unlock_script, 
+                1 , 
+                self.lock_name, 
+                self.identifier
+            )
+            return bool(result)
+        except Exception as e:
+            print(f"Error releasing lock: {e}")
+            return False
+
+    async def __aenter__(self):
+        if not await self.acquire():
+            raise Exception("Could not acquire lock")
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.release()
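A minimal usage sketch for the async context-manager form (the lock key is a placeholder; the client comes from the RedisConnectionFactory defined later in this commit):

    from foundation.base.async_redis_lock import AsyncRedisLock
    from foundation.base.redis_connection import RedisConnectionFactory

    async def critical_section():
        redis = await RedisConnectionFactory.get_connection()
        async with AsyncRedisLock(redis, lock_name="lock:demo", expire_time=30):
            ...  # only one holder runs this block at a time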

+ 35 - 0
foundation/base/config.py

@@ -0,0 +1,35 @@
+#!/usr/bin/python
+# -*- encoding: utf-8 -*-
+"""
+@Time    :   2025/07/10 14:40
+@Author  :   
+@File    :   config.py
+@Software:   VScode
+@Desc    :   None
+"""
+from configparser import ConfigParser
+
+
+class ConfigHandler:
+    def __init__(self, config_file=""):
+        self.config = ConfigParser()
+        self.config.read(config_file, encoding='utf-8')
+
+    def get(self, section, option, default=None):
+        try:
+            if section == "before":
+                # getboolean parses the "is_online" flag; bool() on a non-empty string would always be True
+                option = f"online_{option}" if self.config.getboolean("general", "is_online") else f"inline_{option}"
+                value = self.config.get(section, option)
+            else:
+                value = self.config.get(section, option)
+            # strip inline comments after '#'
+            if "#" in value:
+                value = value.split('#')[0].strip()
+        except Exception as err:
+            value = default
+        return value
+
+    def getboolean(self, section, option):
+        return self.config.getboolean(section, option)
+
+
+config_handler = ConfigHandler("./config/config.ini")
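A minimal usage sketch against the config/config.ini added in this commit:

    from foundation.base.config import config_handler

    redis_host = config_handler.get("redis", "REDIS_HOST", default="127.0.0.1")
    console_output = config_handler.getboolean("log", "CONSOLE_OUTPUT")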

+ 39 - 0
foundation/base/redis_config.py

@@ -0,0 +1,39 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :redis_config.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/21 13:44
+'''
+
+from dataclasses import dataclass
+from foundation.base.config import config_handler
+
+
+@dataclass
+class RedisConfig:
+    """Redis connection configuration"""
+    url: str = "redis://127.0.0.1:6379"
+    host: str = "127.0.0.1"
+    port: int = 6379
+    password: str = None
+    db: int = 0
+    max_connections: int = 50
+    session_prefix: str = "session:"
+    lock_prefix: str = "lock:"
+    session_ttl: int = 3600  # session TTL (seconds)
+
+
+
+def load_config_from_env() -> RedisConfig:
+    """Load the Redis configuration from config.ini via config_handler"""
+    redis_config = RedisConfig(
+        url=config_handler.get("redis", "REDIS_URL", "redis://127.0.0.1:6379"),
+        password=config_handler.get("redis", "REDIS_PASSWORD"),
+        db=int(config_handler.get("redis", "REDIS_DB", "0")),
+        max_connections=int(config_handler.get("redis", "REDIS_MAX_CONNECTIONS", "50"))
+    )
+    return redis_config
+

+ 212 - 0
foundation/base/redis_connection.py

@@ -0,0 +1,212 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :redis_connection.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/21 15:07
+'''
+import redis                     # sync-only client
+from redis import asyncio as aioredis
+
+
+from typing import Optional, Protocol, Dict, Any
+from foundation.base.redis_config import RedisConfig
+from foundation.base.redis_config import load_config_from_env
+from foundation.logger.loggering import server_logger
+from typing import Dict, Any, List
+from typing import Tuple, Optional
+from langchain_community.storage import RedisStore
+
+class RedisConnection(Protocol):
+    """
+    Redis interface protocol
+    """
+    async def get(self, key: str) -> Any: ...
+
+    async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool: ...
+
+    async def hget(self, key: str, field: str) -> Any: ...
+
+    async def hset(self, key: str, field: str, value: Any) -> int: ...
+
+    async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool: ...
+
+    async def hgetall(self, key: str) -> Dict[str, Any]: ...
+
+    async def delete(self, *keys: str) -> int: ...
+
+    async def exists(self, key: str) -> int: ...
+
+    async def expire(self, key: str, seconds: int) -> bool: ...
+
+    async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
+        int, list[str]]: ...
+
+    async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> Any: ...
+    async def close(self) -> None: ...
+
+
+
+
+
+class RedisAdapter(RedisConnection):
+    """
+    Redis adapter
+    """
+    def __init__(self, config: RedisConfig):
+        self.config = config
+        # client for ordinary Redis operations
+        self._redis = None
+        # client for the LangChain RedisStore
+        self._langchain_redis_client = None
+
+    async def connect(self):
+        """Create the Redis connections"""
+        # redis.asyncio.from_url returns the client synchronously, so no await is needed here
+        self._redis = aioredis.from_url(
+            self.config.url,
+            password=self.config.password,
+            db=self.config.db,
+            encoding="utf-8",
+            decode_responses=True,
+            max_connections=self.config.max_connections
+        )
+        # client for the LangChain RedisStore:
+        # decode_responses must be False (LangChain expects bytes)
+        self._langchain_redis_client = aioredis.from_url(
+            self.config.url,
+            password=self.config.password,
+            db=self.config.db,
+            encoding="utf-8",
+            decode_responses=False,
+            max_connections=self.config.max_connections
+        )
+       
+        # Using a sync Redis client instead:
+        # self._langchain_redis_client = redis.Redis.from_url(
+        #     self.config.url,
+        #     password=self.config.password,
+        #     db=self.config.db,
+        #     decode_responses=False,  # LangChain expects bytes
+        # )
+        # Error: "Expected Redis client, got Redis instead"
+        # self._langchain_redis_client = async_redis.from_url(
+        #         self.config.url,
+        #         password=self.config.password,
+        #         db=self.config.db,
+        #         decode_responses=False
+        #     )
+      
+        return self
+
+    async def get(self, key: str) -> Any:
+        return await self._redis.get(key)
+
+    async def set(self, key: str, value: Any, ex: Optional[int] = None, nx: bool = False) -> bool:
+        return await self._redis.set(key, value, ex=ex, nx=nx)
+
+    async def hget(self, key: str, field: str) -> Any:
+        return await self._redis.hget(key, field)
+
+    async def hset(self, key: str, field: str, value: Any) -> int:
+        return await self._redis.hset(key, field, value)
+
+    async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool:
+        return await self._redis.hmset(key, mapping)
+
+    async def hgetall(self, key: str) -> Dict[str, Any]:
+        return await self._redis.hgetall(key)
+
+    async def delete(self, *keys: str) -> int:
+        return await self._redis.delete(*keys)
+
+    async def exists(self, key: str) -> int:
+        return await self._redis.exists(key)
+
+    async def expire(self, key: str, seconds: int) -> bool:
+        return await self._redis.expire(key, seconds)
+
+    async def scan(self, cursor: int, match: Optional[str] = None, count: Optional[int] = None) -> tuple[
+        int, list[str]]:
+        return await self._redis.scan(cursor, match=match, count=count)
+    
+    async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> Any:
+        return await self._redis.eval(script, numkeys, *keys_and_args)  # unpack into separate arguments
+
+
+    def get_langchain_redis_client(self):
+        return self._langchain_redis_client
+
+    async def close(self) -> None:
+        # aioredis 2.x clients expose close(); wait_closed() existed only in aioredis 1.x
+        if self._redis:
+            await self._redis.close()
+        if self._langchain_redis_client:
+            await self._langchain_redis_client.close()
+
+
+
+
+class RedisConnectionFactory:
+    """
+    redis 连接工厂函数
+    """
+    _connections: Dict[str, RedisConnection] = {}
+    _stores: Dict[str, RedisStore] = {}
+
+    @classmethod
+    async def get_connection(cls) -> RedisConnection:
+        """获取Redis连接(单例模式)"""
+        # 加载配置
+        redis_config = load_config_from_env()
+        #server_logger.info(f"redis_config={redis_config}")
+        # 使用配置参数生成唯一标识
+        conn_id = f"{redis_config.url}-{redis_config.db}"
+
+        if conn_id not in cls._connections:
+            adapter = RedisAdapter(redis_config)
+            await adapter.connect()
+            cls._connections[conn_id] = adapter
+        return cls._connections[conn_id]
+
+    @classmethod
+    async def get_redis_store(cls) -> RedisStore:
+        """Get the LangChain-compatible Redis client (bytes mode).
+
+        Note: this currently returns the raw client rather than a wrapped RedisStore.
+        """
+        conn = await cls.get_connection()
+        return conn.get_langchain_redis_client()
+
+    @classmethod
+    async def get_langchain_redis_store(cls) -> RedisStore:
+        """Get a cached LangChain RedisStore instance.
+
+        Known issue: this currently caches the raw client instead of a wrapped RedisStore.
+        """
+        redis_config = load_config_from_env()
+        # build a unique id from the connection parameters
+        store_id = f"{redis_config.url}-{redis_config.db}"
+        if store_id not in cls._stores:
+            conn = await cls.get_connection()
+            store = conn.get_langchain_redis_client()
+            server_logger.info(f"store={store}")
+            cls._stores[store_id] = store
+        return cls._stores[store_id]
+
+    @classmethod
+    async def close_all(cls):
+        """关闭所有Redis连接"""
+        for conn in cls._connections.values():
+            await conn.close()
+        cls._connections = {}
+
+    @classmethod
+    def get_connection_count(cls) -> int:
+        """获取当前连接数"""
+        return len(cls._connections)
+
+
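A minimal usage sketch (not part of the commit), assuming the Redis settings that load_config_from_env reads are configured:

    import asyncio

    from foundation.base.redis_connection import RedisConnectionFactory

    async def main():
        # the factory caches one RedisAdapter per URL/db pair
        conn = await RedisConnectionFactory.get_connection()
        await conn.set("demo:key", "hello", ex=60)   # 60-second TTL
        print(await conn.get("demo:key"))            # -> "hello" (decode_responses=True)
        await RedisConnectionFactory.close_all()

    asyncio.run(main())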

+ 67 - 0
foundation/base/redis_lock.py

@@ -0,0 +1,67 @@
+#!/usr/bin/python
+# -*- encoding: utf-8 -*-
+"""
+@Time    :   2025/07/30 14:40
+@Author  :    
+@File    :   redis_lock.py
+@Software:   VScode
+@Desc    :   Simple Redis-based distributed lock
+"""
+
+
+import time
+import uuid
+
+class RedisLock:
+    """
+    Redis 锁类
+    """
+    
+    def __init__(self, redis_client, lock_name, expire_time=30):
+        """
+        :param redis_client: Redis 客户端连接
+        :param lock_name: 锁的名称
+        :param expire_time: 锁的过期时间(秒)
+        """
+        self.redis = redis_client
+        self.lock_name = lock_name
+        self.expire_time = expire_time
+        self.identifier = str(uuid.uuid4())  # unique token so only the owner can release the lock
+
+    def acquire(self, timeout=10):
+        """
+        获取锁
+        :param timeout: 获取锁的超时时间(秒)
+        :return: 是否成功获取锁
+        """
+        end = time.time() + timeout
+        while time.time() < end:
+            # try to take the lock atomically (SET NX EX)
+            if self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.expire_time):
+                return True
+            time.sleep(0.001)  # brief pause before retrying
+        return False
+
+    def release(self):
+        """
+        释放锁
+        """
+        # 使用 Lua 脚本保证原子性
+        unlock_script = """
+        if redis.call("get", KEYS[1]) == ARGV[1] then
+            return redis.call("del", KEYS[1])
+        else
+            return 0
+        end
+        """
+        self.redis.eval(unlock_script, 1, self.lock_name, self.identifier)
+
+
+    def __enter__(self):
+        if not self.acquire():
+            raise TimeoutError(f"Could not acquire lock: {self.lock_name}")
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.release()
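A minimal sketch of the lock in use, assuming a local Redis and a synchronous redis.Redis client (the class relies only on the standard redis-py set(nx=..., ex=...) and eval calls):

    import redis

    from foundation.base.redis_lock import RedisLock

    client = redis.Redis(host="localhost", port=6379, decode_responses=True)

    # acquires on entry (raising if it times out) and releases atomically on exit
    with RedisLock(client, "lock:report-job", expire_time=30):
        pass  # critical-section work goes here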

+ 39 - 0
foundation/core_enums.py

@@ -0,0 +1,39 @@
+
+from enum import Enum
+
+
+class ErrorCodeEnum(Enum):
+    """
+        错误码枚举定义
+    """
+    SUCCESS = ('100000', '成功')
+    ERROR = ('100500', '服务异常')
+
+    
+    SESSION_ID_EMPTY = ('100001', '会话ID为空')
+    BUSINSESS_SCENE_ERROR = ('100002', '业务场景错误')
+    INPUT_INFO_EMPTY = ('100003', '用户输入为空')
+    BUSINSESS_SCENE_EMPTY = ('100004', '业务场景为空')
+    BUSINSESS_SCENE_PROMPT_FILE_EMPTY = ('100005', '业务场景提示词文件为空')
+    BUSINSESS_SCENE_PROMPT_FILE_NOT_EXISTS = ('100006', '业务场景提示词文件不存在')
+    BUSINSESS_SCENE_PROMPT_FILE_READ_ERROR = ('100007', '业务场景提示词文件读取异常')
+    
+    def __init__(self, code : str, desc : str):
+        self.code = code
+        self.desc = desc
+
+    
+    @classmethod
+    def get_item_by_code(cls, code: str):
+        """
+        Look up an enum member by its code.
+        """
+        for item in cls:
+            if item.code == code:
+                return item
+        return None
+
+    def __str__(self) -> str:
+        return f"{self.code}:{self.desc}"
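For illustration, looking an item up by code and formatting one:

    from foundation.core_enums import ErrorCodeEnum

    item = ErrorCodeEnum.get_item_by_code("100001")
    print(item is ErrorCodeEnum.SESSION_ID_EMPTY)  # True
    print(str(ErrorCodeEnum.SUCCESS))              # 100000:成功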

+ 145 - 0
foundation/logger/loggering.py

@@ -0,0 +1,145 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :loggering.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/11 10:48
+'''
+from foundation.base.config import config_handler
+
+
+import os
+import sys
+import logging
+from logging.handlers import RotatingFileHandler
+
+
+class CompatibleLogger(logging.Logger):
+    """
+    完全兼容的日志记录器,继承自 logging.Logger
+    提供按级别分文件的日志记录,每个文件包含指定级别及更高级别的日志
+    """
+
+    def __init__(self, name, log_dir="logs", console_output=True,
+                 file_max_mb=10, backup_count=5,
+                 log_format=None, datefmt=None):
+        # 初始化父类
+        super().__init__(name)
+        self.setLevel(logging.DEBUG)  # 设置logger自身为最低级别
+
+        # 存储配置
+        self.log_dir = log_dir
+        self.console_output = console_output
+        self.file_max_bytes = file_max_mb * 1024 * 1024
+        self.backup_count = backup_count
+
+        # 设置日志格式
+        self._set_formatter(log_format, datefmt)
+
+        # 确保日志目录存在
+        os.makedirs(log_dir, exist_ok=True)
+
+        # 清除可能存在的旧处理器
+        if self.hasHandlers():
+            self.handlers.clear()
+
+        # 创建文件处理器
+        self._create_file_handlers()
+
+        # 创建控制台处理器
+        if console_output:
+            self._create_console_handler()
+
+    def _set_formatter(self, log_format, datefmt):
+        """设置日志格式"""
+        if log_format is None:
+            log_format = 'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | %(trace_id)-10s | %(log_type)-5s | %(message)s'
+
+        if datefmt is None:
+            datefmt = '%Y-%m-%d %H:%M:%S'
+
+        self.formatter = logging.Formatter(log_format, datefmt)
+
+    def _create_file_handlers(self):
+        """为每个日志级别创建文件处理器,每个文件包含该级别及更高级别的日志"""
+        level_files = {
+            logging.DEBUG: os.path.join(self.log_dir, "data_governance_debug.log"),
+            logging.INFO: os.path.join(self.log_dir, "data_governance_info.log"),
+            logging.WARNING: os.path.join(self.log_dir, "data_governance_warning.log"),
+            logging.ERROR: os.path.join(self.log_dir, "data_governance_error.log"),
+            logging.CRITICAL: os.path.join(self.log_dir, "data_governance_critical.log"),
+        }
+
+        for level, filename in level_files.items():
+            handler = RotatingFileHandler(
+                filename=filename,
+                mode='a',
+                maxBytes=self.file_max_bytes,
+                backupCount=self.backup_count,
+                encoding='utf-8'
+            )
+            handler.setLevel(level)  # 设置级别为对应文件级别
+            handler.setFormatter(self.formatter)
+            # 为每个级别的日志文件都添加一个筛选器,确保记录该级别及其更高级别
+            handler.addFilter(lambda record, lvl=level: record.levelno >= lvl)
+            self.addHandler(handler)
+
+    def _create_console_handler(self):
+        """创建控制台日志处理器"""
+        console_handler = logging.StreamHandler(sys.stdout)
+        console_handler.setLevel(logging.INFO)
+        console_handler.setFormatter(self.formatter)
+        self.addHandler(console_handler)
+
+    def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
+        """统一的日志记录方法"""
+        extra = kwargs.get('extra', {})
+        extra.update({
+            'trace_id': trace_id,
+            'log_type': log_type
+        })
+        kwargs['extra'] = extra
+        super().log(level, msg, *args, **kwargs)
+    
+
+
+    def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
+        self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)
+
+    def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
+        self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)
+
+    def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
+        self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)
+
+    def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
+        self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)
+    
+    def exception(self, msg, *args, trace_id="", log_type="system", exc_info=True, **kwargs):
+        """记录异常信息,包含堆栈跟踪"""
+        extra = kwargs.get('extra', {})
+        extra.update({
+            'trace_id': trace_id,
+            'log_type': log_type
+        })
+        kwargs['extra'] = extra
+        kwargs['exc_info'] = exc_info  # 确保异常信息被记录
+        super().error(msg, *args, **kwargs)  # 使用 error 级别记录异常
+
+    def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
+        self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)
+
+
+server_logger = CompatibleLogger(
+    name="agent_log",
+    log_dir=config_handler.get("log", "LOG_FILE_PATH" , "logs"),
+    console_output=config_handler.get("log", "CONSOLE_OUTPUT", "True").upper() != "FALSE",
+    file_max_mb=int(config_handler.get("log", "LOG_FILE_MAX_MB", "10")),
+    backup_count=int(config_handler.get("log", "LOG_BACKUP_COUNT", "5"))
+)
+
+
+server_logger.info("logging initialized")
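Call sites pass trace_id and log_type as keyword arguments, which feed the %(trace_id)s and %(log_type)s placeholders in the format string; a short sketch:

    from foundation.logger.loggering import server_logger

    server_logger.info("cache warmed", trace_id="req-42", log_type="startup")
    try:
        1 / 0
    except ZeroDivisionError:
        # logged at ERROR level with the full stack trace attached
        server_logger.exception("division failed", trace_id="req-42", log_type="demo")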

+ 65 - 0
foundation/models/base_online_platform.py

@@ -0,0 +1,65 @@
+
+
+import os
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger
+from openai import OpenAI
+
+
+class BaseApiPlatform:
+
+    def __init__(self):
+        self.api_key = os.getenv("API_KEY")
+        self.headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json"
+        }
+
+    
+    def get_openai_client(self , model_server_url , api_key):
+        """
+            获取openai模型 client
+        """
+        server_logger.info(f"get_openai_client -> model_server_url:{model_server_url},api_key:{api_key}")
+        # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
+        client = OpenAI(
+            api_key=api_key,
+            base_url=model_server_url,
+        )
+        return client
+
+
+
+    def get_chat_model(self):
+        """
+        获取chat模型
+        :return:
+        """
+        raise NotImplementedError
+    
+
+
+    def get_embeddings(self, texts: list[str]):
+        """
+        向量化文本
+        :param texts: 文本列表
+        :return: 向量列表
+        """
+        raise NotImplementedError
+
+
+    def rerank(self, input_query: str, documents: list, top_n: int = 5, return_documents: bool = True):
+        """
+        Score and re-rank candidate documents against a query.
+        :param input_query: user query
+        :param documents: candidate texts
+        :param top_n: number of results to return
+        :param return_documents: include the document text in each result
+        :return: ranked results with text and relevance scores
+        """
+        raise NotImplementedError
+
+    
+
+
+    

+ 194 - 0
foundation/models/silicon_flow.py

@@ -0,0 +1,194 @@
+
+
+
+import os
+import sys
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
+import requests
+from foundation.models.base_online_platform import BaseApiPlatform
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger
+from foundation.utils.common import handler_err
+from openai import OpenAI
+from langchain_core.embeddings import Embeddings
+from chromadb.utils.embedding_functions import EmbeddingFunction
+from typing import List
+import numpy as np
+
+
+
+class SiliconFlowEmbeddings(Embeddings):
+    """
+    LangChain 兼容的硅基流动嵌入模型客户端
+    使用方式:
+        embeddings = SiliconFlowEmbeddings(
+            model="netease-youdao/bce-embedding-base_v1",
+            api_key="sk-..."
+        )
+        vectors = embeddings.embed_documents(["文本1", "文本2"])
+    """
+    def __init__(self, base_url: str, api_key: str, embed_model_id: str):
+        self.model = embed_model_id
+        self.api_key = api_key
+        if not self.api_key:
+            raise ValueError("an api_key must be provided")
+        self.client = OpenAI(
+            api_key=self.api_key,
+            base_url=base_url
+        )
+
+    def embed_documents(self, texts: List[str]) -> List[List[float]]:
+        """对文档列表进行向量化"""
+        if not texts:
+            return []
+        response = self.client.embeddings.create(
+            model=self.model,
+            input=texts
+        )
+        return [item.embedding for item in response.data]
+
+    def embed_query(self, text: str) -> List[float]:
+        """对查询文本进行向量化"""
+        return self.embed_documents([text])[0]
+
+
+
+
+class ChromaSiliconFlowEmbedding(EmbeddingFunction):
+    """
+        将SiliconFlowEmbeddings适配到ChromaDB的嵌入函数接口
+    """
+    def __init__(self, embeddings):
+        self.embeddings = embeddings
+
+    def __call__(self, input: List[str]) -> List[List[float]]:
+        raw_embeddings = self.embeddings.embed_documents(input)
+        return self.normalized_embeddings(raw_embeddings)
+
+    def embed_documents(self, input: List[str]) -> List[List[float]]:
+        raw_embeddings = self.embeddings.embed_documents(input)
+        return self.normalized_embeddings(raw_embeddings)
+
+    def embed_query(self, text: str) -> List[float]:
+        """Embed a single query string."""
+        raw_embedding = self.embeddings.embed_documents([text])[0]
+        # wrap in a list: normalized_embeddings expects a list of vectors
+        return self.normalized_embeddings([raw_embedding])[0]
+
+    def normalized_embeddings(self, raw_embeddings):
+        """L2-normalize each vector so cosine similarity reduces to a dot product."""
+        normalized = []
+        for vector in raw_embeddings:
+            vec = np.asarray(vector, dtype=float)
+            norm = np.linalg.norm(vec)
+            if norm > 0:
+                normalized.append((vec / norm).tolist())
+            else:
+                normalized.append(vec.tolist())
+        return normalized
+
+
+
+class SiliconFlowAPI(BaseApiPlatform):
+    def __init__(self , trace_id=""):
+        self.trace_id = trace_id
+        self.config_prefix = "siliconflow"
+        self.model_server_url = config_handler.get(self.config_prefix, "SLCF_MODEL_SERVER_URL")
+        self.api_key = config_handler.get(self.config_prefix, "SLCF_API_KEY")
+        self.embed_url = self.model_server_url + "/embeddings"
+        self.rerank_url = self.model_server_url + "/rerank"
+        self.embed_model_id = config_handler.get(self.config_prefix, "SLCF_EMBED_MODEL_ID")
+        self.rerank_model_id = config_handler.get(self.config_prefix, "SLCF_REANKER_MODEL_ID")
+        server_logger.info(f"SiliconFlowAPI -> embed_url:{self.embed_url},rerank_url:{self.rerank_url}")
+        server_logger.info(f"SiliconFlowAPI -> embed_model_id:{self.embed_model_id},rerank_model_id:{self.rerank_model_id}")
+        self.client = self.get_openai_client(self.model_server_url, self.api_key)
+        # 创建LangChain兼容的嵌入对象
+        langchain_embeddings = SiliconFlowEmbeddings(base_url = self.model_server_url , api_key=self.api_key , embed_model_id=self.embed_model_id)
+        self.embed_model = ChromaSiliconFlowEmbedding(embeddings=langchain_embeddings)
+
+
+
+    def get_embed_model(self):
+        """
+            获取嵌入模型
+        """
+        return self.embed_model
+
+
+    def get_embeddings(self, texts: list[str]):
+        """获取文本向量(embedding)"""
+        try:
+            response = self.client.embeddings.create(
+                model=self.embed_model_id,  # 指定向量模型
+                input=texts if isinstance(texts, list) else [texts]
+            )
+            # 返回 embeddings 列表
+            return [data.embedding for data in response.data]
+        except Exception as e:
+            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='Embedding 调用失败')
+            raise
+
+
+    def rerank(self, input_query: str, documents: list, top_n: int = 5, return_documents: bool = True):
+        """
+            使用 BGE 重排序模型进行相关性打分
+            使用重排序模型对候选文档进行排序
+            :param query: 用户查询语句
+            :param documents: 候选文本列表
+            :param top_n: 返回前 N 个结果
+            :return: 排序后的结果列表,包含文本和相似度分数
+        """
+        try:
+            headers = {
+                "Authorization": f"Bearer {self.api_key}",
+                "Content-Type": "application/json"
+            }
+            payload = {
+                "model": self.rerank_model_id,
+                "query": input_query,
+                "documents": documents,
+                "top_n": top_n,
+                "return_documents": return_documents
+            }
+            response = requests.post(self.rerank_url, json=payload, headers=headers)
+            response.raise_for_status()
+            data = response.json()
+
+            results = []
+            for item in data['results']:
+                results.append({
+                    "index": item['index'],
+                    "relevance_score": item['relevance_score'],
+                    "document": item.get('document', {}).get('text', None)
+                })
+            return results
+        except Exception as e:
+            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='重排序调用失败')
+            raise
+
+
+
+
+# usage example
+if __name__ == "__main__":
+    # initialize the client (reads the siliconflow section from config)
+    client = SiliconFlowAPI()
+
+    # 示例1:向量化文本
+    texts = ["奶牛养殖技术", "牛肉市场价格分析"]
+    embeddings = client.get_embeddings(texts)
+    print(f"向量维度:{len(embeddings[0])}")  # 输出向量维度
+
+    # 示例2:重排序文档
+    query = "如何提高牛奶产量?"
+    documents = [
+        "奶牛饲料配比指南",
+        "牧场管理规范",
+        "牛奶加工工艺流程",
+        "提高产奶量的10个技巧"
+    ]
+    rerank_results = client.rerank(query, documents)
+    
+    print("\n重排序结果:")
+    for result in sorted(rerank_results, key=lambda x: x['relevance_score'], reverse=True):
+        print(f"{result['index']} (得分: {result['relevance_score']:.2f}): {documents[result['index']]}")

+ 97 - 0
foundation/schemas/__init__.py

@@ -0,0 +1,97 @@
+import re
+
+
+def is_number(character: str):
+    """
+    判断是否为数字
+    """
+    return bool(re.match(r'^[-+]?\d+(\.\d+)?$', character)) if character else False
+
+
+
+def check_new_parameter(check_v, key, value):
+    """
+    请求前对数据进行校验
+    params: check_v: 一个list [type,(limit_condition)]
+    type 表示 value 应该用什么类型
+    limit_condition 就是检验的条件,可以不存在,如果是list则表示为其中一个,tuple则表示在此范围内
+    key: 是指被检查的参数的名称用以打印日志及返回报错
+    value: 是现在key所对应的具体的值,也是被用来被检查的值
+    return: None or error_msg
+    example:
+      [str, "name", (1, 64)] 表示 name应是字符串类型, 长度应大于等于1小于等于64
+      [list, "sex", [0, 1,...] 表示 sex应是列表,且值应该存在于list[0, 1]中
+      [int, "year", (1, 200)] 表示 year应是整数类型, 数值应大于等于1小于等于200
+    """
+    if check_v[0] == int:
+        if is_number(str(value)):
+            value = int(value)
+    if not isinstance(value, check_v[0]):
+        return "type error, %s should be %s, but now is %s" % (key, str(check_v[0]), value)
+
+    if len(check_v) == 2:
+        if check_v[0] == str:
+            if isinstance(check_v[1], list):
+                if value not in check_v[1]:
+                    return "Invalid param, %s is %s now, not in %s" % (key, value, str(check_v[1]))
+            if isinstance(check_v[1], tuple):
+                if len(value) < check_v[1][0] or len(value) > check_v[1][1]:
+                    return "Invalid param, %s is %s now, length is %s, range from %s to %s" % (
+                        key, value, len(value), check_v[1][0], check_v[1][1])
+        if check_v[0] == int:
+            if isinstance(check_v[1], list):
+                if value not in check_v[1]:
+                    return "Invalid param, %s is %s now, not in %s" % (key, value, str(check_v[1]))
+            if isinstance(check_v[1], tuple):
+                if value < check_v[1][0] or value > check_v[1][1]:
+                    return "Invalid param, %s is %s now, range from %s to %s" % (key, value, check_v[1][0], check_v[1][1])
+        if check_v[0] == list:
+            if isinstance(check_v[1], list):
+                for v in value:
+                    if v not in check_v[1]:
+                        return "Invalid param, %s is %s now, not in %s" % (key, v, str(check_v[1]))
+
+
+class CheckParams:
+    """
+    检验参数
+    """
+
+    def __init__(self, request_body, params_dict, logger, logger_name=None):
+        self.request_body = request_body
+        self.params_dict = params_dict
+        self.logger_name = logger_name
+
+        self.res = {
+            "code": 0,
+            "message": "ok",
+            "data": {"operate_id": ""}
+        }
+        self.logger = logger
+        self.body = dict()
+
+    def start(self):
+        """
+        开始校验
+        :return:
+        """
+        for k, v in self.params_dict.items():
+            if k not in self.request_body:
+                if v[0] is True:
+                    message = f"Invalid Access, {self.logger_name} %s is not Found in request" % k
+                    self.logger.error(message)
+                    self.res["message"] = message
+                    self.res["code"] = 400
+                    return False, self.res
+                else:
+                    continue
+            check_result = check_new_parameter(v[1:], k, self.request_body[k])
+            if check_result is None:
+                self.body[k] = self.request_body[k]
+            else:
+                message = f"Invalid Access, {self.logger_name} " + check_result
+                self.logger.error(message)
+                self.res["message"] = message
+                self.res["code"] = 400
+                return False, self.res
+        return True, self.body
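A sketch of the expected params_dict shape, where each entry is [required_flag, expected_type, limit_condition]:

    from foundation.logger.loggering import server_logger
    from foundation.schemas import CheckParams

    params_dict = {
        "name": [True, str, (1, 64)],   # required string, length 1 to 64
        "sex":  [False, int, [0, 1]],   # optional int, must be 0 or 1
    }
    ok, result = CheckParams({"name": "Alice", "sex": 1}, params_dict,
                             server_logger, logger_name="demo").start()
    print(ok, result)  # True {'name': 'Alice', 'sex': 1}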

+ 22 - 0
foundation/schemas/test_schemas.py

@@ -0,0 +1,22 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :test_schemas.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/11 12:41
+'''
+from typing import Optional
+from pydantic import BaseModel, constr, Field
+
+
+class FormConfig(BaseModel):
+    session_id: constr(max_length=128) = Field(description="session id")
+
+
+class TestForm(BaseModel):
+    config: FormConfig
+    input: Optional[str] = Field(default=None, description="user input")
+    context: Optional[str] = Field(default=None, description="reference context")
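For illustration, building and dumping the request model (pydantic v2):

    from foundation.schemas.test_schemas import TestForm, FormConfig

    form = TestForm(config=FormConfig(session_id="sess-001"), input="你好")
    print(form.model_dump())
    # {'config': {'session_id': 'sess-001'}, 'input': '你好', 'context': None}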

+ 76 - 0
foundation/utils/common.py

@@ -0,0 +1,76 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :common.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/11 11:36
+'''
+import time
+import uuid
+from functools import wraps
+
+
+def return_json(code=0, msg='ok', business_scene=None, data=None, trace_id=None, data_type="text", page=0, page_size=10, **kwargs):
+    # generate the trace_id per call: a str(uuid.uuid4()) default argument
+    # would be evaluated once at import time and shared across requests
+    trace_id = trace_id or str(uuid.uuid4())
+    res = {
+        "code": code,
+        "message": msg,
+        "page": page,
+        "page_size": page_size,
+        "trace_id": trace_id,
+        "business_scene": business_scene,
+    }
+    if data:
+        data['dataType'] = data_type
+    res['data'] = data
+
+    if kwargs:
+        res.update(kwargs)
+    return res
+
+
+def calcu_run_time(logger, name: str):
+    """
+    执行时间统计装饰器
+    :param logger: log obj
+    :param name: log name
+    :return:
+    """
+
+    def inner_func(func):
+        @wraps(func)
+        def calcu_wrapper(*args, **kwargs):
+            start_time = float("%.3f" % time.time())
+            logger.info(f"{name}_start_time: {start_time}")
+            result = func(*args, **kwargs)
+            end_time = float("%.3f" % time.time())
+            logger.info(f"{name}_end_time: {end_time}")
+            logger.info("request_total_cost_time: {}".format(end_time - start_time))
+            return result
+
+        return calcu_wrapper
+
+    return inner_func
+
+
+
+def handler_err(logger, err, trace_id: str = "", err_name: str = ""):
+    """
+    Log an error with its message, source file, line number, and stack trace.
+    :param logger: log obj
+    :param err: error obj
+    :param trace_id: trace id, default=""
+    :param err_name: error name, default=""
+    """
+    trace_id = trace_id if trace_id else f"{uuid.uuid4()}"
+    logger.error(trace_id=trace_id, log_type=err_name, msg=f'error: {err}')
+    logger.error(trace_id=trace_id, log_type=err_name, msg=f'error file: {err.__traceback__.tb_frame.f_globals["__file__"]}')
+    logger.error(trace_id=trace_id, log_type=err_name, msg=f"error line: {err.__traceback__.tb_lineno}")
+    logger.exception(trace_id=trace_id, log_type=err_name, msg="Error stack trace:")
+
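A quick sketch of the timing decorator in use (durations are written through the supplied logger):

    from foundation.logger.loggering import server_logger
    from foundation.utils.common import calcu_run_time

    @calcu_run_time(server_logger, name="demo_task")
    def demo_task(n: int) -> int:
        return sum(range(n))

    demo_task(1_000_000)  # logs demo_task_start_time, demo_task_end_time, and the total cost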

+ 63 - 0
foundation/utils/redis_utils.py

@@ -0,0 +1,63 @@
+
+import json
+from foundation.logger.loggering import server_logger
+from foundation.base.redis_connection import RedisConnectionFactory
+from foundation.base.config import config_handler
+# cache TTL, default 3 minutes
+CACHE_DATA_EXPIRED_TIME = 3 * 60
+
+
+
+
+
+
+async def set_redis_result_cache_data(data_type: str, trace_id: str, value: str):
+    """
+    Cache a result payload in Redis.
+    @param data_type: data type, e.g. cattle_info, cattle_temperature, cattle_walk
+    @param trace_id: trace id
+    @param value: data to cache
+    """
+    expired_time = int(config_handler.get("api", "CACHE_DATA_EXPIRED_TIME", CACHE_DATA_EXPIRED_TIME))
+    key = f"{trace_id}:{data_type}"
+    # fetch the shared LangChain-compatible Redis client
+    redis_store = await RedisConnectionFactory.get_redis_store()
+    await redis_store.set(key, value, ex=expired_time)
+
+
+
+
+async def get_redis_result_cache_data(data_type: str, trace_id: str):
+    """
+    Read a cached result from Redis.
+    @param data_type: data type, e.g. cattle_info, cattle_temperature, cattle_walk
+    @param trace_id: trace id
+    """
+    key = f"{trace_id}:{data_type}"
+    # fetch the shared LangChain-compatible Redis client
+    redis_store = await RedisConnectionFactory.get_redis_store()
+    value = await redis_store.get(key)
+    return value
+
+
+
+async def get_redis_result_cache_data_and_delete_key(data_type: str, trace_id: str):
+    """
+    Read a cached result from Redis, decode it as JSON, then delete the key.
+    @param data_type: data type, e.g. cattle_info, cattle_temperature, cattle_walk
+    @param trace_id: trace id
+    """
+    key = f"{trace_id}:{data_type}"
+    redis_store = await RedisConnectionFactory.get_redis_store()
+    value = await redis_store.get(key)
+    server_logger.info(f"cached result for {key}: {value}")
+    if value is None:
+        return None
+    # the bytes-mode client returns raw bytes: decode, then parse the JSON
+    data = json.loads(value.decode('utf-8'))
+    # delete the key so each result is consumed at most once
+    await redis_store.delete(key)
+    return data
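A round-trip sketch, assuming Redis is reachable and the api section provides CACHE_DATA_EXPIRED_TIME:

    import asyncio
    import json

    from foundation.utils.redis_utils import (
        set_redis_result_cache_data,
        get_redis_result_cache_data_and_delete_key,
    )

    async def main():
        await set_redis_result_cache_data("cattle_info", "trace-1",
                                          json.dumps({"count": 12}))
        # reads the value, JSON-decodes it, and deletes the key
        print(await get_redis_result_cache_data_and_delete_key("cattle_info", "trace-1"))

    asyncio.run(main())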

+ 41 - 0
foundation/utils/tool_utils.py

@@ -0,0 +1,41 @@
+import os
+from dotenv import load_dotenv
+from foundation.core_enums import ErrorCodeEnum
+
+from foundation.logger.loggering import server_logger
+from foundation.base.config import config_handler
+
+# resolve the project-root .env (two levels up from foundation/utils)
+current_dir = os.path.dirname(__file__)
+conf_file_path = os.path.join(current_dir, '..', '..', '.env')
+# load environment variables
+load_dotenv(dotenv_path=conf_file_path)
+
+def verify_param(param: dict):
+    """
+    Validate request parameters.
+    """
+    input_data = param.get("input")
+    session_id = (param.get("config") or {}).get("session_id")
+    if input_data is None:
+        raise ValueError(str(ErrorCodeEnum.INPUT_INFO_EMPTY))
+    if session_id is None:
+        raise ValueError(str(ErrorCodeEnum.SESSION_ID_EMPTY))
+    
+
+
+
+def get_system_prompt() -> str:
+    """
+        获取系统提示语
+    """
+    system_prompt = config_handler.get("system", "SYSTEM_PROMPT")
+    server_logger.info(f"获取系统提示语: {system_prompt}")
+    return str(system_prompt)
+

+ 205 - 0
foundation/utils/utils.py

@@ -0,0 +1,205 @@
+import json
+import time
+import uuid
+from datetime import datetime
+from typing import List, Dict, Optional
+
+from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
+from langchain_openai import ChatOpenAI
+
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger
+
+
+def get_models():
+    """
+    Load the configured models; MODEL_TYPE selects qwen or deepseek (default).
+    """
+    model_type = config_handler.get("model", "MODEL_TYPE") or ""
+    server_logger.info(f"get_models -> model_type:{model_type}")
+    if model_type.upper() == "QWEN":
+        return get_deploy_qwen_models()
+    return get_deepseek_models()
+
+
+def get_deepseek_models():
+    """
+    获取DeepSeek模型
+    """
+    deepseek_model_server_url = config_handler.get("deepseek", "DEEPSEEK_SERVER_URL")
+    deepseek_chat_model_id = config_handler.get("deepseek", "DEEPSEEK_MODEL_ID")
+    deepseek_api_key = config_handler.get("deepseek", "DEEPSEEK_API_KEY")
+    server_logger.info(f"get_deepseek_models -> chat_model_id:{deepseek_chat_model_id},api_key:{deepseek_api_key}")
+    if deepseek_model_server_url is None or deepseek_chat_model_id is None or deepseek_api_key is None:
+        server_logger.error("请设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")
+        raise Exception("设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")
+    # llm 大模型
+    llm = ChatOpenAI(base_url=deepseek_model_server_url,
+                     api_key=deepseek_api_key,
+                     model=deepseek_chat_model_id,
+                     max_tokens=4096,
+                     temperature=0.3,
+                     top_p=0.7,
+                     extra_body={
+                         "enable_thinking": False  # 添加这个参数以避免报错
+                     })
+    # chat 大模型
+    chat = ChatOpenAI(base_url=deepseek_model_server_url,
+                      api_key=deepseek_api_key,
+                      model=deepseek_chat_model_id,
+                      max_tokens=4096,
+                      temperature=0.3,
+                      top_p=0.2,
+                      extra_body={
+                          "enable_thinking": False  # 添加这个参数以避免报错
+                      })
+    embed = None
+    return llm, chat, embed
+
+
+# 获取千问模型
+def get_deploy_qwen_models():
+    """
+        加载千问系列大模型-魔搭在线Qwen3 API服务
+    """
+    model_server_url = config_handler.get("qwen", "MODEL_SERVER_URL")
+    chat_model_id = config_handler.get("qwen", "CHAT_MODEL_ID")
+    api_key = config_handler.get("qwen", "API_KEY")
+    embedding_model_id = config_handler.get("qwen", "EMBED_MODEL_ID")
+    # temperature = os.getenv("CHAT_MODEL_TEMPERATURE")
+    server_logger.info(
+        f"get_qwen_chat_model -> chat_model_id:{chat_model_id},embedding_model_id:{embedding_model_id}")
+    if model_server_url is None or chat_model_id is None or api_key is None:
+        server_logger.error("Missing config: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")
+        raise Exception("Missing config: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")
+
+    # llm 大模型
+    llm = ChatOpenAI(base_url=model_server_url,
+                     api_key=api_key,
+                     model=chat_model_id,
+                     max_tokens=1024,
+                     temperature=0.5,
+                     top_p=0.7,
+                     extra_body={
+                         "enable_thinking": False  # 添加这个参数以避免报错
+                     })
+    # chat 大模型
+    chat = ChatOpenAI(base_url=model_server_url,
+                      api_key=api_key,
+                      model=chat_model_id,
+                      max_tokens=1024,
+                      temperature=0.01,
+                      top_p=0.2,
+                      extra_body={
+                          "enable_thinking": False  # 添加这个参数以避免报错
+                      })
+
+    # embedding 大模型 text-embedding-v3  text-embedding-v4
+    # from langchain_community.embeddings import DashScopeEmbeddings
+    embed = None  # DashScopeEmbeddings(model=embedding_model_id)
+    return llm, chat, embed
+
+
+def test_qwen_chat_model():
+    #  获取模型
+    llm, chat, embed = get_deploy_qwen_models()
+    example_query = "你好,你是谁?"
+    result = llm.invoke(input=example_query)
+    server_logger.info(f"result={result}")
+    print(f"result={result}")
+
+
+def test_deepseek_chat_model():
+    #  获取模型
+    llm, chat, embed = get_deepseek_models()
+    example_query = "你好,你是谁?"
+    result = llm.invoke(input=example_query)
+    server_logger.info(f"result={result}")
+    print(f"result={result}")
+
+
+def serialize_messages(messages: List[Dict]) -> str:
+    """序列化消息列表为JSON字符串"""
+    return json.dumps(messages)
+
+
+def deserialize_messages(data: str) -> List[Dict]:
+    """反序列化JSON字符串为消息列表"""
+    return json.loads(data) if data else []
+
+
+def to_langchain_messages(messages: List[Dict]) -> List:
+    """将消息字典转换为LangChain消息对象"""
+    langchain_messages = []
+    for msg in messages:
+        if msg["role"] == "user":
+            langchain_messages.append(HumanMessage(content=msg["content"]))
+        elif msg["role"] == "assistant":
+            langchain_messages.append(AIMessage(content=msg["content"]))
+        elif msg["role"] == "system":
+            langchain_messages.append(SystemMessage(content=msg["content"]))
+    return langchain_messages
+
+
+def generate_session_id() -> str:
+    """生成唯一的会话ID"""
+    return f"xiwuzc-{uuid.uuid4()}"
+
+
+def get_current_timestamp() -> float:
+    """获取当前时间戳"""
+    return time.time()
+
+
+def format_timestamp(timestamp: float) -> str:
+    """格式化时间戳为可读字符串"""
+    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def build_input_context(
+        trace_id: str,
+        task_prompt_info: dict,
+        input_query: str,
+        context: Optional[str] = None,
+        supplement_info: Optional[str] = None,
+        header_info: Optional[Dict] = None
+) -> str:
+    """构建场景优化的上下文提示"""
+    context = context or "无相关数据"
+    supplement_info = supplement_info or "无补充信息"
+    token = header_info.get('token', '') if header_info else ''
+    tenantId = header_info.get('tenantId', '') if header_info else ''
+    task_prompt_info_str = task_prompt_info["task_prompt"]
+
+    return f"""
+🐄 助手会话 [ID: {trace_id}] 🐖
+⏰ 时间: {format_timestamp(get_current_timestamp())}
+📌 任务: {task_prompt_info_str}
+
+📊 相关数据:
+{context}
+
+ℹ️ 补充信息:
+{supplement_info}
+
+❓ 用户问题:
+{input_query}
+
+🔒 安全验证: {token}
+🏠 场ID: {tenantId}
+""".strip()
+
+
+def clean_json_output(raw_output: str) -> str:
+    """去除开头和结尾的 ```json 和 ```"""
+    cleaned = raw_output.strip()
+    if cleaned.startswith("```json"):
+        cleaned = cleaned[7:]
+    if cleaned.endswith("```"):
+        cleaned = cleaned[:-3]
+    return cleaned.strip()
+
+
+if __name__ == "__main__":
+    test_qwen_chat_model()  # 运行
+    # test_deepseek_chat_model()
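For illustration, the message helpers round-trip a small history through JSON and into LangChain message objects:

    from foundation.utils.utils import (
        serialize_messages, deserialize_messages, to_langchain_messages,
    )

    history = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "你好"},
    ]
    restored = deserialize_messages(serialize_messages(history))
    print(to_langchain_messages(restored))
    # [SystemMessage(content='You are a helpful assistant.'), HumanMessage(content='你好')]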

+ 93 - 0
foundation/utils/yaml_utils.py

@@ -0,0 +1,93 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :yaml_utils.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/10 17:32
+'''
+
+import os
+import yaml
+
+from foundation.logger.loggering import server_logger
+
+# directory of this file
+current_dir = os.path.dirname(__file__)
+# project root (two levels up)
+project_root = os.path.dirname(os.path.dirname(current_dir))
+
+
+
+
+def get_system_prompt() -> dict:
+    """
+        获取系统提示语
+    """
+     # 构建文件路径 判断文件是否存在
+    yaml_file = get_yaml_file_path("system_prompt.yaml")
+    
+    try:
+        with open(yaml_file, 'r', encoding='utf-8') as f:
+            prompt_config = yaml.safe_load(f)
+        server_logger.info(f"loaded system_prompt config: {prompt_config['system_prompt']}")
+        return prompt_config
+
+    except Exception as e:
+        server_logger.error(f"failed to load system_prompt file: {yaml_file}, error: {str(e)}")
+        raise
+
+
+
+
+def get_yaml_file_path(file_name: str) -> str:
+    """
+        获取yaml文件路径
+        :param file_name:
+        :return:
+    """
+    yaml_file = os.path.join(project_root, 'config', 'prompt' , file_name)
+    if not os.path.exists(yaml_file):
+        raise FileNotFoundError(f"Prompt文件不存在: {file_name}")
+    return yaml_file
+
+
+
+
+def get_intent_prompt() -> dict:
+    """
+        获取意图识别 系统提示语
+    """
+     # 构建文件路径 判断文件是否存在
+    yaml_file = get_yaml_file_path("intent_prompt.yaml")
+    
+    try:
+        with open(yaml_file, 'r', encoding='utf-8') as f:
+            prompt_config = yaml.safe_load(f)
+        server_logger.info(f"loaded intent system_prompt config: {prompt_config['system_prompt']}")
+        server_logger.info(f"loaded intent examples: {prompt_config['intent_examples']}")
+        return prompt_config
+
+    except Exception as e:
+        server_logger.error(f"failed to load intent_prompt file: {yaml_file}, error: {str(e)}")
+        raise
+
+
+# load the system prompt once at import time
+system_prompt_config = get_system_prompt()

+ 35 - 0
gunicorn_config.py

@@ -0,0 +1,35 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :gunicorn_config.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/23 09:07
+'''
+
+
+import multiprocessing
+
+# 基础配置
+bind = "0.0.0.0:8010"
+workers = multiprocessing.cpu_count() + 1  # 推荐公式
+worker_class = "uvicorn.workers.UvicornWorker"
+timeout = 120
+keepalive = 5
+
+# 日志配置
+accesslog = "./gunicorn_log/access_log.log"  # access log file
+errorlog = "./gunicorn_log/error_log.log"    # error log file
+loglevel = "info"
+
+# 性能优化
+max_requests = 1000     # 防止内存泄漏
+max_requests_jitter = 50
+graceful_timeout = 30   # 优雅停机时间
+
+# preload the app to cut memory use and speed up worker startup
+preload_app = True
+
+# security hardening
+limit_request_line = 4094  # cap the HTTP request-line size

+ 179 - 0
requirements.txt

@@ -0,0 +1,179 @@
+aiohappyeyeballs==2.6.1
+aiohttp==3.12.13
+aiosignal==1.4.0
+annotated-types==0.7.0
+anyio==4.9.0
+async-timeout==5.0.1
+attrs==25.3.0
+Authlib==1.6.0
+backoff==2.2.1
+bcrypt==4.3.0
+build==1.2.2.post1
+cachetools==5.5.2
+certifi==2025.7.9
+cffi==1.17.1
+charset-normalizer==3.4.2
+chromadb==1.0.15
+click==8.2.1
+coloredlogs==15.0.1
+concurrent-log-handler==0.9.28
+cryptography==45.0.5
+cyclopts==3.22.2
+dashscope==1.23.8
+dataclasses-json==0.6.7
+distro==1.9.0
+dnspython==2.7.0
+docstring_parser==0.16
+docutils==0.21.2
+durationpy==0.10
+email_validator==2.2.0
+exceptiongroup==1.3.0
+fastapi==0.116.0
+fastmcp==2.10.4
+filelock==3.18.0
+flatbuffers==25.2.10
+frozenlist==1.7.0
+fsspec==2025.5.1
+google-auth==2.40.3
+googleapis-common-protos==1.70.0
+greenlet==3.2.3
+grpcio==1.67.1
+gunicorn==23.0.0
+h11==0.16.0
+hf-xet==1.1.5
+httpcore==1.0.9
+httptools==0.6.4
+httpx==0.28.1
+httpx-sse==0.4.1
+huggingface-hub==0.33.2
+humanfriendly==10.0
+idna==3.10
+importlib_metadata==8.7.0
+importlib_resources==6.5.2
+iniconfig==2.1.0
+jieba==0.42.1
+jiter==0.10.0
+joblib==1.5.1
+jsonpatch==1.33
+jsonpointer==3.0.0
+jsonschema==4.24.0
+jsonschema-specifications==2025.4.1
+jsonschema_pydantic==0.6
+kubernetes==33.1.0
+langchain==0.3.26
+langchain-chroma==0.2.4
+langchain-community==0.3.27
+langchain-core==0.3.68
+langchain-mcp-adapters==0.1.9
+langchain-mcp-tools==0.2.10
+langchain-milvus==0.2.1
+langchain-openai==0.3.27
+langchain-text-splitters==0.3.8
+langgraph==0.5.2
+langgraph-checkpoint==2.1.0
+langgraph-prebuilt==0.5.2
+langgraph-sdk==0.1.72
+langserve==0.3.1
+langsmith==0.4.4
+markdown-it-py==3.0.0
+marshmallow==3.26.1
+mcp==1.10.1
+mdurl==0.1.2
+milvus-lite==2.5.1
+mmh3==5.1.0
+mpmath==1.3.0
+multidict==6.6.3
+mypy_extensions==1.1.0
+mysql-connector-python==9.3.0
+nest-asyncio==1.6.0
+nltk==3.9.1
+numpy==2.3.1
+oauthlib==3.3.1
+onnxruntime==1.22.0
+openai==1.93.3
+openapi-pydantic==0.5.1
+opentelemetry-api==1.34.1
+opentelemetry-exporter-otlp-proto-common==1.34.1
+opentelemetry-exporter-otlp-proto-grpc==1.34.1
+opentelemetry-proto==1.34.1
+opentelemetry-sdk==1.34.1
+opentelemetry-semantic-conventions==0.55b1
+orjson==3.10.18
+ormsgpack==1.10.0
+overrides==7.7.0
+packaging==24.2
+pandas==2.3.1
+pluggy==1.6.0
+portalocker==3.2.0
+posthog==5.4.0
+propcache==0.3.2
+protobuf==5.29.5
+psutil==7.0.0
+pyasn1==0.6.1
+pyasn1_modules==0.4.2
+pybase64==1.4.1
+pycparser==2.22
+pydantic==2.11.7
+pydantic-settings==2.10.1
+pydantic_core==2.33.2
+Pygments==2.19.2
+PyJWT==2.8.0
+pymilvus==2.5.12
+PyMuPDF==1.26.3
+PyMySQL==1.1.1
+pyperclip==1.9.0
+PyPika==0.48.9
+pyproject_hooks==1.2.0
+pytest==8.4.1
+pytest-asyncio==1.0.0
+python-dateutil==2.9.0.post0
+python-dotenv==1.1.1
+python-multipart==0.0.20
+pytz==2025.2
+PyYAML==6.0.2
+referencing==0.36.2
+regex==2024.11.6
+requests==2.32.4
+requests-oauthlib==2.0.0
+requests-toolbelt==1.0.0
+rich==14.0.0
+rich-rst==1.3.1
+rpds-py==0.26.0
+rsa==4.9.1
+setuptools==78.1.1
+shellingham==1.5.4
+six==1.17.0
+sniffio==1.3.1
+SQLAlchemy==2.0.41
+sse-starlette==2.4.1
+starlette==0.46.2
+sympy==1.14.0
+tenacity==9.1.2
+tiktoken==0.9.0
+tokenizers==0.21.2
+tqdm==4.67.1
+typer==0.16.0
+typing-inspect==0.9.0
+typing-inspection==0.4.1
+typing_extensions==4.14.1
+tzdata==2025.2
+ujson==5.10.0
+urllib3==2.5.0
+uv==0.7.20
+uvicorn==0.35.0
+uvloop==0.21.0
+watchfiles==1.1.0
+websocket-client==1.8.0
+websockets==15.0.1
+wheel==0.45.1
+xinference-client==1.7.1.post1
+xxhash==3.5.0
+yarl==1.20.1
+zhipuai==2.1.5.20250708
+zipp==3.23.0
+zstandard==0.23.0
+aioredis==2.0.1
+redis==6.2.0
+langgraph-checkpoint-postgres==2.0.23
+langgraph-checkpoint-redis==0.0.8
+langchain-redis==0.2.3

+ 64 - 0
run.sh

@@ -0,0 +1,64 @@
+#!/bin/bash
+
+# 服务管理脚本
+APP_NAME="xiwu_agent_server"         # 自定义服务名称
+PID_FILE="./gunicorn_log/gunicorn.pid"          # PID 文件路径
+LOG_FILE="./gunicorn_log/gunicorn.log"          # 日志文件路径
+START_COMMAND="gunicorn -c gunicorn_config.py server.app:app"
+
+case "$1" in
+    start)
+        if [ -f "$PID_FILE" ]; then
+            if kill -0 $(cat "$PID_FILE") >/dev/null 2>&1; then
+                echo "✅ $APP_NAME 已在运行 (PID: $(cat $PID_FILE))"
+                exit 1
+            else
+                rm -f "$PID_FILE"
+            fi
+        fi
+
+        echo "🚀 启动 $APP_NAME..."
+        nohup $START_COMMAND >> "$LOG_FILE" 2>&1 &
+        echo $! > "$PID_FILE"
+        echo "🟢 启动成功! PID: $(cat $PID_FILE)"
+        echo "📝 日志输出: $LOG_FILE"
+        ;;
+
+    stop)
+        if [ ! -f "$PID_FILE" ]; then
+            echo "🔴 $APP_NAME 未运行"
+            exit 1
+        fi
+
+        PID=$(cat "$PID_FILE")
+        echo "🛑 停止 $APP_NAME (PID: $PID)..."
+        kill -TERM $PID
+        rm -f "$PID_FILE"
+        echo "⭕ 已停止"
+        ;;
+
+    restart)
+        $0 stop
+        sleep 2
+        $0 start
+        ;;
+
+    status)
+        if [ -f "$PID_FILE" ]; then
+            if kill -0 $(cat "$PID_FILE") >/dev/null 2>&1; then
+                echo "🟢 $APP_NAME 正在运行 (PID: $(cat $PID_FILE))"
+            else
+                echo "❌ PID 文件存在但进程未运行"
+                rm -f "$PID_FILE"
+            fi
+        else
+            echo "🔴 $APP_NAME 未运行"
+        fi
+        ;;
+
+    *)
+        echo "使用方法: $0 {start|stop|restart|status}"
+        exit 1
+esac
+
+exit 0

+ 46 - 0
server/app.py

@@ -0,0 +1,46 @@
+import os
+import sys
+
+
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.insert(0, BASE_DIR)
+
+from views import lifespan
+
+from fastapi.middleware.cors import CORSMiddleware
+
+from fastapi import FastAPI
+
+from foundation.logger.loggering import server_logger
+from views.test_views import test_router
+
+
+# 创建 FastAPI 应用
+app = FastAPI(
+    title="Data Governance API",
+    version="0.2",
+    description="Data Governance API",
+    lifespan=lifespan
+)
+
+
+app.include_router(test_router)
+
+
+# 添加 CORS 中间件
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],  # 允许所有的来源
+    allow_credentials=True,
+    allow_methods=["*"],  # 允许的HTTP方法
+    allow_headers=["*"],  # 允许的请求头
+)
+
+
+server_logger.info(msg="APP init successfully")
+
+
+# 运行Uvicorn服务器
+if __name__ == "__main__":
+    import uvicorn
+    # reload requires an import string rather than an app object
+    uvicorn.run("server.app:app", host="0.0.0.0", port=8010, reload=True)

+ 10 - 0
server/cus_middlewares.py

@@ -0,0 +1,10 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :cus_middlewares.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/14 09:57
+'''
+

BIN
test/pdf_files/G4216 线屏山新市至金阳段高速公路项目XJ4 合同段T 梁预制、运输及安装专项施工方案(修编).pdf


+ 37 - 0
views/__init__.py

@@ -0,0 +1,37 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :__init__.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/10 17:04
+'''
+
+import uuid
+from contextlib import asynccontextmanager
+from contextvars import ContextVar
+
+from fastapi import FastAPI, APIRouter
+
+
+mcp_server = None
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # 启动时加载工具
+    #await mcp_server.get_mcp_tools()
+
+    yield
+    # 关闭时清理
+    if mcp_server and mcp_server.cleanup:
+        await mcp_server.close()
+
+test_router = APIRouter(prefix="/test", tags=["agent"])
+# default is empty: a str(uuid.uuid4()) default would be evaluated once at
+# import time and then shared by every request that never sets the ContextVar
+current_operation_id: ContextVar[str] = ContextVar("operation_id", default="")
+
+
+def get_operation_id() -> str:
+    """Dependency: return the current operation ID, generating one if unset."""
+    op_id = current_operation_id.get()
+    return op_id if op_id else str(uuid.uuid4())

+ 412 - 0
views/test_views.py

@@ -0,0 +1,412 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+@Project    : lq-agent-api
+@File       :test_views.py
+@IDE        :PyCharm
+@Author     :
+@Date       :2025/7/10 17:32
+'''
+import json
+from typing import Optional
+
+from fastapi import Depends, Response, Header
+from sse_starlette import EventSourceResponse
+from starlette.responses import JSONResponse
+
+from foundation.agent.test_agent import test_agent_client
+from foundation.agent.generate.model_generate import test_generate_model_client
+from foundation.logger.loggering import server_logger
+from foundation.schemas.test_schemas import TestForm
+from foundation.utils.common import return_json, handler_err
+from views import test_router, get_operation_id
+from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
+from file_processors.pdf_processor import PDFProcessor
+
+
+
+@test_router.post("/generate/chat", response_model=TestForm)
+async def generate_chat_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        生成类模型
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 从字典中获取input
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        output = test_generate_model_client.get_model_generate_invoke(trace_id , task_prompt_info, 
+                                                                                 input_query, context)
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="generate/chat")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/chat")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/chat")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
+@test_router.post("/generate/stream", response_model=TestForm)
+async def generate_stream_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        生成类模型
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+
+        # 从字典中获取input
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        # 创建 SSE 流式响应
+        async def event_generator():
+            try:
+                # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
+                for chunk in test_generate_model_client.get_model_generate_stream(trace_id , task_prompt_info, 
+                                                                                 input_query, context):
+                    # 发送数据块
+                    yield {
+                        "event": "message",
+                        "data": json.dumps({
+                            "output": chunk,
+                            "completed": False,
+                        }, ensure_ascii=False)
+                    }
+                     # 获取缓存数据
+                result_data = {}
+                # 发送结束事件
+                yield {
+                    "event": "message_end",
+                    "data": json.dumps({
+                        "completed": True,
+                        "message": json.dumps(result_data, ensure_ascii=False),
+                        "code": 0,
+                        "trace_id": trace_id,
+                    }, ensure_ascii=False),
+                }
+            except Exception as e:
+                # 错误处理
+                yield {
+                    "event": "error",
+                    "data": json.dumps({
+                        "trace_id": trace_id,
+                        "message": str(e),
+                        "code": 1
+                    }, ensure_ascii=False)
+                }
+            finally:
+                # 不需要关闭客户端,因为它是单例
+                pass
+        # 返回 SSE 响应
+        return EventSourceResponse(
+            event_generator(),
+            headers={
+                "Cache-Control": "no-cache",
+                "Connection": "keep-alive"
+            }
+        )
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+# 路由
+
+@test_router.post("/agent/chat", response_model=TestForm)
+async def chat_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+    根据场景获取智能体反馈
+    """
+
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # 验证参数
+
+        # 从字典中获取input
+        input_data = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+  
+        # stream 流式执行
+        output = await test_agent_client.handle_query(trace_id  , task_prompt_info, input_data, context, param.config)
+        # 直接执行
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
+@test_router.post("/agent/stream", response_class=Response)
+async def chat_agent_stream(param: TestForm,
+                     trace_id: str = Depends(get_operation_id)):
+    """
+    根据场景获取智能体反馈 (SSE流式响应)
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+
+       
+        # 提取参数
+        input_data = param.input
+        context = param.context
+        header_info = {
+          
+        }
+        task_prompt_info = {"task_prompt": ""}
+        # if business_scene is None, use the LLM for intent recognition
+
+        # 创建 SSE 流式响应
+        async def event_generator():
+            try:
+                # 流式处理查询
+                async for chunk in test_agent_client.handle_query_stream(
+                        trace_id=trace_id,
+                        config_param=param.config,
+                        task_prompt_info=task_prompt_info,
+                        input_query=input_data,
+                        context=context,
+                        header_info=header_info
+                ):
+                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
+                    # 发送数据块
+                    yield {
+                        "event": "message",
+                        "data": json.dumps({
+                            "code": 0,
+                            "output": chunk,
+                            "completed": False,
+                            "trace_id": trace_id,
+                        }, ensure_ascii=False)
+                    }
+                # 获取缓存数据
+                result_data = {}
+                # 发送结束事件
+                yield {
+                    "event": "message_end",
+                    "data": json.dumps({
+                        "completed": True,
+                        "message": json.dumps(result_data, ensure_ascii=False),
+                        "code": 0,
+                        "trace_id": trace_id,
+                    }, ensure_ascii=False),
+                }
+            except Exception as e:
+                # 错误处理
+                yield {
+                    "event": "error",
+                    "data": json.dumps({
+                        "trace_id": trace_id,
+                        "message": str(e),
+                        "code": 1
+                    }, ensure_ascii=False)
+                }
+            finally:
+                # 不需要关闭客户端,因为它是单例
+                pass
+
+        # 返回 SSE 响应
+        return EventSourceResponse(
+            event_generator(),
+            headers={
+                "Cache-Control": "no-cache",
+                "Connection": "keep-alive"
+            }
+        )
+
+    except Exception as err:
+        # 初始错误处理
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
+        return JSONResponse(
+            return_json(code=1, msg=f"{err}", trace_id=trace_id),
+            status_code=500
+        )
+
+
+
+@test_router.post("/graph/stream", response_class=Response)
+async def chat_graph_stream(param: TestForm,
+                     trace_id: str = Depends(get_operation_id)):
+    """
+        根据场景获取智能体反馈 (SSE流式响应)
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # create the SSE streaming response
+        async def event_generator():
+            try:
+                # Stream the query through the workflow graph
+                async for chunk in test_workflow_graph.handle_query_stream(
+                        param=param,
+                        trace_id=trace_id,
+                ):
+                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
+                    # Emit one SSE data chunk
+                    yield {
+                        "event": "message",
+                        "data": json.dumps({
+                            "code": 0,
+                            "output": chunk,
+                            "completed": False,
+                            "trace_id": trace_id,
+                            "dataType": "text"
+                        }, ensure_ascii=False)
+                    }
+
+                # Emit the end-of-stream event
+                yield {
+                    "event": "message_end",
+                    "data": json.dumps({
+                        "completed": True,
+                        "message": "Stream completed",
+                        "code": 0,
+                        "trace_id": trace_id,
+                        "dataType": "text"
+                    }, ensure_ascii=False),
+                }
+            except Exception as e:
+                # Surface the failure as an SSE error event
+                yield {
+                    "event": "error",
+                    "data": json.dumps({
+                        "trace_id": trace_id,
+                        "msg": str(e),
+                        "code": 1,
+                        "dataType": "text"
+                    }, ensure_ascii=False)
+                }
+            finally:
+                # No cleanup needed: the client is a module-level singleton
+                pass
+
+        # Return the SSE response
+        return EventSourceResponse(
+            event_generator(),
+            headers={
+                "Cache-Control": "no-cache",
+                "Connection": "keep-alive"
+            }
+        )
+
+    except Exception as err:
+        # Handle errors raised before the stream starts
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
+        return JSONResponse(
+            return_json(code=1, msg=f"{err}", trace_id=trace_id),
+            status_code=500
+        )
+
+
+
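+# Example invocation of the /graph/stream endpoint above (sketch; assumes
+# test_router is mounted at /test -- adjust host and prefix to your deployment):
+#
+#     curl -N -X POST http://localhost:8000/test/graph/stream \
+#          -H "Content-Type: application/json" \
+#          -d '{"input": "hello", "config": {"session_id": "s1"}, "context": {}}'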
+
+@test_router.post("/data/governance", response_model=TestForm)
+async def generate_chat_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Invoke the generation model for data governance
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # Extract fields from the request form
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {}
+        task_prompt_info = {"task_prompt": ""}
+        # Invoke the governance model directly (synchronous call; see the
+        # threadpool sketch after this endpoint if it blocks the event loop)
+        output = test_generate_model_client.get_model_data_governance_invoke(
+            trace_id, task_prompt_info, input_query, context)
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a JSON payload
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="data/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
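+# If get_model_data_governance_invoke blocks (it is called without await above),
+# the call could be offloaded so the event loop stays responsive -- a minimal
+# sketch, assuming Starlette's run_in_threadpool helper:
+#
+#     from starlette.concurrency import run_in_threadpool
+#
+#     output = await run_in_threadpool(
+#         test_generate_model_client.get_model_data_governance_invoke,
+#         trace_id, task_prompt_info, input_query, context)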
+
+@test_router.post("/data/pdf/governance", response_model=TestForm)
+async def generate_chat_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Run data governance over local PDF files
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # Extract fields from the request form
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {}
+        task_prompt_info = {"task_prompt": ""}
+        #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
+        file_directory= "test/pdf_files"
+         # 初始化知识问答处理
+        pdf_processor = PDFProcessor(directory=file_directory)
+        file_data = pdf_processor.process_pdfs_group()
+        server_logger.info(trace_id=trace_id, msg=f"【result】: {file_data}", log_type="agent/chat")
+        output = None
+        # Model invocation is disabled for now; only PDF extraction runs
+        # output = test_generate_model_client.get_model_data_governance_invoke(
+        #     trace_id, task_prompt_info, input_query, context)
+        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a JSON payload
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="data/pdf/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))