| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- import os
- import sys
- import time
- import uvicorn
- import datetime
- import threading
- from pathlib import Path
- import argparse
- import signal
- import traceback
- BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.insert(0, BASE_DIR)
- from views import lifespan
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi import FastAPI, HTTPException
- from fastapi.responses import JSONResponse
- from foundation.logger.loggering import server_logger
- from foundation.base.celery_app import app as celery_app
- # 导入所有路由
- from views.test_views import test_router
- from views.construction_review.file_upload import file_upload_router
- from views.construction_review.review_results import review_results_router
- from views.construction_review.launch_review import launch_review_router
- # 创建 FastAPI 应用
- def create_app() -> FastAPI:
- """创建主应用服务"""
- app = FastAPI(
- title="Agent API - 施工方案审查系统",
- version="0.3",
- description="Agent API - 集成施工方案审查功能",
- lifespan=lifespan
- )
- # 添加 CORS 中间件
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"], # 允许所有的来源
- allow_credentials=True,
- allow_methods=["*"], # 允许的HTTP方法
- allow_headers=["*"], # 允许的请求头
- )
- # 添加所有路由
- app.include_router(test_router)
- app.include_router(file_upload_router)
- app.include_router(review_results_router)
- app.include_router(launch_review_router)
- # 全局异常处理
- @app.exception_handler(HTTPException)
- async def http_exception_handler(request, exc):
- return JSONResponse(
- status_code=exc.status_code,
- content=exc.detail
- )
- # 健康检查
- @app.get("/health")
- async def health_check():
- timestamp = datetime.datetime.now().isoformat()
- return {"status": "healthy", "timestamp": timestamp}
- # Celery状态检查
- @app.get("/celery/status")
- async def get_celery_status():
- """获取Celery Worker状态"""
- global celery_manager
- status = celery_manager.get_status()
- return {
- "celery_worker": status,
- "timestamp": datetime.datetime.now().isoformat()
- }
- # API文档
- @app.get("/api/docs")
- async def api_docs():
- return {
- "title": "Agent API服务文档",
- "description": "集成施工方案审查功能的API接口文档",
- "version": "V.0.3",
- "apis": [
- {
- "name": "测试接口",
- "path": "/test",
- "method": "GET",
- "description": "系统测试接口"
- },
- {
- "name": "文档上传",
- "path": "/sgsc/file_upload",
- "method": "POST",
- "description": "上传施工方案文档"
- },
- {
- "name": "审查启动",
- "path": "/sgsc/sse/launch_review",
- "method": "POST",
- "description": "启动AI审查工作流"
- },
- {
- "name": "进度推送",
- "path": "/sgsc/sse/progress/{callback_task_id}",
- "method": "GET",
- "description": "SSE实时进度推送"
- },
- {
- "name": "结果获取",
- "path": "/sgsc/review_results",
- "method": "POST",
- "description": "获取审查结果"
- }
- ],
- }
- return app
- # Celery Worker管理器
- class CeleryWorkerManager:
- """Celery Worker程序化管理器"""
- def __init__(self):
- self.worker = None
- self.is_running = False
- self.worker_thread = None
- self.shutdown_event = threading.Event()
- def start_worker(self, **kwargs):
- """启动Celery Worker"""
- if self.is_running:
- server_logger.warning("Celery Worker已在运行")
- return True
- try:
- # 创建Worker函数
- def run_celery_worker():
- try:
- # 使用最简单的启动方式
- server_logger.info("Celery Worker开始运行...")
- # 直接启动worker,使用默认配置
- celery_app.worker_main(['worker'])
- except KeyboardInterrupt:
- server_logger.info("收到停止信号,Celery Worker退出")
- except Exception as e:
- server_logger.error(f"Celery Worker运行时出错: {e}")
- server_logger.exception("详细错误信息:")
- finally:
- self.is_running = False
- server_logger.info("Celery Worker已停止")
- # 在单独线程中启动Worker
- self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
- self.worker_thread.start()
- self.is_running = True
- # 等待启动
- time.sleep(2)
- if self.is_running and self.worker_thread.is_alive():
- server_logger.info("Celery Worker启动成功")
- return True
- else:
- server_logger.error("Celery Worker启动失败")
- self.is_running = False
- return False
- except ImportError as e:
- server_logger.error(f"导入Celery失败: {e}")
- server_logger.info("请先安装Celery: pip install celery redis")
- return False
- except Exception as e:
- server_logger.error(f"启动Celery Worker失败: {e}")
- server_logger.exception("详细错误信息:")
- return False
- def stop_worker(self, timeout: int = 5):
- """停止Celery Worker"""
- if not self.is_running:
- server_logger.info("Celery Worker未运行")
- return True
- try:
- server_logger.info("停止Celery Worker...")
- self.shutdown_event.set()
- # 发送停止信号给线程
- if self.worker_thread and self.worker_thread.is_alive():
- # 尝试优雅停止
- start_time = time.time()
- while self.is_running and (time.time() - start_time) < timeout:
- time.sleep(0.1)
- # 如果还没停止,记录警告
- if self.is_running:
- server_logger.warning("Celery Worker优雅停止超时")
- else:
- server_logger.info("Celery Worker已优雅停止")
- self.is_running = False
- self.shutdown_event.clear()
- return True
- except Exception as e:
- server_logger.error(f"停止Celery Worker失败: {e}")
- return False
- def stop_worker_immediately(self):
- """立即停止Celery Worker,不等待"""
- if not self.is_running:
- server_logger.info("Celery Worker未运行")
- return True
- try:
- server_logger.info("立即停止Celery Worker...")
- self.shutdown_event.set()
- # 设置超时事件,强制停止
- import os
- # 发送中断信号给当前进程
- if hasattr(os, 'kill'):
- try:
- os.kill(os.getpid(), signal.SIGINT)
- server_logger.info("已发送中断信号")
- except:
- pass
- # 立即设置状态为停止
- self.is_running = False
- self.shutdown_event.clear()
- server_logger.info("Celery Worker已立即停止")
- return True
- except Exception as e:
- server_logger.error(f"立即停止Celery Worker失败: {e}")
- # 即使失败也要设置状态
- self.is_running = False
- return False
- def get_status(self):
- """获取Worker状态"""
- return {
- "is_running": self.is_running,
- "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
- }
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.stop_worker()
- # 全局Worker管理器实例
- celery_manager = CeleryWorkerManager()
- def cleanup_redis_before_start():
- """启动前清理Redis中的残留Celery任务"""
- try:
- import redis
- from foundation.base.config import config_handler
- # 连接Redis
- redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
- redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
- redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
- redis_db = config_handler.get('redis', 'REDIS_DB', '0')
- if redis_password:
- redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
- else:
- redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
- r = redis.from_url(redis_url, decode_responses=True)
- server_logger.info("清理Redis中的残留Celery任务...")
- # 清理所有Celery相关的键,包括更多模式
- keys_to_delete = []
- for key in r.keys():
- key_lower = key.lower()
- # 扩展匹配模式,包括你遇到的实际键格式
- if any(keyword in key_lower for keyword in [
- 'celery', 'task:', 'celery-task', 'kombu', 'current:'
- ]):
- keys_to_delete.append(key)
- # 匹配特定模式
- elif key.startswith('celery-task-meta-') or key.startswith('current:'):
- keys_to_delete.append(key)
- # 临时键
- elif key == 't_key':
- keys_to_delete.append(key)
- # 清理消息队列
- try:
- # 清理所有Celery队列
- queues = ['celery', 'celery.pidbox', 'celeryev']
- for queue in queues:
- # 删除队列
- r.delete(queue)
- server_logger.debug(f"已清理队列: {queue}")
- # 清理Kombu绑定
- kombu_keys = r.keys('_kombu.binding.*')
- for key in kombu_keys:
- r.delete(key)
- server_logger.debug(f"已清理Kombu绑定: {key}")
- except Exception as e:
- server_logger.warning(f"清理队列失败: {e}")
- # 清理识别到的键
- if keys_to_delete:
- for key in keys_to_delete:
- try:
- r.delete(key)
- server_logger.debug(f"已清理: {key}")
- except Exception as e:
- server_logger.warning(f"清理 {key} 失败: {e}")
- server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
- else:
- server_logger.info("没有发现需要清理的残留任务")
- # 额外检查:确保关键队列被清空
- try:
- # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库
- # 这里我们检查是否还有残留,如果有则进行更彻底的清理
- remaining_keys = []
- for key in r.keys():
- if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
- remaining_keys.append(key)
- if remaining_keys:
- server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
- for key in remaining_keys:
- try:
- r.delete(key)
- server_logger.debug(f"彻底清理: {key}")
- except Exception as e:
- server_logger.warning(f"彻底清理 {key} 失败: {e}")
- except Exception as e:
- server_logger.warning(f"彻底清理检查失败: {e}")
- return True
- except Exception as e:
- server_logger.error(f"清理Redis残留任务失败: {e}")
- return False
- def start_celery_worker_background():
- """在后台启动Celery Worker(异步方式)"""
- # 启动前清理残留任务
- cleanup_redis_before_start()
- # 添加调用栈调试
- server_logger.info("=== Celery Worker启动调用栈 ===")
- for line in traceback.format_stack():
- server_logger.debug(f" {line.strip()}")
- server_logger.info("=== 调用栈结束 ===")
- return celery_manager.start_worker()
- def stop_celery_worker():
- """停止Celery Worker"""
- global celery_manager
- # 立即取消所有任务注册(使用DB0,与启动时保持一致)
- try:
- import redis
- from foundation.base.config import config_handler
- # 连接Redis(使用配置文件的数据库)
- redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
- redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
- redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
- redis_db = config_handler.get('redis', 'REDIS_DB', '0')
- if redis_password:
- redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
- else:
- redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
- r = redis.from_url(redis_url, decode_responses=True)
- server_logger.info("停止时清理Redis中的Celery任务...")
- # 清理任务相关键
- task_keys = r.keys('task:*') # 重复任务检查器的数据
- celery_meta_keys = r.keys('celery-task-meta-*')
- current_keys = r.keys('current:*')
- kombu_keys = r.keys('_kombu.binding.*')
- all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
- for key in all_keys:
- try:
- r.delete(key)
- server_logger.debug(f"停止时清理: {key}")
- except Exception as e:
- server_logger.warning(f"停止时清理 {key} 失败: {e}")
- # 清理队列
- queues = ['celery', 'celery.pidbox', 'celeryev']
- for queue in queues:
- try:
- r.delete(queue)
- server_logger.debug(f"停止时清理队列: {queue}")
- except Exception as e:
- server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
- server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
- except Exception as e:
- server_logger.error(f"停止时清理Redis任务失败: {e}")
- # 立即停止Worker,不等待
- return celery_manager.stop_worker_immediately()
- def run_server(host: str = "127.0.0.1", port: int = 8002, reload: bool = False,
- with_celery: bool = True):
- """运行服务器"""
- if with_celery:
- # 启动Celery Worker
- start_celery_worker_background()
- # 注册退出时的清理函数
- import atexit
- atexit.register(stop_celery_worker)
- # 设置信号处理
- def signal_handler(signum, frame):
- server_logger.info(f"收到信号 {signum},正在停止服务...")
- stop_celery_worker()
- sys.exit(0)
- # Windows和Unix系统的信号处理
- try:
- signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
- signal.signal(signal.SIGTERM, signal_handler) # 终止信号
- except AttributeError:
- # Windows可能不支持某些信号
- pass
- # Windows特有的控制台事件处理
- if sys.platform == 'win32':
- try:
- import win32api
- def win32_handler(dwCtrlType):
- # 正确的控制台事件常量
- CTRL_C_EVENT = 0
- CTRL_BREAK_EVENT = 1
- CTRL_CLOSE_EVENT = 2
- CTRL_SHUTDOWN_EVENT = 6
- if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
- server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
- stop_celery_worker()
- sys.exit(0)
- return False
- win32api.SetConsoleCtrlHandler(win32_handler, True)
- except (ImportError, AttributeError) as e:
- # 如果win32api不可用,跳过Windows控制台处理
- server_logger.debug(f"Windows控制台事件处理不可用: {e}")
- pass
- try:
- if reload:
- # 重载模式需要正确的模块路径
- app_import_path = "server.app:app"
- uvicorn.run(app_import_path, host=host, port=port, reload=reload)
- else:
- # 直接运行模式,直接使用app对象
- uvicorn.run(app, host=host, port=port)
- finally:
- if with_celery:
- stop_celery_worker()
- # 创建应用实例
- app = create_app()
- server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
- # 运行Uvicorn服务器
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
- parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
- parser.add_argument('--port', type=int, default=8001, help='服务器端口')
- parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
- parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
- args = parser.parse_args()
- server_logger.info("Agent API服务启动中...")
- server_logger.info(f"服务地址: http://{args.host}:{args.port}")
- server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
- server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
- server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")
- if not args.no_celery:
- server_logger.info("Celery Worker: 已集成启动")
- else:
- server_logger.warning("Celery Worker: 已禁用")
- run_server(
- host=args.host,
- port=8002,
- reload=False,
- with_celery=not args.no_celery
- )
|