| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- # connections_test_final_last.py(终极适配版,无任何导入错误)
- from pymilvus import connections, CollectionSchema, FieldSchema, DataType, Collection
- from pymilvus.orm import use_database, list_collections
- MILVUS_HOST = "192.168.92.61"
- MILVUS_PORT = 19530
- TARGET_DB = "lq_db" # 旧数据可能存在的数据库
- VECTOR_DIM = 128 # 替换为你的向量维度(比如 768)
- try:
- # 1. 建立连接
- connections.connect(
- alias="milvus_last_conn",
- host=MILVUS_HOST,
- port=MILVUS_PORT,
- timeout=30
- )
- print("✅ Milvus 连接成功!")
- # 2. 尝试切换到 lq_db 数据库
- print(f"🔄 尝试切换到数据库:{TARGET_DB}")
- use_database(TARGET_DB, using="milvus_last_conn")
- print(f"✅ 成功切换到数据库:{TARGET_DB}")
- # 3. 查询 lq_db 中所有 Collection(自动找旧数据)
- print(f"\n📋 {TARGET_DB} 数据库中所有 Collection:")
- all_collections = list_collections(using="milvus_last_conn")
-
- if not all_collections:
- print(f"❌ {TARGET_DB} 中无任何 Collection(旧数据元数据已丢失)")
- else:
- print(f"✅ 找到 {len(all_collections)} 个 Collection:{all_collections}")
-
- # 4. 遍历每个 Collection 查数据量
- for coll_name in all_collections:
- print(f"\n=== 🔍 Collection:{coll_name} ===")
- # 定义通用结构(适配大多数场景)
- fields = [
- FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
- FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM)
- ]
- schema = CollectionSchema(fields=fields)
- coll = Collection(
- name=coll_name,
- schema=schema,
- using="milvus_last_conn"
- )
-
- row_count = coll.num_entities
- if row_count > 0:
- print(f"🎉 旧数据找到!数据量:{row_count} 条")
- print(f"👉 Attu 切换到 {TARGET_DB} → 选择 {coll_name} 即可查看!")
- else:
- print(f"⚠️ 该 Collection 无数据")
- except Exception as e:
- error_msg = str(e).lower()
- if "database not found" in error_msg:
- print(f"❌ 未找到 {TARGET_DB} 数据库(旧数据库已丢失)")
- else:
- print(f"❌ 错误:{e}")
- finally:
- # 断开连接
- if connections.has_connection("milvus_last_conn"):
- connections.disconnect("milvus_last_conn")
- print("\n🔌 已断开连接")
- if __name__ == "__main__":
- pass
|