chenkun 2 тижнів тому
батько
коміт
bd817f6a17

+ 6 - 6
src/app/config/config.ini

@@ -99,13 +99,13 @@ MINERU_TOKEN=eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiIyNjQwMDgzNiIsInJv
 MINERU_API_APPLY=https://mineru.net/api/v4/file-urls/batch
 MINERU_API_BATCH_RESULT=https://mineru.net/api/v4/extract-results/batch/{}
 
+# 外部标注平台配置
+[external_api]
+admin_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzIwMjYwMTI5MTUxMTM4XzkzYjIyMjZkIiwidXNlcm5hbWUiOiJhZG1pbiIsImVtYWlsIjoiYWRtaW5AZXhhbXBsZS5jb20iLCJyb2xlIjoiYWRtaW4iLCJleHAiOjEwNDEwMTg5NjYxLCJpYXQiOjE3NzAyNzYwNjEsInR5cGUiOiJhY2Nlc3MifQ.QW5qwNXop3Id4fVE9rpelTgdyGZUzMralQAFOVj4Mtw
+project_api_url=http://192.168.92.61:9003/api/external/projects
+download_base_url=http://192.168.92.61:9003
 
 # embedding模型配置
 EMBEDDING_BASE_URL=http://192.168.91.253:9003/v1
 EMBEDDING_MODEL=Qwen3-Embedding-8B
-EMBEDDING_API_KEY=dummy
-
-# 外部标注平台联动配置
-[external_platform]
-API_URL=http://192.168.92.61:9003/api/external/projects/init
-ADMIN_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzIwMjYwMTI5MTUxMTM4XzkzYjIyMjZkIiwidXNlcm5hbWUiOiJhZG1pbiIsImVtYWlsIjoiYWRtaW5AZXhhbXBsZS5jb20iLCJyb2xlIjoiYWRtaW4iLCJleHAiOjEwNDEwMTg5NjYxLCJpYXQiOjE3NzAyNzYwNjEsInR5cGUiOiJhY2Nlc3MifQ.QW5qwNXop3Id4fVE9rpelTgdyGZUzMralQAFOVj4Mtw
+EMBEDDING_API_KEY=dummy

+ 3 - 2
src/app/sample/schemas/sample_schemas.py

@@ -85,5 +85,6 @@ class UploadUrlResponse(BaseModel):
     object_name: str
 
 class ExportRequest(BaseModel):
-    project_id: str
-    format: Optional[str] = "json"
+    """Request body for exporting a project's annotation data."""
+
+    # Optional in the body: the export endpoint also accepts the project ID
+    # as a URL path segment.
+    project_id: Optional[str] = None
+    format: Optional[str] = "json"
+    # Defaults to True so callers that omit the field keep the behavior they
+    # had before it existed (the export payload was hard-coded to
+    # completed_only=True, and export_labeled_data itself defaults to True).
+    completed_only: Optional[bool] = True

+ 2 - 1
src/app/server/app.py

@@ -256,7 +256,8 @@ async def root():
 app.include_router(system_router, prefix="/api/v1")
 app.include_router(oauth_router, prefix="")
 app.include_router(auth_router, prefix="/api/v1")
-app.include_router(sample_router, prefix="/api/v1")
+app.include_router(sample_router, prefix="/api/v1/sample")
+app.include_router(sample_router, prefix="/api")  # 兼容外部测试脚本的路径 (/api/external/...)
 app.include_router(knowledge_base_router, prefix="/api/v1")
 app.include_router(snippet_router, prefix="/api/v1")
 app.include_router(tag_router, prefix="/api/v1")

+ 200 - 28
src/app/services/task_service.py

@@ -36,9 +36,22 @@ class TaskService:
             if not cursor.fetchone():
                 cursor.execute("ALTER TABLE t_task_management ADD COLUMN project_name varchar(255) NULL COMMENT '项目显示名称' AFTER project_id")
             
+            cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'external_completed_count'")
+            if not cursor.fetchone():
+                cursor.execute("ALTER TABLE t_task_management ADD COLUMN external_completed_count int NULL DEFAULT 0 COMMENT '外部平台完成数量' AFTER annotation_status")
+            
+            cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'external_total_count'")
+            if not cursor.fetchone():
+                cursor.execute("ALTER TABLE t_task_management ADD COLUMN external_total_count int NULL DEFAULT 0 COMMENT '外部平台总任务数量' AFTER external_completed_count")
+            
             conn.commit()
 
             # 2. 处理索引冲突
+            cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'file_url'")
+            if not cursor.fetchone():
+                cursor.execute("ALTER TABLE t_task_management ADD COLUMN file_url VARCHAR(512) DEFAULT NULL AFTER metadata")
+                logger.info("Added column file_url to t_task_management")
+
             cursor.execute("SHOW INDEX FROM t_task_management WHERE Column_name = 'business_id'")
             indexes = cursor.fetchall()
             for idx in indexes:
@@ -140,9 +153,15 @@ class TaskService:
                 SELECT 
                     project_id, 
                     MAX(project_name) as project_name,
+                    MAX(task_id) as task_id,
                     MAX(tag) as tag,
                     MAX(id) as sort_id,
-                    COUNT(*) as file_count
+                    COALESCE(NULLIF(MAX(external_total_count), 0), COUNT(*)) as file_count,
+                    COALESCE(
+                        MAX(external_completed_count),
+                        SUM(CASE WHEN annotation_status = 'completed' THEN 1 ELSE 0 END),
+                        0
+                    ) as completed_count
                 FROM t_task_management 
                 WHERE type = %s 
                 GROUP BY project_id
@@ -427,7 +446,7 @@ class TaskService:
         try:
             # 1. 获取任务记录
             sql_tasks = """
-                SELECT business_id as id, type, task_id, tag, metadata
+                SELECT business_id as id, type, task_id, project_name, tag, metadata
                 FROM t_task_management
                 WHERE project_id = %s
             """
@@ -440,6 +459,7 @@ class TaskService:
             # 2. 解析基本信息
             first_row = rows[0]
             internal_task_type = first_row['type']
+            project_name = first_row.get('project_name') or project_id
             remote_project_id = first_row.get('task_id') or project_id
             
             # 映射任务类型
@@ -468,6 +488,9 @@ class TaskService:
             for item in rows:
                 task_id = item['id']
                 
+                # 记录原始数据状态
+                logger.debug(f"正在处理导出任务 {task_id}, tag: {item.get('tag')}, metadata_keys: {list(json.loads(item['metadata']).keys()) if item.get('metadata') else 'None'}")
+                
                 # 提取并处理标签
                 doc_tags = []
                 if item.get('tag'):
@@ -486,6 +509,7 @@ class TaskService:
                 
                 # 获取任务内容
                 task_contents = []
+                annotation_results = []
                 if internal_task_type == 'data':
                     task_contents = milvus_data_map.get(task_id, [])
                     if not task_contents:
@@ -516,15 +540,21 @@ class TaskService:
                     if doc_tags:
                         task_metadata['tags'] = [{"tag": tag} for tag in doc_tags]
                         
-                    final_tasks.append({
+                    task_item = {
                         "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
                         "content": content,
                         "metadata": task_metadata
-                    })
+                    }
+                    
+                    # 尝试从元数据中提取 annotation_result
+                    if db_metadata and 'annotation_result' in db_metadata:
+                        task_item['annotation_result'] = db_metadata['annotation_result']
+                    
+                    final_tasks.append(task_item)
 
             # 统一进行一次递归序列化处理
             return self._serialize_datetime({
-                "name": project_id,
+                "name": project_name,
                 "description": "",
                 "task_type": external_task_type,
                 "data": final_tasks,
@@ -618,10 +648,9 @@ class TaskService:
             
             # 2. 获取配置
             from app.core.config import config_handler
-            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
-            # 转换 init URL 为 progress URL
-            progress_url = api_url.replace('/init', f'/{remote_project_id}/progress')
-            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+            api_base_url = config_handler.get('external_api', 'project_api_url', 'http://192.168.92.61:9003/api/external/projects').rstrip('/')
+            progress_url = f"{api_base_url}/{remote_project_id}/progress"
+            token = config_handler.get('external_api', 'admin_token', '')
 
             # 3. 发送请求
             async with httpx.AsyncClient(timeout=10.0) as client:
@@ -629,7 +658,29 @@ class TaskService:
                 response = await client.get(progress_url, headers=headers)
                 
                 if response.status_code == 200:
-                    return response.json()
+                    data = response.json()
+                    # 同步更新本地缓存的完成数量和总数
+                    completed_count = data.get('completed_tasks', 0)
+                    total_count = data.get('total_tasks', 0)
+                    try:
+                        conn_update = get_db_connection()
+                        if conn_update:
+                            cursor_update = conn_update.cursor()
+                            cursor_update.execute(
+                                """
+                                UPDATE t_task_management 
+                                SET external_completed_count = %s, external_total_count = %s 
+                                WHERE project_id = %s
+                                """,
+                                (completed_count, total_count, project_id)
+                            )
+                            conn_update.commit()
+                            cursor_update.close()
+                            conn_update.close()
+                    except Exception as ex:
+                        logger.warning(f"更新本地进度缓存失败: {ex}")
+                        
+                    return data
                 else:
                     logger.error(f"查询进度失败: {response.status_code} - {response.text}")
                     return {"error": f"外部平台返回错误 ({response.status_code})"}
@@ -640,7 +691,7 @@ class TaskService:
         finally:
             conn.close()
 
-    async def export_labeled_data(self, project_id: str, export_format: str = 'json') -> Dict[str, Any]:
+    async def export_labeled_data(self, project_id: str, export_format: str = 'json', completed_only: bool = True) -> Dict[str, Any]:
         """触发外部标注项目的数据导出"""
         conn = get_db_connection()
         if not conn:
@@ -660,16 +711,9 @@ class TaskService:
             
             # 2. 获取配置
             from app.core.config import config_handler
-            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
-            # 转换 init URL 为 export URL
-            # 如果 URL 包含 /init,则替换为 /export,否则直接追加
-            if '/init' in api_url:
-                export_url = api_url.replace('/init', f'/{remote_project_id}/export')
-            else:
-                # 兼容不包含 /init 的情况,直接拼接
-                base_url = api_url.rstrip('/')
-                export_url = f"{base_url}/{remote_project_id}/export"
-            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+            api_base_url = config_handler.get('external_api', 'project_api_url', 'http://192.168.92.61:9003/api/external/projects').rstrip('/')
+            export_url = f"{api_base_url}/{remote_project_id}/export"
+            token = config_handler.get('external_api', 'admin_token', '')
 
             # 3. 发送请求
             async with httpx.AsyncClient(timeout=30.0) as client:
@@ -679,12 +723,124 @@ class TaskService:
                 }
                 payload = {
                     "format": export_format,
-                    "completed_only": True
+                    "completed_only": completed_only
                 }
                 response = await client.post(export_url, json=payload, headers=headers)
                 
                 if response.status_code in (200, 201):
-                    return response.json()
+                    res_data = response.json()
+                    logger.info(f"外部平台导出响应数据类型: {type(res_data)}, 键名: {list(res_data.keys()) if isinstance(res_data, dict) else 'None'}")
+                    
+                    # 1. 统一获取下载地址并回写 (兼容 download_url 和 file_url)
+                    download_url = res_data.get('download_url') or res_data.get('file_url')
+                    if isinstance(res_data, dict) and download_url:
+                        try:
+                            cursor_update = conn.cursor()
+                            affected = cursor_update.execute(
+                                "UPDATE t_task_management SET file_url = %s WHERE project_id = %s",
+                                (download_url, project_id)
+                            )
+                            conn.commit()
+                            cursor_update.close()
+                            logger.info(f"导出时同步更新 file_url: {download_url}, 受影响行数: {affected}")
+                        except Exception as ex:
+                            logger.warning(f"导出同步回写 file_url 失败: {ex}")
+                    
+                    # 2. 同步回写 annotation_result 到 metadata
+                    # 情况 A: 接口直接返回了任务列表数据
+                    export_items = []
+                    if isinstance(res_data, dict) and 'data' in res_data and isinstance(res_data['data'], list):
+                        export_items = res_data['data']
+                    
+                    # 情况 B: 接口返回了文件链接,且格式为 JSON,尝试下载并解析以获取标注结果
+                    elif isinstance(res_data, dict) and download_url and res_data.get('format') == 'json':
+                        try:
+                            # 补全 URL 协议 (如果外部平台返回的是相对路径)
+                            full_download_url = download_url
+                            if not download_url.startswith('http'):
+                                from app.core.config import config_handler
+                                # 尝试从配置获取 base_url,如果没有则从 api_url 中提取
+                                base_url = config_handler.get('external_api', 'download_base_url', '')
+                                if not base_url:
+                                    # 兜底:从 project_api_url 中提取域名部分
+                                    api_base = config_handler.get('external_api', 'project_api_url', 'http://192.168.92.61:9003')
+                                    from urllib.parse import urlparse
+                                    parsed = urlparse(api_base)
+                                    base_url = f"{parsed.scheme}://{parsed.netloc}"
+                                
+                                full_download_url = f"{base_url.rstrip('/')}/{download_url.lstrip('/')}"
+                                
+                            logger.info(f"正在从导出链接获取详细标注数据以同步数据库: {full_download_url}")
+                            # 注意:这里需要带上 token
+                            file_res = await client.get(full_download_url, headers=headers)
+                            if file_res.status_code == 200:
+                                file_json = file_res.json()
+                                # 外部平台导出的 JSON 结构通常是 { "data": [...] } 或直接是 [...]
+                                if isinstance(file_json, dict) and 'data' in file_json:
+                                    export_items = file_json['data']
+                                elif isinstance(file_json, list):
+                                    export_items = file_json
+                                
+                                if export_items:
+                                    logger.info(f"成功获取导出项,共 {len(export_items)} 条。")
+                                else:
+                                    logger.warning("获取到的导出列表为空")
+                        except Exception as ex:
+                            logger.warning(f"从导出文件同步标注数据失败: {ex}")
+
+                    if export_items:
+                        updated_count = 0
+                        try:
+                            cursor_meta = conn.cursor()
+                            for ext_item in export_items:
+                                # 根据实际 JSON 结构提取数据
+                                # 1. 提取 original_id
+                                original_data = ext_item.get('original_data', {})
+                                meta = original_data.get('metadata', {}) if original_data else ext_item.get('metadata', {})
+                                # 增加兼容性:如果 metadata 里没有,尝试直接从 ext_item 找,或者从 external_id 提取
+                                original_id = meta.get('original_id')
+                                if not original_id:
+                                    ext_id = ext_item.get('external_id', '')
+                                    if '_' in ext_id: # 比如 "uuid_4" 这种结构,提取前面的 uuid
+                                        original_id = ext_id.rsplit('_', 1)[0]
+                                    else:
+                                        original_id = ext_id
+                                
+                                # 2. 提取 annotation_result
+                                annotations = ext_item.get('annotations', [])
+                                if annotations and isinstance(annotations, list):
+                                    annotation_res = annotations[0].get('result')
+                                else:
+                                    annotation_res = ext_item.get('annotation_result')
+                                
+                                if original_id and annotation_res is not None:
+                                    # 注意:这里需要根据 business_id 或 metadata 里的 original_id 来匹配
+                                    # 样本中心 t_task_management 表的 business_id 存的是原始数据的唯一标识
+                                    cursor_meta.execute(
+                                        "SELECT id, metadata FROM t_task_management WHERE business_id = %s OR id = %s", 
+                                        (original_id, original_id)
+                                    )
+                                    row = cursor_meta.fetchone()
+                                    if row:
+                                        db_id = row['id']
+                                        current_meta = json.loads(row['metadata']) if row['metadata'] else {}
+                                        current_meta['annotation_result'] = annotation_res
+                                        
+                                        cursor_meta.execute(
+                                            "UPDATE t_task_management SET metadata = %s WHERE id = %s",
+                                            (json.dumps(current_meta, ensure_ascii=False), db_id)
+                                        )
+                                        updated_count += 1
+                                    else:
+                                        logger.debug(f"未在数据库中找到对应的任务: {original_id}")
+                            
+                            conn.commit()
+                            cursor_meta.close()
+                            logger.info(f"已从导出数据同步回写 {updated_count} 条任务的 annotation_result")
+                        except Exception as ex:
+                            logger.warning(f"同步回写 annotation_result 异常: {ex}")
+                            
+                    return res_data
                 else:
                     logger.error(f"导出数据失败: {response.status_code} - {response.text}")
                     return {"error": f"外部平台返回错误 ({response.status_code})"}
@@ -713,8 +869,9 @@ class TaskService:
             
             # 2. 获取配置
             from app.core.config import config_handler
-            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
-            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+            api_base_url = config_handler.get('external_api', 'project_api_url', 'http://192.168.92.61:9003/api/external/projects').rstrip('/')
+            api_url = f"{api_base_url}/init"
+            token = config_handler.get('external_api', 'admin_token', '')
 
             # 3. 发送请求
             async with httpx.AsyncClient(timeout=60.0) as client:
@@ -727,17 +884,32 @@ class TaskService:
                 
                 if response.status_code in (200, 201):
                     res_data = response.json()
+                    logger.info(f"外部平台推送成功响应: {res_data}")
                     remote_project_id = res_data.get('project_id')
                     
-                    if remote_project_id:
-                        # 4. 回写外部项目 ID (复用当前连接)
+                    # 获取下载地址并回写 (兼容多种返回格式)
+                    download_url = res_data.get('download_url') or res_data.get('file_url')
+                    if download_url:
+                        # 4. 回写外部项目 ID 和下载地址 (复用当前连接)
+                        cursor = conn.cursor()
+                        # 先检查受影响行数
+                        affected = cursor.execute(
+                            "UPDATE t_task_management SET task_id = %s, file_url = %s WHERE project_id = %s", 
+                            (remote_project_id, download_url, project_id)
+                        )
+                        conn.commit()
+                        cursor.close()
+                        logger.info(f"已回写 task_id: {remote_project_id} 和 file_url: {download_url}, 受影响行数: {affected}")
+                    elif remote_project_id:
+                        # 仅回写外部项目 ID
                         cursor = conn.cursor()
-                        cursor.execute(
+                        affected = cursor.execute(
                             "UPDATE t_task_management SET task_id = %s WHERE project_id = %s", 
                             (remote_project_id, project_id)
                         )
                         conn.commit()
                         cursor.close()
+                        logger.info(f"仅回写 task_id: {remote_project_id}, 受影响行数: {affected}")
                     
                     return True, f"推送成功!外部项目ID: {remote_project_id or '未知'}"
                 else:

+ 84 - 4
src/views/sample_view.py

@@ -8,7 +8,7 @@ from datetime import datetime, timezone
 from typing import Optional, List, Any, Union
 
 from fastapi import APIRouter, Depends, HTTPException, Request, Response, BackgroundTasks
-from fastapi.responses import HTMLResponse
+from fastapi.responses import HTMLResponse, StreamingResponse
 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
 
 from app.sample.schemas.sample_schemas import BatchEnterRequest, BatchDeleteRequest, ConvertRequest, DocumentAdd, UploadUrlRequest, ExportRequest
@@ -22,7 +22,7 @@ from app.services.task_service import task_service
 # 获取logger
 logger = logging.getLogger(__name__)
 
-router = APIRouter(prefix="/sample", tags=["样本中心"])
+router = APIRouter(tags=["样本中心"])
 security = HTTPBearer()
 
 security_optional = HTTPBearer(auto_error=False)
@@ -83,9 +83,10 @@ async def init_external_project(request: Request, credentials: HTTPAuthorization
         logger.exception("项目初始化接口异常")
         return ApiResponse(code=500, message=str(e)).model_dump()
 
+@router.get("/external/projects/{project_id}/progress")
 @router.get("/external/projects/progress")
 async def get_external_project_progress(
-    project_id: str, 
+    project_id: Optional[str] = None, 
     credentials: HTTPAuthorizationCredentials = Depends(security)
 ):
     """查询项目进度"""
@@ -94,6 +95,9 @@ async def get_external_project_progress(
         if not payload:
             return ApiResponse(code=401, message="无效的访问令牌").model_dump()
             
+        if not project_id:
+            return ApiResponse(code=400, message="缺少项目ID").model_dump()
+
         progress = await task_service.get_project_progress(project_id=project_id)
         if "error" in progress:
             return ApiResponse(code=500, message=progress["error"]).model_dump()
@@ -103,26 +107,102 @@ async def get_external_project_progress(
         logger.exception("查询进度接口异常")
         return ApiResponse(code=500, message=str(e)).model_dump()
 
+@router.post("/external/projects/{project_id}/export")
 @router.post("/external/projects/export")
 async def export_external_project(
     req: ExportRequest, 
+    project_id: Optional[str] = None,
     credentials: HTTPAuthorizationCredentials = Depends(security)
 ):
     """Export a project's annotation data from the external labeling platform."""
+    # Served on two routes: the project ID comes from the URL path when
+    # present, otherwise from the request body. Failures are returned as
+    # ApiResponse payloads (code 401 / 400 / 500) instead of being raised.
     try:
+        # Log only whether a credential was supplied, never the token itself.
+        logger.debug(f"收到导出请求: project_id={project_id}, req={req.model_dump()}, has_credentials={bool(credentials.credentials)}")
         payload = verify_token(credentials.credentials)
         if not payload:
+            logger.warning("导出请求 Token 验证失败")
             return ApiResponse(code=401, message="无效的访问令牌").model_dump()
+        
+        # Prefer the project ID from the path, fall back to the body.
+        actual_project_id = project_id or req.project_id
+        if not actual_project_id:
+            logger.warning("导出请求缺少项目ID")
+            return ApiResponse(code=400, message="缺少项目ID").model_dump()
             
-        result = await task_service.export_labeled_data(project_id=req.project_id, export_format=req.format)
+        # Both schema fields are Optional, so an explicit null may arrive:
+        # fall back to "json" for the format, and leave completed_only to the
+        # service's own default rather than forwarding None upstream.
+        export_format = req.format or "json"
+        export_kwargs = {"project_id": actual_project_id, "export_format": export_format}
+        if req.completed_only is not None:
+            export_kwargs["completed_only"] = req.completed_only
+
+        logger.info(f"正在为项目 {actual_project_id} 执行导出,格式: {export_format}")
+        result = await task_service.export_labeled_data(**export_kwargs)
+        
         if "error" in result:
+            logger.error(f"导出执行失败: {result['error']}")
             return ApiResponse(code=500, message=result["error"]).model_dump()
             
+        logger.info(f"项目 {actual_project_id} 导出成功")
         return ApiResponse(code=0, message="成功", data=result).model_dump()
     except Exception as e:
         logger.exception("导出数据接口异常")
         return ApiResponse(code=500, message=str(e)).model_dump()
 
+@router.get("/external/download-proxy")
+async def download_proxy(url: str, filename: str, credentials: HTTPAuthorizationCredentials = Depends(security)):
+    """Stream a file from the external labeling platform through this backend.
+
+    The proxy lets the frontend download platform files without cross-origin
+    access and without holding the platform's admin token.
+
+    Args:
+        url: Absolute http(s) URL on the configured platform host, or a path
+            relative to ``external_api.download_base_url``.
+        filename: File name offered to the client via ``Content-Disposition``.
+        credentials: Bearer token of the calling user.
+
+    Returns:
+        A ``StreamingResponse`` relaying the upstream body, or an
+        ``ApiResponse`` payload describing the failure.
+
+    Raises:
+        HTTPException: 401 when the caller's token is invalid.
+    """
+    # Function-scope imports keep the handler independent of the module header.
+    from urllib.parse import quote, urlparse
+
+    from app.core.config import config_handler
+
+    client = None
+    try:
+        payload = verify_token(credentials.credentials)
+        if not payload:
+            raise HTTPException(status_code=401, detail="无效的访问令牌")
+
+        base_url = config_handler.get("external_api", "download_base_url", "http://192.168.92.61:9003")
+        if not url.startswith(("http://", "https://")):
+            # Relative path returned by the platform: resolve it against the base URL.
+            url = f"{base_url.rstrip('/')}/{url.lstrip('/')}"
+        elif urlparse(url).netloc.lower() != urlparse(base_url).netloc.lower():
+            # The request below carries the platform admin token, so it must
+            # never be sent to a caller-chosen host.
+            logger.warning(f"拒绝中转下载非标注平台地址: {url}")
+            return ApiResponse(code=400, message="不允许的下载地址").model_dump()
+
+        logger.info(f"正在中转下载文件: {url}")
+
+        # Download with the platform admin token; fall back to the caller's
+        # own token only when none is configured.
+        admin_token = config_handler.get("external_api", "admin_token", "")
+        if not admin_token:
+            admin_token = credentials.credentials
+        headers = {
+            "Authorization": f"Bearer {admin_token}"
+        }
+
+        client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
+        # stream=True defers reading the body so it can be relayed chunk by chunk.
+        upstream_request = client.build_request("GET", url, headers=headers)
+        response = await client.send(upstream_request, stream=True)
+
+        if response.status_code != 200:
+            await response.aclose()
+            await client.aclose()
+            client = None
+            logger.error(f"外部平台返回错误: {response.status_code}")
+            return ApiResponse(code=response.status_code, message="外部平台下载失败").model_dump()
+
+        async def stream_generator():
+            # Once streaming starts, the generator owns the upstream response
+            # and the client and closes both when iteration ends or aborts.
+            try:
+                async for chunk in response.aiter_bytes():
+                    yield chunk
+            finally:
+                await response.aclose()
+                await client.aclose()
+
+        return StreamingResponse(
+            stream_generator(),
+            media_type=response.headers.get("content-type", "application/octet-stream"),
+            headers={
+                # RFC 6266 extended form: percent-encoded UTF-8 belongs in
+                # filename*, not in the plain filename parameter.
+                "Content-Disposition": f"attachment; filename*=UTF-8''{quote(filename, safe='')}"
+            }
+        )
+    except HTTPException:
+        # Let FastAPI turn this into the real 401 response instead of
+        # reporting it as a code-500 payload below.
+        raise
+    except Exception as e:
+        logger.exception("下载代理异常")
+        if client is not None:
+            # The request failed before the stream generator took ownership.
+            await client.aclose()
+        return ApiResponse(code=500, message=str(e)).model_dump()
+
 @router.post("/documents/upload-url")
 async def get_upload_url(req: UploadUrlRequest, credentials: HTTPAuthorizationCredentials = Depends(security)):
     """获取 MinIO 预签名上传 URL"""