""" Statistics API router. Provides endpoints for project and platform statistics. """ import json from fastapi import APIRouter, HTTPException, status, Request from database import get_db_connection from schemas.statistics import ( ProjectStatisticsResponse, OverviewStatisticsResponse, UserTaskStats ) router = APIRouter( prefix="/api/statistics", tags=["statistics"] ) def calculate_project_statistics(project_id: str) -> ProjectStatisticsResponse: """ 计算项目统计信息。 Args: project_id: 项目ID Returns: ProjectStatisticsResponse 包含项目统计信息 """ with get_db_connection() as conn: cursor = conn.cursor() # 获取项目信息 cursor.execute(""" SELECT id, name FROM projects WHERE id = ? """, (project_id,)) project_row = cursor.fetchone() if not project_row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"项目 '{project_id}' 不存在" ) project_name = project_row["name"] # 任务统计 cursor.execute(""" SELECT COUNT(*) as total_tasks, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_tasks, SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_tasks, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending_tasks FROM tasks WHERE project_id = ? """, (project_id,)) task_row = cursor.fetchone() total_tasks = task_row["total_tasks"] or 0 completed_tasks = task_row["completed_tasks"] or 0 in_progress_tasks = task_row["in_progress_tasks"] or 0 pending_tasks = task_row["pending_tasks"] or 0 # 计算数据条数(从任务的 data.items 中统计) cursor.execute(""" SELECT data FROM tasks WHERE project_id = ? """, (project_id,)) total_items = 0 task_rows = cursor.fetchall() for row in task_rows: try: data = json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] items = data.get("items", []) total_items += len(items) except: pass # 标注数量 cursor.execute(""" SELECT COUNT(*) as annotation_count FROM annotations a JOIN tasks t ON a.task_id = t.id WHERE t.project_id = ? """, (project_id,)) annotation_row = cursor.fetchone() annotated_items = annotation_row["annotation_count"] or 0 # 计算完成率 task_completion_rate = 0.0 if total_tasks > 0: task_completion_rate = round(completed_tasks / total_tasks * 100, 2) data_completion_rate = 0.0 if total_items > 0: data_completion_rate = round(annotated_items / total_items * 100, 2) # 人员统计 cursor.execute(""" SELECT u.id as user_id, u.username, COUNT(DISTINCT t.id) as assigned_tasks, SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_tasks, SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_tasks, SUM(CASE WHEN t.status = 'pending' THEN 1 ELSE 0 END) as pending_tasks, COUNT(DISTINCT a.id) as annotation_count FROM users u LEFT JOIN tasks t ON t.assigned_to = u.id AND t.project_id = ? LEFT JOIN annotations a ON a.task_id = t.id AND a.user_id = u.id WHERE u.role = 'annotator' AND t.id IS NOT NULL GROUP BY u.id, u.username ORDER BY assigned_tasks DESC """, (project_id,)) user_stats = [] for user_row in cursor.fetchall(): assigned = user_row["assigned_tasks"] or 0 completed = user_row["completed_tasks"] or 0 completion_rate = 0.0 if assigned > 0: completion_rate = round(completed / assigned * 100, 2) user_stats.append(UserTaskStats( user_id=user_row["user_id"], username=user_row["username"], assigned_tasks=assigned, completed_tasks=completed, in_progress_tasks=user_row["in_progress_tasks"] or 0, pending_tasks=user_row["pending_tasks"] or 0, annotation_count=user_row["annotation_count"] or 0, completion_rate=completion_rate )) return ProjectStatisticsResponse( project_id=project_id, project_name=project_name, total_tasks=total_tasks, completed_tasks=completed_tasks, in_progress_tasks=in_progress_tasks, pending_tasks=pending_tasks, total_items=total_items, annotated_items=annotated_items, task_completion_rate=task_completion_rate, data_completion_rate=data_completion_rate, user_stats=user_stats ) def calculate_overview_statistics() -> OverviewStatisticsResponse: """ 计算平台总览统计信息。 Returns: OverviewStatisticsResponse 包含平台统计信息 """ with get_db_connection() as conn: cursor = conn.cursor() # 项目统计 cursor.execute("SELECT COUNT(*) as total FROM projects") total_projects = cursor.fetchone()["total"] or 0 # 任务统计 cursor.execute(""" SELECT COUNT(*) as total_tasks, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_tasks, SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_tasks, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending_tasks FROM tasks """) task_row = cursor.fetchone() total_tasks = task_row["total_tasks"] or 0 completed_tasks = task_row["completed_tasks"] or 0 in_progress_tasks = task_row["in_progress_tasks"] or 0 pending_tasks = task_row["pending_tasks"] or 0 # 用户统计 cursor.execute(""" SELECT COUNT(*) as total_users, SUM(CASE WHEN role = 'admin' THEN 1 ELSE 0 END) as admin_count, SUM(CASE WHEN role = 'annotator' THEN 1 ELSE 0 END) as annotator_count FROM users """) user_row = cursor.fetchone() total_users = user_row["total_users"] or 0 admin_count = user_row["admin_count"] or 0 annotator_count = user_row["annotator_count"] or 0 # 标注统计 cursor.execute("SELECT COUNT(*) as total FROM annotations") total_annotations = cursor.fetchone()["total"] or 0 # 计算整体完成率 overall_completion_rate = 0.0 if total_tasks > 0: overall_completion_rate = round(completed_tasks / total_tasks * 100, 2) return OverviewStatisticsResponse( total_projects=total_projects, total_tasks=total_tasks, completed_tasks=completed_tasks, in_progress_tasks=in_progress_tasks, pending_tasks=pending_tasks, total_users=total_users, admin_count=admin_count, annotator_count=annotator_count, total_annotations=total_annotations, overall_completion_rate=overall_completion_rate ) @router.get("/overview", response_model=OverviewStatisticsResponse) async def get_overview_statistics(request: Request): """ 获取平台总览统计信息。 需要管理员权限。 """ user = getattr(request.state, "user", None) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以查看平台统计" ) return calculate_overview_statistics() @router.get("/projects/{project_id}", response_model=ProjectStatisticsResponse) async def get_project_statistics(request: Request, project_id: str): """ 获取项目统计信息。 包含任务统计、数据统计和人员统计。 """ user = getattr(request.state, "user", None) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证" ) return calculate_project_statistics(project_id)