"""
External API Service.
Provides business logic for external system integration.
"""
import uuid
import json
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
from database import get_db_connection
from schemas.external import (
TaskType, ProjectInitRequest, ProjectInitResponse,
ProgressResponse, AnnotatorProgress,
ExternalExportFormat, ExternalExportRequest, ExternalExportResponse,
TaskDataItem
)
logger = logging.getLogger(__name__)
# Default XML config templates, keyed by task type. They carry no labels;
# an administrator is expected to configure those afterwards.
# NOTE(review): as written here every template is a string holding only a
# newline - confirm whether XML content is supposed to be present.
DEFAULT_CONFIGS = {
TaskType.TEXT_CLASSIFICATION: """
""",
TaskType.IMAGE_CLASSIFICATION: """
""",
TaskType.OBJECT_DETECTION: """
""",
TaskType.NER: """
"""
}
class ExternalService:
    """Service class backing the external-facing API."""
@staticmethod
def get_default_config(task_type: TaskType) -> str:
    """Return the default XML config registered for a task type.

    Args:
        task_type: Task type to look up in ``DEFAULT_CONFIGS``.

    Returns:
        The template stored for ``task_type``; when the type has no entry,
        the text-classification template is returned instead.
    """
    fallback_template = DEFAULT_CONFIGS[TaskType.TEXT_CLASSIFICATION]
    return DEFAULT_CONFIGS.get(task_type, fallback_template)
@staticmethod
def init_project(request: ProjectInitRequest, user_id: str) -> ProjectInitResponse:
    """Create a draft project plus one pending task per submitted data item.

    Args:
        request: Project initialization request; its name, description,
            task type, external id and data items are read here.
        user_id: ID of the creating user.
            NOTE(review): this value is not referenced anywhere in the
            method body - confirm whether it should be persisted.

    Returns:
        ProjectInitResponse describing the newly created project.
    """
    new_project_id = "proj_" + uuid.uuid4().hex[:12]
    xml_config = ExternalService.get_default_config(request.task_type)

    insert_project_sql = (
        "\n"
        "INSERT INTO projects (id, name, description, config, status, source, task_type, external_id, updated_at)\n"
        "VALUES (?, ?, ?, ?, 'draft', 'external', ?, ?, CURRENT_TIMESTAMP)\n"
    )
    insert_task_sql = (
        "\n"
        "INSERT INTO tasks (id, project_id, name, data, status)\n"
        "VALUES (?, ?, ?, ?, 'pending')\n"
    )

    # Text-like task types keep the payload under "text"; every other type
    # keeps it under "image".
    if request.task_type in [TaskType.TEXT_CLASSIFICATION, TaskType.NER]:
        content_key = "text"
    else:
        content_key = "image"

    with get_db_connection() as conn:
        cursor = conn.cursor()

        project_row = (
            new_project_id,
            request.name,
            request.description or "",
            xml_config,
            request.task_type.value,
            request.external_id,
        )
        cursor.execute(insert_project_sql, project_row)

        inserted = 0
        for position, entry in enumerate(request.data, start=1):
            new_task_id = "task_" + uuid.uuid4().hex[:12]
            payload = {
                content_key: entry.content,
                "external_id": entry.id,
                "metadata": entry.metadata or {},
            }
            cursor.execute(
                insert_task_sql,
                (new_task_id, new_project_id, f"Task {position}", json.dumps(payload)),
            )
            inserted = position

        # Read back the creation time stored with the project row; fall back
        # to the local clock when the row cannot be read.
        cursor.execute("SELECT created_at FROM projects WHERE id = ?", (new_project_id,))
        stored = cursor.fetchone()
        if stored:
            created_at = stored["created_at"]
        else:
            created_at = datetime.now()

        return ProjectInitResponse(
            project_id=new_project_id,
            project_name=request.name,
            task_count=inserted,
            status="draft",
            created_at=created_at,
            config=xml_config,
            external_id=request.external_id,
        )
@staticmethod
def get_project_progress(project_id: str) -> Optional[ProgressResponse]:
    """Summarize task completion for a project, overall and per annotator.

    Args:
        project_id: ID of the project to report on.

    Returns:
        A ProgressResponse holding task counts by status, the overall
        completion percentage and one AnnotatorProgress per assignee, or
        None when no project row has this ID.
    """
    def percent(done, total):
        # Share of ``done`` in ``total`` as a percentage rounded to two
        # decimals; 0.0 when there is nothing to divide by.
        if total > 0:
            return round((done / total) * 100, 2)
        return 0.0

    def to_progress(row):
        # Build the per-annotator entry from one aggregated row.
        assigned = row["assigned_count"] or 0
        finished = row["completed_count"] or 0
        return AnnotatorProgress(
            user_id=row["assigned_to"] or "",
            username=row["username"] or "Unknown",
            assigned_count=assigned,
            completed_count=finished,
            in_progress_count=row["in_progress_count"] or 0,
            completion_rate=percent(finished, assigned),
        )

    project_sql = (
        "\n"
        "SELECT id, name, status, updated_at\n"
        "FROM projects\n"
        "WHERE id = ?\n"
    )
    totals_sql = (
        "\n"
        "SELECT\n"
        "COUNT(*) as total,\n"
        "SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,\n"
        "SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress,\n"
        "SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending\n"
        "FROM tasks\n"
        "WHERE project_id = ?\n"
    )
    annotators_sql = (
        "\n"
        "SELECT\n"
        "t.assigned_to,\n"
        "u.username,\n"
        "COUNT(*) as assigned_count,\n"
        "SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_count,\n"
        "SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_count\n"
        "FROM tasks t\n"
        "LEFT JOIN users u ON t.assigned_to = u.id\n"
        "WHERE t.project_id = ? AND t.assigned_to IS NOT NULL\n"
        "GROUP BY t.assigned_to, u.username\n"
    )

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(project_sql, (project_id,))
        project = cursor.fetchone()
        if not project:
            return None

        # SUM() yields NULL for a project without tasks, hence the ``or 0``.
        cursor.execute(totals_sql, (project_id,))
        totals = cursor.fetchone()
        total_tasks = totals["total"] or 0
        completed_tasks = totals["completed"] or 0
        in_progress_tasks = totals["in_progress"] or 0
        pending_tasks = totals["pending"] or 0

        cursor.execute(annotators_sql, (project_id,))
        annotators = [to_progress(row) for row in cursor.fetchall()]

        return ProgressResponse(
            project_id=project_id,
            project_name=project["name"],
            status=project["status"] or "draft",
            total_tasks=total_tasks,
            completed_tasks=completed_tasks,
            in_progress_tasks=in_progress_tasks,
            pending_tasks=pending_tasks,
            completion_percentage=percent(completed_tasks, total_tasks),
            annotators=annotators,
            last_updated=project["updated_at"],
        )
@staticmethod
def check_project_exists(project_id: str) -> bool:
    """Report whether a project row with the given ID exists.

    Args:
        project_id: ID of the project to look for.

    Returns:
        True when the lookup returns a row, False otherwise.
    """
    lookup_sql = "SELECT id FROM projects WHERE id = ?"
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(lookup_sql, (project_id,))
        found = cursor.fetchone()
        return found is not None
@staticmethod
def export_project_data(
    project_id: str,
    request: ExternalExportRequest,
    base_url: str = ""
) -> Optional[ExternalExportResponse]:
    """Export a project's tasks and annotations to a file and record the job.

    The export file is written to an ``exports`` directory located next to
    the parent directory of this module, and a row describing the export
    is inserted into ``export_jobs``.

    Args:
        project_id: ID of the project to export.
        request: Export request; ``format`` selects the file format and
            ``completed_only`` restricts the export to completed tasks.
        base_url: Base URL prepended to the download path in the returned
            ``file_url``. With the default empty string the URL stays
            relative (``/api/exports/<id>/download``).

    Returns:
        ExternalExportResponse describing the generated file, or None when
        the project does not exist. ``expires_at`` is reported as seven
        days from now; nothing in this method removes the file.
    """
    import os
    from datetime import timedelta

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute("SELECT id, name FROM projects WHERE id = ?", (project_id,))
        if not cursor.fetchone():
            return None

        # The filter is a fixed SQL fragment chosen here, never caller text,
        # so interpolating it into the statement below is safe.
        status_filter = ""
        if request.completed_only:
            status_filter = "AND t.status = 'completed'"

        export_sql = (
            "\n"
            "SELECT\n"
            "t.id as task_id,\n"
            "t.data,\n"
            "t.status,\n"
            "t.assigned_to,\n"
            "u.username as annotator_name,\n"
            "a.id as annotation_id,\n"
            "a.result as annotation_result,\n"
            "a.updated_at as annotation_time\n"
            "FROM tasks t\n"
            "LEFT JOIN users u ON t.assigned_to = u.id\n"
            "LEFT JOIN annotations a ON t.id = a.task_id\n"
            f"WHERE t.project_id = ? {status_filter}\n"
            "ORDER BY t.id\n"
        )
        cursor.execute(export_sql, (project_id,))
        rows = cursor.fetchall()

        # The join yields one row per (task, annotation) pair; fold them into
        # one record per task, preserving first-seen task order.
        tasks_data = {}
        for row in rows:
            task_id = row["task_id"]
            if task_id not in tasks_data:
                task_data = json.loads(row["data"]) if row["data"] else {}
                tasks_data[task_id] = {
                    "task_id": task_id,
                    "external_id": task_data.get("external_id"),
                    "original_data": task_data,
                    "annotations": [],
                    "status": row["status"],
                    "annotator": row["annotator_name"],
                    "completed_at": None
                }
            if row["annotation_id"]:
                record = tasks_data[task_id]
                annotation_result = json.loads(row["annotation_result"]) if row["annotation_result"] else {}
                record["annotations"].append(annotation_result)
                # NOTE(review): rows are ordered by task id only, so for a task
                # with several annotations the timestamp kept here is simply
                # the one from the last row returned.
                if row["annotation_time"]:
                    record["completed_at"] = row["annotation_time"]

        export_data = list(tasks_data.values())
        total_exported = len(export_data)

        export_id = f"export_{uuid.uuid4().hex[:12]}"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        export_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "exports")
        os.makedirs(export_dir, exist_ok=True)

        file_name, file_content = ExternalService._generate_export_file(
            export_data,
            request.format,
            project_id,
            timestamp
        )
        file_path = os.path.join(export_dir, file_name)

        if isinstance(file_content, str):
            # newline="" turns off text-mode newline translation: CSV content
            # already carries "\r\n" row endings, which would otherwise be
            # written as "\r\r\n" on Windows.
            with open(file_path, 'w', encoding='utf-8', newline='') as f:
                f.write(file_content)
        else:
            with open(file_path, 'wb') as f:
                f.write(file_content)
        file_size = os.path.getsize(file_path)

        insert_job_sql = (
            "\n"
            "INSERT INTO export_jobs (id, project_id, format, status, status_filter, file_path, total_tasks, exported_tasks, completed_at)\n"
            "VALUES (?, ?, ?, 'completed', ?, ?, ?, ?, CURRENT_TIMESTAMP)\n"
        )
        cursor.execute(insert_job_sql, (
            export_id,
            project_id,
            request.format.value,
            'completed' if request.completed_only else 'all',
            file_path,
            total_exported,
            total_exported
        ))

        expires_at = datetime.now() + timedelta(days=7)

        # Honor the documented base_url; stripping a trailing slash avoids
        # "//" in the joined URL, and an empty base keeps the URL relative.
        url_prefix = (base_url or "").rstrip("/")
        return ExternalExportResponse(
            project_id=project_id,
            format=request.format.value,
            total_exported=total_exported,
            file_url=f"{url_prefix}/api/exports/{export_id}/download",
            file_name=file_name,
            file_size=file_size,
            expires_at=expires_at
        )
@staticmethod
def _generate_export_file(
    data: List[Dict],
    format: ExternalExportFormat,
    project_id: str,
    timestamp: str
) -> tuple:
    """Build the export file for the requested format.

    Args:
        data: Per-task export records.
        format: Requested export format.
        project_id: Project ID, passed on to the exporter for the file name.
        timestamp: Timestamp string, passed on to the exporter for the file name.

    Returns:
        tuple: ``(file_name, file_content)`` from the matching exporter.
        A format with no matching exporter is exported as JSON.
    """
    # Checked in order with ``==``, one exporter per format.
    routes = (
        (ExternalExportFormat.JSON, ExternalService._export_json),
        (ExternalExportFormat.CSV, ExternalService._export_csv),
        (ExternalExportFormat.SHAREGPT, ExternalService._export_sharegpt),
        (ExternalExportFormat.YOLO, ExternalService._export_yolo),
        (ExternalExportFormat.COCO, ExternalService._export_coco),
        (ExternalExportFormat.ALPACA, ExternalService._export_alpaca),
    )
    for candidate, exporter in routes:
        if format == candidate:
            return exporter(data, project_id, timestamp)
    return ExternalService._export_json(data, project_id, timestamp)
@staticmethod
def _export_json(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Serialize the export records as pretty-printed JSON.

    Args:
        data: Per-task export records.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` where content is JSON text indented
        by two spaces with non-ASCII characters kept as-is.
    """
    target_name = f"export_{project_id}_{timestamp}.json"
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    return target_name, serialized
@staticmethod
def _export_csv(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Render the export records as CSV text.

    The first four columns are copied from each record (empty string when
    the key is missing); ``original_data`` and ``annotations`` are embedded
    as JSON strings.

    Args:
        data: Per-task export records.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` with a header row followed by one
        row per record.
    """
    import csv
    import io

    header = ['task_id', 'external_id', 'status', 'annotator', 'original_data', 'annotations']
    plain_columns = header[:4]

    buffer = io.StringIO()
    sheet = csv.writer(buffer)
    sheet.writerow(header)
    sheet.writerows(
        [record.get(column, '') for column in plain_columns]
        + [
            json.dumps(record.get('original_data', {}), ensure_ascii=False),
            json.dumps(record.get('annotations', []), ensure_ascii=False),
        ]
        for record in data
    )
    return f"export_{project_id}_{timestamp}.csv", buffer.getvalue()
@staticmethod
def _export_sharegpt(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Export choice annotations as ShareGPT-style conversations.

    Every record that has both a source text and a choice label becomes a
    two-turn conversation: the text as the "human" turn and the
    comma-joined choices as the "gpt" turn. Records lacking either one are
    left out.

    Malformed result entries (non-dict entries, a missing or non-dict
    ``value``, a non-list ``choices``) are ignored instead of aborting the
    whole export.

    Args:
        data: Per-task export records with ``original_data`` and
            ``annotations`` keys.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` where content is JSON text.
    """
    def choices_label(entry):
        # Joined choices of one result entry, or None when it carries none.
        if not isinstance(entry, dict):
            return None
        value = entry.get('value')
        if not isinstance(value, dict):
            return None
        choices = value.get('choices')
        if not isinstance(choices, (list, tuple)):
            return None
        return ', '.join(str(choice) for choice in choices)

    file_name = f"export_{project_id}_sharegpt_{timestamp}.json"
    conversations = []
    for item in data:
        original = item.get('original_data') or {}
        annotations = item.get('annotations') or []

        # The image reference stands in when the task has no text.
        text = original.get('text', original.get('image', ''))

        label = ""
        for ann in annotations:
            # A stored result is either a list of entries or one entry dict.
            entries = ann if isinstance(ann, list) else [ann]
            for entry in entries:
                found = choices_label(entry)
                if found is not None:
                    # The first entry with choices wins within one result;
                    # a later result still overrides an earlier one.
                    label = found
                    break

        if text and label:
            conversations.append({
                "conversations": [
                    {"from": "human", "value": text},
                    {"from": "gpt", "value": label}
                ]
            })

    content = json.dumps(conversations, ensure_ascii=False, indent=2)
    return file_name, content
@staticmethod
def _export_yolo(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Export rectangle annotations as a simplified YOLO-like JSON description.

    One output entry is produced per record that has an ``image`` value;
    each ``rectanglelabels`` result entry becomes a box whose coordinates
    are the stored values divided by 100.

    Result entries are accepted both as a list of entries and as a single
    entry dict. Malformed entries do not abort the export: non-dict entries
    are skipped, an empty or missing label list yields an empty label, and
    non-numeric coordinates count as 0.

    NOTE(review): coordinates are only rescaled here, not converted to box
    centers - confirm the consumer does not expect canonical YOLO layout.

    Args:
        data: Per-task export records with ``original_data`` and
            ``annotations`` keys.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` where content is JSON text.
    """
    def fraction(raw):
        # Stored value / 100; anything that is not a number counts as 0.
        if not isinstance(raw, (int, float)):
            return 0.0
        return raw / 100

    def first_label(value):
        # First rectangle label, or '' when the list is missing or empty.
        labels = value.get('rectanglelabels')
        if isinstance(labels, (list, tuple)) and labels:
            return labels[0]
        return ''

    file_name = f"export_{project_id}_yolo_{timestamp}.json"
    yolo_data = []
    for item in data:
        original = item.get('original_data') or {}
        annotations = item.get('annotations') or []
        image_url = original.get('image', '')

        boxes = []
        for ann in annotations:
            entries = ann if isinstance(ann, list) else [ann]
            for entry in entries:
                if not isinstance(entry, dict) or entry.get('type') != 'rectanglelabels':
                    continue
                value = entry.get('value')
                if not isinstance(value, dict):
                    value = {}
                boxes.append({
                    "label": first_label(value),
                    "x": fraction(value.get('x', 0)),
                    "y": fraction(value.get('y', 0)),
                    "width": fraction(value.get('width', 0)),
                    "height": fraction(value.get('height', 0))
                })

        if image_url:
            yolo_data.append({
                "image": image_url,
                "boxes": boxes
            })

    content = json.dumps(yolo_data, ensure_ascii=False, indent=2)
    return file_name, content
@staticmethod
def _export_coco(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Export rectangle annotations in a COCO-style JSON layout.

    Every record contributes one image entry (IDs start at 1, width and
    height are written as 0). Each labelled ``rectanglelabels`` result
    entry becomes an annotation whose bbox holds the stored x, y, width
    and height unchanged; categories are numbered from 1 in order of first
    appearance.

    Result entries are accepted both as a list of entries and as a single
    entry dict. Malformed entries do not abort the export: non-dict entries
    and entries without a label are skipped, and non-numeric coordinates
    count as 0.

    Args:
        data: Per-task export records with ``original_data`` and
            ``annotations`` keys.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` where content is JSON text.
    """
    def number(raw):
        # Stored coordinate as-is; anything that is not a number counts as 0.
        return raw if isinstance(raw, (int, float)) else 0

    file_name = f"export_{project_id}_coco_{timestamp}.json"
    coco_data = {
        "images": [],
        "annotations": [],
        "categories": []
    }
    category_map = {}
    annotation_id = 1

    for image_id, item in enumerate(data, start=1):
        original = item.get('original_data') or {}
        annotations = item.get('annotations') or []

        coco_data["images"].append({
            "id": image_id,
            "file_name": original.get('image', ''),
            "width": 0,
            "height": 0
        })

        for ann in annotations:
            entries = ann if isinstance(ann, list) else [ann]
            for entry in entries:
                if not isinstance(entry, dict) or entry.get('type') != 'rectanglelabels':
                    continue
                value = entry.get('value')
                if not isinstance(value, dict):
                    continue
                labels = value.get('rectanglelabels')
                label = labels[0] if isinstance(labels, (list, tuple)) and labels else ''
                if not label:
                    # An unlabelled box has no category to attach to.
                    continue

                if label not in category_map:
                    category_map[label] = len(category_map) + 1
                    coco_data["categories"].append({
                        "id": category_map[label],
                        "name": label
                    })

                box_width = number(value.get('width', 0))
                box_height = number(value.get('height', 0))
                coco_data["annotations"].append({
                    "id": annotation_id,
                    "image_id": image_id,
                    "category_id": category_map[label],
                    "bbox": [
                        number(value.get('x', 0)),
                        number(value.get('y', 0)),
                        box_width,
                        box_height
                    ],
                    "area": box_width * box_height,
                    "iscrowd": 0
                })
                annotation_id += 1

    content = json.dumps(coco_data, ensure_ascii=False, indent=2)
    return file_name, content
@staticmethod
def _export_alpaca(data: List[Dict], project_id: str, timestamp: str) -> tuple:
    """Export choice annotations in the Alpaca instruction-tuning layout.

    Every record with a ``text`` value becomes one sample: a fixed
    classification instruction, the text as input and the comma-joined
    choices as output. Records without a label still get a sample, with a
    fixed "not annotated" placeholder as output.

    Malformed result entries (non-dict entries, a missing or non-dict
    ``value``, a non-list ``choices``) are ignored instead of aborting the
    whole export.

    Args:
        data: Per-task export records with ``original_data`` and
            ``annotations`` keys.
        project_id: Project ID used in the file name.
        timestamp: Timestamp string used in the file name.

    Returns:
        tuple: ``(file_name, content)`` where content is JSON text.
    """
    def choices_label(entry):
        # Joined choices of one result entry, or None when it carries none.
        if not isinstance(entry, dict):
            return None
        value = entry.get('value')
        if not isinstance(value, dict):
            return None
        choices = value.get('choices')
        if not isinstance(choices, (list, tuple)):
            return None
        return ', '.join(str(choice) for choice in choices)

    file_name = f"export_{project_id}_alpaca_{timestamp}.json"
    alpaca_data = []
    for item in data:
        original = item.get('original_data') or {}
        annotations = item.get('annotations') or []
        text = original.get('text', '')

        label = ""
        for ann in annotations:
            # A stored result is either a list of entries or one entry dict.
            entries = ann if isinstance(ann, list) else [ann]
            for entry in entries:
                found = choices_label(entry)
                if found is not None:
                    # The first entry with choices wins within one result;
                    # a later result still overrides an earlier one.
                    label = found
                    break

        if text:
            alpaca_data.append({
                "instruction": "请对以下文本进行分类",
                "input": text,
                "output": label or "未标注"
            })

    content = json.dumps(alpaca_data, ensure_ascii=False, indent=2)
    return file_name, content