"""
Backend service entry point.

Configures logging, defines the request-logging middleware and the
application lifespan (startup/shutdown hooks), creates the FastAPI app,
installs middleware and registers all routers.
"""
import os
import time
import json
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from app.routers import (
    model_router,
    llm_router,
    auth_router,
    user_router,
    conversation_router,
    image_router,
    oss_router,
    audio_router,
    billing_router,
    invoice_router,
    video_router,
    audio_v2_router,
    toolbox_router,
    image_translation_router,
    ocr_router,
    tingwu_router,
    zhiwen_router,
    translation_router,
    research_router,
    local_model_router,
    platform_api_key_router,
    openai_compat_router,
    platform_stats_router,
    user_local_model_permission_router,
    password_strength_router
)
from app.core.async_logger import async_log_queue
from app.core.redis import redis_manager
from app.middleware.rate_limit_middleware import RateLimitMiddleware
from app.routers.photo_answer_router import router as photo_answer_router
from app.routers.multimodal_translation_router import router as multimodal_translation_router
from app.routers.admin_auth_router import router as admin_auth_router
from app.routers.admin_user_router import router as admin_user_router
from app.routers.admin_model_router import router as admin_model_router
from app.routers.admin_log_router import router as admin_log_router
from app.routers.admin_stats_router import router as admin_stats_router
from app.routers.admin_order_router import router as admin_order_router
from app.routers.admin_bill_router import router as admin_bill_router
from app.routers.admin_config_router import router as admin_config_router
from app.routers.admin_review_router import router as admin_review_router
from app.routers.admin_consumption_router import router as admin_consumption_router
from app.routers.admin_invoice_router import router as admin_invoice_router
from app.routers.admin_local_model_router import router as admin_local_model_router
from app.routers.sso_router import router as sso_router
from app.routers.alipay_router import router as alipay_router
from app.routers.admin_local_config_router import router as admin_local_config_router
from app.routers.enterprise_auth_router import router as enterprise_auth_router
from app.routers.enterprise_router import router as enterprise_router
from app.routers.super_admin_router import router as super_admin_router
from app.routers.enterprise_local_config_router import router as enterprise_local_config_router
from app.middleware import register_exception_handlers
from app.middleware.tenant_middleware import TenantMiddleware
from app.database import engine, SessionLocal
from app.services.user_service import UserService

# ==================== Logging configuration ====================

# Make sure the log directory exists before any file handler opens a file in it.
os.makedirs('logs', exist_ok=True)

# Shared format for every handler.
log_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Root logger.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

# Drop handlers that are already attached so a re-import (e.g. reload mode)
# does not produce duplicated log lines.
if root_logger.handlers:
    root_logger.handlers.clear()

# Console handler.
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
root_logger.addHandler(console_handler)

# File handler (all records at the root level and above).
file_handler = RotatingFileHandler(
    'logs/app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5,
    encoding='utf-8'
)
file_handler.setFormatter(log_formatter)
root_logger.addHandler(file_handler)

# Separate file handler that only receives ERROR and above.
error_handler = RotatingFileHandler(
    'logs/error.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5,
    encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(log_formatter)
root_logger.addHandler(error_handler)

logger = logging.getLogger(__name__)
logger.info("Logging configured: logs/app.log, logs/error.log")


class RequestLogMiddleware(BaseHTTPMiddleware):
    """Global request logging middleware.

    For every request it logs method, path, query string, resolved user id,
    status code and duration. Requests that carry a resolvable user id are
    additionally pushed onto ``async_log_queue``, except for paths starting
    with ``/health``, ``/static`` or ``/exports``.
    """

    # Path prefixes that are never forwarded to the async log queue.
    _QUEUE_EXCLUDED_PREFIXES = ("/health", "/static", "/exports")

    @staticmethod
    def _resolve_user_id(request: Request):
        """Best-effort extraction of the caller identity from a Bearer token.

        Returns the ``user_id`` claim of a regular user token, or
        ``"admin_<admin_id>"`` for an admin token, or ``None`` when there is
        no Bearer token or it cannot be verified. Never raises: logging must
        not be able to fail a request.
        """
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return None

        token = auth_header[7:]  # strip the "Bearer " prefix
        try:
            # Imported lazily, as in the original code path.
            from app.services.auth_service import AuthService
            from app.services.admin_auth_service import AdminAuthService
        except Exception:
            return None

        # Try the regular user token first ...
        try:
            return AuthService.verify_token(token).get("user_id")
        except Exception:
            pass

        # ... then fall back to the admin token.
        try:
            payload = AdminAuthService.verify_token(token)
            return f"admin_{payload.get('admin_id')}"
        except Exception:
            return None

    async def dispatch(self, request: Request, call_next):
        """Run the request, then log it and enqueue an audit record."""
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted.
        started = time.perf_counter()

        user_id = self._resolve_user_id(request)

        response = await call_next(request)

        duration_ms = int((time.perf_counter() - started) * 1000)
        path = str(request.url.path)

        # Console / file log line.
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "method": request.method,
            "path": path,
            "query_params": str(request.query_params) if request.query_params else None,
            "user_id": user_id or "anonymous",
            "status_code": response.status_code,
            "duration_ms": duration_ms
        }
        message = f"Request: {json.dumps(log_data, ensure_ascii=False)}"
        if response.status_code >= 400:
            logger.warning(message)
        else:
            logger.info(message)

        # Push to the async queue (non-blocking; requirements 6.1, 6.4).
        if user_id and not request.url.path.startswith(self._QUEUE_EXCLUDED_PREFIXES):
            async_log_queue.enqueue({
                "user_id": user_id,
                "api_path": path,
                "method": request.method,
                "status_code": response.status_code,
                "duration_ms": duration_ms,
                "request_params": dict(request.query_params) if request.query_params else None,
                "request_ip": request.client.host if request.client else None
            })

        return response


def init_admin_user():
    """Initialise the admin user via ``UserService``.

    Opens its own DB session and always closes it. Failures are logged and
    swallowed so that startup can continue.
    """
    db = SessionLocal()
    try:
        user_service = UserService(db)
        user_service.init_admin_user()
        logger.info("管理员用户初始化完成")
    except Exception as e:
        logger.error(f"管理员用户初始化失败: {e}")
    finally:
        db.close()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: startup before ``yield``, shutdown after it.

    Startup checks the database connection, initialises the admin user,
    connects Redis, starts the async log queue and registers/starts the
    scheduled jobs. Any exception raised during startup is logged and
    swallowed, so the application still begins serving.

    Shutdown stops the scheduler (if it was started), the async log queue
    and the Redis connection.
    """
    logger.info("=" * 50)
    logger.info("智创空间后端服务启动中...")
    # Log the repr() of the URL rather than str(): the repr masks the
    # password, whereas str() can render it in clear text on older
    # SQLAlchemy versions.
    logger.info(f"数据库连接: {engine.url!r}")

    # Kept outside the try so the shutdown phase can tell whether the
    # scheduler was ever created.
    scheduler = None

    try:
        # Only verifies that a connection can be opened.
        with engine.connect():
            logger.info("数据库连接成功")
        init_admin_user()

        # Initialise the Redis connection (requirement 4.1).
        redis_connected = await redis_manager.connect()
        if redis_connected:
            logger.info("Redis 连接已建立")
        else:
            logger.warning("Redis 连接失败,系统将以降级模式运行(无缓存、无分布式限流)")

        # Start the async log queue (requirement 6.1).
        from app.core.async_database import AsyncSessionLocal
        await async_log_queue.start(AsyncSessionLocal)
        logger.info("异步日志队列已启动")

        # Scheduled jobs.
        from apscheduler.schedulers.background import BackgroundScheduler
        # TODO: the hourly_deduction_task module was removed; re-implement it
        # or confirm that it is no longer needed.
        # from app.services.hourly_deduction_task import run_hourly_deduction

        scheduler = BackgroundScheduler()

        # Hourly deduction job, on the hour (disabled).
        # scheduler.add_job(
        #     run_hourly_deduction,
        #     'cron',
        #     hour='*',
        #     minute=0,
        #     id='hourly_deduction',
        #     name='每小时余额扣减任务'
        # )

        # Consumption sync job: periodically scans balance_log and backfills
        # user_consumption.
        try:
            from app.services.consumption_sync_service import run_sync_job
            scheduler.add_job(
                run_sync_job,
                'interval',
                seconds=60,
                args=[SessionLocal],
                id='consumption_sync',
                name='消费同步任务'
            )
            logger.info("定时任务已启动:消费同步任务每60秒执行一次")
        except Exception:
            logger.exception("注册消费同步任务失败")

        # Crawler data sync job: runs every day at 03:00.
        try:
            import asyncio
            from app.services.crawler_sync_service import sync_from_crawler

            def run_crawler_sync():
                """Run one crawler sync with its own DB session."""
                db = SessionLocal()
                try:
                    asyncio.run(sync_from_crawler(db))
                except Exception as e:
                    logger.error(f"爬虫同步任务异常: {e}")
                finally:
                    db.close()

            scheduler.add_job(
                run_crawler_sync,
                'cron',
                hour=3,
                minute=0,
                id='crawler_sync',
                name='爬虫数据同步任务'
            )
            logger.info("定时任务已启动:爬虫数据同步任务每天凌晨3点执行")

            # Trigger one sync immediately at startup, in a daemon thread so
            # it neither blocks startup nor keeps the process alive.
            import threading
            threading.Thread(
                target=run_crawler_sync,
                daemon=True,
                name="crawler_sync_startup"
            ).start()
            logger.info("已触发启动时爬虫数据同步")
        except Exception:
            logger.exception("注册爬虫同步任务失败")

        # Tenant balance warning SMS check: runs at minute 30 of every hour.
        try:
            import asyncio as _asyncio
            import concurrent.futures as _futures

            def run_balance_warn():
                """Run one balance-warning check with its own DB session."""
                db = SessionLocal()
                try:
                    from app.services.sms_service import tenant_warn_sms
                    # The coroutine is executed via asyncio.run in a separate
                    # worker thread; .result() waits for it and re-raises.
                    with _futures.ThreadPoolExecutor() as pool:
                        pool.submit(_asyncio.run, tenant_warn_sms.check_and_warn(db)).result()
                except Exception as e:
                    logger.error(f"余额预警短信任务异常: {e}")
                finally:
                    db.close()

            scheduler.add_job(
                run_balance_warn,
                'cron',
                hour='*',
                minute=30,
                id='balance_warn_sms',
                name='企业余额预警短信任务'
            )
            logger.info("定时任务已启动:企业余额预警短信任务每小时30分执行")
        except Exception:
            logger.exception("注册余额预警短信任务失败")

        scheduler.start()

    except Exception as e:
        # NOTE(review): this handler covers every startup step above, not
        # only the database check; the message is kept unchanged and the
        # traceback is attached so the real cause is visible.
        logger.error(f"数据库连接失败: {e}", exc_info=True)

    logger.info("=" * 50)

    yield

    # Stop the scheduler so its worker threads do not keep firing jobs while
    # the queue and Redis are being torn down. wait=False avoids blocking the
    # event loop on jobs that are still running.
    if scheduler is not None and scheduler.running:
        try:
            scheduler.shutdown(wait=False)
        except Exception:
            logger.exception("Failed to shut down the scheduler")

    # Stop the async log queue (requirement 6.1).
    logger.info("正在停止异步日志队列...")
    await async_log_queue.stop()

    # Close the Redis connection (requirement 4.1).
    logger.info("正在关闭 Redis 连接...")
    await redis_manager.close()

    logger.info("服务关闭")


app = FastAPI(
    title="智创空间API",
    description="智创空间后端服务,包含模型广场等模块",
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc"
)

# Security middleware class (imported here, as in the original module).
from app.middleware.security_middleware import SecurityMiddleware

# Global request logging middleware.
app.add_middleware(RequestLogMiddleware)

# Tenant context middleware.
app.add_middleware(TenantMiddleware)

# Security middleware (added before CORS).
app.add_middleware(SecurityMiddleware)

# Rate limiting middleware (requirement 5.1).
app.add_middleware(RateLimitMiddleware)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider restricting allow_origins to the known front-end hosts.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

register_exception_handlers(app)

# NOTE: local_model_router must be registered before model_router, because
# model_router has a /{model_id} route that would otherwise capture /local.
app.include_router(local_model_router)
app.include_router(model_router)
app.include_router(llm_router)
app.include_router(auth_router)
app.include_router(user_router)
app.include_router(conversation_router)
app.include_router(image_router)
app.include_router(oss_router)
app.include_router(audio_router)
app.include_router(billing_router)
app.include_router(invoice_router)
app.include_router(video_router)
app.include_router(audio_v2_router)
app.include_router(toolbox_router)
app.include_router(image_translation_router)
app.include_router(ocr_router)
app.include_router(tingwu_router)
app.include_router(zhiwen_router)
app.include_router(translation_router)
app.include_router(research_router)
app.include_router(photo_answer_router)
app.include_router(multimodal_translation_router)
app.include_router(platform_api_key_router)
app.include_router(openai_compat_router)
app.include_router(platform_stats_router)
app.include_router(user_local_model_permission_router)
app.include_router(password_strength_router)

# Admin console routers.
app.include_router(admin_auth_router)
app.include_router(admin_user_router)
app.include_router(admin_model_router)
app.include_router(admin_log_router)
app.include_router(admin_stats_router)
app.include_router(admin_order_router)
app.include_router(admin_bill_router)
app.include_router(admin_config_router)
app.include_router(admin_review_router)
app.include_router(admin_consumption_router)
app.include_router(admin_invoice_router)
app.include_router(admin_local_model_router)
app.include_router(sso_router)
app.include_router(alipay_router)
app.include_router(admin_local_config_router)

# Enterprise administrator routers.
app.include_router(enterprise_auth_router)
app.include_router(enterprise_router)
from app.routers.enterprise_admin_user_router import router as enterprise_admin_user_router
from app.routers.enterprise_admin_stats_router import router as enterprise_admin_stats_router
from app.routers.enterprise_admin_proxy_router import router as enterprise_admin_proxy_router
from app.routers.sms_router import router as sms_router
app.include_router(enterprise_admin_user_router)
app.include_router(enterprise_admin_stats_router)
app.include_router(enterprise_admin_proxy_router)
app.include_router(sms_router)

# Super admin console routers.
app.include_router(super_admin_router)
app.include_router(enterprise_local_config_router)

# Public tenant application router.
from app.routers.tenant_apply_router import router as tenant_apply_router
app.include_router(tenant_apply_router)


# Public branding endpoint (no login required; the front end uses it to show
# the logo / system name).
@app.get("/api/public/branding")
async def get_public_branding(request: Request):
    """Return the branding configuration for the current tenant.

    The response contains ``system_name``, ``system_logo`` and
    ``icp_number``. Each key is looked up for the tenant found on
    ``request.state.tenant_id`` first, then in the global configuration
    (rows whose ``tenant_id`` is NULL), and finally falls back to a
    built-in default.
    """
    from app.models.config import SystemConfig

    tenant_id = getattr(request.state, 'tenant_id', None)

    # Lookup scopes in priority order: tenant-specific, then global. When the
    # request has no tenant both scopes would be the same NULL-tenant query,
    # so only one is issued.
    if tenant_id is None:
        scopes = [SystemConfig.tenant_id.is_(None)]
    else:
        scopes = [SystemConfig.tenant_id == tenant_id, SystemConfig.tenant_id.is_(None)]

    def _decode(raw):
        """Return the JSON-decoded value, or the raw value if it is not JSON."""
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            # Not valid JSON (or not a string at all): use the stored value as is.
            return raw

    # NOTE(review): these are synchronous session calls inside an async
    # endpoint and block the event loop while they run; consider a plain
    # `def` endpoint or the async session if this shows up under load.
    db = SessionLocal()
    try:
        def _get(key: str, default: str):
            """Return the first configured value for *key*, else *default*."""
            for scope in scopes:
                row = db.query(SystemConfig).filter(
                    scope,
                    SystemConfig.config_key == key
                ).first()
                if row:
                    return _decode(row.config_value)
            return default

        return {
            "system_name": _get("system_name", "智创空间"),
            "system_logo": _get("system_logo", ""),
            "icp_number": _get("icp_number", ""),
        }
    finally:
        db.close()


@app.get("/health")
async def health_check():
    """Basic health check.

    Returns only the overall system status (healthy/degraded/unhealthy).

    Requirement references: 8.1, 8.2, 8.3, 8.5
    """
    from app.services.health_service import health_service

    overall = await health_service.get_overall_health()
    return {"status": overall["status"]}


@app.get("/health/detailed")
async def health_check_detailed():
    """Detailed health check.

    Returns the detailed status of all components, including:
    - database connection status and connection pool usage
    - Redis connection status and memory usage
    - async log queue status

    Requirement references: 8.1, 8.2, 8.3, 8.4, 8.5
    """
    from app.services.health_service import health_service

    return await health_service.get_overall_health()


if __name__ == "__main__":
    import uvicorn

    host = os.getenv("APP_HOST", "0.0.0.0")
    port = int(os.getenv("APP_PORT", "8010"))
    debug = os.getenv("DEBUG", "False").lower() == "true"

    logger.info(f"启动开发服务器: http://{host}:{port}")

    # Reload options are only passed in debug mode.
    reload_config = {}
    if debug:
        # Watch only the app directory so that writes to logs/ do not
        # trigger a reload.
        reload_config = {
            "reload": True,
            "reload_dirs": ["app"],  # watch the app directory only
            "reload_includes": ["*.py"],  # watch Python files only
        }

    uvicorn.run(
        "main:app",
        host=host,
        port=port,
        log_level="info",
        **reload_config
    )