system_load.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import asyncio
  2. import logging
  3. from typing import Tuple, Dict, List
  4. from gpustack.schemas.workers import Worker
  5. from gpustack.schemas.system_load import SystemLoad
  6. from gpustack.server.db import async_session
  7. logger = logging.getLogger(__name__)
  8. def workers_by_cluster_id(workers: List[Worker]) -> Dict[int, List[Worker]]:
  9. rtn: Dict[int, List[Worker]] = {}
  10. for worker in workers:
  11. if worker.cluster_id not in rtn:
  12. rtn[worker.cluster_id] = []
  13. rtn[worker.cluster_id].append(worker)
  14. return rtn
  15. def _safe_cpu_rate(worker: Worker) -> float:
  16. if worker.status and worker.status.cpu and worker.status.cpu.utilization_rate:
  17. return worker.status.cpu.utilization_rate
  18. return 0.0
  19. def _safe_memory_rate(worker: Worker) -> float:
  20. if worker.status and worker.status.memory and worker.status.memory.utilization_rate:
  21. return worker.status.memory.utilization_rate
  22. return 0.0
  23. def compute_avg_cpu_memory_utilization_rate(
  24. workers: List[Worker],
  25. ) -> Dict[int | None, Tuple[float, float]]:
  26. rtn: Dict[int | None, Tuple[float, float]] = {
  27. None: (0, 0),
  28. }
  29. by_cluster = workers_by_cluster_id(workers)
  30. cpu_sum_value = 0
  31. memory_sum_value = 0
  32. for cluster_id, cluster_workers in by_cluster.items():
  33. cpu_value = sum(_safe_cpu_rate(worker) for worker in cluster_workers)
  34. memory_value = sum(_safe_memory_rate(worker) for worker in cluster_workers)
  35. rtn[cluster_id] = (
  36. cpu_value / len(cluster_workers),
  37. memory_value / len(cluster_workers),
  38. )
  39. cpu_sum_value += cpu_value
  40. memory_sum_value += memory_value
  41. if len(workers) > 0:
  42. cpu_rate = cpu_sum_value / len(workers)
  43. memory_rate = memory_sum_value / len(workers)
  44. rtn[None] = (cpu_rate, memory_rate)
  45. return rtn
  46. def compute_avg_gpu_utilization_rate(
  47. workers: List[Worker],
  48. ) -> Dict[int | None, Tuple[float, float]]:
  49. by_cluster = workers_by_cluster_id(workers)
  50. rtn: Dict[int | None, Tuple[float, float]] = {}
  51. all_util_count = 0
  52. all_memory_count = 0
  53. all_util_sum_value = 0
  54. all_memory_sum_value = 0
  55. for cluster_id, cluster_workers in by_cluster.items():
  56. util_count = sum(
  57. 1
  58. for worker in cluster_workers
  59. for gpu in worker.status.gpu_devices or []
  60. if gpu.core and gpu.core.utilization_rate is not None
  61. )
  62. memory_count = sum(
  63. 1
  64. for worker in cluster_workers
  65. for gpu in worker.status.gpu_devices or []
  66. if gpu.memory and gpu.memory.utilization_rate is not None
  67. )
  68. util_sum_value = sum(
  69. gpu.core.utilization_rate
  70. for worker in cluster_workers
  71. for gpu in worker.status.gpu_devices or []
  72. if gpu.core and gpu.core.utilization_rate is not None
  73. )
  74. memory_sum_value = sum(
  75. gpu.memory.utilization_rate
  76. for worker in cluster_workers
  77. for gpu in worker.status.gpu_devices or []
  78. if gpu.memory and gpu.memory.utilization_rate is not None
  79. )
  80. util_rate = util_sum_value / util_count if util_count > 0 else 0
  81. memory_rate = memory_sum_value / memory_count if memory_count > 0 else 0
  82. rtn[cluster_id] = (util_rate, memory_rate)
  83. all_util_count += util_count
  84. all_memory_count += memory_count
  85. all_util_sum_value += util_sum_value
  86. all_memory_sum_value += memory_sum_value
  87. rtn[None] = (
  88. all_util_sum_value / all_util_count if all_util_count > 0 else 0,
  89. all_memory_sum_value / all_memory_count if all_memory_count > 0 else 0,
  90. )
  91. return rtn
  92. def compute_system_load(workers: List[Worker]) -> List[SystemLoad]:
  93. workers = [worker for worker in workers if not worker.state.is_provisioning]
  94. cpu_memory_by_cluster = compute_avg_cpu_memory_utilization_rate(workers)
  95. gpu_vram_by_cluster = compute_avg_gpu_utilization_rate(workers)
  96. rtn: List[SystemLoad] = [
  97. SystemLoad(
  98. cluster_id=cluster_id,
  99. cpu=cpu_memory_by_cluster.get(cluster_id, (0, 0))[0],
  100. ram=cpu_memory_by_cluster.get(cluster_id, (0, 0))[1],
  101. gpu=gpu_vram_by_cluster.get(cluster_id, (0, 0))[0],
  102. vram=gpu_vram_by_cluster.get(cluster_id, (0, 0))[1],
  103. )
  104. for cluster_id in set(cpu_memory_by_cluster) | set(gpu_vram_by_cluster)
  105. ]
  106. return rtn
  107. class SystemLoadCollector:
  108. def __init__(self, interval=60):
  109. self.interval = interval
  110. async def start(self):
  111. while True:
  112. await asyncio.sleep(self.interval)
  113. try:
  114. async with async_session() as session:
  115. workers = await Worker.all(session=session)
  116. system_loads = compute_system_load(workers)
  117. session.add_all(system_loads)
  118. await session.commit()
  119. except Exception as e:
  120. logger.error(f"Failed to collect system load: {e}")