| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756 |
- from datetime import date, datetime
- from math import ceil
- from typing import Any, Dict, List, Optional
- from fastapi import APIRouter
- from sqlalchemy import Date, Select, String, and_, asc, cast, desc, literal
- from sqlmodel import func, or_, select
- from gpustack.api.exceptions import ForbiddenException, InvalidException
- from gpustack.schemas.model_usage import ModelUsage
- from gpustack.schemas.users import User
- from gpustack.schemas.principals import OrgRole
- from gpustack.schemas.usage import (
- USAGE_GRANULARITY_MONTH,
- USAGE_GRANULARITY_DAY,
- USAGE_GRANULARITY_WEEK,
- USAGE_GROUP_BY_API_KEY,
- USAGE_GROUP_BY_DATE,
- USAGE_GROUP_BY_MODEL,
- USAGE_GROUP_BY_USER,
- USAGE_SCOPE_ALL,
- USAGE_SCOPE_SELF,
- USAGE_METRIC_API_KEYS_USED,
- USAGE_METRIC_API_REQUESTS,
- USAGE_METRIC_AVG_TOKENS_PER_REQUEST,
- USAGE_METRIC_INPUT_CACHED_TOKENS,
- USAGE_METRIC_DATE,
- USAGE_METRIC_INPUT_TOKENS,
- USAGE_METRIC_LAST_ACTIVE,
- USAGE_METRIC_MODELS_CALLED,
- USAGE_METRIC_OUTPUT_TOKENS,
- USAGE_METRIC_TOTAL_TOKENS,
- USAGE_SORT_DESC,
- UsageBreakdownDateDimension,
- UsageBreakdownDimension,
- UsageBreakdownItem,
- UsageBreakdownRequest,
- UsageBreakdownResponse,
- UsageFilterItem,
- UsageFilterOption,
- UsageFilters,
- UsageIdentity,
- UsageIdentityCurrent,
- UsageIdentityValue,
- UsageMetaResponse,
- UsageOption,
- UsageSummary,
- )
- from gpustack.schemas.common import Pagination
- from gpustack.server.deps import CurrentUserDep, SessionDep, TenantContextDep
- from gpustack.utils.usage_snapshots import (
- format_usage_api_key_label,
- format_usage_date_label,
- format_usage_model_label,
- format_usage_user_label,
- )
# Router the usage endpoints defined in this module are registered on.
router = APIRouter()

# Metrics advertised to clients through the /meta endpoint.
METRIC_OPTIONS = [
    UsageOption(key=option_key, label=option_label)
    for option_key, option_label in (
        (USAGE_METRIC_INPUT_TOKENS, "Input Tokens"),
        (USAGE_METRIC_OUTPUT_TOKENS, "Output Tokens"),
        (USAGE_METRIC_INPUT_CACHED_TOKENS, "Input Cached Tokens"),
        (USAGE_METRIC_TOTAL_TOKENS, "Total Tokens"),
        (USAGE_METRIC_API_REQUESTS, "API Requests"),
    )
]

# Date bucket sizes advertised through the /meta endpoint.
GRANULARITY_OPTIONS = [
    UsageOption(key=option_key, label=option_label)
    for option_key, option_label in (
        (USAGE_GRANULARITY_DAY, "Day"),
        (USAGE_GRANULARITY_WEEK, "Week"),
        (USAGE_GRANULARITY_MONTH, "Month"),
    )
]

# Group-by choices returned for the ``self`` scope: no "User" dimension.
SELF_GROUP_BY_OPTIONS = [
    UsageOption(key=option_key, label=option_label)
    for option_key, option_label in (
        (USAGE_GROUP_BY_DATE, "Date"),
        (USAGE_GROUP_BY_MODEL, "Model"),
        (USAGE_GROUP_BY_API_KEY, "API Key"),
    )
]

# Group-by choices returned for the ``all`` scope: adds the "User" dimension.
ADMIN_GROUP_BY_OPTIONS = [
    UsageOption(key=option_key, label=option_label)
    for option_key, option_label in (
        (USAGE_GROUP_BY_DATE, "Date"),
        (USAGE_GROUP_BY_MODEL, "Model"),
        (USAGE_GROUP_BY_USER, "User"),
        (USAGE_GROUP_BY_API_KEY, "API Key"),
    )
]
- def _null_safe_column_filter(column, value: Any):
- if value is None:
- return column.is_(None)
- return column == value
def _model_key_expression():
    """Return the SQL expression ``[<cluster_name>/]<model_name>``.

    The ``cluster_name + "/"`` prefix is wrapped in ``coalesce`` so that it
    collapses to an empty string when the prefix evaluates to NULL.
    """
    cluster_prefix = func.coalesce(
        ModelUsage.cluster_name + literal("/"), literal("")
    )
    return cluster_prefix + ModelUsage.model_name
def _model_identity_expression():
    """Distinct identity for counting models called.

    Joins the current IDs and the snapshot fields with ``|`` so direct
    deployments, provider models, and deleted historical records that share
    a model name still count as separate identities.
    """
    segments = [
        # NULL ids are replaced by fixed placeholders so the concatenation
        # never becomes NULL.
        func.coalesce(cast(ModelUsage.model_id, String), literal("deleted")),
        func.coalesce(cast(ModelUsage.provider_id, String), literal("no-provider")),
        func.coalesce(ModelUsage.provider_name, literal("")),
        func.coalesce(ModelUsage.provider_type, literal("")),
        _model_key_expression(),
    ]
    identity = segments[0]
    for segment in segments[1:]:
        identity = identity + literal("|") + segment
    return identity
def _metric_columns() -> Dict[str, Any]:
    """Return the aggregate SQL expression for each usage metric key.

    Sums are wrapped in ``coalesce(..., 0)`` so an empty result set yields 0
    rather than NULL. The insertion order of the dict is the order in which
    callers label and select the summary columns.
    """

    def summed(expression):
        return func.coalesce(func.sum(expression), 0)

    columns: Dict[str, Any] = {}
    columns[USAGE_METRIC_INPUT_TOKENS] = summed(ModelUsage.prompt_token_count)
    columns[USAGE_METRIC_OUTPUT_TOKENS] = summed(ModelUsage.completion_token_count)
    columns[USAGE_METRIC_INPUT_CACHED_TOKENS] = summed(
        ModelUsage.prompt_cached_token_count
    )
    columns[USAGE_METRIC_TOTAL_TOKENS] = summed(
        ModelUsage.prompt_token_count + ModelUsage.completion_token_count
    )
    columns[USAGE_METRIC_API_REQUESTS] = summed(ModelUsage.request_count)
    columns[USAGE_METRIC_MODELS_CALLED] = func.count(
        func.distinct(_model_identity_expression())
    )
    return columns
def _date_bucket_expression(session, granularity: str):
    """Return the SQL expression that buckets ``ModelUsage.date``.

    Day granularity uses the raw date column. Week and month buckets are
    built per dialect: ``date_trunc`` cast to ``Date`` on PostgreSQL, and
    ``subdate``/``weekday`` (week) or ``date_format``/``str_to_date`` (month)
    on MySQL. Any other dialect or granularity falls back to the raw date.
    """
    if granularity == USAGE_GRANULARITY_DAY:
        return ModelUsage.date

    dialect = session.get_bind().dialect.name
    truncate_unit = {
        USAGE_GRANULARITY_WEEK: "week",
        USAGE_GRANULARITY_MONTH: "month",
    }.get(granularity)

    if truncate_unit is None:
        return ModelUsage.date
    if dialect == "postgresql":
        return cast(func.date_trunc(truncate_unit, ModelUsage.date), Date)
    if dialect == "mysql":
        if truncate_unit == "week":
            # Step back by the weekday index to land on the week's first day.
            return func.subdate(ModelUsage.date, func.weekday(ModelUsage.date))
        # Rewrite the day part to 01 and parse the string back into a date.
        return func.str_to_date(
            func.date_format(ModelUsage.date, "%Y-%m-01"), "%Y-%m-%d"
        )
    return ModelUsage.date
def _group_columns(group_by: str, *, date_bucket_expr=None):
    """Return ``(select_columns, group_columns)`` for one group-by dimension.

    ``select_columns`` are the labelled (``group_*``) columns to select and
    ``group_columns`` are the matching unlabelled expressions for GROUP BY.
    Any dimension other than date, model or user is treated as API key.
    """
    if group_by == USAGE_GROUP_BY_DATE:
        bucket = ModelUsage.date if date_bucket_expr is None else date_bucket_expr
        return [bucket.label("group_date")], [bucket]

    if group_by == USAGE_GROUP_BY_MODEL:
        labelled = [
            ("group_cluster_name", ModelUsage.cluster_name),
            ("group_model_name", ModelUsage.model_name),
            ("group_model_id", ModelUsage.model_id),
            ("group_provider_name", ModelUsage.provider_name),
            ("group_provider_type", ModelUsage.provider_type),
            ("group_provider_id", ModelUsage.provider_id),
        ]
    elif group_by == USAGE_GROUP_BY_USER:
        labelled = [
            ("group_user_name", ModelUsage.user_name),
            ("group_user_id", ModelUsage.user_id),
        ]
    else:
        labelled = [
            ("group_user_name", ModelUsage.user_name),
            ("group_api_key_name", ModelUsage.api_key_name),
            ("group_access_key", ModelUsage.access_key),
            ("group_api_key_is_custom", ModelUsage.api_key_is_custom),
            ("group_user_id", ModelUsage.user_id),
            ("group_api_key_id", ModelUsage.api_key_id),
        ]

    select_columns = [column.label(name) for name, column in labelled]
    group_columns = [column for _, column in labelled]
    return select_columns, group_columns
def _combined_group_columns(group_bys: List[str], *, date_bucket_expr=None):
    """Merge the select/group columns of several group-by dimensions.

    Dimensions can share columns (user and API key both contribute the user
    columns), so duplicates are dropped and the first occurrence wins.
    Select columns are keyed by their ``key`` attribute (falling back to the
    string form); group columns are keyed by their string form.
    """
    # Dicts keep insertion order, giving an ordered "seen" set for free.
    selected: dict = {}
    grouped: dict = {}
    for dimension in group_bys:
        dimension_select, dimension_group = _group_columns(
            dimension, date_bucket_expr=date_bucket_expr
        )
        for column in dimension_select:
            selected.setdefault(getattr(column, "key", None) or str(column), column)
        for column in dimension_group:
            grouped.setdefault(str(column), column)
    return list(selected.values()), list(grouped.values())
def _row_identity(group_by: str, row: Any) -> UsageIdentity:
    """Build the identity of a grouped row for the given dimension.

    ``value`` carries the snapshot fields read from the row's ``group_*``
    attributes; ``current`` carries the live IDs and is ``None`` when the row
    has none. Any dimension other than model or user is treated as API key.
    """

    def read(attribute: str) -> Any:
        return getattr(row, attribute, None)

    if group_by == USAGE_GROUP_BY_MODEL:
        model_id = read("group_model_id")
        provider_id = read("group_provider_id")
        has_reference = model_id is not None or provider_id is not None
        return UsageIdentity(
            value=UsageIdentityValue(
                cluster_name=read("group_cluster_name"),
                model_name=read("group_model_name"),
                provider_name=read("group_provider_name"),
                provider_type=read("group_provider_type"),
            ),
            current=(
                UsageIdentityCurrent(model_id=model_id, provider_id=provider_id)
                if has_reference
                else None
            ),
        )

    if group_by == USAGE_GROUP_BY_USER:
        user_id = read("group_user_id")
        return UsageIdentity(
            value=UsageIdentityValue(user_name=read("group_user_name")),
            current=UsageIdentityCurrent(user_id=user_id)
            if user_id is not None
            else None,
        )

    api_key_id = read("group_api_key_id")
    live_key = None
    if api_key_id is not None:
        live_key = UsageIdentityCurrent(
            user_id=read("group_user_id"),
            api_key_id=api_key_id,
        )
    return UsageIdentity(
        value=UsageIdentityValue(
            user_name=read("group_user_name"),
            api_key_name=read("group_api_key_name"),
            access_key=read("group_access_key"),
            api_key_is_custom=read("group_api_key_is_custom"),
        ),
        current=live_key,
    )
def _row_date_dimension(row: Any, granularity: str) -> UsageBreakdownDateDimension:
    """Build the date dimension (value plus label) from ``row.group_date``."""
    bucket_date = _coerce_date(row.group_date)
    bucket_label = format_usage_date_label(bucket_date, granularity)
    return UsageBreakdownDateDimension(value=bucket_date, label=bucket_label)
- def _coerce_date(value: Any) -> date:
- if isinstance(value, datetime):
- return value.date()
- if isinstance(value, date):
- return value
- if isinstance(value, str):
- return date.fromisoformat(value[:10])
- return value
def _row_dimension(group_by: str, row: Any) -> UsageBreakdownDimension:
    """Build the identity/label/deleted dimension for a grouped row.

    An API-key row without a key name has no usable identity, so it is
    rendered as a ``-`` placeholder that is not flagged as deleted.
    """
    is_api_key = group_by == USAGE_GROUP_BY_API_KEY
    if is_api_key and not getattr(row, "group_api_key_name", None):
        return UsageBreakdownDimension(identity=None, label="-", deleted=False)

    row_identity = _row_identity(group_by, row)
    return UsageBreakdownDimension(
        identity=row_identity,
        label=_identity_label(group_by, row_identity),
        deleted=_identity_deleted(row_identity),
    )
def _identity_deleted(identity: UsageIdentity) -> bool:
    """Return True when the identity has no ``current`` (live) reference."""
    has_live_reference = identity.current is not None
    return not has_live_reference
def _identity_label(group_by: str, identity: UsageIdentity) -> str:
    """Return the display label for an identity.

    The label is formatted from the snapshot fields for the given dimension
    (any dimension other than model or user is treated as API key) and gets
    a `` (Deleted)`` suffix when the identity has no live reference.
    """
    snapshot = identity.value
    if group_by == USAGE_GROUP_BY_MODEL:
        base_label = format_usage_model_label(
            model_name=snapshot.model_name,
            cluster_name=snapshot.cluster_name,
            provider_name=snapshot.provider_name,
        )
    elif group_by == USAGE_GROUP_BY_USER:
        base_label = format_usage_user_label(snapshot.user_name)
    else:
        base_label = format_usage_api_key_label(
            user_name=snapshot.user_name,
            api_key_name=snapshot.api_key_name,
        )

    if not _identity_deleted(identity):
        return base_label
    return f"{base_label} (Deleted)"
def _filter_condition(group_by: str, item: UsageFilterItem):
    """Translate one selected filter identity into a SQL condition.

    The condition ANDs the snapshot fields of the identity with its live
    IDs. A missing live ID is matched as NULL so deleted entries select only
    their historical rows. Any dimension other than model or user is treated
    as API key.
    """
    snapshot = item.identity.value
    live = item.identity.current

    def live_id(attribute: str):
        # A missing ``current`` behaves like a ``current`` whose id is None.
        return None if live is None else getattr(live, attribute)

    if group_by == USAGE_GROUP_BY_MODEL:
        clauses = [
            _null_safe_column_filter(ModelUsage.cluster_name, snapshot.cluster_name),
            _null_safe_column_filter(ModelUsage.model_name, snapshot.model_name),
            _null_safe_column_filter(ModelUsage.provider_name, snapshot.provider_name),
            _null_safe_column_filter(ModelUsage.provider_type, snapshot.provider_type),
            _null_safe_column_filter(ModelUsage.model_id, live_id("model_id")),
            _null_safe_column_filter(ModelUsage.provider_id, live_id("provider_id")),
        ]
    elif group_by == USAGE_GROUP_BY_USER:
        clauses = [
            _null_safe_column_filter(ModelUsage.user_name, snapshot.user_name),
            _null_safe_column_filter(ModelUsage.user_id, live_id("user_id")),
        ]
    else:
        clauses = [
            _null_safe_column_filter(ModelUsage.user_name, snapshot.user_name),
            _null_safe_column_filter(ModelUsage.api_key_name, snapshot.api_key_name),
            _null_safe_column_filter(ModelUsage.access_key, snapshot.access_key),
        ]
        # The custom flag only constrains the query when the client sent it.
        if snapshot.api_key_is_custom is not None:
            clauses.append(
                _null_safe_column_filter(
                    ModelUsage.api_key_is_custom, snapshot.api_key_is_custom
                )
            )
        if live is not None and live.api_key_id is not None:
            clauses.append(ModelUsage.api_key_id == live.api_key_id)
            # The owning user is pinned only when a live key reference exists.
            clauses.append(
                _null_safe_column_filter(ModelUsage.user_id, live.user_id)
            )
        else:
            clauses.append(ModelUsage.api_key_id.is_(None))
    return and_(*clauses)
def _can_use_all_scope(user: User, ctx) -> bool:
    """Tell whether the caller may use the cross-user (``all``) view.

    Only platform admins and admins of a real Org qualify; everyone else
    falls back to ``self``.

    A Personal Org admin is excluded even though their ``org_role`` is
    ADMIN: a Personal Org has exactly one member, so ``all`` would just be
    ``self``. Worse, the org_id filter applied for ``all`` would hide the
    user's own usage of Platform-shared models, since those rows carry the
    model owner's owner_principal_id, not the Personal Org's.
    """
    if user.is_admin:
        return True
    return (
        ctx is not None
        and ctx.org_role == OrgRole.ADMIN
        and not ctx.current_is_personal_scope
    )
def _resolve_effective_scope(user: User, ctx, requested_scope: str) -> str:
    """Map the requested scope onto the scope the caller actually gets.

    - ``self`` is always granted as requested.
    - Anything else resolves to ``all`` for callers that pass
      ``_can_use_all_scope`` and is silently downgraded to ``self`` for
      everyone else. The request default is ``all``, so a regular user who
      sends no explicit scope should not hit a 403.

    Privacy-sensitive group_bys (e.g. ``user``) are rejected later in
    ``_check_permission`` once the effective scope is locked.
    """
    wants_cross_user_view = requested_scope != USAGE_SCOPE_SELF
    if wants_cross_user_view and _can_use_all_scope(user, ctx):
        return USAGE_SCOPE_ALL
    return USAGE_SCOPE_SELF
def _apply_usage_scope_and_filters(
    statement: Select,
    *,
    user: User,
    filters,
    scope: str,
    org_id: Optional[int] = None,
) -> Select:
    """Restrict ``statement`` by scope and by the selected filter identities.

    Raises:
        ForbiddenException: when user filters are supplied in ``self`` scope.
    """
    self_scoped = scope == USAGE_SCOPE_SELF

    # The ``self`` view always restricts to the caller's own rows. The
    # ``all`` view restricts by owner_principal_id when one is in context;
    # a platform admin in cross-org "All" mode has org_id=None and sees
    # everything.
    if self_scoped:
        statement = statement.where(ModelUsage.user_id == user.id)
    elif org_id is not None:
        statement = statement.where(ModelUsage.owner_principal_id == org_id)

    filters_by_dimension = (
        (USAGE_GROUP_BY_MODEL, filters.models),
        (USAGE_GROUP_BY_USER, filters.users),
        (USAGE_GROUP_BY_API_KEY, filters.api_keys),
    )
    for dimension, selected in filters_by_dimension:
        if selected:
            if self_scoped and dimension == USAGE_GROUP_BY_USER:
                raise ForbiddenException(message="No permission to filter by user")
            # Identities within a dimension are alternatives (OR); the
            # dimensions themselves stack as successive WHERE clauses (AND).
            alternatives = [_filter_condition(dimension, entry) for entry in selected]
            statement = statement.where(or_(*alternatives))
    return statement
def _base_statement() -> Select:
    """Return an empty-column SELECT over ``ModelUsage``.

    Callers choose the actual columns later via ``with_only_columns``.
    """
    empty_select = select()
    return empty_select.select_from(ModelUsage)
def _date_scoped_statement(
    statement: Select, start_date: date, end_date: date
) -> Select:
    """Limit ``statement`` to rows dated within ``[start_date, end_date]``."""
    not_before_start = ModelUsage.date >= start_date
    not_after_end = ModelUsage.date <= end_date
    return statement.where(not_before_start).where(not_after_end)
def _exclude_incomplete_api_key_identity(statement: Select) -> Select:
    """Drop rows whose API key name or access key is NULL or empty."""
    required_conditions = (
        ModelUsage.api_key_name.is_not(None),
        ModelUsage.api_key_name != "",
        ModelUsage.access_key.is_not(None),
        ModelUsage.access_key != "",
    )
    for condition in required_conditions:
        statement = statement.where(condition)
    return statement
async def _get_rows(session, statement: Select):
    """Execute ``statement`` on ``session`` and return all result rows."""
    result = await session.exec(statement)
    return result.all()
async def _get_first_row(session, statement: Select):
    """Execute ``statement`` and return its first row, or None if empty."""
    fetched = await _get_rows(session, statement)
    if not fetched:
        return None
    return fetched[0]
def _summary_from_row(row: Any) -> UsageSummary:
    """Convert an aggregate row into a ``UsageSummary``.

    A missing row produces a default ``UsageSummary()``. Missing or falsy
    metric attributes are read as 0.
    """
    if row is None:
        return UsageSummary()

    def metric(name: str) -> int:
        return int(getattr(row, name, 0) or 0)

    return UsageSummary(
        input_tokens=metric(USAGE_METRIC_INPUT_TOKENS),
        output_tokens=metric(USAGE_METRIC_OUTPUT_TOKENS),
        input_cached_tokens=metric(USAGE_METRIC_INPUT_CACHED_TOKENS),
        total_tokens=metric(USAGE_METRIC_TOTAL_TOKENS),
        api_requests=metric(USAGE_METRIC_API_REQUESTS),
        models_called=metric(USAGE_METRIC_MODELS_CALLED),
    )
def _option_from_identity(group_by: str, identity: UsageIdentity) -> UsageFilterOption:
    """Wrap an identity as a filter option with its label and deleted flag."""
    option_label = _identity_label(group_by, identity)
    is_deleted = _identity_deleted(identity)
    return UsageFilterOption(identity=identity, label=option_label, deleted=is_deleted)
async def _get_filter_options(
    session,
    *,
    base_statement: Select,
    group_by: str,
) -> List[UsageFilterOption]:
    """Return the distinct filter options of one dimension.

    Selects the dimension's identity columns with DISTINCT from
    ``base_statement``, ordered by those same columns. For the API-key
    dimension, rows with an incomplete key identity are excluded first.
    """
    select_columns, group_columns = _group_columns(group_by)

    source = base_statement
    if group_by == USAGE_GROUP_BY_API_KEY:
        source = _exclude_incomplete_api_key_identity(source)

    distinct_statement = (
        source.with_only_columns(*select_columns).distinct().order_by(*group_columns)
    )

    options: List[UsageFilterOption] = []
    for row in await _get_rows(session, distinct_statement):
        options.append(_option_from_identity(group_by, _row_identity(group_by, row)))
    return options
@router.get("/meta", response_model=UsageMetaResponse, response_model_exclude_none=True)
async def get_usage_meta(
    session: SessionDep,
    user: CurrentUserDep,
    ctx: TenantContextDep,
    scope: str = USAGE_SCOPE_ALL,
):
    """Return the metric, granularity, group-by and filter options.

    Raises:
        InvalidException: when ``scope`` is neither ``self`` nor ``all``.
    """
    if scope not in (USAGE_SCOPE_SELF, USAGE_SCOPE_ALL):
        raise InvalidException(message=f"Unsupported scope: {scope}")

    # Default scope is "all"; it downgrades to "self" for non-managers so
    # the page works without an explicit scope param.
    if scope == USAGE_SCOPE_ALL and not _can_use_all_scope(user, ctx):
        scope = USAGE_SCOPE_SELF
    cross_user_view = scope == USAGE_SCOPE_ALL

    options_source = _base_statement()
    if not cross_user_view:
        options_source = options_source.where(ModelUsage.user_id == user.id)
    elif ctx.current_principal_id is not None:
        options_source = options_source.where(
            ModelUsage.owner_principal_id == ctx.current_principal_id
        )

    model_options = await _get_filter_options(
        session, base_statement=options_source, group_by=USAGE_GROUP_BY_MODEL
    )
    # User options are only offered in the cross-user view.
    user_options: List[UsageFilterOption] = []
    if cross_user_view:
        user_options = await _get_filter_options(
            session, base_statement=options_source, group_by=USAGE_GROUP_BY_USER
        )
    api_key_options = await _get_filter_options(
        session, base_statement=options_source, group_by=USAGE_GROUP_BY_API_KEY
    )

    group_by_options = (
        ADMIN_GROUP_BY_OPTIONS if cross_user_view else SELF_GROUP_BY_OPTIONS
    )
    available_filters = UsageFilters(
        models=model_options,
        users=user_options,
        api_keys=api_key_options,
    )
    return UsageMetaResponse(
        metrics=METRIC_OPTIONS,
        granularities=GRANULARITY_OPTIONS,
        group_bys=group_by_options,
        filters=available_filters,
    )
def _check_permission(user: User, ctx, request, effective_scope: str) -> None:
    """Reject user-level grouping or filtering in the ``self`` scope.

    In the ``self`` scope a caller may only see their own rows, so neither
    grouping by user nor filtering by user is allowed. No check is applied
    for any other effective scope. ``request.group_by`` may be a single
    value or a list.

    Raises:
        ForbiddenException: on a user group-by or user filter in ``self``.
    """
    if effective_scope != USAGE_SCOPE_SELF:
        return

    requested = request.group_by
    dimensions = requested if isinstance(requested, list) else [requested]
    if USAGE_GROUP_BY_USER in dimensions:
        raise ForbiddenException(message="No permission to group by user")
    if request.filters.users:
        raise ForbiddenException(message="No permission to filter by user")
def _sort_expression(sort_by: str, metric_columns: Dict[str, Any], date_sort_expr=None):
    """Return the SQL expression to sort by for a ``sort_by`` key.

    Derived keys (date, average tokens per request, API keys used, last
    active) are handled first, then the keys of ``metric_columns``. An
    unknown key falls back to the total-tokens metric.
    """
    if sort_by == USAGE_METRIC_DATE:
        return ModelUsage.date if date_sort_expr is None else date_sort_expr

    if sort_by == USAGE_METRIC_AVG_TOKENS_PER_REQUEST:
        total_tokens = metric_columns[USAGE_METRIC_TOTAL_TOKENS]
        # nullif turns a zero request count into NULL to avoid dividing by 0.
        # NOTE(review): this may be integer division on some backends — confirm.
        request_count = func.nullif(metric_columns[USAGE_METRIC_API_REQUESTS], 0)
        return total_tokens / request_count

    if sort_by == USAGE_METRIC_API_KEYS_USED:
        return func.count(func.distinct(ModelUsage.access_key))

    if sort_by == USAGE_METRIC_LAST_ACTIVE:
        return func.max(ModelUsage.date)

    if sort_by not in metric_columns:
        sort_by = USAGE_METRIC_TOTAL_TOKENS
    return metric_columns[sort_by]
def _order_expression(
    order_by: List[tuple[str, str]], metric_columns: Dict[str, Any], date_sort_expr=None
):
    """Translate ``(sort_by, direction)`` pairs into ORDER BY expressions.

    An empty ``order_by`` defaults to total tokens descending. Any direction
    other than ``USAGE_SORT_DESC`` sorts ascending.
    """
    requested = order_by or [(USAGE_METRIC_TOTAL_TOKENS, USAGE_SORT_DESC)]

    def directed(sort_by: str, direction: str):
        expression = _sort_expression(sort_by, metric_columns, date_sort_expr)
        if direction == USAGE_SORT_DESC:
            return desc(expression)
        return asc(expression)

    return [directed(sort_by, direction) for sort_by, direction in requested]
- def _row_count_value(row: Any) -> int:
- if row is None:
- return 0
- if isinstance(row, tuple):
- return int(row[0] or 0)
- return int(row or 0)
- def _single_group_by(group_bys: List[str], group_by: str) -> bool:
- return len(group_bys) == 1 and group_bys[0] == group_by
def _breakdown_bucket_granularity(request: UsageBreakdownRequest) -> str:
    """Return the date bucket granularity to use for a breakdown request.

    The requested granularity only applies when the request groups by date;
    otherwise, or when none is given, day granularity is used.
    """
    if USAGE_GROUP_BY_DATE in request.group_by and request.granularity:
        return request.granularity
    return USAGE_GRANULARITY_DAY
def _build_breakdown_item(
    group_bys: List[str], row: Any, granularity: str
) -> UsageBreakdownItem:
    """Convert one grouped result row into a ``UsageBreakdownItem``.

    Metric attributes that are missing or falsy on the row are read as 0.
    A dimension field is filled only for dimensions listed in ``group_bys``.
    ``models_called`` is set only when grouping solely by user or solely by
    API key, and ``api_keys_used`` only when grouping solely by user.
    """

    def metric(name: str) -> int:
        return int(getattr(row, name, 0) or 0)

    request_count = metric(USAGE_METRIC_API_REQUESTS)
    token_total = metric(USAGE_METRIC_TOTAL_TOKENS)
    item = UsageBreakdownItem(
        input_tokens=metric(USAGE_METRIC_INPUT_TOKENS),
        output_tokens=metric(USAGE_METRIC_OUTPUT_TOKENS),
        input_cached_tokens=metric(USAGE_METRIC_INPUT_CACHED_TOKENS),
        total_tokens=token_total,
        api_requests=request_count,
        # Guard against division by zero when no request was recorded.
        avg_tokens_per_request=token_total / request_count if request_count else 0,
        last_active=getattr(row, USAGE_METRIC_LAST_ACTIVE, None),
    )

    if USAGE_GROUP_BY_DATE in group_bys:
        item.date = _row_date_dimension(row, granularity)
    for dimension, attribute in (
        (USAGE_GROUP_BY_MODEL, "model"),
        (USAGE_GROUP_BY_USER, "user"),
        (USAGE_GROUP_BY_API_KEY, "api_key"),
    ):
        if dimension in group_bys:
            setattr(item, attribute, _row_dimension(dimension, row))

    only_user = _single_group_by(group_bys, USAGE_GROUP_BY_USER)
    if only_user or _single_group_by(group_bys, USAGE_GROUP_BY_API_KEY):
        item.models_called = metric(USAGE_METRIC_MODELS_CALLED)
    if only_user:
        item.api_keys_used = metric(USAGE_METRIC_API_KEYS_USED)
    return item
@router.post(
    "/breakdown",
    response_model=UsageBreakdownResponse,
    response_model_exclude_none=True,
)
async def get_usage_breakdown(
    session: SessionDep,
    user: CurrentUserDep,
    ctx: TenantContextDep,
    request: UsageBreakdownRequest,
):
    """Return a usage summary plus one page of grouped usage rows.

    The requested scope is first resolved to the scope the caller may use,
    then three queries run over the same scoped, filtered and date-bounded
    statement: the overall summary, the number of groups, and the requested
    page of groups.

    Raises:
        ForbiddenException: when grouping or filtering by user in ``self``
            scope (raised by the permission and filter helpers).
    """
    effective_scope = _resolve_effective_scope(user, ctx, request.scope)
    _check_permission(user, ctx, request, effective_scope)

    metric_columns = _metric_columns()
    base_statement = _apply_usage_scope_and_filters(
        _base_statement(),
        user=user,
        filters=request.filters,
        scope=effective_scope,
        org_id=ctx.current_principal_id,
    )
    # Rows without a complete API key identity cannot form a meaningful
    # group when API key is the only dimension, so they are excluded.
    if _single_group_by(request.group_by, USAGE_GROUP_BY_API_KEY):
        base_statement = _exclude_incomplete_api_key_identity(base_statement)
    scoped_statement = _date_scoped_statement(
        base_statement, request.start_date, request.end_date
    )

    summary_columns = [
        column.label(metric_key) for metric_key, column in metric_columns.items()
    ]
    summary_row = await _get_first_row(
        session, scoped_statement.with_only_columns(*summary_columns)
    )

    groups_by_date = USAGE_GROUP_BY_DATE in request.group_by
    granularity = _breakdown_bucket_granularity(request)
    date_bucket_expr = _date_bucket_expression(session, granularity)
    select_columns, group_columns = _combined_group_columns(
        request.group_by, date_bucket_expr=date_bucket_expr
    )
    aggregate_columns = [
        metric_columns[USAGE_METRIC_INPUT_TOKENS].label(USAGE_METRIC_INPUT_TOKENS),
        metric_columns[USAGE_METRIC_OUTPUT_TOKENS].label(USAGE_METRIC_OUTPUT_TOKENS),
        metric_columns[USAGE_METRIC_INPUT_CACHED_TOKENS].label(
            USAGE_METRIC_INPUT_CACHED_TOKENS
        ),
        metric_columns[USAGE_METRIC_TOTAL_TOKENS].label(USAGE_METRIC_TOTAL_TOKENS),
        metric_columns[USAGE_METRIC_API_REQUESTS].label(USAGE_METRIC_API_REQUESTS),
        metric_columns[USAGE_METRIC_MODELS_CALLED].label(USAGE_METRIC_MODELS_CALLED),
        func.count(func.distinct(ModelUsage.access_key)).label(
            USAGE_METRIC_API_KEYS_USED
        ),
        func.max(ModelUsage.date).label(USAGE_METRIC_LAST_ACTIVE),
    ]
    grouped_statement = scoped_statement.with_only_columns(
        *select_columns, *aggregate_columns
    ).group_by(*group_columns)

    # The total is the number of groups, so count over the grouped subquery.
    count_statement = select(func.count()).select_from(grouped_statement.subquery())

    # Without a date dimension, "sort by date" means the group's latest date.
    date_sort_expr = date_bucket_expr if groups_by_date else func.max(ModelUsage.date)
    sort_exprs = _order_expression(request.order_by, metric_columns, date_sort_expr)
    items_statement = (
        # The grouping columns identify each result row uniquely. Appending
        # them as a tie-breaker keeps OFFSET/LIMIT pages stable when several
        # groups share the same sort value; otherwise a row could repeat or
        # go missing across pages.
        grouped_statement.order_by(*sort_exprs, *group_columns)
        .offset((request.page - 1) * request.perPage)
        .limit(request.perPage)
    )

    total = _row_count_value(await _get_first_row(session, count_statement))
    item_rows = await _get_rows(session, items_statement)

    return UsageBreakdownResponse(
        summary=_summary_from_row(summary_row),
        group_by=request.group_by,
        granularity=granularity if groups_by_date else None,
        pagination=Pagination(
            page=request.page,
            perPage=request.perPage,
            total=total,
            totalPage=ceil(total / request.perPage) if total else 0,
        ),
        items=[
            _build_breakdown_item(request.group_by, row, granularity)
            for row in item_rows
        ],
    )
|