""" 智创空间后端服务入口 """ import os import time import json import logging from logging.handlers import RotatingFileHandler from datetime import datetime from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import Response from app.routers import ( model_router, auth_router, user_router, oss_router, local_model_router, platform_api_key_router, openai_compat_router, platform_stats_router, user_local_model_permission_router, password_strength_router ) from app.routers.admin_auth_router import router as admin_auth_router from app.routers.admin_user_router import router as admin_user_router from app.routers.admin_model_router import router as admin_model_router from app.routers.admin_log_router import router as admin_log_router from app.routers.admin_stats_router import router as admin_stats_router from app.routers.admin_config_router import router as admin_config_router from app.routers.admin_local_model_router import router as admin_local_model_router from app.routers.admin_oss_router import router as admin_oss_router from app.routers.oauth_sso_router import router as sso_router from app.routers.admin_local_config_router import router as admin_local_config_router from app.core.async_logger import async_log_queue from app.core.redis import redis_manager from app.middleware import register_exception_handlers from app.middleware.rate_limit_middleware import RateLimitMiddleware from app.database import engine, SessionLocal from app.services.user_service import UserService # ==================== 日志配置 ==================== # 创建 logs 目录 os.makedirs('logs', exist_ok=True) # 配置日志格式 log_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 配置根日志记录器 root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # 清除已有的 handlers,避免重复(reload 模式下会重复加载) if root_logger.handlers: root_logger.handlers.clear() # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_formatter) root_logger.addHandler(console_handler) # 文件处理器(所有日志) file_handler = RotatingFileHandler( 'logs/app.log', maxBytes=10*1024*1024, # 10MB backupCount=5, encoding='utf-8' ) file_handler.setFormatter(log_formatter) root_logger.addHandler(file_handler) # 错误日志文件处理器 error_handler = RotatingFileHandler( 'logs/error.log', maxBytes=10*1024*1024, # 10MB backupCount=5, encoding='utf-8' ) error_handler.setLevel(logging.ERROR) error_handler.setFormatter(log_formatter) root_logger.addHandler(error_handler) logger = logging.getLogger(__name__) # 抑制第三方库的冗余日志(OSS SDK、HTTP 客户端等) for _noisy_logger in ['oss2', 'aiohttp', 'urllib3', 'urllib3.connectionpool', 'httpcore', 'httpx']: logging.getLogger(_noisy_logger).setLevel(logging.ERROR) logger.info("Logging configured: logs/app.log, logs/error.log") class RequestLogMiddleware(BaseHTTPMiddleware): """全局请求日志中间件""" async def dispatch(self, request: Request, call_next): start_time = time.time() # 尝试获取用户ID和用户名 user_id = None username = None auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): try: from app.services.auth_service import AuthService from app.services.admin_auth_service import AdminAuthService token = auth_header[7:] # 先尝试普通用户token try: payload = AuthService.verify_token(token) user_id = payload.get("user_id") except: # 再尝试管理员token try: payload = AdminAuthService.verify_token(token) user_id = f"admin_{payload.get('admin_id')}" except: pass except: pass response = await call_next(request) duration_ms = int((time.time() - start_time) * 1000) # 记录到控制台 log_data = { "timestamp": datetime.now().isoformat(), "method": request.method, "path": str(request.url.path), "query_params": str(request.query_params) if request.query_params else None, "user_id": user_id or "anonymous", "status_code": response.status_code, "duration_ms": duration_ms } if response.status_code >= 400: logger.warning(f"Request: {json.dumps(log_data, ensure_ascii=False)}") else: logger.info(f"Request: {json.dumps(log_data, ensure_ascii=False)}") # 记录到异步队列(非阻塞,需求 6.1, 6.4) if user_id and not request.url.path.startswith(("/health", "/static", "/exports")): async_log_queue.enqueue({ "user_id": user_id, "api_path": str(request.url.path), "method": request.method, "status_code": response.status_code, "duration_ms": duration_ms, "request_params": dict(request.query_params) if request.query_params else None, "request_ip": request.client.host if request.client else None }) return response def init_admin_user(): """初始化管理员用户""" db = SessionLocal() try: user_service = UserService(db) user_service.init_admin_user() logger.info("管理员用户初始化完成") except Exception as e: logger.error(f"管理员用户初始化失败: {e}") finally: db.close() @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" logger.info("=" * 50) logger.info("智创空间后端服务启动中...") logger.info(f"数据库连接: {engine.url}") try: with engine.connect() as conn: logger.info("数据库连接成功") init_admin_user() # 初始化 Redis 连接(需求 4.1) redis_connected = await redis_manager.connect() if redis_connected: logger.info("Redis 连接已建立") else: logger.warning("Redis 连接失败,系统将以降级模式运行(无缓存、无分布式限流)") # 启动异步日志队列(需求 6.1) from app.core.async_database import AsyncSessionLocal await async_log_queue.start(AsyncSessionLocal) logger.info("异步日志队列已启动") # 启动定时任务(仅第一个 worker 运行,避免多 worker 重复执行) _is_main_worker = os.environ.get("_SCHEDULER_STARTED") != "1" if _is_main_worker: os.environ["_SCHEDULER_STARTED"] = "1" from apscheduler.schedulers.background import BackgroundScheduler # TODO: hourly_deduction_task 模块已移除,需要重新实现或确认是否需要 # from app.services.hourly_deduction_task import run_hourly_deduction scheduler = BackgroundScheduler() # 每小时整点执行定时扣减任务(已禁用) # scheduler.add_job( # run_hourly_deduction, # 'cron', # hour='*', # minute=0, # id='hourly_deduction', # name='每小时余额扣减任务' # ) # 爬虫数据同步任务:每天凌晨3点执行 try: import asyncio from app.services.crawler_sync_service import sync_from_crawler def run_crawler_sync(): db = SessionLocal() try: asyncio.run(sync_from_crawler(db)) except Exception as e: logger.error(f"爬虫同步任务异常: {e}") finally: db.close() scheduler.add_job( run_crawler_sync, 'cron', hour=3, minute=0, id='crawler_sync', name='爬虫数据同步任务' ) logger.info("定时任务已启动:爬虫数据同步任务每天凌晨3点执行") # 启动时立即触发一次同步 import threading threading.Thread(target=run_crawler_sync, daemon=True, name="crawler_sync_startup").start() logger.info("已触发启动时爬虫数据同步") except Exception as _: logger.exception("注册爬虫同步任务失败") if _is_main_worker: scheduler.start() logger.info("定时任务调度器已启动(当前为主 worker)") else: logger.info("非主 worker,跳过定时任务调度器启动") except Exception as e: logger.error(f"数据库连接失败: {e}") logger.info("=" * 50) yield # 停止异步日志队列(需求 6.1) logger.info("正在停止异步日志队列...") await async_log_queue.stop() # 关闭 Redis 连接(需求 4.1) logger.info("正在关闭 Redis 连接...") await redis_manager.close() logger.info("服务关闭") app = FastAPI( title="智创空间API", description="智创空间后端服务,包含模型广场等模块", version="1.0.0", lifespan=lifespan, docs_url="/docs", redoc_url="/redoc" ) # 添加安全中间件 from app.middleware.security_middleware import SecurityMiddleware # 添加全局请求日志中间件 app.add_middleware(RequestLogMiddleware) # 添加安全中间件(在CORS之前添加) app.add_middleware(SecurityMiddleware) # 添加限流中间件(需求 5.1) app.add_middleware(RateLimitMiddleware) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) register_exception_handlers(app) # 注意:local_model_router 必须在 model_router 之前注册, # 因为 model_router 有 /{model_id} 路由会捕获 /local app.include_router(local_model_router) app.include_router(model_router) app.include_router(auth_router) app.include_router(user_router) app.include_router(oss_router) app.include_router(platform_api_key_router) app.include_router(openai_compat_router) app.include_router(platform_stats_router) app.include_router(user_local_model_permission_router) app.include_router(password_strength_router) # 管理后台路由 app.include_router(admin_auth_router) app.include_router(admin_user_router) app.include_router(admin_model_router) app.include_router(admin_log_router) app.include_router(admin_stats_router) app.include_router(admin_config_router) app.include_router(admin_local_model_router) app.include_router(admin_oss_router) app.include_router(sso_router) app.include_router(admin_local_config_router) # 短信验证码路由 from app.routers.sms_router import router as sms_router app.include_router(sms_router) # 邮箱验证码路由 from app.routers.email_router import router as email_router app.include_router(email_router) # 公开品牌配置接口(无需登录,前端用于显示 logo/名称) @app.get("/api/public/branding") async def get_public_branding(): """返回平台品牌配置(system_name, system_logo, icp_number)""" from app.models.config import SystemConfig from app.database import SessionLocal import json as _json db = SessionLocal() try: def _get(key: str, default: str) -> str: row = db.query(SystemConfig).filter(SystemConfig.config_key == key).first() if row: try: return _json.loads(row.config_value) except Exception: return row.config_value return default return { "system_name": _get("system_name", "智创空间"), "system_logo": _get("system_logo", ""), "icp_number": _get("icp_number", ""), } finally: db.close() @app.get("/health") async def health_check(): """基本健康检查 返回系统整体健康状态(healthy/degraded/unhealthy)。 需求引用: 8.1, 8.2, 8.3, 8.5 """ from app.services.health_service import health_service overall = await health_service.get_overall_health() return {"status": overall["status"]} @app.get("/health/detailed") async def health_check_detailed(): """详细健康检查 返回所有组件的详细状态信息,包括: - 数据库连接状态和连接池使用情况 - Redis 连接状态和内存使用情况 - 异步日志队列状态 需求引用: 8.1, 8.2, 8.3, 8.4, 8.5 """ from app.services.health_service import health_service return await health_service.get_overall_health() if __name__ == "__main__": import uvicorn host = os.getenv("APP_HOST", "0.0.0.0") port = int(os.getenv("APP_PORT", "8010")) debug = os.getenv("DEBUG", "False").lower() == "true" logger.info(f"启动开发服务器: http://{host}:{port}") # 配置 reload 参数 reload_config = {} if debug: # 只监控 app 目录,避免监控 logs reload_config = { "reload": True, "reload_dirs": ["app"], # 只监控 app 目录 "reload_includes": ["*.py"], # 只监控 Python 文件 } uvicorn.run( "main:app", host=host, port=port, log_level="info", **reload_config )