""" 基于Redis的重复任务检查器 支持多进程间的重复任务检查 """ import os import json from datetime import datetime, timedelta import redis from foundation.observability.logger.loggering import review_logger as logger class RedisDuplicateChecker: """基于Redis的重复任务检查器""" def __init__(self): try: # 从配置文件读取Redis连接信息 from foundation.infrastructure.config import config_handler redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') # 从配置文件获取数据库编号 redis_db = config_handler.get('redis', 'REDIS_DB', '0') # 构建Redis连接URL (使用配置文件的数据库) if redis_password: redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" else: redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}" logger.info(f"连接Redis: {redis_url}") # 连接Redis self.redis_client = redis.from_url(redis_url, decode_responses=True) # 测试连接 self.redis_client.ping() logger.info("Redis重复检查器连接成功") self.use_redis = True except Exception as e: logger.error(f"Redis连接失败,回退到内存模式: {str(e)}") # 回退到内存模式 self.task_cache = {} self.use_redis = False else: self.use_redis = True async def is_duplicate_task(self, file_id: str) -> bool: """检查是否为重复任务""" try: if self.use_redis: # 使用Redis检查 task_info = self.redis_client.get(f"task:{file_id}") if task_info: # 检查任务是否过期 task_data = json.loads(task_info) created_at = datetime.fromisoformat(task_data['created_at']) if datetime.now() - created_at < timedelta(minutes=2): logger.info(f"发现重复任务: {file_id}") return True else: # 任务已过期,清理 self.redis_client.delete(f"task:{file_id}") return False return False else: # 回退到内存模式 if file_id in self.task_cache: logger.info(f"发现重复任务: {file_id}") return True return False except Exception as e: logger.error(f"检查重复任务失败: {str(e)}") return False async def register_task(self, file_info: dict, callback_task_id: str): """注册任务""" try: # 过滤掉不可序列化的字段(如file_content等bytes数据) serializable_file_info = { k: v for k, v in file_info.items() if k not in ['file_content'] and not isinstance(v, bytes) } task_data = { "callback_task_id": callback_task_id, "created_at": datetime.now().isoformat(), "used": False, # 标记任务是否已被使用启动审查 "file_info": serializable_file_info } if self.use_redis: # 使用Redis存储,设置1小时过期 self.redis_client.setex( f"task:{file_info['file_id']}", 3600, # 1小时 json.dumps(task_data, ensure_ascii=False) ) else: # 回退到内存模式 self.task_cache[file_info['file_id']] = task_data logger.info(f"注册任务: {file_info['file_id']} -> {callback_task_id}") except Exception as e: logger.error(f"注册任务失败: {str(e)}") raise async def unregister_task(self, file_id: str): """取消注册任务""" try: if self.use_redis: self.redis_client.delete(f"task:{file_id}") else: if file_id in self.task_cache: del self.task_cache[file_id] logger.info(f"取消注册任务: {file_id}") except Exception as e: logger.error(f"取消注册任务失败: {str(e)}") async def is_valid_task_id(self, callback_task_id: str) -> bool: """验证任务ID是否存在且未过期""" try: if self.use_redis: # 遍历所有任务键,查找匹配的callback_task_id keys = self.redis_client.keys("task:*") for key in keys: task_info = self.redis_client.get(key) if task_info: task_data = json.loads(task_info) if task_data.get("callback_task_id") == callback_task_id: created_at = datetime.fromisoformat(task_data['created_at']) if datetime.now() - created_at < timedelta(hours=1): return True else: # 任务已过期,清理 self.redis_client.delete(key) return False else: # 内存模式检查 for file_id, task_info in self.task_cache.items(): if task_info.get("callback_task_id") == callback_task_id: created_at = datetime.fromisoformat(task_info['created_at']) if datetime.now() - created_at < timedelta(hours=1): return True return False except Exception as e: logger.error(f"验证任务ID失败: {str(e)}") return False async def get_task_info(self, file_id: str) -> str: """获取任务信息""" try: if self.use_redis: task_info = self.redis_client.get(f"task:{file_id}") if task_info: task_data = json.loads(task_info) return task_data.get("callback_task_id", "") return "" else: if file_id in self.task_cache: return self.task_cache[file_id].get("callback_task_id", "") return "" except Exception as e: logger.error(f"获取任务信息失败: {str(e)}") return "" def cleanup_expired_cache(self): """清理过期缓存(Redis自动处理)""" try: if not self.use_redis: current_time = datetime.now() expired_files = [] for file_id, task_info in list(self.task_cache.items()): created_at = datetime.fromisoformat(task_info['created_at']) if current_time - created_at > timedelta(hours=1): expired_files.append(file_id) for file_id in expired_files: del self.task_cache[file_id] if expired_files: logger.info(f"清理过期缓存: {len(expired_files)} 个文件") except Exception as e: logger.error(f"清理过期缓存失败: {str(e)}") async def is_task_already_used(self, callback_task_id: str) -> bool: """检查任务是否已经被使用启动审查""" try: if self.use_redis: # 遍历所有任务键,查找匹配的callback_task_id keys = self.redis_client.keys("task:*") for key in keys: task_info = self.redis_client.get(key) if task_info: task_data = json.loads(task_info) if task_data.get("callback_task_id") == callback_task_id: # 检查任务是否已被使用 if task_data.get("used", False): logger.info(f"任务已被使用: {callback_task_id}") return True else: return False return False else: # 内存模式检查 for file_id, task_info in self.task_cache.items(): if task_info.get("callback_task_id") == callback_task_id: if task_info.get("used", False): return True else: return False return False except Exception as e: logger.error(f"检查任务使用状态失败: {str(e)}") return False async def mark_task_as_used(self, callback_task_id: str): """标记任务为已使用""" try: if self.use_redis: # 遍历所有任务键,查找匹配的callback_task_id keys = self.redis_client.keys("task:*") for key in keys: task_info = self.redis_client.get(key) if task_info: task_data = json.loads(task_info) if task_data.get("callback_task_id") == callback_task_id: # 更新used字段为True task_data["used"] = True self.redis_client.setex( key, 3600, # 1小时 json.dumps(task_data, ensure_ascii=False) ) logger.info(f"任务已标记为使用: {callback_task_id}") return else: # 内存模式 for file_id, task_info in self.task_cache.items(): if task_info.get("callback_task_id") == callback_task_id: task_info["used"] = True logger.info(f"任务已标记为使用: {callback_task_id}") return except Exception as e: logger.error(f"标记任务使用状态失败: {str(e)}")