exporter.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. from concurrent.futures import ThreadPoolExecutor
  2. from typing import Callable
  3. from prometheus_client.registry import Collector
  4. from prometheus_client import (
  5. CollectorRegistry,
  6. generate_latest,
  7. CONTENT_TYPE_LATEST,
  8. )
  9. from prometheus_client.core import (
  10. GaugeMetricFamily,
  11. InfoMetricFamily,
  12. )
  13. from gpustack.client.generated_clientset import ClientSet
  14. from gpustack.config.config import Config
  15. from gpustack.logging import setup_logging
  16. from gpustack.utils.name import metric_name
  17. from gpustack.worker.collector import WorkerStatusCollector
  18. import uvicorn
  19. import logging
  20. from fastapi import FastAPI, Response
  21. logger = logging.getLogger(__name__)
  22. unified_registry = CollectorRegistry()
  23. raw_registry = CollectorRegistry()
  24. def _safe_label(value, default: str = "unknown") -> str:
  25. return default if value is None else str(value)
  26. def _add_metric(
  27. metric: GaugeMetricFamily,
  28. labels: list[str],
  29. value: float | int | None,
  30. ):
  31. if value is None:
  32. return
  33. metric.add_metric(labels, value)
  34. class MetricExporter(Collector):
  35. _worker_ip_getter: Callable[[], str]
  36. _worker_name_getter: Callable[[], str]
  37. _worker_id_getter: Callable[[], int]
  38. _collector: WorkerStatusCollector
  39. def __init__(
  40. self,
  41. cfg: Config,
  42. collector: WorkerStatusCollector,
  43. worker_name_getter: Callable[[], str],
  44. worker_ip_getter: Callable[[], str],
  45. worker_id_getter: Callable[[], int],
  46. clientset_getter: Callable[[], ClientSet] = None,
  47. cache: dict = None,
  48. ):
  49. self._collector = collector
  50. self._worker_name_getter = worker_name_getter
  51. self._worker_id_getter = worker_id_getter
  52. self._worker_ip_getter = worker_ip_getter
  53. self._port = cfg.worker_metrics_port
  54. self._cache = cache
  55. self._clientset_getter = clientset_getter
  56. def collect(self):
  57. with ThreadPoolExecutor() as executor:
  58. worker_future = executor.submit(list, self.collect_worker_metrics())
  59. runtime_future = executor.submit(list, self.collect_runtime_metrics())
  60. for metric in worker_future.result():
  61. yield metric
  62. for metric in runtime_future.result():
  63. yield metric
  64. def collect_worker_metrics(self): # noqa: C901
  65. labels = ["worker_id", "worker_name", "instance"]
  66. filesystem_labels = labels + ["mountpoint"]
  67. gpu_labels = labels + ["gpu_index", "gpu_name", "gpu_chip_index"]
  68. # metrics
  69. os_info = InfoMetricFamily(
  70. metric_name("worker_node_os"), "Operating system information"
  71. )
  72. kernel_info = InfoMetricFamily(
  73. metric_name("worker_node_kernel"), "Kernel information"
  74. )
  75. uptime = GaugeMetricFamily(
  76. metric_name("worker_node_uptime_seconds"),
  77. "Uptime in seconds of the worker node",
  78. labels=labels,
  79. )
  80. cpu_cores = GaugeMetricFamily(
  81. metric_name("worker_node_cpu_cores"),
  82. "Total CPUs cores of the worker node",
  83. labels=labels,
  84. )
  85. cpu_utilization_rate = GaugeMetricFamily(
  86. metric_name("worker_node_cpu_utilization_rate"),
  87. "Rate of CPU utilization on the worker node",
  88. labels=labels,
  89. )
  90. memory_total = GaugeMetricFamily(
  91. metric_name("worker_node_memory_total_bytes"),
  92. "Total memory in bytes of the worker node",
  93. labels=labels,
  94. )
  95. memory_used = GaugeMetricFamily(
  96. metric_name("worker_node_memory_used_bytes"),
  97. "Memory used in bytes of the worker node",
  98. labels=labels,
  99. )
  100. memory_utilization_rate = GaugeMetricFamily(
  101. metric_name("worker_node_memory_utilization_rate"),
  102. "Rate of memory utilization on the worker node",
  103. labels=labels,
  104. )
  105. gpu_info = InfoMetricFamily("worker_node_gpu", "GPU information")
  106. gpu_cores = GaugeMetricFamily(
  107. metric_name("worker_node_gpu_cores"),
  108. "Total GPUs cores of the worker node",
  109. labels=gpu_labels,
  110. )
  111. gpu_utilization_rate = GaugeMetricFamily(
  112. metric_name("worker_node_gpu_utilization_rate"),
  113. "Rate of GPU utilization on the worker node",
  114. labels=gpu_labels,
  115. )
  116. gpu_temperature = GaugeMetricFamily(
  117. metric_name("worker_node_gpu_temperature_celsius"),
  118. "GPU temperature in celsius of the worker node",
  119. labels=gpu_labels,
  120. )
  121. gram_total = GaugeMetricFamily(
  122. metric_name("worker_node_gram_total_bytes"),
  123. "Total GPU RAM in bytes of the worker node",
  124. labels=gpu_labels,
  125. )
  126. gram_allocated = GaugeMetricFamily(
  127. metric_name("worker_node_gram_allocated_bytes"),
  128. "Allocated GPU RAM in bytes of the worker node",
  129. labels=gpu_labels,
  130. )
  131. gram_used = GaugeMetricFamily(
  132. metric_name("worker_node_gram_used_bytes"),
  133. "GPU RAM used in bytes of the worker node",
  134. labels=gpu_labels,
  135. )
  136. gram_utilization_rate = GaugeMetricFamily(
  137. metric_name("worker_node_gram_utilization_rate"),
  138. "Rate of GPU RAM utilization on the worker node",
  139. labels=gpu_labels,
  140. )
  141. filesystem_total = GaugeMetricFamily(
  142. metric_name("worker_node_filesystem_total_bytes"),
  143. "Total filesystem in bytes of the worker node",
  144. labels=filesystem_labels,
  145. )
  146. filesystem_used = GaugeMetricFamily(
  147. metric_name("worker_node_filesystem_used_bytes"),
  148. "Total filesystem used in bytes of the worker node",
  149. labels=filesystem_labels,
  150. )
  151. filesystem_utilization_rate = GaugeMetricFamily(
  152. metric_name("worker_node_filesystem_utilization_rate"),
  153. "Rate of filesystem utilization on the worker node",
  154. labels=filesystem_labels,
  155. )
  156. worker_ip = _safe_label(self._worker_ip_getter())
  157. worker_id = _safe_label(self._worker_id_getter())
  158. worker_name = _safe_label(self._worker_name_getter())
  159. worker_label_values = [worker_id, worker_name, worker_ip]
  160. try:
  161. worker = self._collector.timed_collect(clientset=self._clientset_getter())
  162. status = worker.status
  163. if status is None:
  164. logger.error("Empty worker node status from collector.")
  165. return
  166. except Exception as e:
  167. logger.error(f"Failed to get worker node status for metrics exporter: {e}")
  168. return
  169. # system
  170. if status.os is not None:
  171. os_info.add_metric(
  172. ["worker_id", "worker_name", "instance", "name", "version"],
  173. {
  174. "worker_id": worker_id,
  175. "worker_name": worker_name,
  176. "instance": worker_ip,
  177. "name": _safe_label(status.os.name),
  178. "version": _safe_label(status.os.version),
  179. },
  180. )
  181. # kernel
  182. if status.kernel is not None:
  183. kernel_info.add_metric(
  184. ["worker_id", "worker_name", "instance", "name", "version"],
  185. {
  186. "worker_id": worker_id,
  187. "worker_name": worker_name,
  188. "instance": worker_ip,
  189. "name": _safe_label(status.kernel.name),
  190. "release": _safe_label(status.kernel.release),
  191. "version": _safe_label(status.kernel.version),
  192. "architecture": _safe_label(status.kernel.architecture),
  193. },
  194. )
  195. # uptime
  196. if status.uptime is not None:
  197. _add_metric(uptime, worker_label_values, status.uptime.uptime)
  198. # cpu
  199. if status.cpu is not None:
  200. _add_metric(cpu_cores, worker_label_values, status.cpu.total)
  201. _add_metric(
  202. cpu_utilization_rate,
  203. worker_label_values,
  204. status.cpu.utilization_rate,
  205. )
  206. # memory
  207. if status.memory is not None:
  208. _add_metric(memory_total, worker_label_values, status.memory.total)
  209. _add_metric(memory_used, worker_label_values, status.memory.used)
  210. if (
  211. status.memory.total is not None
  212. and status.memory.used is not None
  213. and status.memory.total != 0
  214. ):
  215. _add_metric(
  216. memory_utilization_rate,
  217. worker_label_values,
  218. _rate(status.memory.used, status.memory.total),
  219. )
  220. # gpu
  221. if status.gpu_devices is not None:
  222. for i, d in enumerate(status.gpu_devices):
  223. gpu_chip_index = "0" # TODO(michelia): Placeholder, replace with actual chip index if available
  224. gpu_label_values = worker_label_values + [
  225. str(i),
  226. _safe_label(d.name),
  227. _safe_label(gpu_chip_index),
  228. ]
  229. gpu_info.add_metric(
  230. gpu_labels,
  231. {
  232. "worker_id": worker_id,
  233. "worker_name": worker_name,
  234. "instance": worker_ip,
  235. "gpu_index": str(i),
  236. "gpu_chip_index": _safe_label(gpu_chip_index),
  237. "gpu_name": _safe_label(d.name),
  238. },
  239. )
  240. if d.core is not None:
  241. _add_metric(gpu_cores, gpu_label_values, d.core.total)
  242. _add_metric(
  243. gpu_utilization_rate,
  244. gpu_label_values,
  245. d.core.utilization_rate,
  246. )
  247. _add_metric(gpu_temperature, gpu_label_values, d.temperature)
  248. if d.memory is not None:
  249. _add_metric(gram_total, gpu_label_values, d.memory.total)
  250. _add_metric(gram_allocated, gpu_label_values, d.memory.allocated)
  251. _add_metric(gram_used, gpu_label_values, d.memory.used)
  252. if (
  253. d.memory.total is not None
  254. and d.memory.used is not None
  255. and d.memory.total != 0
  256. ):
  257. _add_metric(
  258. gram_utilization_rate,
  259. gpu_label_values,
  260. _rate(d.memory.used, d.memory.total),
  261. )
  262. # filesystem
  263. if status.filesystem is not None:
  264. for _, d in enumerate(status.filesystem):
  265. _add_metric(
  266. filesystem_total,
  267. worker_label_values + [_safe_label(d.mount_point)],
  268. d.total,
  269. )
  270. _add_metric(
  271. filesystem_used,
  272. worker_label_values + [_safe_label(d.mount_point)],
  273. d.used,
  274. )
  275. if d.total is not None and d.used is not None and d.total != 0:
  276. _add_metric(
  277. filesystem_utilization_rate,
  278. worker_label_values + [_safe_label(d.mount_point)],
  279. _rate(d.used, d.total),
  280. )
  281. # system
  282. yield os_info
  283. yield kernel_info
  284. yield uptime
  285. yield cpu_cores
  286. yield cpu_utilization_rate
  287. yield memory_total
  288. yield memory_used
  289. yield memory_utilization_rate
  290. yield gpu_info
  291. yield gpu_cores
  292. yield gpu_utilization_rate
  293. yield gpu_temperature
  294. yield gram_total
  295. yield gram_allocated
  296. yield gram_used
  297. yield gram_utilization_rate
  298. yield filesystem_total
  299. yield filesystem_used
  300. yield filesystem_utilization_rate
  301. def collect_runtime_metrics(self):
  302. if not self._cache or self._cache.get("unified") is None:
  303. return
  304. for _, prom_metric in self._cache["unified"].items():
  305. yield prom_metric
  306. def start(self):
  307. try:
  308. raw_collector = RawCollector(
  309. cache=self._cache,
  310. )
  311. raw_registry.register(raw_collector)
  312. unified_registry.register(self)
  313. # Start FastAPI server
  314. app = FastAPI(
  315. title="GPUStack Worker Metrics Exporter",
  316. response_model_exclude_unset=True,
  317. )
  318. @app.get("/metrics")
  319. def metrics():
  320. data = generate_latest(unified_registry)
  321. return Response(content=data, media_type=CONTENT_TYPE_LATEST)
  322. @app.get("/metrics/raw")
  323. def metrics_raw():
  324. data = generate_latest(raw_registry)
  325. return Response(content=data, media_type=CONTENT_TYPE_LATEST)
  326. config = uvicorn.Config(
  327. app,
  328. host="0.0.0.0",
  329. port=self._port,
  330. access_log=False,
  331. log_level="error",
  332. )
  333. setup_logging()
  334. logger.info(f"Serving metric exporter on {config.host}:{config.port}.")
  335. server = uvicorn.Server(config)
  336. server.run()
  337. except Exception as e:
  338. logger.error(f"Failed to start metric exporter: {e}")
  339. class RawCollector(Collector):
  340. def __init__(
  341. self,
  342. cache: dict = None,
  343. ):
  344. self._cache = cache
  345. def collect(self):
  346. # passthrough raw metrics from runtime and add gpustack related labels.
  347. if not self._cache or self._cache.get("raw") is None:
  348. return
  349. for _, prom_metric in self._cache["raw"].items():
  350. yield prom_metric
  351. def _rate(used, total):
  352. return round(used / total, 6) * 100 if total != 0 else 0