from pathlib import Path from typing import Any from app.config import get_settings from app.core.logging import logger settings = get_settings() async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]: """加载已缓存模型并生成测试响应。""" if settings.use_remote_compute: return await _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p) return await _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p) async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]: """在算力节点容器内执行模型测试(通过 SSH + docker exec)。 方案:将 Python 脚本写入容器临时文件执行,避免 stdin 管道缓冲区限制。 """ import base64 import json from app.core.remote_executor import ssh_exec container = settings.compute_node_docker_container python = settings.compute_node_python workdir = settings.compute_node_workdir # 将 prompt 进行 base64 编码,避免引号/特殊字符问题 prompt_b64 = base64.b64encode(prompt.encode("utf-8")).decode() do_sample = str(temperature > 0).lower() # 独立脚本:零 app/db 依赖,参数全部通过环境变量传入 # 开头通过 OS 级别重定向 fd 1 到 /dev/null,抑制 C 层调试输出 # 最后恢复 fd 1 以打印 JSON script = rf"""\ import os, sys, json, warnings, base64 # 保存原始 fd 1(docker exec 的 stdout pipe),然后重定向到 /dev/null _orig_fd1 = os.dup(1) _devnull = os.open(os.devnull, os.O_WRONLY) os.dup2(_devnull, 1) os.close(_devnull) warnings.filterwarnings('ignore') os.environ['PYTHONWARNINGS'] = 'ignore' os.environ['TRANSFORMERS_VERBOSITY'] = 'error' os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true' from pathlib import Path import torch from transformers import logging as tf_logging tf_logging.set_verbosity_error() from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel def find_model_path(model_id): for base in [ '/root/Fine-tuning/backend/data/models', '/root/.cache/huggingface/hub', '/root/.cache/modelscope/hub', '/root/models', ]: bp = Path(base) if not bp.is_dir(): continue flat_name = model_id.replace("/", "_") if (bp / flat_name / "config.json").exists(): return str(bp / flat_name) if (bp / model_id / "config.json").exists(): return str(bp / model_id) try: for child in bp.rglob("config.json"): if child.parent.is_dir(): return str(child.parent) except Exception: pass return None model_id = os.environ.get('MODEL_ID', '') prompt = base64.b64decode(os.environ.get('PROMPT_B64', '')).decode('utf-8') max_new_tokens = int(os.environ.get('MAX_TOKENS', '128')) temperature = float(os.environ.get('TEMPERATURE', '0.8')) top_p = float(os.environ.get('TOP_P', '0.95')) do_sample = os.environ.get('DO_SAMPLE', 'true').lower() == 'true' model_path = find_model_path(model_id) if model_path is None: sys.stderr.write(json.dumps({{'error': f'Model not found: {{model_id}}'}}) + '\\n') exit(1) t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) t.pad_token = t.pad_token or t.eos_token has_accelerate = False try: import accelerate has_accelerate = True except ImportError: pass m = None load_errors = [] device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') for cls, kw in [(AutoModelForCausalLM, {{'trust_remote_code': True}}), (AutoModel, {{'trust_remote_code': True}})]: for dtype_val, dtype_name in [(torch.float16, 'float16'), (torch.float32, 'float32')]: try: if has_accelerate: m = cls.from_pretrained(model_path, dtype=dtype_val, device_map='auto', **kw) else: m = cls.from_pretrained(model_path, dtype=dtype_val, device_map=None, **kw) m = m.to(device) break except Exception as e: load_errors.append(f'{{cls.__name__}} {{dtype_name}}: {{str(e)[:200]}}') if m is not None: break if m is None: sys.stderr.write(json.dumps({{'error': 'Unable to load model', 'details': load_errors}}) + '\\n') exit(1) # 恢复 fd 1 到原始 stdout(docker exec 的 pipe) os.dup2(_orig_fd1, 1) os.close(_orig_fd1) m.eval() device = next(m.parameters()).device inp = t(prompt, return_tensors='pt').to(device) out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=do_sample, pad_token_id=t.eos_token_id) gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True) print(json.dumps({{'generated_text': gen}})) """ script_b64 = base64.b64encode(script.encode()).decode() # 通过 docker exec -i 将解码后的脚本内容传入容器内的 cat,写入临时文件后执行 script_path = f"/tmp/test_model_{model_id.replace('/', '_')}.py" remote_cmd = ( f"echo '{script_b64}' | base64 -d | " f"docker exec -i -w {workdir} " f"-e MODEL_ID={model_id} " f"-e PROMPT_B64={prompt_b64} " f"-e MAX_TOKENS={max_new_tokens} " f"-e TEMPERATURE={temperature} " f"-e TOP_P={top_p} " f"-e DO_SAMPLE={do_sample} " f"{container} bash -c 'cat > {script_path} && {python} {script_path}'" ) code, stdout, stderr = ssh_exec(remote_cmd, timeout=600) # 清理容器内临时文件 ssh_exec(f"docker exec {container} rm -f {script_path}", timeout=5) logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}") if stdout: logger.info(f"stdout (first 500): {stdout[:500]}") if stderr: logger.info(f"stderr (first 500): {stderr[:500]}") if code != 0: logger.error(f"Remote model test failed: {stderr}") return {"error": stderr.strip() or "Remote test failed"} for line in reversed(stdout.strip().split("\n")): line = line.strip() if line.startswith("{"): try: result = json.loads(line) result["model_id"] = model_id result["prompt"] = prompt return result except json.JSONDecodeError: continue return {"error": f"Invalid response: {stdout[:500]}"} async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]: """本地执行模型测试(仅用于开发环境)。""" import torch from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, AutoConfig from app.services.model_service import resolve_model_path model_path = await resolve_model_path(model_id) if not model_path: return {"error": f"Model not found in cache: {model_id}"} model_dir = Path(model_path) if not (model_dir / "config.json").exists(): return {"error": f"Model directory not found: {model_dir}"} tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token model = None for loader_cls, kwargs in [ (AutoModelForCausalLM, {"trust_remote_code": True}), (AutoModel, {"trust_remote_code": True}), ]: try: model = loader_cls.from_pretrained( model_dir, torch_dtype=torch.float16, device_map="auto", **kwargs, ) break except Exception: continue if model is None: return {"error": f"Unable to load model with any available loader. Model type may not be supported yet."} model.eval() inputs = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=temperature > 0, pad_token_id=tokenizer.eos_token_id, ) generated_text = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True) return { "model_id": model_id, "prompt": prompt, "generated_text": generated_text, }