clean_chat_job.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # coding=utf-8
  2. import datetime
  3. from django.db import transaction
  4. from django.db.models import Q, Max
  5. from django.utils import timezone
  6. from application.models import Application, Chat, ChatRecord
  7. from common.job.scheduler import scheduler
  8. from common.utils.lock import lock, RedisLock
  9. from common.utils.logger import maxkb_logger
  10. from knowledge.models import File
  11. def clean_chat_log_job():
  12. clean_chat_log_job_lock()
  13. @lock(lock_key='clean_chat_log_job_execute', timeout=30)
  14. def clean_chat_log_job_lock():
  15. from django.utils.translation import gettext_lazy as _
  16. maxkb_logger.info(_('start clean chat log'))
  17. now = timezone.now()
  18. applications = Application.objects.all().values('id', 'clean_time', 'file_clean_time')
  19. cutoff_dates = {
  20. app['id']: now - datetime.timedelta(days=app['clean_time'] or 180)
  21. for app in applications
  22. }
  23. file_cutoff_dates = {
  24. app['id']: now - datetime.timedelta(days=app['file_clean_time'] or app['clean_time'] or 180)
  25. for app in applications
  26. }
  27. file_conditions = Q()
  28. for app_id, cutoff_date in file_cutoff_dates.items():
  29. file_conditions |= Q(chat__application_id=app_id, create_time__lt=cutoff_date)
  30. clean_method(file_conditions, clean_log=False)
  31. query_conditions = Q()
  32. for app_id, cutoff_date in cutoff_dates.items():
  33. query_conditions |= Q(chat__application_id=app_id, create_time__lt=cutoff_date)
  34. clean_method(query_conditions)
  35. maxkb_logger.info(_('end clean chat log'))
  36. def clean_method(query_conditions, clean_log=True):
  37. batch_size = 500
  38. while True:
  39. with transaction.atomic():
  40. chat_records = ChatRecord.objects.filter(query_conditions).select_related('chat').only('id', 'chat_id',
  41. 'create_time')[
  42. :batch_size]
  43. if not chat_records:
  44. break
  45. chat_record_ids = [record.id for record in chat_records]
  46. chat_ids = {record.chat_id for record in chat_records}
  47. # 计算每个 chat_id 的最大 create_time
  48. max_create_times = ChatRecord.objects.filter(id__in=chat_record_ids).values('chat_id').annotate(
  49. max_create_time=Max('create_time'))
  50. # 收集需要删除的文件
  51. files_to_delete = []
  52. for record in chat_records:
  53. max_create_time = next(
  54. (item['max_create_time'] for item in max_create_times if
  55. str(item['chat_id']) == str(record.chat_id)), None)
  56. if max_create_time:
  57. files_to_delete.extend(
  58. File.objects.filter(source_id=str(record.chat_id), create_time__lt=max_create_time)
  59. )
  60. # 删除 ChatRecord
  61. deleted_count = 0
  62. if clean_log:
  63. deleted_count = ChatRecord.objects.filter(id__in=chat_record_ids).delete()[0]
  64. from django.db.models import Count
  65. updated_counts = ChatRecord.objects.filter(chat_id__in=chat_ids) \
  66. .values('chat_id') \
  67. .annotate(count=Count('id'))
  68. count_map = {item['chat_id']: item['count'] for item in updated_counts}
  69. for chat_id in chat_ids:
  70. count = count_map.get(chat_id, 0) # 如果没有记录则为0
  71. Chat.objects.filter(id=chat_id).update(chat_record_count=count)
  72. # 删除没有关联 ChatRecord 的 Chat
  73. Chat.objects.filter(chatrecord__isnull=True, id__in=chat_ids).delete()
  74. File.objects.filter(loid__in=[file.loid for file in files_to_delete]).delete()
  75. if deleted_count < batch_size:
  76. break
  77. def run():
  78. rlock = RedisLock()
  79. if rlock.try_lock('clean_chat_log_job', 30 * 30):
  80. try:
  81. maxkb_logger.debug('get lock clean_chat_log_job')
  82. existing_job = scheduler.get_job(job_id='clean_chat_log')
  83. if existing_job is not None:
  84. existing_job.remove()
  85. scheduler.add_job(clean_chat_log_job, 'cron', hour='0', minute='5', id='clean_chat_log',
  86. misfire_grace_time=300, max_instances=1)
  87. finally:
  88. rlock.un_lock('clean_chat_log_job')