| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import re
- from typing import Tuple, Union, List, Callable
- from gpustack.schemas.benchmark import GPUSnapshots
- from gpustack.schemas.workers import GPUDeviceStatus, WorkerBase
- pattern = r"^(?P<worker_name>.+):(?P<device>[^:]+):(?P<gpu_index>\d+)$"
- def parse_gpu_id(input: str) -> Tuple[bool, dict]:
- """
- Parse the input string to check if it matches the format worker_name:device:gpu_index.
- Args:
- input_string (str): The input string to parse.
- Returns:
- tuple: (bool, dict)
- - If matched, the first value is True, and the second value is a dictionary
- containing worker_name, device, and gpu_index.
- - If not matched, the first value is False, and the second value is None.
- """
- match = re.match(pattern, input)
- if match:
- return True, match.groupdict()
- return False, None
def make_gpu_id(worker_name: str, device: str, gpu_index: int) -> str:
    """
    Build a GPU ID string from its three components.

    Args:
        worker_name (str): Name of the worker that owns the GPU.
        device (str): Device field of the GPU ID.
        gpu_index (int): Index of the GPU.

    Returns:
        str: The components joined with colons, i.e. worker_name:device:gpu_index.
    """
    gpu_id = f"{worker_name}:{device}:{gpu_index}"
    return gpu_id
def group_gpu_ids_by_worker(gpu_ids: list) -> dict:
    """
    Bucket GPU IDs under the worker name encoded in each ID.

    Args:
        gpu_ids (list): GPU IDs in the form worker_name:device:gpu_index.

    Returns:
        dict: Maps each worker name to the sorted list of its GPU IDs. The IDs
        are sorted as plain strings, and workers appear in the order they are
        first encountered.

    Raises:
        ValueError: If any entry cannot be parsed as a GPU ID.
    """
    grouped = {}
    for gpu_id in gpu_ids:
        ok, fields = parse_gpu_id(gpu_id)
        if not ok:
            raise ValueError(f"Invalid GPU ID: {gpu_id}")
        grouped.setdefault(fields.get("worker_name"), []).append(gpu_id)

    return {name: sorted(ids) for name, ids in grouped.items()}
def group_gpu_indexes_by_gpu_type_and_worker(gpu_ids: list) -> dict:
    """
    Bucket GPU indexes first by GPU type, then by worker name.

    The GPU type is the device field of each GPU ID.

    Args:
        gpu_ids (list): GPU IDs in the form worker_name:device:gpu_index.

    Returns:
        dict: {gpu_type: {worker: [gpu_index, ...]}} where every index list
        holds integers in ascending order.

    Raises:
        ValueError: If any entry cannot be parsed as a GPU ID.
    """
    by_type = {}
    for gpu_id in gpu_ids:
        ok, fields = parse_gpu_id(gpu_id)
        if not ok:
            raise ValueError(f"Invalid GPU ID: {gpu_id}")

        worker_name = fields["worker_name"]
        device = fields["device"]
        index = int(fields["gpu_index"])

        if device not in by_type:
            by_type[device] = {}
        workers_of_type = by_type[device]
        if worker_name not in workers_of_type:
            workers_of_type[worker_name] = []
        workers_of_type[worker_name].append(index)

    # Order the indexes numerically within every type/worker bucket.
    for workers_of_type in by_type.values():
        for indexes in workers_of_type.values():
            indexes.sort()
    return by_type
def all_gpu_match(
    worker: Union[List[WorkerBase], WorkerBase],
    verify: Callable[[GPUDeviceStatus], bool],
) -> bool:
    """
    Tell whether every GPU of the given worker(s) satisfies a predicate.

    Args:
        worker (Union[List[WorkerBase], WorkerBase]): A worker or a list of workers.
        verify (Callable[[GPUDeviceStatus], bool]): Predicate applied to each GPU device.

    Returns:
        bool: True only if every GPU satisfies the predicate. A falsy worker
        (None, empty list) or a worker without status or GPU devices yields
        False, so a list containing such a worker also yields False.
    """
    if not worker:
        return False

    if isinstance(worker, list):
        for item in worker:
            if not all_gpu_match(item, verify):
                return False
        return True

    status = worker.status
    devices = status.gpu_devices if status else None
    if not devices:
        return False

    for gpu in devices:
        if not verify(gpu):
            return False
    return True
def any_gpu_match(
    worker: Union[List[WorkerBase], WorkerBase],
    verify: Callable[[GPUDeviceStatus], bool],
) -> bool:
    """
    Check if any GPU in the worker matches the given callable condition.

    Args:
        worker (Union[List[WorkerBase], WorkerBase]): A worker or a list of workers.
        verify (Callable[[GPUDeviceStatus], bool]): A function that takes a GPU device and returns a boolean.

    Returns:
        bool: True if any GPU matches the condition, False otherwise. A falsy
        worker (None, empty list) or a worker without status or GPU devices
        contributes no match.
    """
    # Same guard as all_gpu_match: without it a None worker (alone or inside a
    # list) raises AttributeError on `.status` instead of reporting "no match".
    if not worker:
        return False
    if isinstance(worker, list):
        return any(any_gpu_match(w, verify) for w in worker)
    if not worker.status or not worker.status.gpu_devices:
        return False
    return any(verify(gpu) for gpu in worker.status.gpu_devices)
def find_one_gpu(
    worker: Union[List[WorkerBase], WorkerBase]
) -> Union[GPUDeviceStatus, None]:
    """
    Return the first GPU device found on the given worker(s).

    Args:
        worker (Union[List[WorkerBase], WorkerBase]): A worker or a list of workers.

    Returns:
        Union[GPUDeviceStatus, None]: For a single worker, the first entry of
        its status.gpu_devices. For a list, the first GPU found while walking
        the workers in order. None if no GPU is found, including for a falsy
        worker (None, empty list).
    """
    # Guard falsy input the same way all_gpu_match does, so a None worker
    # (alone or inside a list) yields None instead of an AttributeError.
    if not worker:
        return None
    if isinstance(worker, list):
        for w in worker:
            gpu = find_one_gpu(w)
            if gpu is not None:
                return gpu
        return None
    if worker.status and worker.status.gpu_devices:
        return worker.status.gpu_devices[0]
    return None
- def compare_compute_capability(current: str | None, target: str | None) -> int:
- """
- Safely compares two CUDA compute capability version strings.
- Args:
- current: The compute capability of the current device (e.g., "7.5").
- Accepts None, empty, or whitespace-only strings as invalid.
- target: The required or reference compute capability (e.g., "8.0").
- Also accepts None or invalid strings.
- Returns:
- -1 if `current` is less than `target`,
- 0 if they are equal (including both being invalid),
- 1 if `current` is greater than `target`.
- Invalid inputs (None, empty, whitespace, or malformed "X.Y" format)
- are treated as the lowest possible version. Thus:
- - Any valid version > any invalid version.
- - Two invalid versions are considered equal.
- """
- def parse_cc(cc: str | None) -> tuple[int, int] | None:
- """Parse a compute capability string into (major, minor) integers."""
- if cc is None:
- return None
- cc = cc.strip()
- if not cc:
- return None
- parts = cc.split('.', 1)
- if len(parts) != 2:
- return None
- try:
- major = int(parts[0])
- minor = int(parts[1])
- # Compute Capability versions are non-negative
- if major < 0 or minor < 0:
- return None
- return major, minor
- except (ValueError, TypeError):
- return None
- cur_parsed = parse_cc(current)
- tgt_parsed = parse_cc(target)
- # Both invalid → considered equal
- if cur_parsed is None and tgt_parsed is None:
- return 0
- # Current is invalid, target is valid → current < target
- if cur_parsed is None:
- return -1
- # Target is invalid, current is valid → current > target
- if tgt_parsed is None:
- return 1
- # Both are valid: compare numerically
- cur_major, cur_minor = cur_parsed
- tgt_major, tgt_minor = tgt_parsed
- if cur_major > tgt_major:
- return 1
- elif cur_major < tgt_major:
- return -1
- else:
- if cur_minor > tgt_minor:
- return 1
- elif cur_minor < tgt_minor:
- return -1
- else:
- return 0
def abbreviate_gpu_indexes(indexes, max_show=3):
    """
    Render GPU indexes as a short string, eliding the tail of long lists.

    Args:
        indexes: Sequence of GPU indexes; a falsy value renders as "[]".
        max_show: How many leading indexes to show before eliding.

    Returns:
        str: "[]" for empty input, str(indexes) when there are at most
        max_show entries, otherwise the first max_show entries followed by the
        hidden count, e.g. "[0,1,2...(more 4)]".
    """
    if not indexes:
        return "[]"

    total = len(indexes)
    if total > max_show:
        visible = ",".join(str(index) for index in indexes[:max_show])
        return f"[{visible}...(more {total - max_show})]"
    return str(indexes)
def abbreviate_worker_gpu_indexes(
    worker_name: str,
    gpu_indexes: list[int],
    other_worker_count: int,
    other_gpu_count: int,
    max_show_gpu=3,
) -> str:
    """
    Describe one worker's GPU indexes, plus a tally of GPUs on other workers.

    Args:
        worker_name: Name of the worker being described.
        gpu_indexes: GPU indexes on that worker; abbreviated via abbreviate_gpu_indexes.
        other_worker_count: Number of additional workers involved.
        other_gpu_count: Number of GPUs on those additional workers.
        max_show_gpu: How many indexes to show before eliding.

    Returns:
        str: e.g. "worker w1 GPU indexes [0,1,2...(more 4)]"; when both other
        counts are positive, " and 3 GPUs from other 2 workers" is appended
        (singular nouns are used for a count of 1).
    """
    summary = (
        f"worker {worker_name} GPU indexes "
        f"{abbreviate_gpu_indexes(gpu_indexes, max_show_gpu)}"
    )
    # The tally is only meaningful when there are both extra GPUs and extra workers.
    if not (other_gpu_count > 0 and other_worker_count > 0):
        return summary

    gpu_noun = 'GPUs' if other_gpu_count > 1 else 'GPU'
    worker_noun = 'workers' if other_worker_count > 1 else 'worker'
    return (
        summary
        + f" and {other_gpu_count} {gpu_noun}"
        + f" from other {other_worker_count} {worker_noun}"
    )
def summary_gpu_snapshots(gpu_snapshots: GPUSnapshots) -> Tuple[str, str]:
    """
    Summarize GPU snapshots as a model summary and a vendor summary.

    Args:
        gpu_snapshots (GPUSnapshots): Mapping whose values expose `name` and
            `vendor` attributes; only `.items()` is used to walk it.

    Returns:
        Tuple[str, str]: (gpu_summary, gpu_vendor_summary).
            - gpu_summary lists GPU names by descending count, joined with "; ";
              a name seen more than once gets an "x<count>" suffix.
            - gpu_vendor_summary lists the distinct vendors, sorted and joined
              with ", ".
            Both are "No GPUs" when gpu_snapshots is falsy.
    """
    if not gpu_snapshots:
        return "No GPUs", "No GPUs"

    counts_by_name = {}
    vendors = set()
    for _, snapshot in gpu_snapshots.items():
        counts_by_name[snapshot.name] = counts_by_name.get(snapshot.name, 0) + 1
        vendors.add(snapshot.vendor)

    # Stable sort on the negated count: most common first, ties keep the
    # order in which the names were first seen.
    ranked = sorted(counts_by_name.items(), key=lambda entry: -entry[1])
    labels = [
        f"{name}" if count == 1 else f"{name}x{count}" for name, count in ranked
    ]
    return "; ".join(labels), ", ".join(sorted(vendors))
|