export_service.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. """
  2. Export Service for data export operations.
  3. Handles exporting annotations in various formats: JSON, CSV, COCO, YOLO.
  4. """
  5. import json
  6. import csv
  7. import io
  8. import os
  9. import uuid
  10. from datetime import datetime
  11. from typing import List, Dict, Any, Optional, Tuple
  12. from database import get_db_connection
  13. from schemas.export import (
  14. ExportFormat, ExportStatus, StatusFilter,
  15. TaskExportItem, AnnotationExportItem, ProjectExportData,
  16. COCOImage, COCOCategory, COCOAnnotation, COCOExportData
  17. )
class ExportService:
    """Service for data export operations.

    Exports a project's tasks and annotations to JSON, CSV, COCO, or
    YOLO files written under EXPORT_DIR, while tracking job state
    (pending/processing/completed/failed) in the export_jobs table.
    """
    # Export directory (relative to the process working directory)
    EXPORT_DIR = "exports"
  22. @classmethod
  23. def ensure_export_dir(cls) -> str:
  24. """Ensure export directory exists."""
  25. if not os.path.exists(cls.EXPORT_DIR):
  26. os.makedirs(cls.EXPORT_DIR)
  27. return cls.EXPORT_DIR
  28. @staticmethod
  29. def create_export_job(
  30. project_id: str,
  31. format: str,
  32. status_filter: str,
  33. include_metadata: bool,
  34. created_by: str
  35. ) -> str:
  36. """
  37. Create a new export job record.
  38. Args:
  39. project_id: Project ID to export
  40. format: Export format
  41. status_filter: Task status filter
  42. include_metadata: Whether to include metadata
  43. created_by: User ID who created the job
  44. Returns:
  45. Export job ID
  46. """
  47. job_id = f"export_{uuid.uuid4().hex[:12]}"
  48. with get_db_connection() as conn:
  49. cursor = conn.cursor()
  50. cursor.execute("""
  51. INSERT INTO export_jobs (
  52. id, project_id, format, status, status_filter,
  53. include_metadata, created_by, created_at
  54. )
  55. VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  56. """, (
  57. job_id, project_id, format, ExportStatus.PENDING.value,
  58. status_filter, include_metadata, created_by,
  59. datetime.now().isoformat()
  60. ))
  61. return job_id
  62. @staticmethod
  63. def update_export_job(
  64. job_id: str,
  65. status: str = None,
  66. file_path: str = None,
  67. error_message: str = None,
  68. total_tasks: int = None,
  69. exported_tasks: int = None
  70. ) -> None:
  71. """Update export job status and details."""
  72. with get_db_connection() as conn:
  73. cursor = conn.cursor()
  74. updates = []
  75. params = []
  76. if status:
  77. updates.append("status = ?")
  78. params.append(status)
  79. if status in [ExportStatus.COMPLETED.value, ExportStatus.FAILED.value]:
  80. updates.append("completed_at = ?")
  81. params.append(datetime.now().isoformat())
  82. if file_path is not None:
  83. updates.append("file_path = ?")
  84. params.append(file_path)
  85. if error_message is not None:
  86. updates.append("error_message = ?")
  87. params.append(error_message)
  88. if total_tasks is not None:
  89. updates.append("total_tasks = ?")
  90. params.append(total_tasks)
  91. if exported_tasks is not None:
  92. updates.append("exported_tasks = ?")
  93. params.append(exported_tasks)
  94. if updates:
  95. params.append(job_id)
  96. cursor.execute(f"""
  97. UPDATE export_jobs
  98. SET {', '.join(updates)}
  99. WHERE id = ?
  100. """, tuple(params))
  101. @staticmethod
  102. def get_export_job(job_id: str) -> Optional[Dict]:
  103. """Get export job by ID."""
  104. with get_db_connection() as conn:
  105. cursor = conn.cursor()
  106. cursor.execute("""
  107. SELECT * FROM export_jobs WHERE id = ?
  108. """, (job_id,))
  109. row = cursor.fetchone()
  110. if not row:
  111. return None
  112. return {
  113. "id": row["id"],
  114. "project_id": row["project_id"],
  115. "format": row["format"],
  116. "status": row["status"],
  117. "status_filter": row["status_filter"],
  118. "include_metadata": bool(row["include_metadata"]),
  119. "file_path": row["file_path"],
  120. "error_message": row["error_message"],
  121. "created_by": row["created_by"],
  122. "created_at": row["created_at"],
  123. "completed_at": row["completed_at"],
  124. "total_tasks": row["total_tasks"] or 0,
  125. "exported_tasks": row["exported_tasks"] or 0
  126. }
  127. @staticmethod
  128. def get_project_data(project_id: str) -> Optional[Dict]:
  129. """Get project basic info."""
  130. with get_db_connection() as conn:
  131. cursor = conn.cursor()
  132. cursor.execute("""
  133. SELECT id, name, description, config, created_at
  134. FROM projects WHERE id = ?
  135. """, (project_id,))
  136. row = cursor.fetchone()
  137. if not row:
  138. return None
  139. return {
  140. "id": row["id"],
  141. "name": row["name"],
  142. "description": row["description"],
  143. "config": row["config"],
  144. "created_at": row["created_at"]
  145. }
  146. @staticmethod
  147. def get_tasks_with_annotations(
  148. project_id: str,
  149. status_filter: str = "all"
  150. ) -> List[Dict]:
  151. """
  152. Get all tasks with their annotations for a project.
  153. Args:
  154. project_id: Project ID
  155. status_filter: Filter by task status
  156. Returns:
  157. List of tasks with annotations
  158. """
  159. with get_db_connection() as conn:
  160. cursor = conn.cursor()
  161. # Build query based on status filter
  162. query = """
  163. SELECT t.id, t.name, t.data, t.status, t.assigned_to, t.created_at
  164. FROM tasks t
  165. WHERE t.project_id = ?
  166. """
  167. params = [project_id]
  168. if status_filter != "all":
  169. query += " AND t.status = ?"
  170. params.append(status_filter)
  171. query += " ORDER BY t.created_at"
  172. cursor.execute(query, tuple(params))
  173. task_rows = cursor.fetchall()
  174. tasks = []
  175. for task_row in task_rows:
  176. task_id = task_row["id"]
  177. # Get annotations for this task
  178. cursor.execute("""
  179. SELECT id, task_id, user_id, result, created_at, updated_at
  180. FROM annotations
  181. WHERE task_id = ?
  182. ORDER BY created_at
  183. """, (task_id,))
  184. annotation_rows = cursor.fetchall()
  185. annotations = []
  186. for ann_row in annotation_rows:
  187. result = ann_row["result"]
  188. if isinstance(result, str):
  189. try:
  190. result = json.loads(result)
  191. except json.JSONDecodeError:
  192. pass
  193. annotations.append({
  194. "id": ann_row["id"],
  195. "task_id": ann_row["task_id"],
  196. "user_id": ann_row["user_id"],
  197. "result": result,
  198. "created_at": str(ann_row["created_at"]),
  199. "updated_at": str(ann_row["updated_at"])
  200. })
  201. # Parse task data
  202. task_data = task_row["data"]
  203. if isinstance(task_data, str):
  204. try:
  205. task_data = json.loads(task_data)
  206. except json.JSONDecodeError:
  207. pass
  208. tasks.append({
  209. "id": task_row["id"],
  210. "name": task_row["name"],
  211. "data": task_data,
  212. "status": task_row["status"],
  213. "assigned_to": task_row["assigned_to"],
  214. "created_at": str(task_row["created_at"]),
  215. "annotations": annotations
  216. })
  217. return tasks
  218. @classmethod
  219. def export_to_json(
  220. cls,
  221. project_id: str,
  222. status_filter: str = "all",
  223. include_metadata: bool = True
  224. ) -> Tuple[str, int, int]:
  225. """
  226. Export project data to JSON format.
  227. Args:
  228. project_id: Project ID
  229. status_filter: Task status filter
  230. include_metadata: Whether to include metadata
  231. Returns:
  232. Tuple of (file_path, total_tasks, total_annotations)
  233. """
  234. project = cls.get_project_data(project_id)
  235. if not project:
  236. raise ValueError(f"Project {project_id} not found")
  237. tasks = cls.get_tasks_with_annotations(project_id, status_filter)
  238. total_annotations = sum(len(t["annotations"]) for t in tasks)
  239. export_data = {
  240. "project_id": project["id"],
  241. "project_name": project["name"],
  242. "export_format": "json",
  243. "export_time": datetime.now().isoformat(),
  244. "total_tasks": len(tasks),
  245. "total_annotations": total_annotations,
  246. "tasks": tasks
  247. }
  248. if include_metadata:
  249. export_data["project_description"] = project["description"]
  250. export_data["config"] = project["config"]
  251. # Write to file
  252. cls.ensure_export_dir()
  253. file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
  254. file_path = os.path.join(cls.EXPORT_DIR, file_name)
  255. with open(file_path, 'w', encoding='utf-8') as f:
  256. json.dump(export_data, f, ensure_ascii=False, indent=2)
  257. return file_path, len(tasks), total_annotations
  258. @classmethod
  259. def export_to_csv(
  260. cls,
  261. project_id: str,
  262. status_filter: str = "all",
  263. include_metadata: bool = True
  264. ) -> Tuple[str, int, int]:
  265. """
  266. Export project data to CSV format.
  267. Args:
  268. project_id: Project ID
  269. status_filter: Task status filter
  270. include_metadata: Whether to include metadata
  271. Returns:
  272. Tuple of (file_path, total_tasks, total_annotations)
  273. """
  274. project = cls.get_project_data(project_id)
  275. if not project:
  276. raise ValueError(f"Project {project_id} not found")
  277. tasks = cls.get_tasks_with_annotations(project_id, status_filter)
  278. total_annotations = 0
  279. # Prepare CSV data
  280. cls.ensure_export_dir()
  281. file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
  282. file_path = os.path.join(cls.EXPORT_DIR, file_name)
  283. with open(file_path, 'w', newline='', encoding='utf-8') as f:
  284. writer = csv.writer(f)
  285. # Write header
  286. headers = [
  287. "task_id", "task_name", "task_status", "task_data",
  288. "annotation_id", "user_id", "annotation_result",
  289. "annotation_created_at", "annotation_updated_at"
  290. ]
  291. if include_metadata:
  292. headers.extend(["assigned_to", "task_created_at"])
  293. writer.writerow(headers)
  294. # Write data rows
  295. for task in tasks:
  296. if task["annotations"]:
  297. for ann in task["annotations"]:
  298. total_annotations += 1
  299. row = [
  300. task["id"],
  301. task["name"],
  302. task["status"],
  303. json.dumps(task["data"], ensure_ascii=False),
  304. ann["id"],
  305. ann["user_id"],
  306. json.dumps(ann["result"], ensure_ascii=False),
  307. ann["created_at"],
  308. ann["updated_at"]
  309. ]
  310. if include_metadata:
  311. row.extend([task["assigned_to"], task["created_at"]])
  312. writer.writerow(row)
  313. else:
  314. # Task without annotations
  315. row = [
  316. task["id"],
  317. task["name"],
  318. task["status"],
  319. json.dumps(task["data"], ensure_ascii=False),
  320. "", "", "", "", ""
  321. ]
  322. if include_metadata:
  323. row.extend([task["assigned_to"], task["created_at"]])
  324. writer.writerow(row)
  325. return file_path, len(tasks), total_annotations
    @classmethod
    def export_to_coco(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to COCO format.
        COCO format is primarily for object detection tasks.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to include metadata (currently not
                referenced by this exporter; kept for signature parity
                with the other export methods)

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        # Initialize COCO structure
        coco_data = {
            "info": {
                "description": project["name"],
                "version": "1.0",
                "year": datetime.now().year,
                "contributor": "Annotation Platform",
                "date_created": datetime.now().isoformat()
            },
            "licenses": [],
            "images": [],
            "annotations": [],
            "categories": []
        }
        # Track categories: label name -> sequential COCO category id (1-based)
        category_map = {}
        category_id = 1
        # Track annotation ID (1-based, unique across all images)
        annotation_id = 1
        total_annotations = 0
        for image_id, task in enumerate(tasks, start=1):
            # Add image entry
            task_data = task["data"]
            image_url = ""
            if isinstance(task_data, dict):
                # assumes task data stores the image under "image" or
                # "image_url" — TODO confirm with task producers
                image_url = task_data.get("image", task_data.get("image_url", ""))
            coco_data["images"].append({
                "id": image_id,
                "file_name": image_url or task["name"],
                # NOTE(review): real pixel dimensions are not available here,
                # so 0 is written; consumers needing sizes must fill them in.
                "width": 0,
                "height": 0
            })
            # Process annotations
            for ann in task["annotations"]:
                result = ann["result"]
                if isinstance(result, dict):
                    # presumably a Label Studio-style payload where the region
                    # list lives under "annotations" or "result" — verify
                    result = result.get("annotations", result.get("result", []))
                if not isinstance(result, list):
                    result = [result] if result else []
                for item in result:
                    if not isinstance(item, dict):
                        continue
                    # NOTE(review): counted per region item, even when the item
                    # carries no labels and thus emits no COCO annotation.
                    total_annotations += 1
                    # Extract label and bbox
                    value = item.get("value", {})
                    labels = value.get("rectanglelabels", value.get("labels", []))
                    for label in labels:
                        # Add category if new
                        if label not in category_map:
                            category_map[label] = category_id
                            coco_data["categories"].append({
                                "id": category_id,
                                "name": label,
                                "supercategory": ""
                            })
                            category_id += 1
                        # Calculate bbox (COCO format: [x, y, width, height])
                        # NOTE(review): values are copied as stored; the YOLO
                        # exporter treats these as percentages, while COCO
                        # normally expects pixels — confirm units upstream.
                        x = value.get("x", 0)
                        y = value.get("y", 0)
                        width = value.get("width", 0)
                        height = value.get("height", 0)
                        coco_data["annotations"].append({
                            "id": annotation_id,
                            "image_id": image_id,
                            "category_id": category_map[label],
                            "bbox": [x, y, width, height],
                            "area": width * height,
                            "segmentation": [],
                            "iscrowd": 0
                        })
                        annotation_id += 1
        # Write to file
        cls.ensure_export_dir()
        file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_coco.json"
        file_path = os.path.join(cls.EXPORT_DIR, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(coco_data, f, ensure_ascii=False, indent=2)
        return file_path, len(tasks), total_annotations
    @classmethod
    def export_to_yolo(
        cls,
        project_id: str,
        status_filter: str = "all",
        include_metadata: bool = True
    ) -> Tuple[str, int, int]:
        """
        Export project data to YOLO format.
        YOLO format creates a directory with:
        - images/ (or references to images)
        - labels/ (txt files with annotations)
        - classes.txt (class names)
        - data.yaml (dataset configuration)
        For simplicity, we create a JSON file containing YOLO-formatted data.

        Args:
            project_id: Project ID
            status_filter: Task status filter
            include_metadata: Whether to include metadata (currently not
                referenced by this exporter; kept for signature parity)

        Returns:
            Tuple of (file_path, total_tasks, total_annotations)

        Raises:
            ValueError: If the project does not exist.
        """
        project = cls.get_project_data(project_id)
        if not project:
            raise ValueError(f"Project {project_id} not found")
        tasks = cls.get_tasks_with_annotations(project_id, status_filter)
        # Track classes: label name -> sequential class id (0-based, YOLO style)
        class_map = {}
        class_id = 0
        # YOLO data structure
        yolo_data = {
            "info": {
                "project_name": project["name"],
                "export_time": datetime.now().isoformat(),
                "format": "yolo"
            },
            "classes": [],
            "images": []
        }
        total_annotations = 0
        for task in tasks:
            task_data = task["data"]
            image_url = ""
            image_width = 1.0  # Normalized
            image_height = 1.0  # Normalized
            if isinstance(task_data, dict):
                # assumes task data stores the image under "image" or
                # "image_url" — TODO confirm with task producers
                image_url = task_data.get("image", task_data.get("image_url", ""))
                # NOTE(review): image_width/image_height are read here but
                # never used below — the bbox math instead assumes the stored
                # coordinates are percentages (divided by 100). Verify.
                image_width = task_data.get("width", 1.0)
                image_height = task_data.get("height", 1.0)
            image_entry = {
                "id": task["id"],
                "file_name": image_url or task["name"],
                "labels": []
            }
            # Process annotations
            for ann in task["annotations"]:
                result = ann["result"]
                if isinstance(result, dict):
                    # presumably a Label Studio-style payload whose regions
                    # live under "annotations" or "result" — verify
                    result = result.get("annotations", result.get("result", []))
                if not isinstance(result, list):
                    result = [result] if result else []
                for item in result:
                    if not isinstance(item, dict):
                        continue
                    # NOTE(review): counted per region item, even when the
                    # item has no labels and thus produces no YOLO line.
                    total_annotations += 1
                    # Extract label and bbox
                    value = item.get("value", {})
                    labels = value.get("rectanglelabels", value.get("labels", []))
                    for label in labels:
                        # Add class if new
                        if label not in class_map:
                            class_map[label] = class_id
                            yolo_data["classes"].append(label)
                            class_id += 1
                        # Calculate YOLO format bbox
                        # YOLO format: class_id x_center y_center width height (normalized 0-1)
                        x = value.get("x", 0) / 100.0  # Convert from percentage
                        y = value.get("y", 0) / 100.0
                        w = value.get("width", 0) / 100.0
                        h = value.get("height", 0) / 100.0
                        # Convert the top-left corner to center coordinates
                        x_center = x + w / 2
                        y_center = y + h / 2
                        image_entry["labels"].append({
                            "class_id": class_map[label],
                            "class_name": label,
                            "x_center": round(x_center, 6),
                            "y_center": round(y_center, 6),
                            "width": round(w, 6),
                            "height": round(h, 6),
                            "yolo_line": f"{class_map[label]} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}"
                        })
            yolo_data["images"].append(image_entry)
        # Write to file
        cls.ensure_export_dir()
        file_name = f"export_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_yolo.json"
        file_path = os.path.join(cls.EXPORT_DIR, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(yolo_data, f, ensure_ascii=False, indent=2)
        return file_path, len(tasks), total_annotations
  525. @classmethod
  526. def execute_export(
  527. cls,
  528. job_id: str,
  529. project_id: str,
  530. format: str,
  531. status_filter: str,
  532. include_metadata: bool
  533. ) -> Dict:
  534. """
  535. Execute the export job.
  536. Args:
  537. job_id: Export job ID
  538. project_id: Project ID
  539. format: Export format
  540. status_filter: Task status filter
  541. include_metadata: Whether to include metadata
  542. Returns:
  543. Export result with file path and statistics
  544. """
  545. try:
  546. # Update status to processing
  547. cls.update_export_job(job_id, status=ExportStatus.PROCESSING.value)
  548. # Execute export based on format
  549. if format == ExportFormat.JSON.value:
  550. file_path, total_tasks, total_annotations = cls.export_to_json(
  551. project_id, status_filter, include_metadata
  552. )
  553. elif format == ExportFormat.CSV.value:
  554. file_path, total_tasks, total_annotations = cls.export_to_csv(
  555. project_id, status_filter, include_metadata
  556. )
  557. elif format == ExportFormat.COCO.value:
  558. file_path, total_tasks, total_annotations = cls.export_to_coco(
  559. project_id, status_filter, include_metadata
  560. )
  561. elif format == ExportFormat.YOLO.value:
  562. file_path, total_tasks, total_annotations = cls.export_to_yolo(
  563. project_id, status_filter, include_metadata
  564. )
  565. else:
  566. raise ValueError(f"Unsupported export format: {format}")
  567. # Update job with success
  568. cls.update_export_job(
  569. job_id,
  570. status=ExportStatus.COMPLETED.value,
  571. file_path=file_path,
  572. total_tasks=total_tasks,
  573. exported_tasks=total_tasks
  574. )
  575. return {
  576. "success": True,
  577. "file_path": file_path,
  578. "total_tasks": total_tasks,
  579. "total_annotations": total_annotations
  580. }
  581. except Exception as e:
  582. # Update job with failure
  583. cls.update_export_job(
  584. job_id,
  585. status=ExportStatus.FAILED.value,
  586. error_message=str(e)
  587. )
  588. return {
  589. "success": False,
  590. "error": str(e)
  591. }