ソースを参照

修复远程传输与模型下载报错

lxylxy123321 1 週間前
コミット
b4fd4e1052
3 ファイル変更、117 行追加、134 行削除
  1. 75 71
      backend/app/services/dataset_service.py
  2. 26 13
      backend/app/services/model_test_service.py
  3. 16 50
      result.txt

+ 75 - 71
backend/app/services/dataset_service.py

@@ -1,7 +1,7 @@
 import asyncio
 import json
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import Path
 from typing import Any
 
@@ -22,9 +22,6 @@ META_FILENAMES = frozenset({
     "special_tokens_map.json", "tokenizer_config.json",
     "added_tokens.json", "vocab.json", "merges.txt",
     "config.json", "preprocessor_config.json",
-    # HF/ModelScope dataset metadata
-    "dataset_info.json", "dataset_infos.json", "dataset.json",
-    "state.json", "dataset_dict.json",
 })
 
 # File size threshold: files smaller than this (bytes) are likely metadata
@@ -69,25 +66,7 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
     """从 HuggingFace 或 ModelScope 下载数据集。"""
     try:
         if req.use_modelscope:
-            # ModelScope 数据集是 HF 镜像,直接用 datasets 库加载
-            from datasets import load_dataset
-
-            ds_dir = settings.processed_dir / f"ms_{req.dataset_id.replace('/', '_')}"
-            ds_dir.mkdir(parents=True, exist_ok=True)
-            ds = load_dataset(req.dataset_id)
-            if "train" in ds:
-                split = ds["train"]
-            else:
-                split = ds[list(ds.keys())[0]]
-            output_path = ds_dir / "data.jsonl"
-            record_count = 0
-            with open(output_path, "w", encoding="utf-8") as f:
-                for item in split:
-                    f.write(json.dumps(item, ensure_ascii=False) + "\n")
-                    record_count += 1
-            if record_count == 0:
-                raise RuntimeError("Dataset loaded but returned 0 records")
-            jsonl_path = output_path
+            ds_dir, jsonl_path, record_count = await asyncio.to_thread(_download_modelscope_dataset, req.dataset_id)
         else:
             from datasets import load_dataset
 
@@ -104,8 +83,6 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
                     f.write(json.dumps(item, ensure_ascii=False) + "\n")
             jsonl_path = output_path
             record_count = len(split) if hasattr(split, "__len__") else 0
-            if record_count == 0:
-                raise RuntimeError("HF dataset loaded but returned 0 records")
 
         record = DatasetRecord(
             id=str(uuid.uuid4()),
@@ -113,7 +90,7 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
             format="jsonl",
             record_count=record_count,
             file_path=str(jsonl_path),
-            created_at=datetime.utcnow(),
+            created_at=datetime.now(timezone.utc),
         )
         async with async_session() as session:
             session.add(record)
@@ -126,55 +103,82 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
         return DatasetDownloadResponse(dataset_id=req.dataset_id, status="failed", error=str(e))
 
 
-def _scan_and_convert_to_jsonl(ds_dir: Path) -> tuple[Path, int]:
-    """扫描 CLI 下载的数据集目录,找训练数据文件并转为 JSONL。"""
-    # 找所有可能的数据文件
-    data_files = []
-    for ext in ("*.jsonl", "*.json", "*.csv"):
-        data_files.extend(ds_dir.rglob(ext))
-    # 过滤掉元数据文件
-    data_files = [f for f in data_files if f.name not in META_FILENAMES]
+def _download_modelscope_dataset(dataset_id: str) -> tuple[Path, Path, int]:
+    """用 snapshot_download 下载数据集文件,完全绕过 datasets 库,避免版本兼容问题。"""
+    from modelscope import snapshot_download
 
-    if not data_files:
-        raise RuntimeError(f"No dataset files found in {ds_dir}")
+    ds_dir = settings.processed_dir / f"ms_{dataset_id.replace('/', '_')}"
+    ds_dir.mkdir(parents=True, exist_ok=True)
+
+    local_path = snapshot_download(dataset_id, cache_dir=str(settings.processed_dir))
 
+    # ModelScope 的 snapshot_download 把实际数据存到 cache_dir/downloads/<hash> 里
+    # 而 local_path 指向的目录只有元数据文件,需要额外扫描 downloads 目录
+    all_files = [p for p in Path(local_path).rglob("*") if p.is_file()]
+    downloads_dir = settings.processed_dir / "downloads"
+    if downloads_dir.exists():
+        for p in downloads_dir.rglob("*"):
+            if p.is_file() and str(p.parent) != str(ds_dir):
+                all_files.append(p)
+
+    # 识别训练数据文件
+    data_files = [f for f in all_files if _is_training_data_file(f)]
+
+    if not data_files:
+        fallback = [f for f in all_files if f.suffix in (".json", ".jsonl")]
+        logger.warning(f"No training data files found in {dataset_id}. "
+                       f"Available JSON files: {[f.name for f in fallback]}")
+        if fallback:
+            data_files = fallback
+        else:
+            raise ValueError(f"No JSON/JSONL data files found in dataset {dataset_id}")
+
+    # 优先取 train / data 开头的文件
+    target = None
+    for name in ("train.jsonl", "train.json", "data.jsonl", "data.json"):
+        for f in data_files:
+            if f.name == name:
+                target = f
+                break
+        if target:
+            break
+    if not target:
+        # 优先取数据量最大的文件
+        target = sorted(data_files, key=lambda f: f.stat().st_size, reverse=True)[0]
+
+    logger.info(f"Selected data file: {target} (size={target.stat().st_size})")
+
+    # 读取并统一转为 JSONL
     jsonl_path = ds_dir / "data.jsonl"
     record_count = 0
+    content = target.read_text(encoding="utf-8")
+
+    if target.suffix == ".jsonl" or not target.suffix:
+        # JSONL 或无后缀文件:尝试逐行解析
+        records = []
+        for line in content.splitlines():
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                records.append(json.loads(line))
+            except json.JSONDecodeError:
+                # 如果逐行解析失败,尝试整体解析(可能是 JSON 数组)
+                records = json.loads(content)
+                if not isinstance(records, list):
+                    records = [records]
+                break
+    else:
+        records = json.loads(content)
+        if not isinstance(records, list):
+            records = [records]
+
+    with open(jsonl_path, "w", encoding="utf-8") as f:
+        for item in records:
+            f.write(json.dumps(item, ensure_ascii=False) + "\n")
+            record_count += 1
 
-    with open(jsonl_path, "w", encoding="utf-8") as out:
-        for data_file in data_files:
-            if data_file.suffix == ".jsonl":
-                with open(data_file, "r", encoding="utf-8") as f:
-                    for line in f:
-                        line = line.strip()
-                        if line:
-                            out.write(line + "\n")
-                            record_count += 1
-            elif data_file.suffix == ".json":
-                try:
-                    with open(data_file, "r", encoding="utf-8") as f:
-                        data = json.load(f)
-                        if isinstance(data, list):
-                            for item in data:
-                                out.write(json.dumps(item, ensure_ascii=False) + "\n")
-                                record_count += 1
-                        elif isinstance(data, dict):
-                            # 跳过 HF/ModelScope dataset metadata(features/splits 结构)
-                            if "features" in data or "splits" in data or "dataset_name" in data:
-                                continue
-                            out.write(json.dumps(data, ensure_ascii=False) + "\n")
-                            record_count += 1
-                except Exception:
-                    pass
-            elif data_file.suffix == ".csv":
-                import csv
-                with open(data_file, "r", encoding="utf-8") as f:
-                    reader = csv.DictReader(f)
-                    for row in reader:
-                        out.write(json.dumps(dict(row), ensure_ascii=False) + "\n")
-                        record_count += 1
-
-    return jsonl_path, record_count
+    return ds_dir, jsonl_path, record_count
 
 
 async def upload_dataset(file: UploadFile) -> dict[str, Any]:
@@ -200,7 +204,7 @@ async def upload_dataset(file: UploadFile) -> dict[str, Any]:
         format=fmt,
         record_count=record_count,
         file_path=str(file_path),
-        created_at=datetime.utcnow(),
+        created_at=datetime.now(timezone.utc),
     )
     async with async_session() as session:
         session.add(record)

+ 26 - 13
backend/app/services/model_test_service.py

@@ -10,14 +10,15 @@ settings = get_settings()
 async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
     """加载已缓存模型并生成测试响应。"""
     if settings.use_remote_compute:
-        return _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
-    return _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
+        return await _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
+    return await _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
 
 
-def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
-    """通过 SSH 在算力节点执行模型测试。
+async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
+    """在算力节点容器内执行模型测试(通过 SSH + docker exec)
 
-    通过环境变量传递参数,base64 编码脚本通过 stdin 管道传给 docker exec -i python。
+    方案:通过 SSH 在远端容器内直接执行 Python 单行命令,
+    所有参数通过环境变量传入,避免任何引号/转义问题。
     """
     import base64
     import json
@@ -27,11 +28,11 @@ def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperat
     python = settings.compute_node_python
     workdir = settings.compute_node_workdir
 
-    # 参数通过 base64 编码,脚本内通过 os.environ 读取,完全避免引号/转义问题
-    prompt_b64 = base64.b64encode(prompt.encode('utf-8')).decode()
+    # 将 prompt 进行 base64 编码,避免引号/特殊字符问题
+    prompt_b64 = base64.b64encode(prompt.encode("utf-8")).decode()
     do_sample = str(temperature > 0).lower()
 
-    # 独立的 Python 脚本(参数通过环境变量传入)
+    # 独立脚本:零 app/db 依赖,参数全部通过环境变量传入
     script = rf"""\
 import json, os, base64
 from pathlib import Path
@@ -39,12 +40,26 @@ import torch
 from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
 
 def find_model_path(model_id):
-    for base in ['/root/.cache/huggingface/hub', '/root/.cache/modelscope/hub', '/root/models']:
+    # 远端实际存储路径(与 model_service.resolve_model_path 一致)
+    for base in [
+        '/root/Fine-tuning/backend/data/models',
+        '/root/.cache/huggingface/hub',
+        '/root/.cache/modelscope/hub',
+        '/root/models',
+    ]:
         bp = Path(base)
         if not bp.is_dir():
             continue
+        # 尝试 namespace_name 扁平化匹配(HF 风格)
+        flat_name = model_id.replace("/", "_")
+        if (bp / flat_name / "config.json").exists():
+            return str(bp / flat_name)
+        # 尝试 namespace/name 嵌套匹配(ModelScope 风格)
+        if (bp / model_id / "config.json").exists():
+            return str(bp / model_id)
+        # 扫描所有目录
         try:
-            for child in bp.rglob('config.json'):
+            for child in bp.rglob("config.json"):
                 if child.parent.is_dir():
                     return str(child.parent)
         except Exception:
@@ -87,7 +102,7 @@ print(json.dumps({{'generated_text': gen}}))
 
     script_b64 = base64.b64encode(script.encode()).decode()
 
-    # 环境变量通过 docker exec -e 传入容器,脚本通过 stdin 管道传入
+    # 通过环境变量传递参数,脚本通过 stdin 管道传入容器内的 Python
     remote_cmd = (
         f"echo {script_b64} | base64 -d | "
         f"docker exec -i -w {workdir} "
@@ -112,7 +127,6 @@ print(json.dumps({{'generated_text': gen}}))
         logger.error(f"Remote model test failed: {stderr}")
         return {"error": stderr.strip() or "Remote test failed"}
 
-    # 提取最后一行 JSON
     for line in reversed(stdout.strip().split("\n")):
         line = line.strip()
         if line.startswith("{"):
@@ -145,7 +159,6 @@ async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, tem
     if tokenizer.pad_token is None:
         tokenizer.pad_token = tokenizer.eos_token
 
-    # 通用加载策略:尝试多种加载方式,自动兼容各种新架构
     model = None
     for loader_cls, kwargs in [
         (AutoModelForCausalLM, {"trust_remote_code": True}),

+ 16 - 50
result.txt

@@ -1,57 +1,23 @@
 lq@lq:~$ sudo docker logs -f finetune-backend
 INFO:     Started server process [1]
 INFO:     Waiting for application startup.
-2026-05-19 16:26:52 | INFO     | peft-platform | JobQueue started with 2 workers
+2026-05-19 16:40:10 | INFO     | peft-platform | JobQueue started with 2 workers
 INFO:     Application startup complete.
 INFO:     Uvicorn running on http://0.0.0.0:8010 (Press CTRL+C to quit)
-INFO:     127.0.0.1:56270 - "GET /health HTTP/1.1" 200 OK
-INFO:     172.20.0.4:51748 - "GET /api/v1/models/ HTTP/1.0" 200 OK
-2026-05-19 16:27:27 | ERROR    | peft-platform | SSH command timeout after 10s: docker cp /tmp/_model_test_host.py finetune-trainer:/tmp/_model_test.py
-2026-05-19 16:27:27 | ERROR    | peft-platform | docker cp failed: Command timed out after 10s
-2026-05-19 16:27:37 | ERROR    | peft-platform | SSH command timeout after 10s: rm -f /tmp/_model_test_host.py
-2026-05-19 16:27:47 | ERROR    | peft-platform | SSH command timeout after 10s: docker exec finetune-trainer rm -f /tmp/_model_test.py
-INFO:     172.20.0.4:51758 - "POST /api/v1/models/test HTTP/1.0" 400 Bad Request
-INFO:     127.0.0.1:44200 - "GET /health HTTP/1.1" 200 OK
-2026-05-19 16:28:00 | ERROR    | peft-platform | Dataset download failed: No module named 'oss2'
-INFO:     172.20.0.4:54718 - "POST /api/v1/datasets/download HTTP/1.0" 400 Bad Request
-INFO:     172.20.0.4:54728 - "GET /api/v1/datasets/ HTTP/1.0" 200 OK
-INFO:     127.0.0.1:47648 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:50168 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:45410 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:41070 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:55204 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:49536 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:56426 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:37942 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:39410 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:43676 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:56854 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:34694 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:51000 - "GET /health HTTP/1.1" 200 OK
-INFO:     127.0.0.1:46542 - "GET /health HTTP/1.1" 200 OK
-INFO:     Shutting down
-INFO:     Waiting for application shutdown.
-2026-05-19 16:35:04 | INFO     | peft-platform | JobQueue stopped
-INFO:     Application shutdown complete.
-INFO:     Finished server process [1]
-lq@lq:~$ sudo docker logs -f finetune-backend
-INFO:     Started server process [1]
-INFO:     Waiting for application startup.
-2026-05-19 16:35:07 | INFO     | peft-platform | JobQueue started with 2 workers
-INFO:     Application startup complete.
-INFO:     Uvicorn running on http://0.0.0.0:8010 (Press CTRL+C to quit)
-INFO:     127.0.0.1:53532 - "GET /health HTTP/1.1" 200 OK
-2026-05-19 16:35:19 | ERROR    | peft-platform | Dataset download failed: cannot import name 'get_metadata_patterns' from 'datasets.data_files' (/usr/local/lib/python3.10/site-packages/datasets/data_files.py)
-INFO:     172.20.0.4:44290 - "POST /api/v1/datasets/download HTTP/1.0" 400 Bad Request
-INFO:     172.20.0.4:43040 - "GET /api/v1/models/ HTTP/1.0" 200 OK
-2026-05-19 16:36:01 | INFO     | peft-platform | Remote test result: code=1, stdout_len=0, stderr_len=110
-2026-05-19 16:36:01 | INFO     | peft-platform | stderr (first 500): Traceback (most recent call last):
-  File "<stdin>", line 16, in <module>
-IndexError: list index out of range
+INFO:     127.0.0.1:59114 - "GET /health HTTP/1.1" 200 OK
+INFO:     172.20.0.4:38930 - "GET /api/v1/models/ HTTP/1.0" 200 OK
+2026-05-19 16:40:50 | INFO     | peft-platform | Remote test result: code=1, stdout_len=57, stderr_len=0
+2026-05-19 16:40:50 | INFO     | peft-platform | stdout (first 500): {"error": "Model not found in cache: Qwen/Qwen3.5-0.8B"}
 
-2026-05-19 16:36:01 | ERROR    | peft-platform | Remote model test failed: Traceback (most recent call last):
-  File "<stdin>", line 16, in <module>
-IndexError: list index out of range
+2026-05-19 16:40:50 | ERROR    | peft-platform | Remote model test failed: 
+INFO:     172.20.0.4:38938 - "POST /api/v1/models/test HTTP/1.0" 400 Bad Request
+INFO:     127.0.0.1:56142 - "GET /health HTTP/1.1" 200 OK
 
-INFO:     172.20.0.4:43052 - "POST /api/v1/models/test HTTP/1.0" 400 Bad Request
-INFO:     127.0.0.1:42716 - "GET /health HTTP/1.1" 200 OK
+'[Errno 101] Network is unreachable' thrown while requesting HEAD https://huggingface.co/datasets/yanalong/yanalong/resolve/main/README.md
+2026-05-19 16:42:19 | WARNING  | huggingface_hub.utils._http | '[Errno 101] Network is unreachable' thrown while requesting HEAD https://huggingface.co/datasets/yanalong/yanalong/resolve/main/README.md
+Retrying in 1s [Retry 1/5].
+2026-05-19 16:42:19 | WARNING  | huggingface_hub.utils._http | Retrying in 1s [Retry 1/5].
+2026-05-19 16:42:20 | ERROR    | peft-platform | Dataset download failed: Cannot send a request, as the client has been closed.
+INFO:     172.20.0.4:33656 - "POST /api/v1/datasets/download HTTP/1.0" 400 Bad Request
+INFO:     127.0.0.1:57332 - "GET /health HTTP/1.1" 200 OK
+INFO:     127.0.0.1:45734 - "GET /health HTTP/1.1" 200 OK