| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:虎虎
- @file: simple_task.py
- @date:2026/1/14 19:18
- @desc:
- """
- from threading import Thread
- from trigger.handler.impl.task.application_task import ApplicationTask
- from trigger.handler.impl.task.tool_task import ToolTask
- from trigger.handler.impl.trigger.event_trigger import EventTrigger
- from trigger.handler.impl.trigger.scheduled_trigger import ScheduledTrigger
- simple_task_handlers = [ApplicationTask(), ToolTask()]
- simple_trigger_handlers = [ScheduledTrigger(), EventTrigger()]
- def execute(trigger_task, **kwargs):
- """
- 执行触发器任务
- @param trigger_task: 触发器任务数据
- @param kwargs: 额外数据
- @return:
- """
- for simple_task_handler in simple_task_handlers:
- if simple_task_handler.support(trigger_task, **kwargs):
- Thread(target=simple_task_handler.execute, args=(trigger_task,), kwargs=kwargs).start()
- return
- raise Exception("不支持的处理器类型")
- def deploy(trigger, **kwargs):
- """
- 部署触发器
- @param trigger: 触发器字典数据
- @param kwargs: 额外数据
- @return:
- """
- for simple_trigger_handler in simple_trigger_handlers:
- if simple_trigger_handler.support(trigger, **kwargs):
- return simple_trigger_handler.deploy(trigger, **kwargs)
- raise Exception("不支持的触发器类型")
- def undeploy(trigger, **kwargs):
- """
- 取消部署触发器
- @param trigger: 触发器字典数据
- @param kwargs: 额外数据
- @return:
- """
- for simple_trigger_handler in simple_trigger_handlers:
- return simple_trigger_handler.undeploy(trigger, **kwargs)
- raise Exception("不支持的触发器类型")
|