# coding=utf-8 """ @project: MaxKB @Author:niu @file: task_source_trigger.py @date:2026/1/22 16:18 @desc: """ from typing import Dict from django.db import transaction from django.db.models import QuerySet from django.utils.translation import gettext_lazy as _ from rest_framework import serializers from application.models import Application from common.exception.app_exception import AppApiException from tools.models import Tool from trigger.models import TriggerTypeChoices, Trigger, TriggerTaskTypeChoices, TriggerTask from trigger.serializers.trigger import TriggerModelSerializer, TriggerSerializer, ApplicationTriggerTaskSerializer, \ ToolTriggerTaskSerializer, TriggerTaskModelSerializer class TaskSourceTriggerTaskEditRequest(serializers.Serializer): meta = serializers.DictField(default=dict, required=False) parameter = serializers.DictField(default=dict, required=False) class TaskSourceTriggerEditRequest(serializers.Serializer): name = serializers.CharField(required=False, label=_('trigger name')) desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description')) trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices) trigger_setting = serializers.DictField(required=False, label=_("trigger setting")) meta = serializers.DictField(default=dict, required=False) trigger_task = TaskSourceTriggerTaskEditRequest(many=True, required=False) class TaskSourceTriggerSerializer(serializers.Serializer): workspace_id = serializers.CharField(required=True, label=_('workspace id')) user_id = serializers.UUIDField(required=True, label=_("User ID")) def insert(self, instance, with_valid=True): if with_valid: self.is_valid(raise_exception=True) if not len(instance.get("trigger_task")) == 1: raise AppApiException(500, _('Trigger task number must be one')) source_id = instance.get('source_id') source_type = instance.get('source_type') source_trigger_task = instance.get('trigger_task')[0] if not (instance.get('source_id') == source_id and source_trigger_task.get('source_type') == source_type): raise AppApiException(500, _('Incorrect trigger task')) return TriggerSerializer(data={ 'workspace_id': self.data.get('workspace_id'), 'user_id': self.data.get('user_id') }).insert(instance, with_valid=True) class TaskSourceTriggerOperateSerializer(serializers.Serializer): trigger_id = serializers.UUIDField(required=True, label=_('trigger id')) workspace_id = serializers.CharField(required=False, label=_('workspace id')) source_type = serializers.CharField(required=True, label=_('source type')) source_id = serializers.CharField(required=True, label=_('source id')) def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) workspace_id = self.data.get('workspace_id') query_set = QuerySet(Trigger).filter(id=self.data.get('trigger_id')) if workspace_id: query_set = query_set.filter(workspace_id=workspace_id) if not query_set.exists(): raise AppApiException(500, _('Trigger id does not exist')) def one(self, with_valid=True): if with_valid: self.is_valid() trigger_id = self.data.get('trigger_id') workspace_id = self.data.get('workspace_id') source_id = self.data.get('source_id') source_type = self.data.get('source_type') trigger = QuerySet(Trigger).filter(workspace_id=workspace_id, id=trigger_id).first() trigger_task = TriggerTaskModelSerializer(TriggerTask.objects.filter( trigger_id=trigger_id, source_id=source_id, source_type=source_type).first()).data if source_type == TriggerTaskTypeChoices.APPLICATION: application_task = ApplicationTriggerTaskSerializer( Application.objects.filter(workspace_id=workspace_id, id=source_id).first()).data return { **TriggerModelSerializer(trigger).data, 'trigger_task': trigger_task, 'application_task': application_task, } if source_type == TriggerTaskTypeChoices.TOOL: tool_task = ToolTriggerTaskSerializer( Tool.objects.filter(workspace_id=workspace_id, id=source_id).first()).data return { **TriggerModelSerializer(trigger).data, 'trigger_task': trigger_task, 'tool_task': tool_task, } @transaction.atomic def edit(self, instance: Dict, with_valid=True): from trigger.handler.simple_tools import deploy, undeploy if with_valid: self.is_valid(raise_exception=True) serializer = TaskSourceTriggerEditRequest(data=instance) serializer.is_valid(raise_exception=True) valid_data = serializer.validated_data trigger_id = self.data.get('trigger_id') workspace_id = self.data.get('workspace_id') source_id = self.data.get('source_id') source_type = self.data.get('source_type') trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first() if not trigger: raise serializers.ValidationError(_('Trigger not found')) task_source_trigger_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta'] trigger_deploy_edit_field_list = ['trigger_type', 'trigger_setting'] need_redeploy = any(field in instance for field in trigger_deploy_edit_field_list) for field in task_source_trigger_edit_field_list: if field in valid_data: setattr(trigger, field, valid_data.get(field)) trigger.save() trigger_task = valid_data.get('trigger_task') if trigger_task is not None: # 检查是否为空列表 if not trigger_task: raise serializers.ValidationError(_('Trigger must have at least one task')) TriggerTask.objects.filter( source_id=source_id, source_type=source_type, trigger_id=trigger_id ).update(parameter=trigger_task[0].get("parameter"), meta=trigger_task[0].get("meta")) else: # 用户没提交 trigger_task 字段,确保数据库中有 task if not TriggerTask.objects.filter(trigger_id=trigger_id).exists(): raise serializers.ValidationError(_('Trigger must have at least one task')) if need_redeploy: if trigger.is_active and trigger.trigger_type == 'SCHEDULED': deploy(TriggerModelSerializer(trigger).data, **{}) else: undeploy(TriggerModelSerializer(trigger).data, **{}) return self.one() # 删除的是当前trigger_id+source_id+source_type对应的task @transaction.atomic def delete(self): from trigger.handler.simple_tools import undeploy self.is_valid(raise_exception=True) trigger_id = self.data.get('trigger_id') workspace_id = self.data.get('workspace_id') source_id = self.data.get('source_id') source_type = self.data.get('source_type') trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first() if not trigger: raise AppApiException(404, _('Trigger not found')) delete_count = TriggerTask.objects.filter(trigger_id=trigger_id, source_id=source_id, source_type=source_type).delete()[0] if delete_count == 0: raise AppApiException(404, _('Task not found')) has_other_tasks = TriggerTask.objects.filter(trigger_id=trigger_id).exists() undeploy(TriggerModelSerializer(trigger).data, **{}) if not has_other_tasks: trigger.delete() return True class TaskSourceTriggerListSerializer(serializers.Serializer): workspace_id = serializers.CharField(required=True, label=_('workspace id')) source_type = serializers.CharField(required=True, label=_('source type')) source_id = serializers.CharField(required=True, label=_('source id')) def list(self, with_valid=True): if with_valid: self.is_valid(raise_exception=True) triggers = Trigger.objects.filter(workspace_id=self.data.get("workspace_id"), triggertask__source_id=self.data.get("source_id"), triggertask__source_type=self.data.get("source_type"), is_active=True ).distinct() return [TriggerModelSerializer(trigger).data for trigger in triggers]