redis_duplicate_checker.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """
  2. 基于Redis的重复任务检查器
  3. 支持多进程间的重复任务检查
  4. """
  5. import os
  6. import json
  7. from datetime import datetime, timedelta
  8. import redis
  9. from foundation.observability.logger.loggering import server_logger as logger
  10. class RedisDuplicateChecker:
  11. """基于Redis的重复任务检查器"""
  12. def __init__(self):
  13. try:
  14. # 从配置文件读取Redis连接信息
  15. from foundation.infrastructure.config import config_handler
  16. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  17. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  18. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  19. # 从配置文件获取数据库编号
  20. redis_db = config_handler.get('redis', 'REDIS_DB', '0')
  21. # 构建Redis连接URL (使用配置文件的数据库)
  22. if redis_password:
  23. redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
  24. else:
  25. redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
  26. logger.info(f"连接Redis: {redis_url}")
  27. # 连接Redis
  28. self.redis_client = redis.from_url(redis_url, decode_responses=True)
  29. # 测试连接
  30. self.redis_client.ping()
  31. logger.info("Redis重复检查器连接成功")
  32. self.use_redis = True
  33. except Exception as e:
  34. logger.error(f"Redis连接失败,回退到内存模式: {str(e)}")
  35. # 回退到内存模式
  36. self.task_cache = {}
  37. self.use_redis = False
  38. else:
  39. self.use_redis = True
  40. async def is_duplicate_task(self, file_id: str) -> bool:
  41. """检查是否为重复任务"""
  42. try:
  43. if self.use_redis:
  44. # 使用Redis检查
  45. task_info = self.redis_client.get(f"task:{file_id}")
  46. if task_info:
  47. # 检查任务是否过期
  48. task_data = json.loads(task_info)
  49. created_at = datetime.fromisoformat(task_data['created_at'])
  50. if datetime.now() - created_at < timedelta(minutes=2):
  51. logger.info(f"发现重复任务: {file_id}")
  52. return True
  53. else:
  54. # 任务已过期,清理
  55. self.redis_client.delete(f"task:{file_id}")
  56. return False
  57. return False
  58. else:
  59. # 回退到内存模式
  60. if file_id in self.task_cache:
  61. logger.info(f"发现重复任务: {file_id}")
  62. return True
  63. return False
  64. except Exception as e:
  65. logger.error(f"检查重复任务失败: {str(e)}")
  66. return False
  67. async def register_task(self, file_info: dict, callback_task_id: str):
  68. """注册任务"""
  69. try:
  70. # 过滤掉不可序列化的字段(如file_content等bytes数据)
  71. serializable_file_info = {
  72. k: v for k, v in file_info.items()
  73. if k not in ['file_content'] and not isinstance(v, bytes)
  74. }
  75. task_data = {
  76. "callback_task_id": callback_task_id,
  77. "created_at": datetime.now().isoformat(),
  78. "used": False, # 标记任务是否已被使用启动审查
  79. "file_info": serializable_file_info
  80. }
  81. if self.use_redis:
  82. # 使用Redis存储,设置1小时过期
  83. self.redis_client.setex(
  84. f"task:{file_info['file_id']}",
  85. 3600, # 1小时
  86. json.dumps(task_data, ensure_ascii=False)
  87. )
  88. else:
  89. # 回退到内存模式
  90. self.task_cache[file_info['file_id']] = task_data
  91. logger.info(f"注册任务: {file_info['file_id']} -> {callback_task_id}")
  92. except Exception as e:
  93. logger.error(f"注册任务失败: {str(e)}")
  94. raise
  95. async def unregister_task(self, file_id: str):
  96. """取消注册任务"""
  97. try:
  98. if self.use_redis:
  99. self.redis_client.delete(f"task:{file_id}")
  100. else:
  101. if file_id in self.task_cache:
  102. del self.task_cache[file_id]
  103. logger.info(f"取消注册任务: {file_id}")
  104. except Exception as e:
  105. logger.error(f"取消注册任务失败: {str(e)}")
  106. async def is_valid_task_id(self, callback_task_id: str) -> bool:
  107. """验证任务ID是否存在且未过期"""
  108. try:
  109. if self.use_redis:
  110. # 遍历所有任务键,查找匹配的callback_task_id
  111. keys = self.redis_client.keys("task:*")
  112. for key in keys:
  113. task_info = self.redis_client.get(key)
  114. if task_info:
  115. task_data = json.loads(task_info)
  116. if task_data.get("callback_task_id") == callback_task_id:
  117. created_at = datetime.fromisoformat(task_data['created_at'])
  118. if datetime.now() - created_at < timedelta(hours=1):
  119. return True
  120. else:
  121. # 任务已过期,清理
  122. self.redis_client.delete(key)
  123. return False
  124. else:
  125. # 内存模式检查
  126. for file_id, task_info in self.task_cache.items():
  127. if task_info.get("callback_task_id") == callback_task_id:
  128. created_at = datetime.fromisoformat(task_info['created_at'])
  129. if datetime.now() - created_at < timedelta(hours=1):
  130. return True
  131. return False
  132. except Exception as e:
  133. logger.error(f"验证任务ID失败: {str(e)}")
  134. return False
  135. async def get_task_info(self, file_id: str) -> str:
  136. """获取任务信息"""
  137. try:
  138. if self.use_redis:
  139. task_info = self.redis_client.get(f"task:{file_id}")
  140. if task_info:
  141. task_data = json.loads(task_info)
  142. return task_data.get("callback_task_id", "")
  143. return ""
  144. else:
  145. if file_id in self.task_cache:
  146. return self.task_cache[file_id].get("callback_task_id", "")
  147. return ""
  148. except Exception as e:
  149. logger.error(f"获取任务信息失败: {str(e)}")
  150. return ""
  151. def cleanup_expired_cache(self):
  152. """清理过期缓存(Redis自动处理)"""
  153. try:
  154. if not self.use_redis:
  155. current_time = datetime.now()
  156. expired_files = []
  157. for file_id, task_info in list(self.task_cache.items()):
  158. created_at = datetime.fromisoformat(task_info['created_at'])
  159. if current_time - created_at > timedelta(hours=1):
  160. expired_files.append(file_id)
  161. for file_id in expired_files:
  162. del self.task_cache[file_id]
  163. if expired_files:
  164. logger.info(f"清理过期缓存: {len(expired_files)} 个文件")
  165. except Exception as e:
  166. logger.error(f"清理过期缓存失败: {str(e)}")
  167. async def is_task_already_used(self, callback_task_id: str) -> bool:
  168. """检查任务是否已经被使用启动审查"""
  169. try:
  170. if self.use_redis:
  171. # 遍历所有任务键,查找匹配的callback_task_id
  172. keys = self.redis_client.keys("task:*")
  173. for key in keys:
  174. task_info = self.redis_client.get(key)
  175. if task_info:
  176. task_data = json.loads(task_info)
  177. if task_data.get("callback_task_id") == callback_task_id:
  178. # 检查任务是否已被使用
  179. if task_data.get("used", False):
  180. logger.info(f"任务已被使用: {callback_task_id}")
  181. return True
  182. else:
  183. return False
  184. return False
  185. else:
  186. # 内存模式检查
  187. for file_id, task_info in self.task_cache.items():
  188. if task_info.get("callback_task_id") == callback_task_id:
  189. if task_info.get("used", False):
  190. return True
  191. else:
  192. return False
  193. return False
  194. except Exception as e:
  195. logger.error(f"检查任务使用状态失败: {str(e)}")
  196. return False
  197. async def mark_task_as_used(self, callback_task_id: str):
  198. """标记任务为已使用"""
  199. try:
  200. if self.use_redis:
  201. # 遍历所有任务键,查找匹配的callback_task_id
  202. keys = self.redis_client.keys("task:*")
  203. for key in keys:
  204. task_info = self.redis_client.get(key)
  205. if task_info:
  206. task_data = json.loads(task_info)
  207. if task_data.get("callback_task_id") == callback_task_id:
  208. # 更新used字段为True
  209. task_data["used"] = True
  210. self.redis_client.setex(
  211. key,
  212. 3600, # 1小时
  213. json.dumps(task_data, ensure_ascii=False)
  214. )
  215. logger.info(f"任务已标记为使用: {callback_task_id}")
  216. return
  217. else:
  218. # 内存模式
  219. for file_id, task_info in self.task_cache.items():
  220. if task_info.get("callback_task_id") == callback_task_id:
  221. task_info["used"] = True
  222. logger.info(f"任务已标记为使用: {callback_task_id}")
  223. return
  224. except Exception as e:
  225. logger.error(f"标记任务使用状态失败: {str(e)}")