usage_details_archiver.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
"""Periodic archival of ``model_usage_details`` rows past the retention window.

Hot rows older than ``USAGE_DETAILS_RETENTION_MONTHS`` (anchored on
``COALESCE(completed_at, created_at)``) are moved to
``model_usage_details_archive`` so the hot table stays bounded for fast
reconciliation queries while the audit trail is preserved indefinitely.

Runs once on server startup and then on the cron schedule defined by
``USAGE_DETAILS_ARCHIVE_CRON`` (UTC). Leader-only — concurrent sweeps from
multiple replicas would race on both the hot and archive tables.
"""
  10. import asyncio
  11. import calendar
  12. import logging
  13. from datetime import datetime, timezone
  14. from apscheduler.triggers.cron import CronTrigger
  15. from sqlalchemy import and_, delete, insert, or_, select
  16. from gpustack import envs
  17. from gpustack.schemas.model_usage_details import (
  18. ModelUsageDetails,
  19. ModelUsageDetailsArchive,
  20. )
  21. from gpustack.server.db import async_session
  22. logger = logging.getLogger(__name__)
  23. def _months_ago(dt: datetime, months: int) -> datetime:
  24. """Subtract ``months`` calendar months from ``dt``, clamping the day.
  25. Uses calendar arithmetic (not ``timedelta(days=30 * months)``) so the
  26. boundary is precise — ``13 months ago`` from May 7 is April 7, not
  27. a 5-day-off approximation.
  28. """
  29. target_year = dt.year
  30. target_month = dt.month - months
  31. while target_month <= 0:
  32. target_year -= 1
  33. target_month += 12
  34. last_day = calendar.monthrange(target_year, target_month)[1]
  35. target_day = min(dt.day, last_day)
  36. return dt.replace(year=target_year, month=target_month, day=target_day)
  37. def _assert_archive_shape_aligned() -> None:
  38. """Bulk archival relies on hot and archive tables having identical column
  39. lists so ``INSERT ... SELECT`` lines up positionally. If they ever drift,
  40. fail loudly at server startup instead of silently dropping a column on
  41. the next sweep.
  42. """
  43. hot_cols = {c.name for c in ModelUsageDetails.__table__.columns}
  44. archive_cols = {c.name for c in ModelUsageDetailsArchive.__table__.columns}
  45. if hot_cols != archive_cols:
  46. only_hot = hot_cols - archive_cols
  47. only_archive = archive_cols - hot_cols
  48. raise RuntimeError(
  49. "model_usage_details ↔ model_usage_details_archive column "
  50. "mismatch — bulk archival requires identical column lists. "
  51. f"Only on hot: {sorted(only_hot)}; "
  52. f"only on archive: {sorted(only_archive)}."
  53. )
  54. class UsageDetailsArchiver:
  55. """Leader-only loop that archives expired ``model_usage_details`` rows."""
  56. def __init__(self) -> None:
  57. self._retention_months = envs.USAGE_DETAILS_RETENTION_MONTHS
  58. self._batch_size = envs.USAGE_DETAILS_ARCHIVE_BATCH_SIZE
  59. # Validate the cron expression eagerly — a bad expression should
  60. # surface at startup, not silently degrade into an idle loop.
  61. # ``timezone.utc`` makes the schedule predictable across deployments
  62. # regardless of container TZ.
  63. try:
  64. self._trigger = CronTrigger.from_crontab(
  65. envs.USAGE_DETAILS_ARCHIVE_CRON, timezone=timezone.utc
  66. )
  67. except Exception as e:
  68. raise ValueError(
  69. "Invalid GPUSTACK_USAGE_DETAILS_ARCHIVE_CRON "
  70. f"(value={envs.USAGE_DETAILS_ARCHIVE_CRON!r}): {e}"
  71. ) from e
  72. # Surface schema drift at construction; the bulk SQL path below
  73. # would otherwise silently drop columns missing from the archive.
  74. _assert_archive_shape_aligned()
  75. # Cache the canonical column list once — used for INSERT ... SELECT.
  76. self._mirror_columns = [
  77. c.name for c in ModelUsageDetailsArchive.__table__.columns
  78. ]
  79. async def start(self) -> None:
  80. # Initial run: catches up on rows that aged out while no leader was
  81. # running (or while the previous leader was between cycles).
  82. try:
  83. await self.archive_once()
  84. except Exception as e:
  85. logger.error(f"Initial usage details archival failed: {e}")
  86. while True:
  87. sleep_seconds = self._seconds_until_next_fire()
  88. if sleep_seconds is None:
  89. # No future fire time — schedule is malformed in a way
  90. # CronTrigger accepted but can't satisfy. Bail out instead
  91. # of busy-looping.
  92. logger.error(
  93. "Cron %r yielded no future fire time; archiver loop stopping.",
  94. envs.USAGE_DETAILS_ARCHIVE_CRON,
  95. )
  96. return
  97. await asyncio.sleep(sleep_seconds)
  98. try:
  99. await self.archive_once()
  100. except Exception as e:
  101. logger.error(f"Usage details archival failed: {e}")
  102. def _seconds_until_next_fire(self) -> float | None:
  103. now = datetime.now(timezone.utc)
  104. next_fire = self._trigger.get_next_fire_time(None, now)
  105. if next_fire is None:
  106. return None
  107. # APScheduler returns a tz-aware datetime in the trigger's tz; subtract
  108. # works against ``now`` (also tz-aware UTC) without further coercion.
  109. return max(0.0, (next_fire - now).total_seconds())
  110. async def archive_once(self) -> int:
  111. """Drain rows older than the retention cutoff. Returns total moved."""
  112. cutoff = _months_ago(
  113. datetime.now(timezone.utc).replace(tzinfo=None),
  114. self._retention_months,
  115. )
  116. moved_total = 0
  117. while True:
  118. moved = await self._archive_batch(cutoff)
  119. if moved == 0:
  120. break
  121. moved_total += moved
  122. if moved_total > 0:
  123. logger.info(
  124. f"Archived {moved_total} model_usage_details rows older than "
  125. f"{cutoff.isoformat()} (retention={self._retention_months}mo)."
  126. )
  127. return moved_total
  128. async def _archive_batch(self, cutoff: datetime) -> int:
  129. """Move up to ``batch_size`` rows in a single transaction.
  130. Bulk SQL path: ``INSERT INTO archive (cols...) SELECT cols... FROM
  131. hot WHERE id IN (ids)`` keeps the row data inside the DB engine —
  132. Python only carries the id list. Avoids the per-row ORM hydration
  133. that would otherwise pull every column over the wire and rebuild
  134. it as a SQLAlchemy instance just to insert it back.
  135. """
  136. # Two-branch predicate (instead of COALESCE) so the planner can use
  137. # ix_..._completed_at on the modern fast path and fall back to
  138. # ix_..._created_at only for legacy rows missing completed_at.
  139. # PG combines them via BitmapOr; MySQL/SQLite may seq-scan but the
  140. # working set here is bounded by ``batch_size`` so it stays cheap.
  141. age_predicate = or_(
  142. and_(
  143. ModelUsageDetails.completed_at.is_not(None),
  144. ModelUsageDetails.completed_at < cutoff,
  145. ),
  146. and_(
  147. ModelUsageDetails.completed_at.is_(None),
  148. ModelUsageDetails.created_at < cutoff,
  149. ),
  150. )
  151. hot_table = ModelUsageDetails.__table__
  152. archive_table = ModelUsageDetailsArchive.__table__
  153. async with async_session() as session:
  154. ids = (
  155. await session.exec(
  156. select(ModelUsageDetails.id)
  157. .where(age_predicate)
  158. .order_by(ModelUsageDetails.id)
  159. .limit(self._batch_size)
  160. )
  161. ).all()
  162. if not ids:
  163. return 0
  164. # Defense against re-archival: if a previous sweep somehow left
  165. # an id in archive (replication quirk, mid-transaction rollback
  166. # of the delete leg), skip it on insert. INSERT and DELETE
  167. # share this transaction so the normal path can't hit this.
  168. existing_archive_ids = set(
  169. (
  170. await session.exec(
  171. select(ModelUsageDetailsArchive.id).where(
  172. ModelUsageDetailsArchive.id.in_(ids)
  173. )
  174. )
  175. ).all()
  176. )
  177. ids_to_insert = [i for i in ids if i not in existing_archive_ids]
  178. if ids_to_insert:
  179. projection = select(
  180. *[hot_table.c[name] for name in self._mirror_columns]
  181. ).where(hot_table.c.id.in_(ids_to_insert))
  182. await session.exec(
  183. insert(archive_table).from_select(self._mirror_columns, projection)
  184. )
  185. await session.exec(
  186. delete(ModelUsageDetails).where(ModelUsageDetails.id.in_(ids))
  187. )
  188. await session.commit()
  189. return len(ids)