Ver Fonte

v0.0.3-重复任务、过期任务检查

WangXuMing há 3 meses
pai
commit
fa1829e8b0

+ 1 - 1
config/config.ini

@@ -1,7 +1,7 @@
 
 
 [model]
-MODEL_TYPE=qwen_local_1.5b
+MODEL_TYPE=qwen_local_14b
 
 
 

+ 96 - 1
core/base/redis_duplicate_checker.py

@@ -87,6 +87,7 @@ class RedisDuplicateChecker:
             task_data = {
                 "callback_task_id": callback_task_id,
                 "created_at": datetime.now().isoformat(),
+                "used": False,  # 标记任务是否已被使用启动审查
                 "file_info": serializable_file_info
             }
 
@@ -121,6 +122,37 @@ class RedisDuplicateChecker:
         except Exception as e:
             logger.error(f"取消注册任务失败: {str(e)}")
 
+    async def is_valid_task_id(self, callback_task_id: str) -> bool:
+        """验证任务ID是否存在且未过期"""
+        try:
+            if self.use_redis:
+                # 遍历所有任务键,查找匹配的callback_task_id
+                keys = self.redis_client.keys("task:*")
+                for key in keys:
+                    task_info = self.redis_client.get(key)
+                    if task_info:
+                        task_data = json.loads(task_info)
+                        if task_data.get("callback_task_id") == callback_task_id:
+                            created_at = datetime.fromisoformat(task_data['created_at'])
+                            if datetime.now() - created_at < timedelta(minutes=2):
+                                return True
+                            else:
+                                # 任务已过期,清理
+                                self.redis_client.delete(key)
+                return False
+            else:
+                # 内存模式检查
+                for file_id, task_info in self.task_cache.items():
+                    if task_info.get("callback_task_id") == callback_task_id:
+                        created_at = datetime.fromisoformat(task_info['created_at'])
+                        if datetime.now() - created_at < timedelta(minutes=2):
+                            return True
+                return False
+
+        except Exception as e:
+            logger.error(f"验证任务ID失败: {str(e)}")
+            return False
+
     async def get_task_info(self, file_id: str) -> str:
         """获取任务信息"""
         try:
@@ -158,4 +190,67 @@ class RedisDuplicateChecker:
                     logger.info(f"清理过期缓存: {len(expired_files)} 个文件")
 
         except Exception as e:
-            logger.error(f"清理过期缓存失败: {str(e)}")
+            logger.error(f"清理过期缓存失败: {str(e)}")
+
+    async def is_task_already_used(self, callback_task_id: str) -> bool:
+        """检查任务是否已经被使用启动审查"""
+        try:
+            if self.use_redis:
+                # 遍历所有任务键,查找匹配的callback_task_id
+                keys = self.redis_client.keys("task:*")
+                for key in keys:
+                    task_info = self.redis_client.get(key)
+                    if task_info:
+                        task_data = json.loads(task_info)
+                        if task_data.get("callback_task_id") == callback_task_id:
+                            # 检查任务是否已被使用
+                            if task_data.get("used", False):
+                                logger.info(f"任务已被使用: {callback_task_id}")
+                                return True
+                            else:
+                                return False
+                return False
+            else:
+                # 内存模式检查
+                for file_id, task_info in self.task_cache.items():
+                    if task_info.get("callback_task_id") == callback_task_id:
+                        if task_info.get("used", False):
+                            return True
+                        else:
+                            return False
+                return False
+
+        except Exception as e:
+            logger.error(f"检查任务使用状态失败: {str(e)}")
+            return False
+
+    async def mark_task_as_used(self, callback_task_id: str):
+        """标记任务为已使用"""
+        try:
+            if self.use_redis:
+                # 遍历所有任务键,查找匹配的callback_task_id
+                keys = self.redis_client.keys("task:*")
+                for key in keys:
+                    task_info = self.redis_client.get(key)
+                    if task_info:
+                        task_data = json.loads(task_info)
+                        if task_data.get("callback_task_id") == callback_task_id:
+                            # 更新used字段为True
+                            task_data["used"] = True
+                            self.redis_client.setex(
+                                key,
+                                3600,  # 1小时
+                                json.dumps(task_data, ensure_ascii=False)
+                            )
+                            logger.info(f"任务已标记为使用: {callback_task_id}")
+                            return
+            else:
+                # 内存模式
+                for file_id, task_info in self.task_cache.items():
+                    if task_info.get("callback_task_id") == callback_task_id:
+                        task_info["used"] = True
+                        logger.info(f"任务已标记为使用: {callback_task_id}")
+                        return
+
+        except Exception as e:
+            logger.error(f"标记任务使用状态失败: {str(e)}")

+ 2 - 4
core/base/workflow_manager.py

@@ -103,7 +103,6 @@ class WorkflowManager:
         """同步提交任务处理(用于Celery worker)"""
         try:
 
-
             logger.info(f"提交文档处理任务: {file_info['file_id']}")
 
             # 1. 生成任务链ID
@@ -119,11 +118,10 @@ class WorkflowManager:
                 created_at=datetime.now()
             )
 
-            # 4. 注册任务
-            asyncio.run(self.redis_duplicate_checker.register_task(file_info, callback_task_id))
+            # 3. 添加到活跃任务跟踪
             self.active_chains[callback_task_id] = task_chain
 
-            # 5. 初始化进度管理
+            # 4. 初始化进度管理
             asyncio.run(self.progress_manager.initialize_progress(
                 callback_task_id=callback_task_id,
                 user_id=file_info.get('user_id', 'default_user'),

Diferenças do ficheiro suprimidas por serem muito extensas
+ 1 - 2
temp/AI审查结果.json


+ 8 - 0
views/construction_review/file_upload.py

@@ -192,6 +192,14 @@ async def file_upload(
             logger.warning(f"缓存文件信息到Redis失败: {str(e)}")
             # 不影响主流程,继续处理
 
+        # 预注册任务到重复检查器,以便启动审查时验证任务ID
+        try:
+            await duplicatechecker.register_task(file_info, callback_task_id)
+            logger.info(f"任务已预注册: {callback_task_id}")
+        except Exception as e:
+            logger.error(f"任务预注册失败: {str(e)}")
+            # 预注册失败不应影响文件上传成功
+
         # try:
             # # 提交处理任务到工作流管理器
             # await workflow_manager.submit_task_processing(file_info)

+ 13 - 33
views/construction_review/launch_review.py

@@ -187,7 +187,7 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
 
                 # 从callback_task_id中提取file_id (格式: file_id-timestamp)
                 file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
-
+                logger.info(f"处理文件: {file_id}")
                 # 发送处理状态
                 status_data = json.dumps({
                     "callback_task_id": callback_task_id,
@@ -197,16 +197,16 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 }, ensure_ascii=False)
                 yield format_sse_event("processing", status_data)
 
-                # 检查重复任务
-                if await duplicatechecker.is_duplicate_task(file_id):
-                    error_data = json.dumps({
-                        "callback_task_id": callback_task_id,
-                        "error": "task_already_exists",
-                        "message": "任务已存在,请勿重复提交",
-                        "timestamp": datetime.now().isoformat()
-                    }, ensure_ascii=False)
-                    yield format_sse_event("error", error_data)
-                    return
+                # 验证任务ID是否存在且未过期
+                if not await duplicatechecker.is_valid_task_id(callback_task_id):
+                    raise LaunchReviewErrors.task_not_found_or_expired()
+
+                # 检查任务是否已经被使用启动审查
+                if await duplicatechecker.is_task_already_used(callback_task_id):
+                    raise LaunchReviewErrors.task_already_exists()
+
+                # 标记任务为已使用
+                await duplicatechecker.mark_task_as_used(callback_task_id)
 
                 # 获取文件信息
                 status_data = json.dumps({
@@ -219,28 +219,9 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
 
                 file_info = await get_file_info(file_id, include_content=True)
 
-
-
                 if not file_info:
-                    error_data = json.dumps({
-                        "callback_task_id": callback_task_id,
-                        "error": "task_not_found",
-                        "message": "任务ID不存在或已过期",
-                        "timestamp": datetime.now().isoformat()
-                    }, ensure_ascii=False)
-                    yield format_sse_event("error", error_data)
-                    return
-
-                # 验证必要的字段
-                if 'file_content' not in file_info:
-                    error_data = json.dumps({
-                        "callback_task_id": callback_task_id,
-                        "error": "missing_content",
-                        "message": "文件内容缺失",
-                        "timestamp": datetime.now().isoformat()
-                    }, ensure_ascii=False)
-                    yield format_sse_event("error", error_data)
-                    return
+                    logger.error(f"文件信息获取失败: {file_id}")
+                    raise LaunchReviewErrors.file_info_not_found()
 
                 # 添加审查配置到文件信息
                 file_info.update({
@@ -266,7 +247,6 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 # 发送成功启动状态
                 success_data = json.dumps({
                     "callback_task_id": callback_task_id,
-                    "task_id": task_id,
                     "file_id": file_info['file_id'],
                     "review_config": review_config,
                     "project_plan_type": project_plan_type,

+ 24 - 0
views/construction_review/schemas/error_schemas.py

@@ -158,6 +158,20 @@ class ErrorCodes:
         "status_code": 500
     }
 
+    QDSC012 = {
+        "code": "QDSC012",
+        "error_type": "TASK_NOT_FOUND_OR_EXPIRED",
+        "message": "任务ID不存在或已过期,请重新检查callback_task_id是否正确,或重新上传文件",
+        "status_code": 404
+    }
+
+    QDSC013 = {
+        "code": "QDSC013",
+        "error_type": "FILE_INFO_NOT_FOUND",
+        "message": "文件信息获取失败,任务ID不存在或已过期",
+        "status_code": 404
+    }
+
 
     # 审查结果接口错误码 (SCJG001-SCJG008)
     SCJG001 = {
@@ -384,6 +398,16 @@ class LaunchReviewErrors:
         logger.error(ErrorCodes.QDSC011)
         return create_server_error("QDSC011", original_error)
 
+    @staticmethod
+    def task_not_found_or_expired():
+        logger.error(ErrorCodes.QDSC012)
+        return create_http_exception(ErrorCodes.QDSC012)
+
+    @staticmethod
+    def file_info_not_found():
+        logger.error(ErrorCodes.QDSC013)
+        return create_http_exception(ErrorCodes.QDSC013)
+
 
 class ReviewResultsErrors:
     """审查结果接口错误"""

+ 3 - 1
views/construction_review/task_progress.py

@@ -140,7 +140,9 @@ async def sse_progress_stream(
                                     completion_json = json.dumps(completion_data, ensure_ascii=False)
                                     yield format_sse_event("completed", completion_json)
 
-                                    logger.info(f"全部任务完成,断开SSE连接: {callback_task_id}, 状态: {overall_task_status}")
+                                    #sse_callback_manager.unregister_callback(callback_task_id)
+                                    await sse_manager.disconnect(callback_task_id)
+                                    logger.info(f"全部任务完成,SSE连接已关闭: {callback_task_id}, 状态: {overall_task_status}")
                                     break
 
                         elif message.get("type") == "connection_established":

Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff