""" Export API router. Provides endpoints for exporting project annotations in various formats. """ import os from fastapi import APIRouter, HTTPException, status, Request from fastapi.responses import FileResponse from database import get_db_connection from schemas.export import ( ExportRequest, ExportJobResponse, ExportProgressResponse, ExportFormat, ExportStatus ) from services.export_service import ExportService router = APIRouter( prefix="/api", tags=["export"] ) @router.post("/projects/{project_id}/export", response_model=ExportJobResponse) async def create_export( request: Request, project_id: str, export_request: ExportRequest ): """ Create a new export job for a project. Args: request: FastAPI Request object (contains user info) project_id: Project unique identifier export_request: Export configuration Returns: Export job details Raises: HTTPException: 404 if project not found HTTPException: 403 if user doesn't have permission Requires authentication (admin only). """ # Get current user user = request.state.user # Check admin permission if user["role"] != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有管理员可以导出数据" ) # Verify project exists project = ExportService.get_project_data(project_id) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"项目 '{project_id}' 不存在" ) # Create export job job_id = ExportService.create_export_job( project_id=project_id, format=export_request.format.value, status_filter=export_request.status_filter.value, include_metadata=export_request.include_metadata, created_by=user["id"] ) # Execute export synchronously (for simplicity) # In production, this should be done asynchronously with a task queue result = ExportService.execute_export( job_id=job_id, project_id=project_id, format=export_request.format.value, status_filter=export_request.status_filter.value, include_metadata=export_request.include_metadata ) # Get updated job info job = ExportService.get_export_job(job_id) if not job: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="导出任务创建失败" ) # Build download URL if completed download_url = None if job["status"] == ExportStatus.COMPLETED.value and job["file_path"]: download_url = f"/api/exports/{job_id}/download" return ExportJobResponse( id=job["id"], project_id=job["project_id"], format=job["format"], status=job["status"], status_filter=job["status_filter"], include_metadata=job["include_metadata"], file_path=job["file_path"], download_url=download_url, error_message=job["error_message"], created_at=job["created_at"], completed_at=job["completed_at"], total_tasks=job["total_tasks"], exported_tasks=job["exported_tasks"] ) @router.get("/exports/{export_id}/status", response_model=ExportProgressResponse) async def get_export_status(request: Request, export_id: str): """ Get export job status and progress. Args: request: FastAPI Request object (contains user info) export_id: Export job unique identifier Returns: Export job progress details Raises: HTTPException: 404 if export job not found Requires authentication. """ job = ExportService.get_export_job(export_id) if not job: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"导出任务 '{export_id}' 不存在" ) # Calculate progress progress = 0.0 if job["total_tasks"] > 0: progress = job["exported_tasks"] / job["total_tasks"] elif job["status"] == ExportStatus.COMPLETED.value: progress = 1.0 return ExportProgressResponse( id=job["id"], status=job["status"], progress=progress, total_tasks=job["total_tasks"], exported_tasks=job["exported_tasks"], error_message=job["error_message"] ) @router.get("/exports/{export_id}/download") async def download_export(request: Request, export_id: str): """ Download exported file. Args: request: FastAPI Request object (contains user info) export_id: Export job unique identifier Returns: File download response Raises: HTTPException: 404 if export job or file not found HTTPException: 400 if export not completed Requires authentication. """ job = ExportService.get_export_job(export_id) if not job: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"导出任务 '{export_id}' 不存在" ) if job["status"] != ExportStatus.COMPLETED.value: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"导出任务尚未完成,当前状态: {job['status']}" ) if not job["file_path"] or not os.path.exists(job["file_path"]): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="导出文件不存在" ) # Determine media type based on format media_type = "application/json" if job["format"] == ExportFormat.CSV.value: media_type = "text/csv" # Get filename from path filename = os.path.basename(job["file_path"]) return FileResponse( path=job["file_path"], media_type=media_type, filename=filename ) @router.get("/exports/{export_id}", response_model=ExportJobResponse) async def get_export_job(request: Request, export_id: str): """ Get export job details. Args: request: FastAPI Request object (contains user info) export_id: Export job unique identifier Returns: Export job details Raises: HTTPException: 404 if export job not found Requires authentication. """ job = ExportService.get_export_job(export_id) if not job: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"导出任务 '{export_id}' 不存在" ) # Build download URL if completed download_url = None if job["status"] == ExportStatus.COMPLETED.value and job["file_path"]: download_url = f"/api/exports/{job['id']}/download" return ExportJobResponse( id=job["id"], project_id=job["project_id"], format=job["format"], status=job["status"], status_filter=job["status_filter"], include_metadata=job["include_metadata"], file_path=job["file_path"], download_url=download_url, error_message=job["error_message"], created_at=job["created_at"], completed_at=job["completed_at"], total_tasks=job["total_tasks"], exported_tasks=job["exported_tasks"] )