ソースを参照

refactor: 拆分 server/app.py 为模块化结构

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 2 週間 前
コミット
1ee52bcfad
7 ファイル変更410 行追加876 行削除
  1. 9 876
      server/app.py
  2. 123 0
      server/celery_manager.py
  3. 56 0
      server/factory.py
  4. 78 0
      server/redis_cleanup.py
  5. 62 0
      server/routes.py
  6. 40 0
      server/runner.py
  7. 42 0
      server/signals.py

+ 9 - 876
server/app.py

@@ -1,906 +1,39 @@
+"""
+FastAPI 应用入口。
+
+启动方式:
+    python server/app.py
+    uvicorn server.app:app --port=8001
+"""
 import os
 import sys
 import logging
 
-# Windows 平台 Celery 兼容性设置(必须在导入 celery 之前)
 if sys.platform == 'win32':
     os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
 
-# 抑制 pymilvus 的 AsyncMilvusClient 警告(在多进程环境中没有事件循环)
 logging.getLogger('pymilvus').setLevel(logging.ERROR)
 
-import time
-import redis
-import signal
-import uvicorn
-import datetime
-import traceback
-import threading
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.insert(0, BASE_DIR)
 
-from views import lifespan
-from fastapi import FastAPI, HTTPException
-from fastapi.responses import JSONResponse
-from pydantic import BaseModel
-from typing import Optional, Dict, Any
-from fastapi.middleware.cors import CORSMiddleware
-from foundation.infrastructure.config.config import config_handler
-from foundation.infrastructure.cache import RedisConnectionFactory
+from server.factory import ApplicationFactory
+from server.runner import ServerRunner
 from foundation.observability.logger.loggering import server_logger
-from foundation.infrastructure.messaging.celery_app import app as celery_app
-
-# 导入所有路由
-from views.construction_review.file_upload import file_upload_router
-from views.construction_review.review_results import review_results_router
-from views.construction_review.launch_review import launch_review_router
-from views.construction_review.task_control import task_control_router
-from views.construction_review.desensitize_api import desensitize_router
-
-# 导入施工方案编写路由
-from views.construction_write.outline_views import outline_router
-from views.construction_write.content_completion import content_completion_router
-from views.construction_write.regenerate_views import regenerate_outline_router
-from views.construction_write.task_cancel_views import task_cancel_router
-
-
-class ServerUtils:
-    """服务器工具函数类 - 集中管理工具方法"""
-
-    @staticmethod
-    def get_redis_connection():
-        """获取Redis连接的统一工具函数
-
-        Returns:
-            redis.Redis: Redis连接对象
-        """
-
-
-        # 从配置文件获取Redis连接参数
-        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
-        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
-        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
-        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
-
-        # 构建Redis URL
-        if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
-        else:
-            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
-
-        return redis.from_url(redis_url, decode_responses=True)
-
-
-class RouteManager:
-    """路由管理类 - 负责路由配置和中间件设置"""
-
-    def __init__(self, app: FastAPI):
-        """初始化路由管理器
-
-        Args:
-            app: FastAPI应用实例
-        """
-        self.app = app
-        self._setup_cors()
-        self._setup_routes()
-        self._setup_exception_handlers()
-        self._setup_health_checks()
-        self._setup_api_docs()
-
-
-    def _setup_cors(self):
-        """配置CORS中间件"""
-        self.app.add_middleware(
-            CORSMiddleware,
-            allow_origins=["*"],  # 允许所有的来源
-            allow_credentials=True,
-            allow_methods=["*"],  # 允许的HTTP方法
-            allow_headers=["*"],  # 允许的请求头
-        )
-
-    def _setup_routes(self):
-        """配置所有路由"""
-        self.app.include_router(file_upload_router)
-        self.app.include_router(review_results_router)
-        self.app.include_router(launch_review_router)
-        self.app.include_router(task_control_router)  # 任务控制路由
-        self.app.include_router(desensitize_router)   # 数据脱敏路由
-
-        # 施工方案编写路由
-        self.app.include_router(outline_router)
-        self.app.include_router(content_completion_router)
-        self.app.include_router(regenerate_outline_router)
-        self.app.include_router(task_cancel_router)
-
-    def _setup_exception_handlers(self):
-        """配置全局异常处理"""
-        @self.app.exception_handler(HTTPException)
-        async def http_exception_handler(request, exc):
-            return JSONResponse(
-                status_code=exc.status_code,
-                content=exc.detail
-            )
-
-    def _setup_health_checks(self):
-        """配置健康检查接口"""
-        @self.app.get("/health" ,tags=["系统状态"])
-        async def health_check():
-            timestamp = datetime.datetime.now().isoformat()
-            return {"status": "healthy", "timestamp": timestamp}
-
-        @self.app.get("/celery/status", tags=["系统状态"])
-        async def get_celery_status():
-            """获取Celery Worker状态"""
-            # 延迟导入避免循环引用
-            from server.app import celery_manager
-            status = celery_manager.get_status()
-            return {
-                "celery_worker": status,
-                "timestamp": datetime.datetime.now().isoformat()
-            }
-
-    def _setup_api_docs(self):
-        """配置Swagger API文档"""
-        # 添加API文档信息接口
-        @self.app.get("/api/docs/info", tags=["系统状态"])
-        async def api_info():
-            """获取API文档信息"""
-            return {
-                "title": "Agent API - 施工方案审查系统",
-                "description": "集成施工方案审查功能的API接口文档",
-                "version": "0.3",
-                "docs_urls": {
-                    "swagger_ui": "/docs",
-                    "redoc": "/redoc",
-                    "openapi_json": "/openapi.json"
-                },
-                "features": [
-                    "自动生成API文档",
-                    "交互式API测试",
-                    "OpenAPI 3.0规范",
-                    "支持多种认证方式"
-                ]
-            }
-
-        @self.app.get("/api/docs/health", tags=["系统状态"])
-        async def docs_health_check():
-            """API文档健康检查"""
-            return {
-                "status": "healthy",
-                "service": "API Documentation",
-                "version": "0.3",
-                "timestamp": datetime.datetime.now().isoformat()
-            }
-
-
-
-class CeleryWorkerManager:
-    """Celery Worker程序化管理器 - 独立的Celery管理模块"""
-
-    def __init__(self, server_utils: ServerUtils = None):
-        """初始化Celery Worker管理器
-
-        Args:
-            server_utils: 服务器工具类实例
-        """
-        self.worker = None
-        self.is_running = False
-        self.worker_thread = None
-        self.shutdown_event = threading.Event()
-        self.server_utils = server_utils or ServerUtils()
-
-    def start_worker(self, **kwargs) -> bool:
-        """启动Celery Worker
-
-        Returns:
-            bool: 启动是否成功
-        """
-        if self.is_running:
-            server_logger.warning("Celery Worker已在运行")
-            return True
-
-        try:
-            # 启动前清理残留任务
-            self._cleanup_redis_tasks("启动前")
-
-            # 创建Worker函数
-            def run_celery_worker():
-                try:
-                    server_logger.info("Celery Worker开始运行...")
-                    server_logger.info("Worker配置: 并发数=4, 进程池=prefork, 日志输出=终端")
-
-                    # 配置子进程日志输出
-                    from foundation.observability.logger.loggering import configure_logging_for_subprocess
-                    configure_logging_for_subprocess()
-
-                    # 构建 Celery worker 参数
-                    worker_args = [
-                        'worker',                           # 子命令
-                        '-c', '4',                          # 并发数:4个worker进程
-                        '-P', 'prefork',                    # 进程池类型:prefork
-                        '-l', 'info',                       # 日志级别
-                        '--without-heartbeat',              # 禁用心跳(Windows兼容)
-                        '--without-gossip',                 # 禁用gossip(Windows兼容)
-                        '--without-mingle',                 # 禁用mingle(Windows兼容)
-                    ]
-
-                    # Windows 平台额外设置
-                    if sys.platform == 'win32':
-                        server_logger.info("Windows平台 detected,启用兼容性设置")
-                        os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
-
-                    celery_app.worker_main(worker_args)
-                except KeyboardInterrupt:
-                    server_logger.info("收到停止信号,Celery Worker退出")
-                except Exception as e:
-                    server_logger.error(f"Celery Worker运行时出错: {e}")
-                    server_logger.exception("详细错误信息:")
-                finally:
-                    self.is_running = False
-                    server_logger.info("Celery Worker已停止")
-
-            # 在单独线程中启动Worker
-            self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
-            self.worker_thread.start()
-            self.is_running = True
-
-            # 等待启动
-            time.sleep(2)
-
-            success = self.is_running and self.worker_thread.is_alive()
-            if success:
-                server_logger.info("Celery Worker启动成功")
-            else:
-                server_logger.error("Celery Worker启动失败")
-                self.is_running = False
-
-            return success
-
-        except ImportError as e:
-            server_logger.error(f"导入Celery失败: {e}")
-            server_logger.info("请先安装Celery: pip install celery redis")
-            return False
-        except Exception as e:
-            server_logger.error(f"启动Celery Worker失败: {e}")
-            server_logger.exception("详细错误信息:")
-            return False
-
-    def stop_worker(self, timeout: int = 5) -> bool:
-        """优雅停止Celery Worker
-
-        Args:
-            timeout: 停止超时时间(秒)
-
-        Returns:
-            bool: 停止是否成功
-        """
-        if not self.is_running:
-            server_logger.info("Celery Worker未运行")
-            return True
-
-        try:
-            server_logger.info("停止Celery Worker...")
-            self.shutdown_event.set()
-
-            if self.worker_thread and self.worker_thread.is_alive():
-                # 尝试优雅停止
-                start_time = time.time()
-                while self.is_running and (time.time() - start_time) < timeout:
-                    time.sleep(0.1)
-
-                if self.is_running:
-                    server_logger.warning("Celery Worker优雅停止超时")
-                else:
-                    server_logger.info("Celery Worker已优雅停止")
-
-            # 停止后清理Redis任务
-            self._cleanup_redis_tasks("停止时")
-
-            self.is_running = False
-            self.shutdown_event.clear()
-            return True
-
-        except Exception as e:
-            server_logger.error(f"停止Celery Worker失败: {e}")
-            return False
-
-    def stop_worker_immediately(self) -> bool:
-        """立即停止Celery Worker,不等待
-
-        Returns:
-            bool: 停止是否成功
-        """
-        if not self.is_running:
-            server_logger.info("Celery Worker未运行")
-            return True
-
-        try:
-            server_logger.info("立即停止Celery Worker...")
-            self.shutdown_event.set()
-
-            # 发送中断信号
-            if hasattr(os, 'kill'):
-                try:
-                    os.kill(os.getpid(), signal.SIGINT)
-                    server_logger.info("已发送中断信号")
-                except:
-                    pass
-
-            # 停止后清理Redis任务
-            self._cleanup_redis_tasks("立即停止时")
-
-            self.is_running = False
-            self.shutdown_event.clear()
-            server_logger.info("Celery Worker已立即停止")
-            return True
-
-        except Exception as e:
-            server_logger.error(f"立即停止Celery Worker失败: {e}")
-            self.is_running = False
-            return False
-
-    def get_status(self) -> dict:
-        """获取Worker状态
-
-        Returns:
-            dict: 包含worker状态的字典
-        """
-        return {
-            "is_running": self.is_running,
-            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
-        }
-
-    def _cleanup_redis_tasks(self, phase: str):
-        """清理Redis中的Celery任务 - 增强版
-
-        Args:
-            phase: 清理阶段(启动前/停止时/立即停止时)
-        """
-        try:
-            r = self.server_utils.get_redis_connection()
-            server_logger.info(f"{phase}清理Redis中的Celery任务...")
-
-            # 1. 首先停止所有正在运行的Worker(如果是启动前清理)
-            if phase == "启动前":
-                self._kill_existing_celery_workers()
-
-            # 2. 清理任务相关键
-            task_keys = r.keys('task:*')
-            celery_meta_keys = r.keys('celery-task-meta-*')
-            current_keys = r.keys('current:*')
-            kombu_keys = r.keys('_kombu.binding.*')
-
-            # 3. 清理Celery内部未确认任务队列(关键!)
-            unacked_keys = r.keys('unacked*')  # 未确认的任务
-            qos_keys = r.keys('celery@*')      # QoS/预取相关键
-
-            all_keys = (
-                task_keys + celery_meta_keys + current_keys +
-                kombu_keys + unacked_keys + qos_keys
-            )
-
-            for key in all_keys:
-                try:
-                    r.delete(key)
-                    server_logger.debug(f"{phase}清理: {key}")
-                except Exception as e:
-                    server_logger.warning(f"{phase}清理 {key} 失败: {e}")
-
-            # 4. 清理标准队列和优先级队列
-            queues = ['celery', 'celery.pidbox', 'celeryev']
-            for queue in queues:
-                try:
-                    r.delete(queue)
-                    server_logger.debug(f"{phase}清理队列: {queue}")
-                except Exception as e:
-                    server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
-
-            # 5. 清理优先级队列(celery~1, celery~2 等)
-            priority_queues = r.keys('celery~*')
-            for queue in priority_queues:
-                try:
-                    r.delete(queue)
-                    server_logger.debug(f"{phase}清理优先级队列: {queue}")
-                except Exception as e:
-                    server_logger.warning(f"{phase}清理优先级队列 {queue} 失败: {e}")
-
-            # 6. 最后验证:确保队列为空
-            for queue in ['celery']:
-                try:
-                    queue_len = r.llen(queue)
-                    if queue_len > 0:
-                        server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
-                        r.delete(queue)
-                except Exception as e:
-                    server_logger.warning(f"验证队列 {queue} 失败: {e}")
-
-            if all_keys or priority_queues:
-                server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键和 {len(priority_queues)} 个优先级队列")
-            else:
-                server_logger.info(f"{phase}未发现需要清理的残留任务")
-
-            # 7. 启动前清理时,等待一小段时间确保Redis同步完成
-            if phase == "启动前":
-                time.sleep(0.5)
-
-        except Exception as e:
-            server_logger.error(f"{phase}清理Redis任务失败: {e}")
-
-    def _kill_existing_celery_workers(self):
-        """终止所有现有的Celery Worker进程"""
-        try:
-            import subprocess
-            import platform
-
-            system = platform.system()
-            server_logger.info("检查并终止现有的Celery Worker进程...")
-
-            if system == "Windows":
-                # Windows: 使用 taskkill
-                try:
-                    # 查找 celery 进程
-                    result = subprocess.run(
-                        ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
-                        capture_output=True, text=True
-                    )
-                    if 'celery' in result.stdout.lower():
-                        subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
-                        server_logger.info("已终止现有的Celery Worker进程")
-                except Exception as e:
-                    server_logger.debug(f"终止Celery进程时出错: {e}")
-            else:
-                # Linux/Mac: 使用 pkill
-                try:
-                    subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
-                    server_logger.info("已终止现有的Celery Worker进程")
-                except Exception as e:
-                    server_logger.debug(f"终止Celery进程时出错: {e}")
-
-            # 短暂等待确保进程完全终止
-            time.sleep(0.5)
-
-        except Exception as e:
-            server_logger.warning(f"终止现有Celery Worker失败: {e}")
-
-    def __enter__(self):
-        return self
 
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.stop_worker()
-
-class ApplicationFactory:
-    """应用工厂类 - 负责创建和配置FastAPI应用"""
-
-    def __init__(self):
-        """初始化应用工厂"""
-        self.server_utils = ServerUtils()
-        self.celery_manager = CeleryWorkerManager(self.server_utils)
-
-    def create_app(self) -> FastAPI:
-        """创建FastAPI应用实例
-
-        Returns:
-            FastAPI: 配置完成的应用实例
-        """
-        app = FastAPI(
-            title="Agent API - 施工方案审查系统",
-            version="0.3",
-            description="Agent API - 集成施工方案审查功能",
-            lifespan=lifespan
-        )
-
-        # 使用路由管理器配置应用
-        route_manager = RouteManager(app)
-
-        return app
-
-    def create_server_config(self) -> dict:
-        """创建服务器配置
-
-        Returns:
-            dict: 服务器配置字典
-        """
-        # 确保端口号是整数类型
-        port = config_handler.get('launch', 'LAUNCH_PORT', 8002)
-        try:
-            port = int(port)
-        except (ValueError, TypeError):
-            port = 8002
-
-        return {
-            'host': config_handler.get('launch', 'HOST', '0.0.0.0'),
-            'port': port,
-            'reload': False,
-            'with_celery': True
-        }
-
-
-# 全局实例
 app_factory = ApplicationFactory()
 celery_manager = app_factory.celery_manager
 
-# 创建 FastAPI 应用
-def create_app() -> FastAPI:
-    """创建主应用服务"""
-    app = FastAPI(
-        title="Agent API - 施工方案审查系统",
-        version="0.3",
-        description="Agent API - 集成施工方案审查功能",
-        lifespan=lifespan
-    )
-
-    # 添加 CORS 中间件
-    app.add_middleware(
-        CORSMiddleware,
-        allow_origins=["*"],  # 允许所有的来源
-        allow_credentials=True,
-        allow_methods=["*"],  # 允许的HTTP方法
-        allow_headers=["*"],  # 允许的请求头
-    )
-
-    # 添加所有路由
-    app.include_router(file_upload_router)
-    app.include_router(review_results_router)
-    app.include_router(launch_review_router)
-    app.include_router(desensitize_router)  # 数据脱敏路由
-
-    # 施工方案编写路由
-    app.include_router(outline_router)
-    app.include_router(content_completion_router)
-    app.include_router(regenerate_outline_router)
-    app.include_router(task_cancel_router)
-
-    # 全局异常处理
-    @app.exception_handler(HTTPException)
-    async def http_exception_handler(request, exc):
-        return JSONResponse(
-            status_code=exc.status_code,
-            content=exc.detail
-        )
-
-    # 健康检查
-    @app.get("/health")
-    async def health_check():
-        timestamp = datetime.datetime.now().isoformat()
-        return {"status": "healthy", "timestamp": timestamp}
-
-    # Celery状态检查
-    @app.get("/celery/status")
-    async def get_celery_status():
-        """获取Celery Worker状态"""
-        global celery_manager
-        status = celery_manager.get_status()
-        return {
-            "celery_worker": status,
-            "timestamp": datetime.datetime.now().isoformat()
-        }
-
-    return app
-server_utils = ServerUtils()
-
-def cleanup_redis_before_start():
-    """启动前清理Redis中的残留Celery任务"""
-    try:
-        # 使用统一的Redis连接工具函数
-        r = server_utils.get_redis_connection()
-
-        server_logger.info("清理Redis中的残留Celery任务...")
-
-        # 清理所有Celery相关的键,包括更多模式
-        keys_to_delete = []
-        for key in r.keys():
-            key_lower = key.lower()
-            # 扩展匹配模式,包括你遇到的实际键格式
-            if any(keyword in key_lower for keyword in [
-                'celery', 'task:', 'celery-task', 'kombu', 'current:'
-            ]):
-                keys_to_delete.append(key)
-            # 匹配特定模式
-            elif key.startswith('celery-task-meta-') or key.startswith('current:'):
-                keys_to_delete.append(key)
-            # 临时键
-            elif key == 't_key':
-                keys_to_delete.append(key)
-
-        # 清理消息队列
-        try:
-            # 清理所有Celery队列
-            queues = ['celery', 'celery.pidbox', 'celeryev']
-            for queue in queues:
-                # 删除队列
-                r.delete(queue)
-                server_logger.debug(f"已清理队列: {queue}")
-
-            # 清理Kombu绑定
-            kombu_keys = r.keys('_kombu.binding.*')
-            for key in kombu_keys:
-                r.delete(key)
-                server_logger.debug(f"已清理Kombu绑定: {key}")
-
-        except Exception as e:
-            server_logger.warning(f"清理队列失败: {e}")
-
-        # 清理识别到的键
-        if keys_to_delete:
-            for key in keys_to_delete:
-                try:
-                    r.delete(key)
-                    server_logger.debug(f"已清理: {key}")
-                except Exception as e:
-                    server_logger.warning(f"清理 {key} 失败: {e}")
-
-            server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
-        else:
-            server_logger.info("没有发现需要清理的残留任务")
-
-        # 额外检查:确保关键队列被清空
-        try:
-            # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库
-            # 这里我们检查是否还有残留,如果有则进行更彻底的清理
-            remaining_keys = []
-            for key in r.keys():
-                if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
-                    remaining_keys.append(key)
-
-            if remaining_keys:
-                server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
-                for key in remaining_keys:
-                    try:
-                        r.delete(key)
-                        server_logger.debug(f"彻底清理: {key}")
-                    except Exception as e:
-                        server_logger.warning(f"彻底清理 {key} 失败: {e}")
-        except Exception as e:
-            server_logger.warning(f"彻底清理检查失败: {e}")
-
-        return True
-
-    except Exception as e:
-        server_logger.error(f"清理Redis残留任务失败: {e}")
-        return False
-
-def start_celery_worker_background():
-    """在后台启动Celery Worker(异步方式)"""
-    # 添加调用栈调试
-    server_logger.info("=== Celery Worker启动调用栈 ===")
-    for line in traceback.format_stack():
-        server_logger.debug(f"  {line.strip()}")
-    server_logger.info("=== 调用栈结束 ===")
-
-    # 清理逻辑已在 celery_manager.start_worker() 内部的 _cleanup_redis_tasks() 中处理
-    return celery_manager.start_worker()
-
-def stop_celery_worker():
-    """停止Celery Worker"""
-    global celery_manager
-
-    # 立即取消所有任务注册(使用DB0,与启动时保持一致)
-    try:
-        # 使用统一的Redis连接工具函数
-        r = server_utils.get_redis_connection()
-
-        server_logger.info("停止时清理Redis中的Celery任务...")
-
-        # 清理任务相关键
-        task_keys = r.keys('task:*')  # 重复任务检查器的数据
-        celery_meta_keys = r.keys('celery-task-meta-*')
-        current_keys = r.keys('current:*')
-        kombu_keys = r.keys('_kombu.binding.*')
-
-        all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
-
-        for key in all_keys:
-            try:
-                r.delete(key)
-                server_logger.debug(f"停止时清理: {key}")
-            except Exception as e:
-                server_logger.warning(f"停止时清理 {key} 失败: {e}")
-
-        # 清理队列
-        queues = ['celery', 'celery.pidbox', 'celeryev']
-        for queue in queues:
-            try:
-                r.delete(queue)
-                server_logger.debug(f"停止时清理队列: {queue}")
-            except Exception as e:
-                server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
-
-        server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
-
-    except Exception as e:
-        server_logger.error(f"停止时清理Redis任务失败: {e}")
-
-    # 立即停止Worker,不等待
-    return celery_manager.stop_worker_immediately()
-
-def run_server(host: str = None, port: int = None, reload: bool = False,
-                with_celery: bool = True):
-    """运行服务器"""
-
-    # 从配置文件获取默认值
-    if host is None:
-        host = config_handler.get('launch', 'HOST', '0.0.0.0')
-    if port is None:
-        port = config_handler.get('launch', 'LAUNCH_PORT')
-
-    if with_celery:
-        # 启动Celery Worker
-        start_celery_worker_background()
-
-        # 注册退出时的清理函数
-        import atexit
-        atexit.register(stop_celery_worker)
-
-        # 设置信号处理
-        def signal_handler(signum, frame):
-            server_logger.info(f"收到信号 {signum},正在停止服务...")
-            stop_celery_worker()
-            sys.exit(0)
-
-        # Windows和Unix系统的信号处理
-        try:
-            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
-            signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
-        except AttributeError:
-            # Windows可能不支持某些信号
-            pass
-
-        # Windows特有的控制台事件处理
-        if sys.platform == 'win32':
-            try:
-                import win32api
-                def win32_handler(dwCtrlType):
-                    # 正确的控制台事件常量
-                    CTRL_C_EVENT = 0
-                    CTRL_BREAK_EVENT = 1
-                    CTRL_CLOSE_EVENT = 2
-                    CTRL_SHUTDOWN_EVENT = 6
-
-                    if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
-                        server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
-                        stop_celery_worker()
-                        sys.exit(0)
-                    return False
-                win32api.SetConsoleCtrlHandler(win32_handler, True)
-            except (ImportError, AttributeError) as e:
-                # 如果win32api不可用,跳过Windows控制台处理
-                server_logger.debug(f"Windows控制台事件处理不可用: {e}")
-                pass
-
-    try:
-        if reload:
-            # 重载模式需要正确的模块路径
-            app_import_path = "server.app:app"
-            uvicorn.run(app_import_path, host=host, port=port, reload=reload)
-        else:
-            # 直接运行模式,直接使用app对象
-            uvicorn.run(app, host=host, port=port)
-    finally:
-        if with_celery:
-            stop_celery_worker()
-
-
-app = create_app()
-
-server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
-
-class ServerRunner:
-    """服务器运行器 - 简化的主启动入口"""
-
-    def __init__(self, app_factory: ApplicationFactory):
-        """初始化服务器运行器
-
-        Args:
-            app_factory: 应用工厂实例
-        """
-        self.app_factory = app_factory
-        self.celery_manager = app_factory.celery_manager
-
-    def run_server(self, **kwargs):
-        """运行服务器
-
-        Args:
-            **kwargs: 服务器配置参数
-        """
-        # 获取配置
-        config = self.app_factory.create_server_config()
-        config.update(kwargs)
-
-        host = config.get('host', '0.0.0.0')
-        port = config.get('port', 8002)
-        # 确保端口号是整数类型
-        try:
-            port = int(port)
-        except (ValueError, TypeError):
-            port = 8002
-
-        reload = config.get('reload', False)
-        with_celery = config.get('with_celery', True)
-
-        if with_celery:
-            self._setup_celery_integration()
-
-        # 创建应用实例
-        app = self.app_factory.create_app()
-
-        try:
-            if reload:
-                app_import_path = "server.app:app"
-                uvicorn.run(app_import_path, host=host, port=port, reload=reload)
-            else:
-                uvicorn.run(app, host=host, port=port)
-        finally:
-            if with_celery:
-                self.celery_manager.stop_worker()
-
-    def _setup_celery_integration(self):
-        """设置Celery集成"""
-        # 启动Celery Worker
-        self.celery_manager.start_worker()
-
-        # 注册退出处理
-        import atexit
-        atexit.register(self.celery_manager.stop_worker_immediately)
-
-        # 设置信号处理
-        self._setup_signal_handlers()
-
-    def _setup_signal_handlers(self):
-        """设置信号处理器"""
-        def signal_handler(signum, frame):
-            server_logger.info(f"收到信号 {signum},正在停止服务...")
-            self.celery_manager.stop_worker_immediately()
-            sys.exit(0)
-
-        # 通用信号处理
-        try:
-            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
-            signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
-        except AttributeError:
-            # Windows可能不支持某些信号
-            pass
-
-        # Windows特有处理
-        if sys.platform == 'win32':
-            self._setup_windows_signal_handler()
-
-    def _setup_windows_signal_handler(self):
-        """设置Windows信号处理器"""
-        try:
-            import win32api
-            def win32_handler(dwCtrlType):
-                CTRL_C_EVENT = 0
-                CTRL_BREAK_EVENT = 1
-                CTRL_CLOSE_EVENT = 2
-                CTRL_SHUTDOWN_EVENT = 6
-
-                if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
-                    server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
-                    self.celery_manager.stop_worker_immediately()
-                    sys.exit(0)
-                return False
-            win32api.SetConsoleCtrlHandler(win32_handler, True)
-        except (ImportError, AttributeError) as e:
-            server_logger.debug(f"Windows控制台事件处理不可用: {e}")
-
-
-# 创建应用实例和运行器
 app = app_factory.create_app()
 server_runner = ServerRunner(app_factory)
 
 server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
 
-
-# 运行Uvicorn服务器
 if __name__ == "__main__":
-    # 使用新的服务器运行器启动
     config = app_factory.create_server_config()
-
     server_logger.info(f"Agent API服务启动中...运行在{config['host']}:{config['port']}")
-
     if config['with_celery']:
         server_logger.info("Celery Worker: 已集成启动")
     else:
         server_logger.warning("Celery Worker: 已禁用")
-
     server_runner.run_server(**config)

+ 123 - 0
server/celery_manager.py

@@ -0,0 +1,123 @@
+"""
+Celery Worker 生命周期管理。
+"""
+import os
+import sys
+import time
+import threading
+from foundation.observability.logger.loggering import server_logger
+from foundation.infrastructure.messaging.celery_app import app as celery_app
+from server.redis_cleanup import cleanup_redis_tasks
+
+
+class CeleryWorkerManager:
+    """Celery Worker 程序化管理器。"""
+
+    def __init__(self, server_utils):
+        self.worker = None
+        self.is_running = False
+        self.worker_thread = None
+        self.shutdown_event = threading.Event()
+        self.server_utils = server_utils
+
+    def start_worker(self, **kwargs) -> bool:
+        if self.is_running:
+            server_logger.warning("Celery Worker已在运行")
+            return True
+
+        try:
+            r = self.server_utils.get_redis_connection()
+            cleanup_redis_tasks(r, "启动前")
+
+            def run_celery_worker():
+                try:
+                    from foundation.observability.logger.loggering import configure_logging_for_subprocess
+                    configure_logging_for_subprocess()
+
+                    worker_args = [
+                        'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
+                        '--without-heartbeat', '--without-gossip', '--without-mingle',
+                    ]
+                    if sys.platform == 'win32':
+                        os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
+                    celery_app.worker_main(worker_args)
+                except KeyboardInterrupt:
+                    server_logger.info("收到停止信号,Celery Worker退出")
+                except Exception as e:
+                    server_logger.error(f"Celery Worker运行时出错: {e}")
+                finally:
+                    self.is_running = False
+
+            self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
+            self.worker_thread.start()
+            self.is_running = True
+            time.sleep(2)
+
+            success = self.is_running and self.worker_thread.is_alive()
+            if success:
+                server_logger.info("Celery Worker启动成功")
+            else:
+                server_logger.error("Celery Worker启动失败")
+                self.is_running = False
+            return success
+
+        except ImportError as e:
+            server_logger.error(f"导入Celery失败: {e}")
+            return False
+        except Exception as e:
+            server_logger.error(f"启动Celery Worker失败: {e}")
+            return False
+
+    def stop_worker(self, timeout: int = 5) -> bool:
+        if not self.is_running:
+            return True
+
+        try:
+            server_logger.info("停止Celery Worker...")
+            self.shutdown_event.set()
+
+            if self.worker_thread and self.worker_thread.is_alive():
+                start_time = time.time()
+                while self.is_running and (time.time() - start_time) < timeout:
+                    time.sleep(0.1)
+
+            r = self.server_utils.get_redis_connection()
+            cleanup_redis_tasks(r, "停止时")
+
+            self.is_running = False
+            self.shutdown_event.clear()
+            return True
+        except Exception as e:
+            server_logger.error(f"停止Celery Worker失败: {e}")
+            return False
+
+    def stop_worker_immediately(self) -> bool:
+        if not self.is_running:
+            return True
+
+        try:
+            server_logger.info("立即停止Celery Worker...")
+            self.shutdown_event.set()
+
+            r = self.server_utils.get_redis_connection()
+            cleanup_redis_tasks(r, "立即停止时")
+
+            self.is_running = False
+            self.shutdown_event.clear()
+            return True
+        except Exception as e:
+            server_logger.error(f"立即停止Celery Worker失败: {e}")
+            self.is_running = False
+            return False
+
+    def get_status(self) -> dict:
+        return {
+            "is_running": self.is_running,
+            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
+        }
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop_worker()

+ 56 - 0
server/factory.py

@@ -0,0 +1,56 @@
+"""
+应用工厂:创建和配置 FastAPI 应用实例。
+"""
+import redis
+from fastapi import FastAPI
+from views import lifespan
+from foundation.infrastructure.config.config import config_handler
+from server.celery_manager import CeleryWorkerManager
+
+
+class ServerUtils:
+    """Redis 连接工具。"""
+
+    @staticmethod
+    def get_redis_connection():
+        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
+        if redis_password:
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
+        else:
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
+        return redis.from_url(redis_url, decode_responses=True)
+
+
+class ApplicationFactory:
+    """应用工厂。"""
+
+    def __init__(self):
+        self.server_utils = ServerUtils()
+        self.celery_manager = CeleryWorkerManager(self.server_utils)
+
+    def create_app(self) -> FastAPI:
+        from server.routes import register_routes
+        app = FastAPI(
+            title="Agent API - 施工方案审查系统",
+            version="0.3",
+            description="Agent API - 集成施工方案审查功能",
+            lifespan=lifespan,
+        )
+        register_routes(app)
+        return app
+
+    def create_server_config(self) -> dict:
+        port = config_handler.get('launch', 'LAUNCH_PORT', 8002)
+        try:
+            port = int(port)
+        except (ValueError, TypeError):
+            port = 8002
+        return {
+            'host': config_handler.get('launch', 'HOST', '0.0.0.0'),
+            'port': port,
+            'reload': False,
+            'with_celery': True,
+        }

+ 78 - 0
server/redis_cleanup.py

@@ -0,0 +1,78 @@
+"""
+Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。
+"""
+import time
+import subprocess
+import platform
+from foundation.observability.logger.loggering import server_logger
+
+
+def cleanup_redis_tasks(redis_client, phase: str = "启动前"):
+    """清理 Redis 中的 Celery 残留任务。
+
+    Args:
+        redis_client: Redis 连接对象
+        phase: "启动前" | "停止时" — 控制是否杀死现有 worker
+    """
+    try:
+        server_logger.info(f"{phase}清理Redis中的Celery任务...")
+
+        if phase == "启动前":
+            _kill_existing_celery_workers()
+
+        patterns = [
+            'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*',
+            'unacked*', 'celery@*', 'celery~*',
+        ]
+        all_keys = []
+        for pattern in patterns:
+            all_keys.extend(redis_client.keys(pattern))
+
+        for key in all_keys:
+            try:
+                redis_client.delete(key)
+            except Exception as e:
+                server_logger.warning(f"{phase}清理 {key} 失败: {e}")
+
+        for queue in ['celery', 'celery.pidbox', 'celeryev']:
+            try:
+                redis_client.delete(queue)
+            except Exception as e:
+                server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
+
+        for queue in ['celery']:
+            queue_len = redis_client.llen(queue)
+            if queue_len > 0:
+                server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
+                redis_client.delete(queue)
+
+        if all_keys:
+            server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
+        else:
+            server_logger.info(f"{phase}未发现需要清理的残留任务")
+
+        if phase == "启动前":
+            time.sleep(0.5)
+
+    except Exception as e:
+        server_logger.error(f"{phase}清理Redis任务失败: {e}")
+
+
+def _kill_existing_celery_workers():
+    """终止所有现有的 Celery Worker 进程。"""
+    system = platform.system()
+    try:
+        if system == "Windows":
+            result = subprocess.run(
+                ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
+                capture_output=True, text=True,
+            )
+            if 'celery' in result.stdout.lower():
+                subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
+                server_logger.info("已终止现有的Celery Worker进程")
+        else:
+            subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
+            server_logger.info("已终止现有的Celery Worker进程")
+        time.sleep(0.5)
+    except Exception as e:
+        server_logger.warning(f"终止现有Celery Worker失败: {e}")

+ 62 - 0
server/routes.py

@@ -0,0 +1,62 @@
+"""
+路由注册:集中管理所有路由和中间件。
+"""
+import datetime
+from fastapi import FastAPI, HTTPException
+from fastapi.responses import JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+
+from views.construction_review.file_upload import file_upload_router
+from views.construction_review.review_results import review_results_router
+from views.construction_review.launch_review import launch_review_router
+from views.construction_review.task_control import task_control_router
+from views.construction_review.desensitize_api import desensitize_router
+from views.construction_write.outline_views import outline_router
+from views.construction_write.content_completion import content_completion_router
+from views.construction_write.regenerate_views import regenerate_outline_router
+from views.construction_write.task_cancel_views import task_cancel_router
+
+
+def register_routes(app: FastAPI):
+    """在 app 上注册所有路由和中间件。"""
+
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+
+    app.include_router(file_upload_router)
+    app.include_router(review_results_router)
+    app.include_router(launch_review_router)
+    app.include_router(task_control_router)
+    app.include_router(desensitize_router)
+
+    app.include_router(outline_router)
+    app.include_router(content_completion_router)
+    app.include_router(regenerate_outline_router)
+    app.include_router(task_cancel_router)
+
+    @app.exception_handler(HTTPException)
+    async def http_exception_handler(request, exc):
+        return JSONResponse(status_code=exc.status_code, content=exc.detail)
+
+    @app.get("/health", tags=["系统状态"])
+    async def health_check():
+        return {"status": "healthy", "timestamp": datetime.datetime.now().isoformat()}
+
+    @app.get("/celery/status", tags=["系统状态"])
+    async def get_celery_status():
+        from server.app import app_factory
+        status = app_factory.celery_manager.get_status()
+        return {"celery_worker": status, "timestamp": datetime.datetime.now().isoformat()}
+
+    @app.get("/api/docs/info", tags=["系统状态"])
+    async def api_info():
+        return {
+            "title": "Agent API - 施工方案审查系统",
+            "version": "0.3",
+            "docs_urls": {"swagger_ui": "/docs", "redoc": "/redoc", "openapi_json": "/openapi.json"},
+        }

+ 40 - 0
server/runner.py

@@ -0,0 +1,40 @@
+"""
+服务器运行器:整合所有组件并启动 uvicorn。
+"""
+import atexit
+import uvicorn
+from foundation.observability.logger.loggering import server_logger
+from server.signals import setup_signal_handlers
+
+
+class ServerRunner:
+    """服务器运行器。"""
+
+    def __init__(self, app_factory):
+        self.app_factory = app_factory
+        self.celery_manager = app_factory.celery_manager
+
+    def run_server(self, **kwargs):
+        config = self.app_factory.create_server_config()
+        config.update(kwargs)
+
+        host = config.get('host', '0.0.0.0')
+        port = int(config.get('port', 8002))
+        reload = config.get('reload', False)
+        with_celery = config.get('with_celery', True)
+
+        if with_celery:
+            self.celery_manager.start_worker()
+            atexit.register(self.celery_manager.stop_worker_immediately)
+            setup_signal_handlers(lambda: self.celery_manager.stop_worker_immediately())
+
+        app = self.app_factory.create_app()
+
+        try:
+            if reload:
+                uvicorn.run("server.app:app", host=host, port=port, reload=reload)
+            else:
+                uvicorn.run(app, host=host, port=port)
+        finally:
+            if with_celery:
+                self.celery_manager.stop_worker()

+ 42 - 0
server/signals.py

@@ -0,0 +1,42 @@
+"""
+操作系统信号和 Windows 控制台事件处理。
+"""
+import sys
+import signal
+from foundation.observability.logger.loggering import server_logger
+
+
+def setup_signal_handlers(shutdown_callback):
+    """注册信号处理器,shutdown_callback 会在收到终止信号时调用。"""
+
+    def handler(signum, frame):
+        server_logger.info(f"收到信号 {signum},正在停止服务...")
+        shutdown_callback()
+        sys.exit(0)
+
+    try:
+        signal.signal(signal.SIGINT, handler)
+        signal.signal(signal.SIGTERM, handler)
+    except AttributeError:
+        pass
+
+    if sys.platform == 'win32':
+        _setup_windows_handler(shutdown_callback)
+
+
+def _setup_windows_handler(shutdown_callback):
+    try:
+        import win32api
+        def win32_handler(dwCtrlType):
+            CTRL_C_EVENT = 0
+            CTRL_BREAK_EVENT = 1
+            CTRL_CLOSE_EVENT = 2
+            CTRL_SHUTDOWN_EVENT = 6
+            if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
+                server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
+                shutdown_callback()
+                sys.exit(0)
+            return False
+        win32api.SetConsoleCtrlHandler(win32_handler, True)
+    except (ImportError, AttributeError) as e:
+        server_logger.debug(f"Windows控制台事件处理不可用: {e}")