yolo_service.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """
  2. YOLO 隐患识别服务
  3. """
  4. import base64
  5. import json
  6. from typing import Dict, Any
  7. import httpx
  8. from utils.config import settings
  9. from utils.logger import logger
  10. class YoloService:
  11. def __init__(self):
  12. self.base_url = settings.yolo.base_url.rstrip("/")
  13. async def detect_hazards(self, image_bytes: bytes, scene_type: str = "") -> Dict[str, Any]:
  14. """检测图片中的隐患。
  15. 对齐 Go 版本行为:
  16. - 调用 `/predict`
  17. - 请求字段为 `modeltype`、`image`、`conf_threshold`
  18. - `image` 传 base64 编码后的图片内容
  19. - 不输出模拟数据;失败直接抛错
  20. """
  21. detect_url = f"{self.base_url}/predict"
  22. if not image_bytes:
  23. raise ValueError("image_bytes 不能为空")
  24. image_base64 = base64.b64encode(image_bytes).decode("utf-8")
  25. logger.info(
  26. "[YOLO API] 开始请求检测接口: base_url=%s, detect_url=%s, scene_type=%s, image_bytes_len=%s, image_base64_len=%s",
  27. self.base_url,
  28. detect_url,
  29. scene_type or "<empty>",
  30. len(image_bytes),
  31. len(image_base64),
  32. )
  33. payload = {
  34. "modeltype": scene_type,
  35. "image": image_base64,
  36. "conf_threshold": 0.5,
  37. }
  38. payload_json = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
  39. headers = {
  40. "Content-Type": "application/json",
  41. }
  42. logger.info("[YOLO API] 场景名称(modeltype): %s", json.dumps(payload["modeltype"], ensure_ascii=False))
  43. logger.info("[YOLO API] 置信度阈值(conf_threshold): %.2f", payload["conf_threshold"])
  44. logger.info("[YOLO API] base64前120字符: %s", image_base64[:120])
  45. logger.info("[YOLO API] YOLO请求JSON长度: %s", len(payload_json))
  46. logger.info("[YOLO API] YOLO请求JSON前1000字符: %s", payload_json[:1000])
  47. try:
  48. async with httpx.AsyncClient(timeout=30.0) as client:
  49. response = await client.post(
  50. detect_url,
  51. content=payload_json.encode("utf-8"),
  52. headers=headers,
  53. )
  54. response_text = response.text
  55. logger.info(
  56. "[YOLO API] 响应返回: status_code=%s, content_type=%s, request_url=%s, body_prefix=%s",
  57. response.status_code,
  58. response.headers.get("content-type", ""),
  59. detect_url,
  60. response_text[:2000],
  61. )
  62. if response.status_code != 200:
  63. raise RuntimeError(
  64. f"YOLO服务HTTP错误: {response.status_code}, body={response_text}"
  65. )
  66. result = response.json()
  67. if not isinstance(result, dict):
  68. raise RuntimeError(
  69. f"YOLO服务响应格式错误: {type(result).__name__}"
  70. )
  71. return result
  72. except httpx.TimeoutException as e:
  73. logger.error(
  74. "[YOLO API] 请求超时: base_url=%s, detect_url=%s, error=%s",
  75. self.base_url,
  76. detect_url,
  77. repr(e),
  78. )
  79. raise RuntimeError("YOLO服务请求超时") from e
  80. except httpx.RequestError as e:
  81. logger.error(
  82. "[YOLO API] 请求异常: base_url=%s, detect_url=%s, error=%s, type=%s",
  83. self.base_url,
  84. detect_url,
  85. str(e),
  86. type(e).__name__,
  87. )
  88. raise RuntimeError(f"YOLO服务请求失败: {type(e).__name__}") from e
  89. except ValueError as e:
  90. logger.error(
  91. "[YOLO API] 响应JSON解析失败: base_url=%s, detect_url=%s, error=%s",
  92. self.base_url,
  93. detect_url,
  94. str(e),
  95. )
  96. raise RuntimeError("YOLO服务响应解析失败") from e
  97. except Exception as e:
  98. logger.exception(
  99. "[YOLO API] 未知异常: base_url=%s, detect_url=%s, error=%s",
  100. self.base_url,
  101. detect_url,
  102. str(e),
  103. )
  104. raise RuntimeError("YOLO服务异常") from e
  105. # 全局实例
  106. yolo_service = YoloService()