فهرست منبع

v0.0.3-修复celery任务启动清理

WangXuMing 3 ماه پیش
والد
کامیت
222e925b8a

+ 6 - 3
core/base/redis_duplicate_checker.py

@@ -21,11 +21,14 @@ class RedisDuplicateChecker:
             redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
             redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
 
-            # 构建Redis连接URL
+            # 从配置文件获取数据库编号
+            redis_db = config_handler.get('redis', 'REDIS_DB', '0')
+
+            # 构建Redis连接URL (使用配置文件的数据库)
             if redis_password:
-                redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/2"
+                redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
             else:
-                redis_url = f"redis://{redis_host}:{redis_port}/2"
+                redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
 
             logger.info(f"连接Redis: {redis_url}")
 

+ 3 - 3
foundation/base/celery_app.py

@@ -14,13 +14,13 @@ from foundation.trace.celery_trace import init
 redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
 redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
 redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
-
+redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
 # 构建Redis连接URL
 if redis_password:
-    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/0"
+    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
 else:
-    redis_url = f"redis://{redis_host}:{redis_port}/0"
+    redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
 
 print(f"Connecting to Redis: {redis_url}")
 

+ 11 - 3
foundation/utils/redis_utils.py

@@ -68,7 +68,7 @@ async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id:
 
 
 @track_execution_time
-async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool:
+async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600, force_update: bool = False) -> bool:
     """
     存储文件信息(直接存储模式)
 
@@ -76,6 +76,7 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
         file_id: 文件ID
         file_info: 文件信息字典
         expire_seconds: 过期时间(秒),默认1小时
+        force_update: 是否强制更新已存在的文件信息
 
     Returns:
         bool: 存储是否成功
@@ -83,10 +84,17 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
     try:
         redis_store = await RedisConnectionFactory.get_redis_store()
 
-        # 检查是否已存在,避免重复存储
+        # 检查是否已存在,如果存在则更新callback_task_id
         existing_meta = await redis_store.get(f"meta:{file_id}")
         if existing_meta:
-            server_logger.info(f"文件信息已存在,跳过存储: {file_id}")
+            # 解析现有元数据
+            existing_file_info = json.loads(existing_meta.decode('utf-8'))
+            # 更新callback_task_id为最新的
+            existing_file_info['callback_task_id'] = file_info.get('callback_task_id', existing_file_info['callback_task_id'])
+
+            # 存储更新后的元数据
+            await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(existing_file_info))
+            server_logger.info(f"文件信息已存在,更新callback_task_id: {file_id} -> {existing_file_info['callback_task_id']}")
             return True
 
         # 提取文件内容

+ 86 - 15
server/app.py

@@ -274,22 +274,52 @@ def cleanup_redis_before_start():
         redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
         redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
         redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
         if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
         else:
-            redis_url = f'redis://{redis_host}:{redis_port}/0'
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
 
         r = redis.from_url(redis_url, decode_responses=True)
 
         server_logger.info("清理Redis中的残留Celery任务...")
 
-        # 清理所有Celery相关的键
+        # 清理所有Celery相关的键,包括更多模式
         keys_to_delete = []
         for key in r.keys():
-            if any(keyword in key.lower() for keyword in ['celery', 'task:']):
+            key_lower = key.lower()
+            # 扩展匹配模式,覆盖实际出现过的键格式
+            if any(keyword in key_lower for keyword in [
+                'celery', 'task:', 'celery-task', 'kombu', 'current:'
+            ]):
                 keys_to_delete.append(key)
+            # 匹配特定模式
+            elif key.startswith('celery-task-meta-') or key.startswith('current:'):
+                keys_to_delete.append(key)
+            # 临时键
+            elif key == 't_key':
+                keys_to_delete.append(key)
+
+        # 清理消息队列
+        try:
+            # 清理所有Celery队列
+            queues = ['celery', 'celery.pidbox', 'celeryev']
+            for queue in queues:
+                # 删除队列
+                r.delete(queue)
+                server_logger.debug(f"已清理队列: {queue}")
+
+            # 清理Kombu绑定
+            kombu_keys = r.keys('_kombu.binding.*')
+            for key in kombu_keys:
+                r.delete(key)
+                server_logger.debug(f"已清理Kombu绑定: {key}")
 
+        except Exception as e:
+            server_logger.warning(f"清理队列失败: {e}")
+
+        # 清理识别到的键
         if keys_to_delete:
             for key in keys_to_delete:
                 try:
@@ -302,6 +332,26 @@ def cleanup_redis_before_start():
         else:
             server_logger.info("没有发现需要清理的残留任务")
 
+        # 额外检查:确保关键队列被清空
+        try:
+            # 不使用FLUSHDB(那会清空整个数据库);改为检查是否还有
+            # Celery/Kombu 残留键,如有则逐个删除,进行更彻底的清理
+            remaining_keys = []
+            for key in r.keys():
+                if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
+                    remaining_keys.append(key)
+
+            if remaining_keys:
+                server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
+                for key in remaining_keys:
+                    try:
+                        r.delete(key)
+                        server_logger.debug(f"彻底清理: {key}")
+                    except Exception as e:
+                        server_logger.warning(f"彻底清理 {key} 失败: {e}")
+        except Exception as e:
+            server_logger.warning(f"彻底清理检查失败: {e}")
+
         return True
 
     except Exception as e:
@@ -325,33 +375,54 @@ def stop_celery_worker():
     """停止Celery Worker"""
     global celery_manager
 
-    # 立即取消所有任务注册
+    # 立即取消所有任务注册(使用配置文件中的数据库,与启动时保持一致)
     try:
         import redis
         from foundation.base.config import config_handler
 
-        # 连接Redis
+        # 连接Redis(使用配置文件的数据库)
         redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
         redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
         redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
         if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
         else:
-            redis_url = f'redis://{redis_host}:{redis_port}/2'
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
 
         r = redis.from_url(redis_url, decode_responses=True)
 
-        # 清理所有任务注册
-        task_keys = r.keys('task:*')
-        for key in task_keys:
-            r.delete(key)
-            server_logger.info(f"取消任务注册: {key}")
+        server_logger.info("停止时清理Redis中的Celery任务...")
+
+        # 清理任务相关键
+        task_keys = r.keys('task:*')  # 重复任务检查器的数据
+        celery_meta_keys = r.keys('celery-task-meta-*')
+        current_keys = r.keys('current:*')
+        kombu_keys = r.keys('_kombu.binding.*')
+
+        all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
+
+        for key in all_keys:
+            try:
+                r.delete(key)
+                server_logger.debug(f"停止时清理: {key}")
+            except Exception as e:
+                server_logger.warning(f"停止时清理 {key} 失败: {e}")
+
+        # 清理队列
+        queues = ['celery', 'celery.pidbox', 'celeryev']
+        for queue in queues:
+            try:
+                r.delete(queue)
+                server_logger.debug(f"停止时清理队列: {queue}")
+            except Exception as e:
+                server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
 
-        server_logger.info(f"已取消 {len(task_keys)} 个任务注册")
+        server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
 
     except Exception as e:
-        server_logger.error(f"取消任务注册失败: {e}")
+        server_logger.error(f"停止时清理Redis任务失败: {e}")
 
     # 立即停止Worker,不等待
     return celery_manager.stop_worker_immediately()

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 3 - 4
temp/AI审查结果.json


+ 6 - 4
views/construction_review/app.py

@@ -259,11 +259,12 @@ def cleanup_redis_before_start():
         redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
         redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
         redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
         if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
         else:
-            redis_url = f'redis://{redis_host}:{redis_port}/0'
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
 
         r = redis.from_url(redis_url, decode_responses=True)
 
@@ -320,11 +321,12 @@ def stop_celery_worker():
         redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
         redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
         redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
         if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
         else:
-            redis_url = f'redis://{redis_host}:{redis_port}/2'
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
 
         r = redis.from_url(redis_url, decode_responses=True)
 

+ 11 - 2
views/construction_review/launch_review.py

@@ -224,11 +224,20 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                     logger.error(f"文件信息获取失败: {file_id}")
                     raise LaunchReviewErrors.file_info_not_found()
 
-                # 添加审查配置到文件信息
+                # 立即更新Redis中的callback_task_id为当前值
+                try:
+                    from foundation.utils.redis_utils import store_file_info
+                    await store_file_info(file_id, {'callback_task_id': callback_task_id})
+                    logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}")
+                except Exception as e:
+                    logger.warning(f"更新Redis中的callback_task_id失败: {e}")
+
+                # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id
                 file_info.update({
                     'review_config': review_config,
                     'project_plan_type': project_plan_type,
-                    'launched_at': int(time.time())
+                    'launched_at': int(time.time()),
+                    'callback_task_id': callback_task_id  # 确保使用当前正确的callback_task_id
                 })
 
 

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است