"""
Health check service module.

Checks the health of the database, Redis, and the asynchronous log queue.
Requirements: 8.1, 8.2, 8.3, 8.5
"""
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict

logger = logging.getLogger(__name__)


class HealthStatus(Enum):
    """Health status of a component or of the whole system."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
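

# Illustrative sketch (hypothetical helper, not used by HealthService):
# get_overall_health() effectively reports the most severe component
# status, ranked healthy < degraded < unhealthy. A severity table makes
# that ordering explicit. It works on the plain string values of
# HealthStatus, e.g. _most_severe_status(c.status.value for c in components).
_STATUS_SEVERITY = {"healthy": 0, "degraded": 1, "unhealthy": 2}


def _most_severe_status(values):
    """Return the most severe of the given status strings.

    An empty input yields "healthy"; an unknown value raises KeyError.
    """
    return max(values, key=_STATUS_SEVERITY.__getitem__, default="healthy")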


@dataclass
class ComponentHealth:
    """Health status of a single component.

    Attributes:
        name: Component name.
        status: Health status.
        latency_ms: Check latency, in milliseconds.
        details: Dictionary of additional, component-specific details.
    """

    name: str
    status: HealthStatus
    latency_ms: float = 0.0
    details: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (e.g. for a JSON response)."""
        return {
            "name": self.name,
            "status": self.status.value,
            "latency_ms": round(self.latency_ms, 2),
            "details": self.details or {},
        }
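

class HealthService:
    """Health check service.

    Checks the health of each component:
    - Database connectivity and connection pool usage
    - Redis connectivity and memory usage
    - Asynchronous log queue state

    Failures inside a check are reported as an UNHEALTHY (database) or
    DEGRADED (Redis, log queue) status rather than raised, so the health
    check endpoint itself stays available.
    """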

    async def check_database(self) -> ComponentHealth:
        """Check the database connection.

        Runs a trivial query to verify connectivity and reports
        connection pool usage.

        Returns:
            ComponentHealth: Database health, including pool details.
        """
        # Imported lazily, inside the method
        from sqlalchemy import text
        from app.core.async_database import async_engine

        # perf_counter() is monotonic, so it suits latency measurement
        start = time.perf_counter()
        try:
            async with async_engine.connect() as conn:
                await conn.execute(text("SELECT 1"))

            latency = (time.perf_counter() - start) * 1000

            # Connection pool statistics (QueuePool API)
            pool = async_engine.pool
            pool_details = {
                "pool_size": pool.size(),
                "checked_out": pool.checkedout(),
                "overflow": pool.overflow(),
                "checked_in": pool.checkedin(),
            }

            return ComponentHealth(
                name="database",
                status=HealthStatus.HEALTHY,
                latency_ms=latency,
                details=pool_details,
            )
        except Exception as e:
            latency = (time.perf_counter() - start) * 1000
            logger.warning("Database health check failed: %s", e)
            return ComponentHealth(
                name="database",
                status=HealthStatus.UNHEALTHY,
                latency_ms=latency,
                details={"error": str(e)},
            )

    async def check_redis(self) -> ComponentHealth:
        """Check the Redis connection.

        Sends a PING to verify connectivity and reports memory usage.
        If Redis is unavailable, returns DEGRADED, because the system can
        keep running without it in a degraded mode.

        Returns:
            ComponentHealth: Redis health, including memory usage details.
        """
        from app.core.redis import redis_manager

        start = time.perf_counter()
        try:
            # Obtaining the client is inside the try block so that an error
            # here is reported as DEGRADED instead of propagating
            redis = redis_manager.get_client()
            if not redis:
                return ComponentHealth(
                    name="redis",
                    status=HealthStatus.DEGRADED,
                    latency_ms=0.0,
                    details={
                        "error": "Redis is not connected; the system is running in degraded mode"
                    },
                )

            await redis.ping()
            latency = (time.perf_counter() - start) * 1000

            # Memory statistics from INFO MEMORY
            info = await redis.info("memory")
            memory_details = {
                "used_memory": info.get("used_memory_human", "unknown"),
                "used_memory_peak": info.get("used_memory_peak_human", "unknown"),
                "used_memory_lua": info.get("used_memory_lua_human", "unknown"),
            }

            return ComponentHealth(
                name="redis",
                status=HealthStatus.HEALTHY,
                latency_ms=latency,
                details=memory_details,
            )
        except Exception as e:
            latency = (time.perf_counter() - start) * 1000
            logger.warning("Redis health check failed: %s", e)
            return ComponentHealth(
                name="redis",
                status=HealthStatus.DEGRADED,
                latency_ms=latency,
                details={"error": str(e)},
            )

    async def check_async_log_queue(self) -> ComponentHealth:
        """Check the asynchronous log queue.

        Reports whether the queue is running, the number of pending log
        records, and the overflow count.

        Returns:
            ComponentHealth: Asynchronous log queue health.
        """
        from app.core.async_logger import async_log_queue

        start = time.perf_counter()

        try:
            is_running = async_log_queue.is_running
            pending_count = async_log_queue.pending_count
            overflow_count = async_log_queue.overflow_count

            latency = (time.perf_counter() - start) * 1000

            # A stopped queue, or a running queue that has overflowed,
            # is reported as DEGRADED
            if not is_running or overflow_count > 0:
                status = HealthStatus.DEGRADED
            else:
                status = HealthStatus.HEALTHY

            return ComponentHealth(
                name="async_log_queue",
                status=status,
                latency_ms=latency,
                details={
                    "is_running": is_running,
                    "pending_count": pending_count,
                    "overflow_count": overflow_count,
                },
            )
        except Exception as e:
            latency = (time.perf_counter() - start) * 1000
            logger.warning("Async log queue health check failed: %s", e)
            return ComponentHealth(
                name="async_log_queue",
                status=HealthStatus.DEGRADED,
                latency_ms=latency,
                details={"error": str(e)},
            )

    async def get_overall_health(self) -> Dict[str, Any]:
        """Get the overall health status.

        Checks all components concurrently and derives the overall status:
        - Any component UNHEALTHY -> overall UNHEALTHY
        - Otherwise, any component DEGRADED -> overall DEGRADED
        - All components HEALTHY -> overall HEALTHY

        Returns:
            Dict[str, Any]: Overall status plus per-component details.
        """
        import asyncio

        # Run all component checks concurrently; gather() returns the
        # results in the same order as its arguments
        db_health, redis_health, log_queue_health = await asyncio.gather(
            self.check_database(),
            self.check_redis(),
            self.check_async_log_queue(),
        )

        components = [db_health, redis_health, log_queue_health]

        # UNHEALTHY takes precedence over DEGRADED, which takes
        # precedence over HEALTHY
        overall_status = HealthStatus.HEALTHY
        for comp in components:
            if comp.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
                break
            elif comp.status == HealthStatus.DEGRADED:
                overall_status = HealthStatus.DEGRADED

        return {
            "status": overall_status.value,
            "components": [comp.to_dict() for comp in components],
        }
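

# Illustrative sketch (hypothetical helper, not wired into HealthService):
# each check_* method converts exceptions into a status, but a check that
# hangs (for example, a stalled connection attempt) would still stall
# get_overall_health(), because asyncio.gather() waits for every awaitable.
# Bounding each check with asyncio.wait_for() keeps the endpoint responsive.
# The helper name and the 2-second default are assumptions, not values
# taken from this project. Possible use inside get_overall_health():
#     await _check_with_timeout(
#         self.check_database(),
#         ComponentHealth("database", HealthStatus.UNHEALTHY,
#                         details={"error": "timeout"}),
#     )
import asyncio


async def _check_with_timeout(coro, fallback, timeout_s=2.0):
    """Await ``coro`` and return its result, or ``fallback`` on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the timed-out check at this point
        return fallback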


# Module-level singleton
health_service = HealthService()