"""
External API Service.
Provides business logic for external system integration.
"""
import uuid
import json
import logging
import random
from datetime import datetime
from typing import Optional, List, Dict, Any
from database import get_db_connection
from schemas.external import (
TaskType, ProjectInitRequest, ProjectInitResponse,
ProgressResponse, AnnotatorProgress,
ExternalExportFormat, ExternalExportRequest, ExternalExportResponse,
TaskDataItem, TagItem
)
logger = logging.getLogger(__name__)
def generate_random_color() -> str:
    """
    Produce a random color.

    Returns:
        str: Color string in #RRGGBB form (six lowercase hex digits).
    """
    rgb_value = random.randint(0, 0xFFFFFF)
    return "#" + format(rgb_value, "06x")
# Predefined palette used to give tags nicer-looking colors than purely random
# ones. Entries must be unique: get_color_for_tag() indexes this list by tag
# position, so a repeated entry would give two tags the same color.
PRESET_COLORS = [
    "#FF5733", "#33FF57", "#3357FF", "#FF33F5", "#F5FF33",
    "#33FFF5", "#FF8C33", "#8C33FF", "#33FF8C", "#FF338C",
    "#5733FF", "#57FF33", "#FF3357", "#33F5FF", "#F533FF",
    "#8CFF33", "#338CFF", "#C433FF", "#33FFC4", "#FFC433",
]
def get_color_for_tag(index: int, specified_color: Optional[str] = None) -> str:
    """
    Resolve the color to use for a tag.

    Args:
        index: Position of the tag; used to pick an entry from PRESET_COLORS.
        specified_color: Explicit color; returned unchanged when truthy.

    Returns:
        str: The explicit color if given, otherwise the preset at ``index``,
        otherwise a randomly generated #RRGGBB color once the presets run out.
    """
    if not specified_color:
        within_presets = index < len(PRESET_COLORS)
        return PRESET_COLORS[index] if within_presets else generate_random_color()
    return specified_color
# Default XML config template per task type (no labels included; per the
# original note, labels are configured later by an administrator).
# Looked up with DEFAULT_CONFIGS.get(task_type, <TEXT_CLASSIFICATION entry>).
# NOTE(review): in this view every template body is only a newline — the XML
# markup may have been lost upstream; confirm against the canonical file.
DEFAULT_CONFIGS = {
TaskType.TEXT_CLASSIFICATION: """
""",
TaskType.IMAGE_CLASSIFICATION: """
""",
TaskType.OBJECT_DETECTION: """
""",
TaskType.NER: """
""",
TaskType.POLYGON: """
"""
}
def generate_config_with_tags(task_type: TaskType, tags: Optional[List[TagItem]] = None) -> str:
    """
    Build the XML config string for a task type from a list of tags.

    Args:
        task_type: Task type; selects which branch assembles the config.
        tags: Optional tag list. When it is None or empty the default config
            for the task type is returned instead.

    Returns:
        str: The generated config string.
    """
    if not tags or len(tags) == 0:
        # No tags: fall back to the default config (text classification's
        # default is used when the task type has no entry).
        return DEFAULT_CONFIGS.get(task_type, DEFAULT_CONFIGS[TaskType.TEXT_CLASSIFICATION])
    # Build the per-tag lines and wrap them in the template for the task type.
    # NOTE(review): as seen here each per-tag f-string is a single space and
    # the five branches are textually identical — the XML markup (and any use
    # of tag / i inside it) appears to be missing; confirm against the
    # canonical file before relying on this output.
    if task_type == TaskType.TEXT_CLASSIFICATION:
        labels_xml = "\n".join([
            f' '
            for i, tag in enumerate(tags)
        ])
        return f"""
{labels_xml}
"""
    elif task_type == TaskType.IMAGE_CLASSIFICATION:
        labels_xml = "\n".join([
            f' '
            for i, tag in enumerate(tags)
        ])
        return f"""
{labels_xml}
"""
    elif task_type == TaskType.OBJECT_DETECTION:
        labels_xml = "\n".join([
            f' '
            for i, tag in enumerate(tags)
        ])
        return f"""
{labels_xml}
"""
    elif task_type == TaskType.NER:
        labels_xml = "\n".join([
            f' '
            for i, tag in enumerate(tags)
        ])
        return f"""
{labels_xml}
"""
    elif task_type == TaskType.POLYGON:
        labels_xml = "\n".join([
            f' '
            for i, tag in enumerate(tags)
        ])
        return f"""
{labels_xml}
"""
    else:
        # Unknown task type: same fallback as the no-tags case.
        return DEFAULT_CONFIGS.get(task_type, DEFAULT_CONFIGS[TaskType.TEXT_CLASSIFICATION])
class ExternalService:
    """Service class backing the external-facing API.

    All methods are static. Database access goes through ``get_db_connection()``
    used as a context manager; the export helpers (``_export_*``) are pure
    functions that turn grouped task dictionaries into ``(file_name, content)``.
    """

    @staticmethod
    def get_default_config(task_type: "TaskType") -> str:
        """Return the default XML config for ``task_type``.

        Falls back to the text-classification default when the task type has
        no entry in ``DEFAULT_CONFIGS``.
        """
        return DEFAULT_CONFIGS.get(task_type, DEFAULT_CONFIGS[TaskType.TEXT_CLASSIFICATION])

    @staticmethod
    def init_project(request: "ProjectInitRequest", user_id: str) -> "ProjectInitResponse":
        """
        Create a project row plus one task row per data item.

        Args:
            request: Project initialization request (name, description,
                task_type, external_id, data items and optional tags).
            user_id: ID of the creating user. Not used by this method.

        Returns:
            ProjectInitResponse: IDs, task count, config and creation time of
            the new project. The project is created with status ``draft`` and
            source ``external``.
        """
        project_id = f"proj_{uuid.uuid4().hex[:12]}"

        # Use the tag-aware config only when tags were supplied.
        if request.tags:
            config = generate_config_with_tags(request.task_type, request.tags)
        else:
            config = ExternalService.get_default_config(request.task_type)

        # Text-like tasks store the content under "text", all others under "image".
        is_text_task = request.task_type in (TaskType.TEXT_CLASSIFICATION, TaskType.NER)
        content_key = "text" if is_text_task else "image"

        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO projects (id, name, description, config, status, source, task_type, external_id, updated_at)
                VALUES (?, ?, ?, ?, 'draft', 'external', ?, ?, CURRENT_TIMESTAMP)
            """, (
                project_id,
                request.name,
                request.description or "",
                config,
                request.task_type.value,
                request.external_id
            ))

            task_count = 0
            for position, item in enumerate(request.data, start=1):
                task_data = {
                    content_key: item.content,
                    "external_id": item.id,
                    "metadata": item.metadata or {}
                }
                cursor.execute("""
                    INSERT INTO tasks (id, project_id, name, data, status)
                    VALUES (?, ?, ?, ?, 'pending')
                """, (
                    f"task_{uuid.uuid4().hex[:12]}",
                    project_id,
                    f"Task {position}",
                    json.dumps(task_data)
                ))
                task_count += 1

            # Read back the creation time assigned by the database.
            cursor.execute("SELECT created_at FROM projects WHERE id = ?", (project_id,))
            row = cursor.fetchone()
            created_at = row["created_at"] if row else datetime.now()

        return ProjectInitResponse(
            project_id=project_id,
            project_name=request.name,
            task_count=task_count,
            status="draft",
            created_at=created_at,
            config=config,
            external_id=request.external_id
        )

    @staticmethod
    def _percentage(part: int, whole: int) -> float:
        """Return ``part / whole`` as a percentage rounded to 2 decimals (0.0 if ``whole`` <= 0)."""
        if whole <= 0:
            return 0.0
        return round((part / whole) * 100, 2)

    @staticmethod
    def get_project_progress(project_id: str) -> Optional["ProgressResponse"]:
        """
        Collect overall and per-annotator progress for a project.

        Args:
            project_id: Project ID.

        Returns:
            ProgressResponse: Task counts by status, completion percentage and
            per-annotator statistics; ``None`` when the project does not exist.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT id, name, status, updated_at
                FROM projects
                WHERE id = ?
            """, (project_id,))
            project = cursor.fetchone()
            if not project:
                return None

            # Task counts by status. SUM() yields NULL when there are no rows,
            # hence the "or 0" defaults below.
            cursor.execute("""
                SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
                    SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress,
                    SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending
                FROM tasks
                WHERE project_id = ?
            """, (project_id,))
            stats = cursor.fetchone()
            total_tasks = stats["total"] or 0
            completed_tasks = stats["completed"] or 0
            in_progress_tasks = stats["in_progress"] or 0
            pending_tasks = stats["pending"] or 0

            # Per-annotator statistics (only tasks that are assigned).
            cursor.execute("""
                SELECT
                    t.assigned_to,
                    u.username,
                    COUNT(*) as assigned_count,
                    SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_count,
                    SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_count
                FROM tasks t
                LEFT JOIN users u ON t.assigned_to = u.id
                WHERE t.project_id = ? AND t.assigned_to IS NOT NULL
                GROUP BY t.assigned_to, u.username
            """, (project_id,))
            annotators = []
            for row in cursor.fetchall():
                assigned_count = row["assigned_count"] or 0
                completed_count = row["completed_count"] or 0
                annotators.append(AnnotatorProgress(
                    user_id=row["assigned_to"] or "",
                    username=row["username"] or "Unknown",
                    assigned_count=assigned_count,
                    completed_count=completed_count,
                    in_progress_count=row["in_progress_count"] or 0,
                    completion_rate=ExternalService._percentage(completed_count, assigned_count)
                ))

            return ProgressResponse(
                project_id=project_id,
                project_name=project["name"],
                status=project["status"] or "draft",
                total_tasks=total_tasks,
                completed_tasks=completed_tasks,
                in_progress_tasks=in_progress_tasks,
                pending_tasks=pending_tasks,
                completion_percentage=ExternalService._percentage(completed_tasks, total_tasks),
                annotators=annotators,
                last_updated=project["updated_at"]
            )

    @staticmethod
    def check_project_exists(project_id: str) -> bool:
        """Return True when a project with ``project_id`` exists."""
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id FROM projects WHERE id = ?", (project_id,))
            return cursor.fetchone() is not None

    @staticmethod
    def _group_export_rows(rows) -> List[Dict[str, Any]]:
        """Fold task/annotation join rows into one dict per task, in first-seen order.

        Each row must support ``row["..."]`` access for the columns selected in
        ``export_project_data``. A task joined with several annotations appears
        in several rows; its annotations are accumulated and ``completed_at``
        ends up as the time of the last annotation row that carries one.
        """
        grouped: Dict[Any, Dict[str, Any]] = {}
        for row in rows:
            task_id = row["task_id"]
            entry = grouped.get(task_id)
            if entry is None:
                task_data = json.loads(row["data"]) if row["data"] else {}
                entry = {
                    "task_id": task_id,
                    "external_id": task_data.get("external_id"),
                    "original_data": task_data,
                    "annotations": [],
                    "status": row["status"],
                    "annotator": row["annotator_name"],
                    "completed_at": None
                }
                grouped[task_id] = entry
            if row["annotation_id"]:
                raw_result = row["annotation_result"]
                entry["annotations"].append(json.loads(raw_result) if raw_result else {})
                if row["annotation_time"]:
                    entry["completed_at"] = str(row["annotation_time"])
        return list(grouped.values())

    @staticmethod
    def export_project_data(
        project_id: str,
        request: "ExternalExportRequest",
        base_url: str = ""
    ) -> Optional["ExternalExportResponse"]:
        """
        Export a project's tasks and annotations to a file and record the job.

        Args:
            project_id: Project ID.
            request: Export request (``format`` and ``completed_only``).
            base_url: Base URL prepended to the download path. With the
                default empty string the returned ``file_url`` is the relative
                path ``/api/exports/<export_id>/download``.

        Returns:
            ExternalExportResponse: Export summary (file name, size, download
            URL, expiry 7 days from now); ``None`` when the project does not
            exist.
        """
        import os
        from datetime import timedelta

        with get_db_connection() as conn:
            cursor = conn.cursor()

            cursor.execute("SELECT id, name FROM projects WHERE id = ?", (project_id,))
            if not cursor.fetchone():
                return None

            # The filter is one of two fixed literals, never user text, so
            # interpolating it into the SQL below is safe.
            status_filter = "AND t.status = 'completed'" if request.completed_only else ""
            cursor.execute(f"""
                SELECT
                    t.id as task_id,
                    t.data,
                    t.status,
                    t.assigned_to,
                    u.username as annotator_name,
                    a.id as annotation_id,
                    a.result as annotation_result,
                    a.updated_at as annotation_time
                FROM tasks t
                LEFT JOIN users u ON t.assigned_to = u.id
                LEFT JOIN annotations a ON t.id = a.task_id
                WHERE t.project_id = ? {status_filter}
                ORDER BY t.id
            """, (project_id,))
            export_data = ExternalService._group_export_rows(cursor.fetchall())
            total_exported = len(export_data)

            export_id = f"export_{uuid.uuid4().hex[:12]}"
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Exports live in an "exports" directory one level above this
            # file's directory.
            export_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "exports")
            os.makedirs(export_dir, exist_ok=True)

            file_name, file_content = ExternalService._generate_export_file(
                export_data,
                request.format,
                project_id,
                timestamp
            )
            file_path = os.path.join(export_dir, file_name)

            if isinstance(file_content, str):
                # newline='' disables newline translation so the "\r\n" row
                # terminators produced by csv.writer are written unchanged
                # (otherwise they become "\r\r\n" on Windows).
                with open(file_path, 'w', encoding='utf-8', newline='') as f:
                    f.write(file_content)
            else:
                with open(file_path, 'wb') as f:
                    f.write(file_content)
            file_size = os.path.getsize(file_path)

            cursor.execute("""
                INSERT INTO export_jobs (id, project_id, format, status, status_filter, file_path, total_tasks, exported_tasks, completed_at)
                VALUES (?, ?, ?, 'completed', ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, (
                export_id,
                project_id,
                request.format.value,
                'completed' if request.completed_only else 'all',
                file_path,
                total_exported,
                total_exported
            ))

        expires_at = datetime.now() + timedelta(days=7)
        # Strip a trailing "/" from the base so the joined URL has exactly one.
        url_prefix = (base_url or "").rstrip("/")
        return ExternalExportResponse(
            project_id=project_id,
            format=request.format.value,
            total_exported=total_exported,
            file_url=f"{url_prefix}/api/exports/{export_id}/download",
            file_name=file_name,
            file_size=file_size,
            expires_at=expires_at
        )

    @staticmethod
    def _generate_export_file(
        data: List[Dict],
        format: "ExternalExportFormat",
        project_id: str,
        timestamp: str
    ) -> tuple:
        """
        Dispatch to the exporter matching ``format``.

        Unrecognized formats fall back to the JSON exporter.

        Returns:
            tuple: (file name, file content)
        """
        if format == ExternalExportFormat.JSON:
            return ExternalService._export_json(data, project_id, timestamp)
        elif format == ExternalExportFormat.CSV:
            return ExternalService._export_csv(data, project_id, timestamp)
        elif format == ExternalExportFormat.SHAREGPT:
            return ExternalService._export_sharegpt(data, project_id, timestamp)
        elif format == ExternalExportFormat.YOLO:
            return ExternalService._export_yolo(data, project_id, timestamp)
        elif format == ExternalExportFormat.COCO:
            return ExternalService._export_coco(data, project_id, timestamp)
        elif format == ExternalExportFormat.ALPACA:
            return ExternalService._export_alpaca(data, project_id, timestamp)
        elif format == ExternalExportFormat.PASCAL_VOC:
            return ExternalService._export_pascal_voc(data, project_id, timestamp)
        else:
            return ExternalService._export_json(data, project_id, timestamp)

    # ------------------------------------------------------------------
    # Shared helpers for the exporters
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_regions(annotations):
        """Yield every dict region found in list-form annotation results.

        Non-list annotations and non-dict regions are skipped.
        """
        for ann in annotations or []:
            if isinstance(ann, list):
                for region in ann:
                    if isinstance(region, dict):
                        yield region

    @staticmethod
    def _first_label(value: Dict[str, Any], key: str) -> str:
        """Return the first label stored under ``key``, or '' when it is missing or empty."""
        labels = value.get(key) or ['']
        return labels[0]

    @staticmethod
    def _extract_choice_label(annotations) -> str:
        """Return the comma-joined ``choices`` of the annotations, or ''.

        For a list-form annotation the first region carrying
        ``value.choices`` is used; a dict-form annotation is used directly.
        When several annotations carry choices, the last one wins.
        """
        def choices_of(region):
            # None means "this region carries no choices".
            if not isinstance(region, dict):
                return None
            value = region.get('value')
            if not isinstance(value, dict) or 'choices' not in value:
                return None
            return value['choices']

        label = ""
        for ann in annotations or []:
            if isinstance(ann, list):
                for region in ann:
                    choices = choices_of(region)
                    if choices is not None:
                        label = ', '.join(choices)
                        break
            elif isinstance(ann, dict):
                choices = choices_of(ann)
                if choices is not None:
                    label = ', '.join(choices)
        return label

    @staticmethod
    def _polygon_area(points) -> float:
        """Return the area enclosed by ``points`` using the shoelace formula."""
        n = len(points)
        area = 0
        for i in range(n):
            j = (i + 1) % n
            area += points[i][0] * points[j][1]
            area -= points[j][0] * points[i][1]
        return abs(area) / 2

    # ------------------------------------------------------------------
    # Exporters: each returns (file_name, content)
    # ------------------------------------------------------------------

    @staticmethod
    def _export_json(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export the grouped task data as pretty-printed JSON."""
        file_name = f"export_{project_id}_{timestamp}.json"
        content = json.dumps(data, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _export_csv(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export one CSV row per task; original data and annotations are JSON-encoded cells."""
        import csv
        import io

        file_name = f"export_{project_id}_{timestamp}.csv"
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(['task_id', 'external_id', 'status', 'annotator', 'original_data', 'annotations'])
        for item in data:
            writer.writerow([
                item.get('task_id', ''),
                item.get('external_id', ''),
                item.get('status', ''),
                item.get('annotator', ''),
                json.dumps(item.get('original_data', {}), ensure_ascii=False),
                json.dumps(item.get('annotations', []), ensure_ascii=False)
            ])
        return file_name, output.getvalue()

    @staticmethod
    def _export_sharegpt(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export ShareGPT-style conversations (human = source text, gpt = label).

        Tasks without source text or without a choice label are skipped.
        """
        file_name = f"export_{project_id}_sharegpt_{timestamp}.json"
        conversations = []
        for item in data:
            original = item.get('original_data', {})
            # Prefer the text field; fall back to the image field.
            text = original.get('text', original.get('image', ''))
            label = ExternalService._extract_choice_label(item.get('annotations', []))
            if text and label:
                conversations.append({
                    "conversations": [
                        {"from": "human", "value": text},
                        {"from": "gpt", "value": label}
                    ]
                })
        content = json.dumps(conversations, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _export_yolo(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export a simplified YOLO-like description as JSON.

        Coordinates are divided by 100 (the stored values are treated as
        percentages). Tasks without an image are skipped.
        """
        file_name = f"export_{project_id}_yolo_{timestamp}.json"
        yolo_data = []
        for item in data:
            original = item.get('original_data', {})
            image_url = original.get('image', '')
            boxes = []
            polygons = []
            for region in ExternalService._iter_regions(item.get('annotations', [])):
                region_type = region.get('type')
                value = region.get('value') or {}
                if region_type == 'rectanglelabels':
                    boxes.append({
                        "label": ExternalService._first_label(value, 'rectanglelabels'),
                        "x": value.get('x', 0) / 100,
                        "y": value.get('y', 0) / 100,
                        "width": value.get('width', 0) / 100,
                        "height": value.get('height', 0) / 100
                    })
                elif region_type == 'polygonlabels':
                    polygons.append({
                        "label": ExternalService._first_label(value, 'polygonlabels'),
                        "points": [[p[0] / 100, p[1] / 100] for p in value.get('points', [])]
                    })
            if image_url:
                entry = {"image": image_url}
                if boxes:
                    entry["boxes"] = boxes
                if polygons:
                    entry["polygons"] = polygons
                yolo_data.append(entry)
        content = json.dumps(yolo_data, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _export_coco(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export COCO-style JSON (images, annotations, categories).

        Image ``width``/``height`` are written as 0 and box/polygon
        coordinates are copied as stored, without conversion.
        """
        file_name = f"export_{project_id}_coco_{timestamp}.json"
        coco_data = {
            "images": [],
            "annotations": [],
            "categories": []
        }
        category_map = {}

        def category_id_for(label):
            # Register the label on first sight; ids are 1-based in first-seen order.
            if label not in category_map:
                category_map[label] = len(category_map) + 1
                coco_data["categories"].append({
                    "id": category_map[label],
                    "name": label
                })
            return category_map[label]

        annotation_id = 1
        for image_id, item in enumerate(data, start=1):
            original = item.get('original_data', {})
            coco_data["images"].append({
                "id": image_id,
                "file_name": original.get('image', ''),
                "width": 0,
                "height": 0
            })
            for region in ExternalService._iter_regions(item.get('annotations', [])):
                region_type = region.get('type', '')
                value = region.get('value') or {}
                if region_type == 'rectanglelabels':
                    label = ExternalService._first_label(value, 'rectanglelabels')
                    if not label:
                        continue
                    coco_data["annotations"].append({
                        "id": annotation_id,
                        "image_id": image_id,
                        "category_id": category_id_for(label),
                        "bbox": [
                            value.get('x', 0),
                            value.get('y', 0),
                            value.get('width', 0),
                            value.get('height', 0)
                        ],
                        "area": value.get('width', 0) * value.get('height', 0),
                        "iscrowd": 0
                    })
                    annotation_id += 1
                elif region_type == 'polygonlabels':
                    label = ExternalService._first_label(value, 'polygonlabels')
                    points = value.get('points', [])
                    if not label:
                        continue
                    # The category is registered even when the polygon has no
                    # points and therefore produces no annotation.
                    category_id = category_id_for(label)
                    if not points:
                        continue
                    # COCO segmentation is a flat [x1, y1, x2, y2, ...] list.
                    segmentation = []
                    for p in points:
                        segmentation.extend([p[0], p[1]])
                    x_coords = [p[0] for p in points]
                    y_coords = [p[1] for p in points]
                    x_min, x_max = min(x_coords), max(x_coords)
                    y_min, y_max = min(y_coords), max(y_coords)
                    coco_data["annotations"].append({
                        "id": annotation_id,
                        "image_id": image_id,
                        "category_id": category_id,
                        "segmentation": [segmentation],
                        "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                        "area": ExternalService._polygon_area(points),
                        "iscrowd": 0
                    })
                    annotation_id += 1
        content = json.dumps(coco_data, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _export_alpaca(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """Export Alpaca instruction-tuning records for tasks that have text.

        Tasks without a choice label are still exported, with a placeholder output.
        """
        file_name = f"export_{project_id}_alpaca_{timestamp}.json"
        alpaca_data = []
        for item in data:
            text = item.get('original_data', {}).get('text', '')
            label = ExternalService._extract_choice_label(item.get('annotations', []))
            if text:
                alpaca_data.append({
                    "instruction": "请对以下文本进行分类",
                    "input": text,
                    "output": label or "未标注"
                })
        content = json.dumps(alpaca_data, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _export_pascal_voc(data: List[Dict], project_id: str, timestamp: str) -> tuple:
        """
        Export PascalVOC-style annotations bundled in a single JSON file.

        PascalVOC normally uses one XML file per image. Because a single file
        must be returned, the result is a JSON list where each entry holds the
        image, its file name, the generated XML text and the object list.

        Coordinates are converted from percentages to pixels only when the
        task data carries positive ``width`` and ``height``; otherwise the
        stored values are kept as they are.
        """
        file_name = f"export_{project_id}_pascal_voc_{timestamp}.json"
        voc_data = []
        for idx, item in enumerate(data):
            original = item.get('original_data', {})
            image_url = original.get('image', '')
            # Use the last path segment of the URL as the file name.
            image_filename = image_url.split('/')[-1] if image_url else f"image_{idx + 1}.jpg"
            # "or 0" also covers an explicit null in the task data, which
            # would otherwise make the "> 0" comparison raise TypeError.
            img_width = original.get('width') or 0
            img_height = original.get('height') or 0
            has_size = img_width > 0 and img_height > 0

            objects = []
            for region in ExternalService._iter_regions(item.get('annotations', [])):
                region_type = region.get('type', '')
                value = region.get('value') or {}
                if region_type == 'rectanglelabels':
                    label = ExternalService._first_label(value, 'rectanglelabels')
                    if not label:
                        continue
                    x_pct = value.get('x', 0)
                    y_pct = value.get('y', 0)
                    w_pct = value.get('width', 0)
                    h_pct = value.get('height', 0)
                    if has_size:
                        xmin = int(x_pct * img_width / 100)
                        ymin = int(y_pct * img_height / 100)
                        xmax = int((x_pct + w_pct) * img_width / 100)
                        ymax = int((y_pct + h_pct) * img_height / 100)
                    else:
                        xmin = x_pct
                        ymin = y_pct
                        xmax = x_pct + w_pct
                        ymax = y_pct + h_pct
                    objects.append({
                        "name": label,
                        "pose": "Unspecified",
                        "truncated": 0,
                        "difficult": 0,
                        "bndbox": {
                            "xmin": xmin,
                            "ymin": ymin,
                            "xmax": xmax,
                            "ymax": ymax
                        }
                    })
                elif region_type == 'polygonlabels':
                    label = ExternalService._first_label(value, 'polygonlabels')
                    points = value.get('points', [])
                    if not (label and points):
                        continue
                    # Bounding box of the polygon.
                    x_coords = [p[0] for p in points]
                    y_coords = [p[1] for p in points]
                    if has_size:
                        xmin = int(min(x_coords) * img_width / 100)
                        ymin = int(min(y_coords) * img_height / 100)
                        xmax = int(max(x_coords) * img_width / 100)
                        ymax = int(max(y_coords) * img_height / 100)
                        polygon_points = [
                            [int(p[0] * img_width / 100), int(p[1] * img_height / 100)]
                            for p in points
                        ]
                    else:
                        xmin = min(x_coords)
                        ymin = min(y_coords)
                        xmax = max(x_coords)
                        ymax = max(y_coords)
                        polygon_points = points
                    objects.append({
                        "name": label,
                        "pose": "Unspecified",
                        "truncated": 0,
                        "difficult": 0,
                        "bndbox": {
                            "xmin": xmin,
                            "ymin": ymin,
                            "xmax": xmax,
                            "ymax": ymax
                        },
                        "polygon": polygon_points
                    })

            xml_content = ExternalService._generate_voc_xml(
                image_filename,
                img_width,
                img_height,
                objects
            )
            voc_data.append({
                "image": image_url,
                "filename": image_filename,
                "xml_content": xml_content,
                "objects": objects
            })
        content = json.dumps(voc_data, ensure_ascii=False, indent=2)
        return file_name, content

    @staticmethod
    def _generate_voc_xml(filename: str, width: int, height: int, objects: List[Dict]) -> str:
        """Build the PascalVOC text for one image and return it as a newline-joined string.

        NOTE(review): the string literals below contain no XML tags as seen
        here, and each object contributes only a single-space line — the
        markup appears to have been lost upstream. The literals are left
        untouched; confirm against the canonical file.
        """
        xml_lines = [
            '',
            '',
            f' {filename}',
            ' ',
            ' Annotation Platform',
            ' ',
            ' ',
            f' {width}',
            f' {height}',
            ' 3',
            ' ',
            ' 0'
        ]
        for obj in objects:
            xml_lines.append(' ')
        xml_lines.append('')
        return '\n'.join(xml_lines)