__init__.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. import time
  2. import asyncio
  3. import base64
  4. import os
  5. import logging
  6. import yaml
  7. import copy
  8. from functools import partial
  9. from typing import Any, Dict, Tuple, List, Optional, Literal
  10. from pydantic import BaseModel
  11. from kubernetes_asyncio import client as k8s_client
  12. from kubernetes_asyncio.client import Configuration
  13. from kubernetes_asyncio.config.kube_config import KubeConfigLoader, KubeConfigMerger, ConfigNode
  14. from kubernetes_asyncio.config.incluster_config import (
  15. InClusterConfigLoader,
  16. SERVICE_TOKEN_FILENAME,
  17. SERVICE_CERT_FILENAME,
  18. )
  19. from kubernetes_asyncio.client.rest import ApiException
  20. from gpustack.api.auth import GATEWAY_AUTH_TOKEN_HEADER
  21. from gpustack.config.config import Config
  22. from gpustack.schemas.config import GatewayModeEnum
  23. from gpustack import envs
  24. from gpustack.gateway import client as gw_client
  25. from gpustack.gateway.client import (
  26. McpBridge,
  27. McpBridgeSpec,
  28. McpBridgeRegistry,
  29. WasmPluginSpec,
  30. WasmPluginMatchRule,
  31. )
  32. from gpustack.gateway.labels_annotations import managed_labels, match_labels
  33. from gpustack.gateway.utils import (
  34. default_mcp_bridge_name,
  35. openai_model_prefixes,
  36. anthropic_model_exact,
  37. gpustack_ai_proxy_name,
  38. gpustack_model_mapper_name,
  39. gpustack_generic_route_transformer_name,
  40. mcp_ingress_equal,
  41. get_default_mcpbridge_ref,
  42. ensure_wasm_plugin,
  43. router_header_key,
  44. gpustack_original_path_header,
  45. gpustack_fallback_path_header,
  46. )
  47. from gpustack.gateway.plugins import (
  48. get_plugin_url_with_name_and_version,
  49. )
  50. from gpustack.security import AUTH_CACHE_HEADER
  51. logger = logging.getLogger(__name__)
  52. mcp_registry_port = 80
  53. supported_openai_routes = [
  54. route for v in openai_model_prefixes for route in v.flattened_prefixes()
  55. ]
  56. supported_anthropic_routes = [
  57. route for v in anthropic_model_exact for route in v.flattened_prefixes()
  58. ]
  59. async_gateway_config: Configuration = None
  60. def init_async_k8s_config(cfg: Config):
  61. if cfg.gateway_mode == GatewayModeEnum.disabled:
  62. return
  63. global async_gateway_config
  64. if async_gateway_config is not None:
  65. return
  66. configuration = Configuration()
  67. if cfg.gateway_mode == GatewayModeEnum.incluster:
  68. cfg_loader = InClusterConfigLoader(
  69. token_filename=SERVICE_TOKEN_FILENAME,
  70. cert_filename=SERVICE_CERT_FILENAME,
  71. )
  72. cfg_loader.load_and_set(configuration)
  73. else:
  74. kubeconfig_path = cfg.gateway_kubeconfig
  75. if not kubeconfig_path or not os.path.isfile(kubeconfig_path):
  76. logger.debug(f"Kubeconfig not found at {kubeconfig_path}, skipping k8s config initialization")
  77. return
  78. config_dict = KubeConfigMerger(cfg.gateway_kubeconfig).config
  79. if isinstance(config_dict, ConfigNode):
  80. config_dict = config_dict.value
  81. if not config_dict or not config_dict.get("current-context"):
  82. logger.debug(f"Kubeconfig at {kubeconfig_path} is empty or missing current-context, skipping k8s config initialization")
  83. return
  84. cfg_loader = KubeConfigLoader(config_dict=config_dict)
  85. if not cfg_loader._load_user_token():
  86. cfg_loader._load_user_pass_token()
  87. cfg_loader._load_cluster_info()
  88. cfg_loader._set_config(configuration)
  89. async_gateway_config = configuration
  90. def get_async_k8s_config(cfg: Config) -> Optional[Configuration]:
  91. if cfg.gateway_mode == GatewayModeEnum.disabled:
  92. return None
  93. global async_gateway_config
  94. if async_gateway_config is None:
  95. init_async_k8s_config(cfg=cfg)
  96. return async_gateway_config
  97. def wait_for_apiserver_ready(cfg: Config, timeout: int = 60, interval: int = 5):
  98. async def get_api_resources():
  99. config = get_async_k8s_config(cfg)
  100. start = time.time()
  101. v1 = k8s_client.CoreV1Api(k8s_client.ApiClient(configuration=config))
  102. while True:
  103. try:
  104. await v1.get_api_resources()
  105. break
  106. except Exception:
  107. if time.time() - start > timeout:
  108. raise
  109. await asyncio.sleep(interval)
  110. try:
  111. asyncio.run(get_api_resources())
  112. except asyncio.CancelledError:
  113. raise
  114. def get_gpustack_higress_registry(cfg: Config) -> McpBridgeRegistry:
  115. registry_type = "dns"
  116. domain = f"{cfg.service_discovery_name}.{cfg.get_namespace()}.svc"
  117. mcp_port = cfg.get_api_port()
  118. if cfg.gateway_mode != GatewayModeEnum.incluster:
  119. registry_type = "static"
  120. mcp_port = mcp_registry_port
  121. port = cfg.get_api_port()
  122. if cfg.gateway_mode == GatewayModeEnum.external:
  123. address = cfg.get_advertise_address()
  124. elif cfg.gateway_mode == GatewayModeEnum.embedded:
  125. address = "127.0.0.1"
  126. domain = f"{address}:{port}"
  127. mcp_registry_name = (
  128. "gpustack"
  129. if cfg.server_role() != Config.ServerRole.WORKER
  130. else "gpustack-worker"
  131. )
  132. registry = McpBridgeRegistry(
  133. type=registry_type,
  134. name=mcp_registry_name,
  135. port=mcp_port,
  136. protocol="http",
  137. domain=domain,
  138. )
  139. return registry
  140. async def ensure_mcp_resources(cfg: Config, api_client: k8s_client.ApiClient):
  141. api = gw_client.NetworkingHigressIoV1Api(api_client)
  142. # use default name for embedded mode
  143. gateway_namespace = cfg.gateway_namespace
  144. try:
  145. data: Dict[str, Any] = await api.get_mcpbridge(
  146. namespace=gateway_namespace, name=default_mcp_bridge_name
  147. )
  148. default_bridge = McpBridge.model_validate(data)
  149. except ApiException as e:
  150. if e.status == 404:
  151. default_bridge = None
  152. else:
  153. raise
  154. target_registry = get_gpustack_higress_registry(cfg=cfg)
  155. try:
  156. if not default_bridge:
  157. bridge = McpBridge(
  158. metadata={"name": "default", "namespace": gateway_namespace},
  159. spec=McpBridgeSpec(registries=[target_registry]),
  160. )
  161. await api.create_mcpbridge(namespace=gateway_namespace, body=bridge)
  162. else:
  163. should_update = False
  164. registries = (
  165. default_bridge.spec.registries
  166. if default_bridge.spec and default_bridge.spec.registries
  167. else []
  168. )
  169. if not any(r.name == target_registry.name for r in registries):
  170. if default_bridge.spec is None:
  171. default_bridge.spec = McpBridgeSpec()
  172. registries.append(target_registry)
  173. default_bridge.spec.registries = registries
  174. should_update = True
  175. else:
  176. registry = next(r for r in registries if r.name == target_registry.name)
  177. if (
  178. registry.type != target_registry.type
  179. or registry.domain != target_registry.domain
  180. or registry.port != target_registry.port
  181. or registry.protocol != target_registry.protocol
  182. ):
  183. registry.type = target_registry.type
  184. registry.domain = target_registry.domain
  185. registry.port = target_registry.port
  186. registry.protocol = target_registry.protocol
  187. should_update = True
  188. if should_update:
  189. await api.edit_mcpbridge(
  190. namespace=gateway_namespace, name='default', body=default_bridge
  191. )
  192. except ApiException as e:
  193. raise RuntimeError("Failed to ensure ingress resources") from e
  194. async def ensure_ingress_resources(cfg: Config, api_client: k8s_client.ApiClient):
  195. """
  196. Ensure the ingress resources to route traffic to mcpbridge are created.
  197. """
  198. gateway_namespace = cfg.gateway_namespace
  199. hostname = cfg.get_external_hostname()
  200. tls_secret_name = cfg.get_tls_secret_name()
  201. network_v1_client = k8s_client.NetworkingV1Api(api_client=api_client)
  202. ingress_name = envs.GATEWAY_MIRROR_INGRESS_NAME
  203. try:
  204. ingress: k8s_client.V1Ingress = await network_v1_client.read_namespaced_ingress(
  205. name=ingress_name, namespace=gateway_namespace
  206. )
  207. except ApiException as e:
  208. if e.status == 404:
  209. ingress = None
  210. else:
  211. raise
  212. registry = get_gpustack_higress_registry(cfg=cfg)
  213. expected_rule = k8s_client.V1IngressRule(
  214. http=k8s_client.V1HTTPIngressRuleValue(
  215. paths=[
  216. k8s_client.V1HTTPIngressPath(
  217. path="/",
  218. path_type="Prefix",
  219. backend=k8s_client.V1IngressBackend(
  220. resource=get_default_mcpbridge_ref()
  221. ),
  222. )
  223. ]
  224. ),
  225. )
  226. expected_ingress = k8s_client.V1Ingress(
  227. metadata=k8s_client.V1ObjectMeta(
  228. name=ingress_name,
  229. namespace=gateway_namespace,
  230. annotations={
  231. "higress.io/destination": f"{registry.get_service_name_with_port()}",
  232. "higress.io/ignore-path-case": "false",
  233. },
  234. labels=managed_labels,
  235. ),
  236. spec=k8s_client.V1IngressSpec(
  237. ingress_class_name=cfg.gateway_ingress_class,
  238. rules=[expected_rule],
  239. ),
  240. )
  241. if tls_secret_name is not None:
  242. expected_ingress.spec.tls = [
  243. k8s_client.V1IngressTLS(
  244. hosts=[hostname] if hostname is not None else None,
  245. secret_name=tls_secret_name,
  246. )
  247. ]
  248. if hostname is not None:
  249. host_rule = copy.deepcopy(expected_rule)
  250. host_rule.host = hostname
  251. expected_ingress.spec.rules.append(host_rule)
  252. if not ingress:
  253. await network_v1_client.create_namespaced_ingress(
  254. namespace=gateway_namespace, body=expected_ingress
  255. )
  256. elif match_labels(getattr(ingress.metadata, 'labels', {}), managed_labels):
  257. # only update ingress managed by gpustack
  258. if not mcp_ingress_equal(ingress, expected_ingress):
  259. await network_v1_client.replace_namespaced_ingress(
  260. name=ingress_name, namespace=gateway_namespace, body=expected_ingress
  261. )
  262. def get_match_rules(
  263. match_type: Literal["whitelist", "blacklist"],
  264. paths: List[Tuple[str, str]],
  265. ) -> Dict[str, Any]:
  266. match_list = [
  267. {
  268. "match_rule_path": pair[0],
  269. "match_rule_type": pair[1],
  270. }
  271. for pair in paths
  272. ]
  273. return {
  274. "match_list": match_list,
  275. "match_type": match_type,
  276. }
  277. def ext_auth_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  278. resource_name = "gpustack-llm-ext-auth"
  279. registry = get_gpustack_higress_registry(cfg=cfg)
  280. # this is to auth requests except for gpustack
  281. default_match_rule = get_match_rules(
  282. match_type="blacklist",
  283. paths=[("/", "prefix")],
  284. )
  285. gpustack_match_rule = get_match_rules(
  286. match_type="whitelist",
  287. paths=[("/", "prefix")],
  288. )
  289. http_service = {
  290. "authorization_request": {
  291. "allowed_headers": [
  292. {"exact": "X-GPUStack-Real-IP"},
  293. {"exact": "x-higress-llm-model"},
  294. {"exact": "x-api-key"},
  295. {"exact": "cookie"},
  296. {"exact": AUTH_CACHE_HEADER},
  297. ],
  298. "headers_to_add": {
  299. GATEWAY_AUTH_TOKEN_HEADER: cfg.get_derived_gateway_token(),
  300. },
  301. },
  302. "authorization_response": {
  303. "allowed_upstream_headers": [
  304. {"exact": "X-Mse-Consumer"},
  305. {"exact": "Authorization"},
  306. {"exact": "cookie"},
  307. {"exact": AUTH_CACHE_HEADER},
  308. ]
  309. },
  310. "endpoint": {
  311. "path": "/token-auth",
  312. "request_method": "GET",
  313. "service_name": registry.get_service_name(),
  314. "service_port": registry.port,
  315. },
  316. "endpoint_mode": "forward_auth",
  317. "timeout": envs.HIGRESS_EXT_AUTH_TIMEOUT_MS,
  318. }
  319. namespace = cfg.get_namespace()
  320. if namespace == cfg.gateway_namespace:
  321. namespace = ""
  322. # the ingress in plugin matchRules should not contains namespace prefix
  323. # if it is in the same namespace with the gateway.
  324. ingress_name = f"{namespace}/{envs.GATEWAY_MIRROR_INGRESS_NAME}".lstrip("/")
  325. expected_spec = WasmPluginSpec(
  326. defaultConfig={
  327. "http_service": http_service,
  328. **default_match_rule,
  329. },
  330. defaultConfigDisable=False,
  331. failStrategy="FAIL_OPEN",
  332. phase="AUTHN",
  333. priority=360,
  334. url=get_plugin_url_with_name_and_version(
  335. name="ext-auth", version="2.0.0", cfg=cfg
  336. ),
  337. matchRules=[
  338. WasmPluginMatchRule(
  339. config={
  340. "http_service": http_service,
  341. **gpustack_match_rule,
  342. },
  343. configDisable=False,
  344. ingress=[ingress_name],
  345. )
  346. ],
  347. )
  348. return resource_name, expected_spec
  349. def ai_statistics_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  350. resource_name = "gpustack-ai-statistics"
  351. expected_spec = WasmPluginSpec(
  352. defaultConfig={
  353. "enable_content_types": envs.GATEWAY_AI_STATISTICS_PLUGIN_CONTENT_TYPES,
  354. "attributes": [
  355. {
  356. "apply_to_log": True,
  357. "apply_to_span": False,
  358. "key": "consumer",
  359. "value": "x-mse-consumer",
  360. "value_source": "request_header",
  361. }
  362. ],
  363. },
  364. defaultConfigDisable=False,
  365. failStrategy="FAIL_OPEN",
  366. imagePullPolicy="UNSPECIFIED_POLICY",
  367. matchRules=[],
  368. phase="UNSPECIFIED_PHASE",
  369. priority=900,
  370. url=get_plugin_url_with_name_and_version(
  371. name="ai-statistics", version="2.0.0", cfg=cfg
  372. ),
  373. )
  374. return resource_name, expected_spec
  375. def model_router_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  376. resource_name = "gpustack-model-router"
  377. enabled_paths = supported_openai_routes + supported_anthropic_routes
  378. enabled_paths.append("/model/proxy")
  379. expected_spec = WasmPluginSpec(
  380. defaultConfig={
  381. 'modelToHeader': 'x-higress-llm-model',
  382. 'enableOnPathSuffix': enabled_paths,
  383. },
  384. defaultConfigDisable=False,
  385. failStrategy="FAIL_OPEN",
  386. imagePullPolicy="UNSPECIFIED_POLICY",
  387. matchRules=[],
  388. phase="AUTHN",
  389. priority=900,
  390. url=get_plugin_url_with_name_and_version(
  391. name="model-router", version="2.0.0", cfg=cfg
  392. ),
  393. )
  394. return resource_name, expected_spec
  395. def model_pre_route_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  396. resource_name = "gpustack-set-model-pre-route"
  397. enabled_path_suffixes = supported_openai_routes + supported_anthropic_routes
  398. enabled_path_prefixes = ["/model/proxy"]
  399. expected_spec = WasmPluginSpec(
  400. defaultConfig={
  401. 'clusterNameHeader': router_header_key,
  402. 'routeNameHeader': 'X-GPUStack-Route-Name',
  403. 'enableOnPathSuffix': enabled_path_suffixes,
  404. 'enableOnPathPrefix': enabled_path_prefixes,
  405. },
  406. defaultConfigDisable=False,
  407. failStrategy="FAIL_OPEN",
  408. imagePullPolicy="UNSPECIFIED_POLICY",
  409. matchRules=[],
  410. phase="AUTHN",
  411. priority=90,
  412. url=get_plugin_url_with_name_and_version(
  413. name="gpustack-set-header-pre-route", version="1.0.0", cfg=cfg
  414. ),
  415. )
  416. return resource_name, expected_spec
  417. def model_mapper_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  418. return gpustack_model_mapper_name, WasmPluginSpec(
  419. phase="AUTHN",
  420. priority=800,
  421. url=get_plugin_url_with_name_and_version(
  422. name="model-mapper", version="2.0.0", cfg=cfg
  423. ),
  424. defaultConfigDisable=False,
  425. defaultConfig={"modelMapping": {}},
  426. matchRules=[],
  427. failStrategy="FAIL_OPEN",
  428. )
  429. class HeaderRule(BaseModel):
  430. key: Optional[str] = None
  431. newKey: Optional[str] = None
  432. oldKey: Optional[str] = None
  433. fromKey: Optional[str] = None
  434. toKey: Optional[str] = None
  435. value: Optional[str] = None
  436. newValue: Optional[str] = None
  437. appendValue: Optional[str] = None
  438. value_type: Optional[Literal["object", "bool", "number", "string"]] = None
  439. strategy: Optional[Literal["RETAIN_FIRST", "RETAIN_LAST", "RETAIN_UNIQUE"]] = None
  440. host_pattern: Optional[str] = None
  441. path_pattern: Optional[str] = None
  442. def transform_header(
  443. operate: Literal["remove", "rename", "replace", "add", "append", "map", "dedupe"],
  444. *rules: HeaderRule,
  445. ) -> Dict[str, Any]:
  446. # TODO: add validation in the future
  447. return {
  448. "headers": [rule.model_dump(exclude_none=True) for rule in rules],
  449. "operate": operate,
  450. }
  451. def transformer_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  452. resource_name = "gpustack-header-transformer"
  453. expected_spec = WasmPluginSpec(
  454. defaultConfig={
  455. "reqRules": [
  456. transform_header(
  457. "remove",
  458. HeaderRule(
  459. key=GATEWAY_AUTH_TOKEN_HEADER,
  460. ),
  461. HeaderRule(
  462. key=router_header_key,
  463. ),
  464. ),
  465. transform_header(
  466. "rename",
  467. HeaderRule(
  468. oldKey="x-gpustack-model",
  469. newKey="x-higress-llm-model",
  470. ),
  471. HeaderRule(
  472. oldKey=gpustack_fallback_path_header,
  473. newKey=":path",
  474. ),
  475. ),
  476. transform_header(
  477. "dedupe",
  478. HeaderRule(
  479. key="x-gpustack-model",
  480. strategy="RETAIN_FIRST",
  481. ),
  482. HeaderRule(
  483. key="x-higress-llm-model",
  484. strategy="RETAIN_FIRST",
  485. ),
  486. HeaderRule(
  487. key=":path",
  488. strategy="RETAIN_LAST",
  489. ),
  490. ),
  491. transform_header(
  492. "map",
  493. HeaderRule(
  494. fromKey=':path',
  495. toKey=gpustack_original_path_header,
  496. ),
  497. ),
  498. transform_header(
  499. "remove",
  500. HeaderRule(
  501. key=gpustack_fallback_path_header,
  502. ),
  503. ),
  504. ],
  505. },
  506. defaultConfigDisable=False,
  507. failStrategy="FAIL_OPEN",
  508. imagePullPolicy="UNSPECIFIED_POLICY",
  509. matchRules=[],
  510. phase="AUTHN",
  511. priority=810,
  512. url=get_plugin_url_with_name_and_version(
  513. name="transformer", version="2.0.0", cfg=cfg
  514. ),
  515. )
  516. return resource_name, expected_spec
  517. def generic_route_transformer_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  518. """
  519. Pre-route transformer that injects x-higress-llm-model based on the route id
  520. captured from /model/proxy/<id>/... paths. Per-route HeaderRules are merged
  521. into defaultConfig.reqRules by the per-route reconciler.
  522. defaultConfigDisable is fixed to False for the lifetime of the plugin —
  523. toggling it rewrites Envoy's filter chain and drops every in-flight
  524. connection through the gateway.
  525. Runtime shape after two generic routes (id=1 "route-one", id=2 "route-two")
  526. have been reconciled — the reconciler only mutates the `headers` list:
  527. apiVersion: extensions.higress.io/v1alpha1
  528. kind: WasmPlugin
  529. metadata:
  530. name: gpustack-generic-route-transformer
  531. spec:
  532. phase: AUTHN
  533. priority: 905
  534. defaultConfigDisable: false
  535. defaultConfig:
  536. reqRules:
  537. - operate: add
  538. headers:
  539. - key: x-higress-llm-model
  540. value: route-one
  541. path_pattern: ^/model/proxy/1(/.*)?$
  542. - key: x-higress-llm-model
  543. value: route-two
  544. path_pattern: ^/model/proxy/2(/.*)?$
  545. On a request for ``/model/proxy/1/v1/chat/completions`` Higress rewrites the
  546. match of path_pattern inside ``:path`` with ``value`` — the whole path is
  547. consumed by the pattern (``(/.*)?$`` tail), so the header becomes exactly
  548. ``route-one`` and routing falls through to the main ingress's header
  549. matcher.
  550. """
  551. expected_spec = WasmPluginSpec(
  552. defaultConfig={"reqRules": []},
  553. defaultConfigDisable=False,
  554. failStrategy="FAIL_OPEN",
  555. imagePullPolicy="UNSPECIFIED_POLICY",
  556. matchRules=[],
  557. phase="AUTHN",
  558. priority=905, # ahead of model-router (900) so header wins
  559. url=get_plugin_url_with_name_and_version(
  560. name="transformer", version="2.0.0", cfg=cfg
  561. ),
  562. )
  563. return gpustack_generic_route_transformer_name, expected_spec
  564. def token_usage_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  565. registry = get_gpustack_higress_registry(cfg=cfg)
  566. resource_name = "gpustack-token-usage"
  567. expected_spec = WasmPluginSpec(
  568. defaultConfig={
  569. 'realIPToHeader': "X-GPUStack-Real-IP",
  570. 'endpoint': {
  571. "path": "/v2/usage/gateway-metrics",
  572. "service_name": registry.get_service_name(),
  573. "service_port": registry.port,
  574. },
  575. 'header_add': {
  576. GATEWAY_AUTH_TOKEN_HEADER: cfg.get_derived_gateway_token(),
  577. },
  578. },
  579. defaultConfigDisable=False,
  580. failStrategy="FAIL_OPEN",
  581. imagePullPolicy="UNSPECIFIED_POLICY",
  582. matchRules=[],
  583. phase="UNSPECIFIED_PHASE",
  584. priority=910,
  585. url=get_plugin_url_with_name_and_version(
  586. name="gpustack-token-usage", version="1.0.0", cfg=cfg
  587. ),
  588. )
  589. return resource_name, expected_spec
  590. def ai_proxy_plugin(cfg: Config) -> Tuple[str, WasmPluginSpec]:
  591. resource_name = gpustack_ai_proxy_name
  592. expected_spec = WasmPluginSpec(
  593. defaultConfig={},
  594. defaultConfigDisable=False,
  595. failStrategy="FAIL_OPEN",
  596. imagePullPolicy="UNSPECIFIED_POLICY",
  597. matchRules=[],
  598. priority=100,
  599. phase="UNSPECIFIED_PHASE",
  600. url=get_plugin_url_with_name_and_version(
  601. name="ai-proxy", version="2.0.0", cfg=cfg
  602. ),
  603. )
  604. return resource_name, expected_spec
  605. async def ensure_tls_secret(cfg: Config, api_client: k8s_client.ApiClient):
  606. """
  607. Ensure the TLS secret if ssl key pair is provided.
  608. """
  609. ssl_keyfile = cfg.ssl_keyfile
  610. ssl_certfile = cfg.ssl_certfile
  611. if not ssl_keyfile or not ssl_certfile:
  612. return
  613. if not (os.path.isfile(ssl_keyfile) and os.path.isfile(ssl_certfile)):
  614. raise RuntimeError(
  615. f"SSL keyfile {ssl_keyfile} or certfile {ssl_certfile} does not exist"
  616. )
  617. # read key and cert files and encode into base64
  618. with open(ssl_keyfile, 'rb') as f:
  619. ssl_key_bytes = f.read()
  620. with open(ssl_certfile, 'rb') as f:
  621. ssl_cert_bytes = f.read()
  622. ssl_key_data = base64.b64encode(ssl_key_bytes).decode()
  623. ssl_cert_data = base64.b64encode(ssl_cert_bytes).decode()
  624. gateway_namespace = cfg.gateway_namespace
  625. core_v1_client = k8s_client.CoreV1Api(api_client=api_client)
  626. secret_name = cfg.get_tls_secret_name()
  627. to_create_tls_secret = k8s_client.V1Secret(
  628. metadata=k8s_client.V1ObjectMeta(
  629. name=secret_name,
  630. namespace=gateway_namespace,
  631. labels=managed_labels,
  632. ),
  633. type="kubernetes.io/tls",
  634. data={
  635. "tls.key": ssl_key_data,
  636. "tls.crt": ssl_cert_data,
  637. },
  638. )
  639. try:
  640. existing_secret: k8s_client.V1Secret = (
  641. await core_v1_client.read_namespaced_secret(
  642. name=secret_name, namespace=gateway_namespace
  643. )
  644. )
  645. except ApiException as e:
  646. if e.status == 404:
  647. existing_secret = None
  648. else:
  649. raise
  650. if not existing_secret:
  651. await core_v1_client.create_namespaced_secret(
  652. namespace=gateway_namespace, body=to_create_tls_secret
  653. )
  654. elif match_labels(getattr(existing_secret.metadata, 'labels', {}), managed_labels):
  655. if existing_secret.data != to_create_tls_secret.data:
  656. await core_v1_client.replace_namespaced_secret(
  657. name=secret_name, namespace=gateway_namespace, body=to_create_tls_secret
  658. )
  659. async def ensure_gateway_timeout(cfg: Config, api_client: k8s_client.ApiClient):
  660. namespace = cfg.gateway_namespace
  661. higress_config_name = "higress-config"
  662. core_v1_client = k8s_client.CoreV1Api(api_client=api_client)
  663. try:
  664. higress_config: k8s_client.V1ConfigMap = (
  665. await core_v1_client.read_namespaced_config_map(
  666. name=higress_config_name, namespace=namespace
  667. )
  668. )
  669. should_update = False
  670. config_data: str = higress_config.data["higress"]
  671. config = yaml.safe_load(config_data)
  672. idle_timeout = (
  673. config.get("downstream", {}).get("idleTimeout")
  674. if isinstance(config, dict)
  675. else None
  676. )
  677. if idle_timeout is None or str(idle_timeout) != f"{envs.PROXY_TIMEOUT}":
  678. config.setdefault("downstream", {})["idleTimeout"] = envs.PROXY_TIMEOUT
  679. higress_config.data["higress"] = yaml.safe_dump(config)
  680. should_update = True
  681. upstream_idle_timeout = (
  682. config.get("upstream", {}).get("idleTimeout")
  683. if isinstance(config, dict)
  684. else None
  685. )
  686. if (
  687. upstream_idle_timeout is None
  688. or str(upstream_idle_timeout) != f"{envs.PROXY_UPSTREAM_IDLE_TIMEOUT}"
  689. ):
  690. config.setdefault("upstream", {})[
  691. "idleTimeout"
  692. ] = envs.PROXY_UPSTREAM_IDLE_TIMEOUT
  693. higress_config.data["higress"] = yaml.safe_dump(config)
  694. should_update = True
  695. if should_update:
  696. await core_v1_client.replace_namespaced_config_map(
  697. name=higress_config_name,
  698. namespace=namespace,
  699. body=higress_config,
  700. )
  701. except Exception as e:
  702. logger.error(f"Failed to read or parse Higress config map: {e}")
  703. raise
  704. def spec_replace(
  705. current_spec: Optional[WasmPluginSpec],
  706. expected_spec: WasmPluginSpec,
  707. create_only: bool = False,
  708. ) -> WasmPluginSpec:
  709. if current_spec is None:
  710. return expected_spec
  711. if create_only:
  712. if current_spec.url != expected_spec.url:
  713. current_spec.url = expected_spec.url
  714. return current_spec
  715. return expected_spec
  716. def validate_ai_statistics_plugin_content_types():
  717. for content_type in envs.GATEWAY_AI_STATISTICS_PLUGIN_CONTENT_TYPES:
  718. if content_type == "audio/pcm":
  719. raise ValueError(
  720. "audio/pcm content type is not supported in ai statistics plugin"
  721. )
  722. def initialize_gateway(cfg: Config, timeout: int = 60, interval: int = 5):
  723. if cfg.gateway_mode == GatewayModeEnum.disabled:
  724. return
  725. # For embedded/external mode, wait for kubeconfig file to be available
  726. # (apiserver s6 service generates it on startup)
  727. if cfg.gateway_mode != GatewayModeEnum.incluster:
  728. kubeconfig_path = cfg.gateway_kubeconfig
  729. start_time = time.time()
  730. while not kubeconfig_path or not os.path.isfile(kubeconfig_path):
  731. if time.time() - start_time > timeout:
  732. logger.warning(f"Kubeconfig not found at {kubeconfig_path} after {timeout}s, skipping gateway setup")
  733. return
  734. logger.info(f"Waiting for kubeconfig at {kubeconfig_path}...")
  735. time.sleep(interval)
  736. init_async_k8s_config(cfg=cfg)
  737. # If k8s config couldn't be initialized (e.g., no valid kubeconfig), skip gateway setup
  738. if async_gateway_config is None:
  739. logger.warning("Gateway k8s config could not be initialized, skipping gateway setup")
  740. return
  741. wait_for_apiserver_ready(cfg=cfg, timeout=timeout, interval=interval)
  742. if cfg.gateway_mode in [
  743. GatewayModeEnum.embedded,
  744. GatewayModeEnum.external,
  745. GatewayModeEnum.incluster,
  746. ]:
  747. validate_ai_statistics_plugin_content_types()
  748. plugin_list: List[Tuple[str, WasmPluginSpec]] = [
  749. ext_auth_plugin(cfg=cfg),
  750. ai_statistics_plugin(cfg=cfg),
  751. model_router_plugin(cfg=cfg),
  752. ai_proxy_plugin(cfg=cfg),
  753. model_pre_route_plugin(cfg=cfg),
  754. model_mapper_plugin(cfg=cfg),
  755. generic_route_transformer_plugin(cfg=cfg),
  756. ]
  757. if cfg.server_role() != Config.ServerRole.WORKER:
  758. plugin_list.append(transformer_plugin(cfg=cfg))
  759. plugin_list.append(token_usage_plugin(cfg=cfg))
  760. async def prepare():
  761. api_client = k8s_client.ApiClient(
  762. configuration=get_async_k8s_config(cfg=cfg)
  763. )
  764. await ensure_tls_secret(cfg=cfg, api_client=api_client)
  765. await ensure_mcp_resources(cfg=cfg, api_client=api_client)
  766. if cfg.gateway_mode != GatewayModeEnum.incluster:
  767. await ensure_gateway_timeout(cfg=cfg, api_client=api_client)
  768. await ensure_ingress_resources(cfg=cfg, api_client=api_client)
  769. for plugin_name, plugin_spec in plugin_list:
  770. create_only = plugin_name in [
  771. gpustack_ai_proxy_name,
  772. gpustack_model_mapper_name,
  773. gpustack_generic_route_transformer_name,
  774. ]
  775. spec_diff_func = partial(
  776. spec_replace, expected_spec=plugin_spec, create_only=create_only
  777. )
  778. await ensure_wasm_plugin(
  779. api=gw_client.ExtensionsHigressIoV1Api(api_client),
  780. name=plugin_name,
  781. namespace=cfg.gateway_namespace,
  782. spec_diff=spec_diff_func,
  783. )
  784. try:
  785. asyncio.run(prepare())
  786. except asyncio.CancelledError:
  787. raise
  788. except Exception as e:
  789. raise RuntimeError("Failed to initialize gateway resources") from e