import os import sys import time import signal import uvicorn import datetime import traceback import threading BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, BASE_DIR) from views import lifespan from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from foundation.base.config import config_handler from foundation.logger.loggering import server_logger from foundation.base.celery_app import app as celery_app # 导入所有路由 from views.test_views import test_router from views.construction_review.file_upload import file_upload_router from views.construction_review.review_results import review_results_router from views.construction_review.launch_review import launch_review_router class ServerUtils: """服务器工具函数类 - 集中管理工具方法""" @staticmethod def get_redis_connection(): """获取Redis连接的统一工具函数 Returns: redis.Redis: Redis连接对象 """ import redis from foundation.base.config import config_handler # 从配置文件获取Redis连接参数 redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') redis_db = config_handler.get('redis', 'REDIS_DB', '0') # 构建Redis URL if redis_password: redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}' else: redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}' return redis.from_url(redis_url, decode_responses=True) class RouteManager: """路由管理类 - 负责路由配置和中间件设置""" def __init__(self, app: FastAPI): """初始化路由管理器 Args: app: FastAPI应用实例 """ self.app = app self._setup_cors() self._setup_routes() self._setup_exception_handlers() self._setup_health_checks() def _setup_cors(self): """配置CORS中间件""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有的来源 allow_credentials=True, allow_methods=["*"], # 允许的HTTP方法 allow_headers=["*"], # 允许的请求头 ) def _setup_routes(self): """配置所有路由""" self.app.include_router(test_router) self.app.include_router(file_upload_router) self.app.include_router(review_results_router) self.app.include_router(launch_review_router) def _setup_exception_handlers(self): """配置全局异常处理""" @self.app.exception_handler(HTTPException) async def http_exception_handler(request, exc): return JSONResponse( status_code=exc.status_code, content=exc.detail ) def _setup_health_checks(self): """配置健康检查接口""" @self.app.get("/health") async def health_check(): timestamp = datetime.datetime.now().isoformat() return {"status": "healthy", "timestamp": timestamp} @self.app.get("/celery/status") async def get_celery_status(): """获取Celery Worker状态""" # 延迟导入避免循环引用 from server.app import celery_manager status = celery_manager.get_status() return { "celery_worker": status, "timestamp": datetime.datetime.now().isoformat() } class CeleryWorkerManager: """Celery Worker程序化管理器 - 独立的Celery管理模块""" def __init__(self, server_utils: ServerUtils = None): """初始化Celery Worker管理器 Args: server_utils: 服务器工具类实例 """ self.worker = None self.is_running = False self.worker_thread = None self.shutdown_event = threading.Event() self.server_utils = server_utils or ServerUtils() def start_worker(self, **kwargs) -> bool: """启动Celery Worker Returns: bool: 启动是否成功 """ if self.is_running: server_logger.warning("Celery Worker已在运行") return True try: # 启动前清理残留任务 self._cleanup_redis_tasks("启动前") # 创建Worker函数 def run_celery_worker(): try: server_logger.info("Celery Worker开始运行...") celery_app.worker_main(['worker']) except KeyboardInterrupt: server_logger.info("收到停止信号,Celery Worker退出") except Exception as e: server_logger.error(f"Celery Worker运行时出错: {e}") server_logger.exception("详细错误信息:") finally: self.is_running = False server_logger.info("Celery Worker已停止") # 在单独线程中启动Worker self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True) self.worker_thread.start() self.is_running = True # 等待启动 time.sleep(2) success = self.is_running and self.worker_thread.is_alive() if success: server_logger.info("Celery Worker启动成功") else: server_logger.error("Celery Worker启动失败") self.is_running = False return success except ImportError as e: server_logger.error(f"导入Celery失败: {e}") server_logger.info("请先安装Celery: pip install celery redis") return False except Exception as e: server_logger.error(f"启动Celery Worker失败: {e}") server_logger.exception("详细错误信息:") return False def stop_worker(self, timeout: int = 5) -> bool: """优雅停止Celery Worker Args: timeout: 停止超时时间(秒) Returns: bool: 停止是否成功 """ if not self.is_running: server_logger.info("Celery Worker未运行") return True try: server_logger.info("停止Celery Worker...") self.shutdown_event.set() if self.worker_thread and self.worker_thread.is_alive(): # 尝试优雅停止 start_time = time.time() while self.is_running and (time.time() - start_time) < timeout: time.sleep(0.1) if self.is_running: server_logger.warning("Celery Worker优雅停止超时") else: server_logger.info("Celery Worker已优雅停止") # 停止后清理Redis任务 self._cleanup_redis_tasks("停止时") self.is_running = False self.shutdown_event.clear() return True except Exception as e: server_logger.error(f"停止Celery Worker失败: {e}") return False def stop_worker_immediately(self) -> bool: """立即停止Celery Worker,不等待 Returns: bool: 停止是否成功 """ if not self.is_running: server_logger.info("Celery Worker未运行") return True try: server_logger.info("立即停止Celery Worker...") self.shutdown_event.set() # 发送中断信号 if hasattr(os, 'kill'): try: os.kill(os.getpid(), signal.SIGINT) server_logger.info("已发送中断信号") except: pass # 停止后清理Redis任务 self._cleanup_redis_tasks("立即停止时") self.is_running = False self.shutdown_event.clear() server_logger.info("Celery Worker已立即停止") return True except Exception as e: server_logger.error(f"立即停止Celery Worker失败: {e}") self.is_running = False return False def get_status(self) -> dict: """获取Worker状态 Returns: dict: 包含worker状态的字典 """ return { "is_running": self.is_running, "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False, } def _cleanup_redis_tasks(self, phase: str): """清理Redis中的Celery任务 Args: phase: 清理阶段(启动前/停止时/立即停止时) """ try: r = self.server_utils.get_redis_connection() server_logger.info(f"{phase}清理Redis中的Celery任务...") # 清理任务相关键 task_keys = r.keys('task:*') celery_meta_keys = r.keys('celery-task-meta-*') current_keys = r.keys('current:*') kombu_keys = r.keys('_kombu.binding.*') all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys for key in all_keys: try: r.delete(key) server_logger.debug(f"{phase}清理: {key}") except Exception as e: server_logger.warning(f"{phase}清理 {key} 失败: {e}") # 清理队列 queues = ['celery', 'celery.pidbox', 'celeryev'] for queue in queues: try: r.delete(queue) server_logger.debug(f"{phase}清理队列: {queue}") except Exception as e: server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}") if all_keys: server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键") except Exception as e: server_logger.error(f"{phase}清理Redis任务失败: {e}") def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop_worker() class ApplicationFactory: """应用工厂类 - 负责创建和配置FastAPI应用""" def __init__(self): """初始化应用工厂""" self.server_utils = ServerUtils() self.celery_manager = CeleryWorkerManager(self.server_utils) def create_app(self) -> FastAPI: """创建FastAPI应用实例 Returns: FastAPI: 配置完成的应用实例 """ app = FastAPI( title="Agent API - 施工方案审查系统", version="0.3", description="Agent API - 集成施工方案审查功能", lifespan=lifespan ) # 使用路由管理器配置应用 route_manager = RouteManager(app) return app def create_server_config(self) -> dict: """创建服务器配置 Returns: dict: 服务器配置字典 """ # 确保端口号是整数类型 port = config_handler.get('launch', 'LAUNCH_PORT', 8002) try: port = int(port) except (ValueError, TypeError): port = 8002 return { 'host': config_handler.get('launch', 'HOST', '0.0.0.0'), 'port': port, 'reload': False, 'with_celery': True } # 全局实例 app_factory = ApplicationFactory() celery_manager = app_factory.celery_manager # 创建 FastAPI 应用 def create_app() -> FastAPI: """创建主应用服务""" app = FastAPI( title="Agent API - 施工方案审查系统", version="0.3", description="Agent API - 集成施工方案审查功能", lifespan=lifespan ) # 添加 CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有的来源 allow_credentials=True, allow_methods=["*"], # 允许的HTTP方法 allow_headers=["*"], # 允许的请求头 ) # 添加所有路由 app.include_router(test_router) app.include_router(file_upload_router) app.include_router(review_results_router) app.include_router(launch_review_router) # 全局异常处理 @app.exception_handler(HTTPException) async def http_exception_handler(request, exc): return JSONResponse( status_code=exc.status_code, content=exc.detail ) # 健康检查 @app.get("/health") async def health_check(): timestamp = datetime.datetime.now().isoformat() return {"status": "healthy", "timestamp": timestamp} # Celery状态检查 @app.get("/celery/status") async def get_celery_status(): """获取Celery Worker状态""" global celery_manager status = celery_manager.get_status() return { "celery_worker": status, "timestamp": datetime.datetime.now().isoformat() } return app def cleanup_redis_before_start(): """启动前清理Redis中的残留Celery任务""" try: # 使用统一的Redis连接工具函数 r = get_redis_connection() server_logger.info("清理Redis中的残留Celery任务...") # 清理所有Celery相关的键,包括更多模式 keys_to_delete = [] for key in r.keys(): key_lower = key.lower() # 扩展匹配模式,包括你遇到的实际键格式 if any(keyword in key_lower for keyword in [ 'celery', 'task:', 'celery-task', 'kombu', 'current:' ]): keys_to_delete.append(key) # 匹配特定模式 elif key.startswith('celery-task-meta-') or key.startswith('current:'): keys_to_delete.append(key) # 临时键 elif key == 't_key': keys_to_delete.append(key) # 清理消息队列 try: # 清理所有Celery队列 queues = ['celery', 'celery.pidbox', 'celeryev'] for queue in queues: # 删除队列 r.delete(queue) server_logger.debug(f"已清理队列: {queue}") # 清理Kombu绑定 kombu_keys = r.keys('_kombu.binding.*') for key in kombu_keys: r.delete(key) server_logger.debug(f"已清理Kombu绑定: {key}") except Exception as e: server_logger.warning(f"清理队列失败: {e}") # 清理识别到的键 if keys_to_delete: for key in keys_to_delete: try: r.delete(key) server_logger.debug(f"已清理: {key}") except Exception as e: server_logger.warning(f"清理 {key} 失败: {e}") server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键") else: server_logger.info("没有发现需要清理的残留任务") # 额外检查:确保关键队列被清空 try: # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库 # 这里我们检查是否还有残留,如果有则进行更彻底的清理 remaining_keys = [] for key in r.keys(): if any(pattern in key.lower() for pattern in ['celery', 'kombu']): remaining_keys.append(key) if remaining_keys: server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理") for key in remaining_keys: try: r.delete(key) server_logger.debug(f"彻底清理: {key}") except Exception as e: server_logger.warning(f"彻底清理 {key} 失败: {e}") except Exception as e: server_logger.warning(f"彻底清理检查失败: {e}") return True except Exception as e: server_logger.error(f"清理Redis残留任务失败: {e}") return False def start_celery_worker_background(): """在后台启动Celery Worker(异步方式)""" # 启动前清理残留任务 cleanup_redis_before_start() # 添加调用栈调试 server_logger.info("=== Celery Worker启动调用栈 ===") for line in traceback.format_stack(): server_logger.debug(f" {line.strip()}") server_logger.info("=== 调用栈结束 ===") return celery_manager.start_worker() def stop_celery_worker(): """停止Celery Worker""" global celery_manager # 立即取消所有任务注册(使用DB0,与启动时保持一致) try: # 使用统一的Redis连接工具函数 r = get_redis_connection() server_logger.info("停止时清理Redis中的Celery任务...") # 清理任务相关键 task_keys = r.keys('task:*') # 重复任务检查器的数据 celery_meta_keys = r.keys('celery-task-meta-*') current_keys = r.keys('current:*') kombu_keys = r.keys('_kombu.binding.*') all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys for key in all_keys: try: r.delete(key) server_logger.debug(f"停止时清理: {key}") except Exception as e: server_logger.warning(f"停止时清理 {key} 失败: {e}") # 清理队列 queues = ['celery', 'celery.pidbox', 'celeryev'] for queue in queues: try: r.delete(queue) server_logger.debug(f"停止时清理队列: {queue}") except Exception as e: server_logger.warning(f"停止时清理队列 {queue} 失败: {e}") server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键") except Exception as e: server_logger.error(f"停止时清理Redis任务失败: {e}") # 立即停止Worker,不等待 return celery_manager.stop_worker_immediately() def run_server(host: str = None, port: int = None, reload: bool = False, with_celery: bool = True): """运行服务器""" # 从配置文件获取默认值 if host is None: host = config_handler.get('launch', 'HOST', '0.0.0.0') if port is None: port = config_handler.get('launch', 'LAUNCH_PORT') if with_celery: # 启动Celery Worker start_celery_worker_background() # 注册退出时的清理函数 import atexit atexit.register(stop_celery_worker) # 设置信号处理 def signal_handler(signum, frame): server_logger.info(f"收到信号 {signum},正在停止服务...") stop_celery_worker() sys.exit(0) # Windows和Unix系统的信号处理 try: signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # 终止信号 except AttributeError: # Windows可能不支持某些信号 pass # Windows特有的控制台事件处理 if sys.platform == 'win32': try: import win32api def win32_handler(dwCtrlType): # 正确的控制台事件常量 CTRL_C_EVENT = 0 CTRL_BREAK_EVENT = 1 CTRL_CLOSE_EVENT = 2 CTRL_SHUTDOWN_EVENT = 6 if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT): server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...") stop_celery_worker() sys.exit(0) return False win32api.SetConsoleCtrlHandler(win32_handler, True) except (ImportError, AttributeError) as e: # 如果win32api不可用,跳过Windows控制台处理 server_logger.debug(f"Windows控制台事件处理不可用: {e}") pass try: if reload: # 重载模式需要正确的模块路径 app_import_path = "server.app:app" uvicorn.run(app_import_path, host=host, port=port, reload=reload) else: # 直接运行模式,直接使用app对象 uvicorn.run(app, host=host, port=port) finally: if with_celery: stop_celery_worker() app = create_app() server_logger.info(msg="APP init successfully - 集成施工方案审查系统") class ServerRunner: """服务器运行器 - 简化的主启动入口""" def __init__(self, app_factory: ApplicationFactory): """初始化服务器运行器 Args: app_factory: 应用工厂实例 """ self.app_factory = app_factory self.celery_manager = app_factory.celery_manager def run_server(self, **kwargs): """运行服务器 Args: **kwargs: 服务器配置参数 """ # 获取配置 config = self.app_factory.create_server_config() config.update(kwargs) host = config.get('host', '0.0.0.0') port = config.get('port', 8002) # 确保端口号是整数类型 try: port = int(port) except (ValueError, TypeError): port = 8002 reload = config.get('reload', False) with_celery = config.get('with_celery', True) if with_celery: self._setup_celery_integration() # 创建应用实例 app = self.app_factory.create_app() try: if reload: app_import_path = "server.app:app" uvicorn.run(app_import_path, host=host, port=port, reload=reload) else: uvicorn.run(app, host=host, port=port) finally: if with_celery: self.celery_manager.stop_worker() def _setup_celery_integration(self): """设置Celery集成""" # 启动Celery Worker self.celery_manager.start_worker() # 注册退出处理 import atexit atexit.register(self.celery_manager.stop_worker_immediately) # 设置信号处理 self._setup_signal_handlers() def _setup_signal_handlers(self): """设置信号处理器""" def signal_handler(signum, frame): server_logger.info(f"收到信号 {signum},正在停止服务...") self.celery_manager.stop_worker_immediately() sys.exit(0) # 通用信号处理 try: signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # 终止信号 except AttributeError: # Windows可能不支持某些信号 pass # Windows特有处理 if sys.platform == 'win32': self._setup_windows_signal_handler() def _setup_windows_signal_handler(self): """设置Windows信号处理器""" try: import win32api def win32_handler(dwCtrlType): CTRL_C_EVENT = 0 CTRL_BREAK_EVENT = 1 CTRL_CLOSE_EVENT = 2 CTRL_SHUTDOWN_EVENT = 6 if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT): server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...") self.celery_manager.stop_worker_immediately() sys.exit(0) return False win32api.SetConsoleCtrlHandler(win32_handler, True) except (ImportError, AttributeError) as e: server_logger.debug(f"Windows控制台事件处理不可用: {e}") # 创建应用实例和运行器 app = app_factory.create_app() server_runner = ServerRunner(app_factory) server_logger.info(msg="APP init successfully - 集成施工方案审查系统") # 运行Uvicorn服务器 if __name__ == "__main__": # 使用新的服务器运行器启动 config = app_factory.create_server_config() server_logger.info(f"Agent API服务启动中...运行在{config['host']}:{config['port']}") if config['with_celery']: server_logger.info("Celery Worker: 已集成启动") else: server_logger.warning("Celery Worker: 已禁用") server_runner.run_server(**config)