# oss_service.py
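"""
Alibaba Cloud OSS service.

Global singleton service providing image upload, download and related features.
Configuration is loaded from environment variables.
"""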
import asyncio
import os
import uuid
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse, quote, unquote

import oss2
import requests
  13. class OSSService:
  14. """
  15. 阿里云OSS服务
  16. 全局单例服务,提供图片上传功能
  17. """
  18. _instance: Optional['OSSService'] = None
  19. def __new__(cls):
  20. if cls._instance is None:
  21. cls._instance = super().__new__(cls)
  22. cls._instance._initialized = False
  23. return cls._instance
  24. def __init__(self):
  25. if self._initialized:
  26. return
  27. self.access_key_id = os.getenv("OSS_ACCESS_KEY_ID")
  28. self.access_key_secret = os.getenv("OSS_ACCESS_KEY_SECRET")
  29. self.endpoint = os.getenv("OSS_ENDPOINT", "oss-cn-beijing.aliyuncs.com")
  30. self.bucket_name = os.getenv("OSS_BUCKET_NAME")
  31. self.bucket_domain = os.getenv("OSS_BUCKET_DOMAIN")
  32. self._bucket = None
  33. self._initialized = True
  34. @property
  35. def bucket(self):
  36. """延迟初始化bucket"""
  37. if self._bucket is None and self.access_key_id and self.access_key_secret and self.bucket_name:
  38. auth = oss2.Auth(self.access_key_id, self.access_key_secret)
  39. self._bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
  40. return self._bucket
  41. def upload_image(self, image_data: bytes, prefix: str = "images") -> str:
  42. """
  43. 上传图片到OSS
  44. Args:
  45. image_data: 图片二进制数据
  46. prefix: 存储路径前缀
  47. Returns:
  48. 公开访问URL
  49. """
  50. if not self.bucket:
  51. raise RuntimeError("OSS服务未正确配置")
  52. object_key = self._generate_unique_filename(prefix)
  53. self.bucket.put_object(object_key, image_data)
  54. return f"https://{self.bucket_domain}/{object_key}"
  55. def upload_file(self, file_data: bytes, prefix: str = "uploads", original_filename: str = None) -> str:
  56. """
  57. 上传文件到OSS
  58. Args:
  59. file_data: 文件二进制数据
  60. prefix: 存储路径前缀
  61. original_filename: 原始文件名(用于获取扩展名)
  62. Returns:
  63. 公开访问URL
  64. """
  65. if not self.bucket:
  66. raise RuntimeError("OSS服务未正确配置")
  67. object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
  68. headers = self._build_upload_headers(original_filename)
  69. self.bucket.put_object(object_key, file_data, headers=headers)
  70. return f"https://{self.bucket_domain}/{object_key}"
  71. def upload_file_stream(self, file_obj, prefix: str = "uploads", original_filename: str = None) -> str:
  72. """
  73. 流式上传文件到OSS
  74. Args:
  75. file_obj: 文件对象
  76. prefix: 存储路径前缀
  77. original_filename: 原始文件名
  78. Returns:
  79. 公开访问URL
  80. """
  81. if not self.bucket:
  82. raise RuntimeError("OSS服务未正确配置")
  83. object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
  84. headers = self._build_upload_headers(original_filename)
  85. self.bucket.put_object(object_key, file_obj, headers=headers)
  86. return f"https://{self.bucket_domain}/{object_key}"
  87. def download_file(self, file_path: str) -> tuple[bytes, str]:
  88. """
  89. 从OSS下载文件
  90. Args:
  91. file_path: 文件路径(不含域名)
  92. Returns:
  93. (文件内容, content_type)
  94. """
  95. if not self.bucket:
  96. raise RuntimeError("OSS服务未正确配置")
  97. if not self.bucket.object_exists(file_path):
  98. raise FileNotFoundError(f"文件不存在: {file_path}")
  99. result = self.bucket.get_object(file_path)
  100. content = result.read()
  101. content_type = result.headers.get('Content-Type', 'application/octet-stream')
  102. return content, content_type
  103. def get_signed_url(self, file_path: str, expires: int = 3600) -> str:
  104. """
  105. 获取文件签名URL
  106. Args:
  107. file_path: 文件路径(不含域名)
  108. expires: 有效期(秒)
  109. Returns:
  110. 签名URL
  111. """
  112. if not self.bucket:
  113. raise RuntimeError("OSS服务未正确配置")
  114. url = self.bucket.sign_url('GET', file_path, expires)
  115. return url.replace('http://', 'https://', 1)
  116. def generate_presigned_put_url(
  117. self,
  118. prefix: str = "uploads",
  119. original_filename: str = None,
  120. expires: int = 300,
  121. content_type: str = None
  122. ) -> dict:
  123. """
  124. 生成预签名 PUT 上传 URL,供前端直传 OSS
  125. Args:
  126. prefix: 存储路径前缀
  127. original_filename: 原始文件名(用于保留扩展名)
  128. expires: 预签名 PUT URL 有效期(秒),默认 5 分钟
  129. content_type: 文件 Content-Type(可选,用于签名头)
  130. Returns:
  131. {
  132. "upload_url": 预签名 PUT URL(前端直接 PUT 到此地址),
  133. "object_key": OSS 对象路径,
  134. "public_url": 上传完成后的公开访问 URL(bucket 公读时有效),
  135. "access_url": 带签名的 GET URL(bucket 私有读时也可访问,有效期 24 小时)
  136. }
  137. """
  138. if not self.bucket:
  139. raise RuntimeError("OSS服务未正确配置")
  140. object_key = self._generate_unique_filename_with_ext(prefix, original_filename)
  141. headers = {}
  142. if content_type:
  143. headers["Content-Type"] = content_type
  144. # 注意:不将自定义元数据头加入签名,因为前端 PUT 直传不会携带这些头。
  145. # 否则签名不匹配会导致 OSS 返回 403 Forbidden。
  146. upload_url = self.bucket.sign_url(
  147. 'PUT',
  148. object_key,
  149. expires,
  150. headers=headers if headers else None
  151. )
  152. upload_url = upload_url.replace('http://', 'https://', 1)
  153. public_url = f"https://{self.bucket_domain}/{object_key}"
  154. # 生成带签名的 GET URL(有效期 24 小时),供第三方服务(如 DashScope)访问私有文件
  155. access_url = self.bucket.sign_url('GET', object_key, 86400)
  156. access_url = access_url.replace('http://', 'https://', 1)
  157. return {
  158. "upload_url": upload_url,
  159. "object_key": object_key,
  160. "public_url": public_url,
  161. "access_url": access_url,
  162. }
  163. async def upload_from_url(self, image_url: str, prefix: str = "images") -> str:
  164. """
  165. 从URL下载文件并上传到OSS
  166. Args:
  167. image_url: 文件URL
  168. prefix: 存储路径前缀
  169. Returns:
  170. OSS公开访问URL
  171. """
  172. response = requests.get(image_url, timeout=30)
  173. response.raise_for_status()
  174. content = response.content
  175. original_filename = self._infer_filename(image_url, response.headers.get("Content-Type"))
  176. ext_from_content = self._infer_extension_from_content(content)
  177. if ext_from_content and not original_filename.lower().endswith(ext_from_content):
  178. original_filename = f"downloaded{ext_from_content}"
  179. return self.upload_file(content, prefix, original_filename)
  180. def upload_from_url_sync(self, image_url: str, prefix: str = "images") -> str:
  181. """
  182. 从URL下载文件并上传到OSS(同步版本,供后台线程调用)
  183. Args:
  184. image_url: 文件URL
  185. prefix: 存储路径前缀
  186. Returns:
  187. OSS公开访问URL
  188. """
  189. response = requests.get(image_url, timeout=120)
  190. response.raise_for_status()
  191. content = response.content
  192. original_filename = self._infer_filename(image_url, response.headers.get("Content-Type"))
  193. ext_from_content = self._infer_extension_from_content(content)
  194. if ext_from_content and not original_filename.lower().endswith(ext_from_content):
  195. original_filename = f"downloaded{ext_from_content}"
  196. return self.upload_file(content, prefix, original_filename)
  197. def _generate_unique_filename(self, prefix: str) -> str:
  198. """生成唯一文件名"""
  199. date_path = datetime.now().strftime('%Y%m%d')
  200. unique_id = uuid.uuid4().hex
  201. return f"{prefix}/{date_path}/{unique_id}.png"
  202. def _generate_unique_filename_with_ext(self, prefix: str, original_filename: str = None) -> str:
  203. """生成文件路径,使用 UUID 作为对象名并尽量保留原始扩展名"""
  204. date_path = datetime.now().strftime('%Y%m%d')
  205. unique_id = uuid.uuid4().hex
  206. if original_filename:
  207. safe_name = self.get_safe_original_filename(original_filename)
  208. _, ext = os.path.splitext(safe_name)
  209. if ext:
  210. return f"{prefix}/{date_path}/{unique_id}{ext.lower()}"
  211. return f"{prefix}/{date_path}/{unique_id}"
  212. def _build_upload_headers(self, original_filename: str = None) -> Optional[dict]:
  213. """构造上传头,持久化原始文件名元数据。"""
  214. safe_name = self.get_safe_original_filename(original_filename)
  215. if not safe_name:
  216. return None
  217. return {
  218. 'x-oss-meta-original-filename': quote(safe_name, safe='')
  219. }
  220. def get_safe_original_filename(self, original_filename: Optional[str]) -> Optional[str]:
  221. """提取安全的原始文件名。"""
  222. if not original_filename:
  223. return None
  224. safe_name = os.path.basename(original_filename).strip()
  225. return safe_name or None
  226. def get_original_filename_from_headers(self, headers: Optional[dict], fallback: Optional[str] = None) -> Optional[str]:
  227. """从 OSS 响应头中读取上传时保存的原始文件名。"""
  228. if headers:
  229. for key, value in headers.items():
  230. if str(key).lower() == 'x-oss-meta-original-filename' and value:
  231. try:
  232. return unquote(str(value))
  233. except Exception:
  234. return str(value)
  235. return fallback
  236. def _infer_filename(self, file_url: str, content_type: Optional[str]) -> str:
  237. """Infer a filename from URL or content-type for OSS upload."""
  238. parsed = urlparse(file_url)
  239. name = os.path.basename(parsed.path)
  240. if name and "." in name:
  241. return name
  242. ext = ""
  243. if content_type:
  244. mime = content_type.split(";")[0].strip().lower()
  245. mime_map = {
  246. "video/mp4": ".mp4",
  247. "video/webm": ".webm",
  248. "video/quicktime": ".mov",
  249. "image/png": ".png",
  250. "image/jpeg": ".jpg",
  251. "image/jpg": ".jpg",
  252. "image/webp": ".webp",
  253. "audio/mpeg": ".mp3",
  254. "audio/wav": ".wav",
  255. }
  256. ext = mime_map.get(mime, "")
  257. return f"downloaded{ext or '.bin'}"
  258. def _infer_extension_from_content(self, content: bytes) -> Optional[str]:
  259. """Infer file extension from magic bytes (best-effort)."""
  260. if len(content) >= 12 and content[4:8] == b"ftyp":
  261. return ".mp4"
  262. if content.startswith(b"\x1A\x45\xDF\xA3"):
  263. return ".webm"
  264. if content.startswith(b"OggS"):
  265. return ".ogv"
  266. return None
  267. # 延迟初始化的全局实例
  268. _oss_service: Optional[OSSService] = None
  269. def get_oss_service() -> OSSService:
  270. """获取OSS服务实例(用于依赖注入)"""
  271. global _oss_service
  272. if _oss_service is None:
  273. _oss_service = OSSService()
  274. return _oss_service