""" Export Service for data export operations. Handles exporting annotations in various formats: JSON, CSV, COCO, YOLO. """ import json import csv import io import os import uuid from datetime import datetime from typing import List, Dict, Any, Optional, Tuple from database import get_db_connection from schemas.export import ( ExportFormat, ExportStatus, StatusFilter, TaskExportItem, AnnotationExportItem, ProjectExportData, COCOImage, COCOCategory, COCOAnnotation, COCOExportData ) class ExportService: """Service for data export operations.""" # Export directory EXPORT_DIR = "exports" @classmethod def ensure_export_dir(cls) -> str: """Ensure export directory exists.""" if not os.path.exists(cls.EXPORT_DIR): os.makedirs(cls.EXPORT_DIR) return cls.EXPORT_DIR @staticmethod def create_export_job( project_id: str, format: str, status_filter: str, include_metadata: bool, created_by: str ) -> str: """ Create a new export job record. Args: project_id: Project ID to export format: Export format status_filter: Task status filter include_metadata: Whether to include metadata created_by: User ID who created the job Returns: Export job ID """ job_id = f"export_{uuid.uuid4().hex[:12]}" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO export_jobs ( id, project_id, format, status, status_filter, include_metadata, created_by, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( job_id, project_id, format, ExportStatus.PENDING.value, status_filter, include_metadata, created_by, datetime.now().isoformat() )) return job_id @staticmethod def update_export_job( job_id: str, status: str = None, file_path: str = None, error_message: str = None, total_tasks: int = None, exported_tasks: int = None ) -> None: """Update export job status and details.""" with get_db_connection() as conn: cursor = conn.cursor() updates = [] params = [] if status: updates.append("status = ?") params.append(status) if status in [ExportStatus.COMPLETED.value, ExportStatus.FAILED.value]: updates.append("completed_at = ?") params.append(datetime.now().isoformat()) if file_path is not None: updates.append("file_path = ?") params.append(file_path) if error_message is not None: updates.append("error_message = ?") params.append(error_message) if total_tasks is not None: updates.append("total_tasks = ?") params.append(total_tasks) if exported_tasks is not None: updates.append("exported_tasks = ?") params.append(exported_tasks) if updates: params.append(job_id) cursor.execute(f""" UPDATE export_jobs SET {', '.join(updates)} WHERE id = ? """, tuple(params)) @staticmethod def get_export_job(job_id: str) -> Optional[Dict]: """Get export job by ID.""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM export_jobs WHERE id = ? """, (job_id,)) row = cursor.fetchone() if not row: return None return { "id": row["id"], "project_id": row["project_id"], "format": row["format"], "status": row["status"], "status_filter": row["status_filter"], "include_metadata": bool(row["include_metadata"]), "file_path": row["file_path"], "error_message": row["error_message"], "created_by": row["created_by"], "created_at": row["created_at"], "completed_at": row["completed_at"], "total_tasks": row["total_tasks"] or 0, "exported_tasks": row["exported_tasks"] or 0 } @staticmethod def get_project_data(project_id: str) -> Optional[Dict]: """Get project basic info.""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, name, description, config, created_at FROM projects WHERE id = ? """, (project_id,)) row = cursor.fetchone() if not row: return None return { "id": row["id"], "name": row["name"], "description": row["description"], "config": row["config"], "created_at": row["created_at"] } @staticmethod def get_tasks_with_annotations( project_id: str, status_filter: str = "all" ) -> List[Dict]: """ Get all tasks with their annotations for a project. Args: project_id: Project ID status_filter: Filter by task status Returns: List of tasks with annotations """ with get_db_connection() as conn: cursor = conn.cursor() # Build query based on status filter query = """ SELECT t.id, t.name, t.data, t.status, t.assigned_to, t.created_at FROM tasks t WHERE t.project_id = ? """ params = [project_id] if status_filter != "all": query += " AND t.status = ?" params.append(status_filter) query += " ORDER BY t.created_at" cursor.execute(query, tuple(params)) task_rows = cursor.fetchall() tasks = [] for task_row in task_rows: task_id = task_row["id"] # Get annotations for this task cursor.execute(""" SELECT id, task_id, user_id, result, created_at, updated_at FROM annotations WHERE task_id = ? ORDER BY created_at """, (task_id,)) annotation_rows = cursor.fetchall() annotations = [] for ann_row in annotation_rows: result = ann_row["result"] if isinstance(result, str): try: result = json.loads(result) except json.JSONDecodeError: pass annotations.append({ "id": ann_row["id"], "task_id": ann_row["task_id"], "user_id": ann_row["user_id"], "result": result, "created_at": str(ann_row["created_at"]), "updated_at": str(ann_row["updated_at"]) }) # Parse task data task_data = task_row["data"] if isinstance(task_data, str): try: task_data = json.loads(task_data) except json.JSONDecodeError: pass tasks.append({ "id": task_row["id"], "name": task_row["name"], "data": task_data, "status": task_row["status"], "assigned_to": task_row["assigned_to"], "created_at": str(task_row["created_at"]), "annotations": annotations }) return tasks @classmethod def export_to_json( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to JSON format. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) total_annotations = sum(len(t["annotations"]) for t in tasks) export_data = { "project_id": project["id"], "project_name": project["name"], "export_format": "json", "export_time": datetime.now().isoformat(), "total_tasks": len(tasks), "total_annotations": total_annotations, "tasks": tasks } if include_metadata: export_data["project_description"] = project["description"] export_data["config"] = project["config"] # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(export_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_csv( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to CSV format. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) total_annotations = 0 # Prepare CSV data cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Write header headers = [ "task_id", "task_name", "task_status", "task_data", "annotation_id", "user_id", "annotation_result", "annotation_created_at", "annotation_updated_at" ] if include_metadata: headers.extend(["assigned_to", "task_created_at"]) writer.writerow(headers) # Write data rows for task in tasks: if task["annotations"]: for ann in task["annotations"]: total_annotations += 1 row = [ task["id"], task["name"], task["status"], json.dumps(task["data"], ensure_ascii=False), ann["id"], ann["user_id"], json.dumps(ann["result"], ensure_ascii=False), ann["created_at"], ann["updated_at"] ] if include_metadata: row.extend([task["assigned_to"], task["created_at"]]) writer.writerow(row) else: # Task without annotations row = [ task["id"], task["name"], task["status"], json.dumps(task["data"], ensure_ascii=False), "", "", "", "", "" ] if include_metadata: row.extend([task["assigned_to"], task["created_at"]]) writer.writerow(row) return file_path, len(tasks), total_annotations @classmethod def export_to_coco( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to COCO format. COCO format is primarily for object detection tasks. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) # Initialize COCO structure coco_data = { "info": { "description": project["name"], "version": "1.0", "year": datetime.now().year, "contributor": "Annotation Platform", "date_created": datetime.now().isoformat() }, "licenses": [], "images": [], "annotations": [], "categories": [] } # Track categories category_map = {} category_id = 1 # Track annotation ID annotation_id = 1 total_annotations = 0 for image_id, task in enumerate(tasks, start=1): # Add image entry task_data = task["data"] image_url = "" if isinstance(task_data, dict): image_url = task_data.get("image", task_data.get("image_url", "")) coco_data["images"].append({ "id": image_id, "file_name": image_url or task["name"], "width": 0, "height": 0 }) # Process annotations for ann in task["annotations"]: result = ann["result"] if isinstance(result, dict): result = result.get("annotations", result.get("result", [])) if not isinstance(result, list): result = [result] if result else [] for item in result: if not isinstance(item, dict): continue total_annotations += 1 # Extract label and bbox value = item.get("value", {}) labels = value.get("rectanglelabels", value.get("labels", [])) for label in labels: # Add category if new if label not in category_map: category_map[label] = category_id coco_data["categories"].append({ "id": category_id, "name": label, "supercategory": "" }) category_id += 1 # Calculate bbox (COCO format: [x, y, width, height]) x = value.get("x", 0) y = value.get("y", 0) width = value.get("width", 0) height = value.get("height", 0) coco_data["annotations"].append({ "id": annotation_id, "image_id": image_id, "category_id": category_map[label], "bbox": [x, y, width, height], "area": width * height, "segmentation": [], "iscrowd": 0 }) annotation_id += 1 # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_coco.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(coco_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_yolo( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to YOLO format. YOLO format creates a directory with: - images/ (or references to images) - labels/ (txt files with annotations) - classes.txt (class names) - data.yaml (dataset configuration) For simplicity, we create a JSON file containing YOLO-formatted data. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) # Track classes class_map = {} class_id = 0 # YOLO data structure yolo_data = { "info": { "project_name": project["name"], "export_time": datetime.now().isoformat(), "format": "yolo" }, "classes": [], "images": [] } total_annotations = 0 for task in tasks: task_data = task["data"] image_url = "" image_width = 1.0 # Normalized image_height = 1.0 # Normalized if isinstance(task_data, dict): image_url = task_data.get("image", task_data.get("image_url", "")) image_width = task_data.get("width", 1.0) image_height = task_data.get("height", 1.0) image_entry = { "id": task["id"], "file_name": image_url or task["name"], "labels": [] } # Process annotations for ann in task["annotations"]: result = ann["result"] if isinstance(result, dict): result = result.get("annotations", result.get("result", [])) if not isinstance(result, list): result = [result] if result else [] for item in result: if not isinstance(item, dict): continue total_annotations += 1 # Extract label and bbox value = item.get("value", {}) labels = value.get("rectanglelabels", value.get("labels", [])) for label in labels: # Add class if new if label not in class_map: class_map[label] = class_id yolo_data["classes"].append(label) class_id += 1 # Calculate YOLO format bbox # YOLO format: class_id x_center y_center width height (normalized 0-1) x = value.get("x", 0) / 100.0 # Convert from percentage y = value.get("y", 0) / 100.0 w = value.get("width", 0) / 100.0 h = value.get("height", 0) / 100.0 # Convert to center coordinates x_center = x + w / 2 y_center = y + h / 2 image_entry["labels"].append({ "class_id": class_map[label], "class_name": label, "x_center": round(x_center, 6), "y_center": round(y_center, 6), "width": round(w, 6), "height": round(h, 6), "yolo_line": f"{class_map[label]} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}" }) yolo_data["images"].append(image_entry) # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_yolo.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(yolo_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_pascal_voc( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to PascalVOC format. PascalVOC format is a classic object detection format using XML. Returns a JSON file containing PascalVOC XML content for each image. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) voc_data = [] total_annotations = 0 for idx, task in enumerate(tasks): task_data = task["data"] image_url = "" img_width = 0 img_height = 0 if isinstance(task_data, dict): image_url = task_data.get("image", task_data.get("image_url", "")) img_width = task_data.get("width", 0) img_height = task_data.get("height", 0) # Extract filename from URL image_filename = image_url.split('/')[-1] if image_url else f"image_{idx + 1}.jpg" objects = [] # Process annotations for ann in task["annotations"]: result = ann["result"] if isinstance(result, dict): result = result.get("annotations", result.get("result", [])) if not isinstance(result, list): result = [result] if result else [] for item in result: if not isinstance(item, dict): continue total_annotations += 1 value = item.get("value", {}) item_type = item.get("type", "") if item_type == "rectanglelabels": labels = value.get("rectanglelabels", []) for label in labels: x_pct = value.get("x", 0) y_pct = value.get("y", 0) w_pct = value.get("width", 0) h_pct = value.get("height", 0) if img_width > 0 and img_height > 0: xmin = int(x_pct * img_width / 100) ymin = int(y_pct * img_height / 100) xmax = int((x_pct + w_pct) * img_width / 100) ymax = int((y_pct + h_pct) * img_height / 100) else: xmin = x_pct ymin = y_pct xmax = x_pct + w_pct ymax = y_pct + h_pct objects.append({ "name": label, "pose": "Unspecified", "truncated": 0, "difficult": 0, "bndbox": { "xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax } }) elif item_type == "polygonlabels": labels = value.get("polygonlabels", []) points = value.get("points", []) for label in labels: if points: x_coords = [p[0] for p in points] y_coords = [p[1] for p in points] if img_width > 0 and img_height > 0: xmin = int(min(x_coords) * img_width / 100) ymin = int(min(y_coords) * img_height / 100) xmax = int(max(x_coords) * img_width / 100) ymax = int(max(y_coords) * img_height / 100) polygon_points = [[int(p[0] * img_width / 100), int(p[1] * img_height / 100)] for p in points] else: xmin = min(x_coords) ymin = min(y_coords) xmax = max(x_coords) ymax = max(y_coords) polygon_points = points objects.append({ "name": label, "pose": "Unspecified", "truncated": 0, "difficult": 0, "bndbox": { "xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax }, "polygon": polygon_points }) # Generate PascalVOC XML xml_content = cls._generate_voc_xml(image_filename, img_width, img_height, objects) voc_data.append({ "image": image_url, "filename": image_filename, "xml_content": xml_content, "objects": objects }) # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_pascal_voc.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(voc_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @staticmethod def _generate_voc_xml(filename: str, width: int, height: int, objects: List[Dict]) -> str: """Generate PascalVOC XML string.""" xml_lines = [ '', '', f' {filename}', ' ', ' Annotation Platform', ' ', ' ', f' {width}', f' {height}', ' 3', ' ', ' 0' ] for obj in objects: xml_lines.append(' ') xml_lines.append(f' {obj["name"]}') xml_lines.append(f' {obj.get("pose", "Unspecified")}') xml_lines.append(f' {obj.get("truncated", 0)}') xml_lines.append(f' {obj.get("difficult", 0)}') xml_lines.append(' ') xml_lines.append(f' {obj["bndbox"]["xmin"]}') xml_lines.append(f' {obj["bndbox"]["ymin"]}') xml_lines.append(f' {obj["bndbox"]["xmax"]}') xml_lines.append(f' {obj["bndbox"]["ymax"]}') xml_lines.append(' ') if 'polygon' in obj: xml_lines.append(' ') for point in obj['polygon']: xml_lines.append(f' {point[0]}{point[1]}') xml_lines.append(' ') xml_lines.append(' ') xml_lines.append('') return '\n'.join(xml_lines) @classmethod def export_to_sharegpt( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to ShareGPT format. ShareGPT format is used for conversation/dialogue model training. Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) sharegpt_data = [] total_annotations = 0 for task in tasks: task_data = task["data"] # Get text content text = "" if isinstance(task_data, dict): text = task_data.get("text", task_data.get("content", "")) elif isinstance(task_data, str): text = task_data # Process annotations for ann in task["annotations"]: total_annotations += 1 result = ann["result"] # Extract label/classification result label = "" if isinstance(result, dict): choices = result.get("choices", result.get("result", [])) if isinstance(choices, list) and choices: if isinstance(choices[0], dict): label = choices[0].get("value", {}).get("choices", [""])[0] else: label = str(choices[0]) elif isinstance(choices, str): label = choices elif isinstance(result, list) and result: first_item = result[0] if isinstance(first_item, dict): value = first_item.get("value", {}) choices = value.get("choices", value.get("labels", [])) if choices: label = choices[0] if isinstance(choices, list) else str(choices) if text and label: conversation = { "conversations": [ {"from": "human", "value": text}, {"from": "gpt", "value": label} ] } if include_metadata: conversation["id"] = task["id"] conversation["task_name"] = task["name"] sharegpt_data.append(conversation) # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_sharegpt.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(sharegpt_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def export_to_alpaca( cls, project_id: str, status_filter: str = "all", include_metadata: bool = True ) -> Tuple[str, int, int]: """ Export project data to Alpaca format. Alpaca format is used for instruction fine-tuning of LLMs. Format: {"instruction": "...", "input": "...", "output": "..."} Args: project_id: Project ID status_filter: Task status filter include_metadata: Whether to include metadata Returns: Tuple of (file_path, total_tasks, total_annotations) """ project = cls.get_project_data(project_id) if not project: raise ValueError(f"Project {project_id} not found") tasks = cls.get_tasks_with_annotations(project_id, status_filter) alpaca_data = [] total_annotations = 0 for task in tasks: task_data = task["data"] # Get text content text = "" if isinstance(task_data, dict): text = task_data.get("text", task_data.get("content", "")) elif isinstance(task_data, str): text = task_data # Process annotations for ann in task["annotations"]: total_annotations += 1 result = ann["result"] # Extract label/classification result label = "" if isinstance(result, dict): choices = result.get("choices", result.get("result", [])) if isinstance(choices, list) and choices: if isinstance(choices[0], dict): label = choices[0].get("value", {}).get("choices", [""])[0] else: label = str(choices[0]) elif isinstance(choices, str): label = choices elif isinstance(result, list) and result: first_item = result[0] if isinstance(first_item, dict): value = first_item.get("value", {}) choices = value.get("choices", value.get("labels", [])) if choices: label = choices[0] if isinstance(choices, list) else str(choices) if text and label: alpaca_item = { "instruction": "请对以下文本进行分类", "input": text, "output": label } if include_metadata: alpaca_item["id"] = task["id"] alpaca_item["task_name"] = task["name"] alpaca_data.append(alpaca_item) # Write to file cls.ensure_export_dir() file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_alpaca.json" file_path = os.path.join(cls.EXPORT_DIR, file_name) with open(file_path, 'w', encoding='utf-8') as f: json.dump(alpaca_data, f, ensure_ascii=False, indent=2) return file_path, len(tasks), total_annotations @classmethod def execute_export( cls, job_id: str, project_id: str, format: str, status_filter: str, include_metadata: bool ) -> Dict: """ Execute the export job. Args: job_id: Export job ID project_id: Project ID format: Export format status_filter: Task status filter include_metadata: Whether to include metadata Returns: Export result with file path and statistics """ try: # Update status to processing cls.update_export_job(job_id, status=ExportStatus.PROCESSING.value) # Execute export based on format if format == ExportFormat.JSON.value: file_path, total_tasks, total_annotations = cls.export_to_json( project_id, status_filter, include_metadata ) elif format == ExportFormat.CSV.value: file_path, total_tasks, total_annotations = cls.export_to_csv( project_id, status_filter, include_metadata ) elif format == ExportFormat.COCO.value: file_path, total_tasks, total_annotations = cls.export_to_coco( project_id, status_filter, include_metadata ) elif format == ExportFormat.YOLO.value: file_path, total_tasks, total_annotations = cls.export_to_yolo( project_id, status_filter, include_metadata ) elif format == ExportFormat.PASCAL_VOC.value: file_path, total_tasks, total_annotations = cls.export_to_pascal_voc( project_id, status_filter, include_metadata ) elif format == ExportFormat.SHAREGPT.value: file_path, total_tasks, total_annotations = cls.export_to_sharegpt( project_id, status_filter, include_metadata ) elif format == ExportFormat.ALPACA.value: file_path, total_tasks, total_annotations = cls.export_to_alpaca( project_id, status_filter, include_metadata ) else: raise ValueError(f"Unsupported export format: {format}") # Update job with success cls.update_export_job( job_id, status=ExportStatus.COMPLETED.value, file_path=file_path, total_tasks=total_tasks, exported_tasks=total_tasks ) return { "success": True, "file_path": file_path, "total_tasks": total_tasks, "total_annotations": total_annotations } except Exception as e: # Update job with failure cls.update_export_job( job_id, status=ExportStatus.FAILED.value, error_message=str(e) ) return { "success": False, "error": str(e) }