# model_test_service.py
  1. from pathlib import Path
  2. from typing import Any
  3. from app.config import get_settings
  4. from app.core.logging import logger
  5. settings = get_settings()
  6. async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
  7. """加载已缓存模型并生成测试响应。"""
  8. if settings.use_remote_compute:
  9. return _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
  10. return _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
  11. def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  12. """通过 SSH 在算力节点执行模型测试。
  13. 流程:scp 到远端宿主机 → docker cp 传入容器 → docker exec 执行 → 清理
  14. """
  15. import json
  16. import os
  17. import tempfile
  18. from app.core.remote_executor import scp_to_remote, ssh_exec
  19. container = settings.compute_node_docker_container
  20. python = settings.compute_node_python
  21. workdir = settings.compute_node_workdir
  22. # 独立的模型测试脚本内容(零 app/db 依赖)
  23. python_script = """\
  24. import json, sys
  25. from pathlib import Path
  26. import torch
  27. from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
  28. def find_model_path(model_id):
  29. candidates = [
  30. '/root/.cache/huggingface/hub',
  31. '/root/.cache/modelscope/hub',
  32. '/root/models',
  33. ]
  34. for base in candidates:
  35. bp = Path(base)
  36. if not bp.is_dir():
  37. continue
  38. for child in bp.rglob('config.json'):
  39. parent = child.parent
  40. if parent.is_dir():
  41. return str(parent)
  42. return None
  43. model_id = sys.argv[1]
  44. prompt = sys.argv[2]
  45. max_new_tokens = int(sys.argv[3])
  46. temperature = float(sys.argv[4])
  47. top_p = float(sys.argv[5])
  48. model_path = find_model_path(model_id)
  49. if model_path is None:
  50. print(json.dumps({'error': 'Model not found in cache'}))
  51. sys.exit(1)
  52. t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
  53. t.pad_token = t.pad_token or t.eos_token
  54. m = None
  55. for cls, kw in [
  56. (AutoModelForCausalLM, {'trust_remote_code': True}),
  57. (AutoModel, {'trust_remote_code': True}),
  58. ]:
  59. try:
  60. m = cls.from_pretrained(model_path, torch_dtype=torch.float16, device_map='auto', **kw)
  61. break
  62. except Exception:
  63. pass
  64. if m is None:
  65. print(json.dumps({'error': 'Unable to load model'}))
  66. sys.exit(1)
  67. m.eval()
  68. inp = t(prompt, return_tensors='pt').to(m.device)
  69. out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=%s, pad_token_id=t.eos_token_id)
  70. gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
  71. print(json.dumps({'generated_text': gen}))
  72. """ % str(temperature > 0).lower()
  73. remote_script = "/tmp/_model_test.py"
  74. with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as tmp:
  75. tmp.write(python_script)
  76. tmp.flush()
  77. tmp_path = tmp.name
  78. try:
  79. # Step 1: SCP 到远端宿主机
  80. host_tmp = "/tmp/_model_test_host.py"
  81. code, out, err = scp_to_remote(tmp_path, host_tmp)
  82. if code != 0:
  83. logger.error(f"SCP failed: {err}")
  84. return {"error": f"Failed to upload script: {err.strip()}"}
  85. # Step 2: docker cp 把文件从宿主机传入容器
  86. cp_cmd = f"docker cp {host_tmp} {container}:/tmp/_model_test.py"
  87. code, out, err = ssh_exec(cp_cmd, timeout=10)
  88. if code != 0:
  89. logger.error(f"docker cp failed: {err}")
  90. return {"error": f"Failed to copy script to container: {err.strip()}"}
  91. # Step 3: docker exec 执行容器内的脚本
  92. safe_prompt = prompt.replace("'", "\\'")
  93. run_cmd = f"docker exec -w {workdir} {container} {python} /tmp/_model_test.py '{model_id}' '{safe_prompt}' {max_new_tokens} {temperature} {top_p}"
  94. code, stdout, stderr = ssh_exec(run_cmd, timeout=600)
  95. if code != 0:
  96. logger.error(f"Remote model test failed: {stderr}")
  97. return {"error": stderr.strip() or "Remote test failed"}
  98. finally:
  99. os.unlink(tmp_path)
  100. ssh_exec(f"rm -f /tmp/_model_test_host.py", timeout=10)
  101. ssh_exec(f"docker exec {container} rm -f /tmp/_model_test.py", timeout=10)
  102. logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
  103. if stdout:
  104. logger.info(f"stdout (first 500): {stdout[:500]}")
  105. if stderr:
  106. logger.info(f"stderr (first 500): {stderr[:500]}")
  107. if code != 0:
  108. logger.error(f"Remote model test failed: {stderr}")
  109. return {"error": stderr.strip() or "Remote test failed"}
  110. # 提取最后一行 JSON
  111. for line in reversed(stdout.strip().split("\n")):
  112. line = line.strip()
  113. if line.startswith("{"):
  114. try:
  115. result = json.loads(line)
  116. result["model_id"] = model_id
  117. result["prompt"] = prompt
  118. return result
  119. except json.JSONDecodeError:
  120. continue
  121. return {"error": f"Invalid response: {stdout[:500]}"}
  122. async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  123. """本地执行模型测试(仅用于开发环境)。"""
  124. import torch
  125. from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, AutoConfig
  126. from app.services.model_service import resolve_model_path
  127. model_path = await resolve_model_path(model_id)
  128. if not model_path:
  129. return {"error": f"Model not found in cache: {model_id}"}
  130. model_dir = Path(model_path)
  131. if not (model_dir / "config.json").exists():
  132. return {"error": f"Model directory not found: {model_dir}"}
  133. tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
  134. if tokenizer.pad_token is None:
  135. tokenizer.pad_token = tokenizer.eos_token
  136. # 通用加载策略:尝试多种加载方式,自动兼容各种新架构
  137. model = None
  138. for loader_cls, kwargs in [
  139. (AutoModelForCausalLM, {"trust_remote_code": True}),
  140. (AutoModel, {"trust_remote_code": True}),
  141. ]:
  142. try:
  143. model = loader_cls.from_pretrained(
  144. model_dir,
  145. torch_dtype=torch.float16,
  146. device_map="auto",
  147. **kwargs,
  148. )
  149. break
  150. except Exception:
  151. continue
  152. if model is None:
  153. return {"error": f"Unable to load model with any available loader. Model type may not be supported yet."}
  154. model.eval()
  155. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  156. with torch.no_grad():
  157. outputs = model.generate(
  158. **inputs,
  159. max_new_tokens=max_new_tokens,
  160. temperature=temperature,
  161. top_p=top_p,
  162. do_sample=temperature > 0,
  163. pad_token_id=tokenizer.eos_token_id,
  164. )
  165. generated_text = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  166. return {
  167. "model_id": model_id,
  168. "prompt": prompt,
  169. "generated_text": generated_text,
  170. }