|
|
@@ -17,8 +17,7 @@ async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temp
|
|
|
async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
|
|
|
"""在算力节点容器内执行模型测试(通过 SSH + docker exec)。
|
|
|
|
|
|
- 方案:通过 SSH 在远端容器内直接执行 Python 单行命令,
|
|
|
- 所有参数通过环境变量传入,避免任何引号/转义问题。
|
|
|
+ 方案:将 Python 脚本写入容器临时文件执行,避免 stdin 管道缓冲区限制。
|
|
|
"""
|
|
|
import base64
|
|
|
import json
|
|
|
@@ -33,10 +32,16 @@ async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, te
|
|
|
do_sample = str(temperature > 0).lower()
|
|
|
|
|
|
# 独立脚本:零 app/db 依赖,参数全部通过环境变量传入
|
|
|
+ # 开头通过 OS 级别重定向 fd 1 到 /dev/null,抑制 C 层调试输出
|
|
|
+ # 最后恢复 fd 1 以打印 JSON
|
|
|
script = rf"""\
|
|
|
-import warnings, json, os, base64, sys
|
|
|
+import os, sys, json, warnings, base64
|
|
|
+# 保存原始 fd 1(docker exec 的 stdout pipe),然后重定向到 /dev/null
|
|
|
+_orig_fd1 = os.dup(1)
|
|
|
+_devnull = os.open(os.devnull, os.O_WRONLY)
|
|
|
+os.dup2(_devnull, 1)
|
|
|
+os.close(_devnull)
|
|
|
warnings.filterwarnings('ignore')
|
|
|
-warnings.filterwarnings('ignore', category=FutureWarning)
|
|
|
os.environ['PYTHONWARNINGS'] = 'ignore'
|
|
|
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
|
|
|
os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'
|
|
|
@@ -47,7 +52,6 @@ tf_logging.set_verbosity_error()
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
|
|
|
|
|
|
def find_model_path(model_id):
|
|
|
- # 远端实际存储路径(与 model_service.resolve_model_path 一致)
|
|
|
for base in [
|
|
|
'/root/Fine-tuning/backend/data/models',
|
|
|
'/root/.cache/huggingface/hub',
|
|
|
@@ -57,14 +61,11 @@ def find_model_path(model_id):
|
|
|
bp = Path(base)
|
|
|
if not bp.is_dir():
|
|
|
continue
|
|
|
- # 尝试 namespace_name 扁平化匹配(HF 风格)
|
|
|
flat_name = model_id.replace("/", "_")
|
|
|
if (bp / flat_name / "config.json").exists():
|
|
|
return str(bp / flat_name)
|
|
|
- # 尝试 namespace/name 嵌套匹配(ModelScope 风格)
|
|
|
if (bp / model_id / "config.json").exists():
|
|
|
return str(bp / model_id)
|
|
|
- # 扫描所有目录
|
|
|
try:
|
|
|
for child in bp.rglob("config.json"):
|
|
|
if child.parent.is_dir():
|
|
|
@@ -82,23 +83,12 @@ do_sample = os.environ.get('DO_SAMPLE', 'true').lower() == 'true'
|
|
|
|
|
|
model_path = find_model_path(model_id)
|
|
|
if model_path is None:
|
|
|
- print(json.dumps({{'error': f'Model not found in cache: {{model_id}}'}}))
|
|
|
+ sys.stderr.write(json.dumps({{'error': f'Model not found: {{model_id}}'}}) + '\\n')
|
|
|
exit(1)
|
|
|
|
|
|
-# 抑制模型加载时的调试输出(Qwen3.5 等模型会直接 print 到 stdout)
|
|
|
-_original_stdout = sys.stdout
|
|
|
-
|
|
|
-class _SilentStdout:
|
|
|
- def write(self, *args, **kwargs):
|
|
|
- pass
|
|
|
- def flush(self, *args, **kwargs):
|
|
|
- pass
|
|
|
-
|
|
|
-sys.stdout = _SilentStdout()
|
|
|
t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
|
|
|
t.pad_token = t.pad_token or t.eos_token
|
|
|
|
|
|
-# 判断 accelerate 是否可用,决定加载策略
|
|
|
has_accelerate = False
|
|
|
try:
|
|
|
import accelerate
|
|
|
@@ -124,12 +114,14 @@ for cls, kw in [(AutoModelForCausalLM, {{'trust_remote_code': True}}), (AutoMode
|
|
|
if m is not None:
|
|
|
break
|
|
|
|
|
|
-sys.stdout = _original_stdout
|
|
|
-
|
|
|
if m is None:
|
|
|
- print(json.dumps({{'error': 'Unable to load model', 'details': load_errors}}))
|
|
|
+ sys.stderr.write(json.dumps({{'error': 'Unable to load model', 'details': load_errors}}) + '\\n')
|
|
|
exit(1)
|
|
|
|
|
|
+# 恢复 fd 1 到原始 stdout(docker exec 的 pipe)
|
|
|
+os.dup2(_orig_fd1, 1)
|
|
|
+os.close(_orig_fd1)
|
|
|
+
|
|
|
m.eval()
|
|
|
device = next(m.parameters()).device
|
|
|
inp = t(prompt, return_tensors='pt').to(device)
|
|
|
@@ -140,20 +132,24 @@ print(json.dumps({{'generated_text': gen}}))
|
|
|
|
|
|
script_b64 = base64.b64encode(script.encode()).decode()
|
|
|
|
|
|
- # 通过环境变量传递参数,脚本通过 stdin 管道传入容器内的 Python
|
|
|
- remote_cmd = (
|
|
|
- f"echo {script_b64} | base64 -d | "
|
|
|
- f"docker exec -i -w {workdir} "
|
|
|
+ # 先将脚本写入容器内的临时文件,再执行,避免 echo | pipe 的缓冲区限制
|
|
|
+ script_path = f"/tmp/test_model_{model_id.replace('/', '_')}.py"
|
|
|
+ write_cmd = (
|
|
|
+ f"echo {script_b64} | base64 -d > {script_path} && "
|
|
|
+ f"docker exec -w {workdir} "
|
|
|
f"-e MODEL_ID={model_id} "
|
|
|
f"-e PROMPT_B64={prompt_b64} "
|
|
|
f"-e MAX_TOKENS={max_new_tokens} "
|
|
|
f"-e TEMPERATURE={temperature} "
|
|
|
f"-e TOP_P={top_p} "
|
|
|
f"-e DO_SAMPLE={do_sample} "
|
|
|
- f"{container} {python}"
|
|
|
+ f"{container} {python} {script_path}"
|
|
|
)
|
|
|
|
|
|
- code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
|
|
|
+ code, stdout, stderr = ssh_exec(write_cmd, timeout=600)
|
|
|
+
|
|
|
+ # 清理临时文件
|
|
|
+ ssh_exec(f"docker exec {container} rm -f {script_path}", timeout=5)
|
|
|
|
|
|
logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
|
|
|
if stdout:
|