# networking_higress_io_v1_api.py
  1. from kubernetes_asyncio import client
  2. from kubernetes_asyncio.client import V1ObjectMeta
  3. from typing import List, Optional, Dict, Any
  4. from pydantic import BaseModel, Field
  5. # --- McpBridge Data Structures (Pydantic) ---
  6. GROUP = "networking.higress.io"
  7. VERSION = "v1"
  8. PLURAL = "mcpbridges"
  9. class McpBridgeProxy(BaseModel):
  10. connectTimeout: Optional[int] = None
  11. listenerPort: Optional[int] = None
  12. name: Optional[str] = None
  13. serverAddress: Optional[str] = None
  14. serverPort: Optional[int] = None
  15. type: Optional[str] = None
  16. class VPortService(BaseModel):
  17. name: Optional[str] = None
  18. value: Optional[int] = None
  19. class VPort(BaseModel):
  20. default: Optional[int] = None
  21. services: Optional[List[VPortService]] = Field(default_factory=list)
  22. class McpBridgeRegistry(BaseModel):
  23. allowMcpServers: Optional[List[str]] = Field(default_factory=list)
  24. authSecretName: Optional[str] = None
  25. consulDatacenter: Optional[str] = None
  26. consulNamespace: Optional[str] = None
  27. consulRefreshInterval: Optional[int] = None
  28. consulServiceTag: Optional[str] = None
  29. domain: Optional[str] = None
  30. enableMCPServer: Optional[bool] = None
  31. enableScopeMcpServers: Optional[bool] = None
  32. mcpServerBaseUrl: Optional[str] = None
  33. mcpServerExportDomains: Optional[List[str]] = Field(default_factory=list)
  34. metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
  35. nacosAccessKey: Optional[str] = None
  36. nacosAddressServer: Optional[str] = None
  37. nacosGroups: Optional[List[str]] = Field(default_factory=list)
  38. nacosNamespace: Optional[str] = None
  39. nacosNamespaceId: Optional[str] = None
  40. nacosRefreshInterval: Optional[int] = None
  41. nacosSecretKey: Optional[str] = None
  42. name: Optional[str] = None
  43. port: Optional[int] = None
  44. protocol: Optional[str] = None
  45. proxyName: Optional[str] = None
  46. sni: Optional[str] = None
  47. type: Optional[str] = None
  48. vport: Optional[VPort] = None
  49. zkServicesPath: Optional[List[str]] = Field(default_factory=list)
  50. def get_service_name(self) -> str:
  51. """The service name for this registry."""
  52. return f"{self.name}.{self.type}"
  53. def get_service_name_with_port(self) -> str:
  54. """The service name with port for this registry."""
  55. return f"{self.get_service_name()}:{self.port if self.port else 80}"
  56. class McpBridgeSpec(BaseModel):
  57. proxies: Optional[List[McpBridgeProxy]] = Field(default_factory=list)
  58. registries: Optional[List[McpBridgeRegistry]] = Field(default_factory=list)
  59. class McpBridgeStatus(BaseModel):
  60. data: Optional[Dict[str, Any]] = Field(default_factory=dict)
  61. class McpBridge(BaseModel):
  62. apiVersion: str = f"{GROUP}/{VERSION}"
  63. kind: str = "McpBridge"
  64. metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
  65. spec: Optional[McpBridgeSpec] = None
  66. status: Optional[McpBridgeStatus] = None
  67. def parse_metadata(self) -> V1ObjectMeta:
  68. """
  69. Parse the metadata dictionary into a V1ObjectMeta object.
  70. """
  71. return V1ObjectMeta(**self.metadata) if self.metadata else V1ObjectMeta()
  72. class NetworkingHigressIoV1Api:
  73. def __init__(self, api_client: client.ApiClient):
  74. self.custom_api = client.CustomObjectsApi(api_client)
  75. async def edit_mcpbridge(
  76. self, namespace: str, name: str, body: McpBridge
  77. ) -> Dict[str, Any]:
  78. """Edit (replace) a McpBridge resource."""
  79. return await self.custom_api.replace_namespaced_custom_object(
  80. GROUP,
  81. VERSION,
  82. namespace,
  83. PLURAL,
  84. name,
  85. (
  86. body.model_dump(by_alias=True, exclude_none=True)
  87. if isinstance(body, McpBridge)
  88. else body
  89. ),
  90. )
  91. async def create_mcpbridge(self, namespace: str, body: McpBridge) -> Dict[str, Any]:
  92. """Create a McpBridge resource in the given namespace."""
  93. return await self.custom_api.create_namespaced_custom_object(
  94. GROUP,
  95. VERSION,
  96. namespace,
  97. PLURAL,
  98. (
  99. body.model_dump(by_alias=True, exclude_none=True)
  100. if isinstance(body, McpBridge)
  101. else body
  102. ),
  103. )
  104. async def get_mcpbridge(self, namespace: str, name: str) -> Dict[str, Any]:
  105. """Get a McpBridge resource by name."""
  106. return await self.custom_api.get_namespaced_custom_object(
  107. GROUP, VERSION, namespace, PLURAL, name
  108. )
  109. async def list_mcpbridges(
  110. self, namespace: str, label_selector: Optional[str] = None
  111. ) -> Dict[str, Any]:
  112. """List all McpBridge resources in the given namespace."""
  113. return await self.custom_api.list_namespaced_custom_object(
  114. GROUP, VERSION, namespace, PLURAL, label_selector=label_selector
  115. )
  116. async def patch_mcpbridge(
  117. self, namespace: str, name: str, body: Dict[str, Any]
  118. ) -> Dict[str, Any]:
  119. """Patch a McpBridge resource."""
  120. return await self.custom_api.patch_namespaced_custom_object(
  121. GROUP, VERSION, namespace, PLURAL, name, body
  122. )
  123. async def delete_mcpbridge(
  124. self, namespace: str, name: str, body: Optional[Dict[str, Any]] = None
  125. ) -> Dict[str, Any]:
  126. """Delete a McpBridge resource."""
  127. return await self.custom_api.delete_namespaced_custom_object(
  128. GROUP, VERSION, namespace, PLURAL, name, body=body
  129. )