worker_filesystem_client.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import json
  2. import logging
  3. from typing import Dict
  4. import aiohttp
  5. from gpustack.schemas.filesystem import FileExistsResponse
  6. from gpustack.schemas.models import Model
  7. from gpustack.schemas.workers import Worker
  8. from gpustack.server.worker_request import request_to_worker
  9. from gpustack import envs
  # Module-level logger named after this module. Nothing in the visible part
  # of this file logs through it.
  10. logger = logging.getLogger(__name__)
  # Total request timeout, in seconds, passed as aiohttp.ClientTimeout(total=...)
  # by the GET helpers (files/model-config, files/file-exists,
  # files/model-weight-size).
  11. _TIMEOUT = 15
  # Total request timeout, in seconds, for POST files/parse-gguf.
  # NOTE(review): presumably larger than _TIMEOUT because parsing a GGUF file
  # on the worker takes longer than a metadata lookup - confirm.
  12. _GGUF_PARSE_TIMEOUT = 90
  class WorkerFilesystemClient:
      """Client for interacting with worker filesystem APIs.

      The client owns one ``aiohttp.TCPConnector`` shared by two
      ``aiohttp.ClientSession`` objects: one created with ``trust_env=True``
      and one without. Both sessions are handed to ``request_to_worker`` on
      every call (as ``proxy_client`` and ``no_proxy_client``); which of the
      two is actually used is decided there, not in this class.

      The client is an async context manager; leaving the ``async with``
      block closes both sessions and the connector.
      """

      def __init__(self):
          """Create the shared connector and the two HTTP sessions."""
          self._connector = aiohttp.TCPConnector(
              limit=envs.TCP_CONNECTOR_LIMIT,
              force_close=True,
          )
          # The connector is shared, so neither session may own it: with the
          # aiohttp default (connector_owner=True) closing one session would
          # also close the pool the other session is still using. close()
          # releases the connector explicitly instead.
          self._http_client = aiohttp.ClientSession(
              connector=self._connector,
              trust_env=True,
              connector_owner=False,
          )
          self._http_client_no_proxy = aiohttp.ClientSession(
              connector=self._connector,
              connector_owner=False,
          )

      async def __aenter__(self):
          """Enter the async context and return this client."""
          return self

      async def __aexit__(self, exc_type, exc_val, exc_tb):
          """Leave the async context, closing all HTTP resources.

          Returns None, so an exception raised inside the ``async with``
          body is not suppressed.
          """
          await self.close()

      async def close(self):
          """Close both HTTP sessions and then the shared connector.

          Every resource gets a close attempt even if an earlier close
          raises; the error then propagates once the remaining resources
          have been handled.
          """
          try:
              await self._http_client.close()
          finally:
              try:
                  await self._http_client_no_proxy.close()
              finally:
                  await self._connector.close()

      async def _get_for_path(self, worker: Worker, endpoint: str, path: str):
          """Issue a GET to a worker file endpoint for a single path.

          Args:
              worker: The worker to query
              endpoint: Worker API path, e.g. ``"files/file-exists"``
              path: Filesystem path, sent as the ``path`` query parameter

          Returns:
              The response body as returned by ``request_to_worker``
          """
          _, body = await request_to_worker(
              worker=worker,
              method="GET",
              path=endpoint,
              proxy_client=self._http_client,
              no_proxy_client=self._http_client_no_proxy,
              params={"path": path},
              timeout=aiohttp.ClientTimeout(total=_TIMEOUT, sock_connect=5),
          )
          return body

      async def read_model_config(
          self,
          worker: Worker,
          path: str,
      ) -> Dict:
          """
          Read and parse a config file on a worker.

          Args:
              worker: The worker to query
              path: The file path to read

          Returns:
              Parsed config as dict
          """
          body = await self._get_for_path(worker, "files/model-config", path)
          return json.loads(body)

      async def path_exists(
          self,
          worker: Worker,
          path: str,
      ) -> FileExistsResponse:
          """
          Check if a path exists on a worker.

          Args:
              worker: The worker to query
              path: The path to check

          Returns:
              FileExistsResponse indicating if the path exists
          """
          body = await self._get_for_path(worker, "files/file-exists", path)
          return FileExistsResponse.model_validate_json(body)

      async def get_model_weight_size(
          self,
          worker: Worker,
          path: str,
      ) -> int:
          """
          Get the size of model weight files in a directory on a worker.

          Args:
              worker: The worker to query
              path: The directory path to scan

          Returns:
              The value of the ``size`` field of the worker's JSON response
              (documented by the endpoint as a total in bytes), or 0 when the
              field is absent
          """
          body = await self._get_for_path(worker, "files/model-weight-size", path)
          return json.loads(body).get("size", 0)

      async def parse_gguf(
          self,
          worker: Worker,
          model: Model,
          offload: str = "full",
          **kwargs,  # tensor_split, rpc override parameters
      ) -> Dict:
          """
          Parse a GGUF file on a worker using gguf-parser.

          Args:
              worker: The worker to query
              model: The Model object to parse
              offload: GPU offload strategy (full, partial, disable)
              **kwargs: Optional override parameters. Only ``tensor_split``
                  and ``rpc`` are forwarded; any other keyword is ignored.

          Returns:
              Parsed GGUF output as dict (GGUFParserOutput structure). An
              empty dict is returned when the response carries no output.

          Raises:
              aiohttp.ClientError: If the worker reports ``success`` as
                  false or omits it. Errors raised by ``request_to_worker``
                  itself propagate unchanged.
          """
          # Build request payload
          payload = {
              "model_dict": model.model_dump(mode="json"),
              "offload": offload,
          }
          # Add override parameters, but only the ones the caller supplied
          payload.update(
              {key: kwargs[key] for key in ("tensor_split", "rpc") if key in kwargs}
          )

          _, body = await request_to_worker(
              worker=worker,
              method="POST",
              path="files/parse-gguf",
              proxy_client=self._http_client,
              no_proxy_client=self._http_client_no_proxy,
              data=json.dumps(payload).encode(),
              headers={"Content-Type": "application/json"},
              timeout=aiohttp.ClientTimeout(
                  total=_GGUF_PARSE_TIMEOUT, sock_connect=10
              ),
          )

          response_data = json.loads(body)
          if not response_data.get("success", False):
              # `or` rather than a .get() default: a JSON null "error" would
              # otherwise surface as the literal text "None".
              error = response_data.get("error") or "Unknown error"
              raise aiohttp.ClientError(f"GGUF parsing failed: {error}")

          # Parse JSON output. A null "output" is treated like a missing one
          # instead of failing inside json.loads with a TypeError.
          output_str = response_data.get("output")
          if output_str is None:
              return {}
          return json.loads(output_str)
  147. # Parse JSON output
  148. output_str = response_data.get("output", "{}")
  149. return json.loads(output_str)