| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- """
- 基于Redis的重复任务检查器
- 支持多进程间的重复任务检查
- """
- import os
- import json
- from datetime import datetime, timedelta
- import redis
- from foundation.logger.loggering import server_logger as logger
class RedisDuplicateChecker:
    """Detect duplicate task submissions, backed by Redis with an in-memory fallback.

    Each task is stored under ``task:<file_id>`` as a JSON document with a
    ``callback_task_id``, an ISO-format ``created_at`` timestamp, a ``used``
    flag and a JSON-serializable copy of the submitted ``file_info``. When
    Redis is reachable the data is shared by every process using the same
    Redis database; otherwise each instance keeps a private dict
    (``task_cache``) and nothing is shared.

    NOTE(review): the ``async`` methods call the synchronous ``redis`` client
    and await nothing, so every Redis round-trip blocks the event loop.
    NOTE(review): ``is_duplicate_task`` followed by ``register_task`` is a
    check-then-act sequence (separate GET and SETEX), not an atomic
    reservation; two processes can both see "not duplicate" before either
    registers.
    """

    # Prefix of every Redis key written by this class.
    KEY_PREFIX = "task:"
    # Expiry (seconds) applied by register_task and mark_task_as_used.
    TASK_TTL_SECONDS = 3600
    # A task younger than this counts as a duplicate / valid task id.
    DUPLICATE_WINDOW = timedelta(minutes=2)
    # In-memory entries older than this are evicted by cleanup_expired_cache
    # (mirrors the Redis TTL).
    CACHE_MAX_AGE = timedelta(seconds=TASK_TTL_SECONDS)

    def __init__(self):
        """Connect to Redis using the ``redis`` section of the project config.

        Any failure while loading the config, connecting, or answering the
        initial PING switches the instance to in-memory mode
        (``use_redis = False``); the error is logged, not raised.
        """
        # Always present so memory-mode code paths can never hit AttributeError.
        self.task_cache = {}
        self.use_redis = False
        try:
            # Imported lazily so a missing/broken config also triggers the fallback.
            from foundation.base.config import config_handler
            redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
            redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
            redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
            redis_db = config_handler.get('redis', 'REDIS_DB', '0')

            redis_url = self._build_redis_url(redis_host, redis_port, redis_password, redis_db)
            # Log a masked URL: the real password must never reach the log files.
            safe_url = self._build_redis_url(
                redis_host, redis_port, redis_password, redis_db, redact=True
            )
            logger.info(f"连接Redis: {safe_url}")

            self.redis_client = redis.from_url(redis_url, decode_responses=True)
            # Round-trip a PING so connection problems surface here, not on first use.
            self.redis_client.ping()
            logger.info("Redis重复检查器连接成功")
            self.use_redis = True
        except Exception as e:
            # Deliberate best-effort: degrade to the per-process dict instead of failing.
            logger.error(f"Redis连接失败,回退到内存模式: {str(e)}")
            self.use_redis = False

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _build_redis_url(host, port, password, db, *, redact=False) -> str:
        """Build a ``redis://`` URL from its parts.

        The password is percent-encoded so characters such as ``@``, ``:`` or
        ``/`` cannot break URL parsing; with ``redact=True`` it is replaced by
        ``***`` (for logging).
        """
        from urllib.parse import quote
        if not password:
            return f"redis://{host}:{port}/{db}"
        secret = "***" if redact else quote(str(password), safe="")
        return f"redis://:{secret}@{host}:{port}/{db}"

    def _key(self, file_id) -> str:
        """Return the Redis key used for *file_id*."""
        return f"{self.KEY_PREFIX}{file_id}"

    def _is_fresh(self, task_data: dict) -> bool:
        """True when the task was created less than DUPLICATE_WINDOW ago."""
        created_at = datetime.fromisoformat(task_data['created_at'])
        return datetime.now() - created_at < self.DUPLICATE_WINDOW

    def _load_task(self, file_id):
        """Return the stored task dict for *file_id*, or None when absent."""
        if self.use_redis:
            raw = self.redis_client.get(self._key(file_id))
            return json.loads(raw) if raw else None
        return self.task_cache.get(file_id)

    def _delete_task(self, file_id):
        """Remove the task for *file_id* from whichever backend is active."""
        if self.use_redis:
            self.redis_client.delete(self._key(file_id))
        else:
            self.task_cache.pop(file_id, None)

    def _iter_tasks(self):
        """Yield ``(file_id, task_data)`` for every stored task.

        Redis mode walks keys with SCAN (incremental) rather than KEYS, which
        blocks the server while it walks the whole keyspace. Memory mode
        iterates a snapshot so callers may delete entries while iterating;
        the yielded dicts are the cached objects themselves.
        """
        if self.use_redis:
            prefix_len = len(self.KEY_PREFIX)
            for key in self.redis_client.scan_iter(match=f"{self.KEY_PREFIX}*"):
                raw = self.redis_client.get(key)
                if not raw:
                    continue  # key expired/deleted between SCAN and GET
                try:
                    task_data = json.loads(raw)
                except json.JSONDecodeError:
                    # One corrupt entry must not make every lookup fail.
                    logger.warning(f"跳过无法解析的任务数据: {key}")
                    continue
                yield key[prefix_len:], task_data
        else:
            yield from list(self.task_cache.items())

    def _find_task_by_callback_id(self, callback_task_id: str) -> tuple:
        """Return ``(file_id, task_data)`` of the first task with this callback id, else ``(None, None)``."""
        for file_id, task_data in self._iter_tasks():
            if task_data.get("callback_task_id") == callback_task_id:
                return file_id, task_data
        return None, None

    # --------------------------------------------------------------- public API

    async def is_duplicate_task(self, file_id: str) -> bool:
        """Return True if *file_id* was registered within DUPLICATE_WINDOW.

        An entry older than the window is deleted (in both modes) and the
        call returns False. Errors are logged and reported as "not duplicate".
        """
        try:
            task_data = self._load_task(file_id)
            if task_data is None:
                return False
            if self._is_fresh(task_data):
                logger.info(f"发现重复任务: {file_id}")
                return True
            # Stale entry: drop it so the file can be submitted again.
            self._delete_task(file_id)
            return False
        except Exception as e:
            logger.error(f"检查重复任务失败: {str(e)}")
            return False

    async def register_task(self, file_info: dict, callback_task_id: str):
        """Store a new task for ``file_info['file_id']``.

        ``file_content`` and any bytes-like values are dropped because they
        cannot be JSON-encoded. In Redis mode the key expires after
        TASK_TTL_SECONDS.

        Raises:
            Whatever the backend or ``json.dumps`` raises (e.g. ``KeyError``
            when ``file_id`` is missing); the error is logged first.
        """
        try:
            file_id = file_info['file_id']
            serializable_file_info = {
                k: v for k, v in file_info.items()
                if k != 'file_content' and not isinstance(v, (bytes, bytearray, memoryview))
            }
            task_data = {
                "callback_task_id": callback_task_id,
                "created_at": datetime.now().isoformat(),
                "used": False,  # flips to True once a review has been started for this task
                "file_info": serializable_file_info,
            }
            if self.use_redis:
                self.redis_client.setex(
                    self._key(file_id),
                    self.TASK_TTL_SECONDS,
                    json.dumps(task_data, ensure_ascii=False),
                )
            else:
                self.task_cache[file_id] = task_data
            logger.info(f"注册任务: {file_id} -> {callback_task_id}")
        except Exception as e:
            logger.error(f"注册任务失败: {str(e)}")
            raise

    async def unregister_task(self, file_id: str):
        """Remove the task for *file_id*; errors are logged, not raised."""
        try:
            self._delete_task(file_id)
            logger.info(f"取消注册任务: {file_id}")
        except Exception as e:
            logger.error(f"取消注册任务失败: {str(e)}")

    async def is_valid_task_id(self, callback_task_id: str) -> bool:
        """Return True if a task with *callback_task_id* exists and is within DUPLICATE_WINDOW.

        A matching but stale task is deleted (in both modes).
        """
        try:
            file_id, task_data = self._find_task_by_callback_id(callback_task_id)
            if task_data is None:
                return False
            if self._is_fresh(task_data):
                return True
            self._delete_task(file_id)
            return False
        except Exception as e:
            logger.error(f"验证任务ID失败: {str(e)}")
            return False

    async def get_task_info(self, file_id: str) -> str:
        """Return the callback task id registered for *file_id*, or ``""``."""
        try:
            task_data = self._load_task(file_id)
            if task_data is None:
                return ""
            return task_data.get("callback_task_id", "")
        except Exception as e:
            logger.error(f"获取任务信息失败: {str(e)}")
            return ""

    def cleanup_expired_cache(self):
        """Evict in-memory entries older than CACHE_MAX_AGE.

        No-op in Redis mode, where the key TTL handles expiry.
        """
        try:
            if self.use_redis:
                return
            current_time = datetime.now()
            expired_files = [
                file_id
                for file_id, task_info in self.task_cache.items()
                if current_time - datetime.fromisoformat(task_info['created_at']) > self.CACHE_MAX_AGE
            ]
            for file_id in expired_files:
                del self.task_cache[file_id]
            if expired_files:
                logger.info(f"清理过期缓存: {len(expired_files)} 个文件")
        except Exception as e:
            logger.error(f"清理过期缓存失败: {str(e)}")

    async def is_task_already_used(self, callback_task_id: str) -> bool:
        """Return True if the task with *callback_task_id* has already started a review."""
        try:
            _, task_data = self._find_task_by_callback_id(callback_task_id)
            if task_data is None or not task_data.get("used", False):
                return False
            logger.info(f"任务已被使用: {callback_task_id}")
            return True
        except Exception as e:
            logger.error(f"检查任务使用状态失败: {str(e)}")
            return False

    async def mark_task_as_used(self, callback_task_id: str):
        """Set ``used = True`` on the task with *callback_task_id*.

        In Redis mode the document is rewritten with a fresh TASK_TTL_SECONDS
        expiry. In memory mode the cached dict is updated in place. Errors
        are logged, not raised.
        """
        try:
            file_id, task_data = self._find_task_by_callback_id(callback_task_id)
            if task_data is None:
                logger.warning(f"未找到要标记的任务: {callback_task_id}")
                return
            task_data["used"] = True
            if self.use_redis:
                self.redis_client.setex(
                    self._key(file_id),
                    self.TASK_TTL_SECONDS,
                    json.dumps(task_data, ensure_ascii=False),
                )
            logger.info(f"任务已标记为使用: {callback_task_id}")
        except Exception as e:
            logger.error(f"标记任务使用状态失败: {str(e)}")
|