""" OSS文件管理路由 提供文件上传、下载接口 """ from fastapi import APIRouter, UploadFile, File, Depends, HTTPException, Query, Request from fastapi.responses import StreamingResponse, Response from sqlalchemy.orm import Session from typing import Optional import io from urllib.parse import urlparse from app.database import get_db from app.services.oss_service import get_oss_service, OSSService from app.services.user_service import UserService from app.services.auth_service import AuthService from app.middleware.auth_log_middleware import get_debug_mode, get_default_admin_user from app.dependencies.auth import get_current_user from app.models.user import User router = APIRouter(prefix="/api/oss", tags=["OSS文件管理"]) # 文件大小限制配置 MAX_FILE_SIZE_DEFAULT = 20 * 1024 * 1024 # 20MB(默认) MAX_FILE_SIZE_TINGWU = 6 * 1024 * 1024 * 1024 # 6GB(通义听悟音视频) @router.post("/upload") async def upload_file( file: UploadFile = File(...), prefix: str = Query(default="uploads", description="存储路径前缀"), current_user: User = Depends(get_current_user), oss_service: OSSService = Depends(get_oss_service) ): """ 上传文件到OSS - 需要用户认证 - 文件大小限制: - 通义听悟(tingwu/):6GB - 其他路径:20MB - 返回OSS公开访问URL """ content = await file.read() # 根据路径前缀选择不同的大小限制 max_size = MAX_FILE_SIZE_TINGWU if prefix.startswith('tingwu') else MAX_FILE_SIZE_DEFAULT max_size_mb = max_size / (1024 * 1024) if len(content) > max_size: raise HTTPException( status_code=400, detail=f"文件大小超过{max_size_mb:.0f}MB限制" ) try: url = oss_service.upload_file(content, prefix, file.filename) return { "code": 200, "message": "success", "data": { "url": url, "filename": file.filename, "size": len(content) } } except RuntimeError as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/file") async def get_file( request: Request, url: Optional[str] = Query(default=None, description="OSS完整URL"), path: Optional[str] = Query(default=None, description="OSS文件路径(不含域名)"), token: Optional[str] = Query(default=None, description="可选:JWT令牌(查询参数)"), db: Session = Depends(get_db), oss_service: OSSService = Depends(get_oss_service) ): user = None if token: try: payload = AuthService.verify_token(token) user_id = payload.get("user_id") if user_id: user_service = UserService(db) user = user_service.get_user_by_id(user_id) except Exception: pass if not user and get_debug_mode(): user = get_default_admin_user(db) if not user: raise HTTPException(status_code=401, detail="Authentication required") if not url and not path: raise HTTPException(status_code=400, detail="缺少参数") file_path = path if not file_path and url: parsed = urlparse(url) domain = oss_service.bucket_domain or "" expected_netloc = domain.replace("https://", "").replace("http://", "") if parsed.netloc != expected_netloc: raise HTTPException(status_code=400, detail="非法域名") file_path = parsed.path.lstrip("/") try: content, content_type = oss_service.download_file(file_path) return Response( content=content, media_type=content_type, headers={"Access-Control-Allow-Origin": "*"} ) except FileNotFoundError: raise HTTPException(status_code=404, detail="文件不存在") except RuntimeError as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/download") async def download_file( file_path: str = Query(..., description="OSS文件路径(不含域名)"), current_user: User = Depends(get_current_user), oss_service: OSSService = Depends(get_oss_service) ): """ 下载OSS文件 - 需要用户认证 - 通过文件路径获取文件内容 - 返回文件流 """ try: content, content_type = oss_service.download_file(file_path) return StreamingResponse( io.BytesIO(content), media_type=content_type, headers={"Content-Disposition": f"attachment; filename={file_path.split('/')[-1]}"} ) except FileNotFoundError: raise HTTPException(status_code=404, detail="文件不存在") except RuntimeError as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/signed-url") async def get_signed_url( file_path: str = Query(..., description="OSS文件路径(不含域名)"), expires: int = Query(default=3600, description="签名URL有效期(秒)"), current_user: User = Depends(get_current_user), oss_service: OSSService = Depends(get_oss_service) ): """ 获取文件签名URL - 需要用户认证 - 返回带签名的临时访问URL - 默认有效期1小时 """ try: url = oss_service.get_signed_url(file_path, expires) return { "code": 200, "message": "success", "data": { "url": url, "expires_in": expires } } except RuntimeError as e: raise HTTPException(status_code=500, detail=str(e))