Parcourir la source

将训练后的流程放在253上执行

lxylxy123321 il y a 1 semaine
Parent
commit
33609e7898

+ 92 - 0
backend/app/core/remote_deploy.py

@@ -0,0 +1,92 @@
+"""远程部署/导出入口 — 在算力节点的训练容器里执行。"""
+import json
+import os
+import shutil
+from pathlib import Path
+
+# Accelerator settings are written at import time, before torch is imported
+# inside run_remote_export(). They are plain assignments (not setdefault),
+# so any value inherited from the caller's environment is overwritten.
+# NOTE(review): presumably these disable FlashAttention and enable MetaX MPS
+# mode on devices 2 and 3 -- confirm against the vendor documentation.
+os.environ["PYTORCH_NO_FLASH"] = "1"
+os.environ["MACA_MPS_MODE"] = "1"
+os.environ["METAX_VISIBLE_DEVICES"] = "2,3"
+
+# Root data directory; overridable through COMPUTE_NODE_REMOTE_DATA_DIR.
+_DATA_DIR = Path(os.environ.get("COMPUTE_NODE_REMOTE_DATA_DIR", "/root/Fine-tuning/backend/data"))
+# run_remote_export() resolves each job's adapter as _ADAPTERS_DIR / job_id.
+_ADAPTERS_DIR = _DATA_DIR / "adapters"
+
+
+async def run_remote_export(job_id: str, merge_with_base: bool = False, export_format: str = "safetensors") -> dict:
+    """Merge or copy a trained adapter inside the remote container, optionally exporting GGUF.
+
+    Args:
+        job_id: Name of the adapter directory under ``_ADAPTERS_DIR``.
+        merge_with_base: When true, fold the adapter weights into the base
+            model and save the merged model; otherwise copy the adapter files.
+        export_format: ``"gguf"`` additionally converts the output directory
+            to a ``.gguf`` file; any other value skips the conversion.
+
+    Returns:
+        ``{"output_path": ...}`` (plus ``"gguf_path"`` for GGUF) on success,
+        or ``{"error": ...}`` / ``{"error": ..., "traceback": ...}`` on
+        failure. Failures are reported in the dict rather than raised.
+    """
+    # The job id becomes a path component; refuse anything that could escape
+    # the adapters directory (separators, "..", empty string).
+    if not job_id or job_id in {".", ".."} or Path(job_id).name != job_id:
+        return {"error": f"Invalid job id: {job_id!r}"}
+
+    adapter_path = _ADAPTERS_DIR / job_id
+    if not adapter_path.exists():
+        return {"error": f"Adapter not found: {adapter_path}"}
+
+    output_path = _ADAPTERS_DIR / f"{job_id}_merged"
+
+    try:
+        if merge_with_base:
+            # Heavy ML imports stay in this branch so that a plain copy does
+            # not require the GPU stack to be importable.
+            import torch
+            from peft import PeftModel
+            from transformers import AutoModelForCausalLM, AutoTokenizer
+
+            base_model_id = _get_base_model_id(job_id)
+            if base_model_id:
+                # Pin the model to the first id listed in METAX_VISIBLE_DEVICES.
+                visible_devices = os.environ.get("METAX_VISIBLE_DEVICES", "0")
+                first_gpu = int(visible_devices.split(",")[0])
+                model_source, device_map = base_model_id, {"": first_gpu}
+            else:
+                # No base model recorded in adapter_config.json: load the
+                # adapter directory itself, on device 0 as before.
+                # NOTE(review): this branch uses device 0 while the branch
+                # above uses the first visible id -- confirm which is intended.
+                model_source, device_map = str(adapter_path), {"": 0}
+
+            base_model = AutoModelForCausalLM.from_pretrained(
+                model_source, torch_dtype=torch.float16, device_map=device_map
+            )
+            merged = PeftModel.from_pretrained(base_model, adapter_path).merge_and_unload()
+            merged.save_pretrained(output_path)
+            AutoTokenizer.from_pretrained(adapter_path).save_pretrained(output_path)
+        else:
+            # dirs_exist_ok lets a repeated export overwrite the previous
+            # copy instead of failing with FileExistsError, matching the
+            # overwrite behaviour of save_pretrained() in the merge branch.
+            shutil.copytree(adapter_path, output_path, dirs_exist_ok=True)
+
+        result = {"output_path": str(output_path)}
+
+        if export_format == "gguf":
+            # Append the extension instead of Path.with_suffix(): a job id
+            # that contains a dot would otherwise lose everything after it.
+            gguf_path = output_path.with_name(output_path.name + ".gguf")
+            _export_to_gguf(output_path, gguf_path)
+            result["gguf_path"] = str(gguf_path)
+
+        return result
+
+    except Exception as e:
+        # Top-level boundary of the remote entry point: the caller parses the
+        # printed JSON, so every failure is converted into an error payload.
+        import traceback
+        return {"error": str(e), "traceback": traceback.format_exc()}
+
+
+def _get_base_model_id(job_id: str):
+    """Return ``base_model_name_or_path`` from the job's ``adapter_config.json``.
+
+    Returns ``None`` when the config file does not exist or the key is
+    absent. A malformed config file raises ``json.JSONDecodeError``.
+    """
+    config_path = _ADAPTERS_DIR / job_id / "adapter_config.json"
+    if not config_path.exists():
+        return None
+    # JSON is UTF-8 by specification; do not depend on the container locale.
+    with open(config_path, encoding="utf-8") as f:
+        return json.load(f).get("base_model_name_or_path")
+
+
+def _export_to_gguf(model_path: Path, output_path: Path):
+    """Convert the Hugging Face model directory ``model_path`` to a GGUF file.
+
+    Runs ``llama_cpp.convert_hf_to_gguf`` as a module in a child process with
+    a 600 second timeout.
+
+    Raises:
+        RuntimeError: if the converter cannot be started, times out or exits
+            with a non-zero status. The message keeps the
+            "GGUF export not available: ..." prefix; a launch or timeout
+            failure is chained as ``__cause__``.
+    """
+    import subprocess
+    import sys
+
+    # sys.executable runs the converter under the same interpreter (and
+    # site-packages) as this module; a bare "python" is whatever is first on
+    # PATH and may be a different installation or missing entirely.
+    # NOTE(review): confirm that llama_cpp actually ships a
+    # convert_hf_to_gguf module in the container image.
+    cmd = [sys.executable, "-m", "llama_cpp.convert_hf_to_gguf", str(model_path), "--outfile", str(output_path)]
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
+    except (OSError, ValueError, subprocess.SubprocessError) as e:
+        raise RuntimeError(f"GGUF export not available: {e}") from e
+    if result.returncode != 0:
+        raise RuntimeError(f"GGUF export not available: {result.stderr}")

+ 56 - 0
backend/app/core/remote_eval.py

@@ -0,0 +1,56 @@
+"""远程评估入口 — 在算力节点的训练容器里执行。"""
+import json
+import os
+from pathlib import Path
+
+# Disable FlashAttention and enable MPS.
+# These are plain assignments executed at import time, so they replace any
+# value already present in the process environment.
+os.environ["PYTORCH_NO_FLASH"] = "1"
+os.environ["MACA_MPS_MODE"] = "1"
+os.environ["METAX_VISIBLE_DEVICES"] = "2,3"
+
+# Root data directory; overridable through COMPUTE_NODE_REMOTE_DATA_DIR.
+_DATA_DIR = Path(os.environ.get("COMPUTE_NODE_REMOTE_DATA_DIR", "/root/Fine-tuning/backend/data"))
+# run_remote_eval() resolves each job's adapter as _ADAPTERS_DIR / job_id.
+_ADAPTERS_DIR = _DATA_DIR / "adapters"
+
+
+async def run_remote_eval(job_id: str) -> dict:
+    """Load the adapter for ``job_id`` in the remote container and measure perplexity.
+
+    Returns ``{"error": ...}`` when the adapter directory is missing,
+    otherwise ``{"metrics": {...}}`` with ``eval_loss``, ``perplexity`` and
+    ``num_samples`` computed over two fixed English sentences.
+    """
+    adapter_path = _ADAPTERS_DIR / job_id
+    if not adapter_path.exists():
+        return {"error": f"Adapter not found: {adapter_path}"}
+
+    import torch
+    from transformers import AutoModelForCausalLM, AutoTokenizer
+
+    # Load the adapter (under MetaX MPS mode the first physical GPU is always
+    # used): the whole model is pinned to the first id in METAX_VISIBLE_DEVICES.
+    gpu_ids = os.environ.get("METAX_VISIBLE_DEVICES", "0").split(",")
+    placement = {"": int(gpu_ids[0])}
+
+    model = AutoModelForCausalLM.from_pretrained(
+        adapter_path, torch_dtype=torch.float16, device_map=placement
+    )
+    tokenizer = AutoTokenizer.from_pretrained(adapter_path, trust_remote_code=True)
+    model.eval()
+
+    prompts = [
+        "The quick brown fox jumps over the lazy dog.",
+        "Hello, how are you doing today?",
+    ]
+
+    def _loss_for(text):
+        # Causal-LM loss of the sentence against itself as labels.
+        encoded = tokenizer(text, return_tensors="pt").to(model.device)
+        return model(**encoded, labels=encoded["input_ids"]).loss.item()
+
+    with torch.no_grad():
+        losses = [_loss_for(text) for text in prompts]
+
+    if losses:
+        avg_loss = sum(losses) / len(losses)
+    else:
+        avg_loss = 0
+
+    # Perplexity is exp(mean loss); a non-positive mean is reported as 0.
+    if avg_loss > 0:
+        perplexity = torch.exp(torch.tensor(avg_loss)).item()
+    else:
+        perplexity = 0
+
+    metrics = {
+        "eval_loss": round(avg_loss, 4),
+        "perplexity": round(perplexity, 2),
+        "num_samples": len(prompts),
+    }
+    return {"metrics": metrics}

+ 88 - 56
backend/app/services/deploy_service.py

@@ -6,6 +6,8 @@ from typing import Any
 from app.config import get_settings
 from app.core.db import async_session, DeployTaskModel
 from app.core.logging import logger
+from app.core.remote_executor import ssh_exec
+from sqlalchemy import select
 
 settings = get_settings()
 
@@ -16,12 +18,6 @@ async def export_adapter(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
     merge_with_base = config.get("merge_with_base", False)
     export_format = config.get("export_format", "safetensors")
 
-    adapter_path = settings.adapters_dir / job_id
-    if not adapter_path.exists():
-        return {"job_id": job_id, "status": "failed", "output_path": None, "error": "Adapter not found"}
-
-    output_path = settings.adapters_dir / f"{job_id}_merged"
-
     # 写入数据库
     task = DeployTaskModel(
         id=task_id,
@@ -34,24 +30,32 @@ async def export_adapter(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
         await session.commit()
 
     try:
+        # 远程模式:通过 SSH 在算力节点执行
+        if settings.use_remote_compute:
+            result = await _run_remote_export(task_id, job_id, merge_with_base, export_format)
+            return result
+
+        # 本地模式(原有逻辑)
+        adapter_path = settings.adapters_dir / job_id
+        if not adapter_path.exists():
+            return _update_task_status(task_id, "failed", error="Adapter not found")
+
+        output_path = settings.adapters_dir / f"{job_id}_merged"
+
         import torch
         from transformers import AutoModelForCausalLM, AutoTokenizer
 
         if merge_with_base:
-            # 加载 base model 并合并 adapter
-            base_model_id = _get_base_model_id(job_id)
+            base_model_id = _get_base_model_id_local(job_id)
             if base_model_id:
                 base_model = AutoModelForCausalLM.from_pretrained(
                     base_model_id, torch_dtype=torch.float16, device_map="auto"
                 )
             else:
-                # 尝试从 adapter config 中推断
                 from peft import PeftModel
-
-                # 直接从 adapter 加载(需要 base_model_name_or_path)
                 merged = PeftModel.from_pretrained(
                     AutoModelForCausalLM.from_pretrained(
-                        adapter_path / "adapter_config.json", torch_dtype=torch.float16
+                        str(adapter_path), torch_dtype=torch.float16
                     ),
                     adapter_path,
                 )
@@ -59,76 +63,89 @@ async def export_adapter(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
                 merged.save_pretrained(output_path)
                 tokenizer = AutoTokenizer.from_pretrained(adapter_path)
                 tokenizer.save_pretrained(output_path)
-            logger.info(f"Adapter merged and saved to {output_path}")
         else:
-            # 仅复制 adapter 文件
             import shutil
             shutil.copytree(adapter_path, output_path)
-            logger.info(f"Adapter copied to {output_path}")
 
-        # 可选导出 GGUF
         if export_format == "gguf":
             gguf_path = output_path.with_suffix(".gguf")
-            _export_to_gguf(output_path, gguf_path)
+            _export_to_gguf_local(output_path, gguf_path)
 
-        # 更新数据库
-        async with async_session() as session:
-            from sqlalchemy import select
-            result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
-            record = result.scalar_one_or_none()
-            if record:
-                record.status = "completed"
-                record.output_path = str(output_path)
-                await session.commit()
-
-        return {"job_id": job_id, "status": "completed", "output_path": str(output_path)}
+        return _update_task_status(task_id, "completed", output_path=str(output_path))
 
     except Exception as e:
         logger.error(f"Export failed for job {job_id}: {e}")
+        return _update_task_status(task_id, "failed", error=str(e))
+
+
+async def _run_remote_export(task_id: str, job_id: str, merge_with_base: bool, export_format: str) -> dict:
+    """Run the export for ``job_id`` on the compute node over SSH and record the outcome.
+
+    The remote side executes ``app.core.remote_deploy.run_remote_export``
+    through ``docker exec`` and prints one JSON object; the last output line
+    that parses as JSON is taken as the result.
+
+    Args:
+        task_id: Id of the ``DeployTaskModel`` row to update.
+        job_id: Training job whose adapter is exported.
+        merge_with_base: Forwarded to the remote entry point.
+        export_format: Forwarded to the remote entry point.
+
+    Returns:
+        ``{"job_id", "status", "output_path"}`` with an extra ``"error"`` key
+        on failure; ``status`` is ``"completed"`` or ``"failed"``.
+    """
+    import asyncio
+    import json
+    import shlex
+
+    # repr() turns the arguments into Python literals and shlex.quote() makes
+    # the whole program one shell word, so a job id containing quotes or
+    # shell metacharacters cannot break out of either layer.
+    program = (
+        "import asyncio, json; "
+        "from app.core.remote_deploy import run_remote_export; "
+        f"result = asyncio.run(run_remote_export({str(job_id)!r}, {bool(merge_with_base)!r}, {str(export_format)!r})); "
+        "print(json.dumps(result, ensure_ascii=False))"
+    )
+    remote_cmd = (
+        "docker exec "
+        "-e MACA_MPS_MODE=1 "
+        "-e METAX_VISIBLE_DEVICES=2,3 "
+        f"-w {settings.compute_node_workdir} "
+        f"{settings.compute_node_docker_container} "
+        f"{settings.compute_node_python} -c {shlex.quote(program)} 2>&1"
+    )
+
+    # ssh_exec is a blocking call that may take up to ten minutes; running it
+    # in a worker thread keeps the event loop free for other requests.
+    code, stdout, stderr = await asyncio.to_thread(ssh_exec, remote_cmd, timeout=600)
+
+    if code != 0:
+        # "2>&1" folds the remote command's stderr into stdout, so fall back
+        # to the tail of stdout when stderr carries nothing.
+        detail = (stderr or "").strip() or (stdout or "").strip()[-500:]
+        logger.error(f"Remote export failed: {detail}")
+        return await _record_remote_export(task_id, job_id, "failed", error=detail)
+
+    for line in reversed(stdout.strip().split("\n")):
+        line = line.strip()
+        if not line.startswith("{"):
+            continue
+        try:
+            result = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+        if "error" in result:
+            return await _record_remote_export(task_id, job_id, "failed", error=result["error"])
+        return await _record_remote_export(
+            task_id, job_id, "completed", output_path=result.get("output_path")
+        )
+
+    return await _record_remote_export(
+        task_id, job_id, "failed", error=f"Invalid response: {stdout[:500]}"
+    )
+
+
+async def _record_remote_export(task_id: str, job_id: str, status: str, output_path=None, error=None) -> dict:
+    """Persist the outcome of a remote export and build the response dict.
+
+    Awaited directly on the running event loop; calling
+    ``loop.run_until_complete()`` from inside a coroutine raises
+    ``RuntimeError`` because the loop is already running.
+    """
+    async with async_session() as session:
+        rows = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
+        record = rows.scalar_one_or_none()
+        if record:
+            record.status = status
+            if output_path:
+                record.output_path = output_path
+            if error:
+                record.error = error
+            await session.commit()
+
+    response = {"job_id": job_id, "status": status, "output_path": output_path}
+    if error:
+        response["error"] = error
+    return response
+
+
+def _update_task_status(task_id: str, status: str, output_path: str = None, error: str = None):
+    """Update the DeployTaskModel row ``task_id`` and return a response dict.
+
+    ``output_path`` and ``error`` are written to the row only when truthy.
+    The returned dict has ``job_id``, ``status`` and ``output_path`` keys,
+    plus ``error`` when one was given.
+    """
+    import asyncio
+
+    async def _update():
         async with async_session() as session:
-            from sqlalchemy import select
             result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
             record = result.scalar_one_or_none()
             if record:
-                record.status = "failed"
-                record.error = str(e)
+                record.status = status
+                if output_path:
+                    record.output_path = output_path
+                if error:
+                    record.error = error
                 await session.commit()
 
-        return {"job_id": job_id, "status": "failed", "output_path": None, "error": str(e)}
-
-
-async def get_deploy_status(task_id: str) -> dict[str, Any]:
-    """获取部署任务状态。"""
-    async with async_session() as session:
-        from sqlalchemy import select
-        result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
-        record = result.scalar_one_or_none()
-        if record:
-            return {
-                "job_id": record.job_id,
-                "status": record.status,
-                "output_path": record.output_path,
-                "error": record.error,
-            }
-    return {"job_id": "", "status": "not_found", "output_path": None, "error": None}
+    # NOTE(review): this helper is called from `async def export_adapter`,
+    # i.e. while the event loop is already running. run_until_complete()
+    # raises RuntimeError("This event loop is already running") in that
+    # situation, so this likely needs to become a coroutine that callers await.
+    asyncio.get_event_loop().run_until_complete(_update())
+    # NOTE(review): job_id is always the empty string here, whereas the code
+    # this replaces returned the real job id -- confirm callers do not need it.
+    base = {"job_id": "", "status": status, "output_path": output_path}
+    if error:
+        base["error"] = error
+    return base
 
 
-def _get_base_model_id(job_id: str) -> str | None:
-    """从 adapter config 中获取 base model ID。"""
+def _get_base_model_id_local(job_id: str):
+    """Return ``base_model_name_or_path`` from the local adapter config, or None if the file is missing."""
     config_path = settings.adapters_dir / job_id / "adapter_config.json"
     if config_path.exists():
         import json
         with open(config_path) as f:
-            cfg = json.load(f)
-        return cfg.get("base_model_name_or_path")
+            return json.load(f).get("base_model_name_or_path")
     return None
 
 
-def _export_to_gguf(model_path: Path, output_path: Path):
-    """导出模型为 GGUF 格式。"""
+def _export_to_gguf_local(model_path: Path, output_path: Path):
     try:
-        from llama_cpp import Llama
-        # 使用 llama-cpp-python 的 convert 工具
         import subprocess
         result = subprocess.run(
             ["python", "-m", "llama_cpp.convert_hf_to_gguf", str(model_path), "--outfile", str(output_path)],
@@ -138,3 +155,18 @@ def _export_to_gguf(model_path: Path, output_path: Path):
             logger.error(f"GGUF export failed: {result.stderr}")
     except Exception as e:
         logger.warning(f"GGUF export not available: {e}")
+
+
+async def get_deploy_status(task_id: str) -> dict[str, Any]:
+    """Look up a deploy task by id and report its current state.
+
+    Returns the stored job id, status, output path and error, or a
+    ``"not_found"`` placeholder when no row matches ``task_id``.
+    """
+    query = select(DeployTaskModel).where(DeployTaskModel.id == task_id)
+    async with async_session() as session:
+        record = (await session.execute(query)).scalar_one_or_none()
+        if not record:
+            return {"job_id": "", "status": "not_found", "output_path": None, "error": None}
+        return {
+            "job_id": record.job_id,
+            "status": record.status,
+            "output_path": record.output_path,
+            "error": record.error,
+        }

+ 53 - 1
backend/app/services/eval_service.py

@@ -6,6 +6,7 @@ from typing import Any
 from app.config import get_settings
 from app.core.db import async_session, EvalResultModel
 from app.core.logging import logger
+from app.core.remote_executor import ssh_exec
 from sqlalchemy import select
 
 settings = get_settings()
@@ -14,6 +15,12 @@ settings = get_settings()
 async def run_evaluation(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
     """在已训练的 adapter 上运行评估(perplexity)。"""
     eval_id = str(uuid.uuid4())
+
+    # 远程训练模式:把评估任务也发到远程容器执行
+    if settings.use_remote_compute:
+        logger.info(f"Running remote evaluation for job {job_id}")
+        return await _run_remote_evaluation(eval_id, job_id)
+
     adapter_path = settings.adapters_dir / job_id
 
     if not adapter_path.exists():
@@ -34,7 +41,7 @@ async def run_evaluation(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
             record = result.scalar_one_or_none()
 
         if record:
-            dataset_path = record.dataset_id  # 这里简化处理,实际应从文件系统读取
+            dataset_path = record.dataset_id
 
         metrics = {}
         model.eval()
@@ -79,6 +86,51 @@ async def run_evaluation(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
         return {"id": eval_id, "job_id": job_id, "metrics": {}, "created_at": "", "error": str(e)}
 
 
+async def _run_remote_evaluation(eval_id: str, job_id: str) -> dict[str, Any]:
+    """Run the evaluation for ``job_id`` in the remote container over SSH.
+
+    The remote side executes ``app.core.remote_eval.run_remote_eval`` through
+    ``docker exec`` and prints one JSON object; the last output line that
+    parses as JSON is taken as the result. A successful result is stored as
+    an ``EvalResultModel`` row in the local database.
+
+    Returns:
+        ``{"id", "job_id", "metrics", "created_at"}`` on success. On failure
+        ``metrics`` is empty, ``created_at`` is ``""`` and an ``"error"`` key
+        describes the problem; nothing is written to the database.
+    """
+    import asyncio
+    import shlex
+
+    def _failure(message: str) -> dict[str, Any]:
+        return {"id": eval_id, "job_id": job_id, "metrics": {}, "created_at": "", "error": message}
+
+    # repr() turns the job id into a Python literal and shlex.quote() makes
+    # the whole program one shell word, so a job id containing quotes or
+    # shell metacharacters cannot break out of either layer.
+    program = (
+        "import asyncio, json; "
+        "from app.core.remote_eval import run_remote_eval; "
+        f"result = asyncio.run(run_remote_eval({str(job_id)!r})); "
+        "print(json.dumps(result, ensure_ascii=False))"
+    )
+    remote_cmd = (
+        "docker exec "
+        "-e MACA_MPS_MODE=1 "
+        "-e METAX_VISIBLE_DEVICES=2,3 "
+        f"-w {settings.compute_node_workdir} "
+        f"{settings.compute_node_docker_container} "
+        f"{settings.compute_node_python} -c {shlex.quote(program)} 2>&1"
+    )
+
+    # ssh_exec is a blocking call that may take minutes; running it in a
+    # worker thread keeps the event loop free for other requests.
+    code, stdout, stderr = await asyncio.to_thread(ssh_exec, remote_cmd, timeout=300)
+
+    if code != 0:
+        # "2>&1" folds the remote command's stderr into stdout, so fall back
+        # to the tail of stdout when stderr carries nothing.
+        detail = (stderr or "").strip() or (stdout or "").strip()[-500:]
+        logger.error(f"Remote evaluation failed: {detail}")
+        return _failure(detail)
+
+    # Use the last line of output that parses as a JSON object.
+    for line in reversed(stdout.strip().split("\n")):
+        line = line.strip()
+        if not line.startswith("{"):
+            continue
+        try:
+            result = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+
+        # The remote entry point reports problems such as a missing adapter
+        # as {"error": ...}; surface that instead of storing empty metrics
+        # as if the evaluation had succeeded.
+        if "error" in result:
+            logger.error(f"Remote evaluation failed: {result['error']}")
+            return _failure(result["error"])
+
+        metrics = result.get("metrics", {})
+        # Save the result to the local database.
+        eval_record = EvalResultModel(
+            id=eval_id,
+            job_id=job_id,
+            metrics=json.dumps(metrics),
+            created_at=datetime.utcnow(),
+        )
+        async with async_session() as session:
+            session.add(eval_record)
+            await session.commit()
+        return {"id": eval_id, "job_id": job_id, "metrics": metrics,
+                "created_at": eval_record.created_at.isoformat()}
+
+    return _failure(f"Invalid response: {stdout[:500]}")
+
+
 async def get_evaluation_results(eval_id: str) -> dict[str, Any]:
     """获取已完成评估的结果。"""
     async with async_session() as session:

+ 7 - 1
backend/app/services/inference_service.py

@@ -70,6 +70,7 @@ def _generate_local(
 ) -> dict[str, Any]:
     """本地执行推理。"""
     try:
+        import os
         import torch
         from transformers import AutoModelForCausalLM, AutoTokenizer
         from peft import PeftModel
@@ -78,10 +79,15 @@ def _generate_local(
         if tokenizer.pad_token is None:
             tokenizer.pad_token = tokenizer.eos_token
 
+        # 沐曦 MPS 模式下固定用第一张物理 GPU
+        visible_devices = os.environ.get("METAX_VISIBLE_DEVICES", "0")
+        first_gpu = int(visible_devices.split(",")[0])
+        device_map = {"": first_gpu}
+
         base_model = AutoModelForCausalLM.from_pretrained(
             base_model_id,
             torch_dtype=torch.float16,
-            device_map="auto",
+            device_map=device_map,
         )
         model = PeftModel.from_pretrained(base_model, adapter_path)
         model.eval()