pipeline_manage.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: pipeline_manage.py
  6. @date:2024/1/9 17:40
  7. @desc:
  8. """
  9. import time
  10. from functools import reduce
  11. from typing import List, Type, Dict
  12. from application.chat_pipeline.I_base_chat_pipeline import IBaseChatPipelineStep
  13. from common.handle.base_to_response import BaseToResponse
  14. from common.handle.impl.response.system_to_response import SystemToResponse
  15. class PipelineManage:
  16. def __init__(self, step_list: List[Type[IBaseChatPipelineStep]],
  17. base_to_response: BaseToResponse = SystemToResponse(),
  18. debug=False):
  19. # 步骤执行器
  20. self.step_list = [step() for step in step_list]
  21. self.run_step_list = []
  22. # 上下文
  23. self.context = {'message_tokens': 0, 'answer_tokens': 0}
  24. self.base_to_response = base_to_response
  25. self.debug = debug
  26. def run(self, context: Dict = None):
  27. self.context['start_time'] = time.time()
  28. if context is not None:
  29. for key, value in context.items():
  30. self.context[key] = value
  31. for step in self.step_list:
  32. self.run_step_list.append(step)
  33. step.run(self)
  34. def get_details(self):
  35. return reduce(lambda x, y: {**x, **y}, [{item.get('step_type'): item} for item in
  36. filter(lambda r: r is not None,
  37. [row.get_details(self) for row in self.run_step_list])], {})
  38. def get_base_to_response(self):
  39. return self.base_to_response
  40. class builder:
  41. def __init__(self):
  42. self.step_list: List[Type[IBaseChatPipelineStep]] = []
  43. self.base_to_response = SystemToResponse()
  44. self.debug = False
  45. def append_step(self, step: Type[IBaseChatPipelineStep]):
  46. self.step_list.append(step)
  47. return self
  48. def add_base_to_response(self, base_to_response: BaseToResponse):
  49. self.base_to_response = base_to_response
  50. return self
  51. def add_debug(self, debug):
  52. self.debug = debug
  53. return self
  54. def build(self):
  55. return PipelineManage(step_list=self.step_list, base_to_response=self.base_to_response, debug=self.debug)