# sync.py (2.9 KB)
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: sync.py
@date:2024/8/20 21:37
@desc: Celery tasks that synchronize web knowledge bases and web documents.
"""
  9. import traceback
  10. from typing import List
  11. from celery_once import QueueOnce
  12. from django.utils.translation import gettext_lazy as _
  13. from common.utils.fork import ForkManage, Fork
  14. from common.utils.logger import maxkb_logger
  15. from ops import celery_app
  16. @celery_app.task(base=QueueOnce, once={'keys': ['knowledge_id']}, name='celery:sync_web_knowledge')
  17. def sync_web_knowledge(knowledge_id: str, url: str, selector: str):
  18. from knowledge.task.handler import get_save_handler
  19. try:
  20. maxkb_logger.info(
  21. _('Start--->Start synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
  22. ForkManage(url, selector.split(" ") if selector is not None else []).fork(2, set(),
  23. get_save_handler(knowledge_id,
  24. selector))
  25. maxkb_logger.info(_('End--->End synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
  26. except Exception as e:
  27. maxkb_logger.error(_('Synchronize web knowledge base:{knowledge_id} error{error}{traceback}').format(
  28. knowledge_id=knowledge_id, error=str(e), traceback=traceback.format_exc()))
  29. @celery_app.task(base=QueueOnce, once={'keys': ['knowledge_id']}, name='celery:sync_replace_web_knowledge')
  30. def sync_replace_web_knowledge(knowledge_id: str, url: str, selector: str):
  31. from knowledge.task.handler import get_sync_handler
  32. try:
  33. maxkb_logger.info(
  34. _('Start--->Start synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
  35. ForkManage(url, selector.split(" ") if selector is not None else []).fork(2, set(),
  36. get_sync_handler(knowledge_id
  37. ))
  38. maxkb_logger.info(_('End--->End synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
  39. except Exception as e:
  40. maxkb_logger.error(_('Synchronize web knowledge base:{knowledge_id} error{error}{traceback}').format(
  41. knowledge_id=knowledge_id, error=str(e), traceback=traceback.format_exc()))
  42. @celery_app.task(name='celery:sync_web_document')
  43. def sync_web_document(knowledge_id, source_url_list: List[str], selector: str):
  44. from knowledge.task.handler import get_sync_web_document_handler
  45. handler = get_sync_web_document_handler(knowledge_id)
  46. for source_url in source_url_list:
  47. try:
  48. result = Fork(base_fork_url=source_url, selector_list=selector.split(' ')).fork()
  49. handler(source_url, selector, result)
  50. except Exception as e:
  51. pass