clusters.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. import math
  2. import random
  3. import secrets
  4. from typing import Any, Callable, Optional, Union
  5. from urllib.parse import urlencode
  6. import aiohttp
  7. from fastapi import APIRouter, Depends, Request, Response, Query
  8. from fastapi.responses import RedirectResponse, StreamingResponse
  9. from enum import Enum
  10. from sqlalchemy.orm import selectinload
  11. from gpustack.api.exceptions import (
  12. AlreadyExistsException,
  13. InternalServerErrorException,
  14. NotFoundException,
  15. InvalidException,
  16. ConflictException,
  17. ServiceUnavailableException,
  18. )
  19. from gpustack.api.responses import StreamingResponseWithStatusCode
  20. from gpustack.schemas.common import PaginatedList, Pagination
  21. from gpustack.schemas.config import parse_base_model_to_env_vars
  22. from gpustack.api.tenant import (
  23. TenantContext,
  24. bypass_tenant_filter,
  25. cluster_visibility_conditions,
  26. assert_cluster_visible,
  27. assert_cluster_writable,
  28. validate_owner_principal,
  29. )
  30. from gpustack.schemas.workers import Worker, WorkerStateEnum
  31. from gpustack.server.db import async_session
  32. from gpustack.server.deps import SessionDep, TenantContextDep
  33. from gpustack.server.services import create_user_with_principal
  34. from gpustack.server.worker_request import stream_to_worker
  35. from gpustack.schemas.clusters import (
  36. ClusterListParams,
  37. ClusterUpdate,
  38. ClusterCreate,
  39. ClusterPublic,
  40. ClustersPublic,
  41. Cluster,
  42. ClusterStateEnum,
  43. ClusterProvider,
  44. SensitiveRegistrationConfig,
  45. ClusterRegistrationTokenPublic,
  46. WorkerPoolCreate,
  47. WorkerPoolPublic,
  48. WorkerPool,
  49. CloudOptions,
  50. )
  51. from gpustack.schemas.organizations import PLATFORM_PRINCIPAL_ID
  52. from gpustack.schemas.users import User, UserRole, system_name_prefix
  53. from gpustack.schemas.api_keys import ApiKey
  54. from gpustack.security import get_secret_hash, API_KEY_PREFIX
  55. from gpustack.k8s.manifest_template import TemplateConfig
  56. from gpustack.config.config import get_global_config, get_cluster_image_name
  57. from gpustack.utils.grafana import resolve_grafana_base_url
  58. from gpustack_runtime.detector import ManufacturerEnum
  59. CLUSTER_LOAD_OPTIONS = [
  60. selectinload(Cluster.cluster_workers),
  61. selectinload(Cluster.cluster_models),
  62. ]
  63. router = APIRouter()
  64. def get_server_url(request: Request, cluster_override: Optional[str]) -> str:
  65. """Construct the server URL based on request headers or fallback to default."""
  66. if cluster_override:
  67. return cluster_override.strip("/")
  68. url = get_global_config().server_external_url
  69. if not url:
  70. url = f"{request.url.scheme}://{request.url.hostname}"
  71. if request.url.port:
  72. url += f":{request.url.port}"
  73. return url
  74. def _is_cluster_visible(cluster: Cluster, ctx: TenantContext) -> bool:
  75. """Python-side mirror of cluster_visibility_conditions for in-memory lists."""
  76. if bypass_tenant_filter(ctx):
  77. return True
  78. if (
  79. ctx.current_principal_id is not None
  80. and cluster.owner_principal_id == ctx.current_principal_id
  81. ):
  82. return True
  83. if cluster.id in ctx.accessible_cluster_ids:
  84. return True
  85. return False
  86. @router.get("", response_model=ClustersPublic, response_model_exclude_none=True)
  87. async def get_clusters(
  88. session: SessionDep,
  89. ctx: TenantContextDep,
  90. params: ClusterListParams = Depends(),
  91. name: str = None,
  92. search: str = None,
  93. ):
  94. fuzzy_fields = {}
  95. if search:
  96. fuzzy_fields = {"name": search}
  97. fields = {'deleted_at': None}
  98. if name:
  99. fields = {"name": name}
  100. if params.watch:
  101. return StreamingResponse(
  102. Cluster.streaming(
  103. fields=fields,
  104. fuzzy_fields=fuzzy_fields,
  105. options=CLUSTER_LOAD_OPTIONS,
  106. filter_func=lambda c: _is_cluster_visible(c, ctx),
  107. ),
  108. media_type="text/event-stream",
  109. )
  110. async with async_session() as session:
  111. # Push visibility filtering into the query — own-Org cluster OR
  112. # cluster_access grant — instead of fetching the whole table and
  113. # filtering in Python.
  114. items = await Cluster.all_by_fields(
  115. session=session,
  116. fields=fields,
  117. fuzzy_fields=fuzzy_fields,
  118. options=CLUSTER_LOAD_OPTIONS,
  119. extra_conditions=cluster_visibility_conditions(ctx, Cluster),
  120. )
  121. if not items:
  122. return PaginatedList[ClusterPublic](
  123. items=[],
  124. pagination=Pagination(
  125. page=params.page,
  126. perPage=params.perPage,
  127. total=0,
  128. totalPage=0,
  129. ),
  130. )
  131. if params.page < 1 or params.perPage < 1:
  132. # Return all items.
  133. pagination = Pagination(
  134. page=1,
  135. perPage=len(items),
  136. total=len(items),
  137. totalPage=1,
  138. )
  139. return PaginatedList[ClusterPublic](items=items, pagination=pagination)
  140. # sort in memory
  141. order_by = params.order_by
  142. if order_by:
  143. for field, direction in reversed(order_by):
  144. items.sort(
  145. key=_make_sort_key(field),
  146. reverse=direction == "desc",
  147. )
  148. # Paginate results.
  149. start = (params.page - 1) * params.perPage
  150. end = start + params.perPage
  151. paginated_items = items[start:end]
  152. count = len(items)
  153. total_page = math.ceil(count / params.perPage)
  154. pagination = Pagination(
  155. page=params.page,
  156. perPage=params.perPage,
  157. total=count,
  158. totalPage=total_page,
  159. )
  160. return PaginatedList[ClusterPublic](
  161. items=paginated_items, pagination=pagination
  162. )
  163. def _make_sort_key(field: str) -> Callable[[Any], tuple]:
  164. """
  165. Returns a key function for sorting objects by a given field.
  166. Handles:
  167. - None values (placed at the end regardless of sort direction),
  168. - Enum instances (uses .value for comparison),
  169. - Other types (str, int, float, datetime, etc.) as long as they are comparable.
  170. """
  171. def key_func(obj: Any) -> tuple:
  172. val = getattr(obj, field, None)
  173. if val is None:
  174. # (1, None) ensures None is sorted after non-None values
  175. return (1, None)
  176. if isinstance(val, Enum):
  177. # Use the underlying value of the Enum for comparison
  178. sort_val = val.value
  179. else:
  180. sort_val = val
  181. # (0, sort_val) so non-None values come first
  182. return (0, sort_val)
  183. return key_func
  184. @router.get("/{id}", response_model=ClusterPublic, response_model_exclude_none=True)
  185. async def get_cluster(session: SessionDep, ctx: TenantContextDep, id: int):
  186. cluster = await Cluster.one_by_id(
  187. session,
  188. id,
  189. options=CLUSTER_LOAD_OPTIONS,
  190. )
  191. assert_cluster_visible(ctx, cluster, not_found_message=f"cluster {id} not found")
  192. return cluster
  193. def create_update_check(
  194. provider: ClusterProvider, input: Union[ClusterCreate, ClusterUpdate]
  195. ):
  196. cfg = get_global_config()
  197. is_cloud_provider = provider not in [
  198. ClusterProvider.Kubernetes,
  199. ClusterProvider.Docker,
  200. ]
  201. if (
  202. is_cloud_provider
  203. and isinstance(input, ClusterCreate)
  204. and input.credential_id is None
  205. ):
  206. raise InvalidException(
  207. message=f"credential_id is required for provider {provider}"
  208. )
  209. server_url = input.server_url or cfg.server_external_url
  210. if is_cloud_provider and server_url is None:
  211. raise InvalidException(
  212. message=f"server_url is required for provider {provider}"
  213. )
  214. if provider == ClusterProvider.Kubernetes:
  215. # check for volume mounts
  216. if input.k8s_volume_mounts is None or len(input.k8s_volume_mounts) < 1:
  217. # at least one volume mount is required, and the default one is for gpustack data dir.
  218. raise InvalidException(
  219. message="At least one k8s_volume_mount is required, and the default one is for gpustack data dir."
  220. )
  221. if (
  222. input.k8s_volume_mounts[0].volume_source is None
  223. or input.k8s_volume_mounts[0].volume_source.host_path is None
  224. ):
  225. raise InvalidException(
  226. message="The first k8s_volume_mount must be for gpustack data dir with hostPath volume source."
  227. )
  228. def enforce_data_dir_mounts(input: Union[ClusterCreate, ClusterUpdate]):
  229. """
  230. Assuming the first item of k8s_volume_mounts is for gpustack data dir, enforce that it is always present and has the correct settings.
  231. """
  232. # the first volume must exist as it's validated in create_update_check, and it must be for gpustack data dir, so we enforce it here.
  233. data_dir_mount = input.k8s_volume_mounts[0]
  234. data_dir_mount.name = "gpustack-data-dir"
  235. data_dir_mount.mount_path = "/var/lib/gpustack"
  236. data_dir_mount.read_only = False
  237. data_dir_mount.volume_source.host_path.type = "DirectoryOrCreate"
  238. data_dir_mount.volume_source.config_map = None
  239. data_dir_mount.volume_source.persistent_volume_claim = None
  240. @router.post("", response_model=ClusterPublic, response_model_exclude_none=True)
  241. async def create_cluster(
  242. session: SessionDep, ctx: TenantContextDep, input: ClusterCreate
  243. ):
  244. # Every cluster has an owner Org. Fill in a sensible default when the
  245. # caller omitted it: their current Org context, or the platform Org
  246. # for admin in "All" mode (admin's home is Default).
  247. if input.owner_principal_id is None:
  248. input.owner_principal_id = ctx.current_principal_id or PLATFORM_PRINCIPAL_ID
  249. validate_owner_principal(input.owner_principal_id, ctx, resource_label="cluster")
  250. # Cluster names are unique within their owning Org, not globally —
  251. # two Orgs can each have a "c1".
  252. existing = await Cluster.one_by_fields(
  253. session,
  254. {
  255. 'deleted_at': None,
  256. "name": input.name,
  257. "owner_principal_id": input.owner_principal_id,
  258. },
  259. )
  260. if existing:
  261. raise AlreadyExistsException(message=f"cluster {input.name} already exists")
  262. create_update_check(input.provider, input)
  263. if input.provider == ClusterProvider.Kubernetes:
  264. enforce_data_dir_mounts(input)
  265. # Auto-promote the first cluster in an Org to that Org's default so
  266. # users don't have to flip a separate switch after onboarding.
  267. has_existing_in_org = await Cluster.first_by_field(
  268. session=session, field="owner_principal_id", value=input.owner_principal_id
  269. )
  270. auto_default = has_existing_in_org is None
  271. access_key = secrets.token_hex(8)
  272. secret_key = secrets.token_hex(16)
  273. target_state = ClusterStateEnum.READY
  274. state_message = None
  275. if input.provider not in [ClusterProvider.Kubernetes, ClusterProvider.Docker]:
  276. target_state = ClusterStateEnum.PENDING
  277. state_message = "No workers have been provisioned for this cluster yet."
  278. pools = input.worker_pools or []
  279. to_create_cluster = Cluster.model_validate(
  280. {
  281. **input.model_dump(exclude={"worker_pools"}),
  282. "state": target_state,
  283. "state_message": state_message,
  284. "hashed_suffix": secrets.token_hex(6),
  285. "registration_token": f"{API_KEY_PREFIX}_{access_key}_{secret_key}",
  286. "is_default": auto_default,
  287. }
  288. )
  289. to_create_user = User(
  290. username=f'{system_name_prefix}-{to_create_cluster.hashed_suffix}',
  291. is_system=True,
  292. role=UserRole.Cluster,
  293. hashed_password="",
  294. )
  295. to_create_apikey = ApiKey(
  296. name=f'{system_name_prefix}-{to_create_cluster.hashed_suffix}',
  297. access_key=access_key,
  298. hashed_secret_key=get_secret_hash(secret_key),
  299. )
  300. try:
  301. # create cluster
  302. cluster = await Cluster.create(session, to_create_cluster, auto_commit=False)
  303. # create pools
  304. for pool in pools:
  305. to_create_pool = WorkerPool.model_validate(
  306. {
  307. **pool.model_dump(),
  308. "cluster_id": cluster.id,
  309. # Pool inherits its cluster's owner so list filters
  310. # can scope without joining.
  311. "owner_principal_id": cluster.owner_principal_id,
  312. "cloud_options": (
  313. pool.cloud_options if pool.cloud_options else CloudOptions()
  314. ),
  315. }
  316. )
  317. to_create_pool.cluster = cluster
  318. await WorkerPool.create(
  319. session=session, source=to_create_pool, auto_commit=False
  320. )
  321. to_create_user.cluster = cluster
  322. user = await create_user_with_principal(session, to_create_user)
  323. to_create_apikey.user_id = user.id
  324. await ApiKey.create(session=session, source=to_create_apikey, auto_commit=False)
  325. await session.commit()
  326. await session.refresh(cluster)
  327. return cluster
  328. except Exception as e:
  329. raise InternalServerErrorException(message=f"Failed to create cluster: {e}")
  330. @router.put("/{id}", response_model=ClusterPublic, response_model_exclude_none=True)
  331. async def update_cluster(
  332. session: SessionDep, ctx: TenantContextDep, id: int, input: ClusterUpdate
  333. ):
  334. cluster = await Cluster.one_by_id(session, id)
  335. if not cluster:
  336. raise NotFoundException(message=f"cluster {id} not found")
  337. assert_cluster_writable(ctx, cluster)
  338. create_update_check(cluster.provider, input)
  339. if cluster.provider == ClusterProvider.Kubernetes:
  340. enforce_data_dir_mounts(input)
  341. try:
  342. await cluster.update(session=session, source=input)
  343. except Exception as e:
  344. raise InternalServerErrorException(message=f"Failed to update cluster: {e}")
  345. return await Cluster.one_by_id(
  346. session,
  347. id,
  348. options=CLUSTER_LOAD_OPTIONS,
  349. )
  350. @router.delete("/{id}")
  351. async def delete_cluster(session: SessionDep, ctx: TenantContextDep, id: int):
  352. existing = await Cluster.one_by_id(
  353. session,
  354. id,
  355. options=[
  356. selectinload(Cluster.cluster_workers),
  357. selectinload(Cluster.cluster_models),
  358. selectinload(Cluster.cluster_model_instances),
  359. ],
  360. )
  361. if not existing or existing.deleted_at is not None:
  362. raise NotFoundException(message=f"cluster {id} not found")
  363. assert_cluster_writable(ctx, existing)
  364. # check for workers, if any are present, prevent deletion
  365. if len(existing.cluster_workers) > 0:
  366. raise ConflictException(
  367. message=f"cluster {existing.name}(id: {id}) has workers, cannot be deleted"
  368. )
  369. # check for models, if any are present, prevent deletion
  370. if len(existing.cluster_models) > 0:
  371. raise ConflictException(
  372. message=f"cluster {existing.name}(id: {id}) has models, cannot be deleted"
  373. )
  374. # check for model instances, if any are present, prevent deletion
  375. if len(existing.cluster_model_instances) > 0:
  376. raise ConflictException(
  377. message=f"cluster {existing.name}(id: {id}) has model instances, cannot be deleted"
  378. )
  379. try:
  380. await existing.delete(session=session)
  381. except Exception as e:
  382. raise InternalServerErrorException(message=f"Failed to delete cluster: {e}")
  383. @router.post("/{id}/set-default")
  384. async def set_default_cluster(session: SessionDep, ctx: TenantContextDep, id: int):
  385. # "Default cluster" is a per-Org concept now: each Org has at most
  386. # one default, and that's what its members' deploy form falls back
  387. # to. Writing it follows the standard cluster-write rule (admin
  388. # always; Org admin only on their own Org's clusters).
  389. cluster = await Cluster.one_by_id(session, id)
  390. if not cluster:
  391. raise NotFoundException(message=f"cluster {id} not found")
  392. assert_cluster_writable(ctx, cluster)
  393. try:
  394. # Unset any existing default in this cluster's Org. The partial
  395. # unique index guarantees there's at most one to begin with.
  396. existing_defaults = await Cluster.all_by_fields(
  397. session,
  398. {
  399. 'is_default': True,
  400. 'deleted_at': None,
  401. 'owner_principal_id': cluster.owner_principal_id,
  402. },
  403. )
  404. for dc in existing_defaults:
  405. if dc.id != cluster.id:
  406. await dc.update(
  407. session=session,
  408. source={"is_default": False},
  409. auto_commit=False,
  410. )
  411. # Set this cluster as the Org's default.
  412. await cluster.update(
  413. session=session,
  414. source={"is_default": True},
  415. auto_commit=False,
  416. )
  417. await session.commit()
  418. except Exception as e:
  419. raise InternalServerErrorException(
  420. message=f"Failed to set default cluster: {e}"
  421. )
  422. @router.post("/{id}/worker-pools", response_model=WorkerPoolPublic)
  423. async def create_worker_pool(
  424. session: SessionDep, ctx: TenantContextDep, id: int, input: WorkerPoolCreate
  425. ):
  426. cluster = await Cluster.one_by_id(session, id)
  427. if not cluster or cluster.deleted_at is not None:
  428. raise NotFoundException(message=f"cluster {id} not found")
  429. assert_cluster_writable(ctx, cluster)
  430. if cluster.provider in [ClusterProvider.Docker, ClusterProvider.Kubernetes]:
  431. raise InvalidException(
  432. message=f"Cannot create worker pool for cluster {cluster.name}(id: {id}) with provider {cluster.provider}"
  433. )
  434. try:
  435. cloud_options = input.cloud_options or CloudOptions()
  436. worker_pool = WorkerPool.model_validate(
  437. {
  438. **input.model_dump(),
  439. "cluster_id": id,
  440. "owner_principal_id": cluster.owner_principal_id,
  441. "cloud_options": cloud_options,
  442. }
  443. )
  444. worker_pool.cluster = cluster
  445. return await WorkerPool.create(session, worker_pool)
  446. except Exception as e:
  447. raise InternalServerErrorException(message=f"Failed to create worker pool: {e}")
  448. def get_registration_from_cluster(
  449. request: Request, cluster: Cluster
  450. ) -> ClusterRegistrationTokenPublic:
  451. config = cluster.worker_config.model_dump() if cluster.worker_config else {}
  452. sensitive_registration = SensitiveRegistrationConfig(
  453. token=cluster.registration_token, **config
  454. )
  455. return ClusterRegistrationTokenPublic(
  456. token=cluster.registration_token,
  457. server_url=get_server_url(request, cluster.server_url),
  458. image=get_cluster_image_name(
  459. cluster.worker_config
  460. ), # Default image, can be customized
  461. env=parse_base_model_to_env_vars(sensitive_registration),
  462. args=[],
  463. )
  464. @router.get("/{id}/registration-token", response_model=ClusterRegistrationTokenPublic)
  465. async def get_registration_token(
  466. request: Request, session: SessionDep, ctx: TenantContextDep, id: int
  467. ):
  468. cluster = await Cluster.one_by_id(session, id)
  469. if not cluster or cluster.deleted_at is not None:
  470. raise NotFoundException(message=f"cluster {id} not found")
  471. # Registration token is a write-class secret (anyone holding it can
  472. # register a worker into this cluster) — gate with the writable check.
  473. assert_cluster_writable(ctx, cluster)
  474. return get_registration_from_cluster(request, cluster)
  475. @router.get("/{id}/manifests")
  476. async def get_cluster_manifests(
  477. request: Request,
  478. session: SessionDep,
  479. id: int,
  480. runtime: Optional[ManufacturerEnum] = Query(
  481. None, description="Optional runtime to include in the manifest"
  482. ),
  483. ):
  484. cluster = await Cluster.one_by_id(session, id)
  485. if not cluster or cluster.deleted_at is not None:
  486. raise NotFoundException(message=f"cluster {id} not found")
  487. if cluster.provider != ClusterProvider.Kubernetes:
  488. raise InvalidException(
  489. message=f"Cannot get manifests for cluster {cluster.name}(id: {id}) with provider {cluster.provider}"
  490. )
  491. config = TemplateConfig(
  492. registration=get_registration_from_cluster(request, cluster),
  493. cluster_suffix=cluster.hashed_suffix,
  494. namespace=getattr(cluster.worker_config, "namespace", None),
  495. runtime_enum=runtime,
  496. k8s_volume_mounts=cluster.k8s_volume_mounts,
  497. )
  498. yaml_content = config.render()
  499. return Response(
  500. content=yaml_content,
  501. media_type="application/x-yaml",
  502. headers={"Content-Disposition": "attachment; filename=manifest.yaml"},
  503. )
  504. @router.get("/{id}/dashboard")
  505. async def get_cluster_dashboard(
  506. session: SessionDep,
  507. ctx: TenantContextDep,
  508. id: int,
  509. request: Request,
  510. ):
  511. cluster = await Cluster.one_by_id(session, id)
  512. assert_cluster_visible(ctx, cluster, not_found_message="cluster not found")
  513. cfg = get_global_config()
  514. if not cfg.get_grafana_url() or not cfg.grafana_worker_dashboard_uid:
  515. raise InternalServerErrorException(
  516. message="Grafana dashboard settings are not configured"
  517. )
  518. query_params = {"var-cluster_name": cluster.name}
  519. grafana_base = resolve_grafana_base_url(cfg, request)
  520. slug = "gpustack-worker"
  521. dashboard_url = f"{grafana_base}/d/{cfg.grafana_worker_dashboard_uid}/{slug}"
  522. if query_params:
  523. dashboard_url = f"{dashboard_url}?{urlencode(query_params)}"
  524. return RedirectResponse(url=dashboard_url, status_code=302)
  525. # Hop-by-hop headers and other things we should not forward to the worker; the
  526. # worker layer will inject its own Authorization, and the worker→k8s leg will
  527. # inject the in-pod ServiceAccount token.
  528. _CLUSTER_PROXY_REQUEST_HEADER_SKIP = {
  529. "host",
  530. "content-length",
  531. "transfer-encoding",
  532. "connection",
  533. "keep-alive",
  534. "proxy-authenticate",
  535. "proxy-authorization",
  536. "te",
  537. "trailer",
  538. "upgrade",
  539. "authorization",
  540. "cookie",
  541. "x-api-key",
  542. "x-forwarded-host",
  543. "x-forwarded-port",
  544. "x-forwarded-proto",
  545. }
  546. @router.api_route(
  547. "/{id}/proxy/{path:path}",
  548. methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
  549. include_in_schema=False,
  550. )
  551. async def cluster_apiserver_proxy(
  552. request: Request,
  553. session: SessionDep,
  554. id: int,
  555. path: str,
  556. ):
  557. """
  558. Proxy a request to the Kubernetes API server of a Kubernetes-provider
  559. cluster, by forwarding it through one of the cluster's worker pods. The
  560. worker uses its in-pod ServiceAccount credentials to call the API server.
  561. """
  562. cluster = await Cluster.one_by_id(session, id)
  563. if not cluster or cluster.deleted_at is not None:
  564. raise NotFoundException(message=f"cluster {id} not found")
  565. if cluster.provider != ClusterProvider.Kubernetes:
  566. raise InvalidException(
  567. message=(
  568. f"cluster {cluster.name}(id: {id}) provider is "
  569. f"{cluster.provider.value}; API server proxy is only supported "
  570. "for Kubernetes-provider clusters."
  571. )
  572. )
  573. workers = await Worker.all_by_fields(
  574. session,
  575. fields={"cluster_id": id, "state": WorkerStateEnum.READY},
  576. )
  577. if not workers:
  578. raise ServiceUnavailableException(
  579. message=f"No ready workers in cluster {cluster.name}(id: {id})"
  580. )
  581. worker = random.choice(workers)
  582. headers = {
  583. k: v
  584. for k, v in request.headers.items()
  585. if k.lower() not in _CLUSTER_PROXY_REQUEST_HEADER_SKIP
  586. }
  587. # Stream the request body through to avoid loading large payloads (e.g. apply
  588. # of big manifests) into memory.
  589. body = (
  590. request.stream() if request.method not in ("GET", "HEAD", "OPTIONS") else None
  591. )
  592. # request.query_params preserves order but a flat dict is sufficient for
  593. # the Kubernetes API surface we forward (no duplicate keys in practice).
  594. params = dict(request.query_params) or None
  595. # No total timeout — Kubernetes watch and log-follow streams may be open
  596. # indefinitely. Connect timeout still bounds the upstream connect step.
  597. timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
  598. return StreamingResponseWithStatusCode(
  599. stream_to_worker(
  600. worker=worker,
  601. method=request.method,
  602. path=f"cluster-proxy/{path}",
  603. proxy_client=request.app.state.http_client,
  604. no_proxy_client=request.app.state.http_client_no_proxy,
  605. params=params,
  606. data=body,
  607. headers=headers,
  608. timeout=timeout,
  609. raw=True,
  610. ),
  611. )