""" YOLO 隐患识别服务 """ import httpx from typing import Dict, Any from utils.config import settings from utils.logger import logger class YoloService: def __init__(self): self.base_url = settings.yolo.base_url async def detect_hazards(self, image_url: str, scene_type: str = "") -> Dict[str, Any]: """检测图片中的隐患""" try: data = { "image_url": image_url, "scene_type": scene_type } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{self.base_url}/detect", json=data ) response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.warning(f"YOLO API 调用失败,返回模拟数据: {e}") # 返回模拟数据 return { "hazards": [], "count": 0, "message": "YOLO服务暂时不可用,返回模拟数据" } except Exception as e: logger.error(f"YOLO 服务异常: {e}") return { "hazards": [], "count": 0, "message": "识别服务异常" } # 全局实例 yolo_service = YoloService()