""" 阿里云OSS服务 全局单例服务,提供图片上传、下载等功能 配置从环境变量加载 """ import os import uuid from urllib.parse import urlparse, quote, unquote from datetime import datetime from typing import Optional import oss2 import requests class OSSService: """ 阿里云OSS服务 全局单例服务,提供图片上传功能 """ _instance: Optional['OSSService'] = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.access_key_id = os.getenv("OSS_ACCESS_KEY_ID") self.access_key_secret = os.getenv("OSS_ACCESS_KEY_SECRET") self.endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com") self.bucket_name = os.getenv("OSS_BUCKET_NAME") self.bucket_domain = os.getenv("OSS_BUCKET_DOMAIN") self._bucket = None self._initialized = True @property def bucket(self): """延迟初始化bucket""" if self._bucket is None and self.access_key_id and self.access_key_secret and self.bucket_name: auth = oss2.Auth(self.access_key_id, self.access_key_secret) self._bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name) return self._bucket def upload_image(self, image_data: bytes, prefix: str = "images") -> str: """ 上传图片到OSS Args: image_data: 图片二进制数据 prefix: 存储路径前缀 Returns: 公开访问URL """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") object_key = self._generate_unique_filename(prefix) self.bucket.put_object(object_key, image_data) return f"https://{self.bucket_domain}/{object_key}" def upload_file(self, file_data: bytes, prefix: str = "uploads", original_filename: str = None) -> str: """ 上传文件到OSS Args: file_data: 文件二进制数据 prefix: 存储路径前缀 original_filename: 原始文件名(用于获取扩展名) Returns: 公开访问URL """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") object_key = self._generate_unique_filename_with_ext(prefix, original_filename) headers = self._build_upload_headers(original_filename) self.bucket.put_object(object_key, file_data, headers=headers) return f"https://{self.bucket_domain}/{object_key}" def upload_file_stream(self, file_obj, prefix: str = "uploads", original_filename: str = None) -> str: """ 流式上传文件到OSS Args: file_obj: 文件对象 prefix: 存储路径前缀 original_filename: 原始文件名 Returns: 公开访问URL """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") object_key = self._generate_unique_filename_with_ext(prefix, original_filename) headers = self._build_upload_headers(original_filename) self.bucket.put_object(object_key, file_obj, headers=headers) return f"https://{self.bucket_domain}/{object_key}" def download_file(self, file_path: str) -> tuple[bytes, str]: """ 从OSS下载文件 Args: file_path: 文件路径(不含域名) Returns: (文件内容, content_type) """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") if not self.bucket.object_exists(file_path): raise FileNotFoundError(f"文件不存在: {file_path}") result = self.bucket.get_object(file_path) content = result.read() content_type = result.headers.get('Content-Type', 'application/octet-stream') return content, content_type def get_signed_url(self, file_path: str, expires: int = 3600) -> str: """ 获取文件签名URL Args: file_path: 文件路径(不含域名) expires: 有效期(秒) Returns: 签名URL """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") url = self.bucket.sign_url('GET', file_path, expires) return url.replace('http://', 'https://', 1) def generate_presigned_put_url( self, prefix: str = "uploads", original_filename: str = None, expires: int = 300, content_type: str = None ) -> dict: """ 生成预签名 PUT 上传 URL,供前端直传 OSS Args: prefix: 存储路径前缀 original_filename: 原始文件名(用于保留扩展名) expires: 预签名 PUT URL 有效期(秒),默认 5 分钟 content_type: 文件 Content-Type(可选,用于签名头) Returns: { "upload_url": 预签名 PUT URL(前端直接 PUT 到此地址), "object_key": OSS 对象路径, "public_url": 上传完成后的公开访问 URL(bucket 公读时有效), "access_url": 带签名的 GET URL(bucket 私有读时也可访问,有效期 24 小时) } """ if not self.bucket: raise RuntimeError("OSS服务未正确配置") object_key = self._generate_unique_filename_with_ext(prefix, original_filename) headers = {} if content_type: headers["Content-Type"] = content_type # 注意:不将自定义元数据头加入签名,因为前端 PUT 直传不会携带这些头。 # 否则签名不匹配会导致 OSS 返回 403 Forbidden。 upload_url = self.bucket.sign_url( 'PUT', object_key, expires, headers=headers if headers else None ) upload_url = upload_url.replace('http://', 'https://', 1) public_url = f"https://{self.bucket_domain}/{object_key}" # 生成带签名的 GET URL(有效期 24 小时),供第三方服务(如 DashScope)访问私有文件 access_url = self.bucket.sign_url('GET', object_key, 86400) access_url = access_url.replace('http://', 'https://', 1) return { "upload_url": upload_url, "object_key": object_key, "public_url": public_url, "access_url": access_url, } async def upload_from_url(self, image_url: str, prefix: str = "images") -> str: """ 从URL下载文件并上传到OSS Args: image_url: 文件URL prefix: 存储路径前缀 Returns: OSS公开访问URL """ response = requests.get(image_url, timeout=30) response.raise_for_status() content = response.content original_filename = self._infer_filename(image_url, response.headers.get("Content-Type")) ext_from_content = self._infer_extension_from_content(content) if ext_from_content and not original_filename.lower().endswith(ext_from_content): original_filename = f"downloaded{ext_from_content}" return self.upload_file(content, prefix, original_filename) def upload_from_url_sync(self, image_url: str, prefix: str = "images") -> str: """ 从URL下载文件并上传到OSS(同步版本,供后台线程调用) Args: image_url: 文件URL prefix: 存储路径前缀 Returns: OSS公开访问URL """ response = requests.get(image_url, timeout=120) response.raise_for_status() content = response.content original_filename = self._infer_filename(image_url, response.headers.get("Content-Type")) ext_from_content = self._infer_extension_from_content(content) if ext_from_content and not original_filename.lower().endswith(ext_from_content): original_filename = f"downloaded{ext_from_content}" return self.upload_file(content, prefix, original_filename) def _generate_unique_filename(self, prefix: str) -> str: """生成唯一文件名""" date_path = datetime.now().strftime('%Y%m%d') unique_id = uuid.uuid4().hex return f"{prefix}/{date_path}/{unique_id}.png" def _generate_unique_filename_with_ext(self, prefix: str, original_filename: str = None) -> str: """生成文件路径,使用 UUID 作为对象名并尽量保留原始扩展名""" date_path = datetime.now().strftime('%Y%m%d') unique_id = uuid.uuid4().hex if original_filename: safe_name = self.get_safe_original_filename(original_filename) _, ext = os.path.splitext(safe_name) if ext: return f"{prefix}/{date_path}/{unique_id}{ext.lower()}" return f"{prefix}/{date_path}/{unique_id}" def _build_upload_headers(self, original_filename: str = None) -> Optional[dict]: """构造上传头,持久化原始文件名元数据。""" safe_name = self.get_safe_original_filename(original_filename) if not safe_name: return None return { 'x-oss-meta-original-filename': quote(safe_name, safe='') } def get_safe_original_filename(self, original_filename: Optional[str]) -> Optional[str]: """提取安全的原始文件名。""" if not original_filename: return None safe_name = os.path.basename(original_filename).strip() return safe_name or None def get_original_filename_from_headers(self, headers: Optional[dict], fallback: Optional[str] = None) -> Optional[str]: """从 OSS 响应头中读取上传时保存的原始文件名。""" if headers: for key, value in headers.items(): if str(key).lower() == 'x-oss-meta-original-filename' and value: try: return unquote(str(value)) except Exception: return str(value) return fallback def _infer_filename(self, file_url: str, content_type: Optional[str]) -> str: """Infer a filename from URL or content-type for OSS upload.""" parsed = urlparse(file_url) name = os.path.basename(parsed.path) if name and "." in name: return name ext = "" if content_type: mime = content_type.split(";")[0].strip().lower() mime_map = { "video/mp4": ".mp4", "video/webm": ".webm", "video/quicktime": ".mov", "image/png": ".png", "image/jpeg": ".jpg", "image/jpg": ".jpg", "image/webp": ".webp", "audio/mpeg": ".mp3", "audio/wav": ".wav", } ext = mime_map.get(mime, "") return f"downloaded{ext or '.bin'}" def _infer_extension_from_content(self, content: bytes) -> Optional[str]: """Infer file extension from magic bytes (best-effort).""" if len(content) >= 12 and content[4:8] == b"ftyp": return ".mp4" if content.startswith(b"\x1A\x45\xDF\xA3"): return ".webm" if content.startswith(b"OggS"): return ".ogv" return None # 延迟初始化的全局实例 _oss_service: Optional[OSSService] = None def get_oss_service() -> OSSService: """获取OSS服务实例(用于依赖注入)""" global _oss_service if _oss_service is None: _oss_service = OSSService() return _oss_service