""" 服务主应用 整合所有接口,提供统一的测试服务 """ import datetime import sys import os import threading import subprocess import time from multiprocessing import Process # 添加项目根目录到Python路径 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(os.path.dirname(current_dir)) sys.path.insert(0, project_root) # 现在可以正常导入了 from foundation.logger.loggering import server_logger as logger from foundation.base.celery_app import app as celery_app from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import uvicorn # 现在可以正常导入了 from views.construction_review.file_upload import file_upload_router from views.construction_review.task_progress import task_progress_router from views.construction_review.review_results import review_results_router def create_app() -> FastAPI: """创建接口服务""" app = FastAPI( title="施工方案审查API服务", description="用于前端开发和接口联调服务", version="0.0.1" ) # 添加CORS中间件(允许前端访问) app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在生产环境中应该设置具体的域名 allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 添加路由 app.include_router(file_upload_router) app.include_router(task_progress_router) app.include_router(review_results_router) # 全局异常处理 @app.exception_handler(HTTPException) async def http_exception_handler(request, exc): return JSONResponse( status_code=exc.status_code, content=exc.detail ) # 健康检查 @app.get("/health") async def health_check(): timestamp = datetime.datetime.now().isoformat() return {"status": "healthy", "timestamp": timestamp} # Celery状态检查 @app.get("/celery/status") async def get_celery_status(): """获取Celery Worker状态""" global celery_manager status = celery_manager.get_status() return { "celery_worker": status, "timestamp": datetime.datetime.now().isoformat() } # API文档 @app.get("/api/docs") async def api_docs(): return { "title": "施工方案审查服务API文档", "description": "API接口文档", "version": "V.0.1", "apis": [ { "name": "文档上传", "path": "/sgsc/file_upload", "method": "POST", "description": "上传施工方案文档" }, { "name": "进度查询", "path": "/sgsc/task_progress/{callback_task_id}", "method": "GET", "description": "查询审查任务进度" }, { "name": "结果获取", "path": "/sgsc/review_results", "method": "POST", "description": "获取审查结果" } ], } return app # Celery Worker管理器 class CeleryWorkerManager: """Celery Worker程序化管理器""" def __init__(self): self.worker = None self.is_running = False self.worker_thread = None self.shutdown_event = threading.Event() def start_worker(self, **kwargs): """启动Celery Worker""" if self.is_running: logger.warning("Celery Worker已在运行") return True try: # 导入Celery应用 from foundation.base.celery_app import app as celery_app # 创建Worker函数 def run_celery_worker(): try: # 使用最简单的启动方式 logger.info("Celery Worker开始运行...") # 直接启动worker,使用默认配置 celery_app.worker_main(['worker']) except KeyboardInterrupt: logger.info("收到停止信号,Celery Worker退出") except Exception as e: logger.error(f"Celery Worker运行时出错: {e}") logger.exception("详细错误信息:") finally: self.is_running = False logger.info("Celery Worker已停止") # 在单独线程中启动Worker self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True) self.worker_thread.start() self.is_running = True # 等待启动 time.sleep(2) if self.is_running and self.worker_thread.is_alive(): logger.info("Celery Worker启动成功") return True else: logger.error("Celery Worker启动失败") self.is_running = False return False except ImportError as e: logger.error(f"导入Celery失败: {e}") logger.info("请先安装Celery: pip install celery redis") return False except Exception as e: logger.error(f"启动Celery Worker失败: {e}") logger.exception("详细错误信息:") return False def stop_worker(self, timeout: int = 5): """停止Celery Worker""" if not self.is_running: logger.info("Celery Worker未运行") return True try: logger.info("停止Celery Worker...") self.shutdown_event.set() # 发送停止信号给线程 if self.worker_thread and self.worker_thread.is_alive(): # 尝试优雅停止 start_time = time.time() while self.is_running and (time.time() - start_time) < timeout: time.sleep(0.1) # 如果还没停止,记录警告 if self.is_running: logger.warning("Celery Worker优雅停止超时") else: logger.info("Celery Worker已优雅停止") self.is_running = False self.shutdown_event.clear() return True except Exception as e: logger.error(f"停止Celery Worker失败: {e}") return False def stop_worker_immediately(self): """立即停止Celery Worker,不等待""" if not self.is_running: logger.info("Celery Worker未运行") return True try: logger.info("立即停止Celery Worker...") self.shutdown_event.set() # 设置超时事件,强制停止 import signal import os # 发送中断信号给当前进程 if hasattr(os, 'kill'): try: os.kill(os.getpid(), signal.SIGINT) logger.info("已发送中断信号") except: pass # 立即设置状态为停止 self.is_running = False self.shutdown_event.clear() logger.info("Celery Worker已立即停止") return True except Exception as e: logger.error(f"立即停止Celery Worker失败: {e}") # 即使失败也要设置状态 self.is_running = False return False def get_status(self): """获取Worker状态""" return { "is_running": self.is_running, "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False, } def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop_worker() # 全局Worker管理器实例 celery_manager = CeleryWorkerManager() def start_celery_worker(): """启动Celery Worker(同步方式,用于测试)""" return celery_manager.start_worker() def cleanup_redis_before_start(): """启动前清理Redis中的残留Celery任务""" try: import redis from foundation.base.config import config_handler # 连接Redis redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') if redis_password: redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0' else: redis_url = f'redis://{redis_host}:{redis_port}/0' r = redis.from_url(redis_url, decode_responses=True) logger.info("清理Redis中的残留Celery任务...") # 清理所有Celery相关的键 keys_to_delete = [] for key in r.keys(): if any(keyword in key.lower() for keyword in ['celery', 'task:']): keys_to_delete.append(key) if keys_to_delete: for key in keys_to_delete: try: r.delete(key) logger.debug(f"已清理: {key}") except Exception as e: logger.warning(f"清理 {key} 失败: {e}") logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键") else: logger.info("没有发现需要清理的残留任务") return True except Exception as e: logger.error(f"清理Redis残留任务失败: {e}") return False def start_celery_worker_background(): """在后台启动Celery Worker(异步方式)""" # 启动前清理残留任务 cleanup_redis_before_start() # 添加调用栈调试 import traceback logger.info("=== Celery Worker启动调用栈 ===") for line in traceback.format_stack(): logger.debug(f" {line.strip()}") logger.info("=== 调用栈结束 ===") return celery_manager.start_worker() def stop_celery_worker(): """停止Celery Worker""" global celery_manager # 立即取消所有任务注册 try: import redis from foundation.base.config import config_handler # 连接Redis redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') if redis_password: redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2' else: redis_url = f'redis://{redis_host}:{redis_port}/2' r = redis.from_url(redis_url, decode_responses=True) # 清理所有任务注册 task_keys = r.keys('task:*') for key in task_keys: r.delete(key) logger.info(f"取消任务注册: {key}") logger.info(f"已取消 {len(task_keys)} 个任务注册") except Exception as e: logger.error(f"取消任务注册失败: {e}") # 立即停止Worker,不等待 return celery_manager.stop_worker_immediately() def run_server(host: str = "127.0.0.1", port: int = 8034, reload: bool = False, with_celery: bool = True): """运行服务器""" if with_celery: # 启动Celery Worker start_celery_worker_background() # 注册退出时的清理函数 import atexit atexit.register(stop_celery_worker) # 设置信号处理 import signal def signal_handler(signum, frame): logger.info(f"收到信号 {signum},正在停止服务...") stop_celery_worker() sys.exit(0) # Windows和Unix系统的信号处理 try: signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # 终止信号 except AttributeError: # Windows可能不支持某些信号 pass # Windows特有的控制台事件处理 if sys.platform == 'win32': try: import win32api def win32_handler(dwCtrlType): # 正确的控制台事件常量 CTRL_C_EVENT = 0 CTRL_BREAK_EVENT = 1 CTRL_CLOSE_EVENT = 2 CTRL_SHUTDOWN_EVENT = 6 if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT): logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...") stop_celery_worker() sys.exit(0) return False win32api.SetConsoleCtrlHandler(win32_handler, True) except (ImportError, AttributeError) as e: # 如果win32api不可用,跳过Windows控制台处理 logger.debug(f"Windows控制台事件处理不可用: {e}") pass try: if reload: # 重载模式需要正确的模块路径 app_import_path = "views.construction_review.app:app" uvicorn.run(app_import_path, host=host, port=port, reload=reload) else: # 直接运行模式,直接使用app对象 uvicorn.run(app, host=host, port=port) finally: if with_celery: stop_celery_worker() app = create_app() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='施工方案审查API服务') parser.add_argument('--host', default='127.0.0.1', help='服务器地址') parser.add_argument('--port', type=int, default=8035, help='服务器端口') parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker') parser.add_argument('--no-reload', action='store_true', help='关闭热重载') args = parser.parse_args() logger.info("施工方案审查API服务启动中...") logger.info(f"服务地址: http://{args.host}:{args.port}") logger.info(f"API文档: http://{args.host}:{args.port}/docs") logger.info(f"健康检查: http://{args.host}:{args.port}/health") if not args.no_celery: logger.info("Celery Worker: 已集成启动") else: logger.warning("Celery Worker: 已禁用") run_server( host=args.host, port=args.port, reload=False, with_celery=not args.no_celery )