| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:niu
- @file: trigger.py
- @date:2026/1/14 11:44
- @desc:
- """
- from django.db.models import QuerySet
- from django.utils.translation import gettext_lazy as _
- from drf_spectacular.utils import extend_schema
- from rest_framework.request import Request
- from rest_framework.views import APIView
- from application.api.application_api import ApplicationCreateAPI
- from common import result
- from common.auth import TokenAuth
- from common.auth.authentication import has_permissions
- from common.constants.permission_constants import PermissionConstants, RoleConstants, ViewPermission, CompareConstants, \
- Permission, Group, Operate
- from common.log.log import log
- from common.result import DefaultResultSerializer
- from trigger.models import Trigger
- from trigger.serializers.task_source_trigger import TaskSourceTriggerListSerializer, TaskSourceTriggerOperateSerializer, \
- TaskSourceTriggerSerializer
- from trigger.serializers.trigger import TriggerQuerySerializer, TriggerOperateSerializer
- from trigger.api.trigger import TriggerCreateAPI, TriggerOperateAPI, TriggerEditAPI, TriggerBatchDeleteAPI, \
- TriggerBatchActiveAPI, TaskSourceTriggerOperateAPI, TaskSourceTriggerAPI, TaskSourceTriggerCreateAPI, \
- TriggerQueryAPI, TriggerQueryPageAPI
- from trigger.serializers.trigger import TriggerSerializer
- def get_trigger_operation_object(trigger_id):
- trigger_model = QuerySet(model=Trigger).filter(id=trigger_id).first()
- if trigger_model is not None:
- return {
- "name": trigger_model.name
- }
- def get_trigger_operation_object_batch(trigger_id_list):
- trigger_model_list = QuerySet(model=Trigger).filter(id__in=trigger_id_list)
- if trigger_model_list is not None:
- return {
- "name": f'[{",".join([trigger_model.name for trigger_model in trigger_model_list])}]',
- "trigger_list": [{'name': trigger_model.name, 'type': trigger_model.type} for trigger_model in
- trigger_model_list]
- }
- class TriggerView(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['POST'],
- description=_('Create trigger'),
- summary=_('Create trigger'),
- operation_id=_('Create trigger'), # type: ignore
- parameters=TriggerCreateAPI.get_parameters(),
- request=TriggerCreateAPI.get_request(),
- responses=TriggerCreateAPI.get_response(),
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_CREATE.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Create trigger",
- get_operation_object=lambda r, k: r.data.get('name'),
- )
- def post(self, request: Request, workspace_id: str):
- return result.success(TriggerSerializer(
- data={'workspace_id': workspace_id, 'user_id': request.user.id}).insert(request.data))
- @extend_schema(
- methods=['GET'],
- description=_('Get the trigger list'),
- summary=_('Get the trigger list'),
- operation_id=_('Get the trigger list'), # type: ignore
- parameters=TriggerQueryAPI.get_parameters(),
- responses=TriggerQueryAPI.get_response(),
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_READ.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- def get(self, request: Request, workspace_id: str):
- return result.success(TriggerQuerySerializer(data={
- 'workspace_id': workspace_id,
- 'name': request.query_params.get('name'),
- 'type': request.query_params.get('type'),
- 'task': request.query_params.get('task'),
- 'is_active': request.query_params.get('is_active'),
- 'create_user': request.query_params.get('create_user'),
- }).list())
- class Operate(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['GET'],
- description=_('Get trigger details'),
- summary=_('Get trigger details'),
- operation_id=_('Get trigger details'), # type: ignore
- parameters=TriggerOperateAPI.get_parameters(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_READ.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Get trigger details",
- get_operation_object=lambda r, k: get_trigger_operation_object(k.get('trigger_id')),
- )
- def get(self, request: Request, workspace_id: str, trigger_id: str):
- return result.success(TriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id, 'user_id': request.user.id}
- ).one())
- @extend_schema(
- methods=['PUT'],
- description=_('Modify the trigger'),
- summary=_('Modify the trigger'),
- operation_id=_('Modify the trigger'), # type: ignore
- parameters=TriggerOperateAPI.get_parameters(),
- request=TriggerEditAPI.get_request(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_EDIT.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Modify the trigger",
- get_operation_object=lambda r, k: get_trigger_operation_object(k.get('trigger_id')),
- )
- def put(self, request: Request, workspace_id: str, trigger_id: str):
- return result.success(TriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id, 'user_id': request.user.id}
- ).edit(request.data))
- @extend_schema(
- methods=['DELETE'],
- description=_('Delete the trigger'),
- summary=_('Delete the trigger'),
- operation_id=_('Delete the trigger'), # type: ignore
- parameters=TriggerOperateAPI.get_parameters(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_DELETE.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Delete the trigger",
- get_operation_object=lambda r, k: get_trigger_operation_object(k.get('trigger_id')),
- )
- def delete(self, request: Request, workspace_id: str, trigger_id: str):
- return result.success(TriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id, 'user_id': request.user.id}
- ).delete())
- class BatchDelete(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['PUT'],
- description=_('Delete trigger in batches'),
- summary=_('Delete trigger in batches'),
- operation_id=_('Delete trigger in batches'), # type: ignore
- parameters=TriggerBatchDeleteAPI.get_parameters(),
- request=TriggerBatchDeleteAPI.get_request(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_DELETE.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Delete trigger in batches",
- get_operation_object=lambda r, k: get_trigger_operation_object_batch(r.data.get('id_list')),
- )
- def put(self, request: Request, workspace_id: str):
- return result.success(TriggerSerializer.Batch(
- data={'workspace_id': workspace_id, 'user_id': request.user.id}
- ).batch_delete(request.data))
- class BatchActivate(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['PUT'],
- description=_('Activate trigger in batches'),
- summary=_('Activate trigger in batches'),
- operation_id=_('Activate trigger in batches'), # type: ignore
- parameters=TriggerBatchDeleteAPI.get_parameters(),
- request=TriggerBatchActiveAPI.get_request(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_EDIT.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- @log(
- menu="Trigger", operate="Activate trigger in batches",
- get_operation_object=lambda r, k: get_trigger_operation_object_batch(r.data.get('id_list')),
- )
- def put(self, request: Request, workspace_id: str):
- return result.success(TriggerSerializer.Batch(
- data={'workspace_id': workspace_id, 'user_id': request.user.id}
- ).batch_switch(request.data))
- class Page(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['GET'],
- description=_('Get the trigger list by page'),
- summary=_('Get the trigger list by page'),
- operation_id=_('Get the trigger list by page'), # type: ignore
- parameters=TriggerQueryPageAPI.get_parameters(),
- responses=TriggerQueryPageAPI.get_response(),
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- PermissionConstants.TRIGGER_READ.get_workspace_permission_workspace_manage_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
- )
- def get(self, request: Request, workspace_id: str, current_page: int, page_size: int):
- return result.success(TriggerQuerySerializer(data={
- 'workspace_id': workspace_id,
- 'name': request.query_params.get('name'),
- 'task': request.query_params.get('task'),
- 'type': request.query_params.get('type'),
- 'is_active': request.query_params.get('is_active'),
- 'create_user': request.query_params.get('create_user'),
- }).page(current_page, page_size))
- class TaskSourceTriggerView(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['POST'],
- description=_('Create trigger in source'),
- summary=_('Create trigger in source'),
- operation_id=_('Create trigger in source'), # type: ignore
- parameters=TaskSourceTriggerCreateAPI.get_parameters(),
- request=TaskSourceTriggerCreateAPI.get_request(),
- responses=TaskSourceTriggerCreateAPI.get_response(),
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_CREATE,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}:ROLE/WORKSPACE_MANAGE"
- ),
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_CREATE,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}"
- ),
- ViewPermission([RoleConstants.USER.get_workspace_role()],
- [lambda r, kwargs: Permission(group=Group(kwargs.get('source_type')),
- operate=Operate.SELF,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}")],
- CompareConstants.AND),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
- @log(
- menu="Trigger", operate="Create trigger in source",
- get_operation_object=lambda r, k: r.data.get('name'),
- )
- def post(self, request: Request, workspace_id: str, source_type: str, source_id: str):
- return result.success(TaskSourceTriggerSerializer(data={
- 'workspace_id': workspace_id,
- 'user_id': request.user.id
- }).insert({**request.data, 'source_id': source_id,
- 'workspace_id': workspace_id,
- 'is_active': True,
- 'source_type': source_type}))
- @extend_schema(
- methods=['GET'],
- description=_('Get the trigger list of source'),
- summary=_('Get the trigger list of source'),
- operation_id=_('Get the trigger list of source'), # type: ignore
- parameters=TaskSourceTriggerAPI.get_parameters(),
- responses=DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_READ,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}:ROLE/WORKSPACE_MANAGE"
- ),
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_READ,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}"
- ),
- RoleConstants.USER.get_workspace_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
- def get(self, request: Request, workspace_id: str, source_type: str, source_id: str):
- return result.success(TaskSourceTriggerListSerializer(data={
- 'workspace_id': workspace_id,
- 'source_id': source_id,
- 'source_type': source_type,
- }).list())
- class Operate(APIView):
- authentication_classes = [TokenAuth]
- @extend_schema(
- methods=['GET'],
- description=_('Get Task source trigger details'),
- summary=_('Get Task source trigger details'),
- operation_id=_('Get Task source trigger details'), # type: ignore
- parameters=TaskSourceTriggerOperateAPI.get_parameters(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_READ,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}:ROLE/WORKSPACE_MANAGE"
- ),
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_READ,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}"
- ),
- RoleConstants.USER.get_workspace_role(),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
- def get(self, request: Request, workspace_id: str, source_type: str, source_id: str, trigger_id: str):
- return result.success(TaskSourceTriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id,
- 'source_id': source_id, 'source_type': source_type}
- ).one())
- @extend_schema(
- methods=['PUT'],
- description=_('Modify the task source trigger'),
- summary=_('Modify the task source trigger'),
- operation_id=_('Modify the task source trigger'), # type: ignore
- parameters=TaskSourceTriggerOperateAPI.get_parameters(),
- request=TaskSourceTriggerOperateAPI.get_request(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_EDIT,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}:ROLE/WORKSPACE_MANAGE"
- ),
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_EDIT,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}"
- ),
- ViewPermission([RoleConstants.USER.get_workspace_role()],
- [lambda r, kwargs: Permission(group=Group(kwargs.get('source_type')),
- operate=Operate.SELF,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}")],
- CompareConstants.AND),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
- @log(
- menu="Trigger", operate="Modify the source point trigger",
- get_operation_object=lambda r, k: get_trigger_operation_object(k.get('trigger_id')),
- )
- def put(self, request: Request, workspace_id: str, source_type: str, source_id: str, trigger_id: str):
- return result.success(TaskSourceTriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id,
- 'source_id': source_id, 'source_type': source_type}
- ).edit(request.data))
- @extend_schema(
- methods=['DELETE'],
- description=_('Delete the task source trigger'),
- summary=_('Delete the task source trigger'),
- operation_id=_('Delete the task source trigger'), # type: ignore
- parameters=TaskSourceTriggerOperateAPI.get_parameters(),
- responses=result.DefaultResultSerializer,
- tags=[_('Trigger')] # type: ignore
- )
- @has_permissions(
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_DELETE,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}:ROLE/WORKSPACE_MANAGE"
- ),
- lambda r, kwargs: Permission(group=Group(kwargs.get("source_type")), operate=Operate.TRIGGER_DELETE,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}"
- ),
- ViewPermission([RoleConstants.USER.get_workspace_role()],
- [lambda r, kwargs: Permission(group=Group(kwargs.get('source_type')),
- operate=Operate.SELF,
- resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('source_type')}/{kwargs.get('source_id')}")],
- CompareConstants.AND),
- RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
- @log(
- menu="Trigger", operate="Delete the source point trigger",
- get_operation_object=lambda r, k: get_trigger_operation_object(k.get('trigger_id')),
- )
- def delete(self, request: Request, workspace_id: str, source_type: str, source_id: str, trigger_id: str):
- return result.success(TaskSourceTriggerOperateSerializer(
- data={'trigger_id': trigger_id, 'workspace_id': workspace_id,
- 'source_id': source_id, 'source_type': source_type}
- ).delete())
|