knowledge_workflow.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. # coding=utf-8
  2. import asyncio
  3. import base64
  4. import json
  5. import pickle
  6. from functools import reduce
  7. from typing import Dict, List
  8. import requests
  9. import uuid_utils.compat as uuid
  10. from django.core.cache import cache
  11. from django.db import transaction
  12. from django.db.models import QuerySet
  13. from django.http import HttpResponse
  14. from django.utils import timezone
  15. from django.utils.translation import gettext_lazy as _
  16. from rest_framework import serializers, status
  17. from application.flow.common import Workflow, WorkflowMode
  18. from application.flow.i_step_node import KnowledgeWorkflowPostHandler
  19. from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage
  20. from application.flow.step_node import get_node
  21. from application.flow.tools import save_workflow_mapping
  22. from application.serializers.application import get_mcp_tools
  23. from common.constants.cache_version import Cache_Version
  24. from common.db.search import page_search
  25. from common.exception.app_exception import AppApiException
  26. from common.field.common import UploadedFileField
  27. from common.result import result
  28. from common.utils.common import bytes_to_uploaded_file
  29. from common.utils.common import restricted_loads, generate_uuid
  30. from common.utils.logger import maxkb_logger
  31. from common.utils.rsa_util import rsa_long_decrypt
  32. from common.utils.tool_code import ToolExecutor
  33. from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow, KnowledgeWorkflowVersion, \
  34. File, FileSourceType
  35. from knowledge.models.knowledge_action import KnowledgeAction, State
  36. from knowledge.serializers.common import update_resource_mapping_by_knowledge
  37. from knowledge.serializers.knowledge import KnowledgeModelSerializer
  38. from system_manage.models import AuthTargetType
  39. from system_manage.models.resource_mapping import ResourceType
  40. from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
  41. from tools.models import Tool, ToolScope, ToolType, ToolWorkflow
  42. from tools.serializers.tool import ToolExportModelSerializer
  43. from users.models import User
  44. tool_executor = ToolExecutor()
  45. def hand_node(node, update_tool_map):
  46. if node.get('type') == 'tool-lib-node':
  47. tool_lib_id = (node.get('properties', {}).get('node_data', {}).get('tool_lib_id') or '')
  48. node.get('properties', {}).get('node_data', {})['tool_lib_id'] = update_tool_map.get(tool_lib_id, tool_lib_id)
  49. if node.get('type') == 'search-knowledge-node':
  50. node.get('properties', {}).get('node_data', {})['knowledge_id_list'] = []
  51. if node.get('type') == 'ai-chat-node':
  52. node_data = node.get('properties', {}).get('node_data', {})
  53. mcp_tool_ids = node_data.get('mcp_tool_ids') or []
  54. node_data['mcp_tool_ids'] = [update_tool_map.get(tool_id,
  55. tool_id) for tool_id in mcp_tool_ids]
  56. tool_ids = node_data.get('tool_ids') or []
  57. node_data['tool_ids'] = [update_tool_map.get(tool_id,
  58. tool_id) for tool_id in tool_ids]
  59. skill_tool_ids = node_data.get('skill_tool_ids') or []
  60. node_data['skill_tool_ids'] = [update_tool_map.get(tool_id,
  61. tool_id) for tool_id in skill_tool_ids]
  62. if node.get('type') == 'mcp-node':
  63. mcp_tool_id = (node.get('properties', {}).get('node_data', {}).get('mcp_tool_id') or '')
  64. node.get('properties', {}).get('node_data', {})['mcp_tool_id'] = update_tool_map.get(mcp_tool_id,
  65. mcp_tool_id)
  66. if node.get('type') == 'tool-workflow-lib-node':
  67. tool_lib_id = (node.get('properties', {}).get('node_data', {}).get('tool_lib_id') or '')
  68. node.get('properties', {}).get('node_data', {})['tool_lib_id'] = update_tool_map.get(tool_lib_id,
  69. tool_lib_id)
  70. class KnowledgeWorkflowModelSerializer(serializers.ModelSerializer):
  71. class Meta:
  72. model = KnowledgeWorkflow
  73. fields = '__all__'
  74. class KnowledgeWorkflowActionRequestSerializer(serializers.Serializer):
  75. data_source = serializers.DictField(required=True, label=_('datasource data'))
  76. knowledge_base = serializers.DictField(required=True, label=_('knowledge base data'))
  77. class KnowledgeWorkflowImportRequest(serializers.Serializer):
  78. file = UploadedFileField(required=True, label=_("file"))
  79. class KnowledgeWorkflowActionListQuerySerializer(serializers.Serializer):
  80. user_name = serializers.CharField(required=False, label=_('Name'), allow_blank=True, allow_null=True)
  81. state = serializers.CharField(required=False, label=_("State"), allow_blank=True, allow_null=True)
  82. class KBWFInstance:
  83. def __init__(self, knowledge_workflow: dict, function_lib_list: List[dict], version: str, tool_list: List[dict]):
  84. self.knowledge_workflow = knowledge_workflow
  85. self.function_lib_list = function_lib_list
  86. self.version = version
  87. self.tool_list = tool_list
  88. def get_tool_list(self):
  89. return [*(self.tool_list or []), *(self.function_lib_list or [])]
  90. class KnowledgeWorkflowActionSerializer(serializers.Serializer):
  91. workspace_id = serializers.CharField(required=True, label=_('workspace id'))
  92. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  93. def get_query_set(self, instance: Dict):
  94. query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id',
  95. 'knowledge_id',
  96. "state", 'meta',
  97. 'run_time',
  98. "create_time")
  99. if instance.get("user_name"):
  100. query_set = query_set.filter(meta__user_name__icontains=instance.get('user_name'))
  101. if instance.get('state'):
  102. query_set = query_set.filter(state=instance.get('state'))
  103. return query_set.order_by('-create_time')
  104. def list(self, instance: Dict, is_valid=True):
  105. if is_valid:
  106. self.is_valid(raise_exception=True)
  107. KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True)
  108. return [{'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"),
  109. 'meta': a.get("meta"), 'run_time': a.get("run_time"), 'create_time': a.get("create_time")} for a in
  110. self.get_query_set(instance)]
  111. def page(self, current_page, page_size, instance: Dict, is_valid=True):
  112. if is_valid:
  113. self.is_valid(raise_exception=True)
  114. KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True)
  115. return page_search(current_page, page_size, self.get_query_set(instance),
  116. lambda a: {'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"),
  117. 'meta': a.get("meta"), 'run_time': a.get("run_time"),
  118. 'create_time': a.get("create_time")})
  119. def action(self, instance: Dict, user, with_valid=True):
  120. if with_valid:
  121. self.is_valid(raise_exception=True)
  122. knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first()
  123. knowledge_action_id = uuid.uuid7()
  124. meta = {'user_id': str(user.id),
  125. 'user_name': user.username}
  126. KnowledgeAction(id=knowledge_action_id,
  127. knowledge_id=self.data.get("knowledge_id"),
  128. state=State.STARTED,
  129. meta=meta).save()
  130. knowledge = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id')).first()
  131. instance['knowledge_base'] = {**(instance.get('knowledge_base') or {}),
  132. 'knowledge': {'id': str(knowledge.id), 'name': knowledge.name,
  133. 'desc': knowledge.desc,
  134. 'workspace_id': knowledge.workspace_id}}
  135. work_flow_manage = KnowledgeWorkflowManage(
  136. Workflow.new_instance(knowledge_workflow.work_flow, WorkflowMode.KNOWLEDGE),
  137. {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
  138. 'workspace_id': self.data.get("workspace_id"),
  139. **instance},
  140. KnowledgeWorkflowPostHandler(None, knowledge_action_id),
  141. is_the_task_interrupted=lambda: cache.get(
  142. Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id),
  143. version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False)
  144. work_flow_manage.run()
  145. return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
  146. 'details': {}, 'meta': meta}
  147. def upload_document(self, instance: Dict, user, with_valid=True):
  148. if with_valid:
  149. self.is_valid(raise_exception=True)
  150. knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first()
  151. if not knowledge_workflow.is_publish:
  152. raise AppApiException(500, _("The knowledge base workflow has not been published"))
  153. knowledge_workflow_version = QuerySet(KnowledgeWorkflowVersion).filter(
  154. knowledge_id=self.data.get("knowledge_id")).order_by(
  155. '-create_time')[0:1].first()
  156. knowledge_action_id = uuid.uuid7()
  157. meta = {'user_id': str(user.id),
  158. 'user_name': user.username}
  159. KnowledgeAction(id=knowledge_action_id, knowledge_id=self.data.get("knowledge_id"), state=State.STARTED,
  160. meta=meta).save()
  161. knowledge = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id')).first()
  162. instance['knowledge_base'] = {**(instance.get('knowledge_base') or {}),
  163. 'knowledge': {'id': str(knowledge.id), 'name': knowledge.name,
  164. 'desc': knowledge.desc,
  165. 'workspace_id': knowledge.workspace_id}}
  166. work_flow_manage = KnowledgeWorkflowManage(
  167. Workflow.new_instance(knowledge_workflow_version.work_flow, WorkflowMode.KNOWLEDGE),
  168. {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
  169. 'workspace_id': self.data.get("workspace_id"),
  170. **instance},
  171. KnowledgeWorkflowPostHandler(None, knowledge_action_id),
  172. is_the_task_interrupted=lambda: cache.get(
  173. Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id),
  174. version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False
  175. )
  176. work_flow_manage.run()
  177. return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
  178. 'details': {}, 'meta': meta}
  179. class Operate(serializers.Serializer):
  180. workspace_id = serializers.CharField(required=True, label=_('workspace id'))
  181. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  182. id = serializers.UUIDField(required=True, label=_('knowledge action id'))
  183. def one(self, is_valid=True):
  184. if is_valid:
  185. self.is_valid(raise_exception=True)
  186. knowledge_action_id = self.data.get("id")
  187. knowledge_action = QuerySet(KnowledgeAction).filter(id=knowledge_action_id).first()
  188. return {'id': knowledge_action_id, 'knowledge_id': knowledge_action.knowledge_id,
  189. 'state': knowledge_action.state,
  190. 'details': knowledge_action.details,
  191. 'meta': knowledge_action.meta}
  192. def cancel(self, is_valid=True):
  193. if is_valid:
  194. self.is_valid(raise_exception=True)
  195. knowledge_action_id = self.data.get("id")
  196. cache.set(Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id), True,
  197. version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version())
  198. QuerySet(KnowledgeAction).filter(id=knowledge_action_id, state__in=[State.STARTED, State.PENDING]).update(
  199. state=State.REVOKE)
  200. return True
  201. class KnowledgeWorkflowSerializer(serializers.Serializer):
  202. class Datasource(serializers.Serializer):
  203. type = serializers.CharField(required=True, label=_('type'))
  204. id = serializers.CharField(required=True, label=_('type'))
  205. params = serializers.DictField(required=True, label="")
  206. function_name = serializers.CharField(required=True, label=_('function_name'))
  207. def action(self):
  208. self.is_valid(raise_exception=True)
  209. if self.data.get('type') == 'local':
  210. node = get_node(self.data.get('id'), WorkflowMode.KNOWLEDGE)
  211. return node.__getattribute__(node, self.data.get("function_name"))(**self.data.get("params"))
  212. elif self.data.get('type') == 'tool':
  213. tool = QuerySet(Tool).filter(id=self.data.get("id")).first()
  214. init_params = json.loads(rsa_long_decrypt(tool.init_params))
  215. return tool_executor.exec_code(tool.code, {**init_params, **self.data.get('params')},
  216. self.data.get('function_name'))
  217. class Create(serializers.Serializer):
  218. user_id = serializers.UUIDField(required=True, label=_('user id'))
  219. workspace_id = serializers.CharField(required=True, label=_('workspace id'))
  220. scope = serializers.ChoiceField(
  221. required=False, label=_('scope'), default=KnowledgeScope.WORKSPACE, choices=KnowledgeScope.choices
  222. )
  223. @transaction.atomic
  224. def save_workflow(self, instance: Dict):
  225. self.is_valid(raise_exception=True)
  226. folder_id = instance.get('folder_id', self.data.get('workspace_id'))
  227. knowledge_id = uuid.uuid7()
  228. knowledge = Knowledge(
  229. id=knowledge_id,
  230. name=instance.get('name'),
  231. desc=instance.get('desc'),
  232. user_id=self.data.get('user_id'),
  233. type=instance.get('type', KnowledgeType.WORKFLOW),
  234. scope=self.data.get('scope', KnowledgeScope.WORKSPACE),
  235. folder_id=folder_id,
  236. workspace_id=self.data.get('workspace_id'),
  237. embedding_model_id=instance.get('embedding_model_id'),
  238. meta={},
  239. )
  240. knowledge.save()
  241. # 自动资源给授权当前用户
  242. UserResourcePermissionSerializer(data={
  243. 'workspace_id': self.data.get('workspace_id'),
  244. 'user_id': self.data.get('user_id'),
  245. 'auth_target_type': AuthTargetType.KNOWLEDGE.value
  246. }).auth_resource(str(knowledge_id))
  247. knowledge_workflow = KnowledgeWorkflow(
  248. id=uuid.uuid7(),
  249. knowledge_id=knowledge_id,
  250. workspace_id=self.data.get('workspace_id'),
  251. work_flow=instance.get('work_flow', {}),
  252. )
  253. knowledge_workflow.save()
  254. save_workflow_mapping(instance.get('work_flow', {}), ResourceType.KNOWLEDGE, str(knowledge_id))
  255. # 处理 work_flow_template
  256. if instance.get('work_flow_template') is not None:
  257. template_instance = instance.get('work_flow_template')
  258. download_url = template_instance.get('downloadUrl')
  259. # 查找匹配的版本名称
  260. res = requests.get(download_url, timeout=5)
  261. KnowledgeWorkflowSerializer.Import(data={
  262. 'user_id': self.data.get('user_id'),
  263. 'workspace_id': self.data.get('workspace_id'),
  264. 'knowledge_id': str(knowledge_id),
  265. }).import_({'file': bytes_to_uploaded_file(res.content, 'file.kbwf')}, is_import_tool=True)
  266. try:
  267. requests.get(template_instance.get('downloadCallbackUrl'), timeout=5)
  268. except Exception as e:
  269. maxkb_logger.error(f"callback appstore tool download error: {e}")
  270. return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []}
  271. class Import(serializers.Serializer):
  272. user_id = serializers.UUIDField(required=True, label=_('user id'))
  273. workspace_id = serializers.CharField(required=False, label=_('workspace id'))
  274. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  275. @transaction.atomic
  276. def import_(self, instance: dict, is_import_tool, with_valid=True):
  277. if with_valid:
  278. self.is_valid()
  279. KnowledgeWorkflowImportRequest(data=instance).is_valid(raise_exception=True)
  280. user_id = self.data.get('user_id')
  281. workspace_id = self.data.get('workspace_id')
  282. knowledge_id = self.data.get('knowledge_id')
  283. kbwf_instance_bytes = instance.get('file').read()
  284. try:
  285. kbwf_instance = restricted_loads(kbwf_instance_bytes)
  286. except Exception as e:
  287. raise AppApiException(1001, _("Unsupported file format"))
  288. knowledge_workflow = kbwf_instance.knowledge_workflow
  289. tool_list = kbwf_instance.get_tool_list()
  290. update_tool_map = {}
  291. if len(tool_list) > 0:
  292. tool_id_list = reduce(lambda x, y: [*x, *y],
  293. [[tool.get('id'), generate_uuid((tool.get('id') + workspace_id or ''))]
  294. for tool
  295. in
  296. tool_list], [])
  297. # 存在的工具列表
  298. exits_tool_id_list = [str(tool.id) for tool in
  299. QuerySet(Tool).filter(id__in=tool_id_list, workspace_id=workspace_id)]
  300. # 需要更新的工具集合
  301. update_tool_map = {tool.get('id'): generate_uuid((tool.get('id') + workspace_id or '')) for tool
  302. in
  303. tool_list if
  304. not exits_tool_id_list.__contains__(
  305. tool.get('id'))}
  306. tool_list = [{**tool, 'id': update_tool_map.get(tool.get('id'))} for tool in tool_list if
  307. not exits_tool_id_list.__contains__(
  308. tool.get('id')) and not exits_tool_id_list.__contains__(
  309. generate_uuid((tool.get('id') + workspace_id or '')))]
  310. work_flow = self.to_knowledge_workflow(
  311. knowledge_workflow,
  312. update_tool_map,
  313. )
  314. tool_model_list = [self.to_tool(tool, workspace_id, user_id) for tool in tool_list]
  315. KnowledgeWorkflow.objects.filter(workspace_id=workspace_id, knowledge_id=knowledge_id).update_or_create(
  316. knowledge_id=knowledge_id,
  317. workspace_id=workspace_id,
  318. defaults={'work_flow': work_flow}
  319. )
  320. if is_import_tool:
  321. if len(tool_model_list) > 0:
  322. QuerySet(Tool).bulk_create(tool_model_list)
  323. QuerySet(ToolWorkflow).bulk_create(
  324. [ToolWorkflow(workspace_id=workspace_id,
  325. work_flow=self.reset_workflow(tool.get('work_flow'), update_tool_map),
  326. tool_id=tool.get('id'))
  327. for
  328. tool in tool_list if tool.get('tool_type') == ToolType.WORKFLOW])
  329. UserResourcePermissionSerializer(data={
  330. 'workspace_id': self.data.get('workspace_id'),
  331. 'user_id': self.data.get('user_id'),
  332. 'auth_target_type': AuthTargetType.TOOL.value
  333. }).auth_resource_batch([t.id for t in tool_model_list])
  334. return True
  335. update_resource_mapping_by_knowledge(knowledge_id)
  336. @staticmethod
  337. def to_knowledge_workflow(knowledge_workflow, update_tool_map):
  338. work_flow = knowledge_workflow.get("work_flow")
  339. for node in work_flow.get('nodes', []):
  340. hand_node(node, update_tool_map)
  341. if node.get('type') == 'loop_node':
  342. for n in node.get('properties', {}).get('node_data', {}).get('loop_body', {}).get('nodes', []):
  343. hand_node(n, update_tool_map)
  344. return work_flow
  345. @staticmethod
  346. def reset_workflow(work_flow, update_tool_map):
  347. for node in work_flow.get('nodes', []):
  348. hand_node(node, update_tool_map)
  349. if node.get('type') == 'loop-node':
  350. for n in node.get('properties', {}).get('node_data', {}).get('loop_body', {}).get('nodes', []):
  351. hand_node(n, update_tool_map)
  352. return work_flow
  353. @staticmethod
  354. def to_tool(tool, workspace_id, user_id):
  355. # 如果是技能类型的工具,需要将code保存为文件
  356. code = tool.get('code')
  357. if tool.get('tool_type') == ToolType.SKILL:
  358. skill_file_id = uuid.uuid7()
  359. skill_file = File(
  360. id=skill_file_id,
  361. file_name=f"{tool.get('name')}.zip",
  362. source_type=FileSourceType.TOOL,
  363. source_id=tool.get('id'),
  364. meta={}
  365. )
  366. skill_file.save(base64.b64decode(code))
  367. tool['code'] = skill_file_id
  368. return Tool(id=tool.get('id'),
  369. user_id=user_id,
  370. name=tool.get('name'),
  371. code=tool.get('code'),
  372. template_id=tool.get('template_id'),
  373. input_field_list=tool.get('input_field_list'),
  374. init_field_list=tool.get('init_field_list'),
  375. is_active=False if len((tool.get('init_field_list') or [])) > 0 else tool.get('is_active'),
  376. tool_type=tool.get('tool_type', 'CUSTOM') or 'CUSTOM',
  377. scope=ToolScope.SHARED if workspace_id == 'None' else ToolScope.WORKSPACE,
  378. folder_id='default' if workspace_id == 'None' else workspace_id,
  379. workspace_id=workspace_id)
  380. class Export(serializers.Serializer):
  381. user_id = serializers.UUIDField(required=True, label=_('user id'))
  382. workspace_id = serializers.CharField(required=False, label=_('workspace id'))
  383. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  384. def export(self, with_valid=True):
  385. try:
  386. if with_valid:
  387. self.is_valid()
  388. knowledge_id = self.data.get('knowledge_id')
  389. knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=knowledge_id).first()
  390. knowledge = QuerySet(Knowledge).filter(id=knowledge_id).first()
  391. from application.flow.tools import get_tool_id_list
  392. tool_id_list = get_tool_id_list(knowledge_workflow.work_flow, True)
  393. tool_list = []
  394. if len(tool_id_list) > 0:
  395. tool_list = QuerySet(Tool).filter(id__in=tool_id_list).exclude(scope=ToolScope.SHARED)
  396. tw_dict = {tw.tool_id: tw
  397. for tw in QuerySet(ToolWorkflow).filter(
  398. tool_id__in=[tool.id for tool in tool_list if tool.tool_type == ToolType.WORKFLOW])}
  399. # 如果是技能工具,则需要将code字段转换为文件内容的base64字符串
  400. for tool in tool_list:
  401. if tool.tool_type == ToolType.SKILL:
  402. skill_file = QuerySet(File).filter(id=tool.code).first()
  403. if skill_file:
  404. tool.code = base64.b64encode(skill_file.get_bytes()).decode('utf-8')
  405. knowledge_workflow_dict = KnowledgeWorkflowModelSerializer(knowledge_workflow).data
  406. kbwf_instance = KBWFInstance(
  407. knowledge_workflow_dict,
  408. [],
  409. 'v2',
  410. [self.to_tool_dict(tool, tw_dict) for tool in tool_list]
  411. )
  412. knowledge_workflow_pickle = pickle.dumps(kbwf_instance)
  413. response = HttpResponse(content_type='text/plain', content=knowledge_workflow_pickle)
  414. response['Content-Disposition'] = f'attachment; filename="{knowledge.name}.kbwf"'
  415. return response
  416. except Exception as e:
  417. return result.error(str(e), response_status=status.HTTP_500_INTERNAL_SERVER_ERROR)
  418. @staticmethod
  419. def to_tool_dict(tool, tool_workflow_dict):
  420. if tool.tool_type == ToolType.WORKFLOW:
  421. return {**ToolExportModelSerializer(tool).data, 'work_flow': tool_workflow_dict.get(tool.id).work_flow}
  422. return ToolExportModelSerializer(tool).data
  423. class Operate(serializers.Serializer):
  424. user_id = serializers.UUIDField(required=True, label=_('user id'))
  425. workspace_id = serializers.CharField(required=True, label=_('workspace id'))
  426. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  427. def publish(self, with_valid=True):
  428. if with_valid:
  429. self.is_valid()
  430. user_id = self.data.get('user_id')
  431. workspace_id = self.data.get("workspace_id")
  432. user = QuerySet(User).filter(id=user_id).first()
  433. knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id"),
  434. workspace_id=workspace_id).first()
  435. work_flow_version = KnowledgeWorkflowVersion(work_flow=knowledge_workflow.work_flow,
  436. knowledge_id=self.data.get("knowledge_id"),
  437. name=timezone.localtime(timezone.now()).strftime(
  438. '%Y-%m-%d %H:%M:%S'),
  439. publish_user_id=user_id,
  440. publish_user_name=user.username,
  441. workspace_id=workspace_id)
  442. work_flow_version.save()
  443. QuerySet(KnowledgeWorkflow).filter(
  444. knowledge_id=self.data.get("knowledge_id")
  445. ).update(is_publish=True, publish_time=timezone.now())
  446. return True
  447. def edit(self, instance: Dict):
  448. self.is_valid(raise_exception=True)
  449. if instance.get("work_flow"):
  450. QuerySet(KnowledgeWorkflow).update_or_create(knowledge_id=self.data.get("knowledge_id"),
  451. create_defaults={'id': uuid.uuid7(),
  452. 'knowledge_id': self.data.get(
  453. "knowledge_id"),
  454. "workspace_id": self.data.get(
  455. 'workspace_id'),
  456. 'work_flow': instance.get('work_flow',
  457. {}), },
  458. defaults={
  459. 'work_flow': instance.get('work_flow')
  460. })
  461. update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
  462. return self.one()
  463. if instance.get("work_flow_template"):
  464. template_instance = instance.get('work_flow_template')
  465. download_url = template_instance.get('downloadUrl')
  466. # 查找匹配的版本名称
  467. res = requests.get(download_url, timeout=5)
  468. KnowledgeWorkflowSerializer.Import(data={
  469. 'user_id': self.data.get('user_id'),
  470. 'workspace_id': self.data.get('workspace_id'),
  471. 'knowledge_id': str(self.data.get('knowledge_id')),
  472. }).import_({'file': bytes_to_uploaded_file(res.content, 'file.kbwf')}, is_import_tool=False)
  473. try:
  474. requests.get(template_instance.get('downloadCallbackUrl'), timeout=5)
  475. except Exception as e:
  476. maxkb_logger.error(f"callback appstore tool download error: {e}")
  477. return self.one()
  478. def one(self):
  479. self.is_valid(raise_exception=True)
  480. workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get('knowledge_id')).first()
  481. return {**KnowledgeWorkflowModelSerializer(workflow).data}
  482. class McpServersSerializer(serializers.Serializer):
  483. mcp_servers = serializers.JSONField(required=True)
  484. class KnowledgeWorkflowMcpSerializer(serializers.Serializer):
  485. knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
  486. user_id = serializers.UUIDField(required=True, label=_("User ID"))
  487. workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))
  488. def is_valid(self, *, raise_exception=False):
  489. super().is_valid(raise_exception=True)
  490. workspace_id = self.data.get('workspace_id')
  491. query_set = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id'))
  492. if workspace_id:
  493. query_set = query_set.filter(workspace_id=workspace_id)
  494. if not query_set.exists():
  495. raise AppApiException(500, _('Knowledge id does not exist'))
  496. def get_mcp_servers(self, instance, with_valid=True):
  497. if with_valid:
  498. self.is_valid(raise_exception=True)
  499. McpServersSerializer(data=instance).is_valid(raise_exception=True)
  500. servers = json.loads(instance.get('mcp_servers'))
  501. for server, config in servers.items():
  502. if config.get('transport') not in ['sse', 'streamable_http']:
  503. raise AppApiException(500, _('Only support transport=sse or transport=streamable_http'))
  504. tools = []
  505. for server in servers:
  506. tools += [
  507. {
  508. 'server': server,
  509. 'name': tool.name,
  510. 'description': tool.description,
  511. 'args_schema': tool.args_schema,
  512. }
  513. for tool in asyncio.run(get_mcp_tools({server: servers[server]}))]
  514. return tools