model_test_service.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from pathlib import Path
  2. from typing import Any
  3. from app.config import get_settings
  4. from app.core.logging import logger
  # Module-level settings object, obtained once from get_settings() when this module is imported.
  5. settings = get_settings()
  6. async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
  7. """加载已缓存模型并生成测试响应。"""
  8. if settings.use_remote_compute:
  9. return _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
  10. return _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
  11. def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  12. """通过 SSH 在算力节点执行模型测试。
  13. 先 scp 脚本到远端,再 docker exec 执行文件,完全避开 heredoc/引号/管道问题。
  14. """
  15. import json
  16. import tempfile
  17. from app.core.remote_executor import scp_to_remote, ssh_exec
  18. container = settings.compute_node_docker_container
  19. python = settings.compute_node_python
  20. workdir = settings.compute_node_workdir
  21. # 独立的模型测试脚本内容(零 app/db 依赖)
  22. python_script = """\
  23. import json, sys
  24. from pathlib import Path
  25. import torch
  26. from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
  27. def find_model_path(model_id):
  28. candidates = [
  29. '/root/.cache/huggingface/hub',
  30. '/root/.cache/modelscope/hub',
  31. '/root/models',
  32. ]
  33. for base in candidates:
  34. bp = Path(base)
  35. if not bp.is_dir():
  36. continue
  37. for child in bp.rglob('config.json'):
  38. parent = child.parent
  39. if parent.is_dir():
  40. return str(parent)
  41. return None
  42. model_id = sys.argv[1]
  43. prompt = sys.argv[2]
  44. max_new_tokens = int(sys.argv[3])
  45. temperature = float(sys.argv[4])
  46. top_p = float(sys.argv[5])
  47. model_path = find_model_path(model_id)
  48. if model_path is None:
  49. print(json.dumps({'error': 'Model not found in cache'}))
  50. sys.exit(1)
  51. t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
  52. t.pad_token = t.pad_token or t.eos_token
  53. m = None
  54. for cls, kw in [
  55. (AutoModelForCausalLM, {'trust_remote_code': True}),
  56. (AutoModel, {'trust_remote_code': True}),
  57. ]:
  58. try:
  59. m = cls.from_pretrained(model_path, torch_dtype=torch.float16, device_map='auto', **kw)
  60. break
  61. except Exception:
  62. pass
  63. if m is None:
  64. print(json.dumps({'error': 'Unable to load model'}))
  65. sys.exit(1)
  66. m.eval()
  67. inp = t(prompt, return_tensors='pt').to(m.device)
  68. out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=%s, pad_token_id=t.eos_token_id)
  69. gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
  70. print(json.dumps({'generated_text': gen}))
  71. """ % str(temperature > 0).lower()
  72. # 写入本地临时文件 → scp 到远端 → docker exec 执行 → 清理
  73. remote_script = "/tmp/remote_model_test.py"
  74. with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as tmp:
  75. tmp.write(python_script)
  76. tmp.flush()
  77. tmp_path = tmp.name
  78. try:
  79. code, out, err = scp_to_remote(tmp_path, remote_script)
  80. if code != 0:
  81. logger.error(f"SCP failed: {err}")
  82. return {"error": f"Failed to upload script: {err.strip()}"}
  83. remote_cmd = f"docker exec -w {workdir} {container} {python} {remote_script} '{model_id}' '{prompt.replace(chr(39), chr(92)+chr(39))}' {max_new_tokens} {temperature} {top_p}"
  84. code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
  85. if code != 0:
  86. logger.error(f"Remote model test failed: {stderr}")
  87. return {"error": stderr.strip() or "Remote test failed"}
  88. finally:
  89. import os
  90. os.unlink(tmp_path)
  91. ssh_exec(f"rm -f {remote_script}", timeout=10)
  92. logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
  93. if stdout:
  94. logger.info(f"stdout (first 500): {stdout[:500]}")
  95. if stderr:
  96. logger.info(f"stderr (first 500): {stderr[:500]}")
  97. if code != 0:
  98. logger.error(f"Remote model test failed: {stderr}")
  99. return {"error": stderr.strip() or "Remote test failed"}
  100. # 提取最后一行 JSON
  101. for line in reversed(stdout.strip().split("\n")):
  102. line = line.strip()
  103. if line.startswith("{"):
  104. try:
  105. result = json.loads(line)
  106. result["model_id"] = model_id
  107. result["prompt"] = prompt
  108. return result
  109. except json.JSONDecodeError:
  110. continue
  111. return {"error": f"Invalid response: {stdout[:500]}"}
  112. async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  113. """本地执行模型测试(仅用于开发环境)。"""
  114. import torch
  115. from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, AutoConfig
  116. from app.services.model_service import resolve_model_path
  117. model_path = await resolve_model_path(model_id)
  118. if not model_path:
  119. return {"error": f"Model not found in cache: {model_id}"}
  120. model_dir = Path(model_path)
  121. if not (model_dir / "config.json").exists():
  122. return {"error": f"Model directory not found: {model_dir}"}
  123. tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
  124. if tokenizer.pad_token is None:
  125. tokenizer.pad_token = tokenizer.eos_token
  126. # 通用加载策略:尝试多种加载方式,自动兼容各种新架构
  127. model = None
  128. for loader_cls, kwargs in [
  129. (AutoModelForCausalLM, {"trust_remote_code": True}),
  130. (AutoModel, {"trust_remote_code": True}),
  131. ]:
  132. try:
  133. model = loader_cls.from_pretrained(
  134. model_dir,
  135. torch_dtype=torch.float16,
  136. device_map="auto",
  137. **kwargs,
  138. )
  139. break
  140. except Exception:
  141. continue
  142. if model is None:
  143. return {"error": f"Unable to load model with any available loader. Model type may not be supported yet."}
  144. model.eval()
  145. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  146. with torch.no_grad():
  147. outputs = model.generate(
  148. **inputs,
  149. max_new_tokens=max_new_tokens,
  150. temperature=temperature,
  151. top_p=top_p,
  152. do_sample=temperature > 0,
  153. pad_token_id=tokenizer.eos_token_id,
  154. )
  155. generated_text = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  156. return {
  157. "model_id": model_id,
  158. "prompt": prompt,
  159. "generated_text": generated_text,
  160. }