"""
Export Service for data export operations.
Handles exporting annotations in various formats: JSON, CSV, COCO, YOLO,
PascalVOC, ShareGPT and Alpaca.
"""
import csv
import io
import json
import os
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from xml.sax.saxutils import escape

from database import get_db_connection
from schemas.export import (
    ExportFormat, ExportStatus, StatusFilter,
    TaskExportItem, AnnotationExportItem, ProjectExportData,
    COCOImage, COCOCategory, COCOAnnotation, COCOExportData
)


class ExportService:
    """Service for data export operations.

    Records export jobs in the ``export_jobs`` table and writes a project's
    tasks and annotations to files under ``EXPORT_DIR`` in JSON, CSV, COCO,
    YOLO, PascalVOC, ShareGPT and Alpaca layouts.
    """

    # Export directory
    EXPORT_DIR = "exports"

    # (ExportFormat member name, exporter method name), checked in this order
    # by execute_export. Members are resolved lazily so a format is only
    # looked up on ExportFormat when the dispatch actually reaches it.
    _FORMAT_EXPORTERS = (
        ("JSON", "export_to_json"),
        ("CSV", "export_to_csv"),
        ("COCO", "export_to_coco"),
        ("YOLO", "export_to_yolo"),
        ("PASCAL_VOC", "export_to_pascal_voc"),
        ("SHAREGPT", "export_to_sharegpt"),
        ("ALPACA", "export_to_alpaca"),
    )

    # ------------------------------------------------------------------
    # Export job bookkeeping
    # ------------------------------------------------------------------

    @classmethod
    def ensure_export_dir(cls) -> str:
        """Ensure the export directory exists and return its path.

        ``exist_ok=True`` replaces an explicit existence check, which could
        fail when two exports try to create the directory at the same time.
        """
        os.makedirs(cls.EXPORT_DIR, exist_ok=True)
        return cls.EXPORT_DIR

    @staticmethod
    def create_export_job(
        project_id: str,
        format: str,
        status_filter: str,
        include_metadata: bool,
        created_by: str
    ) -> str:
        """
        Create a new export job record in PENDING state.

        Args:
            project_id: Project ID to export
            format: Export format
            status_filter: Task status filter
            include_metadata: Whether to include metadata
            created_by: User ID who created the job

        Returns:
            Export job ID (``export_`` followed by 12 hex characters)
        """
        job_id = f"export_{uuid.uuid4().hex[:12]}"

        # NOTE(review): no explicit commit is issued; presumably the
        # get_db_connection context manager commits on exit - confirm.
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO export_jobs (
                    id, project_id, format, status, status_filter,
                    include_metadata, created_by, created_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job_id, project_id, format, ExportStatus.PENDING.value,
                status_filter, include_metadata, created_by,
                datetime.now().isoformat()
            ))

        return job_id

    @staticmethod
    def update_export_job(
        job_id: str,
        status: Optional[str] = None,
        file_path: Optional[str] = None,
        error_message: Optional[str] = None,
        total_tasks: Optional[int] = None,
        exported_tasks: Optional[int] = None
    ) -> None:
        """
        Update export job status and details.

        Only the arguments that are supplied are written. When ``status`` is
        COMPLETED or FAILED, ``completed_at`` is stamped with the current time.
        Nothing is executed if no field is supplied.

        Args:
            job_id: Export job ID
            status: New job status (ignored when empty)
            file_path: Path of the generated export file
            error_message: Failure description
            total_tasks: Number of tasks considered
            exported_tasks: Number of tasks written
        """
        assignments = []
        params = []

        if status:
            assignments.append("status = ?")
            params.append(status)
            if status in (ExportStatus.COMPLETED.value, ExportStatus.FAILED.value):
                assignments.append("completed_at = ?")
                params.append(datetime.now().isoformat())

        optional_columns = (
            ("file_path", file_path),
            ("error_message", error_message),
            ("total_tasks", total_tasks),
            ("exported_tasks", exported_tasks),
        )
        for column, column_value in optional_columns:
            if column_value is not None:
                assignments.append(f"{column} = ?")
                params.append(column_value)

        if not assignments:
            return

        params.append(job_id)
        with get_db_connection() as conn:
            cursor = conn.cursor()
            # Column names come from the fixed list above; values are bound.
            cursor.execute(f"""
                UPDATE export_jobs
                SET {', '.join(assignments)}
                WHERE id = ?
            """, tuple(params))

    @staticmethod
    def get_export_job(job_id: str) -> Optional[Dict]:
        """
        Get export job by ID.

        Returns:
            The job as a dict, or None when no row matches. Missing task
            counters are reported as 0 and ``include_metadata`` as a bool.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM export_jobs WHERE id = ?
            """, (job_id,))
            row = cursor.fetchone()

        if not row:
            return None

        return {
            "id": row["id"],
            "project_id": row["project_id"],
            "format": row["format"],
            "status": row["status"],
            "status_filter": row["status_filter"],
            "include_metadata": bool(row["include_metadata"]),
            "file_path": row["file_path"],
            "error_message": row["error_message"],
            "created_by": row["created_by"],
            "created_at": row["created_at"],
            "completed_at": row["completed_at"],
            "total_tasks": row["total_tasks"] or 0,
            "exported_tasks": row["exported_tasks"] or 0
        }

    # ------------------------------------------------------------------
    # Data access
    # ------------------------------------------------------------------

    @staticmethod
    def get_project_data(project_id: str) -> Optional[Dict]:
        """Get project basic info, or None when the project does not exist."""
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, description, config, created_at
                FROM projects WHERE id = ?
            """, (project_id,))
            row = cursor.fetchone()

        if not row:
            return None

        return {
            "id": row["id"],
            "name": row["name"],
            "description": row["description"],
            "config": row["config"],
            "created_at": row["created_at"]
        }

    @staticmethod
    def get_tasks_with_annotations(
        project_id: str,
        status_filter: str = "all"
    ) -> List[Dict]:
        """
        Get all tasks with their annotations for a project.

        Tasks are ordered by ``created_at``; each task's annotations are
        ordered by their own ``created_at``. JSON stored as text in
        ``tasks.data`` and ``annotations.result`` is decoded when possible and
        returned unchanged otherwise.

        Args:
            project_id: Project ID
            status_filter: Filter by task status ("all" disables the filter)

        Returns:
            List of task dicts, each with an ``annotations`` list
        """
        # The same filter drives both queries so that annotations are fetched
        # in one round trip instead of one query per task.
        where = "t.project_id = ?"
        params = [project_id]
        if status_filter != "all":
            where += " AND t.status = ?"
            params.append(status_filter)
        bound = tuple(params)

        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                SELECT t.id, t.name, t.data, t.status, t.assigned_to, t.created_at
                FROM tasks t
                WHERE {where}
                ORDER BY t.created_at
            """, bound)
            task_rows = cursor.fetchall()

            annotation_rows = []
            if task_rows:
                cursor.execute(f"""
                    SELECT a.id, a.task_id, a.user_id, a.result,
                           a.created_at, a.updated_at
                    FROM annotations a
                    JOIN tasks t ON t.id = a.task_id
                    WHERE {where}
                    ORDER BY a.created_at
                """, bound)
                annotation_rows = cursor.fetchall()

        # Group annotations by task, preserving the created_at ordering.
        annotations_by_task = {}
        for ann_row in annotation_rows:
            annotations_by_task.setdefault(ann_row["task_id"], []).append({
                "id": ann_row["id"],
                "task_id": ann_row["task_id"],
                "user_id": ann_row["user_id"],
                "result": ExportService._decode_json(ann_row["result"]),
                "created_at": str(ann_row["created_at"]),
                "updated_at": str(ann_row["updated_at"])
            })

        return [
            {
                "id": task_row["id"],
                "name": task_row["name"],
                "data": ExportService._decode_json(task_row["data"]),
                "status": task_row["status"],
                "assigned_to": task_row["assigned_to"],
                "created_at": str(task_row["created_at"]),
                "annotations": annotations_by_task.get(task_row["id"], [])
            }
            for task_row in task_rows
        ]

    # ------------------------------------------------------------------
    # Private helpers shared by the exporters
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_json(raw: Any) -> Any:
        """Decode ``raw`` when it is a JSON string; otherwise return it as is."""
        if not isinstance(raw, str):
            return raw
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Best effort: plain (non-JSON) text is exported verbatim.
            return raw

    @classmethod
    def _require_project(cls, project_id: str) -> Dict:
        """Return the project record or raise ValueError when it is missing."""
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")
        return project

    @classmethod
    def _build_export_path(cls, project_id: str, suffix: str) -> str:
        """Return ``EXPORT_DIR/export_<project>_<timestamp><suffix>``."""
        cls.ensure_export_dir()
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(cls.EXPORT_DIR, f"export_{project_id}_{stamp}{suffix}")

    @classmethod
    def _write_json_export(cls, project_id: str, suffix: str, payload: Any) -> str:
        """Dump ``payload`` as indented UTF-8 JSON and return the file path."""
        file_path = cls._build_export_path(project_id, suffix)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        return file_path

    @staticmethod
    def _to_number(raw: Any) -> float:
        """Coerce ``raw`` to a number, yielding 0 for None or unparsable input."""
        if isinstance(raw, (int, float)):
            return raw
        try:
            return float(raw)
        except (TypeError, ValueError):
            return 0

    @classmethod
    def _image_info(cls, task_data: Any) -> Tuple[str, float, float]:
        """
        Extract ``(image_url, width, height)`` from task data.

        The URL is read from ``image`` and falls back to ``image_url``; width
        and height are 0 when absent. Non-dict task data yields ``("", 0, 0)``.
        """
        if not isinstance(task_data, dict):
            return "", 0, 0
        url = task_data.get("image") or task_data.get("image_url") or ""
        if not isinstance(url, str):
            url = str(url)
        return (
            url,
            cls._to_number(task_data.get("width")),
            cls._to_number(task_data.get("height")),
        )

    @staticmethod
    def _result_items(result: Any) -> List[Dict]:
        """
        Normalise a stored annotation result into a list of dict items.

        A dict result is unwrapped through its ``annotations`` (or ``result``)
        key, a scalar is wrapped in a list, and non-dict entries are dropped.
        """
        if isinstance(result, dict):
            result = result.get("annotations", result.get("result", []))
        if not isinstance(result, list):
            result = [result] if result else []
        return [item for item in result if isinstance(item, dict)]

    @staticmethod
    def _item_value(item: Dict) -> Dict:
        """Return the item's ``value`` mapping, or {} when it is not a dict."""
        value = item.get("value", {})
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_label_list(labels: Any) -> List:
        """Return labels as a list; a lone string is one label, not characters."""
        if isinstance(labels, str):
            return [labels]
        if isinstance(labels, (list, tuple)):
            return list(labels)
        return []

    @classmethod
    def _bbox_percent(cls, value: Dict) -> Tuple[float, float, float, float]:
        """Return ``(x, y, width, height)`` of a region as stored (percentages)."""
        return (
            cls._to_number(value.get("x", 0)),
            cls._to_number(value.get("y", 0)),
            cls._to_number(value.get("width", 0)),
            cls._to_number(value.get("height", 0)),
        )

    @staticmethod
    def _task_text(task_data: Any) -> Any:
        """Return the task's text: ``text``, else ``content``, else ""."""
        if isinstance(task_data, dict):
            return task_data.get("text") or task_data.get("content") or ""
        if isinstance(task_data, str):
            return task_data
        return ""

    @staticmethod
    def _first_of(options: Any) -> Any:
        """Return the first entry of a list, a scalar as str, or "" if empty."""
        if isinstance(options, (list, tuple)):
            return options[0] if options else ""
        return str(options) if options else ""

    @classmethod
    def _extract_label(cls, result: Any) -> Any:
        """
        Extract the first classification label from an annotation result.

        Handles a dict result (``choices`` / ``result`` key holding a string,
        a list of scalars, or a list of items with ``value.choices``) and a
        list result (first item's ``value.choices`` or ``value.labels``).
        Returns "" when no label can be found; never raises on empty lists.
        """
        if isinstance(result, dict):
            choices = result.get("choices", result.get("result", []))
            if isinstance(choices, str):
                return choices
            if isinstance(choices, list) and choices:
                head = choices[0]
                if not isinstance(head, dict):
                    return str(head)
                return cls._first_of(cls._item_value(head).get("choices", [""]))
            return ""

        if isinstance(result, list) and result:
            first_item = result[0]
            if isinstance(first_item, dict):
                value = cls._item_value(first_item)
                return cls._first_of(value.get("choices", value.get("labels", [])))
        return ""

    @classmethod
    def _text_label_pairs(cls, tasks: List[Dict]) -> Tuple[List[Tuple[Dict, Any, Any]], int]:
        """
        Pair each annotation's label with its task text.

        Returns:
            ``(pairs, total_annotations)`` where ``pairs`` holds
            ``(task, text, label)`` for annotations that have both a text and
            a label, and ``total_annotations`` counts every annotation seen.
        """
        pairs = []
        total_annotations = 0
        for task in tasks:
            text = cls._task_text(task["data"])
            for ann in task["annotations"]:
                total_annotations += 1
                label = cls._extract_label(ann["result"])
                if text and label:
                    pairs.append((task, text, label))
        return pairs, total_annotations

    @staticmethod
    def _voc_object(label: Any, xmin: Any, ymin: Any, xmax: Any, ymax: Any) -> Dict:
        """Build one PascalVOC object record with default pose/flags."""
        return {
            "name": label,
            "pose": "Unspecified",
            "truncated": 0,
            "difficult": 0,
            "bndbox": {
                "xmin": xmin,
                "ymin": ymin,
                "xmax": xmax,
                "ymax": ymax
            }
        }

    @classmethod
    def _voc_rectangles(cls, value: Dict, img_width: float, img_height: float) -> List[Dict]:
        """Convert a ``rectanglelabels`` region into one VOC object per label."""
        labels = cls._as_label_list(value.get("rectanglelabels", []))
        x_pct, y_pct, w_pct, h_pct = cls._bbox_percent(value)

        if img_width > 0 and img_height > 0:
            box = (
                int(x_pct * img_width / 100),
                int(y_pct * img_height / 100),
                int((x_pct + w_pct) * img_width / 100),
                int((y_pct + h_pct) * img_height / 100),
            )
        else:
            # Image size unknown: keep the stored percentage coordinates.
            box = (x_pct, y_pct, x_pct + w_pct, y_pct + h_pct)

        return [cls._voc_object(label, *box) for label in labels]

    @classmethod
    def _voc_polygons(cls, value: Dict, img_width: float, img_height: float) -> List[Dict]:
        """Convert a ``polygonlabels`` region into one VOC object per label."""
        labels = cls._as_label_list(value.get("polygonlabels", []))
        raw_points = value.get("points", [])
        if not isinstance(raw_points, (list, tuple)):
            raw_points = []
        # Skip malformed points instead of failing the whole export.
        points = [
            [cls._to_number(p[0]), cls._to_number(p[1])]
            for p in raw_points
            if isinstance(p, (list, tuple)) and len(p) >= 2
        ]
        if not points or not labels:
            return []

        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]

        if img_width > 0 and img_height > 0:
            box = (
                int(min(x_coords) * img_width / 100),
                int(min(y_coords) * img_height / 100),
                int(max(x_coords) * img_width / 100),
                int(max(y_coords) * img_height / 100),
            )
            polygon_points = [
                [int(p[0] * img_width / 100), int(p[1] * img_height / 100)]
                for p in points
            ]
        else:
            box = (min(x_coords), min(y_coords), max(x_coords), max(y_coords))
            polygon_points = points

        objects = []
        for label in labels:
            voc_object = cls._voc_object(label, *box)
            voc_object["polygon"] = polygon_points
            objects.append(voc_object)
        return objects

    # ------------------------------------------------------------------
    # Exporters
    # ------------------------------------------------------------------

    @classmethod
    def export_to_json(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to JSON format.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to add project description and config

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist
        """
        project = cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        total_annotations = sum(len(t["annotations"]) for t in tasks)

        export_data = {
            "project_id": project["id"],
            "project_name": project["name"],
            "export_format": "json",
            "export_time": datetime.now().isoformat(),
            "total_tasks": len(tasks),
            "total_annotations": total_annotations,
            "tasks": tasks
        }

        if include_metadata:
            export_data["project_description"] = project["description"]
            export_data["config"] = project["config"]

        file_path = cls._write_json_export(project_id, ".json", export_data)
        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_csv(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to CSV format.

        One row is written per annotation; a task without annotations gets a
        single row with empty annotation columns. Task data and annotation
        results are embedded as JSON text.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to append assigned_to / task_created_at

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist
        """
        cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        total_annotations = 0

        headers = [
            "task_id", "task_name", "task_status", "task_data",
            "annotation_id", "user_id", "annotation_result",
            "annotation_created_at", "annotation_updated_at"
        ]
        if include_metadata:
            headers.extend(["assigned_to", "task_created_at"])

        file_path = cls._build_export_path(project_id, ".csv")
        with open(file_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(headers)

            for task in tasks:
                # Task-level cells are identical for every row of the task.
                task_cells = [
                    task["id"],
                    task["name"],
                    task["status"],
                    json.dumps(task["data"], ensure_ascii=False),
                ]
                meta_cells = (
                    [task["assigned_to"], task["created_at"]]
                    if include_metadata else []
                )

                if not task["annotations"]:
                    # Task without annotations
                    writer.writerow(task_cells + [""] * 5 + meta_cells)
                    continue

                for ann in task["annotations"]:
                    total_annotations += 1
                    ann_cells = [
                        ann["id"],
                        ann["user_id"],
                        json.dumps(ann["result"], ensure_ascii=False),
                        ann["created_at"],
                        ann["updated_at"],
                    ]
                    writer.writerow(task_cells + ann_cells + meta_cells)

        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_coco(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to COCO format.

        COCO format is primarily for object detection tasks. Image width and
        height are taken from the task data when present. Region coordinates
        are stored as percentages (as the YOLO and PascalVOC exporters also
        assume); they are converted to pixels when the image size is known and
        left as stored otherwise.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Accepted for signature parity; not used here

        Returns:
            Tuple of (file_path, total_tasks, total_annotations), where
            total_annotations counts result items, labelled or not

        Raises:
            ValueError: If the project does not exist
        """
        project = cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        coco_data = {
            "info": {
                "description": project["name"],
                "version": "1.0",
                "year": datetime.now().year,
                "contributor": "Annotation Platform",
                "date_created": datetime.now().isoformat()
            },
            "licenses": [],
            "images": [],
            "annotations": [],
            "categories": []
        }

        category_map = {}   # label -> 1-based category id
        annotation_id = 1
        total_annotations = 0

        for image_id, task in enumerate(tasks, start=1):
            image_url, img_width, img_height = cls._image_info(task["data"])
            has_size = img_width > 0 and img_height > 0

            coco_data["images"].append({
                "id": image_id,
                "file_name": image_url or task["name"],
                "width": img_width,
                "height": img_height
            })

            for ann in task["annotations"]:
                for item in cls._result_items(ann["result"]):
                    total_annotations += 1

                    value = cls._item_value(item)
                    labels = cls._as_label_list(
                        value.get("rectanglelabels", value.get("labels", []))
                    )
                    if not labels:
                        continue

                    # COCO bbox: [x, y, width, height]
                    x, y, width, height = cls._bbox_percent(value)
                    if has_size:
                        x = x * img_width / 100
                        y = y * img_height / 100
                        width = width * img_width / 100
                        height = height * img_height / 100

                    for label in labels:
                        if label not in category_map:
                            category_map[label] = len(category_map) + 1
                            coco_data["categories"].append({
                                "id": category_map[label],
                                "name": label,
                                "supercategory": ""
                            })

                        coco_data["annotations"].append({
                            "id": annotation_id,
                            "image_id": image_id,
                            "category_id": category_map[label],
                            "bbox": [x, y, width, height],
                            "area": width * height,
                            "segmentation": [],
                            "iscrowd": 0
                        })
                        annotation_id += 1

        file_path = cls._write_json_export(project_id, "_coco.json", coco_data)
        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_yolo(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to YOLO format.

        A real YOLO dataset is a directory (images/, labels/, classes.txt,
        data.yaml). For simplicity this writes a single JSON file holding the
        class list and, per image, the YOLO label lines
        ``class_id x_center y_center width height`` normalised to 0-1 from
        the stored percentage coordinates.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Accepted for signature parity; not used here

        Returns:
            Tuple of (file_path, total_tasks, total_annotations), where
            total_annotations counts result items, labelled or not

        Raises:
            ValueError: If the project does not exist
        """
        project = cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        class_map = {}   # label -> 0-based class id

        yolo_data = {
            "info": {
                "project_name": project["name"],
                "export_time": datetime.now().isoformat(),
                "format": "yolo"
            },
            "classes": [],
            "images": []
        }

        total_annotations = 0

        for task in tasks:
            image_url, _, _ = cls._image_info(task["data"])

            image_entry = {
                "id": task["id"],
                "file_name": image_url or task["name"],
                "labels": []
            }

            for ann in task["annotations"]:
                for item in cls._result_items(ann["result"]):
                    total_annotations += 1

                    value = cls._item_value(item)
                    labels = cls._as_label_list(
                        value.get("rectanglelabels", value.get("labels", []))
                    )
                    if not labels:
                        continue

                    # Convert from percentage to the 0-1 range.
                    x_pct, y_pct, w_pct, h_pct = cls._bbox_percent(value)
                    x = x_pct / 100.0
                    y = y_pct / 100.0
                    w = w_pct / 100.0
                    h = h_pct / 100.0

                    # YOLO boxes are anchored at their centre.
                    x_center = x + w / 2
                    y_center = y + h / 2

                    for label in labels:
                        if label not in class_map:
                            class_map[label] = len(class_map)
                            yolo_data["classes"].append(label)

                        image_entry["labels"].append({
                            "class_id": class_map[label],
                            "class_name": label,
                            "x_center": round(x_center, 6),
                            "y_center": round(y_center, 6),
                            "width": round(w, 6),
                            "height": round(h, 6),
                            "yolo_line": f"{class_map[label]} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}"
                        })

            yolo_data["images"].append(image_entry)

        file_path = cls._write_json_export(project_id, "_yolo.json", yolo_data)
        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_pascal_voc(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to PascalVOC format.

        PascalVOC is an XML-based object detection format. The export is a
        JSON file with one entry per task holding the image reference, the
        generated XML text and the parsed objects. Only items whose ``type``
        is ``rectanglelabels`` or ``polygonlabels`` produce objects.
        Coordinates are converted from percentages to integer pixels when the
        task data carries the image size, and kept as stored otherwise.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Accepted for signature parity; not used here

        Returns:
            Tuple of (file_path, total_tasks, total_annotations), where
            total_annotations counts result items of any type

        Raises:
            ValueError: If the project does not exist
        """
        cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)

        voc_data = []
        total_annotations = 0

        for idx, task in enumerate(tasks):
            image_url, img_width, img_height = cls._image_info(task["data"])

            # Last URL segment, with a positional fallback when it is empty.
            url_tail = image_url.split('/')[-1] if image_url else ""
            image_filename = url_tail or f"image_{idx + 1}.jpg"

            objects = []
            for ann in task["annotations"]:
                for item in cls._result_items(ann["result"]):
                    total_annotations += 1

                    value = cls._item_value(item)
                    item_type = item.get("type", "")

                    if item_type == "rectanglelabels":
                        objects.extend(
                            cls._voc_rectangles(value, img_width, img_height)
                        )
                    elif item_type == "polygonlabels":
                        objects.extend(
                            cls._voc_polygons(value, img_width, img_height)
                        )

            xml_content = cls._generate_voc_xml(
                image_filename, img_width, img_height, objects
            )

            voc_data.append({
                "image": image_url,
                "filename": image_filename,
                "xml_content": xml_content,
                "objects": objects
            })

        file_path = cls._write_json_export(project_id, "_pascal_voc.json", voc_data)
        return file_path, len(tasks), total_annotations

    @staticmethod
    def _generate_voc_xml(filename: str, width: int, height: int, objects: List[Dict]) -> str:
        """
        Generate PascalVOC XML string.

        Free-text fields (file name, object name, pose) are XML-escaped so
        that characters such as ``&`` and ``<`` cannot break the document.

        Args:
            filename: Image file name
            width: Image width
            height: Image height
            objects: Object records with ``name`` and ``bndbox`` keys and an
                optional ``polygon`` list of ``[x, y]`` points

        Returns:
            The XML document as a newline-joined string
        """
        xml_lines = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<annotation>',
            f' <filename>{escape(str(filename))}</filename>',
            ' <source>',
            ' <database>Annotation Platform</database>',
            ' </source>',
            ' <size>',
            f' <width>{width}</width>',
            f' <height>{height}</height>',
            ' <depth>3</depth>',
            ' </size>',
            ' <segmented>0</segmented>'
        ]

        for obj in objects:
            box = obj["bndbox"]
            xml_lines.extend([
                ' <object>',
                f' <name>{escape(str(obj["name"]))}</name>',
                f' <pose>{escape(str(obj.get("pose", "Unspecified")))}</pose>',
                f' <truncated>{escape(str(obj.get("truncated", 0)))}</truncated>',
                f' <difficult>{escape(str(obj.get("difficult", 0)))}</difficult>',
                ' <bndbox>',
                f' <xmin>{box["xmin"]}</xmin>',
                f' <ymin>{box["ymin"]}</ymin>',
                f' <xmax>{box["xmax"]}</xmax>',
                f' <ymax>{box["ymax"]}</ymax>',
                ' </bndbox>',
            ])

            if 'polygon' in obj:
                xml_lines.append(' <polygon>')
                xml_lines.extend(
                    f' <pt><x>{point[0]}</x><y>{point[1]}</y></pt>'
                    for point in obj['polygon']
                )
                xml_lines.append(' </polygon>')

            xml_lines.append(' </object>')

        xml_lines.append('</annotation>')

        return '\n'.join(xml_lines)

    @classmethod
    def export_to_sharegpt(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to ShareGPT format.

        ShareGPT format is used for conversation/dialogue model training.
        Each annotation with both a task text and a label becomes a two-turn
        conversation (human: text, gpt: label).

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to add the task id and name

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist
        """
        cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        pairs, total_annotations = cls._text_label_pairs(tasks)

        sharegpt_data = []
        for task, text, label in pairs:
            conversation = {
                "conversations": [
                    {"from": "human", "value": text},
                    {"from": "gpt", "value": label}
                ]
            }
            if include_metadata:
                conversation["id"] = task["id"]
                conversation["task_name"] = task["name"]
            sharegpt_data.append(conversation)

        file_path = cls._write_json_export(project_id, "_sharegpt.json", sharegpt_data)
        return file_path, len(tasks), total_annotations

    @classmethod
    def export_to_alpaca(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to Alpaca format.

        Alpaca format is used for instruction fine-tuning of LLMs.
        Format: {"instruction": "...", "input": "...", "output": "..."}
        The instruction is a fixed text-classification prompt; input is the
        task text and output the extracted label.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to add the task id and name

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist
        """
        cls._require_project(project_id)
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        pairs, total_annotations = cls._text_label_pairs(tasks)

        alpaca_data = []
        for task, text, label in pairs:
            alpaca_item = {
                "instruction": "请对以下文本进行分类",
                "input": text,
                "output": label
            }
            if include_metadata:
                alpaca_item["id"] = task["id"]
                alpaca_item["task_name"] = task["name"]
            alpaca_data.append(alpaca_item)

        file_path = cls._write_json_export(project_id, "_alpaca.json", alpaca_data)
        return file_path, len(tasks), total_annotations

    # ------------------------------------------------------------------
    # Job execution
    # ------------------------------------------------------------------

    @classmethod
    def execute_export(
        cls,
        job_id: str,
        project_id: str,
        format: str,
        status_filter: str,
        include_metadata: bool
    ) -> Dict:
        """
        Execute the export job.

        Marks the job PROCESSING, runs the exporter matching ``format`` and
        records COMPLETED (with file path and task counts) or FAILED (with
        the error text). Exporter errors are reported in the returned dict
        rather than raised.

        Args:
            job_id: Export job ID
            project_id: Project ID
            format: Export format
            status_filter: Task status filter
            include_metadata: Whether to include metadata

        Returns:
            ``{"success": True, "file_path", "total_tasks",
            "total_annotations"}`` on success, otherwise
            ``{"success": False, "error"}``
        """
        try:
            cls.update_export_job(job_id, status=ExportStatus.PROCESSING.value)

            exporter = None
            for member_name, method_name in cls._FORMAT_EXPORTERS:
                if format == getattr(ExportFormat, member_name).value:
                    exporter = getattr(cls, method_name)
                    break
            if exporter is None:
                raise ValueError(f"Unsupported export format: {format}")

            file_path, total_tasks, total_annotations = exporter(
                project_id, status_filter, include_metadata
            )

            cls.update_export_job(
                job_id,
                status=ExportStatus.COMPLETED.value,
                file_path=file_path,
                total_tasks=total_tasks,
                exported_tasks=total_tasks
            )

            return {
                "success": True,
                "file_path": file_path,
                "total_tasks": total_tasks,
                "total_annotations": total_annotations
            }

        except Exception as e:
            # Job boundary: any failure is recorded on the job and reported
            # to the caller instead of propagating.
            cls.update_export_job(
                job_id,
                status=ExportStatus.FAILED.value,
                error_message=str(e)
            )

            return {
                "success": False,
                "error": str(e)
            }