import json import logging import os import sys from typing import Dict, List, Optional from gpustack.client.generated_clientset import ClientSet from gpustack.config.config import Config, set_global_config from gpustack.config.registration import read_worker_token from gpustack.envs import BENCHMARK_DATASET_SHAREGPT_PATH, BENCHMARK_REQUEST_TIMEOUT from gpustack.logging import setup_logging from gpustack.schemas.benchmark import ( DATASET_RANDOM, DATASET_SHAREGPT, Benchmark, BenchmarkDeploymentMetadata, BenchmarkStateEnum, ModelInstanceSnapshot, ) from gpustack.utils.command import find_bool_parameter from gpustack.utils.config import apply_registry_override_to_image from gpustack.utils.envs import filter_env_vars, sanitize_env from gpustack_runtime.logging import setup_logging as setup_runtime_logging from gpustack_runtime import envs as runtime_envs from gpustack_runtime.deployer import ContainerMount from gpustack_runtime.deployer import ( Container, ContainerEnv, ContainerExecution, ContainerProfileEnum, WorkloadPlan, create_workload, ContainerRestartPolicyEnum, ) from gpustack.utils.profiling import time_decorator from gpustack.utils.runtime import transform_workload_plan logger = logging.getLogger(__name__) class BenchmarkRunner: _clientset: ClientSet _config: Config _benchmark: Benchmark _model_path: str _model_endpoint: str _model_backend_parameters: Optional[List[str]] _api_url: str _api_key: str _benchmark_dir: Optional[str] _fallback_registry: Optional[str] = None """The fallback container registry to use if needed.""" @time_decorator def __init__( self, clientset: ClientSet, benchmark: Benchmark, cfg: Config, fallback_registry: Optional[str] = None, ): setup_logging(debug=cfg.debug) setup_runtime_logging() set_global_config(cfg) try: self._clientset = clientset self._benchmark = benchmark self._config = cfg self._fallback_registry = fallback_registry if ( benchmark.snapshot is None or benchmark.snapshot.instances is None or len(benchmark.snapshot.instances) == 0 or benchmark.snapshot.instances.get(benchmark.model_instance_name) is None ): raise ValueError( f"Benchmark {benchmark.name}(id={benchmark.id}) has no snapshot for model instance {benchmark.model_instance_name}" ) instance_snapshot: ModelInstanceSnapshot = benchmark.snapshot.instances.get( benchmark.model_instance_name ) if instance_snapshot.resolved_path is None: raise ValueError( f"Benchmark {benchmark.name}(id={benchmark.id}) snapshot for model instance {benchmark.model_instance_name} has no resolved path" ) if instance_snapshot.worker_ip is None: raise ValueError( f"Benchmark {benchmark.name}(id={benchmark.id}) snapshot for model instance {benchmark.model_instance_name} has no worker IP" ) if instance_snapshot.ports is None or len(instance_snapshot.ports) == 0: raise ValueError( f"Benchmark {benchmark.name}(id={benchmark.id}) snapshot for model instance {benchmark.model_instance_name} has no ports" ) self._benchmark_dir = self._config.benchmark_dir self._model_path = instance_snapshot.resolved_path self._model_endpoint = f"http://{instance_snapshot.worker_ip}:{instance_snapshot.ports[0] if instance_snapshot.ports else ''}" self._model_backend_parameters = instance_snapshot.backend_parameters _api_key = read_worker_token(self._config.data_dir) if _api_key is None: raise ValueError( f"Worker token not found for benchmark {benchmark.name}(id={benchmark.id}) progress reporting" ) self._api_key = _api_key _server_url = self._clientset.base_url if not _server_url: raise ValueError( f"Server URL not configured for benchmark {benchmark.name}(id={benchmark.id}) progress reporting" ) self._api_url = ( f"{_server_url.rstrip('/')}/v2/benchmarks/{self._benchmark.id}/state" ) except Exception as e: error_message = f"Failed to initialize: {e}" logger.error(error_message) try: patch_dict = { "state_message": error_message, "state": BenchmarkStateEnum.ERROR, } self._update_benchmark_state(benchmark.id, **patch_dict) except Exception as ue: logger.error( f"Failed to update benchmark {benchmark.name}(id={benchmark.id}) state: {ue}" ) sys.exit(1) def start(self): deployment_metadata = self._benchmark.get_deployment_metadata() env = {} if not runtime_envs.GPUSTACK_RUNTIME_DEPLOY_MIRRORED_DEPLOYMENT: env = filter_env_vars(os.environ) command_args = self._build_command_args() self._create_workload( deployment_metadata=deployment_metadata, command=["benchmark-runner"], command_args=command_args, env=env, ) def _create_workload( self, deployment_metadata: BenchmarkDeploymentMetadata, command: Optional[List[str]], command_args: List[str], env: Dict[str, str], ): image = apply_registry_override_to_image( self._config, self._config.benchmark_image_repo, self._fallback_registry ) if not image: raise ValueError("Failed to get image for benchmark runner workload") mounts = self._get_configured_mounts() run_container = Container( image=image, name="default", profile=ContainerProfileEnum.RUN, restart_policy=ContainerRestartPolicyEnum.NEVER, execution=ContainerExecution( privileged=True, args=command_args, ), envs=[ ContainerEnv( name=name, value=value, ) for name, value in env.items() ], mounts=mounts, ) logger.info( f"Creating benchmark container workload: {deployment_metadata.name}" ) logger.info( f"With image: {image}, " f"command: [{' '.join(command) if command else ''}], " f"arguments: [{' '.join(str(arg) for arg in command_args)}], " f"envs(inconsistent input items mean unchangeable):{os.linesep}" f"{os.linesep.join(f'{k}={v}' for k, v in sorted(sanitize_env(env).items()))}" ) workload_plan = WorkloadPlan( name=deployment_metadata.name, host_network=True, shm_size=10 * 1 << 30, # 10 GiB containers=[run_container], labels=deployment_metadata.labels, ) create_workload( transform_workload_plan( self._config, workload_plan, self._fallback_registry ) ) logger.info(f"Created benchmark container workload: {deployment_metadata.name}") def _build_command_args(self) -> List[str]: backend_kwargs = { "timeout": BENCHMARK_REQUEST_TIMEOUT, "response_handlers": { "chat_completions": "chat_completions_with_reasoning" }, } command_args = [ "benchmark", "run", "--target", self._model_endpoint, "--profile", "constant", "--rate", str(self._benchmark.request_rate), "--sample-requests", "0", "--processor", self._model_path, "--output-dir", f"{self._benchmark_dir}", "--outputs", f"{self._benchmark.id}.dual_json", "--progress-url", self._api_url, "--progress-auth", self._api_key, "--backend-kwargs", json.dumps(backend_kwargs), "--backend", "openai_http_error_detail", ] if find_bool_parameter(self._model_backend_parameters, ["trust-remote-code"]): command_args.extend( [ "--processor-args", json.dumps({"trust_remote_code": True}), ] ) if self._benchmark.dataset_name == DATASET_SHAREGPT: data = BENCHMARK_DATASET_SHAREGPT_PATH command_args.extend(["--data", data]) elif ( self._benchmark.dataset_name == DATASET_RANDOM and self._benchmark.dataset_input_tokens is not None and self._benchmark.dataset_output_tokens is not None ): data = f"prompt_tokens={self._benchmark.dataset_input_tokens},output_tokens={self._benchmark.dataset_output_tokens}" command_args.extend(["--data", data]) if self._benchmark.dataset_seed is not None: command_args.extend( [ "--random-seed", f"{self._benchmark.dataset_seed}", ] ) if ( self._benchmark.total_requests is not None and self._benchmark.total_requests > 0 ): command_args.extend( [ "--max-requests", f"{self._benchmark.total_requests}", ] ) return command_args def _update_benchmark_state(self, id: int, **kwargs): resp = self._clientset.http_client.get_httpx_client().patch( "/benchmarks/{id}/state".format(id=id), json=kwargs ) resp.raise_for_status() def _get_configured_mounts(self) -> List[ContainerMount]: """ Get the volume mounts for the model instance. If runtime mirrored deployment is enabled, no mounts will be set up. Returns: A list of ContainerMount objects for the model instance. """ mounts: List[ContainerMount] = [] if ( self._model_path and self._benchmark_dir and not runtime_envs.GPUSTACK_RUNTIME_DEPLOY_MIRRORED_DEPLOYMENT ): model_dir = os.path.dirname(self._model_path) mounts.extend( [ ContainerMount( path=model_dir, ), ContainerMount( path=self._benchmark_dir, ), ] ) return mounts