# connections_test_final_last.py(终极适配版,无任何导入错误) from pymilvus import connections, CollectionSchema, FieldSchema, DataType, Collection from pymilvus.orm import use_database, list_collections MILVUS_HOST = "192.168.92.61" MILVUS_PORT = 19530 TARGET_DB = "lq_db" # 旧数据可能存在的数据库 VECTOR_DIM = 128 # 替换为你的向量维度(比如 768) try: # 1. 建立连接 connections.connect( alias="milvus_last_conn", host=MILVUS_HOST, port=MILVUS_PORT, timeout=30 ) print("✅ Milvus 连接成功!") # 2. 尝试切换到 lq_db 数据库 print(f"🔄 尝试切换到数据库:{TARGET_DB}") use_database(TARGET_DB, using="milvus_last_conn") print(f"✅ 成功切换到数据库:{TARGET_DB}") # 3. 查询 lq_db 中所有 Collection(自动找旧数据) print(f"\n📋 {TARGET_DB} 数据库中所有 Collection:") all_collections = list_collections(using="milvus_last_conn") if not all_collections: print(f"❌ {TARGET_DB} 中无任何 Collection(旧数据元数据已丢失)") else: print(f"✅ 找到 {len(all_collections)} 个 Collection:{all_collections}") # 4. 遍历每个 Collection 查数据量 for coll_name in all_collections: print(f"\n=== 🔍 Collection:{coll_name} ===") # 定义通用结构(适配大多数场景) fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM) ] schema = CollectionSchema(fields=fields) coll = Collection( name=coll_name, schema=schema, using="milvus_last_conn" ) row_count = coll.num_entities if row_count > 0: print(f"🎉 旧数据找到!数据量:{row_count} 条") print(f"👉 Attu 切换到 {TARGET_DB} → 选择 {coll_name} 即可查看!") else: print(f"⚠️ 该 Collection 无数据") except Exception as e: error_msg = str(e).lower() if "database not found" in error_msg: print(f"❌ 未找到 {TARGET_DB} 数据库(旧数据库已丢失)") else: print(f"❌ 错误:{e}") finally: # 断开连接 if connections.has_connection("milvus_last_conn"): connections.disconnect("milvus_last_conn") print("\n🔌 已断开连接") if __name__ == "__main__": pass