collector.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import socket
  2. import logging
  3. from typing import Optional, Callable
  4. from gpustack.config.config import Config
  5. from gpustack.client.generated_clientset import ClientSet
  6. from gpustack.detectors.base import GPUDetectExepction
  7. from gpustack.detectors.custom.custom import Custom
  8. from gpustack.detectors.detector_factory import DetectorFactory
  9. from gpustack.envs import WORKER_STATUS_COLLECTION_LOG_SLOW_SECONDS
  10. from gpustack.policies.base import Allocated
  11. from gpustack.schemas.models import ComputedResourceClaim
  12. from gpustack.schemas.workers import (
  13. MountPoint,
  14. WorkerStatusPublic,
  15. WorkerStatus,
  16. SystemReserved,
  17. GPUDevicesStatus,
  18. SystemInfo,
  19. )
  20. from gpustack.utils.profiling import time_decorator
  21. logger = logging.getLogger(__name__)
  22. class WorkerStatusCollector:
  23. _cfg: Config
  24. _worker_id_getter: Callable[[], int]
  25. _worker_ifname_getter: Callable[[], str]
  26. _worker_ip_getter: Callable[[], str]
  27. _worker_uuid_getter: Callable[[], str]
  28. _gpu_devices: GPUDevicesStatus
  29. _system_info: SystemInfo
  30. @property
  31. def gpu_devices(self) -> GPUDevicesStatus:
  32. return self._gpu_devices
  33. @property
  34. def system_info(self) -> SystemInfo:
  35. return self._system_info
  36. def __init__(
  37. self,
  38. cfg: Config,
  39. worker_ip_getter: Callable[[], str],
  40. worker_ifname_getter: Callable[[], str],
  41. worker_id_getter: Callable[[], int],
  42. worker_uuid_getter: Callable[[], str],
  43. ):
  44. self._cfg = cfg
  45. self._worker_ip_getter = worker_ip_getter
  46. self._worker_ifname_getter = worker_ifname_getter
  47. self._worker_id_getter = worker_id_getter
  48. self._worker_uuid_getter = worker_uuid_getter
  49. self._gpu_devices = cfg.get_gpu_devices()
  50. self._system_info = cfg.get_system_info()
  51. if self._gpu_devices and self._system_info:
  52. self._detector_factory = DetectorFactory(
  53. device="custom",
  54. gpu_detectors={"custom": [Custom(gpu_devices=self._gpu_devices)]},
  55. system_info_detector=Custom(system_info=self._system_info),
  56. )
  57. elif self._gpu_devices:
  58. self._detector_factory = DetectorFactory(
  59. device="custom",
  60. gpu_detectors={"custom": [Custom(gpu_devices=self._gpu_devices)]},
  61. )
  62. elif self._system_info:
  63. self._detector_factory = DetectorFactory(
  64. system_info_detector=Custom(system_info=self._system_info)
  65. )
  66. else:
  67. self._detector_factory = DetectorFactory()
  68. """A class for collecting worker status information."""
  69. @time_decorator(log_slow_seconds=WORKER_STATUS_COLLECTION_LOG_SLOW_SECONDS)
  70. def timed_collect(self, clientset: ClientSet = None, initial: bool = False):
  71. return self.collect(clientset=clientset, initial=initial)
  72. def collect(
  73. self, clientset: ClientSet = None, initial: bool = False
  74. ) -> WorkerStatusPublic: # noqa: C901
  75. """Collect worker status information."""
  76. status = WorkerStatus.get_default_status()
  77. state_message = None
  78. try:
  79. system_info = self._detector_factory.detect_system_info()
  80. status = WorkerStatus.model_validate({**system_info.model_dump()})
  81. except Exception as e:
  82. logger.error(f"Failed to detect system info: {e}")
  83. if not initial:
  84. try:
  85. gpu_devices = self._detector_factory.detect_gpus()
  86. status.gpu_devices = gpu_devices
  87. except GPUDetectExepction as e:
  88. state_message = str(e)
  89. except Exception as e:
  90. logger.error(f"Failed to detect GPU devices: {e}")
  91. self._inject_unified_memory(status)
  92. self._inject_computed_filesystem_usage(status)
  93. self._inject_allocated_resource(clientset, status)
  94. # If disable_worker_metrics is set, set metrics_port to -1
  95. metrics_port = self._cfg.worker_metrics_port
  96. if self._cfg.disable_worker_metrics:
  97. metrics_port = -1
  98. return WorkerStatusPublic(
  99. advertise_address=self._cfg.advertise_address or self._worker_ip_getter(),
  100. hostname=socket.gethostname(),
  101. ip=self._worker_ip_getter(),
  102. ifname=self._worker_ifname_getter(),
  103. port=self._cfg.worker_port,
  104. metrics_port=metrics_port,
  105. system_reserved=SystemReserved(**self._cfg.get_system_reserved()),
  106. state_message=state_message,
  107. status=status,
  108. worker_uuid=self._worker_uuid_getter(),
  109. proxy_mode=self._cfg.proxy_mode,
  110. )
  111. def _inject_unified_memory(self, status: WorkerStatus):
  112. is_unified_memory = False
  113. if status.gpu_devices is not None and len(status.gpu_devices) != 0:
  114. is_unified_memory = status.gpu_devices[0].memory.is_unified_memory
  115. if status.memory is not None:
  116. status.memory.is_unified_memory = is_unified_memory
  117. def _inject_computed_filesystem_usage(self, status: WorkerStatus):
  118. if (
  119. status.os is None
  120. or "Windows" not in status.os.name
  121. or status.filesystem is None
  122. ):
  123. return
  124. try:
  125. computed = MountPoint(
  126. name="computed",
  127. mount_point="/",
  128. total=0,
  129. used=0,
  130. free=0,
  131. available=0,
  132. )
  133. for mountpoint in status.filesystem:
  134. computed.total = computed.total + mountpoint.total
  135. computed.used = computed.used + mountpoint.used
  136. computed.free = computed.free + mountpoint.free
  137. computed.available = computed.available + mountpoint.available
  138. # inject computed filesystem usage
  139. status.filesystem.append(computed)
  140. except Exception as e:
  141. logger.error(f"Failed to inject filesystem usage: {e}")
  142. def _inject_allocated_resource( # noqa: C901
  143. self, clientset: ClientSet, status: WorkerStatus
  144. ):
  145. if clientset is None:
  146. return
  147. worker_id = self._worker_id_getter()
  148. allocated = Allocated(ram=0, vram={})
  149. try:
  150. # TODO avoid listing model_instances with clientset.
  151. # The calculation might not be needed here.
  152. model_instances = clientset.model_instances.list()
  153. for model_instance in model_instances.items:
  154. if (
  155. model_instance.distributed_servers
  156. and model_instance.distributed_servers.subordinate_workers
  157. ):
  158. for (
  159. subworker
  160. ) in model_instance.distributed_servers.subordinate_workers:
  161. if subworker.worker_id != worker_id:
  162. continue
  163. aggregate_computed_resource_claim_allocated(
  164. allocated, subworker.computed_resource_claim
  165. )
  166. if model_instance.worker_id != worker_id:
  167. continue
  168. aggregate_computed_resource_claim_allocated(
  169. allocated, model_instance.computed_resource_claim
  170. )
  171. # inject allocated resources
  172. if status.memory is not None:
  173. status.memory.allocated = allocated.ram
  174. if status.gpu_devices is not None:
  175. for i, device in enumerate(status.gpu_devices):
  176. if device.index in allocated.vram:
  177. status.gpu_devices[i].memory.allocated = allocated.vram[
  178. device.index
  179. ]
  180. else:
  181. status.gpu_devices[i].memory.allocated = 0
  182. except Exception as e:
  183. logger.error(f"Failed to inject allocated resources: {e}")
  184. def aggregate_computed_resource_claim_allocated(
  185. allocated: Allocated, computed_resource_claim: Optional[ComputedResourceClaim]
  186. ):
  187. """Aggregate allocated resources from a ComputedResourceClaim into Allocated."""
  188. if computed_resource_claim is None:
  189. return
  190. if computed_resource_claim.ram:
  191. allocated.ram += computed_resource_claim.ram
  192. for gpu_index, vram in (computed_resource_claim.vram or {}).items():
  193. allocated.vram[gpu_index] = (allocated.vram.get(gpu_index) or 0) + vram