| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- """
- Hazard detection routes.
- """
- from typing import Any, Dict, List, Optional
- import io
- import json
- import time
- import httpx
- from fastapi import APIRouter, Depends, Request
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from PIL import Image, ImageDraw, ImageFont
- from database import get_db
- from models.scene import RecognitionRecord
- from services.oss_service import oss_service
- from services.yolo_service import yolo_service
- from utils.crypto import decrypt_url
- from utils.logger import logger
- router = APIRouter()
- class HazardRequest(BaseModel):
- """Compatible request model for old and new frontend payloads."""
- image_url: Optional[str] = None
- image: Optional[str] = None
- scene_type: str = ""
- scene_name: str = ""
- user_name: str = ""
- username: str = ""
- user_account: str = ""
- account: str = ""
- date: str = ""
- class SaveStepRequest(BaseModel):
- """Save current step for a recognition record."""
- record_id: int
- current_step: int
- SCENE_KEY_ALIASES = {
- "tunnel": "tunnel",
- "隧道": "tunnel",
- "隧道施工": "tunnel",
- "隧道工程": "tunnel",
- "simple_supported_bridge": "simple_supported_bridge",
- "bridge": "simple_supported_bridge",
- "桥梁": "simple_supported_bridge",
- "桥梁施工": "simple_supported_bridge",
- "桥梁工程": "simple_supported_bridge",
- "gas_station": "gas_station",
- "加油站": "gas_station",
- "special_equipment": "special_equipment",
- "特种设备": "special_equipment",
- "operate_highway": "operate_highway",
- "运营高速公路": "operate_highway",
- }
- SCENE_DISPLAY_NAMES = {
- "tunnel": "隧道工程",
- "simple_supported_bridge": "桥梁工程",
- "gas_station": "加油站",
- "special_equipment": "特种设备",
- "operate_highway": "运营高速公路",
- }
- def _get_user_code(user: Any) -> str:
- return (
- getattr(user, "userCode", None)
- or getattr(user, "user_code", None)
- or getattr(user, "account", "")
- )
- def _resolve_scene_key(scene_value: str) -> str:
- if not scene_value:
- return ""
- return SCENE_KEY_ALIASES.get(scene_value.strip(), scene_value.strip())
- def _unique_ordered(items: List[str]) -> List[str]:
- seen = set()
- ordered = []
- for item in items:
- if not item or item in seen:
- continue
- seen.add(item)
- ordered.append(item)
- return ordered
- def _build_frontend_result(hazards: List[Dict[str, Any]]) -> Dict[str, Any]:
- raw_labels: List[str] = []
- element_hazards: Dict[str, List[str]] = {}
- detections: List[Dict[str, Any]] = []
- for hazard in hazards:
- label = str(hazard.get("label") or "").strip()
- if not label:
- continue
- raw_labels.append(label)
- element_hazards.setdefault(label, [])
- if label not in element_hazards[label]:
- element_hazards[label].append(label)
- box = hazard.get("bbox") or hazard.get("box") or []
- detections.append(
- {
- "label": label,
- "box": box,
- "bbox": box,
- "confidence": hazard.get("confidence", 0),
- }
- )
- display_labels = _unique_ordered(raw_labels)
- return {
- "display_labels": display_labels,
- "labels": display_labels,
- "third_scenes": display_labels,
- "element_hazards": element_hazards,
- "detections": detections,
- }
- @router.post("/hazard")
- async def hazard(
- request: Request,
- data: HazardRequest,
- db: Session = Depends(get_db),
- ):
- """Run hazard detection and return a frontend-compatible payload."""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- try:
- source_image_url = data.image_url or data.image
- if not source_image_url:
- return {"statusCode": 422, "msg": "image_url 不能为空"}
- scene_key = _resolve_scene_key(data.scene_type or data.scene_name)
- user_code = _get_user_code(user)
- user_name = (
- data.user_name
- or data.username
- or getattr(user, "name", None)
- or getattr(user, "username", None)
- or getattr(user, "account", "")
- )
- user_account = (
- data.user_account
- or data.account
- or getattr(user, "account", "")
- )
- try:
- real_image_url = decrypt_url(source_image_url)
- except Exception:
- real_image_url = source_image_url
- async with httpx.AsyncClient(timeout=30.0) as client:
- img_response = await client.get(real_image_url)
- img_response.raise_for_status()
- image_bytes = img_response.content
- yolo_result = await yolo_service.detect_hazards(real_image_url, scene_key)
- hazards = yolo_result.get("hazards", []) or []
- hazard_count = len(hazards)
- frontend_result = _build_frontend_result(hazards)
- current_ts = int(time.time())
- result_image_bytes = await _draw_boxes_and_watermark(
- image_bytes,
- hazards,
- user_name=user_name,
- user_account=user_account,
- )
- result_filename = f"hazard_detection/{user_code}/{current_ts}.jpg"
- result_url = await oss_service.upload_bytes(result_image_bytes, result_filename)
- scene_display_name = SCENE_DISPLAY_NAMES.get(scene_key, scene_key or "隐患提示")
- record = RecognitionRecord(
- user_id=user_code,
- scene_type=scene_key,
- original_image_url=source_image_url,
- recognition_image_url=result_url,
- hazard_count=hazard_count,
- hazard_details=json.dumps(hazards, ensure_ascii=False),
- current_step=1,
- title=f"{scene_display_name}隐患提示",
- description=" ".join(frontend_result["third_scenes"]),
- labels=",".join(frontend_result["display_labels"]),
- tag_type=scene_key,
- created_at=current_ts,
- updated_at=current_ts,
- is_deleted=0,
- )
- db.add(record)
- db.commit()
- db.refresh(record)
- return {
- "statusCode": 200,
- "msg": "识别成功",
- "data": {
- "record_id": record.id,
- "hazard_count": hazard_count,
- "hazards": hazards,
- "scene_name": scene_key,
- "annotated_image": result_url,
- "display_labels": frontend_result["display_labels"],
- "labels": frontend_result["labels"],
- "third_scenes": frontend_result["third_scenes"],
- "element_hazards": frontend_result["element_hazards"],
- "detections": frontend_result["detections"],
- "result_image_url": result_url,
- "original_image_url": source_image_url,
- },
- }
- except httpx.HTTPError as e:
- logger.error(f"[hazard] 图片下载失败: {e}")
- db.rollback()
- return {"statusCode": 500, "msg": f"图片下载失败: {str(e)}"}
- except Exception as e:
- logger.error(f"[hazard] 处理异常: {e}")
- db.rollback()
- return {"statusCode": 500, "msg": f"处理失败: {str(e)}"}
- @router.post("/save_step")
- async def save_step(
- request: Request,
- data: SaveStepRequest,
- db: Session = Depends(get_db),
- ):
- """Update RecognitionRecord.current_step."""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- try:
- affected = (
- db.query(RecognitionRecord)
- .filter(
- RecognitionRecord.id == data.record_id,
- RecognitionRecord.user_id == _get_user_code(user),
- )
- .update(
- {
- "current_step": data.current_step,
- "updated_at": int(time.time()),
- }
- )
- )
- if affected == 0:
- return {"statusCode": 404, "msg": "记录不存在"}
- db.commit()
- return {
- "statusCode": 200,
- "msg": "保存成功",
- "data": {
- "record_id": data.record_id,
- "current_step": data.current_step,
- },
- }
- except Exception as e:
- logger.error(f"[save_step] 异常: {e}")
- db.rollback()
- return {"statusCode": 500, "msg": f"保存失败: {str(e)}"}
- async def _draw_boxes_and_watermark(
- image_bytes: bytes,
- hazards: List[Dict[str, Any]],
- user_name: str,
- user_account: str,
- ) -> bytes:
- """Draw detection boxes and a tiled watermark on the image."""
- try:
- image = Image.open(io.BytesIO(image_bytes)).convert("RGBA")
- width, height = image.size
- overlay = Image.new("RGBA", (width, height), (255, 255, 255, 0))
- draw = ImageDraw.Draw(overlay)
- try:
- font = ImageFont.truetype(
- "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20
- )
- font_small = ImageFont.truetype(
- "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14
- )
- except Exception:
- try:
- font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 20)
- font_small = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 14)
- except Exception:
- font = ImageFont.load_default()
- font_small = ImageFont.load_default()
- for hazard in hazards:
- bbox = hazard.get("bbox", []) or hazard.get("box", [])
- label = hazard.get("label", "")
- confidence = hazard.get("confidence", 0)
- if len(bbox) == 4:
- x1, y1, x2, y2 = bbox
- draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0, 255), width=3)
- text = f"{label} {confidence:.2f}"
- draw.text(
- (x1, max(0, y1 - 25)),
- text,
- fill=(255, 0, 0, 255),
- font=font,
- )
- current_date = time.strftime("%Y/%m/%d")
- watermarks = [user_name or "", user_account or "", current_date]
- watermarks = [text for text in watermarks if text]
- if not watermarks:
- watermarks = [current_date]
- text_height_estimate = 50
- text_width_estimate = 150
- angle = 45
- watermark_layer = Image.new(
- "RGBA", (width * 2, height * 2), (255, 255, 255, 0)
- )
- watermark_draw = ImageDraw.Draw(watermark_layer)
- for y in range(-height, height * 2, text_height_estimate):
- for x in range(-width, width * 2, text_width_estimate):
- row_index = int(y / text_height_estimate) % len(watermarks)
- watermark_draw.text(
- (x, y),
- watermarks[row_index],
- fill=(128, 128, 128, 60),
- font=font_small,
- )
- watermark_layer = watermark_layer.rotate(
- angle, expand=False, fillcolor=(255, 255, 255, 0)
- )
- crop_x = (watermark_layer.width - width) // 2
- crop_y = (watermark_layer.height - height) // 2
- watermark_layer = watermark_layer.crop(
- (crop_x, crop_y, crop_x + width, crop_y + height)
- )
- image = Image.alpha_composite(image, watermark_layer)
- image = Image.alpha_composite(image, overlay)
- final_image = image.convert("RGB")
- output = io.BytesIO()
- final_image.save(output, format="JPEG", quality=95)
- return output.getvalue()
- except Exception as e:
- logger.error(f"[_draw_boxes_and_watermark] 图片处理失败: {e}")
- return image_bytes
|