| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581 |
- # coding=utf-8
- import asyncio
- import base64
- import json
- import pickle
- from functools import reduce
- from typing import Dict, List
- import requests
- import uuid_utils.compat as uuid
- from django.core.cache import cache
- from django.db import transaction
- from django.db.models import QuerySet
- from django.http import HttpResponse
- from django.utils import timezone
- from django.utils.translation import gettext_lazy as _
- from rest_framework import serializers, status
- from application.flow.common import Workflow, WorkflowMode
- from application.flow.i_step_node import KnowledgeWorkflowPostHandler
- from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage
- from application.flow.step_node import get_node
- from application.flow.tools import save_workflow_mapping
- from application.serializers.application import get_mcp_tools
- from common.constants.cache_version import Cache_Version
- from common.db.search import page_search
- from common.exception.app_exception import AppApiException
- from common.field.common import UploadedFileField
- from common.result import result
- from common.utils.common import bytes_to_uploaded_file
- from common.utils.common import restricted_loads, generate_uuid
- from common.utils.logger import maxkb_logger
- from common.utils.rsa_util import rsa_long_decrypt
- from common.utils.tool_code import ToolExecutor
- from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow, KnowledgeWorkflowVersion, \
- File, FileSourceType
- from knowledge.models.knowledge_action import KnowledgeAction, State
- from knowledge.serializers.common import update_resource_mapping_by_knowledge
- from knowledge.serializers.knowledge import KnowledgeModelSerializer
- from system_manage.models import AuthTargetType
- from system_manage.models.resource_mapping import ResourceType
- from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
- from tools.models import Tool, ToolScope, ToolType, ToolWorkflow
- from tools.serializers.tool import ToolExportModelSerializer
- from users.models import User
- tool_executor = ToolExecutor()
- def hand_node(node, update_tool_map):
- if node.get('type') == 'tool-lib-node':
- tool_lib_id = (node.get('properties', {}).get('node_data', {}).get('tool_lib_id') or '')
- node.get('properties', {}).get('node_data', {})['tool_lib_id'] = update_tool_map.get(tool_lib_id, tool_lib_id)
- if node.get('type') == 'search-knowledge-node':
- node.get('properties', {}).get('node_data', {})['knowledge_id_list'] = []
- if node.get('type') == 'ai-chat-node':
- node_data = node.get('properties', {}).get('node_data', {})
- mcp_tool_ids = node_data.get('mcp_tool_ids') or []
- node_data['mcp_tool_ids'] = [update_tool_map.get(tool_id,
- tool_id) for tool_id in mcp_tool_ids]
- tool_ids = node_data.get('tool_ids') or []
- node_data['tool_ids'] = [update_tool_map.get(tool_id,
- tool_id) for tool_id in tool_ids]
- skill_tool_ids = node_data.get('skill_tool_ids') or []
- node_data['skill_tool_ids'] = [update_tool_map.get(tool_id,
- tool_id) for tool_id in skill_tool_ids]
- if node.get('type') == 'mcp-node':
- mcp_tool_id = (node.get('properties', {}).get('node_data', {}).get('mcp_tool_id') or '')
- node.get('properties', {}).get('node_data', {})['mcp_tool_id'] = update_tool_map.get(mcp_tool_id,
- mcp_tool_id)
- if node.get('type') == 'tool-workflow-lib-node':
- tool_lib_id = (node.get('properties', {}).get('node_data', {}).get('tool_lib_id') or '')
- node.get('properties', {}).get('node_data', {})['tool_lib_id'] = update_tool_map.get(tool_lib_id,
- tool_lib_id)
- class KnowledgeWorkflowModelSerializer(serializers.ModelSerializer):
- class Meta:
- model = KnowledgeWorkflow
- fields = '__all__'
- class KnowledgeWorkflowActionRequestSerializer(serializers.Serializer):
- data_source = serializers.DictField(required=True, label=_('datasource data'))
- knowledge_base = serializers.DictField(required=True, label=_('knowledge base data'))
- class KnowledgeWorkflowImportRequest(serializers.Serializer):
- file = UploadedFileField(required=True, label=_("file"))
- class KnowledgeWorkflowActionListQuerySerializer(serializers.Serializer):
- user_name = serializers.CharField(required=False, label=_('Name'), allow_blank=True, allow_null=True)
- state = serializers.CharField(required=False, label=_("State"), allow_blank=True, allow_null=True)
- class KBWFInstance:
- def __init__(self, knowledge_workflow: dict, function_lib_list: List[dict], version: str, tool_list: List[dict]):
- self.knowledge_workflow = knowledge_workflow
- self.function_lib_list = function_lib_list
- self.version = version
- self.tool_list = tool_list
- def get_tool_list(self):
- return [*(self.tool_list or []), *(self.function_lib_list or [])]
- class KnowledgeWorkflowActionSerializer(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- def get_query_set(self, instance: Dict):
- query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id',
- 'knowledge_id',
- "state", 'meta',
- 'run_time',
- "create_time")
- if instance.get("user_name"):
- query_set = query_set.filter(meta__user_name__icontains=instance.get('user_name'))
- if instance.get('state'):
- query_set = query_set.filter(state=instance.get('state'))
- return query_set.order_by('-create_time')
- def list(self, instance: Dict, is_valid=True):
- if is_valid:
- self.is_valid(raise_exception=True)
- KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True)
- return [{'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"),
- 'meta': a.get("meta"), 'run_time': a.get("run_time"), 'create_time': a.get("create_time")} for a in
- self.get_query_set(instance)]
- def page(self, current_page, page_size, instance: Dict, is_valid=True):
- if is_valid:
- self.is_valid(raise_exception=True)
- KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True)
- return page_search(current_page, page_size, self.get_query_set(instance),
- lambda a: {'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"),
- 'meta': a.get("meta"), 'run_time': a.get("run_time"),
- 'create_time': a.get("create_time")})
- def action(self, instance: Dict, user, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first()
- knowledge_action_id = uuid.uuid7()
- meta = {'user_id': str(user.id),
- 'user_name': user.username}
- KnowledgeAction(id=knowledge_action_id,
- knowledge_id=self.data.get("knowledge_id"),
- state=State.STARTED,
- meta=meta).save()
- knowledge = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id')).first()
- instance['knowledge_base'] = {**(instance.get('knowledge_base') or {}),
- 'knowledge': {'id': str(knowledge.id), 'name': knowledge.name,
- 'desc': knowledge.desc,
- 'workspace_id': knowledge.workspace_id}}
- work_flow_manage = KnowledgeWorkflowManage(
- Workflow.new_instance(knowledge_workflow.work_flow, WorkflowMode.KNOWLEDGE),
- {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
- 'workspace_id': self.data.get("workspace_id"),
- **instance},
- KnowledgeWorkflowPostHandler(None, knowledge_action_id),
- is_the_task_interrupted=lambda: cache.get(
- Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id),
- version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False)
- work_flow_manage.run()
- return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
- 'details': {}, 'meta': meta}
- def upload_document(self, instance: Dict, user, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first()
- if not knowledge_workflow.is_publish:
- raise AppApiException(500, _("The knowledge base workflow has not been published"))
- knowledge_workflow_version = QuerySet(KnowledgeWorkflowVersion).filter(
- knowledge_id=self.data.get("knowledge_id")).order_by(
- '-create_time')[0:1].first()
- knowledge_action_id = uuid.uuid7()
- meta = {'user_id': str(user.id),
- 'user_name': user.username}
- KnowledgeAction(id=knowledge_action_id, knowledge_id=self.data.get("knowledge_id"), state=State.STARTED,
- meta=meta).save()
- knowledge = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id')).first()
- instance['knowledge_base'] = {**(instance.get('knowledge_base') or {}),
- 'knowledge': {'id': str(knowledge.id), 'name': knowledge.name,
- 'desc': knowledge.desc,
- 'workspace_id': knowledge.workspace_id}}
- work_flow_manage = KnowledgeWorkflowManage(
- Workflow.new_instance(knowledge_workflow_version.work_flow, WorkflowMode.KNOWLEDGE),
- {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
- 'workspace_id': self.data.get("workspace_id"),
- **instance},
- KnowledgeWorkflowPostHandler(None, knowledge_action_id),
- is_the_task_interrupted=lambda: cache.get(
- Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id),
- version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False
- )
- work_flow_manage.run()
- return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
- 'details': {}, 'meta': meta}
- class Operate(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- id = serializers.UUIDField(required=True, label=_('knowledge action id'))
- def one(self, is_valid=True):
- if is_valid:
- self.is_valid(raise_exception=True)
- knowledge_action_id = self.data.get("id")
- knowledge_action = QuerySet(KnowledgeAction).filter(id=knowledge_action_id).first()
- return {'id': knowledge_action_id, 'knowledge_id': knowledge_action.knowledge_id,
- 'state': knowledge_action.state,
- 'details': knowledge_action.details,
- 'meta': knowledge_action.meta}
- def cancel(self, is_valid=True):
- if is_valid:
- self.is_valid(raise_exception=True)
- knowledge_action_id = self.data.get("id")
- cache.set(Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id), True,
- version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version())
- QuerySet(KnowledgeAction).filter(id=knowledge_action_id, state__in=[State.STARTED, State.PENDING]).update(
- state=State.REVOKE)
- return True
- class KnowledgeWorkflowSerializer(serializers.Serializer):
- class Datasource(serializers.Serializer):
- type = serializers.CharField(required=True, label=_('type'))
- id = serializers.CharField(required=True, label=_('type'))
- params = serializers.DictField(required=True, label="")
- function_name = serializers.CharField(required=True, label=_('function_name'))
- def action(self):
- self.is_valid(raise_exception=True)
- if self.data.get('type') == 'local':
- node = get_node(self.data.get('id'), WorkflowMode.KNOWLEDGE)
- return node.__getattribute__(node, self.data.get("function_name"))(**self.data.get("params"))
- elif self.data.get('type') == 'tool':
- tool = QuerySet(Tool).filter(id=self.data.get("id")).first()
- init_params = json.loads(rsa_long_decrypt(tool.init_params))
- return tool_executor.exec_code(tool.code, {**init_params, **self.data.get('params')},
- self.data.get('function_name'))
- class Create(serializers.Serializer):
- user_id = serializers.UUIDField(required=True, label=_('user id'))
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- scope = serializers.ChoiceField(
- required=False, label=_('scope'), default=KnowledgeScope.WORKSPACE, choices=KnowledgeScope.choices
- )
- @transaction.atomic
- def save_workflow(self, instance: Dict):
- self.is_valid(raise_exception=True)
- folder_id = instance.get('folder_id', self.data.get('workspace_id'))
- knowledge_id = uuid.uuid7()
- knowledge = Knowledge(
- id=knowledge_id,
- name=instance.get('name'),
- desc=instance.get('desc'),
- user_id=self.data.get('user_id'),
- type=instance.get('type', KnowledgeType.WORKFLOW),
- scope=self.data.get('scope', KnowledgeScope.WORKSPACE),
- folder_id=folder_id,
- workspace_id=self.data.get('workspace_id'),
- embedding_model_id=instance.get('embedding_model_id'),
- meta={},
- )
- knowledge.save()
- # 自动资源给授权当前用户
- UserResourcePermissionSerializer(data={
- 'workspace_id': self.data.get('workspace_id'),
- 'user_id': self.data.get('user_id'),
- 'auth_target_type': AuthTargetType.KNOWLEDGE.value
- }).auth_resource(str(knowledge_id))
- knowledge_workflow = KnowledgeWorkflow(
- id=uuid.uuid7(),
- knowledge_id=knowledge_id,
- workspace_id=self.data.get('workspace_id'),
- work_flow=instance.get('work_flow', {}),
- )
- knowledge_workflow.save()
- save_workflow_mapping(instance.get('work_flow', {}), ResourceType.KNOWLEDGE, str(knowledge_id))
- # 处理 work_flow_template
- if instance.get('work_flow_template') is not None:
- template_instance = instance.get('work_flow_template')
- download_url = template_instance.get('downloadUrl')
- # 查找匹配的版本名称
- res = requests.get(download_url, timeout=5)
- KnowledgeWorkflowSerializer.Import(data={
- 'user_id': self.data.get('user_id'),
- 'workspace_id': self.data.get('workspace_id'),
- 'knowledge_id': str(knowledge_id),
- }).import_({'file': bytes_to_uploaded_file(res.content, 'file.kbwf')}, is_import_tool=True)
- try:
- requests.get(template_instance.get('downloadCallbackUrl'), timeout=5)
- except Exception as e:
- maxkb_logger.error(f"callback appstore tool download error: {e}")
- return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []}
- class Import(serializers.Serializer):
- user_id = serializers.UUIDField(required=True, label=_('user id'))
- workspace_id = serializers.CharField(required=False, label=_('workspace id'))
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- @transaction.atomic
- def import_(self, instance: dict, is_import_tool, with_valid=True):
- if with_valid:
- self.is_valid()
- KnowledgeWorkflowImportRequest(data=instance).is_valid(raise_exception=True)
- user_id = self.data.get('user_id')
- workspace_id = self.data.get('workspace_id')
- knowledge_id = self.data.get('knowledge_id')
- kbwf_instance_bytes = instance.get('file').read()
- try:
- kbwf_instance = restricted_loads(kbwf_instance_bytes)
- except Exception as e:
- raise AppApiException(1001, _("Unsupported file format"))
- knowledge_workflow = kbwf_instance.knowledge_workflow
- tool_list = kbwf_instance.get_tool_list()
- update_tool_map = {}
- if len(tool_list) > 0:
- tool_id_list = reduce(lambda x, y: [*x, *y],
- [[tool.get('id'), generate_uuid((tool.get('id') + workspace_id or ''))]
- for tool
- in
- tool_list], [])
- # 存在的工具列表
- exits_tool_id_list = [str(tool.id) for tool in
- QuerySet(Tool).filter(id__in=tool_id_list, workspace_id=workspace_id)]
- # 需要更新的工具集合
- update_tool_map = {tool.get('id'): generate_uuid((tool.get('id') + workspace_id or '')) for tool
- in
- tool_list if
- not exits_tool_id_list.__contains__(
- tool.get('id'))}
- tool_list = [{**tool, 'id': update_tool_map.get(tool.get('id'))} for tool in tool_list if
- not exits_tool_id_list.__contains__(
- tool.get('id')) and not exits_tool_id_list.__contains__(
- generate_uuid((tool.get('id') + workspace_id or '')))]
- work_flow = self.to_knowledge_workflow(
- knowledge_workflow,
- update_tool_map,
- )
- tool_model_list = [self.to_tool(tool, workspace_id, user_id) for tool in tool_list]
- KnowledgeWorkflow.objects.filter(workspace_id=workspace_id, knowledge_id=knowledge_id).update_or_create(
- knowledge_id=knowledge_id,
- workspace_id=workspace_id,
- defaults={'work_flow': work_flow}
- )
- if is_import_tool:
- if len(tool_model_list) > 0:
- QuerySet(Tool).bulk_create(tool_model_list)
- QuerySet(ToolWorkflow).bulk_create(
- [ToolWorkflow(workspace_id=workspace_id,
- work_flow=self.reset_workflow(tool.get('work_flow'), update_tool_map),
- tool_id=tool.get('id'))
- for
- tool in tool_list if tool.get('tool_type') == ToolType.WORKFLOW])
- UserResourcePermissionSerializer(data={
- 'workspace_id': self.data.get('workspace_id'),
- 'user_id': self.data.get('user_id'),
- 'auth_target_type': AuthTargetType.TOOL.value
- }).auth_resource_batch([t.id for t in tool_model_list])
- return True
- update_resource_mapping_by_knowledge(knowledge_id)
- @staticmethod
- def to_knowledge_workflow(knowledge_workflow, update_tool_map):
- work_flow = knowledge_workflow.get("work_flow")
- for node in work_flow.get('nodes', []):
- hand_node(node, update_tool_map)
- if node.get('type') == 'loop_node':
- for n in node.get('properties', {}).get('node_data', {}).get('loop_body', {}).get('nodes', []):
- hand_node(n, update_tool_map)
- return work_flow
- @staticmethod
- def reset_workflow(work_flow, update_tool_map):
- for node in work_flow.get('nodes', []):
- hand_node(node, update_tool_map)
- if node.get('type') == 'loop-node':
- for n in node.get('properties', {}).get('node_data', {}).get('loop_body', {}).get('nodes', []):
- hand_node(n, update_tool_map)
- return work_flow
- @staticmethod
- def to_tool(tool, workspace_id, user_id):
- # 如果是技能类型的工具,需要将code保存为文件
- code = tool.get('code')
- if tool.get('tool_type') == ToolType.SKILL:
- skill_file_id = uuid.uuid7()
- skill_file = File(
- id=skill_file_id,
- file_name=f"{tool.get('name')}.zip",
- source_type=FileSourceType.TOOL,
- source_id=tool.get('id'),
- meta={}
- )
- skill_file.save(base64.b64decode(code))
- tool['code'] = skill_file_id
- return Tool(id=tool.get('id'),
- user_id=user_id,
- name=tool.get('name'),
- code=tool.get('code'),
- template_id=tool.get('template_id'),
- input_field_list=tool.get('input_field_list'),
- init_field_list=tool.get('init_field_list'),
- is_active=False if len((tool.get('init_field_list') or [])) > 0 else tool.get('is_active'),
- tool_type=tool.get('tool_type', 'CUSTOM') or 'CUSTOM',
- scope=ToolScope.SHARED if workspace_id == 'None' else ToolScope.WORKSPACE,
- folder_id='default' if workspace_id == 'None' else workspace_id,
- workspace_id=workspace_id)
- class Export(serializers.Serializer):
- user_id = serializers.UUIDField(required=True, label=_('user id'))
- workspace_id = serializers.CharField(required=False, label=_('workspace id'))
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- def export(self, with_valid=True):
- try:
- if with_valid:
- self.is_valid()
- knowledge_id = self.data.get('knowledge_id')
- knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=knowledge_id).first()
- knowledge = QuerySet(Knowledge).filter(id=knowledge_id).first()
- from application.flow.tools import get_tool_id_list
- tool_id_list = get_tool_id_list(knowledge_workflow.work_flow, True)
- tool_list = []
- if len(tool_id_list) > 0:
- tool_list = QuerySet(Tool).filter(id__in=tool_id_list).exclude(scope=ToolScope.SHARED)
- tw_dict = {tw.tool_id: tw
- for tw in QuerySet(ToolWorkflow).filter(
- tool_id__in=[tool.id for tool in tool_list if tool.tool_type == ToolType.WORKFLOW])}
- # 如果是技能工具,则需要将code字段转换为文件内容的base64字符串
- for tool in tool_list:
- if tool.tool_type == ToolType.SKILL:
- skill_file = QuerySet(File).filter(id=tool.code).first()
- if skill_file:
- tool.code = base64.b64encode(skill_file.get_bytes()).decode('utf-8')
- knowledge_workflow_dict = KnowledgeWorkflowModelSerializer(knowledge_workflow).data
- kbwf_instance = KBWFInstance(
- knowledge_workflow_dict,
- [],
- 'v2',
- [self.to_tool_dict(tool, tw_dict) for tool in tool_list]
- )
- knowledge_workflow_pickle = pickle.dumps(kbwf_instance)
- response = HttpResponse(content_type='text/plain', content=knowledge_workflow_pickle)
- response['Content-Disposition'] = f'attachment; filename="{knowledge.name}.kbwf"'
- return response
- except Exception as e:
- return result.error(str(e), response_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
- @staticmethod
- def to_tool_dict(tool, tool_workflow_dict):
- if tool.tool_type == ToolType.WORKFLOW:
- return {**ToolExportModelSerializer(tool).data, 'work_flow': tool_workflow_dict.get(tool.id).work_flow}
- return ToolExportModelSerializer(tool).data
- class Operate(serializers.Serializer):
- user_id = serializers.UUIDField(required=True, label=_('user id'))
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- def publish(self, with_valid=True):
- if with_valid:
- self.is_valid()
- user_id = self.data.get('user_id')
- workspace_id = self.data.get("workspace_id")
- user = QuerySet(User).filter(id=user_id).first()
- knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id"),
- workspace_id=workspace_id).first()
- work_flow_version = KnowledgeWorkflowVersion(work_flow=knowledge_workflow.work_flow,
- knowledge_id=self.data.get("knowledge_id"),
- name=timezone.localtime(timezone.now()).strftime(
- '%Y-%m-%d %H:%M:%S'),
- publish_user_id=user_id,
- publish_user_name=user.username,
- workspace_id=workspace_id)
- work_flow_version.save()
- QuerySet(KnowledgeWorkflow).filter(
- knowledge_id=self.data.get("knowledge_id")
- ).update(is_publish=True, publish_time=timezone.now())
- return True
- def edit(self, instance: Dict):
- self.is_valid(raise_exception=True)
- if instance.get("work_flow"):
- QuerySet(KnowledgeWorkflow).update_or_create(knowledge_id=self.data.get("knowledge_id"),
- create_defaults={'id': uuid.uuid7(),
- 'knowledge_id': self.data.get(
- "knowledge_id"),
- "workspace_id": self.data.get(
- 'workspace_id'),
- 'work_flow': instance.get('work_flow',
- {}), },
- defaults={
- 'work_flow': instance.get('work_flow')
- })
- update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
- return self.one()
- if instance.get("work_flow_template"):
- template_instance = instance.get('work_flow_template')
- download_url = template_instance.get('downloadUrl')
- # 查找匹配的版本名称
- res = requests.get(download_url, timeout=5)
- KnowledgeWorkflowSerializer.Import(data={
- 'user_id': self.data.get('user_id'),
- 'workspace_id': self.data.get('workspace_id'),
- 'knowledge_id': str(self.data.get('knowledge_id')),
- }).import_({'file': bytes_to_uploaded_file(res.content, 'file.kbwf')}, is_import_tool=False)
- try:
- requests.get(template_instance.get('downloadCallbackUrl'), timeout=5)
- except Exception as e:
- maxkb_logger.error(f"callback appstore tool download error: {e}")
- return self.one()
- def one(self):
- self.is_valid(raise_exception=True)
- workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get('knowledge_id')).first()
- return {**KnowledgeWorkflowModelSerializer(workflow).data}
- class McpServersSerializer(serializers.Serializer):
- mcp_servers = serializers.JSONField(required=True)
- class KnowledgeWorkflowMcpSerializer(serializers.Serializer):
- knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
- user_id = serializers.UUIDField(required=True, label=_("User ID"))
- workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))
- def is_valid(self, *, raise_exception=False):
- super().is_valid(raise_exception=True)
- workspace_id = self.data.get('workspace_id')
- query_set = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id'))
- if workspace_id:
- query_set = query_set.filter(workspace_id=workspace_id)
- if not query_set.exists():
- raise AppApiException(500, _('Knowledge id does not exist'))
- def get_mcp_servers(self, instance, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- McpServersSerializer(data=instance).is_valid(raise_exception=True)
- servers = json.loads(instance.get('mcp_servers'))
- for server, config in servers.items():
- if config.get('transport') not in ['sse', 'streamable_http']:
- raise AppApiException(500, _('Only support transport=sse or transport=streamable_http'))
- tools = []
- for server in servers:
- tools += [
- {
- 'server': server,
- 'name': tool.name,
- 'description': tool.description,
- 'args_schema': tool.args_schema,
- }
- for tool in asyncio.run(get_mcp_tools({server: servers[server]}))]
- return tools
|