#!/usr/bin/env python3
"""Probe whether dataset files can be fetched from OSS with the issued credentials.

Usage: ``script.py [namespace/dataset_name]``

The script asks the ModelScope hub for the dataset's temporary OSS access
config and then runs five probes against the storage host:

1. anonymous HEAD on ``<BackupDir>train.zip``
2. STS-signed HEAD on the same object
3. STS-signed ranged GET (first 1 MiB) on the same object
4. anonymous GET on the unzipped directory prefix
5. STS-signed GET on the unzipped directory prefix

Every probe is best-effort: a failure is printed and the next probe runs.
"""
import base64
import hashlib
import hmac
import json
import sys
import time
import urllib.parse
import urllib.request

DEFAULT_DATASET_ID = "tany0699/carBrands50"
SIGNED_URL_TTL_SECONDS = 3600
USER_AGENT = "Test"
ZIP_MAGIC = b"PK\x03\x04"
RANGE_PROBE_BYTES = 1048576


def split_dataset_id(dataset_id: str) -> tuple:
    """Split ``"namespace/name"`` into ``(namespace, name)``.

    Only the first ``/`` separates the two parts, so ``"a/b/c"`` yields
    ``("a", "b/c")``.

    Raises:
        ValueError: if there is no ``/`` or either part is empty.
    """
    namespace, sep, name = dataset_id.partition("/")
    if not (sep and namespace and name):
        raise ValueError(
            f"数据集 ID 必须是 'namespace/name' 格式: {dataset_id!r}"
        )
    return namespace, name


def object_url(host: str, key: str) -> str:
    """Return the plain (unsigned) URL of object *key* under *host*.

    The key is percent-encoded for use in a URL path; ``/`` is kept as the
    path separator.  *host* is used verbatim.
    """
    return f"{host}/{urllib.parse.quote(key, safe='/')}"


def sign_oss_url(host: str, bucket: str, key: str, *, access_id: str,
                 access_secret: str, security_token: str, method: str,
                 expires) -> str:
    """Build an OSS V1 query-string-signed URL for STS credentials.

    Args:
        host: base URL of the storage endpoint, used verbatim.
        bucket: bucket name, used in the canonicalized resource.
        key: object key (unencoded).
        access_id: temporary AccessKey ID.
        access_secret: temporary AccessKey secret (HMAC key).
        security_token: STS security token.
        method: HTTP verb the URL will be used with.  The verb is part of
            the signed string, so a URL signed for ``HEAD`` is not valid
            for ``GET``.
        expires: Unix timestamp (int or str) after which the URL is invalid.

    Returns:
        ``<host>/<key>?OSSAccessKeyId=..&Expires=..&Signature=..&security-token=..``
    """
    # The security token is a signed sub-resource: it must appear (raw, not
    # percent-encoded) in the canonicalized resource, otherwise the server
    # computes a different signature than we do.
    canonical_resource = f"/{bucket}/{key}?security-token={security_token}"
    # VERB \n Content-MD5 \n Content-Type \n Expires \n CanonicalizedResource
    # (no Content-MD5 / Content-Type / x-oss-* headers are sent).
    string_to_sign = f"{method}\n\n\n{expires}\n{canonical_resource}"
    digest = hmac.new(
        access_secret.encode(), string_to_sign.encode(), hashlib.sha1
    ).digest()
    signature = base64.b64encode(digest).decode("ascii")
    # quote_via=quote (with urlencode's default safe="") percent-encodes
    # '+', '/' and '=' so base64 text survives query-string parsing intact.
    query = urllib.parse.urlencode(
        {
            "OSSAccessKeyId": access_id,
            "Expires": expires,
            "Signature": signature,
            "security-token": security_token,
        },
        quote_via=urllib.parse.quote,
    )
    return f"{object_url(host, key)}?{query}"


def build_request(url: str, method: str = "GET", extra_headers=None):
    """Return a ``urllib`` request for *url* carrying the script's User-Agent."""
    headers = {"User-Agent": USER_AGENT}
    if extra_headers:
        headers.update(extra_headers)
    return urllib.request.Request(url, method=method, headers=headers)


def probe_head(url: str) -> bool:
    """HEAD *url*, print status and size, and return whether it succeeded."""
    try:
        with urllib.request.urlopen(build_request(url, "HEAD"), timeout=15) as resp:
            print(f"HEAD: {resp.status} | size={resp.headers.get('Content-Length', '?')}")
    except Exception as e:  # best-effort probe: report and carry on
        print(f"HEAD: {e}")
        return False
    return True


def probe_zip_prefix(url: str) -> None:
    """GET the first 1 MiB of *url* and report whether it starts like a ZIP."""
    byte_range = {"Range": f"bytes=0-{RANGE_PROBE_BYTES - 1}"}
    try:
        with urllib.request.urlopen(build_request(url, "GET", byte_range), timeout=30) as resp:
            # Cap the read as well, in case the server ignores the Range header.
            data = resp.read(RANGE_PROBE_BYTES)
            is_zip = data[:4] == ZIP_MAGIC
            print(f"GET: {resp.status} | {len(data)} bytes | is_zip={is_zip}")
            if is_zip:
                print(">>> 成功! 是 ZIP 文件! <<<")
    except Exception as e:  # best-effort probe: report and carry on
        print(f"GET: {e}")


def probe_text(url: str) -> None:
    """GET *url* and print the status plus the first 500 characters of the body."""
    try:
        with urllib.request.urlopen(build_request(url), timeout=15) as resp:
            content = resp.read().decode("utf-8", errors="replace")
            print(f"状态: {resp.status}")
            print(f"内容前 500 字符: {content[:500]}")
    except Exception as e:  # best-effort probe: report and carry on
        print(f"失败: {e}")


def main(argv=None) -> None:
    """Fetch the dataset's OSS access config and run the five probes.

    Args:
        argv: command-line arguments without the program name; defaults to
            ``sys.argv[1:]``.  The first argument, if any, is the dataset id.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    dataset_id = args[0] if args else DEFAULT_DATASET_ID
    try:
        namespace, ds_name = split_dataset_id(dataset_id)
    except ValueError as exc:
        sys.exit(str(exc))
    print(f"数据集: {dataset_id}\n")

    # Imported here, after argument handling as in the original script, so
    # the helpers above stay importable without modelscope installed.
    from modelscope.hub.api import HubApi
    api = HubApi()

    # Fetch the temporary OSS credentials.
    print("=== 获取 OSS 凭证 ===")
    config = api.get_dataset_access_config(
        dataset_name=ds_name,
        namespace=namespace,
        revision="master",
    )
    host = config["Host"]
    backup_dir = config["BackupDir"]  # zip files
    unzip_dir = config["Dir"]  # already-unzipped files
    bucket = config["Bucket"]
    credentials = {
        "access_id": config["AccessId"],
        "access_secret": config["AccessSecret"],
        "security_token": config["SecurityToken"],
    }
    print(f"Host: {host}")
    print(f"Zip 目录: {backup_dir}")
    print(f"解压目录: {unzip_dir}")
    print(f"过期时间: {config['Expiration']}")

    zip_key = f"{backup_dir}train.zip"
    expires = int(time.time()) + SIGNED_URL_TTL_SECONDS

    # Test 1: plain HEAD on the zip (is it publicly readable?).
    print("\n=== 测试1: 直接访问 public-zip/train.zip ===")
    url = object_url(host, zip_key)
    print(f"URL: {url}")
    probe_head(url)

    # Test 2: HEAD on the zip with an STS-signed URL.
    print("\n=== 测试2: 带 STS 签名访问 train.zip ===")
    url = sign_oss_url(host, bucket, zip_key, method="HEAD",
                       expires=expires, **credentials)
    print(f"URL: {url[:200]}...")
    if probe_head(url):
        print(">>> 成功! <<<")

    # Test 3: GET the first 1 MiB.  Needs its own signature because the
    # HTTP verb is part of the signed string.
    print("\n=== 测试3: GET 下载 train.zip 前 1MB ===")
    url = sign_oss_url(host, bucket, zip_key, method="GET",
                       expires=expires, **credentials)
    probe_zip_prefix(url)

    # Test 4: plain GET on the unzipped-files prefix.
    print("\n=== 测试4: 直接访问 public-unzip-dataset(已解压图片) ===")
    url = object_url(host, unzip_dir)
    print(f"URL: {url}")
    probe_text(url)

    # Test 5: signed GET on the unzipped-files prefix.
    print("\n=== 测试5: 带签名访问 public-unzip-dataset ===")
    url = sign_oss_url(host, bucket, unzip_dir, method="GET",
                       expires=expires, **credentials)
    probe_text(url)

    print("\n=== 完成 ===")


if __name__ == "__main__":
    main()