Procházet zdrojové kódy

修复文件传输问题和数据预览问题

lxylxy123321 před 1 týdnem
rodič
revize
fd8933807d

+ 20 - 0
backend/app/core/remote_executor.py

@@ -16,6 +16,26 @@ def _get_ssh_prefix() -> list[str]:
     return prefix
 
 
+def scp_to_remote(local_path: str, remote_path: str) -> tuple[int, str, str]:
+    """Copy a local file to the remote host over SCP; return (exit_code, stdout, stderr), or exit_code -1 with the error text as stderr if running scp raised."""
+    target = f"{settings.compute_node_ssh_user}@{settings.compute_node_host}"  # user@host built from settings
+    scp_args = ["scp", *_get_ssh_prefix(), "-P", str(settings.compute_node_ssh_port)]  # NOTE(review): presumably _get_ssh_prefix() yields extra ssh options - confirm
+    if settings.compute_node_ssh_key:
+        scp_args += ["-i", settings.compute_node_ssh_key]  # a configured key is passed via -i and takes precedence over the password
+    elif settings.compute_node_ssh_password:
+        scp_args = ["sshpass", "-p", settings.compute_node_ssh_password] + scp_args  # no key configured: wrap the command in sshpass with the password
+    scp_args += [local_path, f"{target}:{remote_path}"]  # source path first, then the user@host:remote_path destination
+
+    try:
+        proc = subprocess.run(scp_args, capture_output=True, text=True, timeout=30)  # output captured as text; the run is limited to 30 seconds
+        clean_stderr = "\n".join(line for line in proc.stderr.split("\n")
+                                  if not line.startswith("Warning:"))  # drop stderr lines that start with "Warning:"
+        return proc.returncode, proc.stdout, clean_stderr
+    except Exception as e:  # any exception from running scp is logged and reported as exit code -1
+        logger.error(f"SCP failed: {e}")
+        return -1, "", str(e)
+
+
 def ssh_exec(cmd: str, timeout: int | None = None) -> tuple[int, str, str]:
     """通过 SSH 在算力节点执行命令,返回 (exit_code, stdout, stderr)。"""
     if not settings.use_remote_compute:

+ 43 - 13
backend/app/services/dataset_service.py

@@ -22,6 +22,9 @@ META_FILENAMES = frozenset({
     "special_tokens_map.json", "tokenizer_config.json",
     "added_tokens.json", "vocab.json", "merges.txt",
     "config.json", "preprocessor_config.json",
+    # HF/ModelScope dataset metadata
+    "dataset_info.json", "dataset_infos.json", "dataset.json",
+    "state.json", "dataset_dict.json",
 })
 
 # File size threshold: files smaller than this (bytes) are likely metadata
@@ -70,19 +73,41 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
 
             ds_dir = settings.processed_dir / f"ms_{req.dataset_id.replace('/', '_')}"
             ds_dir.mkdir(parents=True, exist_ok=True)
-            # 用独立进程调用 CLI,完全隔离 FastAPI 事件循环
-            proc = subprocess.run(
-                [
-                    "modelscope", "download",
-                    "--dataset", req.dataset_id,
-                    "--local_dir", str(ds_dir),
-                ],
-                capture_output=True, text=True, timeout=3600,
-            )
-            if proc.returncode != 0:
-                raise RuntimeError(f"modelscope CLI failed: {proc.stderr}")
-            # 扫描下载的文件,找训练数据
-            jsonl_path, record_count = _scan_and_convert_to_jsonl(ds_dir)
+            # 优先尝试用 modelscope 的 load_dataset API 加载数据
+            try:
+                from modelscope.msdatasets import MsDataset
+                ms_ds = MsDataset.load(req.dataset_id)
+                if hasattr(ms_ds, '__getitem__') and hasattr(ms_ds, '__len__'):
+                    split = ms_ds
+                elif isinstance(ms_ds, dict):
+                    split = ms_ds.get("train") or ms_ds.get("default") or list(ms_ds.values())[0]
+                else:
+                    split = ms_ds
+                output_path = ds_dir / "data.jsonl"
+                record_count = 0
+                with open(output_path, "w", encoding="utf-8") as f:
+                    for item in split:
+                        f.write(json.dumps({k: str(v) for k, v in item.items()}, ensure_ascii=False) + "\n")
+                        record_count += 1
+                if record_count == 0:
+                    raise RuntimeError("MsDataset loaded but returned 0 records")
+                jsonl_path = output_path
+            except (ImportError, RuntimeError) as e:
+                # 回退到 CLI 下载方式
+                logger.warning(f"MsDataset.load failed: {e}, falling back to CLI download")
+                proc = subprocess.run(
+                    [
+                        "modelscope", "download",
+                        "--dataset", req.dataset_id,
+                        "--local_dir", str(ds_dir),
+                    ],
+                    capture_output=True, text=True, timeout=3600,
+                )
+                if proc.returncode != 0:
+                    raise RuntimeError(f"modelscope CLI failed: {proc.stderr}")
+                jsonl_path, record_count = _scan_and_convert_to_jsonl(ds_dir)
+                if record_count == 0:
+                    raise RuntimeError("No training data found in downloaded dataset files")
         else:
             from datasets import load_dataset
 
@@ -99,6 +124,8 @@ async def download_dataset(req: DatasetDownloadRequest) -> DatasetDownloadRespon
                     f.write(json.dumps(item, ensure_ascii=False) + "\n")
             jsonl_path = output_path
             record_count = len(split) if hasattr(split, "__len__") else 0
+            if record_count == 0:
+                raise RuntimeError("HF dataset loaded but returned 0 records")
 
         record = DatasetRecord(
             id=str(uuid.uuid4()),
@@ -152,6 +179,9 @@ def _scan_and_convert_to_jsonl(ds_dir: Path) -> tuple[Path, int]:
                                 out.write(json.dumps(item, ensure_ascii=False) + "\n")
                                 record_count += 1
                         elif isinstance(data, dict):
+                            # 跳过 HF/ModelScope dataset metadata(features/splits 结构)
+                            if "features" in data or "splits" in data or "dataset_name" in data:
+                                continue
                             out.write(json.dumps(data, ensure_ascii=False) + "\n")
                             record_count += 1
                 except Exception:

+ 85 - 72
backend/app/services/model_test_service.py

@@ -17,85 +17,98 @@ async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temp
 def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
     """通过 SSH 在算力节点执行模型测试。
 
-    使用独立的 remote_model_test.py 脚本(无 app/db 依赖,不依赖 sqlalchemy),
-    通过 SSH + heredoc 部署到远端,docker exec 在容器内执行。
+    先 scp 脚本到远端,再 docker exec 执行文件,完全避开 heredoc/引号/管道问题。
     """
     import json
-    from app.core.remote_executor import ssh_exec
-
-    # 转义 prompt 中的单引号和反斜杠,用于 shell 安全传递
-    safe_prompt = prompt.replace("\\", "\\\\").replace("'", "\\'")
+    import tempfile
+    from app.core.remote_executor import scp_to_remote, ssh_exec
 
     container = settings.compute_node_docker_container
     python = settings.compute_node_python
     workdir = settings.compute_node_workdir
 
-    # 将脚本写入远端临时文件,执行后清理
-    remote_cmd = (
-        f"cat > /tmp/remote_model_test.py << 'SCRIPT_EOF'\n"
-        f"import json, sys\n"
-        f"from pathlib import Path\n"
-        f"import torch\n"
-        f"from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel\n"
-        f"\n"
-        f"def find_model_path(model_id):\n"
-        f"    candidates = [\n"
-        f"        '/root/.cache/huggingface/hub',\n"
-        f"        '/root/.cache/modelscope/hub',\n"
-        f"        '/root/models',\n"
-        f"    ]\n"
-        f"    for base in candidates:\n"
-        f"        bp = Path(base)\n"
-        f"        if not bp.is_dir():\n"
-        f"            continue\n"
-        f"        # Direct match\n"
-        f"        for child in bp.rglob('config.json'):\n"
-        f"            parent = child.parent\n"
-        f"            if parent.is_dir():\n"
-        f"                return str(parent)\n"
-        f"    return None\n"
-        f"\n"
-        f"model_id = sys.argv[1]\n"
-        f"prompt = sys.argv[2]\n"
-        f"max_new_tokens = int(sys.argv[3])\n"
-        f"temperature = float(sys.argv[4])\n"
-        f"top_p = float(sys.argv[5])\n"
-        f"\n"
-        f"model_path = find_model_path(model_id)\n"
-        f"if model_path is None:\n"
-        f"    print(json.dumps({{'error': f'Model not found in cache: {{model_id}}'}}))\n"
-        f"    sys.exit(1)\n"
-        f"\n"
-        f"t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)\n"
-        f"t.pad_token = t.pad_token or t.eos_token\n"
-        f"\n"
-        f"m = None\n"
-        f"for cls, kw in [\n"
-        f"    (AutoModelForCausalLM, {{'trust_remote_code': True}}),\n"
-        f"    (AutoModel, {{'trust_remote_code': True}}),\n"
-        f"]:\n"
-        f"    try:\n"
-        f"        m = cls.from_pretrained(model_path, torch_dtype=torch.float16, device_map='auto', **kw)\n"
-        f"        break\n"
-        f"    except Exception:\n"
-        f"        pass\n"
-        f"\n"
-        f"if m is None:\n"
-        f"    print(json.dumps({{'error': 'Unable to load model'}}))\n"
-        f"    sys.exit(1)\n"
-        f"\n"
-        f"m.eval()\n"
-        f"inp = t(prompt, return_tensors='pt').to(m.device)\n"
-        f"out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample={str(temperature > 0).lower()}, pad_token_id=t.eos_token_id)\n"
-        f"gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)\n"
-        f"print(json.dumps({{'generated_text': gen}}))\n"
-        f"SCRIPT_EOF\n"
-        f"\n"
-        f"docker exec -w {workdir} {container} {python} /tmp/remote_model_test.py '{model_id}' '{safe_prompt}' {max_new_tokens} {temperature} {top_p}\n"
-        f"rm -f /tmp/remote_model_test.py"
-    )
-
-    code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
+    # 独立的模型测试脚本内容(零 app/db 依赖)
+    python_script = """\
+import json, sys
+from pathlib import Path
+import torch
+from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
+
+def find_model_path(model_id):
+    candidates = [
+        '/root/.cache/huggingface/hub',
+        '/root/.cache/modelscope/hub',
+        '/root/models',
+    ]
+    for base in candidates:
+        bp = Path(base)
+        if not bp.is_dir():
+            continue
+        for child in bp.rglob('config.json'):
+            parent = child.parent
+            if parent.is_dir():
+                return str(parent)
+    return None
+
+model_id = sys.argv[1]
+prompt = sys.argv[2]
+max_new_tokens = int(sys.argv[3])
+temperature = float(sys.argv[4])
+top_p = float(sys.argv[5])
+
+model_path = find_model_path(model_id)
+if model_path is None:
+    print(json.dumps({'error': 'Model not found in cache'}))
+    sys.exit(1)
+
+t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
+t.pad_token = t.pad_token or t.eos_token
+
+m = None
+for cls, kw in [
+    (AutoModelForCausalLM, {'trust_remote_code': True}),
+    (AutoModel, {'trust_remote_code': True}),
+]:
+    try:
+        m = cls.from_pretrained(model_path, torch_dtype=torch.float16, device_map='auto', **kw)
+        break
+    except Exception:
+        pass
+
+if m is None:
+    print(json.dumps({'error': 'Unable to load model'}))
+    sys.exit(1)
+
+m.eval()
+inp = t(prompt, return_tensors='pt').to(m.device)
+out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=%s, pad_token_id=t.eos_token_id)
+gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
+print(json.dumps({'generated_text': gen}))
+""" % str(temperature > 0).lower()
+
+    # 写入本地临时文件 → scp 到远端 → docker exec 执行 → 清理
+    remote_script = "/tmp/remote_model_test.py"
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as tmp:
+        tmp.write(python_script)
+        tmp.flush()
+        tmp_path = tmp.name
+
+    try:
+        code, out, err = scp_to_remote(tmp_path, remote_script)
+        if code != 0:
+            logger.error(f"SCP failed: {err}")
+            return {"error": f"Failed to upload script: {err.strip()}"}
+
+        remote_cmd = f"docker exec -w {workdir} {container} {python} {remote_script} '{model_id}' '{prompt.replace(chr(39), chr(92)+chr(39))}' {max_new_tokens} {temperature} {top_p}"
+        code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
+
+        if code != 0:
+            logger.error(f"Remote model test failed: {stderr}")
+            return {"error": stderr.strip() or "Remote test failed"}
+    finally:
+        import os
+        os.unlink(tmp_path)
+        ssh_exec(f"rm -f {remote_script}", timeout=10)
 
     logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
     if stdout: