import json
import time
from typing import Any, Optional

from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel
from sqlalchemy.orm import Session

from database import get_db
from models.scene import (
    FirstScene,
    RecognitionRecord,
    Scene,
    SceneTemplate,
    SecondScene,
    ThirdScene,
)

router = APIRouter()


def _get_user_code(user: Any) -> str:
    """Return the first truthy value among ``userCode``, ``user_code`` and ``account``.

    Falls back to an empty string when the object has none of these attributes.
    """
    return (
        getattr(user, "userCode", None)
        or getattr(user, "user_code", None)
        or getattr(user, "account", "")
    )


def _load_hazard_details(record: RecognitionRecord) -> list:
    """Decode ``record.hazard_details`` (a JSON string) into a list of dicts.

    Best-effort: an empty value, undecodable JSON, or a payload that is not a
    list all yield ``[]``. Entries that are not dicts are dropped, because
    every consumer in this module reads them with ``.get``.
    """
    if not record.hazard_details:
        return []
    try:
        data = json.loads(record.hazard_details)
    except (TypeError, ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-string
        # input; RecursionError covers pathologically nested documents.
        return []
    if not isinstance(data, list):
        return []
    return [item for item in data if isinstance(item, dict)]


def _split_labels(labels) -> list:
    """Normalise ``labels`` into a list of non-empty, stripped strings.

    Accepts either a list (each element is stringified) or a single string
    separated by ASCII commas or full-width commas (U+FF0C).
    """
    if not labels:
        return []
    if isinstance(labels, list):
        candidates = [str(item) for item in labels]
    else:
        # Map the full-width comma onto the ASCII one so both separators work.
        candidates = str(labels).replace("\uff0c", ",").split(",")
    stripped = (candidate.strip() for candidate in candidates)
    return [candidate for candidate in stripped if candidate]


def _unique_ordered(items) -> list:
    """Return the truthy elements of ``items`` without duplicates, in first-seen order."""
    seen = set()
    ordered = []
    for item in items:
        if not item or item in seen:
            continue
        seen.add(item)
        ordered.append(item)
    return ordered


def _build_record_view(record: RecognitionRecord) -> dict:
    """Build the dict representation of a recognition record used by the API.

    Labels stored on the record take precedence; when absent they are derived
    from the ``label`` field of each hazard detail. ``third_scenes`` comes from
    the space-separated ``description`` when present, otherwise from the
    derived labels.
    """
    hazard_details = _load_hazard_details(record)

    # _unique_ordered already discards empty strings, so no pre-filter needed.
    derived_labels = _unique_ordered(
        str(item.get("label") or "").strip() for item in hazard_details
    )
    display_labels = _split_labels(record.labels) or derived_labels

    if record.description:
        third_scenes = [
            part for part in str(record.description).split(" ") if part
        ]
    else:
        third_scenes = derived_labels

    detections = []
    for item in hazard_details:
        # The same coordinates are exposed under both "box" and "bbox".
        box = item.get("bbox") or item.get("box") or []
        detections.append(
            {
                "label": item.get("label", ""),
                "box": box,
                "bbox": box,
                "confidence": item.get("confidence", 0),
            }
        )

    return {
        "id": record.id,
        "title": record.title or "隐患提示记录",
        "description": record.description or " ".join(third_scenes),
        "original_image_url": record.original_image_url,
        "recognition_image_url": record.recognition_image_url,
        "labels": record.labels or ",".join(display_labels),
        "display_labels": display_labels,
        "third_scenes": third_scenes,
        "tag_type": record.tag_type or record.scene_type,
        "scene_type": record.scene_type,
        "effect_evaluation": record.effect_evaluation,
        "hazard_details": hazard_details,
        "detections": detections,
    }


def _resolve_record_id(
    recognition_id: Optional[int] = None,
    recognition_record_id: Optional[int] = None,
):
    """Return ``recognition_id`` if truthy, otherwise ``recognition_record_id``.

    Both parameter names are accepted by the endpoints; note that an id of
    ``0`` is treated the same as a missing id.
    """
    return recognition_id or recognition_record_id


@router.get("/get_scene_list")
async def get_scene_list(db: Session = Depends(get_db)):
    """List every non-deleted top-level scene."""
    scenes = db.query(Scene).filter(Scene.is_deleted == 0).all()
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                "id": s.id,
                "scene_name": s.scene_name,
                "scene_en_name": s.scene_en_name,
            }
            for s in scenes
        ],
    }


@router.get("/get_first_scene_list")
async def get_first_scene_list(scene_id: int, db: Session = Depends(get_db)):
    """List the non-deleted first-level scenes belonging to ``scene_id``."""
    scenes = (
        db.query(FirstScene)
        .filter(FirstScene.scene_id == scene_id, FirstScene.is_deleted == 0)
        .all()
    )
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {"id": s.id, "first_scene_name": s.first_scene_name} for s in scenes
        ],
    }


@router.get("/get_second_scene_list")
async def get_second_scene_list(
    first_scene_id: int, db: Session = Depends(get_db)
):
    """List the non-deleted second-level scenes under ``first_scene_id``."""
    scenes = (
        db.query(SecondScene)
        .filter(
            SecondScene.first_scene_id == first_scene_id,
            SecondScene.is_deleted == 0,
        )
        .all()
    )
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {"id": s.id, "second_scene_name": s.second_scene_name}
            for s in scenes
        ],
    }


@router.get("/get_third_scene_list")
async def get_third_scene_list(
    second_scene_id: int, db: Session = Depends(get_db)
):
    """List the non-deleted third-level scenes under ``second_scene_id``.

    Each entry includes the scene's correct/wrong example image fields.
    """
    scenes = (
        db.query(ThirdScene)
        .filter(
            ThirdScene.second_scene_id == second_scene_id,
            ThirdScene.is_deleted == 0,
        )
        .all()
    )
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                "id": s.id,
                "third_scene_name": s.third_scene_name,
                "correct_example_image": s.correct_example_image,
                "wrong_example_image": s.wrong_example_image,
            }
            for s in scenes
        ],
    }
@router.get("/get_third_scene_example_image")
async def get_third_scene_example_image(
    third_scene_name: str, db: Session = Depends(get_db)
):
    """Return the example images of the non-deleted third-level scene with this name.

    Responds with statusCode 400 for an empty name and 404 when no scene
    matches.
    """
    if not third_scene_name:
        return {"statusCode": 400, "msg": "三级场景名称不能为空"}

    scene = (
        db.query(ThirdScene)
        .filter(
            ThirdScene.third_scene_name == third_scene_name,
            ThirdScene.is_deleted == 0,
        )
        .first()
    )
    if not scene:
        return {"statusCode": 404, "msg": "三级场景不存在"}

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": scene.id,
            "third_scene_name": scene.third_scene_name,
            "correct_example_image": scene.correct_example_image,
            "wrong_example_image": scene.wrong_example_image,
        },
    }


def _current_user(request: Request) -> Any:
    """Return ``request.state.user``, or ``None`` when it was never set.

    Reading a missing attribute from ``request.state`` raises
    ``AttributeError``; using a default lets the endpoints answer with their
    401 payload instead of failing.
    """
    return getattr(request.state, "user", None)


# Upper bound applied to the ``page_size`` query parameter.
_MAX_PAGE_SIZE = 100


def _normalize_pagination(page: int, page_size: int):
    """Clamp paging parameters and return ``(offset, limit)``.

    ``page`` is raised to at least 1 and ``page_size`` is kept within
    ``0.._MAX_PAGE_SIZE`` so neither OFFSET nor LIMIT can become negative.
    """
    page = max(page, 1)
    page_size = min(max(page_size, 0), _MAX_PAGE_SIZE)
    return (page - 1) * page_size, page_size


@router.get("/get_history_recognition_record")
async def get_history_recognition_record(
    request: Request, db: Session = Depends(get_db)
):
    """Return all non-deleted recognition records of the current user.

    Records are ordered by ``updated_at`` descending; ``total`` is the number
    of records returned.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    user_code = _get_user_code(user)
    records = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.user_id == user_code,
            RecognitionRecord.is_deleted == 0,
        )
        .order_by(RecognitionRecord.updated_at.desc())
        .all()
    )

    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                **_build_record_view(record),
                "created_at": record.created_at,
            }
            for record in records
        ],
        # The query above is unpaginated, so its length is the total; a
        # separate COUNT with the same filter would be redundant.
        "total": len(records),
    }


@router.get("/get_recognition_record_detail")
async def get_recognition_record_detail(
    recognition_id: Optional[int] = None,
    recognition_record_id: Optional[int] = None,
    db: Session = Depends(get_db),
):
    """Return the full detail of one non-deleted recognition record.

    The id may be supplied as ``recognition_id`` or ``recognition_record_id``.
    Responds with statusCode 422 when neither is given and 404 when the record
    is not found.
    """
    # NOTE(review): unlike the history/delete endpoints, this lookup is not
    # restricted to the requesting user's records — confirm that is intended.
    record_id = _resolve_record_id(recognition_id, recognition_record_id)
    if not record_id:
        return {"statusCode": 422, "msg": "recognition_id 不能为空"}

    record = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.id == record_id,
            RecognitionRecord.is_deleted == 0,
        )
        .first()
    )
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    record_view = _build_record_view(record)
    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": record.id,
            "user_id": record.user_id,
            "title": record_view["title"],
            "description": record_view["description"],
            "original_image_url": record.original_image_url,
            "recognition_image_url": record.recognition_image_url,
            "labels": record_view["labels"],
            "display_labels": record_view["display_labels"],
            "third_scenes": record_view["third_scenes"],
            "tag_type": record_view["tag_type"],
            "scene_type": record.scene_type,
            "scene_match": record.scene_match,
            "tip_accuracy": record.tip_accuracy,
            "effect_evaluation": record.effect_evaluation,
            "user_remark": record.user_remark,
            "hazard_details": record_view["hazard_details"],
            "detections": record_view["detections"],
            "created_at": record.created_at,
            "updated_at": record.updated_at,
        },
    }


class DeleteRecognitionRequest(BaseModel):
    """Request body for deleting a record; either field may carry the id."""

    recognition_id: Optional[int] = None
    recognition_record_id: Optional[int] = None


@router.post("/delete_recognition_record")
async def delete_recognition_record(
    data: DeleteRecognitionRequest,
    request: Request,
    db: Session = Depends(get_db),
):
    """Soft-delete one of the current user's recognition records.

    Sets ``is_deleted`` to 1 and stamps ``deleted_at`` with the current Unix
    time. Responds with statusCode 401 without a user and 422 without an id.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    record_id = _resolve_record_id(
        data.recognition_id, data.recognition_record_id
    )
    if not record_id:
        return {"statusCode": 422, "msg": "recognition_id 不能为空"}

    (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.id == record_id,
            RecognitionRecord.user_id == _get_user_code(user),
            # Skip rows that are already soft-deleted so a repeated request
            # does not overwrite the original deleted_at timestamp.
            RecognitionRecord.is_deleted == 0,
        )
        .update({"is_deleted": 1, "deleted_at": int(time.time())})
    )
    db.commit()
    # NOTE(review): success is reported even when no row matched, which keeps
    # the call idempotent for clients.
    return {"statusCode": 200, "msg": "删除成功"}


class EvaluationRequest(BaseModel):
    """Request body for submitting an evaluation; ``None`` fields are left untouched."""

    id: int
    scene_match: Optional[int] = None
    tip_accuracy: Optional[int] = None
    effect_evaluation: Optional[int] = None
    user_remark: Optional[str] = None


@router.post("/submit_evaluation")
async def submit_evaluation(
    data: EvaluationRequest, db: Session = Depends(get_db)
):
    """Store the provided evaluation fields on a non-deleted record.

    Only fields that are not ``None`` are written; ``updated_at`` is always
    refreshed. Responds with statusCode 404 when the record is not found.
    """
    # NOTE(review): the record is looked up by id only, with no check that it
    # belongs to the requesting user — confirm that is intended.
    record = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.id == data.id,
            RecognitionRecord.is_deleted == 0,
        )
        .first()
    )
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    for field in (
        "scene_match",
        "tip_accuracy",
        "effect_evaluation",
        "user_remark",
    ):
        value = getattr(data, field)
        if value is not None:
            setattr(record, field, value)

    record.updated_at = int(time.time())
    db.commit()
    return {"statusCode": 200, "msg": "success"}


@router.get("/get_latest_recognition_record")
async def get_latest_recognition_record(
    request: Request, db: Session = Depends(get_db)
):
    """Return the current user's most recently created non-deleted record.

    When the user has no records, ``data`` contains only
    ``{"effect_evaluation": 1}``.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    record = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.user_id == _get_user_code(user),
            RecognitionRecord.is_deleted == 0,
        )
        .order_by(RecognitionRecord.created_at.desc())
        .first()
    )
    if not record:
        return {
            "statusCode": 200,
            "msg": "success",
            "data": {"effect_evaluation": 1},
        }

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": record.id,
            "title": record.title,
            "original_image_url": record.original_image_url,
            "recognition_image_url": record.recognition_image_url,
            "labels": record.labels,
            "created_at": record.created_at,
            "effect_evaluation": record.effect_evaluation,
        },
    }


class SceneTemplateCreate(BaseModel):
    """Request body for creating a scene template."""

    scene_name: str
    scene_type: str
    scene_desc: str = ""
    model_name: str


@router.post("/scene_template")
async def create_scene_template(
    data: SceneTemplateCreate, db: Session = Depends(get_db)
):
    """Insert a new scene template and return its generated id."""
    # Take the timestamp once so created_at and updated_at are identical.
    now = int(time.time())
    template = SceneTemplate(
        scene_name=data.scene_name,
        scene_type=data.scene_type,
        scene_desc=data.scene_desc,
        model_name=data.model_name,
        created_at=now,
        updated_at=now,
        is_deleted=0,
    )
    db.add(template)
    db.commit()
    db.refresh(template)
    return {
        "statusCode": 200,
        "msg": "创建成功",
        "data": {"id": template.id},
    }


@router.get("/scene_templates")
async def get_scene_templates(
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
):
    """Return one page of non-deleted scene templates, newest first.

    ``page`` values below 1 are treated as 1 and ``page_size`` is clamped to
    ``0..100``. ``total`` counts all non-deleted templates.
    """
    offset, limit = _normalize_pagination(page, page_size)

    base_query = db.query(SceneTemplate).filter(SceneTemplate.is_deleted == 0)
    total = base_query.count()
    templates = (
        base_query.order_by(SceneTemplate.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "total": total,
            "items": [
                {
                    "id": template.id,
                    "scene_name": template.scene_name,
                    "scene_type": template.scene_type,
                    "scene_desc": template.scene_desc,
                    "model_name": template.model_name,
                    "created_at": template.created_at,
                }
                for template in templates
            ],
        },
    }


@router.get("/recognition_records")
async def get_recognition_records(
    request: Request,
    scene_type: str = "",
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
):
    """Return one page of the current user's non-deleted recognition records.

    Optionally filtered by ``scene_type``; ordered by ``created_at``
    descending. ``page`` values below 1 are treated as 1 and ``page_size`` is
    clamped to ``0..100``. ``total`` counts all records matching the filter.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    offset, limit = _normalize_pagination(page, page_size)

    query = db.query(RecognitionRecord).filter(
        RecognitionRecord.user_id == _get_user_code(user),
        RecognitionRecord.is_deleted == 0,
    )
    if scene_type:
        query = query.filter(RecognitionRecord.scene_type == scene_type)

    total = query.count()
    records = (
        query.order_by(RecognitionRecord.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "total": total,
            "items": [
                {
                    "id": record.id,
                    "scene_type": record.scene_type,
                    "original_image_url": record.original_image_url,
                    "result_image_url": record.recognition_image_url,
                    "hazard_count": record.hazard_count,
                    "current_step": record.current_step,
                    "created_at": record.created_at,
                }
                for record in records
            ],
        },
    }