"""Scene, recognition-record and scene-template HTTP endpoints.

Every handler returns a JSON-serialisable dict with ``statusCode`` and
``msg`` keys, plus a ``data`` payload on success.

NOTE(review): the handlers are ``async def`` but receive the synchronous
``sqlalchemy.orm.Session`` type; if ``get_db`` yields a blocking session,
each query blocks the event loop -- confirm this is intended.
"""

import time
from typing import Optional, Tuple

from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel
from sqlalchemy.orm import Session

from database import get_db
from models.scene import (
    Scene,
    FirstScene,
    SecondScene,
    ThirdScene,
    RecognitionRecord,
    SceneTemplate,
)

router = APIRouter()

# Upper bound applied to every ``page_size`` query parameter.
MAX_PAGE_SIZE = 100


def _current_user(request: Request):
    """Return the user stored on ``request.state``, or ``None`` if absent.

    ``getattr`` with a default is used because reading a missing attribute
    from ``request.state`` raises ``AttributeError``, which would turn the
    intended 401 response into an unhandled server error.
    """
    return getattr(request.state, "user", None)


def _normalize_pagination(page: int, page_size: int) -> Tuple[int, int]:
    """Clamp paging parameters and return ``(offset, limit)``.

    ``page`` is raised to at least 1 and ``page_size`` is clamped to
    ``1..MAX_PAGE_SIZE`` so that neither OFFSET nor LIMIT can be negative.
    """
    page = max(page, 1)
    page_size = max(1, min(page_size, MAX_PAGE_SIZE))
    return (page - 1) * page_size, page_size


@router.get("/get_scene_list")
async def get_scene_list(db: Session = Depends(get_db)):
    """Return all non-deleted scenes (id, name, English name)."""
    scenes = db.query(Scene).filter(Scene.is_deleted == 0).all()
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                "id": s.id,
                "scene_name": s.scene_name,
                "scene_en_name": s.scene_en_name,
            }
            for s in scenes
        ],
    }


@router.get("/get_first_scene_list")
async def get_first_scene_list(scene_id: int, db: Session = Depends(get_db)):
    """Return the non-deleted first-level scenes that belong to ``scene_id``."""
    scenes = db.query(FirstScene).filter(
        FirstScene.scene_id == scene_id,
        FirstScene.is_deleted == 0,
    ).all()
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {"id": s.id, "first_scene_name": s.first_scene_name} for s in scenes
        ],
    }


@router.get("/get_second_scene_list")
async def get_second_scene_list(first_scene_id: int, db: Session = Depends(get_db)):
    """Return the non-deleted second-level scenes under ``first_scene_id``."""
    scenes = db.query(SecondScene).filter(
        SecondScene.first_scene_id == first_scene_id,
        SecondScene.is_deleted == 0,
    ).all()
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {"id": s.id, "second_scene_name": s.second_scene_name} for s in scenes
        ],
    }


@router.get("/get_third_scene_list")
async def get_third_scene_list(second_scene_id: int, db: Session = Depends(get_db)):
    """Return the non-deleted third-level scenes under ``second_scene_id``.

    Each item carries the scene's correct/wrong example image fields.
    """
    scenes = db.query(ThirdScene).filter(
        ThirdScene.second_scene_id == second_scene_id,
        ThirdScene.is_deleted == 0,
    ).all()
    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                "id": s.id,
                "third_scene_name": s.third_scene_name,
                "correct_example_image": s.correct_example_image,
                "wrong_example_image": s.wrong_example_image,
            }
            for s in scenes
        ],
    }


@router.get("/get_third_scene_example_image")
async def get_third_scene_example_image(third_scene_name: str, db: Session = Depends(get_db)):
    """Return the example images of the third-level scene with the given name.

    Responds with statusCode 400 when the name is empty and 404 when no
    non-deleted scene matches; the first match is returned otherwise.
    """
    if not third_scene_name:
        return {"statusCode": 400, "msg": "三级场景名称不能为空"}

    scene = db.query(ThirdScene).filter(
        ThirdScene.third_scene_name == third_scene_name,
        ThirdScene.is_deleted == 0,
    ).first()
    if not scene:
        return {"statusCode": 404, "msg": "三级场景不存在"}

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": scene.id,
            "third_scene_name": scene.third_scene_name,
            "correct_example_image": scene.correct_example_image,
            "wrong_example_image": scene.wrong_example_image,
        },
    }


@router.get("/get_history_recognition_record")
async def get_history_recognition_record(request: Request, db: Session = Depends(get_db)):
    """Return every non-deleted recognition record of the current user.

    Records are ordered by ``updated_at`` descending and are not paginated.
    Responds with statusCode 401 when no user is attached to the request.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    # Fetch all records (no limit on the number returned).
    records = db.query(RecognitionRecord).filter(
        RecognitionRecord.user_id == user.userCode,
        RecognitionRecord.is_deleted == 0,
    ).order_by(RecognitionRecord.updated_at.desc()).all()

    # The full result set is already loaded, so its length is the total;
    # a separate COUNT query with the same filter would be redundant.
    total = len(records)

    return {
        "statusCode": 200,
        "msg": "success",
        "data": [
            {
                "id": r.id,
                "title": r.title,
                "original_image_url": r.original_image_url,
                "recognition_image_url": r.recognition_image_url,
                "labels": r.labels,
                "created_at": r.created_at,
            }
            for r in records
        ],
        "total": total,
    }


@router.get("/get_recognition_record_detail")
async def get_recognition_record_detail(recognition_id: int, db: Session = Depends(get_db)):
    """Return the full detail of one non-deleted recognition record.

    ``third_scenes`` is the record's ``description`` split on single spaces
    with empty pieces removed. Responds with statusCode 404 when the record
    does not exist.

    NOTE(review): the lookup filters on id only, not on the requesting
    user -- confirm that records are meant to be readable across users.
    """
    record = db.query(RecognitionRecord).filter(
        RecognitionRecord.id == recognition_id,
        RecognitionRecord.is_deleted == 0,
    ).first()
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    # Convert the description string into a list. Empty tokens (produced by
    # consecutive, leading or trailing spaces) are dropped.
    third_scenes = []
    if record.description:
        third_scenes = [name for name in record.description.split(" ") if name]

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": record.id,
            "user_id": record.user_id,
            "title": record.title,
            "description": record.description,
            "original_image_url": record.original_image_url,
            "recognition_image_url": record.recognition_image_url,
            "labels": record.labels,
            "third_scenes": third_scenes,
            "tag_type": record.tag_type,
            "scene_match": record.scene_match,
            "tip_accuracy": record.tip_accuracy,
            "effect_evaluation": record.effect_evaluation,
            "user_remark": record.user_remark,
            "created_at": record.created_at,
            "updated_at": record.updated_at,
        },
    }


class DeleteRecognitionRequest(BaseModel):
    """Request body for deleting a recognition record."""

    recognition_id: int


@router.post("/delete_recognition_record")
async def delete_recognition_record(data: DeleteRecognitionRequest, request: Request, db: Session = Depends(get_db)):
    """Soft-delete one of the current user's recognition records.

    Sets ``is_deleted`` to 1 and stamps ``deleted_at`` with the current Unix
    time. The success response is returned even when no row matched, so
    repeating the call is harmless. Responds with statusCode 401 when no
    user is attached to the request.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    db.query(RecognitionRecord).filter(
        RecognitionRecord.id == data.recognition_id,
        RecognitionRecord.user_id == user.userCode,
        # Skip rows that are already deleted so a repeated request does not
        # overwrite the original deletion timestamp.
        RecognitionRecord.is_deleted == 0,
    ).update({
        "is_deleted": 1,
        "deleted_at": int(time.time()),
    })
    db.commit()

    return {"statusCode": 200, "msg": "删除成功"}


class EvaluationRequest(BaseModel):
    """Request body for submitting an evaluation of a recognition record."""

    id: int
    scene_match: Optional[int] = None
    tip_accuracy: Optional[int] = None
    effect_evaluation: Optional[int] = None
    user_remark: Optional[str] = None


@router.post("/submit_evaluation")
async def submit_evaluation(data: EvaluationRequest, db: Session = Depends(get_db)):
    """Store evaluation fields on a non-deleted recognition record.

    Only fields that are not ``None`` in the request are written;
    ``updated_at`` is always refreshed. Responds with statusCode 404 when
    the record does not exist.

    NOTE(review): the lookup filters on id only, not on the requesting
    user -- confirm that evaluating other users' records is intended.
    """
    record = db.query(RecognitionRecord).filter(
        RecognitionRecord.id == data.id,
        RecognitionRecord.is_deleted == 0,
    ).first()
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    # Update only the evaluation fields that were actually supplied.
    for field in ("scene_match", "tip_accuracy", "effect_evaluation", "user_remark"):
        value = getattr(data, field)
        if value is not None:
            setattr(record, field, value)

    record.updated_at = int(time.time())
    db.commit()

    return {"statusCode": 200, "msg": "success"}


@router.get("/get_latest_recognition_record")
async def get_latest_recognition_record(request: Request, db: Session = Depends(get_db)):
    """Return the current user's most recently created recognition record.

    When the user has no record, a placeholder payload containing only
    ``effect_evaluation = 1`` is returned. Responds with statusCode 401
    when no user is attached to the request.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    record = db.query(RecognitionRecord).filter(
        RecognitionRecord.user_id == user.userCode,
        RecognitionRecord.is_deleted == 0,
    ).order_by(RecognitionRecord.created_at.desc()).first()

    # No data yet: hand the frontend a stub with effect_evaluation=1.
    if not record:
        return {
            "statusCode": 200,
            "msg": "success",
            "data": {
                "effect_evaluation": 1,
            },
        }

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "id": record.id,
            "title": record.title,
            "original_image_url": record.original_image_url,
            "recognition_image_url": record.recognition_image_url,
            "labels": record.labels,
            "created_at": record.created_at,
            "effect_evaluation": record.effect_evaluation,
        },
    }


# ============================================================
# Scene template endpoints (aligned with the Go version)
# ============================================================

class SceneTemplateCreate(BaseModel):
    """Request body for creating a scene template."""

    scene_name: str
    scene_type: str
    scene_desc: str = ""
    model_name: str


@router.post("/scene_template")
async def create_scene_template(data: SceneTemplateCreate, db: Session = Depends(get_db)):
    """Create a scene template and return its new id."""
    # Sample the clock once so created_at and updated_at are identical on a
    # freshly created row.
    now = int(time.time())
    template = SceneTemplate(
        scene_name=data.scene_name,
        scene_type=data.scene_type,
        scene_desc=data.scene_desc,
        model_name=data.model_name,
        created_at=now,
        updated_at=now,
        is_deleted=0,
    )
    db.add(template)
    db.commit()
    db.refresh(template)

    return {
        "statusCode": 200,
        "msg": "创建成功",
        "data": {"id": template.id},
    }


@router.get("/scene_templates")
async def get_scene_templates(
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db)
):
    """Return one page of non-deleted scene templates, newest first.

    ``page`` values below 1 are treated as 1 and ``page_size`` is clamped to
    ``1..MAX_PAGE_SIZE``. ``total`` is the count across all pages.
    """
    offset, limit = _normalize_pagination(page, page_size)

    base_query = db.query(SceneTemplate).filter(SceneTemplate.is_deleted == 0)

    # Total number of matching templates.
    total = base_query.count()

    # Requested page of templates.
    templates = base_query.order_by(
        SceneTemplate.created_at.desc()
    ).offset(offset).limit(limit).all()

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "total": total,
            "items": [
                {
                    "id": t.id,
                    "scene_name": t.scene_name,
                    "scene_type": t.scene_type,
                    "scene_desc": t.scene_desc,
                    "model_name": t.model_name,
                    "created_at": t.created_at,
                }
                for t in templates
            ],
        },
    }


@router.get("/recognition_records")
async def get_recognition_records(
    request: Request,
    scene_type: str = "",
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db)
):
    """Return one page of the current user's recognition records, newest first.

    An optional ``scene_type`` restricts the result to that scene type.
    ``page`` values below 1 are treated as 1 and ``page_size`` is clamped to
    ``1..MAX_PAGE_SIZE``. Responds with statusCode 401 when no user is
    attached to the request.
    """
    user = _current_user(request)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    offset, limit = _normalize_pagination(page, page_size)

    # Base filter: the caller's own, non-deleted records.
    query = db.query(RecognitionRecord).filter(
        RecognitionRecord.user_id == user.userCode,
        RecognitionRecord.is_deleted == 0,
    )

    # Optional scene-type filter.
    if scene_type:
        query = query.filter(RecognitionRecord.scene_type == scene_type)

    # Total number of matching records.
    total = query.count()

    # Requested page of records.
    records = query.order_by(
        RecognitionRecord.created_at.desc()
    ).offset(offset).limit(limit).all()

    return {
        "statusCode": 200,
        "msg": "success",
        "data": {
            "total": total,
            "items": [
                {
                    "id": r.id,
                    "scene_type": r.scene_type,
                    "original_image_url": r.original_image_url,
                    "result_image_url": r.recognition_image_url,
                    "hazard_count": r.hazard_count,
                    "current_step": r.current_step,
                    "created_at": r.created_at,
                }
                for r in records
            ],
        },
    }