# Standard library
import logging
import logging.handlers
import os
import sys
from contextlib import asynccontextmanager
from datetime import datetime, timezone

# Third-party
from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# --- Import-path bootstrap -------------------------------------------------
# Importing ``path_config`` is expected to configure ``sys.path`` as a side
# effect. When it is not importable yet, fall back to adding the two ancestor
# directories by hand and then retry the import.
try:
    import path_config  # noqa: F401
except ImportError:
    _this_dir = os.path.dirname(__file__)
    # <repo>/src/app/server/app.py -> <repo>/src
    src_path = os.path.abspath(os.path.join(_this_dir, '../..'))
    # <repo>/src/app/server/app.py -> <repo>
    root_path = os.path.abspath(os.path.join(_this_dir, '../../..'))
    # Inserted in this order so that root_path ends up ahead of src_path.
    for _extra_path in (src_path, root_path):
        if _extra_path not in sys.path:
            sys.path.insert(0, _extra_path)

    # Retry so that path_config can apply its full path configuration
    # (e.g. the app directory); a second failure is tolerated on purpose.
    try:
        import path_config  # noqa: F401
    except ImportError:
        pass

from app.core.config import config_handler
from app.base import init_db, close_db, init_redis, close_redis
from app.core.exceptions import BaseAPIException
from app.schemas.base import ResponseSchema
from app.logger.config import configure_logging, get_logger

# Configure global logging before the remaining project modules are imported.
_configured_level = config_handler.get("admin_app", "LOG_LEVEL", "INFO").lower()
configure_logging(
    log_level=_configured_level,
    log_dir="logs",
    log_to_file=True,
    log_to_console=True,
)

# Module-wide logger
logger = get_logger("lqadmin_server")

# View routers
from views.system_view import router as system_router
from views.sample_view import router as sample_router
from views.auth_view import router as auth_router
from views.sso_view import router as sso_router
from views.knowledge_base_view import router as knowledge_base_router
from views.snippet_view import router as snippet_router
from views.tag_view import router as tag_router
from views.search_engine_view import router as search_engine_router
from views.image_view import router as image_router
from views.dict_category_view import router as dict_category_router
from views.dict_item_view import router as dict_item_router

# External-facing API routes
from app.api.v1.system_api_view import router as system_api_router

# Legacy API router (currently disabled)
# from app.api.v1.api_router import api_router


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup initializes the database and then Redis; shutdown closes them in
    the same order. Every step is best-effort: a failure is logged and the
    next step still runs, so the service starts even if a backend is down.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    banner = "=" * 60

    # ---- startup ----
    logger.info(banner)
    logger.info("正在启动 LQAdminPlatform 服务...")
    logger.info(banner)

    try:
        await init_db()
        logger.info("✅ 数据库初始化成功")
    except Exception as e:
        logger.error(f"❌ 数据库初始化失败: {e}")

    try:
        await init_redis()
        logger.info("✅ Redis初始化成功")
    except Exception as e:
        logger.warning(f"⚠️ Redis初始化失败: {e}")

    host = config_handler.get('admin_app', 'HOST', '0.0.0.0')
    port = config_handler.get_int('admin_app', 'PORT', 8000)
    logger.info(banner)
    logger.info("🚀 服务启动成功!")
    logger.info(f"📍 服务地址: http://{host}:{port}")
    # /docs is only registered in DEBUG mode (see FastAPI(...) below), so only
    # advertise the URL when it actually exists.
    if config_handler.get_bool("admin_app", "DEBUG", False):
        logger.info(f"📚 API文档: http://{host}:{port}/docs")
    logger.info(banner)

    yield

    # ---- shutdown ----
    logger.info(banner)
    logger.info("正在关闭 LQAdminPlatform 服务...")
    logger.info(banner)

    try:
        await close_db()
        logger.info("✅ 数据库连接已关闭")
    except Exception as e:
        logger.error(f"❌ 关闭数据库连接失败: {e}")

    try:
        await close_redis()
        logger.info("✅ Redis连接已关闭")
    except Exception as e:
        logger.warning(f"⚠️ 关闭Redis连接失败: {e}")

    logger.info(banner)
    logger.info("👋 服务已关闭")
    logger.info(banner)


# Interactive API docs are exposed only when DEBUG is enabled.
_debug_enabled = config_handler.get_bool("admin_app", "DEBUG", False)

# FastAPI application instance
app = FastAPI(
    title=config_handler.get("admin_app", "APP_NAME", "后台管理"),
    version=config_handler.get("admin_app", "APP_VERSION", "1.0.0"),
    description="四川路桥样本中心",
    docs_url="/docs" if _debug_enabled else None,
    redoc_url="/redoc" if _debug_enabled else None,
    lifespan=lifespan,
)

# CORS - allow the front end to call the API cross-origin.
# NOTE(review): "*" together with allow_credentials=True is documented by
# FastAPI as an unsupported combination and effectively lets any origin make
# credentialed requests. It was added "temporarily, for debugging"; it is
# kept here to avoid breaking existing deployments, but should be replaced by
# an explicit origin list.
# NOTE(review): this middleware is registered first, so middleware added
# later wraps it; confirm that responses produced directly by the token
# middleware still receive CORS headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8000",
        "http://localhost:3000",
        "http://localhost:3001",
        "http://localhost:5173",  # Vite default port
        "http://localhost:8080",
        "http://127.0.0.1:3000",
        "http://127.0.0.1:3001",
        "http://127.0.0.1:5173",
        "http://127.0.0.1:8080",
        "*",  # temporarily allow everything to ease debugging
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["*"],
    # Required so browser code can read the custom token headers.
    expose_headers=["X-New-Token", "X-Token-Refreshed"],
)

# Token refresh middleware
from app.middleware.token_refresh_middleware import TokenRefreshMiddleware

app.add_middleware(
    TokenRefreshMiddleware,
    exclude_paths=[
        "/auth/login",
        "/auth/captcha",
        "/auth/refresh",
        "/docs",
        "/openapi.json",
        "/redoc",
        "/favicon.ico",
        "/health",
        "/",
    ],
)

logger.info("✅ Token刷新中间件已注册")


# Response middleware - publishes a refreshed token via response headers.
@app.middleware("http")
async def add_new_token_to_response(request: Request, call_next):
    """Copy ``request.state.new_token`` into the response headers, if set.

    When downstream processing has stored a non-empty ``new_token`` on
    ``request.state``, the response gets ``X-New-Token`` (the token) and
    ``X-Token-Refreshed: true``. Otherwise the response is returned untouched.

    Per-request tracing is logged at DEBUG level and never includes token
    material, so production logs are neither flooded nor leak credentials.

    Args:
        request: The incoming request.
        call_next: Callable that runs the rest of the stack.

    Returns:
        The downstream response, possibly with the two token headers added.
    """
    logger.debug(">>> [响应中间件] 处理请求: %s", request.url.path)

    response = await call_next(request)

    # getattr with a default also skips a token that is present but None/empty,
    # which would otherwise fail when assigned as a header value.
    new_token = getattr(request.state, "new_token", None)
    if new_token:
        response.headers["X-New-Token"] = new_token
        response.headers["X-Token-Refreshed"] = "true"
        logger.debug(">>> [响应中间件] 已在响应头中添加新token")

    return response


logger.info("✅ Token响应处理中间件已注册")

# --- Debug middleware (disabled) ---
# @app.middleware("http")
# async def log_requests(request: Request, call_next):
#     # logger.info(f"收到请求: {request.method} {request.url}")
#     try:
#         response = await call_next(request)
#         # logger.info(f"请求响应: {response.status_code}")
#         return response
#     except Exception as e:
#         logger.error(f"请求处理异常: {e}")
#         raise
# -----------------------------------


def _error_response(status_code, code, message, data) -> JSONResponse:
    """Build the error envelope shared by all exception handlers.

    The body has the keys ``code``, ``message``, ``data`` and ``timestamp``
    (current UTC time in ISO-8601 form).
    """
    return JSONResponse(
        status_code=status_code,
        content={
            "code": code,
            "message": message,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )


# Global exception handling
@app.exception_handler(BaseAPIException)
async def api_exception_handler(request: Request, exc: BaseAPIException):
    """Translate a ``BaseAPIException`` into the standard error envelope.

    The exception's own ``status_code``, ``code``, ``message`` and ``details``
    are used for the HTTP status and the response body.
    """
    logger.error(f"API异常: {exc.message} - {exc.details}")
    return _error_response(exc.status_code, exc.code, exc.message, exc.details)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Report request-validation failures as HTTP 400 with per-field details.

    Each validation error is flattened to ``field`` (dot-joined location),
    ``message`` and ``type``.
    """
    errors = [
        {
            "field": ".".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"],
        }
        for error in exc.errors()
    ]

    logger.warning(f"参数验证失败: {errors}")

    return _error_response(
        status.HTTP_400_BAD_REQUEST,
        "100001",
        "参数验证失败",
        {"errors": errors},
    )


@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: log with traceback and answer HTTP 500.

    The exception text is only echoed to the client in DEBUG mode; otherwise
    a generic message is returned.
    """
    logger.error(f"未处理的异常: {exc}", exc_info=True)

    debug = config_handler.get_bool("admin_app", "DEBUG", False)
    return _error_response(
        status.HTTP_500_INTERNAL_SERVER_ERROR,
        "500001",
        "服务器内部错误" if not debug else str(exc),
        None,
    )


# Health-check endpoint.
# ``description`` keeps the published OpenAPI text identical to the original
# docstring-derived value.
@app.get("/health", tags=["系统"], description="健康检查")
async def health_check():
    """Return service status, configured version and the current UTC time."""
    return ResponseSchema(
        code="000000",
        message="服务正常运行",
        data={
            "status": "healthy",
            "version": config_handler.get("admin_app", "APP_VERSION", "1.0.0"),
            "timestamp": datetime.now(timezone.utc),
        },
    )


# Root path
@app.get("/", tags=["系统"], description="根路径")
async def root():
    """Return a welcome payload with name, version, docs path and modules."""
    return ResponseSchema(
        code="000000",
        message="欢迎使用 LQAdminPlatform",
        data={
            "name": config_handler.get("admin_app", "APP_NAME", "后台管理"),
            "version": config_handler.get("admin_app", "APP_VERSION", "1.0.0"),
            "docs": "/docs" if config_handler.get_bool("admin_app", "DEBUG", False) else None,
            "modules": ["系统管理", "样本中心"],
        },
    )


# Router registration
# Legacy API router (kept for reference, disabled)
# app.include_router(api_router, prefix="/api/v1")

# External-facing API routes
app.include_router(system_api_router, prefix="/api/external/v1")

# Modular view routers
app.include_router(system_router, prefix="/api/v1")
app.include_router(auth_router, prefix="/api/v1")
app.include_router(sso_router, prefix="")
app.include_router(sample_router, prefix="/api/v1/sample")
# Mounted a second time under /api so that paths used by external test
# scripts (/api/external/...) keep working.
app.include_router(sample_router, prefix="/api")
app.include_router(knowledge_base_router, prefix="/api/v1")
app.include_router(snippet_router, prefix="/api/v1")
app.include_router(tag_router, prefix="/api/v1")
app.include_router(search_engine_router, prefix="/api/v1")
app.include_router(image_router, prefix="/api/v1")
app.include_router(dict_category_router, prefix="/api/v1")
app.include_router(dict_item_router, prefix="/api/v1")


def create_app() -> FastAPI:
    """Return the module-level FastAPI application instance."""
    return app


if __name__ == "__main__":
    import socket

    import uvicorn

    # The address uvicorn will bind to; the port probe must use the same one,
    # otherwise a port can look free on localhost yet be taken on this host
    # (or the other way round).
    bind_host = config_handler.get("admin_app", "HOST", "0.0.0.0")

    def check_port(port, host=bind_host):
        """Return True if a TCP socket can currently be bound to (host, port).

        The address family is taken from ``getaddrinfo`` so IPv4 and IPv6
        hosts are both handled. An unresolvable host counts as unavailable.
        """
        try:
            candidates = socket.getaddrinfo(
                host, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE
            )
        except OSError:  # includes socket.gaierror
            return False
        family, socktype, proto, _canonname, sockaddr = candidates[0]
        with socket.socket(family, socktype, proto) as s:
            try:
                s.bind(sockaddr)
            except OSError:
                return False
        return True

    def find_available_port(start_port=8000, max_port=8010):
        """Return the first bindable port in [start_port, max_port], or None."""
        for port in range(start_port, max_port + 1):
            if check_port(port):
                return port
        return None

    # Use the configured port, or fall back to the next free one nearby.
    port = config_handler.get_int("admin_app", "PORT", 8000)
    if not check_port(port):
        logger.warning(f"端口 {port} 已被占用,正在查找可用端口...")
        port = find_available_port(port, port + 10)
        if port is None:
            logger.error("未找到可用端口")
            sys.exit(1)
        logger.info(f"找到可用端口: {port}")

    # Start the server
    uvicorn.run(
        "app.server.app:app",
        host=bind_host,
        port=port,
        reload=config_handler.get_bool("admin_app", "RELOAD", False),
        log_level=config_handler.get("admin_app", "LOG_LEVEL", "INFO").lower(),
    )