"""Entry point of the Agent API service (construction-plan review system).

This module builds the FastAPI application, manages an in-process Celery
worker that runs in a background thread, and starts the Uvicorn server.
"""
import argparse
import atexit
import datetime
import os
import signal
import sys
import threading
import time
import traceback
from pathlib import Path

import uvicorn

# The project root must be on sys.path before the project-local imports below.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BASE_DIR)

from views import lifespan
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from foundation.logger.loggering import server_logger
from foundation.base.celery_app import app as celery_app

# Import all routers
from views.test_views import test_router
from views.construction_review.file_upload import file_upload_router
from views.construction_review.review_results import review_results_router
from views.construction_review.launch_review import launch_review_router
from views.construction_review.task_progress import task_progress_router


# Substrings (compared case-insensitively) that mark a Redis key as Celery residue.
_CELERY_KEY_MARKERS = ('celery', 'task:')

# NOTE(review): start-up cleanup scans db 0 while task de-registration on
# shutdown scans db 2 -- kept exactly as in the original code; confirm that
# both database indexes are intended.
_CLEANUP_REDIS_DB = 0
_TASK_REGISTRY_REDIS_DB = 2


# Create the FastAPI application
def create_app() -> FastAPI:
    """Create the main application.

    Registers the CORS middleware, all routers, a handler for
    ``HTTPException`` and the ``/health``, ``/celery/status`` and
    ``/api/docs`` endpoints.

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title="Agent API - 施工方案审查系统",
        version="0.3",
        description="Agent API - 集成施工方案审查功能",
        lifespan=lifespan
    )

    # Add the CORS middleware.
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # very permissive -- confirm this is intended outside development.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # allow every origin
        allow_credentials=True,
        allow_methods=["*"],  # allowed HTTP methods
        allow_headers=["*"],  # allowed request headers
    )

    # Register all routers
    app.include_router(test_router)
    app.include_router(file_upload_router)
    app.include_router(review_results_router)
    app.include_router(launch_review_router)
    app.include_router(task_progress_router)

    # Global exception handling
    @app.exception_handler(HTTPException)
    async def http_exception_handler(request, exc):
        # The body is the raw ``detail`` (not wrapped in {"detail": ...}).
        # Headers attached to the exception (e.g. WWW-Authenticate) are
        # forwarded instead of being silently dropped.
        return JSONResponse(
            status_code=exc.status_code,
            content=exc.detail,
            headers=getattr(exc, "headers", None),
        )

    # Health check
    @app.get("/health")
    async def health_check():
        timestamp = datetime.datetime.now().isoformat()
        return {"status": "healthy", "timestamp": timestamp}

    # Celery status check
    @app.get("/celery/status")
    async def get_celery_status():
        """Return the state of the in-process Celery worker."""
        # celery_manager is a module-level name defined further down; it is
        # resolved at request time, so no ``global`` statement is needed.
        status = celery_manager.get_status()
        return {
            "celery_worker": status,
            "timestamp": datetime.datetime.now().isoformat()
        }

    # API overview
    @app.get("/api/docs")
    async def api_docs():
        return {
            "title": "Agent API服务文档",
            "description": "集成施工方案审查功能的API接口文档",
            "version": "V.0.3",
            "apis": [
                {
                    "name": "测试接口",
                    "path": "/test",
                    "method": "GET",
                    "description": "系统测试接口"
                },
                {
                    "name": "文档上传",
                    "path": "/sgsc/file_upload",
                    "method": "POST",
                    "description": "上传施工方案文档"
                },
                {
                    "name": "审查启动",
                    "path": "/sgsc/sse/launch_review",
                    "method": "POST",
                    "description": "启动AI审查工作流"
                },
                {
                    "name": "进度推送",
                    "path": "/sgsc/sse/progress/{callback_task_id}",
                    "method": "GET",
                    "description": "SSE实时进度推送"
                },
                {
                    "name": "结果获取",
                    "path": "/sgsc/review_results",
                    "method": "POST",
                    "description": "获取审查结果"
                }
            ],
        }

    return app


# Celery worker manager
class CeleryWorkerManager:
    """Programmatic manager for a Celery worker running in a daemon thread.

    ``is_running`` is the manager's own bookkeeping flag; ``worker_thread``
    is the thread executing ``celery_app.worker_main``.
    """

    def __init__(self):
        self.worker = None
        self.is_running = False
        self.worker_thread = None
        # NOTE(review): this event is set and cleared by the stop methods but
        # nothing in this file waits on it -- the worker is not notified
        # through it.
        self.shutdown_event = threading.Event()

    def _run_worker(self) -> None:
        """Thread target: run the Celery worker until it exits."""
        try:
            # Use the simplest start-up path: default worker configuration.
            server_logger.info("Celery Worker开始运行...")
            celery_app.worker_main(['worker'])
        except KeyboardInterrupt:
            server_logger.info("收到停止信号,Celery Worker退出")
        except Exception as e:
            server_logger.error(f"Celery Worker运行时出错: {e}")
            server_logger.exception("详细错误信息:")
        finally:
            self.is_running = False
            server_logger.info("Celery Worker已停止")

    def start_worker(self, **kwargs) -> bool:
        """Start the Celery worker in a background daemon thread.

        Args:
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            True if the worker is already running or its thread is still
            alive about two seconds after start, False otherwise.
        """
        if self.is_running:
            server_logger.warning("Celery Worker已在运行")
            return True

        try:
            # Run the worker in a separate thread
            self.worker_thread = threading.Thread(target=self._run_worker, daemon=True)
            # Set the flag BEFORE starting the thread: the thread resets it in
            # its ``finally`` block, and setting it afterwards could overwrite
            # that reset when the worker fails immediately.
            self.is_running = True
            self.worker_thread.start()

            # Give the worker up to two seconds to come up; join() returns
            # early if the thread dies in the meantime.
            self.worker_thread.join(timeout=2)

            if self.is_running and self.worker_thread.is_alive():
                server_logger.info("Celery Worker启动成功")
                return True

            server_logger.error("Celery Worker启动失败")
            self.is_running = False
            return False

        except ImportError as e:
            self.is_running = False
            server_logger.error(f"导入Celery失败: {e}")
            server_logger.info("请先安装Celery: pip install celery redis")
            return False
        except Exception as e:
            self.is_running = False
            server_logger.error(f"启动Celery Worker失败: {e}")
            server_logger.exception("详细错误信息:")
            return False

    def stop_worker(self, timeout: int = 5) -> bool:
        """Wait for the worker thread to finish and mark the worker stopped.

        Args:
            timeout: Maximum number of seconds to wait for the thread.

        Returns:
            True when the worker was not running or has been marked stopped
            (even if the wait timed out), False if an exception occurred.
        """
        if not self.is_running:
            server_logger.info("Celery Worker未运行")
            return True

        try:
            server_logger.info("停止Celery Worker...")
            self.shutdown_event.set()

            thread = self.worker_thread
            if thread is not None and thread.is_alive():
                # Wait for the thread to end on its own, up to ``timeout``.
                thread.join(timeout)

                if thread.is_alive():
                    server_logger.warning("Celery Worker优雅停止超时")
                else:
                    server_logger.info("Celery Worker已优雅停止")

            self.is_running = False
            self.shutdown_event.clear()
            return True

        except Exception as e:
            server_logger.error(f"停止Celery Worker失败: {e}")
            return False

    def stop_worker_immediately(self) -> bool:
        """Mark the worker stopped and send SIGINT to this process, without waiting.

        Returns:
            True when the worker was not running or the stop sequence
            completed, False if an unexpected exception occurred.
        """
        if not self.is_running:
            server_logger.info("Celery Worker未运行")
            return True

        # Mark the worker stopped BEFORE signalling: the SIGINT sent below is
        # delivered to this very process, and a handler that calls back into
        # this method must hit the early return above instead of sending the
        # signal again and recursing.
        self.is_running = False

        try:
            server_logger.info("立即停止Celery Worker...")
            self.shutdown_event.set()

            # Send an interrupt signal to the current process.
            # NOTE(review): per the Python docs, on Windows os.kill() with a
            # value other than CTRL_C_EVENT / CTRL_BREAK_EVENT terminates the
            # process via TerminateProcess -- confirm this is acceptable.
            if hasattr(os, 'kill'):
                try:
                    os.kill(os.getpid(), signal.SIGINT)
                    server_logger.info("已发送中断信号")
                except OSError as e:
                    server_logger.warning(f"发送中断信号失败: {e}")
                except (KeyboardInterrupt, SystemExit) as e:
                    # The self-delivered SIGINT can surface right here (as
                    # KeyboardInterrupt, or as SystemExit raised by a custom
                    # handler). Best effort: finish the state cleanup.
                    server_logger.debug(f"中断信号已在当前调用中触发: {e!r}")

            self.shutdown_event.clear()
            server_logger.info("Celery Worker已立即停止")
            return True

        except Exception as e:
            server_logger.error(f"立即停止Celery Worker失败: {e}")
            # The state must read "stopped" even when the sequence failed.
            self.is_running = False
            return False

    def get_status(self) -> dict:
        """Return the bookkeeping flag and the liveness of the worker thread."""
        return {
            "is_running": self.is_running,
            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop_worker()


# Global worker manager instance
celery_manager = CeleryWorkerManager()


def _connect_redis(db: int):
    """Return a Redis client for database ``db`` built from the project config.

    Connection settings are passed as discrete arguments rather than being
    interpolated into a URL, so a password containing URL-special characters
    cannot corrupt the connection string. The imports are local so that the
    callers' ``except`` blocks also cover a missing dependency.
    """
    import redis
    from foundation.base.config import config_handler

    redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
    redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
    redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')

    return redis.Redis(
        host=redis_host,
        port=int(redis_port),
        db=db,
        password=redis_password or None,
        decode_responses=True,
    )


def cleanup_redis_before_start() -> bool:
    """Delete leftover Celery-related keys from Redis before start-up.

    A key is removed when its lower-cased name contains ``celery`` or
    ``task:``.

    Returns:
        True if the cleanup ran (individual delete failures are only
        logged), False if connecting or scanning failed.
    """
    try:
        r = _connect_redis(_CLEANUP_REDIS_DB)

        server_logger.info("清理Redis中的残留Celery任务...")

        # SCAN iterates incrementally instead of blocking the server like
        # KEYS does; it may report a key more than once, hence the dedupe.
        keys_to_delete = list(dict.fromkeys(
            key for key in r.scan_iter()
            if any(keyword in key.lower() for keyword in _CELERY_KEY_MARKERS)
        ))

        if keys_to_delete:
            deleted = 0
            for key in keys_to_delete:
                try:
                    r.delete(key)
                except Exception as e:
                    server_logger.warning(f"清理 {key} 失败: {e}")
                else:
                    deleted += 1
                    server_logger.debug(f"已清理: {key}")

            # Report the keys actually removed, not merely the candidates.
            server_logger.info(f"成功清理 {deleted} 个Redis键")
        else:
            server_logger.info("没有发现需要清理的残留任务")

        return True

    except Exception as e:
        server_logger.error(f"清理Redis残留任务失败: {e}")
        return False


def start_celery_worker_background() -> bool:
    """Clean up Redis residue, then start the Celery worker in the background.

    Returns:
        The result of ``celery_manager.start_worker()``.
    """
    # Remove leftover tasks before starting
    cleanup_redis_before_start()

    # Log the call stack to help trace who triggered the start
    server_logger.info("=== Celery Worker启动调用栈 ===")
    for line in traceback.format_stack():
        server_logger.debug(f"  {line.strip()}")
    server_logger.info("=== 调用栈结束 ===")

    return celery_manager.start_worker()


def stop_celery_worker() -> bool:
    """Remove all ``task:*`` registrations from Redis, then stop the worker at once.

    A failure while clearing the registrations is logged and does not
    prevent the worker from being stopped.

    Returns:
        The result of ``celery_manager.stop_worker_immediately()``.
    """
    # Cancel all task registrations right away
    try:
        r = _connect_redis(_TASK_REGISTRY_REDIS_DB)

        # SCAN instead of the blocking KEYS; dedupe because SCAN may repeat keys.
        task_keys = list(dict.fromkeys(r.scan_iter(match='task:*')))
        for key in task_keys:
            r.delete(key)
            server_logger.info(f"取消任务注册: {key}")

        server_logger.info(f"已取消 {len(task_keys)} 个任务注册")

    except Exception as e:
        server_logger.error(f"取消任务注册失败: {e}")

    # Stop the worker immediately, without waiting
    return celery_manager.stop_worker_immediately()


def run_server(host: str = "127.0.0.1", port: int = 8001, reload: bool = False,
               with_celery: bool = True):
    """Run the Uvicorn server, optionally together with the Celery worker.

    Args:
        host: Address to bind to.
        port: Port to listen on.
        reload: Run Uvicorn in reload mode using the import string
            ``server.app:app``.
        with_celery: Start the in-process Celery worker and register its
            clean-up hooks.
    """
    if with_celery:
        # Start the Celery worker
        start_celery_worker_background()

        # Register the clean-up function to run at interpreter exit
        atexit.register(stop_celery_worker)

    # Signal handling
    def signal_handler(signum, frame):
        server_logger.info(f"收到信号 {signum},正在停止服务...")
        stop_celery_worker()
        sys.exit(0)

    # Signal registration for both Windows and Unix
    try:
        signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # termination signal
    except (AttributeError, ValueError) as e:
        # AttributeError: the platform lacks the signal constant.
        # ValueError: signal.signal() was called outside the main thread.
        server_logger.debug(f"信号处理注册不可用: {e}")

    # Windows-specific console event handling
    if sys.platform == 'win32':
        try:
            import win32api

            def win32_handler(dwCtrlType):
                # Console control event constants
                CTRL_C_EVENT = 0
                CTRL_BREAK_EVENT = 1
                CTRL_CLOSE_EVENT = 2
                CTRL_SHUTDOWN_EVENT = 6

                if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT,
                                  CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
                    server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
                    stop_celery_worker()
                    sys.exit(0)
                return False

            win32api.SetConsoleCtrlHandler(win32_handler, True)
        except (ImportError, AttributeError) as e:
            # Skip console event handling when win32api is unavailable
            server_logger.debug(f"Windows控制台事件处理不可用: {e}")

    try:
        if reload:
            # Reload mode needs an import string rather than the app object
            app_import_path = "server.app:app"
            uvicorn.run(app_import_path, host=host, port=port, reload=reload)
        else:
            # Direct mode: pass the app object itself
            uvicorn.run(app, host=host, port=port)
    finally:
        if with_celery:
            stop_celery_worker()


# Create the application instance
app = create_app()
server_logger.info(msg="APP init successfully - 集成施工方案审查系统")


# Run the Uvicorn server
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
    parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
    parser.add_argument('--port', type=int, default=8001, help='服务器端口')
    parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
    # NOTE(review): --no-reload is parsed but never read; reload is always
    # disabled in the run_server() call below.
    parser.add_argument('--no-reload', action='store_true', help='关闭热重载')

    args = parser.parse_args()

    server_logger.info("Agent API服务启动中...")
    server_logger.info(f"服务地址: http://{args.host}:{args.port}")
    server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
    server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
    server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")

    if not args.no_celery:
        server_logger.info("Celery Worker: 已集成启动")
    else:
        server_logger.warning("Celery Worker: 已禁用")

    run_server(
        host=args.host,
        port=args.port,
        reload=False,
        with_celery=not args.no_celery
    )