gguf_resource_fit_selector.py 95 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526
  1. import itertools
  2. import logging
  3. import os
  4. import copy
  5. import time
  6. from typing import Any, Dict, List, Optional, Tuple
  7. from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
  8. from gpustack.policies.utils import get_worker_allocatable_resource, ListMessageBuilder
  9. from gpustack.scheduler.calculator import (
  10. GPUOffloadEnum,
  11. ModelResourceClaim,
  12. Estimate,
  13. MemoryEstimate,
  14. calculate_gguf_model_resource_claim,
  15. )
  16. from gpustack.policies.base import (
  17. Allocatable,
  18. ModelInstanceScheduleCandidate,
  19. )
  20. from gpustack.policies.candidate_selectors.base_candidate_selector import (
  21. ScheduleCandidatesSelector,
  22. )
  23. from gpustack.schemas.models import (
  24. ComputedResourceClaim,
  25. Model,
  26. ModelInstance,
  27. ModelInstanceSubordinateWorker,
  28. is_image_model,
  29. )
  30. from gpustack.schemas.workers import Worker
  31. from gpustack.utils.command import find_parameter
  32. from gpustack.utils.convert import safe_int
  33. from gpustack.utils.gpu import parse_gpu_id, group_gpu_ids_by_worker
  34. from gpustack.utils.unit import byte_to_gib, byte_to_kib
  # Module-level logger used by the selector below.
  logger = logging.getLogger(__name__)

  # Built-in limits, used when the environment does not override them.
  DEFAULT_MAX_RPC_SERVER_COUNT = 8
  DEFAULT_MAX_RPC_COMBINATION_GENERATE_GPU_COUNT = 16

  # Effective limits: an environment variable of the same name takes priority.
  # int() raises ValueError at import time if the variable is not numeric.
  default_max_rpc_server_count = int(
      os.environ.get("DEFAULT_MAX_RPC_SERVER_COUNT", DEFAULT_MAX_RPC_SERVER_COUNT)
  )
  default_max_rpc_combination_generate_gpu_count = int(
      os.environ.get(
          "DEFAULT_MAX_RPC_COMBINATION_GENERATE_GPU_COUNT",
          DEFAULT_MAX_RPC_COMBINATION_GENERATE_GPU_COUNT,
      )
  )

  # Reasons attached to events recorded by the selector.
  EVENT_REASON_INSUFFICIENT_RESOURCES = "INSUFFICIENT_RESOURCES"
  EVENT_REASON_MAX_RPC_COMBINATION_GENERATE_GPU_COUNT_EXCEED = (
      "MAX_RPC_COMBINATION_GENERATE_GPU_COUNT_EXCEED"
  )
  EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED = (
      "INSUFFICIENT_RESOURCES_GPU_SELECTED"
  )
  EVENT_REASON_SELECTED_INVALID_GPU = "SELECTED_INVALID_GPU"
  EVENT_REASON_INVALID_BACKEND_PARAMETER = "INVALID_BACKEND_PARAMETER"

  # Actions (scheduling stages) an event can be attributed to.
  EVENT_ACTION_SINGLE_WORKER_SINGLE_GPU_FULL_OFFLOADING = (
      "Single-Worker Single-GPU Full Offloading"
  )
  EVENT_ACTION_SINGLE_WORKER_MULTI_GPU_FULL_OFFLOADING = (
      "Single-Worker Multi-GPU Full Offloading"
  )
  EVENT_ACTION_DISTRIBUTED_DEPLOYMENT = "Distributed Deployment"
  EVENT_ACTION_SINGLE_WORKER_PARTIAL_OFFLOADING = "Single-Worker Partial Offloading"
  EVENT_ACTION_CPU_OFFLOADING = "CPU Offloading"
  EVENT_ACTION_PRE_CHECK = "Pre-Check"
  66. class GGUFResourceFitSelector(ScheduleCandidatesSelector):
  def __init__(
      self,
      model: Model,
      model_instances: List[ModelInstance],
      cache_dir: Optional[str] = None,
  ):
      """Create a selector for ``model``.

      Args:
          model: The model to be scheduled.
          model_instances: Model instances handed to the allocatable-resource
              helper when computing what each worker has left.
          cache_dir: Optional cache directory, stored as-is.
      """
      # The order is significant: parameter parsing reads ``self._model`` and
      # GPU-selection setup reads ``self._param_tensor_split``.
      self._initialize_basic_data(
          model=model,
          model_instances=model_instances,
          cache_dir=cache_dir,
      )
      self._initialize_cached_claim_data()
      self._initialize_model_parameters(model=model)
      self._initialize_selected_gpu_ids()
  def _initialize_basic_data(
      self,
      model: Model,
      model_instances: List[ModelInstance],
      cache_dir: Optional[str],
  ):
      """Store the constructor inputs and reset per-run bookkeeping."""
      # Constructor inputs.
      self._model, self._model_instances, self._cache_dir = (
          model,
          model_instances,
          cache_dir,
      )

      # Worker references, kept for remote parsing, plus lookup tables.
      self._workers = []
      self._worker_name_to_worker: Dict[str, Worker] = {}
      self._worker_id_to_worker: Dict[int, Worker] = {}

      # Allocatable-resource snapshots, filled in once workers are known.
      self._workers_allocatable_resource = {}
      self._gpus_allocatable_vram = []
      self._workers_allocatable_vram = []

      # Derived counters.
      self._max_gpu_vram = 0
      self._approximate_full_offload_required_gpu_number = 1
      self._allocatable_gpu_count = self._gpu_count = 0
      self._allocatable_worker_count = 0

      # User-facing messages and the event collector they are derived from.
      self._messages = []
      self._event_collector = EventCollector(self._model, logger)
  100. def _should_check_vision_tp_divisibility(self) -> bool:
  101. return False
  102. def _initialize_cached_claim_data(self):
  103. """Initialize cached claim data."""
  104. # Cached simple claim data.
  105. self._full_offload_resource_claim = None
  106. self._partial_offload_resource_claim = None
  107. self._disable_offload_resource_claim = None
  108. self._total_layers = 0
  109. self._non_uma_single_gpu_full_offload_vram = 0
  110. self._uma_single_gpu_full_offload_vram = 0
  111. self._non_uma_single_layer_vram = 0
  112. self._uma_single_layer_vram = 0
  113. self._rpc_non_uma_single_layer_vram = 0
  114. self._rpc_uma_single_layer_vram = 0
  115. # Cached complex claim data.
  116. self._multi_workers_multi_gpus_partial_offload_resource_claim_cache = {}
  117. self._single_worker_multi_gpus_partial_offload_resource_claim_cache = {}
  118. self._cache_max_size = 50
  119. def _initialize_model_parameters(self, model: Model):
  120. """Initialize model parameters."""
  121. self._param_tensor_split = None
  122. self._param_gpu_layers = None
  123. self._param_ctx_size_in_kib = 8192
  124. if model.backend_parameters:
  125. self._param_tensor_split = find_parameter(
  126. model.backend_parameters, ["ts", "tensor-split"]
  127. )
  128. _param_gpu_layers = find_parameter(
  129. model.backend_parameters, ["ngl", "gpu-layers", "n-gpu-layers"]
  130. )
  131. if _param_gpu_layers:
  132. self._param_gpu_layers = safe_int(_param_gpu_layers, default=None)
  133. _param_ctx_size = find_parameter(
  134. self._model.backend_parameters, ["ctx-size", "c"]
  135. )
  136. if _param_ctx_size:
  137. self._param_ctx_size_in_kib = byte_to_kib(safe_int(_param_ctx_size))
  def _initialize_selected_gpu_ids(self):
      """Record the manually selected GPUs, if the model specifies any.

      When GPU ids are selected, the RPC server limit becomes the number of
      selected GPUs. A tensor-split parameter overrides the selection: the
      selected ids are cleared again (the RPC limit is left as set).
      """
      selector = self._model.gpu_selector
      if selector and selector.gpu_ids:
          self._selected_gpu_ids_by_worker = group_gpu_ids_by_worker(
              selector.gpu_ids
          )
          self._selected_gpu_ids = sorted(selector.gpu_ids)
          self._max_rpc_server_count = len(self._selected_gpu_ids)
      else:
          self._max_rpc_server_count = default_max_rpc_server_count
          self._selected_gpu_ids_by_worker = {}
          self._selected_gpu_ids = []

      if not self._param_tensor_split:
          return

      # ignore the gpu_selector if tensor split is set.
      logger.info(
          f"Model {self._model.name} has tensor-split, ignore the gpu_selector"
      )
      self._selected_gpu_ids_by_worker = {}
      self._selected_gpu_ids = []
  156. def _get_worker_allocatable_resource(self, worker: Worker) -> Allocatable:
  157. if self._workers_allocatable_resource.get(worker.id):
  158. return self._workers_allocatable_resource.get(worker.id)
  159. return get_worker_allocatable_resource(self._model_instances, worker)
  160. def _get_claim_with_layers(
  161. self, layers: int, is_uma: bool = False
  162. ) -> Tuple[int, int]:
  163. vram_claim = 0
  164. ram_claim = 0
  165. for memory in self._partial_offload_resource_claim.items:
  166. if memory.offloadLayers == layers:
  167. vram_claim = memory.vrams[0].nonuma
  168. ram_claim = memory.ram.nonuma
  169. if is_uma:
  170. vram_claim = memory.vrams[0].uma
  171. ram_claim = memory.ram.uma
  172. break
  173. return vram_claim, ram_claim
  174. def _get_tensor_split_claim_with_layers(
  175. self,
  176. layers: int,
  177. is_uma: bool = False,
  178. claim_items: List[MemoryEstimate] = None,
  179. ) -> Tuple[List[int], int]:
  180. vram_claims = []
  181. ram_claim = 0
  182. for memory in claim_items:
  183. if memory.offloadLayers == layers:
  184. if is_uma:
  185. vram_claims = [vram.uma for vram in memory.vrams]
  186. ram_claim = memory.ram.uma
  187. else:
  188. vram_claims = [vram.nonuma for vram in memory.vrams]
  189. ram_claim = memory.ram.nonuma
  190. break
  191. return vram_claims, ram_claim
  192. def _filter_selected_gpus_in_workers(self, workers: List[Worker]):
  193. """
  194. Filter the selected GPUs in workers.
  195. """
  196. if (
  197. self._model.gpu_selector is None
  198. or self._model.gpu_selector.gpu_ids is None
  199. or len(self._model.gpu_selector.gpu_ids) == 0
  200. ):
  201. return workers, []
  202. for worker in workers:
  203. gpu_candidates = []
  204. for gpu in worker.status.gpu_devices:
  205. id = f"{worker.name}:{gpu.type}:{gpu.index}"
  206. if id not in self._model.gpu_selector.gpu_ids:
  207. continue
  208. gpu_candidates.append(gpu)
  209. worker.status.gpu_devices = gpu_candidates
  def _set_workers_allocatable_resource(self, workers: List[Worker]):
      """Refresh all allocatable-resource bookkeeping for ``workers``.

      Updates the per-worker allocatable map, the worker and GPU VRAM lists
      returned by ``_sort_and_group_worker_gpu_vram``, the GPU and worker
      counters, the largest GPU VRAM, the approximate GPU count for a full
      offload, and the id/name lookup tables.
      """
      allocatable_by_worker, vram_by_worker, vram_by_gpu = (
          self._generate_workers_and_gpus_allocatable_resources(workers)
      )
      ranked_worker_vram, ranked_gpu_vram = _sort_and_group_worker_gpu_vram(
          vram_by_worker, vram_by_gpu
      )

      self._workers_allocatable_resource = allocatable_by_worker
      self._gpus_allocatable_vram = ranked_gpu_vram
      self._workers_allocatable_vram = ranked_worker_vram
      self._allocatable_gpu_count = len(self._gpus_allocatable_vram)

      # Count every GPU device reported by the workers, allocatable or not.
      device_total = 0
      for worker in workers:
          if worker.status.gpu_devices:
              device_total += len(worker.status.gpu_devices)
      self._gpu_count = device_total

      if len(self._gpus_allocatable_vram) > 0:
          # The first entry's third field is taken as the maximum GPU VRAM
          # (presumably the list is ordered by VRAM, largest first).
          self._max_gpu_vram = self._gpus_allocatable_vram[0][2]
          self._approximate_full_offload_required_gpu_number = (
              self._estimate_approximate_required_gpu_number(
                  self._non_uma_single_gpu_full_offload_vram
              )
          )

      for worker in workers:
          self._worker_id_to_worker[worker.id] = worker
          self._worker_name_to_worker[worker.name] = worker

      self._allocatable_worker_count = len(self._workers_allocatable_resource.keys())
  239. def _set_single_layer_vram(
  240. self, result: ModelResourceClaim, rpc_result: ModelResourceClaim
  241. ):
  242. for item in result.resource_claim_estimate.items:
  243. if item.vrams[0].handleLayers == 1:
  244. self._non_uma_single_layer_vram = item.vrams[0].nonuma
  245. self._uma_single_layer_vram = item.vrams[0].uma
  246. break
  247. for item in rpc_result.resource_claim_estimate.items:
  248. if item.vrams[0].handleLayers == 1:
  249. self._rpc_non_uma_single_layer_vram = item.vrams[0].nonuma
  250. self._rpc_uma_single_layer_vram = item.vrams[0].uma
  251. break
  252. async def _set_offload_resource_claim(self):
  253. result = await self._calculate_model_resource_claim()
  254. rpc_result = await self._calculate_model_resource_claim(
  255. tensor_split=[1, 1],
  256. rpc=["host:80"],
  257. )
  258. disable_offload_result = copy.deepcopy(result)
  259. disable_offload_result.resource_claim_estimate.items = [
  260. result.resource_claim_estimate.items[0]
  261. ]
  262. full_offload_result = copy.deepcopy(result)
  263. full_offload_result.resource_claim_estimate.items = [
  264. result.resource_claim_estimate.items[-1]
  265. ]
  266. full_offload_item = full_offload_result.resource_claim_estimate.items[0]
  267. self._full_offload_resource_claim = full_offload_result.resource_claim_estimate
  268. self._partial_offload_resource_claim = result.resource_claim_estimate
  269. self._disable_offload_result_claim = (
  270. disable_offload_result.resource_claim_estimate
  271. )
  272. self._total_layers = full_offload_item.offloadLayers
  273. self._uma_single_gpu_full_offload_vram = full_offload_item.vrams[0].uma
  274. self._non_uma_single_gpu_full_offload_vram = full_offload_item.vrams[0].nonuma
  275. self._uma_single_gpu_full_offload_ram = full_offload_item.ram.uma
  276. self._non_uma_single_gpu_full_offload_ram = full_offload_item.ram.nonuma
  277. self._set_single_layer_vram(result, rpc_result)
  278. def _set_model_parameters(self):
  279. if self._param_gpu_layers and self._param_gpu_layers > self._total_layers:
  280. self._param_gpu_layers = self._total_layers
  def _set_messages(self, candidates: List[ModelInstanceScheduleCandidate]):
      """Derive the user-facing messages from the collected events.

      * If any ERROR event exists, the messages become exactly the error
        messages and nothing else is added.
      * Otherwise, when there are no candidates, the message of the first
        action (in the priority order below) that has one is appended.
      * WARNING messages are appended last.

      Args:
          candidates: The candidates found by the selector; may be empty.
      """
      error_messages = []
      warning_messages = []
      # Latest message recorded per action for events of any other level.
      message_by_action = {
          EVENT_ACTION_PRE_CHECK: "",
          EVENT_ACTION_SINGLE_WORKER_SINGLE_GPU_FULL_OFFLOADING: "",
          EVENT_ACTION_SINGLE_WORKER_MULTI_GPU_FULL_OFFLOADING: "",
          EVENT_ACTION_DISTRIBUTED_DEPLOYMENT: "",
          EVENT_ACTION_SINGLE_WORKER_PARTIAL_OFFLOADING: "",
          EVENT_ACTION_CPU_OFFLOADING: "",
      }

      for event in self._event_collector.events:
          if event.level == EventLevelEnum.ERROR:
              error_messages.append(event.message)
          elif event.level == EventLevelEnum.WARNING:
              warning_messages.append(event.message)
          else:
              message_by_action[event.action] = event.message

      if error_messages:
          self._messages = error_messages
          return

      if not candidates:
          priority = (
              EVENT_ACTION_PRE_CHECK,
              EVENT_ACTION_SINGLE_WORKER_MULTI_GPU_FULL_OFFLOADING,
              EVENT_ACTION_SINGLE_WORKER_SINGLE_GPU_FULL_OFFLOADING,
              EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
              EVENT_ACTION_SINGLE_WORKER_PARTIAL_OFFLOADING,
              EVENT_ACTION_CPU_OFFLOADING,
          )
          first_message = next(
              (message_by_action[action] for action in priority if message_by_action[action]),
              None,
          )
          if first_message is not None:
              self._messages.append(first_message)

      self._messages.extend(warning_messages)
  315. def get_messages(self) -> List[str]:
  316. return self._messages
  317. async def select_candidates(
  318. self, workers: List[Worker]
  319. ) -> List[ModelInstanceScheduleCandidate]:
  320. """
  321. Get schedule candidates by the resource fit claim.
  322. """
  323. if not workers:
  324. return []
  325. # Save workers reference for remote parsing
  326. self._workers = workers
  327. # reset the data with input workers.
  328. await self._set_offload_resource_claim()
  329. self._filter_selected_gpus_in_workers(workers)
  330. self._set_workers_allocatable_resource(workers)
  331. self._set_model_parameters()
  332. sorted_workers = self._sort_workers_by_allocatable_vram(workers)
  333. candidates = await self._filter_in_sequence(sorted_workers)
  334. return candidates
  async def _filter_in_sequence(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """Try each placement strategy in order and return the first hit.

      Strategies run in this order: single-GPU full offload, single-worker
      multi-GPU full offload, multi-worker, partial offload, CPU. A strategy
      is skipped when ``_should_skip_candidate_func`` says so. If nothing is
      found and no event was recorded, a generic insufficient-resources
      error event is added. Messages are refreshed before returning.

      A strategy returning ``None`` is treated as "no candidates"; the
      original logged ``len(candidates)`` before checking for ``None`` and
      would raise ``TypeError`` in that case.

      Args:
          workers: Workers to evaluate.

      Returns:
          The candidates of the first successful strategy, or an empty list.
      """
      candidates = []
      candidate_functions = [
          self.find_single_worker_single_gpu_full_offloading_candidates,
          self.find_single_worker_multi_gpu_full_offloading_candidates,
          self.find_multi_worker_multi_gpu_candidates,
          self.find_single_worker_partial_offloading_candidates,
          self.find_single_worker_cpu_candidates,
      ]

      overall_start_time = time.time()
      for candidate_func in candidate_functions:
          if self._should_skip_candidate_func(candidate_func):
              continue

          func_start_time = time.time()
          logger.info(
              f"Begin filter candidates with resource fit selector: "
              f"{candidate_func.__name__}, model {self._model.name or self._model.readable_source}",
          )

          # Normalise None to an empty list so the len() calls below are safe.
          candidates = await candidate_func(workers) or []

          func_latency = time.time() - func_start_time
          logger.info(
              f"Finished filter candidates with resource fit selector: "
              f"{candidate_func.__name__}, model {self._model.name or self._model.readable_source}, "
              f"latency: {func_latency:.2f}s, candidates: {len(candidates)}",
          )

          if candidates:
              break

      if not candidates and len(self._event_collector.events) == 0:
          msg = ListMessageBuilder(
              f"The model requires approximately {byte_to_gib(self._non_uma_single_gpu_full_offload_vram)} GiB VRAM and {byte_to_gib(self._non_uma_single_gpu_full_offload_ram)} GiB RAM."
          )
          if self._selected_gpu_ids:
              msg.append(
                  "Selected GPUs do not meet the requirements to run the model."
              )
          else:
              msg.append(
                  "Cannot find a suitable worker combination to run the model."
              )

          self._event_collector.add(
              EventLevelEnum.ERROR,
              EVENT_ACTION_PRE_CHECK,
              str(msg),
              reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
          )

      overall_latency = time.time() - overall_start_time
      logger.info(
          f"Finished resource fit selector found {len(candidates)} candidates, model {self._model.name or self._model.readable_source}, "
          f"latency: {overall_latency:.2f}s",
      )

      self._set_messages(candidates)
      return candidates
  394. def _should_skip_candidate_func(self, candidate_func) -> bool:
  395. # Skip conditions for CPU offloading.
  396. if not self._model.cpu_offloading and candidate_func in [
  397. self.find_single_worker_partial_offloading_candidates,
  398. self.find_single_worker_cpu_candidates,
  399. ]:
  400. return True
  401. # Skip conditions for distributed inference.
  402. if (
  403. not self._model.distributed_inference_across_workers
  404. and candidate_func == self.find_multi_worker_multi_gpu_candidates
  405. ):
  406. return True
  407. # Skip conditions for image models.
  408. if (
  409. is_image_model(self._model)
  410. and candidate_func
  411. != self.find_single_worker_single_gpu_full_offloading_candidates
  412. ):
  413. # Only full offloading is supported for image models.
  414. return True
  415. if self._should_skip_for_params(candidate_func):
  416. return True
  417. if self._should_skip_for_manual_scheduling(candidate_func):
  418. return True
  419. if self._should_skip_for_allocatable_resource(candidate_func):
  420. return True
  421. def _should_skip_for_params(self, candidate_func) -> bool:
  422. # Skip conditions for param gpu layers.
  423. if self._param_gpu_layers or self._param_gpu_layers == 0:
  424. if (
  425. self._param_gpu_layers > 0
  426. and self._param_gpu_layers < self._total_layers
  427. and not self._model.cpu_offloading
  428. ):
  429. self._event_collector.add(
  430. EventLevelEnum.ERROR,
  431. EVENT_ACTION_PRE_CHECK,
  432. f"The parameter --gpu-layers {self._param_gpu_layers} requires CPU offloading. Please allow CPU offloading in the model configuration.",
  433. reason=EVENT_REASON_INVALID_BACKEND_PARAMETER,
  434. )
  435. return True
  436. if (
  437. self._param_gpu_layers == 0
  438. and candidate_func != self.find_single_worker_cpu_candidates
  439. ):
  440. # User specified full CPU offloading.
  441. return True
  442. if (
  443. self._param_gpu_layers != 0
  444. and candidate_func == self.find_single_worker_cpu_candidates
  445. ):
  446. # User specified GPU offloading.
  447. return True
  448. if self._param_gpu_layers != self._total_layers and candidate_func in [
  449. self.find_single_worker_single_gpu_full_offloading_candidates,
  450. self.find_single_worker_multi_gpu_full_offloading_candidates,
  451. ]:
  452. # User specified partial offloading.
  453. return True
  454. if self._param_gpu_layers == self._total_layers and candidate_func in [
  455. self.find_single_worker_partial_offloading_candidates,
  456. self.find_single_worker_cpu_candidates,
  457. ]:
  458. # User specified full offloading.
  459. return True
  460. # Skip conditions for param tensor_split.
  461. if self._param_tensor_split:
  462. if (
  463. candidate_func
  464. == self.find_single_worker_single_gpu_full_offloading_candidates
  465. ):
  466. return True
  467. def _should_skip_for_manual_scheduling(self, candidate_func) -> bool:
  468. # Skip conditions for manual scheduling.
  469. if self._selected_gpu_ids:
  470. if candidate_func == self.find_single_worker_cpu_candidates:
  471. return True
  472. worker_num = len(self._selected_gpu_ids_by_worker)
  473. if (
  474. worker_num > 1
  475. and candidate_func != self.find_multi_worker_multi_gpu_candidates
  476. ):
  477. return True
  478. if worker_num == 1:
  479. selected_worker_name = next(
  480. iter(self._selected_gpu_ids_by_worker.keys())
  481. )
  482. selected_gpu_count = len(
  483. self._selected_gpu_ids_by_worker.get(selected_worker_name)
  484. )
  485. if (
  486. candidate_func == self.find_multi_worker_multi_gpu_candidates
  487. or (
  488. selected_gpu_count > 1
  489. and candidate_func
  490. == self.find_single_worker_single_gpu_full_offloading_candidates
  491. )
  492. or (
  493. selected_gpu_count == 1
  494. and candidate_func
  495. == self.find_single_worker_multi_gpu_full_offloading_candidates
  496. )
  497. ):
  498. return True
  499. def _should_skip_for_allocatable_resource( # noqa: C901
  500. self, candidate_func
  501. ) -> bool:
  502. # Skip conditions for worker allocatable resources.
  503. if self._allocatable_worker_count == 0:
  504. self._event_collector.add(
  505. EventLevelEnum.ERROR,
  506. EVENT_ACTION_PRE_CHECK,
  507. "Insufficient resources for the model. Please check the available VRAM and RAM for the workers.",
  508. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  509. )
  510. return True
  511. if self._allocatable_gpu_count == 0:
  512. if candidate_func == self.find_single_worker_cpu_candidates:
  513. return False
  514. if self._selected_gpu_ids:
  515. self._event_collector.add(
  516. EventLevelEnum.ERROR,
  517. EVENT_ACTION_PRE_CHECK,
  518. f"Selected GPUs need at least {byte_to_gib(self._non_uma_single_layer_vram)} GiB of available VRAM.",
  519. reason=EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED,
  520. )
  521. return True
  522. if self._gpu_count != 0:
  523. self._event_collector.add(
  524. EventLevelEnum.INFO,
  525. EVENT_ACTION_PRE_CHECK,
  526. "No available resources for workers. Please check workers's available VRAM.",
  527. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  528. )
  529. return True
  530. if (
  531. self._allocatable_worker_count < 2
  532. and candidate_func == self.find_multi_worker_multi_gpu_candidates
  533. ):
  534. return True
  535. if self._allocatable_gpu_count < 2 and candidate_func in [
  536. self.find_single_worker_multi_gpu_full_offloading_candidates,
  537. self.find_multi_worker_multi_gpu_candidates,
  538. ]:
  539. return True
  540. if self._selected_gpu_ids_by_worker:
  541. for (
  542. worker_name,
  543. selected_gpu_ids,
  544. ) in self._selected_gpu_ids_by_worker.items():
  545. worker = self._worker_name_to_worker.get(worker_name)
  546. if not worker:
  547. self._event_collector.add(
  548. EventLevelEnum.ERROR,
  549. EVENT_ACTION_PRE_CHECK,
  550. f"Selected GPUs's worker {worker_name} not found in the workers list.",
  551. reason=EVENT_REASON_SELECTED_INVALID_GPU,
  552. )
  553. return True
  554. is_unified_memory = worker.status.memory.is_unified_memory
  555. worker_allocatable = self._workers_allocatable_resource.get(worker.id)
  556. if not worker_allocatable:
  557. continue
  558. for selected_gpu_id in selected_gpu_ids:
  559. valid, matched = parse_gpu_id(selected_gpu_id)
  560. if not valid:
  561. continue
  562. selected_gpu_index = safe_int(matched.get("gpu_index"))
  563. vram = worker_allocatable.vram.get(selected_gpu_index, 0)
  564. vram_claim = self._get_single_layer_vram(is_unified_memory, False)
  565. if vram < vram_claim:
  566. self._event_collector.add(
  567. EventLevelEnum.ERROR,
  568. EVENT_ACTION_PRE_CHECK,
  569. f"Selected GPU {selected_gpu_id} lacks enough VRAM. At least {byte_to_gib(vram_claim)} GiB is required.",
  570. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  571. )
  572. return True
  async def find_single_worker_single_gpu_full_offloading_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Collect the candidates that fully offload the model onto one GPU of one worker.

      The GPUs are taken from ``self._gpus_allocatable_vram`` in its stored order;
      each entry is handed to the per-GPU helper, and the walk stops as soon as
      the helper reports that it should not continue. The ``workers`` argument
      is not read by this method.

      Returns:
          List[ModelInstanceScheduleCandidate]: Qualified candidates (empty when
          there is no allocatable GPU entry or nothing fits).
      """
      logger.debug(f"Input {len(self._gpus_allocatable_vram)} candidates")

      gpu_entries = self._gpus_allocatable_vram
      if not gpu_entries:
          return []

      final_candidates = []
      for gpu_entry in gpu_entries:
          found, keep_going = (
              await self._find_single_worker_single_gpu_full_offloading_candidates(
                  gpu_entry
              )
          )
          if found:
              final_candidates.extend(found)
          if keep_going:
              continue
          # The helper asked to stop: skip subsequent gpus because they have less vram.
          break

      # The advice helper always receives the first GPU entry as its reference.
      self._advise_for_find_single_worker_single_gpu_full_offloading_candidates(
          final_candidates, gpu_entries[0]
      )
      logger.debug(f"Qualified {len(final_candidates)} candidates")
      return final_candidates
  async def _find_single_worker_single_gpu_full_offloading_candidates(
      self, gpu_allocatable_vram: Tuple[int, int, int]
  ) -> Tuple[List[ModelInstanceScheduleCandidate], bool]:
      """
      Check whether one GPU can hold the fully offloaded model.

      Args:
          gpu_allocatable_vram (Tuple[int, int, int]): Tuple of worker_id, gpu_index, gpu_allocatable_vram.

      Returns:
          Tuple[List[ModelInstanceScheduleCandidate], bool]: The candidates and a should_continue flag.
          ``(None, True)`` when the GPU lacks VRAM, ``(None, False)`` when the
          worker lacks RAM, and ``([candidate], True)`` when the GPU qualifies.
      """
      worker = self._worker_id_to_worker.get(gpu_allocatable_vram[0])
      uses_unified_memory = worker.status.memory.is_unified_memory

      # Pick the claim pair that matches the worker's memory architecture.
      if uses_unified_memory:
          vram_claim = self._uma_single_gpu_full_offload_vram
          ram_claim = self._uma_single_gpu_full_offload_ram
      else:
          vram_claim = self._non_uma_single_gpu_full_offload_vram
          ram_claim = self._non_uma_single_gpu_full_offload_ram

      free_vram = gpu_allocatable_vram[2]
      free_ram = self._workers_allocatable_resource.get(worker.id).ram

      if vram_claim > free_vram:
          return None, True

      # For UMA, we need to remove the claim of gpu memory before check the memory.
      ram_budget = free_ram - vram_claim if uses_unified_memory else free_ram
      if ram_claim > ram_budget:
          return None, False

      gpu_index = gpu_allocatable_vram[1]
      satisfied_candidate = self._create_candidate(
          worker,
          self._total_layers,
          ram_claim,
          {gpu_index: vram_claim},
          [gpu_index],
      )
      logger.debug(
          f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
      )
      return [satisfied_candidate], True
  async def find_single_worker_multi_gpu_full_offloading_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """Gather full-offload candidates that spread the model over several GPUs of one worker.

      Workers without GPU devices are skipped. Among everything found, only the
      candidates using the fewest GPUs are kept.

      Args:
          workers (List[Worker]): workers sorted by allocatable vram resource.

      Returns:
          List[ModelInstanceScheduleCandidate]: List of model instance schedule candidates.
      """
      candidates = []
      # First GPU worker that produced nothing; forwarded to the advice helper.
      first_unfit_worker = None

      for worker in workers:
          if not worker.status.gpu_devices:
              continue
          found = await self._find_single_worker_multi_gpu_full_offloading_candidates(
              worker
          )
          if found:
              candidates.extend(found)
          elif first_unfit_worker is None:
              first_unfit_worker = worker

      logger.debug(f"Found {len(candidates)} intermediate candidates")

      final_candidates = []
      if candidates:
          fewest_gpus = min(len(entry.gpu_indexes) for entry in candidates)
          final_candidates = [
              entry for entry in candidates if len(entry.gpu_indexes) == fewest_gpus
          ]

      await self._advise_for_find_single_worker_multi_gpus_full_offloading_candidates(
          final_candidates, first_unfit_worker
      )
      return final_candidates
  async def _find_single_worker_multi_gpu_full_offloading_candidates(  # noqa: C901
      self, worker: Worker
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Search one worker for GPU combinations that can hold the fully offloaded model.

      requires: worker.status.gpu_devices is not None

      GPU counts are tried in ascending order and the search stops at the first
      count that yields any candidate. Returns None when the worker fails the
      pre-filter (fewer than two allocatable GPUs, or too little RAM).
      """
      uses_unified_memory = worker.status.memory.is_unified_memory
      allocatable = self._get_worker_allocatable_resource(worker)

      # Pre filter
      logger.debug(f"Pre candidates filter for worker: {worker.name}")
      total_gpu = len(allocatable.vram)
      if total_gpu < 2:
          return None

      required_ram = (
          self._uma_single_gpu_full_offload_ram
          if uses_unified_memory
          else self._non_uma_single_gpu_full_offload_ram
      )
      if allocatable.ram < required_ram:
          return None

      candidates = []
      first_gpu_count = max(2, self._approximate_full_offload_required_gpu_number)
      for gpu_count in range(first_gpu_count, total_gpu + 1):
          gpu_combinations, equal_vram = (
              self._generate_combinations_for_single_worker_multi_gpus(
                  allocatable, worker, gpu_count
              )
          )
          if not gpu_combinations:
              continue

          logger.debug(
              f"Input {len(gpu_combinations)} intermediate candidates for combinations with {gpu_count} gpus for worker: {worker.name}"
          )
          for position, gpu_combination in enumerate(gpu_combinations):
              satisfied_candidate = await self._find_single_worker_multi_gpu_full_offloading_candidates_with_combinations(
                  worker, allocatable, gpu_combination, uses_unified_memory
              )
              if satisfied_candidate:
                  candidates.append(satisfied_candidate)
                  logger.debug(
                      f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
                  )
              if equal_vram and position > 0:
                  # Skip subsequent combinations because they have same vram
                  break

          # clear cache each count
          self._single_worker_multi_gpus_partial_offload_resource_claim_cache.clear()

          if candidates:
              # A smaller GPU count already works; larger counts are not explored.
              break

      logger.debug(
          f"Qualified {len(candidates)} candidates for worker: {worker.name}"
      )
      return candidates
  async def _find_single_worker_multi_gpu_full_offloading_candidates_with_combinations(
      self,
      worker: Worker,
      allocatable: Allocatable,
      gpu_combination: Tuple[Tuple[int]],
      is_unified_memory: bool,
  ) -> ModelInstanceScheduleCandidate:
      """
      Build a full-offload candidate for one GPU combination, or return None.

      Each entry of ``gpu_combination`` is indexed with ``[0]`` for the GPU index
      and ``[-1]`` for its allocatable VRAM; the VRAM values are also used as the
      tensor split passed to the estimator.

      Returns:
          The candidate created by ``self._create_candidate`` when RAM and every
          per-GPU VRAM claim fit, otherwise None.
      """
      tensor_splitting = [entry[-1] for entry in gpu_combination]

      # The pooled VRAM must at least cover the single-GPU full-offload claim.
      minimum_vram = (
          self._uma_single_gpu_full_offload_vram
          if is_unified_memory
          else self._non_uma_single_gpu_full_offload_vram
      )
      if sum(tensor_splitting) < minimum_vram:
          return None

      cache_key = self._cache_key_for_single_worker_multi_gpus_combination(
          gpu_combination
      )
      estimate = await self._get_or_calculate_model_resource_claim(
          self._single_worker_multi_gpus_partial_offload_resource_claim_cache,
          cache_key,
          tensor_splitting,
      )
      # The last estimate item is the one used for full offloading.
      full_offload_item = estimate.items[-1]

      # ram
      if is_unified_memory:
          ram_claim = full_offload_item.ram.uma
      else:
          ram_claim = full_offload_item.ram.nonuma
      if ram_claim > allocatable.ram:
          return None

      # vram: the n-th estimated claim belongs to the n-th GPU of the combination.
      vram_claim = {}
      for estimate_gpu_index, entry in enumerate(gpu_combination):
          real_gpu_index = entry[0]
          estimated = full_offload_item.vrams[estimate_gpu_index]
          per_gpu_claim = estimated.uma if is_unified_memory else estimated.nonuma
          if per_gpu_claim > allocatable.vram[real_gpu_index]:
              return None
          vram_claim[real_gpu_index] = per_gpu_claim

      gpu_indexes = [entry[0] for entry in gpu_combination]
      return self._create_candidate(
          worker,
          self._total_layers,
          ram_claim,
          vram_claim,
          gpu_indexes,
          tensor_splitting,
      )
  async def find_single_worker_partial_offloading_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find single worker partial offloading candidates (single GPU and multi GPU)
      for the model instance.

      The single-GPU pass runs first over every worker that has GPU devices, then
      the multi-GPU pass; both raise the running ``max_offload_layers`` bar, and
      only candidates that reach the final bar are returned.

      Args:
          workers (List[Worker]): Workers to evaluate. The first worker's id is
              passed to the advice helper.

      Returns:
          List[ModelInstanceScheduleCandidate]: Candidates offloading the maximum
          number of layers found; an empty list when ``workers`` is empty.
      """
      if not workers:
          # Nothing to evaluate, and `workers[0]` below would raise IndexError.
          return []

      # A user-specified layer count is the starting bar every candidate must reach.
      max_offload_layers = 0
      if self._param_gpu_layers:
          max_offload_layers = self._param_gpu_layers

      single_gpu_partial_offloading_candidates = []
      for worker in workers:
          if not worker.status.gpu_devices:
              continue

          result = (
              await self._find_single_worker_single_gpu_partial_offloading_candidates(
                  worker,
                  max_offload_layers,
              )
          )
          if (
              result
              and result.computed_resource_claim.offload_layers >= max_offload_layers
          ):
              max_offload_layers = result.computed_resource_claim.offload_layers
              single_gpu_partial_offloading_candidates.append(result)

      logger.debug(
          f"Found {len(single_gpu_partial_offloading_candidates)} intermediate candidates for single_worker_single_gpu_partial_offloading_candidates, max_offload_layers: {max_offload_layers}"
      )

      multi_gpu_partial_offloading_candidates = []
      multi_gpu_max_offload_layers = 0
      for worker in workers:
          if not worker.status.gpu_devices:
              continue

          results = (
              await self._find_single_worker_multi_gpu_partial_offloading_candidates(
                  worker, max_offload_layers
              )
          )
          if not results:
              continue

          # Computed once per worker instead of twice.
          results_max_offload_layers = _get_max_offload_layers(results)
          if results_max_offload_layers >= max_offload_layers:
              multi_gpu_max_offload_layers = results_max_offload_layers
              max_offload_layers = multi_gpu_max_offload_layers
              multi_gpu_partial_offloading_candidates.extend(results)

      logger.debug(
          f"Found {len(multi_gpu_partial_offloading_candidates)} intermediate candidates for find_single_worker_multi_gpu_partial_offloading_candidates, max_offload_layers: {multi_gpu_max_offload_layers}"
      )

      intermediate_candidates = (
          single_gpu_partial_offloading_candidates
          + multi_gpu_partial_offloading_candidates
      )
      # Earlier candidates may have been overtaken by a later, higher bar.
      final_candidates = _filter_candidates_by_max_offload_layers(
          intermediate_candidates, max_offload_layers
      )

      await self._advise_for_find_single_worker_partial_offloading_candidates(
          final_candidates, workers[0].id
      )
      return final_candidates
  async def _find_single_worker_single_gpu_partial_offloading_candidates(  # noqa: C901
      self, worker: Worker, current_max_offload_layers: int = 0
  ) -> ModelInstanceScheduleCandidate:
      """
      Find a single worker single gpu partial offloading candidate for the model instance.

      requires: worker.status.gpu_devices is not None

      Args:
          worker (Worker): Worker instance.
          current_max_offload_layers (int): Current max offload layers; if the user specified param gpu layers, the value would be the same as self._param_gpu_layers.

      Returns:
          The candidate built by ``self._create_candidate`` for the first GPU
          (in descending allocatable-VRAM order) that qualifies, or None when the
          worker is filtered out or no GPU qualifies.
      """
      logger.debug(
          f"Input {len(worker.status.gpu_devices)} candidates for worker {worker.name}, with resource fit selector find_single_worker_single_gpu_partial_offloading_candidates, input max_offload_layers: {current_max_offload_layers}"
      )
      logger.debug(f"Pre candidates filter for worker: {worker.name}")

      # A tensor split parameter rules out the single-GPU path entirely.
      if self._param_tensor_split:
          return None

      # With manual GPU selection, the worker must be among the selected ones and
      # must have at most one selected GPU.
      if self._selected_gpu_ids_by_worker:
          if worker.name not in self._selected_gpu_ids_by_worker:
              return None
          elif len(self._selected_gpu_ids_by_worker.get(worker.name)) > 1:
              return None

      is_unified_memory = worker.status.memory.is_unified_memory
      estimate = self._partial_offload_resource_claim
      allocatable = self._get_worker_allocatable_resource(worker)
      # Total allocatable VRAM across all of the worker's GPUs.
      worker_allocatable_vram = sum(allocatable.vram.values())
      worker_allocatable_ram = allocatable.ram

      if not self._can_offload_at_least_one_layer(
          worker_allocatable_vram, self._get_single_layer_vram(is_unified_memory)
      ):
          return None

      # GPUs as (gpu_index, allocatable_vram) pairs, largest VRAM first.
      sorted_gpus_memory = sorted(
          allocatable.vram.items(), key=lambda item: item[1], reverse=True
      )

      (
          vram_claim_for_current_max_offload_layers,
          ram_claim_for_current_max_offload_layers,
      ) = self._get_claim_with_layers(current_max_offload_layers, is_unified_memory)

      if worker_allocatable_vram < vram_claim_for_current_max_offload_layers:
          return None

      # NB(thxCode): There is a special case that when the worker is unified memory(usually, we assume it is an Apple macOS worker),
      # the true estimated usage includes both the vram and ram claims.
      # Since the `worker_allocatable_ram` is guaranteed to reflect the remaining unified memory of the worker,
      # we can check if the Apple macOS worker can serve the estimated usage or not.
      if is_unified_memory:
          if (
              worker_allocatable_ram
              < vram_claim_for_current_max_offload_layers
              + ram_claim_for_current_max_offload_layers
          ):
              return None

      # User specified gpu layers
      if self._param_gpu_layers:
          if worker_allocatable_ram < ram_claim_for_current_max_offload_layers:
              return None

          # Only the largest GPU is effectively examined: the loop either
          # returns a candidate or breaks on the first iteration.
          for gpu in sorted_gpus_memory:
              gpu_index = gpu[0]
              gpu_memory = gpu[1]
              # NOTE(review): strict `>` rejects a GPU whose VRAM equals the claim exactly — confirm intended.
              if gpu_memory > vram_claim_for_current_max_offload_layers:
                  satisfied_candidate = self._create_candidate(
                      worker,
                      current_max_offload_layers,
                      ram_claim_for_current_max_offload_layers,
                      {gpu_index: vram_claim_for_current_max_offload_layers},
                      [gpu_index],
                  )
                  logger.debug(
                      f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
                  )
                  return satisfied_candidate
              else:
                  # Skip subsequent gpus because they have less vram
                  break

          return None

      # Normal case, without user specified gpu layers
      # `arr` holds the per-item VRAM claims and `estimate_arr` the matching
      # vram/ram/offload_layers records; both lists share the same indexes.
      arr = []
      estimate_arr = []
      for memory in estimate.items:
          # Full offloading is handled by other finders.
          if memory.fullOffloaded:
              continue

          # Ignore estimates below the bar already reached elsewhere.
          if (
              current_max_offload_layers
              and memory.offloadLayers < current_max_offload_layers
          ):
              continue

          vram_claim = memory.vrams[0].nonuma
          ram_claim = memory.ram.nonuma
          if is_unified_memory:
              vram_claim = memory.vrams[0].uma
              ram_claim = memory.ram.uma

          arr.append(vram_claim)
          estimate_arr.append(
              {
                  "vram": vram_claim,
                  "ram": ram_claim,
                  "offload_layers": memory.offloadLayers,
              }
          )

      for gpu in sorted_gpus_memory:
          gpu_index = gpu[0]
          gpu_memory = gpu[1]

          # With manual selection, only the first selected GPU id is considered;
          # the parsed "gpu_index" is compared as a string.
          if self._selected_gpu_ids:
              valid, matched = parse_gpu_id(self._selected_gpu_ids[0])
              is_selected_gpu = valid and matched.get("gpu_index") == str(gpu_index)
              if not is_selected_gpu:
                  continue

          if not self._can_offload_at_least_one_layer(
              gpu_memory, self._get_single_layer_vram(is_unified_memory)
          ):
              continue

          # presumably the index of the largest claim in `arr` that fits into
          # `gpu_memory`, or -1 when none fits — verify against binary_search.
          index = binary_search(arr, gpu_memory)
          if index == -1:
              # Skip subsequent gpus because they have less vram
              break

          # `and` binds tighter than `or`: the UMA test applies only to UMA
          # workers, while the plain RAM test applies to every worker.
          if (
              is_unified_memory
              # For UMA, we need to remove the claim of gpu memory before checking the memory.
              and (estimate_arr[index]["ram"] > allocatable.ram - arr[index])
              or (estimate_arr[index]["ram"] > allocatable.ram)
          ):
              continue

          offload_layers = estimate_arr[index]["offload_layers"]
          if offload_layers >= current_max_offload_layers:
              current_max_offload_layers = offload_layers
              satisfied_candidate = self._create_candidate(
                  worker,
                  offload_layers,
                  estimate_arr[index]["ram"],
                  {gpu_index: estimate_arr[index]["vram"]},
                  [gpu_index],
              )
              logger.debug(
                  f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
              )
              return satisfied_candidate
          else:
              # Skip subsequent gpus because they have less vram
              break

      logger.debug(f"Qualified 0 candidates for worker: {worker.name}")
      return None
  async def _find_single_worker_multi_gpu_partial_offloading_candidates(  # noqa: C901
      self, worker: Worker, current_max_offload_layers: int = 0
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find single worker multi gpu partial offloading candidates for the model instance.

      requires: worker.status.gpu_devices is not None

      Args:
          worker (Worker): Worker instance.
          current_max_offload_layers (int): Bar a candidate must reach; it is
              raised locally whenever a better candidate is found.

      Returns:
          The candidates that offload the most layers with the fewest GPUs.
          An empty list when a pre-filter rejects the worker.
          NOTE(review): returns None (not ``[]``) when no combination qualifies;
          the caller visible in this file only tests truthiness.
      """
      logger.debug(
          f"Input {len(worker.status.gpu_devices)} candidates for worker {worker.name}, with resource fit selector find_single_worker_multi_gpu_partial_offloading_candidates, input max_offload_layers: {current_max_offload_layers}"
      )
      logger.debug(f"Pre candidates filter for worker: {worker.name}")

      total_gpu = len(worker.status.gpu_devices) if worker.status.gpu_devices else 0
      if total_gpu < 2:
          return []

      # With manual GPU selection, the worker must be among the selected ones and
      # must have at least two selected GPUs.
      if self._selected_gpu_ids_by_worker:
          if worker.name not in self._selected_gpu_ids_by_worker:
              return []
          elif len(self._selected_gpu_ids_by_worker.get(worker.name)) < 2:
              return []

      is_unified_memory = worker.status.memory.is_unified_memory
      allocatable = self._get_worker_allocatable_resource(worker)
      # Total allocatable VRAM across all of the worker's GPUs.
      worker_allocatable_vram = sum(allocatable.vram.values())

      if not self._can_offload_at_least_one_layer(
          worker_allocatable_vram,
          self._get_single_layer_vram(is_unified_memory),
      ):
          return []

      (
          vram_claim_for_current_max_offload_layers,
          ram_claim_for_current_max_offload_layers,
      ) = self._get_claim_with_layers(current_max_offload_layers, is_unified_memory)

      if worker_allocatable_vram < vram_claim_for_current_max_offload_layers:
          return []

      candidates: List[ModelInstanceScheduleCandidate] = []
      # Bar at the end of the previous GPU count, used to detect "no improvement".
      previous_max_offload_layers = current_max_offload_layers

      for gpu_count in range(2, total_gpu + 1):
          gpu_combinations, equal_vram = (
              self._generate_combinations_for_single_worker_multi_gpus(
                  allocatable,
                  worker,
                  gpu_count,
                  vram_claim_for_current_max_offload_layers,
              )
          )

          if not gpu_combinations:
              continue

          logger.debug(
              f"Input {len(gpu_combinations)} intermediate candidates for combinations with {gpu_count} gpus for worker: {worker.name}, max_offload_layers: {current_max_offload_layers}"
          )

          for i, gpu_combination in enumerate(gpu_combinations):
              # The last element of each entry is that GPU's allocatable VRAM.
              sum_vram = sum([value[-1] for value in gpu_combination])
              if sum_vram < vram_claim_for_current_max_offload_layers:
                  # Skip subsequent combinations with same gpu count because they have less vram
                  break

              satisfied_candidate = await self._find_single_worker_multi_gpu_partial_offloading_candidates_with_combination(
                  worker, gpu_combination, current_max_offload_layers
              )

              if satisfied_candidate and (
                  satisfied_candidate.computed_resource_claim.offload_layers
                  >= current_max_offload_layers
              ):
                  # Raise the bar and refresh the VRAM claim that goes with it,
                  # so later combinations are filtered against the new bar.
                  current_max_offload_layers = (
                      satisfied_candidate.computed_resource_claim.offload_layers
                  )
                  vram_claim_for_current_max_offload_layers, _ = (
                      self._get_claim_with_layers(
                          current_max_offload_layers, is_unified_memory
                      )
                  )
                  candidates.append(satisfied_candidate)
                  logger.debug(
                      f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
                  )

              if equal_vram and i > 0:
                  # Skip subsequent combinations because they have same vram
                  break

          # A non-zero previous bar that this GPU count did not beat ends the search.
          if (
              previous_max_offload_layers
              and current_max_offload_layers <= previous_max_offload_layers
          ):
              # Skip subsequent gpu count because they need more gpus.
              break

          previous_max_offload_layers = current_max_offload_layers

      if not candidates:
          logger.debug(f"Qualified 0 candidates for worker: {worker.name}")
          return None

      max_offload_candidates = _get_max_offload_layers_candidates(candidates)
      logger.debug(
          f"Found {len(candidates)} intermediate candidates for worker {worker.name}, max_offload_layers of them: {current_max_offload_layers}"
      )

      # Among the best candidates, keep only those using the fewest GPUs.
      min_gpu_count = min(
          len(candidate.gpu_indexes) for candidate in max_offload_candidates
      )
      final_candiates = [
          candidate
          for candidate in max_offload_candidates
          if len(candidate.gpu_indexes) == min_gpu_count
      ]
      logger.debug(
          f"Qualified {len(final_candiates)} candidates for worker: {worker.name}"
      )
      return final_candiates
  async def _find_single_worker_multi_gpu_partial_offloading_candidates_with_combination(  # noqa: C901
      self,
      worker: Worker,
      gpu_combination: Tuple[Tuple[int]],
      max_offload_layers: int = 0,
  ) -> ModelInstanceScheduleCandidate:
      """
      Find max offload layers for gpu combination.

      Estimate items are examined from the highest offload-layer count downwards,
      and the first item whose RAM claim and per-GPU VRAM claims fit is used.
      UMA workers are checked against the UMA claims only and non-UMA workers
      against the non-UMA claims only.

      Args:
          worker (Worker): The worker instance containing GPU information.
          gpu_combination (List[Tuple[int]]): A list of tuples, each containing GPU index and its vram (e.g., [(0, 106), (1, 98)])
          max_offload_layers (int): The current maximum offload layers, only consider candidates whose offload layers are equal to or greater than it;
              if the user specified param gpu layers, the value of max_offload_layers is the same as self._param_gpu_layers.

      Returns:
          The candidate created by ``self._create_candidate``, or None when no
          estimate item fits or the user-specified layer count is invalid.
      """
      is_unified_memory = worker.status.memory.is_unified_memory
      allocatable = self._get_worker_allocatable_resource(worker)

      # The allocatable VRAM of each GPU doubles as the tensor split ratio.
      tensor_splitting = [value[-1] for value in gpu_combination]
      result = await self._calculate_model_resource_claim(
          tensor_split=tensor_splitting,
      )
      estimate = result.resource_claim_estimate

      # Position in the estimate's vram list -> real GPU index on the worker.
      gpu_indexes_mapping = [value[0] for value in gpu_combination]

      # Highest offload-layer count first.
      estimate_items = estimate.items[::-1]

      # User specified gpu layers
      if self._param_gpu_layers:
          if len(estimate.items) - 1 < self._param_gpu_layers:
              logger.error(
                  f"Invalid param gpu layers: {self._param_gpu_layers}, max layers is {len(estimate.items) - 1}, model {self._model.name or self._model.readable_source}"
              )
              return None
          # Only the item for exactly the requested layer count is considered.
          estimate_items = estimate.items[
              self._param_gpu_layers : self._param_gpu_layers + 1
          ]

      for item in estimate_items:
          if item.fullOffloaded:
              continue

          if item.offloadLayers < max_offload_layers:
              # Items are in descending order, so nothing further can qualify.
              break

          # RAM check against the claim matching the worker's memory model.
          if is_unified_memory:
              ram_claim = item.ram.uma
              # For UMA, the GPU claims come out of the same memory pool.
              ram_budget = allocatable.ram - sum(g.uma for g in item.vrams)
          else:
              ram_claim = item.ram.nonuma
              ram_budget = allocatable.ram
          if ram_claim > ram_budget:
              continue

          # VRAM check: every GPU of the combination must fit its own claim.
          every_gpu_fits = True
          gpu_indexes = []
          gpu_claims = {}
          for vram_index, vram_claim in enumerate(item.vrams):
              real_gpu_index = gpu_indexes_mapping[vram_index]
              single_gpu_claim = (
                  vram_claim.uma if is_unified_memory else vram_claim.nonuma
              )
              if single_gpu_claim > allocatable.vram[real_gpu_index]:
                  every_gpu_fits = False
                  break
              gpu_indexes.append(real_gpu_index)
              gpu_claims[real_gpu_index] = single_gpu_claim

          if not every_gpu_fits:
              continue

          return self._create_candidate(
              worker,
              item.offloadLayers,
              ram_claim,
              gpu_claims,
              gpu_indexes,
              tensor_splitting,
          )

      return None
  async def find_single_worker_cpu_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find single worker without offloading candidates for the model instance with workers.

      Workers are visited in the order produced by
      ``self._sort_workers_by_allocatable_ram``; the walk stops at the first
      worker that yields no candidate.

      Args:
          workers (List[Worker]): Workers to evaluate.

      Returns:
          List[ModelInstanceScheduleCandidate]: Qualified candidates; an empty
          list when ``workers`` is empty.
      """
      if not workers:
          # Nothing to evaluate, and `sorted_workers[0]` below would raise IndexError.
          return []

      logger.debug(f"Input {len(workers)} candidates")

      final_candidates = []
      sorted_workers = self._sort_workers_by_allocatable_ram(workers)
      for worker in sorted_workers:
          result = await self._find_single_worker_with_cpu_candidates(worker)
          if not result:
              # Skip subsequent workers because they have less ram
              break
          final_candidates.extend(result)

      # The first worker of the sorted list is the reference passed to the advice helper.
      self._advise_for_find_single_worker_cpu_candidates(
          final_candidates, sorted_workers[0].id
      )
      logger.debug(f"Qualified {len(final_candidates)} candidates")
      return final_candidates
  async def _find_single_worker_with_cpu_candidates(
      self, worker: Worker
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Build the CPU-only (zero offloaded layers) candidate for one worker.

      Returns:
          A one-element list with the candidate when the worker's allocatable
          RAM covers the RAM claim of the first no-offload estimate item,
          otherwise an empty list.
      """
      allocatable = self._get_worker_allocatable_resource(worker)
      no_offload_ram = self._disable_offload_result_claim.items[0].ram

      # Pick the RAM figure that matches the worker's memory architecture.
      if worker.status.memory.is_unified_memory:
          ram_claim = no_offload_ram.uma
      else:
          ram_claim = no_offload_ram.nonuma

      if ram_claim > allocatable.ram:
          return []

      # No layers offloaded, hence no VRAM claims and no GPU indexes.
      satisfied_candidate = self._create_candidate(worker, 0, ram_claim, None, None)
      logger.debug(
          f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
      )
      return [satisfied_candidate]
  async def find_multi_worker_multi_gpu_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find candidates that spread the model over a main worker plus RPC servers.

      Combinations come from ``self._generate_combinations_for_worker_with_rpcs``
      as a mapping that is walked in ascending key order; per the log messages a
      key ``count`` stands for "1 main + (count - 1) rpcs". The walk ends early
      once a full-offloading candidate exists, or once any candidate exists while
      the user specified the GPU layer count.

      Args:
          workers (List[Worker]): Workers handed to the combination generator.

      Returns:
          List[ModelInstanceScheduleCandidate]: The output of
          ``_find_multi_worker_multi_gpu_candidates_finalize_candidates``; an
          empty list when the generator returns None.
      """
      combinations = self._generate_combinations_for_worker_with_rpcs(workers)
      if combinations is None:
          return []

      combinations_count = sum(len(value) for value in combinations.values())
      combinations_sorted_keys = sorted(combinations.keys())
      # NOTE(review): indexing [0] / [-1] assumes the mapping is non-empty — confirm the generator never returns {}.
      logger.debug(
          f"Input {combinations_count} intermediate candidates from combinations: 1 main + rpcs({combinations_sorted_keys[0] - 1} to {combinations_sorted_keys[-1] - 1})"
      )

      candidates = []
      is_full_offloading = False
      # Highest offload-layer count found so far; -1 means nothing found yet.
      max_offload_layers = -1
      for count in combinations_sorted_keys:
          logger.debug(
              f"Input {len(combinations[count])} intermediate candidates for combinations: 1 main + {count - 1} rpcs"
          )

          # Pre filter.
          # Only the first combination of this count is summed for the pre-filter.
          # presumably it is the one with the most VRAM — verify against the generator.
          sum_first_vram = sum([value[-1] for value in combinations[count][0]])
          begin_layers, end_layers = (
              self._find_multi_worker_multi_gpu_candidates_determine_layer_range(
                  sum_first_vram, max_offload_layers
              )
          )

          # Skip since the pre filter can't find begin layers for this combinations.
          # (This path also skips the cache clearing below.)
          if begin_layers == -1:
              continue

          # Checking combinations.
          logger.debug(
              f"Checking combinations: 1 main + {count - 1} rpcs, begin_layers: {begin_layers}, end_layers: {end_layers}"
          )
          for combination in combinations[count]:
              satisfied_candidate = (
                  await self._find_multi_worker_multi_gpu_candidate_with_combination(
                      combination,
                      begin_layers,
                      end_layers,
                  )
              )

              if not satisfied_candidate:
                  continue

              # Latches to True once any candidate offloads every layer.
              if not is_full_offloading:
                  is_full_offloading = (
                      satisfied_candidate.computed_resource_claim.offload_layers
                      == self._total_layers
                  )

              # Keep only candidates that match or beat the best layer count so far.
              if (
                  satisfied_candidate.computed_resource_claim.offload_layers
                  >= max_offload_layers
              ):
                  max_offload_layers = (
                      satisfied_candidate.computed_resource_claim.offload_layers
                  )
                  candidates.append(satisfied_candidate)
                  logger.debug(
                      f"Found intermediate candidate: {satisfied_candidate.to_log_string()}"
                  )

          # Clean cache after each count.
          self._multi_workers_multi_gpus_partial_offload_resource_claim_cache.clear()

          if self._param_gpu_layers and len(candidates) > 0:
              # Skip subsequent counts because they use more rpc servers to offload same layers.
              break

          if is_full_offloading:
              # Skip subsequent counts because they use more rpc servers.
              break

      final_candidates = (
          self._find_multi_worker_multi_gpu_candidates_finalize_candidates(
              candidates, max_offload_layers
          )
      )
      return final_candidates
  def _find_multi_worker_multi_gpu_candidates_finalize_candidates(
      self, candidates, max_offload_layers
  ):
      """
      Reduce the intermediate multi-worker candidates to the final list.

      When the model does not allow CPU offloading, everything is dropped unless
      ``max_offload_layers`` equals the model's total layer count. Otherwise the
      candidates are filtered by ``max_offload_layers``.

      Returns:
          The filtered candidates, or an empty list.
      """
      logger.debug(f"Found {len(candidates)} intermediate candidates")

      final_candidates = []
      # Partial offloading is acceptable only if the model permits CPU offloading.
      if candidates and (
          self._model.cpu_offloading or max_offload_layers == self._total_layers
      ):
          final_candidates = _filter_candidates_by_max_offload_layers(
              candidates, max_offload_layers
          )

      logger.debug(
          f"Qualified candidates: {len(final_candidates)}, max_offload_layers: {max_offload_layers}"
      )
      return final_candidates
  def _find_multi_worker_multi_gpu_candidates_determine_layer_range(
      self, sum_first_vram, max_offload_layers
  ) -> Tuple[int, int]:
      """
      Work out the (begin_layers, end_layers) range to try for a combination.

      Returns ``(-1, -1)`` when the given VRAM sum cannot satisfy any eligible
      estimate. With user-specified GPU layers both ends equal that layer count.
      Without CPU offloading only the last estimate item is considered and the
      range is ``(total_layers, total_layers - 1)``. Otherwise the range is
      ``(offloadLayers, 0)`` for the highest-layer item that fits and is not
      below ``max_offload_layers``.
      """
      not_found = (-1, -1)

      requested_layers = self._param_gpu_layers
      if requested_layers:
          required_vram, _ = self._get_claim_with_layers(requested_layers, True)
          if sum_first_vram < required_vram:
              return not_found
          return requested_layers, requested_layers

      items = self._partial_offload_resource_claim.items
      if not items:
          return not_found

      if not self._model.cpu_offloading:
          # Only the last (highest-layer) estimate item matters here.
          top_item = items[-1]
          if sum_first_vram > top_item.vrams[0].uma:
              return self._total_layers, self._total_layers - 1
          return not_found

      # Highest offload-layer count first; take the first item that fits.
      for item in reversed(items):
          if item.offloadLayers < max_offload_layers:
              continue
          if sum_first_vram > item.vrams[0].uma:
              return item.offloadLayers, 0
      return not_found
async def _find_multi_worker_multi_gpu_candidate_with_combination(  # noqa: C901
    self,
    combination,
    begin_layers,
    end_layers,
) -> ModelInstanceScheduleCandidate:
    """
    Find a multi worker multi gpu candidate for one main/RPC combination.

    combination example: ( ($worker_id, $worker_allocatable_vram), ($worker_id, $gpu_index, $gpu_allocatable), ($worker_id, $gpu_index, $gpu_allocatable) )

    Args:
        combination: the first entry describes the main worker, every
            following entry describes one GPU used as an RPC server.
        begin_layers: estimates offloading more layers than this are skipped.
        end_layers: estimates offloading fewer layers than this are skipped.

    Returns:
        The candidate built from the first estimate (highest offload layer
        count first) that fits the main worker's RAM, each of its GPUs and
        every RPC server; None when no estimate fits.
    """
    main_worker_id = combination[0][0]
    main_worker = self._worker_id_to_worker.get(main_worker_id)
    main_worker_is_unified_memory = main_worker.status.memory.is_unified_memory

    # [gpu_index, gpu_allocatable_vram] pairs of the main worker's GPUs.
    main_worker_gpus = [
        [value[1], value[2]]
        for value in self._gpus_allocatable_vram
        if value[0] == main_worker_id
    ]
    main_worker_gpu_indexes = [value[0] for value in main_worker_gpus]

    # The tensor split lists the RPC GPUs first, then the main worker's GPUs.
    flag_tensor_spliting = []
    flag_rpc_servers = []
    for i in range(1, len(combination)):
        c_worker_id = combination[i][0]
        # RPC server address: worker name plus port 50052 + position.
        flag_rpc_servers.append(
            f"{self._worker_id_to_worker.get(c_worker_id).name}:{50052 + i}"
        )
        flag_tensor_spliting.append(combination[i][2])

    flag_tensor_spliting.extend([value[1] for value in main_worker_gpus])

    cache_key = self._cache_key_for_main_with_rpcs_combination(combination)
    estimate_result: Estimate = await self._get_or_calculate_model_resource_claim(  # type: ignore
        self._multi_workers_multi_gpus_partial_offload_resource_claim_cache,
        cache_key,
        flag_tensor_spliting,
        flag_rpc_servers,
    )

    satisfied_candidate = None
    # Try the estimates with the most offloaded layers first.
    estimate_items: List[MemoryEstimate] = sorted(
        estimate_result.items,
        key=lambda x: x.offloadLayers,
        reverse=True,
    )

    for e in estimate_items:
        # Only estimates inside [end_layers, begin_layers] are considered.
        if e.offloadLayers > begin_layers or e.offloadLayers < end_layers:
            continue

        # main worker checking.
        main_worker_ram_claim = e.ram.nonuma
        if main_worker_is_unified_memory:
            main_worker_ram_claim = e.ram.uma

        if (
            main_worker_ram_claim
            > self._workers_allocatable_resource.get(main_worker_id).ram
        ):
            continue

        main_worker_vram_claim = {}
        main_worker_satisfied = False
        for (
            main_worker_gpu_index,
            main_worker_gpu_allocatable,
        ) in self._workers_allocatable_resource.get(main_worker_id).vram.items():
            if main_worker_gpu_index not in main_worker_gpu_indexes:
                continue

            # vrams: [rpc_server1, rpc_server2, ..., main_worker]
            position = len(flag_rpc_servers) + main_worker_gpu_indexes.index(
                main_worker_gpu_index
            )
            claim = e.vrams[position].nonuma
            if main_worker_is_unified_memory:
                claim = e.vrams[position].uma

            # A single GPU that cannot hold its share rejects this estimate.
            if claim > main_worker_gpu_allocatable:
                main_worker_satisfied = False
                break

            main_worker_satisfied = True
            main_worker_vram_claim[main_worker_gpu_index] = claim

        if not main_worker_satisfied:
            continue

        # An empty result means the RPC servers cannot serve this estimate.
        subordinate_workers = self._check_combination_rpcs(
            combination, e, self._total_layers
        )
        if not subordinate_workers:
            continue

        satisfied_candidate = self._create_candidate(
            main_worker,
            e.offloadLayers,
            main_worker_ram_claim,
            main_worker_vram_claim,
            main_worker_gpu_indexes,
            flag_tensor_spliting,
            subordinate_workers,
        )
        break

    return satisfied_candidate
  1429. def _generate_workers_and_gpus_allocatable_resources(
  1430. self, workers: List[Worker]
  1431. ) -> Tuple[
  1432. Dict[str, Allocatable], List[Tuple[int, int]], List[Tuple[int, int, int]]
  1433. ]:
  1434. """
  1435. Generate allocatable resources for workers and their GPUs.
  1436. Args:
  1437. workers (List[Worker]): List of workers.
  1438. Returns:
  1439. Tuple containing workers' allocatable resources, workers' allocatable VRAM, and GPUs' allocatable VRAM.
  1440. """
  1441. workers_allocatable = {}
  1442. workers_allocatable_vram = []
  1443. workers_gpus_allocatable_vram = []
  1444. for worker in workers:
  1445. result = self._get_worker_allocatable_resource(worker)
  1446. workers_allocatable[worker.id] = result
  1447. if len(result.vram.keys()) == 0:
  1448. continue
  1449. worker_allocatable_vram = sum(result.vram.values())
  1450. if worker_allocatable_vram > 0:
  1451. workers_allocatable_vram.append([worker.id, worker_allocatable_vram])
  1452. for gpu_device in worker.status.gpu_devices:
  1453. if gpu_device.index is None:
  1454. logger.warning(
  1455. f"gpu index is not found for {worker.name} {gpu_device.name}"
  1456. )
  1457. gpu_allocatable_vram = result.vram.get(gpu_device.index)
  1458. if gpu_allocatable_vram is not None and gpu_allocatable_vram > 0:
  1459. workers_gpus_allocatable_vram.append(
  1460. [worker.id, gpu_device.index, gpu_allocatable_vram]
  1461. )
  1462. return (
  1463. workers_allocatable,
  1464. workers_allocatable_vram,
  1465. workers_gpus_allocatable_vram,
  1466. )
  1467. def _sort_workers_by_allocatable_vram(self, workers: List[Worker]) -> List[Worker]:
  1468. worker_vram_totals = {
  1469. worker.id: sum(self._workers_allocatable_resource[worker.id].vram.values())
  1470. for worker in workers
  1471. }
  1472. sorted_workers = sorted(
  1473. workers, key=lambda worker: worker_vram_totals[worker.id], reverse=True
  1474. )
  1475. return sorted_workers
  1476. def _sort_workers_by_allocatable_ram(self, workers: List[Worker]) -> List[Worker]:
  1477. sorted_workers = sorted(
  1478. workers,
  1479. key=lambda worker: self._workers_allocatable_resource[worker.id].ram,
  1480. reverse=True,
  1481. )
  1482. return sorted_workers
  1483. def _can_offload_at_least_one_layer(
  1484. self, allocatable_vram: int, single_layer_vram: int
  1485. ) -> bool:
  1486. """Check if there is enough VRAM to offload at least one layer."""
  1487. return allocatable_vram >= single_layer_vram
  1488. def _get_single_layer_vram(self, is_unified_memory: bool, rpc: bool = False) -> int:
  1489. """Get the VRAM required for a single layer based on memory type and RPC."""
  1490. if rpc:
  1491. return (
  1492. self._rpc_uma_single_layer_vram
  1493. if is_unified_memory
  1494. else self._rpc_non_uma_single_layer_vram
  1495. )
  1496. return (
  1497. self._uma_single_layer_vram
  1498. if is_unified_memory
  1499. else self._non_uma_single_layer_vram
  1500. )
  1501. def _generate_combinations_given_tensor_split(
  1502. self,
  1503. ) -> dict[Tuple[Tuple[int]]]:
  1504. """
  1505. Generate gpu combinations given tensor split.
  1506. Example:
  1507. Given: tensor_split = "1,5,8"
  1508. Output: [((0, 1), (1, 5), (2, 8))]
  1509. """
  1510. tensor_splits = [int(x) for x in self._param_tensor_split.split(",")]
  1511. n_split = len(tensor_splits)
  1512. split_by_index = []
  1513. for i in range(n_split):
  1514. split_by_index.append((i, tensor_splits[i]))
  1515. gpu_combinations = list(itertools.combinations(split_by_index, n_split))
  1516. return gpu_combinations
  1517. def _generate_combinations_for_single_worker_multi_gpus(
  1518. self,
  1519. allocatable: Allocatable,
  1520. worker: Worker,
  1521. gpu_count: int,
  1522. at_least_vram: Optional[int] = None,
  1523. ) -> Tuple[List[Tuple[Tuple[int]]], bool]:
  1524. if self._param_tensor_split:
  1525. # use specified tensor split when the param is set.
  1526. total_gpu = len(worker.status.gpu_devices) or len(self._selected_gpu_ids)
  1527. if total_gpu < len(self._param_tensor_split.split(",")):
  1528. return None, False
  1529. gpu_combinations = self._generate_combinations_given_tensor_split()
  1530. return gpu_combinations, False
  1531. if self._selected_gpu_ids_by_worker.get(worker.name):
  1532. if len(self._selected_gpu_ids) != gpu_count:
  1533. return None, False
  1534. select_gpu_combinations = self._generate_combinations_with_selected_gpus(
  1535. worker
  1536. )
  1537. gpu_combinations = [select_gpu_combinations]
  1538. return gpu_combinations, False
  1539. filterd_gpus = []
  1540. equal_vram = True
  1541. for gpu_index, vram in allocatable.vram.items():
  1542. if not self._can_offload_at_least_one_layer(
  1543. vram,
  1544. self._get_single_layer_vram(worker.status.memory.is_unified_memory),
  1545. ):
  1546. continue
  1547. filterd_gpus.append((gpu_index, vram))
  1548. if vram != list(allocatable.vram.values())[0]:
  1549. equal_vram = False
  1550. total_gpu = len(filterd_gpus)
  1551. sorted_gpus_memory = sorted(
  1552. filterd_gpus, key=lambda item: item[1], reverse=True
  1553. )
  1554. if at_least_vram:
  1555. if (
  1556. sum([value[1] for value in sorted_gpus_memory[:gpu_count]])
  1557. < at_least_vram
  1558. ):
  1559. return None, False
  1560. gpu_combinations = list(itertools.combinations(sorted_gpus_memory, gpu_count))
  1561. # gpu_combinations examples:
  1562. # (($gpu_index, $gpu_allocatable), ($gpu_index, $gpu_allocatable))
  1563. return gpu_combinations, equal_vram
  1564. def _generate_combinations_with_selected_gpus(
  1565. self, worker: Worker
  1566. ) -> dict[Tuple[Tuple[int]]]:
  1567. gpu_combinations = []
  1568. selected_gpu_ids = self._selected_gpu_ids_by_worker.get(worker.name)
  1569. allocatable = self._get_worker_allocatable_resource(worker)
  1570. for selected_gpu_id in selected_gpu_ids:
  1571. valid, matched = parse_gpu_id(selected_gpu_id)
  1572. if not valid:
  1573. continue
  1574. selected_gpu_index = safe_int(matched.get("gpu_index"))
  1575. vram = allocatable.vram.get(selected_gpu_index, 0)
  1576. gpu_combinations.append((selected_gpu_index, vram))
  1577. sorted_gpu_combinations = sorted(
  1578. gpu_combinations, key=lambda item: item[1], reverse=True
  1579. )
  1580. return sorted_gpu_combinations
  1581. def _generate_combinations_for_worker_with_rpcs(
  1582. self,
  1583. workers: List[Worker],
  1584. ) -> Dict:
  1585. combinations = {}
  1586. if self._selected_gpu_ids:
  1587. combinations = (
  1588. self._generate_combinations_for_worker_with_rpcs_with_selected_gpu_ids(
  1589. workers,
  1590. )
  1591. )
  1592. else:
  1593. combinations = (
  1594. self._generate_combinations_for_worker_with_rpcs_without_selected_gpu_ids()
  1595. )
  1596. # combinations examples:
  1597. # [( ($worker_id, $worker_allocatable_vram), ($worker_id, $gpu_index, $gpu_allocatable), ($worker_id, $gpu_index, $gpu_allocatable) )]
  1598. return combinations
  1599. def _generate_combinations_for_worker_with_rpcs_with_selected_gpu_ids( # noqa: C901
  1600. self,
  1601. workers: List[Worker],
  1602. ) -> Dict:
  1603. if not self._selected_gpu_ids:
  1604. return None
  1605. selected_workers_allocatable_vram = []
  1606. worker_name_id_map = {worker.name: worker.id for worker in workers}
  1607. worker_names = list(self._selected_gpu_ids_by_worker.keys())
  1608. worker_ids = [worker.id for worker in workers if worker.name in worker_names]
  1609. for w in self._workers_allocatable_vram:
  1610. if w[0] in worker_ids:
  1611. selected_workers_allocatable_vram.append(w)
  1612. selected_workers_gpus_allocatable_vram = []
  1613. for selected_gpu_id in self._selected_gpu_ids:
  1614. valid, matched = parse_gpu_id(selected_gpu_id)
  1615. if not valid:
  1616. continue
  1617. selected_worker_name = matched.get("worker_name")
  1618. selected_gpu_index = matched.get("gpu_index")
  1619. selected_worker_id = worker_name_id_map.get(selected_worker_name)
  1620. for w in self._gpus_allocatable_vram:
  1621. if w[0] == selected_worker_id and str(w[1]) == selected_gpu_index:
  1622. selected_workers_gpus_allocatable_vram.append(w)
  1623. sorted_workers, sorted_gpus = _sort_and_group_worker_gpu_vram(
  1624. selected_workers_allocatable_vram,
  1625. selected_workers_gpus_allocatable_vram,
  1626. )
  1627. main_worker_vram, main_worker = self._get_main_for_combination(
  1628. sorted_workers,
  1629. )
  1630. if not main_worker_vram:
  1631. self._event_collector.add(
  1632. EventLevelEnum.ERROR,
  1633. EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
  1634. str(
  1635. ListMessageBuilder(
  1636. [
  1637. f"Selected GPUs need at least {byte_to_gib(self._non_uma_single_layer_vram)} GiB of VRAM.",
  1638. "No suitable GPU found for the main server.",
  1639. ]
  1640. )
  1641. ),
  1642. reason=EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED,
  1643. )
  1644. return None
  1645. # Check if the rpc gpus can offload even one layer.
  1646. filtered_gpus = [gpu for gpu in sorted_gpus if gpu[0] != main_worker_vram[0]]
  1647. for gpu in filtered_gpus:
  1648. rpc_at_least_vram = self._get_single_layer_vram(
  1649. self._worker_id_to_worker.get(gpu[0]).status.memory.is_unified_memory,
  1650. True,
  1651. )
  1652. if not self._can_offload_at_least_one_layer(
  1653. gpu[2],
  1654. rpc_at_least_vram,
  1655. ):
  1656. key = f"{self._worker_id_to_worker.get(gpu[0]).name}:{gpu[2]}"
  1657. self._event_collector.add(
  1658. EventLevelEnum.ERROR,
  1659. EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
  1660. str(
  1661. ListMessageBuilder(
  1662. [
  1663. f"The model in RPC requires approximately {byte_to_gib(rpc_at_least_vram)} GiB VRAM.",
  1664. f"Selected GPU {key} lacks enough VRAM to serve as an RPC server.",
  1665. ]
  1666. )
  1667. ),
  1668. reason=EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED,
  1669. )
  1670. return None
  1671. c = [
  1672. (
  1673. main_worker_vram,
  1674. *[gpu for gpu in filtered_gpus if gpu[0] != main_worker_vram[0]],
  1675. )
  1676. ]
  1677. combinations = {}
  1678. for item in c:
  1679. key = len(item)
  1680. if key not in combinations:
  1681. combinations[key] = []
  1682. combinations[key].append(item)
  1683. logger.debug(
  1684. f"Generated combinations with main: {main_worker.name} and rpcs number: {len(c) - 1}"
  1685. )
  1686. return combinations
  1687. def _generate_combinations_for_worker_with_rpcs_without_selected_gpu_ids(
  1688. self,
  1689. ) -> Dict:
  1690. main_worker_vram, main_worker = self._get_main_for_combination(
  1691. self._workers_allocatable_vram,
  1692. )
  1693. if not main_worker_vram:
  1694. self._event_collector.add(
  1695. EventLevelEnum.INFO,
  1696. EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
  1697. str(
  1698. ListMessageBuilder(
  1699. [
  1700. f"The model requires approximately {byte_to_gib(self._non_uma_single_layer_vram)} GiB VRAM.",
  1701. "No suitable main server found, current GPUs lack enough VRAM to offload a single layer.",
  1702. ]
  1703. )
  1704. ),
  1705. reason=EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED,
  1706. )
  1707. return None
  1708. # Skip the gpus if the rpc gpus can't offload even one layer.
  1709. filtered_gpus = [
  1710. gpu for gpu in self._gpus_allocatable_vram if gpu[0] != main_worker_vram[0]
  1711. ]
  1712. filtered_gpus = [
  1713. gpu
  1714. for gpu in filtered_gpus
  1715. if self._can_offload_at_least_one_layer(
  1716. gpu[2],
  1717. self._get_single_layer_vram(
  1718. self._worker_id_to_worker.get(
  1719. gpu[0]
  1720. ).status.memory.is_unified_memory,
  1721. True,
  1722. ),
  1723. )
  1724. ]
  1725. if len(filtered_gpus) == 0:
  1726. self._event_collector.add(
  1727. EventLevelEnum.INFO,
  1728. EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
  1729. str(
  1730. ListMessageBuilder(
  1731. [
  1732. f"The model requires approximately {byte_to_gib(self._rpc_non_uma_single_layer_vram)} GiB VRAM.",
  1733. "Current GPUs lacks enough VRAM to serve as an RPC server.",
  1734. ]
  1735. )
  1736. ),
  1737. reason=EVENT_REASON_INSUFFICIENT_RESOURCES_GPU_SELECTED,
  1738. )
  1739. return None
  1740. # Limit the number of gpus for generate rpc combination.
  1741. if len(filtered_gpus) > default_max_rpc_combination_generate_gpu_count:
  1742. self._event_collector.add(
  1743. EventLevelEnum.WARNING,
  1744. EVENT_ACTION_DISTRIBUTED_DEPLOYMENT,
  1745. str(
  1746. ListMessageBuilder(
  1747. f"Too many candidate RPC servers (max {default_max_rpc_combination_generate_gpu_count} supported), skipping distributed deployment. Use manual scheduling to select GPUs if needed."
  1748. )
  1749. ),
  1750. reason=EVENT_REASON_MAX_RPC_COMBINATION_GENERATE_GPU_COUNT_EXCEED,
  1751. )
  1752. return None
  1753. combinations = {}
  1754. key_range = min(len(filtered_gpus), self._max_rpc_server_count)
  1755. for i in range(1, (key_range + 1)):
  1756. c = [
  1757. (main_worker_vram, *v) for v in itertools.combinations(filtered_gpus, i)
  1758. ]
  1759. key = i + 1
  1760. if key not in combinations:
  1761. combinations[key] = []
  1762. combinations[key].extend(c)
  1763. logger.debug(
  1764. f"Generated combinations with main: {main_worker.name} and rpcs number: 1-{len(filtered_gpus)}"
  1765. )
  1766. return combinations
  1767. def _check_combination_rpcs(
  1768. self,
  1769. combination,
  1770. e: MemoryEstimate,
  1771. total_layers: int,
  1772. ) -> List[ModelInstanceSubordinateWorker]:
  1773. """
  1774. Check the rpc servers resource satisfied with combination.
  1775. combination example: ( ($worker_id, $worker_allocatable_vram), ($worker_id, $gpu_index, $gpu_allocatable), ($worker_id, $gpu_index, $gpu_allocatable) )
  1776. """
  1777. subordinate_workers: List[ModelInstanceSubordinateWorker] = []
  1778. for i in range(1, len(combination)):
  1779. r_worker_id = combination[i][0]
  1780. r_gpu_index = combination[i][1]
  1781. r_allocatable = combination[i][2]
  1782. r_worker = self._worker_id_to_worker.get(r_worker_id)
  1783. r_is_unified_memory = r_worker.status.memory.is_unified_memory
  1784. position = i - 1
  1785. r_vram_claim = e.vrams[position].nonuma
  1786. if r_is_unified_memory:
  1787. r_vram_claim = e.vrams[position].uma
  1788. if r_vram_claim > r_allocatable:
  1789. break
  1790. subordinate_workers.append(
  1791. ModelInstanceSubordinateWorker(
  1792. worker_id=r_worker_id,
  1793. worker_name=r_worker.name,
  1794. gpu_indexes=[r_gpu_index],
  1795. computed_resource_claim=ComputedResourceClaim(
  1796. is_unified_memory=r_is_unified_memory,
  1797. offload_layers=e.vrams[position].handleLayers,
  1798. vram={r_gpu_index: r_vram_claim},
  1799. ram=0,
  1800. total_layers=total_layers,
  1801. ),
  1802. )
  1803. )
  1804. if len(subordinate_workers) != len(combination) - 1:
  1805. return []
  1806. return subordinate_workers
  1807. def _get_main_for_combination(
  1808. self,
  1809. workers_allocations_vrams: Tuple[Tuple[int, int]],
  1810. ) -> Tuple[Tuple[int, int], Worker]:
  1811. """
  1812. Get the worker with the most allocatable vram as main
  1813. Args:
  1814. workers_allocations_vrams (Tuple[Tuple[int, int]]): each tuple example ($worker_id, $worker_allocatable_vram)
  1815. workers_allocatable (Dict[int, Allocatable]): workers allocatable resources
  1816. Returns:
  1817. Tuple[Tuple[int, int], Worker]: main worker vram and worker instance
  1818. """
  1819. max_worker_can_offload_vram = 0
  1820. main_worker = None
  1821. main_worker_vram = None
  1822. for sw in workers_allocations_vrams:
  1823. is_uma = self._worker_id_to_worker.get(
  1824. sw[0]
  1825. ).status.memory.is_unified_memory
  1826. single_layer_vram = self._get_single_layer_vram(is_uma)
  1827. if not self._can_offload_at_least_one_layer(
  1828. sw[1],
  1829. single_layer_vram,
  1830. ):
  1831. continue
  1832. sum_can_offload_at_least_one_layer_vram = 0
  1833. for ga in self._workers_allocatable_resource.get(sw[0]).vram.values():
  1834. if self._can_offload_at_least_one_layer(
  1835. ga,
  1836. single_layer_vram,
  1837. ):
  1838. sum_can_offload_at_least_one_layer_vram += ga
  1839. if sum_can_offload_at_least_one_layer_vram > max_worker_can_offload_vram:
  1840. max_worker_can_offload_vram = sum_can_offload_at_least_one_layer_vram
  1841. main_worker = self._worker_id_to_worker.get(sw[0])
  1842. main_worker_vram = sw
  1843. return main_worker_vram, main_worker
  1844. def _update_cache_for_multi_workers_multi_gpus_partial_offload_resource_claim(
  1845. self, key: str, value: Any
  1846. ):
  1847. if (
  1848. len(self._multi_workers_multi_gpus_partial_offload_resource_claim_cache)
  1849. < self._cache_max_size
  1850. ):
  1851. self._multi_workers_multi_gpus_partial_offload_resource_claim_cache[key] = (
  1852. value
  1853. )
  1854. def _update_cache_for_single_worker_multi_gpus_partial_offload_resource_claim(
  1855. self, key: str, value: Any
  1856. ):
  1857. if (
  1858. len(self._single_worker_multi_gpus_partial_offload_resource_claim_cache)
  1859. < self._cache_max_size
  1860. ):
  1861. self._single_worker_multi_gpus_partial_offload_resource_claim_cache[key] = (
  1862. value
  1863. )
  1864. def _cache_key_for_single_worker_multi_gpus_combination(self, combination):
  1865. # combination example:
  1866. # ( ($gpu_index, $gpu_allocatable), ($gpu_index, $gpu_allocatable) )
  1867. values = [str(item[-1]) for item in combination]
  1868. key = '|'.join(values)
  1869. return key
  1870. def _cache_key_for_main_with_rpcs_combination(self, combination):
  1871. # combination example:
  1872. # ( ($worker_id, $worker_allocatable_vram), ($worker_id, $gpu_index, $gpu_allocatable), ($worker_id, $gpu_index, $gpu_allocatable) )
  1873. values = [str(item[-1]) for item in combination]
  1874. key = '|'.join(values)
  1875. return key
  1876. async def _calculate_model_resource_claim(
  1877. self, offload: GPUOffloadEnum = GPUOffloadEnum.Partial, **kwargs
  1878. ) -> ModelResourceClaim:
  1879. return await calculate_gguf_model_resource_claim(
  1880. self._model,
  1881. offload,
  1882. workers=self._workers,
  1883. cache_dir=self._cache_dir,
  1884. **kwargs,
  1885. )
  1886. async def _get_or_calculate_model_resource_claim(
  1887. self, cache, cache_key, tensor_split=None, rpc=None
  1888. ) -> Estimate:
  1889. """
  1890. Get the resource claim estimate from cache or calculate it if not present in cache.
  1891. """
  1892. if cache_key in cache:
  1893. return cache[cache_key]
  1894. result = await self._calculate_model_resource_claim(
  1895. tensor_split=tensor_split, rpc=rpc
  1896. )
  1897. estimate = result.resource_claim_estimate
  1898. cache[cache_key] = estimate
  1899. return estimate
  1900. def _create_candidate(
  1901. self,
  1902. worker: Worker,
  1903. offload_layers: int,
  1904. ram_claim: int,
  1905. vram_claim: Dict[int, int],
  1906. gpu_indexes: Optional[List[int]] = None,
  1907. tensor_split: Optional[List[int]] = None,
  1908. subordinate_workers: Optional[List[ModelInstanceSubordinateWorker]] = None,
  1909. ) -> ModelInstanceScheduleCandidate:
  1910. """
  1911. Create a ModelInstanceScheduleCandidate object.
  1912. """
  1913. candidate = ModelInstanceScheduleCandidate(
  1914. worker=worker,
  1915. gpu_indexes=gpu_indexes,
  1916. computed_resource_claim=ComputedResourceClaim(
  1917. is_unified_memory=worker.status.memory.is_unified_memory,
  1918. offload_layers=offload_layers,
  1919. vram=vram_claim,
  1920. ram=ram_claim,
  1921. total_layers=self._total_layers,
  1922. tensor_split=tensor_split,
  1923. ),
  1924. subordinate_workers=subordinate_workers,
  1925. )
  1926. return candidate
  1927. def _estimate_approximate_required_gpu_number(self, vram: int) -> int:
  1928. """Estimate the approximate required number of GPUs based on the VRAM offload rate."""
  1929. if vram == 0:
  1930. return -1
  1931. vram_offload_rate = self._max_gpu_vram / vram
  1932. for i in range(1, 17):
  1933. if vram_offload_rate > 1 / i:
  1934. return i
  1935. return -1
  1936. def _advise_for_find_single_worker_single_gpu_full_offloading_candidates(
  1937. self,
  1938. candidates: List[ModelInstanceScheduleCandidate],
  1939. gpu_vram: Tuple[int, int, int],
  1940. ):
  1941. if candidates:
  1942. return
  1943. ram = self._workers_allocatable_resource.get(gpu_vram[0]).ram
  1944. self._event_collector.add(
  1945. EventLevelEnum.INFO,
  1946. EVENT_ACTION_SINGLE_WORKER_SINGLE_GPU_FULL_OFFLOADING,
  1947. str(
  1948. ListMessageBuilder(
  1949. [
  1950. f"The model requires approximately {byte_to_gib(self._non_uma_single_gpu_full_offload_vram)} GiB VRAM and {byte_to_gib(self._non_uma_single_gpu_full_offload_ram)} GiB RAM.",
  1951. f"The available GPU has {byte_to_gib(gpu_vram[2])} GiB VRAM and {byte_to_gib(ram)} GiB RAM.",
  1952. ]
  1953. )
  1954. ),
  1955. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  1956. )
  1957. async def _advise_for_find_single_worker_multi_gpus_full_offloading_candidates(
  1958. self, candidates: List[ModelInstanceScheduleCandidate], worker: Worker
  1959. ):
  1960. """
  1961. Generate advise for single worker multi gpus.
  1962. """
  1963. if candidates:
  1964. return
  1965. if not worker:
  1966. return
  1967. worker_allocatable = self._workers_allocatable_resource.get(worker.id)
  1968. worker_gpus_allocatable_vram = sum(worker_allocatable.vram.values())
  1969. tensor_split = list(worker_allocatable.vram.values())
  1970. result = await self._calculate_model_resource_claim(
  1971. offload=GPUOffloadEnum.Full, tensor_split=tensor_split
  1972. )
  1973. estimate = result.resource_claim_estimate
  1974. sum_gpu_vram_claim = sum([vram.nonuma for vram in estimate.items[0].vrams])
  1975. ram_claim = estimate.items[0].ram.nonuma
  1976. if (
  1977. worker_gpus_allocatable_vram > sum_gpu_vram_claim
  1978. and worker_allocatable.ram > ram_claim
  1979. ):
  1980. self._event_collector.add(
  1981. EventLevelEnum.INFO,
  1982. EVENT_ACTION_SINGLE_WORKER_MULTI_GPU_FULL_OFFLOADING,
  1983. str(
  1984. ListMessageBuilder(
  1985. [
  1986. f"The model requires approximately {byte_to_gib(sum_gpu_vram_claim)} GiB VRAM and {byte_to_gib(ram_claim)} GiB RAM.",
  1987. f"The largest available worker provides {byte_to_gib(worker_gpus_allocatable_vram)} GiB VRAM and {byte_to_gib(worker_allocatable.ram)} GiB RAM.",
  1988. "Unable to find a combination of GPUs that satisfies the requirements when splitting by layers.",
  1989. ]
  1990. )
  1991. ),
  1992. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  1993. )
  1994. else:
  1995. self._event_collector.add(
  1996. EventLevelEnum.INFO,
  1997. EVENT_ACTION_SINGLE_WORKER_MULTI_GPU_FULL_OFFLOADING,
  1998. str(
  1999. ListMessageBuilder(
  2000. [
  2001. f"The model requires approximately {byte_to_gib(sum_gpu_vram_claim)} GiB VRAM and {byte_to_gib(ram_claim)} GiB RAM.",
  2002. f"The largest available worker provides {byte_to_gib(worker_gpus_allocatable_vram)} GiB VRAM and {byte_to_gib(worker_allocatable.ram)} GiB RAM.",
  2003. ]
  2004. )
  2005. ),
  2006. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  2007. )
  2008. async def _advise_for_find_single_worker_partial_offloading_candidates(
  2009. self, candidates: List[ModelInstanceScheduleCandidate], worker_id: int
  2010. ):
  2011. if candidates:
  2012. return
  2013. worker_allocatable = self._workers_allocatable_resource.get(worker_id)
  2014. worker_gpus_allocatable_vram = sum(worker_allocatable.vram.values())
  2015. if self._param_gpu_layers:
  2016. # Multi-GPU
  2017. tensor_split = list(worker_allocatable.vram.values())
  2018. result = await self._calculate_model_resource_claim(
  2019. offload=GPUOffloadEnum.Partial, tensor_split=tensor_split
  2020. )
  2021. estimate = result.resource_claim_estimate
  2022. vram_claims, ram_claim = self._get_tensor_split_claim_with_layers(
  2023. self._param_gpu_layers, False, estimate.items
  2024. )
  2025. sum_gpu_vram_claim = sum(vram_claims)
  2026. if (
  2027. worker_gpus_allocatable_vram > sum_gpu_vram_claim
  2028. and worker_allocatable.ram > ram_claim
  2029. ):
  2030. self._event_collector.add(
  2031. EventLevelEnum.INFO,
  2032. EVENT_ACTION_SINGLE_WORKER_PARTIAL_OFFLOADING,
  2033. str(
  2034. ListMessageBuilder(
  2035. [
  2036. f"The model requires approximately {byte_to_gib(sum_gpu_vram_claim)} GiB VRAM and {byte_to_gib(ram_claim)} GiB RAM to offload {self._param_gpu_layers} layers.",
  2037. f"The largest available worker provides {byte_to_gib(worker_gpus_allocatable_vram)} GiB VRAM and {byte_to_gib(worker_allocatable.ram)} GiB RAM but unable to find a combination of GPUs that satisfies the requirements when splitting by layers. Try offloading fewer layers to lower resource required.",
  2038. ]
  2039. )
  2040. ),
  2041. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  2042. )
  2043. else:
  2044. self._event_collector.add(
  2045. EventLevelEnum.INFO,
  2046. EVENT_ACTION_SINGLE_WORKER_PARTIAL_OFFLOADING,
  2047. str(
  2048. ListMessageBuilder(
  2049. [
  2050. f"The model requires approximately {byte_to_gib(sum_gpu_vram_claim)} GiB VRAM and {byte_to_gib(ram_claim)} GiB RAM to offload {self._param_gpu_layers} layers.",
  2051. f"The largest available worker provides {byte_to_gib(worker_gpus_allocatable_vram)} GiB VRAM and {byte_to_gib(worker_allocatable.ram)} GiB RAM. Try offloading fewer layers to lower resource required.",
  2052. ]
  2053. )
  2054. ),
  2055. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  2056. )
  2057. def _advise_for_find_single_worker_cpu_candidates(
  2058. self, candidates: List[ModelInstanceScheduleCandidate], worker_id: int
  2059. ):
  2060. if candidates:
  2061. return
  2062. worker_allocatable = self._workers_allocatable_resource.get(worker_id)
  2063. self._event_collector.add(
  2064. EventLevelEnum.INFO,
  2065. EVENT_ACTION_CPU_OFFLOADING,
  2066. str(
  2067. ListMessageBuilder(
  2068. [
  2069. f"The model requires approximately {byte_to_gib(self._disable_offload_result_claim.items[0].ram.nonuma)} GiB RAM.",
  2070. f"The largest available worker provides {byte_to_gib(worker_allocatable.ram)} GiB RAM.",
  2071. ]
  2072. )
  2073. ),
  2074. reason=EVENT_REASON_INSUFFICIENT_RESOURCES,
  2075. )
  2076. def binary_search(arr, target):
  2077. """
  2078. Binary search the target in the arr.
  2079. If the target is found, return the index of the target.
  2080. Args:
  2081. arr (List[int]): The input list, is a sorted list from smallest to largest.
  2082. target (int): The target number.
  2083. """
  2084. if len(arr) == 0:
  2085. return -1
  2086. if arr[0] > target:
  2087. return -1
  2088. if arr[-1] < target:
  2089. return len(arr) - 1
  2090. low, high = 0, len(arr) - 1
  2091. while low <= high:
  2092. mid = (low + high) // 2
  2093. if arr[mid] == target:
  2094. return mid
  2095. elif arr[mid] < target:
  2096. low = mid + 1
  2097. else:
  2098. high = mid - 1
  2099. return high
  2100. def _get_max_offload_layers(candidates: List[ModelInstanceScheduleCandidate]) -> int:
  2101. if not candidates:
  2102. return 0
  2103. return max(
  2104. candidate.computed_resource_claim.offload_layers for candidate in candidates
  2105. )
  2106. def _get_max_offload_layers_candidates(
  2107. candidates: List[ModelInstanceScheduleCandidate],
  2108. ) -> List[ModelInstanceScheduleCandidate]:
  2109. if not candidates:
  2110. return []
  2111. max_offload_layers = _get_max_offload_layers(candidates)
  2112. return _filter_candidates_by_max_offload_layers(candidates, max_offload_layers)
  2113. def _filter_candidates_by_max_offload_layers(
  2114. candidates: List[ModelInstanceScheduleCandidate], max_offload_layers
  2115. ) -> List[ModelInstanceScheduleCandidate]:
  2116. return [
  2117. candidate
  2118. for candidate in candidates
  2119. if candidate.computed_resource_claim.offload_layers == max_offload_layers
  2120. ]
  2121. def _sort_and_group_worker_gpu_vram(
  2122. workers_vram: List[Tuple[int, int]],
  2123. gpus_allocatable_vram: List[Tuple[int, int, int]],
  2124. ):
  2125. sorted_workers = sorted(workers_vram, key=lambda item: item[1], reverse=True)
  2126. sorted_gpus = sorted(gpus_allocatable_vram, key=lambda item: item[2], reverse=True)
  2127. return sorted_workers, sorted_gpus