utils.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # -*- coding: utf-8 -*-
  2. #
  3. import logging
  4. import os
  5. import uuid
  6. from django.conf import settings
  7. from django_celery_beat.models import (
  8. PeriodicTasks
  9. )
  10. from common.utils.logger import maxkb_logger
  11. from maxkb.const import PROJECT_DIR
  12. logger = logging.getLogger(__file__)
  13. def disable_celery_periodic_task(task_name):
  14. from django_celery_beat.models import PeriodicTask
  15. PeriodicTask.objects.filter(name=task_name).update(enabled=False)
  16. PeriodicTasks.update_changed()
  17. def delete_celery_periodic_task(task_name):
  18. from django_celery_beat.models import PeriodicTask
  19. PeriodicTask.objects.filter(name=task_name).delete()
  20. PeriodicTasks.update_changed()
  21. def get_celery_periodic_task(task_name):
  22. from django_celery_beat.models import PeriodicTask
  23. task = PeriodicTask.objects.filter(name=task_name).first()
  24. return task
  25. def make_dirs(name, mode=0o700, exist_ok=False):
  26. """ 默认权限设置为 0o700 """
  27. return os.makedirs(name, mode=mode, exist_ok=exist_ok)
  28. def get_task_log_path(base_path, task_id, level=2):
  29. task_id = str(task_id)
  30. try:
  31. uuid.UUID(task_id)
  32. except:
  33. return os.path.join(PROJECT_DIR, 'data', 'caution.txt')
  34. rel_path = os.path.join(*task_id[:level], task_id + '.log')
  35. path = os.path.join(base_path, rel_path)
  36. make_dirs(os.path.dirname(path), exist_ok=True)
  37. return path
  38. def get_celery_task_log_path(task_id):
  39. return get_task_log_path(settings.CELERY_LOG_DIR, task_id)
  40. def get_celery_status():
  41. from . import app
  42. i = app.control.inspect()
  43. ping_data = i.ping() or {}
  44. active_nodes = [k for k, v in ping_data.items() if v.get('ok') == 'pong']
  45. active_queue_worker = set([n.split('@')[0] for n in active_nodes if n])
  46. # Celery Worker 数量: 2
  47. if len(active_queue_worker) < 2:
  48. maxkb_logger.info("Not all celery worker worked")
  49. return False
  50. else:
  51. return True