Просмотр исходного кода

v0.0.3-重构启动模块
- 重构server\app.py结构
- 在config.ini 中增加launch配置项

WangXuMing 2 месяцев назад
Родитель
Сommit
fe30970cb2
2 измененных файлов с 402 добавлено и 171 удалено
  1. 4 0
      config/config.ini
  2. 398 171
      server/app.py

+ 4 - 0
config/config.ini

@@ -40,6 +40,10 @@ APP_CODE=lq-agent
 APP_SECRET=sx-73d32556-605e-11f0-9dd8-acde48001122
 
 
+[launch]
+HOST = 0.0.0.0
+LAUNCH_PORT = 8002
+
 [redis]
 REDIS_URL=redis://:123456@127.0.0.1:6379
 REDIS_HOST=127.0.0.1

+ 398 - 171
server/app.py

@@ -1,21 +1,19 @@
 import os
 import sys
 import time
+import signal
 import uvicorn
 import datetime
-import threading
-from pathlib import Path
-import argparse
-import signal
 import traceback
-
+import threading
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.insert(0, BASE_DIR)
 
 from views import lifespan
-from fastapi.middleware.cors import CORSMiddleware
 from fastapi import FastAPI, HTTPException
 from fastapi.responses import JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+from foundation.base.config import config_handler
 from foundation.logger.loggering import server_logger
 from foundation.base.celery_app import app as celery_app
 
@@ -26,126 +24,130 @@ from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
 
 
-# 创建 FastAPI 应用
-def create_app() -> FastAPI:
-    """创建主应用服务"""
-    app = FastAPI(
-        title="Agent API - 施工方案审查系统",
-        version="0.3",
-        description="Agent API - 集成施工方案审查功能",
-        lifespan=lifespan
-    )
+class ServerUtils:
+    """服务器工具函数类 - 集中管理工具方法"""
 
-    # 添加 CORS 中间件
-    app.add_middleware(
-        CORSMiddleware,
-        allow_origins=["*"],  # 允许所有的来源
-        allow_credentials=True,
-        allow_methods=["*"],  # 允许的HTTP方法
-        allow_headers=["*"],  # 允许的请求头
-    )
+    @staticmethod
+    def get_redis_connection():
+        """获取Redis连接的统一工具函数
 
-    # 添加所有路由
-    app.include_router(test_router)
-    app.include_router(file_upload_router)
-    app.include_router(review_results_router)
-    app.include_router(launch_review_router)
+        Returns:
+            redis.Redis: Redis连接对象
+        """
+        import redis
+        from foundation.base.config import config_handler
 
-    # 全局异常处理
-    @app.exception_handler(HTTPException)
-    async def http_exception_handler(request, exc):
-        return JSONResponse(
-            status_code=exc.status_code,
-            content=exc.detail
-        )
+        # 从配置文件获取Redis连接参数
+        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
 
-    # 健康检查
-    @app.get("/health")
-    async def health_check():
-        timestamp = datetime.datetime.now().isoformat()
-        return {"status": "healthy", "timestamp": timestamp}
+        # 构建Redis URL
+        if redis_password:
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
+        else:
+            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
 
-    # Celery状态检查
-    @app.get("/celery/status")
-    async def get_celery_status():
-        """获取Celery Worker状态"""
-        global celery_manager
-        status = celery_manager.get_status()
-        return {
-            "celery_worker": status,
-            "timestamp": datetime.datetime.now().isoformat()
-        }
+        return redis.from_url(redis_url, decode_responses=True)
 
-    # API文档
-    @app.get("/api/docs")
-    async def api_docs():
-        return {
-            "title": "Agent API服务文档",
-            "description": "集成施工方案审查功能的API接口文档",
-            "version": "V.0.3",
-            "apis": [
-                {
-                    "name": "测试接口",
-                    "path": "/test",
-                    "method": "GET",
-                    "description": "系统测试接口"
-                },
-                {
-                    "name": "文档上传",
-                    "path": "/sgsc/file_upload",
-                    "method": "POST",
-                    "description": "上传施工方案文档"
-                },
-                {
-                    "name": "审查启动",
-                    "path": "/sgsc/sse/launch_review",
-                    "method": "POST",
-                    "description": "启动AI审查工作流"
-                },
-                {
-                    "name": "进度推送",
-                    "path": "/sgsc/sse/progress/{callback_task_id}",
-                    "method": "GET",
-                    "description": "SSE实时进度推送"
-                },
-                {
-                    "name": "结果获取",
-                    "path": "/sgsc/review_results",
-                    "method": "POST",
-                    "description": "获取审查结果"
-                }
-            ],
-        }
 
-    return app
+class RouteManager:
+    """路由管理类 - 负责路由配置和中间件设置"""
+
+    def __init__(self, app: FastAPI):
+        """初始化路由管理器
+
+        Args:
+            app: FastAPI应用实例
+        """
+        self.app = app
+        self._setup_cors()
+        self._setup_routes()
+        self._setup_exception_handlers()
+        self._setup_health_checks()
+
+
+    def _setup_cors(self):
+        """配置CORS中间件"""
+        self.app.add_middleware(
+            CORSMiddleware,
+            allow_origins=["*"],  # 允许所有的来源
+            allow_credentials=True,
+            allow_methods=["*"],  # 允许的HTTP方法
+            allow_headers=["*"],  # 允许的请求头
+        )
+
+    def _setup_routes(self):
+        """配置所有路由"""
+        self.app.include_router(test_router)
+        self.app.include_router(file_upload_router)
+        self.app.include_router(review_results_router)
+        self.app.include_router(launch_review_router)
+
+    def _setup_exception_handlers(self):
+        """配置全局异常处理"""
+        @self.app.exception_handler(HTTPException)
+        async def http_exception_handler(request, exc):
+            return JSONResponse(
+                status_code=exc.status_code,
+                content=exc.detail
+            )
+
+    def _setup_health_checks(self):
+        """配置健康检查接口"""
+        @self.app.get("/health")
+        async def health_check():
+            timestamp = datetime.datetime.now().isoformat()
+            return {"status": "healthy", "timestamp": timestamp}
+
+        @self.app.get("/celery/status")
+        async def get_celery_status():
+            """获取Celery Worker状态"""
+            # 延迟导入避免循环引用
+            from server.app import celery_manager
+            status = celery_manager.get_status()
+            return {
+                "celery_worker": status,
+                "timestamp": datetime.datetime.now().isoformat()
+            }
+
 
 
-# Celery Worker管理器
 class CeleryWorkerManager:
-    """Celery Worker程序化管理器"""
+    """Celery Worker程序化管理器 - 独立的Celery管理模块"""
 
-    def __init__(self):
+    def __init__(self, server_utils: ServerUtils = None):
+        """初始化Celery Worker管理器
+
+        Args:
+            server_utils: 服务器工具类实例
+        """
         self.worker = None
         self.is_running = False
         self.worker_thread = None
         self.shutdown_event = threading.Event()
+        self.server_utils = server_utils or ServerUtils()
+
+    def start_worker(self, **kwargs) -> bool:
+        """启动Celery Worker
 
-    def start_worker(self, **kwargs):
-        """启动Celery Worker"""
+        Returns:
+            bool: 启动是否成功
+        """
         if self.is_running:
             server_logger.warning("Celery Worker已在运行")
             return True
 
         try:
+            # 启动前清理残留任务
+            self._cleanup_redis_tasks("启动前")
+
             # 创建Worker函数
             def run_celery_worker():
                 try:
-                    # 使用最简单的启动方式
                     server_logger.info("Celery Worker开始运行...")
-
-                    # 直接启动worker,使用默认配置
                     celery_app.worker_main(['worker'])
-
                 except KeyboardInterrupt:
                     server_logger.info("收到停止信号,Celery Worker退出")
                 except Exception as e:
@@ -163,13 +165,14 @@ class CeleryWorkerManager:
             # 等待启动
             time.sleep(2)
 
-            if self.is_running and self.worker_thread.is_alive():
+            success = self.is_running and self.worker_thread.is_alive()
+            if success:
                 server_logger.info("Celery Worker启动成功")
-                return True
             else:
                 server_logger.error("Celery Worker启动失败")
                 self.is_running = False
-                return False
+
+            return success
 
         except ImportError as e:
             server_logger.error(f"导入Celery失败: {e}")
@@ -180,8 +183,15 @@ class CeleryWorkerManager:
             server_logger.exception("详细错误信息:")
             return False
 
-    def stop_worker(self, timeout: int = 5):
-        """停止Celery Worker"""
+    def stop_worker(self, timeout: int = 5) -> bool:
+        """优雅停止Celery Worker
+
+        Args:
+            timeout: 停止超时时间(秒)
+
+        Returns:
+            bool: 停止是否成功
+        """
         if not self.is_running:
             server_logger.info("Celery Worker未运行")
             return True
@@ -190,19 +200,20 @@ class CeleryWorkerManager:
             server_logger.info("停止Celery Worker...")
             self.shutdown_event.set()
 
-            # 发送停止信号给线程
             if self.worker_thread and self.worker_thread.is_alive():
                 # 尝试优雅停止
                 start_time = time.time()
                 while self.is_running and (time.time() - start_time) < timeout:
                     time.sleep(0.1)
 
-                # 如果还没停止,记录警告
                 if self.is_running:
                     server_logger.warning("Celery Worker优雅停止超时")
                 else:
                     server_logger.info("Celery Worker已优雅停止")
 
+            # 停止后清理Redis任务
+            self._cleanup_redis_tasks("停止时")
+
             self.is_running = False
             self.shutdown_event.clear()
             return True
@@ -211,8 +222,12 @@ class CeleryWorkerManager:
             server_logger.error(f"停止Celery Worker失败: {e}")
             return False
 
-    def stop_worker_immediately(self):
-        """立即停止Celery Worker,不等待"""
+    def stop_worker_immediately(self) -> bool:
+        """立即停止Celery Worker,不等待
+
+        Returns:
+            bool: 停止是否成功
+        """
         if not self.is_running:
             server_logger.info("Celery Worker未运行")
             return True
@@ -221,10 +236,7 @@ class CeleryWorkerManager:
             server_logger.info("立即停止Celery Worker...")
             self.shutdown_event.set()
 
-            # 设置超时事件,强制停止
-            import os
-
-            # 发送中断信号给当前进程
+            # 发送中断信号
             if hasattr(os, 'kill'):
                 try:
                     os.kill(os.getpid(), signal.SIGINT)
@@ -232,54 +244,185 @@ class CeleryWorkerManager:
                 except:
                     pass
 
-            # 立即设置状态为停止
+            # 停止后清理Redis任务
+            self._cleanup_redis_tasks("立即停止时")
+
             self.is_running = False
             self.shutdown_event.clear()
-
             server_logger.info("Celery Worker已立即停止")
             return True
 
         except Exception as e:
             server_logger.error(f"立即停止Celery Worker失败: {e}")
-            # 即使失败也要设置状态
             self.is_running = False
             return False
 
-    def get_status(self):
-        """获取Worker状态"""
+    def get_status(self) -> dict:
+        """获取Worker状态
+
+        Returns:
+            dict: 包含worker状态的字典
+        """
         return {
             "is_running": self.is_running,
             "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
         }
 
+    def _cleanup_redis_tasks(self, phase: str):
+        """清理Redis中的Celery任务
+
+        Args:
+            phase: 清理阶段(启动前/停止时/立即停止时)
+        """
+        try:
+            r = self.server_utils.get_redis_connection()
+            server_logger.info(f"{phase}清理Redis中的Celery任务...")
+
+            # 清理任务相关键
+            task_keys = r.keys('task:*')
+            celery_meta_keys = r.keys('celery-task-meta-*')
+            current_keys = r.keys('current:*')
+            kombu_keys = r.keys('_kombu.binding.*')
+
+            all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
+
+            for key in all_keys:
+                try:
+                    r.delete(key)
+                    server_logger.debug(f"{phase}清理: {key}")
+                except Exception as e:
+                    server_logger.warning(f"{phase}清理 {key} 失败: {e}")
+
+            # 清理队列
+            queues = ['celery', 'celery.pidbox', 'celeryev']
+            for queue in queues:
+                try:
+                    r.delete(queue)
+                    server_logger.debug(f"{phase}清理队列: {queue}")
+                except Exception as e:
+                    server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
+
+            if all_keys:
+                server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
+
+        except Exception as e:
+            server_logger.error(f"{phase}清理Redis任务失败: {e}")
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.stop_worker()
 
+class ApplicationFactory:
+    """应用工厂类 - 负责创建和配置FastAPI应用"""
 
-# 全局Worker管理器实例
-celery_manager = CeleryWorkerManager()
+    def __init__(self):
+        """初始化应用工厂"""
+        self.server_utils = ServerUtils()
+        self.celery_manager = CeleryWorkerManager(self.server_utils)
+
+    def create_app(self) -> FastAPI:
+        """创建FastAPI应用实例
+
+        Returns:
+            FastAPI: 配置完成的应用实例
+        """
+        app = FastAPI(
+            title="Agent API - 施工方案审查系统",
+            version="0.3",
+            description="Agent API - 集成施工方案审查功能",
+            lifespan=lifespan
+        )
 
-def cleanup_redis_before_start():
-    """启动前清理Redis中的残留Celery任务"""
-    try:
-        import redis
-        from foundation.base.config import config_handler
+        # 使用路由管理器配置应用
+        route_manager = RouteManager(app)
 
-        # 连接Redis
-        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
-        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
-        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
-        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
+        return app
 
-        if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
-        else:
-            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
+    def create_server_config(self) -> dict:
+        """创建服务器配置
 
-        r = redis.from_url(redis_url, decode_responses=True)
+        Returns:
+            dict: 服务器配置字典
+        """
+        # 确保端口号是整数类型
+        port = config_handler.get('launch', 'LAUNCH_PORT', 8002)
+        try:
+            port = int(port)
+        except (ValueError, TypeError):
+            port = 8002
+
+        return {
+            'host': config_handler.get('launch', 'HOST', '0.0.0.0'),
+            'port': port,
+            'reload': False,
+            'with_celery': True
+        }
+
+
+# 全局实例
+app_factory = ApplicationFactory()
+celery_manager = app_factory.celery_manager
+
+# 创建 FastAPI 应用
+def create_app() -> FastAPI:
+    """创建主应用服务"""
+    app = FastAPI(
+        title="Agent API - 施工方案审查系统",
+        version="0.3",
+        description="Agent API - 集成施工方案审查功能",
+        lifespan=lifespan
+    )
+
+    # 添加 CORS 中间件
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],  # 允许所有的来源
+        allow_credentials=True,
+        allow_methods=["*"],  # 允许的HTTP方法
+        allow_headers=["*"],  # 允许的请求头
+    )
+
+    # 添加所有路由
+    app.include_router(test_router)
+    app.include_router(file_upload_router)
+    app.include_router(review_results_router)
+    app.include_router(launch_review_router)
+
+    # 全局异常处理
+    @app.exception_handler(HTTPException)
+    async def http_exception_handler(request, exc):
+        return JSONResponse(
+            status_code=exc.status_code,
+            content=exc.detail
+        )
+
+    # 健康检查
+    @app.get("/health")
+    async def health_check():
+        timestamp = datetime.datetime.now().isoformat()
+        return {"status": "healthy", "timestamp": timestamp}
+
+    # Celery状态检查
+    @app.get("/celery/status")
+    async def get_celery_status():
+        """获取Celery Worker状态"""
+        global celery_manager
+        status = celery_manager.get_status()
+        return {
+            "celery_worker": status,
+            "timestamp": datetime.datetime.now().isoformat()
+        }
+
+    return app
+
+
+def cleanup_redis_before_start():
+    """启动前清理Redis中的残留Celery任务"""
+    try:
+        # 使用统一的Redis连接工具函数
+        r = get_redis_connection()
 
         server_logger.info("清理Redis中的残留Celery任务...")
 
@@ -375,21 +518,8 @@ def stop_celery_worker():
 
     # 立即取消所有任务注册(使用DB0,与启动时保持一致)
     try:
-        import redis
-        from foundation.base.config import config_handler
-
-        # 连接Redis(使用配置文件的数据库)
-        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
-        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
-        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
-        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
-
-        if redis_password:
-            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
-        else:
-            redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
-
-        r = redis.from_url(redis_url, decode_responses=True)
+        # 使用统一的Redis连接工具函数
+        r = get_redis_connection()
 
         server_logger.info("停止时清理Redis中的Celery任务...")
 
@@ -425,10 +555,16 @@ def stop_celery_worker():
     # 立即停止Worker,不等待
     return celery_manager.stop_worker_immediately()
 
-def run_server(host: str = "127.0.0.1", port: int = 8002, reload: bool = False,
+def run_server(host: str = None, port: int = None, reload: bool = False,
                 with_celery: bool = True):
     """运行服务器"""
 
+    # 从配置文件获取默认值
+    if host is None:
+        host = config_handler.get('launch', 'HOST', '0.0.0.0')
+    if port is None:
+        port = config_handler.get('launch', 'LAUNCH_PORT')
+
     if with_celery:
         # 启动Celery Worker
         start_celery_worker_background()
@@ -486,36 +622,127 @@ def run_server(host: str = "127.0.0.1", port: int = 8002, reload: bool = False,
             stop_celery_worker()
 
 
-# 创建应用实例
 app = create_app()
 
 server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
 
+class ServerRunner:
+    """服务器运行器 - 简化的主启动入口"""
+
+    def __init__(self, app_factory: ApplicationFactory):
+        """初始化服务器运行器
+
+        Args:
+            app_factory: 应用工厂实例
+        """
+        self.app_factory = app_factory
+        self.celery_manager = app_factory.celery_manager
+
+    def run_server(self, **kwargs):
+        """运行服务器
+
+        Args:
+            **kwargs: 服务器配置参数
+        """
+        # 获取配置
+        config = self.app_factory.create_server_config()
+        config.update(kwargs)
+
+        host = config.get('host', '0.0.0.0')
+        port = config.get('port', 8002)
+        # 确保端口号是整数类型
+        try:
+            port = int(port)
+        except (ValueError, TypeError):
+            port = 8002
+
+        reload = config.get('reload', False)
+        with_celery = config.get('with_celery', True)
+
+        if with_celery:
+            self._setup_celery_integration()
+
+        # 创建应用实例
+        app = self.app_factory.create_app()
+
+        try:
+            if reload:
+                app_import_path = "server.app:app"
+                uvicorn.run(app_import_path, host=host, port=port, reload=reload)
+            else:
+                uvicorn.run(app, host=host, port=port)
+        finally:
+            if with_celery:
+                self.celery_manager.stop_worker()
+
+    def _setup_celery_integration(self):
+        """设置Celery集成"""
+        # 启动Celery Worker
+        self.celery_manager.start_worker()
+
+        # 注册退出处理
+        import atexit
+        atexit.register(self.celery_manager.stop_worker_immediately)
+
+        # 设置信号处理
+        self._setup_signal_handlers()
+
+    def _setup_signal_handlers(self):
+        """设置信号处理器"""
+        def signal_handler(signum, frame):
+            server_logger.info(f"收到信号 {signum},正在停止服务...")
+            self.celery_manager.stop_worker_immediately()
+            sys.exit(0)
+
+        # 通用信号处理
+        try:
+            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
+            signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
+        except AttributeError:
+            # Windows可能不支持某些信号
+            pass
+
+        # Windows特有处理
+        if sys.platform == 'win32':
+            self._setup_windows_signal_handler()
+
+    def _setup_windows_signal_handler(self):
+        """设置Windows信号处理器"""
+        try:
+            import win32api
+            def win32_handler(dwCtrlType):
+                CTRL_C_EVENT = 0
+                CTRL_BREAK_EVENT = 1
+                CTRL_CLOSE_EVENT = 2
+                CTRL_SHUTDOWN_EVENT = 6
+
+                if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
+                    server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
+                    self.celery_manager.stop_worker_immediately()
+                    sys.exit(0)
+                return False
+            win32api.SetConsoleCtrlHandler(win32_handler, True)
+        except (ImportError, AttributeError) as e:
+            server_logger.debug(f"Windows控制台事件处理不可用: {e}")
+
+
+# 创建应用实例和运行器
+app = app_factory.create_app()
+server_runner = ServerRunner(app_factory)
+
+server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
+
 
 # 运行Uvicorn服务器
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
-    parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
-    parser.add_argument('--port', type=int, default=8001, help='服务器端口')
-    parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
-    parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
-
-    args = parser.parse_args()
+    # 使用新的服务器运行器启动
+    config = app_factory.create_server_config()
 
-    server_logger.info("Agent API服务启动中...")
-    server_logger.info(f"服务地址: http://{args.host}:{args.port}")
-    server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
-    server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
-    server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")
+    server_logger.info(f"Agent API服务启动中...运行在{config['host']}:{config['port']}")
 
-    if not args.no_celery:
+    if config['with_celery']:
         server_logger.info("Celery Worker: 已集成启动")
     else:
         server_logger.warning("Celery Worker: 已禁用")
 
-    run_server(
-        host=args.host,
-        port=args.port,
-        reload=False,
-        with_celery=not args.no_celery
-    )
+    server_runner.run_server(**config)