| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:niu
- @file: task_source_trigger.py
- @date:2026/1/22 16:18
- @desc:
- """
- from typing import Dict
- from django.db import transaction
- from django.db.models import QuerySet
- from django.utils.translation import gettext_lazy as _
- from rest_framework import serializers
- from application.models import Application
- from common.exception.app_exception import AppApiException
- from tools.models import Tool
- from trigger.models import TriggerTypeChoices, Trigger, TriggerTaskTypeChoices, TriggerTask
- from trigger.serializers.trigger import TriggerModelSerializer, TriggerSerializer, ApplicationTriggerTaskSerializer, \
- ToolTriggerTaskSerializer, TriggerTaskModelSerializer
- class TaskSourceTriggerTaskEditRequest(serializers.Serializer):
- meta = serializers.DictField(default=dict, required=False)
- parameter = serializers.DictField(default=dict, required=False)
- class TaskSourceTriggerEditRequest(serializers.Serializer):
- name = serializers.CharField(required=False, label=_('trigger name'))
- desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description'))
- trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices)
- trigger_setting = serializers.DictField(required=False, label=_("trigger setting"))
- meta = serializers.DictField(default=dict, required=False)
- trigger_task = TaskSourceTriggerTaskEditRequest(many=True, required=False)
- class TaskSourceTriggerSerializer(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- user_id = serializers.UUIDField(required=True, label=_("User ID"))
- def insert(self, instance, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- if not len(instance.get("trigger_task")) == 1:
- raise AppApiException(500, _('Trigger task number must be one'))
- source_id = instance.get('source_id')
- source_type = instance.get('source_type')
- source_trigger_task = instance.get('trigger_task')[0]
- if not (instance.get('source_id') == source_id and source_trigger_task.get('source_type') == source_type):
- raise AppApiException(500, _('Incorrect trigger task'))
- return TriggerSerializer(data={
- 'workspace_id': self.data.get('workspace_id'),
- 'user_id': self.data.get('user_id')
- }).insert(instance, with_valid=True)
- class TaskSourceTriggerOperateSerializer(serializers.Serializer):
- trigger_id = serializers.UUIDField(required=True, label=_('trigger id'))
- workspace_id = serializers.CharField(required=False, label=_('workspace id'))
- source_type = serializers.CharField(required=True, label=_('source type'))
- source_id = serializers.CharField(required=True, label=_('source id'))
- def is_valid(self, *, raise_exception=False):
- super().is_valid(raise_exception=True)
- workspace_id = self.data.get('workspace_id')
- query_set = QuerySet(Trigger).filter(id=self.data.get('trigger_id'))
- if workspace_id:
- query_set = query_set.filter(workspace_id=workspace_id)
- if not query_set.exists():
- raise AppApiException(500, _('Trigger id does not exist'))
- def one(self, with_valid=True):
- if with_valid:
- self.is_valid()
- trigger_id = self.data.get('trigger_id')
- workspace_id = self.data.get('workspace_id')
- source_id = self.data.get('source_id')
- source_type = self.data.get('source_type')
- trigger = QuerySet(Trigger).filter(workspace_id=workspace_id, id=trigger_id).first()
- trigger_task = TriggerTaskModelSerializer(TriggerTask.objects.filter(
- trigger_id=trigger_id, source_id=source_id, source_type=source_type).first()).data
- if source_type == TriggerTaskTypeChoices.APPLICATION:
- application_task = ApplicationTriggerTaskSerializer(
- Application.objects.filter(workspace_id=workspace_id, id=source_id).first()).data
- return {
- **TriggerModelSerializer(trigger).data,
- 'trigger_task': trigger_task,
- 'application_task': application_task,
- }
- if source_type == TriggerTaskTypeChoices.TOOL:
- tool_task = ToolTriggerTaskSerializer(
- Tool.objects.filter(workspace_id=workspace_id, id=source_id).first()).data
- return {
- **TriggerModelSerializer(trigger).data,
- 'trigger_task': trigger_task,
- 'tool_task': tool_task,
- }
- @transaction.atomic
- def edit(self, instance: Dict, with_valid=True):
- from trigger.handler.simple_tools import deploy, undeploy
- if with_valid:
- self.is_valid(raise_exception=True)
- serializer = TaskSourceTriggerEditRequest(data=instance)
- serializer.is_valid(raise_exception=True)
- valid_data = serializer.validated_data
- trigger_id = self.data.get('trigger_id')
- workspace_id = self.data.get('workspace_id')
- source_id = self.data.get('source_id')
- source_type = self.data.get('source_type')
- trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first()
- if not trigger:
- raise serializers.ValidationError(_('Trigger not found'))
- task_source_trigger_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta']
- trigger_deploy_edit_field_list = ['trigger_type', 'trigger_setting']
- need_redeploy = any(field in instance for field in trigger_deploy_edit_field_list)
- for field in task_source_trigger_edit_field_list:
- if field in valid_data:
- setattr(trigger, field, valid_data.get(field))
- trigger.save()
- trigger_task = valid_data.get('trigger_task')
- if trigger_task is not None:
- # 检查是否为空列表
- if not trigger_task:
- raise serializers.ValidationError(_('Trigger must have at least one task'))
- TriggerTask.objects.filter(
- source_id=source_id,
- source_type=source_type,
- trigger_id=trigger_id
- ).update(parameter=trigger_task[0].get("parameter"), meta=trigger_task[0].get("meta"))
- else:
- # 用户没提交 trigger_task 字段,确保数据库中有 task
- if not TriggerTask.objects.filter(trigger_id=trigger_id).exists():
- raise serializers.ValidationError(_('Trigger must have at least one task'))
- if need_redeploy:
- if trigger.is_active and trigger.trigger_type == 'SCHEDULED':
- deploy(TriggerModelSerializer(trigger).data, **{})
- else:
- undeploy(TriggerModelSerializer(trigger).data, **{})
- return self.one()
- # 删除的是当前trigger_id+source_id+source_type对应的task
- @transaction.atomic
- def delete(self):
- from trigger.handler.simple_tools import undeploy
- self.is_valid(raise_exception=True)
- trigger_id = self.data.get('trigger_id')
- workspace_id = self.data.get('workspace_id')
- source_id = self.data.get('source_id')
- source_type = self.data.get('source_type')
- trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first()
- if not trigger:
- raise AppApiException(404, _('Trigger not found'))
- delete_count = TriggerTask.objects.filter(trigger_id=trigger_id, source_id=source_id,
- source_type=source_type).delete()[0]
- if delete_count == 0:
- raise AppApiException(404, _('Task not found'))
- has_other_tasks = TriggerTask.objects.filter(trigger_id=trigger_id).exists()
- undeploy(TriggerModelSerializer(trigger).data, **{})
- if not has_other_tasks:
- trigger.delete()
- return True
- class TaskSourceTriggerListSerializer(serializers.Serializer):
- workspace_id = serializers.CharField(required=True, label=_('workspace id'))
- source_type = serializers.CharField(required=True, label=_('source type'))
- source_id = serializers.CharField(required=True, label=_('source id'))
- def list(self, with_valid=True):
- if with_valid:
- self.is_valid(raise_exception=True)
- triggers = Trigger.objects.filter(workspace_id=self.data.get("workspace_id"),
- triggertask__source_id=self.data.get("source_id"),
- triggertask__source_type=self.data.get("source_type"),
- is_active=True
- ).distinct()
- return [TriggerModelSerializer(trigger).data for trigger in triggers]
|