| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- import asyncio
- import re
- from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, generate_latest
- from prometheus_client.registry import Collector
- from prometheus_client.core import (
- GaugeMetricFamily,
- InfoMetricFamily,
- )
- import uvicorn
- from gpustack.config.config import Config
- from gpustack.exporter.bus_metrics import BusMetricsCollector
- from gpustack.logging import setup_logging
- from gpustack.schemas.config import ModelInstanceProxyModeEnum
- from gpustack.schemas.clusters import Cluster
- from gpustack.schemas.models import Model
- from gpustack.schemas.workers import Worker, WorkerStateEnum
- from gpustack.server.db import async_session
- from gpustack.server.deps import SessionDep
- from gpustack.utils.name import metric_name
- import logging
- from sqlmodel.ext.asyncio.session import AsyncSession
- from sqlalchemy.orm import selectinload
- from fastapi import FastAPI, Response
logger = logging.getLogger(__name__)

# Prometheus label name pattern (classic, non-UTF-8 character set).
# Label names may contain ASCII letters, digits and underscores and must not
# start with a digit. Colons are deliberately NOT accepted: they are only
# valid in *metric* names, never in label names.
# https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
label_name_pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*$'
class MetricExporter(Collector):
    """Prometheus collector and HTTP exporter for cluster inventory metrics.

    The exporter keeps an in-memory list of metric families describing
    clusters, workers, models and model instances. The list is rebuilt from
    the database by :meth:`generate_metrics_cache` and handed out unchanged by
    :meth:`collect`, so a scrape never triggers a database query itself.
    :meth:`start` registers the exporter with the default registry and serves
    the scrape and target-discovery endpoints over HTTP.
    """

    # Seconds to wait between two refreshes of the metrics cache.
    _REFRESH_INTERVAL_SECONDS = 3

    def __init__(self, cfg: Config):
        """Create an exporter that will listen on ``cfg.metrics_port``."""
        # Metric families produced by the most recent successful refresh.
        self._cache_metrics = []
        self._port = cfg.metrics_port

    def collect(self):
        """Yield the metric families built by the latest cache refresh."""
        # The cache list is replaced wholesale on refresh (never mutated in
        # place), so iterating the current list is safe while a refresh runs.
        yield from self._cache_metrics

    async def generate_metrics_cache(self):
        """Rebuild the metrics cache forever, pausing between refreshes.

        This coroutine does not return on its own; cancel the task running it
        to stop the loop. A failed refresh is logged and retried on the next
        iteration, and the previously cached metrics stay in place meanwhile.
        """
        while True:
            try:
                async with async_session() as session:
                    self._cache_metrics = await self._collect_metrics(session)
            except Exception:
                # Keep the loop alive: a transient failure must not freeze
                # the exporter on a stale cache for the rest of the process.
                logger.exception("Failed to refresh metrics cache")
            await asyncio.sleep(self._REFRESH_INTERVAL_SECONDS)

    @staticmethod
    def _extra_worker_labels(labels, reserved):
        """Return the user-defined worker labels that are safe to export.

        Args:
            labels: Mapping of user-defined worker labels, or ``None``.
            reserved: Label names owned by the exporter itself.

        Returns:
            A dict of label name to string value. Keys that collide with a
            reserved name, or that do not fully match ``label_name_pattern``,
            are dropped.
        """
        extras = {}
        for key, value in (labels or {}).items():
            # A user label must never overwrite a built-in identity label.
            if key in reserved:
                continue
            # fullmatch (rather than match) also rejects a trailing newline,
            # which a "$" anchor alone would let through.
            if not re.fullmatch(label_name_pattern, key):
                continue
            extras[key] = "" if value is None else str(value)
        return extras

    async def _collect_metrics(self, session: AsyncSession):
        """Query the database and build a fresh list of metric families.

        Args:
            session: Database session used to load clusters together with
                their workers, models and model instances.

        Returns:
            A list with the cluster, worker, model and model-instance metric
            families, populated with one sample per entity.
        """
        cluster_labels = ["cluster_id", "cluster_name"]
        worker_labels = cluster_labels + ["worker_id", "worker_name"]
        model_labels = cluster_labels + ["model_id", "model_name"]
        model_instance_labels = worker_labels + [
            "model_id",
            "model_name",
            "model_instance_name",
        ]

        # cluster metrics
        cluster_info = InfoMetricFamily(metric_name("cluster"), "Cluster information")
        cluster_status = GaugeMetricFamily(
            metric_name("cluster_status"),
            "Cluster status",
            labels=cluster_labels + ["state"],
        )

        # worker metrics
        worker_info = InfoMetricFamily(metric_name("worker"), "Worker information")
        worker_status = GaugeMetricFamily(
            metric_name("worker_status"),
            "Worker status",
            labels=worker_labels + ["state"],
        )

        # model metrics
        model_info = InfoMetricFamily(metric_name("model"), "Model information")
        model_desired_instances = GaugeMetricFamily(
            metric_name("model_desired_instances"),
            "Desired instances of the model",
            labels=model_labels,
        )
        model_running_instances = GaugeMetricFamily(
            metric_name("model_running_instances"),
            "Running instances of the model",
            labels=model_labels,
        )

        # model instance metrics
        model_instance_status = GaugeMetricFamily(
            metric_name("model_instance_status"),
            "Model instance status",
            labels=model_instance_labels + ["state"],
        )
        model_instance_restart_count = GaugeMetricFamily(
            metric_name("model_instance_restart_count"),
            "Model instance restart count",
            labels=model_instance_labels,
        )
        model_instance_latest_restart_time = GaugeMetricFamily(
            metric_name("model_instance_latest_restart_time"),
            "Model instance latest restart time as Unix timestamp seconds",
            labels=model_instance_labels,
        )

        metrics = [
            cluster_info,
            cluster_status,
            worker_info,
            worker_status,
            model_info,
            model_desired_instances,
            model_running_instances,
            model_instance_status,
            model_instance_restart_count,
            model_instance_latest_restart_time,
        ]

        # Load everything in one pass; the relationships are eagerly loaded so
        # the loops below do not touch unloaded attributes.
        clusters = await Cluster.all(
            session,
            options=[
                selectinload(Cluster.cluster_workers),
                selectinload(Cluster.cluster_models).selectinload(Model.instances),
            ],
        )
        reserved_worker_labels = frozenset(worker_labels)

        for cluster in clusters:
            cluster_id = str(cluster.id)
            cluster_label_values = [cluster_id, cluster.name]

            cluster_info.add_metric(
                cluster_labels + ["provider"],
                {
                    "cluster_id": cluster_id,
                    "cluster_name": cluster.name,
                    "provider": str(cluster.provider),
                },
            )
            # The state is carried as a label; the sample value is always 1.
            cluster_status.add_metric(cluster_label_values + [cluster.state], 1)

            # worker metrics
            for worker in cluster.cluster_workers:
                worker_id = str(worker.id)
                extra_labels = self._extra_worker_labels(
                    worker.labels, reserved_worker_labels
                )
                worker_info_values = {
                    "cluster_id": cluster_id,
                    "cluster_name": cluster.name,
                    "worker_id": worker_id,
                    "worker_name": worker.name,
                }
                worker_info_values.update(extra_labels)
                worker_info.add_metric(
                    worker_labels + list(extra_labels),
                    worker_info_values,
                )
                worker_status.add_metric(
                    cluster_label_values + [worker_id, worker.name, worker.state],
                    1,
                )

            # model metrics
            for model in cluster.cluster_models:
                model_id = str(model.id)
                model_label_values = cluster_label_values + [model_id, model.name]

                model_info.add_metric(
                    model_labels
                    + ["runtime", "runtime_version", "source", "source_key"],
                    {
                        "cluster_id": cluster_id,
                        "cluster_name": cluster.name,
                        "model_id": model_id,
                        "model_name": model.name,
                        "runtime": model.backend,
                        "runtime_version": model.backend_version or "unknown",
                        "source": model.source,
                        "source_key": model.model_source_key,
                    },
                )
                model_desired_instances.add_metric(
                    model_label_values,
                    model.replicas,
                )
                model_running_instances.add_metric(
                    model_label_values,
                    model.ready_replicas,
                )

                # instance metrics
                for mi in model.instances:
                    # An instance without a worker is reported under the
                    # placeholder "unknown" so the label set stays complete.
                    mi_worker_id = str(mi.worker_id) if mi.worker_id else "unknown"
                    mi_worker_name = mi.worker_name if mi.worker_name else "unknown"
                    mi_label_values = cluster_label_values + [
                        mi_worker_id,
                        mi_worker_name,
                        model_id,
                        model.name,
                        mi.name,
                    ]
                    model_instance_status.add_metric(
                        mi_label_values + [mi.state],
                        1,
                    )
                    model_instance_restart_count.add_metric(
                        mi_label_values,
                        mi.restart_count or 0,
                    )
                    # 0 means "never restarted".
                    # NOTE(review): timestamp() treats a naive datetime as
                    # local time - confirm last_restart_time is tz-aware/UTC.
                    model_instance_latest_restart_time.add_metric(
                        mi_label_values,
                        (
                            mi.last_restart_time.timestamp()
                            if mi.last_restart_time
                            else 0
                        ),
                    )

        return metrics

    async def start(self):
        """Register the collectors and serve the exporter HTTP endpoints.

        Endpoints:
            ``/metrics``: text exposition of the default registry.
            ``/metrics/targets``: scrape targets of ready workers that are not
                in tunnel proxy mode, addressed by ``advertise_address``.
            ``/metrics/proxy-targets``: scrape targets of ready workers that
                are in tunnel proxy mode, addressed by ``ip``.

        Both target endpoints return a list of
        ``{"labels": {...}, "targets": [...]}`` entries, one per cluster.

        Any exception raised during setup or while serving is logged with its
        traceback and not re-raised.
        """
        try:
            REGISTRY.register(self)
            REGISTRY.register(BusMetricsCollector())

            # Start FastAPI server
            app = FastAPI(
                title="GPUStack Metrics Exporter", response_model_exclude_unset=True
            )

            async def _metrics_targets(session: AsyncSession, is_proxy: bool):
                """Group scrape endpoints of eligible workers by cluster."""
                worker_list = await Worker.all(
                    session=session, options=[selectinload(Worker.cluster)]
                )
                cluster_workers = {}
                for worker in worker_list:
                    uses_tunnel = (
                        worker.proxy_mode == ModelInstanceProxyModeEnum.TUNNEL
                    )
                    eligible = (
                        worker.state == WorkerStateEnum.READY
                        and worker.metrics_port
                        and worker.metrics_port > 0
                        # Each endpoint lists only the workers whose proxy
                        # mode matches the kind of target being requested.
                        and is_proxy == uses_tunnel
                    )
                    if not eligible:
                        continue
                    preferred_address = (
                        worker.advertise_address if not is_proxy else worker.ip
                    )
                    key = (worker.cluster_id, worker.cluster.name)
                    cluster_workers.setdefault(key, []).append(
                        f"{preferred_address}:{worker.metrics_port}"
                    )
                return [
                    {
                        "labels": {
                            "cluster_id": str(cluster_id),
                            "cluster_name": cluster_name,
                        },
                        "targets": endpoints,
                    }
                    for (cluster_id, cluster_name), endpoints in cluster_workers.items()
                ]

            @app.get("/metrics")
            def metrics():
                data = generate_latest(REGISTRY)
                return Response(content=data, media_type=CONTENT_TYPE_LATEST)

            @app.get("/metrics/targets")
            async def metrics_targets(session: SessionDep):
                return await _metrics_targets(session, is_proxy=False)

            @app.get("/metrics/proxy-targets")
            async def metrics_proxy_targets(session: SessionDep):
                return await _metrics_targets(session, is_proxy=True)

            config = uvicorn.Config(
                app,
                host="0.0.0.0",
                port=self._port,
                access_log=False,
                log_level="error",
            )
            # NOTE(review): called after uvicorn.Config on purpose, presumably
            # so the project's logging setup wins over uvicorn's - confirm.
            setup_logging()

            logger.info(f"Serving metric exporter on {config.host}:{config.port}.")
            server = uvicorn.Server(config)
            await server.serve()
        except Exception as e:
            # logger.exception keeps the traceback, which str(e) alone loses.
            logger.exception(f"Failed to start metric exporter: {e}")
|