Procházet zdrojové kódy

v0.0.3-2025年11月27日16:04更新记录
-修复文件内容redis ttl时间未同步更新BUG
- 增加倾向性审查字段,与API文档同步
- 增加sse响应事件类型字段,与API文档同步
- 增加审查任务完成时,同步关闭sse链接功能,避免死链问题
- 统一sse响应字段,与API文档同步

WangXuMing před 3 měsíci
rodič
revize
f8be82b9b1

binární
build_graph_app.png


+ 1 - 1
config/config.ini

@@ -29,7 +29,7 @@ QWEN_API_KEY=ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 [ai_review]
 # 调试模式配置
-MAX_REVIEW_UNITS=3
+MAX_REVIEW_UNITS=1
 REVIEW_MODE=random
 # REVIEW_MODE=all/random/first
 

+ 26 - 38
core/base/progress_manager.py

@@ -123,7 +123,7 @@ class ProgressManager:
             logger.error(f"初始化进度失败: {str(e)}")
             raise
 
-    async def update_stage_progress(self, callback_task_id: str, stage_name: str = None, current: int = None, status: str = None, message: str = None, issues=None, user_id: str = None, overall_task_status: str = None):
+    async def update_stage_progress(self, callback_task_id: str, stage_name: str = None, current: int = None, status: str = None, message: str = None, issues=None, user_id: str = None, overall_task_status: str = None, event_type: str = "processing"):
         """更新阶段进度 - 除callback_task_id外,其他参数都可选
 
         Args:
@@ -135,6 +135,7 @@ class ProgressManager:
             issues: 问题列表(可选)
             user_id: 用户ID(可选)
             overall_task_status: 整体任务状态(可选)
+            event_type: SSE事件类型(可选,默认为"processing")
         """
         try:
             task_progress = None
@@ -176,6 +177,11 @@ class ProgressManager:
                 task_progress["overall_task_status"] = overall_task_status
             elif "overall_task_status" not in task_progress:
                 task_progress["overall_task_status"] = "processing"
+            if event_type is not None:
+                task_progress["event_type"] = event_type
+                logger.debug(f"设置event_type: {event_type} for {callback_task_id}")
+            else:
+                logger.debug(f"event_type为None,不设置 for {callback_task_id}")
 
             try:
                 if self.redis_connected:
@@ -208,6 +214,9 @@ class ProgressManager:
             logger.debug(f"触发SSE推送: {callback_task_id}")
             updated_progress = await self.get_progress(callback_task_id)
             issues = task_progress.get("issues")
+            event_type = task_progress.get("event_type", "processing")
+            logger.debug(f"触发SSE回调: {callback_task_id}, event_type: {event_type}")
+
             if updated_progress and issues and len(issues) > 0 and issues[0] != 'clear':
                 await sse_callback_manager.trigger_callback(callback_task_id, updated_progress)
             elif updated_progress and not issues:  # 空列表时也要推送
@@ -267,6 +276,8 @@ class ProgressManager:
             # 添加可选字段
             if "issues" in task_progress:
                 result["issues"] = task_progress["issues"]
+            if "event_type" in task_progress:
+                result["event_type"] = task_progress["event_type"]
 
             return result
 
@@ -274,45 +285,22 @@ class ProgressManager:
             logger.error(f"获取进度失败: {str(e)}")
             return None
 
-    async def complete_task(self, callback_task_id: str):
+    async def complete_task(self, callback_task_id: str, user_id: str = None, overall_task_status: str = "completed", current_data: dict = None):
         """标记任务完成"""
         try:
-            task_progress = None
-            logger.info(f"通知sse连接关闭: {callback_task_id}")
-            if self.redis_connected:
-                redis_key = await self._get_redis_key(callback_task_id)
-                progress_json = self.redis_client.get(redis_key)
-                if progress_json:
-                    task_progress = json.loads(progress_json)
-                else:
-                    logger.warning(f"Redis中未找到任务进度: {callback_task_id}")
-                    return
-            else:
-                # 从内存读取
-                if hasattr(self, 'current_data') and callback_task_id in self.current_data:
-                    task_progress = self.current_data[callback_task_id]
-                else:
-                    logger.warning(f"内存中未找到任务进度: {callback_task_id}")
-                    return
-
-            task_progress["status"] = "completed"
-            task_progress["overall_task_status"] = "completed"
-            task_progress["message"] = "施工方案审查任务已完成!"
-            task_progress["updated_at"] = datetime.now().isoformat()
-
-
-            # 保存更新后的数据
-            if self.redis_connected:
-                self.redis_client.setex(
-                    redis_key,
-                    3600,
-                    json.dumps(task_progress)
-                )
-            else:
-                if hasattr(self, 'current_data'):
-                    self.current_data[callback_task_id] = task_progress
-
-            # 触发SSE进度更新推送
+            # 使用update_stage_progress方法更新响应数据
+            await self.update_stage_progress(
+                callback_task_id=callback_task_id,
+                user_id=user_id,
+                current=current_data.get("current", 100) if current_data else 100,
+                stage_name="审查完成",
+                status="completed",
+                message="施工审查方案处理完成!",
+                overall_task_status=overall_task_status,
+                issues=current_data.get("issues", []) if current_data else []
+            )
+
+            logger.info(f"任务关闭: {callback_task_id}")
             completed_progress = await self.get_progress(callback_task_id)
             if completed_progress:
                 await sse_callback_manager.trigger_callback(callback_task_id, completed_progress)

+ 2 - 2
core/base/workflow_manager.py

@@ -226,7 +226,7 @@ class WorkflowManager:
             # 清理任务注册
             asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))
             # 通知SSE连接任务完成
-            asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id))
+            asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id))
 
             # 清理Redis文件缓存
             try:
@@ -260,7 +260,7 @@ class WorkflowManager:
                 "status": "failed",
                 "timestamp": datetime.now().isoformat()
             }
-            asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id))
+            asyncio.run(self.progress_manager.complete_task(task_chain.callback_task_id, task_chain.user_id, "failed", error_result))
 
             raise
         finally:

+ 13 - 9
core/construction_review/component/reviewers/base_reviewer.py

@@ -104,16 +104,20 @@ class BaseReviewer(ABC):
                 'timestamp': time.time()
             }
 
-            asyncio.create_task(
-                state["progress_manager"].update_stage_progress(
-                    callback_task_id=state["callback_task_id"],
-                    stage_name=stage_name,
-                    current=current_progress,
-                    status="processing",
-                    message=f"{name} 审查完成, 耗时: {result.execution_time:.2f}s",
-                    issues=[review_result_data]
+            # 推送审查完成信息
+            if state and state.get("progress_manager"):
+                # 使用专门的事件类型,避免覆盖主进度的processing_flag
+                asyncio.create_task(
+                    state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name=stage_name,
+                        current=None,  # 明确不更新current,保持主流程进度
+                        status="processing",
+                        message=f"{name} 审查完成, 耗时: {result.execution_time:.2f}s",
+                        issues=[review_result_data],
+                        event_type="processing"  # 使用专门的事件类型
+                    )
                 )
-            )
             logger.info(f"{name} 审查完成, 耗时: {result.execution_time:.2f}s")
 
             return result

+ 54 - 42
core/construction_review/workflows/ai_review_workflow.py

@@ -209,7 +209,8 @@ class AIReviewWorkflow:
                 stage_name="AI审查",
                 current=0,
                 status="processing",
-                message="开始AI审查"
+                message="开始AI审查",
+                event_type="processing"
             )
         else:
             logger.warning(f"AI审查工作流中未找到ProgressManager: {state.get('progress_manager', 'None')}")
@@ -249,7 +250,8 @@ class AIReviewWorkflow:
                     stage_name="AI审查",
                     current=0,
                     status="processing",
-                    message=f"开始AI审查,共 {total_units} 个审查单元"
+                    message=f"开始AI审查,共 {total_units} 个审查单元",
+                    event_type="processing"
                 )
 
             
@@ -272,8 +274,11 @@ class AIReviewWorkflow:
 
                         # 方法内部进度计算(基于当前处理的单元)
                         current_progress = int((unit_index / total_units) * 100)
-                        progress_message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
-                        
+
+                        # 构建进度消息
+                        message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+                        logger.info(f"开始处理单元: {unit_index + 1}/{total_units} - {section_label}")
+
                         # 并发执行各种原子化审查方法
                         review_tasks = [
                             self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
@@ -302,50 +307,53 @@ class AIReviewWorkflow:
                             basic_result,
                             technical_result
                         )
-                        logger.info(f"issues: {issues}")
-                        # 统计发现问题数量
-                        issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
+                        #logger.info(f"issues: {issues}")
 
-                        # 更新进度
+                        # 单元审查完成,更新进度
                         nonlocal completed_units
                         completed_units += 1
-                        current = int((completed_units / total_units) * 100)
+                        current = int((completed_units / total_units) * 100)  # 修正进度计算
+
+                        # 统计发现问题数量
+                        issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
 
-                        # 构建包含问题信息的消息
+                        # 构建完成消息
                         if issues_count > 0:
-                            message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+                            message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
                         else:
-                            message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+                            message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
 
-                        logger.info(f"更新进度: {current}% {message}")
+                        logger.info(f"单元审查完成,更新进度: {current}% {message}")
 
-                        # 更新ProgressManager进度,包含issues信息
+                        # 发送processing_flag事件(在单元完成时)
                         if state["progress_manager"]:
-                            # 如果有issues,通过额外参数传递
-                            if issues:
-                                asyncio.create_task(
-                                    state["progress_manager"].update_stage_progress(
-                                        callback_task_id=state["callback_task_id"],
-                                        stage_name=stage_name,
-                                        current=current,
-                                        status="unit_review_update",
-                                        message=message,
-                                        issues=issues,
-                                        user_id=state.get("user_id", ""),
-                                        overall_task_status="processing"
-                                    )
-                                )
+                            logger.info(f"发送processing_flag事件: current={current}, message={message}")
+                            await state["progress_manager"].update_stage_progress(
+                                callback_task_id=state["callback_task_id"],
+                                stage_name="AI审查",
+                                current=current,
+                                status="processing",
+                                message=message,
+                                user_id=state.get("user_id", ""),
+                                overall_task_status="processing",
+                                event_type="processing_flag"
+                            )
 
-                            else:
-                                asyncio.create_task(
-                                    state["progress_manager"].update_stage_progress(
-                                        callback_task_id=state["callback_task_id"],
-                                        stage_name=stage_name,
-                                        current=current,
-                                        status="processing",
-                                        message=message
-                                    )
+                        # 如果有issues,额外推送详细信息
+                        if issues:
+                            asyncio.create_task(
+                                state["progress_manager"].update_stage_progress(
+                                    callback_task_id=state["callback_task_id"],
+                                    stage_name=stage_name,  # 保留详细的stage_name
+                                    current=current,
+                                    status="unit_review_update",
+                                    message=f"发现{issues_count}个问题: {section_label}",
+                                    issues=issues,
+                                    user_id=state.get("user_id", ""),
+                                    overall_task_status="processing",
+                                    event_type="unit_review"  # 明确设置事件类型
                                 )
+                            )
 
                         
                         # 清空当前issues
@@ -426,8 +434,10 @@ class AIReviewWorkflow:
                 callback_task_id=state["callback_task_id"],
                 stage_name="AI审查",
                 current=90,
-                status="completed",
-                message="AI审查完成"
+                status="processing",
+                message="AI审查完成",
+                overall_task_status="processing",
+                event_type="processing"
             )
 
         state["messages"].append(AIMessage(content="AI审查工作流完成"))
@@ -448,7 +458,9 @@ class AIReviewWorkflow:
                 stage_name="AI审查",
                 current=50,
                 status="failed",
-                message=f"AI审查失败: {state['error_message']}"
+                message=f"AI审查失败: {state['error_message']}",
+                overall_task_status="failed",
+                event_type="error"
             )
 
         state["messages"].append(AIMessage(
@@ -560,7 +572,7 @@ class AIReviewWorkflow:
                 logger.info(f"跳过分数字段: {check_key}")
                 continue
 
-            logger.info(f"检查项 {check_key} 的结果: {check_result}")
+            #logger.info(f"检查项 {check_key} 的结果: {check_result}")
 
             if check_result and "details" in check_result and "response" in check_result["details"]:
                 response = check_result["details"]["response"]
@@ -588,7 +600,7 @@ class AIReviewWorkflow:
         if review_lists:
             issue = {
                 "issue_id": f"{callback_task_id}-{max_risk_level}-{unit_index}",
-                "metadata": {
+                "metadata": { 
                     "review_location_label": review_location_label,
                     "original_content": unit_content.get('content', '')
                 },

+ 1 - 1
foundation/logger/loggering.py

@@ -30,7 +30,7 @@ class CompatibleLogger(logging.Logger):
                  log_format=None, datefmt=None):
         # 初始化父类
         super().__init__(name)
-        self.setLevel(logging.INFO)  # 设置logger自身为最低级别
+        self.setLevel(logging.DEBUG)  # 设置logger自身为最低级别
 
         # 存储配置
         self.log_dir = log_dir

+ 22 - 4
foundation/utils/redis_utils.py

@@ -90,11 +90,29 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
             # 解析现有元数据
             existing_file_info = json.loads(existing_meta.decode('utf-8'))
             # 更新callback_task_id为最新的
-            existing_file_info['callback_task_id'] = file_info.get('callback_task_id', existing_file_info['callback_task_id'])
+            if 'callback_task_id' in file_info:
+                existing_file_info['callback_task_id'] = file_info['callback_task_id']
+            elif 'callback_task_id' not in existing_file_info:
+                # 如果两者都没有callback_task_id,添加一个新的
+                existing_file_info['callback_task_id'] = None
+
+            # 并行更新meta和content的TTL,确保同步过期
+            update_tasks = [
+                redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(existing_file_info))
+            ]
+
+            # 如果存在content,也需要更新其TTL以保持同步
+            content_key = f"content:{file_id}"
+            existing_content = await redis_store.get(content_key)
+            if existing_content:
+                update_tasks.append(redis_store.setex(content_key, expire_seconds, existing_content))
+                server_logger.info(f"同步更新content的TTL: {content_key}")
+            else:
+                server_logger.warning(f"未找到content键,只更新meta TTL: {content_key}")
 
-            # 存储更新后的元数据
-            await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(existing_file_info))
-            server_logger.info(f"文件信息已存在,更新callback_task_id: {file_id} -> {existing_file_info['callback_task_id']}")
+            # 执行并行更新
+            await asyncio.gather(*update_tasks)
+            server_logger.info(f"文件信息已存在,同步更新TTL: {file_id} -> {existing_file_info['callback_task_id']}")
             return True
 
         # 提取文件内容

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 1 - 1
temp/AI审查结果.json


+ 235 - 0
test/redis_ttl_bug_test.py

@@ -0,0 +1,235 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Redis TTL不同步Bug单元测试
+
+验证问题:store_file_info函数只更新meta的TTL,不更新content的TTL,
+导致两个键的过期时间不同步,造成间歇性缓存失败。
+"""
+
+import asyncio
+import json
+import time
+import pytest
+from typing import Dict, Any
+
+# 导入被测试的模块
+import sys
+from pathlib import Path
+root_dir = Path(__file__).parent.parent
+sys.path.append(str(root_dir))
+
+from foundation.utils.redis_utils import store_file_info, get_file_info
+from foundation.base.redis_connection import RedisConnectionFactory
+
+
+class TestRedisTTLSync:
+    """Redis TTL同步问题测试类"""
+
+    @pytest.fixture
+    async def redis_store(self):
+        """Redis连接fixture"""
+        redis_store = await RedisConnectionFactory.get_redis_store()
+        yield redis_store
+        # 测试后清理
+        await RedisConnectionFactory.close_all()
+
+    @pytest.fixture
+    async def test_file_data(self):
+        """测试文件数据fixture"""
+        file_id = "test_ttl_bug_file_123"
+        file_info = {
+            "file_name": "测试文档.pdf",
+            "file_size": 1024,
+            "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50,
+            "callback_task_id": "original_task_id",
+            "upload_time": int(time.time())
+        }
+        return file_id, file_info
+
+    async def test_ttl_sync_bug_reproduction(self, redis_store, test_file_data):
+        """
+        测试1: 复现TTL不同步Bug
+
+        1. 存储完整文件信息
+        2. 更新callback_task_id(触发TTL不同步)
+        3. 等待原始content过期
+        4. 验证获取文件内容失败
+        """
+        file_id, file_info = test_file_data
+
+        print("\n=== 开始测试TTL不同步Bug复现 ===")
+
+        # 步骤1: 存储完整文件信息,TTL=5秒(便于测试)
+        print("1. 存储完整文件信息...")
+        success = await store_file_info(file_id, file_info, expire_seconds=5)
+        assert success, "存储文件信息失败"
+
+        # 验证初始状态
+        print("2. 验证初始状态...")
+        retrieved_info = await get_file_info(file_id, include_content=True)
+        assert retrieved_info is not None, "初始获取文件信息失败"
+        assert retrieved_info['file_content'] is not None, "初始文件内容为空"
+        print(f"   [OK] 文件内容长度: {len(retrieved_info['file_content'])}")
+
+        # 检查键的TTL
+        meta_ttl = await redis_store.ttl(f"meta:{file_id}")
+        content_ttl = await redis_store.ttl(f"content:{file_id}")
+        print(f"   [OK] 初始TTL - meta: {meta_ttl}s, content: {content_ttl}s")
+        assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该基本同步"
+
+        # 步骤3: 等待3秒,让TTL减少一些
+        print("3. 等待3秒...")
+        await asyncio.sleep(3)
+
+        # 步骤4: 更新callback_task_id(触发Bug)
+        print("4. 更新callback_task_id(触发TTL不同步Bug)...")
+        update_success = await store_file_info(
+            file_id,
+            {'callback_task_id': 'updated_task_id'},
+            expire_seconds=10  # 设置更长的TTL
+        )
+        assert update_success, "更新callback_task_id失败"
+
+        # 步骤5: 检查TTL不同步问题
+        print("5. 检查TTL不同步问题...")
+        new_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
+        new_content_ttl = await redis_store.ttl(f"content:{file_id}")
+        print(f"   [OK] 更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s")
+
+        # 验证TTL不同步:meta的TTL应该重置为10秒,content应该继续倒计时(约2秒)
+        assert new_meta_ttl > 8, f"meta的TTL应该被重置为约10秒,实际为{new_meta_ttl}"
+        assert new_content_ttl < 5, f"content的TTL应该继续倒计时,实际为{new_content_ttl}"
+        assert abs(new_meta_ttl - new_content_ttl) > 3, "TTL应该明显不同步"
+
+        print("   [PASS] TTL不同步Bug已复现!")
+
+        # 步骤6: 等待content过期
+        print("6. 等待content过期...")
+        await asyncio.sleep(new_content_ttl + 1)
+
+        # 步骤7: 验证获取文件内容失败
+        print("7. 验证content过期后获取失败...")
+        final_info = await get_file_info(file_id, include_content=True)
+
+        # 根据当前bug逻辑,这里应该返回None(因为content缺失)
+        assert final_info is None, f"content过期后应该返回None,实际返回: {final_info}"
+
+        print("   [PASS] Bug验证成功:content过期后无法获取文件信息")
+
+        # 清理测试数据
+        await redis_store.delete(f"meta:{file_id}", f"content:{file_id}")
+
+    async def test_only_meta_retrieval_works(self, redis_store, test_file_data):
+        """
+        测试2: 验证不包含content的获取仍然正常工作
+
+        即使content过期,meta信息应该仍然可以获取
+        """
+        file_id, file_info = test_file_data
+
+        print("\n=== 测试meta单独获取功能 ===")
+
+        # 存储文件信息
+        await store_file_info(file_id, file_info, expire_seconds=3)
+
+        # 更新callback_task_id
+        await store_file_info(file_id, {'callback_task_id': 'updated'}, expire_seconds=10)
+
+        # 等待content过期
+        await asyncio.sleep(4)
+
+        # 测试不包含content的获取
+        meta_only_info = await get_file_info(file_id, include_content=False)
+
+        assert meta_only_info is not None, "meta信息应该可以正常获取"
+        assert meta_only_info['callback_task_id'] == 'updated', "callback_task_id应该被正确更新"
+        assert 'file_content' not in meta_only_info, "不应该包含file_content"
+
+        print("   [PASS] meta信息单独获取正常")
+
+        # 清理
+        await redis_store.delete(f"meta:{file_id}")
+
+    async def test_current_bug_detection_in_logs(self, redis_store, test_file_data):
+        """
+        测试3: 模拟日志中观察到的实际场景
+
+        复现日志中的时间序列:
+        - 11:41:53 文件信息存储
+        - 11:42:01 获取失败(8秒后)
+        """
+        file_id, file_info = test_file_data
+
+        print("\n=== 模拟日志中的实际场景 ===")
+
+        # 步骤1: 模拟文件上传(11:41:53)
+        print("1. 模拟文件上传(11:41:53)...")
+        upload_time = time.time()
+        await store_file_info(file_id, file_info, expire_seconds=10)  # 10秒TTL
+
+        # 步骤2: 模拟一些处理时间
+        await asyncio.sleep(2)
+
+        # 步骤3: 模拟callback_task_id更新(可能发生在中间某个时间点)
+        print("3. 模拟callback_task_id更新...")
+        await store_file_info(file_id, {'callback_task_id': 'updated_task'}, expire_seconds=15)
+
+        # 步骤4: 等待到11:42:01(约8秒后)
+        elapsed_time = time.time() - upload_time
+        wait_time = 8 - elapsed_time
+        if wait_time > 0:
+            print(f"4. 等待到11:42:01(还需要{wait_time:.1f}秒)...")
+            await asyncio.sleep(wait_time)
+
+        # 步骤5: 验证获取失败(如日志所示)
+        print("5. 验证11:42:01时的获取失败...")
+        failed_info = await get_file_info(file_id, include_content=True)
+
+        assert failed_info is None, "模拟11:42:01时应该获取失败"
+        print("   [PASS] 成功复现日志中的失败场景")
+
+        # 清理
+        await redis_store.delete(f"meta:{file_id}")
+
+
+async def run_ttl_bug_tests():
+    """运行TTL Bug测试"""
+    print("[TEST] 开始Redis TTL不同步Bug测试")
+
+    test_instance = TestRedisTTLSync()
+
+    try:
+        # 获取redis连接
+        redis_store = await RedisConnectionFactory.get_redis_store()
+
+        # 准备测试数据
+        file_id = "test_ttl_bug_file_123"
+        file_info = {
+            "file_name": "测试文档.pdf",
+            "file_size": 1024,
+            "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50,
+            "callback_task_id": "original_task_id",
+            "upload_time": int(time.time())
+        }
+        test_file_data = (file_id, file_info)
+
+        # 运行测试
+        await test_instance.test_ttl_sync_bug_reproduction(redis_store, test_file_data)
+        await test_instance.test_only_meta_retrieval_works(redis_store, test_file_data)
+        await test_instance.test_current_bug_detection_in_logs(redis_store, test_file_data)
+
+        print("\n[PASS] 所有TTL Bug测试通过!")
+
+    except Exception as e:
+        print(f"\n[FAIL] 测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+    finally:
+        await RedisConnectionFactory.close_all()
+
+
+if __name__ == "__main__":
+    # 直接运行测试
+    asyncio.run(run_ttl_bug_tests())

+ 185 - 0
test/redis_ttl_fix_test.py

@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Redis TTL不同步Bug修复验证测试
+
+验证修复后的store_file_info函数能够同步更新meta和content的TTL
+"""
+
+import asyncio
+import json
+import time
+import sys
+from pathlib import Path
+
+root_dir = Path(__file__).parent.parent
+sys.path.append(str(root_dir))
+
+from foundation.utils.redis_utils import store_file_info, get_file_info
+from foundation.base.redis_connection import RedisConnectionFactory
+
+
+async def test_ttl_sync_fix():
+    """测试TTL同步修复效果"""
+    print("[TEST] 开始验证TTL同步修复效果")
+
+    try:
+        redis_store = await RedisConnectionFactory.get_redis_store()
+        file_id = "test_ttl_fix_file_456"
+        file_info = {
+            "file_name": "修复测试文档.pdf",
+            "file_size": 2048,
+            "file_content": "这是用于验证修复效果的测试文件内容。" * 100,
+            "callback_task_id": "original_task",
+            "upload_time": int(time.time())
+        }
+
+        print("\n=== 步骤1: 存储完整文件信息 ===")
+        # 存储文件信息,TTL=8秒
+        success = await store_file_info(file_id, file_info, expire_seconds=8)
+        assert success, "存储文件信息失败"
+
+        # 验证初始TTL同步
+        meta_ttl = await redis_store.ttl(f"meta:{file_id}")
+        content_ttl = await redis_store.ttl(f"content:{file_id}")
+        print(f"初始TTL - meta: {meta_ttl}s, content: {content_ttl}s")
+        assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该同步"
+
+        print("\n=== 步骤2: 等待2秒后更新callback_task_id ===")
+        # 等待2秒,让TTL减少
+        await asyncio.sleep(2)
+
+        # 更新callback_task_id,触发TTL同步修复逻辑
+        update_success = await store_file_info(
+            file_id,
+            {'callback_task_id': 'updated_task'},
+            expire_seconds=15  # 设置更长的TTL
+        )
+        assert update_success, "更新callback_task_id失败"
+
+        print("\n=== 步骤3: 验证TTL同步修复效果 ===")
+        # 检查TTL是否同步更新
+        new_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
+        new_content_ttl = await redis_store.ttl(f"content:{file_id}")
+        print(f"更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s")
+
+        # 验证修复效果:两个TTL都应该被重置为15秒左右
+        assert new_meta_ttl > 12, f"meta的TTL应该被重置为约15秒,实际为{new_meta_ttl}"
+        assert new_content_ttl > 12, f"content的TTL也应该被重置为约15秒,实际为{new_content_ttl}"
+        assert abs(new_meta_ttl - new_content_ttl) <= 2, "修复后TTL应该保持同步"
+
+        print("[PASS] TTL同步修复验证成功!")
+
+        print("\n=== 步骤4: 验证文件内容获取正常 ===")
+        # 等待一段时间后验证文件内容仍然可以获取
+        await asyncio.sleep(3)
+
+        retrieved_info = await get_file_info(file_id, include_content=True)
+        assert retrieved_info is not None, "修复后应该能正常获取文件信息"
+        assert retrieved_info['file_content'] is not None, "修复后文件内容应该存在"
+        assert retrieved_info['callback_task_id'] == 'updated_task', "callback_task_id应该被正确更新"
+
+        print(f"[PASS] 文件信息获取正常,内容长度: {len(retrieved_info['file_content'])}")
+
+        print("\n=== 步骤5: 验证长期稳定性 ===")
+        # 再等待一段时间,验证TTL继续同步
+        await asyncio.sleep(3)
+
+        final_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
+        final_content_ttl = await redis_store.ttl(f"content:{file_id}")
+        print(f"最终TTL - meta: {final_meta_ttl}s, content: {final_content_ttl}s")
+
+        # TTL差值应该仍然很小
+        assert abs(final_meta_ttl - final_content_ttl) <= 2, "TTL应该保持同步"
+
+        # 文件内容应该仍然可以获取
+        final_info = await get_file_info(file_id, include_content=True)
+        assert final_info is not None, "最终应该能获取文件信息"
+        assert final_info['file_content'] is not None, "最终文件内容应该存在"
+
+        print("[PASS] 长期稳定性验证成功!")
+
+        print("\n=== 步骤6: 对比修复前后行为 ===")
+
+        # 清理测试文件
+        await redis_store.delete(f"meta:{file_id}", f"content:{file_id}")
+
+        # 模拟修复前的错误行为(如果需要对比)
+        print("修复前行为: meta和content TTL不同步,导致间歇性获取失败")
+        print("修复后行为: meta和content TTL保持同步,确保稳定获取")
+        print("[PASS] 修复效果验证完成!")
+
+    except Exception as e:
+        print(f"[FAIL] 测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+    finally:
+        await RedisConnectionFactory.close_all()
+
+
+async def test_edge_cases():
+    """测试边界情况"""
+    print("\n[TEST] 开始测试边界情况")
+
+    try:
+        redis_store = await RedisConnectionFactory.get_redis_store()
+
+        # 测试1: 只有meta没有content的情况
+        file_id_1 = "test_edge_meta_only"
+        await redis_store.setex(f"meta:{file_id_1}", 10, json.dumps({"test": "meta_only"}))
+
+        success = await store_file_info(file_id_1, {'callback_task_id': 'edge_test'}, expire_seconds=15)
+        assert success, "只有meta的情况下更新应该成功"
+
+        meta_ttl = await redis_store.ttl(f"meta:{file_id_1}")
+        assert meta_ttl > 12, "meta的TTL应该被更新"
+        print("[PASS] 只有meta的边界情况测试通过")
+
+        await redis_store.delete(f"meta:{file_id_1}")
+
+        # 测试2: 新文件存储的正常行为
+        file_id_2 = "test_edge_new_file"
+        new_file_info = {
+            "file_name": "新文件.pdf",
+            "file_content": "新文件内容",
+            "callback_task_id": "new_task"
+        }
+
+        success = await store_file_info(file_id_2, new_file_info, expire_seconds=10)
+        assert success, "新文件存储应该成功"
+
+        meta_ttl = await redis_store.ttl(f"meta:{file_id_2}")
+        content_ttl = await redis_store.ttl(f"content:{file_id_2}")
+        assert abs(meta_ttl - content_ttl) <= 1, "新文件的TTL应该同步"
+        print("[PASS] 新文件存储的边界情况测试通过")
+
+        await redis_store.delete(f"meta:{file_id_2}", f"content:{file_id_2}")
+
+        print("[PASS] 所有边界情况测试通过!")
+
+    except Exception as e:
+        print(f"[FAIL] 边界情况测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+    finally:
+        await RedisConnectionFactory.close_all()
+
+
+async def run_fix_validation_tests():
+    """运行修复验证测试"""
+    print("=" * 60)
+    print("Redis TTL不同步Bug修复验证测试")
+    print("=" * 60)
+
+    await test_ttl_sync_fix()
+    await test_edge_cases()
+
+    print("\n" + "=" * 60)
+    print("[SUCCESS] 所有修复验证测试通过!")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(run_fix_validation_tests())

+ 145 - 45
views/construction_review/launch_review.py

@@ -16,7 +16,7 @@ from fastapi.responses import StreamingResponse
 from core.base.redis_duplicate_checker import RedisDuplicateChecker
 from foundation.logger.loggering import server_logger as logger
 from foundation.trace.trace_context import TraceContext, auto_trace
-from foundation.utils.redis_utils import get_file_info
+from foundation.utils.redis_utils import get_file_info,store_file_info
 from core.base.workflow_manager import WorkflowManager
 from core.base.progress_manager import ProgressManager, sse_callback_manager
 from views.construction_review.file_upload import validate_upload_parameters
@@ -66,8 +66,11 @@ class SimpleSSEManager:
         """发送进度更新 - 将进度数据放入队列推送给客户端"""
         queue = self.connections.get(callback_task_id)
         if queue:
-            # 根据数据状态决定事件类型
-            event_type = "unit_review_update" if current_data.get("status") == "unit_review_update" else "progress_update"
+            # 优先使用progress_manager传递的event_type,如果没有则使用默认逻辑
+            event_type = current_data.get("event_type", "processing")
+            # 保持向后兼容性
+            if event_type == "processing" and current_data.get("status") == "unit_review_update":
+                event_type = "unit_review_update"
 
             await queue.put({
                 "type": event_type,
@@ -95,6 +98,10 @@ class LaunchReviewRequest(BaseModel):
     """启动审查请求模型"""
     callback_task_id: str = Field(..., description="回调任务ID,从文件上传接口获取")
     user_id: str = Field(..., description="用户标识")
+    tendency_review_role: str = Field(
+        "default_role",
+        description="倾向性审查角色,如选项为 一线施工人员、项目负责人、监理、检测员、项目总工、环保检测人员等"
+    )
     review_config: List[str] = Field(
         ...,
         description="审查配置列表,包含的项为启用状态"
@@ -146,6 +153,26 @@ def validate_project_plan_type(project_plan_type: str) -> None:
     if project_plan_type not in supported_types:
         raise LaunchReviewErrors.project_plan_type_invalid()
 
+def validate_tendency_review_role(tendency_review_role: str) -> None:
+    """验证倾向性审查角色"""
+    # 当前支持的倾向性审查角色类型
+    supported_roles = {
+        'default_role',          # 默认角色
+    }
+
+    if tendency_review_role not in supported_roles:
+        raise LaunchReviewErrors.tendency_review_role_invalid()
+
+def validate_user_id(user_id: str) -> None:
+    """验证用户标识"""
+    # 当前支持的用户标识列表
+    supported_users = {
+        'user-001'
+    }
+
+    if user_id not in supported_users:
+        raise LaunchReviewErrors.invalid_user()
+
 
 @launch_review_router.post("/sse/launch_review")
 @auto_trace(generate_if_missing=True)
@@ -161,10 +188,15 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
     """
     callback_task_id = request_data.callback_task_id
     TraceContext.set_trace_id(callback_task_id)
+    user_id = request_data.user_id
     review_config = request_data.review_config
     project_plan_type = request_data.project_plan_type
+    tendency_review_role = request_data.tendency_review_role
+
+    logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}, user_id={user_id}, tendency_review_role={tendency_review_role}")
 
-    logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}")
+    # 验证用户标识
+    validate_user_id(user_id)
 
     # 验证审查配置
     validate_review_config(review_config)
@@ -172,6 +204,9 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
     # 验证工程方案类型
     validate_project_plan_type(project_plan_type)
 
+    # 验证倾向性审查角色
+    validate_tendency_review_role(tendency_review_role)
+
     # 注册SSE回调
     sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
     queue = await sse_manager.connect(callback_task_id)
@@ -182,9 +217,14 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
             # 发送连接确认
             connected_data = json.dumps({
                 "callback_task_id": callback_task_id,
-                "stage": "startup",
+                "user_id": user_id,
+                "current": 0,
+                "stage_name": "启动审查SSE连接",
+                "status": "connected",
                 "message": "启动审查SSE连接已建立,正在处理请求...",
-                "timestamp": datetime.now().isoformat()
+                "overall_task_status": "processing",
+                "updated_at": int(time.time()),
+                "issues": []
             }, ensure_ascii=False)
             yield format_sse_event("connected", connected_data)
 
@@ -197,9 +237,14 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 # 发送处理状态
                 status_data = json.dumps({
                     "callback_task_id": callback_task_id,
-                    "stage": "validation",
+                    "user_id": user_id,
+                    "current": 5,
+                    "stage_name": f"验证文件信息: {file_id}",
+                    "status": "processing",
                     "message": f"正在验证文件信息: {file_id}",
-                    "timestamp": datetime.now().isoformat()
+                    "overall_task_status": "processing",
+                    "updated_at": int(time.time()),
+                    "issues": []
                 }, ensure_ascii=False)
                 yield format_sse_event("processing", status_data)
 
@@ -222,7 +267,6 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
 
                 # 立即更新Redis中的callback_task_id为当前值
                 try:
-                    from foundation.utils.redis_utils import store_file_info
                     await store_file_info(file_id, {'callback_task_id': callback_task_id})
                     logger.info(f"已更新Redis中的callback_task_id: {callback_task_id}")
                 except Exception as e:
@@ -230,8 +274,10 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
 
                 # 添加审查配置到文件信息,并确保使用当前正确的callback_task_id
                 file_info.update({
+                    'user_id': user_id,
                     'review_config': review_config,
                     'project_plan_type': project_plan_type,
+                    'tendency_review_role': tendency_review_role,
                     'launched_at': int(time.time()),
                     'callback_task_id': callback_task_id  # 确保使用当前正确的callback_task_id
                 })
@@ -244,13 +290,14 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 # 发送成功启动状态
                 success_data = json.dumps({
                     "callback_task_id": callback_task_id,
-                    "file_id": file_info['file_id'],
-                    "review_config": review_config,
-                    "project_plan_type": project_plan_type,
+                    "user_id": user_id,
+                    "current": 10,
+                    "stage_name": "任务启动成功",
                     "status": "submitted",
-                    "submitted_at": file_info['launched_at'],
                     "message": "施工方案审查任务启动成功,请耐心等待结果...",
-                    "timestamp": datetime.now().isoformat()
+                    "overall_task_status": "processing",
+                    "updated_at": int(time.time()),
+                    "issues": []
                 }, ensure_ascii=False)
                 yield format_sse_event("submitted", success_data)
 
@@ -260,31 +307,66 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                     try:
                         message = await queue.get()
 
-                        if message.get("type") == "progress_update":
-                            current_data = message.get("data")
-                            if current_data:
-                                progress_json = json.dumps(current_data, ensure_ascii=False)
-                                yield format_sse_event("progress", progress_json)
-                        elif message.get("type") == "unit_review_update":
-                            current_data = message.get("data")
-                            if current_data:
-                                unit_review_json = json.dumps(current_data, ensure_ascii=False)
-                                yield format_sse_event("unit_review", unit_review_json)
-
-                        # # 统一检查任务完成状态
-                        # if 'current_data' in locals() and current_data:
-                        #     overall_task_status = current_data.get("overall_task_status")
-                        #     if overall_task_status in ["completed", "failed"]:
-                        #         completion_data = {
-                        #             "callback_task_id": callback_task_id,
-                        #             "task_status": overall_task_status,
-                        #             "overall_progress": current_data.get("current", 100),
-                        #             "timestamp": datetime.now().isoformat(),
-                        #             "message": "审查任务处理完成!"
-                        #         }
-                        #         completion_json = json.dumps(completion_data, ensure_ascii=False)
-                        #         yield format_sse_event("completed", completion_json)
-                        #         break
+                        # 处理所有类型的进度更新消息
+                        message_type = message.get("type")
+                        current_data = message.get("data")
+
+                        if current_data:
+                            # 根据消息类型决定数据格式
+                            if message_type == "unit_review_update":
+                                # 单元审查更新的特殊格式
+                                unified_data = {
+                                    "callback_task_id": callback_task_id,
+                                    "user_id": user_id,
+                                    "current": current_data.get("current", 0),
+                                    "stage_name": current_data.get("stage_name", "单元审查"),
+                                    "status": "unit_review_update",
+                                    "message": current_data.get("message", ""),
+                                    "overall_task_status": current_data.get("overall_task_status", "processing"),
+                                    "updated_at": current_data.get("updated_at", int(time.time())),
+                                    "issues": current_data.get("issues", [])
+                                }
+                            else:
+                                # 通用进度更新格式(包括 processing_flag, processing, completed 等)
+                                unified_data = {
+                                    "callback_task_id": callback_task_id,
+                                    "user_id": user_id,
+                                    "current": current_data.get("current", 0),
+                                    "stage_name": current_data.get("stage_name", "处理中"),
+                                    "status": current_data.get("status", "processing"),
+                                    "message": current_data.get("message", ""),
+                                    "overall_task_status": current_data.get("overall_task_status", "processing"),
+                                    "updated_at": current_data.get("updated_at", int(time.time())),
+                                    "issues": current_data.get("issues", [])
+                                }
+
+                            # 使用从progress_manager传递的事件类型,或回退到消息类型
+                            sse_event_type = current_data.get("event_type", message_type)
+                            if not sse_event_type:
+                                sse_event_type = "processing"  # 最终回退
+
+                            logger.debug(f"生成SSE事件: {sse_event_type}, 消息类型: {message_type}, current: {current_data.get('current')}")
+                            unified_data_json = json.dumps(unified_data, ensure_ascii=False)
+                            yield format_sse_event(sse_event_type, unified_data_json)
+
+                        # 统一检查任务完成状态
+                        if current_data:
+                            overall_task_status = current_data.get("overall_task_status")
+                            if overall_task_status in ["completed", "failed"]:
+                                completion_data = {
+                                    "callback_task_id": callback_task_id,
+                                    "user_id": user_id,
+                                    "current": current_data.get("current", 100),
+                                    "stage_name": "审查完成",
+                                    "status": "completed",
+                                    "message": "施工审查方案处理完成!",
+                                    "overall_task_status": overall_task_status,
+                                    "updated_at": int(time.time()),
+                                    "issues": current_data.get("issues", []),
+                                }
+                                completion_json = json.dumps(completion_data, ensure_ascii=False)
+                                yield format_sse_event("completed", completion_json)
+                                break
 
                     except Exception as e:
                         logger.error(f"队列消息处理异常: {callback_task_id}")
@@ -298,9 +380,15 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 logger.error(f"异常堆栈: {traceback.format_exc()}")
                 error_data = json.dumps({
                     "callback_task_id": callback_task_id,
-                    "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error",
+                    "user_id": user_id,
+                    "current": 0,
+                    "stage_name": "处理异常",
+                    "status": "error",
                     "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
-                    "timestamp": datetime.now().isoformat()
+                    "overall_task_status": "failed",
+                    "updated_at": int(time.time()),
+                    "issues": [],
+                    "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error"
                 }, ensure_ascii=False)
                 yield format_sse_event("error", error_data)
 
@@ -310,9 +398,15 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                 logger.error(f"异常堆栈: {traceback.format_exc()}")
                 error_data = json.dumps({
                     "callback_task_id": callback_task_id,
-                    "error": "internal_error",
+                    "user_id": user_id,
+                    "current": 0,
+                    "stage_name": "内部错误",
+                    "status": "error",
                     "message": f"服务端内部错误: {str(e)}",
-                    "timestamp": datetime.now().isoformat()
+                    "overall_task_status": "failed",
+                    "updated_at": int(time.time()),
+                    "issues": [],
+                    "error": "internal_error"
                 }, ensure_ascii=False)
                 yield format_sse_event("error", error_data)
 
@@ -322,9 +416,15 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
             logger.error(f"异常堆栈: {traceback.format_exc()}")
             error_data = json.dumps({
                 "callback_task_id": callback_task_id,
-                "error": "sse_error",
+                "user_id": user_id if 'user_id' in locals() else "unknown",
+                "current": 0,
+                "stage_name": "SSE流异常",
+                "status": "error",
                 "message": f"SSE流异常: {str(e)}",
-                "timestamp": datetime.now().isoformat()
+                "overall_task_status": "failed",
+                "updated_at": int(time.time()),
+                "issues": [],
+                "error": "sse_error"
             }, ensure_ascii=False)
             yield format_sse_event("error", error_data)
 

+ 12 - 0
views/construction_review/schemas/error_schemas.py

@@ -172,6 +172,13 @@ class ErrorCodes:
         "status_code": 404
     }
 
+    QDSC014 = {
+        "code": "QDSC014",
+        "error_type": "TENDENCY_REVIEW_ROLE_INVALID",
+        "message": "倾向性审查角色无效(未提供或未注册)",
+        "status_code": 400
+    }
+
 
     # 审查结果接口错误码 (SCJG001-SCJG008)
     SCJG001 = {
@@ -408,6 +415,11 @@ class LaunchReviewErrors:
         logger.error(ErrorCodes.QDSC013)
         return create_http_exception(ErrorCodes.QDSC013)
 
+    @staticmethod
+    def tendency_review_role_invalid():
+        logger.error(ErrorCodes.QDSC014)
+        return create_http_exception(ErrorCodes.QDSC014)
+
 
 class ReviewResultsErrors:
     """审查结果接口错误"""

Některé soubory nejsou zobrazeny, neboť je v těchto rozdílových datech změněno mnoho souborů