| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- """
- OSS 上传服务
- """
- import oss2
- import io
- from typing import Optional
- from urllib.parse import quote
- from datetime import datetime, timezone
- from PIL import Image
- from utils.config import settings
- from utils.logger import logger
- from utils.crypto import encrypt_url, decrypt_url
class OSSService:
    """Wrapper around an ``oss2.Bucket`` for uploading data and building URLs.

    Credentials, endpoint and bucket name are read from ``settings.oss`` when
    the service is constructed.
    """

    def __init__(self) -> None:
        """Create the bucket client from ``settings.oss`` and log a summary."""
        auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret)
        self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket)
        # Stored on the instance; no method of this class reads it.
        self.encrypt_key = settings.oss.parse_encrypt_key
        logger.info(
            "OSSService 初始化完成: endpoint=%s, bucket=%s, access_key_id=%s",
            settings.oss.endpoint,
            settings.oss.bucket,
            # Only the first four characters of the key id are ever logged.
            f"{settings.oss.access_key_id[:4]}***" if settings.oss.access_key_id else "EMPTY",
        )

    def _build_public_url(self, object_name: str) -> str:
        """Build the permanent (unsigned) public URL for *object_name*.

        Per the original author's note this mirrors the Go implementation:

        - If the configured endpoint already carries an ``http://`` or
          ``https://`` scheme, the URL is ``<endpoint>/<bucket>/<object>``.
        - Otherwise the URL is ``https://<bucket>.<endpoint>/<object>``.
        - Every path segment of the object name is percent-encoded so that
          non-ASCII characters and spaces do not break the link.

        Args:
            object_name: Object key inside the bucket, ``/``-separated.

        Returns:
            The public URL as a string.
        """
        endpoint = settings.oss.endpoint.rstrip("/")
        bucket = settings.oss.bucket
        if endpoint.startswith(("http://", "https://")):
            base_url = f"{endpoint}/{bucket}"
        else:
            base_url = f"https://{bucket}.{endpoint}"

        # Encode segment by segment so the "/" separators stay literal.
        encoded_object_name = "/".join(
            quote(part, safe="") for part in object_name.split("/")
        )
        public_url = f"{base_url}/{encoded_object_name}"
        logger.info(
            "构建 OSS 公网 URL 完成: object_name=%s, base_url=%s, encoded_object_name=%s, public_url=%s",
            object_name,
            base_url,
            encoded_object_name,
            public_url,
        )
        return public_url

    @staticmethod
    def _pick_smaller_payload(original: bytes, candidate: bytes) -> bytes:
        """Return *candidate* only when it is non-empty and strictly smaller.

        In every other case (empty, equal size, or larger) *original* is
        returned unchanged.
        """
        if candidate and len(candidate) < len(original):
            return candidate
        return original

    def _compress_image_to_jpeg(self, file_data: bytes) -> bytes:
        """Re-encode an image as JPEG and keep the result only if it is smaller.

        The image is converted to RGB and saved as JPEG (quality 85,
        optimized). This is a best-effort step: if the data cannot be opened
        or encoded, the original bytes are returned.

        Args:
            file_data: Raw bytes of the source image.

        Returns:
            The JPEG bytes when they are strictly smaller than *file_data*,
            otherwise *file_data* itself.
        """
        try:
            original_size = len(file_data)
            logger.info("开始压缩图片: original_size=%s bytes", original_size)

            out = io.BytesIO()
            # Context manager releases the decoder's resources deterministically.
            with Image.open(io.BytesIO(file_data)) as img:
                logger.info(
                    "图片打开成功: format=%s, mode=%s, size=%sx%s",
                    getattr(img, "format", "unknown"),
                    img.mode,
                    img.size[0],
                    img.size[1],
                )
                # JPEG has no alpha channel, so normalise to RGB before saving.
                img.convert("RGB").save(out, format="JPEG", quality=85, optimize=True)

            compressed = out.getvalue()
            logger.info(
                "图片压缩完成: compressed_size=%s bytes, ratio=%.2f%%",
                len(compressed),
                (len(compressed) / original_size * 100) if original_size else 0,
            )

            # Use the re-encoded data only when it actually saves space.
            # Previously the larger JPEG was returned even though the log
            # below announced a fallback to the original image.
            payload = self._pick_smaller_payload(file_data, compressed)
            if payload is compressed:
                logger.info("采用压缩后的图片数据上传")
            else:
                # NOTE(review): the original bytes may not be JPEG although the
                # caller stores them under a ".jpg" key (same as the error path).
                logger.info("压缩后未明显变小,回退原图上传")
            return payload
        except Exception as e:
            # Deliberate best-effort: never fail an upload because of compression.
            logger.warning(f"图片压缩失败,回退原图: {e}")
            return file_data

    def upload_file(self, file_data: bytes, filename: str, folder: str = "uploads") -> str:
        """Upload raw bytes to ``<folder>/<filename>`` and return its public URL.

        Args:
            file_data: Content to upload.
            filename: Object name inside *folder*.
            folder: Key prefix; defaults to ``"uploads"``.

        Returns:
            The public URL built by ``_build_public_url``.

        Raises:
            Exception: If the put request raises or its status is not 200.
                Every failure is logged with a traceback and re-raised.
        """
        try:
            object_name = f"{folder}/{filename}"
            logger.info(
                "准备上传文件: folder=%s, filename=%s, object_name=%s, size=%s bytes",
                folder,
                filename,
                object_name,
                len(file_data),
            )
            result = self.bucket.put_object(object_name, file_data)
            logger.info(
                "OSS 上传响应: object_name=%s, status=%s, request_id=%s, etag=%s",
                object_name,
                getattr(result, "status", None),
                getattr(result, "request_id", None),
                getattr(result, "etag", None),
            )
            if result.status != 200:
                logger.error(
                    "文件上传失败: object_name=%s, status=%s, response=%s",
                    object_name,
                    result.status,
                    result,
                )
                raise Exception(f"上传失败,状态码: {result.status}")

            file_url = self._build_public_url(object_name)
            logger.info(
                "文件上传成功: object_name=%s, public_url=%s",
                object_name,
                file_url,
            )
            return file_url
        except Exception as e:
            logger.error(f"OSS上传失败: {e}", exc_info=True)
            raise

    def upload_image(self, file_data: bytes, filename: str) -> str:
        """Compress and upload an image, returning a directly accessible URL.

        Per the original author's note the behaviour is aligned with the Go
        version:

        - the object key is ``images/YYYY/MMdd_<unix-timestamp>.jpg`` (UTC);
        - the data goes through ``_compress_image_to_jpeg`` first;
        - the object is uploaded with the ``public-read`` ACL header;
        - the returned value is the public URL of the object.

        Args:
            file_data: Raw bytes of the source image.
            filename: Original file name; only written to the log.

        Returns:
            The public URL built by ``_build_public_url``.

        Raises:
            Exception: If the put request raises or its status is not 200.
                Every failure is logged with a traceback and re-raised.
        """
        try:
            now = datetime.now(timezone.utc)
            # NOTE(review): second resolution, so two uploads in the same second
            # get the same key; presumably the later one overwrites - confirm.
            timestamp = int(now.timestamp())
            object_name = f"images/{now.year}/{now.strftime('%m%d')}_{timestamp}.jpg"
            logger.info(
                "准备上传图片: original_filename=%s, object_name=%s, timestamp=%s, utc_now=%s",
                filename,
                object_name,
                timestamp,
                now.isoformat(),
            )

            compressed_data = self._compress_image_to_jpeg(file_data)
            logger.info(
                "图片数据准备完成: object_name=%s, upload_size=%s bytes",
                object_name,
                len(compressed_data),
            )

            result = self.bucket.put_object(
                object_name,
                compressed_data,
                headers={"x-oss-object-acl": "public-read"},
            )
            logger.info(
                "OSS 图片上传响应: object_name=%s, status=%s, request_id=%s, etag=%s",
                object_name,
                getattr(result, "status", None),
                getattr(result, "request_id", None),
                getattr(result, "etag", None),
            )
            if result.status != 200:
                logger.error(
                    "图片上传失败: object_name=%s, status=%s, response=%s",
                    object_name,
                    result.status,
                    result,
                )
                raise Exception(f"上传图片失败,状态码: {result.status}")

            public_url = self._build_public_url(object_name)
            logger.info(
                "图片上传成功: object_name=%s, public_url=%s",
                object_name,
                public_url,
            )
            return public_url
        except Exception as e:
            logger.error(f"OSS图片上传失败: {e}", exc_info=True)
            raise

    def upload_json(self, json_data: str, filename: str) -> str:
        """Upload a JSON string, UTF-8 encoded, under the ``json`` folder.

        Returns:
            The public URL returned by ``upload_file``.
        """
        return self.upload_file(json_data.encode("utf-8"), filename, folder="json")

    def get_file_url(self, object_name: str, expires: int = 3600) -> str:
        """Return a signed GET URL for *object_name*.

        Args:
            object_name: Full object key inside the bucket.
            expires: Passed through to ``bucket.sign_url``; defaults to 3600.

        Raises:
            Exception: Any signing error is logged and re-raised.
        """
        try:
            return self.bucket.sign_url("GET", object_name, expires)
        except Exception as e:
            logger.error(f"获取文件URL失败: {e}")
            raise

    def parse_url(self, encrypted_url: str) -> str:
        """Decrypt *encrypted_url* with ``decrypt_url`` and return the result.

        Raises:
            Exception: Any decryption error is logged and re-raised.
        """
        try:
            return decrypt_url(encrypted_url)
        except Exception as e:
            logger.error(f"URL解析失败: {e}")
            raise

    def get_signed_url(self, filename: str, expires: int = 3600) -> str:
        """Return a signed GET URL for ``shudao/<filename>``.

        Args:
            filename: Object name inside the ``shudao/`` prefix.
            expires: Passed through to ``bucket.sign_url``; defaults to 3600.

        Raises:
            Exception: Any signing error is logged and re-raised.
        """
        try:
            object_name = f"shudao/{filename}"
            return self.bucket.sign_url("GET", object_name, expires)
        except Exception as e:
            logger.error(f"获取签名URL失败: {e}")
            raise
# Module-level shared instance, constructed once when this module is imported.
oss_service: OSSService = OSSService()
|