signal_handler.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding: utf-8 -*-
  2. #
  3. import logging
  4. import os
  5. import uuid_utils.compat as uuid
  6. from celery import subtask
  7. from celery.signals import (
  8. worker_ready, worker_shutdown, after_setup_logger, task_revoked, task_prerun
  9. )
  10. from django.core.cache import cache
  11. from django_apscheduler.models import DjangoJob
  12. from django_celery_beat.models import PeriodicTask
  13. from common.utils.logger import maxkb_logger
  14. from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
  15. from .logger import CeleryThreadTaskFileHandler
  16. logger = logging.getLogger(__file__)
  17. safe_str = lambda x: x
  18. def init_scheduler():
  19. from common import job
  20. from common.init import init_template
  21. from trigger.models import Trigger
  22. job.run()
  23. init_template.run()
  24. # 清理已经不存在的 trigger job
  25. trigger_jobs = DjangoJob.objects.filter(id__startswith="trigger:")
  26. # 从 job id 中提取 trigger_id (格式: trigger:<trigger_id>:task:...)
  27. trigger_ids_from_jobs = set()
  28. job_id_to_trigger_id = {} # 映射 job_id -> trigger_id
  29. for job in trigger_jobs:
  30. parts = job.id.split(':')
  31. if len(parts) >= 2:
  32. trigger_id = uuid.UUID(parts[1]) # 提取 trigger_id
  33. trigger_ids_from_jobs.add(trigger_id)
  34. job_id_to_trigger_id[job.id] = trigger_id
  35. # 获取所有有效的 Trigger ID
  36. valid_trigger_ids = set(Trigger.objects.filter(
  37. id__in=trigger_ids_from_jobs, is_active=True
  38. ).values_list('id', flat=True))
  39. # 找出需要删除的 job (trigger 已不存在的)
  40. jobs_to_delete = [
  41. job_id for job_id, trigger_id in job_id_to_trigger_id.items()
  42. if trigger_id not in valid_trigger_ids
  43. ]
  44. if jobs_to_delete:
  45. DjangoJob.objects.filter(id__in=jobs_to_delete).delete()
  46. logger.info(f"Cleaned up {len(jobs_to_delete)} orphaned trigger jobs")
  47. try:
  48. from xpack import job as xpack_job
  49. xpack_job.run()
  50. except ImportError:
  51. pass
  52. @worker_ready.connect
  53. def on_app_ready(sender=None, headers=None, **kwargs):
  54. if cache.get("CELERY_APP_READY", 0) == 1:
  55. return
  56. cache.set("CELERY_APP_READY", 1, 10)
  57. # 初始化定时任务
  58. init_scheduler()
  59. tasks = get_after_app_ready_tasks()
  60. logger.debug("Work ready signal recv")
  61. logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
  62. for task in tasks:
  63. periodic_task = PeriodicTask.objects.filter(task=task).first()
  64. if periodic_task and not periodic_task.enabled:
  65. logger.debug("Periodic task [{}] is disabled!".format(task))
  66. continue
  67. subtask(task).delay()
  68. def delete_files(directory):
  69. if os.path.isdir(directory):
  70. for filename in os.listdir(directory):
  71. file_path = os.path.join(directory, filename)
  72. if os.path.isfile(file_path):
  73. os.remove(file_path)
  74. @worker_shutdown.connect
  75. def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
  76. if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
  77. return
  78. cache.set("CELERY_APP_SHUTDOWN", 1, 10)
  79. tasks = get_after_app_shutdown_clean_tasks()
  80. logger.debug("Worker shutdown signal recv")
  81. logger.debug("Clean period tasks: [{}]".format(', '.join(tasks)))
  82. PeriodicTask.objects.filter(name__in=tasks).delete()
  83. @after_setup_logger.connect
  84. def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
  85. if not logger:
  86. return
  87. task_handler = CeleryThreadTaskFileHandler()
  88. task_handler.setLevel(loglevel)
  89. formatter = logging.Formatter(format)
  90. task_handler.setFormatter(formatter)
  91. logger.addHandler(task_handler)
  92. @task_revoked.connect
  93. def on_task_revoked(request, terminated, signum, expired, **kwargs):
  94. maxkb_logger.info('task_revoked', terminated)
  95. @task_prerun.connect
  96. def on_taskaa_start(sender, task_id, **kwargs):
  97. pass
  98. # sender.update_state(state='REVOKED',
  99. # meta={'exc_type': 'Exception', 'exc': 'Exception', 'message': '暂停任务', 'exc_message': ''})