| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- from pathlib import Path
- from typing import Any
- from app.config import get_settings
- from app.core.logging import logger
- settings = get_settings()
async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
    """Load an already-cached model and generate a test response.

    The work is delegated to the remote backend when
    ``settings.use_remote_compute`` is truthy, otherwise to the local backend.

    Args:
        model_id: Identifier of the model to test.
        prompt: Text the model should continue.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature forwarded to the backend.
        top_p: Nucleus-sampling threshold forwarded to the backend.

    Returns:
        Whatever dictionary the selected backend returns.
    """
    # Choose the backend coroutine first, then await it exactly once.
    backend = _test_model_remote if settings.use_remote_compute else _test_model_local
    return await backend(model_id, prompt, max_new_tokens, temperature, top_p)
# Standalone script executed inside the compute-node container.
# It has no app/db dependencies; every parameter arrives through environment
# variables. It is a plain raw string (nothing is interpolated into it), so
# braces are written normally and '\n' reaches the remote interpreter as a
# real newline escape.
_REMOTE_TEST_SCRIPT = r"""
import os, sys, json, warnings, base64
# 保存原始 fd 1(docker exec 的 stdout pipe),然后重定向到 /dev/null
_orig_fd1 = os.dup(1)
_devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(_devnull, 1)
os.close(_devnull)
warnings.filterwarnings('ignore')
os.environ['PYTHONWARNINGS'] = 'ignore'
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'
from pathlib import Path
import torch
from transformers import logging as tf_logging
tf_logging.set_verbosity_error()
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel

def find_model_path(model_id):
    flat_name = model_id.replace("/", "_")
    # Directory spellings a cache may use for this model: the id itself, the
    # flattened id, the Hugging Face hub form (models--org--name) and the
    # ModelScope form where '.' is stored as '___'.
    names = [
        model_id,
        flat_name,
        "models--" + model_id.replace("/", "--"),
        model_id.replace(".", "___"),
    ]
    needles = ["/" + n.strip("/") + "/" for n in names if n.strip("/")]
    for base in [
        '/root/Fine-tuning/backend/data/models',
        '/root/.cache/huggingface/hub',
        '/root/.cache/modelscope/hub',
        '/root/models',
    ]:
        bp = Path(base)
        if not bp.is_dir():
            continue
        if (bp / flat_name / "config.json").exists():
            return str(bp / flat_name)
        if (bp / model_id / "config.json").exists():
            return str(bp / model_id)
        try:
            for child in bp.rglob("config.json"):
                # Only accept a directory that belongs to the requested model;
                # returning the first config.json found would load an
                # unrelated model.
                parent = child.parent
                haystack = parent.as_posix() + "/"
                if any(needle in haystack for needle in needles):
                    return str(parent)
        except Exception:
            pass
    return None

model_id = os.environ.get('MODEL_ID', '')
prompt = base64.b64decode(os.environ.get('PROMPT_B64', '')).decode('utf-8')
max_new_tokens = int(os.environ.get('MAX_TOKENS', '128'))
temperature = float(os.environ.get('TEMPERATURE', '0.8'))
top_p = float(os.environ.get('TOP_P', '0.95'))
do_sample = os.environ.get('DO_SAMPLE', 'true').lower() == 'true'
model_path = find_model_path(model_id)
if model_path is None:
    sys.stderr.write(json.dumps({'error': f'Model not found: {model_id}'}) + '\n')
    sys.exit(1)
t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
t.pad_token = t.pad_token or t.eos_token
has_accelerate = False
try:
    import accelerate
    has_accelerate = True
except ImportError:
    pass
m = None
load_errors = []
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
for cls, kw in [(AutoModelForCausalLM, {'trust_remote_code': True}), (AutoModel, {'trust_remote_code': True})]:
    for dtype_val, dtype_name in [(torch.float16, 'float16'), (torch.float32, 'float32')]:
        try:
            if has_accelerate:
                m = cls.from_pretrained(model_path, dtype=dtype_val, device_map='auto', **kw)
            else:
                m = cls.from_pretrained(model_path, dtype=dtype_val, device_map=None, **kw)
                m = m.to(device)
            break
        except Exception as e:
            # Discard a model that loaded but could not be moved to the device.
            m = None
            load_errors.append(f'{cls.__name__} {dtype_name}: {str(e)[:200]}')
    if m is not None:
        break
if m is None:
    sys.stderr.write(json.dumps({'error': 'Unable to load model', 'details': load_errors}) + '\n')
    sys.exit(1)
# Flush while fd 1 still points at /dev/null so buffered library output is
# discarded instead of leaking into the JSON channel.
sys.stdout.flush()
# 恢复 fd 1 到原始 stdout(docker exec 的 pipe)
os.dup2(_orig_fd1, 1)
os.close(_orig_fd1)
m.eval()
device = next(m.parameters()).device
inp = t(prompt, return_tensors='pt').to(device)
with torch.no_grad():
    out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=do_sample, pad_token_id=t.eos_token_id)
gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
print(json.dumps({'generated_text': gen}))
"""


async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
    """Run the model test inside the compute-node container (SSH + docker exec).

    The standalone script is base64-encoded, piped over SSH into
    ``docker exec -i``, written to a temporary file in the container and
    executed there. Writing a file first avoids stdin pipe-buffer limits.
    All generation parameters are passed as environment variables.

    Args:
        model_id: Identifier of the model to test.
        prompt: Text the model should continue.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; sampling is disabled when it is
            not greater than zero.
        top_p: Nucleus-sampling threshold.

    Returns:
        On success, the JSON object printed by the remote script with
        ``model_id`` and ``prompt`` added. On failure, ``{"error": ...}``
        carrying the remote stderr, or the first 500 characters of stdout
        when no JSON line could be parsed.
    """
    import base64
    import json
    import re
    import shlex
    import uuid

    from app.core.remote_executor import ssh_exec

    container = settings.compute_node_docker_container
    python = settings.compute_node_python
    workdir = settings.compute_node_workdir

    # Base64-encode the prompt so quotes and special characters survive the shell.
    prompt_b64 = base64.b64encode(prompt.encode("utf-8")).decode()
    do_sample = str(temperature > 0).lower()
    script_b64 = base64.b64encode(_REMOTE_TEST_SCRIPT.encode("utf-8")).decode()

    # model_id is caller-supplied: keep only filename-safe characters for the
    # temp path and add a random suffix so concurrent runs do not collide.
    safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", model_id)
    script_path = f"/tmp/test_model_{safe_name}_{uuid.uuid4().hex}.py"

    # Quote every environment value so none of them can break out of the
    # remote shell command.
    env_values = (
        ("MODEL_ID", model_id),
        ("PROMPT_B64", prompt_b64),
        ("MAX_TOKENS", max_new_tokens),
        ("TEMPERATURE", temperature),
        ("TOP_P", top_p),
        ("DO_SAMPLE", do_sample),
    )
    env_args = " ".join(f"-e {key}={shlex.quote(str(value))}" for key, value in env_values)

    # Inside the container: write stdin to the temp file, then run it.
    inner_cmd = f"cat > {script_path} && {python} {script_path}"
    remote_cmd = (
        f"echo '{script_b64}' | base64 -d | "
        f"docker exec -i -w {workdir} "
        f"{env_args} "
        f"{container} bash -c {shlex.quote(inner_cmd)}"
    )

    # NOTE(review): ssh_exec is called without await, so it appears to block
    # the event loop for up to the timeout; consider moving it to a worker
    # thread once its thread-safety is confirmed.
    try:
        code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
    finally:
        # Best-effort removal of the temp file; a cleanup failure must not
        # hide the test result or the original exception.
        try:
            ssh_exec(f"docker exec {container} rm -f {script_path}", timeout=5)
        except Exception as cleanup_error:
            logger.warning(f"Failed to remove remote temp script {script_path}: {cleanup_error}")

    logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
    if stdout:
        logger.info(f"stdout (first 500): {stdout[:500]}")
    if stderr:
        logger.info(f"stderr (first 500): {stderr[:500]}")

    if code != 0:
        logger.error(f"Remote model test failed: {stderr}")
        return {"error": stderr.strip() or "Remote test failed"}

    # The script prints its JSON last, so scan from the end and take the
    # first line that parses.
    for line in reversed(stdout.strip().split("\n")):
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            result = json.loads(line)
        except json.JSONDecodeError:
            continue
        result["model_id"] = model_id
        result["prompt"] = prompt
        return result

    return {"error": f"Invalid response: {stdout[:500]}"}
async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
    """Run the model test in-process (intended for development environments only).

    Resolves the cached model directory, loads the tokenizer and model with
    ``transformers``, generates a continuation of ``prompt`` and returns it.

    Args:
        model_id: Identifier of the model to test.
        prompt: Text the model should continue.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; sampling is disabled when it is
            not greater than zero.
        top_p: Nucleus-sampling threshold.

    Returns:
        ``{"model_id", "prompt", "generated_text"}`` on success, or
        ``{"error": ...}`` when the model cannot be found or loaded. A load
        failure additionally carries a ``details`` list with one short
        message per loader that was tried.
    """
    import torch
    from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer

    from app.services.model_service import resolve_model_path

    model_path = await resolve_model_path(model_id)
    if not model_path:
        return {"error": f"Model not found in cache: {model_id}"}

    model_dir = Path(model_path)
    if not (model_dir / "config.json").exists():
        return {"error": f"Model directory not found: {model_dir}"}

    tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
    if tokenizer.pad_token is None:
        # Fall back to the EOS token when the tokenizer defines no pad token.
        tokenizer.pad_token = tokenizer.eos_token

    # Try the causal-LM loader first, then the generic loader.
    # NOTE(review): a model loaded through AutoModel may not provide
    # `generate` — confirm the fallback is usable for generation.
    model = None
    load_errors: list[str] = []
    for loader_cls in (AutoModelForCausalLM, AutoModel):
        try:
            model = loader_cls.from_pretrained(
                model_dir,
                torch_dtype=torch.float16,
                device_map="auto",
                trust_remote_code=True,
            )
            break
        except Exception as exc:
            # Record the reason instead of dropping it so a failed load can
            # be diagnosed from the log or the response.
            message = f"{loader_cls.__name__}: {str(exc)[:200]}"
            load_errors.append(message)
            logger.warning(f"Local model load failed ({message})")

    if model is None:
        return {
            "error": "Unable to load model with any available loader. Model type may not be supported yet.",
            "details": load_errors,
        }

    model.eval()
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            do_sample=temperature > 0,
            pad_token_id=tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens, skipping the echoed prompt.
    prompt_length = inputs["input_ids"].shape[1]
    generated_text = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    return {
        "model_id": model_id,
        "prompt": prompt,
        "generated_text": generated_text,
    }
|