| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """
- 隐患识别路由
- """
- from fastapi import APIRouter, Depends, Request, File, UploadFile
- from sqlalchemy.orm import Session
- from pydantic import BaseModel
- from typing import Optional
- from database import get_db
- from models.scene import RecognitionRecord
- from services.yolo_service import yolo_service
- from services.oss_service import oss_service
- from utils.logger import logger
- from utils.crypto import decrypt_url
- from PIL import Image, ImageDraw, ImageFont
- import io
- import httpx
- import time
- import math
- import json
- router = APIRouter()
class HazardRequest(BaseModel):
    """Request body accepted by the hazard-recognition endpoint."""

    # Image location submitted by the client; the handler tries to decrypt
    # it first and uses it verbatim when decryption fails.
    image_url: str
    # Scene type forwarded to the detection service; empty when omitted.
    scene_type: str = ""
    # Name preferred for the watermark; empty when omitted.
    user_name: str = ""
    # Account supplied by the client; empty when omitted.
    user_account: str = ""
class SaveStepRequest(BaseModel):
    """Request body accepted by the save-step endpoint."""

    # Primary key of the RecognitionRecord row to update.
    record_id: int
    # Step number to store in that row's ``current_step`` column.
    current_step: int
@router.post("/hazard")
async def hazard(
    request: Request,
    data: HazardRequest,
    db: Session = Depends(get_db)
):
    """
    Run hazard recognition on an image and persist the outcome.

    Steps:
    1. Decrypt the OSS proxy URL to get the real image URL (the submitted
       value is used as-is when decryption fails).
    2. Download the image into memory.
    3. Ask the YOLO service to detect hazards.
    4. Draw bounding boxes plus a watermark (user name / account / date).
    5. Upload the annotated image to OSS.
    6. Insert a RecognitionRecord row.
    7. Return the detection summary.

    Args:
        request: Incoming request; ``request.state.user`` identifies the caller.
        data: Validated request body.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict carrying ``statusCode`` and ``msg``. On success (200) it also
        has ``data`` with ``record_id``, ``hazard_count``, ``hazards``,
        ``result_image_url`` and ``original_image_url``. 401 is returned when
        no user is attached to the request and 500 when any step fails.
    """
    # Starlette's ``State`` raises AttributeError for attributes that were
    # never set, so read it defensively to answer 401 instead of crashing.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    try:
        # 1. Decrypt the OSS URL; a value that cannot be decrypted is treated
        #    as a plain URL.
        # NOTE(review): this fallback lets a client make the server fetch an
        # arbitrary URL (SSRF risk) - consider restricting allowed hosts.
        try:
            real_image_url = decrypt_url(data.image_url)
        except Exception:
            real_image_url = data.image_url

        # 2. Download the image into memory.
        async with httpx.AsyncClient(timeout=30.0) as client:
            img_response = await client.get(real_image_url)
            img_response.raise_for_status()
            image_bytes = img_response.content

        # 3. Run detection; the service receives the URL, not the bytes.
        yolo_result = await yolo_service.detect_hazards(real_image_url, data.scene_type)

        # A missing or null "hazards" entry is treated as "nothing detected".
        hazards = yolo_result.get("hazards") or []
        hazard_count = len(hazards)

        # 4. Draw bounding boxes and the watermark.
        result_image_bytes = await _draw_boxes_and_watermark(
            image_bytes,
            hazards,
            user_name=data.user_name or user.account,
            user_account=user.account,
        )

        # 5. Upload the annotated image to OSS.
        result_filename = f"hazard_detection/{user.user_id}/{int(time.time())}.jpg"
        result_url = await oss_service.upload_bytes(result_image_bytes, result_filename)

        # 6. Insert the RecognitionRecord; one clock read keeps created_at
        #    and updated_at identical for a freshly created row.
        now = int(time.time())
        record = RecognitionRecord(
            user_id=user.user_id,
            scene_type=data.scene_type,
            original_image_url=data.image_url,
            recognition_image_url=result_url,
            hazard_count=hazard_count,
            hazard_details=json.dumps(hazards, ensure_ascii=False),
            current_step=1,
            created_at=now,
            updated_at=now,
            is_deleted=0
        )
        db.add(record)
        db.commit()
        db.refresh(record)

        # 7. Return the result.
        return {
            "statusCode": 200,
            "msg": "识别成功",
            "data": {
                "record_id": record.id,
                "hazard_count": hazard_count,
                "hazards": hazards,
                "result_image_url": result_url,
                "original_image_url": data.image_url
            }
        }

    except httpx.HTTPError as e:
        logger.error(f"[hazard] 图片下载失败: {e}")
        return {"statusCode": 500, "msg": f"图片下载失败: {str(e)}"}
    except Exception as e:
        logger.error(f"[hazard] 处理异常: {e}")
        # Discard any half-finished transaction, as save_step does.
        db.rollback()
        return {"statusCode": 500, "msg": f"处理失败: {str(e)}"}
@router.post("/save_step")
async def save_step(
    request: Request,
    data: SaveStepRequest,
    db: Session = Depends(get_db)
):
    """
    Persist the current step of a recognition record.

    Updates ``RecognitionRecord.current_step`` (and ``updated_at``) for the
    row whose id matches ``data.record_id`` and whose owner is the caller.

    Args:
        request: Incoming request; ``request.state.user`` identifies the caller.
        data: Validated request body with ``record_id`` and ``current_step``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict carrying ``statusCode`` and ``msg``: 200 with the saved
        ``record_id`` / ``current_step`` under ``data``, 401 when no user is
        attached to the request, 404 when no matching row belongs to the
        caller, or 500 when the update fails.
    """
    # Starlette's ``State`` raises AttributeError for attributes that were
    # never set, so read it defensively to answer 401 instead of crashing.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    try:
        # Filtering on user_id as well keeps callers from touching records
        # that belong to someone else.
        affected = db.query(RecognitionRecord).filter(
            RecognitionRecord.id == data.record_id,
            RecognitionRecord.user_id == user.user_id
        ).update({
            "current_step": data.current_step,
            "updated_at": int(time.time())
        })

        # Zero affected rows: nothing matched, so there is nothing to commit.
        if affected == 0:
            return {"statusCode": 404, "msg": "记录不存在"}

        db.commit()

        return {
            "statusCode": 200,
            "msg": "保存成功",
            "data": {
                "record_id": data.record_id,
                "current_step": data.current_step
            }
        }

    except Exception as e:
        logger.error(f"[save_step] 异常: {e}")
        db.rollback()
        return {"statusCode": 500, "msg": f"保存失败: {str(e)}"}
def _load_overlay_fonts():
    """Return ``(label_font, watermark_font)`` from the first usable font file."""
    candidates = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        # Windows font path
        "C:/Windows/Fonts/msyh.ttc",
    )
    for path in candidates:
        try:
            return ImageFont.truetype(path, 20), ImageFont.truetype(path, 14)
        except (OSError, ImportError):
            # Font file missing/unreadable, or FreeType support unavailable.
            continue
    return ImageFont.load_default(), ImageFont.load_default()


async def _draw_boxes_and_watermark(
    image_bytes: bytes,
    hazards: list,
    user_name: str,
    user_account: str
) -> bytes:
    """
    Draw detection boxes and a tiled watermark onto an image.

    The original author notes this is meant to match the Go implementation.

    What it does:
    1. Draws a red rectangle and a "label confidence" caption per detection.
    2. Tiles a 45-degree watermark whose rows cycle through the user name,
       the user account and today's date.

    Args:
        image_bytes: Encoded source image.
        hazards: Detection results; each item is read as a mapping with
            ``bbox`` (four coordinates), ``label`` and ``confidence``.
        user_name: First watermark line.
        user_account: Second watermark line.

    Returns:
        JPEG bytes of the annotated image, or ``image_bytes`` unchanged when
        processing fails (best effort; the failure is logged).
    """
    try:
        # Work in RGBA so the overlay layers can be alpha-composited.
        image = Image.open(io.BytesIO(image_bytes)).convert("RGBA")
        width, height = image.size

        # Transparent layer that receives the boxes and captions.
        overlay = Image.new("RGBA", (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(overlay)

        font, font_small = _load_overlay_fonts()

        # 1. Bounding boxes.
        for hazard in hazards or []:
            try:
                bbox = hazard.get("bbox") or []
                if len(bbox) != 4:
                    continue

                label = hazard.get("label")
                if label is None:
                    label = ""
                confidence = hazard.get("confidence")
                if confidence is None:
                    confidence = 0

                # Order the corners: recent Pillow versions reject rectangles
                # whose second corner precedes the first.
                x1, x2 = sorted((bbox[0], bbox[2]))
                y1, y2 = sorted((bbox[1], bbox[3]))

                caption = f"{label} {confidence:.2f}"
                draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0, 255), width=3)
                draw.text((x1, max(0, y1 - 25)), caption, fill=(255, 0, 0, 255), font=font)
            except (AttributeError, TypeError, ValueError) as e:
                # Skip one malformed detection instead of discarding the
                # whole annotated image.
                logger.error(f"[_draw_boxes_and_watermark] 图片处理失败: {e}")

        # 2. 45-degree watermark.
        current_date = time.strftime("%Y/%m/%d")
        watermarks = [
            "" if item is None else str(item)
            for item in (user_name, user_account, current_date)
        ]

        # Tile spacing and rotation.
        text_height_estimate = 50
        text_width_estimate = 150
        angle = 45

        # The layer is twice the image size so that a centre crop still has
        # watermark content after rotation.
        watermark_layer = Image.new("RGBA", (width * 2, height * 2), (255, 255, 255, 0))
        watermark_draw = ImageDraw.Draw(watermark_layer)

        # Loop bounds are kept as in the original so the tile phase (and
        # therefore the rendered pattern) does not change.
        for y in range(-height, height * 2, text_height_estimate):
            # The text depends only on the row, so pick it once per row.
            row_text = watermarks[int(y / text_height_estimate) % len(watermarks)]
            for x in range(-width, width * 2, text_width_estimate):
                watermark_draw.text(
                    (x, y),
                    row_text,
                    fill=(128, 128, 128, 60),  # semi-transparent grey
                    font=font_small
                )

        # Rotate the watermark layer around its centre.
        watermark_layer = watermark_layer.rotate(angle, expand=False, fillcolor=(255, 255, 255, 0))

        # Crop the centre back down to the original image size.
        crop_x = (watermark_layer.width - width) // 2
        crop_y = (watermark_layer.height - height) // 2
        watermark_layer = watermark_layer.crop((crop_x, crop_y, crop_x + width, crop_y + height))

        # Watermark first, boxes on top so detections stay clearly visible.
        image = Image.alpha_composite(image, watermark_layer)
        image = Image.alpha_composite(image, overlay)

        # JPEG has no alpha channel, so convert to RGB before encoding.
        final_image = image.convert("RGB")
        output = io.BytesIO()
        final_image.save(output, format="JPEG", quality=95)
        return output.getvalue()

    except Exception as e:
        logger.error(f"[_draw_boxes_and_watermark] 图片处理失败: {e}")
        # Best effort: hand back the untouched input when processing fails.
        return image_bytes
|