Przeglądaj źródła

v0.0.3-主路由合并

WangXuMing 3 miesięcy temu
rodzic
commit
4886218a40
2 zmienionych plików z 430 dodań i 24 usunięć
  1. 429 23
      server/app.py
  2. 1 1
      temp/AI审查结果.json

+ 429 - 23
server/app.py

@@ -1,46 +1,452 @@
 import os
 import sys
-
+import time
+import uvicorn
+import datetime
+import threading
+from pathlib import Path
+import argparse
+import signal
+import traceback
 
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.insert(0, BASE_DIR)
 
 from views import lifespan
-
 from fastapi.middleware.cors import CORSMiddleware
-
-from fastapi import FastAPI
-
+from fastapi import FastAPI, HTTPException
+from fastapi.responses import JSONResponse
 from foundation.logger.loggering import server_logger
+from foundation.base.celery_app import app as celery_app
+
+# 导入所有路由
 from views.test_views import test_router
+from views.construction_review.file_upload import file_upload_router
+from views.construction_review.review_results import review_results_router
+from views.construction_review.launch_review import launch_review_router
+from views.construction_review.task_progress import task_progress_router
 
 
 # 创建 FastAPI 应用
-app = FastAPI(
-    title=" Agent API",
-    version="0.2",
-    description=" Agent API",
-    lifespan=lifespan
-)
+def create_app() -> FastAPI:
+    """创建主应用服务"""
+    app = FastAPI(
+        title="Agent API - 施工方案审查系统",
+        version="0.3",
+        description="Agent API - 集成施工方案审查功能",
+        lifespan=lifespan
+    )
+
+    # 添加 CORS 中间件
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],  # 允许所有的来源
+        allow_credentials=True,
+        allow_methods=["*"],  # 允许的HTTP方法
+        allow_headers=["*"],  # 允许的请求头
+    )
+
+    # 添加所有路由
+    app.include_router(test_router)
+    app.include_router(file_upload_router)
+    app.include_router(review_results_router)
+    app.include_router(launch_review_router)
+    app.include_router(task_progress_router)
+
+    # 全局异常处理
+    @app.exception_handler(HTTPException)
+    async def http_exception_handler(request, exc):
+        return JSONResponse(
+            status_code=exc.status_code,
+            content=exc.detail
+        )
+
+    # 健康检查
+    @app.get("/health")
+    async def health_check():
+        timestamp = datetime.datetime.now().isoformat()
+        return {"status": "healthy", "timestamp": timestamp}
+
+    # Celery状态检查
+    @app.get("/celery/status")
+    async def get_celery_status():
+        """获取Celery Worker状态"""
+        global celery_manager
+        status = celery_manager.get_status()
+        return {
+            "celery_worker": status,
+            "timestamp": datetime.datetime.now().isoformat()
+        }
+
+    # API文档
+    @app.get("/api/docs")
+    async def api_docs():
+        return {
+            "title": "Agent API服务文档",
+            "description": "集成施工方案审查功能的API接口文档",
+            "version": "V.0.3",
+            "apis": [
+                {
+                    "name": "测试接口",
+                    "path": "/test",
+                    "method": "GET",
+                    "description": "系统测试接口"
+                },
+                {
+                    "name": "文档上传",
+                    "path": "/sgsc/file_upload",
+                    "method": "POST",
+                    "description": "上传施工方案文档"
+                },
+                {
+                    "name": "审查启动",
+                    "path": "/sgsc/sse/launch_review",
+                    "method": "POST",
+                    "description": "启动AI审查工作流"
+                },
+                {
+                    "name": "进度推送",
+                    "path": "/sgsc/sse/progress/{callback_task_id}",
+                    "method": "GET",
+                    "description": "SSE实时进度推送"
+                },
+                {
+                    "name": "结果获取",
+                    "path": "/sgsc/review_results",
+                    "method": "POST",
+                    "description": "获取审查结果"
+                }
+            ],
+        }
+
+    return app
+
+
+# Celery Worker管理器
+class CeleryWorkerManager:
+    """Celery Worker程序化管理器"""
+
+    def __init__(self):
+        self.worker = None
+        self.is_running = False
+        self.worker_thread = None
+        self.shutdown_event = threading.Event()
+
+    def start_worker(self, **kwargs):
+        """启动Celery Worker"""
+        if self.is_running:
+            server_logger.warning("Celery Worker已在运行")
+            return True
+
+        try:
+            # 创建Worker函数
+            def run_celery_worker():
+                try:
+                    # 使用最简单的启动方式
+                    server_logger.info("Celery Worker开始运行...")
+
+                    # 直接启动worker,使用默认配置
+                    celery_app.worker_main(['worker'])
+
+                except KeyboardInterrupt:
+                    server_logger.info("收到停止信号,Celery Worker退出")
+                except Exception as e:
+                    server_logger.error(f"Celery Worker运行时出错: {e}")
+                    server_logger.exception("详细错误信息:")
+                finally:
+                    self.is_running = False
+                    server_logger.info("Celery Worker已停止")
+
+            # 在单独线程中启动Worker
+            self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
+            self.worker_thread.start()
+            self.is_running = True
+
+            # 等待启动
+            time.sleep(2)
+
+            if self.is_running and self.worker_thread.is_alive():
+                server_logger.info("Celery Worker启动成功")
+                return True
+            else:
+                server_logger.error("Celery Worker启动失败")
+                self.is_running = False
+                return False
+
+        except ImportError as e:
+            server_logger.error(f"导入Celery失败: {e}")
+            server_logger.info("请先安装Celery: pip install celery redis")
+            return False
+        except Exception as e:
+            server_logger.error(f"启动Celery Worker失败: {e}")
+            server_logger.exception("详细错误信息:")
+            return False
+
+    def stop_worker(self, timeout: int = 5):
+        """停止Celery Worker"""
+        if not self.is_running:
+            server_logger.info("Celery Worker未运行")
+            return True
+
+        try:
+            server_logger.info("停止Celery Worker...")
+            self.shutdown_event.set()
+
+            # 发送停止信号给线程
+            if self.worker_thread and self.worker_thread.is_alive():
+                # 尝试优雅停止
+                start_time = time.time()
+                while self.is_running and (time.time() - start_time) < timeout:
+                    time.sleep(0.1)
+
+                # 如果还没停止,记录警告
+                if self.is_running:
+                    server_logger.warning("Celery Worker优雅停止超时")
+                else:
+                    server_logger.info("Celery Worker已优雅停止")
+
+            self.is_running = False
+            self.shutdown_event.clear()
+            return True
+
+        except Exception as e:
+            server_logger.error(f"停止Celery Worker失败: {e}")
+            return False
+
+    def stop_worker_immediately(self):
+        """立即停止Celery Worker,不等待"""
+        if not self.is_running:
+            server_logger.info("Celery Worker未运行")
+            return True
+
+        try:
+            server_logger.info("立即停止Celery Worker...")
+            self.shutdown_event.set()
+
+            # 设置超时事件,强制停止
+            import os
+
+            # 发送中断信号给当前进程
+            if hasattr(os, 'kill'):
+                try:
+                    os.kill(os.getpid(), signal.SIGINT)
+                    server_logger.info("已发送中断信号")
+                except:
+                    pass
+
+            # 立即设置状态为停止
+            self.is_running = False
+            self.shutdown_event.clear()
+
+            server_logger.info("Celery Worker已立即停止")
+            return True
+
+        except Exception as e:
+            server_logger.error(f"立即停止Celery Worker失败: {e}")
+            # 即使失败也要设置状态
+            self.is_running = False
+            return False
 
+    def get_status(self):
+        """获取Worker状态"""
+        return {
+            "is_running": self.is_running,
+            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
+        }
 
-app.include_router(test_router)
+    def __enter__(self):
+        return self
 
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop_worker()
 
-# 添加 CORS 中间件
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=["*"],  # 允许所有的来源
-    allow_credentials=True,
-    allow_methods=["*"],  # 允许的HTTP方法
-    allow_headers=["*"],  # 允许的请求头
-)
 
+# 全局Worker管理器实例
+celery_manager = CeleryWorkerManager()
 
-server_logger.info(msg="APP init successfully")
+def cleanup_redis_before_start():
+    """启动前清理Redis中的残留Celery任务"""
+    try:
+        import redis
+        from foundation.base.config import config_handler
+
+        # 连接Redis
+        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+
+        if redis_password:
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
+        else:
+            redis_url = f'redis://{redis_host}:{redis_port}/0'
+
+        r = redis.from_url(redis_url, decode_responses=True)
+
+        server_logger.info("清理Redis中的残留Celery任务...")
+
+        # 清理所有Celery相关的键
+        keys_to_delete = []
+        for key in r.keys():
+            if any(keyword in key.lower() for keyword in ['celery', 'task:']):
+                keys_to_delete.append(key)
+
+        if keys_to_delete:
+            for key in keys_to_delete:
+                try:
+                    r.delete(key)
+                    server_logger.debug(f"已清理: {key}")
+                except Exception as e:
+                    server_logger.warning(f"清理 {key} 失败: {e}")
+
+            server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
+        else:
+            server_logger.info("没有发现需要清理的残留任务")
+
+        return True
+
+    except Exception as e:
+        server_logger.error(f"清理Redis残留任务失败: {e}")
+        return False
+
+def start_celery_worker_background():
+    """在后台启动Celery Worker(异步方式)"""
+    # 启动前清理残留任务
+    cleanup_redis_before_start()
+
+    # 添加调用栈调试
+    server_logger.info("=== Celery Worker启动调用栈 ===")
+    for line in traceback.format_stack():
+        server_logger.debug(f"  {line.strip()}")
+    server_logger.info("=== 调用栈结束 ===")
+
+    return celery_manager.start_worker()
+
+def stop_celery_worker():
+    """停止Celery Worker"""
+    global celery_manager
+
+    # 立即取消所有任务注册
+    try:
+        import redis
+        from foundation.base.config import config_handler
+
+        # 连接Redis
+        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+
+        if redis_password:
+            redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
+        else:
+            redis_url = f'redis://{redis_host}:{redis_port}/2'
+
+        r = redis.from_url(redis_url, decode_responses=True)
+
+        # 清理所有任务注册
+        task_keys = r.keys('task:*')
+        for key in task_keys:
+            r.delete(key)
+            server_logger.info(f"取消任务注册: {key}")
+
+        server_logger.info(f"已取消 {len(task_keys)} 个任务注册")
+
+    except Exception as e:
+        server_logger.error(f"取消任务注册失败: {e}")
+
+    # 立即停止Worker,不等待
+    return celery_manager.stop_worker_immediately()
+
+def run_server(host: str = "127.0.0.1", port: int = 8001, reload: bool = False,
+                with_celery: bool = True):
+    """运行服务器"""
+
+    if with_celery:
+        # 启动Celery Worker
+        start_celery_worker_background()
+
+        # 注册退出时的清理函数
+        import atexit
+        atexit.register(stop_celery_worker)
+
+        # 设置信号处理
+        def signal_handler(signum, frame):
+            server_logger.info(f"收到信号 {signum},正在停止服务...")
+            stop_celery_worker()
+            sys.exit(0)
+
+        # Windows和Unix系统的信号处理
+        try:
+            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
+            signal.signal(signal.SIGTERM, signal_handler)  # 终止信号
+        except AttributeError:
+            # Windows可能不支持某些信号
+            pass
+
+        # Windows特有的控制台事件处理
+        if sys.platform == 'win32':
+            try:
+                import win32api
+                def win32_handler(dwCtrlType):
+                    # 正确的控制台事件常量
+                    CTRL_C_EVENT = 0
+                    CTRL_BREAK_EVENT = 1
+                    CTRL_CLOSE_EVENT = 2
+                    CTRL_SHUTDOWN_EVENT = 6
+
+                    if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
+                        server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
+                        stop_celery_worker()
+                        sys.exit(0)
+                    return False
+                win32api.SetConsoleCtrlHandler(win32_handler, True)
+            except (ImportError, AttributeError) as e:
+                # 如果win32api不可用,跳过Windows控制台处理
+                server_logger.debug(f"Windows控制台事件处理不可用: {e}")
+                pass
+
+    try:
+        if reload:
+            # 重载模式需要正确的模块路径
+            app_import_path = "server.app:app"
+            uvicorn.run(app_import_path, host=host, port=port, reload=reload)
+        else:
+            # 直接运行模式,直接使用app对象
+            uvicorn.run(app, host=host, port=port)
+    finally:
+        if with_celery:
+            stop_celery_worker()
+
+
+# 创建应用实例
+app = create_app()
+
+server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
 
 
 # 运行Uvicorn服务器
 if __name__ == "__main__":
-    import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8001,reload=True)
+    parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
+    parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
+    parser.add_argument('--port', type=int, default=8001, help='服务器端口')
+    parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
+    parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
+
+    args = parser.parse_args()
+
+    server_logger.info("Agent API服务启动中...")
+    server_logger.info(f"服务地址: http://{args.host}:{args.port}")
+    server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
+    server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
+    server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")
+
+    if not args.no_celery:
+        server_logger.info("Celery Worker: 已集成启动")
+    else:
+        server_logger.warning("Celery Worker: 已禁用")
+
+    run_server(
+        host=args.host,
+        port=args.port,
+        reload=False,
+        with_celery=not args.no_celery
+    )

Plik diff jest za duży
+ 1 - 1
temp/AI审查结果.json


Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików