# model_test_service.py
  1. from pathlib import Path
  2. from typing import Any
  3. from app.config import get_settings
  4. from app.core.logging import logger
# Application settings, fetched once when this module is imported. The
# functions below read `use_remote_compute` and the `compute_node_*` values
# (docker container, python executable, workdir) from it.
settings = get_settings()
  6. async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
  7. """加载已缓存模型并生成测试响应。"""
  8. if settings.use_remote_compute:
  9. return await _test_model_remote(model_id, prompt, max_new_tokens, temperature, top_p)
  10. return await _test_model_local(model_id, prompt, max_new_tokens, temperature, top_p)
  11. async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  12. """在算力节点容器内执行模型测试(通过 SSH + docker exec)。
  13. 方案:将 Python 脚本写入容器临时文件执行,避免 stdin 管道缓冲区限制。
  14. """
  15. import base64
  16. import json
  17. from app.core.remote_executor import ssh_exec
  18. container = settings.compute_node_docker_container
  19. python = settings.compute_node_python
  20. workdir = settings.compute_node_workdir
  21. # 将 prompt 进行 base64 编码,避免引号/特殊字符问题
  22. prompt_b64 = base64.b64encode(prompt.encode("utf-8")).decode()
  23. do_sample = str(temperature > 0).lower()
  24. # 独立脚本:零 app/db 依赖,参数全部通过环境变量传入
  25. # 开头通过 OS 级别重定向 fd 1 到 /dev/null,抑制 C 层调试输出
  26. # 最后恢复 fd 1 以打印 JSON
  27. script = rf"""\
  28. import os, sys, json, warnings, base64
  29. # 保存原始 fd 1(docker exec 的 stdout pipe),然后重定向到 /dev/null
  30. _orig_fd1 = os.dup(1)
  31. _devnull = os.open(os.devnull, os.O_WRONLY)
  32. os.dup2(_devnull, 1)
  33. os.close(_devnull)
  34. warnings.filterwarnings('ignore')
  35. os.environ['PYTHONWARNINGS'] = 'ignore'
  36. os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
  37. os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'
  38. from pathlib import Path
  39. import torch
  40. from transformers import logging as tf_logging
  41. tf_logging.set_verbosity_error()
  42. from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel
  43. def find_model_path(model_id):
  44. for base in [
  45. '/root/Fine-tuning/backend/data/models',
  46. '/root/.cache/huggingface/hub',
  47. '/root/.cache/modelscope/hub',
  48. '/root/models',
  49. ]:
  50. bp = Path(base)
  51. if not bp.is_dir():
  52. continue
  53. flat_name = model_id.replace("/", "_")
  54. if (bp / flat_name / "config.json").exists():
  55. return str(bp / flat_name)
  56. if (bp / model_id / "config.json").exists():
  57. return str(bp / model_id)
  58. try:
  59. for child in bp.rglob("config.json"):
  60. if child.parent.is_dir():
  61. return str(child.parent)
  62. except Exception:
  63. pass
  64. return None
  65. model_id = os.environ.get('MODEL_ID', '')
  66. prompt = base64.b64decode(os.environ.get('PROMPT_B64', '')).decode('utf-8')
  67. max_new_tokens = int(os.environ.get('MAX_TOKENS', '128'))
  68. temperature = float(os.environ.get('TEMPERATURE', '0.8'))
  69. top_p = float(os.environ.get('TOP_P', '0.95'))
  70. do_sample = os.environ.get('DO_SAMPLE', 'true').lower() == 'true'
  71. model_path = find_model_path(model_id)
  72. if model_path is None:
  73. sys.stderr.write(json.dumps({{'error': f'Model not found: {{model_id}}'}}) + '\\n')
  74. exit(1)
  75. t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
  76. t.pad_token = t.pad_token or t.eos_token
  77. has_accelerate = False
  78. try:
  79. import accelerate
  80. has_accelerate = True
  81. except ImportError:
  82. pass
  83. m = None
  84. load_errors = []
  85. device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
  86. for cls, kw in [(AutoModelForCausalLM, {{'trust_remote_code': True}}), (AutoModel, {{'trust_remote_code': True}})]:
  87. for dtype_val, dtype_name in [(torch.float16, 'float16'), (torch.float32, 'float32')]:
  88. try:
  89. if has_accelerate:
  90. m = cls.from_pretrained(model_path, dtype=dtype_val, device_map='auto', **kw)
  91. else:
  92. m = cls.from_pretrained(model_path, dtype=dtype_val, device_map=None, **kw)
  93. m = m.to(device)
  94. break
  95. except Exception as e:
  96. load_errors.append(f'{{cls.__name__}} {{dtype_name}}: {{str(e)[:200]}}')
  97. if m is not None:
  98. break
  99. if m is None:
  100. sys.stderr.write(json.dumps({{'error': 'Unable to load model', 'details': load_errors}}) + '\\n')
  101. exit(1)
  102. # 恢复 fd 1 到原始 stdout(docker exec 的 pipe)
  103. os.dup2(_orig_fd1, 1)
  104. os.close(_orig_fd1)
  105. m.eval()
  106. device = next(m.parameters()).device
  107. inp = t(prompt, return_tensors='pt').to(device)
  108. out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=do_sample, pad_token_id=t.eos_token_id)
  109. gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
  110. print(json.dumps({{'generated_text': gen}}))
  111. """
  112. script_b64 = base64.b64encode(script.encode()).decode()
  113. # 通过 docker exec -i 将解码后的脚本内容传入容器内的 cat,写入临时文件后执行
  114. script_path = f"/tmp/test_model_{model_id.replace('/', '_')}.py"
  115. remote_cmd = (
  116. f"echo '{script_b64}' | base64 -d | "
  117. f"docker exec -i -w {workdir} "
  118. f"-e MODEL_ID={model_id} "
  119. f"-e PROMPT_B64={prompt_b64} "
  120. f"-e MAX_TOKENS={max_new_tokens} "
  121. f"-e TEMPERATURE={temperature} "
  122. f"-e TOP_P={top_p} "
  123. f"-e DO_SAMPLE={do_sample} "
  124. f"{container} bash -c 'cat > {script_path} && {python} {script_path}'"
  125. )
  126. code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
  127. # 清理容器内临时文件
  128. ssh_exec(f"docker exec {container} rm -f {script_path}", timeout=5)
  129. logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
  130. if stdout:
  131. logger.info(f"stdout (first 500): {stdout[:500]}")
  132. if stderr:
  133. logger.info(f"stderr (first 500): {stderr[:500]}")
  134. if code != 0:
  135. logger.error(f"Remote model test failed: {stderr}")
  136. return {"error": stderr.strip() or "Remote test failed"}
  137. for line in reversed(stdout.strip().split("\n")):
  138. line = line.strip()
  139. if line.startswith("{"):
  140. try:
  141. result = json.loads(line)
  142. result["model_id"] = model_id
  143. result["prompt"] = prompt
  144. return result
  145. except json.JSONDecodeError:
  146. continue
  147. return {"error": f"Invalid response: {stdout[:500]}"}
  148. async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
  149. """本地执行模型测试(仅用于开发环境)。"""
  150. import torch
  151. from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, AutoConfig
  152. from app.services.model_service import resolve_model_path
  153. model_path = await resolve_model_path(model_id)
  154. if not model_path:
  155. return {"error": f"Model not found in cache: {model_id}"}
  156. model_dir = Path(model_path)
  157. if not (model_dir / "config.json").exists():
  158. return {"error": f"Model directory not found: {model_dir}"}
  159. tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
  160. if tokenizer.pad_token is None:
  161. tokenizer.pad_token = tokenizer.eos_token
  162. model = None
  163. for loader_cls, kwargs in [
  164. (AutoModelForCausalLM, {"trust_remote_code": True}),
  165. (AutoModel, {"trust_remote_code": True}),
  166. ]:
  167. try:
  168. model = loader_cls.from_pretrained(
  169. model_dir,
  170. torch_dtype=torch.float16,
  171. device_map="auto",
  172. **kwargs,
  173. )
  174. break
  175. except Exception:
  176. continue
  177. if model is None:
  178. return {"error": f"Unable to load model with any available loader. Model type may not be supported yet."}
  179. model.eval()
  180. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  181. with torch.no_grad():
  182. outputs = model.generate(
  183. **inputs,
  184. max_new_tokens=max_new_tokens,
  185. temperature=temperature,
  186. top_p=top_p,
  187. do_sample=temperature > 0,
  188. pad_token_id=tokenizer.eos_token_id,
  189. )
  190. generated_text = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  191. return {
  192. "model_id": model_id,
  193. "prompt": prompt,
  194. "generated_text": generated_text,
  195. }