| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700 |
- """
- Export Service for data export operations.
- Handles exporting annotations in various formats: JSON, CSV, COCO, YOLO.
- """
- import json
- import csv
- import io
- import os
- import uuid
- from datetime import datetime
- from typing import List, Dict, Any, Optional, Tuple
- from database import get_db_connection
- from schemas.export import (
- ExportFormat, ExportStatus, StatusFilter,
- TaskExportItem, AnnotationExportItem, ProjectExportData,
- COCOImage, COCOCategory, COCOAnnotation, COCOExportData
- )
class ExportService:
    """Service for data export operations.

    Exports a project's tasks and their annotations to files on disk in one
    of four formats (JSON, CSV, COCO, YOLO) and tracks progress through rows
    in the ``export_jobs`` table.
    """

    # Directory (relative to the working directory) where export files are written.
    EXPORT_DIR = "exports"

    @classmethod
    def ensure_export_dir(cls) -> str:
        """Create the export directory if it does not exist and return its path."""
        # exist_ok makes this race-free when two exports start concurrently
        # (a check-then-create would raise FileExistsError in that window).
        os.makedirs(cls.EXPORT_DIR, exist_ok=True)
        return cls.EXPORT_DIR

    @classmethod
    def _build_export_path(cls, project_id: str, suffix: str) -> str:
        """Return a timestamped path inside the export dir: export_<project>_<ts><suffix>."""
        cls.ensure_export_dir()
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return os.path.join(cls.EXPORT_DIR, f"export_{project_id}_{stamp}{suffix}")

    @staticmethod
    def _parse_json_field(value: Any) -> Any:
        """Decode a JSON-encoded string column; return the input unchanged on failure."""
        if isinstance(value, str):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                # Keep the raw string rather than dropping the data.
                pass
        return value

    @staticmethod
    def _normalize_result_items(result: Any) -> List:
        """Coerce an annotation ``result`` payload into a list of items.

        Accepts a dict wrapper ({"annotations": [...]} or {"result": [...]}),
        a bare list, a single item, or an empty/None value.
        """
        if isinstance(result, dict):
            result = result.get("annotations", result.get("result", []))
        if not isinstance(result, list):
            result = [result] if result else []
        return result

    @staticmethod
    def create_export_job(
        project_id: str,
        format: str,
        status_filter: str,
        include_metadata: bool,
        created_by: str
    ) -> str:
        """
        Create a new export job record in PENDING state.

        Args:
            project_id: Project ID to export
            format: Export format
            status_filter: Task status filter
            include_metadata: Whether to include metadata
            created_by: User ID who created the job

        Returns:
            Export job ID
        """
        job_id = f"export_{uuid.uuid4().hex[:12]}"

        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO export_jobs (
                    id, project_id, format, status, status_filter,
                    include_metadata, created_by, created_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job_id, project_id, format, ExportStatus.PENDING.value,
                status_filter, include_metadata, created_by,
                datetime.now().isoformat()
            ))

        return job_id

    @staticmethod
    def update_export_job(
        job_id: str,
        status: Optional[str] = None,
        file_path: Optional[str] = None,
        error_message: Optional[str] = None,
        total_tasks: Optional[int] = None,
        exported_tasks: Optional[int] = None
    ) -> None:
        """Update export job status and details; only supplied fields are written.

        Reaching a terminal status (COMPLETED or FAILED) also stamps
        ``completed_at``. Column names are hard-coded here, so the dynamic
        SET clause is safe from injection; values go through placeholders.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()

            updates = []
            params = []

            # `is not None` (not truthiness) so the check is consistent with
            # the other optional fields below.
            if status is not None:
                updates.append("status = ?")
                params.append(status)
                if status in [ExportStatus.COMPLETED.value, ExportStatus.FAILED.value]:
                    updates.append("completed_at = ?")
                    params.append(datetime.now().isoformat())

            if file_path is not None:
                updates.append("file_path = ?")
                params.append(file_path)

            if error_message is not None:
                updates.append("error_message = ?")
                params.append(error_message)

            if total_tasks is not None:
                updates.append("total_tasks = ?")
                params.append(total_tasks)

            if exported_tasks is not None:
                updates.append("exported_tasks = ?")
                params.append(exported_tasks)

            if updates:
                params.append(job_id)
                cursor.execute(f"""
                    UPDATE export_jobs
                    SET {', '.join(updates)}
                    WHERE id = ?
                """, tuple(params))

    @staticmethod
    def get_export_job(job_id: str) -> Optional[Dict]:
        """Get an export job by ID, or None if it does not exist."""
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM export_jobs WHERE id = ?
            """, (job_id,))
            row = cursor.fetchone()

            if not row:
                return None

            return {
                "id": row["id"],
                "project_id": row["project_id"],
                "format": row["format"],
                "status": row["status"],
                "status_filter": row["status_filter"],
                "include_metadata": bool(row["include_metadata"]),
                "file_path": row["file_path"],
                "error_message": row["error_message"],
                "created_by": row["created_by"],
                "created_at": row["created_at"],
                "completed_at": row["completed_at"],
                # NULL counters surface as 0 to callers.
                "total_tasks": row["total_tasks"] or 0,
                "exported_tasks": row["exported_tasks"] or 0
            }

    @staticmethod
    def get_project_data(project_id: str) -> Optional[Dict]:
        """Get project basic info (id, name, description, config, created_at), or None."""
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, description, config, created_at
                FROM projects WHERE id = ?
            """, (project_id,))
            row = cursor.fetchone()

            if not row:
                return None

            return {
                "id": row["id"],
                "name": row["name"],
                "description": row["description"],
                "config": row["config"],
                "created_at": row["created_at"]
            }

    @classmethod
    def get_tasks_with_annotations(
        cls,
        project_id: str,
        status_filter: str = "all"
    ) -> List[Dict]:
        """
        Get all tasks with their annotations for a project.

        Args:
            project_id: Project ID
            status_filter: Filter by task status ("all" disables the filter)

        Returns:
            List of task dicts, each with a parsed ``data`` payload and an
            ``annotations`` list (results parsed from JSON where possible).
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Build query based on status filter
            query = """
                SELECT t.id, t.name, t.data, t.status, t.assigned_to, t.created_at
                FROM tasks t
                WHERE t.project_id = ?
            """
            params = [project_id]

            if status_filter != "all":
                query += " AND t.status = ?"
                params.append(status_filter)

            query += " ORDER BY t.created_at"

            cursor.execute(query, tuple(params))
            task_rows = cursor.fetchall()

            tasks = []
            for task_row in task_rows:
                task_id = task_row["id"]

                # Fetch this task's annotations (one query per task; fine for
                # moderate project sizes — revisit if exports grow large).
                cursor.execute("""
                    SELECT id, task_id, user_id, result, created_at, updated_at
                    FROM annotations
                    WHERE task_id = ?
                    ORDER BY created_at
                """, (task_id,))
                annotation_rows = cursor.fetchall()

                annotations = [
                    {
                        "id": ann_row["id"],
                        "task_id": ann_row["task_id"],
                        "user_id": ann_row["user_id"],
                        "result": cls._parse_json_field(ann_row["result"]),
                        "created_at": str(ann_row["created_at"]),
                        "updated_at": str(ann_row["updated_at"])
                    }
                    for ann_row in annotation_rows
                ]

                tasks.append({
                    "id": task_row["id"],
                    "name": task_row["name"],
                    "data": cls._parse_json_field(task_row["data"]),
                    "status": task_row["status"],
                    "assigned_to": task_row["assigned_to"],
                    "created_at": str(task_row["created_at"]),
                    "annotations": annotations
                })

            return tasks

    @classmethod
    def export_to_json(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to JSON format.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to include project description/config

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")

        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        total_annotations = sum(len(t["annotations"]) for t in tasks)

        export_data = {
            "project_id": project["id"],
            "project_name": project["name"],
            "export_format": "json",
            "export_time": datetime.now().isoformat(),
            "total_tasks": len(tasks),
            "total_annotations": total_annotations,
            "tasks": tasks
        }

        if include_metadata:
            export_data["project_description"] = project["description"]
            export_data["config"] = project["config"]

        file_path = cls._build_export_path(project_id, ".json")
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, ensure_ascii=False, indent=2)

        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_csv(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to CSV format (one row per annotation; tasks
        without annotations get a single row with blank annotation columns).

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to append assigned_to / task_created_at columns

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")

        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        total_annotations = 0
        file_path = cls._build_export_path(project_id, ".csv")

        with open(file_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            headers = [
                "task_id", "task_name", "task_status", "task_data",
                "annotation_id", "user_id", "annotation_result",
                "annotation_created_at", "annotation_updated_at"
            ]
            if include_metadata:
                headers.extend(["assigned_to", "task_created_at"])
            writer.writerow(headers)

            for task in tasks:
                # Structured fields are embedded as JSON text inside the cell.
                task_prefix = [
                    task["id"],
                    task["name"],
                    task["status"],
                    json.dumps(task["data"], ensure_ascii=False),
                ]
                metadata_suffix = (
                    [task["assigned_to"], task["created_at"]]
                    if include_metadata else []
                )

                if task["annotations"]:
                    for ann in task["annotations"]:
                        total_annotations += 1
                        writer.writerow(task_prefix + [
                            ann["id"],
                            ann["user_id"],
                            json.dumps(ann["result"], ensure_ascii=False),
                            ann["created_at"],
                            ann["updated_at"]
                        ] + metadata_suffix)
                else:
                    # Task without annotations: blank annotation columns.
                    writer.writerow(task_prefix + ["", "", "", "", ""] + metadata_suffix)

        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_coco(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to COCO format.

        COCO format is primarily for object detection tasks. Image width and
        height are not stored in task data, so they are emitted as 0 —
        consumers needing real dimensions must fill them in themselves.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Accepted for interface symmetry; unused here

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")

        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        coco_data = {
            "info": {
                "description": project["name"],
                "version": "1.0",
                "year": datetime.now().year,
                "contributor": "Annotation Platform",
                "date_created": datetime.now().isoformat()
            },
            "licenses": [],
            "images": [],
            "annotations": [],
            "categories": []
        }

        # Label name -> COCO category id (ids assigned on first sight, from 1).
        category_map = {}
        category_id = 1

        annotation_id = 1
        total_annotations = 0

        for image_id, task in enumerate(tasks, start=1):
            task_data = task["data"]
            image_url = ""
            if isinstance(task_data, dict):
                image_url = task_data.get("image", task_data.get("image_url", ""))

            coco_data["images"].append({
                "id": image_id,
                "file_name": image_url or task["name"],
                "width": 0,
                "height": 0
            })

            for ann in task["annotations"]:
                for item in cls._normalize_result_items(ann["result"]):
                    if not isinstance(item, dict):
                        continue

                    # Counts result items (even unlabeled ones), matching the
                    # totals reported by the other exporters.
                    total_annotations += 1

                    value = item.get("value", {})
                    labels = value.get("rectanglelabels", value.get("labels", []))

                    for label in labels:
                        if label not in category_map:
                            category_map[label] = category_id
                            coco_data["categories"].append({
                                "id": category_id,
                                "name": label,
                                "supercategory": ""
                            })
                            category_id += 1

                        # COCO bbox convention: [x, y, width, height].
                        x = value.get("x", 0)
                        y = value.get("y", 0)
                        width = value.get("width", 0)
                        height = value.get("height", 0)

                        coco_data["annotations"].append({
                            "id": annotation_id,
                            "image_id": image_id,
                            "category_id": category_map[label],
                            "bbox": [x, y, width, height],
                            "area": width * height,
                            "segmentation": [],
                            "iscrowd": 0
                        })
                        annotation_id += 1

        file_path = cls._build_export_path(project_id, "_coco.json")
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(coco_data, f, ensure_ascii=False, indent=2)

        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_yolo(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to YOLO format.

        A real YOLO dataset is a directory tree (images/, labels/,
        classes.txt, data.yaml); for simplicity this produces a single JSON
        file containing YOLO-formatted label lines per image.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Accepted for interface symmetry; unused here

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")

        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        # Label name -> YOLO class id (ids assigned on first sight, from 0).
        class_map = {}
        class_id = 0

        yolo_data = {
            "info": {
                "project_name": project["name"],
                "export_time": datetime.now().isoformat(),
                "format": "yolo"
            },
            "classes": [],
            "images": []
        }

        total_annotations = 0

        for task in tasks:
            task_data = task["data"]
            image_url = ""
            if isinstance(task_data, dict):
                image_url = task_data.get("image", task_data.get("image_url", ""))

            image_entry = {
                "id": task["id"],
                "file_name": image_url or task["name"],
                "labels": []
            }

            for ann in task["annotations"]:
                for item in cls._normalize_result_items(ann["result"]):
                    if not isinstance(item, dict):
                        continue

                    total_annotations += 1

                    value = item.get("value", {})
                    labels = value.get("rectanglelabels", value.get("labels", []))

                    for label in labels:
                        if label not in class_map:
                            class_map[label] = class_id
                            yolo_data["classes"].append(label)
                            class_id += 1

                        # Annotation coords are percentages (0-100); YOLO wants
                        # normalized 0-1 center-based boxes:
                        #   class_id x_center y_center width height
                        x = value.get("x", 0) / 100.0
                        y = value.get("y", 0) / 100.0
                        w = value.get("width", 0) / 100.0
                        h = value.get("height", 0) / 100.0

                        x_center = x + w / 2
                        y_center = y + h / 2

                        image_entry["labels"].append({
                            "class_id": class_map[label],
                            "class_name": label,
                            "x_center": round(x_center, 6),
                            "y_center": round(y_center, 6),
                            "width": round(w, 6),
                            "height": round(h, 6),
                            "yolo_line": f"{class_map[label]} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}"
                        })

            yolo_data["images"].append(image_entry)

        file_path = cls._build_export_path(project_id, "_yolo.json")
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(yolo_data, f, ensure_ascii=False, indent=2)

        return file_path, len(tasks), total_annotations

    @classmethod
    def execute_export(
        cls,
        job_id: str,
        project_id: str,
        format: str,
        status_filter: str,
        include_metadata: bool
    ) -> Dict:
        """
        Execute the export job and record its outcome on the job row.

        Args:
            job_id: Export job ID
            project_id: Project ID
            format: Export format
            status_filter: Task status filter
            include_metadata: Whether to include metadata

        Returns:
            On success: {"success": True, "file_path", "total_tasks",
            "total_annotations"}. On failure: {"success": False, "error"}.
            Never raises — failures are captured into the job record.
        """
        try:
            cls.update_export_job(job_id, status=ExportStatus.PROCESSING.value)

            # Format -> exporter dispatch; all exporters share one signature.
            exporters = {
                ExportFormat.JSON.value: cls.export_to_json,
                ExportFormat.CSV.value: cls.export_to_csv,
                ExportFormat.COCO.value: cls.export_to_coco,
                ExportFormat.YOLO.value: cls.export_to_yolo,
            }
            exporter = exporters.get(format)
            if exporter is None:
                raise ValueError(f"Unsupported export format: {format}")

            file_path, total_tasks, total_annotations = exporter(
                project_id, status_filter, include_metadata
            )

            cls.update_export_job(
                job_id,
                status=ExportStatus.COMPLETED.value,
                file_path=file_path,
                total_tasks=total_tasks,
                exported_tasks=total_tasks
            )

            return {
                "success": True,
                "file_path": file_path,
                "total_tasks": total_tasks,
                "total_annotations": total_annotations
            }

        except Exception as e:
            # Boundary handler: persist the failure on the job rather than
            # letting a background export crash the caller.
            cls.update_export_job(
                job_id,
                status=ExportStatus.FAILED.value,
                error_message=str(e)
            )

            return {
                "success": False,
                "error": str(e)
            }
|