gpu.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import re
  2. from typing import Tuple, Union, List, Callable
  3. from gpustack.schemas.benchmark import GPUSnapshots
  4. from gpustack.schemas.workers import GPUDeviceStatus, WorkerBase
  5. pattern = r"^(?P<worker_name>.+):(?P<device>[^:]+):(?P<gpu_index>\d+)$"
  6. def parse_gpu_id(input: str) -> Tuple[bool, dict]:
  7. """
  8. Parse the input string to check if it matches the format worker_name:device:gpu_index.
  9. Args:
  10. input_string (str): The input string to parse.
  11. Returns:
  12. tuple: (bool, dict)
  13. - If matched, the first value is True, and the second value is a dictionary
  14. containing worker_name, device, and gpu_index.
  15. - If not matched, the first value is False, and the second value is None.
  16. """
  17. match = re.match(pattern, input)
  18. if match:
  19. return True, match.groupdict()
  20. return False, None
  21. def make_gpu_id(worker_name: str, device: str, gpu_index: int) -> str:
  22. """
  23. Generate gpu_id, format: worker_name:device:gpu_index
  24. """
  25. return f"{worker_name}:{device}:{gpu_index}"
  26. def group_gpu_ids_by_worker(gpu_ids: list) -> dict:
  27. """
  28. Group GPU IDs by worker name.
  29. Args:
  30. gpu_ids (list): List of GPU IDs.
  31. Returns:
  32. dict: A dictionary where the keys are worker names and the values are lists of GPU IDs.
  33. """
  34. worker_gpu_ids = {}
  35. for gpu_id in gpu_ids:
  36. is_valid, matched = parse_gpu_id(gpu_id)
  37. if not is_valid:
  38. raise ValueError(f"Invalid GPU ID: {gpu_id}")
  39. worker_name = matched.get("worker_name")
  40. if worker_name not in worker_gpu_ids:
  41. worker_gpu_ids[worker_name] = []
  42. worker_gpu_ids[worker_name].append(gpu_id)
  43. for worker_name, gpu_ids in worker_gpu_ids.items():
  44. worker_gpu_ids[worker_name] = sorted(gpu_ids)
  45. return worker_gpu_ids
  46. def group_gpu_indexes_by_gpu_type_and_worker(gpu_ids: list) -> dict:
  47. """
  48. Group GPU indexes by gpu type and worker name.
  49. Args:
  50. gpu_ids (list): List of GPU IDs.
  51. Returns:
  52. dict: {gpu_type: {worker: [gpu_index, ...]}}
  53. """
  54. result = {}
  55. for gpu_id in gpu_ids:
  56. is_valid, matched = parse_gpu_id(gpu_id)
  57. if not is_valid:
  58. raise ValueError(f"Invalid GPU ID: {gpu_id}")
  59. worker = matched["worker_name"]
  60. gpu_type = matched["device"]
  61. gpu_index = int(matched["gpu_index"])
  62. result.setdefault(gpu_type, {}).setdefault(worker, []).append(gpu_index)
  63. # Sort indexes for each type/worker
  64. return {
  65. t: {w: sorted(idx) for w, idx in workers.items()}
  66. for t, workers in result.items()
  67. }
  68. def all_gpu_match(
  69. worker: Union[List[WorkerBase], WorkerBase],
  70. verify: Callable[[GPUDeviceStatus], bool],
  71. ) -> bool:
  72. """
  73. Check if all GPUs in the worker match the given callable condition.
  74. Args:
  75. worker (Union[List[WorkerBase], WorkerBase]): A worker or a list of workers.
  76. verify (Callable[GPUDeviceInfo], bool): A function that takes a GPU device and returns a boolean.
  77. Returns:
  78. bool: True if all GPUs match the condition, False otherwise.
  79. """
  80. if not worker:
  81. return False
  82. if isinstance(worker, list):
  83. return all(all_gpu_match(w, verify) for w in worker)
  84. if not worker.status or not worker.status.gpu_devices:
  85. return False
  86. return all(verify(gpu) for gpu in worker.status.gpu_devices)
  87. def any_gpu_match(
  88. worker: Union[List[WorkerBase], WorkerBase],
  89. verify: Callable[[GPUDeviceStatus], bool],
  90. ) -> bool:
  91. """
  92. Check if any GPU in the worker matches the given callable condition.
  93. Args:
  94. worker (Union[List[WorkerBase], WorkerBase]): A worker or a list of workers.
  95. verify (Callable[GPUDeviceInfo], bool): A function that takes a GPU device and returns a boolean.
  96. Returns:
  97. bool: True if any GPU matches the condition, False otherwise.
  98. """
  99. if isinstance(worker, list):
  100. return any(any_gpu_match(w, verify) for w in worker)
  101. if not worker.status or not worker.status.gpu_devices:
  102. return False
  103. return any(verify(gpu) for gpu in worker.status.gpu_devices)
  104. def find_one_gpu(
  105. worker: Union[List[WorkerBase], WorkerBase]
  106. ) -> Union[GPUDeviceStatus, None]:
  107. if isinstance(worker, list):
  108. for w in worker:
  109. gpu = find_one_gpu(w)
  110. if gpu is not None:
  111. return gpu
  112. elif worker.status and worker.status.gpu_devices:
  113. return worker.status.gpu_devices[0]
  114. return None
  115. def compare_compute_capability(current: str | None, target: str | None) -> int:
  116. """
  117. Safely compares two CUDA compute capability version strings.
  118. Args:
  119. current: The compute capability of the current device (e.g., "7.5").
  120. Accepts None, empty, or whitespace-only strings as invalid.
  121. target: The required or reference compute capability (e.g., "8.0").
  122. Also accepts None or invalid strings.
  123. Returns:
  124. -1 if `current` is less than `target`,
  125. 0 if they are equal (including both being invalid),
  126. 1 if `current` is greater than `target`.
  127. Invalid inputs (None, empty, whitespace, or malformed "X.Y" format)
  128. are treated as the lowest possible version. Thus:
  129. - Any valid version > any invalid version.
  130. - Two invalid versions are considered equal.
  131. """
  132. def parse_cc(cc: str | None) -> tuple[int, int] | None:
  133. """Parse a compute capability string into (major, minor) integers."""
  134. if cc is None:
  135. return None
  136. cc = cc.strip()
  137. if not cc:
  138. return None
  139. parts = cc.split('.', 1)
  140. if len(parts) != 2:
  141. return None
  142. try:
  143. major = int(parts[0])
  144. minor = int(parts[1])
  145. # Compute Capability versions are non-negative
  146. if major < 0 or minor < 0:
  147. return None
  148. return major, minor
  149. except (ValueError, TypeError):
  150. return None
  151. cur_parsed = parse_cc(current)
  152. tgt_parsed = parse_cc(target)
  153. # Both invalid → considered equal
  154. if cur_parsed is None and tgt_parsed is None:
  155. return 0
  156. # Current is invalid, target is valid → current < target
  157. if cur_parsed is None:
  158. return -1
  159. # Target is invalid, current is valid → current > target
  160. if tgt_parsed is None:
  161. return 1
  162. # Both are valid: compare numerically
  163. cur_major, cur_minor = cur_parsed
  164. tgt_major, tgt_minor = tgt_parsed
  165. if cur_major > tgt_major:
  166. return 1
  167. elif cur_major < tgt_major:
  168. return -1
  169. else:
  170. if cur_minor > tgt_minor:
  171. return 1
  172. elif cur_minor < tgt_minor:
  173. return -1
  174. else:
  175. return 0
  176. def abbreviate_gpu_indexes(indexes, max_show=3):
  177. """Return abbreviated string of GPU indexes, e.g. [0,1,2...(more 4)]"""
  178. if not indexes:
  179. return "[]"
  180. if len(indexes) <= max_show:
  181. return str(indexes)
  182. shown = indexes[:max_show]
  183. hidden_count = len(indexes) - max_show
  184. return f"[{','.join(map(str, shown))}...(more {hidden_count})]"
  185. def abbreviate_worker_gpu_indexes(
  186. worker_name: str,
  187. gpu_indexes: list[int],
  188. other_worker_count: int,
  189. other_gpu_count: int,
  190. max_show_gpu=3,
  191. ) -> str:
  192. """Return abbreviated string of worker GPU indexes, e.g. worker1:[0,1,2...(more 4)]"""
  193. abbreviated_indexes = abbreviate_gpu_indexes(gpu_indexes, max_show_gpu)
  194. msg = f"worker {worker_name} GPU indexes {abbreviated_indexes}"
  195. if other_gpu_count > 0 and other_worker_count > 0:
  196. msg += f" and {other_gpu_count} {'GPUs' if other_gpu_count > 1 else 'GPU'}"
  197. msg += f" from other {other_worker_count} {'workers' if other_worker_count > 1 else 'worker'}"
  198. return msg
  199. def summary_gpu_snapshots(gpu_snapshots: GPUSnapshots) -> Tuple[str, str]:
  200. """Return a summary string of GPU snapshots."""
  201. if not gpu_snapshots:
  202. return "No GPUs", "No GPUs"
  203. gpu_groups = {}
  204. gpu_vendors = []
  205. for _, gpu in gpu_snapshots.items():
  206. if gpu.name not in gpu_groups:
  207. gpu_groups[gpu.name] = 0
  208. gpu_groups[gpu.name] += 1
  209. gpu_vendors.append(gpu.vendor)
  210. sorted_groups = sorted(gpu_groups.items(), key=lambda x: -x[1])
  211. gpu_summary = "; ".join(
  212. f"{name}" if count == 1 else f"{name}x{count}" for name, count in sorted_groups
  213. )
  214. gpu_vendor_summary = ", ".join(sorted(set(gpu_vendors)))
  215. return gpu_summary, gpu_vendor_summary