""" 隐患识别路由 """ from fastapi import APIRouter, Depends, Request, File, UploadFile from sqlalchemy.orm import Session from pydantic import BaseModel from typing import Optional from database import get_db from models.scene import RecognitionRecord from services.yolo_service import yolo_service from services.oss_service import oss_service from utils.logger import logger from utils.crypto import decrypt_url from PIL import Image, ImageDraw, ImageFont import io import httpx import time import math import json router = APIRouter() class HazardRequest(BaseModel): """隐患识别请求""" image_url: str scene_type: str = "" user_name: str = "" user_account: str = "" class SaveStepRequest(BaseModel): """保存步骤请求""" record_id: int current_step: int @router.post("/hazard") async def hazard( request: Request, data: HazardRequest, db: Session = Depends(get_db) ): """ 隐患识别接口 流程: 1. 从 OSS 代理 URL 解密获取真实 URL 2. 下载图片到内存 3. 调用 YOLO 服务识别 4. 绘制边界框 + 水印(用户名/账号/日期) 5. 上传结果图片到 OSS 6. 插入 RecognitionRecord 7. 返回结果 """ user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} try: # 1. 解密 OSS URL try: real_image_url = decrypt_url(data.image_url) except: # 如果解密失败,可能是直接的 URL real_image_url = data.image_url # 2. 下载图片到内存 async with httpx.AsyncClient(timeout=30.0) as client: img_response = await client.get(real_image_url) img_response.raise_for_status() image_bytes = img_response.content # 3. 调用 YOLO 服务识别 # 先上传图片到临时位置,或者传递 URL yolo_result = await yolo_service.detect_hazards(real_image_url, data.scene_type) hazards = yolo_result.get("hazards", []) hazard_count = len(hazards) # 4. 绘制边界框和水印 result_image_bytes = await _draw_boxes_and_watermark( image_bytes, hazards, user_name=data.user_name or user.account, user_account=user.account, ) # 5. 上传结果图片到 OSS result_filename = f"hazard_detection/{user.userCode}/{int(time.time())}.jpg" result_url = await oss_service.upload_bytes(result_image_bytes, result_filename) # 6. 插入 RecognitionRecord record = RecognitionRecord( user_id=user.userCode, scene_type=data.scene_type, original_image_url=data.image_url, recognition_image_url=result_url, hazard_count=hazard_count, hazard_details=json.dumps(hazards, ensure_ascii=False), current_step=1, created_at=int(time.time()), updated_at=int(time.time()), is_deleted=0 ) db.add(record) db.commit() db.refresh(record) # 7. 返回结果 return { "statusCode": 200, "msg": "识别成功", "data": { "record_id": record.id, "hazard_count": hazard_count, "hazards": hazards, "result_image_url": result_url, "original_image_url": data.image_url } } except httpx.HTTPError as e: logger.error(f"[hazard] 图片下载失败: {e}") return {"statusCode": 500, "msg": f"图片下载失败: {str(e)}"} except Exception as e: logger.error(f"[hazard] 处理异常: {e}") return {"statusCode": 500, "msg": f"处理失败: {str(e)}"} @router.post("/save_step") async def save_step( request: Request, data: SaveStepRequest, db: Session = Depends(get_db) ): """ 保存识别步骤 更新 RecognitionRecord.current_step """ user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} try: # 更新步骤 affected = db.query(RecognitionRecord).filter( RecognitionRecord.id == data.record_id, RecognitionRecord.user_id == user.userCode ).update({ "current_step": data.current_step, "updated_at": int(time.time()) }) if affected == 0: return {"statusCode": 404, "msg": "记录不存在"} db.commit() return { "statusCode": 200, "msg": "保存成功", "data": { "record_id": data.record_id, "current_step": data.current_step } } except Exception as e: logger.error(f"[save_step] 异常: {e}") db.rollback() return {"statusCode": 500, "msg": f"保存失败: {str(e)}"} async def _draw_boxes_and_watermark( image_bytes: bytes, hazards: list, user_name: str, user_account: str ) -> bytes: """ 在图片上绘制边界框和水印(对齐Go版本) 功能: 1. 绘制检测边界框 2. 添加45度角水印(用户名、账号、日期) Args: image_bytes: 原始图片字节 hazards: YOLO 检测结果列表,每项包含 bbox, label, confidence user_name: 用户名 user_account: 用户账号 Returns: 处理后的图片字节 """ try: # 打开图片 image = Image.open(io.BytesIO(image_bytes)).convert("RGBA") width, height = image.size # 创建透明图层用于绘制 overlay = Image.new("RGBA", (width, height), (255, 255, 255, 0)) draw = ImageDraw.Draw(overlay) # 尝试加载字体 try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20) font_small = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14) except: try: # Windows字体路径 font = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 20) font_small = ImageFont.truetype("C:/Windows/Fonts/msyh.ttc", 14) except: font = ImageFont.load_default() font_small = ImageFont.load_default() # 1. 绘制边界框 for hazard in hazards: bbox = hazard.get("bbox", []) label = hazard.get("label", "") confidence = hazard.get("confidence", 0) if len(bbox) == 4: x1, y1, x2, y2 = bbox # 绘制矩形框(红色) draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0, 255), width=3) # 绘制标签 text = f"{label} {confidence:.2f}" draw.text((x1, max(0, y1 - 25)), text, fill=(255, 0, 0, 255), font=font) # 2. 添加45度角水印(对齐Go版本) current_date = time.strftime("%Y/%m/%d") watermarks = [user_name, user_account, current_date] # 水印参数 text_height_estimate = 50 text_width_estimate = 150 angle = 45 # 创建水印文本图层 watermark_layer = Image.new("RGBA", (width * 2, height * 2), (255, 255, 255, 0)) watermark_draw = ImageDraw.Draw(watermark_layer) # 45度角平铺水印 for y in range(-height, height * 2, text_height_estimate): for x in range(-width, width * 2, text_width_estimate): # 计算当前行使用哪个水印文本 row_index = int(y / text_height_estimate) % len(watermarks) text = watermarks[row_index] # 使用更深的灰色(对齐Go版本) watermark_draw.text( (x, y), text, fill=(128, 128, 128, 60), # 半透明灰色 font=font_small ) # 旋转水印层 watermark_layer = watermark_layer.rotate(angle, expand=False, fillcolor=(255, 255, 255, 0)) # 裁剪到原始尺寸 crop_x = (watermark_layer.width - width) // 2 crop_y = (watermark_layer.height - height) // 2 watermark_layer = watermark_layer.crop((crop_x, crop_y, crop_x + width, crop_y + height)) # 合并图层 image = Image.alpha_composite(image, watermark_layer) image = Image.alpha_composite(image, overlay) # 转换为RGB并保存 final_image = image.convert("RGB") output = io.BytesIO() final_image.save(output, format="JPEG", quality=95) return output.getvalue() except Exception as e: logger.error(f"[_draw_boxes_and_watermark] 图片处理失败: {e}") # 如果处理失败,返回原图 return image_bytes