"""OSS upload service.

Wraps an ``oss2.Bucket`` to upload files, images and JSON documents and to
build public or signed URLs for the stored objects.
"""
import io
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote

import oss2
from PIL import Image

from utils.config import settings
from utils.logger import logger
from utils.crypto import encrypt_url, decrypt_url


class OSSService:
    """Thin service layer around a single OSS bucket configured via ``settings.oss``."""

    def __init__(self):
        """Create the bucket client from ``settings.oss`` and log the configuration.

        Only the first four characters of the access key id are logged; the
        secret is never logged.
        """
        auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret)
        self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket)
        self.encrypt_key = settings.oss.parse_encrypt_key
        logger.info(
            "OSSService 初始化完成: endpoint=%s, bucket=%s, access_key_id=%s",
            settings.oss.endpoint,
            settings.oss.bucket,
            f"{settings.oss.access_key_id[:4]}***" if settings.oss.access_key_id else "EMPTY",
        )

    def _build_public_url(self, object_name: str) -> str:
        """Build the permanent public URL for an object.

        The original notes state this mirrors the Go implementation:

        - If the configured endpoint already carries a scheme, the URL is
          ``<endpoint>/<bucket>/<object>``; otherwise it is
          ``https://<bucket>.<endpoint>/<object>``.
        - Every path segment of ``object_name`` is percent-encoded so that
          non-ASCII characters or spaces do not break the link, while the
          ``/`` separators are preserved.

        Args:
            object_name: Object key inside the bucket.

        Returns:
            The public URL as a string.
        """
        endpoint = settings.oss.endpoint.rstrip("/")
        bucket = settings.oss.bucket

        if endpoint.startswith(("http://", "https://")):
            base_url = f"{endpoint}/{bucket}"
        else:
            base_url = f"https://{bucket}.{endpoint}"

        # Encode segment by segment (safe="") so "/" inside a segment can never
        # be confused with a path separator.
        encoded_object_name = "/".join(
            quote(part, safe="") for part in object_name.split("/")
        )
        public_url = f"{base_url}/{encoded_object_name}"
        logger.info(
            "构建 OSS 公网 URL 完成: object_name=%s, base_url=%s, encoded_object_name=%s, public_url=%s",
            object_name,
            base_url,
            encoded_object_name,
            public_url,
        )
        return public_url

    def _compress_image_to_jpeg(self, file_data: bytes) -> bytes:
        """Re-encode an image as JPEG and keep the result only if it is smaller.

        The image is converted to RGB and saved as JPEG (quality 85, optimized).
        The original bytes are returned unchanged when re-encoding does not
        reduce the size, or when any step fails.

        Args:
            file_data: Raw bytes of the source image.

        Returns:
            The JPEG bytes if they are strictly smaller than ``file_data``,
            otherwise ``file_data`` itself.
        """
        try:
            original_size = len(file_data)
            logger.info("开始压缩图片: original_size=%s bytes", original_size)

            out = io.BytesIO()
            # Context manager releases the decoder/file handle deterministically.
            with Image.open(io.BytesIO(file_data)) as img:
                logger.info(
                    "图片打开成功: format=%s, mode=%s, size=%sx%s",
                    getattr(img, "format", "unknown"),
                    img.mode,
                    img.size[0],
                    img.size[1],
                )
                img.convert("RGB").save(out, format="JPEG", quality=85, optimize=True)

            compressed = out.getvalue()
            logger.info(
                "图片压缩完成: compressed_size=%s bytes, ratio=%.2f%%",
                len(compressed),
                (len(compressed) / original_size * 100) if original_size else 0,
            )

            # Adopt the re-encoded data only when it is actually smaller.
            if compressed and len(compressed) < original_size:
                logger.info("采用压缩后的图片数据上传")
                return compressed

            # Fix: previously this path returned ``compressed or file_data``,
            # which always yielded the (larger) re-encoded JPEG and contradicted
            # both the log message below and the documented fallback.
            logger.info("压缩后未明显变小,回退原图上传")
            return file_data
        except Exception as e:
            # Best-effort step: any decode/encode failure falls back to the input.
            logger.warning(f"图片压缩失败,回退原图: {e}")
            return file_data

    def upload_file(self, file_data: bytes, filename: str, folder: str = "uploads") -> str:
        """Upload raw bytes to ``<folder>/<filename>`` and return the public URL.

        Args:
            file_data: Content to store.
            filename: Object file name (appended to ``folder``).
            folder: Key prefix; defaults to ``"uploads"``.

        Returns:
            The public URL built by ``_build_public_url``.

        Raises:
            Exception: If OSS answers with a status other than 200, or if the
                underlying ``put_object`` call fails. The error is logged with
                a traceback and re-raised.
        """
        try:
            object_name = f"{folder}/{filename}"
            logger.info(
                "准备上传文件: folder=%s, filename=%s, object_name=%s, size=%s bytes",
                folder,
                filename,
                object_name,
                len(file_data),
            )
            result = self.bucket.put_object(object_name, file_data)
            logger.info(
                "OSS 上传响应: object_name=%s, status=%s, request_id=%s, etag=%s",
                object_name,
                getattr(result, "status", None),
                getattr(result, "request_id", None),
                getattr(result, "etag", None),
            )

            if result.status != 200:
                logger.error(
                    "文件上传失败: object_name=%s, status=%s, response=%s",
                    object_name,
                    result.status,
                    result,
                )
                raise Exception(f"上传失败,状态码: {result.status}")

            file_url = self._build_public_url(object_name)
            logger.info(
                "文件上传成功: object_name=%s, public_url=%s",
                object_name,
                file_url,
            )
            return file_url
        except Exception as e:
            logger.error(f"OSS上传失败: {e}", exc_info=True)
            raise

    def upload_image(self, file_data: bytes, filename: str) -> str:
        """Upload an image and return its public URL.

        Behaviour (documented in the original as aligned with the Go version):

        - The object key is ``images/YYYY/MMdd_<unix-timestamp>.jpg`` (UTC).
        - The data goes through ``_compress_image_to_jpeg`` first.
        - The object is stored with the ``public-read`` ACL header.

        Args:
            file_data: Raw image bytes.
            filename: Original file name; only logged, not used in the key.

        Returns:
            The public URL of the uploaded object.

        Raises:
            Exception: If OSS answers with a status other than 200, or if the
                upload call fails. The error is logged and re-raised.
        """
        try:
            now = datetime.now(timezone.utc)
            timestamp = int(now.timestamp())
            # NOTE(review): the key has one-second resolution, so two uploads in
            # the same second map to the same object and the later one
            # overwrites the earlier — confirm this is acceptable.
            object_name = f"images/{now.year}/{now.strftime('%m%d')}_{timestamp}.jpg"
            logger.info(
                "准备上传图片: original_filename=%s, object_name=%s, timestamp=%s, utc_now=%s",
                filename,
                object_name,
                timestamp,
                now.isoformat(),
            )

            compressed_data = self._compress_image_to_jpeg(file_data)
            logger.info(
                "图片数据准备完成: object_name=%s, upload_size=%s bytes",
                object_name,
                len(compressed_data),
            )

            result = self.bucket.put_object(
                object_name,
                compressed_data,
                headers={"x-oss-object-acl": "public-read"},
            )
            logger.info(
                "OSS 图片上传响应: object_name=%s, status=%s, request_id=%s, etag=%s",
                object_name,
                getattr(result, "status", None),
                getattr(result, "request_id", None),
                getattr(result, "etag", None),
            )

            if result.status == 200:
                public_url = self._build_public_url(object_name)
                logger.info(
                    "图片上传成功: object_name=%s, public_url=%s",
                    object_name,
                    public_url,
                )
                return public_url

            logger.error(
                "图片上传失败: object_name=%s, status=%s, response=%s",
                object_name,
                result.status,
                result,
            )
            raise Exception(f"上传图片失败,状态码: {result.status}")
        except Exception as e:
            logger.error(f"OSS图片上传失败: {e}", exc_info=True)
            raise

    def upload_json(self, json_data: str, filename: str) -> str:
        """Upload a JSON string (UTF-8 encoded) under the ``json`` folder.

        Returns:
            The public URL returned by ``upload_file``.
        """
        return self.upload_file(json_data.encode("utf-8"), filename, folder="json")

    def get_file_url(self, object_name: str, expires: int = 3600) -> str:
        """Return a signed GET URL for ``object_name``.

        Args:
            object_name: Full object key inside the bucket.
            expires: Expiry value forwarded to ``Bucket.sign_url``
                (presumably seconds — confirm against the oss2 docs).

        Raises:
            Exception: Any error from ``sign_url`` is logged and re-raised.
        """
        try:
            url = self.bucket.sign_url("GET", object_name, expires)
            return url
        except Exception as e:
            logger.error(f"获取文件URL失败: {e}")
            raise

    def parse_url(self, encrypted_url: str) -> str:
        """Decrypt an encrypted URL via ``decrypt_url``.

        Raises:
            Exception: Any decryption error is logged and re-raised.
        """
        try:
            return decrypt_url(encrypted_url)
        except Exception as e:
            logger.error(f"URL解析失败: {e}")
            raise

    def get_signed_url(self, filename: str, expires: int = 3600) -> str:
        """Return a signed GET URL for ``shudao/<filename>``.

        Args:
            filename: File name below the fixed ``shudao/`` prefix.
            expires: Expiry value forwarded to ``Bucket.sign_url``
                (presumably seconds — confirm against the oss2 docs).

        Raises:
            Exception: Any error from ``sign_url`` is logged and re-raised.
        """
        try:
            object_name = f"shudao/{filename}"
            url = self.bucket.sign_url("GET", object_name, expires)
            return url
        except Exception as e:
            logger.error(f"获取签名URL失败: {e}")
            raise


# Module-level shared instance.
oss_service = OSSService()