View source code

Add import of basis-of-preparation documents into the vector store

lingmin_package@163.com 4 months ago
Parent
Commit
b72d5815f5
26 changed files with 1480 additions and 30 deletions
  1. README.md (+4 −1)
  2. config/config.ini (+16 −0)
  3. config/sql/pg_vector.sql (+20 −0)
  4. file_processors/bfp_pdf_processor.py (+163 −0)
  5. foundation/models/silicon_flow.py (+28 −28)
  6. foundation/rag/vector/base_vector.py (+108 −0)
  7. foundation/rag/vector/milvus_vector.py (+347 −0)
  8. foundation/rag/vector/pg_vector.py (+269 −0)
  9. foundation/rag/vector/pg_vector_mananger.py (+249 −0)
  10. test/bfp_files/GB50010-2010(2015版) 混凝土结构设计规范.pdf (BIN)
  11. test/bfp_files/《公路水运工程安全生产监督管理办法》.docx (BIN)
  12. test/bfp_files/《架桥机安全规程》.pdf (BIN)
  13. test/bfp_files/《架桥机通用技术条件》.pdf (BIN)
  14. test/bfp_files/《起重机械安全规程》.pdf (BIN)
  15. test/bfp_files/中华人民共和国安全生产法.pdf (BIN)
  16. test/bfp_files/公路工程施工安全技术规范.pdf (BIN)
  17. test/bfp_files/公路工程质量检验评定标准+第一册+土建工程.pdf (BIN)
  18. test/bfp_files/公路水运工程拟淘汰危及生产安全施工工艺、设备和材料目录.pdf (BIN)
  19. test/bfp_files/公路水运工程质量监督管理规定.docx (BIN)
  20. test/bfp_files/危险性较大的分部分项工程安全管理规定__2018年第15号国务院公报_中国政府网.pdf (BIN)
  21. test/bfp_files/建筑施工塔式起重机安装、使用、拆卸安全技术规范.pdf (BIN)
  22. test/bfp_files/建筑施工模板安全技术规范JGJ162-2008.pdf (BIN)
  23. test/bfp_files/建筑施工高空作业安全技术规范.pdf (BIN)
  24. test/bfp_files/施工现场临时用电安全技术规范.pdf (BIN)
  25. test/bfp_files/混凝土结构工程施工质量验收规范GB 50204-2015.pdf (BIN)
  26. views/test_views.py (+276 −1)

+ 4 - 1
README.md

@@ -12,8 +12,11 @@
     - gunicorn -c gunicorn_config.py server.app:app       multi-process startup
 
 
+  ### PostgreSQL database operations test
+    Dependencies:
 
-
+    pip install psycopg2-binary pgvector -i https://mirrors.aliyun.com/pypi/simple/
+    pip install pymilvus sentence-transformers 
  
  
 ### Test endpoints
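A quick way to verify the new dependencies and the pgvector connection before running the import — a minimal sketch, assuming the [pgvector] values from config/config.ini below (not part of this commit):

    import psycopg2

    conn = psycopg2.connect(host="124.223.140.149", port=7432, dbname="vector_db",
                            user="vector_user", password="pg16@123")
    with conn.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")  # same call PGVectorDB.get_connection makes
        cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector';")
        print("pgvector version:", cur.fetchone()[0])
    conn.commit()
    conn.close()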

+ 16 - 0
config/config.ini

@@ -48,3 +48,19 @@ SLCF_CHAT_MODEL_ID=test-model
 SLCF_EMBED_MODEL_ID=netease-youdao/bce-embedding-base_v1
 SLCF_REANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
 SLCF_VL_CHAT_MODEL_ID=THUDM/GLM-4.1V-9B-Thinking
+
+
+[pgvector]
+PGVECTOR_HOST=124.223.140.149
+PGVECTOR_PORT=7432
+PGVECTOR_DB=vector_db
+PGVECTOR_USER=vector_user
+PGVECTOR_PASSWORD=pg16@123
+
+
+[milvus]
+MILVUS_HOST=124.223.140.149
+MILVUS_PORT=7432
+MILVUS_DB=vector_db
+MILVUS_USER=vector_user
+MILVUS_PASSWORD=pg16@123

+ 20 - 0
config/sql/pg_vector.sql

@@ -0,0 +1,20 @@
+
+
+
+
+-- Create the basis-of-preparation vector table
+DROP TABLE IF EXISTS tv_basis_of_preparation;
+CREATE TABLE IF NOT EXISTS tv_basis_of_preparation (
+                    id SERIAL PRIMARY KEY,
+                    text_content TEXT,
+                    embedding vector(768),
+                    metadata JSONB DEFAULT '{}'::jsonb,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                );
+                
+-- Create the vector-similarity index
+CREATE INDEX IF NOT EXISTS idx_tv_basis_of_preparation_embedding 
+ON tv_basis_of_preparation USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
+
+-- Ingestion-run note: total: 17 documents, 2,900 chunks
+-- Ingestion-run note: docs: 16, total_chunks: 1889
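The ivfflat index accelerates approximate nearest-neighbour scans with the cosine operator <=>. A minimal query sketch against this table (assuming a 768-dim query embedding and the [pgvector] connection settings; the probes setting trades recall for speed):

    import psycopg2

    conn = psycopg2.connect(host="124.223.140.149", port=7432, dbname="vector_db",
                            user="vector_user", password="pg16@123")
    query_embedding = [0.0] * 768  # placeholder; real code embeds the query text first
    with conn.cursor() as cur:
        cur.execute("SET ivfflat.probes = 10;")  # more probes = better recall, slower scan
        cur.execute(
            """
            SELECT id, text_content, 1 - (embedding <=> %s::vector) AS cosine_similarity
            FROM tv_basis_of_preparation
            ORDER BY embedding <=> %s::vector
            LIMIT 5;
            """,
            (str(query_embedding), str(query_embedding)),
        )
        for row in cur.fetchall():
            print(row[0], row[2])
    conn.close()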

+ 163 - 0
file_processors/bfp_pdf_processor.py

@@ -0,0 +1,163 @@
+import os
+import time
+from tqdm import tqdm
+from langchain_community.document_loaders import PyMuPDFLoader
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from foundation.logger.loggering import server_logger
+from foundation.rag.vector.base_vector import BaseVectorDB
+
+class BfpPDFProcessor:
+    def __init__(self, directory , base_vector: BaseVectorDB = None , **kwargs):
+        """
+        Initialize the PDF processor.
+        :param directory: directory containing the PDF files
+        :param base_vector: vector store used for ingestion (may be None for split-only runs)
+        :param kwargs: extra options
+        """
+        self.base_vector = base_vector
+        self.directory = directory  # directory containing the PDF files
+        self.file_group_num = kwargs.get('file_group_num', 20)  # files processed per group
+        self.batch_num = kwargs.get('batch_num', 6)  # batches per insert
+        self.chunksize = kwargs.get('chunksize', 500)  # chunk size for splitting
+        self.overlap = kwargs.get('overlap', 100)  # chunk overlap for splitting
+        self.file_suffix_list = kwargs.get('file_suffix_list', ['.pdf' , '.docx' , '.doc'])
+        server_logger.info(f"""
+                    Initialized the PDF file importer:
+                    Configuration:
+                    - file suffix list: {self.file_suffix_list}
+                    - import directory: {self.directory}
+                    - files per group: {self.file_group_num}
+                    - samples per batch: {self.batch_num}
+                    - chunk size: {self.chunksize}
+                    - chunk overlap: {self.overlap}
+                    """)
+
+    def load_pdf_files(self):
+        """
+        Collect all supported files under the directory.
+        """
+        file_path = os.path.join(self.directory)
+        pdf_path_files = []
+        pdf_file_names = []
+        #server_logger.info(f"file_path: {file_path}")
+        for file_name in os.listdir(file_path):
+            # splitext returns the suffix with the leading dot, e.g. '.docx'
+            file_suffix = os.path.splitext(file_name)[1].lower()
+            if file_suffix in self.file_suffix_list:
+                pdf_file_names.append(file_name)
+                pdf_path_files.append(os.path.join(file_path, file_name))
+            else:
+                server_logger.info(f"Skipping {file_name}: suffix not in {self.file_suffix_list}.")
+
+        server_logger.info(f"Found {len(pdf_file_names)} supported files.")
+        server_logger.info(f"pdf_path_files: {pdf_path_files},pdf_file_names:{pdf_file_names}")
+        return pdf_path_files , pdf_file_names
+
+    def load_pdf_content(self, pdf_path):
+        """
+        Read the file content.
+        Note: PyMuPDFLoader targets PDF-style formats; the .docx/.doc suffixes in
+        file_suffix_list may need a dedicated loader (e.g. Docx2txtLoader).
+        """
+        pdf_loader = PyMuPDFLoader(file_path=pdf_path)
+        docs = pdf_loader.load()
+        server_logger.info(f"Loading content from {pdf_path}.")
+        return docs
+
+
+    def load_and_process_data(self , file_path):
+        """Read a plain-text file and return its content."""
+        with open(file=file_path, mode="r", encoding="utf8") as f:
+            data = f.read()
+        return data
+    
+    def split_text(self, documents):
+        """
+        Split the text into smaller chunks.
+        `documents` holds the loaded contents of one group of files.
+        """
+        # Split the documents
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=self.chunksize,
+            chunk_overlap=self.overlap,
+            length_function=len,
+            add_start_index=True,
+        )
+        docs = text_splitter.split_documents(documents)
+        server_logger.info(f"Split text into smaller chunks with RecursiveCharacterTextSplitter. Total chunks: {len(docs)}")
+        return docs
+
+
+    def process_tqdm_pdfs_group(self):
+        """
+        Process the group of PDF files and insert the chunks straight into the store.
+        """
+        total_chunks = 0
+        # Collect the PDF files
+        pdf_path_files , pdf_file_names = self.load_pdf_files()
+
+        server_logger.info(f"process {len(pdf_path_files)} documents.")
+        start_time = time.time()
+        total_docs_inserted = 0
+
+        total_batches = len(pdf_path_files)
+        
+        with tqdm(total=total_batches, desc="process batches", unit="batch") as pbar:
+            for pdf_path_file , pdf_file_name in zip(pdf_path_files , pdf_file_names):
+                # Read the file content
+                document_content = self.load_pdf_content(pdf_path_file)
+                # Split the text into smaller chunks
+                docs = self.split_text(document_content)
+                total_chunks += len(docs)
+                server_logger.info(f"Documents pdf_file_name:{pdf_file_name},docs:{len(docs)}")
+                # Normalize the chunks and delegate insertion to the vector store
+                documents = self.base_vector.document_standard(docs)
+                self.base_vector.add_tqdm_batch_documents(param={"table_name": "tv_basis_of_preparation"}, documents=documents)
+
+                total_docs_inserted += 1
+                # Compute and display the current TPM
+                elapsed_time = time.time() - start_time
+                if elapsed_time > 0:
+                    tpm = (total_docs_inserted / elapsed_time) * 60
+                    pbar.set_postfix({"TPM": f"{tpm:.2f}"})
+
+                pbar.update(1)
+
+        # TODO chunking: per-chunk metadata could be enriched further
+        server_logger.info(f"Processed Documents:{self.directory},docs:{len(pdf_path_files)},total_chunks:{total_chunks}")
+        
+
+
+
+
+    def get_pdfs_group_data(self):
+        # Read and split the files without inserting them
+        pdf_contents = []
+        pdf_path_files , pdf_file_names = self.load_pdf_files()
+
+        for pdf_path_file , pdf_file_name in zip(pdf_path_files , pdf_file_names):
+            # Read the file content
+            document_content = self.load_pdf_content(pdf_path_file)
+            # Split the text into smaller chunks
+            docs = self.split_text(document_content)
+            pdf_contents.append(docs)
+            server_logger.info(f"Documents pdf_file_name:{pdf_file_name},docs:{len(docs)}")
+            #self.print_chunk_info(docs)
+
+        total_chunks = sum([len(docs) for docs in pdf_contents])
+        # TODO chunking: per-chunk metadata could be enriched further
+        server_logger.info(f"Processed Documents:{self.directory},docs:{len(pdf_contents)},total_chunks:{total_chunks}")
+        return pdf_contents , total_chunks
+        
+
+
+
+    def print_chunk_info(self, docs):
+        """Log details of the split chunks."""
+        for chunk in docs:
+            server_logger.info(f"\n {'=' * 100}")
+            server_logger.info(f"Chunk: {chunk.page_content}")
+            server_logger.info(f"Metadata: {chunk.metadata}")
+            server_logger.info(f"\n {'=' * 100}")
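The processor is wired to a vector store exactly as in the /data/bfp/batch/indb endpoint added below; a minimal standalone sketch (assuming a valid SILICONFLOW_API_KEY and the test/bfp_files directory from this commit):

    from foundation.models.silicon_flow import SiliconFlowAPI
    from foundation.rag.vector.pg_vector import PGVectorDB
    from file_processors.bfp_pdf_processor import BfpPDFProcessor

    client = SiliconFlowAPI()                            # embedding provider
    pg_vector_db = PGVectorDB(base_api_platform=client)  # pgvector-backed store
    processor = BfpPDFProcessor(directory="test/bfp_files",
                                base_vector=pg_vector_db,
                                chunksize=500, overlap=100)
    # Splits every supported file and inserts the chunks into tv_basis_of_preparation
    processor.process_tqdm_pdfs_group()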

+ 28 - 28
foundation/models/silicon_flow.py

@@ -12,7 +12,7 @@ from foundation.logger.loggering import server_logger
 from foundation.utils.common import handler_err
 from openai import OpenAI
 from langchain_core.embeddings import Embeddings
-from chromadb.utils.embedding_functions import EmbeddingFunction
+#from chromadb.utils.embedding_functions import EmbeddingFunction
 from typing import List
 import numpy as np
 
@@ -55,37 +55,37 @@ class SiliconFlowEmbeddings(Embeddings):
 
 
 
-class ChromaSiliconFlowEmbedding(EmbeddingFunction):
-    """
-        Adapt SiliconFlowEmbeddings to ChromaDB's embedding-function interface
-    """
-    def __init__(self, embeddings):
-        self.embeddings = embeddings
+# class ChromaSiliconFlowEmbedding(EmbeddingFunction):
+#     """
+#         Adapt SiliconFlowEmbeddings to ChromaDB's embedding-function interface
+#     """
+#     def __init__(self, embeddings):
+#         self.embeddings = embeddings
 
-    def __call__(self, input: List[str]) -> List[List[float]]:
-        raw_embeddings = self.embeddings.embed_documents(input)  # key step
-        return self.normalized_embeddings(raw_embeddings)
+#     def __call__(self, input: List[str]) -> List[List[float]]:
+#         raw_embeddings = self.embeddings.embed_documents(input)  # key step
+#         return self.normalized_embeddings(raw_embeddings)
 
-    def embed_documents(self, input: List[str]) -> List[List[float]]:
-        raw_embeddings = self.embeddings.embed_documents(input)  # key step
-        return self.normalized_embeddings(raw_embeddings)
+#     def embed_documents(self, input: List[str]) -> List[List[float]]:
+#         raw_embeddings = self.embeddings.embed_documents(input)  # key step
+#         return self.normalized_embeddings(raw_embeddings)
 
-    def embed_query(self, text: str) -> List[float]:
-        """Embed the query text"""
-        raw_embeddings = self.embeddings.embed_documents([text])[0]
-        return self.normalized_embeddings(raw_embeddings)
+#     def embed_query(self, text: str) -> List[float]:
+#         """Embed the query text"""
+#         raw_embeddings = self.embeddings.embed_documents([text])[0]
+#         return self.normalized_embeddings(raw_embeddings)
 
     
-    def normalized_embeddings(self , raw_embeddings):
-        # L2 normalization
-        normalized = []
-        for vector in raw_embeddings:
-            norm = np.linalg.norm(vector)
-            if norm > 0:
-                normalized.append(vector / norm)
-            else:
-                normalized.append(vector)
-        return normalized
+#     def normalized_embeddings(self , raw_embeddings):
+#         # L2 normalization
+#         normalized = []
+#         for vector in raw_embeddings:
+#             norm = np.linalg.norm(vector)
+#             if norm > 0:
+#                 normalized.append(vector / norm)
+#             else:
+#                 normalized.append(vector)
+#         return normalized
 
 
 
@@ -104,7 +104,7 @@ class SiliconFlowAPI(BaseApiPlatform):
         self.client = self.get_openai_client(self.model_server_url, self.api_key)
         # Create a LangChain-compatible embeddings object
         langchain_embeddings = SiliconFlowEmbeddings(base_url = self.model_server_url , api_key=self.api_key , embed_model_id=self.embed_model_id)
-        self.embed_model = ChromaSiliconFlowEmbedding(embeddings=langchain_embeddings)
+        #self.embed_model = ChromaSiliconFlowEmbedding(embeddings=langchain_embeddings)
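With the ChromaDB adapter disabled, embeddings for pgvector/Milvus flow through BaseApiPlatform.get_embeddings, and the LangChain-compatible class can also be called directly. A minimal sketch of direct use (the base_url and api_key values are assumptions; embed_model_id comes from config.ini):

    from foundation.models.silicon_flow import SiliconFlowEmbeddings

    embeddings = SiliconFlowEmbeddings(
        base_url="https://api.siliconflow.cn/v1",  # assumption: the platform's model_server_url
        api_key="sk-...",                          # assumption: normally taken from the environment
        embed_model_id="netease-youdao/bce-embedding-base_v1",
    )
    vectors = embeddings.embed_documents(["编制依据示例文本"])  # List[List[float]], 768-dim for this model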
 
 
 

+ 108 - 0
foundation/rag/vector/base_vector.py

@@ -0,0 +1,108 @@
+from foundation.logger.loggering import server_logger as logger
+import time
+from tqdm import tqdm
+from typing import List, Dict, Any
+from foundation.models.base_online_platform import BaseApiPlatform
+
+
+class BaseVectorDB:
+    """
+      Base class for vector-store operations.
+    """
+        
+    def __init__(self , base_api_platform :BaseApiPlatform):
+        self.base_api_platform = base_api_platform
+
+
+
+    def text_to_vector(self, text: str) -> List[float]:
+        """
+        Convert a text into an embedding vector.
+        """
+        return self.base_api_platform.get_embeddings([text])[0]
+    
+
+    def document_standard(self, documents: List[Dict[str, Any]]):
+        """
+          Normalize documents into the standard format.
+        """
+        raise NotImplementedError
+
+    
+    def add_document(self , param: Dict[str, Any] , document: Dict[str, Any]):
+        """
+          Add a single document.
+          param: extra parameters, e.g. the table name
+          document: the document, including its metadata
+          # Returns: the inserted document ID
+        """
+        raise NotImplementedError
+
+
+    def add_batch_documents(self , param: Dict[str, Any] , documents: List[Dict[str, Any]]):
+        """
+          Add documents in bulk.
+          param: extra parameters, e.g. the table name
+          documents: the list of documents, including their metadata
+          # Returns: the list of inserted document IDs
+        """
+        raise NotImplementedError
+
+
+    def add_tqdm_batch_documents(self , param: Dict[str, Any] , documents: List[Dict[str, Any]] , batch_size=10):
+        """
+          Add documents in bulk, with a progress bar.
+          param: extra parameters, e.g. the table name
+          documents: the list of documents, including their metadata
+          # Returns: the list of inserted document IDs
+        """
+        
+        logger.info(f"Inserting {len(documents)} documents.")
+        start_time = time.time()
+        total_docs_inserted = 0
+
+        total_batches = (len(documents) + batch_size - 1) // batch_size
+
+        with tqdm(total=total_batches, desc="Inserting batches", unit="batch") as pbar:
+            for i in range(0, len(documents), batch_size):
+                batch = documents[i:i + batch_size]
+                # Delegate to the subclass insert implementation
+                self.add_batch_documents(param, batch)
+
+                total_docs_inserted += len(batch)
+                # Compute and display the current TPM
+                elapsed_time = time.time() - start_time
+                if elapsed_time > 0:
+                    tpm = (total_docs_inserted / elapsed_time) * 60
+                    pbar.set_postfix({"TPM": f"{tpm:.2f}"})
+
+                pbar.update(1)
+
+
+    def similarity_search(self, param: Dict[str, Any], query_text: str , min_score=0.5 , 
+                          top_k=10, filters: Dict[str, Any] = None):
+        """
+          Retrieve documents matching the user's query (distance-ordered).
+        """
+        raise NotImplementedError
+
+
+    def retriever(self, param: Dict[str, Any], query_text: str, 
+                          top_k: int = 5, filters: Dict[str, Any] = None):
+        """
+          Retrieve documents matching the user's query (similarity-filtered).
+        """
+        raise NotImplementedError
+
+
+    
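Concrete stores only need to implement add_batch_documents (plus the query methods); add_tqdm_batch_documents then supplies batching and the progress bar. A minimal in-memory subclass sketch to illustrate the contract (hypothetical, for illustration only):

    from typing import List, Dict, Any
    from foundation.rag.vector.base_vector import BaseVectorDB

    class InMemoryVectorDB(BaseVectorDB):
        """Toy store: keeps (text, embedding, metadata) tuples in a list."""
        def __init__(self, base_api_platform):
            super().__init__(base_api_platform)
            self.rows = []

        def add_batch_documents(self, param: Dict[str, Any], documents: List[Dict[str, Any]]):
            for doc in documents:
                embedding = self.text_to_vector(doc['content'])  # embed via the API platform
                self.rows.append((doc['content'], embedding, doc.get('metadata', {})))

    # store.add_tqdm_batch_documents({"table_name": "ignored"}, docs, batch_size=10)
    # would now insert docs in batches of 10 with a tqdm progress bar.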

+ 347 - 0
foundation/rag/vector/milvus_vector.py

@@ -0,0 +1,347 @@
+import time
+import json
+from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
+from typing import List, Dict, Any
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger as logger
+from foundation.rag.vector.base_vector import BaseVectorDB
+from foundation.models.base_online_platform import BaseApiPlatform
+
+class MilvusVectorManager(BaseVectorDB):
+    def __init__(self, base_api_platform :BaseApiPlatform):
+        """
+        Initialize the Milvus connection.
+        """
+        self.base_api_platform = base_api_platform
+
+        self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+        self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        self.user = config_handler.get('milvus', 'MILVUS_USER')
+        self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
+        
+        # Local embedding model is disabled; embeddings come from base_api_platform
+        #self.model = SentenceTransformer('all-MiniLM-L6-v2')  # could be swapped for another model
+        
+        # Connect to Milvus
+        self.connect()
+    
+    def connect(self):
+        """Connect to the Milvus server."""
+        try:
+            connections.connect(
+                alias="default",
+                host=self.host,
+                port=self.port,
+                user=self.user,
+                password=self.password
+            )
+            logger.info(f"Connected to Milvus at {self.host}:{self.port}")
+        except Exception as e:
+            logger.error(f"Failed to connect to Milvus: {e}")
+            raise
+    
+    def create_collection(self, collection_name: str, dimension: int = 768, 
+                         description: str = "Vector collection for text embeddings"):
+        """
+        Create a vector collection.
+        """
+        try:
+            # Check whether the collection already exists
+            if utility.has_collection(collection_name):
+                logger.info(f"Collection {collection_name} already exists")
+                return
+            
+            # Define the fields
+            fields = [
+                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
+                FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=65535),
+                FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
+                FieldSchema(name="metadata", dtype=DataType.JSON),
+                FieldSchema(name="created_at", dtype=DataType.INT64)
+            ]
+            
+            # Create the collection schema
+            schema = CollectionSchema(
+                fields=fields,
+                description=description
+            )
+            
+            # Create the collection
+            collection = Collection(
+                name=collection_name,
+                schema=schema
+            )
+            
+            # Create the index
+            index_params = {
+                "index_type": "IVF_FLAT",
+                "metric_type": "COSINE",
+                "params": {"nlist": 100}
+            }
+            
+            collection.create_index(field_name="embedding", index_params=index_params)
+            logger.info(f"Collection {collection_name} created successfully!")
+            
+        except Exception as e:
+            logger.error(f"Error creating collection: {e}")
+            raise
+    
+    
+    
+
+    def add_document(self , param: Dict[str, Any] , document: Dict[str, Any]):
+        """
+        Insert a single text and its vector.
+        """
+        try:
+            collection_name = param.get('collection_name')
+            text = document.get('content')
+            metadata = document.get('metadata')
+            collection = Collection(collection_name)
+            created_at = None
+            
+            # Convert the text into a vector
+            embedding = self.text_to_vector(text)
+            
+            # Prepare the row
+            data = [
+                [text],  # text_content
+                [embedding],  # embedding
+                [metadata or {}],  # metadata
+                [created_at or int(time.time())]  # created_at
+            ]
+            
+            # Insert the row
+            insert_result = collection.insert(data)
+            collection.flush()  # make sure the data is persisted
+            
+            logger.info(f"Text inserted with ID: {insert_result.primary_keys[0]}")
+            return insert_result.primary_keys[0]
+            
+        except Exception as e:
+            logger.error(f"Error inserting text: {e}")
+            return None
+    
+
+
+    def add_batch_documents(self , param: Dict[str, Any] , documents: List[Dict[str, Any]]):
+        """
+        Insert texts in bulk.
+        documents: [{'content': '...', 'metadata': {...}}, ...]
+        """
+        try:
+            collection_name = param.get('collection_name')
+            collection = Collection(collection_name)
+            
+            text_contents = []
+            embeddings = []
+            metadatas = []
+            timestamps = []
+            
+            for item in documents:
+                text = item['content']
+                metadata = item.get('metadata', {})
+                
+                # Convert the text into a vector
+                embedding = self.text_to_vector(text)
+                
+                text_contents.append(text)
+                embeddings.append(embedding)
+                metadatas.append(metadata)
+                timestamps.append(int(time.time()))
+            
+            # Prepare the batch
+            data = [text_contents, embeddings, metadatas, timestamps]
+            
+            # Bulk insert
+            insert_result = collection.insert(data)
+            collection.flush()  # make sure the data is persisted
+            
+            logger.info(f"Batch inserted {len(text_contents)} records, IDs: {insert_result.primary_keys}")
+            return insert_result.primary_keys
+            
+        except Exception as e:
+            logger.error(f"Error batch inserting: {e}")
+            return None
+    
+
+
+
+    def similarity_search(self, param: Dict[str, Any], query_text: str , min_score=0.5 ,
+                           top_k=5, filters: Dict[str, Any] = None):
+        """
+        Search for similar texts.
+        """
+        try:
+            collection_name = param.get('collection_name')
+            collection = Collection(collection_name)
+            
+            # Load the collection into memory (if not already loaded)
+            collection.load()
+            
+            # Convert the query text into a vector
+            query_embedding = self.text_to_vector(query_text)
+            
+            # Search parameters
+            search_params = {
+                "metric_type": "COSINE",
+                "params": {"nprobe": 10}
+            }
+             # Build the filter expression
+            filter_expr = self._create_filter(filters)
+            
+            # Run the search
+            results = collection.search(
+                data=[query_embedding],
+                anns_field="embedding",
+                param=search_params,
+                limit=top_k,
+                expr=filter_expr,
+                output_fields=["text_content", "metadata"]
+            )
+            
+            # Format the results. With metric_type COSINE, Milvus reports a
+            # cosine similarity score (higher is more similar), so it is used
+            # directly rather than converted via 1 - distance.
+            formatted_results = []
+            for hits in results:
+                for hit in hits:
+                    if hit.distance < min_score:  # drop weak matches
+                        continue
+                    formatted_results.append({
+                        'id': hit.id,
+                        'text_content': hit.entity.get('text_content'),
+                        'metadata': hit.entity.get('metadata'),
+                        'distance': hit.distance,
+                        'similarity': hit.distance
+                    })
+            
+            return formatted_results
+            
+        except Exception as e:
+            logger.error(f"Error searching: {e}")
+            return []
+    
+    def retriever(self, param: Dict[str, Any], query_text: str, 
+                          top_k: int = 5, filters: Dict[str, Any] = None):
+        """
+        Similarity search with optional metadata filters.
+        """
+        try:
+            collection_name = param.get('collection_name')
+            collection = Collection(collection_name)
+            collection.load()
+            
+            query_embedding = self.text_to_vector(query_text)
+            
+            # Build the filter expression
+            filter_expr = self._create_filter(filters)
+            
+            search_params = {
+                "metric_type": "COSINE",
+                "params": {"nprobe": 10}
+            }
+            
+            results = collection.search(
+                data=[query_embedding],
+                anns_field="embedding",
+                param=search_params,
+                limit=top_k,
+                expr=filter_expr,
+                output_fields=["text_content", "metadata"]
+            )
+            
+            # With metric_type COSINE, Milvus reports a similarity score
+            # (higher is more similar), so it is used directly.
+            formatted_results = []
+            for hits in results:
+                for hit in hits:
+                    formatted_results.append({
+                        'id': hit.id,
+                        'text_content': hit.entity.get('text_content'),
+                        'metadata': hit.entity.get('metadata'),
+                        'distance': hit.distance,
+                        'similarity': hit.distance
+                    })
+            
+            return formatted_results
+            
+        except Exception as e:
+            logger.error(f"Error searching with filter: {e}")
+            return []
+    
+    
+    def _create_filter(self, filters: Dict[str, Any]) -> str:
+        """
+        Build the Milvus filter expression from a metadata filter dict.
+        """
+        # Build the boolean expression
+        filter_expr = ""
+        if filters:
+            conditions = []
+            for key, value in filters.items():
+                if isinstance(value, str):
+                    conditions.append(f'metadata["{key}"] == "{value}"')
+                elif isinstance(value, (int, float)):
+                    conditions.append(f'metadata["{key}"] == {value}')
+                else:
+                    conditions.append(f'metadata["{key}"] == "{json.dumps(value)}"')
+            filter_expr = " and ".join(conditions)
+        
+        return filter_expr
+
+    def db_test(self):
+        # Initialize the client (set the SILICONFLOW_API_KEY environment variable first)
+        from foundation.models.silicon_flow import SiliconFlowAPI
+        client = SiliconFlowAPI()
+        # Initialize the Milvus manager
+        milvus_manager = MilvusVectorManager(base_api_platform=client)
+        
+        # Create the collection (768 matches the configured embedding model's dimension)
+        collection_name = 'text_embeddings'
+        milvus_manager.create_collection(collection_name, dimension=768)
+        
+        param = {"collection_name": collection_name}
+        
+        # Insert a single text
+        sample_text = "这是一个关于人工智能的文档。"
+        milvus_manager.add_document(
+            param,
+            {'content': sample_text, 'metadata': {'category': 'AI', 'source': 'example'}}
+        )
+        
+        # Batch insert texts ('content' is the key add_batch_documents expects)
+        sample_texts = [
+            {
+                'content': '机器学习是人工智能的一个重要分支。',
+                'metadata': {'category': 'ML', 'author': 'John'}
+            },
+            {
+                'content': '深度学习在图像识别领域取得了显著成果。',
+                'metadata': {'category': 'Deep Learning', 'author': 'Jane'}
+            },
+            {
+                'content': '自然语言处理技术在聊天机器人中得到广泛应用。',
+                'metadata': {'category': 'NLP', 'author': 'Bob'}
+            }
+        ]
+        
+        milvus_manager.add_batch_documents(param, sample_texts)
+        
+        # Search for similar texts
+        query = "人工智能相关的技术"
+        similar_docs = milvus_manager.similarity_search(param, query, top_k=3)
+        
+        logger.info("Similar documents found:")
+        for doc in similar_docs:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
+        
+        # Search with a metadata filter
+        filtered_docs = milvus_manager.retriever(
+            param,
+            query,
+            top_k=3,
+            filters={'category': 'AI'}
+        )
+        
+        logger.info("\nFiltered similar documents:")
+        for doc in filtered_docs:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
+
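The filters dict is compiled by _create_filter into a Milvus boolean expression over the JSON metadata field. A short sketch of the mapping (client as in db_test above):

    mv = MilvusVectorManager(base_api_platform=client)
    expr = mv._create_filter({'category': 'AI', 'year': 2018})
    # expr == 'metadata["category"] == "AI" and metadata["year"] == 2018'
    # The string is passed to Collection.search(expr=...) to restrict the candidates.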

+ 269 - 0
foundation/rag/vector/pg_vector.py

@@ -0,0 +1,269 @@
+
+import psycopg2
+from psycopg2.extras import RealDictCursor
+import json
+from typing import List, Dict, Any
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger as logger
+from foundation.rag.vector.base_vector import BaseVectorDB
+from foundation.models.base_online_platform import BaseApiPlatform
+
+class PGVectorDB(BaseVectorDB):
+    def __init__(self , base_api_platform :BaseApiPlatform):
+        """
+        Initialize the pgvector connection.
+        """
+        self.connection_params = {
+            'host': config_handler.get('pgvector', 'PGVECTOR_HOST', 'localhost'),
+            'port': int(config_handler.get('pgvector', 'PGVECTOR_PORT', '5432')),
+            'database': config_handler.get('pgvector', 'PGVECTOR_DB', 'postgres'),
+            'user': config_handler.get('pgvector', 'PGVECTOR_USER', 'postgres'),
+            'password': config_handler.get('pgvector', 'PGVECTOR_PASSWORD', 'postgres')
+        }
+
+        self.base_api_platform = base_api_platform
+        
+        
+    def get_connection(self):
+        """Get a database connection."""
+        #logger.info(f"Connecting to PostgreSQL...{self.connection_params}")
+        conn = psycopg2.connect(**self.connection_params)
+        # Enable the pgvector extension
+        with conn.cursor() as cur:
+            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
+        conn.commit()
+        return conn
+    
+    def create_table(self, table_name: str, vector_dim: int = 384):
+        """
+        Create the vector table.
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                # Create the table
+                create_table_sql = f"""
+                CREATE TABLE IF NOT EXISTS {table_name} (
+                    id SERIAL PRIMARY KEY,
+                    text_content TEXT,
+                    embedding vector({vector_dim}),
+                    metadata JSONB DEFAULT '{{}}'::jsonb,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                );
+                
+                -- Create the vector-similarity index
+                CREATE INDEX IF NOT EXISTS idx_{table_name}_embedding 
+                ON {table_name} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
+                """
+                cur.execute(create_table_sql)
+                conn.commit()
+                logger.info(f"Table {table_name} created successfully!")
+        except Exception as e:
+            logger.error(f"Error creating table: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+    
+
+    def document_standard(self, documents: List[Dict[str, Any]]):
+        """
+        Normalize LangChain documents into {'content', 'metadata'} dicts.
+        """
+        result = []
+        for doc in documents:
+            tmp = {}
+            tmp['content'] = doc.page_content
+            tmp['metadata'] = doc.metadata if doc.metadata else {}
+            result.append(tmp)
+        return result
+
+
+
+    def add_document(self , param: Dict[str, Any] , document: Dict[str, Any]):
+        """
+        Insert a single text and its vector.
+        """
+        table_name = param.get('table_name')
+        text = document.get('content')
+        metadata = document.get('metadata')
+
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                embedding = self.text_to_vector(text)
+                metadata = metadata or {}
+                
+                # The embedding is passed as a Python list; this relies on pgvector's
+                # array-to-vector cast (alternatively, register the adapter from
+                # pgvector.psycopg2 on the connection).
+                insert_sql = f"""
+                INSERT INTO {table_name} (text_content, embedding, metadata)
+                VALUES (%s, %s, %s)
+                RETURNING id;
+                """
+                cur.execute(insert_sql, (text, embedding, json.dumps(metadata)))
+                inserted_id = cur.fetchone()[0]
+                conn.commit()
+                logger.info(f"Text inserted with ID: {inserted_id}")
+                return inserted_id
+        except Exception as e:
+            logger.error(f"Error inserting text: {e}")
+            conn.rollback()
+            return None
+        finally:
+            conn.close()
+    
+    def add_batch_documents(self , param: Dict[str, Any] , documents: List[Dict[str, Any]]):
+        """
+        Insert texts in bulk.
+        documents: [{'content': '...', 'metadata': {...}}, ...]
+        """
+        table_name = param.get('table_name')
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                # Prepare the rows
+                data_to_insert = []
+                for item in documents:
+                    text = item['content']
+                    metadata = item.get('metadata', {})
+                    embedding = self.text_to_vector(text)
+                    data_to_insert.append((text, embedding, json.dumps(metadata)))
+                
+                # Bulk insert
+                insert_sql = f"""
+                INSERT INTO {table_name} (text_content, embedding, metadata)
+                VALUES (%s, %s, %s)
+                """
+                cur.executemany(insert_sql, data_to_insert)
+                conn.commit()
+                logger.info(f"Batch inserted {len(data_to_insert)} records")
+        except Exception as e:
+            logger.error(f"Error batch inserting: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+    
+    def similarity_search(self, param: Dict[str, Any], query_text: str , min_score=0.5 , 
+                          top_k=5, filters: Dict[str, Any] = None):
+        """
+        Search for similar texts, ordered by the <=> distance
+        (smaller is more similar). min_score and filters are currently unused here.
+        """
+        table_name = param.get('table_name')
+        conn = self.get_connection()
+        try:
+            with conn.cursor(cursor_factory=RealDictCursor) as cur:
+                query_embedding = self.text_to_vector(query_text)
+                
+                search_sql = f"""
+                SELECT id, text_content, metadata, 
+                       embedding <=> %s::vector AS distance
+                FROM {table_name}
+                ORDER BY embedding <=> %s::vector
+                LIMIT %s;
+                """
+                cur.execute(search_sql, (query_embedding, query_embedding, top_k))
+                results = cur.fetchall()
+                
+                return results
+        except Exception as e:
+            logger.error(f"Error searching: {e}")
+            return []
+        finally:
+            conn.close()
+
+    
+    def retriever(self, param: Dict[str, Any], query_text: str , min_score=0.1 , 
+                                 top_k=10, filters: Dict[str, Any] = None):
+        """
+        Search for similar texts using cosine similarity (1 - cosine distance).
+        """
+        table_name = param.get('table_name')
+        conn = self.get_connection()
+        try:
+            with conn.cursor(cursor_factory=RealDictCursor) as cur:
+                query_embedding = self.text_to_vector(query_text)
+                
+                search_sql = f"""
+                SELECT id, text_content, metadata,
+                       1 - (embedding <=> %s::vector) AS cosine_similarity
+                FROM {table_name}
+                WHERE 1 - (embedding <=> %s::vector) > %s
+                ORDER BY 1 - (embedding <=> %s::vector) DESC
+                LIMIT %s;
+                """
+                cur.execute(search_sql, (query_embedding, query_embedding, min_score, query_embedding, top_k))
+                results = cur.fetchall()
+                # Log the results
+                self.result_logger_info(query_text , results)
+                return results
+        except Exception as e:
+            logger.error(f"Error searching with cosine similarity: {e}")
+            return []
+        finally:
+            conn.close()
+
+    
+    def result_logger_info(self , query, result_docs_cos):
+        """
+            Log the search results.
+        """
+        logger.info(f"\n {'=' * 50}")
+        # Results from the cosine-similarity search
+        logger.info(f"\nSimilar documents with cosine similarity,query:{query},result_count: {len(result_docs_cos)}:")
+        for doc in result_docs_cos:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['cosine_similarity']:.3f}")
+
+
+
+    def db_test(self , query_text: str):
+        """
+        Test database connectivity and operations.
+        """
+        table_name = 'test_documents'
+        param = {"table_name": table_name}
+        # Create the table
+        self.create_table(table_name, vector_dim=768)
+        
+        # Insert a single text
+        sample_text = "这是一个关于人工智能的文档。"
+        #self.add_document(param, {'content': sample_text, 'metadata': {'category': 'AI', 'source': 'example'}})
+        
+        # Batch insert texts ('content' is the key the insert methods expect)
+        sample_texts = [
+            {
+                'content': '机器学习是人工智能的一个重要分支。',
+                'metadata': {'category': 'ML', 'author': 'John'}
+            },
+            {
+                'content': '深度学习在图像识别领域取得了显著成果。',
+                'metadata': {'category': 'Deep Learning', 'author': 'Jane'}
+            },
+            {
+                'content': '自然语言处理技术在聊天机器人中得到广泛应用。',
+                'metadata': {'category': 'NLP', 'author': 'Bob'}
+            }
+        ]
+        
+        #self.add_batch_documents(param, sample_texts)
+        
+
+        logger.info(f"\n {'=' * 50}")
+        # Search for similar texts
+        query = query_text
+        logger.info(f"\n query={query}")
+
+        similar_docs = self.similarity_search(param, query, top_k=3)
+        logger.info(f"Similar documents found {len(similar_docs)}:")
+        for doc in similar_docs:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {1 - doc['distance']:.3f}")
+        
+        logger.info(f"\n {'=' * 50}")
+        # Cosine-similarity search
+        similar_docs_cos = self.retriever(param, query, top_k=3)
+        
+        logger.info(f"\nSimilar documents with cosine similarity {len(similar_docs_cos)}:")
+        for doc in similar_docs_cos:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['cosine_similarity']:.3f}")
+
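document_standard converts LangChain Document objects (page_content/metadata) into the plain dicts the insert methods expect. A short sketch of the round trip (the metadata values are illustrative):

    from langchain_core.documents import Document

    chunks = [Document(page_content="第6.2.1条 混凝土强度等级……",
                       metadata={"source": "test/bfp_files/example.pdf", "page": 120})]
    documents = pg_vector_db.document_standard(chunks)
    # -> [{'content': '第6.2.1条 混凝土强度等级……',
    #      'metadata': {'source': 'test/bfp_files/example.pdf', 'page': 120}}]
    pg_vector_db.add_batch_documents({"table_name": "tv_basis_of_preparation"}, documents)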

+ 249 - 0
foundation/rag/vector/pg_vector_mananger.py

@@ -0,0 +1,249 @@
+
+import psycopg2
+from psycopg2.extras import RealDictCursor
+import json
+from typing import List, Dict, Any
+from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger as logger
+from foundation.models.base_online_platform import BaseApiPlatform
+
+class PGVectorManager:
+    def __init__(self , base_api_platform :BaseApiPlatform):
+        """
+        Initialize the pgvector connection.
+        """
+        self.connection_params = {
+            'host': config_handler.get('pgvector', 'PGVECTOR_HOST', 'localhost'),
+            'port': int(config_handler.get('pgvector', 'PGVECTOR_PORT', '5432')),
+            'database': config_handler.get('pgvector', 'PGVECTOR_DB', 'postgres'),
+            'user': config_handler.get('pgvector', 'PGVECTOR_USER', 'postgres'),
+            'password': config_handler.get('pgvector', 'PGVECTOR_PASSWORD', 'postgres')
+        }
+
+        self.base_api_platform = base_api_platform
+        
+        # Local embedding model is disabled; embeddings come from base_api_platform
+        #self.model = SentenceTransformer('all-MiniLM-L6-v2')  # could be swapped for another model
+        
+        
+    def get_connection(self):
+        """Get a database connection."""
+        #logger.info(f"Connecting to PostgreSQL...{self.connection_params}")
+        conn = psycopg2.connect(**self.connection_params)
+        # Enable the pgvector extension
+        with conn.cursor() as cur:
+            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
+        conn.commit()
+        return conn
+    
+    def create_table(self, table_name: str, vector_dim: int = 384):
+        """
+        Create the vector table.
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                # Create the table
+                create_table_sql = f"""
+                CREATE TABLE IF NOT EXISTS {table_name} (
+                    id SERIAL PRIMARY KEY,
+                    text_content TEXT,
+                    embedding vector({vector_dim}),
+                    metadata JSONB DEFAULT '{{}}'::jsonb,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                );
+                
+                -- Create the vector-similarity index
+                CREATE INDEX IF NOT EXISTS idx_{table_name}_embedding 
+                ON {table_name} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
+                """
+                cur.execute(create_table_sql)
+                conn.commit()
+                logger.info(f"Table {table_name} created successfully!")
+        except Exception as e:
+            logger.error(f"Error creating table: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+    
+
+    def text_to_vector(self, text: str) -> List[float]:
+        """
+        Convert a text into an embedding vector.
+        """
+        return self.base_api_platform.get_embeddings([text])[0]
+    
+    def insert_text(self, table_name: str, text: str, metadata: Dict[str, Any] = None):
+        """
+        Insert a single text and its vector.
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                embedding = self.text_to_vector(text)
+                metadata = metadata or {}
+                
+                insert_sql = f"""
+                INSERT INTO {table_name} (text_content, embedding, metadata)
+                VALUES (%s, %s, %s)
+                RETURNING id;
+                """
+                cur.execute(insert_sql, (text, embedding, json.dumps(metadata)))
+                inserted_id = cur.fetchone()[0]
+                conn.commit()
+                logger.info(f"Text inserted with ID: {inserted_id}")
+                return inserted_id
+        except Exception as e:
+            logger.error(f"Error inserting text: {e}")
+            conn.rollback()
+            return None
+        finally:
+            conn.close()
+    
+    def batch_insert_texts(self, table_name: str, texts: List[Dict[str, Any]]):
+        """
+        Insert texts in bulk.
+        texts: [{'text': '...', 'metadata': {...}}, ...]
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor() as cur:
+                # Prepare the rows
+                data_to_insert = []
+                for item in texts:
+                    text = item['text']
+                    metadata = item.get('metadata', {})
+                    embedding = self.text_to_vector(text)
+                    data_to_insert.append((text, embedding, json.dumps(metadata)))
+                
+                # Bulk insert
+                insert_sql = f"""
+                INSERT INTO {table_name} (text_content, embedding, metadata)
+                VALUES (%s, %s, %s)
+                """
+                cur.executemany(insert_sql, data_to_insert)
+                conn.commit()
+                logger.info(f"Batch inserted {len(data_to_insert)} records")
+        except Exception as e:
+            logger.error(f"Error batch inserting: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+    
+    def search_similar(self, table_name: str, query_text: str, top_k: int = 5):
+        """
+        Search for similar texts, ordered by the <=> distance
+        (smaller is more similar).
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor(cursor_factory=RealDictCursor) as cur:
+                query_embedding = self.text_to_vector(query_text)
+                
+                search_sql = f"""
+                SELECT id, text_content, metadata, 
+                       embedding <=> %s::vector AS distance
+                FROM {table_name}
+                ORDER BY embedding <=> %s::vector
+                LIMIT %s;
+                """
+                cur.execute(search_sql, (query_embedding, query_embedding, top_k))
+                results = cur.fetchall()
+                
+                return results
+        except Exception as e:
+            logger.error(f"Error searching: {e}")
+            return []
+        finally:
+            conn.close()
+    
+    def search_by_cosine_similarity(self, table_name: str, query_text: str, top_k: int = 5, threshold: float = 0.5):
+        """
+        Search for similar texts using cosine similarity (1 - cosine distance).
+        """
+        conn = self.get_connection()
+        try:
+            with conn.cursor(cursor_factory=RealDictCursor) as cur:
+                query_embedding = self.text_to_vector(query_text)
+                
+                search_sql = f"""
+                SELECT id, text_content, metadata,
+                       1 - (embedding <=> %s::vector) AS cosine_similarity
+                FROM {table_name}
+                WHERE 1 - (embedding <=> %s::vector) > %s
+                ORDER BY 1 - (embedding <=> %s::vector) DESC
+                LIMIT %s;
+                """
+                cur.execute(search_sql, (query_embedding, query_embedding, threshold, query_embedding, top_k))
+                results = cur.fetchall()
+                
+                return results
+        except Exception as e:
+            logger.error(f"Error searching with cosine similarity: {e}")
+            return []
+        finally:
+            conn.close()
+
+    def db_test(self , query_text: str):
+        """
+        Test database connectivity and operations.
+        """
+        table_name = 'test_documents'
+        # Create the table
+        self.create_table(table_name, vector_dim=768)
+        
+        # Insert a single text
+        sample_text = "这是一个关于人工智能的文档。"
+        #self.insert_text(table_name, sample_text, {'category': 'AI', 'source': 'example'})
+        
+        # Batch insert texts
+        sample_texts = [
+            {
+                'text': '机器学习是人工智能的一个重要分支。',
+                'metadata': {'category': 'ML', 'author': 'John'}
+            },
+            {
+                'text': '深度学习在图像识别领域取得了显著成果。',
+                'metadata': {'category': 'Deep Learning', 'author': 'Jane'}
+            },
+            {
+                'text': '自然语言处理技术在聊天机器人中得到广泛应用。',
+                'metadata': {'category': 'NLP', 'author': 'Bob'}
+            }
+        ]
+        
+        #self.batch_insert_texts(table_name, sample_texts)
+        
+
+        logger.info(f"\n {'=' * 50}")
+        # Search for similar texts
+        query = query_text
+        logger.info(f"\n query={query}")
+
+        similar_docs = self.search_similar(table_name, query, top_k=3)
+        logger.info(f"Similar documents found {len(similar_docs)}:")
+        for doc in similar_docs:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {1 - doc['distance']:.3f}")
+        
+        logger.info(f"\n {'=' * 50}")
+        # Cosine-similarity search
+        similar_docs_cos = self.search_by_cosine_similarity(table_name, query, top_k=3)
+        
+        logger.info(f"\nSimilar documents with cosine similarity {len(similar_docs_cos)}:")
+        for doc in similar_docs_cos:
+            logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['cosine_similarity']:.3f}")
+
+
+
+# Usage example
+def main():
+    # Initialize the API client and the database manager
+    from foundation.models.silicon_flow import SiliconFlowAPI
+    client = SiliconFlowAPI()
+    db_manager = PGVectorManager(base_api_platform=client)
+    db_manager.db_test(query_text="人工智能相关的技术")
+
+if __name__ == "__main__":
+    main()

BIN
test/bfp_files/GB50010-2010(2015版) 混凝土结构设计规范.pdf


BIN
test/bfp_files/《公路水运工程安全生产监督管理办法》.docx


BIN
test/bfp_files/《架桥机安全规程》.pdf


BIN
test/bfp_files/《架桥机通用技术条件》.pdf


BIN
test/bfp_files/《起重机械安全规程》.pdf


BIN
test/bfp_files/中华人民共和国安全生产法.pdf


BIN
test/bfp_files/公路工程施工安全技术规范.pdf


BIN
test/bfp_files/公路工程质量检验评定标准+第一册+土建工程.pdf


BIN
test/bfp_files/公路水运工程拟淘汰危及生产安全施工工艺、设备和材料目录.pdf


BIN
test/bfp_files/公路水运工程质量监督管理规定.docx


BIN
test/bfp_files/危险性较大的分部分项工程安全管理规定__2018年第15号国务院公报_中国政府网.pdf


BIN
test/bfp_files/建筑施工塔式起重机安装、使用、拆卸安全技术规范.pdf


BIN
test/bfp_files/建筑施工模板安全技术规范JGJ162-2008.pdf


BIN
test/bfp_files/建筑施工高空作业安全技术规范.pdf


BIN
test/bfp_files/施工现场临时用电安全技术规范.pdf


BIN
test/bfp_files/混凝土结构工程施工质量验收规范GB 50204-2015.pdf


+ 276 - 1
views/test_views.py

@@ -22,7 +22,11 @@ from foundation.utils.common import return_json, handler_err
 from views import test_router, get_operation_id
 from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
 from file_processors.pdf_processor import PDFProcessor
+from file_processors.bfp_pdf_processor import BfpPDFProcessor
 
+from foundation.models.silicon_flow import SiliconFlowAPI
+from foundation.rag.vector.pg_vector_mananger import PGVectorManager
+from foundation.rag.vector.pg_vector import PGVectorDB
 
 
 @test_router.post("/generate/chat", response_model=TestForm)
@@ -373,7 +377,7 @@ async def generate_chat_endpoint(
 
 
 @test_router.post("/data/pdf/governance", response_model=TestForm)
-async def generate_chat_endpoint(
+async def pdf_governance_endpoint(
         param: TestForm,
         trace_id: str = Depends(get_operation_id)):
     """
@@ -409,4 +413,275 @@ async def generate_chat_endpoint(
 
     except Exception as err:
         handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+@test_router.post("/data/bfp/governance", response_model=TestForm)
+async def bfp_governance_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Split basis-of-preparation documents into chunks
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
+        file_directory= "test/bfp_files"
+         # Initialize the document processor (split-only: no vector store needed)
+        pdf_processor = BfpPDFProcessor(directory=file_directory)
+        file_data , total_chunks = pdf_processor.get_pdfs_group_data()
+        server_logger.info(trace_id=trace_id, msg=f"【result】: total_chunks={total_chunks}", log_type="bfp/governance")
+        output = None
+        
+
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
+@test_router.post("/data/embedding/test", response_model=TestForm)
+async def embedding_test_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Embedding model test
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        text = input_query
+         # Initialize the client (set the SILICONFLOW_API_KEY environment variable first;
+         # SiliconFlowAPI is already imported at module level)
+        base_api_platform = SiliconFlowAPI()
+        embedding = base_api_platform.get_embeddings([text])[0]
+        embed_dim = len(embedding)
+        server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
+
+        output = f"embed_dim={embed_dim},embedding:{embedding}"
+        #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
+        # Execute directly
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="embedding")
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+
+
+
+
+@test_router.post("/data/pgvector/test", response_model=TestForm)
+async def pgvector_test_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        pg_vector retrieval test
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        output = "success"
+         # Initialize the client (set the SILICONFLOW_API_KEY environment variable first)
+        client = SiliconFlowAPI()
+         # Initialize the database manager
+        # 1. Original test
+        # db_manager = PGVectorManager(client)
+        # db_manager.db_test(query_text=input_query)
+
+        # 2. Test via the BaseVectorDB abstraction (cosine-similarity retrieval)
+        pg_vector_db = PGVectorDB(base_api_platform=client)
+        output = pg_vector_db.retriever(param={"table_name": "test_documents"}, query_text=input_query)
+
+        
+        # Execute directly
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pgvector/test")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pgvector/test")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+
+
+
+@test_router.post("/data/bfp/indb", response_model=TestForm)
+async def bfp_indb_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Split basis-of-preparation documents and load them into the vector store
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
+        file_directory= "test/bfp_files"
+         # Initialize the document processor (split first, insert below)
+        pdf_processor = BfpPDFProcessor(directory=file_directory)
+        file_data_list , total_chunks = pdf_processor.get_pdfs_group_data()
+        server_logger.info(trace_id=trace_id, msg=f"【result】: total_chunks={total_chunks}", log_type="bfp/governance")
+        output = None
+        
+        # Initialize the client (set the SILICONFLOW_API_KEY environment variable first)
+        client = SiliconFlowAPI()
+        # Insert via the BaseVectorDB abstraction
+        pg_vector_db = PGVectorDB(base_api_platform=client)
+
+        for file_data in file_data_list:
+            #file_data = file_data[0:5]
+            # Normalize the chunk data
+            documents = pg_vector_db.document_standard(file_data)
+            pg_vector_db.add_tqdm_batch_documents(param={"table_name": "tv_basis_of_preparation"}, documents=documents)
+
+
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+@test_router.post("/data/bfp/batch/indb", response_model=TestForm)
+async def bfp_batch_indb_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Batch split basis-of-preparation documents and load them into the vector store
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        # Initialize the client (set the SILICONFLOW_API_KEY environment variable first)
+        client = SiliconFlowAPI()
+        # Insert via the BaseVectorDB abstraction
+        pg_vector_db = PGVectorDB(base_api_platform=client)
+        #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
+        file_directory= "test/bfp_files"
+         # Initialize the document processor with the vector store attached
+        pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=pg_vector_db)
+        pdf_processor.process_tqdm_pdfs_group()
+        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/batch/indb")
+        output = "success"
+        
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+@test_router.post("/data/bfp/search", response_model=TestForm)
+async def bfp_search_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
+        Vector search over the basis-of-preparation documents
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Pull the input from the request
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        top_k = int(session_id)  # the test form reuses session_id to carry top_k
+        
+        output = None
+        # Initialize the client (set the SILICONFLOW_API_KEY environment variable first)
+        client = SiliconFlowAPI()
+        # Retrieve via the BaseVectorDB abstraction
+        pg_vector_db = PGVectorDB(base_api_platform=client)
+        output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
+
+        # Return the response as a dict
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
         return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
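A minimal client sketch for the new retrieval endpoint (the payload shape is inferred from the TestForm usage above: input carries the query and config.session_id is reused as top_k; the host and router prefix depend on how test_router is mounted):

    import requests

    resp = requests.post(
        "http://localhost:8000/data/bfp/search",  # assumption: adjust host/prefix to your deployment
        json={
            "input": "塔式起重机安装安全要求",
            "config": {"session_id": "5"},         # parsed as top_k by the endpoint
            "context": None,
        },
        timeout=60,
    )
    print(resp.json())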