""" OSS 上传服务 """ import oss2 from typing import Optional from utils.config import settings from utils.logger import logger from utils.crypto import encrypt_url, decrypt_url class OSSService: def __init__(self): auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret) self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket) self.encrypt_key = settings.oss.parse_encrypt_key def upload_file(self, file_data: bytes, filename: str, folder: str = "uploads") -> str: """上传文件到OSS""" try: object_name = f"{folder}/{filename}" result = self.bucket.put_object(object_name, file_data) if result.status == 200: file_url = f"https://{settings.oss.bucket}.{settings.oss.endpoint}/{object_name}" encrypted_url = encrypt_url(file_url) logger.info(f"文件上传成功: {object_name}") return encrypted_url else: raise Exception(f"上传失败,状态码: {result.status}") except Exception as e: logger.error(f"OSS上传失败: {e}") raise def upload_image(self, file_data: bytes, filename: str) -> str: """上传图片""" return self.upload_file(file_data, filename, folder="images") def upload_json(self, json_data: str, filename: str) -> str: """上传JSON文件""" return self.upload_file(json_data.encode('utf-8'), filename, folder="json") def get_file_url(self, object_name: str, expires: int = 3600) -> str: """获取文件访问URL""" try: url = self.bucket.sign_url('GET', object_name, expires) return url except Exception as e: logger.error(f"获取文件URL失败: {e}") raise def parse_url(self, encrypted_url: str) -> str: """解析加密的URL""" try: return decrypt_url(encrypted_url) except Exception as e: logger.error(f"URL解析失败: {e}") raise def get_signed_url(self, filename: str, expires: int = 3600) -> str: """获取签名URL""" try: object_name = f"shudao/{filename}" url = self.bucket.sign_url('GET', object_name, expires) return url except Exception as e: logger.error(f"获取签名URL失败: {e}") raise # 全局实例 oss_service = OSSService()