exporter.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import asyncio
  2. import re
  3. from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, generate_latest
  4. from prometheus_client.registry import Collector
  5. from prometheus_client.core import (
  6. GaugeMetricFamily,
  7. InfoMetricFamily,
  8. )
  9. import uvicorn
  10. from gpustack.config.config import Config
  11. from gpustack.exporter.bus_metrics import BusMetricsCollector
  12. from gpustack.logging import setup_logging
  13. from gpustack.schemas.config import ModelInstanceProxyModeEnum
  14. from gpustack.schemas.clusters import Cluster
  15. from gpustack.schemas.models import Model
  16. from gpustack.schemas.workers import Worker, WorkerStateEnum
  17. from gpustack.server.db import async_session
  18. from gpustack.server.deps import SessionDep
  19. from gpustack.utils.name import metric_name
  20. import logging
  21. from sqlmodel.ext.asyncio.session import AsyncSession
  22. from sqlalchemy.orm import selectinload
  23. from fastapi import FastAPI, Response
  24. logger = logging.getLogger(__name__)
  25. # Prometheus label name pattern
  26. # https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
  27. label_name_pattern = r'^[a-zA-Z_:][a-zA-Z0-9_:]*$'
  28. class MetricExporter(Collector):
  29. def __init__(self, cfg: Config):
  30. self._cache_metrics = []
  31. self._port = cfg.metrics_port
  32. def collect(self):
  33. for metric in self._cache_metrics:
  34. yield metric
  35. async def generate_metrics_cache(self):
  36. while True:
  37. async with async_session() as session:
  38. self._cache_metrics = await self._collect_metrics(session)
  39. await asyncio.sleep(3)
  40. async def _collect_metrics(self, session: AsyncSession):
  41. cluster_labels = ["cluster_id", "cluster_name"]
  42. worker_labels = cluster_labels + ["worker_id", "worker_name"]
  43. model_labels = cluster_labels + ["model_id", "model_name"]
  44. model_instance_labels = worker_labels + [
  45. "model_id",
  46. "model_name",
  47. "model_instance_name",
  48. ]
  49. # cluster metrics
  50. cluster_info = InfoMetricFamily(metric_name("cluster"), "Cluster information")
  51. cluster_status = GaugeMetricFamily(
  52. metric_name("cluster_status"),
  53. "Cluster status",
  54. labels=cluster_labels + ["state"],
  55. )
  56. # worker metrics
  57. worker_info = InfoMetricFamily(metric_name("worker"), "Worker information")
  58. worker_status = GaugeMetricFamily(
  59. metric_name("worker_status"),
  60. "Worker status",
  61. labels=worker_labels + ["state"],
  62. )
  63. # model metrics
  64. model_info = InfoMetricFamily(metric_name("model"), "Model information")
  65. model_desired_instances = GaugeMetricFamily(
  66. metric_name("model_desired_instances"),
  67. "Desired instances of the model",
  68. labels=model_labels,
  69. )
  70. model_running_instances = GaugeMetricFamily(
  71. metric_name("model_running_instances"),
  72. "Running instances of the model",
  73. labels=model_labels,
  74. )
  75. model_instance_status = GaugeMetricFamily(
  76. metric_name("model_instance_status"),
  77. "Model instance status",
  78. labels=model_instance_labels + ["state"],
  79. )
  80. model_instance_restart_count = GaugeMetricFamily(
  81. metric_name("model_instance_restart_count"),
  82. "Model instance restart count",
  83. labels=model_instance_labels,
  84. )
  85. model_instance_latest_restart_time = GaugeMetricFamily(
  86. metric_name("model_instance_latest_restart_time"),
  87. "Model instance latest restart time as Unix timestamp seconds",
  88. labels=model_instance_labels,
  89. )
  90. metrics = [
  91. cluster_info,
  92. cluster_status,
  93. worker_info,
  94. worker_status,
  95. model_info,
  96. model_desired_instances,
  97. model_running_instances,
  98. model_instance_status,
  99. model_instance_restart_count,
  100. model_instance_latest_restart_time,
  101. ]
  102. # cluster metrics
  103. cluster_id_to_name = {}
  104. model_id_to_name = {}
  105. model_id_to_cluster_id = {}
  106. clusters = await Cluster.all(
  107. session,
  108. options=[
  109. selectinload(Cluster.cluster_workers),
  110. selectinload(Cluster.cluster_models).selectinload(Model.instances),
  111. ],
  112. )
  113. for cluster in clusters:
  114. cluster_id_to_name[str(cluster.id)] = cluster.name
  115. cluster_label_values = [str(cluster.id), cluster.name]
  116. cluster_info.add_metric(
  117. cluster_labels + ["provider"],
  118. {
  119. "cluster_id": str(cluster.id),
  120. "cluster_name": cluster.name,
  121. "provider": str(cluster.provider),
  122. },
  123. )
  124. cluster_status.add_metric(
  125. cluster_label_values + [cluster.state],
  126. 1,
  127. )
  128. # worker metrics
  129. workers = cluster.cluster_workers
  130. for worker in workers:
  131. worker_label_values = cluster_label_values + [
  132. str(worker.id),
  133. worker.name,
  134. worker.state,
  135. ]
  136. worker_dynamic_label_keys = []
  137. worker_info_metric_values = {
  138. "cluster_id": str(cluster.id),
  139. "cluster_name": cluster.name,
  140. "worker_id": str(worker.id),
  141. "worker_name": worker.name,
  142. }
  143. for k, v in (worker.labels or {}).items():
  144. if not re.match(label_name_pattern, k):
  145. continue
  146. worker_dynamic_label_keys.append(k)
  147. worker_info_metric_values[k] = v
  148. worker_info.add_metric(
  149. worker_labels + worker_dynamic_label_keys,
  150. worker_info_metric_values,
  151. )
  152. worker_status.add_metric(
  153. worker_label_values,
  154. 1,
  155. )
  156. # model metrics
  157. models = cluster.cluster_models
  158. for model in models:
  159. model_id_to_name[str(model.id)] = model.name
  160. model_id_to_cluster_id[str(model.id)] = str(cluster.id)
  161. model_label_values = cluster_label_values + [
  162. str(model.id),
  163. model.name,
  164. ]
  165. model_info.add_metric(
  166. model_labels
  167. + ["runtime", "runtime_version", "source", "source_key"],
  168. {
  169. "cluster_id": str(cluster.id),
  170. "cluster_name": cluster.name,
  171. "model_id": str(model.id),
  172. "model_name": model.name,
  173. "runtime": model.backend,
  174. "runtime_version": model.backend_version or "unknown",
  175. "source": model.source,
  176. "source_key": model.model_source_key,
  177. },
  178. )
  179. model_desired_instances.add_metric(
  180. model_label_values,
  181. model.replicas,
  182. )
  183. model_running_instances.add_metric(
  184. model_label_values,
  185. model.ready_replicas,
  186. )
  187. # instance metrics
  188. instances = model.instances
  189. for mi in instances:
  190. worker_id = str(mi.worker_id) if mi.worker_id else "unknown"
  191. worker_name = mi.worker_name if mi.worker_name else "unknown"
  192. mi_label_values = cluster_label_values + [
  193. worker_id,
  194. worker_name,
  195. str(model.id),
  196. model.name,
  197. mi.name,
  198. ]
  199. model_instance_status.add_metric(
  200. mi_label_values + [mi.state],
  201. 1,
  202. )
  203. model_instance_restart_count.add_metric(
  204. mi_label_values,
  205. mi.restart_count or 0,
  206. )
  207. model_instance_latest_restart_time.add_metric(
  208. mi_label_values,
  209. (
  210. mi.last_restart_time.timestamp()
  211. if mi.last_restart_time
  212. else 0
  213. ),
  214. )
  215. # return all metrics
  216. return metrics
  217. async def start(self):
  218. try:
  219. REGISTRY.register(self)
  220. REGISTRY.register(BusMetricsCollector())
  221. # Start FastAPI server
  222. app = FastAPI(
  223. title="GPUStack Metrics Exporter", response_model_exclude_unset=True
  224. )
  225. @app.get("/metrics")
  226. def metrics():
  227. data = generate_latest(REGISTRY)
  228. return Response(content=data, media_type=CONTENT_TYPE_LATEST)
  229. @app.get("/metrics/targets")
  230. async def metrics_targets(session: SessionDep):
  231. return await _metrics_targets(session, is_proxy=False)
  232. @app.get("/metrics/proxy-targets")
  233. async def metrics_proxy_targets(session: SessionDep):
  234. return await _metrics_targets(session, is_proxy=True)
  235. async def _metrics_targets(session: AsyncSession, is_proxy: bool):
  236. targets = []
  237. worker_list = await Worker.all(
  238. session=session, options=[selectinload(Worker.cluster)]
  239. )
  240. cluster_workers = {}
  241. for worker in worker_list:
  242. preferred_address = (
  243. worker.advertise_address if not is_proxy else worker.ip
  244. )
  245. if (
  246. worker.state == WorkerStateEnum.READY
  247. and worker.metrics_port
  248. and worker.metrics_port > 0
  249. and (
  250. is_proxy
  251. == (worker.proxy_mode == ModelInstanceProxyModeEnum.TUNNEL)
  252. )
  253. ):
  254. key = (worker.cluster_id, worker.cluster.name)
  255. if key not in cluster_workers:
  256. cluster_workers[key] = []
  257. cluster_workers[key].append(
  258. f"{preferred_address}:{worker.metrics_port}"
  259. )
  260. for (cluster_id, cluster_name), endpoints in cluster_workers.items():
  261. targets.append(
  262. {
  263. "labels": {
  264. "cluster_id": str(cluster_id),
  265. "cluster_name": cluster_name,
  266. },
  267. "targets": endpoints,
  268. }
  269. )
  270. return targets
  271. config = uvicorn.Config(
  272. app,
  273. host="0.0.0.0",
  274. port=self._port,
  275. access_log=False,
  276. log_level="error",
  277. )
  278. setup_logging()
  279. logger.info(f"Serving metric exporter on {config.host}:{config.port}.")
  280. server = uvicorn.Server(config)
  281. await server.serve()
  282. except Exception as e:
  283. logger.error(f"Failed to start metric exporter: {e}")