| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- """
- 阿里云OSS服务
- 全局单例服务,提供图片上传、下载等功能
- 配置从环境变量加载
- """
- import os
- import uuid
- from urllib.parse import urlparse, quote, unquote
- from datetime import datetime
- from typing import Optional
- import oss2
- import requests
- class OSSService:
- """
- 阿里云OSS服务
-
- 全局单例服务,提供图片上传功能
- """
- _instance: Optional['OSSService'] = None
-
- def __new__(cls):
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._initialized = False
- return cls._instance
-
- def __init__(self):
- if self._initialized:
- return
-
- self.access_key_id = os.getenv("OSS_ACCESS_KEY_ID")
- self.access_key_secret = os.getenv("OSS_ACCESS_KEY_SECRET")
- self.endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")
- self.bucket_name = os.getenv("OSS_BUCKET_NAME")
- self.bucket_domain = os.getenv("OSS_BUCKET_DOMAIN")
- self._bucket = None
- self._initialized = True
-
- @property
- def bucket(self):
- """延迟初始化bucket"""
- if self._bucket is None and self.access_key_id and self.access_key_secret and self.bucket_name:
- auth = oss2.Auth(self.access_key_id, self.access_key_secret)
- self._bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
- return self._bucket
-
- def upload_image(self, image_data: bytes, prefix: str = "images") -> str:
- """
- 上传图片到OSS
-
- Args:
- image_data: 图片二进制数据
- prefix: 存储路径前缀
-
- Returns:
- 公开访问URL
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
-
- object_key = self._generate_unique_filename(prefix)
- self.bucket.put_object(object_key, image_data)
- return f"https://{self.bucket_domain}/{object_key}"
-
- def upload_file(self, file_data: bytes, prefix: str = "uploads", original_filename: str = None) -> str:
- """
- 上传文件到OSS
-
- Args:
- file_data: 文件二进制数据
- prefix: 存储路径前缀
- original_filename: 原始文件名(用于获取扩展名)
-
- Returns:
- 公开访问URL
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
-
- object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
- headers = self._build_upload_headers(original_filename)
- self.bucket.put_object(object_key, file_data, headers=headers)
- return f"https://{self.bucket_domain}/{object_key}"
- def upload_file_stream(self, file_obj, prefix: str = "uploads", original_filename: str = None) -> str:
- """
- 流式上传文件到OSS
- Args:
- file_obj: 文件对象
- prefix: 存储路径前缀
- original_filename: 原始文件名
- Returns:
- 公开访问URL
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
- object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
- headers = self._build_upload_headers(original_filename)
- self.bucket.put_object(object_key, file_obj, headers=headers)
- return f"https://{self.bucket_domain}/{object_key}"
-
- def download_file(self, file_path: str) -> tuple[bytes, str]:
- """
- 从OSS下载文件
-
- Args:
- file_path: 文件路径(不含域名)
-
- Returns:
- (文件内容, content_type)
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
-
- if not self.bucket.object_exists(file_path):
- raise FileNotFoundError(f"文件不存在: {file_path}")
-
- result = self.bucket.get_object(file_path)
- content = result.read()
- content_type = result.headers.get('Content-Type', 'application/octet-stream')
- return content, content_type
-
- def get_signed_url(self, file_path: str, expires: int = 3600) -> str:
- """
- 获取文件签名URL
-
- Args:
- file_path: 文件路径(不含域名)
- expires: 有效期(秒)
-
- Returns:
- 签名URL
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
-
- url = self.bucket.sign_url('GET', file_path, expires)
- return url.replace('http://', 'https://', 1)
- def generate_presigned_put_url(
- self,
- prefix: str = "uploads",
- original_filename: str = None,
- expires: int = 300,
- content_type: str = None
- ) -> dict:
- """
- 生成预签名 PUT 上传 URL,供前端直传 OSS
- Args:
- prefix: 存储路径前缀
- original_filename: 原始文件名(用于保留扩展名)
- expires: 预签名 PUT URL 有效期(秒),默认 5 分钟
- content_type: 文件 Content-Type(可选,用于签名头)
- Returns:
- {
- "upload_url": 预签名 PUT URL(前端直接 PUT 到此地址),
- "object_key": OSS 对象路径,
- "public_url": 上传完成后的公开访问 URL(bucket 公读时有效),
- "access_url": 带签名的 GET URL(bucket 私有读时也可访问,有效期 24 小时)
- }
- """
- if not self.bucket:
- raise RuntimeError("OSS服务未正确配置")
- object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
- headers = {}
- if content_type:
- headers["Content-Type"] = content_type
- # 注意:不将自定义元数据头加入签名,因为前端 PUT 直传不会携带这些头。
- # 否则签名不匹配会导致 OSS 返回 403 Forbidden。
- upload_url = self.bucket.sign_url(
- 'PUT',
- object_key,
- expires,
- headers=headers if headers else None
- )
- upload_url = upload_url.replace('http://', 'https://', 1)
- public_url = f"https://{self.bucket_domain}/{object_key}"
- # 生成带签名的 GET URL(有效期 24 小时),供第三方服务(如 DashScope)访问私有文件
- access_url = self.bucket.sign_url('GET', object_key, 86400)
- access_url = access_url.replace('http://', 'https://', 1)
- return {
- "upload_url": upload_url,
- "object_key": object_key,
- "public_url": public_url,
- "access_url": access_url,
- }
-
- async def upload_from_url(self, image_url: str, prefix: str = "images") -> str:
- """
- 从URL下载文件并上传到OSS
-
- Args:
- image_url: 文件URL
- prefix: 存储路径前缀
-
- Returns:
- OSS公开访问URL
- """
- response = requests.get(image_url, timeout=30)
- response.raise_for_status()
- content = response.content
- original_filename = self._infer_filename(image_url, response.headers.get("Content-Type"))
- ext_from_content = self._infer_extension_from_content(content)
- if ext_from_content and not original_filename.lower().endswith(ext_from_content):
- original_filename = f"downloaded{ext_from_content}"
- return self.upload_file(content, prefix, original_filename)
- def upload_from_url_sync(self, image_url: str, prefix: str = "images") -> str:
- """
- 从URL下载文件并上传到OSS(同步版本,供后台线程调用)
- Args:
- image_url: 文件URL
- prefix: 存储路径前缀
- Returns:
- OSS公开访问URL
- """
- response = requests.get(image_url, timeout=120)
- response.raise_for_status()
- content = response.content
- original_filename = self._infer_filename(image_url, response.headers.get("Content-Type"))
- ext_from_content = self._infer_extension_from_content(content)
- if ext_from_content and not original_filename.lower().endswith(ext_from_content):
- original_filename = f"downloaded{ext_from_content}"
- return self.upload_file(content, prefix, original_filename)
-
- def _generate_unique_filename(self, prefix: str) -> str:
- """生成唯一文件名"""
- date_path = datetime.now().strftime('%Y%m%d')
- unique_id = uuid.uuid4().hex
- return f"{prefix}/{date_path}/{unique_id}.png"
-
- def _generate_unique_filename_with_ext(self, prefix: str, original_filename: str = None) -> str:
- """生成文件路径,使用 UUID 作为对象名并尽量保留原始扩展名"""
- date_path = datetime.now().strftime('%Y%m%d')
- unique_id = uuid.uuid4().hex
- if original_filename:
- safe_name = self.get_safe_original_filename(original_filename)
- _, ext = os.path.splitext(safe_name)
- if ext:
- return f"{prefix}/{date_path}/{unique_id}{ext.lower()}"
- return f"{prefix}/{date_path}/{unique_id}"
- def _build_upload_headers(self, original_filename: str = None) -> Optional[dict]:
- """构造上传头,持久化原始文件名元数据。"""
- safe_name = self.get_safe_original_filename(original_filename)
- if not safe_name:
- return None
- return {
- 'x-oss-meta-original-filename': quote(safe_name, safe='')
- }
- def get_safe_original_filename(self, original_filename: Optional[str]) -> Optional[str]:
- """提取安全的原始文件名。"""
- if not original_filename:
- return None
- safe_name = os.path.basename(original_filename).strip()
- return safe_name or None
- def get_original_filename_from_headers(self, headers: Optional[dict], fallback: Optional[str] = None) -> Optional[str]:
- """从 OSS 响应头中读取上传时保存的原始文件名。"""
- if headers:
- for key, value in headers.items():
- if str(key).lower() == 'x-oss-meta-original-filename' and value:
- try:
- return unquote(str(value))
- except Exception:
- return str(value)
- return fallback
- def _infer_filename(self, file_url: str, content_type: Optional[str]) -> str:
- """Infer a filename from URL or content-type for OSS upload."""
- parsed = urlparse(file_url)
- name = os.path.basename(parsed.path)
- if name and "." in name:
- return name
- ext = ""
- if content_type:
- mime = content_type.split(";")[0].strip().lower()
- mime_map = {
- "video/mp4": ".mp4",
- "video/webm": ".webm",
- "video/quicktime": ".mov",
- "image/png": ".png",
- "image/jpeg": ".jpg",
- "image/jpg": ".jpg",
- "image/webp": ".webp",
- "audio/mpeg": ".mp3",
- "audio/wav": ".wav",
- }
- ext = mime_map.get(mime, "")
- return f"downloaded{ext or '.bin'}"
- def _infer_extension_from_content(self, content: bytes) -> Optional[str]:
- """Infer file extension from magic bytes (best-effort)."""
- if len(content) >= 12 and content[4:8] == b"ftyp":
- return ".mp4"
- if content.startswith(b"\x1A\x45\xDF\xA3"):
- return ".webm"
- if content.startswith(b"OggS"):
- return ".ogv"
- return None
- # 延迟初始化的全局实例
- _oss_service: Optional[OSSService] = None
- def get_oss_service() -> OSSService:
- """获取OSS服务实例(用于依赖注入)"""
- global _oss_service
- if _oss_service is None:
- _oss_service = OSSService()
- return _oss_service
|