| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
"""
OSS upload service.
"""
- import oss2
- from typing import Optional
- from utils.config import settings
- from utils.logger import logger
- from utils.crypto import encrypt_url, decrypt_url
class OSSService:
    """Wrapper around an ``oss2.Bucket`` for uploading objects and signing URLs.

    The bucket client is built from ``settings.oss`` when the service is
    constructed.
    """

    def __init__(self):
        """Create the bucket client from the values in ``settings.oss``."""
        auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret)
        self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket)
        # Not read by any method of this class; kept as an attribute because
        # code outside this class may rely on it.
        self.encrypt_key = settings.oss.parse_encrypt_key

    @staticmethod
    def _build_object_url(bucket_name: str, endpoint: str, object_name: str) -> str:
        """Return ``https://<bucket>.<host>/<object_name>`` for a stored object.

        ``endpoint`` may be configured with or without a scheme and with or
        without a trailing slash; both are stripped so the bucket name is
        always prepended to a bare host name.
        """
        host = str(endpoint).strip()
        if "://" in host:
            # Drop "http://" / "https://" so we do not emit
            # "https://bucket.https://host/...".
            host = host.split("://", 1)[1]
        host = host.rstrip("/")
        return f"https://{bucket_name}.{host}/{object_name}"

    def upload_file(self, file_data: bytes, filename: str, folder: str = "uploads") -> str:
        """Upload ``file_data`` to OSS and return the encrypted object URL.

        Args:
            file_data: Raw content to store.
            filename: Object file name, appended to ``folder``.
            folder: Key prefix the object is stored under.

        Returns:
            The result of ``encrypt_url`` applied to the object's HTTPS URL.

        Raises:
            Exception: If the put request does not return status 200. Any
                error raised while uploading or encrypting is logged and
                re-raised unchanged.
        """
        try:
            object_name = f"{folder}/{filename}"
            result = self.bucket.put_object(object_name, file_data)

            if result.status != 200:
                raise Exception(f"上传失败,状态码: {result.status}")

            file_url = self._build_object_url(
                settings.oss.bucket, settings.oss.endpoint, object_name
            )
            encrypted_url = encrypt_url(file_url)
            logger.info(f"文件上传成功: {object_name}")
            return encrypted_url
        except Exception as e:
            logger.error(f"OSS上传失败: {e}")
            raise

    def upload_image(self, file_data: bytes, filename: str) -> str:
        """Upload an image under the ``images`` folder; see ``upload_file``."""
        return self.upload_file(file_data, filename, folder="images")

    def upload_json(self, json_data: str, filename: str) -> str:
        """Upload a JSON string, UTF-8 encoded, under the ``json`` folder."""
        return self.upload_file(json_data.encode('utf-8'), filename, folder="json")

    def get_file_url(self, object_name: str, expires: int = 3600) -> str:
        """Return a signed GET URL for ``object_name``.

        Args:
            object_name: Full object key inside the bucket.
            expires: Value passed to ``sign_url`` as the expiry.

        Raises:
            Exception: Any signing error is logged and re-raised unchanged.
        """
        try:
            return self.bucket.sign_url('GET', object_name, expires)
        except Exception as e:
            logger.error(f"获取文件URL失败: {e}")
            raise

    def parse_url(self, encrypted_url: str) -> str:
        """Return ``decrypt_url(encrypted_url)``.

        Raises:
            Exception: Any decryption error is logged and re-raised unchanged.
        """
        try:
            return decrypt_url(encrypted_url)
        except Exception as e:
            logger.error(f"URL解析失败: {e}")
            raise

    def get_signed_url(self, filename: str, expires: int = 3600) -> str:
        """Return a signed GET URL for ``filename`` under the ``shudao/`` prefix.

        Args:
            filename: File name, appended to the fixed ``shudao/`` prefix.
            expires: Value passed to ``sign_url`` as the expiry.

        Raises:
            Exception: Any signing error is logged and re-raised unchanged.
        """
        try:
            object_name = f"shudao/{filename}"
            return self.bucket.sign_url('GET', object_name, expires)
        except Exception as e:
            logger.error(f"获取签名URL失败: {e}")
            raise
# Shared module-level instance, created when this module is imported.
oss_service = OSSService()
|