test_metrics_collector.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import asyncio
  2. import pytest
  3. from gpustack.server.metrics_collector import (
  4. ModelUsageMetrics,
  5. _estimate_partial_usage,
  6. _make_buffer_key,
  7. _resolve_usage_tokens,
  8. accumulate_gateway_metrics,
  9. gateway_details_buffer,
  10. gateway_metrics_buffer,
  11. )
  @pytest.fixture(autouse=True)
  def clear_buffer():
      """Isolate every test from module-level buffer state.

      Both gateway buffers are module globals shared by all tests, so they
      are emptied before the test body runs and again during teardown.
      """
      buffers = (gateway_metrics_buffer, gateway_details_buffer)
      for buf in buffers:
          buf.clear()
      yield
      for buf in buffers:
          buf.clear()
  def test_model_usage_metrics_defaults():
      """A metric built from just a model name has zeroed counters and no ids."""
      metrics = ModelUsageMetrics(model="qwen3-0.6b")

      # Every token counter starts at zero.
      for counter in ("input_token", "output_token", "total_token", "input_cached_token"):
          assert getattr(metrics, counter) == 0, counter

      # A single report always stands for exactly one request.
      assert metrics.request_count == 1

      # Ownership / routing identifiers are unset by default.
      for ident in ("user_id", "model_id", "provider_id", "access_key"):
          assert getattr(metrics, ident) is None, ident
  # Pin completed_at so the date segment of the buffer key is deterministic.
  # 1699963200000 ms = 2023-11-14T12:00:00Z. Noon UTC is used on purpose: a
  # late-evening instant (e.g. 1700000000000 = 22:13 UTC) would read as
  # 2023-11-15 under any offset of +01:47 or more. Noon keeps the calendar
  # date at 2023-11-14 for offsets from UTC-12:00 through UTC+11:59, in case
  # the date is ever derived in local time rather than UTC.
  _FIXED_COMPLETED_AT_MS = 1_699_963_200_000
  _FIXED_DATE_ISO = "2023-11-14"
  def test_make_buffer_key():
      """All populated identifiers appear in the key, in their fixed slots."""
      identity = dict(model="qwen3-0.6b", model_id=1, user_id=2, access_key="abc")
      metric = ModelUsageMetrics(**identity, completed_at=_FIXED_COMPLETED_AT_MS)

      expected_key = f"1..qwen3-0.6b.2.abc..{_FIXED_DATE_ISO}"
      assert _make_buffer_key(metric) == expected_key
  def test_make_buffer_key_none_fields():
      # With every optional identifier left as None, each of those slots is
      # rendered empty, so only the model name and the date are non-blank.
      metric = ModelUsageMetrics(model="qwen3-0.6b", completed_at=_FIXED_COMPLETED_AT_MS)
      key = _make_buffer_key(metric)
      assert key == f"..qwen3-0.6b....{_FIXED_DATE_ISO}"
  def test_accumulate_new_entry():
      """A single report creates exactly one buffer entry carrying its counters."""
      metric = ModelUsageMetrics(
          model="qwen3-0.6b",
          model_id=1,
          user_id=2,
          input_token=100,
          output_token=200,
          input_cached_token=60,
          request_count=1,
      )
      asyncio.run(accumulate_gateway_metrics([metric]))

      assert len(gateway_metrics_buffer) == 1
      (entry,) = gateway_metrics_buffer.values()
      observed = (
          entry.input_token,
          entry.output_token,
          entry.input_cached_token,
          entry.request_count,
      )
      assert observed == (100, 200, 60, 1)
  def test_accumulate_same_key_sums_values():
      """Two reports sharing a buffer key are summed into a single entry.

      ``completed_at`` is pinned on both metrics. Without it, the date segment
      of the buffer key is not fixed by the test, and a run straddling a date
      boundary could split the two reports into separate entries.
      """
      m1 = ModelUsageMetrics(
          model="qwen3-0.6b",
          model_id=1,
          user_id=2,
          input_token=100,
          output_token=200,
          input_cached_token=60,
          request_count=1,
          completed_at=_FIXED_COMPLETED_AT_MS,
      )
      m2 = ModelUsageMetrics(
          model="qwen3-0.6b",
          model_id=1,
          user_id=2,
          input_token=50,
          output_token=80,
          input_cached_token=15,
          request_count=1,
          completed_at=_FIXED_COMPLETED_AT_MS,
      )
      # Two separate accumulate calls: the merge must happen across flush
      # batches, not only within one batch.
      asyncio.run(accumulate_gateway_metrics([m1]))
      asyncio.run(accumulate_gateway_metrics([m2]))

      assert len(gateway_metrics_buffer) == 1
      entry = list(gateway_metrics_buffer.values())[0]
      assert entry.input_token == 150
      assert entry.output_token == 280
      assert entry.input_cached_token == 75
      assert entry.request_count == 2
  def test_accumulate_different_keys():
      """Metrics differing only in user_id land in separate buffer entries."""
      m1 = ModelUsageMetrics(model="qwen3-0.6b", model_id=1, user_id=2, input_token=100)
      m2 = ModelUsageMetrics(model="qwen3-0.6b", model_id=1, user_id=3, input_token=50)
      asyncio.run(accumulate_gateway_metrics([m1, m2]))

      assert len(gateway_metrics_buffer) == 2
      # Each entry must keep only its own tokens. The count check alone
      # would not catch tokens leaking or being double-counted across keys.
      assert sorted(e.input_token for e in gateway_metrics_buffer.values()) == [50, 100]
  def test_accumulate_total_token_summed():
      """total_token is summed, not overwritten, when reports share a key."""
      # Pin completed_at so both reports are guaranteed the same buffer key
      # (including its date segment).
      m1 = ModelUsageMetrics(
          model="m", model_id=1, total_token=300, completed_at=_FIXED_COMPLETED_AT_MS
      )
      m2 = ModelUsageMetrics(
          model="m", model_id=1, total_token=100, completed_at=_FIXED_COMPLETED_AT_MS
      )
      asyncio.run(accumulate_gateway_metrics([m1]))
      asyncio.run(accumulate_gateway_metrics([m2]))

      # Check the merge happened before indexing. Otherwise a key mismatch
      # would surface as "300 != 400" read from an arbitrary entry.
      assert len(gateway_metrics_buffer) == 1
      entry = list(gateway_metrics_buffer.values())[0]
      assert entry.total_token == 400
  def test_resolve_usage_tokens_falls_back_total_for_reranker_model():
      """A reranker reporting only total_token has it treated as prompt tokens."""

      class StubModel:
          # Minimal stand-in: the resolver only needs ``categories`` here.
          categories = ["reranker"]

      metric = ModelUsageMetrics(model="bge-m3", total_token=77)
      prompt_tokens, completion_tokens = _resolve_usage_tokens(metric, StubModel())
      assert prompt_tokens == 77
      assert completion_tokens == 0
  108. # ---------------------------------------------------------------------------
  109. # _estimate_partial_usage — server-side backfill for incomplete reports
  110. # ---------------------------------------------------------------------------
  def test_estimate_partial_usage_skips_completed_reports():
      # completed=True means the canonical usage chunk arrived. The
      # authoritative token counts (all zero here) must not be replaced by
      # estimates, even though the byte/chunk counters are non-zero.
      metric = ModelUsageMetrics(
          model="m",
          completed=True,
          request_content_bytes=4096,
          output_chunk_count=128,
      )
      _estimate_partial_usage(metric)
      for counter in ("input_token", "output_token", "total_token"):
          assert getattr(metric, counter) == 0, counter
  def test_estimate_partial_usage_backfills_blank_tokens(monkeypatch):
      # Pin both divisors so the expected values do not depend on the
      # production defaults.
      for setting, value in (
          ("gpustack.envs.USAGE_ESTIMATED_BYTES_PER_INPUT_TOKEN", 4),
          ("gpustack.envs.USAGE_ESTIMATED_TOKENS_PER_OUTPUT_CHUNK", 2),
      ):
          monkeypatch.setattr(setting, value)

      metric = ModelUsageMetrics(
          model="m",
          completed=False,
          request_content_bytes=400,
          output_chunk_count=10,
      )
      _estimate_partial_usage(metric)

      expected_input = 400 // 4  # request bytes / bytes-per-token
      expected_output = 10 * 2  # chunks * tokens-per-chunk
      assert metric.input_token == expected_input
      assert metric.output_token == expected_output
      assert metric.total_token == expected_input + expected_output
  def test_estimate_partial_usage_preserves_existing_partial_values(monkeypatch):
      # An input_token reported early in the stream (Anthropic-style
      # message_start) must survive. Estimation only fills blank slots.
      monkeypatch.setattr("gpustack.envs.USAGE_ESTIMATED_BYTES_PER_INPUT_TOKEN", 4)
      reported_input = 999
      metric = ModelUsageMetrics(
          model="m",
          completed=False,
          input_token=reported_input,
          request_content_bytes=400,
          output_chunk_count=0,
      )
      _estimate_partial_usage(metric)

      assert metric.input_token == reported_input  # left untouched
      assert metric.output_token == 0  # zero chunks, so nothing to estimate
  def test_estimate_partial_usage_clamps_input_token_to_at_least_one(monkeypatch):
      # A payload smaller than the divisor (10 bytes vs 100 bytes/token) would
      # give zero under plain integer division. It must still count as one
      # token. Otherwise, for short prompts that disconnect, the request count
      # rises while the token count stays at 0, hiding the request from billing.
      bytes_per_token = 100
      monkeypatch.setattr(
          "gpustack.envs.USAGE_ESTIMATED_BYTES_PER_INPUT_TOKEN", bytes_per_token
      )
      metric = ModelUsageMetrics(model="m", completed=False, request_content_bytes=10)
      _estimate_partial_usage(metric)
      assert metric.input_token == 1
  164. # ---------------------------------------------------------------------------
  165. # _trim_details_buffer_locked — bounds details buffer under flush failure
  166. # ---------------------------------------------------------------------------
  def test_accumulate_caps_details_buffer_and_drops_oldest(monkeypatch, caplog):
      # Lower the cap to 3 so the test stays small. Pushing 5 distinct metrics
      # must evict the two oldest (FIFO) and emit a WARNING-level log record.
      monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_BUFFER_MAX_SIZE", 3)
      batch = [
          ModelUsageMetrics(model="m", model_id=1, user_id=uid, input_token=uid)
          for uid in range(1, 6)
      ]

      with caplog.at_level("WARNING"):
          asyncio.run(accumulate_gateway_metrics(batch))

      assert len(gateway_details_buffer) == 3
      survivors = [detail.user_id for detail in gateway_details_buffer]
      assert survivors == [3, 4, 5]  # the last three pushed
      warned = any(
          "gateway_details_buffer exceeded cap" in record.message
          for record in caplog.records
      )
      assert warned
  183. # ---------------------------------------------------------------------------
  184. # Buffer key splits across midnight so cross-day streams roll up separately
  185. # ---------------------------------------------------------------------------
  def test_make_buffer_key_splits_across_midnight():
      """Identical metrics completed on consecutive days hash to distinct keys.

      The rollup attributes each request to the day it ended in (proxy
      contract: completed_at anchors the billing period). Reuses the
      module's pinned timestamp instead of a second copy of the magic number.
      """
      one_day_ms = 24 * 3600 * 1000
      identity = dict(model="qwen3-0.6b", model_id=1, user_id=2)
      m_day1 = ModelUsageMetrics(**identity, completed_at=_FIXED_COMPLETED_AT_MS)
      m_day2 = ModelUsageMetrics(
          **identity, completed_at=_FIXED_COMPLETED_AT_MS + one_day_ms
      )

      key1 = _make_buffer_key(m_day1)
      key2 = _make_buffer_key(m_day2)
      assert key1 != key2
      assert key1.endswith(f".{_FIXED_DATE_ISO}")
      # The calendar day after _FIXED_DATE_ISO (2023-11-14).
      assert key2.endswith(".2023-11-15")
  def test_accumulate_splits_rollup_across_midnight():
      """Same-identity reports on consecutive days roll up into separate entries.

      Checks both the entry count and that each day's entry carries only that
      day's tokens, so a cross-day merge or misattribution is caught.
      """
      one_day_ms = 24 * 3600 * 1000
      day1 = ModelUsageMetrics(
          model="qwen3-0.6b",
          model_id=1,
          user_id=2,
          input_token=10,
          completed_at=_FIXED_COMPLETED_AT_MS,
      )
      day2 = ModelUsageMetrics(
          model="qwen3-0.6b",
          model_id=1,
          user_id=2,
          input_token=20,
          completed_at=_FIXED_COMPLETED_AT_MS + one_day_ms,
      )
      asyncio.run(accumulate_gateway_metrics([day1, day2]))

      assert len(gateway_metrics_buffer) == 2
      # NOTE(review): assumes the rollup buffer is keyed by _make_buffer_key,
      # as the "buffer key splits across midnight" contract implies.
      assert gateway_metrics_buffer[_make_buffer_key(day1)].input_token == 10
      assert gateway_metrics_buffer[_make_buffer_key(day2)].input_token == 20