custom.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import logging
  2. import os
  3. import shlex
  4. from typing import Dict, List, Optional, Tuple
  5. from gpustack.schemas.models import ModelInstanceDeploymentMetadata
  6. from gpustack.utils.command import format_backend_parameters
  7. from gpustack.utils.envs import sanitize_env
  8. from gpustack.worker.backends.base import InferenceServer
  9. from gpustack_runtime.deployer import (
  10. Container,
  11. ContainerEnv,
  12. ContainerExecution,
  13. ContainerProfileEnum,
  14. WorkloadPlan,
  15. create_workload,
  16. ContainerRestartPolicyEnum,
  17. )
  18. logger = logging.getLogger(__name__)
  19. class CustomServer(InferenceServer):
  20. """
  21. Generic pluggable inference server backend with container management capabilities.
  22. This backend allows users to specify any command and automatically handles:
  23. - Command path resolution
  24. - Version management
  25. - Environment variable setup
  26. - Model path and port configuration
  27. - Backend parameters passing
  28. - Error handling and logging
  29. - Container management operations (logs, stop, status, etc.)
  30. Usage:
  31. Set model.backend_command to specify the command name (e.g., "vllm", "custom-server")
  32. The backend will automatically call get_command_path(command_name) to resolve the path.
  33. """
  34. def start(self):
  35. try:
  36. self._start()
  37. except Exception as e:
  38. self._handle_error(e)
  39. def _start(self):
  40. logger.info(
  41. f"Starting custom backend model instance: {self._model_instance.name}"
  42. )
  43. deployment_metadata = self._get_deployment_metadata()
  44. env = self._get_configured_env()
  45. command = None
  46. if self.inference_backend:
  47. command = self.inference_backend.get_container_entrypoint(
  48. self._model.backend_version
  49. )
  50. command_args, injected = self._build_command_args(entrypoint=command)
  51. try:
  52. self._update_model_instance(
  53. self._model_instance.id,
  54. injected_backend_parameters=format_backend_parameters(injected) or None,
  55. )
  56. except Exception as e:
  57. logger.warning(
  58. f"Failed to persist injected backend parameters for {self._model_instance.name}: {e}"
  59. )
  60. self._create_workload(
  61. deployment_metadata=deployment_metadata,
  62. command=command,
  63. command_args=command_args,
  64. env=env,
  65. )
  66. def _create_workload(
  67. self,
  68. deployment_metadata: ModelInstanceDeploymentMetadata,
  69. command: Optional[List[str]],
  70. command_args: List[str],
  71. env: Dict[str, str],
  72. ):
  73. image = self._get_configured_image()
  74. if not image:
  75. raise ValueError("Failed to get Custom backend image")
  76. resources = self._get_configured_resources()
  77. mounts = self._get_configured_mounts()
  78. ports = self._get_configured_ports()
  79. # Read container config from environment variables
  80. container_config = self._get_container_env_config(env)
  81. run_container = Container(
  82. image=image,
  83. name="default",
  84. profile=ContainerProfileEnum.RUN,
  85. restart_policy=ContainerRestartPolicyEnum.NEVER,
  86. execution=ContainerExecution(
  87. privileged=True,
  88. command=command,
  89. args=command_args,
  90. run_as_user=container_config.user,
  91. run_as_group=container_config.group,
  92. ),
  93. envs=[
  94. ContainerEnv(
  95. name=name,
  96. value=value,
  97. )
  98. for name, value in env.items()
  99. ],
  100. mounts=mounts,
  101. resources=resources,
  102. ports=ports,
  103. )
  104. logger.info(
  105. f"Creating custom backend container workload: {deployment_metadata.name}"
  106. )
  107. logger.info(
  108. f"With image: {image}, "
  109. f"command: [{' '.join(command) if command else ''}], "
  110. f"arguments: [{' '.join(command_args)}], "
  111. f"ports: [{','.join([str(port.internal) for port in ports])}], "
  112. f"envs(inconsistent input items mean unchangeable):{os.linesep}"
  113. f"{os.linesep.join(f'{k}={v}' for k, v in sorted(sanitize_env(env).items()))}"
  114. )
  115. workload_plan = WorkloadPlan(
  116. name=deployment_metadata.name,
  117. host_network=True,
  118. shm_size=int(container_config.shm_size_gib * (1 << 30)),
  119. containers=[run_container],
  120. run_as_user=container_config.user,
  121. run_as_group=container_config.group,
  122. )
  123. create_workload(self._transform_workload_plan(workload_plan))
  124. logger.info(
  125. f"Created custom backend container workload: {deployment_metadata.name}"
  126. )
  127. def _build_command_args(
  128. self, entrypoint: Optional[List[str]] = None
  129. ) -> Tuple[List[str], List[str]]:
  130. command_args = []
  131. command_args_inline = self.inference_backend.replace_command_param(
  132. version=self._model.backend_version,
  133. model_path=self._model_path,
  134. port=self._get_serving_port(),
  135. worker_ip=self._worker.ip,
  136. model_name=self._model.name,
  137. command=self._model.run_command,
  138. env=self._model.env,
  139. )
  140. if command_args_inline:
  141. command_args = shlex.split(command_args_inline)
  142. # Add user-defined backend parameters
  143. user_backend_parameters = self._flatten_backend_param()
  144. command_args.extend(user_backend_parameters)
  145. injected = self._get_injected_backend_parameters(
  146. command_args, user_backend_parameters, entrypoint
  147. )
  148. return command_args, injected