open_project_service.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. """
  2. Open API project query service.
  3. Handles project listing, detail, and dataset export queries.
  4. """
  5. import math
  6. import uuid
  7. import logging
  8. from datetime import datetime, timezone
  9. from typing import Optional, Dict
  10. from database import get_db_connection
  11. logger = logging.getLogger(__name__)
  12. # task_type → project_type 映射
  13. TASK_TO_PROJECT_TYPE = {
  14. "text_classification": "text",
  15. "ner": "text",
  16. "image_classification": "image",
  17. "object_detection": "image",
  18. "polygon": "image",
  19. }
  20. # 开放 API 允许访问的项目状态
  21. ALLOWED_STATUSES = ("ready", "in_progress", "completed")
  22. # Download token store: {token: {file_path, project_id, format_val, total_exported, expires_at}}
  23. _DOWNLOAD_STORE: Dict[str, Dict] = {}
  24. def _map_project_type(task_type: str) -> str:
  25. """将数据库 task_type 映射为 API 返回的 project_type。"""
  26. return TASK_TO_PROJECT_TYPE.get(task_type, "text")
  27. def list_projects(
  28. name: Optional[str] = None,
  29. project_type: Optional[str] = None,
  30. status: Optional[str] = None,
  31. page: int = 1,
  32. page_size: int = 20,
  33. ) -> dict:
  34. """
  35. 查询项目列表。
  36. 返回包含 items、total、分页信息的字典。
  37. 仅返回状态为 ready/in_progress/completed 的项目。
  38. """
  39. page = max(page, 1)
  40. page_size = min(max(page_size, 1), 100)
  41. conditions = ["p.status IN %s"]
  42. params = [ALLOWED_STATUSES]
  43. if name:
  44. conditions.append("p.name LIKE %s")
  45. params.append(f"%{name}%")
  46. if project_type:
  47. allowed_types = [t for t, pt in TASK_TO_PROJECT_TYPE.items() if pt == project_type]
  48. if allowed_types:
  49. conditions.append("p.task_type IN %s")
  50. params.append(tuple(allowed_types))
  51. if status and status in ALLOWED_STATUSES:
  52. conditions.append("p.status = %s")
  53. params.append(status)
  54. where = " AND ".join(conditions)
  55. with get_db_connection() as conn:
  56. # Count total
  57. cursor = conn.cursor()
  58. cursor.execute(f"SELECT COUNT(*) AS cnt FROM projects p WHERE {where}", tuple(params))
  59. total = cursor.fetchone()["cnt"]
  60. total_pages = max(math.ceil(total / page_size), 1) if total > 0 else 1
  61. offset = (page - 1) * page_size
  62. # Fetch items
  63. cursor.execute(
  64. f"""
  65. SELECT p.id, p.name, p.description, p.task_type, p.status,
  66. p.created_at, p.updated_at,
  67. COUNT(t.id) AS task_count,
  68. SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) AS completed_task_count
  69. FROM projects p
  70. LEFT JOIN tasks t ON t.project_id = p.id
  71. WHERE {where}
  72. GROUP BY p.id
  73. ORDER BY p.updated_at DESC
  74. LIMIT %s OFFSET %s
  75. """,
  76. tuple(params) + (page_size, offset),
  77. )
  78. rows = cursor.fetchall()
  79. items = []
  80. for row in rows:
  81. items.append({
  82. "project_id": row["id"],
  83. "project_name": row["name"],
  84. "description": row["description"] or "",
  85. "project_type": _map_project_type(row["task_type"] or ""),
  86. "task_type": row["task_type"] or "",
  87. "status": row["status"],
  88. "created_by": "",
  89. "created_at": row["created_at"],
  90. "updated_at": row["updated_at"],
  91. "task_count": int(row["task_count"]),
  92. "completed_task_count": int(row["completed_task_count"]),
  93. })
  94. return {
  95. "items": items,
  96. "total": total,
  97. "page": page,
  98. "page_size": page_size,
  99. "total_pages": total_pages,
  100. "has_next": page < total_pages,
  101. "has_prev": page > 1,
  102. }
  103. def get_project_detail(project_id: str) -> Optional[dict]:
  104. """
  105. 根据项目 ID 查询项目详情。
  106. 返回包含统计信息的完整项目信息。
  107. """
  108. with get_db_connection() as conn:
  109. cursor = conn.cursor()
  110. cursor.execute(
  111. """
  112. SELECT p.id, p.name, p.description, p.task_type, p.status,
  113. p.created_at, p.updated_at,
  114. COUNT(t.id) AS task_count,
  115. SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) AS completed_task_count,
  116. SUM(CASE WHEN t.assigned_to IS NOT NULL THEN 1 ELSE 0 END) AS assigned_task_count
  117. FROM projects p
  118. LEFT JOIN tasks t ON t.project_id = p.id
  119. WHERE p.id = %s
  120. GROUP BY p.id
  121. """,
  122. (project_id,),
  123. )
  124. row = cursor.fetchone()
  125. if not row:
  126. return None
  127. total = max(row["task_count"], 1)
  128. completed = row["completed_task_count"]
  129. return {
  130. "project_id": row["id"],
  131. "project_name": row["name"],
  132. "description": row["description"] or "",
  133. "project_type": _map_project_type(row["task_type"] or ""),
  134. "task_type": row["task_type"] or "",
  135. "status": row["status"],
  136. "created_by": "",
  137. "created_at": row["created_at"],
  138. "updated_at": row["updated_at"],
  139. "task_count": int(row["task_count"]),
  140. "completed_task_count": int(row["completed_task_count"]),
  141. "assigned_task_count": int(row["assigned_task_count"]),
  142. "completion_percentage": round(completed / total * 100, 1),
  143. }
  144. # --- Download token management ---
  145. def create_download_token(
  146. file_path: str,
  147. project_id: str,
  148. format_val: str,
  149. total_exported: int,
  150. expires_at: datetime,
  151. ) -> str:
  152. """Create a download token and store the download info."""
  153. token = f"dl_{uuid.uuid4().hex[:12]}"
  154. _DOWNLOAD_STORE[token] = {
  155. "file_path": file_path,
  156. "project_id": project_id,
  157. "format_val": format_val,
  158. "total_exported": total_exported,
  159. "expires_at": expires_at,
  160. }
  161. _cleanup_expired_tokens()
  162. return token
  163. def get_download_info(token: str) -> Optional[Dict]:
  164. """Get download info for a token. Returns None if expired or not found."""
  165. info = _DOWNLOAD_STORE.get(token)
  166. if not info:
  167. return None
  168. if info["expires_at"] < datetime.now(timezone.utc):
  169. del _DOWNLOAD_STORE[token]
  170. return None
  171. return info
  172. def _cleanup_expired_tokens():
  173. """Remove expired tokens."""
  174. now = datetime.now(timezone.utc)
  175. expired = [t for t, info in _DOWNLOAD_STORE.items() if info["expires_at"] < now]
  176. for t in expired:
  177. del _DOWNLOAD_STORE[t]