Sfoglia il codice sorgente

v0.0.2-上传接口调通+celery队列管理+审查基本框架构建完成mock

wandaan 3 mesi fa
parent
commit
3a3a3505a4

BIN
build_graph_app.png


+ 1 - 1
config/config.ini

@@ -43,4 +43,4 @@ CONSOLE_OUTPUT=True
 
 
 [user_lists]
-USERS=['user-001']
+USERS=['user-001']

+ 125 - 0
core/base/progress_manager.py

@@ -0,0 +1,125 @@
+"""
+任务进度管理器
+负责任务进度的存储、更新和查询
+"""
+
+import json
+from typing import Dict, Any, Optional
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+
class ProgressManager:
    """In-memory store for per-task review progress.

    Tracks stage-level progress per callback_task_id and derives an overall
    percentage plus an aggregate status. NOTE(review): a plain dict means
    progress is neither persisted nor shared between processes — confirm this
    is acceptable given the chain runs inside Celery worker processes.
    """

    def __init__(self):
        # callback_task_id -> progress record (simplified: in-memory only).
        self.progress_data: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _normalize_stage(stage: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *stage* guaranteed to carry the keys this class reads.

        Callers supply stages keyed "status" while updates/queries here read
        "stage_status"; without normalization get_progress() raised a KeyError
        that its broad except swallowed, so it always returned None.
        """
        normalized = dict(stage)
        normalized.setdefault("progress", 0)
        if "stage_status" not in normalized:
            normalized["stage_status"] = normalized.get("status", "pending")
        return normalized

    async def initialize_progress(self, callback_task_id: str, user_id: str, stages: list):
        """Create the progress record for a new task chain.

        Args:
            callback_task_id: unique task-chain id.
            user_id: owner of the task.
            stages: stage dicts; both "status" and "stage_status" keys accepted.
        """
        try:
            normalized = [self._normalize_stage(s) for s in stages]
            self.progress_data[callback_task_id] = {
                "user_id": user_id,
                "overall_progress": 0,
                "current_stage": normalized[0]["stage_name"] if normalized else "",
                "stages": normalized,
                "updated_at": datetime.now()
            }
            logger.info(f"初始化任务进度: {callback_task_id}")

        except Exception as e:
            logger.error(f"初始化进度失败: {str(e)}")
            raise

    async def update_stage_progress(self, callback_task_id: str, stage_name: str,
                                  progress: int, status: str, message: str = "",
                                  sub_progress: int = 0):
        """Update one stage's progress and recompute the overall percentage.

        Unknown task ids are logged and ignored (no exception raised).
        """
        try:
            if callback_task_id not in self.progress_data:
                logger.warning(f"任务进度不存在: {callback_task_id}")
                return

            task_progress = self.progress_data[callback_task_id]

            # Update the matching stage in place.
            for stage in task_progress["stages"]:
                if stage["stage_name"] == stage_name:
                    stage["progress"] = progress
                    stage["stage_status"] = status
                    stage["message"] = message
                    stage["sub_progress"] = sub_progress
                    break

            # Refresh the current-stage pointer and the overall percentage.
            task_progress["current_stage"] = stage_name
            task_progress["overall_progress"] = self._calculate_overall_progress(task_progress["stages"])
            task_progress["updated_at"] = datetime.now()

            logger.debug(f"更新进度: {callback_task_id}, 阶段: {stage_name}, 进度: {progress}%")

        except Exception as e:
            logger.error(f"更新阶段进度失败: {str(e)}")
            raise

    async def get_progress(self, callback_task_id: str) -> Optional[Dict[str, Any]]:
        """Return a progress snapshot, or None when the task is unknown.

        Aggregate status precedence: failed > completed > processing > pending.
        """
        try:
            if callback_task_id not in self.progress_data:
                return None

            task_progress = self.progress_data[callback_task_id]

            # Read stage status defensively so records created with a legacy
            # "status" key still resolve instead of raising KeyError.
            statuses = [stage.get("stage_status", stage.get("status", "pending"))
                        for stage in task_progress["stages"]]

            if any(s == "failed" for s in statuses):
                review_task_status = "failed"
            elif all(s == "completed" for s in statuses):
                review_task_status = "completed"
            elif any(s == "processing" for s in statuses):
                review_task_status = "processing"
            else:
                review_task_status = "pending"

            return {
                "callback_task_id": callback_task_id,
                "user_id": task_progress["user_id"],
                "review_task_status": review_task_status,
                "overall_progress": task_progress["overall_progress"],
                "stages": task_progress["stages"],
                "updated_at": int(task_progress["updated_at"].timestamp()),
                # TODO: placeholder estimate in seconds — not actually computed.
                "estimated_remaining": 600
            }

        except Exception as e:
            logger.error(f"获取进度失败: {str(e)}")
            return None

    async def complete_task(self, callback_task_id: str, result: Dict[str, Any]):
        """Mark a task finished: last stage completed, overall 100%, result stored."""
        try:
            if callback_task_id in self.progress_data:
                task_progress = self.progress_data[callback_task_id]

                # Force the final stage to completed.
                if task_progress["stages"]:
                    task_progress["stages"][-1]["stage_status"] = "completed"
                    task_progress["stages"][-1]["progress"] = 100

                task_progress["overall_progress"] = 100
                task_progress["updated_at"] = datetime.now()

                # Keep the final result alongside the progress record.
                task_progress["result"] = result

            logger.info(f"任务完成: {callback_task_id}")

        except Exception as e:
            logger.error(f"标记任务完成失败: {str(e)}")
            raise

    def _calculate_overall_progress(self, stages: list) -> int:
        """Unweighted mean of stage progress values (0 for no stages)."""
        if not stages:
            return 0
        total_progress = sum(stage.get("progress", 0) for stage in stages)
        return int(total_progress / len(stages))

+ 161 - 0
core/base/redis_duplicate_checker.py

@@ -0,0 +1,161 @@
+"""
+基于Redis的重复任务检查器
+支持多进程间的重复任务检查
+"""
+
+import os
+import json
+from datetime import datetime, timedelta
+import redis
+from foundation.logger.loggering import server_logger as logger
+
+
class RedisDuplicateChecker:
    """Duplicate-task checker backed by Redis (shared across worker processes).

    Falls back to a per-process in-memory dict when Redis is unreachable;
    in that mode duplicate detection is not shared between processes.
    """

    # Window during which a re-submitted file_id counts as a duplicate.
    DUPLICATE_WINDOW = timedelta(minutes=2)

    def __init__(self):
        try:
            # Read connection settings from the config file.
            from foundation.base.config import config_handler
            redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
            redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
            redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')

            # Build the connection URL (DB 2 is used for duplicate checks).
            if redis_password:
                redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/2"
                # Never log credentials: mask the password in the logged URL.
                logged_url = f"redis://:***@{redis_host}:{redis_port}/2"
            else:
                redis_url = f"redis://{redis_host}:{redis_port}/2"
                logged_url = redis_url

            logger.info(f"连接Redis: {logged_url}")

            self.redis_client = redis.from_url(redis_url, decode_responses=True)

            # Fail fast if the server is unreachable.
            self.redis_client.ping()
            logger.info("Redis重复检查器连接成功")
            self.use_redis = True

        except Exception as e:
            logger.error(f"Redis连接失败,回退到内存模式: {str(e)}")
            # Degrade to per-process in-memory storage.
            self.task_cache = {}
            self.use_redis = False

    async def is_duplicate_task(self, file_id: str) -> bool:
        """Return True when *file_id* was registered within DUPLICATE_WINDOW.

        Expired entries are purged on the way. Errors are logged and treated
        as "not a duplicate" so a checker failure never blocks submissions.
        """
        try:
            if self.use_redis:
                task_info = self.redis_client.get(f"task:{file_id}")
                if task_info:
                    task_data = json.loads(task_info)
                    created_at = datetime.fromisoformat(task_data['created_at'])

                    if datetime.now() - created_at < self.DUPLICATE_WINDOW:
                        logger.info(f"发现重复任务: {file_id}")
                        return True
                    # Entry outlived the duplicate window — clean it up.
                    self.redis_client.delete(f"task:{file_id}")
                    return False
                return False

            # In-memory fallback: apply the same expiry window as Redis mode
            # (previously any cached entry counted as a duplicate until the
            # hourly cleanup ran).
            task_info = self.task_cache.get(file_id)
            if task_info is None:
                return False
            created_at = datetime.fromisoformat(task_info['created_at'])
            if datetime.now() - created_at < self.DUPLICATE_WINDOW:
                logger.info(f"发现重复任务: {file_id}")
                return True
            del self.task_cache[file_id]
            return False

        except Exception as e:
            logger.error(f"检查重复任务失败: {str(e)}")
            return False

    async def register_task(self, file_info: dict, callback_task_id: str):
        """Record *file_info* under its file_id so later submissions are flagged.

        Raises on failure — callers must know registration did not happen.
        """
        try:
            # Drop non-serializable payloads (e.g. raw file bytes) before
            # storing the record as JSON.
            serializable_file_info = {
                k: v for k, v in file_info.items()
                if k not in ['file_content'] and not isinstance(v, bytes)
            }

            task_data = {
                "callback_task_id": callback_task_id,
                "created_at": datetime.now().isoformat(),
                "file_info": serializable_file_info
            }

            if self.use_redis:
                # Hard TTL of one hour so stale entries self-expire.
                self.redis_client.setex(
                    f"task:{file_info['file_id']}",
                    3600,  # 1小时
                    json.dumps(task_data, ensure_ascii=False)
                )
            else:
                self.task_cache[file_info['file_id']] = task_data

            logger.info(f"注册任务: {file_info['file_id']} -> {callback_task_id}")

        except Exception as e:
            logger.error(f"注册任务失败: {str(e)}")
            raise

    async def unregister_task(self, file_id: str):
        """Remove the registration for *file_id*; failures are logged, not raised."""
        try:
            if self.use_redis:
                self.redis_client.delete(f"task:{file_id}")
            else:
                self.task_cache.pop(file_id, None)

            logger.info(f"取消注册任务: {file_id}")

        except Exception as e:
            logger.error(f"取消注册任务失败: {str(e)}")

    async def get_task_info(self, file_id: str) -> str:
        """Return the callback_task_id registered for *file_id*, or ""."""
        try:
            if self.use_redis:
                task_info = self.redis_client.get(f"task:{file_id}")
                if task_info:
                    return json.loads(task_info).get("callback_task_id", "")
                return ""

            entry = self.task_cache.get(file_id)
            return entry.get("callback_task_id", "") if entry else ""

        except Exception as e:
            logger.error(f"获取任务信息失败: {str(e)}")
            return ""

    def cleanup_expired_cache(self):
        """Purge memory-mode entries older than one hour (Redis expires via TTL)."""
        try:
            if not self.use_redis:
                current_time = datetime.now()
                expired_files = [
                    file_id
                    for file_id, task_info in list(self.task_cache.items())
                    if current_time - datetime.fromisoformat(task_info['created_at']) > timedelta(hours=1)
                ]

                for file_id in expired_files:
                    del self.task_cache[file_id]

                if expired_files:
                    logger.info(f"清理过期缓存: {len(expired_files)} 个文件")

        except Exception as e:
            logger.error(f"清理过期缓存失败: {str(e)}")

+ 288 - 0
core/base/workflow_manager.py

@@ -0,0 +1,288 @@
+"""
+基于LangGraph的工作流管理器
+负责任务的创建、编排和执行,使用LangGraph进行状态管理
+"""
+
+import asyncio
+import uuid
+from typing import Dict, Optional, TypedDict, Annotated, List
+from datetime import datetime
+from dataclasses import dataclass
+
+from langgraph.graph import StateGraph, END
+from langgraph.graph.message import add_messages
+from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
+
+from foundation.logger.loggering import server_logger as logger
+from foundation.utils.time_statistics import track_execution_time
+from .progress_manager import ProgressManager
+from .redis_duplicate_checker import RedisDuplicateChecker
+from ..construction_review.workflows import DocumentWorkflow,AIReviewWorkflow,ReportWorkflow
+
@dataclass
class TaskChain:
    """State record for one document-review task chain."""
    callback_task_id: str                   # id reported back to the caller
    file_id: str                            # uploaded file id
    user_id: str                            # owner of the task
    status: str                             # processing | completed | failed
    current_stage: str                      # name of the stage currently running
    created_at: datetime                    # when the chain record was created
    started_at: Optional[datetime] = None   # set when execution begins
    completed_at: Optional[datetime] = None  # set on successful completion
    # Per-stage results. Annotation fixed to Optional[Dict]: the previous bare
    # `Dict = None` default lied to type checkers.
    results: Optional[Dict] = None

    def __post_init__(self):
        # Give each instance its own dict; a shared mutable default is unsafe.
        if self.results is None:
            self.results = {}
+
class WorkflowManager:
    """Orchestrates the review task chain: document parsing -> AI review -> report.

    Exposes an async entry point that enqueues work on Celery, and a sync
    entry point executed inside the Celery worker. Stages run serially;
    semaphores bound per-stage concurrency on the async path.
    """

    def __init__(self, max_concurrent_docs: int = 5, max_concurrent_reviews: int = 10):
        self.max_concurrent_docs = max_concurrent_docs
        self.max_concurrent_reviews = max_concurrent_reviews

        # Per-stage concurrency limits (used by the async chain).
        self.doc_semaphore = asyncio.Semaphore(max_concurrent_docs)
        self.review_semaphore = asyncio.Semaphore(max_concurrent_reviews)

        # Collaborating services.
        self.progress_manager = ProgressManager()
        self.redis_duplicate_checker = RedisDuplicateChecker()

        # Live task chains, keyed by callback_task_id.
        self.active_chains: Dict[str, "TaskChain"] = {}
        self._cleanup_task_started = False

    async def submit_task_processing(self, file_info: dict) -> str:
        """Enqueue document processing on Celery (called by the upload layer).

        Returns:
            str: the Celery task id.
        """
        # Imported lazily — presumably to avoid a circular import with the
        # tasks module; confirm before hoisting to module level.
        from foundation.base.tasks import submit_task_processing_task

        try:
            logger.info(f"提交文档处理任务到Celery: {file_info['file_id']}")

            task = submit_task_processing_task.delay(file_info)

            logger.info(f"Celery任务已提交,Task ID: {task.id}")
            return task.id

        except Exception as e:
            logger.error(f"提交Celery任务失败: {str(e)}")
            raise

    @track_execution_time
    def submit_task_processing_sync(self, file_info: dict) -> dict:
        """Run the full task chain synchronously (inside a Celery worker).

        Returns:
            dict: per-stage results ('document', 'ai_review', 'report').
            Previously the results were computed but never returned, which
            contradicted the declared return type.
        """
        try:
            logger.info(f"提交文档处理任务: {file_info['file_id']}")

            # 1. The task-chain id comes from the caller.
            callback_task_id = file_info['callback_task_id']

            # 2. Create the chain record.
            task_chain = TaskChain(
                callback_task_id=callback_task_id,
                file_id=file_info['file_id'],
                user_id=file_info['user_id'],
                status="processing",
                current_stage="document_processing",
                created_at=datetime.now()
            )

            # 3. Register for duplicate detection and track as active.
            asyncio.run(self.redis_duplicate_checker.register_task(file_info, callback_task_id))
            self.active_chains[callback_task_id] = task_chain

            # 4. Initialize progress. Stage dicts carry the "stage_status" key
            #    that ProgressManager reads (the previous "status" key made
            #    get_progress() fail with a swallowed KeyError).
            asyncio.run(self.progress_manager.initialize_progress(
                callback_task_id=callback_task_id,
                user_id=file_info['user_id'],
                stages=[
                    {"stage_name": "文件上传", "progress": 100, "stage_status": "completed"},
                    {"stage_name": "文档处理", "progress": 0, "stage_status": "pending"},
                    {"stage_name": "AI审查", "progress": 0, "stage_status": "pending"},
                    {"stage_name": "报告生成", "progress": 0, "stage_status": "pending"}
                ]
            ))

            # 5. Execute the chain and surface its results to the caller.
            results = self._process_task_chain_sync(task_chain, file_info['file_content'], file_info['file_type'])
            logger.info(f"施工方案审查任务已完成! ")
            logger.info(f"文件ID: {file_info['file_id']}")
            logger.info(f"文件名:{file_info['file_name']}")
            return results

        except Exception as e:
            logger.error(f"提交文档处理任务失败: {str(e)}")
            raise

    async def _process_task_chain(self, task_chain: "TaskChain", file_content: bytes, file_type: str):
        """Run the chain's stages serially (async path); stages may be internally concurrent."""
        try:
            task_chain.started_at = datetime.now()

            # Stage 1: document processing, bounded by doc_semaphore.
            async with self.doc_semaphore:
                task_chain.current_stage = "document_processing"

                document_workflow = DocumentWorkflow(
                    file_id=task_chain.file_id,
                    callback_task_id=task_chain.callback_task_id,
                    user_id=task_chain.user_id,
                    progress_manager=self.progress_manager,
                    redis_duplicate_checker=self.redis_duplicate_checker
                )

                doc_result = await document_workflow.execute(file_content, file_type)
                task_chain.results['document'] = doc_result

            # Stage 2: AI review (internally concurrent).
            task_chain.current_stage = "ai_review"
            ai_workflow = AIReviewWorkflow(
                file_id=task_chain.file_id,
                callback_task_id=task_chain.callback_task_id,
                user_id=task_chain.user_id,
                structured_content=doc_result['structured_content'],
                progress_manager=self.progress_manager
            )
            ai_result = await ai_workflow.execute()
            task_chain.results['ai_review'] = ai_result

            # Stage 3: report generation.
            task_chain.current_stage = "report_generation"
            report_workflow = ReportWorkflow(
                file_id=task_chain.file_id,
                callback_task_id=task_chain.callback_task_id,
                user_id=task_chain.user_id,
                ai_review_results=ai_result,
                progress_manager=self.progress_manager
            )
            task_chain.results['report'] = await report_workflow.execute()

            task_chain.status = "completed"
            task_chain.completed_at = datetime.now()

            # Release the duplicate-check registration.
            await self.redis_duplicate_checker.unregister_task(task_chain.file_id)

            logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")

        except Exception as e:
            task_chain.status = "failed"
            logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")

            # Still release the duplicate-check registration on failure.
            await self.redis_duplicate_checker.unregister_task(task_chain.file_id)

            raise
        finally:
            # Drop the chain from the active registry in every outcome.
            self.active_chains.pop(task_chain.callback_task_id, None)

    def _process_task_chain_sync(self, task_chain: "TaskChain", file_content: bytes, file_type: str):
        """Synchronous variant of the chain for Celery workers.

        Each async stage runs under asyncio.run(), which creates and always
        closes a fresh event loop; the previous manual new_event_loop()/close()
        pairs leaked the loop whenever a stage raised.

        Returns:
            dict: task_chain.results after all stages complete.
        """
        try:
            task_chain.started_at = datetime.now()

            # Stage 1: document processing.
            task_chain.current_stage = "document_processing"
            document_workflow = DocumentWorkflow(
                file_id=task_chain.file_id,
                callback_task_id=task_chain.callback_task_id,
                user_id=task_chain.user_id,
                progress_manager=self.progress_manager,
                redis_duplicate_checker=self.redis_duplicate_checker
            )
            doc_result = asyncio.run(document_workflow.execute(file_content, file_type))
            task_chain.results['document'] = doc_result

            # Stage 2: AI review (internally concurrent).
            task_chain.current_stage = "ai_review"
            ai_workflow = AIReviewWorkflow(
                file_id=task_chain.file_id,
                callback_task_id=task_chain.callback_task_id,
                user_id=task_chain.user_id,
                structured_content=doc_result['structured_content'],
                progress_manager=self.progress_manager
            )
            ai_result = asyncio.run(ai_workflow.execute())
            task_chain.results['ai_review'] = ai_result

            # Stage 3: report generation.
            task_chain.current_stage = "report_generation"
            report_workflow = ReportWorkflow(
                file_id=task_chain.file_id,
                callback_task_id=task_chain.callback_task_id,
                user_id=task_chain.user_id,
                ai_review_results=ai_result,
                progress_manager=self.progress_manager
            )
            task_chain.results['report'] = asyncio.run(report_workflow.execute())

            task_chain.status = "completed"
            task_chain.completed_at = datetime.now()

            # Release the duplicate-check registration.
            asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))

            logger.info(f"文档处理任务链完成: {task_chain.callback_task_id}")
            return task_chain.results

        except Exception as e:
            task_chain.status = "failed"
            logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")

            # Still release the duplicate-check registration on failure.
            asyncio.run(self.redis_duplicate_checker.unregister_task(task_chain.file_id))

            raise
        finally:
            # Drop the chain from the active registry in every outcome.
            self.active_chains.pop(task_chain.callback_task_id, None)

    async def update_task_status(self, callback_task_id: str) -> Optional[Dict]:
        """Refresh a task's status. TODO: not implemented yet (mock-stage stub)."""
        pass

+ 14 - 0
core/construction_review/component/__init__.py

@@ -0,0 +1,14 @@
+"""
+施工方案审查核心组件模块
+负责文档处理、AI审查和报告生成的核心业务逻辑
+"""
+
+from .document_processor import DocumentProcessor
+from .ai_review_engine import AIReviewEngine
+from .report_generator import ReportGenerator
+
+__all__ = [
+    'DocumentProcessor',
+    'AIReviewEngine',
+    'ReportGenerator'
+]

+ 178 - 0
core/construction_review/component/ai_review_engine.py

@@ -0,0 +1,178 @@
+"""
+AI审查引擎
+负责执行AI审查,支持审查条目并发处理
+"""
+
+import asyncio
+import time
+from typing import Dict, List, Any, Optional, Callable
+from dataclasses import dataclass
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+
@dataclass
class ReviewResult:
    """Review result for one content unit of the document."""
    unit_index: int                       # position of the unit in the document
    unit_content: Dict[str, Any]          # the unit's structured content
    basic_compliance: Dict[str, Any]      # presumably output of basic_compliance_check — confirm
    technical_compliance: Dict[str, Any]  # presumably output of technical_compliance_check — confirm
    rag_enhanced: Dict[str, Any]          # presumably output of rag_enhanced_check — confirm
    overall_risk: str                     # "high" | "medium" | "low" (see _calculate_overall_risk)
+
class AIReviewEngine:
    """AI review engine — runs compliance and RAG checks per content unit.

    NOTE(review): the atomic check methods below are mocks (asyncio.sleep plus
    canned scores) pending the real model/RAG integration.
    """

    def __init__(self, max_concurrent_reviews: int = 20):
        self.max_concurrent_reviews = max_concurrent_reviews
        # Bounds concurrent review-unit processing.
        self.semaphore = asyncio.Semaphore(max_concurrent_reviews)

    async def basic_compliance_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
        """Run grammar, semantic-logic and completeness checks on one unit."""
        grammar_result = await self.check_grammar(unit_content['content'])
        semantic_result = await self.check_semantic_logic(unit_content['content'])
        completeness_result = await self.check_completeness(unit_content['content'])

        return {
            'grammar_check': grammar_result,
            'semantic_check': semantic_result,
            'completeness_check': completeness_result,
            'overall_score': self._calculate_basic_score(grammar_result, semantic_result, completeness_result)
        }

    async def technical_compliance_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
        """Run mandatory-standard, design-value and technical-parameter checks."""
        mandatory_result = await self.check_mandatory_standards(unit_content['content'])
        design_value_result = await self.check_design_values(unit_content['content'])
        technical_param_result = await self.check_technical_parameters(unit_content['content'])

        return {
            'mandatory_standards': mandatory_result,
            'design_values': design_value_result,
            'technical_parameters': technical_param_result,
            'overall_score': self._calculate_technical_score(mandatory_result, design_value_result, technical_param_result)
        }

    async def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
        """RAG-enhanced review: vector + hybrid retrieval, rerank, suggestions."""
        vector_results = await self.vector_search(unit_content['content'])
        hybrid_results = await self.hybrid_search(unit_content['content'])
        reranked_results = await self.rerank_results(unit_content['content'], hybrid_results)

        return {
            'vector_search': vector_results,
            'hybrid_search': hybrid_results,
            'reranked_results': reranked_results,
            'enhanced_suggestions': self.generate_enhanced_suggestions(reranked_results)
        }

    # --- Basic-compliance atomic checks (mocked) ---

    async def check_grammar(self, content: str) -> Dict[str, Any]:
        """Grammar check (mock: fixed score, no issues)."""
        await asyncio.sleep(0.1)  # 模拟处理时间
        return {"score": 85, "issues": []}

    async def check_semantic_logic(self, content: str) -> Dict[str, Any]:
        """Semantic-logic check (mock)."""
        # Fixed: the log message previously said "语法检查" (grammar check),
        # copy-pasted from check_grammar.
        logger.info(f"开始执行语义逻辑检查,内容:{content}")
        await asyncio.sleep(0.1)
        return {"score": 90, "logic_issues": []}

    async def check_completeness(self, content: str) -> Dict[str, Any]:
        """Completeness check (mock)."""
        logger.info(f"开始执行完整性检查,内容:{content}")
        await asyncio.sleep(0.1)
        return {"score": 88, "missing_items": []}

    async def check_mandatory_standards(self, content: str) -> Dict[str, Any]:
        """Mandatory-standards compliance check (mock)."""
        logger.info(f"开始执行强制性标准检查,内容:{content}")
        await asyncio.sleep(0.3)
        return {"compliance_rate": 92, "violations": []}

    async def check_design_values(self, content: str) -> Dict[str, Any]:
        """Design-value conformance check (mock)."""
        await asyncio.sleep(0.2)
        return {"accuracy": 87, "deviations": []}

    async def check_technical_parameters(self, content: str) -> Dict[str, Any]:
        """Technical-parameter precision check (mock)."""
        await asyncio.sleep(0.2)
        return {"precision": 90, "errors": []}

    # --- RAG retrieval atomic components (mocked) ---

    async def vector_search(self, content: str) -> List[Dict[str, Any]]:
        """Vector retrieval (mock)."""
        await asyncio.sleep(0.1)
        return [{"similarity": 0.85, "content": "相关标准1"}, {"similarity": 0.78, "content": "相关标准2"}]

    async def hybrid_search(self, content: str) -> List[Dict[str, Any]]:
        """Hybrid retrieval (mock)."""
        await asyncio.sleep(0.2)
        return [{"score": 0.88, "content": "混合检索结果1"}, {"score": 0.82, "content": "混合检索结果2"}]

    async def rerank_results(self, content: str, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Rerank retrieval hits by score (desc) and keep the top 5."""
        await asyncio.sleep(0.1)
        return sorted(results, key=lambda x: x.get('score', 0), reverse=True)[:5]

    def generate_enhanced_suggestions(self, results: List[Dict[str, Any]]) -> List[str]:
        """One suggestion string per reranked hit."""
        return [f"基于{result.get('content', '相关内容')}的建议" for result in results]

    # --- Scoring helpers ---

    def _calculate_basic_score(self, grammar: Dict, semantic: Dict, completeness: Dict) -> float:
        """Mean of the three basic-compliance scores (missing keys count as 0)."""
        return (grammar.get('score', 0) + semantic.get('score', 0) + completeness.get('score', 0)) / 3

    def _calculate_technical_score(self, mandatory: Dict, design: Dict, technical: Dict) -> float:
        """Mean of the three technical-compliance metrics (missing keys count as 0)."""
        return (mandatory.get('compliance_rate', 0) + design.get('accuracy', 0) + technical.get('precision', 0)) / 3

    def _calculate_overall_risk(self, basic: Dict, technical: Dict, rag: Dict) -> str:
        """Risk from the mean of basic/technical scores: >=90 low, >=70 medium, else high.

        NOTE(review): *rag* is currently unused in the computation.
        """
        basic_score = basic.get('overall_score', 0)
        technical_score = technical.get('overall_score', 0)

        avg_score = (basic_score + technical_score) / 2

        if avg_score >= 90:
            return "low"
        elif avg_score >= 70:
            return "medium"
        else:
            return "high"

    def _aggregate_results(self, results: List["ReviewResult"]) -> Dict[str, Any]:
        """Aggregate per-unit results into a risk distribution summary."""
        risk_counts = {"high": 0, "medium": 0, "low": 0}

        for result in results:
            risk_counts[result.overall_risk] += 1

        return {
            "risk_distribution": risk_counts,
            "total_issues": len([r for r in results if r.overall_risk != "low"]),
            "high_risk_count": risk_counts["high"],
            "medium_risk_count": risk_counts["medium"],
            "low_risk_count": risk_counts["low"]
        }

+ 173 - 0
core/construction_review/component/document_processor.py

@@ -0,0 +1,173 @@
+"""
+文档处理器
+负责文档解析、内容提取和结构化处理
+"""
+
+import io   
+from docx import Document
+from typing import Dict, Any, Optional, Callable
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+
+from langchain_community.document_loaders import PyPDFLoader
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
class DocumentProcessor:
    """Parses uploaded documents (PDF/DOCX) into structured chunk lists."""

    def __init__(self):
        # Lower-case file extensions this processor accepts.
        self.supported_types = ['pdf', 'docx']

    async def process_document(self, file_content: bytes, file_type: str,
                             progress_callback: Optional[Callable[[int, str], None]] = None) -> Dict[str, Any]:
        """Parse *file_content* and return its structured representation.

        Args:
            file_content: raw file bytes.
            file_type: file extension, case-insensitive ('pdf' or 'docx').
            progress_callback: optional (percent, message) hook; in this
                simplified implementation it is invoked only once, at completion.

        Returns:
            Dict: structured content (see structure_content).

        Raises:
            ValueError: unsupported file type.
        """
        try:
            logger.info(f"开始处理文档,类型: {file_type}")

            normalized_type = file_type.lower()
            if normalized_type == 'pdf':
                parsed = await self.parse_pdf_content(file_content)
            elif normalized_type == 'docx':
                parsed = await self.parse_docx_content(file_content)
            else:
                raise ValueError(f"不支持的文件类型: {file_type}")

            structured = self.structure_content(parsed)

            if progress_callback:
                progress_callback(100, "文档处理完成")

            return structured

        except Exception as e:
            logger.error(f"文档处理失败: {str(e)}")
            raise

    async def parse_pdf_content(self, file_content: bytes) -> Dict[str, Any]:
        """Parse PDF bytes via PyPDFLoader; returns per-page chunks plus overlap splits."""
        import os
        import tempfile

        temp_file_path = None
        try:
            # PyPDFLoader needs a filesystem path, so spill to a temp file.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                temp_file.write(file_content)
                temp_file_path = temp_file.name

            loader = PyPDFLoader(temp_file_path)
            documents = loader.load()

            # Split into overlapping chunks for downstream review.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", " ", ""]
            )
            splits = text_splitter.split_documents(documents)

            logger.info(f"PDF解析完成,分块数量: {len(splits)}")

            return {
                'document_type': 'pdf',
                'total_pages': len(documents),
                'total_chunks': len(splits),
                'chunks': [
                    {
                        'page': doc.metadata.get('page', 0),
                        'content': doc.page_content,
                        'metadata': doc.metadata
                    }
                    for doc in documents
                ],
                'splits': [
                    {'content': split.page_content, 'metadata': split.metadata}
                    for split in splits
                ]
            }

        except Exception as e:
            logger.error(f"PDF解析失败: {str(e)}")
            raise
        finally:
            # delete=False means we must remove the temp file ourselves
            # (previously one file was leaked per parsed PDF).
            if temp_file_path and os.path.exists(temp_file_path):
                os.unlink(temp_file_path)

    async def parse_docx_content(self, file_content: bytes) -> Dict[str, Any]:
        """Parse DOCX bytes via python-docx and split the text into fixed-size chunks."""
        try:
            doc = Document(io.BytesIO(file_content))
            full_text = '\n'.join(paragraph.text for paragraph in doc.paragraphs)

            # Fixed-size chunking with sequential ids (chunk_1, chunk_2, ...).
            # Previously the id embedded the character offset, yielding
            # chunk_1, chunk_1001, ... inconsistent with the PDF path's ids.
            chunk_size = 1000
            chunks = []
            for index, start in enumerate(range(0, len(full_text), chunk_size), start=1):
                chunks.append({
                    'chunk_id': f'chunk_{index}',
                    'content': full_text[start:start + chunk_size],
                    'metadata': {'chunk_index': index}
                })

            logger.info(f"DOCX解析完成,分块数量: {len(chunks)}")

            return {
                'document_type': 'docx',
                'total_chunks': len(chunks),
                'full_text': full_text,
                'chunks': chunks,
                'metadata': {
                    'paragraphs_count': len(doc.paragraphs),
                    'word_count': len(full_text.split())
                }
            }

        except Exception as e:
            logger.error(f"DOCX解析失败: {str(e)}")
            raise

    def structure_content(self, raw_content: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize a parse result into the structure downstream review expects.

        PDF chunks get chapter/title labels and a truncated preview; DOCX
        chunks pass through unchanged.
        """
        try:
            if raw_content['document_type'] == 'pdf':
                chunks = []
                for i, chunk in enumerate(raw_content['chunks']):
                    content = chunk['content']
                    chunks.append({
                        'chunk_id': f'chunk_{i+1}',
                        'page': chunk['page'],
                        'content': content,
                        'chapter': f'第{chunk["page"]}页',
                        'title': f'内容块{i+1}',
                        # Preview truncated to 100 chars for display.
                        'original_content': content[:100] + '...' if len(content) > 100 else content
                    })
            else:
                chunks = raw_content.get('chunks', [])

            return {
                'document_name': f"施工方案文档_{raw_content.get('document_type', 'unknown')}",
                'document_type': raw_content['document_type'],
                'total_chunks': raw_content.get('total_chunks', 0),
                'chunks': chunks,
                'metadata': raw_content.get('metadata', {})
            }

        except Exception as e:
            logger.error(f"内容结构化失败: {str(e)}")
            raise

+ 292 - 0
core/construction_review/component/report_generator.py

@@ -0,0 +1,292 @@
+"""
+报告生成器
+负责生成审查报告和多维评分
+"""
+
+import asyncio
+from typing import Dict, List, Any, Optional, Callable
+from dataclasses import dataclass
+from datetime import datetime
+import json
+
+from foundation.logger.loggering import server_logger as logger
+
@dataclass
class DimensionScores:
    """Four-dimension review scores, each clamped to the range 0-100."""
    safety: int      # safety dimension score
    quality: int     # quality dimension score
    schedule: int    # schedule dimension score
    cost: int        # cost dimension score

@dataclass
class FinalReport:
    """Final review report produced by ReportGenerator.generate_report."""
    file_id: str
    document_name: str
    risk_stats: Dict[str, int]          # issue counts keyed by risk level
    dimension_scores: DimensionScores
    summary_report: str                 # short natural-language summary
    multidimensional_report: str        # per-dimension analysis text
    recommendations: List[str]
    generated_at: datetime

class ReportGenerator:
    """Aggregates AI review results into scores, summaries and recommendations."""

    def __init__(self):
        # Informational weights per risk level; the scoring formulas in
        # _calculate_dimension_scores use their own per-dimension penalties.
        self.risk_weight_mapping = {
            "high": 3,
            "medium": 2,
            "low": 1
        }

    async def generate_report(self, file_id: str, review_results: Dict[str, Any],
                            progress_callback: Optional[Callable[[int, str], None]] = None) -> FinalReport:
        """
        Generate the review report.

        Args:
            file_id: file identifier
            review_results: aggregated AI review output (summary + per-unit results)
            progress_callback: optional callback(progress_percent, message)

        Returns:
            FinalReport: the assembled final report

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            logger.info(f"开始生成报告,文件ID: {file_id}")

            if progress_callback:
                progress_callback(20, "汇总审查结果")

            # 1. Aggregate the raw review results.
            aggregated_data = await self._aggregate_results(review_results)

            if progress_callback:
                progress_callback(40, "计算多维评分")

            # 2. Compute the four-dimension scores.
            dimension_scores = await self._calculate_dimension_scores(aggregated_data)

            if progress_callback:
                progress_callback(60, "生成总结报告")

            # 3. Build the textual summary.
            summary_report = await self._generate_summary(aggregated_data, dimension_scores)

            if progress_callback:
                progress_callback(80, "生成多维报告")

            # 4. Build the per-dimension report.
            multidimensional_report = await self._generate_multidimensional_report(
                aggregated_data, dimension_scores
            )

            # 5. Build recommendations BEFORE signalling 100%, so the callback
            # only reports completion once all work is actually done.
            recommendations = await self._generate_recommendations(aggregated_data, dimension_scores)

            if progress_callback:
                progress_callback(100, "报告生成完成")

            final_report = FinalReport(
                file_id=file_id,
                document_name=f"施工方案审查报告_{file_id}",
                risk_stats=aggregated_data.get('risk_distribution', {}),
                dimension_scores=dimension_scores,
                summary_report=summary_report,
                multidimensional_report=multidimensional_report,
                recommendations=recommendations,
                generated_at=datetime.now()
            )

            logger.info(f"报告生成完成,文件ID: {file_id}")
            return final_report

        except Exception as e:
            logger.error(f"报告生成失败: {str(e)}")
            raise

    async def _aggregate_results(self, review_results: Dict[str, Any]) -> Dict[str, Any]:
        """Group per-unit issues by risk level and collect overall statistics."""
        summary = review_results.get('summary', {})
        risk_stats = summary.get('risk_distribution', {})

        # Buckets for the known risk levels. Unknown levels coming from
        # upstream (e.g. "error") get their own bucket via setdefault instead
        # of raising KeyError.
        issues_by_risk = {
            "high": [],
            "medium": [],
            "low": []
        }

        # Collect the issues reported for each review unit.
        for result in review_results.get('review_results', []):
            risk_level = result.overall_risk
            issues_by_risk.setdefault(risk_level, []).append({
                'unit_index': result.unit_index,
                'chapter': result.unit_content.get('chapter', ''),
                'title': result.unit_content.get('title', ''),
                'content': result.unit_content.get('original_content', ''),
                'basic_issues': self._extract_issues(result.basic_compliance),
                'technical_issues': self._extract_issues(result.technical_compliance),
                'rag_suggestions': result.rag_enhanced.get('enhanced_suggestions', [])
            })

        return {
            'risk_distribution': risk_stats,
            'issues_by_risk': issues_by_risk,
            'total_units': review_results.get('total_units', 0),
            'successful_units': review_results.get('successful_units', 0),
            'failed_units': review_results.get('failed_units', 0)
        }

    async def _calculate_dimension_scores(self, aggregated_data: Dict[str, Any]) -> DimensionScores:
        """Compute the four-dimension scores from the risk distribution.

        Each dimension starts from a common base score (100 minus generic
        per-level penalties) and then subtracts dimension-specific penalties;
        results are clamped to [0, 100].
        """
        risk_stats = aggregated_data.get('risk_distribution', {})

        # Common base score shared by all dimensions.
        base_score = max(0, 100 - (risk_stats.get('high', 0) * 10) - (risk_stats.get('medium', 0) * 5) - (risk_stats.get('low', 0) * 2))

        # Safety dimension (safety risks weigh the heaviest).
        safety_score = max(0, base_score - (risk_stats.get('high', 0) * 15) - (risk_stats.get('medium', 0) * 8))

        # Quality dimension.
        quality_score = max(0, base_score - (risk_stats.get('high', 0) * 12) - (risk_stats.get('medium', 0) * 6))

        # Schedule dimension.
        schedule_score = max(0, base_score - (risk_stats.get('medium', 0) * 8) - (risk_stats.get('low', 0) * 3))

        # Cost dimension.
        cost_score = max(0, base_score - (risk_stats.get('high', 0) * 10) - (risk_stats.get('medium', 0) * 5))

        return DimensionScores(
            safety=min(100, safety_score),
            quality=min(100, quality_score),
            schedule=min(100, schedule_score),
            cost=min(100, cost_score)
        )

    async def _generate_summary(self, aggregated_data: Dict[str, Any], dimension_scores: DimensionScores) -> str:
        """Build the one-paragraph summary: verdict, risk counts, average score."""
        risk_stats = aggregated_data.get('risk_distribution', {})
        high_risk = risk_stats.get('high', 0)
        medium_risk = risk_stats.get('medium', 0)
        low_risk = risk_stats.get('low', 0)
        total_issues = high_risk + medium_risk + low_risk

        summary_parts = []

        # Overall verdict driven by the high/medium risk counts.
        if high_risk == 0 and medium_risk <= 2:
            summary_parts.append("该施工方案整体符合规范要求,质量良好。")
        elif high_risk <= 2:
            summary_parts.append("该施工方案基本符合规范要求,存在少量问题需要整改。")
        else:
            summary_parts.append("该施工方案存在较多高风险问题,需要重点整改。")

        # Risk statistics.
        summary_parts.append(f"发现风险问题 {total_issues} 个,其中高风险 {high_risk} 个,中风险 {medium_risk} 个,低风险 {low_risk} 个。")

        # Average score across the four dimensions.
        avg_score = (dimension_scores.safety + dimension_scores.quality +
                     dimension_scores.schedule + dimension_scores.cost) / 4

        if avg_score >= 85:
            summary_parts.append(f"综合评分 {avg_score:.1f} 分,表现优秀。")
        elif avg_score >= 70:
            summary_parts.append(f"综合评分 {avg_score:.1f} 分,表现良好。")
        else:
            summary_parts.append(f"综合评分 {avg_score:.1f} 分,需要改进。")

        return " ".join(summary_parts)

    async def _generate_multidimensional_report(self, aggregated_data: Dict[str, Any],
                                              dimension_scores: DimensionScores) -> str:
        """Build the per-dimension analysis plus threshold-driven advice."""
        report_parts = []

        # One line per dimension: score plus a canned description.
        safety_desc = self._get_score_description(dimension_scores.safety, "安全")
        report_parts.append(f"安全维度评分 {dimension_scores.safety} 分:{safety_desc}")

        quality_desc = self._get_score_description(dimension_scores.quality, "质量")
        report_parts.append(f"质量维度评分 {dimension_scores.quality} 分:{quality_desc}")

        schedule_desc = self._get_score_description(dimension_scores.schedule, "进度")
        report_parts.append(f"进度维度评分 {dimension_scores.schedule} 分:{schedule_desc}")

        cost_desc = self._get_score_description(dimension_scores.cost, "成本")
        report_parts.append(f"成本维度评分 {dimension_scores.cost} 分:{cost_desc}")

        # Advice for every dimension scoring below 70.
        report_parts.append("\n综合建议:")
        if dimension_scores.safety < 70:
            report_parts.append("• 重点关注安全管理,完善安全技术措施。")
        if dimension_scores.quality < 70:
            report_parts.append("• 加强质量控制,完善施工工艺标准。")
        if dimension_scores.schedule < 70:
            report_parts.append("• 优化进度管理,确保工期可控。")
        if dimension_scores.cost < 70:
            report_parts.append("• 加强成本控制,避免预算超支。")

        return "\n".join(report_parts)

    async def _generate_recommendations(self, aggregated_data: Dict[str, Any],
                                       dimension_scores: DimensionScores) -> List[str]:
        """Build the actionable recommendation list.

        High-risk issues come first (top three), followed by canned advice for
        every dimension scoring below 80.
        """
        recommendations = []

        high_risk_issues = aggregated_data.get('issues_by_risk', {}).get('high', [])

        if high_risk_issues:
            recommendations.append("立即处理高风险问题:")
            for issue in high_risk_issues[:3]:  # top 3 high-risk issues only
                recommendations.append(f"• {issue.get('title', '未知章节')}:{issue.get('content', '')[:50]}...")

        if dimension_scores.safety < 80:
            recommendations.append("完善安全管理体系:")
            recommendations.append("• 补充安全技术交底记录")
            recommendations.append("• 加强现场安全防护措施")

        if dimension_scores.quality < 80:
            recommendations.append("提升施工质量:")
            recommendations.append("• 完善施工工艺标准")
            recommendations.append("• 加强质量检查频次")

        if dimension_scores.schedule < 80:
            recommendations.append("优化进度管理:")
            recommendations.append("• 细化施工进度计划")
            recommendations.append("• 建立进度预警机制")

        return recommendations

    def _extract_issues(self, compliance_result: Dict[str, Any]) -> List[str]:
        """Extract issue descriptions from a single compliance-check result."""
        if isinstance(compliance_result, dict) and 'error' in compliance_result:
            return [f"检查失败: {compliance_result['error']}"]

        # TODO: extract real issues once the compliance-result schema is fixed;
        # until then non-error results contribute no issue text.
        return []

    def _get_score_description(self, score: int, dimension: str) -> str:
        """Map a 0-100 score to a canned description for the given dimension."""
        if score >= 90:
            return f"{dimension}管理优秀,符合规范要求,无明显风险。"
        elif score >= 80:
            return f"{dimension}管理良好,基本符合规范,存在少量细节问题。"
        elif score >= 70:
            return f"{dimension}管理一般,需要改进部分环节。"
        elif score >= 60:
            return f"{dimension}管理较差,存在较多问题需要整改。"
        else:
            return f"{dimension}管理差,存在严重问题,需要全面整改。"

+ 14 - 0
core/construction_review/workflows/__init__.py

@@ -0,0 +1,14 @@
+"""
+工作流模块
+包含文档处理、AI审查和报告生成的工作流实现
+"""
+
+from .document_workflow import DocumentWorkflow
+from .ai_review_workflow import AIReviewWorkflow
+from .report_workflow import ReportWorkflow
+
+__all__ = [
+    'DocumentWorkflow',
+    'AIReviewWorkflow',
+    'ReportWorkflow'
+]

+ 0 - 6
core/construction_review/workflows/ai_review.py

@@ -1,6 +0,0 @@
-
-
-from pydantic import BaseModel
-
-class ReviewWorkflow(BaseModel):
-    

+ 402 - 0
core/construction_review/workflows/ai_review_workflow.py

@@ -0,0 +1,402 @@
+"""
+基于LangGraph的AI审查工作流
+负责AI审查的流程控制和业务编排,使用LangGraph进行状态管理
+"""
+
+import asyncio
+import json
+from dataclasses import asdict
+import time
+from typing import Optional, Callable, Dict, Any, TypedDict, Annotated, List
+from dataclasses import dataclass
+from langgraph.graph import StateGraph, END
+from langgraph.graph.message import add_messages
+from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
+from foundation.logger.loggering import server_logger as logger
+from foundation.utils.time_statistics import track_execution_time
+from ..component import AIReviewEngine
+
+
@dataclass
class ReviewResult:
    """Per-unit review result produced by the AI review workflow."""
    unit_index: int                       # position of the unit in the document
    unit_content: Dict[str, Any]          # the structured chunk that was reviewed
    basic_compliance: Dict[str, Any]      # result of the basic compliance check
    technical_compliance: Dict[str, Any]  # result of the technical compliance check
    rag_enhanced: Dict[str, Any]          # RAG-enhanced review output
    overall_risk: str                     # "low" / "medium" / "high", or "error" on failure
+
class AIReviewState(TypedDict):
    """State carried through the LangGraph AI review workflow."""
    # Basic task identity
    file_id: str
    callback_task_id: str
    user_id: str
    structured_content: Dict[str, Any]

    # AI review output (filled in by the ai_review node)
    review_results: Optional[Dict[str, Any]]

    # Status and progress
    current_stage: str
    status: str
    error_message: Optional[str]

    # Progress manager (ProgressManager instance, or None when not tracked)
    progress_manager: Optional[Any]

    # Message log (used for LangGraph state tracking)
    messages: Annotated[List[BaseMessage], add_messages]
+
+
class AIReviewWorkflow:
    """LangGraph-based AI review workflow.

    Orchestrates per-chunk AI review of a structured document as a LangGraph
    state machine: progress initialization, concurrent review of every chunk,
    result aggregation, and error handling.
    """

    def __init__(self, file_id: str, callback_task_id: str, user_id: str,
                 structured_content: Dict[str, Any], progress_manager=None):
        self.file_id = file_id
        self.callback_task_id = callback_task_id
        self.user_id = user_id
        self.structured_content = structured_content  # output of the document workflow
        self.progress_manager = progress_manager      # optional ProgressManager
        self.ai_review_engine = AIReviewEngine()

        # Build and compile the LangGraph workflow once per instance.
        self.graph = self._build_workflow()

    def _build_workflow(self) -> StateGraph:
        """Build and compile the LangGraph graph for the AI review flow."""
        workflow = StateGraph(AIReviewState)

        # Nodes
        workflow.add_node("start", self._start_node)
        workflow.add_node("initialize_progress", self._initialize_progress_node)
        workflow.add_node("ai_review", self._ai_review_node)
        workflow.add_node("complete", self._complete_node)
        workflow.add_node("error_handler", self._error_handler_node)

        # Entry point
        workflow.set_entry_point("start")

        # Static edges. Routing OUT of "ai_review" is handled exclusively by
        # the conditional edges below; adding a plain ai_review -> complete
        # edge as well would make both routes fire on every run.
        workflow.add_edge("start", "initialize_progress")
        workflow.add_edge("initialize_progress", "ai_review")
        workflow.add_edge("complete", END)
        workflow.add_edge("error_handler", END)

        # Conditional routing: success -> complete, failure -> error_handler.
        workflow.add_conditional_edges(
            "ai_review",
            self._check_ai_review_result,
            {
                "success": "complete",
                "error": "error_handler"
            }
        )

        self.graph = workflow.compile()
        self._get_workflow_graph()  # prints an ASCII rendering for debugging

        return self.graph

    async def execute(self) -> dict:
        """Run the LangGraph AI review workflow.

        Returns:
            dict with file_id, unit counters, per-unit review results,
            summary statistics and the final workflow status.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            logger.info(f"开始AI审查工作流,文件ID: {self.file_id}")

            # Initial workflow state.
            initial_state = AIReviewState(
                file_id=self.file_id,
                callback_task_id=self.callback_task_id,
                user_id=self.user_id,
                structured_content=self.structured_content,
                review_results=None,
                current_stage="start",
                status="processing",
                error_message=None,
                progress_manager=self.progress_manager,
                messages=[HumanMessage(content=f"开始AI审查: {self.file_id}")]
            )

            # Execute the compiled LangGraph workflow.
            result = await self.graph.ainvoke(initial_state)

            logger.info(f"LangGraph AI审查工作流完成,文件ID: {self.file_id}")
            raw = result['review_results'] or {}
            review_results = {
                'file_id': result['file_id'],
                'total_units': raw.get('total_units', 0),
                'successful_units': raw.get('successful_units', 0),
                'failed_units': raw.get('failed_units', 0),
                'review_results': raw.get('review_results', []),
                'summary': raw.get('summary', {}),
                'status': result['status']
            }

            logger.info(f"保存审查结果")
            # Ensure the dump directory exists before writing the debug file.
            import os
            os.makedirs('temp', exist_ok=True)
            with open('temp/AI审查结果.json', "w", encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2, default=str)

            return review_results

        except Exception as e:
            logger.error(f"LangGraph AI审查工作流执行失败: {str(e)}")
            raise

    # ========== LangGraph node implementations ==========

    async def _start_node(self, state: AIReviewState) -> AIReviewState:
        """Entry node: marks the workflow as started."""
        logger.info(f"AI审查工作流启动: {state['file_id']}")

        state["current_stage"] = "start"
        state["status"] = "processing"
        state["messages"].append(AIMessage(content="AI审查工作流启动"))

        return state

    async def _initialize_progress_node(self, state: AIReviewState) -> AIReviewState:
        """Report the AI-review stage as started to the progress manager."""
        logger.info(f"初始化AI审查进度: {state['file_id']}")

        state["current_stage"] = "initialize_progress"

        if state["progress_manager"]:
            await state["progress_manager"].update_stage_progress(
                callback_task_id=state["callback_task_id"],
                stage_name="AI审查",
                progress=0,
                status="processing",
                message="开始AI审查"
            )

        state["messages"].append(AIMessage(content="进度初始化完成"))

        return state

    async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
        """Review every chunk concurrently via the atomic engine methods.

        On failure the node records error_message in the state; the
        conditional edge then routes to the error handler.
        """
        try:
            logger.info(f"执行AI审查: {state['file_id']}")

            state["current_stage"] = "ai_review"

            total_units = len(state['structured_content']['chunks'])
            completed_units = 0

            def progress_callback(progress: int, message: str):
                # Map AI-review progress into the overall range: this stage
                # covers 40% of the pipeline, starting at 50%.
                overall_progress = 50 + int(progress * 0.4)
                if state["progress_manager"]:
                    # Fire-and-forget: progress reporting must not block review.
                    asyncio.create_task(
                        state["progress_manager"].update_stage_progress(
                            callback_task_id=state["callback_task_id"],
                            stage_name="AI审查",
                            progress=overall_progress,
                            status="processing",
                            message=message
                        )
                    )

            async def review_single_unit(unit_content: Dict[str, Any], unit_index: int) -> ReviewResult:
                """Run the three atomic checks for one unit; never raises."""
                # The engine's semaphore bounds overall concurrency.
                async with self.ai_review_engine.semaphore:
                    try:
                        # Run the atomic checks concurrently for this unit.
                        review_tasks = [
                            self.ai_review_engine.basic_compliance_check(unit_content),
                            self.ai_review_engine.technical_compliance_check(unit_content),
                            self.ai_review_engine.rag_enhanced_check(unit_content)
                        ]

                        review_results = await asyncio.gather(*review_tasks, return_exceptions=True)

                        # Convert per-check exceptions into error payloads so
                        # one failing check does not void the whole unit.
                        basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
                        technical_result = review_results[1] if not isinstance(review_results[1], Exception) else {"error": str(review_results[1])}
                        rag_result = review_results[2] if not isinstance(review_results[2], Exception) else {"error": str(review_results[2])}

                        overall_risk = self._calculate_overall_risk(basic_result, technical_result, rag_result)

                        # Advance the shared progress counter.
                        nonlocal completed_units
                        completed_units += 1
                        progress = int((completed_units / total_units) * 100)
                        message = f"已完成 {completed_units}/{total_units} 个审查单元"

                        if progress_callback:
                            progress_callback(progress, message)

                        return ReviewResult(
                            unit_index=unit_index,
                            unit_content=unit_content,
                            basic_compliance=basic_result,
                            technical_compliance=technical_result,
                            rag_enhanced=rag_result,
                            overall_risk=overall_risk
                        )

                    except Exception as e:
                        logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
                        return ReviewResult(
                            unit_index=unit_index,
                            unit_content=unit_content,
                            basic_compliance={"error": str(e)},
                            technical_compliance={"error": str(e)},
                            rag_enhanced={"error": str(e)},
                            overall_risk="error"
                        )

            # Review all units concurrently.
            review_tasks = [
                asyncio.create_task(review_single_unit(content, i))
                for i, content in enumerate(state['structured_content']['chunks'])
            ]

            all_results = await asyncio.gather(*review_tasks)

            # Keep only units that reviewed cleanly.
            successful_results = [result for result in all_results if result.overall_risk != "error"]

            summary = self._aggregate_results(successful_results)

            review_results = {
                'total_units': total_units,
                'successful_units': len(successful_results),
                'failed_units': total_units - len(successful_results),
                'review_results': successful_results,
                'summary': summary
            }

            state["review_results"] = review_results
            state["messages"].append(AIMessage(
                content=f"AI审查完成,共处理{total_units}个单元,成功{len(successful_results)}个"
            ))

            return state

        except Exception as e:
            logger.error(f"AI审查失败: {str(e)}")
            state["error_message"] = str(e)
            state["messages"].append(AIMessage(content=f"AI审查失败: {str(e)}"))
            return state

    async def _complete_node(self, state: AIReviewState) -> AIReviewState:
        """Success terminal node: marks the workflow completed."""
        logger.info(f"AI审查完成: {state['file_id']}")

        state["current_stage"] = "complete"
        state["status"] = "completed"

        if state["progress_manager"]:
            await state["progress_manager"].update_stage_progress(
                callback_task_id=state["callback_task_id"],
                stage_name="AI审查",
                progress=90,
                status="completed",
                message="AI审查完成"
            )

        state["messages"].append(AIMessage(content="AI审查工作流完成"))

        return state

    async def _error_handler_node(self, state: AIReviewState) -> AIReviewState:
        """Failure terminal node: records the failure and reports it."""
        logger.error(f"AI审查错误处理: {state['file_id']}, 错误: {state['error_message']}")

        state["status"] = "failed"
        state["current_stage"] = "error_handler"

        if state["progress_manager"]:
            await state["progress_manager"].update_stage_progress(
                callback_task_id=state["callback_task_id"],
                stage_name="AI审查",
                progress=50,
                status="failed",
                message=f"AI审查失败: {state['error_message']}"
            )

        state["messages"].append(AIMessage(
            content=f"错误处理: {state['error_message']}"
        ))

        return state

    # ========== helpers ==========

    def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
        """Derive the unit's risk level from the basic/technical scores.

        Falls back to "medium" when the results lack numeric scores.
        """
        try:
            basic_score = basic_result.get('overall_score', 0)
            technical_score = technical_result.get('overall_score', 0)

            if basic_score >= 90 and technical_score >= 90:
                return "low"
            elif basic_score >= 70 and technical_score >= 70:
                return "medium"
            else:
                return "high"
        except Exception:
            # Malformed result payload: assume middling risk rather than fail.
            return "medium"

    def _aggregate_results(self, successful_results: List[ReviewResult]) -> Dict[str, Any]:
        """Summarize risk counts and average scores over successful units."""
        try:
            if not successful_results:
                return {}

            # Count units per risk level.
            risk_stats = {"low": 0, "medium": 0, "high": 0, "error": 0}
            for result in successful_results:
                risk_stats[result.overall_risk] += 1

            # Average the basic/technical scores.
            total_basic_score = sum(r.basic_compliance.get('overall_score', 0) for r in successful_results)
            total_technical_score = sum(r.technical_compliance.get('overall_score', 0) for r in successful_results)

            avg_basic_score = total_basic_score / len(successful_results)
            avg_technical_score = total_technical_score / len(successful_results)

            return {
                'risk_stats': risk_stats,
                'avg_basic_score': avg_basic_score,
                'avg_technical_score': avg_technical_score,
                'total_reviewed': len(successful_results)
            }
        except Exception as e:
            logger.error(f"结果汇总失败: {str(e)}")
            return {}

    # ========== conditional edge functions ==========

    def _check_ai_review_result(self, state: AIReviewState) -> str:
        """Route out of ai_review: "error" if a failure was recorded."""
        if state.get("error_message"):
            return "error"
        return "success"

    def _get_workflow_graph(self):
        """Print an ASCII rendering of the compiled workflow (debug aid)."""
        grandalf_graph = self.graph.get_graph()
        grandalf_graph.print_ascii()

    async def _get_status(self) -> dict:
        """Return current progress for this task, or {} without a progress manager."""
        if self.progress_manager:
            return await self.progress_manager.get_progress(self.callback_task_id)
        return {}

+ 0 - 32
core/construction_review/workflows/document_ans.py

@@ -1,32 +0,0 @@
-
-
-# 文档解析流程
-from langchain_community.document_loaders import PyPDFLoader  # 加载PDF文件
-from langchain.text_splitter import RecursiveCharacterTextSplitter  # 文本分块
-from foundation.logger.loggering import server_logger
-
-logger = server_logger
-
-
-class DocumentParse:
-
-    """
-    文档解析
-    """
-
-    @staticmethod
-    def document_parse(file_path):
-        # 1. 加载PDF
-        loader = PyPDFLoader(file_path)
-        documents = loader.load()
-        
-        # 2. 文本分块
-        text_splitter = RecursiveCharacterTextSplitter(
-            chunk_size=1000,  # 块大小
-            chunk_overlap=20,  # 块重叠
-            separators=["\n\n", "\n", " ", ""]  # 分块分隔符
-        )
-        splits = text_splitter.split_documents(documents)  # 得到分块后的文档
-        logger.info(f"加载的: {len(splits)}条审查条款")
-        return splits
-

+ 100 - 0
core/construction_review/workflows/document_workflow.py

@@ -0,0 +1,100 @@
+"""
+文档处理工作流
+负责文档处理的流程控制和业务编排
+"""
+
+import asyncio
+from typing import Optional, Callable
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+from ..component import DocumentProcessor
+
class DocumentWorkflow:
    """Document-processing workflow.

    Orchestrates parsing, content extraction and structuring of an uploaded
    document, reporting progress through an optional progress manager.
    """

    def __init__(self, file_id: str, callback_task_id: str, user_id: str,
                 progress_manager=None, redis_duplicate_checker=None):
        self.file_id = file_id
        self.callback_task_id = callback_task_id
        self.user_id = user_id
        self.progress_manager = progress_manager                # optional ProgressManager
        self.redis_duplicate_checker = redis_duplicate_checker  # reserved; not used yet
        self.document_processor = DocumentProcessor()

    async def execute(self, file_content: bytes, file_type: str) -> dict:
        """Run the document-processing pipeline.

        Args:
            file_content: raw file bytes
            file_type: file type discriminator (e.g. "pdf" / "docx")

        Returns:
            dict with file_id, structured_content, document_name,
            total_chunks and metadata.

        Raises:
            Exception: any failure is logged, reported to the progress
            manager, and re-raised.
        """
        try:
            logger.info(f"开始文档处理工作流,文件ID: {self.file_id}")

            # 1. Initialize progress stages. progress_manager defaults to
            # None, so every use is guarded — previously only the error path
            # was, and a missing manager crashed the success path.
            if self.progress_manager:
                await self.progress_manager.initialize_progress(
                    callback_task_id=self.callback_task_id,
                    user_id=self.user_id,
                    stages=[
                        {"stage_name": "文档上传", "progress": 100, "status": "completed"},
                        {"stage_name": "文档解析", "progress": 0, "status": "pending"},
                        {"stage_name": "内容提取", "progress": 0, "status": "pending"},
                        {"stage_name": "结构化处理", "progress": 0, "status": "pending"}
                    ]
                )

            # 2. Run the document processor, forwarding its progress.
            # NOTE(review): stage_name "文档处理" does not match any stage
            # initialized above, so these updates match no stage record —
            # confirm which stage name is intended.
            def progress_callback(progress: int, message: str):
                if self.progress_manager:
                    # Fire-and-forget so reporting never blocks processing.
                    asyncio.create_task(
                        self.progress_manager.update_stage_progress(
                            callback_task_id=self.callback_task_id,
                            stage_name="文档处理",
                            progress=progress,
                            status="processing",
                            message=message
                        )
                    )

            structured_content = await self.document_processor.process_document(
                file_content=file_content,
                file_type=file_type,
                progress_callback=progress_callback
            )

            # 3. Mark the stage completed.
            if self.progress_manager:
                await self.progress_manager.update_stage_progress(
                    callback_task_id=self.callback_task_id,
                    stage_name="文档处理",
                    progress=100,
                    status="completed",
                    message="文档处理完成"
                )

            # 4. Assemble the processing result.
            result = {
                'file_id': self.file_id,
                'structured_content': structured_content,
                'document_name': structured_content['document_name'],
                'total_chunks': structured_content['total_chunks'],
                'metadata': structured_content['metadata']
            }

            logger.info(f"文档处理工作流完成,文件ID: {self.file_id}")
            return result

        except Exception as e:
            logger.error(f"文档处理工作流失败: {str(e)}")

            # Report the failure before re-raising.
            if self.progress_manager:
                await self.progress_manager.update_stage_progress(
                    callback_task_id=self.callback_task_id,
                    stage_name="文档处理",
                    progress=0,
                    status="failed",
                    message=f"处理失败: {str(e)}"
                )

            raise

    async def get_status(self) -> dict:
        """Return current progress for this task, or {} without a progress manager."""
        if self.progress_manager:
            return await self.progress_manager.get_progress(self.callback_task_id)
        return {}

+ 117 - 0
core/construction_review/workflows/report_workflow.py

@@ -0,0 +1,117 @@
+"""
+报告生成工作流
+负责报告生成的流程控制和业务编排
+"""
+
+import asyncio
+from typing import Optional, Callable
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+from ..component import ReportGenerator
+
class ReportWorkflow:
    """Report-generation workflow.

    Orchestrates the ReportGenerator component for one reviewed document and
    publishes stage progress through an optional progress manager.
    """

    def __init__(self, file_id: str, callback_task_id: str, user_id: str,
                 ai_review_results: dict, progress_manager=None):
        # Identifiers for the document and the callback/progress chain.
        self.file_id = file_id
        self.callback_task_id = callback_task_id
        self.user_id = user_id
        # AI review output that the final report is built from.
        self.ai_review_results = ai_review_results
        # Optional; when None, all progress updates are silently skipped
        # (the original only guarded the error path and crashed at step 1).
        self.progress_manager = progress_manager
        self.report_generator = ReportGenerator()

    async def _report_progress(self, progress: int, status: str, message: str):
        """Push a '报告生成' stage update, tolerating a missing manager."""
        if self.progress_manager:
            await self.progress_manager.update_stage_progress(
                callback_task_id=self.callback_task_id,
                stage_name="报告生成",
                progress=progress,
                status=status,
                message=message
            )

    async def execute(self) -> dict:
        """Run the report-generation workflow.

        Returns:
            dict: the generated report converted via _convert_report_to_dict.

        Raises:
            Exception: re-raises any failure after recording a failed state.
        """
        try:
            logger.info(f"开始报告生成工作流,文件ID: {self.file_id}")

            # 1. Initialise stage progress.
            await self._report_progress(0, "processing", "开始生成报告")

            # 2. Generate the report. Generator progress (0-100) is mapped
            #    into the final 10% of the overall pipeline (90-100).
            def progress_callback(progress: int, message: str):
                overall_progress = 90 + int(progress * 0.1)
                asyncio.create_task(
                    self._report_progress(overall_progress, "processing", message)
                )

            final_report = await self.report_generator.generate_report(
                file_id=self.file_id,
                review_results=self.ai_review_results,
                progress_callback=progress_callback
            )

            # 3. Mark the stage completed.
            await self._report_progress(100, "completed", "报告生成完成")

            # 4. Convert once and reuse for both the completion record and
            #    the return value (the original converted twice).
            result = self._convert_report_to_dict(final_report)

            # 5. Mark the whole task chain as finished.
            if self.progress_manager:
                await self.progress_manager.complete_task(
                    callback_task_id=self.callback_task_id,
                    result=result
                )

            logger.info(f"报告生成工作流完成,文件ID: {self.file_id}")
            return result

        except Exception as e:
            logger.error(f"报告生成工作流失败: {str(e)}")

            # Record the failure; progress stays at the 90% stage boundary.
            await self._report_progress(90, "failed", f"报告生成失败: {str(e)}")

            raise

    def _convert_report_to_dict(self, final_report) -> dict:
        """Flatten the report object into a plain JSON-serialisable dict."""
        return {
            'file_id': final_report.file_id,
            'document_name': final_report.document_name,
            'risk_stats': final_report.risk_stats,
            'dimension_scores': {
                'safety': final_report.dimension_scores.safety,
                'quality': final_report.dimension_scores.quality,
                'schedule': final_report.dimension_scores.schedule,
                'cost': final_report.dimension_scores.cost
            },
            'summary_report': final_report.summary_report,
            'multidimensional_report': final_report.multidimensional_report,
            'recommendations': final_report.recommendations,
            'generated_at': final_report.generated_at.isoformat()
        }

    async def get_status(self) -> dict:
        """Return the current progress record for this callback task."""
        if self.progress_manager:
            return await self.progress_manager.get_progress(self.callback_task_id)
        return {}

+ 0 - 0
core/construction_review/workflows/task_progress.py


+ 55 - 0
foundation/base/celery_app.py

@@ -0,0 +1,55 @@
+"""
+Celery应用配置
+负责任务队列管理,不涉及具体业务逻辑
+"""
+
+import os
+from celery import Celery
+from .config import config_handler
+
+# 从配置文件获取Redis连接信息
+redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+
+
+# 构建Redis连接URL
+if redis_password:
+    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/0"
+else:
+    redis_url = f"redis://{redis_host}:{redis_port}/0"
+
+print(f"Connecting to Redis: {redis_url}")
+
+app = Celery(
+    'workflow_tasks',
+    broker=redis_url,
+    backend=redis_url,
+    include=['foundation.base.tasks']
+)
+
+# 配置
+app.conf.update(
+    task_serializer='json',
+    accept_content=['json'],
+    result_serializer='json',
+    timezone='Asia/Shanghai',
+    enable_utc=True,
+
+    # Worker配置
+    worker_prefetch_multiplier=1,  # 每个worker一次只取一个任务
+    task_acks_late=True,           # 任务完成后再确认
+
+    # 并发控制
+    worker_concurrency=2,          # 每个worker进程数(文档处理较重,不宜过多)
+    worker_pool='solo',           # 使用单线程模式(避免GIL问题)
+
+    # 任务配置
+    task_track_started=True,
+    task_time_limit=600,           # 10分钟超时(文档处理较慢)
+    task_soft_time_limit=540,      # 9分钟软超时
+    worker_max_tasks_per_child=5,  # 每个worker进程最多处理5个任务后重启(防止内存泄漏)
+
+    # 结果过期时间
+    result_expires=3600,           # 1小时后过期
+)

+ 82 - 0
foundation/base/tasks.py

@@ -0,0 +1,82 @@
+"""
+Celery任务定义
+只负责任务调度,具体业务逻辑由WorkflowManager处理
+"""
+
+from celery import current_task
+from .celery_app import app
+from core.base.workflow_manager import WorkflowManager
+from foundation.logger.loggering import server_logger as logger
+from foundation.utils.time_statistics import track_execution_time
+
+
@app.task(bind=True)
def submit_task_processing_task(self, file_info: dict):
    """
    Celery entry point for document processing.

    Pure scheduling glue: delegates all business logic to WorkflowManager
    and reports coarse progress via the Celery task state.

    Args:
        file_info: upload metadata; reads 'file_id' and 'callback_task_id'.

    Returns:
        dict: status plus identifiers and the workflow result.
    """
    import traceback

    # Debug trace: who enqueued this task and with which identifiers.
    logger.info("=== Celery任务接收调试 ===")
    logger.info(f"任务ID: {self.request.id}")
    logger.info(f"文件ID: {file_info.get('file_id')}")
    logger.info(f"回调任务ID: {file_info.get('callback_task_id')}")
    logger.info("=== 任务接收调用栈 ===")
    for line in traceback.format_stack():
        logger.debug(f"  {line.strip()}")
    logger.info("=== 调用栈结束 ===")

    try:
        # Mark the task as started.
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': 100,
                'status': '开始处理文档',
                'file_id': file_info.get('file_id')
            }
        )

        logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}")

        # One WorkflowManager per task: each Celery worker process is isolated.
        workflow_manager = WorkflowManager(
            max_concurrent_docs=1,
            max_concurrent_reviews=5
        )

        # Synchronous execution is fine inside a dedicated worker process.
        result = workflow_manager.submit_task_processing_sync(file_info)

        # Mark completion.
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 100,
                'total': 100,
                'status': '处理完成',
                'file_id': file_info.get('file_id')
            }
        )

        return {
            'status': 'success',
            'file_id': file_info.get('file_id'),
            'callback_task_id': file_info.get('callback_task_id'),
            'result': result
        }

    except Exception as e:
        logger.error(f"任务处理失败: {str(e)}")
        logger.exception("详细错误信息:")
        # BUG FIX: self.retry() itself raises celery.exceptions.Retry, so the
        # original trailing bare `raise` was unreachable. Use the documented
        # idiom of raising the retry explicitly (delay 60s, up to 2 retries).
        raise self.retry(countdown=60, max_retries=2, exc=e)

+ 17 - 0
foundation/utils/md5.py

@@ -0,0 +1,17 @@
import hashlib

def md5_id(file_content_or_path):
    """Return the hex MD5 digest of raw bytes or of a file's contents.

    Accepts either the content itself (``bytes``) or a filesystem path
    (``str``); files are hashed in 4 KiB chunks to bound memory use.
    """
    hasher = hashlib.md5()

    if isinstance(file_content_or_path, bytes):
        # In-memory content: hash it directly.
        hasher.update(file_content_or_path)
    else:
        # Path: stream the file through the hasher chunk by chunk.
        with open(file_content_or_path, 'rb') as fh:
            while True:
                chunk = fh.read(4096)
                if not chunk:
                    break
                hasher.update(chunk)

    return hasher.hexdigest()

+ 21 - 0
foundation/utils/time_statistics.py

@@ -0,0 +1,21 @@
+import time
+from functools import wraps
+from ..logger.loggering import server_logger as logger
+
def track_execution_time(func):
    """
    Decorator that logs a function's start, completion, and elapsed time
    (two decimal places) through the shared server logger.
    """
    @wraps(func)
    def timed(*args, **kwargs):
        logger.info(f"[{func.__name__}] 开始执行")
        started = time.perf_counter()

        try:
            return func(*args, **kwargs)
        finally:
            # Log duration even when the wrapped call raises.
            elapsed = time.perf_counter() - started
            logger.info(f"[{func.__name__}] 执行完成,耗时: {elapsed:.2f} 秒")

    return timed

+ 4 - 0
requirements.txt

@@ -19,6 +19,8 @@ coloredlogs==15.0.1
 concurrent-log-handler==0.9.28
 cryptography==45.0.5
 cyclopts==3.22.2
+celery==5.5.3
+redis
 dashscope==1.23.8
 dataclasses-json==0.6.7
 distro==1.9.0
@@ -101,6 +103,7 @@ opentelemetry-semantic-conventions==0.55b1
 orjson==3.10.18
 ormsgpack==1.10.0
 overrides==7.7.0
+python-docx==1.2.0
 packaging==24.2
 pandas==2.3.1
 pluggy==1.6.0
@@ -140,6 +143,7 @@ rich==14.0.0
 rich-rst==1.3.1
 rpds-py==0.26.0
 rsa==4.9.1
+grandalf==0.8
 setuptools==78.1.1
 shellingham==1.5.4
 six==1.17.0

File diff suppressed because it is too large
+ 76 - 0
temp/AI审查结果.json


+ 348 - 10
views/construction_review/app.py

@@ -6,12 +6,19 @@
 import datetime
 import sys
 import os
+import threading
+import subprocess
+import time
+from multiprocessing import Process
 
 # 添加项目根目录到Python路径
 current_dir = os.path.dirname(os.path.abspath(__file__))
 project_root = os.path.dirname(os.path.dirname(current_dir))
 sys.path.insert(0, project_root)
 
+# 现在可以正常导入了
+from foundation.logger.loggering import server_logger as logger
+from foundation.base.celery_app import app as celery_app
 from fastapi import FastAPI, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import JSONResponse
@@ -58,6 +65,17 @@ def create_app() -> FastAPI:
         timestamp = datetime.datetime.now().isoformat()
         return {"status": "healthy", "timestamp": timestamp}
 
+    # Celery状态检查
+    @app.get("/celery/status")
+    async def get_celery_status():
+        """获取Celery Worker状态"""
+        global celery_manager
+        status = celery_manager.get_status()
+        return {
+            "celery_worker": status,
+            "timestamp": datetime.datetime.now().isoformat()
+        }
+
     # API文档
     @app.get("/api/docs")
     async def api_docs():
@@ -90,18 +108,338 @@ def create_app() -> FastAPI:
         }
 
     return app
-app = create_app()
+# Celery Worker管理器
class CeleryWorkerManager:
    """Programmatic manager for an in-process Celery worker thread."""

    def __init__(self):
        # self.worker: placeholder handle (never assigned elsewhere in view).
        self.worker = None
        # self.is_running: best-effort run flag, toggled by start/stop paths.
        self.is_running = False
        # self.worker_thread: daemon thread executing celery worker_main.
        self.worker_thread = None
        # self.shutdown_event: signalled to request a stop.
        self.shutdown_event = threading.Event()

    def start_worker(self, **kwargs):
        """Start the Celery worker in a daemon thread.

        Returns True when already running or when the thread comes up;
        False on import or startup failure. ``kwargs`` is currently unused.
        """
        if self.is_running:
            logger.warning("Celery Worker已在运行")
            return True

        try:
            # Import the Celery application lazily, at start time.
            from foundation.base.celery_app import app as celery_app

            # Thread target: blocks inside worker_main until the worker exits.
            def run_celery_worker():
                try:
                    # Simplest possible startup, default worker options.
                    logger.info("Celery Worker开始运行...")

                    celery_app.worker_main(['worker'])

                except KeyboardInterrupt:
                    logger.info("收到停止信号,Celery Worker退出")
                except Exception as e:
                    logger.error(f"Celery Worker运行时出错: {e}")
                    logger.exception("详细错误信息:")
                finally:
                    # Always clear the run flag when the worker loop ends.
                    self.is_running = False
                    logger.info("Celery Worker已停止")

            # Daemon thread: the worker dies with the main process.
            self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
            self.worker_thread.start()
            self.is_running = True

            # Give the worker a moment to come up before the liveness check.
            time.sleep(2)

            if self.is_running and self.worker_thread.is_alive():
                logger.info("Celery Worker启动成功")
                return True
            else:
                logger.error("Celery Worker启动失败")
                self.is_running = False
                return False

        except ImportError as e:
            logger.error(f"导入Celery失败: {e}")
            logger.info("请先安装Celery: pip install celery redis")
            return False
        except Exception as e:
            logger.error(f"启动Celery Worker失败: {e}")
            logger.exception("详细错误信息:")
            return False

    def stop_worker(self, timeout: int = 5):
        """Gracefully stop the worker, polling up to ``timeout`` seconds."""
        if not self.is_running:
            logger.info("Celery Worker未运行")
            return True

        try:
            logger.info("停止Celery Worker...")
            self.shutdown_event.set()

            # Poll until the worker thread clears is_running or we time out.
            if self.worker_thread and self.worker_thread.is_alive():
                # Attempt a graceful stop first.
                start_time = time.time()
                while self.is_running and (time.time() - start_time) < timeout:
                    time.sleep(0.1)

                # Still running after the wait: only log a warning.
                if self.is_running:
                    logger.warning("Celery Worker优雅停止超时")
                else:
                    logger.info("Celery Worker已优雅停止")

            self.is_running = False
            self.shutdown_event.clear()
            return True

        except Exception as e:
            logger.error(f"停止Celery Worker失败: {e}")
            return False

    def stop_worker_immediately(self):
        """Force-stop the worker without waiting.

        NOTE(review): this sends SIGINT to the *whole* process, not just the
        worker thread — confirm that interrupting the host app is intended.
        """
        if not self.is_running:
            logger.info("Celery Worker未运行")
            return True

        try:
            logger.info("立即停止Celery Worker...")
            self.shutdown_event.set()

            # Imports local to the forced-stop path.
            import signal
            import os

            # Send an interrupt signal to the current process.
            if hasattr(os, 'kill'):
                try:
                    os.kill(os.getpid(), signal.SIGINT)
                    logger.info("已发送中断信号")
                except:
                    pass

            # Immediately record the stopped state regardless of thread state.
            self.is_running = False
            self.shutdown_event.clear()

            logger.info("Celery Worker已立即停止")
            return True

        except Exception as e:
            logger.error(f"立即停止Celery Worker失败: {e}")
            # Even on failure, mark the manager as stopped.
            self.is_running = False
            return False

    def get_status(self):
        """Return the run flag and the worker thread's liveness."""
        return {
            "is_running": self.is_running,
            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Context-manager exit performs a graceful stop.
        self.stop_worker()
+
+
# Module-wide worker-manager singleton shared by all start/stop helpers.
celery_manager = CeleryWorkerManager()

def start_celery_worker():
    """Start the Celery worker synchronously (intended for tests)."""
    return celery_manager.start_worker()
+
def cleanup_redis_before_start():
    """Purge leftover Celery task keys from Redis before startup.

    Scans database 0 for keys containing 'celery' or 'task:' and deletes
    them so a fresh worker does not pick up stale state.

    Returns:
        bool: True when the cleanup ran, False on connection/scan failure.
    """
    try:
        import redis
        from foundation.base.config import config_handler

        # Build the connection URL from config (same db 0 as the broker).
        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')

        if redis_password:
            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
        else:
            redis_url = f'redis://{redis_host}:{redis_port}/0'

        r = redis.from_url(redis_url, decode_responses=True)

        logger.info("清理Redis中的残留Celery任务...")

        # IMPROVEMENT: SCAN instead of KEYS — KEYS blocks the Redis server
        # on large keyspaces, scan_iter iterates incrementally.
        keys_to_delete = []
        for key in r.scan_iter():
            if any(keyword in key.lower() for keyword in ['celery', 'task:']):
                keys_to_delete.append(key)

        if keys_to_delete:
            for key in keys_to_delete:
                try:
                    r.delete(key)
                    logger.debug(f"已清理: {key}")
                except Exception as e:
                    logger.warning(f"清理 {key} 失败: {e}")

            logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
        else:
            logger.info("没有发现需要清理的残留任务")

        return True

    except Exception as e:
        logger.error(f"清理Redis残留任务失败: {e}")
        return False
+
def start_celery_worker_background():
    """Start the Celery worker in the background (used at server startup)."""
    # Purge stale Celery state left over from a previous run first.
    cleanup_redis_before_start()

    # Debug aid: record which call path triggered the worker start.
    import traceback
    logger.info("=== Celery Worker启动调用栈 ===")
    for frame_line in traceback.format_stack():
        logger.debug(f"  {frame_line.strip()}")
    logger.info("=== 调用栈结束 ===")

    return celery_manager.start_worker()
+
def stop_celery_worker():
    """Stop the Celery worker and cancel task registrations in Redis."""
    global celery_manager

    # First, best-effort cancellation of all registered tasks.
    try:
        import redis
        from foundation.base.config import config_handler

        # Connect to Redis.
        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')

        # NOTE(review): this connects to db /2, while cleanup_redis_before_start
        # and the Celery broker use db /0 — confirm which database actually
        # holds the 'task:*' registrations.
        if redis_password:
            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
        else:
            redis_url = f'redis://{redis_host}:{redis_port}/2'

        r = redis.from_url(redis_url, decode_responses=True)

        # Delete every task-registration key.
        task_keys = r.keys('task:*')
        for key in task_keys:
            r.delete(key)
            logger.info(f"取消任务注册: {key}")

        logger.info(f"已取消 {len(task_keys)} 个任务注册")

    except Exception as e:
        logger.error(f"取消任务注册失败: {e}")

    # Then force-stop the worker without a graceful wait.
    return celery_manager.stop_worker_immediately()
+
+def run_server(host: str = "127.0.0.1", port: int = 8034, reload: bool = False,
+                with_celery: bool = True):
     """运行服务器"""
-    if reload:
-        # 重载模式需要正确的模块路径
-        app_import_path = "views.construction_review.app:app"
-        uvicorn.run(app_import_path, host=host, port=port, reload=reload)
-    else:
-        # 直接运行模式,直接使用app对象
-        uvicorn.run(app, host=host, port=port)
+
+    if with_celery:
+        # 启动Celery Worker
+        start_celery_worker_background()
+
+        # 注册退出时的清理函数
+        import atexit
+        atexit.register(stop_celery_worker)
+
+        # 设置信号处理
+        import signal
+        def signal_handler(signum, frame):
+            logger.info(f"收到信号 {signum},正在停止服务...")
+            stop_celery_worker()
+            sys.exit(0)
+
+        # Windows和Unix系统的信号处理
+        try:
+            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
+            signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
+        except AttributeError:
+            # Windows可能不支持某些信号
+            pass
+
+        # Windows特有的控制台事件处理
+        if sys.platform == 'win32':
+            try:
+                import win32api
+                def win32_handler(dwCtrlType):
+                    # 正确的控制台事件常量
+                    CTRL_C_EVENT = 0
+                    CTRL_BREAK_EVENT = 1
+                    CTRL_CLOSE_EVENT = 2
+                    CTRL_SHUTDOWN_EVENT = 6
+
+                    if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
+                        logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
+                        stop_celery_worker()
+                        sys.exit(0)
+                    return False
+                win32api.SetConsoleCtrlHandler(win32_handler, True)
+            except (ImportError, AttributeError) as e:
+                # 如果win32api不可用,跳过Windows控制台处理
+                logger.debug(f"Windows控制台事件处理不可用: {e}")
+                pass
+
+    try:
+        if reload:
+            # 重载模式需要正确的模块路径
+            app_import_path = "views.construction_review.app:app"
+            uvicorn.run(app_import_path, host=host, port=port, reload=reload)
+        else:
+            # 直接运行模式,直接使用app对象
+            uvicorn.run(app, host=host, port=port)
+    finally:
+        if with_celery:
+            stop_celery_worker()
+
+
# Module-level FastAPI application instance (also used by uvicorn reload).
app = create_app()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='施工方案审查API服务')
    parser.add_argument('--host', default='127.0.0.1', help='服务器地址')
    parser.add_argument('--port', type=int, default=8035, help='服务器端口')
    parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
    parser.add_argument('--no-reload', action='store_true', help='关闭热重载')

    args = parser.parse_args()

    logger.info("施工方案审查API服务启动中...")
    logger.info(f"服务地址: http://{args.host}:{args.port}")
    logger.info(f"API文档: http://{args.host}:{args.port}/docs")
    logger.info(f"健康检查: http://{args.host}:{args.port}/health")

    if not args.no_celery:
        logger.info("Celery Worker: 已集成启动")
    else:
        logger.warning("Celery Worker: 已禁用")

    # NOTE(review): reload is hard-coded to False below, so the --no-reload
    # flag parsed above currently has no effect — confirm intended behaviour.
    run_server(
        host=args.host,
        port=args.port,
        reload=False,
        with_celery=not args.no_celery
    )

+ 133 - 85
views/construction_review/file_upload.py

@@ -3,13 +3,16 @@
 模拟文件上传功能,返回文件ID和回调任务ID
 """
 import ast
+import traceback
 import uuid
 import time
 from datetime import datetime
 from fastapi import APIRouter, UploadFile, File, Form, HTTPException
 from pydantic import BaseModel
 from typing import Optional,List
-from core.construction_review.workflows.document_ans import DocumentParse
+from foundation.utils import md5
+from core.base.redis_duplicate_checker import RedisDuplicateChecker
+from core.base.workflow_manager import WorkflowManager
 from foundation.logger.loggering import server_logger as logger
 from foundation.base.config import config_handler
 from .schemas.error_schemas import FileUploadErrors
@@ -18,59 +21,64 @@ from .schemas.error_schemas import FileUploadErrors
 
file_upload_router = APIRouter(prefix="/sgsc", tags=["文档上传"])
# In-memory record of uploads (mock storage; not persisted).
uploaded_files = {}
# Initialise the workflow manager shared by all requests in this module.
workflow_manager = WorkflowManager(
    max_concurrent_docs=3,
    max_concurrent_reviews=5
)
# Reuse workflow_manager's duplicate checker so both sides share state.
duplicatechecker = workflow_manager.redis_duplicate_checker
 
class FileUploadResponse(BaseModel):
    """Response envelope for the upload endpoint: status code plus payload."""
    code: int
    data: dict
 
-def validate_file(file: UploadFile) -> None:
-    """验证文件格式和大小"""
-    # 检查文件是否存在
def get_file_size(file: "UploadFile") -> tuple:
    """Return (size_bytes, size_mb) for an uploaded file.

    Reads the whole stream to count bytes (works even when ``seek`` only
    supports a single argument), then rewinds so the caller can re-read.

    Returns:
        tuple[int, float]: byte count and size in MB rounded to 2 decimals;
        (0, 0.0) when the stream cannot be read. (The original annotation
        claimed ``int`` while actually returning this 2-tuple.)
    """
    try:
        payload = file.file.read()
        byte_count = len(payload)
        file.file.seek(0)  # rewind so later reads see the full content
        size_mb = byte_count / (1024 * 1024)
        return byte_count, round(size_mb, 2)
    except Exception as e:
        logger.warning(f"获取文件大小失败: {str(e)}")
        return 0, 0.0
+
def validate_file(file: "UploadFile", file_content: bytes = None) -> None:
    """Validate the uploaded file's real format by sniffing magic bytes.

    Accepts PDF (``%PDF`` header) and ZIP-based Word documents
    (``PK\\x03\\x04`` header with a .doc/.docx extension).

    Args:
        file: the uploaded file (filename/content_type used for logging).
        file_content: the raw leading bytes of the file.

    Raises:
        HTTPException: via FileUploadErrors.file_format_unsupported() when
        the content cannot be identified as a supported type.
    """

    file_extension = '.' + file.filename.split('.')[-1].lower() if '.' in file.filename else ''
    # Detected type label, used only for the success log line.
    actual_file_type = 'unknown'

    # BUG FIX: the original dereferenced file_content unconditionally and
    # crashed with AttributeError when it was None; treat missing/empty
    # content as an unsupported format instead.
    if not file_content:
        logger.warning("未知文件类型: 文件内容为空")
        raise FileUploadErrors.file_format_unsupported()

    if file_content.startswith(b'%PDF'):
        actual_file_type = 'pdf'
    elif file_content.startswith(b'PK\x03\x04'):
        # ZIP container: rely on the extension to recognise Word documents.
        if file_extension in ('.docx', '.doc'):
            actual_file_type = 'docx/doc'
        else:
            logger.warning(f"未知文件类型,: {file_content[:20]}")
            raise FileUploadErrors.file_format_unsupported()
    else:
        logger.warning(f"未知文件类型,: {file_content[:20]}")
        raise FileUploadErrors.file_format_unsupported()

    logger.info(f"文件类型验证通过: {actual_file_type} (扩展名: {file_extension}, MIME: {file.content_type})")
 
 @file_upload_router.post("/file_upload", response_model=FileUploadResponse)
 async def file_upload(
-    file: List[UploadFile] = File([]),  # 改为文件列表,支持多文件检测
+    file: List[UploadFile] = File([]),  
     callback_url: str = Form(None),
     project_plan_type: str = Form(None),
-    user: str = Form(None)  # 用户参数从表单获取,不从配置获取
+    user: str = Form(None)  
 ):
     """
     文件上传接口
     """
-
-    # 调试日志信息
-    logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file) if file else 0}",
-                log_type="upload", trace_id=f"upload-{int(time.time())}")
-
-    # 记录每个文件的信息
-    if file:
-        for i, f in enumerate(file):
-            file_size = getattr(f, 'size', 0)  # 安全获取文件大小,避免属性不存在错误
-            logger.info(f"文件 {i+1}: {f.filename}, 大小: {file_size}, 类型: {f.content_type}", log_type="upload")
-    logger.info(f"请求参数 - 回调URL: {callback_url}, 工程类型: {project_plan_type}",
-                log_type="upload")
-    logger.info(f"用户标识: {user}")
-
     try:
         # 验证工程方案类型
         valid_project_types = {
@@ -81,17 +89,34 @@ async def file_upload(
         valid_users = ast.literal_eval(config_handler.get("user_lists", "USERS"))
         
         # 验证文件上传
+
         if not file or len(file) == 0:
             raise FileUploadErrors.file_missing()
+        elif not file[0].filename:
+            raise FileUploadErrors.file_missing()
         elif len(file) > 1:
+            logger.info(f"文件上传请求 - 用户: {user}, 文件数量: {len(file) if file else 0}",
+            log_type="upload", trace_id=f"upload-{int(time.time())}")
             raise FileUploadErrors.file_multiple()
         
-        
-
-        # 验证文件格式和大小(只验证第一个文件)
-
+        # 验证文件数量
         if file and len(file) > 0:
-            validate_file(file[0])
+            try:
+                content = file[0].file.read()
+                file[0].file.seek(0)  # 重置文件指针
+            except:
+                content = b""
+            validate_file(file[0], content)
+
+        # 验证文件格式
+        file_size, file_size_mb = get_file_size(file[0])
+        if file_size == 0:
+            raise FileUploadErrors.file_missing()
+
+        # 验证文件大小限制
+        if file_size_mb > 30:  # 文件大小不能超过30MB
+            raise FileUploadErrors.file_size_exceeded()
+        
         # 验证回调地址
         if callback_url is '':
             raise FileUploadErrors.callback_url_missing()
@@ -104,59 +129,82 @@ async def file_upload(
         if project_plan_type not in valid_project_types:
             raise FileUploadErrors.project_plan_type_invalid()
 
-        # 生成文件ID和回调任务ID
-        file_id = str(uuid.uuid4())
+        # 生成文件MD5ID
+        file_id = md5.md5_id(content)
+        if await duplicatechecker.is_duplicate_task(file_id):
+            raise FileUploadErrors.task_already_exists()
+
         created_at = int(time.time())
-        callback_task_id = f"{file_id}-{created_at}"
 
-        # 保存文件信息
+        # 详细文件信息调试
+        logger.info(f"=== 文件详细信息 ===")
+        logger.info(f"文件名: {file[0].filename}")
+        logger.info(f"文件扩展名: {file[0].filename.split('.')[-1] if '.' in file[0].filename else '无扩展名'}")
+        logger.info(f"文件头信息: {content[:50] if 'content' in locals() else '未读取'}")
+        logger.info(f"文件大小: {file_size_mb} MB")
+        logger.info(f"========================", log_type="upload")
+        logger.info(f"请求参数 - 回调URL: {callback_url}\n, 工程类型: {project_plan_type}",
+                    log_type="upload")
+        logger.info(f"用户标识: {user}")
+
+        # 确定文件类型
+        file_extension = file[0].filename.split('.')[-1].lower() if '.' in file[0].filename else ''
+        if content.startswith(b'%PDF'):
+            file_type = 'pdf'
+        elif content.startswith(b'PK\x03\x04') and file_extension in ['docx', 'doc']:
+            file_type = 'docx'
+        else:
+            file_type = 'unknown'
+
+
+        # 生成回调任务ID
+        callback_task_id = f"chain-{file_id}-{int(datetime.now().timestamp())}"
+
+        # 记录文件信息
         file_info = {
-            "id": file_id,
-            "name": file[0].filename,
-            "size": 1024 * 1024,  # 文件大小 1MB
-            "created_at": created_at,
-            "status": "success",
-            "callback_task_id": callback_task_id,
-            "callback_url": callback_url,
-            "project_plan_type": project_plan_type,
-            "user": user,
-            "upload_time": datetime.now().isoformat()
-        }
+                'file_id': file_id,
+                'file_content': content,
+                'user_id': user,
+                'file_type': file_type,
+                'callback_task_id': callback_task_id,
+                "file_name": file[0].filename,
+                "file_size": file_size_mb,
+                "project_plan_type": project_plan_type,
+                'updated_at': created_at
+            }
 
-        # 文档处理(暂时注释,等文件保存逻辑实现后再启用)
-        # DocumentParse.document_parse(file_path)
-
-        uploaded_files[file_id] = file_info
-        uploaded_files[callback_task_id] = {
-            "file_id": file_id,
-            "user": user,
-            "review_task_status": "processing",
-            "overall_progress": 0,
-            "stages": [
-                {"stage_name": "文件上传", "progress": 100, "stage_status": "completed"},
-                {"stage_name": "格式校验", "progress": 0, "stage_status": "pending"},
-                {"stage_name": "内容提取", "progress": 0, "stage_status": "pending"},
-                {"stage_name": "智能审查", "progress": 0, "stage_status": "pending"}
-            ],
-            "updated_at": int(time.time()),
-            "estimated_remaining": 1800  # 预计30分钟
-        }
+        try:
+            # 提交处理任务到工作流管理器
+            await workflow_manager.submit_task_processing(file_info)
+            logger.info(f"文档处理任务已提交,任务ID: {callback_task_id}")
 
-        return FileUploadResponse(
-            code=200,
-            data={
-                "id": file_id,
-                "name": file[0].filename,
-                "size": file_info["size"],
-                "created_at": file_info["created_at"],
-                "status": "success",
-                "callback_task_id": callback_task_id
-            }
-        )
+
+
+            return FileUploadResponse(
+                code=200,
+                data={
+                    "id": file_info['file_id'],
+                    "name": file_info['file_name'],
+                    "size": file_size_mb,
+                    "created_at": created_at,
+                    "status": "processing",
+                    "callback_task_id": file_info['callback_task_id']
+                }
+            )
+
+        except Exception as workflow_error:
+            logger.error(f"工作流提交失败: {str(workflow_error)}")
+            raise FileUploadErrors.internal_error(workflow_error)
 
     except HTTPException:
+        logger.error(f"HTTP异常: {traceback.format_exc()}")
         raise
     except Exception as e:
+        logger.error(f"文件上传失败: {str(e)}")
+        logger.error(f"错误详情: {traceback.format_exc()}")
         raise FileUploadErrors.internal_error(e)
 
 
+
+
+

+ 45 - 8
views/construction_review/schemas/error_schemas.py

@@ -5,7 +5,7 @@
 
 from typing import Dict, Any
 from fastapi import HTTPException
-
+from foundation.logger.loggering import server_logger as logger
 
 class ErrorCodes:
     """错误码常量定义"""
@@ -42,14 +42,14 @@ class ErrorCodes:
     WJSC005 = {
         "code": "WJSC005",
         "error_type": "FILE_SIZE_EXCEEDED",
-        "message": "文件过大(最大30MB)",
+        "message": "文件过大(最大不超过30MB)",
         "status_code": 400
     }
 
     WJSC006 = {
         "code": "WJSC006",
         "error_type": "PROJECT_PLAN_TYPE_INVALID",
-        "message": "工程方案类型无效(未注册)",
+        "message": "无效工程方案类型(未提供或未注册)",
         "status_code": 400
     }
 
@@ -63,7 +63,7 @@ class ErrorCodes:
     WJSC008 = {
         "code": "WJSC008",
         "error_type": "INVALID_USER",
-        "message": "用户标识(user)无效",
+        "message": "用户标识未提供或无效",
         "status_code": 403
     }
 
@@ -76,6 +76,13 @@ class ErrorCodes:
 
     WJSC010 = {
         "code": "WJSC010",
+        "error_type": "TASK_ALREADY_EXISTS",
+        "message": "任务已存在,请勿重复提交",
+        "status_code": 409
+    }
+
+    WJSC011 = {
+        "code": "WJSC011",
         "error_type": "INTERNAL_ERROR",
         "message": "服务端内部错误",
         "status_code": 500
@@ -106,7 +113,7 @@ class ErrorCodes:
     JDLX004 = {
         "code": "JDLX004",
         "error_type": "INVALID_USER",
-        "message": "用户标识(user)无效",
+        "message": "用户标识未提供或无效",
         "status_code": 403
     }
 
@@ -156,7 +163,7 @@ class ErrorCodes:
     SCJG005 = {
         "code": "SCJG005",
         "error_type": "INVALID_USER",
-        "message": "用户标识无效",
+        "message": "用户标识未提供或无效",
         "status_code": 403
     }
 
@@ -217,7 +224,7 @@ def create_server_error(error_code: str, original_error: Exception) -> HTTPExcep
         HTTPException: FastAPI异常对象
     """
     error_map = {
-        "WJSC008": ErrorCodes.WJSC008,
+        "WJSC011": ErrorCodes.WJSC011,
         "JDLX006": ErrorCodes.JDLX006,
         "SCJG008": ErrorCodes.SCJG008
     }
@@ -234,43 +241,59 @@ class FileUploadErrors:
 
     @staticmethod
     def file_missing():
+        logger.error(ErrorCodes.WJSC001)
         return create_http_exception(ErrorCodes.WJSC001)
 
     @staticmethod
     def file_multiple():
+        logger.error(ErrorCodes.WJSC002)
         return create_http_exception(ErrorCodes.WJSC002)
 
     @staticmethod
     def file_rejected(message: str = None):
+        logger.error(ErrorCodes.WJSC003)
         return create_http_exception(ErrorCodes.WJSC003, message)
 
     @staticmethod
     def file_format_unsupported():
+        logger.error(ErrorCodes.WJSC004)
         return create_http_exception(ErrorCodes.WJSC004)
 
     @staticmethod
     def file_size_exceeded():
+        logger.error(ErrorCodes.WJSC005)
         return create_http_exception(ErrorCodes.WJSC005)
 
     @staticmethod
     def project_plan_type_invalid():
+        logger.error(ErrorCodes.WJSC006)
         return create_http_exception(ErrorCodes.WJSC006)
 
     @staticmethod
     def unauthorized():
+        logger.error(ErrorCodes.WJSC007)
         return create_http_exception(ErrorCodes.WJSC007)
     
     @staticmethod
     def invalid_user():
+        logger.error(ErrorCodes.WJSC008)
         return create_http_exception(ErrorCodes.WJSC008)
     
     @staticmethod
     def callback_url_missing():
+        logger.error(ErrorCodes.WJSC009)
         return create_http_exception(ErrorCodes.WJSC009)
 
+
+    @staticmethod
+    def task_already_exists():
+        logger.error(ErrorCodes.WJSC010)
+        return create_http_exception(ErrorCodes.WJSC010)
+
     @staticmethod
     def internal_error(original_error: Exception):
-        return create_server_error("WJSC010", original_error)
+        logger.error(ErrorCodes.WJSC011)
+        return create_server_error("WJSC011", original_error)
 
 
 class TaskProgressErrors:
@@ -278,26 +301,32 @@ class TaskProgressErrors:
 
     @staticmethod
     def missing_parameters():
+        logger.error(ErrorCodes.JDLX001)
         return create_http_exception(ErrorCodes.JDLX001)
 
     @staticmethod
     def invalid_param_format():
+        logger.error(ErrorCodes.JDLX002)
         return create_http_exception(ErrorCodes.JDLX002)
 
     @staticmethod
     def unauthorized():
+        logger.error(ErrorCodes.JDLX003)
         return create_http_exception(ErrorCodes.JDLX003)
 
     @staticmethod
     def invalid_user():
+        logger.error(ErrorCodes.JDLX004)
         return create_http_exception(ErrorCodes.JDLX004)
 
     @staticmethod
     def task_not_found():
+        logger.error(ErrorCodes.JDLX005)
         return create_http_exception(ErrorCodes.JDLX005)
 
     @staticmethod
     def server_internal_error(original_error: Exception):
+        logger.error(ErrorCodes.JDLX006, original_error)
         return create_server_error("JDLX006", original_error)
 
 
@@ -306,32 +335,40 @@ class ReviewResultsErrors:
 
     @staticmethod
     def invalid_type():
+        logger.error(ErrorCodes.SCJG001)
         return create_http_exception(ErrorCodes.SCJG001)
 
     @staticmethod
     def missing_param_id():
+        logger.error(ErrorCodes.SCJG002)
         return create_http_exception(ErrorCodes.SCJG002)
 
     @staticmethod
     def invalid_id_format():
+        logger.error(ErrorCodes.SCJG003)
         return create_http_exception(ErrorCodes.SCJG003)
 
     @staticmethod
     def unauthorized():
+        logger.error(ErrorCodes.SCJG004)
         return create_http_exception(ErrorCodes.SCJG004)
 
     @staticmethod
     def invalid_user():
+        logger.error(ErrorCodes.SCJG005)
         return create_http_exception(ErrorCodes.SCJG005)
 
     @staticmethod
     def task_not_found():
+        logger.error(ErrorCodes.SCJG006)
         return create_http_exception(ErrorCodes.SCJG006)
 
     @staticmethod
     def no_review_results():
+        logger.error(ErrorCodes.SCJG007)
         return create_http_exception(ErrorCodes.SCJG007)
 
     @staticmethod
     def server_error(original_error: Exception):
+        logger.error(ErrorCodes.SCJG008)
         return create_server_error("SCJG008", original_error)

+ 7 - 11
views/construction_review/task_progress.py

@@ -1,6 +1,6 @@
 """
-审查进度轮询接口Mock实现
-模拟任务进度更新,支持多阶段进度展示
+审查进度轮询接口
+支持Celery任务状态查询和进度展示
 """
 
 import time
@@ -9,14 +9,11 @@ from datetime import datetime
 from fastapi import APIRouter, HTTPException, Query
 from pydantic import BaseModel
 from typing import Optional
+from celery.result import AsyncResult
+from foundation.base.celery_app import app
 
-task_progress_router = APIRouter(prefix="/sgsc", tags=["进度轮询Mock"])
+task_progress_router = APIRouter(prefix="/sgsc", tags=["进度轮询"])
 
-# 导入文件上传模块的存储
-try:
-    from .file_upload import uploaded_files
-except ImportError:
-    from views.construction_review.file_upload import uploaded_files
 
 # 导入错误码定义
 from .schemas.error_schemas import TaskProgressErrors
@@ -104,7 +101,7 @@ async def task_progress(
     user: str = Query(None)
 ):
     """
-    Mock任务进度轮询接口
+    任务进度轮询接口
     """
     try:
         # 验证参数
@@ -143,8 +140,7 @@ async def task_progress(
                 "review_task_status": updated_task["review_task_status"],
                 "overall_progress": updated_task["overall_progress"],
                 "stages": updated_task["stages"],
-                "updated_at": updated_task["updated_at"],
-                "estimated_remaining": updated_task["estimated_remaining"]
+                "updated_at": updated_task["updated_at"]
             }
         )
 

Some files were not shown because too many files changed in this diff