workers.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718
  1. import secrets
  2. import datetime
  3. import base64
  4. import uuid
  5. import logging
  6. import asyncio
  7. from typing import Optional, List, Dict, Any, Set
  8. from sqlmodel.ext.asyncio.session import AsyncSession
  9. from sqlalchemy.exc import IntegrityError
  10. from sqlalchemy.orm import selectinload
  11. from urllib.parse import urlencode
  12. from fastapi import APIRouter, Depends, Response, Request
  13. from fastapi.responses import StreamingResponse, RedirectResponse
  14. from gpustack.api.exceptions import (
  15. AlreadyExistsException,
  16. InternalServerErrorException,
  17. NotFoundException,
  18. ForbiddenException,
  19. InvalidException,
  20. )
  21. from gpustack.config.config import get_global_config
  22. from gpustack.api.tenant import (
  23. bypass_tenant_filter,
  24. assert_cluster_resource_visible,
  25. assert_org_owned_writable,
  26. cluster_resource_visibility_conditions,
  27. )
  28. from gpustack.server.deps import (
  29. SessionDep,
  30. CurrentUserDep,
  31. TenantContextDep,
  32. )
  33. from gpustack.server.db import async_session
  34. from gpustack.server.worker_status_buffer import (
  35. heartbeat_flush_buffer,
  36. heartbeat_flush_buffer_lock,
  37. worker_status_flush_buffer,
  38. worker_status_flush_buffer_lock,
  39. )
  40. from gpustack.schemas.workers import (
  41. WorkerCreate,
  42. WorkerListParams,
  43. WorkerPublic,
  44. WorkerUpdate,
  45. WorkersPublic,
  46. Worker,
  47. WorkerRegistrationPublic,
  48. WorkerStatusStored,
  49. WorkerStateEnum,
  50. )
  51. from gpustack.schemas.clusters import Cluster, Credential, ClusterStateEnum
  52. from gpustack.schemas.users import User, UserRole
  53. from gpustack.schemas.api_keys import ApiKey
  54. from gpustack.schemas.config import (
  55. SensitivePredefinedConfig,
  56. PredefinedConfigNoDefaults,
  57. )
  58. from gpustack.security import get_secret_hash, API_KEY_PREFIX
  59. from gpustack.server.services import WorkerService, create_user_with_principal
  60. from gpustack.cloud_providers.common import key_bytes_to_openssh_pem
  61. from gpustack.utils.grafana import resolve_grafana_base_url
# Router that the worker endpoints in this module attach to.
router = APIRouter()
# Prefix of the username / API-key name generated for a worker's system
# account during registration.
system_name_prefix = "system/worker"
logger = logging.getLogger(__name__)
# Semaphore for creating workers to prevent db contention
# (at most 10 registrations hold it at the same time).
# FIXME: replace with an optimized implementation
create_worker_semaphore = asyncio.Semaphore(10)
  68. def to_worker_public(input: Worker, me: bool) -> WorkerPublic:
  69. data = input.model_dump()
  70. if me:
  71. data['me'] = me
  72. return WorkerPublic.model_validate(data)
  73. def _make_worker_visibility_filter(ctx):
  74. """Return a row-level visibility predicate matching the SQL filter
  75. produced by ``cluster_resource_visibility_conditions``."""
  76. def _visible(w) -> bool:
  77. if bypass_tenant_filter(ctx):
  78. return True
  79. org_id = getattr(w, "owner_principal_id", None)
  80. if (
  81. ctx.current_principal_id is not None
  82. and org_id is not None
  83. and org_id == ctx.current_principal_id
  84. ):
  85. return True
  86. if getattr(w, "cluster_id", None) in ctx.accessible_cluster_ids:
  87. return True
  88. return False
  89. return _visible
  90. def _build_worker_list_filters(name, uuid, cluster_id, search):
  91. fuzzy_fields = {"name": search} if search else {}
  92. fields = {}
  93. if name:
  94. fields["name"] = name
  95. if uuid:
  96. fields["worker_uuid"] = uuid
  97. if cluster_id:
  98. fields["cluster_id"] = cluster_id
  99. return fields, fuzzy_fields
  100. def _normalize_worker_order_by(order_by):
  101. if not order_by:
  102. return order_by
  103. out = []
  104. for field, direction in order_by:
  105. # maps gpus (gpu count) to internal JSON array length representation
  106. if field == "gpus":
  107. out.append(("status.gpu_devices[]", direction))
  108. else:
  109. out.append((field, direction))
  110. return out
  111. @router.get("", response_model=WorkersPublic)
  112. async def get_workers(
  113. user: CurrentUserDep,
  114. ctx: TenantContextDep,
  115. params: WorkerListParams = Depends(),
  116. name: str = None,
  117. search: str = None,
  118. uuid: str = None,
  119. me: Optional[bool] = None,
  120. cluster_id: Optional[int] = None,
  121. ):
  122. fields, fuzzy_fields = _build_worker_list_filters(name, uuid, cluster_id, search)
  123. # Worker carries denormalized owner_principal_id (synced from cluster) so
  124. # tenant filtering can use the same OR-of-{own-Org, cluster_access} rule
  125. # as cluster_resource_visibility_conditions.
  126. extra_conditions = cluster_resource_visibility_conditions(ctx, Worker)
  127. visible = _make_worker_visibility_filter(ctx)
  128. if params.watch:
  129. return StreamingResponse(
  130. Worker.streaming(
  131. fields=fields, fuzzy_fields=fuzzy_fields, filter_func=visible
  132. ),
  133. media_type="text/event-stream",
  134. )
  135. if me and user.worker is not None:
  136. # me query overrides all other filters
  137. fields = {"id": user.worker.id}
  138. fuzzy_fields = {}
  139. async with async_session() as session:
  140. worker_list = await Worker.paginated_by_query(
  141. session=session,
  142. fields=fields,
  143. fuzzy_fields=fuzzy_fields,
  144. extra_conditions=extra_conditions,
  145. page=params.page,
  146. per_page=params.perPage,
  147. order_by=_normalize_worker_order_by(params.order_by),
  148. )
  149. if not user.worker:
  150. return worker_list
  151. public_list = [
  152. to_worker_public(worker, user.worker.id == worker.id)
  153. for worker in worker_list.items
  154. ]
  155. return WorkersPublic(items=public_list, pagination=worker_list.pagination)
  156. @router.get("/{id}", response_model=WorkerPublic)
  157. async def get_worker(
  158. user: CurrentUserDep,
  159. ctx: TenantContextDep,
  160. session: SessionDep,
  161. id: int,
  162. ):
  163. worker = await Worker.one_by_id(session, id)
  164. assert_cluster_resource_visible(ctx, worker, not_found_message="worker not found")
  165. if user.worker is not None and user.worker.id == worker.id:
  166. return to_worker_public(worker, True)
  167. return worker
  168. @router.get("/{id}/dashboard")
  169. async def get_worker_dashboard(
  170. session: SessionDep,
  171. ctx: TenantContextDep,
  172. id: int,
  173. request: Request,
  174. ):
  175. worker = await Worker.one_by_id(session, id)
  176. assert_cluster_resource_visible(ctx, worker, not_found_message="worker not found")
  177. cfg = get_global_config()
  178. if not cfg.get_grafana_url() or not cfg.grafana_worker_dashboard_uid:
  179. raise InternalServerErrorException(
  180. message="Grafana dashboard settings are not configured"
  181. )
  182. cluster = None
  183. if worker.cluster_id is not None:
  184. cluster = await Cluster.one_by_id(session, worker.cluster_id)
  185. query_params = {}
  186. if cluster is not None:
  187. query_params["var-cluster_name"] = cluster.name
  188. query_params["var-worker_name"] = worker.name
  189. grafana_base = resolve_grafana_base_url(cfg, request)
  190. slug = "gpustack-worker"
  191. dashboard_url = f"{grafana_base}/d/{cfg.grafana_worker_dashboard_uid}/{slug}"
  192. if query_params:
  193. dashboard_url = f"{dashboard_url}?{urlencode(query_params)}"
  194. return RedirectResponse(url=dashboard_url, status_code=302)
def update_worker_data(
    worker_in: WorkerCreate,
    existing: Optional[Worker] = None,
    **kwargs,
) -> Worker:
    """Build the ``Worker`` object to persist for a registration request.

    Args:
        worker_in: Data reported by the registering worker.
        existing: The already-stored worker when this is a re-registration,
            otherwise ``None``.
        **kwargs: Extra field values. ``cluster`` is read in both branches;
            the remaining keys are only merged in when a new worker is built.

    Returns:
        A validated ``Worker`` in ``READY`` state on which ``compute_state()``
        has been called.
    """
    to_create_worker = None
    cluster: Optional[Cluster] = kwargs.get("cluster")
    if existing is not None:
        # Preserve maintenance field from existing worker if not explicitly set in worker_in
        incoming_data = worker_in.model_dump()
        if (
            incoming_data.get("maintenance") is None
            and existing.maintenance is not None
        ):
            incoming_data["maintenance"] = existing.maintenance
        # Later keys win: incoming data overrides the stored values, then the
        # explicitly listed keys below override both.
        to_create_worker = Worker.model_validate(
            {
                **existing.model_dump(),
                **incoming_data,
                # Labels are merged; reported labels override stored ones.
                "labels": {
                    **existing.labels,
                    **worker_in.labels,
                },
                # A re-registration never moves the worker to another cluster.
                "cluster_id": existing.cluster_id,
                # Re-sync from cluster in case org ownership changed.
                "owner_principal_id": (
                    cluster.owner_principal_id
                    if cluster is not None
                    else existing.owner_principal_id
                ),
                "state": WorkerStateEnum.READY,
            }
        )
    else:
        # new worker should ignore the reported worker_uuid
        to_create_worker = Worker.model_validate(
            {
                **worker_in.model_dump(exclude={"name", "worker_uuid"}),
                # Fall back to the hostname when no name was reported.
                "name": worker_in.name or worker_in.hostname,
                # Left empty here; create_worker fills in a generated UUID
                # when it sees the empty string.
                "worker_uuid": "",
                "state": WorkerStateEnum.READY,
                "owner_principal_id": (
                    cluster.owner_principal_id if cluster is not None else None
                ),
                # Caller-supplied values override everything above.
                **kwargs,
            }
        )
    if cluster is not None:
        to_create_worker.cluster = cluster
    to_create_worker.compute_state()
    return to_create_worker
  246. def _matches_exact_fields(worker: Worker, fields: Optional[Dict[str, Any]]) -> bool:
  247. if not fields:
  248. return True
  249. return all(getattr(worker, k, None) == v for k, v in fields.items())
  250. def _matches_fuzzy_fields(worker: Worker, fuzzy_fields: Dict[str, str]) -> bool:
  251. if not fuzzy_fields:
  252. return True
  253. for k, v in fuzzy_fields.items():
  254. attr = getattr(worker, k, None)
  255. if not isinstance(attr, str) or v.lower() not in attr.lower():
  256. return False
  257. return True
  258. def filter_workers_by_fields(
  259. workers: List[Worker],
  260. fields: Optional[Dict[str, Any]],
  261. fuzzy_fields: Dict[str, str] = {},
  262. ) -> List[Worker]:
  263. if not fields and not fuzzy_fields:
  264. return workers
  265. to_return = []
  266. for worker in workers:
  267. match = _matches_exact_fields(worker, fields) and _matches_fuzzy_fields(
  268. worker, fuzzy_fields
  269. )
  270. if match:
  271. to_return.append(worker)
  272. return to_return
  273. def get_existing_worker(
  274. cluster_id: int, worker_in: WorkerCreate, workers: List[Worker]
  275. ) -> Optional[Worker]:
  276. static_fields = {
  277. "deleted_at": None,
  278. "cluster_id": cluster_id,
  279. }
  280. if worker_in.name == "":
  281. return None
  282. # find existing worker by external_id or worker_uuid
  283. for field in ["external_id", "worker_uuid"]:
  284. value = getattr(worker_in, field, None)
  285. if value is None:
  286. continue
  287. fields = {**static_fields, field: value}
  288. existing_worker = next(iter(filter_workers_by_fields(workers, fields)), None)
  289. if existing_worker is not None:
  290. return existing_worker
  291. # find existing worker by name
  292. if worker_in.labels and worker_in.labels.get("gpustack.existence-check"):
  293. fields = {"name": worker_in.name}
  294. existing_worker = next(iter(filter_workers_by_fields(workers, fields)), None)
  295. if existing_worker is not None:
  296. if existing_worker.cluster_id != cluster_id:
  297. raise AlreadyExistsException(
  298. message=f"worker with name {worker_in.name} already exists in another cluster"
  299. )
  300. return existing_worker
  301. return None
  302. def check_worker_name_conflict(
  303. name: str, workers: List[Worker], existing_id: Optional[int] = None
  304. ):
  305. if name == "":
  306. if existing_id is not None:
  307. raise InvalidException(message="worker name cannot be empty")
  308. return
  309. workers = [worker for worker in workers if worker.id != existing_id]
  310. name_conflict_fields = {"name": name}
  311. name_conflict_worker = next(
  312. iter(filter_workers_by_fields(workers, name_conflict_fields)), None
  313. )
  314. if name_conflict_worker is not None:
  315. raise AlreadyExistsException(message=f"worker with name {name} already exists")
  316. def find_available_worker_name(
  317. original_name: str, current_name: str, related_names: Set[str]
  318. ) -> str:
  319. if original_name not in related_names:
  320. return original_name
  321. index = 1
  322. if current_name.startswith(f"{original_name}-"):
  323. suffix = current_name[len(original_name) + 1 :]
  324. if suffix.isdigit():
  325. index = int(suffix) + 1
  326. new_name = f"{original_name}-{index}"
  327. while new_name in related_names:
  328. index += 1
  329. new_name = f"{original_name}-{index}"
  330. return new_name
async def retry_create_worker(
    session: AsyncSession, to_create: Worker, workers: List[Worker]
) -> Worker:
    """Create ``to_create``, renaming it on a name collision.

    The set of taken names is seeded from the non-deleted entries of
    ``workers`` whose name contains ``to_create.name``. Up to 5 attempts are
    made; each one picks a free name, writes it to both ``name`` and the
    ``worker-name`` label, and tries to create the row without committing.

    Raises:
        InternalServerErrorException: all 5 attempts hit an ``IntegrityError``.
    """
    related_workers = filter_workers_by_fields(
        workers,
        fields={
            "deleted_at": None,
        },
        fuzzy_fields={"name": to_create.name},
    )
    related_names = set(worker.name for worker in related_workers)
    original_name = to_create.name
    current_name = to_create.name
    for i in range(5):
        try:
            current_name = find_available_worker_name(
                original_name, current_name, related_names
            )
            to_create.name = current_name
            to_create.labels["worker-name"] = current_name
            new_worker = await Worker.create(session, to_create, auto_commit=False)
            return new_worker
        except IntegrityError:
            # NOTE(review): no rollback happens here before the next attempt;
            # presumably Worker.create leaves the session usable after an
            # IntegrityError - confirm.
            logger.warning(
                f"Worker name collision detected for worker name {to_create.name}, retrying... (attempt {i + 1}/5)"
            )
            # Remember the name that just failed so the next attempt skips it.
            related_names.add(current_name)
            await asyncio.sleep(0.1)  # small delay before retrying to reduce contention
    raise InternalServerErrorException(
        message="Failed to create worker with unique name after multiple attempts"
    )
  362. def retry_create_unique_worker_uuid(workers: List[Worker]) -> str:
  363. current_uuids = set(
  364. worker.worker_uuid for worker in workers if worker.worker_uuid != ""
  365. )
  366. for i in range(5):
  367. new_uuid = str(uuid.uuid4())
  368. if new_uuid not in current_uuids:
  369. return new_uuid
  370. logger.warning(
  371. f"UUID collision detected for worker_uuid {new_uuid}, retrying... (attempt {i + 1}/5)"
  372. )
  373. # might not be necessary to retry so many times, but just in case, we want to make sure
  374. # the system can recover from such a rare event without manual intervention
  375. raise InternalServerErrorException(
  376. message="Failed to generate unique worker UUID after multiple attempts"
  377. )
  378. def _resolve_create_worker_cluster_id(user, worker_in: WorkerCreate) -> int:
  379. cluster_id = (
  380. worker_in.cluster_id if worker_in.cluster_id is not None else user.cluster_id
  381. )
  382. if cluster_id is None:
  383. raise ForbiddenException(message="Missing cluster_id for worker registration")
  384. return cluster_id
  385. def _build_worker_config_dict(cluster: Cluster) -> Dict[str, Any]:
  386. sensitive_fields = set(SensitivePredefinedConfig.model_fields.keys())
  387. worker_config = (
  388. {}
  389. if cluster.worker_config is None
  390. else cluster.worker_config.model_dump(exclude=sensitive_fields)
  391. )
  392. cfg = get_global_config()
  393. if (
  394. cfg.system_default_container_registry is not None
  395. and len(cfg.system_default_container_registry) > 0
  396. ):
  397. worker_config.setdefault(
  398. "system_default_container_registry",
  399. cfg.system_default_container_registry,
  400. )
  401. return worker_config
  402. async def _resolve_existing_worker_user(
  403. session, existing_worker: Optional[Worker]
  404. ) -> Optional[User]:
  405. if existing_worker is None:
  406. return None
  407. return await User.one_by_field(
  408. session=session,
  409. field="worker_id",
  410. value=existing_worker.id,
  411. options=[selectinload(User.api_keys)],
  412. )
  413. def _existing_api_key(existing_user: Optional[User]) -> Optional[ApiKey]:
  414. if existing_user is None or not existing_user.api_keys:
  415. return None
  416. return existing_user.api_keys[0]
async def _persist_worker_registration(
    session,
    *,
    existing_worker: Optional[Worker],
    new_worker: Worker,
    new_token: str,
    to_create_user: Optional[User],
    existing_user: Optional[User],
    to_create_apikey: Optional[ApiKey],
    all_workers: List[Worker],
    cluster: Cluster,
) -> Worker:
    """Write every row of a worker registration and commit once at the end.

    Steps, in order: update or create the worker, create the worker's user
    if one is requested, create the API key if one is requested, mark the
    cluster ``READY`` if it is not already, then commit the session.

    Args:
        session: Database session the writes go through.
        existing_worker: Stored worker to update, or ``None`` to create one.
        new_worker: The worker data to persist.
        new_token: Freshly generated token; only applied to an existing
            worker when a new API key is created as well.
        to_create_user: User to create for the worker, or ``None``.
        existing_user: User already linked to the worker, or ``None``.
        to_create_apikey: API key to create, or ``None``.
        all_workers: Workers used to avoid name collisions on creation.
        cluster: Cluster the worker registers into.

    Returns:
        The persisted worker (``existing_worker`` itself on re-registration).
    """
    if existing_worker is not None:
        # The token is only replaced when a matching API key is issued in
        # the same transaction.
        if to_create_apikey is not None:
            new_worker.token = new_token
        await WorkerService(session).update(
            existing_worker, new_worker, auto_commit=False
        )
        worker = existing_worker
    else:
        worker = await retry_create_worker(session, new_worker, all_workers)
    created_user = None
    if to_create_user is not None:
        to_create_user.worker = worker
        created_user = await create_user_with_principal(session, to_create_user)
    if to_create_apikey is not None:
        # The key belongs to the pre-existing user when there is one,
        # otherwise to the user created just above.
        to_create_apikey.user = existing_user or created_user
        to_create_apikey.user_id = (existing_user or created_user).id
        await ApiKey.create(session=session, source=to_create_apikey, auto_commit=False)
    if cluster.state != ClusterStateEnum.READY:
        cluster.state = ClusterStateEnum.READY
        await cluster.update(session=session, auto_commit=False)
    await session.commit()
    return worker
@router.post("", response_model=WorkerRegistrationPublic)
async def create_worker(user: CurrentUserDep, worker_in: WorkerCreate):
    """Register a new worker, or re-register one that already exists.

    Only admins and system users may call this. The whole registration runs
    under ``create_worker_semaphore`` in its own session.

    Returns:
        The worker data together with its token and the non-sensitive
        worker config of the cluster.

    Raises:
        ForbiddenException: caller is neither admin nor system user, or no
            cluster id can be resolved.
        NotFoundException: an ``external_id`` was given but matches no
            worker, or the cluster is missing or deleted.
        AlreadyExistsException / InvalidException: raised by the name checks.
        InternalServerErrorException: persisting the registration or
            building the response failed; the session is rolled back first.
    """
    # Worker registration runs through two paths: (1) v1_base_router with
    # a human session — admin-only, since spinning up workers is a
    # platform-level action; (2) cluster_client_router with the cluster
    # service-account token (user.is_system=True). Allow both, deny the
    # rest.
    if not (user.is_admin or getattr(user, "is_system", False)):
        raise ForbiddenException(message="Only platform admin can register workers")
    async with create_worker_semaphore:
        async with async_session() as session:
            cluster_id = _resolve_create_worker_cluster_id(user, worker_in)
            # Non-deleted workers of all clusters: the name checks below are
            # not limited to the target cluster.
            all_workers = await Worker.all_by_fields(session, {"deleted_at": None})
            existing_worker = get_existing_worker(cluster_id, worker_in, all_workers)
            check_worker_name_conflict(
                worker_in.name,
                all_workers,
                existing_id=existing_worker.id if existing_worker else None,
            )
            if existing_worker is None and worker_in.external_id is not None:
                # avoid creating a worker with a non-existent external_id
                raise NotFoundException(
                    message=f"worker with external_id {worker_in.external_id} not found"
                )
            if existing_worker is not None:
                # Re-read the matched row with for_update=True before
                # modifying it.
                existing_worker = await Worker.one_by_id(
                    session=session, id=existing_worker.id, for_update=True
                )
            cluster = await Cluster.one_by_id(session, cluster_id)
            if cluster is None or cluster.deleted_at is not None:
                raise NotFoundException(message="Cluster not found")
            worker_config = _build_worker_config_dict(cluster)
            # Random hex suffix shared by the generated username and API-key
            # name (despite the variable name, nothing is hashed here).
            hashed_suffix = secrets.token_hex(6)
            access_key = secrets.token_hex(8)
            secret_key = secrets.token_hex(16)
            new_token = f"{API_KEY_PREFIX}_{access_key}_{secret_key}"
            new_worker = update_worker_data(
                worker_in,
                existing=existing_worker,
                # following args are only used when creating a new worker
                provider=cluster.provider,
                cluster=cluster,
                token=new_token,
            )
            # update_worker_data leaves the UUID empty for a new worker.
            if new_worker.worker_uuid == "":
                new_worker.worker_uuid = retry_create_unique_worker_uuid(all_workers)
            existing_user = await _resolve_existing_worker_user(
                session, existing_worker
            )
            # A system user is only created when the worker has none yet.
            to_create_user = (
                User(
                    username=f'{system_name_prefix}-{hashed_suffix}',
                    is_system=True,
                    role=UserRole.Worker,
                    hashed_password="",
                    cluster=cluster,
                )
                if existing_user is None
                else None
            )
            existing_api_key = _existing_api_key(existing_user)
            # Likewise, an API key is only issued when none exists; only the
            # hash of the secret part is stored on the key.
            to_create_apikey = (
                ApiKey(
                    name=f'{system_name_prefix}-{hashed_suffix}',
                    access_key=access_key,
                    hashed_secret_key=get_secret_hash(secret_key),
                )
                if existing_api_key is None
                else None
            )
            try:
                worker = await _persist_worker_registration(
                    session,
                    existing_worker=existing_worker,
                    new_worker=new_worker,
                    new_token=new_token,
                    to_create_user=to_create_user,
                    existing_user=existing_user,
                    to_create_apikey=to_create_apikey,
                    all_workers=all_workers,
                    cluster=cluster,
                )
                worker_dump = worker.model_dump()
                # Set explicitly; presumably model_dump() does not include
                # the token - confirm against the Worker schema.
                worker_dump["token"] = worker.token
                worker_dump["worker_config"] = (
                    PredefinedConfigNoDefaults.model_validate(worker_config)
                )
                return WorkerRegistrationPublic.model_validate(worker_dump)
            except Exception as e:
                await session.rollback()
                raise InternalServerErrorException(
                    message=f"Failed to create worker: {e}"
                )
  544. @router.put("/{id}", response_model=WorkerPublic)
  545. async def update_worker(
  546. ctx: TenantContextDep,
  547. session: SessionDep,
  548. id: int,
  549. worker_in: WorkerUpdate,
  550. ):
  551. worker = await Worker.one_by_id(session, id)
  552. if worker is not None and worker.deleted_at is not None:
  553. worker = None
  554. assert_cluster_resource_visible(ctx, worker, not_found_message="worker not found")
  555. assert_org_owned_writable(ctx, worker, resource_label="worker")
  556. patch = worker_in.model_dump()
  557. if worker_in.maintenance is not None:
  558. worker.maintenance = worker_in.maintenance
  559. worker.compute_state()
  560. patch["state"] = worker.state
  561. try:
  562. await WorkerService(session).update(worker, patch)
  563. except Exception as e:
  564. raise InternalServerErrorException(message=f"Failed to update worker: {e}")
  565. return worker
  566. @router.delete("/{id}")
  567. async def delete_worker(ctx: TenantContextDep, session: SessionDep, id: int):
  568. worker = await Worker.one_by_id(session, id)
  569. if worker is not None and worker.deleted_at is not None:
  570. worker = None
  571. assert_cluster_resource_visible(ctx, worker, not_found_message="worker not found")
  572. assert_org_owned_writable(ctx, worker, resource_label="worker")
  573. try:
  574. soft = worker.external_id is not None
  575. if soft:
  576. worker.state = WorkerStateEnum.DELETING
  577. await WorkerService(session).delete(worker, soft=soft)
  578. except Exception as e:
  579. raise InternalServerErrorException(message=f"Failed to delete worker: {e}")
  580. async def create_worker_status(user: CurrentUserDep, input: WorkerStatusStored):
  581. if user.worker is None:
  582. raise ForbiddenException(message="Failed to find related worker")
  583. heartbeat_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
  584. input_dict = input.model_dump(exclude_unset=True)
  585. input_dict["heartbeat_time"] = heartbeat_time
  586. # Add worker status to buffer for batch update
  587. async with worker_status_flush_buffer_lock:
  588. worker_status_flush_buffer[user.worker.id] = input_dict
  589. return Response(status_code=204)
  590. async def heartbeat(user: CurrentUserDep):
  591. if user.worker is None:
  592. raise ForbiddenException(message="Failed to find related worker")
  593. # Add worker ID to buffer for batch update
  594. async with heartbeat_flush_buffer_lock:
  595. heartbeat_flush_buffer.add(user.worker.id)
  596. return Response(status_code=204)
  597. @router.get("/{id}/privatekey")
  598. async def get_worker_privatekey(
  599. ctx: TenantContextDep,
  600. session: SessionDep,
  601. id: int,
  602. ):
  603. worker = await Worker.one_by_id(session, id)
  604. if worker is not None and worker.deleted_at is not None:
  605. worker = None
  606. assert_cluster_resource_visible(ctx, worker, not_found_message="worker not found")
  607. # Private key is a write-class secret (anyone holding it can SSH into the
  608. # host) — gate with the writable check, same as the cluster registration
  609. # token endpoint in routes/clusters.py.
  610. assert_org_owned_writable(ctx, worker, resource_label="worker")
  611. if worker.ssh_key_id is None:
  612. raise NotFoundException(message="worker ssh key not found")
  613. ssh_key = await Credential.one_by_id(session, worker.ssh_key_id)
  614. if not ssh_key:
  615. raise NotFoundException(message="worker ssh key not found")
  616. private_key_bytes = base64.b64decode(ssh_key.encoded_private_key)
  617. private_key_pem = key_bytes_to_openssh_pem(
  618. private_key_bytes, ssh_key.ssh_key_options.algorithm
  619. )
  620. return Response(
  621. content=private_key_pem,
  622. media_type="application/octet-stream",
  623. headers={
  624. "Content-Disposition": f"attachment; filename=worker-{id}-private_key.pem"
  625. },
  626. )