| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- import json
- import logging
- import os
- import sys
- from typing import Dict, List, Optional
- from gpustack.client.generated_clientset import ClientSet
- from gpustack.config.config import Config, set_global_config
- from gpustack.config.registration import read_worker_token
- from gpustack.envs import BENCHMARK_DATASET_SHAREGPT_PATH, BENCHMARK_REQUEST_TIMEOUT
- from gpustack.logging import setup_logging
- from gpustack.schemas.benchmark import (
- DATASET_RANDOM,
- DATASET_SHAREGPT,
- Benchmark,
- BenchmarkDeploymentMetadata,
- BenchmarkStateEnum,
- ModelInstanceSnapshot,
- )
- from gpustack.utils.command import find_bool_parameter
- from gpustack.utils.config import apply_registry_override_to_image
- from gpustack.utils.envs import filter_env_vars, sanitize_env
- from gpustack_runtime.logging import setup_logging as setup_runtime_logging
- from gpustack_runtime import envs as runtime_envs
- from gpustack_runtime.deployer import ContainerMount
- from gpustack_runtime.deployer import (
- Container,
- ContainerEnv,
- ContainerExecution,
- ContainerProfileEnum,
- WorkloadPlan,
- create_workload,
- ContainerRestartPolicyEnum,
- )
- from gpustack.utils.profiling import time_decorator
- from gpustack.utils.runtime import transform_workload_plan
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class BenchmarkRunner:
    """Launch the container workload that runs a benchmark against a model instance.

    The runner resolves its inputs from two places:

    * the benchmark's snapshot of the target model instance, which supplies the
      resolved model path, the worker IP/port and the backend parameters;
    * the worker configuration, which supplies the benchmark output directory,
      the worker token and the server URL.

    It then creates a single privileged, never-restarted container workload.
    The workload's arguments point the benchmark tool at the model endpoint
    and at the server's benchmark state URL for progress reporting.
    """

    _clientset: ClientSet
    _config: Config
    _benchmark: Benchmark
    _model_path: str
    _model_endpoint: str
    _model_backend_parameters: Optional[List[str]]
    _api_url: str
    _api_key: str
    _benchmark_dir: Optional[str]
    # The fallback container registry to use if needed.
    _fallback_registry: Optional[str] = None

    # Command-line flags whose value is a credential. The value is masked
    # before the command line is written to the log.
    _SECRET_ARG_FLAGS = frozenset({"--progress-auth"})
    _REDACTED = "******"

    @time_decorator
    def __init__(
        self,
        clientset: ClientSet,
        benchmark: Benchmark,
        cfg: Config,
        fallback_registry: Optional[str] = None,
    ):
        """Set up logging/config and resolve everything the workload needs.

        Args:
            clientset: Client used to reach the server; its ``base_url`` is
                used to build the progress-reporting URL.
            benchmark: The benchmark to run.
            cfg: Worker configuration (installed as the global config).
            fallback_registry: Optional container registry used when resolving
                the benchmark image.

        Any failure while resolving settings is handled the same way. It is
        logged, then the benchmark state is set to ERROR on the server (best
        effort). Finally the process exits with status 1 via ``sys.exit(1)``.
        """
        setup_logging(debug=cfg.debug)
        setup_runtime_logging()
        set_global_config(cfg)

        try:
            self._clientset = clientset
            self._benchmark = benchmark
            self._config = cfg
            self._fallback_registry = fallback_registry

            instance_snapshot = self._get_instance_snapshot(benchmark)

            self._benchmark_dir = self._config.benchmark_dir
            self._model_path = instance_snapshot.resolved_path
            # _get_instance_snapshot guarantees at least one port; the first
            # one is used as the endpoint port.
            self._model_endpoint = (
                f"http://{instance_snapshot.worker_ip}:{instance_snapshot.ports[0]}"
            )
            self._model_backend_parameters = instance_snapshot.backend_parameters

            api_key = read_worker_token(self._config.data_dir)
            if api_key is None:
                raise ValueError(
                    f"Worker token not found for benchmark {benchmark.name}(id={benchmark.id}) progress reporting"
                )
            self._api_key = api_key

            server_url = self._clientset.base_url
            if not server_url:
                raise ValueError(
                    f"Server URL not configured for benchmark {benchmark.name}(id={benchmark.id}) progress reporting"
                )
            self._api_url = (
                f"{server_url.rstrip('/')}/v2/benchmarks/{self._benchmark.id}/state"
            )
        except Exception as e:
            error_message = f"Failed to initialize: {e}"
            logger.error(error_message)
            try:
                patch_dict = {
                    "state_message": error_message,
                    "state": BenchmarkStateEnum.ERROR,
                }
                self._update_benchmark_state(benchmark.id, **patch_dict)
            except Exception as ue:
                # Reporting is best effort: the process exits either way.
                logger.error(
                    f"Failed to update benchmark {benchmark.name}(id={benchmark.id}) state: {ue}"
                )
            sys.exit(1)

    @staticmethod
    def _get_instance_snapshot(benchmark: Benchmark) -> ModelInstanceSnapshot:
        """Return and validate the benchmark's snapshot of its model instance.

        Raises:
            ValueError: if there is no snapshot for ``model_instance_name``, or
                the snapshot lacks a resolved path, a worker IP, or any port.
        """
        prefix = f"Benchmark {benchmark.name}(id={benchmark.id})"
        instance_name = benchmark.model_instance_name

        instances = (
            benchmark.snapshot.instances if benchmark.snapshot is not None else None
        )
        # A missing or empty instance mapping is treated like a missing entry.
        instance_snapshot = instances.get(instance_name) if instances else None
        if instance_snapshot is None:
            raise ValueError(
                f"{prefix} has no snapshot for model instance {instance_name}"
            )
        if instance_snapshot.resolved_path is None:
            raise ValueError(
                f"{prefix} snapshot for model instance {instance_name} has no resolved path"
            )
        if instance_snapshot.worker_ip is None:
            raise ValueError(
                f"{prefix} snapshot for model instance {instance_name} has no worker IP"
            )
        if not instance_snapshot.ports:
            raise ValueError(
                f"{prefix} snapshot for model instance {instance_name} has no ports"
            )
        return instance_snapshot

    def start(self):
        """Create the benchmark container workload.

        The filtered environment of the current process is forwarded to the
        container. This is skipped when runtime mirrored deployment is enabled.
        """
        deployment_metadata = self._benchmark.get_deployment_metadata()
        env = {}
        if not runtime_envs.GPUSTACK_RUNTIME_DEPLOY_MIRRORED_DEPLOYMENT:
            env = filter_env_vars(os.environ)
        command_args = self._build_command_args()
        self._create_workload(
            deployment_metadata=deployment_metadata,
            command=["benchmark-runner"],
            command_args=command_args,
            env=env,
        )

    @classmethod
    def _redact_command_args(cls, args: List[str]) -> List[str]:
        """Return *args* as strings with credential values masked, for logging.

        Both the ``--flag value`` and ``--flag=value`` spellings of every flag
        in ``_SECRET_ARG_FLAGS`` are masked. The input list is not modified.
        """
        redacted: List[str] = []
        mask_next = False
        for arg in map(str, args):
            if mask_next:
                redacted.append(cls._REDACTED)
                mask_next = False
            elif arg in cls._SECRET_ARG_FLAGS:
                redacted.append(arg)
                mask_next = True
            else:
                flag, sep, _ = arg.partition("=")
                if sep and flag in cls._SECRET_ARG_FLAGS:
                    redacted.append(f"{flag}={cls._REDACTED}")
                else:
                    redacted.append(arg)
        return redacted

    def _create_workload(
        self,
        deployment_metadata: BenchmarkDeploymentMetadata,
        command: Optional[List[str]],
        command_args: List[str],
        env: Dict[str, str],
    ):
        """Build and submit the single-container benchmark workload.

        Args:
            deployment_metadata: Supplies the workload name and labels.
            command: Logged only. NOTE(review): it is not passed to
                ``ContainerExecution``, so the container runs the image's
                default entrypoint with *command_args*. Confirm this is intended.
            command_args: Arguments passed to the container.
            env: Environment variables set in the container.

        Raises:
            ValueError: if no benchmark image could be resolved.
        """
        image = apply_registry_override_to_image(
            self._config, self._config.benchmark_image_repo, self._fallback_registry
        )
        if not image:
            raise ValueError("Failed to get image for benchmark runner workload")

        mounts = self._get_configured_mounts()

        run_container = Container(
            image=image,
            name="default",
            profile=ContainerProfileEnum.RUN,
            restart_policy=ContainerRestartPolicyEnum.NEVER,
            execution=ContainerExecution(
                privileged=True,
                args=command_args,
            ),
            envs=[ContainerEnv(name=name, value=value) for name, value in env.items()],
            mounts=mounts,
        )

        logger.info(
            f"Creating benchmark container workload: {deployment_metadata.name}"
        )
        # The arguments carry the worker token (--progress-auth). Mask it the
        # same way the environment is sanitized, so no credential is logged.
        logger.info(
            f"With image: {image}, "
            f"command: [{' '.join(command) if command else ''}], "
            f"arguments: [{' '.join(self._redact_command_args(command_args))}], "
            f"envs(inconsistent input items mean unchangeable):{os.linesep}"
            f"{os.linesep.join(f'{k}={v}' for k, v in sorted(sanitize_env(env).items()))}"
        )

        workload_plan = WorkloadPlan(
            name=deployment_metadata.name,
            host_network=True,
            shm_size=10 << 30,  # 10 GiB
            containers=[run_container],
            labels=deployment_metadata.labels,
        )
        create_workload(
            transform_workload_plan(
                self._config, workload_plan, self._fallback_registry
            )
        )
        logger.info(f"Created benchmark container workload: {deployment_metadata.name}")

    def _build_command_args(self) -> List[str]:
        """Return the argument list for the benchmark container.

        The base arguments always include:

        * ``--target``: the model endpoint;
        * ``--processor``: the resolved model path;
        * ``--output-dir`` and ``--outputs``: where results are written;
        * ``--progress-url`` and ``--progress-auth``: the server state URL and
          the worker token.

        Optional arguments are appended as follows:

        * ``--processor-args`` when the backend parameters enable
          ``trust-remote-code``;
        * ``--data`` for the ShareGPT dataset, or for the random dataset when
          both token counts are set;
        * ``--random-seed`` when a dataset seed is set;
        * ``--max-requests`` when ``total_requests`` is positive.
        """
        backend_kwargs = {
            "timeout": BENCHMARK_REQUEST_TIMEOUT,
            "response_handlers": {
                "chat_completions": "chat_completions_with_reasoning"
            },
        }
        command_args = [
            "benchmark",
            "run",
            "--target",
            self._model_endpoint,
            "--profile",
            "constant",
            "--rate",
            str(self._benchmark.request_rate),
            "--sample-requests",
            "0",
            "--processor",
            self._model_path,
            "--output-dir",
            # NOTE(review): renders as the literal "None" if benchmark_dir is
            # unset. Confirm the config always provides it.
            f"{self._benchmark_dir}",
            "--outputs",
            f"{self._benchmark.id}.dual_json",
            "--progress-url",
            self._api_url,
            "--progress-auth",
            self._api_key,
            "--backend-kwargs",
            json.dumps(backend_kwargs),
            "--backend",
            "openai_http_error_detail",
        ]

        if find_bool_parameter(self._model_backend_parameters, ["trust-remote-code"]):
            command_args.extend(
                ["--processor-args", json.dumps({"trust_remote_code": True})]
            )

        if self._benchmark.dataset_name == DATASET_SHAREGPT:
            command_args.extend(["--data", BENCHMARK_DATASET_SHAREGPT_PATH])
        elif (
            self._benchmark.dataset_name == DATASET_RANDOM
            and self._benchmark.dataset_input_tokens is not None
            and self._benchmark.dataset_output_tokens is not None
        ):
            data = f"prompt_tokens={self._benchmark.dataset_input_tokens},output_tokens={self._benchmark.dataset_output_tokens}"
            command_args.extend(["--data", data])

        if self._benchmark.dataset_seed is not None:
            command_args.extend(["--random-seed", f"{self._benchmark.dataset_seed}"])

        if (
            self._benchmark.total_requests is not None
            and self._benchmark.total_requests > 0
        ):
            command_args.extend(
                ["--max-requests", f"{self._benchmark.total_requests}"]
            )

        return command_args

    def _update_benchmark_state(self, id: int, **kwargs):
        """PATCH ``/benchmarks/{id}/state`` with *kwargs* as the JSON body.

        The parameter keeps the name ``id`` (shadowing the builtin) for
        backward compatibility with keyword callers.

        Raises:
            Whatever ``raise_for_status()`` raises on a non-success response.
        """
        # NOTE(review): the path has no "/v2" prefix, unlike _api_url. This
        # presumably relies on the client's base URL already including it.
        # Confirm.
        resp = self._clientset.http_client.get_httpx_client().patch(
            f"/benchmarks/{id}/state", json=kwargs
        )
        resp.raise_for_status()

    def _get_configured_mounts(self) -> List[ContainerMount]:
        """Return the volume mounts for the benchmark container.

        Two directories are mounted: the one containing the resolved model
        path, and the benchmark output directory. This happens only when both
        paths are set and runtime mirrored deployment is disabled; otherwise
        an empty list is returned.

        Returns:
            A list of ContainerMount objects for the benchmark container.
        """
        mounts: List[ContainerMount] = []
        if (
            self._model_path
            and self._benchmark_dir
            and not runtime_envs.GPUSTACK_RUNTIME_DEPLOY_MIRRORED_DEPLOYMENT
        ):
            model_dir = os.path.dirname(self._model_path)
            mounts.extend(
                [
                    ContainerMount(path=model_dir),
                    ContainerMount(path=self._benchmark_dir),
                ]
            )
        return mounts
|