""" Hazard detection routes. """ from typing import Any, Dict, List, Optional import io import json import time import httpx from fastapi import APIRouter, Depends, Request from pydantic import BaseModel from sqlalchemy.orm import Session from PIL import Image, ImageDraw, ImageFont from database import get_db from models.scene import RecognitionRecord from services.oss_service import oss_service from services.yolo_service import yolo_service from utils.crypto import decrypt_url from utils.logger import logger router = APIRouter() class HazardRequest(BaseModel): """Compatible request model for old and new frontend payloads.""" image_url: Optional[str] = None image: Optional[str] = None scene_type: str = "" scene_name: str = "" user_name: str = "" username: str = "" user_account: str = "" account: str = "" date: str = "" class SaveStepRequest(BaseModel): """Save current step for a recognition record.""" record_id: int current_step: int SCENE_KEY_ALIASES = { "tunnel": "tunnel", "隧道": "tunnel", "隧道施工": "tunnel", "隧道工程": "tunnel", "simple_supported_bridge": "simple_supported_bridge", "bridge": "simple_supported_bridge", "桥梁": "simple_supported_bridge", "桥梁施工": "simple_supported_bridge", "桥梁工程": "simple_supported_bridge", "gas_station": "gas_station", "加油站": "gas_station", "special_equipment": "special_equipment", "特种设备": "special_equipment", "operate_highway": "operate_highway", "运营高速公路": "operate_highway", } SCENE_DISPLAY_NAMES = { "tunnel": "隧道工程", "simple_supported_bridge": "桥梁工程", "gas_station": "加油站", "special_equipment": "特种设备", "operate_highway": "运营高速公路", } def _get_user_code(user: Any) -> str: return ( getattr(user, "userCode", None) or getattr(user, "user_code", None) or getattr(user, "account", "") ) def _resolve_scene_key(scene_value: str) -> str: if not scene_value: return "" return SCENE_KEY_ALIASES.get(scene_value.strip(), scene_value.strip()) def _unique_ordered(items: List[str]) -> List[str]: seen = set() ordered = [] for item in items: if not item or item in seen: continue seen.add(item) ordered.append(item) return ordered def _build_frontend_result(hazards: List[Dict[str, Any]]) -> Dict[str, Any]: raw_labels: List[str] = [] element_hazards: Dict[str, List[str]] = {} detections: List[Dict[str, Any]] = [] for hazard in hazards: label = str(hazard.get("label") or "").strip() if not label: continue raw_labels.append(label) element_hazards.setdefault(label, []) if label not in element_hazards[label]: element_hazards[label].append(label) box = hazard.get("bbox") or hazard.get("box") or [] detections.append( { "label": label, "box": box, "bbox": box, "confidence": hazard.get("confidence", 0), } ) display_labels = _unique_ordered(raw_labels) return { "display_labels": display_labels, "labels": display_labels, "third_scenes": display_labels, "element_hazards": element_hazards, "detections": detections, } @router.post("/hazard") async def hazard( request: Request, data: HazardRequest, db: Session = Depends(get_db), ): """Run hazard detection and return a frontend-compatible payload.""" user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} try: source_image_url = data.image_url or data.image if not source_image_url: return {"statusCode": 422, "msg": "image_url 不能为空"} scene_key = _resolve_scene_key(data.scene_type or data.scene_name) user_code = _get_user_code(user) user_name = ( data.user_name or data.username or getattr(user, "name", None) or getattr(user, "username", None) or getattr(user, "account", "") ) user_account = ( data.user_account or data.account or getattr(user, "account", "") ) try: real_image_url = decrypt_url(source_image_url) except Exception: real_image_url = source_image_url async with httpx.AsyncClient(timeout=30.0) as client: img_response = await client.get(real_image_url) img_response.raise_for_status() image_bytes = img_response.content yolo_result = await yolo_service.detect_hazards(real_image_url, scene_key) hazards = yolo_result.get("hazards", []) or [] hazard_count = len(hazards) frontend_result = _build_frontend_result(hazards) current_ts = int(time.time()) result_image_bytes = await _draw_boxes_and_watermark( image_bytes, hazards, user_name=user_name, user_account=user_account, ) result_filename = f"hazard_detection/{user_code}/{current_ts}.jpg" result_url = await oss_service.upload_bytes(result_image_bytes, result_filename) scene_display_name = SCENE_DISPLAY_NAMES.get(scene_key, scene_key or "隐患提示") record = RecognitionRecord( user_id=user_code, scene_type=scene_key, original_image_url=source_image_url, recognition_image_url=result_url, hazard_count=hazard_count, hazard_details=json.dumps(hazards, ensure_ascii=False), current_step=1, title=f"{scene_display_name}隐患提示", description=" ".join(frontend_result["third_scenes"]), labels=",".join(frontend_result["display_labels"]), tag_type=scene_key, created_at=current_ts, updated_at=current_ts, is_deleted=0, ) db.add(record) db.commit() db.refresh(record) return { "statusCode": 200, "msg": "识别成功", "data": { "record_id": record.id, "hazard_count": hazard_count, "hazards": hazards, "scene_name": scene_key, "annotated_image": result_url, "display_labels": frontend_result["display_labels"], "labels": frontend_result["labels"], "third_scenes": frontend_result["third_scenes"], "element_hazards": frontend_result["element_hazards"], "detections": frontend_result["detections"], "result_image_url": result_url, "original_image_url": source_image_url, }, } except httpx.HTTPError as e: logger.error(f"[hazard] 图片下载失败: {e}") db.rollback() return {"statusCode": 500, "msg": f"图片下载失败: {str(e)}"} except Exception as e: logger.error(f"[hazard] 处理异常: {e}") db.rollback() return {"statusCode": 500, "msg": f"处理失败: {str(e)}"} @router.post("/save_step") async def save_step( request: Request, data: SaveStepRequest, db: Session = Depends(get_db), ): """Update RecognitionRecord.current_step.""" user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} try: affected = ( db.query(RecognitionRecord) .filter( RecognitionRecord.id == data.record_id, RecognitionRecord.user_id == _get_user_code(user), ) .update( { "current_step": data.current_step, "updated_at": int(time.time()), } ) ) if affected == 0: return {"statusCode": 404, "msg": "记录不存在"} db.commit() return { "statusCode": 200, "msg": "保存成功", "data": { "record_id": data.record_id, "current_step": data.current_step, }, } except Exception as e: logger.error(f"[save_step] 异常: {e}") db.rollback() return {"statusCode": 500, "msg": f"保存失败: {str(e)}"} async def _draw_boxes_and_watermark( image_bytes: bytes, hazards: List[Dict[str, Any]], user_name: str, user_account: str, ) -> bytes: """Draw detection boxes and a tiled watermark on the image.""" try: image = Image.open(io.BytesIO(image_bytes)).convert("RGBA") width, height = image.size overlay = Image.new("RGBA", (width, height), (255, 255, 255, 0)) draw = ImageDraw.Draw(overlay) try: font = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20 ) font_small = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14 ) except Exception: try: font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 20) font_small = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 14) except Exception: font = ImageFont.load_default() font_small = ImageFont.load_default() for hazard in hazards: bbox = hazard.get("bbox", []) or hazard.get("box", []) label = hazard.get("label", "") confidence = hazard.get("confidence", 0) if len(bbox) == 4: x1, y1, x2, y2 = bbox draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0, 255), width=3) text = f"{label} {confidence:.2f}" draw.text( (x1, max(0, y1 - 25)), text, fill=(255, 0, 0, 255), font=font, ) current_date = time.strftime("%Y/%m/%d") watermarks = [user_name or "", user_account or "", current_date] watermarks = [text for text in watermarks if text] if not watermarks: watermarks = [current_date] text_height_estimate = 50 text_width_estimate = 150 angle = 45 watermark_layer = Image.new( "RGBA", (width * 2, height * 2), (255, 255, 255, 0) ) watermark_draw = ImageDraw.Draw(watermark_layer) for y in range(-height, height * 2, text_height_estimate): for x in range(-width, width * 2, text_width_estimate): row_index = int(y / text_height_estimate) % len(watermarks) watermark_draw.text( (x, y), watermarks[row_index], fill=(128, 128, 128, 60), font=font_small, ) watermark_layer = watermark_layer.rotate( angle, expand=False, fillcolor=(255, 255, 255, 0) ) crop_x = (watermark_layer.width - width) // 2 crop_y = (watermark_layer.height - height) // 2 watermark_layer = watermark_layer.crop( (crop_x, crop_y, crop_x + width, crop_y + height) ) image = Image.alpha_composite(image, watermark_layer) image = Image.alpha_composite(image, overlay) final_image = image.convert("RGB") output = io.BytesIO() final_image.save(output, format="JPEG", quality=95) return output.getvalue() except Exception as e: logger.error(f"[_draw_boxes_and_watermark] 图片处理失败: {e}") return image_bytes