logger.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import logging
  2. from logging import StreamHandler
  3. from threading import get_ident
  4. from celery import current_task
  5. from celery.signals import task_prerun, task_postrun
  6. from django.conf import settings
  7. from kombu import Connection, Exchange, Queue, Producer
  8. from kombu.mixins import ConsumerMixin
  9. from common.utils.logger import maxkb_logger
  10. from .utils import get_celery_task_log_path
  11. from .const import CELERY_LOG_MAGIC_MARK
  12. routing_key = 'celery_log'
  13. celery_log_exchange = Exchange('celery_log_exchange', type='direct')
  14. celery_log_queue = [Queue('celery_log', celery_log_exchange, routing_key=routing_key)]
  15. class CeleryLoggerConsumer(ConsumerMixin):
  16. def __init__(self):
  17. self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
  18. def get_consumers(self, Consumer, channel):
  19. return [Consumer(queues=celery_log_queue,
  20. accept=['pickle', 'json'],
  21. callbacks=[self.process_task])
  22. ]
  23. def handle_task_start(self, task_id, message):
  24. pass
  25. def handle_task_end(self, task_id, message):
  26. pass
  27. def handle_task_log(self, task_id, msg, message):
  28. pass
  29. def process_task(self, body, message):
  30. action = body.get('action')
  31. task_id = body.get('task_id')
  32. msg = body.get('msg')
  33. if action == CeleryLoggerProducer.ACTION_TASK_LOG:
  34. self.handle_task_log(task_id, msg, message)
  35. elif action == CeleryLoggerProducer.ACTION_TASK_START:
  36. self.handle_task_start(task_id, message)
  37. elif action == CeleryLoggerProducer.ACTION_TASK_END:
  38. self.handle_task_end(task_id, message)
  39. class CeleryLoggerProducer:
  40. ACTION_TASK_START, ACTION_TASK_LOG, ACTION_TASK_END = range(3)
  41. def __init__(self):
  42. self.connection = Connection(settings.CELERY_LOG_BROKER_URL)
  43. @property
  44. def producer(self):
  45. return Producer(self.connection)
  46. def publish(self, payload):
  47. self.producer.publish(
  48. payload, serializer='json', exchange=celery_log_exchange,
  49. declare=[celery_log_exchange], routing_key=routing_key
  50. )
  51. def log(self, task_id, msg):
  52. payload = {'task_id': task_id, 'msg': msg, 'action': self.ACTION_TASK_LOG}
  53. return self.publish(payload)
  54. def read(self):
  55. pass
  56. def flush(self):
  57. pass
  58. def task_end(self, task_id):
  59. payload = {'task_id': task_id, 'action': self.ACTION_TASK_END}
  60. return self.publish(payload)
  61. def task_start(self, task_id):
  62. payload = {'task_id': task_id, 'action': self.ACTION_TASK_START}
  63. return self.publish(payload)
  64. class CeleryTaskLoggerHandler(StreamHandler):
  65. terminator = '\r\n'
  66. def __init__(self, *args, **kwargs):
  67. super().__init__(*args, **kwargs)
  68. task_prerun.connect(self.on_task_start)
  69. task_postrun.connect(self.on_start_end)
  70. @staticmethod
  71. def get_current_task_id():
  72. if not current_task:
  73. return
  74. task_id = current_task.request.root_id
  75. return task_id
  76. def on_task_start(self, sender, task_id, **kwargs):
  77. return self.handle_task_start(task_id)
  78. def on_start_end(self, sender, task_id, **kwargs):
  79. return self.handle_task_end(task_id)
  80. def after_task_publish(self, sender, body, **kwargs):
  81. pass
  82. def emit(self, record):
  83. task_id = self.get_current_task_id()
  84. if not task_id:
  85. return
  86. try:
  87. self.write_task_log(task_id, record)
  88. self.flush()
  89. except Exception:
  90. self.handleError(record)
  91. def write_task_log(self, task_id, msg):
  92. pass
  93. def handle_task_start(self, task_id):
  94. pass
  95. def handle_task_end(self, task_id):
  96. pass
  97. class CeleryThreadingLoggerHandler(CeleryTaskLoggerHandler):
  98. @staticmethod
  99. def get_current_thread_id():
  100. return str(get_ident())
  101. def emit(self, record):
  102. thread_id = self.get_current_thread_id()
  103. try:
  104. self.write_thread_task_log(thread_id, record)
  105. self.flush()
  106. except ValueError:
  107. self.handleError(record)
  108. def write_thread_task_log(self, thread_id, msg):
  109. pass
  110. def handle_task_start(self, task_id):
  111. pass
  112. def handle_task_end(self, task_id):
  113. pass
  114. def handleError(self, record) -> None:
  115. pass
  116. class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler):
  117. def __init__(self):
  118. self.producer = CeleryLoggerProducer()
  119. super().__init__(stream=None)
  120. def write_task_log(self, task_id, record):
  121. msg = self.format(record)
  122. self.producer.log(task_id, msg)
  123. def flush(self):
  124. self.producer.flush()
  125. class CeleryTaskFileHandler(CeleryTaskLoggerHandler):
  126. def __init__(self, *args, **kwargs):
  127. self.f = None
  128. super().__init__(*args, **kwargs)
  129. def emit(self, record):
  130. msg = self.format(record)
  131. if not self.f or self.f.closed:
  132. return
  133. self.f.write(msg)
  134. self.f.write(self.terminator)
  135. self.flush()
  136. def flush(self):
  137. self.f and self.f.flush()
  138. def handle_task_start(self, task_id):
  139. log_path = get_celery_task_log_path(task_id)
  140. self.f = open(log_path, 'a')
  141. def handle_task_end(self, task_id):
  142. self.f and self.f.close()
  143. class CeleryThreadTaskFileHandler(CeleryThreadingLoggerHandler):
  144. def __init__(self, *args, **kwargs):
  145. self.thread_id_fd_mapper = {}
  146. self.task_id_thread_id_mapper = {}
  147. super().__init__(*args, **kwargs)
  148. def write_thread_task_log(self, thread_id, record):
  149. f = self.thread_id_fd_mapper.get(thread_id, None)
  150. if not f:
  151. raise ValueError('Not found thread task file')
  152. msg = self.format(record)
  153. f.write(msg.encode())
  154. f.write(self.terminator.encode())
  155. f.flush()
  156. def flush(self):
  157. for f in self.thread_id_fd_mapper.values():
  158. f.flush()
  159. def handle_task_start(self, task_id):
  160. maxkb_logger.info('handle_task_start')
  161. log_path = get_celery_task_log_path(task_id)
  162. thread_id = self.get_current_thread_id()
  163. self.task_id_thread_id_mapper[task_id] = thread_id
  164. f = open(log_path, 'ab')
  165. self.thread_id_fd_mapper[thread_id] = f
  166. def handle_task_end(self, task_id):
  167. maxkb_logger.info('handle_task_end')
  168. ident_id = self.task_id_thread_id_mapper.get(task_id, '')
  169. f = self.thread_id_fd_mapper.pop(ident_id, None)
  170. if f and not f.closed:
  171. f.write(CELERY_LOG_MAGIC_MARK)
  172. f.close()
  173. self.task_id_thread_id_mapper.pop(task_id, None)