| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- # The data structures in this file are adapted from:
- # https://github.com/vllm-project/guidellm/blob/62b0f8e01f5c558920fd5d02fe828459264b4f87/src/guidellm/benchmark/schemas/generative/report.py#L58
- # Modifications have been made to fit project requirements.
- import json
- import uuid
- from pathlib import Path
- from typing import Generic, Literal, Optional, Self, TypeVar
- from pydantic import BaseModel, Field
- from gpustack.schemas.benchmark import BenchmarkMetrics
# Type variables constrained by an upper bound.
BaseModelT = TypeVar("BaseModelT", bound=BaseModel)  # any pydantic model
RegisterClassT = TypeVar("RegisterClassT", bound=type)  # any class object

# Unconstrained type variables, one per status slot of StatusBreakdown.
SuccessfulT = TypeVar("SuccessfulT")
ErroredT = TypeVar("ErroredT")
IncompleteT = TypeVar("IncompleteT")
TotalT = TypeVar("TotalT")
# Closed set of request type names recognised for generative requests.
GenerativeRequestType = Literal[
    "text_completions", "chat_completions",
    "audio_transcriptions", "audio_translations",
]
class StatusBreakdown(BaseModel, Generic[SuccessfulT, ErroredT, IncompleteT, TotalT]):
    """
    Generic container holding one value per request-processing status.

    Exposes four slots: ``successful``, ``errored``, ``incomplete`` and
    ``total``. Each slot has its own type parameter, so the same model can carry
    plain counts, distribution summaries, or lists of request records.

    Every slot falls back to ``None`` when it is not supplied, even though the
    annotations are not declared optional (hence the ``type: ignore`` markers).

    Example:
    ::
        # Define a breakdown for request counts
        breakdown = StatusBreakdown[int, int, int, int](
            successful=150,
            errored=5,
            incomplete=10,
            total=165,
        )
    """

    successful: SuccessfulT = Field(
        default=None,  # type: ignore[assignment]
        description="Results or metrics for requests with successful completion status",
    )
    errored: ErroredT = Field(
        default=None,  # type: ignore[assignment]
        description="Results or metrics for requests with error completion status",
    )
    incomplete: IncompleteT = Field(
        default=None,  # type: ignore[assignment]
        description="Results or metrics for requests with incomplete processing status",
    )
    total: TotalT = Field(
        default=None,  # type: ignore[assignment]
        description="Aggregated results or metrics combining all status categories",
    )
class SchedulerMetrics(BaseModel):
    """
    Timestamps marking the phases of a single benchmark run.

    All six fields are required Unix timestamps: the overall run start/end, the
    first-request/last-request boundaries, and the start/end of the measurement
    period.
    """

    # Overall timings for the scheduler
    start_time: float = Field(
        ..., description="Unix timestamp when the benchmark run started"
    )
    request_start_time: float = Field(
        ..., description="Unix timestamp when first request was made"
    )
    measure_start_time: float = Field(
        ..., description="Unix timestamp when measurement period started"
    )
    measure_end_time: float = Field(
        ..., description="Unix timestamp when measurement period ended"
    )
    request_end_time: float = Field(
        ..., description="Unix timestamp when last request completed"
    )
    end_time: float = Field(
        ..., description="Unix timestamp when the benchmark run ended"
    )
class Percentiles(BaseModel):
    """
    Selected percentile values of a distribution.

    Only the 50th, 90th, 95th and 99th percentiles are carried by this model;
    all four are required.
    """

    p50: float = Field(..., description="50th percentile (median) value")
    p90: float = Field(..., description="90th percentile value")
    p95: float = Field(..., description="95th percentile value")
    p99: float = Field(..., description="99th percentile value")
class DistributionSummary(BaseModel):
    """
    Summary statistics describing one distribution of values.

    Carries the mean, median, minimum and maximum together with a nested
    ``Percentiles`` model. All fields are required.
    """

    mean: float = Field(..., description="Mean/average value")
    median: float = Field(..., description="Median (50th percentile) value")
    min: float = Field(..., description="Minimum value")
    max: float = Field(..., description="Maximum value")
    percentiles: Percentiles = Field(..., description="Standard percentile values")
class StatusDistributionSummary(
    StatusBreakdown[
        DistributionSummary,  # successful
        DistributionSummary,  # errored
        DistributionSummary,  # incomplete
        DistributionSummary,  # total
    ]
):
    """
    A ``StatusBreakdown`` whose four slots are all ``DistributionSummary``.

    Gives one statistical summary each for successful, errored and incomplete
    requests plus one for the total. Adds no fields or behavior of its own.
    """
class GenerativeMetrics(BaseModel):
    """
    Aggregated metrics for a generative benchmark.

    Holds request counts per status plus a status-aware distribution summary for
    each measured quantity: request rate, concurrency, latency, streaming
    iterations, token counts, token latencies and token throughput. Every field
    is required.
    """

    # Request stats
    request_totals: StatusBreakdown[int, int, int, int] = Field(
        ...,
        description="Request counts by status: successful, incomplete, errored, total",
    )
    requests_per_second: StatusDistributionSummary = Field(
        ...,
        description="Distribution of requests per second across benchmark execution",
    )
    request_concurrency: StatusDistributionSummary = Field(
        ...,
        description="Distribution of concurrent request counts during execution",
    )
    request_latency: StatusDistributionSummary = Field(
        ...,
        description="Distribution of request latencies for completed requests",
    )
    request_streaming_iterations_count: StatusDistributionSummary = Field(
        ...,
        description="Distribution of stream iterations for completed requests",
    )

    # General token stats
    prompt_token_count: StatusDistributionSummary = Field(
        ...,
        description="Distribution of prompt token counts by request status",
    )
    output_token_count: StatusDistributionSummary = Field(
        ...,
        description="Distribution of output token counts by request status",
    )
    total_token_count: StatusDistributionSummary = Field(
        ...,
        description="Distribution of total token counts by request status",
    )
    time_to_first_token_ms: StatusDistributionSummary = Field(
        ...,
        description="Distribution of first token latencies in milliseconds",
    )
    time_per_output_token_ms: StatusDistributionSummary = Field(
        ...,
        description="Distribution of average time per output token in milliseconds",
    )
    inter_token_latency_ms: StatusDistributionSummary = Field(
        ...,
        description="Distribution of inter-token latencies in milliseconds",
    )
    prompt_tokens_per_second: StatusDistributionSummary = Field(
        ...,
        description="Distribution of prompt token processing rates",
    )
    output_tokens_per_second: StatusDistributionSummary = Field(
        ...,
        description="Distribution of output token generation rates",
    )
    tokens_per_second: StatusDistributionSummary = Field(
        ...,
        description="Distribution of total token throughput including prompt and output",
    )
    output_tokens_per_iteration: StatusDistributionSummary = Field(
        ...,
        description="Distribution of output tokens generated per streaming iteration",
    )
    iter_tokens_per_iteration: StatusDistributionSummary = Field(
        ...,
        description=(
            "Distribution of output tokens (without first) generated per "
            "streaming iteration"
        ),
    )
class RequestTimings(BaseModel):
    """
    Optional timing measurements collected over a request's lifecycle.

    Each timestamp field defaults to ``None`` and the two iteration counters
    default to ``0``, so an instance can be created empty and populated as the
    corresponding events are recorded.
    """

    targeted_start: float | None = Field(
        None,
        description="Unix timestamp when request was initially targeted for execution",
    )
    queued: float | None = Field(
        None,
        description="Unix timestamp when request was placed into processing queue",
    )
    dequeued: float | None = Field(
        None,
        description="Unix timestamp when request was removed from queue for processing",
    )
    scheduled_at: float | None = Field(
        None,
        description="Unix timestamp when the request was scheduled for processing",
    )
    resolve_start: float | None = Field(
        None,
        description="Unix timestamp when backend resolution of the request began",
    )
    request_start: float | None = Field(
        None,
        description="Unix timestamp when the backend began processing the request",
    )

    # The six fields below carry no schema description in this file.
    # NOTE(review): the names suggest timestamps of the first/last request and
    # token streaming iterations, and counts of those iterations — confirm
    # against the upstream guidellm schema.
    first_request_iteration: float | None = None
    first_token_iteration: float | None = None
    last_token_iteration: float | None = None
    last_request_iteration: float | None = None
    request_iterations: int = 0
    token_iterations: int = 0

    request_end: float | None = Field(
        None,
        description="Unix timestamp when the backend completed processing the request",
    )
    resolve_end: float | None = Field(
        None,
        description="Unix timestamp when backend resolution of the request completed",
    )
    finalized: float | None = Field(
        None,
        description="Unix timestamp when request was processed by the scheduler",
    )
class RequestInfo(BaseModel):
    """
    Metadata, status and timings for a single scheduled request.

    Every field has a default, so ``RequestInfo()`` is valid: it gets a fresh
    UUID4 string as ``request_id``, status ``"queued"``, ``-1`` placeholders for
    the scheduler identifiers and start time, an empty ``RequestTimings`` and no
    error details.

    Example:
    ::
        request = RequestInfo()
        request.status = "in_progress"
        queued_at = request.timings.queued
    """

    request_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Unique identifier for the request",
    )
    status: Literal[
        "queued",
        "pending",
        "in_progress",
        "completed",
        "errored",
        "cancelled",
    ] = Field(default="queued", description="Current processing status of the request")
    # -1 is the default for the three scheduler fields below.
    scheduler_node_id: int = Field(
        default=-1,
        description="ID/rank of the scheduler node handling the request",
    )
    scheduler_process_id: int = Field(
        default=-1,
        description="ID/rank of the node's scheduler process handling the request",
    )
    scheduler_start_time: float = Field(
        default=-1,
        description="Unix timestamp when scheduler processing began",
    )
    timings: RequestTimings = Field(
        description="Timing measurements for the request lifecycle",
        default_factory=RequestTimings,
    )
    error: str | None = Field(
        description="Error message if the request status is 'errored'",
        default=None,
    )
    traceback: str | None = Field(
        description="Full traceback of the error if the request status is 'errored'",
        default=None,
    )
class UsageMetrics(BaseModel):
    """
    Text usage counters for one side (input or output) of a request.

    Tracks token, word and character counts. Each counter is optional and
    defaults to ``None``.
    """

    # Text stats
    text_tokens: int | None = Field(
        description="Number of text tokens processed/generated.", default=None
    )
    text_words: int | None = Field(
        description="Number of text words processed/generated.", default=None
    )
    text_characters: int | None = Field(
        description="Number of text characters processed/generated.", default=None
    )
class GenerativeRequestStats(BaseModel):
    """
    Per-request record for a generative workload.

    Combines the request's identifiers and type with its optional backend
    arguments and generated output, the scheduler-side ``RequestInfo``, and the
    input/output ``UsageMetrics``. ``type_`` is a fixed discriminator string.

    ``request_type`` accepts the known ``GenerativeRequestType`` values as well
    as any other string.

    Example:
    ::
        stats = GenerativeRequestStats(
            request_id="req_123",
            request_type="text_completions",
            info=request_info,
            input_metrics=input_usage,
            output_metrics=output_usage,
        )
    """

    type_: Literal["generative_request_stats"] = "generative_request_stats"
    request_id: str = Field(..., description="Unique identifier for the request")
    request_type: GenerativeRequestType | str = Field(
        ...,
        description="Type of generative request (text_completion or chat_completion)",
    )
    response_id: str | None = Field(
        description="Unique identifier matching vLLM Response ID", default=None
    )
    request_args: str | None = Field(
        description="Backend arguments used for this request", default=None
    )
    output: str | None = Field(
        description="Generated text output from the request", default=None
    )
    info: RequestInfo = Field(
        ..., description="Request metadata and timing information"
    )
    input_metrics: UsageMetrics = Field(
        ..., description="Token usage statistics for the input prompt"
    )
    output_metrics: UsageMetrics = Field(
        ..., description="Token usage statistics for the generated output"
    )
class GenerativeBenchmark(BaseModel):
    """
    Results of one generative benchmark execution.

    Bundles the scheduler timestamps, the aggregated ``GenerativeMetrics``, the
    benchmark's start/end time and duration, and per-status lists of request
    records. ``requests_truncated`` is the only optional field; it defaults to
    a breakdown with three empty lists and a ``None`` total.
    """

    scheduler_metrics: SchedulerMetrics = Field(
        ...,
        description="Scheduler timing and performance statistics",
    )
    metrics: GenerativeMetrics = Field(
        ...,
        description="Performance metrics and statistical distributions",
    )
    start_time: float = Field(
        ...,
        description="Benchmark start time in seconds since epoch",
    )
    end_time: float = Field(
        ...,
        description="Benchmark end time in seconds since epoch",
    )
    duration: float = Field(
        ...,
        description="Total benchmark execution duration in seconds",
    )
    requests_truncated: StatusBreakdown[
        list[GenerativeRequestStats],
        list[GenerativeRequestStats],
        list[GenerativeRequestStats],
        None,
    ] = Field(
        description=(
            "Request details grouped by status: successful, incomplete, errored"
        ),
        # A factory gives every instance its own fresh lists.
        default_factory=lambda: StatusBreakdown(
            total=None,
            incomplete=[],
            errored=[],
            successful=[],
        ),
    )
class GenerativeBenchmarksReport(BaseModel):
    """
    Container for the benchmarks of one report, loadable from a JSON file.

    Holds a list of ``GenerativeBenchmark`` results, can be read back from a
    ``.json`` report file via ``load_file``, and can be condensed into a
    gpustack ``BenchmarkMetrics`` object via ``to_metrics``.
    """

    benchmarks: list[GenerativeBenchmark] = Field(
        description="List of completed benchmarks in the report",
        default_factory=list,
    )

    def to_metrics(self) -> Optional[BenchmarkMetrics]:
        """
        Convert the report to a gpustack benchmark metrics object.

        Only the first benchmark in the report feeds the summary values; the
        complete report is attached as ``raw_metrics``. Rate, latency and
        concurrency values are taken from the ``successful`` distributions.

        :return: The metrics object, or ``None`` when the report has no
            benchmarks or the first benchmark has no metrics
        """
        if not self.benchmarks:
            return None
        fbm = self.benchmarks[0].metrics
        if fbm is None:
            return None
        # NOTE(review): StatusBreakdown slots default to None, so a summary
        # without a `successful` entry would raise AttributeError on `.mean` /
        # `.max` below — confirm the report producer always populates it.
        return BenchmarkMetrics(
            raw_metrics=self.model_dump(),
            requests_per_second_mean=fbm.requests_per_second.successful.mean,
            request_latency_mean=fbm.request_latency.successful.mean,
            time_per_output_token_mean=fbm.time_per_output_token_ms.successful.mean,
            inter_token_latency_mean=fbm.inter_token_latency_ms.successful.mean,
            time_to_first_token_mean=fbm.time_to_first_token_ms.successful.mean,
            tokens_per_second_mean=fbm.tokens_per_second.successful.mean,
            output_tokens_per_second_mean=fbm.output_tokens_per_second.successful.mean,
            input_tokens_per_second_mean=fbm.prompt_tokens_per_second.successful.mean,
            request_concurrency_max=fbm.request_concurrency.successful.max,
            request_concurrency_mean=fbm.request_concurrency.successful.mean,
            request_total=fbm.request_totals.total,
            request_successful=fbm.request_totals.successful,
            request_errored=fbm.request_totals.errored,
            request_incomplete=fbm.request_totals.incomplete,
        )

    @classmethod
    def load_file(cls, path: str | Path) -> Self:
        """
        Load a report from a JSON file.

        The format is determined from the file extension, compared
        case-insensitively; only ``.json`` is supported.

        :param path: Path of the report file to load
        :return: Report instance validated from the file contents
        :raises ValueError: If the file extension is not ``.json``; this is
            checked before the file is opened. Malformed JSON also raises
            ``json.JSONDecodeError``, a ``ValueError`` subclass
        :raises FileNotFoundError: If the specified file does not exist
        """
        file_path = Path(path)
        file_type = file_path.suffix.lower()[1:]
        # Reject unsupported formats up front instead of after opening the file.
        if file_type != "json":
            raise ValueError(f"Unsupported file type: {file_type} for {file_path}.")
        with open(file_path, "r", encoding="utf-8") as metrics_file:
            model_dict = json.load(metrics_file)
        return cls.model_validate(model_dict)
|