| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- from kubernetes_asyncio import client
- from kubernetes_asyncio.client import V1ObjectMeta
- from typing import List, Optional, Dict, Any
- from pydantic import BaseModel, Field
- # --- McpBridge Data Structures (Pydantic) ---
- GROUP = "networking.higress.io"
- VERSION = "v1"
- PLURAL = "mcpbridges"
- class McpBridgeProxy(BaseModel):
- connectTimeout: Optional[int] = None
- listenerPort: Optional[int] = None
- name: Optional[str] = None
- serverAddress: Optional[str] = None
- serverPort: Optional[int] = None
- type: Optional[str] = None
- class VPortService(BaseModel):
- name: Optional[str] = None
- value: Optional[int] = None
- class VPort(BaseModel):
- default: Optional[int] = None
- services: Optional[List[VPortService]] = Field(default_factory=list)
- class McpBridgeRegistry(BaseModel):
- allowMcpServers: Optional[List[str]] = Field(default_factory=list)
- authSecretName: Optional[str] = None
- consulDatacenter: Optional[str] = None
- consulNamespace: Optional[str] = None
- consulRefreshInterval: Optional[int] = None
- consulServiceTag: Optional[str] = None
- domain: Optional[str] = None
- enableMCPServer: Optional[bool] = None
- enableScopeMcpServers: Optional[bool] = None
- mcpServerBaseUrl: Optional[str] = None
- mcpServerExportDomains: Optional[List[str]] = Field(default_factory=list)
- metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
- nacosAccessKey: Optional[str] = None
- nacosAddressServer: Optional[str] = None
- nacosGroups: Optional[List[str]] = Field(default_factory=list)
- nacosNamespace: Optional[str] = None
- nacosNamespaceId: Optional[str] = None
- nacosRefreshInterval: Optional[int] = None
- nacosSecretKey: Optional[str] = None
- name: Optional[str] = None
- port: Optional[int] = None
- protocol: Optional[str] = None
- proxyName: Optional[str] = None
- sni: Optional[str] = None
- type: Optional[str] = None
- vport: Optional[VPort] = None
- zkServicesPath: Optional[List[str]] = Field(default_factory=list)
- def get_service_name(self) -> str:
- """The service name for this registry."""
- return f"{self.name}.{self.type}"
- def get_service_name_with_port(self) -> str:
- """The service name with port for this registry."""
- return f"{self.get_service_name()}:{self.port if self.port else 80}"
- class McpBridgeSpec(BaseModel):
- proxies: Optional[List[McpBridgeProxy]] = Field(default_factory=list)
- registries: Optional[List[McpBridgeRegistry]] = Field(default_factory=list)
- class McpBridgeStatus(BaseModel):
- data: Optional[Dict[str, Any]] = Field(default_factory=dict)
- class McpBridge(BaseModel):
- apiVersion: str = f"{GROUP}/{VERSION}"
- kind: str = "McpBridge"
- metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
- spec: Optional[McpBridgeSpec] = None
- status: Optional[McpBridgeStatus] = None
- def parse_metadata(self) -> V1ObjectMeta:
- """
- Parse the metadata dictionary into a V1ObjectMeta object.
- """
- return V1ObjectMeta(**self.metadata) if self.metadata else V1ObjectMeta()
- class NetworkingHigressIoV1Api:
- def __init__(self, api_client: client.ApiClient):
- self.custom_api = client.CustomObjectsApi(api_client)
- async def edit_mcpbridge(
- self, namespace: str, name: str, body: McpBridge
- ) -> Dict[str, Any]:
- """Edit (replace) a McpBridge resource."""
- return await self.custom_api.replace_namespaced_custom_object(
- GROUP,
- VERSION,
- namespace,
- PLURAL,
- name,
- (
- body.model_dump(by_alias=True, exclude_none=True)
- if isinstance(body, McpBridge)
- else body
- ),
- )
- async def create_mcpbridge(self, namespace: str, body: McpBridge) -> Dict[str, Any]:
- """Create a McpBridge resource in the given namespace."""
- return await self.custom_api.create_namespaced_custom_object(
- GROUP,
- VERSION,
- namespace,
- PLURAL,
- (
- body.model_dump(by_alias=True, exclude_none=True)
- if isinstance(body, McpBridge)
- else body
- ),
- )
- async def get_mcpbridge(self, namespace: str, name: str) -> Dict[str, Any]:
- """Get a McpBridge resource by name."""
- return await self.custom_api.get_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name
- )
- async def list_mcpbridges(
- self, namespace: str, label_selector: Optional[str] = None
- ) -> Dict[str, Any]:
- """List all McpBridge resources in the given namespace."""
- return await self.custom_api.list_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, label_selector=label_selector
- )
- async def patch_mcpbridge(
- self, namespace: str, name: str, body: Dict[str, Any]
- ) -> Dict[str, Any]:
- """Patch a McpBridge resource."""
- return await self.custom_api.patch_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name, body
- )
- async def delete_mcpbridge(
- self, namespace: str, name: str, body: Optional[Dict[str, Any]] = None
- ) -> Dict[str, Any]:
- """Delete a McpBridge resource."""
- return await self.custom_api.delete_namespaced_custom_object(
- GROUP, VERSION, namespace, PLURAL, name, body=body
- )
|