redis_duplicate_checker.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. """
  2. 基于Redis的重复任务检查器
  3. 支持多进程间的重复任务检查
  4. """
  5. import os
  6. import json
  7. from datetime import datetime, timedelta
  8. import redis
  9. from foundation.logger.loggering import server_logger as logger
  10. class RedisDuplicateChecker:
  11. """基于Redis的重复任务检查器"""
  12. def __init__(self):
  13. try:
  14. # 从配置文件读取Redis连接信息
  15. from foundation.base.config import config_handler
  16. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  17. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  18. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  19. # 构建Redis连接URL
  20. if redis_password:
  21. redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/2"
  22. else:
  23. redis_url = f"redis://{redis_host}:{redis_port}/2"
  24. logger.info(f"连接Redis: {redis_url}")
  25. # 连接Redis
  26. self.redis_client = redis.from_url(redis_url, decode_responses=True)
  27. # 测试连接
  28. self.redis_client.ping()
  29. logger.info("Redis重复检查器连接成功")
  30. self.use_redis = True
  31. except Exception as e:
  32. logger.error(f"Redis连接失败,回退到内存模式: {str(e)}")
  33. # 回退到内存模式
  34. self.task_cache = {}
  35. self.use_redis = False
  36. else:
  37. self.use_redis = True
  38. async def is_duplicate_task(self, file_id: str) -> bool:
  39. """检查是否为重复任务"""
  40. try:
  41. if self.use_redis:
  42. # 使用Redis检查
  43. task_info = self.redis_client.get(f"task:{file_id}")
  44. if task_info:
  45. # 检查任务是否过期
  46. task_data = json.loads(task_info)
  47. created_at = datetime.fromisoformat(task_data['created_at'])
  48. if datetime.now() - created_at < timedelta(minutes=2):
  49. logger.info(f"发现重复任务: {file_id}")
  50. return True
  51. else:
  52. # 任务已过期,清理
  53. self.redis_client.delete(f"task:{file_id}")
  54. return False
  55. return False
  56. else:
  57. # 回退到内存模式
  58. if file_id in self.task_cache:
  59. logger.info(f"发现重复任务: {file_id}")
  60. return True
  61. return False
  62. except Exception as e:
  63. logger.error(f"检查重复任务失败: {str(e)}")
  64. return False
  65. async def register_task(self, file_info: dict, callback_task_id: str):
  66. """注册任务"""
  67. try:
  68. # 过滤掉不可序列化的字段(如file_content等bytes数据)
  69. serializable_file_info = {
  70. k: v for k, v in file_info.items()
  71. if k not in ['file_content'] and not isinstance(v, bytes)
  72. }
  73. task_data = {
  74. "callback_task_id": callback_task_id,
  75. "created_at": datetime.now().isoformat(),
  76. "file_info": serializable_file_info
  77. }
  78. if self.use_redis:
  79. # 使用Redis存储,设置1小时过期
  80. self.redis_client.setex(
  81. f"task:{file_info['file_id']}",
  82. 3600, # 1小时
  83. json.dumps(task_data, ensure_ascii=False)
  84. )
  85. else:
  86. # 回退到内存模式
  87. self.task_cache[file_info['file_id']] = task_data
  88. logger.info(f"注册任务: {file_info['file_id']} -> {callback_task_id}")
  89. except Exception as e:
  90. logger.error(f"注册任务失败: {str(e)}")
  91. raise
  92. async def unregister_task(self, file_id: str):
  93. """取消注册任务"""
  94. try:
  95. if self.use_redis:
  96. self.redis_client.delete(f"task:{file_id}")
  97. else:
  98. if file_id in self.task_cache:
  99. del self.task_cache[file_id]
  100. logger.info(f"取消注册任务: {file_id}")
  101. except Exception as e:
  102. logger.error(f"取消注册任务失败: {str(e)}")
  103. async def get_task_info(self, file_id: str) -> str:
  104. """获取任务信息"""
  105. try:
  106. if self.use_redis:
  107. task_info = self.redis_client.get(f"task:{file_id}")
  108. if task_info:
  109. task_data = json.loads(task_info)
  110. return task_data.get("callback_task_id", "")
  111. return ""
  112. else:
  113. if file_id in self.task_cache:
  114. return self.task_cache[file_id].get("callback_task_id", "")
  115. return ""
  116. except Exception as e:
  117. logger.error(f"获取任务信息失败: {str(e)}")
  118. return ""
  119. def cleanup_expired_cache(self):
  120. """清理过期缓存(Redis自动处理)"""
  121. try:
  122. if not self.use_redis:
  123. current_time = datetime.now()
  124. expired_files = []
  125. for file_id, task_info in list(self.task_cache.items()):
  126. created_at = datetime.fromisoformat(task_info['created_at'])
  127. if current_time - created_at > timedelta(hours=1):
  128. expired_files.append(file_id)
  129. for file_id in expired_files:
  130. del self.task_cache[file_id]
  131. if expired_files:
  132. logger.info(f"清理过期缓存: {len(expired_files)} 个文件")
  133. except Exception as e:
  134. logger.error(f"清理过期缓存失败: {str(e)}")