test_bus_metrics.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """Tests for the pull-based bus metrics collector."""
  2. import asyncio
  3. import pytest
  4. from gpustack.exporter.bus_metrics import BusMetricsCollector
  5. from gpustack.server.bus import (
  6. Event,
  7. EventCountKind,
  8. EventType,
  9. Subscriber,
  10. event_bus,
  11. )
  12. def _sample_for(metrics, metric_name, label_match):
  13. for metric in metrics:
  14. if metric.name != metric_name:
  15. continue
  16. for sample in metric.samples:
  17. if all(sample.labels.get(k) == v for k, v in label_match.items()):
  18. return sample
  19. raise AssertionError(
  20. f"Sample for metric={metric_name} labels={label_match} not found"
  21. )
  22. def _all_samples(metrics, metric_name):
  23. for metric in metrics:
  24. if metric.name == metric_name:
  25. return list(metric.samples)
  26. return []
  27. @pytest.mark.asyncio
  28. async def test_bus_metrics_collector_reflects_subscriber_state():
  29. topic = "_test_bus_metrics_topic"
  30. sub_a = event_bus.subscribe(topic, source="test_a")
  31. sub_b = event_bus.subscribe(topic, source="test_b", event_types={EventType.CREATED})
  32. try:
  33. # sub_a takes everything (second UPDATED for id=2 → COALESCED);
  34. # sub_b filters non-CREATED.
  35. await sub_a.enqueue(Event(type=EventType.CREATED, data={"id": 1}, id=1))
  36. await sub_a.enqueue(Event(type=EventType.UPDATED, data={"id": 2}, id=2))
  37. await sub_a.enqueue(Event(type=EventType.UPDATED, data={"id": 2}, id=2))
  38. await sub_b.enqueue(Event(type=EventType.UPDATED, data={"id": 3}, id=3))
  39. await sub_b.enqueue(Event(type=EventType.CREATED, data={"id": 4}, id=4))
  40. metrics = list(BusMetricsCollector().collect())
  41. # 2 subscribers under our topic.
  42. sample = _sample_for(metrics, "gpustack:bus_subscribers", {"topic": topic})
  43. assert sample.value == 2
  44. # sub_a — RECEIVED total = 3.
  45. sample = _sample_for(
  46. metrics,
  47. "gpustack:bus_events",
  48. {
  49. "topic": topic,
  50. "source": "test_a",
  51. "kind": EventCountKind.RECEIVED.value,
  52. "event_type": EventType.UPDATED.name,
  53. },
  54. )
  55. assert sample.value == 2 # two UPDATED events received
  56. # sub_a — second UPDATED merged into latest_by_key.
  57. sample = _sample_for(
  58. metrics,
  59. "gpustack:bus_events",
  60. {
  61. "topic": topic,
  62. "source": "test_a",
  63. "kind": EventCountKind.COALESCED.value,
  64. "event_type": EventType.UPDATED.name,
  65. },
  66. )
  67. assert sample.value == 1
  68. # sub_a — first UPDATED actually entered the queue.
  69. sample = _sample_for(
  70. metrics,
  71. "gpustack:bus_events",
  72. {
  73. "topic": topic,
  74. "source": "test_a",
  75. "kind": EventCountKind.ENQUEUED.value,
  76. "event_type": EventType.UPDATED.name,
  77. },
  78. )
  79. assert sample.value == 1
  80. # sub_a — CREATED enqueued.
  81. sample = _sample_for(
  82. metrics,
  83. "gpustack:bus_events",
  84. {
  85. "topic": topic,
  86. "source": "test_a",
  87. "kind": EventCountKind.ENQUEUED.value,
  88. "event_type": EventType.CREATED.name,
  89. },
  90. )
  91. assert sample.value == 1
  92. # sub_b — UPDATED received then filtered.
  93. sample = _sample_for(
  94. metrics,
  95. "gpustack:bus_events",
  96. {
  97. "topic": topic,
  98. "source": "test_b",
  99. "kind": EventCountKind.RECEIVED.value,
  100. "event_type": EventType.UPDATED.name,
  101. },
  102. )
  103. assert sample.value == 1
  104. sample = _sample_for(
  105. metrics,
  106. "gpustack:bus_events",
  107. {
  108. "topic": topic,
  109. "source": "test_b",
  110. "kind": EventCountKind.FILTERED.value,
  111. "event_type": EventType.UPDATED.name,
  112. },
  113. )
  114. assert sample.value == 1
  115. # Filter rejected before enqueue → no ENQUEUED for that event_type.
  116. for sample in _all_samples(metrics, "gpustack:bus_events"):
  117. if (
  118. sample.labels.get("topic") == topic
  119. and sample.labels.get("source") == "test_b"
  120. and sample.labels.get("event_type") == EventType.UPDATED.name
  121. ):
  122. assert sample.labels.get("kind") != EventCountKind.ENQUEUED.value
  123. # sub_b — CREATED received and enqueued.
  124. sample = _sample_for(
  125. metrics,
  126. "gpustack:bus_events",
  127. {
  128. "topic": topic,
  129. "source": "test_b",
  130. "kind": EventCountKind.ENQUEUED.value,
  131. "event_type": EventType.CREATED.name,
  132. },
  133. )
  134. assert sample.value == 1
  135. # No backpressure samples expected — queues weren't full.
  136. for sample in _all_samples(metrics, "gpustack:bus_events"):
  137. if sample.labels.get("topic") == topic:
  138. assert sample.labels.get("kind") != EventCountKind.BACKPRESSURED.value
  139. # queue_full gauge = 0 for both subs (queues have items but aren't full).
  140. for source in ("test_a", "test_b"):
  141. sample = _sample_for(
  142. metrics,
  143. "gpustack:bus_queue_full",
  144. {
  145. "topic": topic,
  146. "source": source,
  147. },
  148. )
  149. assert sample.value == 0
  150. finally:
  151. event_bus.unsubscribe(topic, sub_a)
  152. event_bus.unsubscribe(topic, sub_b)
  153. @pytest.mark.asyncio
  154. async def test_bus_metrics_collector_reports_backpressure_and_queue_full():
  155. topic = "_test_bus_metrics_qfull"
  156. sub = Subscriber(topic=topic, source="slow", queue_size=1)
  157. event_bus.subscribers.setdefault(topic, []).append(sub)
  158. try:
  159. await sub.enqueue(Event(type=EventType.CREATED, data={"id": 1}, id=1))
  160. pending = asyncio.create_task(
  161. sub.enqueue(Event(type=EventType.CREATED, data={"id": 2}, id=2))
  162. )
  163. # Yield so the enqueue task hits the full-queue branch.
  164. for _ in range(5):
  165. await asyncio.sleep(0)
  166. if sub.event_counts.get(
  167. (EventCountKind.BACKPRESSURED, EventType.CREATED.name)
  168. ):
  169. break
  170. pending.cancel()
  171. try:
  172. await pending
  173. except (asyncio.CancelledError, Exception):
  174. pass
  175. metrics = list(BusMetricsCollector().collect())
  176. sample = _sample_for(
  177. metrics,
  178. "gpustack:bus_events",
  179. {
  180. "topic": topic,
  181. "source": "slow",
  182. "kind": EventCountKind.BACKPRESSURED.value,
  183. "event_type": EventType.CREATED.name,
  184. },
  185. )
  186. assert sample.value >= 1
  187. sample = _sample_for(
  188. metrics,
  189. "gpustack:bus_queue_full",
  190. {
  191. "topic": topic,
  192. "source": "slow",
  193. },
  194. )
  195. assert sample.value == 1
  196. sample = _sample_for(
  197. metrics,
  198. "gpustack:bus_queue_saturation_ratio",
  199. {
  200. "topic": topic,
  201. "source": "slow",
  202. },
  203. )
  204. assert sample.value == 1.0
  205. sample = _sample_for(
  206. metrics,
  207. "gpustack:bus_queue_capacity",
  208. {
  209. "topic": topic,
  210. "source": "slow",
  211. },
  212. )
  213. assert sample.value == 1
  214. finally:
  215. event_bus.unsubscribe(topic, sub)
  216. @pytest.mark.asyncio
  217. async def test_received_equals_filtered_plus_coalesced_plus_enqueued():
  218. """Sanity: ``RECEIVED = FILTERED + COALESCED + ENQUEUED``."""
  219. topic = "_test_bus_metrics_invariant"
  220. sub = event_bus.subscribe(
  221. topic, source="invariant", event_types={EventType.CREATED, EventType.UPDATED}
  222. )
  223. try:
  224. # Mix: 2 CREATED (both enqueued), 3 UPDATED for id=10 (1 enqueued + 2
  225. # coalesced), 1 DELETED (filtered out by event_types).
  226. await sub.enqueue(Event(type=EventType.CREATED, data={"id": 1}, id=1))
  227. await sub.enqueue(Event(type=EventType.CREATED, data={"id": 2}, id=2))
  228. await sub.enqueue(Event(type=EventType.UPDATED, data={"id": 10}, id=10))
  229. await sub.enqueue(Event(type=EventType.UPDATED, data={"id": 10}, id=10))
  230. await sub.enqueue(Event(type=EventType.UPDATED, data={"id": 10}, id=10))
  231. await sub.enqueue(Event(type=EventType.DELETED, data={"id": 99}, id=99))
  232. def total(kind: EventCountKind) -> int:
  233. return sum(
  234. count for (k, _evt), count in sub.event_counts.items() if k is kind
  235. )
  236. assert total(EventCountKind.RECEIVED) == 6
  237. assert total(EventCountKind.FILTERED) == 1
  238. assert total(EventCountKind.COALESCED) == 2
  239. assert total(EventCountKind.ENQUEUED) == 3
  240. assert total(EventCountKind.RECEIVED) == total(EventCountKind.FILTERED) + total(
  241. EventCountKind.COALESCED
  242. ) + total(EventCountKind.ENQUEUED)
  243. finally:
  244. event_bus.unsubscribe(topic, sub)