|
|
@@ -0,0 +1,81 @@
|
|
|
+"""
|
|
|
+Milvus Service:业务层(直接用 manager.client 调 Milvus 原生方法)
|
|
|
+"""
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import logging
|
|
|
+from typing import List, Dict, Any
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+from src.app.config.milvus import get_milvus_manager
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+class MilvusService:
|
|
|
+ def __init__(self):
|
|
|
+ self.client = get_milvus_manager().client
|
|
|
+
|
|
|
+ def get_collection_details(self) -> List[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ 获取所有 Collections 详细信息(按你的要求):
|
|
|
+ - 时间转换直接写在这里:physical_ms = ts_int >> 18
|
|
|
+ - load state / row_count 不保底:拿不到就让异常抛出
|
|
|
+ - 直接调用 MilvusClient 原生方法(不再二次封装)
|
|
|
+ """
|
|
|
+ details: List[Dict[str, Any]] = []
|
|
|
+
|
|
|
+ names = self.client.list_collections()
|
|
|
+
|
|
|
+ for name in names:
|
|
|
+ desc = self.client.describe_collection(collection_name=name)
|
|
|
+ stats = self.client.get_collection_stats(collection_name=name)
|
|
|
+ load_state = self.client.get_load_state(collection_name=name)
|
|
|
+
|
|
|
+ # ===== 时间戳转换(按你指定写法,无封装)=====
|
|
|
+ created_time = None
|
|
|
+ updated_time = None
|
|
|
+
|
|
|
+ if desc.get("created_timestamp") is not None:
|
|
|
+ ts_int = int(desc["created_timestamp"])
|
|
|
+ physical_ms = ts_int >> 18
|
|
|
+ created_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ if desc.get("update_timestamp") is not None:
|
|
|
+ ts_int = int(desc["update_timestamp"])
|
|
|
+ physical_ms = ts_int >> 18
|
|
|
+ updated_time = datetime.fromtimestamp(physical_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ # ===== 数量:不保底(要求返回结构必须有 row_count)=====
|
|
|
+ entity_count = stats["row_count"]
|
|
|
+
|
|
|
+ # ===== 状态:不保底(要求返回结构必须有 state)=====
|
|
|
+ status = load_state["state"]
|
|
|
+
|
|
|
+ details.append(
|
|
|
+ {
|
|
|
+ "name": name,
|
|
|
+ "status": status,
|
|
|
+ "entity_count": entity_count,
|
|
|
+ "description": desc.get("description", ""),
|
|
|
+ "created_time": created_time,
|
|
|
+ "updated_time": updated_time,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"成功获取Collections详细信息,共{len(details)}个")
|
|
|
+ return details
|
|
|
+
|
|
|
+
|
|
|
+# 可选:单例
|
|
|
+milvus_service = MilvusService()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ # 推荐这样跑:
|
|
|
+ # uv run python -m src.app.services.milvus_service
|
|
|
+ import json
|
|
|
+
|
|
|
+ data = MilvusService().get_collection_details()
|
|
|
+ for item in data:
|
|
|
+ print(json.dumps(item, ensure_ascii=False, indent=2))
|