| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785 |
- import json
- from collections import defaultdict
- import logging
- import re
- from typing import Dict, List, Optional, Tuple
- from gpustack.policies.base import (
- Allocatable,
- ModelInstanceScheduleCandidate,
- )
- from gpustack.policies.candidate_selectors.base_candidate_selector import (
- EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
- EVENT_ACTION_AUTO_SINGLE_GPU,
- EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
- EVENT_ACTION_DEFAULT,
- EVENT_ACTION_MANUAL_MULTI,
- RequestEstimateUsage,
- ScheduleCandidatesSelector,
- )
- from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
- from gpustack.policies.utils import (
- ListMessageBuilder,
- estimate_model_vram,
- group_workers_by_gpu_type,
- ram_not_enough,
- get_model_ram_claim,
- get_computed_ram_claim,
- sort_workers_by_gpu_count,
- )
- from gpustack.schemas.models import (
- CategoryEnum,
- ComputedResourceClaim,
- Model,
- ModelInstance,
- ModelInstanceSubordinateWorker,
- )
- from gpustack.schemas.workers import GPUDevicesStatus, Worker
- from gpustack.config import Config
- from gpustack.utils.command import (
- find_bool_parameter,
- find_parameter,
- find_int_parameter,
- )
- from gpustack.utils.unit import byte_to_gib
# Module-level logger for this selector module; the selector below also hands
# it to its EventCollector.
logger = logging.getLogger(__name__)
def parse_model_size_by_name(model_name: str) -> int:
    """
    Extract a parameter count from a model name such as ``"Qwen2.5-7B"``.

    The first integer or decimal number that is followed (optionally after
    whitespace) by ``B`` or ``b`` is interpreted as billions of parameters.

    Returns:
        The parameter count as an ``int`` (``7_000_000_000`` for ``"7B"``).

    Raises:
        ValueError: If no such size token occurs in ``model_name``.
    """
    found = re.search(r"(\d+(?:\.\d+)?)\s*[Bb]", model_name)
    if found is None:
        raise ValueError(f"Cannot parse model size from model name: {model_name}")
    billions = float(found.group(1))
    return int(billions * 1e9)
class VLLMResourceFitSelector(ScheduleCandidatesSelector):
    """
    Find schedule candidates whose GPUs and RAM can host a model served by vLLM.

    ``select_candidates`` tries, in order:
    1. manual GPU selection;
    2. a single GPU on one worker;
    3. several GPUs on one worker;
    4. several workers with the same GPU count, only when
       ``distributed_inference_across_workers`` is enabled.

    The first strategy that returns candidates wins. When none does, the
    events recorded along the way are condensed into human-readable messages,
    available from ``get_messages``.

    For models where vLLM honours ``--gpu-memory-utilization`` (GMU), each
    selected GPU claims ``total_memory * GMU`` and must currently have at least
    that fraction of its memory allocatable. A GMU of 0 marks models for which
    vLLM does not use the flag. For those, the single-worker searches size
    their claims from the estimated model VRAM or the allocatable VRAM instead.
    """

    def __init__(
        self,
        cfg: Config,
        model: Model,
        model_instances: List[ModelInstance],
    ):
        super().__init__(cfg, model, model_instances)
        # Estimated claims for the model; computed in select_candidates().
        self._vram_claim = 0
        self._ram_claim = 0
        # "Best seen so far" figures, used only to explain scheduling failures.
        self._largest_single_gpu_vram = 0
        self._largest_single_gpu_vram_utilization = 0
        self._largest_multi_gpu_vram = 0
        self._largest_multi_gpu_total = 0
        self._largest_multi_gpu_utilization_satisfied_count = 0
        self._messages = []
        self._event_collector = EventCollector(self._model, logger)
        self._workers_allocatable_resource: Dict[int, Allocatable] = {}
        # Cache: worker name -> {GPU index: total VRAM}.
        self._worker_name_to_vram: Dict[str, Dict[int, int]] = {}
        self._unsatisfied_gpu_messages: Dict[str, List[int]] = {}

        # Forward explicit parallelism settings (or (None, None)) to the base class.
        world_size, strategies = (
            VLLMResourceFitSelector.get_world_size_from_backend_parameters(model)
        )
        self._set_gpu_count(world_size, strategies)

    async def _init_model_parameters(self, workers: List[Worker]):
        """Initialize model parameters, validate arguments, then resolve GMU."""
        await super()._init_model_parameters(workers)
        self._validate_arguments()
        # GMU relies on architecture info in model parameters. Set it after model parameters are initialized.
        self._set_gpu_memory_utilization()

    def _should_check_vision_tp_divisibility(self) -> bool:
        """
        Return False only when ``--language-model-only`` is set.

        In that case the vision part is not loaded, so its TP divisibility is
        irrelevant.
        """
        if not self._model.backend_parameters:
            return True
        language_only = find_bool_parameter(
            self._model.backend_parameters, ["language-model-only"]
        )
        return not language_only

    @staticmethod
    def get_world_size_from_backend_parameters(
        model: Model,
    ) -> Tuple[Optional[int], Optional[List[str]]]:
        """
        Derive the world size from explicit TP/PP/DP backend parameters.

        Returns:
            ``(world_size, strategies)``. ``world_size`` is the product of the
            configured TP, PP and DP sizes, further multiplied by the local DP
            size when one is found. ``strategies`` lists which of ``"tp"``,
            ``"pp"`` and ``"dp"`` were set. Returns ``(None, None)`` when none
            of TP, PP or DP is configured.
        """
        params = model.backend_parameters
        tp = find_int_parameter(params, ["tensor-parallel-size", "tp"])
        pp = find_int_parameter(params, ["pipeline-parallel-size", "pp"])
        dp = find_int_parameter(params, ["data-parallel-size", "dp"])
        # NOTE(review): unlike the sibling lookups, this name carries a leading
        # "--". If find_int_parameter compares dash-stripped keys, only "dpl"
        # can ever match here. Confirm before changing, because matching the
        # long flag would activate the world-size multiplication below.
        dpl = find_int_parameter(params, ["--data-parallel-size-local", "dpl"])

        if not (tp or pp or dp):
            return None, None

        world_size = 1
        strategies = []
        for strategy, size in (("tp", tp), ("pp", pp), ("dp", dp)):
            if size:
                strategies.append(strategy)
                world_size *= size
        if dpl:
            # NB(thxCode): Indicate how many DP groups(each group owns `-dp` number devices) are there in one worker.
            world_size *= dpl
        return world_size, strategies

    def _set_gpu_memory_utilization(self):
        """
        Resolve the effective GPU memory utilization (GMU).

        Resolution order:
        1. Default to 0.9.
        2. Use 0 ("not used") for models whose GMU is disabled
           (see ``_disable_gpu_memory_utilization``).
        3. An explicit ``--gpu-memory-utilization`` backend parameter
           overrides both.
        """
        self._gpu_memory_utilization = 0.9
        if self._disable_gpu_memory_utilization():
            # gpu memory utilization is not used for non-LLM models
            self._gpu_memory_utilization = 0
        self._gpu_memory_utilization_parameter_name = "gpu-memory-utilization"
        gmu = find_parameter(
            self._model.backend_parameters,
            [self._gpu_memory_utilization_parameter_name],
        )
        if gmu:
            self._gpu_memory_utilization = float(gmu)

    def _disable_gpu_memory_utilization(self) -> bool:
        """
        Determine whether GPU memory utilization should be disabled.

        vLLM does not use --gpu-memory-utilization for non-LLM models such as
        embedding and reranker models. Some specific models are exceptions,
        e.g. Qwen3-Embedding and Qwen3-Reranker.

        Rules:
        1. Models without categories, or in the LLM / speech-to-text
           categories, keep GMU ENABLED (return False).
        2. Other models have GMU DISABLED (return True) unless one of their
           architectures is in the exception list.
        """
        if not self._model.categories:
            return False
        architectures = self._model_params.architectures or []
        # Non-LLM models that vLLM still uses GPU memory utilization
        NON_LLM_GMU_EXCEPTIONS = {
            "Qwen3ForCausalLM",
            "Qwen3ForSequenceClassification",  # Qwen3-Embedding & Qwen3-Reranker
            "Qwen3VLForConditionalGeneration",  # Qwen3-VL-Embedding & Qwen3-VL-Reranker
        }
        use_gmu_categories = [CategoryEnum.LLM, CategoryEnum.SPEECH_TO_TEXT]
        if any(cat in self._model.categories for cat in use_gmu_categories):
            return False
        # Disable for non-LLM models unless they are in the exception list
        return not any(arch in NON_LLM_GMU_EXCEPTIONS for arch in architectures)

    def _set_model_parameters(self):
        """
        Extend the base model-parameter setup with ``--hf-overrides`` architectures.

        If the ``hf-overrides`` backend parameter is a JSON object with an
        ``architectures`` key, that value replaces the detected architectures.
        This lets GMU decisions recognise models such as Qwen3-Embedding and
        Qwen3-Reranker. Malformed JSON propagates ``json.JSONDecodeError``.
        """
        super()._set_model_parameters()
        hf_overrides = find_parameter(self._model.backend_parameters, ["hf-overrides"])
        if hf_overrides:
            overrides_dict = json.loads(hf_overrides)
            if isinstance(overrides_dict, dict) and "architectures" in overrides_dict:
                self._model_params.architectures = overrides_dict["architectures"]
        self._num_attention_heads = self._model_params.num_attention_heads

    def _cal_effective_vram(self) -> float:
        """
        Estimate the usable VRAM (GiB) of the largest multi-GPU worker seen.

        The estimate is total allocatable VRAM * GMU, scaled by the fraction of
        that worker's GPUs that meet the GMU ratio. Returns 0.0 if no worker
        has been inspected yet.
        """
        if self._largest_multi_gpu_total == 0:
            return 0.0
        return (
            byte_to_gib(self._largest_multi_gpu_vram)
            * self._gpu_memory_utilization
            * self._largest_multi_gpu_utilization_satisfied_count
            / self._largest_multi_gpu_total
        )

    def _set_messages(self):
        """
        Build the user-facing scheduling message once from collected events.

        The default event text comes first. It is followed by the message of
        the first strategy, in priority order, that recorded one.
        """
        if self._messages:
            return
        event_messages = {
            EVENT_ACTION_DEFAULT: "",
            EVENT_ACTION_MANUAL_MULTI: "",
            EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU: "",
            EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU: "",
            EVENT_ACTION_AUTO_SINGLE_GPU: "",
        }
        # Later events for the same action overwrite earlier ones.
        for event in self._event_collector.events:
            event_messages[event.action] = event.message
        messages = event_messages[EVENT_ACTION_DEFAULT] + "\n"
        for action in [
            EVENT_ACTION_MANUAL_MULTI,
            EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
            EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
            EVENT_ACTION_AUTO_SINGLE_GPU,
        ]:
            if event_messages[action]:
                messages += event_messages[action]
                break
        self._messages.append(messages)

    def _add_message(self, message: str):
        """Append a raw message to the scheduling messages."""
        self._messages.append(message)

    def get_messages(self) -> List[str]:
        """Return the scheduling messages collected so far."""
        return self._messages

    def _get_worker_vram(self, worker: Worker) -> Dict[int, int]:
        """
        Return ``{gpu_index: total_vram}`` for ``worker``, cached by worker name.

        GPUs without memory info count as 0. A worker without status or GPUs
        yields ``{}``, and that empty result is not cached.
        """
        if worker.name in self._worker_name_to_vram:
            return self._worker_name_to_vram[worker.name]
        if worker.status is None or not worker.status.gpu_devices:
            return {}
        vram_total_by_index = {}
        for gpu in worker.status.gpu_devices:
            total = gpu.memory.total if gpu.memory else 0
            vram_total_by_index[gpu.index] = total
        self._worker_name_to_vram[worker.name] = vram_total_by_index
        return vram_total_by_index

    async def select_candidates(
        self, workers: List[Worker]
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Get schedule candidates that fit the GPU resources requirement.

        Strategies are tried in priority order. The first non-empty result is
        returned. If nothing fits, the scheduling messages are populated and
        ``[]`` is returned.
        """
        # Initialize model parameters.
        await self._init_model_parameters(workers)

        self._vram_claim = await estimate_model_vram(
            self._model, self._config.huggingface_token, workers
        )
        self._ram_claim = get_model_ram_claim(self._model)
        logger.info(
            f"Calculated resource claim for model {self._model.readable_source}, "
            f"VRAM claim: {self._vram_claim}, RAM claim: {self._ram_claim}"
        )

        default_msg_list = ListMessageBuilder(
            f"The model requires approximately {byte_to_gib(self._vram_claim)} GiB of VRAM"
            f"{f' and {byte_to_gib(self._ram_claim)} GiB of RAM' if self._ram_claim > 0 else ''}."
        )
        if self._gpu_memory_utilization != 0:
            default_msg_list.append(
                f"With --{self._gpu_memory_utilization_parameter_name}={self._gpu_memory_utilization}, "
                f"all GPUs combined need to provide at least {byte_to_gib(int(self._vram_claim / self._gpu_memory_utilization))} GiB of total VRAM "
                f"and each GPU needs {int(self._gpu_memory_utilization * 100)}% of allocatable VRAM."
            )
        self._event_collector.add(
            EventLevelEnum.INFO,
            EVENT_ACTION_DEFAULT,
            str(default_msg_list),
        )

        candidate_functions = [
            self.find_manual_gpu_selection_candidates,
            self.find_single_worker_single_gpu_full_offloading_candidates,
            self.find_single_worker_multi_gpu_full_offloading_candidates,
            self.find_multi_worker_multi_gpu_candidates,
        ]
        for candidate_func in candidate_functions:
            if self.should_skip_candidate_func(candidate_func):
                continue
            logger.debug(
                f"model {self._model.readable_source}, filter candidates with resource fit selector: {candidate_func.__name__}"
            )
            candidates = candidate_func(workers)
            if len(candidates) >= 1 and candidates[0].overcommit:
                # Manually selected candidate with overcommit. Also add the message.
                # It's useful for compatibility check.
                self._set_messages()
            if candidates:
                return candidates

        self._set_messages()
        return []

    def should_skip_candidate_func(self, candidate_func) -> bool:
        """
        Decide whether a candidate strategy should be skipped.

        With a manual GPU selection, only the manual strategy runs. The
        multi-worker strategy runs only when distributed inference across
        workers is enabled.
        """
        # Skip conditions for manual GPU selection.
        if (
            self._selected_gpu_workers
            and candidate_func != self.find_manual_gpu_selection_candidates
        ):
            return True
        # Skip conditions for distributed inference.
        if (
            not self._model.distributed_inference_across_workers
            and candidate_func == self.find_multi_worker_multi_gpu_candidates
        ):
            return True
        return False

    def find_manual_gpu_selection_candidates(
        self, workers: List[Worker]
    ) -> List[ModelInstanceScheduleCandidate]:
        """Delegate to the base manual-selection search with this model's claims and GMU."""
        request = RequestEstimateUsage(
            ram=self._ram_claim,
            vram=self._vram_claim,
        )
        return self._find_manual_gpu_selection_candidates(
            workers,
            {"*": self._gpu_memory_utilization},
            request,
            self._event_collector,
        )

    def find_single_worker_single_gpu_full_offloading_candidates(
        self, workers: List[Worker]
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Find single worker single gpu full offloading candidates for the model instance with workers.

        Skipped (returns ``[]``) when the parallelism settings require more
        than one GPU.
        """
        if self._gpu_count is not None and self._gpu_count > 1:
            # Skip multi-GPU selection
            return []

        candidates = []
        for gpu_type, typed_workers in group_workers_by_gpu_type(workers).items():
            for worker in typed_workers:
                if not worker.status or not worker.status.gpu_devices:
                    continue
                result = self._find_single_worker_single_gpu_full_offloading_candidates(
                    worker,
                    gpu_type,
                )
                if result:
                    candidates.extend(result)
        return candidates

    def _find_single_worker_single_gpu_full_offloading_candidates(
        self,
        worker: Worker,
        gpu_type: Optional[str] = None,
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Find single worker single gpu full offloading candidates for the model instance with worker.

        Every GPU on ``worker`` that alone can hold the model becomes one
        candidate. When none qualifies, an event describing the largest single
        GPU seen is recorded.

        requires: worker.status.gpu_devices is not None
        """
        allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
        if ram_not_enough(self._ram_claim, allocatable):
            return []
        if not worker.status.gpu_devices:
            return []

        candidates = []
        for gpu in worker.status.gpu_devices:
            # Check memory info BEFORE dividing by the total. A GPU without
            # memory info, or reporting a total of 0, used to raise here.
            if gpu.memory is None or not gpu.memory.total:
                continue

            gpu_index = gpu.index
            allocatable_vram = allocatable.vram.get(gpu_index, 0)
            allocatable_gpu_memory_utilization = allocatable_vram / gpu.memory.total
            if allocatable_vram > self._largest_single_gpu_vram:
                self._largest_single_gpu_vram = allocatable_vram
                self._largest_single_gpu_vram_utilization = (
                    allocatable_gpu_memory_utilization
                )

            exceeds_vram = (
                self._vram_claim > gpu.memory.total * self._gpu_memory_utilization
                if self._gpu_memory_utilization > 0  # LLMs
                else self._vram_claim > allocatable_vram  # non LLMs
            )
            exceeds_memory_utilization = (
                self._gpu_memory_utilization > 0
                and allocatable_gpu_memory_utilization < self._gpu_memory_utilization
            )
            if exceeds_vram or exceeds_memory_utilization:
                continue

            vram_claim_bytes = (
                int(gpu.memory.total * self._gpu_memory_utilization)
                if self._gpu_memory_utilization > 0  # LLMs
                else int(self._vram_claim)  # non LLMs
            )
            vram_claim = {gpu_index: vram_claim_bytes}
            candidates.append(
                ModelInstanceScheduleCandidate(
                    worker=worker,
                    gpu_indexes=[gpu_index],
                    gpu_type=gpu.type,
                    computed_resource_claim=ComputedResourceClaim(
                        vram=vram_claim,
                        ram=get_computed_ram_claim(self._model, vram_claim),
                        estimated_vram=self._vram_claim,
                    ),
                )
            )

        if not candidates or (len(candidates) == 1 and candidates[0].overcommit):
            event_msg = f"The current available GPU only has {byte_to_gib(self._largest_single_gpu_vram)} GiB allocatable VRAM."
            if self._gpu_memory_utilization != 0:
                event_msg = (
                    event_msg.rstrip(".")
                    + f" ({(self._largest_single_gpu_vram_utilization * 100):.2f}%)."
                )
            self._event_collector.add(
                EventLevelEnum.INFO,
                EVENT_ACTION_AUTO_SINGLE_GPU,
                str(ListMessageBuilder(event_msg)),
            )

        return candidates

    def find_single_worker_multi_gpu_full_offloading_candidates(
        self, workers: List[Worker]
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Find candidates that use several GPUs on one worker.

        Only the candidates using the fewest GPUs are returned. Skipped when
        the parallelism settings pin the model to exactly one GPU.
        """
        if self._gpu_count == 1:
            return []

        candidates = []
        for gpu_type, typed_workers in group_workers_by_gpu_type(workers).items():
            for worker in typed_workers:
                if not worker.status or not worker.status.gpu_devices:
                    continue
                result = self._find_single_worker_multi_gpu_full_offloading_candidates(
                    worker, gpu_type
                )
                if result:
                    candidates.extend(result)

        if not candidates:
            return []

        min_gpu_count = min(len(candidate.gpu_indexes) for candidate in candidates)
        return [
            candidate
            for candidate in candidates
            if len(candidate.gpu_indexes) == min_gpu_count
        ]

    def _find_single_worker_multi_gpu_full_offloading_candidates(  # noqa: C901
        self, worker: Worker, gpu_type: Optional[str] = None
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Find single worker multi gpu full offloading candidates for the model instance.

        GPUs that meet the GMU requirement are taken greedily, largest
        allocatable VRAM first. Selection stops at the first TP-divisible GPU
        count that satisfies the VRAM claim, or the pinned GPU count when
        parallelism is set. Returns at most one candidate. On failure an event
        is recorded and ``[]`` is returned.

        requires: worker.status.gpu_devices is not None
        """
        if len(worker.status.gpu_devices) < 2:
            # Returned None before; callers expect a list.
            return []

        allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
        if ram_not_enough(self._ram_claim, allocatable):
            return []

        gpu_list = []
        total_allocatable_vram = 0
        satisfied_gpu_count = 0
        for gpu in worker.status.gpu_devices:
            # `not total` also skips a reported total of 0, which previously
            # raised ZeroDivisionError below.
            if gpu.memory is None or not gpu.memory.total:
                continue
            allocatable_vram = allocatable.vram.get(gpu.index, 0)
            total_allocatable_vram += allocatable_vram
            if self._gpu_memory_utilization > 0:
                # Inclusive, matching the single-GPU and multi-worker checks.
                # A strict ">" rejected fully free GPUs when GMU == 1.0.
                satisfied = (
                    allocatable_vram / gpu.memory.total
                    >= self._gpu_memory_utilization
                )
            else:
                # GMU unused (non-LLM): any GPU with allocatable VRAM is usable.
                satisfied = allocatable_vram > 0
            if satisfied:
                satisfied_gpu_count += 1
                gpu_list.append(gpu)

        if total_allocatable_vram > self._largest_multi_gpu_total:
            self._largest_multi_gpu_vram = total_allocatable_vram
            self._largest_multi_gpu_utilization_satisfied_count = satisfied_gpu_count
            self._largest_multi_gpu_total = len(worker.status.gpu_devices)

        # Sort by vram in descending order
        sorted_gpu_devices: GPUDevicesStatus = sorted(
            gpu_list,
            key=lambda gpu: allocatable.vram.get(gpu.index, 0),
            reverse=True,
        )

        vram_sum = 0
        gpu_sum = 0
        gpu_indexes = []
        vram_claim: Dict[int, int] = {}
        found_candidate = False
        for gpu in sorted_gpu_devices:
            gpu_indexes.append(gpu.index)
            vram_claim[gpu.index] = (
                int(gpu.memory.total * self._gpu_memory_utilization)
                if self._gpu_memory_utilization > 0  # LLMs
                else allocatable.vram.get(gpu.index, 0)  # non LLMs
            )
            gpu_sum += 1
            vram_sum += vram_claim[gpu.index]

            if not self._is_tp_size_divisible(gpu_sum):
                continue

            if self._gpu_count and gpu_sum >= self._gpu_count:
                if vram_sum >= self._vram_claim:
                    found_candidate = True
                # if self._gpu_count is set, cannot return more than gpu_count
                break

            if (not self._gpu_count) and vram_sum >= self._vram_claim:
                found_candidate = True
                break

        if found_candidate:
            return [
                ModelInstanceScheduleCandidate(
                    worker=worker,
                    gpu_type=gpu_type,
                    gpu_indexes=gpu_indexes,
                    computed_resource_claim=ComputedResourceClaim(
                        vram=vram_claim,
                        ram=get_computed_ram_claim(self._model, vram_claim),
                        estimated_vram=self._vram_claim,
                    ),
                )
            ]

        event_msg_list = []
        if msg := self._check_tp_size_divisibility(
            self._largest_multi_gpu_utilization_satisfied_count
        ):
            event_msg_list.append(msg)

        event_msg = f"The largest available worker has {byte_to_gib(self._largest_multi_gpu_vram)} GiB allocatable VRAM."
        if self._gpu_memory_utilization != 0:
            event_msg = (
                event_msg.rstrip(".")
                + f", {self._largest_multi_gpu_utilization_satisfied_count}/{self._largest_multi_gpu_total} of GPUs meet the VRAM utilization ratio, providing {self._cal_effective_vram():.2f} GiB of allocatable VRAM."
            )
        event_msg_list.append(event_msg)

        self._event_collector.add(
            EventLevelEnum.INFO,
            EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
            str(ListMessageBuilder(event_msg_list)),
        )
        return []

    def find_multi_worker_multi_gpu_candidates(
        self, workers: List[Worker]
    ) -> List[ModelInstanceScheduleCandidate]:
        """Run the multi-worker search separately for each GPU type."""
        candidates = []
        for gpu_type, typed_workers in group_workers_by_gpu_type(workers).items():
            result = self.auto_select_multi_worker_multi_gpu_candidates(
                typed_workers, gpu_type
            )
            if result:
                candidates.extend(result)
        return candidates

    def auto_select_multi_worker_multi_gpu_candidates(  # noqa: C901
        self, workers: List[Worker], gpu_type: Optional[str] = None
    ) -> List[ModelInstanceScheduleCandidate]:
        """
        Auto select multi worker multi gpu candidates.

        Currently, a candidate should match the following conditions:
        1. Workers in the candidate have the same number of GPUs.
        2. All GPUs in the worker satisfy the gpu_memory_utilization requirement.
        3. TP size can be divided by the number of attention heads.
        4. The total VRAM claim is greater than the estimated VRAM claim.
        5. If gpu_count is set via parallelism, the total GPU count should be equal to gpu_count.

        Returns at most one candidate. On failure, an event describing the
        best combination found is recorded and ``[]`` is returned.
        """
        if not workers or len(workers) < 2:
            return []

        # Return value is unused, so this presumably sorts `workers` in place
        # (mutating the caller's list) — TODO confirm.
        sort_workers_by_gpu_count(workers)

        workers_by_gpu_count_dict = defaultdict(list)
        for worker in workers:
            if not worker.status or not worker.status.gpu_devices:
                continue
            workers_by_gpu_count_dict[len(worker.status.gpu_devices)].append(worker)

        # Best (largest VRAM) combination seen, reported when nothing fits.
        workers_combination: List[Worker] = []
        largest_vram = 0
        worker_count = 0
        device_count_per_worker = 0

        # Loop through worker groups with the same number of GPUs.
        for gpu_count, worker_group in workers_by_gpu_count_dict.items():
            if len(worker_group) < 2:
                continue
            if not self._is_tp_size_divisible(gpu_count):
                continue

            selected_workers: List[Worker] = []
            gpu_sum = 0
            vram_sum = 0
            for worker in worker_group:
                allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
                if ram_not_enough(self._ram_claim, allocatable):
                    # The RAM resource(for extended KV cache) is required per worker.
                    # Skip the worker if it does not satisfy the RAM requirement.
                    continue

                # Skip the worker if any GPU does not satisfy the
                # gpu_memory_utilization requirement. `not total` also rejects
                # a reported total of 0, which previously raised
                # ZeroDivisionError.
                if any(
                    gpu.memory is None
                    or not gpu.memory.total
                    or (
                        allocatable.vram.get(gpu.index, 0) / gpu.memory.total
                        < self._gpu_memory_utilization
                    )
                    for gpu in worker.status.gpu_devices
                ):
                    continue

                selected_workers.append(worker)
                gpu_sum += gpu_count
                vram_sum += sum(
                    int(gpu.memory.total * (self._gpu_memory_utilization or 1))
                    for gpu in worker.status.gpu_devices
                )

                if self._gpu_count:
                    # Parallelism is set. Proceed until we match the exact GPU count.
                    if gpu_sum < self._gpu_count:
                        continue
                    elif gpu_sum > self._gpu_count:
                        break

                if vram_sum >= self._vram_claim:
                    # NOTE(review): when GMU is 0, vram_sum counts full GPU
                    # totals but _create_candidate claims int(total * 0) == 0
                    # per GPU. Confirm this is intended for non-LLM models.
                    return [
                        _create_candidate(
                            self._model,
                            selected_workers,
                            self._gpu_memory_utilization,
                            estimated_vram=self._vram_claim,
                        )
                    ]

            if vram_sum > largest_vram:
                workers_combination = selected_workers
                largest_vram = vram_sum
                worker_count = len(worker_group)
                device_count_per_worker = gpu_count

        # Nothing can be return, construct scheduling message
        event_message = ListMessageBuilder([])
        if self._gpu_memory_utilization == 0:
            event_message.append(
                f"The largest available worker has {byte_to_gib(largest_vram)} GiB of VRAM."
            )
        elif workers_combination:
            worker_names = [worker.name for worker in workers_combination]
            worker_names_msg = (
                str(worker_names[:3]).rstrip("]")
                + f"...(more {len(worker_names) - 3})]"
                if len(worker_names) > 3
                else str(worker_names)
            )
            message = f"The optimal combination {worker_names_msg} provides {byte_to_gib(largest_vram)} GiB of allocatable VRAM."
            if worker_count - len(workers_combination) > 0:
                message += f" There are {worker_count - len(workers_combination)} {'workers' if worker_count - len(workers_combination) > 1 else 'worker'} that can provide {device_count_per_worker} {'GPUs' if device_count_per_worker > 1 else 'GPU'}, as the workers in the combination, but some GPUs among them fail to meet requirements."
            event_message.append(message)

        event_message.append(
            "Cannot find a suitable worker combination to run the model in distributed mode. "
            "If you are confident that the resources are sufficient, you may manually schedule the model by selecting the workers and GPUs."
        )
        self._event_collector.add(
            EventLevelEnum.INFO,
            EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
            str(event_message),
        )

        return []

    def _validate_arguments(self):
        """
        Validate the backend parameters.

        Raises:
            ValueError: If an explicit tensor-parallel size fails the TP
                divisibility check.
        """
        tp = find_int_parameter(
            self._model.backend_parameters, ["tensor-parallel-size", "tp"]
        )
        if msg := self._check_tp_size_divisibility(tp):
            raise ValueError(
                msg + " Consider adjusting your tensor-parallel-size value."
            )
def _create_candidate(
    model: Model,
    selected_workers: List[Worker],
    gpu_memory_utilization: float = 0.9,
    estimated_vram: Optional[int] = None,
) -> ModelInstanceScheduleCandidate:
    """
    Build a distributed candidate that claims every GPU of every selected worker.

    The first entry of ``selected_workers`` becomes the main worker. The rest
    are attached, in order, as subordinate workers. Each GPU claims
    ``int(total_memory * gpu_memory_utilization)``. Every worker is labelled
    with the GPU type of the main worker's first device.
    """

    def claim_every_gpu(node: Worker) -> Dict[int, int]:
        # Per-GPU VRAM claim for one worker, keyed by GPU index.
        return {
            device.index: int(device.memory.total * gpu_memory_utilization)
            for device in node.status.gpu_devices
        }

    def build_claim(vram: Dict[int, int]) -> ComputedResourceClaim:
        # Wrap a VRAM map with its derived RAM claim and the model estimate.
        return ComputedResourceClaim(
            vram=vram,
            ram=get_computed_ram_claim(model, vram),
            estimated_vram=estimated_vram,
        )

    leader = selected_workers[0]
    leader_devices = leader.status.gpu_devices
    leader_vram = claim_every_gpu(leader)
    shared_gpu_type = leader_devices[0].type

    candidate = ModelInstanceScheduleCandidate(
        worker=leader,
        gpu_type=shared_gpu_type,
        gpu_indexes=[device.index for device in leader_devices],
        computed_resource_claim=build_claim(leader_vram),
    )

    followers = []
    for follower in selected_workers[1:]:
        follower_devices = follower.status.gpu_devices
        follower_vram = claim_every_gpu(follower)
        followers.append(
            ModelInstanceSubordinateWorker(
                worker_id=follower.id,
                worker_name=follower.name,
                worker_ip=follower.ip,
                worker_ifname=follower.ifname,
                total_gpus=len(follower_devices),
                gpu_type=shared_gpu_type,
                gpu_indexes=[device.index for device in follower_devices],
                computed_resource_claim=build_claim(follower_vram),
            )
        )
    candidate.subordinate_workers = followers
    return candidate
|