| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787 |
- import os
- import sys
- import time
- import redis
- import signal
- import uvicorn
- import datetime
- import traceback
- import threading
- BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.insert(0, BASE_DIR)
- from views import lifespan
- from fastapi import FastAPI, HTTPException
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel
- from typing import Optional, Dict, Any
- from fastapi.middleware.cors import CORSMiddleware
- from foundation.infrastructure.config.config import config_handler
- from foundation.infrastructure.cache import RedisConnectionFactory
- from foundation.observability.logger.loggering import server_logger
- from foundation.infrastructure.messaging.celery_app import app as celery_app
- # 导入所有路由
- from views.test_views import test_router
- from views.construction_review.file_upload import file_upload_router
- from views.construction_review.review_results import review_results_router
- from views.construction_review.launch_review import launch_review_router
- from views.construction_review.task_control import task_control_router
- class ServerUtils:
- """服务器工具函数类 - 集中管理工具方法"""
- @staticmethod
- def get_redis_connection():
- """获取Redis连接的统一工具函数
- Returns:
- redis.Redis: Redis连接对象
- """
- # 从配置文件获取Redis连接参数
- redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
- redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
- redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
- redis_db = config_handler.get('redis', 'REDIS_DB', '0')
- # 构建Redis URL
- if redis_password:
- redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
- else:
- redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
- return redis.from_url(redis_url, decode_responses=True)
- class RouteManager:
- """路由管理类 - 负责路由配置和中间件设置"""
- def __init__(self, app: FastAPI):
- """初始化路由管理器
- Args:
- app: FastAPI应用实例
- """
- self.app = app
- self._setup_cors()
- self._setup_routes()
- self._setup_exception_handlers()
- self._setup_health_checks()
- self._setup_api_docs()
- def _setup_cors(self):
- """配置CORS中间件"""
- self.app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"], # 允许所有的来源
- allow_credentials=True,
- allow_methods=["*"], # 允许的HTTP方法
- allow_headers=["*"], # 允许的请求头
- )
- def _setup_routes(self):
- """配置所有路由"""
- self.app.include_router(test_router)
- self.app.include_router(file_upload_router)
- self.app.include_router(review_results_router)
- self.app.include_router(launch_review_router)
- self.app.include_router(task_control_router) # 新增:任务控制路由
- def _setup_exception_handlers(self):
- """配置全局异常处理"""
- @self.app.exception_handler(HTTPException)
- async def http_exception_handler(request, exc):
- return JSONResponse(
- status_code=exc.status_code,
- content=exc.detail
- )
- def _setup_health_checks(self):
- """配置健康检查接口"""
- @self.app.get("/health" ,tags=["系统状态"])
- async def health_check():
- timestamp = datetime.datetime.now().isoformat()
- return {"status": "healthy", "timestamp": timestamp}
- @self.app.get("/celery/status", tags=["系统状态"])
- async def get_celery_status():
- """获取Celery Worker状态"""
- # 延迟导入避免循环引用
- from server.app import celery_manager
- status = celery_manager.get_status()
- return {
- "celery_worker": status,
- "timestamp": datetime.datetime.now().isoformat()
- }
- def _setup_api_docs(self):
- """配置Swagger API文档"""
- # 添加API文档信息接口
- @self.app.get("/api/docs/info", tags=["系统状态"])
- async def api_info():
- """获取API文档信息"""
- return {
- "title": "Agent API - 施工方案审查系统",
- "description": "集成施工方案审查功能的API接口文档",
- "version": "0.3",
- "docs_urls": {
- "swagger_ui": "/docs",
- "redoc": "/redoc",
- "openapi_json": "/openapi.json"
- },
- "features": [
- "自动生成API文档",
- "交互式API测试",
- "OpenAPI 3.0规范",
- "支持多种认证方式"
- ]
- }
- @self.app.get("/api/docs/health", tags=["系统状态"])
- async def docs_health_check():
- """API文档健康检查"""
- return {
- "status": "healthy",
- "service": "API Documentation",
- "version": "0.3",
- "timestamp": datetime.datetime.now().isoformat()
- }
- class CeleryWorkerManager:
- """Celery Worker程序化管理器 - 独立的Celery管理模块"""
- def __init__(self, server_utils: ServerUtils = None):
- """初始化Celery Worker管理器
- Args:
- server_utils: 服务器工具类实例
- """
- self.worker = None
- self.is_running = False
- self.worker_thread = None
- self.shutdown_event = threading.Event()
- self.server_utils = server_utils or ServerUtils()
- def start_worker(self, **kwargs) -> bool:
- """启动Celery Worker
- Returns:
- bool: 启动是否成功
- """
- if self.is_running:
- server_logger.warning("Celery Worker已在运行")
- return True
- try:
- # 启动前清理残留任务
- self._cleanup_redis_tasks("启动前")
- # 创建Worker函数
- def run_celery_worker():
- try:
- server_logger.info("Celery Worker开始运行...")
- celery_app.worker_main(['worker'])
- except KeyboardInterrupt:
- server_logger.info("收到停止信号,Celery Worker退出")
- except Exception as e:
- server_logger.error(f"Celery Worker运行时出错: {e}")
- server_logger.exception("详细错误信息:")
- finally:
- self.is_running = False
- server_logger.info("Celery Worker已停止")
- # 在单独线程中启动Worker
- self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
- self.worker_thread.start()
- self.is_running = True
- # 等待启动
- time.sleep(2)
- success = self.is_running and self.worker_thread.is_alive()
- if success:
- server_logger.info("Celery Worker启动成功")
- else:
- server_logger.error("Celery Worker启动失败")
- self.is_running = False
- return success
- except ImportError as e:
- server_logger.error(f"导入Celery失败: {e}")
- server_logger.info("请先安装Celery: pip install celery redis")
- return False
- except Exception as e:
- server_logger.error(f"启动Celery Worker失败: {e}")
- server_logger.exception("详细错误信息:")
- return False
- def stop_worker(self, timeout: int = 5) -> bool:
- """优雅停止Celery Worker
- Args:
- timeout: 停止超时时间(秒)
- Returns:
- bool: 停止是否成功
- """
- if not self.is_running:
- server_logger.info("Celery Worker未运行")
- return True
- try:
- server_logger.info("停止Celery Worker...")
- self.shutdown_event.set()
- if self.worker_thread and self.worker_thread.is_alive():
- # 尝试优雅停止
- start_time = time.time()
- while self.is_running and (time.time() - start_time) < timeout:
- time.sleep(0.1)
- if self.is_running:
- server_logger.warning("Celery Worker优雅停止超时")
- else:
- server_logger.info("Celery Worker已优雅停止")
- # 停止后清理Redis任务
- self._cleanup_redis_tasks("停止时")
- self.is_running = False
- self.shutdown_event.clear()
- return True
- except Exception as e:
- server_logger.error(f"停止Celery Worker失败: {e}")
- return False
- def stop_worker_immediately(self) -> bool:
- """立即停止Celery Worker,不等待
- Returns:
- bool: 停止是否成功
- """
- if not self.is_running:
- server_logger.info("Celery Worker未运行")
- return True
- try:
- server_logger.info("立即停止Celery Worker...")
- self.shutdown_event.set()
- # 发送中断信号
- if hasattr(os, 'kill'):
- try:
- os.kill(os.getpid(), signal.SIGINT)
- server_logger.info("已发送中断信号")
- except:
- pass
- # 停止后清理Redis任务
- self._cleanup_redis_tasks("立即停止时")
- self.is_running = False
- self.shutdown_event.clear()
- server_logger.info("Celery Worker已立即停止")
- return True
- except Exception as e:
- server_logger.error(f"立即停止Celery Worker失败: {e}")
- self.is_running = False
- return False
- def get_status(self) -> dict:
- """获取Worker状态
- Returns:
- dict: 包含worker状态的字典
- """
- return {
- "is_running": self.is_running,
- "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
- }
- def _cleanup_redis_tasks(self, phase: str):
- """清理Redis中的Celery任务
- Args:
- phase: 清理阶段(启动前/停止时/立即停止时)
- """
- try:
- r = self.server_utils.get_redis_connection()
- server_logger.info(f"{phase}清理Redis中的Celery任务...")
- # 清理任务相关键
- task_keys = r.keys('task:*')
- celery_meta_keys = r.keys('celery-task-meta-*')
- current_keys = r.keys('current:*')
- kombu_keys = r.keys('_kombu.binding.*')
- all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
- for key in all_keys:
- try:
- r.delete(key)
- server_logger.debug(f"{phase}清理: {key}")
- except Exception as e:
- server_logger.warning(f"{phase}清理 {key} 失败: {e}")
- # 清理队列
- queues = ['celery', 'celery.pidbox', 'celeryev']
- for queue in queues:
- try:
- r.delete(queue)
- server_logger.debug(f"{phase}清理队列: {queue}")
- except Exception as e:
- server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
- if all_keys:
- server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
- except Exception as e:
- server_logger.error(f"{phase}清理Redis任务失败: {e}")
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.stop_worker()
- class ApplicationFactory:
- """应用工厂类 - 负责创建和配置FastAPI应用"""
- def __init__(self):
- """初始化应用工厂"""
- self.server_utils = ServerUtils()
- self.celery_manager = CeleryWorkerManager(self.server_utils)
- def create_app(self) -> FastAPI:
- """创建FastAPI应用实例
- Returns:
- FastAPI: 配置完成的应用实例
- """
- app = FastAPI(
- title="Agent API - 施工方案审查系统",
- version="0.3",
- description="Agent API - 集成施工方案审查功能",
- lifespan=lifespan
- )
- # 使用路由管理器配置应用
- route_manager = RouteManager(app)
- return app
- def create_server_config(self) -> dict:
- """创建服务器配置
- Returns:
- dict: 服务器配置字典
- """
- # 确保端口号是整数类型
- port = config_handler.get('launch', 'LAUNCH_PORT', 8002)
- try:
- port = int(port)
- except (ValueError, TypeError):
- port = 8002
- return {
- 'host': config_handler.get('launch', 'HOST', '0.0.0.0'),
- 'port': port,
- 'reload': False,
- 'with_celery': True
- }
- # 全局实例
- app_factory = ApplicationFactory()
- celery_manager = app_factory.celery_manager
- # 创建 FastAPI 应用
- def create_app() -> FastAPI:
- """创建主应用服务"""
- app = FastAPI(
- title="Agent API - 施工方案审查系统",
- version="0.3",
- description="Agent API - 集成施工方案审查功能",
- lifespan=lifespan
- )
- # 添加 CORS 中间件
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"], # 允许所有的来源
- allow_credentials=True,
- allow_methods=["*"], # 允许的HTTP方法
- allow_headers=["*"], # 允许的请求头
- )
- # 添加所有路由
- app.include_router(test_router)
- app.include_router(file_upload_router)
- app.include_router(review_results_router)
- app.include_router(launch_review_router)
- # 全局异常处理
- @app.exception_handler(HTTPException)
- async def http_exception_handler(request, exc):
- return JSONResponse(
- status_code=exc.status_code,
- content=exc.detail
- )
- # 健康检查
- @app.get("/health")
- async def health_check():
- timestamp = datetime.datetime.now().isoformat()
- return {"status": "healthy", "timestamp": timestamp}
- # Celery状态检查
- @app.get("/celery/status")
- async def get_celery_status():
- """获取Celery Worker状态"""
- global celery_manager
- status = celery_manager.get_status()
- return {
- "celery_worker": status,
- "timestamp": datetime.datetime.now().isoformat()
- }
- return app
- def cleanup_redis_before_start():
- """启动前清理Redis中的残留Celery任务"""
- try:
- # 使用统一的Redis连接工具函数
- r = server_utils.get_redis_connection()
- server_logger.info("清理Redis中的残留Celery任务...")
- # 清理所有Celery相关的键,包括更多模式
- keys_to_delete = []
- for key in r.keys():
- key_lower = key.lower()
- # 扩展匹配模式,包括你遇到的实际键格式
- if any(keyword in key_lower for keyword in [
- 'celery', 'task:', 'celery-task', 'kombu', 'current:'
- ]):
- keys_to_delete.append(key)
- # 匹配特定模式
- elif key.startswith('celery-task-meta-') or key.startswith('current:'):
- keys_to_delete.append(key)
- # 临时键
- elif key == 't_key':
- keys_to_delete.append(key)
- # 清理消息队列
- try:
- # 清理所有Celery队列
- queues = ['celery', 'celery.pidbox', 'celeryev']
- for queue in queues:
- # 删除队列
- r.delete(queue)
- server_logger.debug(f"已清理队列: {queue}")
- # 清理Kombu绑定
- kombu_keys = r.keys('_kombu.binding.*')
- for key in kombu_keys:
- r.delete(key)
- server_logger.debug(f"已清理Kombu绑定: {key}")
- except Exception as e:
- server_logger.warning(f"清理队列失败: {e}")
- # 清理识别到的键
- if keys_to_delete:
- for key in keys_to_delete:
- try:
- r.delete(key)
- server_logger.debug(f"已清理: {key}")
- except Exception as e:
- server_logger.warning(f"清理 {key} 失败: {e}")
- server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
- else:
- server_logger.info("没有发现需要清理的残留任务")
- # 额外检查:确保关键队列被清空
- try:
- # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库
- # 这里我们检查是否还有残留,如果有则进行更彻底的清理
- remaining_keys = []
- for key in r.keys():
- if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
- remaining_keys.append(key)
- if remaining_keys:
- server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
- for key in remaining_keys:
- try:
- r.delete(key)
- server_logger.debug(f"彻底清理: {key}")
- except Exception as e:
- server_logger.warning(f"彻底清理 {key} 失败: {e}")
- except Exception as e:
- server_logger.warning(f"彻底清理检查失败: {e}")
- return True
- except Exception as e:
- server_logger.error(f"清理Redis残留任务失败: {e}")
- return False
- def start_celery_worker_background():
- """在后台启动Celery Worker(异步方式)"""
- # 启动前清理残留任务
- cleanup_redis_before_start()
- # 添加调用栈调试
- server_logger.info("=== Celery Worker启动调用栈 ===")
- for line in traceback.format_stack():
- server_logger.debug(f" {line.strip()}")
- server_logger.info("=== 调用栈结束 ===")
- return celery_manager.start_worker()
- def stop_celery_worker():
- """停止Celery Worker"""
- global celery_manager
- # 立即取消所有任务注册(使用DB0,与启动时保持一致)
- try:
- # 使用统一的Redis连接工具函数
- r = server_utils.get_redis_connection()
- server_logger.info("停止时清理Redis中的Celery任务...")
- # 清理任务相关键
- task_keys = r.keys('task:*') # 重复任务检查器的数据
- celery_meta_keys = r.keys('celery-task-meta-*')
- current_keys = r.keys('current:*')
- kombu_keys = r.keys('_kombu.binding.*')
- all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
- for key in all_keys:
- try:
- r.delete(key)
- server_logger.debug(f"停止时清理: {key}")
- except Exception as e:
- server_logger.warning(f"停止时清理 {key} 失败: {e}")
- # 清理队列
- queues = ['celery', 'celery.pidbox', 'celeryev']
- for queue in queues:
- try:
- r.delete(queue)
- server_logger.debug(f"停止时清理队列: {queue}")
- except Exception as e:
- server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
- server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
- except Exception as e:
- server_logger.error(f"停止时清理Redis任务失败: {e}")
- # 立即停止Worker,不等待
- return celery_manager.stop_worker_immediately()
- def run_server(host: str = None, port: int = None, reload: bool = False,
- with_celery: bool = True):
- """运行服务器"""
- # 从配置文件获取默认值
- if host is None:
- host = config_handler.get('launch', 'HOST', '0.0.0.0')
- if port is None:
- port = config_handler.get('launch', 'LAUNCH_PORT')
- if with_celery:
- # 启动Celery Worker
- start_celery_worker_background()
- # 注册退出时的清理函数
- import atexit
- atexit.register(stop_celery_worker)
- # 设置信号处理
- def signal_handler(signum, frame):
- server_logger.info(f"收到信号 {signum},正在停止服务...")
- stop_celery_worker()
- sys.exit(0)
- # Windows和Unix系统的信号处理
- try:
- signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
- signal.signal(signal.SIGTERM, signal_handler) # 终止信号
- except AttributeError:
- # Windows可能不支持某些信号
- pass
- # Windows特有的控制台事件处理
- if sys.platform == 'win32':
- try:
- import win32api
- def win32_handler(dwCtrlType):
- # 正确的控制台事件常量
- CTRL_C_EVENT = 0
- CTRL_BREAK_EVENT = 1
- CTRL_CLOSE_EVENT = 2
- CTRL_SHUTDOWN_EVENT = 6
- if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
- server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
- stop_celery_worker()
- sys.exit(0)
- return False
- win32api.SetConsoleCtrlHandler(win32_handler, True)
- except (ImportError, AttributeError) as e:
- # 如果win32api不可用,跳过Windows控制台处理
- server_logger.debug(f"Windows控制台事件处理不可用: {e}")
- pass
- try:
- if reload:
- # 重载模式需要正确的模块路径
- app_import_path = "server.app:app"
- uvicorn.run(app_import_path, host=host, port=port, reload=reload)
- else:
- # 直接运行模式,直接使用app对象
- uvicorn.run(app, host=host, port=port)
- finally:
- if with_celery:
- stop_celery_worker()
- app = create_app()
- server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
- class ServerRunner:
- """服务器运行器 - 简化的主启动入口"""
- def __init__(self, app_factory: ApplicationFactory):
- """初始化服务器运行器
- Args:
- app_factory: 应用工厂实例
- """
- self.app_factory = app_factory
- self.celery_manager = app_factory.celery_manager
- def run_server(self, **kwargs):
- """运行服务器
- Args:
- **kwargs: 服务器配置参数
- """
- # 获取配置
- config = self.app_factory.create_server_config()
- config.update(kwargs)
- host = config.get('host', '0.0.0.0')
- port = config.get('port', 8002)
- # 确保端口号是整数类型
- try:
- port = int(port)
- except (ValueError, TypeError):
- port = 8002
- reload = config.get('reload', False)
- with_celery = config.get('with_celery', True)
- if with_celery:
- self._setup_celery_integration()
- # 创建应用实例
- app = self.app_factory.create_app()
- try:
- if reload:
- app_import_path = "server.app:app"
- uvicorn.run(app_import_path, host=host, port=port, reload=reload)
- else:
- uvicorn.run(app, host=host, port=port)
- finally:
- if with_celery:
- self.celery_manager.stop_worker()
- def _setup_celery_integration(self):
- """设置Celery集成"""
- # 启动Celery Worker
- self.celery_manager.start_worker()
- # 注册退出处理
- import atexit
- atexit.register(self.celery_manager.stop_worker_immediately)
- # 设置信号处理
- self._setup_signal_handlers()
- def _setup_signal_handlers(self):
- """设置信号处理器"""
- def signal_handler(signum, frame):
- server_logger.info(f"收到信号 {signum},正在停止服务...")
- self.celery_manager.stop_worker_immediately()
- sys.exit(0)
- # 通用信号处理
- try:
- signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
- signal.signal(signal.SIGTERM, signal_handler) # 终止信号
- except AttributeError:
- # Windows可能不支持某些信号
- pass
- # Windows特有处理
- if sys.platform == 'win32':
- self._setup_windows_signal_handler()
- def _setup_windows_signal_handler(self):
- """设置Windows信号处理器"""
- try:
- import win32api
- def win32_handler(dwCtrlType):
- CTRL_C_EVENT = 0
- CTRL_BREAK_EVENT = 1
- CTRL_CLOSE_EVENT = 2
- CTRL_SHUTDOWN_EVENT = 6
- if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
- server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
- self.celery_manager.stop_worker_immediately()
- sys.exit(0)
- return False
- win32api.SetConsoleCtrlHandler(win32_handler, True)
- except (ImportError, AttributeError) as e:
- server_logger.debug(f"Windows控制台事件处理不可用: {e}")
- # 创建应用实例和运行器
- app = app_factory.create_app()
- server_runner = ServerRunner(app_factory)
- server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
- # 运行Uvicorn服务器
- if __name__ == "__main__":
- # 使用新的服务器运行器启动
- config = app_factory.create_server_config()
- server_logger.info(f"Agent API服务启动中...运行在{config['host']}:{config['port']}")
- if config['with_celery']:
- server_logger.info("Celery Worker: 已集成启动")
- else:
- server_logger.warning("Celery Worker: 已禁用")
- server_runner.run_server(**config)
|