simple_tools.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # coding=utf-8
  2. """
  3. @project: MaxKB
  4. @Author:虎虎
  5. @file: simple_task.py
  6. @date:2026/1/14 19:18
  7. @desc:
  8. """
  9. from threading import Thread
  10. from trigger.handler.impl.task.application_task import ApplicationTask
  11. from trigger.handler.impl.task.tool_task import ToolTask
  12. from trigger.handler.impl.trigger.event_trigger import EventTrigger
  13. from trigger.handler.impl.trigger.scheduled_trigger import ScheduledTrigger
  14. simple_task_handlers = [ApplicationTask(), ToolTask()]
  15. simple_trigger_handlers = [ScheduledTrigger(), EventTrigger()]
  16. def execute(trigger_task, **kwargs):
  17. """
  18. 执行触发器任务
  19. @param trigger_task: 触发器任务数据
  20. @param kwargs: 额外数据
  21. @return:
  22. """
  23. for simple_task_handler in simple_task_handlers:
  24. if simple_task_handler.support(trigger_task, **kwargs):
  25. Thread(target=simple_task_handler.execute, args=(trigger_task,), kwargs=kwargs).start()
  26. return
  27. raise Exception("不支持的处理器类型")
  28. def deploy(trigger, **kwargs):
  29. """
  30. 部署触发器
  31. @param trigger: 触发器字典数据
  32. @param kwargs: 额外数据
  33. @return:
  34. """
  35. for simple_trigger_handler in simple_trigger_handlers:
  36. if simple_trigger_handler.support(trigger, **kwargs):
  37. return simple_trigger_handler.deploy(trigger, **kwargs)
  38. raise Exception("不支持的触发器类型")
  39. def undeploy(trigger, **kwargs):
  40. """
  41. 取消部署触发器
  42. @param trigger: 触发器字典数据
  43. @param kwargs: 额外数据
  44. @return:
  45. """
  46. for simple_trigger_handler in simple_trigger_handlers:
  47. return simple_trigger_handler.undeploy(trigger, **kwargs)
  48. raise Exception("不支持的触发器类型")