from datetime import date, datetime from math import ceil from typing import Any, Dict, List, Optional from fastapi import APIRouter from sqlalchemy import Date, Select, String, and_, asc, cast, desc, literal from sqlmodel import func, or_, select from gpustack.api.exceptions import ForbiddenException, InvalidException from gpustack.schemas.model_usage import ModelUsage from gpustack.schemas.users import User from gpustack.schemas.principals import OrgRole from gpustack.schemas.usage import ( USAGE_GRANULARITY_MONTH, USAGE_GRANULARITY_DAY, USAGE_GRANULARITY_WEEK, USAGE_GROUP_BY_API_KEY, USAGE_GROUP_BY_DATE, USAGE_GROUP_BY_MODEL, USAGE_GROUP_BY_USER, USAGE_SCOPE_ALL, USAGE_SCOPE_SELF, USAGE_METRIC_API_KEYS_USED, USAGE_METRIC_API_REQUESTS, USAGE_METRIC_AVG_TOKENS_PER_REQUEST, USAGE_METRIC_INPUT_CACHED_TOKENS, USAGE_METRIC_DATE, USAGE_METRIC_INPUT_TOKENS, USAGE_METRIC_LAST_ACTIVE, USAGE_METRIC_MODELS_CALLED, USAGE_METRIC_OUTPUT_TOKENS, USAGE_METRIC_TOTAL_TOKENS, USAGE_SORT_DESC, UsageBreakdownDateDimension, UsageBreakdownDimension, UsageBreakdownItem, UsageBreakdownRequest, UsageBreakdownResponse, UsageFilterItem, UsageFilterOption, UsageFilters, UsageIdentity, UsageIdentityCurrent, UsageIdentityValue, UsageMetaResponse, UsageOption, UsageSummary, ) from gpustack.schemas.common import Pagination from gpustack.server.deps import CurrentUserDep, SessionDep, TenantContextDep from gpustack.utils.usage_snapshots import ( format_usage_api_key_label, format_usage_date_label, format_usage_model_label, format_usage_user_label, ) router = APIRouter() METRIC_OPTIONS = [ UsageOption(key=USAGE_METRIC_INPUT_TOKENS, label="Input Tokens"), UsageOption(key=USAGE_METRIC_OUTPUT_TOKENS, label="Output Tokens"), UsageOption(key=USAGE_METRIC_INPUT_CACHED_TOKENS, label="Input Cached Tokens"), UsageOption(key=USAGE_METRIC_TOTAL_TOKENS, label="Total Tokens"), UsageOption(key=USAGE_METRIC_API_REQUESTS, label="API Requests"), ] GRANULARITY_OPTIONS = [ UsageOption(key=USAGE_GRANULARITY_DAY, label="Day"), UsageOption(key=USAGE_GRANULARITY_WEEK, label="Week"), UsageOption(key=USAGE_GRANULARITY_MONTH, label="Month"), ] SELF_GROUP_BY_OPTIONS = [ UsageOption(key=USAGE_GROUP_BY_DATE, label="Date"), UsageOption(key=USAGE_GROUP_BY_MODEL, label="Model"), UsageOption(key=USAGE_GROUP_BY_API_KEY, label="API Key"), ] ADMIN_GROUP_BY_OPTIONS = [ UsageOption(key=USAGE_GROUP_BY_DATE, label="Date"), UsageOption(key=USAGE_GROUP_BY_MODEL, label="Model"), UsageOption(key=USAGE_GROUP_BY_USER, label="User"), UsageOption(key=USAGE_GROUP_BY_API_KEY, label="API Key"), ] def _null_safe_column_filter(column, value: Any): if value is None: return column.is_(None) return column == value def _model_key_expression(): return ( func.coalesce(ModelUsage.cluster_name + literal("/"), literal("")) + ModelUsage.model_name ) def _model_identity_expression(): """Distinct identity for counting models called. Include current IDs and snapshot fields so direct deployments, provider models, and deleted historical records with the same model name remain separate identities. """ return ( func.coalesce(cast(ModelUsage.model_id, String), literal("deleted")) + literal("|") + func.coalesce(cast(ModelUsage.provider_id, String), literal("no-provider")) + literal("|") + func.coalesce(ModelUsage.provider_name, literal("")) + literal("|") + func.coalesce(ModelUsage.provider_type, literal("")) + literal("|") + _model_key_expression() ) def _metric_columns() -> Dict[str, Any]: model_identity = _model_identity_expression() return { USAGE_METRIC_INPUT_TOKENS: func.coalesce( func.sum(ModelUsage.prompt_token_count), 0 ), USAGE_METRIC_OUTPUT_TOKENS: func.coalesce( func.sum(ModelUsage.completion_token_count), 0 ), USAGE_METRIC_INPUT_CACHED_TOKENS: func.coalesce( func.sum(ModelUsage.prompt_cached_token_count), 0 ), USAGE_METRIC_TOTAL_TOKENS: func.coalesce( func.sum(ModelUsage.prompt_token_count + ModelUsage.completion_token_count), 0, ), USAGE_METRIC_API_REQUESTS: func.coalesce(func.sum(ModelUsage.request_count), 0), USAGE_METRIC_MODELS_CALLED: func.count(func.distinct(model_identity)), } def _date_bucket_expression(session, granularity: str): if granularity == USAGE_GRANULARITY_DAY: return ModelUsage.date dialect = session.get_bind().dialect.name if granularity == USAGE_GRANULARITY_WEEK: if dialect == "postgresql": return cast(func.date_trunc("week", ModelUsage.date), Date) if dialect == "mysql": return func.subdate(ModelUsage.date, func.weekday(ModelUsage.date)) if granularity == USAGE_GRANULARITY_MONTH: if dialect == "postgresql": return cast(func.date_trunc("month", ModelUsage.date), Date) if dialect == "mysql": return func.str_to_date( func.date_format(ModelUsage.date, "%Y-%m-01"), "%Y-%m-%d" ) return ModelUsage.date def _group_columns(group_by: str, *, date_bucket_expr=None): if group_by == USAGE_GROUP_BY_DATE: if date_bucket_expr is None: date_bucket_expr = ModelUsage.date return [date_bucket_expr.label("group_date")], [date_bucket_expr] if group_by == USAGE_GROUP_BY_MODEL: return [ ModelUsage.cluster_name.label("group_cluster_name"), ModelUsage.model_name.label("group_model_name"), ModelUsage.model_id.label("group_model_id"), ModelUsage.provider_name.label("group_provider_name"), ModelUsage.provider_type.label("group_provider_type"), ModelUsage.provider_id.label("group_provider_id"), ], [ ModelUsage.cluster_name, ModelUsage.model_name, ModelUsage.model_id, ModelUsage.provider_name, ModelUsage.provider_type, ModelUsage.provider_id, ] if group_by == USAGE_GROUP_BY_USER: return [ ModelUsage.user_name.label("group_user_name"), ModelUsage.user_id.label("group_user_id"), ], [ModelUsage.user_name, ModelUsage.user_id] return [ ModelUsage.user_name.label("group_user_name"), ModelUsage.api_key_name.label("group_api_key_name"), ModelUsage.access_key.label("group_access_key"), ModelUsage.api_key_is_custom.label("group_api_key_is_custom"), ModelUsage.user_id.label("group_user_id"), ModelUsage.api_key_id.label("group_api_key_id"), ], [ ModelUsage.user_name, ModelUsage.api_key_name, ModelUsage.access_key, ModelUsage.api_key_is_custom, ModelUsage.user_id, ModelUsage.api_key_id, ] def _combined_group_columns(group_bys: List[str], *, date_bucket_expr=None): select_columns = [] select_column_keys = set() group_columns = [] group_column_keys = set() for group_by in group_bys: current_select_columns, current_group_columns = _group_columns( group_by, date_bucket_expr=date_bucket_expr ) for column in current_select_columns: key = getattr(column, "key", None) or str(column) if key in select_column_keys: continue select_column_keys.add(key) select_columns.append(column) for column in current_group_columns: key = str(column) if key in group_column_keys: continue group_column_keys.add(key) group_columns.append(column) return select_columns, group_columns def _row_identity(group_by: str, row: Any) -> UsageIdentity: if group_by == USAGE_GROUP_BY_MODEL: model_id = getattr(row, "group_model_id", None) provider_id = getattr(row, "group_provider_id", None) current = None if model_id is not None or provider_id is not None: current = UsageIdentityCurrent( model_id=model_id, provider_id=provider_id, ) return UsageIdentity( value=UsageIdentityValue( cluster_name=getattr(row, "group_cluster_name", None), model_name=getattr(row, "group_model_name", None), provider_name=getattr(row, "group_provider_name", None), provider_type=getattr(row, "group_provider_type", None), ), current=current, ) if group_by == USAGE_GROUP_BY_USER: user_id = getattr(row, "group_user_id", None) return UsageIdentity( value=UsageIdentityValue(user_name=getattr(row, "group_user_name", None)), current=None if user_id is None else UsageIdentityCurrent(user_id=user_id), ) api_key_id = getattr(row, "group_api_key_id", None) current = None if api_key_id is not None: current = UsageIdentityCurrent( user_id=getattr(row, "group_user_id", None), api_key_id=api_key_id, ) return UsageIdentity( value=UsageIdentityValue( user_name=getattr(row, "group_user_name", None), api_key_name=getattr(row, "group_api_key_name", None), access_key=getattr(row, "group_access_key", None), api_key_is_custom=getattr(row, "group_api_key_is_custom", None), ), current=current, ) def _row_date_dimension(row: Any, granularity: str) -> UsageBreakdownDateDimension: value = _coerce_date(row.group_date) return UsageBreakdownDateDimension( value=value, label=format_usage_date_label(value, granularity) ) def _coerce_date(value: Any) -> date: if isinstance(value, datetime): return value.date() if isinstance(value, date): return value if isinstance(value, str): return date.fromisoformat(value[:10]) return value def _row_dimension(group_by: str, row: Any) -> UsageBreakdownDimension: if group_by == USAGE_GROUP_BY_API_KEY and not getattr( row, "group_api_key_name", None ): return UsageBreakdownDimension( identity=None, label="-", deleted=False, ) identity = _row_identity(group_by, row) return UsageBreakdownDimension( identity=identity, label=_identity_label(group_by, identity), deleted=_identity_deleted(identity), ) def _identity_deleted(identity: UsageIdentity) -> bool: return identity.current is None def _identity_label(group_by: str, identity: UsageIdentity) -> str: value = identity.value if group_by == USAGE_GROUP_BY_MODEL: label = format_usage_model_label( model_name=value.model_name, cluster_name=value.cluster_name, provider_name=value.provider_name, ) elif group_by == USAGE_GROUP_BY_USER: label = format_usage_user_label(value.user_name) else: label = format_usage_api_key_label( user_name=value.user_name, api_key_name=value.api_key_name, ) if _identity_deleted(identity): label = f"{label} (Deleted)" return label def _filter_condition(group_by: str, item: UsageFilterItem): value = item.identity.value current = item.identity.current conditions = [] if group_by == USAGE_GROUP_BY_MODEL: conditions.extend( [ _null_safe_column_filter(ModelUsage.cluster_name, value.cluster_name), _null_safe_column_filter(ModelUsage.model_name, value.model_name), _null_safe_column_filter(ModelUsage.provider_name, value.provider_name), _null_safe_column_filter(ModelUsage.provider_type, value.provider_type), ] ) if current is None or current.model_id is None: conditions.append(ModelUsage.model_id.is_(None)) else: conditions.append(ModelUsage.model_id == current.model_id) if current is None or current.provider_id is None: conditions.append(ModelUsage.provider_id.is_(None)) else: conditions.append(ModelUsage.provider_id == current.provider_id) elif group_by == USAGE_GROUP_BY_USER: conditions.append( _null_safe_column_filter(ModelUsage.user_name, value.user_name) ) if current is None or current.user_id is None: conditions.append(ModelUsage.user_id.is_(None)) else: conditions.append(ModelUsage.user_id == current.user_id) else: conditions.extend( [ _null_safe_column_filter(ModelUsage.user_name, value.user_name), _null_safe_column_filter(ModelUsage.api_key_name, value.api_key_name), _null_safe_column_filter(ModelUsage.access_key, value.access_key), ] ) if value.api_key_is_custom is not None: conditions.append( _null_safe_column_filter( ModelUsage.api_key_is_custom, value.api_key_is_custom ) ) if current is None or current.api_key_id is None: conditions.append(ModelUsage.api_key_id.is_(None)) else: conditions.append(ModelUsage.api_key_id == current.api_key_id) conditions.append( _null_safe_column_filter(ModelUsage.user_id, current.user_id) ) return and_(*conditions) def _can_use_all_scope(user: User, ctx) -> bool: """The cross-user (``all``) view is meaningful for platform admin and real Org admin only. Others fall back to ``self``. Personal Org admin doesn't qualify even though their ``org_role`` is ADMIN: a Personal Org has exactly one member, so ``all`` would just be ``self`` — and worse, the org_id filter applied for ``all`` would hide the user's own usage from Platform-shared models, since those rows carry the model owner's owner_principal_id, not the Personal Org's. """ if user.is_admin: return True if ctx is None: return False if ctx.org_role != OrgRole.ADMIN: return False return not ctx.current_is_personal_scope def _resolve_effective_scope(user: User, ctx, requested_scope: str) -> str: """Map requested scope onto what the caller actually gets. - ``self`` always allowed. - ``all`` allowed for managers / admin; non-managers asking for ``all`` are silently downgraded to ``self`` (the request default is ``all``, so a regular user with no explicit scope shouldn't hit a 403). Privacy-sensitive group_bys (e.g. ``user``) are rejected later in ``_check_permission`` once the effective scope is locked. """ if requested_scope == USAGE_SCOPE_SELF: return USAGE_SCOPE_SELF if not _can_use_all_scope(user, ctx): return USAGE_SCOPE_SELF return USAGE_SCOPE_ALL def _apply_usage_scope_and_filters( statement: Select, *, user: User, filters, scope: str, org_id: Optional[int] = None, ) -> Select: # ``self`` view always restricts to the caller's own rows. ``all`` # view restricts by owner_principal_id when one is in context (platform # admin in cross-org "All" mode has org_id=None and sees everything). if scope == USAGE_SCOPE_SELF: statement = statement.where(ModelUsage.user_id == user.id) elif org_id is not None: statement = statement.where(ModelUsage.owner_principal_id == org_id) for group_by, items in [ (USAGE_GROUP_BY_MODEL, filters.models), (USAGE_GROUP_BY_USER, filters.users), (USAGE_GROUP_BY_API_KEY, filters.api_keys), ]: if not items: continue if group_by == USAGE_GROUP_BY_USER and scope == USAGE_SCOPE_SELF: raise ForbiddenException(message="No permission to filter by user") statement = statement.where( or_(*[_filter_condition(group_by, item) for item in items]) ) return statement def _base_statement() -> Select: return select().select_from(ModelUsage) def _date_scoped_statement( statement: Select, start_date: date, end_date: date ) -> Select: return statement.where(ModelUsage.date >= start_date).where( ModelUsage.date <= end_date ) def _exclude_incomplete_api_key_identity(statement: Select) -> Select: return ( statement.where(ModelUsage.api_key_name.is_not(None)) .where(ModelUsage.api_key_name != "") .where(ModelUsage.access_key.is_not(None)) .where(ModelUsage.access_key != "") ) async def _get_rows(session, statement: Select): return (await session.exec(statement)).all() async def _get_first_row(session, statement: Select): rows = await _get_rows(session, statement) return rows[0] if rows else None def _summary_from_row(row: Any) -> UsageSummary: if row is None: return UsageSummary() return UsageSummary( input_tokens=int(getattr(row, USAGE_METRIC_INPUT_TOKENS, 0) or 0), output_tokens=int(getattr(row, USAGE_METRIC_OUTPUT_TOKENS, 0) or 0), input_cached_tokens=int(getattr(row, USAGE_METRIC_INPUT_CACHED_TOKENS, 0) or 0), total_tokens=int(getattr(row, USAGE_METRIC_TOTAL_TOKENS, 0) or 0), api_requests=int(getattr(row, USAGE_METRIC_API_REQUESTS, 0) or 0), models_called=int(getattr(row, USAGE_METRIC_MODELS_CALLED, 0) or 0), ) def _option_from_identity(group_by: str, identity: UsageIdentity) -> UsageFilterOption: return UsageFilterOption( identity=identity, label=_identity_label(group_by, identity), deleted=_identity_deleted(identity), ) async def _get_filter_options( session, *, base_statement: Select, group_by: str, ) -> List[UsageFilterOption]: select_columns, group_columns = _group_columns(group_by) if group_by == USAGE_GROUP_BY_API_KEY: base_statement = _exclude_incomplete_api_key_identity(base_statement) rows = await _get_rows( session, base_statement.with_only_columns(*select_columns) .distinct() .order_by(*group_columns), ) return [ _option_from_identity(group_by, _row_identity(group_by, row)) for row in rows ] @router.get("/meta", response_model=UsageMetaResponse, response_model_exclude_none=True) async def get_usage_meta( session: SessionDep, user: CurrentUserDep, ctx: TenantContextDep, scope: str = USAGE_SCOPE_ALL, ): # Default scope is "all"; downgrades to "self" for non-managers so # the page works without an explicit scope param. if scope == USAGE_SCOPE_ALL and not _can_use_all_scope(user, ctx): scope = USAGE_SCOPE_SELF elif scope not in (USAGE_SCOPE_SELF, USAGE_SCOPE_ALL): raise InvalidException(message=f"Unsupported scope: {scope}") base_statement = _base_statement() if scope == USAGE_SCOPE_SELF: base_statement = base_statement.where(ModelUsage.user_id == user.id) elif ctx.current_principal_id is not None: base_statement = base_statement.where( ModelUsage.owner_principal_id == ctx.current_principal_id ) model_options = await _get_filter_options( session, base_statement=base_statement, group_by=USAGE_GROUP_BY_MODEL ) user_options: List[UsageFilterOption] = [] if scope == USAGE_SCOPE_ALL: user_options = await _get_filter_options( session, base_statement=base_statement, group_by=USAGE_GROUP_BY_USER ) api_key_options = await _get_filter_options( session, base_statement=base_statement, group_by=USAGE_GROUP_BY_API_KEY ) return UsageMetaResponse( metrics=METRIC_OPTIONS, granularities=GRANULARITY_OPTIONS, group_bys=( ADMIN_GROUP_BY_OPTIONS if scope == USAGE_SCOPE_ALL else SELF_GROUP_BY_OPTIONS ), filters=UsageFilters( models=model_options, users=user_options, api_keys=api_key_options, ), ) def _check_permission(user: User, ctx, request, effective_scope: str) -> None: """``mine`` view forbids the user-grouping / user-filter dimensions (privacy: a user can only see their own rows). ``org`` view allows them since the caller is admin / owner / manager.""" group_by = request.group_by group_bys = group_by if isinstance(group_by, list) else [group_by] if effective_scope == USAGE_SCOPE_SELF: if USAGE_GROUP_BY_USER in group_bys: raise ForbiddenException(message="No permission to group by user") if request.filters.users: raise ForbiddenException(message="No permission to filter by user") def _sort_expression(sort_by: str, metric_columns: Dict[str, Any], date_sort_expr=None): if sort_by == USAGE_METRIC_DATE: return date_sort_expr if date_sort_expr is not None else ModelUsage.date if sort_by == USAGE_METRIC_AVG_TOKENS_PER_REQUEST: return metric_columns[USAGE_METRIC_TOTAL_TOKENS] / func.nullif( metric_columns[USAGE_METRIC_API_REQUESTS], 0 ) if sort_by == USAGE_METRIC_API_KEYS_USED: return func.count(func.distinct(ModelUsage.access_key)) if sort_by == USAGE_METRIC_LAST_ACTIVE: return func.max(ModelUsage.date) if sort_by in metric_columns: return metric_columns[sort_by] return metric_columns[USAGE_METRIC_TOTAL_TOKENS] def _order_expression( order_by: List[tuple[str, str]], metric_columns: Dict[str, Any], date_sort_expr=None ): if not order_by: order_by = [(USAGE_METRIC_TOTAL_TOKENS, USAGE_SORT_DESC)] sort_exprs = [] for sort_by, direction in order_by: sort_expr = _sort_expression(sort_by, metric_columns, date_sort_expr) sort_exprs.append( desc(sort_expr) if direction == USAGE_SORT_DESC else asc(sort_expr) ) return sort_exprs def _row_count_value(row: Any) -> int: if row is None: return 0 if isinstance(row, tuple): return int(row[0] or 0) return int(row or 0) def _single_group_by(group_bys: List[str], group_by: str) -> bool: return len(group_bys) == 1 and group_bys[0] == group_by def _breakdown_bucket_granularity(request: UsageBreakdownRequest) -> str: if USAGE_GROUP_BY_DATE not in request.group_by: return USAGE_GRANULARITY_DAY return request.granularity or USAGE_GRANULARITY_DAY def _build_breakdown_item( group_bys: List[str], row: Any, granularity: str ) -> UsageBreakdownItem: api_requests = int(getattr(row, USAGE_METRIC_API_REQUESTS, 0) or 0) total_tokens = int(getattr(row, USAGE_METRIC_TOTAL_TOKENS, 0) or 0) breakdown_item = UsageBreakdownItem( input_tokens=int(getattr(row, USAGE_METRIC_INPUT_TOKENS, 0) or 0), output_tokens=int(getattr(row, USAGE_METRIC_OUTPUT_TOKENS, 0) or 0), input_cached_tokens=int(getattr(row, USAGE_METRIC_INPUT_CACHED_TOKENS, 0) or 0), total_tokens=total_tokens, api_requests=api_requests, avg_tokens_per_request=total_tokens / api_requests if api_requests else 0, last_active=getattr(row, USAGE_METRIC_LAST_ACTIVE, None), ) if USAGE_GROUP_BY_DATE in group_bys: breakdown_item.date = _row_date_dimension(row, granularity) if USAGE_GROUP_BY_MODEL in group_bys: breakdown_item.model = _row_dimension(USAGE_GROUP_BY_MODEL, row) if USAGE_GROUP_BY_USER in group_bys: breakdown_item.user = _row_dimension(USAGE_GROUP_BY_USER, row) if USAGE_GROUP_BY_API_KEY in group_bys: breakdown_item.api_key = _row_dimension(USAGE_GROUP_BY_API_KEY, row) if _single_group_by(group_bys, USAGE_GROUP_BY_USER): breakdown_item.models_called = int( getattr(row, USAGE_METRIC_MODELS_CALLED, 0) or 0 ) breakdown_item.api_keys_used = int( getattr(row, USAGE_METRIC_API_KEYS_USED, 0) or 0 ) elif _single_group_by(group_bys, USAGE_GROUP_BY_API_KEY): breakdown_item.models_called = int( getattr(row, USAGE_METRIC_MODELS_CALLED, 0) or 0 ) return breakdown_item @router.post( "/breakdown", response_model=UsageBreakdownResponse, response_model_exclude_none=True, ) async def get_usage_breakdown( session: SessionDep, user: CurrentUserDep, ctx: TenantContextDep, request: UsageBreakdownRequest, ): effective_scope = _resolve_effective_scope(user, ctx, request.scope) _check_permission(user, ctx, request, effective_scope) metric_columns = _metric_columns() base_statement = _apply_usage_scope_and_filters( _base_statement(), user=user, filters=request.filters, scope=effective_scope, org_id=ctx.current_principal_id, ) if _single_group_by(request.group_by, USAGE_GROUP_BY_API_KEY): base_statement = _exclude_incomplete_api_key_identity(base_statement) scoped_statement = _date_scoped_statement( base_statement, request.start_date, request.end_date ) summary_columns = [metric_columns[item].label(item) for item in metric_columns] summary_row = await _get_first_row( session, scoped_statement.with_only_columns(*summary_columns) ) granularity = _breakdown_bucket_granularity(request) date_bucket_expr = _date_bucket_expression(session, granularity) select_columns, group_columns = _combined_group_columns( request.group_by, date_bucket_expr=date_bucket_expr ) aggregate_columns = [ metric_columns[USAGE_METRIC_INPUT_TOKENS].label(USAGE_METRIC_INPUT_TOKENS), metric_columns[USAGE_METRIC_OUTPUT_TOKENS].label(USAGE_METRIC_OUTPUT_TOKENS), metric_columns[USAGE_METRIC_INPUT_CACHED_TOKENS].label( USAGE_METRIC_INPUT_CACHED_TOKENS ), metric_columns[USAGE_METRIC_TOTAL_TOKENS].label(USAGE_METRIC_TOTAL_TOKENS), metric_columns[USAGE_METRIC_API_REQUESTS].label(USAGE_METRIC_API_REQUESTS), metric_columns[USAGE_METRIC_MODELS_CALLED].label(USAGE_METRIC_MODELS_CALLED), func.count(func.distinct(ModelUsage.access_key)).label( USAGE_METRIC_API_KEYS_USED ), func.max(ModelUsage.date).label(USAGE_METRIC_LAST_ACTIVE), ] grouped_statement = scoped_statement.with_only_columns( *select_columns, *aggregate_columns ).group_by(*group_columns) count_statement = select(func.count()).select_from(grouped_statement.subquery()) date_sort_expr = ( date_bucket_expr if USAGE_GROUP_BY_DATE in request.group_by else func.max(ModelUsage.date) ) sort_exprs = _order_expression(request.order_by, metric_columns, date_sort_expr) items_statement = ( grouped_statement.order_by(*sort_exprs) .offset((request.page - 1) * request.perPage) .limit(request.perPage) ) total = _row_count_value(await _get_first_row(session, count_statement)) item_rows = await _get_rows(session, items_statement) return UsageBreakdownResponse( summary=_summary_from_row(summary_row), group_by=request.group_by, granularity=granularity if USAGE_GROUP_BY_DATE in request.group_by else None, pagination=Pagination( page=request.page, perPage=request.perPage, total=total, totalPage=ceil(total / request.perPage) if total else 0, ), items=[ _build_breakdown_item(request.group_by, row, granularity) for row in item_rows ], )