| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- """
- 智创空间后端服务入口
- """
- import os
- import time
- import json
- import logging
- from logging.handlers import RotatingFileHandler
- from datetime import datetime
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, Request
- from fastapi.middleware.cors import CORSMiddleware
- from starlette.middleware.base import BaseHTTPMiddleware
- from starlette.responses import Response
- from app.routers import (
- model_router, auth_router, user_router,
- oss_router,
- local_model_router, platform_api_key_router, openai_compat_router, platform_stats_router,
- user_local_model_permission_router, password_strength_router
- )
- from app.routers.admin_auth_router import router as admin_auth_router
- from app.routers.admin_user_router import router as admin_user_router
- from app.routers.admin_model_router import router as admin_model_router
- from app.routers.admin_log_router import router as admin_log_router
- from app.routers.admin_stats_router import router as admin_stats_router
- from app.routers.admin_config_router import router as admin_config_router
- from app.routers.admin_local_model_router import router as admin_local_model_router
- from app.routers.admin_oss_router import router as admin_oss_router
- from app.routers.oauth_sso_router import router as sso_router
- from app.routers.admin_local_config_router import router as admin_local_config_router
- from app.core.async_logger import async_log_queue
- from app.core.redis import redis_manager
- from app.middleware import register_exception_handlers
- from app.middleware.rate_limit_middleware import RateLimitMiddleware
- from app.database import engine, SessionLocal
- from app.services.user_service import UserService
# ==================== Logging setup ====================
# All log files live under ./logs; make sure the directory exists.
os.makedirs('logs', exist_ok=True)

# One shared line format for every handler.
log_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Root logger emits INFO and above.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Drop handlers left over from a previous import of this module (reload mode
# re-executes it), so records are not duplicated.
root_logger.handlers.clear()

# Rotation policy shared by both log files: 10MB per file, 5 backups.
_rotation_options = {
    'maxBytes': 10 * 1024 * 1024,
    'backupCount': 5,
    'encoding': 'utf-8',
}

# Console output, the catch-all log file, and an ERROR-only log file.
console_handler = logging.StreamHandler()
file_handler = RotatingFileHandler('logs/app.log', **_rotation_options)
error_handler = RotatingFileHandler('logs/error.log', **_rotation_options)
error_handler.setLevel(logging.ERROR)

for _handler in (console_handler, file_handler, error_handler):
    _handler.setFormatter(log_formatter)
    root_logger.addHandler(_handler)

logger = logging.getLogger(__name__)

# Silence chatty third-party libraries (OSS SDK, HTTP clients, ...).
for _noisy_logger in ('oss2', 'aiohttp', 'urllib3', 'urllib3.connectionpool', 'httpcore', 'httpx'):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)

logger.info("Logging configured: logs/app.log, logs/error.log")
class RequestLogMiddleware(BaseHTTPMiddleware):
    """Global request-logging middleware.

    For every request it logs method, path, query string, the caller's id
    (when a Bearer token can be verified), status code and duration. Requests
    with a resolved caller id are additionally pushed onto the asynchronous
    log queue, except for a few excluded path prefixes.
    """

    # Requests under these prefixes are never written to the async log queue.
    _SKIP_QUEUE_PREFIXES = ("/health", "/static", "/exports")

    @staticmethod
    def _resolve_user_id(request: Request):
        """Best-effort lookup of the caller's id from the Authorization header.

        Returns the ``user_id`` claim of a valid regular-user token,
        ``"admin_<admin_id>"`` for a valid admin token, or ``None`` when there
        is no Bearer token or it cannot be verified. Never raises: logging
        must not be able to fail a request.
        """
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return None
        token = auth_header[7:]  # strip the "Bearer " prefix

        try:
            from app.services.auth_service import AuthService
            from app.services.admin_auth_service import AdminAuthService
        except Exception:
            logger.debug("Auth services unavailable; request logged as anonymous", exc_info=True)
            return None

        # Try the regular-user token first.
        try:
            return AuthService.verify_token(token).get("user_id")
        except Exception:
            # Expected for admin, expired or malformed tokens; fall through.
            pass

        # Then try the admin token.
        try:
            return f"admin_{AdminAuthService.verify_token(token).get('admin_id')}"
        except Exception:
            return None

    async def dispatch(self, request: Request, call_next):
        """Run the request through the rest of the stack and log the outcome.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request and returns the response.

        Returns:
            The response produced by ``call_next``, unmodified.
        """
        # perf_counter is monotonic, so the measured duration is immune to
        # wall-clock adjustments.
        started = time.perf_counter()
        user_id = self._resolve_user_id(request)

        response = await call_next(request)

        duration_ms = int((time.perf_counter() - started) * 1000)
        path = str(request.url.path)

        # Console / file log line.
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "method": request.method,
            "path": path,
            "query_params": str(request.query_params) if request.query_params else None,
            "user_id": user_id or "anonymous",
            "status_code": response.status_code,
            "duration_ms": duration_ms
        }
        message = f"Request: {json.dumps(log_data, ensure_ascii=False)}"
        if response.status_code >= 400:
            logger.warning(message)
        else:
            logger.info(message)

        # Hand the record to the async queue (non-blocking, requirements 6.1, 6.4).
        if user_id and not path.startswith(self._SKIP_QUEUE_PREFIXES):
            async_log_queue.enqueue({
                "user_id": user_id,
                "api_path": path,
                "method": request.method,
                "status_code": response.status_code,
                "duration_ms": duration_ms,
                "request_params": dict(request.query_params) if request.query_params else None,
                "request_ip": request.client.host if request.client else None
            })

        return response
def init_admin_user():
    """Initialise the administrator user.

    Opens a database session, delegates to ``UserService.init_admin_user()``
    and always closes the session afterwards. A failure is logged together
    with its traceback and swallowed, so the caller continues either way.
    """
    db = SessionLocal()
    try:
        UserService(db).init_admin_user()
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error line loses.
        logger.exception(f"管理员用户初始化失败: {e}")
    else:
        logger.info("管理员用户初始化完成")
    finally:
        db.close()
def _start_crawler_scheduler():
    """Build and start the background scheduler (main worker only).

    Registers the crawler sync job (cron, every day at 03:00), fires one sync
    immediately in a daemon thread, then starts the scheduler. A failure while
    registering the job is logged and does not stop the scheduler from
    starting.

    Returns:
        The started ``BackgroundScheduler`` so the caller can shut it down.
    """
    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    # TODO: the hourly balance-deduction job (run_hourly_deduction, cron at
    # minute 0 of every hour) is disabled because the hourly_deduction_task
    # module was removed; re-implement it or confirm it is no longer needed.
    try:
        import asyncio
        import threading
        from app.services.crawler_sync_service import sync_from_crawler

        def run_crawler_sync():
            # Invoked from a scheduler job or a plain thread, so the coroutine
            # is driven with its own asyncio.run().
            db = SessionLocal()
            try:
                asyncio.run(sync_from_crawler(db))
            except Exception as e:
                logger.error(f"爬虫同步任务异常: {e}")
            finally:
                db.close()

        scheduler.add_job(
            run_crawler_sync,
            'cron',
            hour=3,
            minute=0,
            id='crawler_sync',
            name='爬虫数据同步任务'
        )
        logger.info("定时任务已启动:爬虫数据同步任务每天凌晨3点执行")
        # Run one sync right away at startup.
        threading.Thread(target=run_crawler_sync, daemon=True, name="crawler_sync_startup").start()
        logger.info("已触发启动时爬虫数据同步")
    except Exception:
        logger.exception("注册爬虫同步任务失败")

    scheduler.start()
    logger.info("定时任务调度器已启动(当前为主 worker)")
    return scheduler


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start dependencies, yield, then tear them down.

    Startup checks the database connection, initialises the admin user,
    connects Redis, starts the async log queue and, in the main worker only,
    the background scheduler. Any startup failure is logged with its
    traceback and the application still starts. Shutdown stops the scheduler
    (if this worker started one), the async log queue and Redis.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    logger.info("=" * 50)
    logger.info("智创空间后端服务启动中...")
    # repr() of a SQLAlchemy URL masks the password; str() may not.
    logger.info(f"数据库连接: {engine.url!r}")

    scheduler = None
    try:
        with engine.connect():
            logger.info("数据库连接成功")
        init_admin_user()

        # Redis connection (requirement 4.1).
        if await redis_manager.connect():
            logger.info("Redis 连接已建立")
        else:
            logger.warning("Redis 连接失败,系统将以降级模式运行(无缓存、无分布式限流)")

        # Async log queue (requirement 6.1).
        from app.core.async_database import AsyncSessionLocal
        await async_log_queue.start(AsyncSessionLocal)
        logger.info("异步日志队列已启动")

        # Scheduled jobs run only in the worker that claims the flag first, to
        # avoid duplicate execution across workers.
        # NOTE(review): os.environ is per-process; confirm this flag is really
        # shared between workers in the deployed server configuration.
        if os.environ.get("_SCHEDULER_STARTED") != "1":
            os.environ["_SCHEDULER_STARTED"] = "1"
            scheduler = _start_crawler_scheduler()
        else:
            logger.info("非主 worker,跳过定时任务调度器启动")

    except Exception as e:
        # Any startup step can land here, not only the database check, so the
        # message is generic and the traceback is kept.
        logger.exception(f"服务启动初始化失败: {e}")
    logger.info("=" * 50)
    yield

    # Stop the scheduler this worker started, without waiting for running jobs.
    if scheduler is not None and scheduler.running:
        logger.info("正在停止定时任务调度器...")
        try:
            scheduler.shutdown(wait=False)
        except Exception:
            logger.exception("停止定时任务调度器失败")

    # Stop the async log queue (requirement 6.1).
    logger.info("正在停止异步日志队列...")
    await async_log_queue.stop()

    # Close the Redis connection (requirement 4.1).
    logger.info("正在关闭 Redis 连接...")
    await redis_manager.close()

    logger.info("服务关闭")
# Application instance; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    title="智创空间API",
    description="智创空间后端服务,包含模型广场等模块",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

from app.middleware.security_middleware import SecurityMiddleware

# Middleware is registered in this fixed order: request logging, security
# (registered before CORS), rate limiting (requirement 5.1), then CORS.
for _middleware_cls in (RequestLogMiddleware, SecurityMiddleware, RateLimitMiddleware):
    app.add_middleware(_middleware_cls)

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

register_exception_handlers(app)
# Route registration. Order matters: local_model_router has to be registered
# before model_router, because model_router's /{model_id} route would
# otherwise capture /local.
for _router in (
    local_model_router,
    model_router,
    auth_router,
    user_router,
    oss_router,
    platform_api_key_router,
    openai_compat_router,
    platform_stats_router,
    user_local_model_permission_router,
    password_strength_router,
    # Admin console routers
    admin_auth_router,
    admin_user_router,
    admin_model_router,
    admin_log_router,
    admin_stats_router,
    admin_config_router,
    admin_local_model_router,
    admin_oss_router,
    sso_router,
    admin_local_config_router,
):
    app.include_router(_router)

# SMS verification-code routes
from app.routers.sms_router import router as sms_router
app.include_router(sms_router)

# E-mail verification-code routes
from app.routers.email_router import router as email_router
app.include_router(email_router)
# Public branding endpoint (no login required; the frontend uses it to show
# the logo / name).
@app.get("/api/public/branding")
async def get_public_branding():
    """Return the platform branding configuration.

    Returns:
        A dict with the keys ``system_name``, ``system_logo`` and
        ``icp_number``. Each value is the JSON-decoded ``config_value`` of the
        matching ``SystemConfig`` row, the raw ``config_value`` when it is not
        valid JSON, or the built-in default when no row exists.
    """
    from starlette.concurrency import run_in_threadpool
    from app.models.config import SystemConfig

    defaults = {
        "system_name": "智创空间",
        "system_logo": "",
        "icp_number": "",
    }

    def _load_branding():
        # The session is created, used and closed inside the worker thread.
        db = SessionLocal()
        try:
            branding = {}
            for key, default in defaults.items():
                row = db.query(SystemConfig).filter(SystemConfig.config_key == key).first()
                if not row:
                    branding[key] = default
                    continue
                try:
                    branding[key] = json.loads(row.config_value)
                except (TypeError, ValueError):
                    # Not JSON (or not a string): return the stored value as-is.
                    branding[key] = row.config_value
            return branding
        finally:
            db.close()

    # The queries are synchronous; run them off the event loop so this handler
    # does not stall other requests while waiting on the database.
    return await run_in_threadpool(_load_branding)
@app.get("/health")
async def health_check():
    """Basic health check.

    Reports only the overall system status (healthy/degraded/unhealthy).
    Requirement references: 8.1, 8.2, 8.3, 8.5
    """
    from app.services.health_service import health_service

    report = await health_service.get_overall_health()
    status = report["status"]
    return {"status": status}
@app.get("/health/detailed")
async def health_check_detailed():
    """Detailed health check.

    Returns the full report produced by the health service. According to the
    original documentation it covers:
    - database connection status and connection-pool usage
    - Redis connection status and memory usage
    - async log queue status

    Requirement references: 8.1, 8.2, 8.3, 8.4, 8.5
    """
    from app.services.health_service import health_service

    detailed_report = await health_service.get_overall_health()
    return detailed_report
if __name__ == "__main__":
    import uvicorn

    # Development-server settings come from the environment.
    host = os.getenv("APP_HOST", "0.0.0.0")
    port = int(os.getenv("APP_PORT", "8010"))
    debug = os.getenv("DEBUG", "False").lower() == "true"

    logger.info(f"启动开发服务器: http://{host}:{port}")

    # Hot reload is enabled only in debug mode. Only Python files under app/
    # are watched, so the logs directory is not monitored.
    reload_config = (
        {
            "reload": True,
            "reload_dirs": ["app"],
            "reload_includes": ["*.py"],
        }
        if debug
        else {}
    )

    uvicorn.run("main:app", host=host, port=port, log_level="info", **reload_config)
|