""" 健康检查服务模块 实现数据库、Redis、异步日志队列等组件的健康状态检查。 需求引用: 8.1, 8.2, 8.3, 8.5 """ from typing import Dict, Any, Optional from dataclasses import dataclass, field from enum import Enum import time import logging logger = logging.getLogger(__name__) class HealthStatus(Enum): """健康状态枚举""" HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" @dataclass class ComponentHealth: """组件健康状态数据类 Attributes: name: 组件名称 status: 健康状态 latency_ms: 检查延迟(毫秒) details: 详细信息字典 """ name: str status: HealthStatus latency_ms: float = 0 details: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """转换为字典格式""" return { "name": self.name, "status": self.status.value, "latency_ms": round(self.latency_ms, 2), "details": self.details or {} } class HealthService: """健康检查服务 提供各组件的健康状态检查功能: - 数据库连接状态和连接池使用情况 - Redis 连接状态和内存使用情况 - 异步日志队列状态 组件异常时返回 DEGRADED 状态而非完全失败, 确保健康检查端点始终可用。 """ async def check_database(self) -> ComponentHealth: """检查数据库连接 执行简单查询验证数据库连接,并返回连接池使用情况。 Returns: ComponentHealth: 数据库健康状态,包含连接池详情 """ from sqlalchemy import text from app.core.async_database import async_engine start = time.time() try: async with async_engine.connect() as conn: await conn.execute(text("SELECT 1")) latency = (time.time() - start) * 1000 # 获取连接池状态 pool = async_engine.pool pool_details = { "pool_size": pool.size(), "checked_out": pool.checkedout(), "overflow": pool.overflow(), "checked_in": pool.checkedin() } return ComponentHealth( name="database", status=HealthStatus.HEALTHY, latency_ms=latency, details=pool_details ) except Exception as e: latency = (time.time() - start) * 1000 logger.warning(f"数据库健康检查失败: {e}") return ComponentHealth( name="database", status=HealthStatus.UNHEALTHY, latency_ms=latency, details={"error": str(e)} ) async def check_redis(self) -> ComponentHealth: """检查 Redis 连接 执行 ping 命令验证 Redis 连接,并返回内存使用情况。 Redis 不可用时返回 DEGRADED 状态(系统可降级运行)。 Returns: ComponentHealth: Redis 健康状态,包含内存使用详情 """ from app.core.redis import redis_manager start = time.time() redis = redis_manager.get_client() if not redis: return ComponentHealth( name="redis", status=HealthStatus.DEGRADED, latency_ms=0, details={"error": "Redis 未连接,系统降级运行"} ) try: await redis.ping() latency = (time.time() - start) * 1000 # 获取内存信息 info = await redis.info("memory") memory_details = { "used_memory": info.get("used_memory_human", "unknown"), "used_memory_peak": info.get("used_memory_peak_human", "unknown"), "used_memory_lua": info.get("used_memory_lua_human", "unknown") } return ComponentHealth( name="redis", status=HealthStatus.HEALTHY, latency_ms=latency, details=memory_details ) except Exception as e: latency = (time.time() - start) * 1000 logger.warning(f"Redis 健康检查失败: {e}") return ComponentHealth( name="redis", status=HealthStatus.DEGRADED, latency_ms=latency, details={"error": str(e)} ) async def check_async_log_queue(self) -> ComponentHealth: """检查异步日志队列状态 返回队列的运行状态、待处理日志数量和溢出计数。 Returns: ComponentHealth: 异步日志队列健康状态 """ from app.core.async_logger import async_log_queue start = time.time() try: is_running = async_log_queue.is_running pending_count = async_log_queue.pending_count overflow_count = async_log_queue.overflow_count latency = (time.time() - start) * 1000 # 判断状态 if not is_running: status = HealthStatus.DEGRADED elif overflow_count > 0: # 有溢出但仍在运行,标记为降级 status = HealthStatus.DEGRADED else: status = HealthStatus.HEALTHY return ComponentHealth( name="async_log_queue", status=status, latency_ms=latency, details={ "is_running": is_running, "pending_count": pending_count, "overflow_count": overflow_count } ) except Exception as e: latency = (time.time() - start) * 1000 logger.warning(f"异步日志队列健康检查失败: {e}") return ComponentHealth( name="async_log_queue", status=HealthStatus.DEGRADED, latency_ms=latency, details={"error": str(e)} ) async def get_overall_health(self) -> Dict[str, Any]: """获取整体健康状态 检查所有组件并综合判断整体状态: - 所有组件 HEALTHY -> 整体 HEALTHY - 任一组件 DEGRADED -> 整体 DEGRADED - 任一组件 UNHEALTHY -> 整体 UNHEALTHY Returns: Dict[str, Any]: 整体健康状态,包含各组件详情 """ # 并发检查所有组件 import asyncio db_health, redis_health, log_queue_health = await asyncio.gather( self.check_database(), self.check_redis(), self.check_async_log_queue() ) components = [db_health, redis_health, log_queue_health] # 综合判断整体状态 overall_status = HealthStatus.HEALTHY for comp in components: if comp.status == HealthStatus.UNHEALTHY: overall_status = HealthStatus.UNHEALTHY break elif comp.status == HealthStatus.DEGRADED: overall_status = HealthStatus.DEGRADED return { "status": overall_status.value, "components": [comp.to_dict() for comp in components] } # 全局单例 health_service = HealthService()