# health_service.py
"""
健康检查服务模块
实现数据库、Redis、异步日志队列等组件的健康状态检查。
需求引用: 8.1, 8.2, 8.3, 8.5

Health-check service module: checks the health of the database, Redis and
the async log queue. Requirement references: 8.1, 8.2, 8.3, 8.5.
"""
  6. from typing import Dict, Any, Optional
  7. from dataclasses import dataclass, field
  8. from enum import Enum
  9. import time
  10. import logging
  11. logger = logging.getLogger(__name__)
  12. class HealthStatus(Enum):
  13. """健康状态枚举"""
  14. HEALTHY = "healthy"
  15. DEGRADED = "degraded"
  16. UNHEALTHY = "unhealthy"
  17. @dataclass
  18. class ComponentHealth:
  19. """组件健康状态数据类
  20. Attributes:
  21. name: 组件名称
  22. status: 健康状态
  23. latency_ms: 检查延迟(毫秒)
  24. details: 详细信息字典
  25. """
  26. name: str
  27. status: HealthStatus
  28. latency_ms: float = 0
  29. details: Dict[str, Any] = field(default_factory=dict)
  30. def to_dict(self) -> Dict[str, Any]:
  31. """转换为字典格式"""
  32. return {
  33. "name": self.name,
  34. "status": self.status.value,
  35. "latency_ms": round(self.latency_ms, 2),
  36. "details": self.details or {}
  37. }
  38. class HealthService:
  39. """健康检查服务
  40. 提供各组件的健康状态检查功能:
  41. - 数据库连接状态和连接池使用情况
  42. - Redis 连接状态和内存使用情况
  43. - 异步日志队列状态
  44. 组件异常时返回 DEGRADED 状态而非完全失败,
  45. 确保健康检查端点始终可用。
  46. """
  47. async def check_database(self) -> ComponentHealth:
  48. """检查数据库连接
  49. 执行简单查询验证数据库连接,并返回连接池使用情况。
  50. Returns:
  51. ComponentHealth: 数据库健康状态,包含连接池详情
  52. """
  53. from sqlalchemy import text
  54. from app.core.async_database import async_engine
  55. start = time.time()
  56. try:
  57. async with async_engine.connect() as conn:
  58. await conn.execute(text("SELECT 1"))
  59. latency = (time.time() - start) * 1000
  60. # 获取连接池状态
  61. pool = async_engine.pool
  62. pool_details = {
  63. "pool_size": pool.size(),
  64. "checked_out": pool.checkedout(),
  65. "overflow": pool.overflow(),
  66. "checked_in": pool.checkedin()
  67. }
  68. return ComponentHealth(
  69. name="database",
  70. status=HealthStatus.HEALTHY,
  71. latency_ms=latency,
  72. details=pool_details
  73. )
  74. except Exception as e:
  75. latency = (time.time() - start) * 1000
  76. logger.warning(f"数据库健康检查失败: {e}")
  77. return ComponentHealth(
  78. name="database",
  79. status=HealthStatus.UNHEALTHY,
  80. latency_ms=latency,
  81. details={"error": str(e)}
  82. )
  83. async def check_redis(self) -> ComponentHealth:
  84. """检查 Redis 连接
  85. 执行 ping 命令验证 Redis 连接,并返回内存使用情况。
  86. Redis 不可用时返回 DEGRADED 状态(系统可降级运行)。
  87. Returns:
  88. ComponentHealth: Redis 健康状态,包含内存使用详情
  89. """
  90. from app.core.redis import redis_manager
  91. start = time.time()
  92. redis = redis_manager.get_client()
  93. if not redis:
  94. return ComponentHealth(
  95. name="redis",
  96. status=HealthStatus.DEGRADED,
  97. latency_ms=0,
  98. details={"error": "Redis 未连接,系统降级运行"}
  99. )
  100. try:
  101. await redis.ping()
  102. latency = (time.time() - start) * 1000
  103. # 获取内存信息
  104. info = await redis.info("memory")
  105. memory_details = {
  106. "used_memory": info.get("used_memory_human", "unknown"),
  107. "used_memory_peak": info.get("used_memory_peak_human", "unknown"),
  108. "used_memory_lua": info.get("used_memory_lua_human", "unknown")
  109. }
  110. return ComponentHealth(
  111. name="redis",
  112. status=HealthStatus.HEALTHY,
  113. latency_ms=latency,
  114. details=memory_details
  115. )
  116. except Exception as e:
  117. latency = (time.time() - start) * 1000
  118. logger.warning(f"Redis 健康检查失败: {e}")
  119. return ComponentHealth(
  120. name="redis",
  121. status=HealthStatus.DEGRADED,
  122. latency_ms=latency,
  123. details={"error": str(e)}
  124. )
  125. async def check_async_log_queue(self) -> ComponentHealth:
  126. """检查异步日志队列状态
  127. 返回队列的运行状态、待处理日志数量和溢出计数。
  128. Returns:
  129. ComponentHealth: 异步日志队列健康状态
  130. """
  131. from app.core.async_logger import async_log_queue
  132. start = time.time()
  133. try:
  134. is_running = async_log_queue.is_running
  135. pending_count = async_log_queue.pending_count
  136. overflow_count = async_log_queue.overflow_count
  137. latency = (time.time() - start) * 1000
  138. # 判断状态
  139. if not is_running:
  140. status = HealthStatus.DEGRADED
  141. elif overflow_count > 0:
  142. # 有溢出但仍在运行,标记为降级
  143. status = HealthStatus.DEGRADED
  144. else:
  145. status = HealthStatus.HEALTHY
  146. return ComponentHealth(
  147. name="async_log_queue",
  148. status=status,
  149. latency_ms=latency,
  150. details={
  151. "is_running": is_running,
  152. "pending_count": pending_count,
  153. "overflow_count": overflow_count
  154. }
  155. )
  156. except Exception as e:
  157. latency = (time.time() - start) * 1000
  158. logger.warning(f"异步日志队列健康检查失败: {e}")
  159. return ComponentHealth(
  160. name="async_log_queue",
  161. status=HealthStatus.DEGRADED,
  162. latency_ms=latency,
  163. details={"error": str(e)}
  164. )
  165. async def get_overall_health(self) -> Dict[str, Any]:
  166. """获取整体健康状态
  167. 检查所有组件并综合判断整体状态:
  168. - 所有组件 HEALTHY -> 整体 HEALTHY
  169. - 任一组件 DEGRADED -> 整体 DEGRADED
  170. - 任一组件 UNHEALTHY -> 整体 UNHEALTHY
  171. Returns:
  172. Dict[str, Any]: 整体健康状态,包含各组件详情
  173. """
  174. # 并发检查所有组件
  175. import asyncio
  176. db_health, redis_health, log_queue_health = await asyncio.gather(
  177. self.check_database(),
  178. self.check_redis(),
  179. self.check_async_log_queue()
  180. )
  181. components = [db_health, redis_health, log_queue_health]
  182. # 综合判断整体状态
  183. overall_status = HealthStatus.HEALTHY
  184. for comp in components:
  185. if comp.status == HealthStatus.UNHEALTHY:
  186. overall_status = HealthStatus.UNHEALTHY
  187. break
  188. elif comp.status == HealthStatus.DEGRADED:
  189. overall_status = HealthStatus.DEGRADED
  190. return {
  191. "status": overall_status.value,
  192. "components": [comp.to_dict() for comp in components]
  193. }
  194. # 全局单例
  195. health_service = HealthService()