Prechádzať zdrojové kódy

增加milvus向量数据库

lingmin_package@163.com pred 4 mesiacmi
rodič
commit
eaf24a4334

+ 6 - 6
config/config.ini

@@ -50,6 +50,7 @@ SLCF_REANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
 SLCF_VL_CHAT_MODEL_ID=THUDM/GLM-4.1V-9B-Thinking
 
 
+
 [pgvector]
 PGVECTOR_HOST=124.223.140.149
 PGVECTOR_PORT=7432
@@ -57,10 +58,9 @@ PGVECTOR_DB=vector_db
 PGVECTOR_USER=vector_user
 PGVECTOR_PASSWORD=pg16@123
 
-
 [milvus]
-MILVUS_HOST=124.223.140.149
-MILVUS_PORT=7432
-MILVUS_DB=vector_db
-MILVUS_USER=vector_user
-MILVUS_PASSWORD=pg16@123
+MILVUS_HOST=192.168.0.3
+MILVUS_PORT=19530
+MILVUS_DB=lq_db
+MILVUS_USER=
+MILVUS_PASSWORD=

+ 75 - 0
docker/docker-compose.yml

@@ -0,0 +1,75 @@
+version: '3.5'
+
+services:
+  etcd:  # etcd instance that the standalone service reaches through ETCD_ENDPOINTS
+    container_name: milvus-etcd
+    image: quay.io/coreos/etcd:v3.5.5
+    environment:
+      - ETCD_AUTO_COMPACTION_MODE=revision
+      - ETCD_AUTO_COMPACTION_RETENTION=1000
+      - ETCD_QUOTA_BACKEND_BYTES=4294967296  # 4 GiB
+      - ETCD_SNAPSHOT_COUNT=50000
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd  # base dir falls back to "." when DOCKER_VOLUME_DIRECTORY is unset or empty
+    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
+    healthcheck:
+      test: ["CMD", "etcdctl", "endpoint", "health"]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+
+  minio:  # MinIO instance that the standalone service reaches through MINIO_ADDRESS
+    container_name: milvus-minio
+    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
+    environment:
+      MINIO_ACCESS_KEY: minioadmin  # NOTE(review): literal credentials committed here while ports 9000/9001 are published below
+      MINIO_SECRET_KEY: minioadmin
+    ports:
+      - "9001:9001"  # console port, matches --console-address ":9001" in the command below
+      - "9000:9000"  # port probed by the healthcheck below
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
+    command: minio server /minio_data --console-address ":9001"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+
+  standalone:  # Milvus itself, started with `milvus run standalone`
+    container_name: milvus-standalone
+    image: milvusdb/milvus:v2.3.4
+    command: ["milvus", "run", "standalone"]
+    security_opt:
+    - seccomp:unconfined
+    environment:
+      ETCD_ENDPOINTS: etcd:2379  # addressed by compose service name
+      MINIO_ADDRESS: minio:9000  # addressed by compose service name
+    volumes:
+      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
+      interval: 30s
+      start_period: 90s
+      timeout: 20s
+      retries: 3
+    ports:
+      - "19530:19530"  # port that attu's MILVUS_URL points at
+      - "9091:9091"  # port serving /healthz for the healthcheck above
+    depends_on:  # NOTE(review): short form orders container start only; it does not wait for the healthchecks above
+      - "etcd"
+      - "minio"
+
+  attu:
+    container_name: milvus-attu
+    image: zilliz/attu:latest  # NOTE(review): unpinned tag, unlike the other images in this file
+    environment:
+      - MILVUS_URL=standalone:19530  # Important: address Milvus by its compose service name and port
+    ports:
+      - "13000:3000"  # host 13000 -> container 3000
+    depends_on:
+      - "standalone"  # make sure attu is started after the standalone service
+
+networks:
+  default:
+    name: milvus  # the default network is named "milvus"

+ 2 - 2
file_processors/bfp_pdf_processor.py

@@ -89,7 +89,7 @@ class BfpPDFProcessor:
         return docs
 
 
-    def process_tqdm_pdfs_group(self):
+    def process_tqdm_pdfs_group(self , key_name:str = "collection_name"):
         """
         处理PDF文件组 并且直接入库处理
         """
@@ -114,7 +114,7 @@ class BfpPDFProcessor:
                 # 调用传入的插入函数
                 # 数据标准化处理
                 documents = self.base_vector.document_standard(docs)
-                self.base_vector.add_tqdm_batch_documents(param={"table_name": "tv_basis_of_preparation"}, documents=documents)
+                self.base_vector.add_tqdm_batch_documents(param={key_name: "tv_basis_of_preparation"}, documents=documents)
 
                 total_docs_inserted += 1
                 # 计算并显示当前的TPM

+ 9 - 2
foundation/rag/vector/base_vector.py

@@ -23,11 +23,18 @@ class BaseVectorDB:
         return self.base_api_platform.get_embeddings([text])[0]
     
 
+
     def document_standard(self, documents: List[Dict[str, Any]]):
         """
-          文档标准处理
+        对文档进行结果标准处理
         """
-        raise NotImplementedError
+        result = []
+        for doc in documents:
+            tmp = {}
+            tmp['content'] = doc.page_content
+            tmp['metadata'] = doc.metadata if doc.metadata else {}
+            result.append(tmp)
+        return result
 
     
     def add_document(self , param: Dict[str, Any] , document: Dict[str, Any]):

+ 45 - 25
foundation/rag/vector/milvus_vector.py

@@ -18,6 +18,7 @@ class MilvusVectorManager(BaseVectorDB):
 
         self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
         self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        self.milvus_db = config_handler.get('milvus', 'MILVUS_DB', 'default')
         self.user = config_handler.get('milvus', 'MILVUS_USER')
         self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
         
@@ -28,14 +29,18 @@ class MilvusVectorManager(BaseVectorDB):
         self.connect()
     
     def connect(self):
-        """连接到 Milvus 服务器"""
+        """连接到 Milvus 服务器
+        ,
+                password=self.password
+                alias="default",
+        """
         try:
             connections.connect(
                 alias="default",
                 host=self.host,
                 port=self.port,
                 user=self.user,
-                password=self.password
+                db_name="lq_db"
             )
             logger.info(f"Connected to Milvus at {self.host}:{self.port}")
         except Exception as e:
@@ -51,13 +56,15 @@ class MilvusVectorManager(BaseVectorDB):
             # 检查集合是否已存在
             if utility.has_collection(collection_name):
                 logger.info(f"Collection {collection_name} already exists")
-                return
+                utility.drop_collection(collection_name)
+                logger.info(f"Collection '{collection_name}' dropped successfully")
+                
             
             # 定义字段
             fields = [
                 FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
-                FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=65535),
                 FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
+                FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=65535),
                 FieldSchema(name="metadata", dtype=DataType.JSON),
                 FieldSchema(name="created_at", dtype=DataType.INT64)
             ]
@@ -103,16 +110,20 @@ class MilvusVectorManager(BaseVectorDB):
             created_at = None
             
             # 转换文本为向量
+
             embedding = self.text_to_vector(text)
-            
+            #logger.info(f"Text converted to embedding:{isinstance(embedding, list)} ,{len(embedding)}")
+            #logger.info(f"Text converted to embedding:{embedding}")
             # 准备数据
             data = [
-                [text],  # text_content
                 [embedding],  # embedding
+                [text],  # text_content
                 [metadata or {}],  # metadata
                 [created_at or int(time.time())]  # created_at
             ]
+            logger.info(f"Preparing to insert text_contents:{len(data[0])} ,{len(data[1])},{len(data[2])},{len(data[3])}")
             
+
             # 插入数据
             insert_result = collection.insert(data)
             collection.flush()  # 确保数据被写入
@@ -152,8 +163,10 @@ class MilvusVectorManager(BaseVectorDB):
                 metadatas.append(metadata)
                 timestamps.append(int(time.time()))
             
+            
             # 准备批量数据
-            data = [text_contents, embeddings, metadatas, timestamps]
+            data = [embeddings, text_contents, metadatas, timestamps]
+            #logger.info(f"Preparing to insert text_contents:{len(text_contents)} ,{len(embeddings)},{len(metadatas)},{len(timestamps)}")
             
             # 批量插入
             insert_result = collection.insert(data)
@@ -286,7 +299,8 @@ class MilvusVectorManager(BaseVectorDB):
         
         return filter_expr
 
-    def db_test(self):
+    def db_test(self , query_text):
+        query = query_text
         import time
         # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
         from foundation.models.silicon_flow import SiliconFlowAPI
@@ -296,52 +310,58 @@ class MilvusVectorManager(BaseVectorDB):
         
         # 创建集合
         collection_name = 'text_embeddings'
-        milvus_manager.create_collection(collection_name, dimension=384)
+        milvus_manager.create_collection(collection_name, dimension=768)
+        
+        param = {"collection_name": collection_name}
         
         # 插入单个文本
         sample_text = "这是一个关于人工智能的文档。"
-        milvus_manager.insert_text(
-            collection_name, 
-            sample_text, 
-            metadata={'category': 'AI', 'source': 'example'}
+        milvus_manager.add_document(
+            param, 
+            {"content":sample_text , "metadata": {'category': 'AI', 'source': 'example'}}
         )
         
         # 批量插入文本
         sample_texts = [
             {
-                'text': '机器学习是人工智能的一个重要分支。',
+                'content': '机器学习是人工智能的一个重要分支。',
                 'metadata': {'category': 'ML', 'author': 'John'}
             },
             {
-                'text': '深度学习在图像识别领域取得了显著成果。',
+                'content': '深度学习在图像识别领域取得了显著成果。',
                 'metadata': {'category': 'Deep Learning', 'author': 'Jane'}
             },
             {
-                'text': '自然语言处理技术在聊天机器人中得到广泛应用。',
+                'content': '自然语言处理技术在聊天机器人中得到广泛应用。',
                 'metadata': {'category': 'NLP', 'author': 'Bob'}
             }
+            ,
+            {
+                'content': 'AI发展速度快,但需要更多的计算资源。',
+                'metadata': {'category': 'AI', 'author': 'Bob'}
+            }
         ]
         
-        param = {"collection_name": collection_name}
-        milvus_manager.add_batch_documents(param, sample_texts)
+       
+        milvus_manager.add_batch_documents(param=param, documents=sample_texts)
         
         # 搜索相似文本
         query = "人工智能相关的技术"
-        similar_docs = milvus_manager.similarity_search(param, query, top_k=3)
+        similar_docs = milvus_manager.similarity_search(param, query, top_k=5)
         
-        logger.info("Similar documents found:")
+        logger.info(f"Similar documents found-{len(similar_docs)}:")
         for doc in similar_docs:
             logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
-        
+        logger.info(f"{'=' *20}")
         # 带过滤条件的搜索
-        filtered_docs = milvus_manager.search_with_filter(
-            collection_name, 
+        filtered_docs = milvus_manager.retriever(
+            param, 
             query, 
-            top_k=3, 
+            top_k=5, 
             filters={'category': 'AI'}
         )
         
-        logger.info("\nFiltered similar documents:")
+        logger.info(f"\nFiltered similar documents-{len(filtered_docs)}:")
         for doc in filtered_docs:
             logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
 

+ 44 - 0
test/test_pymilvus.py

@@ -0,0 +1,44 @@
+from pymilvus import connections, utility
+from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
+
+def get_connection_info():
+    """Connect to Milvus and print basic server / collection information.
+
+    Opens the ``default`` connection alias against database ``lq_db``,
+    prints the server version and the existing collections, then opens a
+    handle to the ``tv_basis_of_preparation`` collection as a smoke test.
+
+    Returns:
+        bool: True when every step succeeded, False when any step raised
+        (the error is printed, not re-raised).
+    """
+    alias = "default"
+    try:
+        # db_name selects the database; alias is only the local handle
+        # that later calls refer to through ``using=``.
+        connections.connect(
+            alias=alias,
+            host='192.168.0.3',
+            port='19530',
+            db_name="lq_db"
+        )
+
+        version = utility.get_server_version()
+        print(f"✅ Milvus 版本: {version}")
+
+        collections = utility.list_collections()
+        print(f"✅ 现有 collections: {collections}")
+        if collections:  # avoid IndexError on an empty database
+            print(f"{collections[0]}")
+
+        # ``using`` takes the connection alias, not the database name;
+        # the database was already chosen by db_name in connect().
+        collection = Collection("tv_basis_of_preparation", using=alias)
+        print(f"✅ 集合信息: {collection.name}")
+        return True
+    except Exception as e:
+        print(f"❌ 连接失败: {e}")
+        return False
+
+if __name__ == "__main__":
+    get_connection_info()

+ 134 - 1
views/test_views.py

@@ -27,6 +27,7 @@ from file_processors.bfp_pdf_processor import BfpPDFProcessor
 from foundation.models.silicon_flow import SiliconFlowAPI
 from foundation.rag.vector.pg_vector_mananger import PGVectorManager
 from foundation.rag.vector.pg_vector import PGVectorDB
+from foundation.rag.vector.milvus_vector import MilvusVectorManager
 
 
 @test_router.post("/generate/chat", response_model=TestForm)
@@ -550,6 +551,52 @@ async def pgvector_test_endpoint(
 
 
 
+@test_router.post("/data/milvus/test", response_model=TestForm)
+async def pgvector_test_endpoint(  # NOTE(review): reuses the name of the pgvector test endpoint defined above; consider a Milvus-specific name
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """Smoke-test the Milvus vector store by running MilvusVectorManager.db_test.
+    Responds with output "success", or with code 100500 and the error text on failure.
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Read the input fields from the request form
+        input_query = param.input
+        session_id = param.config.session_id  # not used below
+        context = param.context  # not used below
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        output = "success"
+         # Initialise the client (the SILICONFLOW_API_KEY environment variable must be set beforehand)
+        client = SiliconFlowAPI()
+         # Initialise the database manager
+        # 1. Raw test
+        db_manager = MilvusVectorManager(client)
+        db_manager.db_test(query_text=input_query)
+
+        # 2. Test through the abstraction layer
+        # pg_vector_db = MilvusVectorManager(base_api_platform=client)
+        # output = pg_vector_db.similarity_cosine_search(param={"table_name": "test_documents"}, query_text=input_query)
+
+        
+        # Run directly
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict-shaped JSON body
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")  # NOTE(review): err_name does not match this route — confirm
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:  # handled exactly like the ValueError branch above
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
 
 
 @test_router.post("/data/bfp/indb", response_model=TestForm)
@@ -628,7 +675,7 @@ async def bfp_batch_indb_endpoint(
         file_directory= "test/bfp_files"
          # 初始化知识问答处理
         pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=pg_vector_db)
-        pdf_processor.process_tqdm_pdfs_group()
+        pdf_processor.process_tqdm_pdfs_group(key_name="table_name")
         server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/batch/indb")
         output = "success"
         
@@ -684,4 +731,90 @@ async def bfp_search_endpoint(
 
     except Exception as err:
         handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+
+
+@test_router.post("/data/bfp/milvus/batch/indb", response_model=TestForm)
+async def bfp_batch_indb_endpoint(  # NOTE(review): reuses the name of the pgvector batch endpoint defined above; consider a Milvus-specific name
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """Batch-split the basis-of-preparation PDFs under test/bfp_files and ingest them into Milvus.
+    Responds with output "success", or with code 100500 and the error text on failure.
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        # Read the input fields from the request form
+        input_query = param.input  # not used below
+        session_id = param.config.session_id  # not used below
+        context = param.context  # not used below
+        header_info = {
+        }
+        # Initialise the client (the SILICONFLOW_API_KEY environment variable must be set beforehand)
+        client = SiliconFlowAPI()
+        # Test through the abstraction layer
+        vector_db = MilvusVectorManager(base_api_platform=client)
+        #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
+        file_directory= "test/bfp_files"
+         # Initialise the PDF processor with the Milvus-backed vector store
+        pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=vector_db)
+        pdf_processor.process_tqdm_pdfs_group(key_name="collection_name")  # Milvus path passes "collection_name"; the pgvector endpoint passes "table_name"
+        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/milvus/batch/indb")
+        output = "success"
+        
+        #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
+        # Return the response as a dict-shaped JSON body
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:  # handled exactly like the ValueError branch above
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+@test_router.post("/data/bfp/milvus/search", response_model=TestForm)
+async def bfp_search_endpoint(  # NOTE(review): reuses the name of the pgvector search endpoint defined above; consider a Milvus-specific name
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """Search the "tv_basis_of_preparation" Milvus collection with the request's input text.
+    Responds with the retriever output, or with code 100500 and the error text on failure.
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # Read the input fields from the request form
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context  # not used below
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        top_k = int(session_id)  # NOTE(review): session_id doubles as top_k here; an int() failure is caught by the handlers below
+        
+        output = None
+        # Initialise the client (the SILICONFLOW_API_KEY environment variable must be set beforehand)
+        client = SiliconFlowAPI()
+        # Test through the abstraction layer
+        vector_db = MilvusVectorManager(base_api_platform=client)
+        output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
+
+        # Return the response as a dict-shaped JSON body
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:  # handled exactly like the ValueError branch above
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
         return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))