# worker_selector.py (3.7 KB)
  1. import asyncio
  2. import logging
  3. from typing import List, Optional
  4. from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
  5. from gpustack.schemas.workers import Worker
  6. logger = logging.getLogger(__name__)
  7. class WorkerSelector:
  8. """Service for selecting workers based on various criteria."""
  9. def __init__(self, filesystem_client: WorkerFilesystemClient):
  10. self._filesystem_client = filesystem_client
  11. async def find_worker_with_path(
  12. self,
  13. workers: List[Worker],
  14. worker_id: Optional[int] = None,
  15. path: Optional[str] = None,
  16. ) -> Optional[Worker]:
  17. """
  18. Find a worker that has access to the specified path.
  19. Args:
  20. workers: List of workers to search
  21. worker_id: Optional specific worker ID to use
  22. path: Optional path to check for existence
  23. Returns:
  24. Worker that has access to the path, or None if not found
  25. Raises:
  26. ValueError: If worker_id is specified but worker not found
  27. """
  28. # If worker_id is specified, use that worker
  29. if worker_id is not None:
  30. worker = next((w for w in workers if w.id == worker_id), None)
  31. if worker is None:
  32. logger.error(f"Worker with id {worker_id} not found")
  33. raise ValueError(f"Worker with id {worker_id} not found")
  34. # If path is specified, check if it exists on this worker
  35. if path is not None:
  36. try:
  37. exists_response = await self._filesystem_client.path_exists(
  38. worker, path
  39. )
  40. if not exists_response.exists:
  41. logger.warning(
  42. f"Path {path} does not exist on worker {worker.id}"
  43. )
  44. return None
  45. except Exception as e:
  46. logger.error(
  47. f"Failed to check path {path} on worker {worker.id}: {e}"
  48. )
  49. return None
  50. return worker
  51. # If no worker_id is specified, find a worker that has the path
  52. if path is not None:
  53. return await self._find_worker_with_path_concurrent(workers, path)
  54. return None
  55. async def _find_worker_with_path_concurrent(
  56. self,
  57. workers: List[Worker],
  58. path: str,
  59. ) -> Optional[Worker]:
  60. """
  61. Concurrently check multiple workers for the existence of a path.
  62. Args:
  63. workers: List of workers to check
  64. path: Path to check for existence
  65. Returns:
  66. First worker that has the path, or None if not found
  67. """
  68. async def check_worker(worker: Worker) -> tuple[Worker, bool]:
  69. """Check if a worker has the specified path."""
  70. try:
  71. exists_response = await self._filesystem_client.path_exists(
  72. worker, path
  73. )
  74. return worker, exists_response.exists
  75. except Exception as e:
  76. logger.warning(
  77. f"Failed to check path {path} on worker {worker.id}: {e}"
  78. )
  79. return worker, False
  80. # Create tasks for all workers
  81. tasks = [check_worker(worker) for worker in workers]
  82. # Execute tasks concurrently and get results as they complete
  83. for completed_task in asyncio.as_completed(tasks):
  84. worker, exists = await completed_task
  85. if exists:
  86. logger.info(f"Found path {path} on worker {worker.id}")
  87. return worker
  88. # No worker has the path
  89. logger.warning(f"Path {path} not found on any worker")
  90. return None