"""
Hazard detection routes.
"""

from typing import Any, Dict, List, Optional
import io
import json
import time
import traceback
from urllib.parse import urlparse, parse_qs

import httpx
from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from PIL import Image, ImageDraw, ImageFont

from database import get_db
from models.scene import RecognitionRecord, Scene, SecondScene, ThirdScene
from services.oss_service import oss_service
from services.yolo_service import yolo_service
from utils.crypto import decrypt_url
from utils.label_translator import translate_labels, translate_scenes_for_query
from utils.logger import logger

router = APIRouter()


class HazardRequest(BaseModel):
    """Request body accepted by ``/hazard``.

    Kept compatible with both the frontend payload and the request
    structure of the Go implementation, which is why the image and the
    scene can each arrive under several alternative field names.
    """

    scene_name: str = ""
    scene_type: str = ""
    image: str = ""
    image_url: str = ""
    date: str = ""
    title: str = ""
    description: str = ""
    labels: str = ""
    current_step: Optional[int] = None
    hazard_count: Optional[int] = None
    third_scene: str = ""
    second_scene: str = ""
    recognition_image_url: str = ""
    original_image_url: str = ""
    tag_type: str = ""
    hazard_details: Any = Field(default_factory=dict)
    save_history: bool = True

    @property
    def source_image(self) -> str:
        """Return the first non-empty of image, image_url, original_image_url."""
        return self.image or self.image_url or self.original_image_url or ""

    @property
    def source_scene(self) -> str:
        """Return the first non-empty of scene_name, scene_type, tag_type."""
        return self.scene_name or self.scene_type or self.tag_type


class SaveStepRequest(BaseModel):
    """Save current step for a recognition record."""

    record_id: int
    current_step: int


# Maps every accepted scene spelling (English key or Chinese name) to the
# canonical scene key.
SCENE_KEY_ALIASES = {
    "tunnel": "tunnel",
    "隧道": "tunnel",
    "隧道工程": "tunnel",
    "simple_supported_bridge": "simple_supported_bridge",
    "bridge": "simple_supported_bridge",
    "桥梁": "simple_supported_bridge",
    "桥梁工程": "simple_supported_bridge",
    "gas_station": "gas_station",
    "加油站": "gas_station",
    "special_equipment": "special_equipment",
    "特种设备": "special_equipment",
    "operate_highway": "operate_highway",
    "运营高速公路": "operate_highway",
}

# Display name returned to the client for each canonical scene key.
SCENE_DISPLAY_NAMES = {
    "tunnel": "隧道工程",
    "simple_supported_bridge": "桥梁工程",
    "gas_station": "加油站",
    "special_equipment": "特种设备",
    "operate_highway": "运营高速公路",
}

DEFAULT_SCENE_KEY = "tunnel"

# A scene whose name contains any of these substrings has the part of each
# detection label up to and including the first underscore removed.
_LABEL_PREFIX_SCENE_KEYWORDS = (
    "运营高速公路",
    "operate_highway",
    "隧道",
    "tunnel",
    "简支梁",
    "桥梁",
    "simple_supported_bridge",
)


def _get_user_code(user: Any) -> str:
    """Return the first truthy of user.userCode, user.user_code, user.account."""
    return (
        getattr(user, "userCode", None)
        or getattr(user, "user_code", None)
        or getattr(user, "account", "")
    )


def _get_user_id(user: Any, default: Any = None) -> Any:
    """Return the user's id, preferring ``id`` over ``user_id``.

    Both routes in this module resolve the id through this helper so that the
    value ``save_step`` filters on is the same one ``hazard`` stored on the
    record.

    Args:
        user: The object taken from ``request.state.user``.
        default: Value returned when neither attribute is truthy.
    """
    return getattr(user, "id", None) or getattr(user, "user_id", None) or default


def _resolve_scene_key(scene_value: str) -> str:
    """Map a scene name or alias to its canonical key.

    Returns an empty string for empty input; unknown values are returned
    stripped but otherwise unchanged.
    """
    if not scene_value:
        return ""
    stripped = scene_value.strip()
    return SCENE_KEY_ALIASES.get(stripped, stripped)


def _scene_exists(db: Session, scene_key: str) -> bool:
    """Return True when a non-deleted Scene row has this English name."""
    if not scene_key:
        return False
    match = (
        db.query(Scene)
        .filter(Scene.scene_en_name == scene_key, Scene.is_deleted == 0)
        .first()
    )
    return match is not None


def _normalize_label_for_scene(label: str, scene_name: str) -> str:
    """Strip the label prefix for scenes that use prefixed labels."""
    if _should_trim_scene_label_prefix(scene_name):
        return _process_highway_label(label)
    return label


def _process_highway_label(label: str) -> str:
    """Return the text after the first underscore, or the label if none."""
    _, separator, remainder = label.partition("_")
    return remainder if separator else label


def _should_trim_scene_label_prefix(scene_name: str) -> bool:
    """Return True when the scene name contains a prefix-trimming keyword."""
    if not scene_name:
        return False
    return any(keyword in scene_name for keyword in _LABEL_PREFIX_SCENE_KEYWORDS)


def _remove_duplicate_labels(labels: List[str]) -> str:
    """Join the stripped, non-empty, de-duplicated labels with ", "."""
    return ", ".join(_dedupe_list([str(label).strip() for label in labels]))


def _dedupe_list(items: List[str]) -> List[str]:
    """Return the truthy items in first-seen order without duplicates."""
    seen = set()
    result: List[str] = []
    for item in items:
        if item and item not in seen:
            seen.add(item)
            result.append(item)
    return result


def _resolve_image_url(source_image_url: str) -> str:
    """Resolve encrypted/proxy/public image URLs to a directly fetchable URL.

    Resolution is best-effort: any decryption failure is logged at debug
    level and the function falls through to the next strategy, finally
    returning the stripped input unchanged.
    """
    if not source_image_url:
        return source_image_url

    url_text = source_image_url.strip()
    if not url_text:
        return url_text

    # Proxy form: the encrypted target is carried in the "url" query parameter.
    if "/apiv1/oss/parse/" in url_text:
        try:
            parsed = urlparse(url_text)
            encrypted_url = parse_qs(parsed.query).get("url", [""])[0]
            if encrypted_url:
                return decrypt_url(encrypted_url)
        except Exception as exc:
            logger.debug(
                "[_resolve_image_url] proxy url decrypt failed: %s", exc
            )

    # Otherwise try treating the whole value as an encrypted URL.
    try:
        decrypted_url = decrypt_url(url_text)
        if decrypted_url and decrypted_url != url_text:
            return decrypted_url
    except Exception as exc:
        logger.debug("[_resolve_image_url] direct decrypt failed: %s", exc)

    return url_text


def _unique_ordered(items: List[str]) -> List[str]:
    """Return the truthy items in first-seen order without duplicates."""
    return _dedupe_list(items)


def _build_scene_relations(
    db: Session, normalized_labels: List[str]
) -> tuple[list[str], dict[str, list[str]]]:
    """
    Build scene relations by mapping sub-secondary scenes to secondary scenes,
    then querying for third scenes (hazards).

    Args:
        db: Database session
        normalized_labels: List of sub-secondary scene names (e.g., "加油机", "加油枪")

    Returns:
        Tuple of (third_scene_names, element_hazards)
        - third_scene_names: List of all third scene names (hazards)
        - element_hazards: Dict mapping original labels to their hazards
    """
    third_scene_names: List[str] = []
    element_hazards: Dict[str, List[str]] = {}

    # The same label is usually detected several times; each distinct label
    # only needs to be looked up once and the result is unchanged.
    for label in dict.fromkeys(normalized_labels):
        # Translate sub-secondary scene to secondary scene for database query
        # e.g., "加油机" -> "加油设施及附属场地"
        query_label = translate_scenes_for_query([label])[0] if label else label

        # Query SecondScene table using the translated secondary scene name
        second_scene = (
            db.query(SecondScene)
            .filter(
                SecondScene.second_scene_name == query_label,
                SecondScene.is_deleted == 0,
            )
            .first()
        )

        if not second_scene:
            # No matching secondary scene found, store empty hazards list
            element_hazards.setdefault(label, [])
            continue

        # Query ThirdScene table for hazards related to this secondary scene
        third_scenes = (
            db.query(ThirdScene)
            .filter(
                ThirdScene.second_scene_id == second_scene.id,
                ThirdScene.is_deleted == 0,
            )
            .all()
        )

        if not third_scenes:
            element_hazards.setdefault(label, [])
            continue

        current_element_hazards = [item.third_scene_name for item in third_scenes]
        third_scene_names.extend(current_element_hazards)
        # Store hazards under the ORIGINAL sub-secondary scene label
        element_hazards[label] = _dedupe_list(
            element_hazards.get(label, []) + current_element_hazards
        )

    return _dedupe_list(third_scene_names), element_hazards


def _image_download_error_message(exc: Exception) -> str:
    """Translate an image download failure into the client-facing message."""
    if isinstance(exc, httpx.TimeoutException):
        return "图片下载超时,请检查网络连接或稍后重试"
    if isinstance(exc, httpx.ConnectError):
        return "无法连接到图片服务器,请检查图片链接是否正确"

    response = getattr(exc, "response", None)
    if response is None:
        return "图片下载失败"

    status_code = response.status_code
    if status_code == 404:
        return "图片不存在,请检查图片链接是否正确"
    if status_code == 403:
        return "无权访问该图片,请检查图片链接权限"
    if status_code >= 500:
        return "图片服务器暂时不可用,请稍后重试"
    return f"图片下载失败(错误代码:{status_code})"


def _yolo_status_error_message(status_code: int, scene_key: str) -> str:
    """Translate a YOLO service HTTP status into the client-facing message."""
    if status_code == 400:
        return "图片格式不支持或图片已损坏,请更换图片"
    if status_code == 404:
        return f"未找到场景'{SCENE_DISPLAY_NAMES.get(scene_key, scene_key)}'的识别模型"
    if status_code >= 500:
        return "AI识别服务暂时不可用,请稍后重试"
    return f"AI识别服务返回错误(代码:{status_code})"


@router.post("/hazard")
async def hazard(
    request: Request,
    data: HazardRequest,
    db: Session = Depends(get_db),
):
    """Hazard recognition endpoint.

    Follows the flow, field names and response structure of the Go
    implementation: resolve the image URL and scene, download the image,
    run YOLO detection, map the detected labels to hazards, optionally save
    a RecognitionRecord, and return the result.

    Every outcome, including failures, is returned as a dict with
    ``statusCode`` and ``msg`` (plus ``data`` on success) rather than raised.
    """
    # getattr: request.state raises AttributeError for attributes that were
    # never set, which would bypass the 401 payload below.
    user = getattr(request.state, "user", None)
    if not user:
        logger.warning("[hazard] request.state.user is empty")
        return {"statusCode": 401, "msg": "获取用户信息失败"}

    user_id = _get_user_id(user, default=1)
    user_code = _get_user_code(user)
    request_id = (
        getattr(request.state, "request_id", None)
        or f"hazard-{int(time.time() * 1000)}"
    )

    try:
        logger.info(
            "[hazard][%s] incoming payload: user_id=%r, user_code=%r, scene_name=%r, scene_type=%r, image_present=%s, image_url_present=%s, image_len=%s, image_url_len=%s, date=%r",
            request_id,
            user_id,
            user_code,
            data.scene_name,
            data.scene_type,
            bool(data.image),
            bool(data.image_url),
            len(data.image or ""),
            len(data.image_url or ""),
            data.date,
        )
        logger.info(
            "[hazard][%s] request headers: content_type=%r, content_length=%r, user_agent=%r, path=%r, method=%r",
            request_id,
            request.headers.get("content-type"),
            request.headers.get("content-length"),
            request.headers.get("user-agent"),
            request.url.path,
            request.method,
        )

        source_image_url = _resolve_image_url(data.source_image)
        logger.info(
            "[hazard][%s] resolved image url: raw=%r, resolved=%r",
            request_id,
            data.source_image,
            source_image_url,
        )

        scene_key = _resolve_scene_key(data.source_scene) or DEFAULT_SCENE_KEY
        logger.info(
            "[hazard][%s] resolved scene: source_scene=%r, scene_key=%r",
            request_id,
            data.source_scene,
            scene_key,
        )

        if not source_image_url:
            logger.warning(
                "[hazard][%s] validation failed: scene_name=%r, scene_type=%r, image_present=%s, image_url_present=%s, raw_image_len=%s, raw_image_url_len=%s",
                request_id,
                data.scene_name,
                data.scene_type,
                bool(data.image),
                bool(data.image_url),
                len(data.image or ""),
                len(data.image_url or ""),
            )
            return {"statusCode": 400, "msg": "图片链接不能为空"}

        scene_exists = _scene_exists(db, scene_key)
        logger.info(
            "[hazard][%s] scene lookup result: scene_key=%r, exists=%s",
            request_id,
            scene_key,
            scene_exists,
        )
        if not scene_exists:
            logger.warning("[hazard][%s] scene not found: %r", request_id, scene_key)
            return {"statusCode": 500, "msg": "场景名称不存在"}

        logger.info(
            "[hazard][%s] downloading image: url=%r",
            request_id,
            source_image_url,
        )
        # NOTE(review): the URL originates from the request body and is
        # fetched server-side; confirm an allow-list is enforced upstream.
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                img_response = await client.get(source_image_url)
                logger.info(
                    "[hazard][%s] image download response: status_code=%s, content_type=%r, content_length=%s",
                    request_id,
                    img_response.status_code,
                    img_response.headers.get("content-type"),
                    img_response.headers.get("content-length"),
                )
                img_response.raise_for_status()
                image_bytes = img_response.content
        except httpx.HTTPError as e:
            logger.error(
                "[hazard][%s] 下载图片失败: url=%r, error=%s, traceback=%s",
                request_id,
                source_image_url,
                e,
                traceback.format_exc(),
            )
            return {"statusCode": 500, "msg": _image_download_error_message(e)}

        logger.info(
            "[hazard][%s] image downloaded: bytes=%s",
            request_id,
            len(image_bytes),
        )

        try:
            logger.info(
                "[hazard][%s] calling yolo_service.detect_hazards: scene_key=%r",
                request_id,
                scene_key,
            )
            yolo_result = await yolo_service.detect_hazards(image_bytes, scene_key)

            # Summarise defensively: the result is only assumed to be a dict
            # once the summary has been logged.
            if isinstance(yolo_result, dict):
                summary_keys: Any = list(yolo_result.keys())
                labels_count = len(yolo_result.get("labels", []) or [])
                boxes_count = len(yolo_result.get("boxes", []) or [])
                scores_count = len(yolo_result.get("scores", []) or [])
                summary_model_type = yolo_result.get("model_type", "")
            else:
                summary_keys = type(yolo_result).__name__
                labels_count = boxes_count = scores_count = -1
                summary_model_type = ""
            logger.info(
                "[hazard][%s] yolo result summary: keys=%s, labels_count=%s, boxes_count=%s, scores_count=%s, model_type=%r",
                request_id,
                summary_keys,
                labels_count,
                boxes_count,
                scores_count,
                summary_model_type,
            )
            logger.debug(
                "[hazard][%s] yolo result raw: %s",
                request_id,
                json.dumps(yolo_result, ensure_ascii=False, default=str),
            )
        except httpx.TimeoutException as e:
            logger.error(
                "[hazard][%s] YOLO检测超时: scene_key=%r, error=%s, traceback=%s",
                request_id,
                scene_key,
                e,
                traceback.format_exc(),
            )
            return {"statusCode": 500, "msg": "AI识别服务响应超时,请稍后重试"}
        except httpx.ConnectError as e:
            logger.error(
                "[hazard][%s] YOLO服务连接失败: scene_key=%r, error=%s, traceback=%s",
                request_id,
                scene_key,
                e,
                traceback.format_exc(),
            )
            return {"statusCode": 500, "msg": "无法连接到AI识别服务,请联系管理员"}
        except httpx.HTTPStatusError as e:
            has_response = hasattr(e, "response")
            logger.error(
                "[hazard][%s] YOLO服务返回错误状态: scene_key=%r, status_code=%s, error=%s, traceback=%s",
                request_id,
                scene_key,
                e.response.status_code if has_response else "unknown",
                e,
                traceback.format_exc(),
            )
            status_code = e.response.status_code if has_response else 500
            return {
                "statusCode": 500,
                "msg": _yolo_status_error_message(status_code, scene_key),
            }
        except ValueError as e:
            logger.error(
                "[hazard][%s] YOLO返回数据格式错误: scene_key=%r, error=%s, traceback=%s",
                request_id,
                scene_key,
                e,
                traceback.format_exc(),
            )
            return {"statusCode": 500, "msg": "AI识别服务返回数据格式错误,请联系管理员"}
        except Exception as e:
            logger.error(
                "[hazard][%s] YOLO检测失败: scene_key=%r, error=%s, traceback=%s",
                request_id,
                scene_key,
                e,
                traceback.format_exc(),
            )
            error_msg = str(e)
            lowered = error_msg.lower()
            if "timeout" in lowered:
                return {"statusCode": 500, "msg": "AI识别服务响应超时,请稍后重试"}
            if "connection" in lowered:
                return {"statusCode": 500, "msg": "无法连接到AI识别服务,请联系管理员"}
            return {"statusCode": 500, "msg": f"AI识别失败:{error_msg}"}

        labels = yolo_result.get("labels", []) or []
        boxes = yolo_result.get("boxes", []) or []
        scores = yolo_result.get("scores", []) or []
        model_type = yolo_result.get("model_type", "")

        if not labels:
            logger.info(
                "[hazard][%s] YOLO返回空结果: scene_key=%r, model_type=%r, image_bytes=%s",
                request_id,
                scene_key,
                model_type,
                len(image_bytes),
            )
            empty_display_name = SCENE_DISPLAY_NAMES.get(scene_key, scene_key)
            # Nothing detected: respond with an empty result; no record is saved.
            return {
                "statusCode": 200,
                "msg": "未检测到隐患",
                "data": {
                    "scene_name": scene_key,
                    "scene_display_name": empty_display_name,
                    "tag_type": scene_key,
                    "title": f"{empty_display_name}隐患提示",
                    "total_detections": 0,
                    "detections": [],
                    "model_type": model_type,
                    "original_image": source_image_url,
                    "annotated_image": source_image_url,
                    "labels": "",
                    "display_labels": [],
                    "third_scenes": [],
                    "element_hazards": {},
                    "no_hazards_detected": True,
                },
            }

        normalized_labels = [
            _normalize_label_for_scene(str(label), scene_key) for label in labels
        ]
        logger.info(
            "[hazard][%s] normalized labels: raw_labels=%s, normalized_labels=%s",
            request_id,
            labels,
            normalized_labels,
        )

        # Translate pinyin/English labels to Chinese
        translated_labels = translate_labels(
            normalized_labels, fallback_to_original=True
        )
        logger.info(
            "[hazard][%s] translated labels: before=%s, after=%s",
            request_id,
            normalized_labels,
            translated_labels,
        )
        normalized_labels = translated_labels

        # zip stops at the shortest input, so a label without a matching
        # score and box is left out.
        detection_results = [
            {
                "label": label,
                "score": score,
                "box": box,
                "percent": f"{score * 100:.2f}%",
            }
            for label, score, box in zip(normalized_labels, scores, boxes)
        ]
        logger.info(
            "[hazard][%s] detection results built: total=%s, detail=%s",
            request_id,
            len(detection_results),
            detection_results,
        )

        third_scene_names, element_hazards = _build_scene_relations(
            db, normalized_labels
        )
        logger.info(
            "[hazard][%s] scene relations built: third_scene_names=%s, element_hazards=%s",
            request_id,
            third_scene_names,
            element_hazards,
        )

        scene_name = scene_key
        scene_display_name = SCENE_DISPLAY_NAMES.get(scene_key, scene_name)
        tag_type = scene_key
        title = scene_display_name
        description = " ".join(third_scene_names)
        current_step = 1
        hazard_count = len(detection_results)

        # Detection boxes are no longer drawn; the original image is used as-is.
        logger.info(
            "[hazard][%s] skipping image annotation, using original image",
            request_id,
        )
        recognition_image_url = source_image_url

        try:
            # Values supplied in the request take precedence over the ones
            # derived from the detection results.
            labels_text = _remove_duplicate_labels(
                [item["label"] for item in detection_results]
            )
            if data.labels.strip():
                labels_text = (
                    _remove_duplicate_labels(
                        [item.strip() for item in data.labels.split(",") if item.strip()]
                    )
                    or labels_text
                )

            record_title = data.title.strip() or title
            record_description = data.description.strip() or description
            record_third_scene = data.third_scene.strip() or " ".join(third_scene_names)
            record_second_scene = data.second_scene.strip()
            record_tag_type = data.tag_type.strip() or tag_type
            record_current_step = (
                data.current_step if data.current_step is not None else current_step
            )
            record_hazard_count = (
                data.hazard_count if data.hazard_count is not None else hazard_count
            )

            raw_hazard_details = data.hazard_details
            if isinstance(raw_hazard_details, str):
                try:
                    record_hazard_details = json.loads(raw_hazard_details)
                except (ValueError, RecursionError):
                    # Not valid JSON: store the string exactly as received.
                    record_hazard_details = raw_hazard_details
            elif raw_hazard_details:
                record_hazard_details = raw_hazard_details
            else:
                record_hazard_details = detection_results

            recognition_image_url = (
                data.recognition_image_url.strip() or recognition_image_url
            )

            if isinstance(record_hazard_details, str):
                hazard_details_text = record_hazard_details
            else:
                hazard_details_text = json.dumps(
                    record_hazard_details, ensure_ascii=False
                )

            now_ts = int(time.time())
            record_payload = {
                "original_image_url": source_image_url,
                "recognition_image_url": recognition_image_url,
                "user_id": user_id,
                "scene_type": scene_name,
                "hazard_count": record_hazard_count,
                "current_step": record_current_step,
                "hazard_details": hazard_details_text,
                "title": record_title,
                "description": record_description,
                "labels": labels_text,
                "tag_type": record_tag_type,
                "second_scene": record_second_scene,
                "third_scene": record_third_scene,
                "created_at": now_ts,
                "updated_at": now_ts,
                "is_deleted": 0,
            }

            logger.info(
                "[hazard][%s] ===== HISTORY SAVE START ===== user_id=%r, scene_key=%r, labels=%r, title=%r, save_history=%s",
                request_id,
                user_id,
                scene_key,
                labels_text,
                record_title,
                data.save_history,
            )
            logger.info(
                "[hazard][%s] history record payload: %s",
                request_id,
                json.dumps(record_payload, ensure_ascii=False, default=str),
            )

            record_id_value = None
            if data.save_history:
                record = RecognitionRecord(**record_payload)
                db.add(record)
                logger.info(
                    "[hazard][%s] record added to session, committing db transaction",
                    request_id,
                )
                db.commit()
                logger.info(
                    "[hazard][%s] db commit success, refreshing record", request_id
                )
                db.refresh(record)
                record_id_value = getattr(record, "id", None)
                logger.info(
                    "[hazard][%s] record saved successfully: id=%r, current_step=%r, created_at=%r, updated_at=%r",
                    request_id,
                    record_id_value,
                    getattr(record, "current_step", None),
                    getattr(record, "created_at", None),
                    getattr(record, "updated_at", None),
                )
            else:
                logger.info(
                    "[hazard][%s] save_history=False, skipping record save",
                    request_id,
                )
        except Exception as e:
            logger.error(
                "[hazard][%s] 识别失败(保存记录阶段): error=%s, traceback=%s",
                request_id,
                e,
                traceback.format_exc(),
            )
            db.rollback()
            logger.info(
                "[hazard][%s] db rollback executed after record save failure",
                request_id,
            )
            error_msg = str(e)
            lowered = error_msg.lower()
            if "connection" in lowered:
                return {"statusCode": 500, "msg": "数据库连接失败,请稍后重试"}
            if "duplicate" in lowered or "unique" in lowered:
                return {"statusCode": 500, "msg": "记录已存在,请勿重复提交"}
            if "timeout" in lowered:
                return {"statusCode": 500, "msg": "数据库操作超时,请稍后重试"}
            if "disk" in lowered or "space" in lowered:
                return {"statusCode": 500, "msg": "存储空间不足,请联系管理员"}
            return {"statusCode": 500, "msg": f"保存识别记录失败:{error_msg}"}

        display_labels = _dedupe_list([item["label"] for item in detection_results])

        response_data = {
            "scene_name": scene_name,
            "scene_display_name": scene_display_name,
            "tag_type": tag_type,
            "title": record_title,
            "description": record_description,
            "current_step": record_current_step,
            "hazard_count": record_hazard_count,
            "total_detections": len(detection_results),
            "detections": detection_results,
            "model_type": model_type,
            "original_image": source_image_url,
            "annotated_image": recognition_image_url,
            "recognition_image_url": recognition_image_url,
            "labels": labels_text,
            "display_labels": display_labels,
            "third_scenes": third_scene_names,
            "element_hazards": element_hazards,
            "record_id": record_id_value,
        }

        logger.info(
            "[hazard][%s] response prepared successfully: %s",
            request_id,
            json.dumps(response_data, ensure_ascii=False, default=str),
        )

        return {
            "statusCode": 200,
            "msg": "识别成功",
            "data": response_data,
        }
    except Exception as e:
        logger.error(
            "[hazard][%s] 处理失败: error=%s, traceback=%s",
            request_id,
            e,
            traceback.format_exc(),
        )
        db.rollback()
        logger.info(
            "[hazard][%s] db rollback executed in outer exception handler",
            request_id,
        )
        error_msg = str(e)
        lowered = error_msg.lower()
        if "connection" in lowered:
            return {"statusCode": 500, "msg": "服务连接失败,请稍后重试"}
        if "timeout" in lowered:
            return {"statusCode": 500, "msg": "服务响应超时,请稍后重试"}
        if "memory" in lowered:
            return {"statusCode": 500, "msg": "系统资源不足,请稍后重试或联系管理员"}
        if "permission" in lowered or "denied" in lowered:
            return {"statusCode": 500, "msg": "权限不足,请联系管理员"}
        return {"statusCode": 500, "msg": f"识别处理失败:{error_msg}"}


@router.post("/save_step")
async def save_step(
    request: Request,
    data: SaveStepRequest,
    db: Session = Depends(get_db),
):
    """Update RecognitionRecord.current_step.

    Only a record whose ``user_id`` matches the requesting user is updated;
    when no row matches, a 404 payload is returned. Failures are rolled back
    and reported as a 500 payload rather than raised.
    """
    # getattr: request.state raises AttributeError for attributes that were
    # never set, which would bypass the 401 payload below.
    user = getattr(request.state, "user", None)
    if not user:
        logger.warning("[save_step] unauthorized request: request.state.user is empty")
        return {"statusCode": 401, "msg": "未授权"}

    # Resolved the same way as in hazard() so the filter below matches the
    # user_id that was stored when the record was created.
    user_id = _get_user_id(user)
    request_id = (
        getattr(request.state, "request_id", None)
        or f"save-step-{int(time.time() * 1000)}"
    )

    try:
        logger.info(
            "[save_step][%s] incoming payload: user_id=%r, record_id=%r, current_step=%r, path=%r, method=%r",
            request_id,
            user_id,
            data.record_id,
            data.current_step,
            request.url.path,
            request.method,
        )

        logger.info(
            "[save_step][%s] updating RecognitionRecord: record_id=%r, user_id=%r",
            request_id,
            data.record_id,
            user_id,
        )
        affected = (
            db.query(RecognitionRecord)
            .filter(
                RecognitionRecord.id == data.record_id,
                RecognitionRecord.user_id == user_id,
            )
            .update(
                {
                    "current_step": data.current_step,
                    "updated_at": int(time.time()),
                }
            )
        )
        logger.info(
            "[save_step][%s] update result: affected_rows=%s", request_id, affected
        )

        if affected == 0:
            logger.warning(
                "[save_step][%s] record not found or no permission: record_id=%r, user_id=%r",
                request_id,
                data.record_id,
                user_id,
            )
            return {"statusCode": 404, "msg": "记录不存在"}

        logger.info("[save_step][%s] committing db transaction", request_id)
        db.commit()
        logger.info(
            "[save_step][%s] save success: record_id=%r, current_step=%r",
            request_id,
            data.record_id,
            data.current_step,
        )
        return {
            "statusCode": 200,
            "msg": "保存成功",
            "data": {
                "record_id": data.record_id,
                "current_step": data.current_step,
            },
        }
    except Exception as e:
        logger.error(
            "[save_step][%s] 异常: error=%s, traceback=%s",
            request_id,
            e,
            traceback.format_exc(),
        )
        db.rollback()
        logger.info(
            "[save_step][%s] db rollback executed after save_step failure",
            request_id,
        )
        return {"statusCode": 500, "msg": f"保存失败: {str(e)}"}


async def _draw_boxes_and_watermark(
    image_bytes: bytes,
    hazards: List[Dict[str, Any]],
    user_name: str,
    user_account: str,
) -> bytes:
    """Draw detection boxes and a tiled watermark on the image.

    Args:
        image_bytes: Encoded source image.
        hazards: Detection dicts. The box is read from "bbox" or "box" and
            the confidence from "confidence" or "score", so the detection
            dicts built by the hazard route can be passed directly.
        user_name: Watermark text; skipped when empty.
        user_account: Watermark text; skipped when empty.

    Returns:
        JPEG bytes of the annotated image, or the unmodified ``image_bytes``
        when any processing step fails.
    """
    try:
        image = Image.open(io.BytesIO(image_bytes)).convert("RGBA")
        width, height = image.size

        overlay = Image.new("RGBA", (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(overlay)

        # Try the known font files in order; use Pillow's built-in font when
        # none of them can be loaded.
        font = font_small = None
        for font_path in (
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "C:/Windows/Fonts/msyh.ttc",
        ):
            try:
                font = ImageFont.truetype(font_path, 20)
                font_small = ImageFont.truetype(font_path, 14)
                break
            except Exception:
                font = font_small = None
        if font is None or font_small is None:
            font = ImageFont.load_default()
            font_small = ImageFont.load_default()

        for hazard in hazards:
            bbox = hazard.get("bbox") or hazard.get("box") or []
            label = hazard.get("label", "")
            confidence = hazard.get("confidence")
            if confidence is None:
                confidence = hazard.get("score")
            if confidence is None:
                confidence = 0

            if len(bbox) != 4:
                continue

            x1, y1, x2, y2 = bbox
            draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0, 255), width=3)
            draw.text(
                (x1, max(0, y1 - 25)),
                f"{label} {confidence:.2f}",
                fill=(255, 0, 0, 255),
                font=font,
            )

        current_date = time.strftime("%Y/%m/%d")
        watermarks = [text for text in (user_name, user_account, current_date) if text]
        if not watermarks:
            watermarks = [current_date]

        text_height_estimate = 50
        text_width_estimate = 150
        angle = 45

        # The layer is twice the image size so that, after rotation, the
        # centre crop taken below is still covered with text.
        watermark_layer = Image.new(
            "RGBA", (width * 2, height * 2), (255, 255, 255, 0)
        )
        watermark_draw = ImageDraw.Draw(watermark_layer)

        for y in range(-height, height * 2, text_height_estimate):
            row_text = watermarks[int(y / text_height_estimate) % len(watermarks)]
            for x in range(-width, width * 2, text_width_estimate):
                watermark_draw.text(
                    (x, y),
                    row_text,
                    fill=(128, 128, 128, 60),
                    font=font_small,
                )

        watermark_layer = watermark_layer.rotate(
            angle, expand=False, fillcolor=(255, 255, 255, 0)
        )

        crop_x = (watermark_layer.width - width) // 2
        crop_y = (watermark_layer.height - height) // 2
        watermark_layer = watermark_layer.crop(
            (crop_x, crop_y, crop_x + width, crop_y + height)
        )

        image = Image.alpha_composite(image, watermark_layer)
        image = Image.alpha_composite(image, overlay)

        # JPEG has no alpha channel, so flatten to RGB before saving.
        final_image = image.convert("RGB")
        output = io.BytesIO()
        final_image.save(output, format="JPEG", quality=95)
        return output.getvalue()
    except Exception as e:
        logger.error(
            "[_draw_boxes_and_watermark] 图片处理失败: %s, traceback=%s",
            e,
            traceback.format_exc(),
        )
        return image_bytes