bus_metrics.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. """Prometheus metrics for the in-process event bus, pulled at scrape time."""
  2. from typing import Iterator
  3. from prometheus_client.registry import Collector
  4. from prometheus_client.core import (
  5. CounterMetricFamily,
  6. GaugeMetricFamily,
  7. Metric,
  8. )
  9. from gpustack.server.bus import event_bus
  10. from gpustack.utils.name import metric_name
  11. class BusMetricsCollector(Collector):
  12. def collect(self) -> Iterator[Metric]:
  13. subscribers = GaugeMetricFamily(
  14. metric_name("bus_subscribers"),
  15. "Number of active bus subscribers per topic.",
  16. labels=["topic"],
  17. )
  18. queue_depth = GaugeMetricFamily(
  19. metric_name("bus_queue_depth"),
  20. "Current per-subscriber queue depth at scrape time.",
  21. labels=["topic", "source"],
  22. )
  23. queue_capacity = GaugeMetricFamily(
  24. metric_name("bus_queue_capacity"),
  25. "Per-subscriber queue maxsize.",
  26. labels=["topic", "source"],
  27. )
  28. queue_full = GaugeMetricFamily(
  29. metric_name("bus_queue_full"),
  30. "1 if the subscriber queue is full at scrape time, 0 otherwise. "
  31. "A sustained 1 indicates a slow consumer holding back the bus.",
  32. labels=["topic", "source"],
  33. )
  34. queue_saturation = GaugeMetricFamily(
  35. metric_name("bus_queue_saturation_ratio"),
  36. "Per-subscriber queue depth as a fraction of capacity "
  37. "(qsize / maxsize).",
  38. labels=["topic", "source"],
  39. )
  40. latest_keys = GaugeMetricFamily(
  41. metric_name("bus_subscriber_latest_keys"),
  42. "Number of distinct ids pending coalesced UPDATED delivery "
  43. "per subscriber (size of latest_by_key).",
  44. labels=["topic", "source"],
  45. )
  46. events = CounterMetricFamily(
  47. metric_name("bus_events"),
  48. "Cumulative event counts per subscriber. Kinds: "
  49. "received, filtered, coalesced, enqueued, backpressured "
  50. "(see EventCountKind).",
  51. labels=["topic", "source", "kind", "event_type"],
  52. )
  53. # Copy in case subscribe/unsubscribe races with collection.
  54. snapshot = {topic: list(subs) for topic, subs in event_bus.subscribers.items()}
  55. for topic, subs in snapshot.items():
  56. subscribers.add_metric([topic], len(subs))
  57. for sub in subs:
  58. source = sub.source or ""
  59. qsize = sub.queue.qsize()
  60. maxsize = sub.queue.maxsize
  61. queue_depth.add_metric([topic, source], qsize)
  62. queue_capacity.add_metric([topic, source], maxsize)
  63. queue_full.add_metric([topic, source], 1.0 if sub.queue.full() else 0.0)
  64. queue_saturation.add_metric(
  65. [topic, source], qsize / maxsize if maxsize else 0.0
  66. )
  67. latest_keys.add_metric([topic, source], len(sub.latest_by_key))
  68. for (kind, event_type_name), count in sub.event_counts.items():
  69. events.add_metric(
  70. [topic, source, kind.value, event_type_name], count
  71. )
  72. yield subscribers
  73. yield queue_depth
  74. yield queue_capacity
  75. yield queue_full
  76. yield queue_saturation
  77. yield latest_keys
  78. yield events