|
|
@@ -122,108 +122,111 @@ def _extract_archives(ds_dir: Path):
|
|
|
|
|
|
|
|
|
def _download_modelscope_data_files(dataset_id: str, ds_dir: Path):
|
|
|
- """通过 ModelScope API 下载"数据文件"区的压缩包。
|
|
|
- CLI download 只能下载 git 仓库中的元数据文件(如 train.csv),
|
|
|
- 图片数据集的压缩包存放在"数据文件"区,需要通过 API 单独下载。"""
|
|
|
+ """通过 ModelScope API 下载图片数据集的压缩包。
|
|
|
+
|
|
|
+ 图片数据集有一个 {dataset_name}.json 配置文件,记录了各 split 对应的
|
|
|
+ 元数据文件和压缩包名称,例如:
|
|
|
+ {"default": {"train": {"meta": "train.csv", "file": "train.zip"},
|
|
|
+ "validation": {"meta": "val.csv", "file": "val.zip"}}}
|
|
|
+
|
|
|
+ CLI download 只下载 git 仓库文件(CSV 等元数据),
|
|
|
+ 压缩包需要通过 /api/v1/datasets/{ns}/{name}/repo?FilePath=... 单独下载。
|
|
|
+ """
|
|
|
import urllib.request
|
|
|
- import urllib.error
|
|
|
import urllib.parse
|
|
|
|
|
|
api_base = "https://www.modelscope.cn"
|
|
|
- archive_exts = (".zip", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar")
|
|
|
- existing = {f.name for f in ds_dir.rglob("*") if f.is_file()}
|
|
|
-
|
|
|
- # Step 1: 获取数据集的数字 hub ID(tree API 需要用数字 ID,不是 namespace/name)
|
|
|
- hub_id = None
|
|
|
- try:
|
|
|
- info_url = f"{api_base}/api/v1/datasets/{dataset_id}"
|
|
|
- logger.info(f"Fetching dataset hub ID: {info_url}")
|
|
|
- req = urllib.request.Request(info_url, headers={"User-Agent": "FineTuning-Backend"})
|
|
|
- with urllib.request.urlopen(req, timeout=30) as resp:
|
|
|
- info = json.loads(resp.read().decode())
|
|
|
- hub_id = info.get("Data", {}).get("Id") or info.get("Data", {}).get("id")
|
|
|
- if hub_id:
|
|
|
- logger.info(f"Got dataset hub ID: {hub_id}")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"Failed to get dataset hub ID: {e}")
|
|
|
+ namespace, ds_name = dataset_id.split("/", 1) if "/" in dataset_id else ("", dataset_id)
|
|
|
|
|
|
- # Step 2: 用 hub ID 列出仓库中的所有文件
|
|
|
- files = []
|
|
|
- if hub_id:
|
|
|
+ # Step 1: 找到配置文件 {dataset_name}.json 并读取
|
|
|
+ config_files = [p for p in ds_dir.glob("*.json") if p.name not in META_FILENAMES]
|
|
|
+ if not config_files:
|
|
|
+ # 尝试通过 API 下载配置文件
|
|
|
+ config_url = (f"{api_base}/api/v1/datasets/{dataset_id}/repo"
|
|
|
+ f"?Source=SDK&Revision=master&FilePath={ds_name}.json&View=false")
|
|
|
try:
|
|
|
- tree_url = (f"{api_base}/api/v1/datasets/{hub_id}/repo/tree"
|
|
|
- f"?Revision=master&Root=/&Recursive=True&PageNumber=1&PageSize=10000")
|
|
|
- logger.info(f"Listing dataset files: {tree_url}")
|
|
|
- req = urllib.request.Request(tree_url, headers={"User-Agent": "FineTuning-Backend"})
|
|
|
+ logger.info(f"尝试下载配置文件: {ds_name}.json")
|
|
|
+ req = urllib.request.Request(config_url, headers={"User-Agent": "FineTuning-Backend"})
|
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
|
- result = json.loads(resp.read().decode())
|
|
|
- files = result.get("Data", {}).get("Files", [])
|
|
|
- if files:
|
|
|
- logger.info(f"Found {len(files)} files in dataset repo")
|
|
|
- for f in files:
|
|
|
- fn = f.get("Name", f.get("name", ""))
|
|
|
- if fn:
|
|
|
- logger.debug(f" - {fn}")
|
|
|
- except urllib.error.HTTPError as e:
|
|
|
- logger.warning(f"Tree API error ({e.code}) for hub_id={hub_id}")
|
|
|
+ config_data = json.loads(resp.read().decode())
|
|
|
+ config_path = ds_dir / f"{ds_name}.json"
|
|
|
+ config_path.write_text(json.dumps(config_data, ensure_ascii=False), encoding="utf-8")
|
|
|
+ config_files = [config_path]
|
|
|
except Exception as e:
|
|
|
- logger.warning(f"Failed to list files via tree API: {e}")
|
|
|
+ logger.info(f"未找到配置文件 {ds_name}.json,跳过数据文件下载: {e}")
|
|
|
+ return
|
|
|
|
|
|
- if not files:
|
|
|
- # fallback: 直接用 namespace/name 格式尝试
|
|
|
+ # 在所有 json 配置文件中找到包含 "file" 字段的那个
|
|
|
+ config = None
|
|
|
+ for cf in config_files:
|
|
|
try:
|
|
|
- tree_url = (f"{api_base}/api/v1/datasets/{dataset_id}/repo/tree"
|
|
|
- f"?Revision=master&Root=/&Recursive=True&PageNumber=1&PageSize=10000")
|
|
|
- logger.info(f"Fallback tree URL: {tree_url}")
|
|
|
- req = urllib.request.Request(tree_url, headers={"User-Agent": "FineTuning-Backend"})
|
|
|
- with urllib.request.urlopen(req, timeout=30) as resp:
|
|
|
- result = json.loads(resp.read().decode())
|
|
|
- files = result.get("Data", {}).get("Files", [])
|
|
|
- if files:
|
|
|
- logger.info(f"Fallback found {len(files)} files")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"Fallback tree API also failed: {e}")
|
|
|
+ data = json.loads(cf.read_text(encoding="utf-8"))
|
|
|
+ # 检查是否包含 file 字段(数据集配置格式)
|
|
|
+ if isinstance(data, dict):
|
|
|
+ for subset in data.values():
|
|
|
+ if isinstance(subset, dict):
|
|
|
+ for split_info in subset.values():
|
|
|
+ if isinstance(split_info, dict) and "file" in split_info and split_info["file"]:
|
|
|
+ config = data
|
|
|
+ break
|
|
|
+ if config:
|
|
|
+ break
|
|
|
+ if config:
|
|
|
+ break
|
|
|
+ except (json.JSONDecodeError, UnicodeDecodeError):
|
|
|
+ continue
|
|
|
|
|
|
- if not files:
|
|
|
- logger.warning(f"Could not list any files for dataset {dataset_id}")
|
|
|
+ if not config:
|
|
|
+ logger.info("未找到包含数据文件引用的配置文件,跳过")
|
|
|
return
|
|
|
|
|
|
- # Step 3: 筛选压缩包文件并下载
|
|
|
- # 下载 URL 用 namespace/name 格式: /api/v1/datasets/{ns}/{name}/repo?Source=SDK&Revision=master&FilePath=...
|
|
|
- namespace, name = dataset_id.split("/", 1)
|
|
|
- downloaded = []
|
|
|
- for file_info in files:
|
|
|
- fname = file_info.get("Name", file_info.get("name", ""))
|
|
|
- fpath = file_info.get("Path", file_info.get("path", fname))
|
|
|
- if not fname:
|
|
|
- continue
|
|
|
- if not any(fname.lower().endswith(ext) for ext in archive_exts):
|
|
|
+ logger.info(f"找到数据文件配置: {json.dumps(config, ensure_ascii=False)}")
|
|
|
+
|
|
|
+ # Step 2: 收集所有需要下载的压缩包文件名
|
|
|
+ archive_files = set()
|
|
|
+ for subset in config.values():
|
|
|
+ if not isinstance(subset, dict):
|
|
|
continue
|
|
|
+ for split_info in subset.values():
|
|
|
+ if isinstance(split_info, dict):
|
|
|
+ fname = split_info.get("file", "")
|
|
|
+ if fname:
|
|
|
+ archive_files.add(fname)
|
|
|
+
|
|
|
+ if not archive_files:
|
|
|
+ logger.info("配置中未找到数据文件,跳过")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Step 3: 下载压缩包
|
|
|
+ existing = {f.name for f in ds_dir.rglob("*") if f.is_file()}
|
|
|
+ downloaded = []
|
|
|
+ for fname in archive_files:
|
|
|
if fname in existing:
|
|
|
- logger.info(f"Archive already exists, skipping: {fname}")
|
|
|
+ logger.info(f"压缩包已存在,跳过: {fname}")
|
|
|
continue
|
|
|
|
|
|
params = urllib.parse.urlencode({
|
|
|
"Source": "SDK", "Revision": "master",
|
|
|
- "FilePath": fpath, "View": "false",
|
|
|
+ "FilePath": fname, "View": "false",
|
|
|
})
|
|
|
- dl_url = f"{api_base}/api/v1/datasets/{namespace}/{name}/repo?{params}"
|
|
|
+ dl_url = f"{api_base}/api/v1/datasets/{dataset_id}/repo?{params}"
|
|
|
dest = ds_dir / fname
|
|
|
- logger.info(f"Downloading data file: {fname} from {dl_url}")
|
|
|
+ logger.info(f"下载数据文件: {fname}")
|
|
|
+ logger.info(f" URL: {dl_url}")
|
|
|
|
|
|
try:
|
|
|
req = urllib.request.Request(dl_url, headers={"User-Agent": "FineTuning-Backend"})
|
|
|
with urllib.request.urlopen(req, timeout=600) as resp:
|
|
|
dest.write_bytes(resp.read())
|
|
|
downloaded.append(fname)
|
|
|
- logger.info(f"Downloaded: {fname} ({dest.stat().st_size / 1024 / 1024:.1f}MB)")
|
|
|
+ logger.info(f" 下载完成: {fname} ({dest.stat().st_size / 1024 / 1024:.1f}MB)")
|
|
|
except Exception as e:
|
|
|
- logger.warning(f"Failed to download {fname}: {e}")
|
|
|
+ logger.warning(f" 下载失败 {fname}: {e}")
|
|
|
|
|
|
if downloaded:
|
|
|
- logger.info(f"Downloaded {len(downloaded)} data file(s): {downloaded}")
|
|
|
+ logger.info(f"共下载 {len(downloaded)} 个数据文件: {downloaded}")
|
|
|
else:
|
|
|
- logger.info("No downloadable archives found in dataset repo")
|
|
|
+ logger.info("没有需要下载的数据文件")
|
|
|
|
|
|
|
|
|
async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadResponse:
|
|
|
@@ -400,106 +403,12 @@ async def recover_stale_downloads() -> None:
|
|
|
|
|
|
|
|
|
def _download_modelscope_dataset(dataset_id: str) -> tuple[Path, Path, int]:
|
|
|
- """用 MsDataset.load() 下载数据集,支持图片数据集(自动从 CDN 下载图片)。
|
|
|
- 如果 MsDataset.load() 失败,fallback 到 CLI 方式。"""
|
|
|
- namespace, ds_name = dataset_id.split("/", 1) if "/" in dataset_id else ("", dataset_id)
|
|
|
+ """用 modelscope CLI 下载数据集仓库文件,再通过 API 下载数据文件区的压缩包。"""
|
|
|
ds_dir = settings.processed_dir / f"ms_{dataset_id.replace('/', '_')}"
|
|
|
ds_dir.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- # 优先用 MsDataset.load(),它能自动下载"数据文件"区的图片
|
|
|
- try:
|
|
|
- records, record_count = _download_via_msdataset(dataset_id, ds_dir)
|
|
|
- if records:
|
|
|
- jsonl_path = ds_dir / "data.jsonl"
|
|
|
- with open(jsonl_path, "w", encoding="utf-8") as f:
|
|
|
- for item in records:
|
|
|
- f.write(json.dumps(item, ensure_ascii=False) + "\n")
|
|
|
- logger.info(f"MsDataset.load() 成功: {dataset_id} ({record_count} records)")
|
|
|
- return ds_dir, jsonl_path, record_count
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"MsDataset.load() failed for {dataset_id}: {e}, falling back to CLI")
|
|
|
-
|
|
|
- # fallback: CLI 方式(只下载 git 仓库文件,不含数据文件区图片)
|
|
|
return _download_modelscope_dataset_cli(dataset_id, ds_dir)
|
|
|
|
|
|
|
|
|
-def _download_via_msdataset(dataset_id: str, ds_dir: Path) -> tuple[list[dict], int]:
|
|
|
- """用 MsDataset.load() 下载数据集,处理图片列(PIL.Image → 保存到磁盘)。"""
|
|
|
- from modelscope.msdatasets import MsDataset
|
|
|
- from PIL import Image
|
|
|
-
|
|
|
- namespace, ds_name = dataset_id.split("/", 1) if "/" in dataset_id else ("", dataset_id)
|
|
|
- images_dir = ds_dir / "images"
|
|
|
-
|
|
|
- # 尝试加载不同 split
|
|
|
- ds = None
|
|
|
- for split in ("train", "validation", "test"):
|
|
|
- try:
|
|
|
- if namespace:
|
|
|
- ds = MsDataset.load(ds_name, namespace=namespace, split=split)
|
|
|
- else:
|
|
|
- ds = MsDataset.load(dataset_id, split=split)
|
|
|
- if ds:
|
|
|
- logger.info(f"MsDataset.load() loaded split '{split}': {len(ds) if hasattr(ds, '__len__') else '?'} records")
|
|
|
- break
|
|
|
- except Exception as e:
|
|
|
- logger.debug(f"split '{split}' failed: {e}")
|
|
|
-
|
|
|
- if not ds:
|
|
|
- # 不带 split 参数试试
|
|
|
- try:
|
|
|
- if namespace:
|
|
|
- ds = MsDataset.load(ds_name, namespace=namespace)
|
|
|
- else:
|
|
|
- ds = MsDataset.load(dataset_id)
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"MsDataset.load() without split also failed: {e}")
|
|
|
- return [], 0
|
|
|
-
|
|
|
- if not ds:
|
|
|
- return [], 0
|
|
|
-
|
|
|
- # 检查是否 iterable
|
|
|
- if not hasattr(ds, '__iter__'):
|
|
|
- return [], 0
|
|
|
-
|
|
|
- records = []
|
|
|
- img_counter = 0
|
|
|
- columns = None
|
|
|
-
|
|
|
- for row in ds:
|
|
|
- if not isinstance(row, dict):
|
|
|
- continue
|
|
|
- if columns is None:
|
|
|
- columns = list(row.keys())
|
|
|
-
|
|
|
- record = {}
|
|
|
- for k, v in row.items():
|
|
|
- if isinstance(v, Image.Image):
|
|
|
- # 图片对象:保存到磁盘,记录相对路径
|
|
|
- images_dir.mkdir(parents=True, exist_ok=True)
|
|
|
- img_name = f"{img_counter:06d}.jpg"
|
|
|
- img_path = images_dir / img_name
|
|
|
- if v.mode in ("RGBA", "P", "LA"):
|
|
|
- v = v.convert("RGB")
|
|
|
- v.save(str(img_path), format="JPEG", quality=90)
|
|
|
- record[k] = f"images/{img_name}"
|
|
|
- img_counter += 1
|
|
|
- else:
|
|
|
- record[k] = v
|
|
|
-
|
|
|
- records.append(record)
|
|
|
-
|
|
|
- # 进度日志
|
|
|
- if len(records) % 500 == 0:
|
|
|
- logger.info(f" 处理中... {len(records)} records, {img_counter} images saved")
|
|
|
-
|
|
|
- if img_counter > 0:
|
|
|
- logger.info(f"共保存 {img_counter} 张图片到 {images_dir}")
|
|
|
-
|
|
|
- return records, len(records)
|
|
|
-
|
|
|
-
|
|
|
def _download_modelscope_dataset_cli(dataset_id: str, ds_dir: Path) -> tuple[Path, Path, int]:
|
|
|
"""CLI 方式下载数据集(fallback,只下载 git 仓库文件)。"""
|
|
|
import subprocess
|
|
|
@@ -511,6 +420,10 @@ def _download_modelscope_dataset_cli(dataset_id: str, ds_dir: Path) -> tuple[Pat
|
|
|
logger.error(f"ModelScope CLI download failed (code={proc.returncode}): {proc.stderr[:500]}")
|
|
|
raise RuntimeError(f"ModelScope download failed: {proc.stderr[:500]}")
|
|
|
|
|
|
+ # CLI 下载完 git 仓库文件后,通过 API 下载数据文件区的压缩包并解压
|
|
|
+ _download_modelscope_data_files(dataset_id, ds_dir)
|
|
|
+ _extract_archives(ds_dir)
|
|
|
+
|
|
|
# 扫描下载目录中的所有文件
|
|
|
all_files = [p for p in ds_dir.rglob("*") if p.is_file()]
|
|
|
logger.info(f"CLI downloaded {len(all_files)} files to {ds_dir}")
|