""" Export Service for data export operations. Handles exporting annotations in various formats: JSON, CSV, COCO, YOLO. """ import json import csv import io import os import uuid from datetime import datetime from typing import List, Dict, Any, Optional, Tuple from database import get_db_connection from schemas.export import ( ExportFormat, ExportStatus, StatusFilter, TaskExportItem, AnnotationExportItem, ProjectExportData, COCOImage, COCOCategory, COCOAnnotation, COCOExportData ) class ExportService: """Service for data export operations.""" # Export directory EXPORT_DIR = "exports" @classmethod def ensure_export_dir(cls) -> str: """Ensure export directory exists.""" if not os.path.exists(cls.EXPORT_DIR): os.makedirs(cls.EXPORT_DIR) return cls.EXPORT_DIR @staticmethod def create_export_job( project_id: str, format: str, status_filter: str, include_metadata: bool, created_by: str ) -> str: """ Create a new export job record. Args: project_id: Project ID to export format: Export format status_filter: Task status filter include_metadata: Whether to include metadata created_by: User ID who created the job Returns: Export job ID """ job_id = f"export_{uuid.uuid4().hex[:12]}" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO export_jobs ( id, project_id, format, status, status_filter, include_metadata, created_by, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( job_id, project_id, format, ExportStatus.PENDING.value, status_filter, include_metadata, created_by, datetime.now().isoformat() )) return job_id @staticmethod def update_export_job( job_id: str, status: str = None, file_path: str = None, error_message: str = None, total_tasks: int = None, exported_tasks: int = None ) -> None: """Update export job status and details.""" with get_db_connection() as conn: cursor = conn.cursor() updates = [] params = [] if status: updates.append("status = ?") params.append(status) if status in [ExportStatus.COMPLETED.value, ExportStatus.FAILED.value]: updates.append("completed_at = ?") params.append(datetime.now().isoformat()) if file_path is not None: updates.append("file_path = ?") params.append(file_path) if error_message is not None: updates.append("error_message = ?") params.append(error_message) if total_tasks is not None: updates.append("total_tasks = ?") params.append(total_tasks) if exported_tasks is not None: updates.append("exported_tasks = ?") params.append(exported_tasks) if updates: params.append(job_id) cursor.execute(f""" UPDATE export_jobs SET {', '.join(updates)} WHERE id = ? """, tuple(params)) @staticmethod def get_export_job(job_id: str) -> Optional[Dict]: """Get export job by ID.""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM export_jobs WHERE id = ? """, (job_id,)) row = cursor.fetchone() if not row: return None return { "id": row["id"], "project_id": row["project_id"], "format": row["format"], "status": row["status"], "status_filter": row["status_filter"], "include_metadata": bool(row["include_metadata"]), "file_path": row["file_path"], "error_message": row["error_message"], "created_by": row["created_by"], "created_at": row["created_at"], "completed_at": row["completed_at"], "total_tasks": row["total_tasks"] or 0, "exported_tasks": row["exported_tasks"] or 0 } @staticmethod def get_project_data(project_id: str) -> Optional[Dict]: """Get project basic info.""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, name, description, config, created_at FROM projects WHERE id = ? """, (project_id,)) row = cursor.fetchone() if not row: return None return { "id": row["id"], "name": row["name"], "description": row["description"], "config": row["config"], "created_at": row["created_at"] } @staticmethod def get_tasks_with_annotations( project_id: str, status_filter: str = "all" ) -> List[Dict]: """ Get all tasks with their annotations for a project. Args: project_id: Project ID status_filter: Filter by task status Returns: List of tasks with annotations """ with get_db_connection() as conn: cursor = conn.cursor() # Build query based on status filter query = """ SELECT t.id, t.name, t.data, t.status, t.assigned_to, t.created_at FROM tasks t WHERE t.project_id = ? """ params = [project_id] if status_filter != "all": query += " AND t.status = ?" params.append(status_filter) query += " ORDER BY t.created_at" cursor.execute(query, tuple(params)) task_rows = cursor.fetchall() tasks = [] for task_row in task_rows: task_id = task_row["id"] # Get annotations for this task cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE task_id = ? ORDER BY created_at """, (task_id,)) annotation_rows = cursor.fetchall() annotations = [] for ann_row in annotation_rows: result = ann_row["result"] if isinstance(result, str): try: result = json.loads(result) except json.JSONDecodeError: pass annotations.append({ "id": ann_row["id"], "task_id": ann_row["task_id"], "user_id": ann_row["user_id"], "result": result, "created_at": str(ann_row["created_at"]), "updated_at": str(ann_row["updated_at"]) }) # Parse task data task_data = task_row["data"] if isinstance(task_data, str): try: task_data = json.loads(task_data) except json.JSONDecodeError: pass tasks.append({ "id": task_row["id"], "name": task_row["name"], "data": task_data, "status": task_row["status"], "assigned_to": task_row["assigned_to"], "created_at": str(task_row["created_at"]), "annotations": annotations }) return tasks @classmethod def export_to_json( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to JSON format. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) total_annotations = sum(len(t["annotations"]) for t in tasks) export_data = { "project_id": project["id"], "project_name": project["name"], "export_format": "json", "export_time": datetime.now().isoformat(), "total_tasks": len(tasks), "total_annotations": total_annotations, "tasks": tasks } if include_metadata: export_data["project_description"] = project["description"] export_data["config"] = project["config"] # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(export_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_csv( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to CSV format. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) total_annotations = 0 # Prepare CSV data cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Write header headers = [ "task_id", "task_name", "task_status", "task_data", "annotation_id", "user_id", "annotation_result", "annotation_created_at", "annotation_updated_at" ] if include_metadata: headers.extend(["assigned_to", "task_created_at"]) writer.writerow(headers) # Write data rows for task in tasks: if task["annotations"]: for ann in task["annotations"]: total_annotations += 1 row = [ task["id"], task["name"], task["status"], json.dumps(task["data"], ensure_ascii=False), ann["id"], ann["user_id"], json.dumps(ann["result"], ensure_ascii=False), ann["created_at"], ann["updated_at"] ] if include_metadata: row.extend([task["assigned_to"], task["created_at"]]) writer.writerow(row) else: # Task without annotations row = [ task["id"], task["name"], task["status"], json.dumps(task["data"], ensure_ascii=False), "", "", "", "", "" ] if include_metadata: row.extend([task["assigned_to"], task["created_at"]]) writer.writerow(row) return file_path, len(tasks), total_annotations @classmethod def export_to_coco( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to COCO format. COCO format is primarily for object detection tasks. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) # Initialize COCO structure coco_data = { "info": { "description": project["name"], "version": "1.0", "year": datetime.now().year, "contributor": "Annotation Platform", "date_created": datetime.now().isoformat() }, "licenses": [], "images": [], "annotations": [], "categories": [] } # Track categories category_map = {} category_id = 1 # Track annotation ID annotation_id = 1 total_annotations = 0 for image_id, task in enumerate(tasks, start=1): # Add image entry task_data = task["data"] image_url = "" if isinstance(task_data, dict): image_url = task_data.get("image", task_data.get("image_url", "")) coco_data["images"].append({ "id": image_id, "file_name": image_url or task["name"], "width": 0, "height": 0 }) # Process annotations for ann in task["annotations"]: result = ann["result"] if isinstance(result, dict): result = result.get("annotations", result.get("result", [])) if not isinstance(result, list): result = [result] if result else [] for item in result: if not isinstance(item, dict): continue total_annotations += 1 # Extract label and bbox value = item.get("value", {}) labels = value.get("rectanglelabels", value.get("labels", [])) for label in labels: # Add category if new if label not in category_map: category_map[label] = category_id coco_data["categories"].append({ "id": category_id, "name": label, "supercategory": "" }) category_id += 1 # Calculate bbox (COCO format: [x, y, width, height]) x = value.get("x", 0) y = value.get("y", 0) width = value.get("width", 0) height = value.get("height", 0) coco_data["annotations"].append({ "id": annotation_id, "image_id": image_id, "category_id": category_map[label], "bbox": [x, y, width, height], "area": width * height, "segmentation": [], "iscrowd": 0 }) annotation_id += 1 # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_coco.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(coco_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_yolo( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to YOLO format. YOLO format creates a directory with: - images/ (or references to images) - labels/ (txt files with annotations) - classes.txt (class names) - data.yaml (dataset configuration) For simplicity, we create a JSON file containing YOLO-formatted data. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) # Track classes class_map = {} class_id = 0 # YOLO data structure yolo_data = { "info": { "project_name": project["name"], "export_time": datetime.now().isoformat(), "format": "yolo" }, "classes": [], "images": [] } total_annotations = 0 for task in tasks: task_data = task["data"] image_url = "" image_width = 1.0 # Normalized image_height = 1.0 # Normalized if isinstance(task_data, dict): image_url = task_data.get("image", task_data.get("image_url", "")) image_width = task_data.get("width", 1.0) image_height = task_data.get("height", 1.0) image_entry = { "id": task["id"], "file_name": image_url or task["name"], "labels": [] } # Process annotations for ann in task["annotations"]: result = ann["result"] if isinstance(result, dict): result = result.get("annotations", result.get("result", [])) if not isinstance(result, list): result = [result] if result else [] for item in result: if not isinstance(item, dict): continue total_annotations += 1 # Extract label and bbox value = item.get("value", {}) labels = value.get("rectanglelabels", value.get("labels", [])) for label in labels: # Add class if new if label not in class_map: class_map[label] = class_id yolo_data["classes"].append(label) class_id += 1 # Calculate YOLO format bbox # YOLO format: class_id x_center y_center width height (normalized 0-1) x = value.get("x", 0) / 100.0 # Convert from percentage y = value.get("y", 0) / 100.0 w = value.get("width", 0) / 100.0 h = value.get("height", 0) / 100.0 # Convert to center coordinates x_center = x + w / 2 y_center = y + h / 2 image_entry["labels"].append({ "class_id": class_map[label], "class_name": label, "x_center": round(x_center, 6), "y_center": round(y_center, 6), "width": round(w, 6), "height": round(h, 6), "yolo_line": f"{class_map[label]} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}" }) yolo_data["images"].append(image_entry) # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_yolo.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(yolo_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def execute_export( cls, job_id: str, project_id: str, format: str, status_filter: str, include_metadata: bool ) -> Dict: """ Execute the export job. Args: job_id: Export job ID project_id: Project ID format: Export format status_filter: Task status filter include_metadata: Whether to include metadata Returns: Export result with file path and statistics """ try: # Update status to processing cls.update_export_job(job_id, status=ExportStatus.PROCESSING.value) # Execute export based on format if format == ExportFormat.JSON.value: file_path, total_tasks, total_annotations = cls.export_to_json( project_id, status_filter, include_metadata ) elif format == ExportFormat.CSV.value: file_path, total_tasks, total_annotations = cls.export_to_csv( project_id, status_filter, include_metadata ) elif format == ExportFormat.COCO.value: file_path, total_tasks, total_annotations = cls.export_to_coco( project_id, status_filter, include_metadata ) elif format == ExportFormat.YOLO.value: file_path, total_tasks, total_annotations = cls.export_to_yolo( project_id, status_filter, include_metadata ) else: raise ValueError(f"Unsupported export format: {format}") # Update job with success cls.update_export_job( job_id, status=ExportStatus.COMPLETED.value, file_path=file_path, total_tasks=total_tasks, exported_tasks=total_tasks ) return { "success": True, "file_path": file_path, "total_tasks": total_tasks, "total_annotations": total_annotations } except Exception as e: # Update job with failure cls.update_export_job( job_id, status=ExportStatus.FAILED.value, error_message=str(e) ) return { "success": False, "error": str(e) }