import json
import re
import uuid_utils.compat as uuid
from django.db.models import QuerySet
from application.models import ApplicationApiKey, Application, ChatUserType, ChatSourceChoices
from chat.serializers.chat import ChatSerializers
class MCPToolHandler:
def __init__(self, auth_header):
app_key = QuerySet(ApplicationApiKey).filter(secret_key=auth_header, is_active=True).first()
if not app_key:
raise PermissionError("Invalid API Key")
self.application = QuerySet(Application).filter(id=app_key.application_id, is_publish=True).first()
if not self.application:
raise PermissionError("Application is not found or not published")
def initialize(self):
return {
"protocolVersion": "2025-06-18",
"serverInfo": {
"name": "maxkb-mcp",
"version": "1.0.0"
},
"capabilities": {
"tools": {}
}
}
def list_tools(self):
return {
"tools": [
{
"name": f'agent_{str(self.application.id)[:8]}',
"description": f'{self.application.name} {self.application.desc}',
"inputSchema": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "The message to send to the AI."},
},
"required": ["message"]
}
}
]
}
def _get_chat_id(self):
from application.models import ChatUserType
from chat.serializers.chat import OpenChatSerializers
from common.init import init_template
init_template.run()
return OpenChatSerializers(data={
'application_id': self.application.id,
'chat_user_id': str(uuid.uuid7()),
'chat_user_type': ChatUserType.ANONYMOUS_USER,
'ip_address': '-',
'source': {"type": ChatSourceChoices.ONLINE.value},
'debug': False
}).open()
def call_tool(self, params):
name = params["name"]
args = params.get("arguments", {})
# print(params)
payload = {
'message': args.get('message'),
'stream': True,
're_chat': False
}
resp = ChatSerializers(data={
'chat_id': self._get_chat_id(),
'chat_user_id': str(uuid.uuid7()),
'chat_user_type': ChatUserType.ANONYMOUS_USER,
'application_id': self.application.id,
'ip_address': '-',
'source': {"type": ChatSourceChoices.ONLINE.value},
'debug': False,
}).chat(payload)
chunks = []
for raw_line in resp:
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
if not line.startswith("data:"):
continue
payload = line[5:].strip()
if not payload or payload == "[DONE]":
continue
try:
event = json.loads(payload)
except json.JSONDecodeError:
continue
if event.get("operate") is True:
chunks.append(event.get("content", ""))
if event.get("is_end"):
break
data = ''.join(chunks)
# 排除标签
data = re.sub(r'.*?', '', data, flags=re.DOTALL)
return {"content": [{"type": "text", "text": data}]}