| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- import json
- import time
- from typing import Any, Optional
- from fastapi import APIRouter, Depends, Request
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from database import get_db
- from models.scene import (
- FirstScene,
- RecognitionRecord,
- Scene,
- SceneTemplate,
- SecondScene,
- ThirdScene,
- )
- router = APIRouter()
def _get_user_code(user: Any) -> str:
    """Return the identifier used to match a user against record ownership.

    The attributes ``userCode``, ``user_code`` and ``account`` are tried in
    that order and the first truthy value wins.

    Args:
        user: Any object that may carry one of the attributes above.

    Returns:
        The first truthy attribute value, or ``""`` when none is set.
    """
    for attr_name in ("userCode", "user_code", "account"):
        value = getattr(user, attr_name, None)
        if value:
            return value
    # Always fall back to an empty string: the result is compared against a
    # ``user_id`` column by the callers, and a ``None`` here would no longer
    # be a plain equality comparison.
    return ""
def _load_hazard_details(record: RecognitionRecord):
    """Decode ``record.hazard_details`` into a list of hazard dicts.

    Args:
        record: Object whose ``hazard_details`` attribute holds a JSON
            document, or a falsy value when nothing was stored.

    Returns:
        The decoded list entries that are JSON objects, in their stored
        order. An empty list is returned when the field is empty, cannot be
        decoded, or does not decode to a list.
    """
    raw = record.hazard_details
    if not raw:
        return []
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers payloads
        # that are not str/bytes. Malformed data is treated as "no details".
        return []
    if not isinstance(decoded, list):
        return []
    # Entries are read with ``.get()`` by the view builder, so anything that
    # is not a dict is dropped here instead of failing later.
    return [entry for entry in decoded if isinstance(entry, dict)]
def _split_labels(labels):
    """Split a label collection into a list of clean, non-empty strings.

    Args:
        labels: Either a list of items (each converted with ``str``) or a
            value whose string form is a comma-separated list. Falsy input
            yields an empty list.

    Returns:
        The labels with surrounding whitespace removed and empty entries
        dropped, in their original order.
    """
    if not labels:
        return []
    if isinstance(labels, list):
        pieces = (str(item) for item in labels)
    else:
        # Treat the full-width comma (U+FF0C) as a separator as well by
        # normalizing it to an ASCII comma before splitting.
        pieces = str(labels).replace("\uff0c", ",").split(",")
    stripped = (piece.strip() for piece in pieces)
    return [piece for piece in stripped if piece]
def _unique_ordered(items):
    """Return the truthy entries of *items* without duplicates.

    The first occurrence of each entry is kept and the original order is
    preserved. Entries must be hashable.
    """
    # dict keys keep insertion order, so they form the de-duplicated sequence.
    return list(dict.fromkeys(entry for entry in items if entry))
def _build_record_view(record: RecognitionRecord):
    """Build the response dict describing one recognition record.

    Labels are taken from ``record.labels`` when present and otherwise from
    the ``label`` values found in the decoded hazard details. ``third_scenes``
    comes from the space-separated ``record.description`` when it is set,
    otherwise from the hazard labels.

    Args:
        record: The recognition record to present.

    Returns:
        A dict with the record's basic fields plus ``display_labels``,
        ``third_scenes``, ``hazard_details`` and ``detections``.
    """
    hazards = _load_hazard_details(record)

    # Distinct, non-blank hazard labels in first-seen order.
    cleaned = (str(hazard.get("label") or "").strip() for hazard in hazards)
    hazard_labels = _unique_ordered([label for label in cleaned if label])

    display_labels = _split_labels(record.labels) or hazard_labels

    third_scenes = hazard_labels
    if record.description:
        words = str(record.description).split(" ")
        third_scenes = [word for word in words if word]

    detections = []
    for hazard in hazards:
        # The box is exposed under both key names; either one may be the
        # name used in the stored data.
        box = hazard.get("bbox") or hazard.get("box") or []
        detections.append(
            {
                "label": hazard.get("label", ""),
                "box": box,
                "bbox": box,
                "confidence": hazard.get("confidence", 0),
            }
        )

    title = record.title or "隐患提示记录"
    description = record.description or " ".join(third_scenes)
    labels_text = record.labels or ",".join(display_labels)
    tag_type = record.tag_type or record.scene_type

    return {
        "id": record.id,
        "title": title,
        "description": description,
        "original_image_url": record.original_image_url,
        "recognition_image_url": record.recognition_image_url,
        "labels": labels_text,
        "display_labels": display_labels,
        "third_scenes": third_scenes,
        "tag_type": tag_type,
        "scene_type": record.scene_type,
        "effect_evaluation": record.effect_evaluation,
        "hazard_details": hazards,
        "detections": detections,
    }
def _resolve_record_id(
    recognition_id: Optional[int] = None,
    recognition_record_id: Optional[int] = None,
):
    """Pick the record id from the two accepted parameter names.

    ``recognition_id`` is used when it is truthy; otherwise the value of
    ``recognition_record_id`` is returned as-is (possibly ``None``).
    """
    if recognition_id:
        return recognition_id
    return recognition_record_id
@router.get("/get_scene_list")
async def get_scene_list(db: Session = Depends(get_db)):
    """List every scene that is not marked as deleted."""
    active_scenes = db.query(Scene).filter(Scene.is_deleted == 0).all()
    payload = [
        {
            "id": scene.id,
            "scene_name": scene.scene_name,
            "scene_en_name": scene.scene_en_name,
        }
        for scene in active_scenes
    ]
    return {"statusCode": 200, "msg": "success", "data": payload}
@router.get("/get_first_scene_list")
async def get_first_scene_list(scene_id: int, db: Session = Depends(get_db)):
    """List the non-deleted first-level scenes belonging to ``scene_id``."""
    in_scene = FirstScene.scene_id == scene_id
    not_deleted = FirstScene.is_deleted == 0
    rows = db.query(FirstScene).filter(in_scene, not_deleted).all()
    payload = [
        {"id": row.id, "first_scene_name": row.first_scene_name} for row in rows
    ]
    return {"statusCode": 200, "msg": "success", "data": payload}
@router.get("/get_second_scene_list")
async def get_second_scene_list(
    first_scene_id: int, db: Session = Depends(get_db)
):
    """List the non-deleted second-level scenes under ``first_scene_id``."""
    under_parent = SecondScene.first_scene_id == first_scene_id
    not_deleted = SecondScene.is_deleted == 0
    rows = db.query(SecondScene).filter(under_parent, not_deleted).all()
    payload = [
        {"id": row.id, "second_scene_name": row.second_scene_name} for row in rows
    ]
    return {"statusCode": 200, "msg": "success", "data": payload}
@router.get("/get_third_scene_list")
async def get_third_scene_list(
    second_scene_id: int, db: Session = Depends(get_db)
):
    """List the non-deleted third-level scenes under ``second_scene_id``."""
    under_parent = ThirdScene.second_scene_id == second_scene_id
    not_deleted = ThirdScene.is_deleted == 0
    rows = db.query(ThirdScene).filter(under_parent, not_deleted).all()
    payload = [
        {
            "id": row.id,
            "third_scene_name": row.third_scene_name,
            "correct_example_image": row.correct_example_image,
            "wrong_example_image": row.wrong_example_image,
        }
        for row in rows
    ]
    return {"statusCode": 200, "msg": "success", "data": payload}
@router.get("/get_third_scene_example_image")
async def get_third_scene_example_image(
    third_scene_name: str, db: Session = Depends(get_db)
):
    """Return the example images of the third-level scene with the given name.

    Responds with status 400 for an empty name and 404 when no non-deleted
    scene carries that name.
    """
    if not third_scene_name:
        return {"statusCode": 400, "msg": "三级场景名称不能为空"}

    name_matches = ThirdScene.third_scene_name == third_scene_name
    not_deleted = ThirdScene.is_deleted == 0
    match = db.query(ThirdScene).filter(name_matches, not_deleted).first()
    if not match:
        return {"statusCode": 404, "msg": "三级场景不存在"}

    payload = {
        "id": match.id,
        "third_scene_name": match.third_scene_name,
        "correct_example_image": match.correct_example_image,
        "wrong_example_image": match.wrong_example_image,
    }
    return {"statusCode": 200, "msg": "success", "data": payload}
@router.get("/get_history_recognition_record")
async def get_history_recognition_record(
    request: Request, db: Session = Depends(get_db)
):
    """Return all non-deleted recognition records of the current user.

    Records are ordered by ``updated_at`` descending. Responds with status
    401 when no user is attached to the request.
    """
    # Read the user defensively: if nothing set ``request.state.user`` the
    # plain attribute access would raise instead of yielding a 401.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    user_code = _get_user_code(user)
    records = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.user_id == user_code,
            RecognitionRecord.is_deleted == 0,
        )
        .order_by(RecognitionRecord.updated_at.desc())
        .all()
    )
    items = [
        {**_build_record_view(record), "created_at": record.created_at}
        for record in records
    ]
    return {
        "statusCode": 200,
        "msg": "success",
        "data": items,
        # The full result set is already loaded, so its length is the total;
        # a separate COUNT query with the same filter is unnecessary.
        "total": len(records),
    }
@router.get("/get_recognition_record_detail")
async def get_recognition_record_detail(
    recognition_id: Optional[int] = None,
    recognition_record_id: Optional[int] = None,
    db: Session = Depends(get_db),
):
    """Return the full detail of one non-deleted recognition record.

    The id may be passed as ``recognition_id`` or ``recognition_record_id``.
    Responds with status 422 when neither is given and 404 when no matching
    record exists.
    """
    # NOTE(review): the lookup is by id only; unlike the delete endpoint it
    # does not restrict the record to the requesting user - confirm that this
    # is intended.
    record_id = _resolve_record_id(recognition_id, recognition_record_id)
    if not record_id:
        return {"statusCode": 422, "msg": "recognition_id 不能为空"}

    id_matches = RecognitionRecord.id == record_id
    not_deleted = RecognitionRecord.is_deleted == 0
    record = db.query(RecognitionRecord).filter(id_matches, not_deleted).first()
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    view = _build_record_view(record)
    detail = {
        "id": record.id,
        "user_id": record.user_id,
        "title": view["title"],
        "description": view["description"],
        "original_image_url": record.original_image_url,
        "recognition_image_url": record.recognition_image_url,
        "labels": view["labels"],
        "display_labels": view["display_labels"],
        "third_scenes": view["third_scenes"],
        "tag_type": view["tag_type"],
        "scene_type": record.scene_type,
        "scene_match": record.scene_match,
        "tip_accuracy": record.tip_accuracy,
        "effect_evaluation": record.effect_evaluation,
        "user_remark": record.user_remark,
        "hazard_details": view["hazard_details"],
        "detections": view["detections"],
        "created_at": record.created_at,
        "updated_at": record.updated_at,
    }
    return {"statusCode": 200, "msg": "success", "data": detail}
class DeleteRecognitionRequest(BaseModel):
    # Request body of the delete endpoint. The record id is accepted under
    # either field name; both are optional so that a client may send just one.
    recognition_id: Optional[int] = None
    recognition_record_id: Optional[int] = None
@router.post("/delete_recognition_record")
async def delete_recognition_record(
    data: DeleteRecognitionRequest,
    request: Request,
    db: Session = Depends(get_db),
):
    """Soft-delete one of the current user's recognition records.

    Responds with status 401 when no user is attached to the request and
    422 when the body carries no record id.
    """
    # Read the user defensively: if nothing set ``request.state.user`` the
    # plain attribute access would raise instead of yielding a 401.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    record_id = _resolve_record_id(data.recognition_id, data.recognition_record_id)
    if not record_id:
        return {"statusCode": 422, "msg": "recognition_id 不能为空"}

    (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.id == record_id,
            RecognitionRecord.user_id == _get_user_code(user),
            # Only touch live rows so that repeating the request does not
            # overwrite the original deletion timestamp.
            RecognitionRecord.is_deleted == 0,
        )
        .update({"is_deleted": 1, "deleted_at": int(time.time())})
    )
    db.commit()
    return {"statusCode": 200, "msg": "删除成功"}
class EvaluationRequest(BaseModel):
    # Request body of the evaluation endpoint.
    # ``id`` identifies the recognition record being evaluated.
    id: int
    # Each field below is optional; a field left as None is not written to
    # the record by the endpoint.
    scene_match: Optional[int] = None
    tip_accuracy: Optional[int] = None
    effect_evaluation: Optional[int] = None
    user_remark: Optional[str] = None
@router.post("/submit_evaluation")
async def submit_evaluation(data: EvaluationRequest, db: Session = Depends(get_db)):
    """Store the submitted evaluation fields on a recognition record.

    Only fields that are present in the request (not ``None``) are written.
    Responds with status 404 when no non-deleted record has the given id.
    """
    # NOTE(review): the record is looked up by id only, without checking that
    # it belongs to the caller - confirm that this is intended.
    id_matches = RecognitionRecord.id == data.id
    not_deleted = RecognitionRecord.is_deleted == 0
    record = db.query(RecognitionRecord).filter(id_matches, not_deleted).first()
    if not record:
        return {"statusCode": 404, "msg": "记录不存在"}

    for field in ("scene_match", "tip_accuracy", "effect_evaluation", "user_remark"):
        submitted = getattr(data, field)
        if submitted is not None:
            setattr(record, field, submitted)

    record.updated_at = int(time.time())
    db.commit()
    return {"statusCode": 200, "msg": "success"}
@router.get("/get_latest_recognition_record")
async def get_latest_recognition_record(
    request: Request, db: Session = Depends(get_db)
):
    """Return the current user's most recently created recognition record.

    Responds with status 401 when no user is attached to the request. When
    the user has no records, ``data`` only contains ``effect_evaluation`` set
    to 1.
    """
    # Read the user defensively: if nothing set ``request.state.user`` the
    # plain attribute access would raise instead of yielding a 401.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    latest = (
        db.query(RecognitionRecord)
        .filter(
            RecognitionRecord.user_id == _get_user_code(user),
            RecognitionRecord.is_deleted == 0,
        )
        .order_by(RecognitionRecord.created_at.desc())
        .first()
    )
    if not latest:
        payload = {"effect_evaluation": 1}
    else:
        payload = {
            "id": latest.id,
            "title": latest.title,
            "original_image_url": latest.original_image_url,
            "recognition_image_url": latest.recognition_image_url,
            "labels": latest.labels,
            "created_at": latest.created_at,
            "effect_evaluation": latest.effect_evaluation,
        }
    return {"statusCode": 200, "msg": "success", "data": payload}
class SceneTemplateCreate(BaseModel):
    # Request body for creating a scene template. Every field except
    # ``scene_desc`` is required.
    scene_name: str
    scene_type: str
    scene_desc: str = ""
    # NOTE(review): Pydantic v2 reserves the ``model_`` prefix and warns about
    # field names that use it - confirm which Pydantic version is in use.
    model_name: str
@router.post("/scene_template")
async def create_scene_template(
    data: SceneTemplateCreate, db: Session = Depends(get_db)
):
    """Create a scene template and return the id of the new row."""
    # Take the timestamp once so created_at and updated_at of a new row are
    # guaranteed to be equal.
    now = int(time.time())
    template = SceneTemplate(
        scene_name=data.scene_name,
        scene_type=data.scene_type,
        scene_desc=data.scene_desc,
        model_name=data.model_name,
        created_at=now,
        updated_at=now,
        is_deleted=0,
    )
    db.add(template)
    db.commit()
    # Reload the row so the generated primary key is available.
    db.refresh(template)
    return {
        "statusCode": 200,
        "msg": "创建成功",
        "data": {"id": template.id},
    }
@router.get("/scene_templates")
async def get_scene_templates(
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
):
    """Return one page of non-deleted scene templates, newest first.

    ``page`` is 1-based; values below 1 are treated as 1. ``page_size`` is
    limited to the range 1-100.
    """
    # Clamp both values so the query never receives a negative OFFSET or a
    # zero/negative LIMIT, which databases reject or interpret differently.
    page = max(page, 1)
    page_size = min(max(page_size, 1), 100)
    offset = (page - 1) * page_size

    live_templates = db.query(SceneTemplate).filter(SceneTemplate.is_deleted == 0)
    total = live_templates.count()
    templates = (
        live_templates.order_by(SceneTemplate.created_at.desc())
        .offset(offset)
        .limit(page_size)
        .all()
    )
    items = [
        {
            "id": template.id,
            "scene_name": template.scene_name,
            "scene_type": template.scene_type,
            "scene_desc": template.scene_desc,
            "model_name": template.model_name,
            "created_at": template.created_at,
        }
        for template in templates
    ]
    return {
        "statusCode": 200,
        "msg": "success",
        "data": {"total": total, "items": items},
    }
@router.get("/recognition_records")
async def get_recognition_records(
    request: Request,
    scene_type: str = "",
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
):
    """Return one page of the current user's recognition records, newest first.

    An optional ``scene_type`` narrows the result. ``page`` is 1-based; values
    below 1 are treated as 1. ``page_size`` is limited to the range 1-100.
    Responds with status 401 when no user is attached to the request.
    """
    # Read the user defensively: if nothing set ``request.state.user`` the
    # plain attribute access would raise instead of yielding a 401.
    user = getattr(request.state, "user", None)
    if not user:
        return {"statusCode": 401, "msg": "未授权"}

    # Clamp both values so the query never receives a negative OFFSET or a
    # zero/negative LIMIT, which databases reject or interpret differently.
    page = max(page, 1)
    page_size = min(max(page_size, 1), 100)
    offset = (page - 1) * page_size

    query = db.query(RecognitionRecord).filter(
        RecognitionRecord.user_id == _get_user_code(user),
        RecognitionRecord.is_deleted == 0,
    )
    if scene_type:
        query = query.filter(RecognitionRecord.scene_type == scene_type)

    total = query.count()
    records = (
        query.order_by(RecognitionRecord.created_at.desc())
        .offset(offset)
        .limit(page_size)
        .all()
    )
    items = [
        {
            "id": record.id,
            "scene_type": record.scene_type,
            "original_image_url": record.original_image_url,
            "result_image_url": record.recognition_image_url,
            "hazard_count": record.hazard_count,
            "current_step": record.current_step,
            "created_at": record.created_at,
        }
        for record in records
    ]
    return {
        "statusCode": 200,
        "msg": "success",
        "data": {"total": total, "items": items},
    }
|