""" 应用工厂:创建和配置 FastAPI 应用实例。 """ import redis from fastapi import FastAPI from views import lifespan from foundation.infrastructure.config.config import config_handler from server.celery_manager import CeleryWorkerManager class ServerUtils: """Redis 连接工具。""" @staticmethod def get_redis_connection(): redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') redis_db = config_handler.get('redis', 'REDIS_DB', '0') if redis_password: redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}' else: redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}' return redis.from_url(redis_url, decode_responses=True) class ApplicationFactory: """应用工厂。""" def __init__(self): self.server_utils = ServerUtils() self.celery_manager = CeleryWorkerManager(self.server_utils) def create_app(self) -> FastAPI: from server.routes import register_routes app = FastAPI( title="Agent API - 施工方案审查系统", version="0.3", description="Agent API - 集成施工方案审查功能", lifespan=lifespan, ) register_routes(app) return app def create_server_config(self) -> dict: port = config_handler.get('launch', 'LAUNCH_PORT', 8002) try: port = int(port) except (ValueError, TypeError): port = 8002 return { 'host': config_handler.get('launch', 'HOST', '0.0.0.0'), 'port': port, 'reload': False, 'with_celery': True, }