vox_box.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import logging
  2. import os
  3. from typing import Optional, List, Dict, Tuple
  4. from gpustack.schemas.models import ModelInstanceDeploymentMetadata
  5. from gpustack.utils.command import extend_args_no_exist, format_backend_parameters
  6. from gpustack.utils.envs import sanitize_env
  7. from gpustack.worker.backends.base import InferenceServer
  8. from gpustack_runtime.deployer import (
  9. Container,
  10. ContainerEnv,
  11. ContainerExecution,
  12. ContainerProfileEnum,
  13. WorkloadPlan,
  14. create_workload,
  15. ContainerRestartPolicyEnum,
  16. )
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
  18. class VoxBoxServer(InferenceServer):
  19. def start(self):
  20. try:
  21. self._start()
  22. except Exception as e:
  23. self._handle_error(e)
  24. def _start(self):
  25. logger.info(f"Starting VoxBox model instance: {self._model_instance.name}")
  26. deployment_metadata = self._get_deployment_metadata()
  27. env = self._get_configured_env()
  28. command = None
  29. if self.inference_backend:
  30. command = self.inference_backend.get_container_entrypoint(
  31. self._model.backend_version
  32. )
  33. command_script = self._get_serving_command_script(env)
  34. command_args, injected = self._build_command_args(
  35. port=self._get_serving_port(),
  36. entrypoint=command,
  37. )
  38. try:
  39. self._update_model_instance(
  40. self._model_instance.id,
  41. injected_backend_parameters=format_backend_parameters(injected) or None,
  42. )
  43. except Exception as e:
  44. logger.warning(
  45. f"Failed to persist injected backend parameters for {self._model_instance.name}: {e}"
  46. )
  47. self._create_workload(
  48. deployment_metadata=deployment_metadata,
  49. command=command,
  50. command_script=command_script,
  51. command_args=command_args,
  52. env=env,
  53. )
  54. def _create_workload(
  55. self,
  56. deployment_metadata: ModelInstanceDeploymentMetadata,
  57. command: Optional[List[str]],
  58. command_script: Optional[str],
  59. command_args: List[str],
  60. env: Dict[str, str],
  61. ):
  62. image = self._get_configured_image()
  63. if not image:
  64. raise ValueError("Failed to get VoxBox backend image")
  65. # Command script will override the given command,
  66. # so we need to prepend command to command args.
  67. if command_script and command:
  68. command_args = command + command_args
  69. command = None
  70. resources = self._get_configured_resources(
  71. # Pass-through all devices as vox-box handles device itself.
  72. mount_all_devices=True,
  73. )
  74. mounts = self._get_configured_mounts()
  75. ports = self._get_configured_ports()
  76. # Read container config from environment variables
  77. container_config = self._get_container_env_config(env)
  78. run_container = Container(
  79. image=image,
  80. name="default",
  81. profile=ContainerProfileEnum.RUN,
  82. restart_policy=ContainerRestartPolicyEnum.NEVER,
  83. execution=ContainerExecution(
  84. privileged=True,
  85. command=command,
  86. command_script=command_script,
  87. args=command_args,
  88. run_as_user=container_config.user,
  89. run_as_group=container_config.group,
  90. ),
  91. envs=[
  92. ContainerEnv(
  93. name=name,
  94. value=value,
  95. )
  96. for name, value in env.items()
  97. ],
  98. resources=resources,
  99. mounts=mounts,
  100. ports=ports,
  101. )
  102. logger.info(f"Creating VoxBox container workload: {deployment_metadata.name}")
  103. logger.info(
  104. f"With image: {image}, "
  105. f"command: [{' '.join(command) if command else ''}], "
  106. f"arguments: [{' '.join(command_args)}], "
  107. f"ports: [{','.join([str(port.internal) for port in ports])}], "
  108. f"envs(inconsistent input items mean unchangeable):{os.linesep}"
  109. f"{os.linesep.join(f'{k}={v}' for k, v in sorted(sanitize_env(env).items()))}"
  110. )
  111. workload_plan = WorkloadPlan(
  112. name=deployment_metadata.name,
  113. host_network=True,
  114. shm_size=int(container_config.shm_size_gib * (1 << 30)),
  115. containers=[run_container],
  116. run_as_user=container_config.user,
  117. run_as_group=container_config.group,
  118. )
  119. create_workload(self._transform_workload_plan(workload_plan))
  120. logger.info(f"Created VoxBox container workload: {deployment_metadata.name}")
  121. def _build_command_args(
  122. self, port: int, entrypoint: Optional[List[str]] = None
  123. ) -> Tuple[List[str], List[str]]:
  124. arguments = [
  125. "vox-box",
  126. "start",
  127. "--model",
  128. self._model_path,
  129. "--data-dir",
  130. self._config.data_dir,
  131. ]
  132. # Allow version-specific command override if configured (before appending extra args)
  133. arguments = self.build_versioned_command_args(
  134. arguments,
  135. model_path=self._model_path,
  136. port=port,
  137. )
  138. user_backend_parameters = self._flatten_backend_param()
  139. arguments.extend(user_backend_parameters)
  140. # Append immutable arguments to ensure proper operation for accessing.
  141. # Only add if not already present in arguments.
  142. extend_args_no_exist(
  143. arguments, ("--host", self._worker.ip), ("--port", str(port))
  144. )
  145. if self._model_instance.gpu_indexes is not None:
  146. extend_args_no_exist(
  147. arguments,
  148. ("--device", f"cuda:{self._model_instance.gpu_indexes[0]}"),
  149. )
  150. injected = self._get_injected_backend_parameters(
  151. arguments, user_backend_parameters, entrypoint
  152. )
  153. return arguments, injected