model_test_service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. from pathlib import Path
  2. from typing import Any
  3. from app.config import get_settings
  4. from app.core.logging import logger
  5. settings = get_settings()  # Resolved once at import; the functions below read use_remote_compute and the compute_node_* fields from it.
  6. async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
  7. """加载已缓存模型并生成测试响应。"""
  8. if settings.use_remote_compute:
  9. return _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
  10. return _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
  11. def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  12. """通过 SSH 在算力节点执行模型测试。
  13. 单次 SSH 命令:echo base64 | docker exec -i python,不依赖 scp/docker cp/heredoc。
  14. """
  15. import base64
  16. import json
  17. from app.core.remote_executor import ssh_exec
  18. container = settings.compute_node_docker_container
  19. python = settings.compute_node_python
  20. workdir = settings.compute_node_workdir
  21. # 独立的 Python 脚本(通过 sys.argv 接收参数,避免引号嵌套)
  22. script = """\
  23. import json, sys
  24. from pathlib import Path
  25. import torch
  26. from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
  27. def find_model_path(model_id):
  28. for base in ['/root/.cache/huggingface/hub', '/root/.cache/modelscope/hub', '/root/models']:
  29. bp = Path(base)
  30. if not bp.is_dir():
  31. continue
  32. for child in bp.rglob('config.json'):
  33. if child.parent.is_dir():
  34. return str(child.parent)
  35. return None
  36. model_path = find_model_path(sys.argv[1])
  37. if model_path is None:
  38. print(json.dumps({'error': 'Model not found in cache'}))
  39. sys.exit(1)
  40. t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
  41. t.pad_token = t.pad_token or t.eos_token
  42. m = None
  43. for cls, kw in [(AutoModelForCausalLM, {'trust_remote_code': True}), (AutoModel, {'trust_remote_code': True})]:
  44. try:
  45. m = cls.from_pretrained(model_path, torch_dtype=torch.float16, device_map='auto', **kw)
  46. break
  47. except Exception:
  48. pass
  49. if m is None:
  50. print(json.dumps({'error': 'Unable to load model'}))
  51. sys.exit(1)
  52. m.eval()
  53. inp = t(sys.argv[2], return_tensors='pt').to(m.device)
  54. out = m.generate(**inp, max_new_tokens=int(sys.argv[3]), temperature=float(sys.argv[4]), top_p=float(sys.argv[5]), do_sample=float(sys.argv[4]) > 0, pad_token_id=t.eos_token_id)
  55. print(json.dumps({'generated_text': t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)}))
  56. """
  57. script_b64 = base64.b64encode(script.encode()).decode()
  58. # 单次 SSH:echo base64 → docker exec -i python,不创建任何中间文件
  59. remote_cmd = f"echo {script_b64} | base64 -d | docker exec -i -w {workdir} {container} {python}"
  60. code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
  61. logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
  62. if stdout:
  63. logger.info(f"stdout (first 500): {stdout[:500]}")
  64. if stderr:
  65. logger.info(f"stderr (first 500): {stderr[:500]}")
  66. if code != 0:
  67. logger.error(f"Remote model test failed: {stderr}")
  68. return {"error": stderr.strip() or "Remote test failed"}
  69. # 提取最后一行 JSON
  70. for line in reversed(stdout.strip().split("\n")):
  71. line = line.strip()
  72. if line.startswith("{"):
  73. try:
  74. result = json.loads(line)
  75. result["model_id"] = model_id
  76. result["prompt"] = prompt
  77. return result
  78. except json.JSONDecodeError:
  79. continue
  80. return {"error": f"Invalid response: {stdout[:500]}"}
  81. async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  82. """本地执行模型测试(仅用于开发环境)。"""
  83. import torch
  84. from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, AutoConfig
  85. from app.services.model_service import resolve_model_path
  86. model_path = await resolve_model_path(model_id)
  87. if not model_path:
  88. return {"error": f"Model not found in cache: {model_id}"}
  89. model_dir = Path(model_path)
  90. if not (model_dir / "config.json").exists():
  91. return {"error": f"Model directory not found: {model_dir}"}
  92. tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
  93. if tokenizer.pad_token is None:
  94. tokenizer.pad_token = tokenizer.eos_token
  95. # 通用加载策略:尝试多种加载方式,自动兼容各种新架构
  96. model = None
  97. for loader_cls, kwargs in [
  98. (AutoModelForCausalLM, {"trust_remote_code": True}),
  99. (AutoModel, {"trust_remote_code": True}),
  100. ]:
  101. try:
  102. model = loader_cls.from_pretrained(
  103. model_dir,
  104. torch_dtype=torch.float16,
  105. device_map="auto",
  106. **kwargs,
  107. )
  108. break
  109. except Exception:
  110. continue
  111. if model is None:
  112. return {"error": f"Unable to load model with any available loader. Model type may not be supported yet."}
  113. model.eval()
  114. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  115. with torch.no_grad():
  116. outputs = model.generate(
  117. **inputs,
  118. max_new_tokens=max_new_tokens,
  119. temperature=temperature,
  120. top_p=top_p,
  121. do_sample=temperature > 0,
  122. pad_token_id=tokenizer.eos_token_id,
  123. )
  124. generated_text = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  125. return {
  126. "model_id": model_id,
  127. "prompt": prompt,
  128. "generated_text": generated_text,
  129. }