Bläddra i källkod

增加milvus测试接口

lingmin_package@163.com 3 månader sedan
förälder
incheckning
0370592e36
3 ändrade filer med 127 tillägg och 32 borttagningar
  1. 33 3
      README.md
  2. 47 27
      foundation/rag/vector/milvus_vector.py
  3. 47 2
      views/test_views.py

+ 33 - 3
README.md

@@ -13,9 +13,7 @@
     - python .\views\construction_review\app.py
 
 
-    
-
-
+  
     pip install aioredis -i https://mirrors.aliyun.com/pypi/simple/
     pip install langgraph-checkpoint-postgres -i https://mirrors.aliyun.com/pypi/simple/
     pip install langchain-redis -i https://mirrors.aliyun.com/pypi/simple/
@@ -32,6 +30,38 @@
 
 
 
+### 向量数据库 milvus
+   - cd /home/cjb/lq_workspace/milvus
+   - docker-compose up -d
+   - 检查服务是否正常 http://192.168.0.3:9091/healthz
+   - 拉取并运行 Attu http://192.168.0.3:13000/#/connect
+
+
+#### 测试向量数据库 检索测试接口
+   - 测试接口1(POST;config.session_id 在该接口中被用作 top_k,即返回结果条数)
+      http://localhost:8001/test/data/bfp/milvus/search
+        {
+          "config": {
+            "session_id": "5"
+          },
+          "input": "普通模版荷载计算"
+        }
+
+
+
+   - 测试接口2(POST;config.session_id 在该接口中被用作 top_k,即返回结果条数)
+      http://localhost:8001/test/data/bfp/milvus/search
+        {
+          "config": {
+            "session_id": "3"
+          },
+          "input": "安全生产条件"
+        }
+
+
+
+
+
 ### 测试接口
 
   #### 生成模型接口 

+ 47 - 27
foundation/rag/vector/milvus_vector.py

@@ -18,6 +18,7 @@ class MilvusVectorManager(BaseVectorDB):
 
         self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
         self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        self.milvus_db = config_handler.get('milvus', 'MILVUS_DB', 'default')
         self.user = config_handler.get('milvus', 'MILVUS_USER')
         self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
         
@@ -28,14 +29,18 @@ class MilvusVectorManager(BaseVectorDB):
         self.connect()
     
     def connect(self):
-        """连接到 Milvus 服务器"""
+        """连接到 Milvus 服务器
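+
+        NOTE(review): password=self.password is no longer passed to connections.connect()
+        below, and db_name is hard-coded to "lq_db" instead of self.milvus_db - confirm.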
+        """
         try:
             connections.connect(
                 alias="default",
                 host=self.host,
                 port=self.port,
                 user=self.user,
-                password=self.password
+                db_name="lq_db"
             )
             logger.info(f"Connected to Milvus at {self.host}:{self.port}")
         except Exception as e:
@@ -51,13 +56,15 @@ class MilvusVectorManager(BaseVectorDB):
             # 检查集合是否已存在
             if utility.has_collection(collection_name):
                 logger.info(f"Collection {collection_name} already exists")
-                return
+                utility.drop_collection(collection_name)
+                logger.info(f"Collection '{collection_name}' dropped successfully")
+                
             
             # 定义字段
             fields = [
                 FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
-                FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=65535),
                 FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
+                FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=65535),
                 FieldSchema(name="metadata", dtype=DataType.JSON),
                 FieldSchema(name="created_at", dtype=DataType.INT64)
             ]
@@ -103,16 +110,20 @@ class MilvusVectorManager(BaseVectorDB):
             created_at = None
             
             # 转换文本为向量
+
             embedding = self.text_to_vector(text)
-            
+            #logger.info(f"Text converted to embedding:{isinstance(embedding, list)} ,{len(embedding)}")
+            #logger.info(f"Text converted to embedding:{embedding}")
             # 准备数据
             data = [
-                [text],  # text_content
                 [embedding],  # embedding
+                [text],  # text_content
                 [metadata or {}],  # metadata
                 [created_at or int(time.time())]  # created_at
             ]
+            logger.info(f"Preparing to insert text_contents:{len(data[0])} ,{len(data[1])},{len(data[2])},{len(data[3])}")
             
+
             # 插入数据
             insert_result = collection.insert(data)
             collection.flush()  # 确保数据被写入
@@ -152,8 +163,10 @@ class MilvusVectorManager(BaseVectorDB):
                 metadatas.append(metadata)
                 timestamps.append(int(time.time()))
             
+            
             # 准备批量数据
-            data = [text_contents, embeddings, metadatas, timestamps]
+            data = [embeddings, text_contents, metadatas, timestamps]
+            #logger.info(f"Preparing to insert text_contents:{len(text_contents)} ,{len(embeddings)},{len(metadatas)},{len(timestamps)}")
             
             # 批量插入
             insert_result = collection.insert(data)
@@ -195,7 +208,7 @@ class MilvusVectorManager(BaseVectorDB):
             # 执行搜索
             results = collection.search(
                 data=[query_embedding],
-                anns_field="embedding",
+                anns_field="vector",
                 param=search_params,
                 limit=top_k,
                 expr=filter_expr,
@@ -242,7 +255,7 @@ class MilvusVectorManager(BaseVectorDB):
             
             results = collection.search(
                 data=[query_embedding],
-                anns_field="embedding",
+                anns_field="vector",
                 param=search_params,
                 limit=top_k,
                 expr=filter_expr,
@@ -286,7 +299,8 @@ class MilvusVectorManager(BaseVectorDB):
         
         return filter_expr
 
-    def db_test(self):
+    def db_test(self , query_text):
+        query = query_text
         import time
         # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
         from foundation.models.silicon_flow import SiliconFlowAPI
@@ -296,52 +310,58 @@ class MilvusVectorManager(BaseVectorDB):
         
         # 创建集合
         collection_name = 'text_embeddings'
-        milvus_manager.create_collection(collection_name, dimension=384)
+        milvus_manager.create_collection(collection_name, dimension=768)
+        
+        param = {"collection_name": collection_name}
         
         # 插入单个文本
         sample_text = "这是一个关于人工智能的文档。"
-        milvus_manager.insert_text(
-            collection_name, 
-            sample_text, 
-            metadata={'category': 'AI', 'source': 'example'}
+        milvus_manager.add_document(
+            param, 
+            {"content":sample_text , "metadata": {'category': 'AI', 'source': 'example'}}
         )
         
         # 批量插入文本
         sample_texts = [
             {
-                'text': '机器学习是人工智能的一个重要分支。',
+                'content': '机器学习是人工智能的一个重要分支。',
                 'metadata': {'category': 'ML', 'author': 'John'}
             },
             {
-                'text': '深度学习在图像识别领域取得了显著成果。',
+                'content': '深度学习在图像识别领域取得了显著成果。',
                 'metadata': {'category': 'Deep Learning', 'author': 'Jane'}
             },
             {
-                'text': '自然语言处理技术在聊天机器人中得到广泛应用。',
+                'content': '自然语言处理技术在聊天机器人中得到广泛应用。',
                 'metadata': {'category': 'NLP', 'author': 'Bob'}
             }
+            ,
+            {
+                'content': 'AI发展速度快,但需要更多的计算资源。',
+                'metadata': {'category': 'AI', 'author': 'Bob'}
+            }
         ]
         
-        param = {"collection_name": collection_name}
-        milvus_manager.add_batch_documents(param, sample_texts)
+       
+        milvus_manager.add_batch_documents(param=param, documents=sample_texts)
         
         # 搜索相似文本
         query = "人工智能相关的技术"
-        similar_docs = milvus_manager.similarity_search(param, query, top_k=3)
+        similar_docs = milvus_manager.similarity_search(param, query, top_k=5)
         
-        logger.info("Similar documents found:")
+        logger.info(f"Similar documents found-{len(similar_docs)}:")
         for doc in similar_docs:
             logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
-        
+        logger.info(f"{'=' *20}")
         # 带过滤条件的搜索
-        filtered_docs = milvus_manager.search_with_filter(
-            collection_name, 
+        filtered_docs = milvus_manager.retriever(
+            param, 
             query, 
-            top_k=3, 
+            top_k=5, 
             filters={'category': 'AI'}
         )
         
-        logger.info("\nFiltered similar documents:")
+        logger.info(f"\nFiltered similar documents-{len(filtered_docs)}:")
         for doc in filtered_docs:
             logger.info(f"ID: {doc['id']}, Text: {doc['text_content'][:50]}..., Similarity: {doc['similarity']:.3f}")
 

+ 47 - 2
views/test_views.py

@@ -26,11 +26,14 @@ from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
 from foundation.base.mysql.async_mysql_base_dao import TestTabDAO
 from database.repositories.bus_data_query import BasisOfPreparationDAO
 from foundation.utils.tool_utils import DateTimeEncoder
-from foundation.models.silicon_flow import SiliconFlowAPI
-from foundation.rag.vector.pg_vector import PGVectorDB
 from langchain_core.prompts import ChatPromptTemplate
 from foundation.utils.yaml_utils import system_prompt_config
 
+from foundation.models.silicon_flow import SiliconFlowAPI
+from foundation.rag.vector.pg_vector import PGVectorDB
+from foundation.rag.vector.milvus_vector import MilvusVectorManager
+
+
 
 @test_router.post("/generate/chat", response_model=TestForm)
 async def generate_chat_endpoint(
@@ -739,4 +742,46 @@ async def bfp_search_endpoint(
 
     except Exception as err:
         handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+    
+
+
+
+
+@test_router.post("/data/bfp/milvus/search", response_model=TestForm)
+async def bfp_search_endpoint(
+        param: TestForm,
+        trace_id: str = Depends(get_operation_id)):
+    """
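+        Test endpoint: run MilvusVectorManager.retriever for the input query against the "tv_basis_of_preparation" collection; top_k is taken from config.session_id.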
+    """
+    try:
+        server_logger.info(trace_id=trace_id, msg=f"{param}")
+        print(trace_id)
+        # 从字典中获取input
+        input_query = param.input
+        session_id = param.config.session_id
+        context = param.context
+        header_info = {
+        }
+        task_prompt_info = {"task_prompt": ""}
+        top_k = int(session_id)
+        
+        output = None
+        # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
+        client = SiliconFlowAPI()
+        # 抽象测试
+        vector_db = MilvusVectorManager(base_api_platform=client)
+        output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
+
+        # 返回字典格式的响应
+        return JSONResponse(
+            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
+
+    except ValueError as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
+        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
+
+    except Exception as err:
+        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
         return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))