| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- """Pure unit tests for ``UsageDetailsArchiver``.
- DB-level (sqlite/postgres) integration is intentionally out of scope here.
- This file covers everything that doesn't touch the database:
- * calendar arithmetic for the retention cutoff
- * hot ↔ archive table shape alignment (required by the bulk SQL path)
- * cron expression validation + next-fire computation
- """
- from datetime import datetime
- import pytest
- from gpustack.schemas.model_usage_details import (
- ModelUsageDetails,
- ModelUsageDetailsArchive,
- )
- from gpustack.server.usage_details_archiver import (
- UsageDetailsArchiver,
- _assert_archive_shape_aligned,
- _months_ago,
- )
- # ---------------------------------------------------------------------------
- # _months_ago — calendar arithmetic, not timedelta(days=30*N)
- # ---------------------------------------------------------------------------
- def test_months_ago_simple_subtraction():
- assert _months_ago(datetime(2026, 5, 7, 12, 0), 13) == datetime(2025, 4, 7, 12, 0)
- def test_months_ago_clamps_short_target_month():
- # 3/31 - 1 month must clamp to 2/28 (or 2/29 in leap years), not overflow to 3/3.
- assert _months_ago(datetime(2026, 3, 31, 12, 0), 1) == datetime(2026, 2, 28, 12, 0)
- def test_months_ago_handles_leap_year():
- assert _months_ago(datetime(2024, 3, 31), 1) == datetime(2024, 2, 29)
- def test_months_ago_wraps_year_boundary():
- assert _months_ago(datetime(2026, 1, 15, 12, 0), 2) == datetime(2025, 11, 15, 12, 0)
- def test_months_ago_wraps_year_boundary_multi():
- # 25 months back crosses two year boundaries.
- assert _months_ago(datetime(2026, 5, 7), 25) == datetime(2024, 4, 7)
- def test_months_ago_preserves_time_of_day():
- src = datetime(2026, 5, 7, 13, 45, 30, 123456)
- out = _months_ago(src, 13)
- assert (out.hour, out.minute, out.second, out.microsecond) == (13, 45, 30, 123456)
- # ---------------------------------------------------------------------------
- # Hot ↔ archive table shape alignment
- # (bulk INSERT ... SELECT positionally requires identical column lists)
- # ---------------------------------------------------------------------------
- def test_archive_shape_alignment_passes_for_current_schemas():
- # Should not raise — this is the runtime contract the archiver depends on.
- _assert_archive_shape_aligned()
- def test_archive_shape_alignment_includes_all_business_columns():
- """Belt-and-suspenders against silent column loss — call out the columns
- we expect on both sides explicitly so a future schema change can't strip
- them and still pass ``_assert_archive_shape_aligned``."""
- expected = {
- "id",
- "user_id",
- "user_name",
- "model_id",
- "model_name",
- "model_route_id",
- "model_route_name",
- "provider_id",
- "provider_name",
- "provider_type",
- "cluster_id",
- "cluster_name",
- "api_key_id",
- "api_key_name",
- "access_key",
- "api_key_is_custom",
- "date",
- "prompt_token_count",
- "completion_token_count",
- "prompt_cached_token_count",
- "operation",
- "started_at",
- "completed_at",
- "created_at",
- "updated_at",
- "deleted_at",
- }
- hot = {c.name for c in ModelUsageDetails.__table__.columns}
- archive = {c.name for c in ModelUsageDetailsArchive.__table__.columns}
- assert expected <= hot, f"hot table missing: {expected - hot}"
- assert expected <= archive, f"archive table missing: {expected - archive}"
- def test_archive_shape_alignment_raises_on_drift(monkeypatch):
- """Synthetic drift via monkeypatched __table__ proves the assertion
- actually trips — without this we can't trust the no-raise case above."""
- class _FakeColumn:
- def __init__(self, name):
- self.name = name
- class _FakeTable:
- def __init__(self, columns):
- self.columns = columns
- drifted_archive = _FakeTable(
- [_FakeColumn(c.name) for c in ModelUsageDetailsArchive.__table__.columns]
- )
- drifted_archive.columns = drifted_archive.columns[:-1] # drop one column
- monkeypatch.setattr(ModelUsageDetailsArchive, "__table__", drifted_archive)
- with pytest.raises(RuntimeError, match="column mismatch"):
- _assert_archive_shape_aligned()
- # ---------------------------------------------------------------------------
- # Construction-time validation of the cron expression
- # ---------------------------------------------------------------------------
- def test_archiver_rejects_invalid_cron(monkeypatch):
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_ARCHIVE_CRON", "garbage")
- with pytest.raises(ValueError, match="USAGE_DETAILS_ARCHIVE_CRON"):
- UsageDetailsArchiver()
- def test_archiver_rejects_empty_cron(monkeypatch):
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_ARCHIVE_CRON", "")
- with pytest.raises(ValueError, match="USAGE_DETAILS_ARCHIVE_CRON"):
- UsageDetailsArchiver()
- def test_archiver_reads_retention_and_batch_size(monkeypatch):
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_RETENTION_MONTHS", 7)
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_ARCHIVE_BATCH_SIZE", 250)
- arc = UsageDetailsArchiver()
- assert arc._retention_months == 7
- assert arc._batch_size == 250
- # ---------------------------------------------------------------------------
- # Next-fire computation across common cron expressions
- # ---------------------------------------------------------------------------
- @pytest.mark.parametrize(
- "expr,upper_bound_seconds",
- [
- ("0 3 * * *", 24 * 3600), # daily at 03:00 — within a day
- ("*/15 * * * *", 15 * 60), # every 15 minutes
- ("0 */6 * * *", 6 * 3600), # every 6 hours
- ("30 2 * * 0", 7 * 24 * 3600), # weekly Sunday — within a week
- ("0 0 1 * *", 32 * 24 * 3600), # 1st of month — within ~1 month
- ],
- )
- def test_seconds_until_next_fire_within_bound(monkeypatch, expr, upper_bound_seconds):
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_ARCHIVE_CRON", expr)
- secs = UsageDetailsArchiver()._seconds_until_next_fire()
- assert secs is not None
- assert 0 < secs <= upper_bound_seconds
- def test_seconds_until_next_fire_returns_float(monkeypatch):
- monkeypatch.setattr("gpustack.envs.USAGE_DETAILS_ARCHIVE_CRON", "*/5 * * * *")
- assert isinstance(UsageDetailsArchiver()._seconds_until_next_fire(), float)
|