Prechádzať zdrojové kódy

feat: 创建 first_bfp_collection_entity Milvus Collection

ai02 4 týždňov pred
rodič
commit
9d5ea3230f

+ 231 - 0
src/app/scripts/first_bfp_collection_entity_create.py

@@ -0,0 +1,231 @@
+"""
+创建 first_bfp_collection_entity Milvus Collection
+
+collection用途:存储建筑工程编制依据实体,支持全文检索+语义检索
+
+字段结构(8个核心字段):
+1. text: 抽取的实体名字(如"延性构件构造细节设计")
+2. pk: 整型,Milvus自增主键
+3. dense: 浮点向量,维度4096,用于语义检索
+4. sparse: 稀疏向量,由BM25函数从text自动生成
+5. uuid: 字符串,UUID4格式
+6. file: 字符串,MD文档文件名
+7. title: 字符串,实体所在句的MD标题
+8. background: 字符串(JSON数组),包含实体的原句上下文
+
+用法:
+    uv run -m src.app.scripts.first_bfp_collection_entity_create
+"""
+from __future__ import annotations
+
+from pymilvus import DataType, Function, FunctionType
+
+from app.config.milvus_client import get_milvusclient
+
+# Name of the Milvus collection this script creates.
+# NOTE(review): the module docstring refers to "first_bfp_collection_entity"
+# (without the trailing "2") — confirm which name is intended.
+COLLECTION_NAME = "first_bfp_collection_entity2"
+
+# Dense vector dimension (must match the embedding model's output dimension).
+DENSE_DIM = 4096
+
+
+def create_schema():
+    """Build the schema for the entity collection: 8 fields plus a BM25 function.
+
+    Fields:
+        text: Entity name (VARCHAR); analyzed with the "chinese" analyzer,
+            text match enabled, and used as the BM25 input.
+        pk: INT64 primary key with auto-generated IDs.
+        dense: FLOAT_VECTOR with ``DENSE_DIM`` dimensions.
+        sparse: SPARSE_FLOAT_VECTOR, output of the BM25 function over ``text``.
+        uuid, file, title, background: VARCHAR business fields.
+
+    Returns:
+        The schema object obtained from ``client.create_schema`` with all
+        fields and the BM25 function attached.
+    """
+    client = get_milvusclient()
+
+    # The documented pymilvus keyword is ``enable_dynamic_field`` (singular);
+    # the plural spelling is not a recognized option, so it would not take
+    # effect as an explicit setting.
+    schema = client.create_schema(auto_id=True, enable_dynamic_field=False)
+
+    # ==================== Core retrieval fields ====================
+
+    # 1. text: entity name, used for full-text matching and as BM25 input.
+    schema.add_field(
+        "text",
+        DataType.VARCHAR,
+        max_length=65535,
+        enable_analyzer=True,
+        analyzer_params={"type": "chinese"},
+        enable_match=True,
+        description="抽取的实体名字,如:延性构件构造细节设计、JGJ59-2011"
+    )
+
+    # 2. pk: primary key, generated by Milvus.
+    schema.add_field(
+        "pk",
+        DataType.INT64,
+        is_primary=True,
+        auto_id=True,
+        description="主键ID,自增列"
+    )
+
+    # 3. dense: dense vector for semantic search.
+    schema.add_field(
+        "dense",
+        DataType.FLOAT_VECTOR,
+        dim=DENSE_DIM,
+        description="浮点向量,维度4096,用于语义检索"
+    )
+
+    # 4. sparse: sparse vector filled in by the BM25 function below.
+    schema.add_field(
+        "sparse",
+        DataType.SPARSE_FLOAT_VECTOR,
+        description="稀疏向量,由BM25函数从text字段自动生成,用于关键词检索"
+    )
+
+    # ==================== Business fields ====================
+
+    # 5. uuid: unique identifier string.
+    schema.add_field(
+        "uuid",
+        DataType.VARCHAR,
+        max_length=64,
+        description="UUID4格式唯一标识,如:f81d4fae-7dec-11d0-a765-00a0c91e6bf6"
+    )
+
+    # 6. file: name of the source Markdown document.
+    schema.add_field(
+        "file",
+        DataType.VARCHAR,
+        max_length=65535,
+        description="MD文档原始文件名,如:133《建筑施工安全检查标准》(JGJ59-2011).md"
+    )
+
+    # 7. title: Markdown heading of the sentence containing the entity.
+    schema.add_field(
+        "title",
+        DataType.VARCHAR,
+        max_length=65535,
+        description="实体所在句的MD标题(层级路径),如:第8章 构造规定 > 8.1 延性构件"
+    )
+
+    # 8. background: original context sentences, stored as a JSON array string.
+    schema.add_field(
+        "background",
+        DataType.VARCHAR,
+        max_length=65535,
+        description='实体上下文原句,JSON数组格式,如:["本规范第8章对延性构件...进行了规定"]'
+    )
+
+    # ==================== BM25 function ====================
+
+    # Derives the ``sparse`` vector from the ``text`` field.
+    bm25_function = Function(
+        name="bm25_function",
+        function_type=FunctionType.BM25,
+        input_field_names=["text"],
+        output_field_names=["sparse"],
+    )
+    schema.add_function(bm25_function)
+
+    return schema
+
+
+def create_index(client, collection_name: str):
+    """Create one index per indexed field, skipping indexes that already exist.
+
+    Args:
+        client: Milvus client exposing ``prepare_index_params`` and ``create_index``.
+        collection_name: Collection on which the indexes are created.
+
+    Raises:
+        Exception: Re-raised from ``client.create_index`` unless its message
+            contains "already exist" or "duplicate" (case-insensitive).
+    """
+    # Error-message fragments treated as "index is already there".
+    benign_markers = ("already exist", "duplicate")
+
+    planned = (
+        {"field_name": "text", "index_name": "text_idx", "index_type": "INVERTED"},
+        {"field_name": "dense", "index_name": "dense", "index_type": "AUTOINDEX", "metric_type": "L2"},
+        {"field_name": "sparse", "index_name": "sparse", "index_type": "AUTOINDEX", "metric_type": "BM25"},
+        {"field_name": "uuid", "index_name": "uuid_idx", "index_type": "INVERTED"},
+        {"field_name": "file", "index_name": "file_idx", "index_type": "INVERTED"},
+        {"field_name": "title", "index_name": "title_idx", "index_type": "INVERTED"},
+        {"field_name": "background", "index_name": "background_idx", "index_type": "INVERTED"},
+    )
+
+    # Each index is submitted in its own request so one existing index does
+    # not prevent the remaining ones from being created.
+    for entry in planned:
+        label = f"{collection_name}.{entry['index_name']}"
+        params = client.prepare_index_params()
+        params.add_index(**entry)
+        try:
+            client.create_index(collection_name=collection_name, index_params=params)
+            print(f"✅ 索引创建成功: {label}")
+        except Exception as exc:
+            message = str(exc).lower()
+            if not any(marker in message for marker in benign_markers):
+                raise
+            print(f"ℹ️ 索引已存在,跳过: {label}")
+
+
+def ensure_collection(collection_name: str = COLLECTION_NAME, create_idx: bool = True, auto_load: bool = True):
+    """Create the collection when it is missing, then optionally index and load it.
+
+    Args:
+        collection_name: Name of the collection to check or create.
+        create_idx: When true, run ``create_index`` — also for a collection
+            that already exists.
+        auto_load: When true, try to load the collection; a load failure is
+            printed as a warning and not raised.
+
+    Returns:
+        True if the collection was created by this call, False if it
+        already existed.
+    """
+    milvus = get_milvusclient()
+    is_new = not milvus.has_collection(collection_name=collection_name)
+
+    if is_new:
+        print(f"📝 创建Schema: {collection_name}")
+        new_schema = create_schema()
+
+        print(f"🏗️ 创建Collection: {collection_name}")
+        milvus.create_collection(
+            collection_name=collection_name,
+            schema=new_schema,
+            consistency_level="Bounded",
+        )
+
+        if create_idx:
+            print(f"🔍 创建索引: {collection_name}")
+            create_index(milvus, collection_name)
+
+        if auto_load:
+            print(f"📂 加载Collection到内存: {collection_name}")
+            try:
+                milvus.load_collection(collection_name=collection_name)
+                print(f"✅ Collection加载完成: {collection_name}")
+            except Exception as load_err:
+                # Best effort: report the failure and carry on.
+                print(f"⚠️ Collection加载失败: {load_err}")
+
+        print(f"✅ Collection创建完成: {collection_name}")
+        return True
+
+    # The collection already exists: optionally backfill indexes and load it.
+    print(f"ℹ️ Collection已存在: {collection_name}")
+    if create_idx:
+        print(f"🔍 Collection已存在,补建索引: {collection_name}")
+        create_index(milvus, collection_name)
+    if auto_load:
+        try:
+            milvus.load_collection(collection_name=collection_name)
+            print(f"✅ Collection已加载到内存: {collection_name}")
+        except Exception as load_err:
+            # Best effort: report the failure and carry on.
+            print(f"⚠️ Collection加载失败(可能已加载): {load_err}")
+    return False
+
+
+def main():
+    """Print the field overview, then ensure the collection exists and is loaded.
+
+    Raises:
+        Exception: Any error from ``ensure_collection`` is reported on stdout
+            and re-raised.
+    """
+    rule = "=" * 70
+    banner = (
+        rule,
+        "编制依据实体详情 - Milvus Collection 创建工具",
+        rule,
+        "字段结构:",
+        "  1. text: 抽取的实体名字",
+        "  2. pk: 自增主键",
+        "  3. dense: 密集向量(4096维)",
+        "  4. sparse: 稀疏向量(BM25生成)",
+        "  5. uuid: UUID4唯一标识",
+        "  6. file: MD文档文件名",
+        "  7. title: 实体所在句的MD标题",
+        "  8. background: 实体上下文原句(JSON数组)",
+        rule,
+    )
+    for line in banner:
+        print(line)
+
+    try:
+        created = ensure_collection(COLLECTION_NAME, auto_load=True)
+
+        print("\n" + rule)
+        print("✅ Collection创建并加载完成!" if created else "ℹ️ Collection已存在并加载")
+        print(rule)
+
+    except Exception as e:
+        print(f"\n❌ 创建失败: {e!s}")
+        raise
+
+
+# Run the creation tool only when this module is executed as a script.
+if __name__ == "__main__":
+    main()