| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- from enum import Enum
- from kubernetes_asyncio import client
- from kubernetes_asyncio.client import V1ObjectMeta
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field
- # --- McpBridge Data Structures (Pydantic) ---
- GROUP = "networking.istio.io"
- VERSION = "v1alpha3"
- PLURAL = "envoyfilters"
- class ApplyToStringEnum(str, Enum):
- INVALID = "INVALID"
- LISTENER = "LISTENER"
- FILTER_CHAIN = "FILTER_CHAIN"
- NETWORK_FILTER = "NETWORK_FILTER"
- HTTP_FILTER = "HTTP_FILTER"
- ROUTE_CONFIGURATION = "ROUTE_CONFIGURATION"
- VIRTUAL_HOST = "VIRTUAL_HOST"
- HTTP_ROUTE = "HTTP_ROUTE"
- CLUSTER = "CLUSTER"
- EXTENSION_CONFIG = "EXTENSION_CONFIG"
- BOOTSTRAP = "BOOTSTRAP"
- LISTENER_FILTER = "LISTENER_FILTER"
- class MatchContextEnum(str, Enum):
- ANY = "ANY"
- SIDECAR_INBOUND = "SIDECAR_INBOUND"
- SIDECAR_OUTBOUND = "SIDECAR_OUTBOUND"
- GATEWAY = "GATEWAY"
- class OperationStringEnum(str, Enum):
- INVALID = "INVALID"
- MERGE = "MERGE"
- ADD = "ADD"
- REMOVE = "REMOVE"
- INSERT_BEFORE = "INSERT_BEFORE"
- INSERT_AFTER = "INSERT_AFTER"
- INSERT_FIRST = "INSERT_FIRST"
- REPLACE = "REPLACE"
- class PatchFilterClassEnum(str, Enum):
- UNSPECIFIED = "UNSPECIFIED"
- AUTHN = "AUTHN"
- AUTHZ = "AUTHZ"
- STATS = "STATS"
- class RouteActionEnum(str, Enum):
- ANY = "ANY"
- ROUTE = "ROUTE"
- REDIRECT = "REDIRECT"
- DIRECT_RESPONSE = "DIRECT_RESPONSE"
- class WorkloadSelector(BaseModel):
- labels: Optional[Dict[str, str]] = Field(default_factory=dict)
- class PolicyTargetReference(BaseModel):
- group: str
- kind: str
- name: str
- namespace: str = ""
- class ProxyMatch(BaseModel):
- proxyVersion: Optional[str] = None
- metadata: Optional[Dict[str, Any]] = None
- class ListenerMatch(BaseModel):
- portNumber: Optional[int] = None
- portName: Optional[str] = None
- filterChain: Optional[Dict[str, Any]] = None
- listenerFilter: Optional[str] = None
- name: Optional[str] = None
- class VirtualHostMatchRoute(BaseModel):
- name: Optional[str] = None
- action: Optional[RouteActionEnum] = None
- class VirtualHostMatch(BaseModel):
- name: Optional[str] = None
- domainName: Optional[str] = None
- route: Optional[VirtualHostMatchRoute] = None
- class RouteConfigurationMatch(BaseModel):
- portNumber: Optional[int] = None
- portName: Optional[str] = None
- gateway: Optional[str] = None
- vhost: Optional[VirtualHostMatch] = None
- name: Optional[str] = None
- class ClusterMatch(BaseModel):
- portNumber: Optional[int] = None
- service: Optional[str] = None
- subset: Optional[str] = None
- name: Optional[str] = None
- class MatchObjectType(BaseModel):
- listener: Optional[ListenerMatch] = None
- routeConfiguration: Optional[RouteConfigurationMatch] = None
- cluster: Optional[ClusterMatch] = None
- class EnvoyConfigObjectMatch(MatchObjectType):
- context: MatchContextEnum
- proxy: Optional[ProxyMatch] = None
- class Patch(BaseModel):
- operation: OperationStringEnum
- value: Optional[Dict[str, Any]] = None
- filterClass: Optional[PatchFilterClassEnum] = None
- class EnvoyConfigPatchObject(BaseModel):
- applyTo: ApplyToStringEnum
- match: Optional[EnvoyConfigObjectMatch] = None
- patch: Optional[Patch] = None
- class EnvoyFilterSpec(BaseModel):
- workloadSelector: Optional[WorkloadSelector] = None
- targetRefs: Optional[List[PolicyTargetReference]] = None
- configPatches: Optional[List[EnvoyConfigPatchObject]] = None
- priority: Optional[int] = None
- class EnvoyFilter(BaseModel):
- apiVersion: str = f"{GROUP}/{VERSION}"
- kind: str = "EnvoyFilter"
- metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
- spec: Optional[EnvoyFilterSpec] = None
- def parse_metadata(self) -> V1ObjectMeta:
- """
- Parse the metadata dictionary into a V1ObjectMeta object.
- """
- return V1ObjectMeta(**self.metadata) if self.metadata else V1ObjectMeta()
- class NetworkingIstioIoV1Alpha3Api:
- def __init__(self, api_client: client.ApiClient):
- self.custom_api = client.CustomObjectsApi(api_client)
- async def edit_envoyfilter(
- self, namespace: str, name: str, body: EnvoyFilter
- ) -> Dict[str, Any]:
- """Edit (replace) a EnvoyFilter resource."""
- return await self.custom_api.replace_namespaced_custom_object(
- GROUP,
- VERSION,
- namespace,
- PLURAL,
- name,
- (
- body.model_dump(by_alias=True, exclude_none=True)
- if isinstance(body, EnvoyFilter)
- else body
- ),
- )
- async def create_envoyfilter(
- self, namespace: str, body: EnvoyFilter
- ) -> Dict[str, Any]:
- """Create a EnvoyFilter resource in the given namespace."""
- return await self.custom_api.create_namespaced_custom_object(
- GROUP,
- VERSION,
- namespace,
- PLURAL,
- (
- body.model_dump(by_alias=True, exclude_none=True)
- if isinstance(body, EnvoyFilter)
- else body
- ),
- )
- async def get_envoyfilter(self, namespace: str, name: str) -> Dict[str, Any]:
- """Get a EnvoyFilter resource by name."""
- return await self.custom_api.get_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name
- )
- async def list_envoyfilters(
- self, namespace: str, label_selector: Optional[str] = None
- ) -> Dict[str, Any]:
- """List all EnvoyFilter resources in the given namespace."""
- return await self.custom_api.list_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, label_selector=label_selector
- )
- async def patch_envoyfilter(
- self, namespace: str, name: str, body: Dict[str, Any]
- ) -> Dict[str, Any]:
- """Patch a EnvoyFilter resource."""
- return await self.custom_api.patch_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name, body
- )
- async def delete_envoyfilter(
- self, namespace: str, name: str, body: Optional[Dict[str, Any]] = None
- ) -> Dict[str, Any]:
- """Delete a EnvoyFilter resource."""
- return await self.custom_api.delete_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name, body=body
- )
- def get_4xx_5xx_fallback_value(
- ingress_name: str,
- fallback_header: str = "x-higress-fallback-from",
- extra_req_headers: Optional[Dict[str, str]] = None,
- ) -> Dict[str, Any]:
- response_headers = [
- {
- "append": False,
- "header": {"key": fallback_header, "value": ingress_name},
- }
- ]
- request_headers = response_headers.copy()
- if extra_req_headers:
- for key, value in extra_req_headers.items():
- request_headers.append(
- {
- "append": False,
- "header": {"key": key, "value": value},
- }
- )
- redirect_policy = {
- "keep_original_response_code": False,
- "max_internal_redirects": 10,
- "only_redirect_upstream_code": False,
- "request_headers_to_add": request_headers,
- "response_headers_to_add": response_headers,
- "use_original_request_body": True,
- "use_original_request_uri": True,
- }
- action = {
- "name": "action",
- "typed_config": {
- "@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
- "type_url": "type.googleapis.com/envoy.extensions.http.custom_response.redirect_policy.v3.RedirectPolicy",
- "value": redirect_policy,
- },
- }
- def predicate_response_code(code_class: str) -> Dict[str, Any]:
- return {
- "single_predicate": {
- "input": {
- "name": f"{code_class}_response",
- "typed_config": {
- "@type": "type.googleapis.com/envoy.type.matcher.v3.HttpResponseStatusCodeClassMatchInput"
- },
- },
- "value_match": {"exact": code_class},
- }
- }
- # matcher
- matcher = {
- "on_match": {"action": action},
- "predicate": {
- "or_matcher": {
- "predicate": [
- predicate_response_code("4xx"),
- predicate_response_code("5xx"),
- ]
- }
- },
- }
- # custom_response_matcher
- custom_response_matcher = {"matcher_list": {"matchers": [matcher]}}
- typed_per_filter_config = {
- "typed_per_filter_config": {
- "envoy.filters.http.custom_response": {
- "@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
- "type_url": "type.googleapis.com/envoy.extensions.filters.http.custom_response.v3.CustomResponse",
- "value": {"custom_response_matcher": custom_response_matcher},
- }
- }
- }
- return typed_per_filter_config
- def get_ingress_fallback_envoyfilter(
- ingress_name: str,
- namespace: str,
- fallback_header: str = "x-higress-fallback-from",
- labels: Optional[Dict[str, str]] = None,
- extra_req_headers: Optional[Dict[str, str]] = None,
- ) -> EnvoyFilter:
- object_match = EnvoyConfigObjectMatch(
- context=MatchContextEnum.GATEWAY,
- routeConfiguration=RouteConfigurationMatch(
- vhost=VirtualHostMatch(
- route=VirtualHostMatchRoute(
- name=ingress_name,
- ),
- ),
- ),
- )
- envoyfilter = EnvoyFilter(
- metadata={
- "name": ingress_name,
- "namespace": namespace,
- "labels": labels or {},
- },
- spec=EnvoyFilterSpec(
- configPatches=[
- EnvoyConfigPatchObject(
- applyTo=ApplyToStringEnum.HTTP_ROUTE,
- match=object_match,
- patch=Patch(
- operation=OperationStringEnum.MERGE,
- value=get_4xx_5xx_fallback_value(
- ingress_name, fallback_header, extra_req_headers
- ),
- ),
- )
- ]
- ),
- )
- return envoyfilter
|