model_test_service.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. from pathlib import Path
  2. from typing import Any
  3. import asyncio
  4. from app.config import get_settings
  5. from app.core.logging import logger
  6. settings = get_settings()
  async def test_model(model_id: str, prompt: str, max_new_tokens: int = 128, temperature: float = 0.8, top_p: float = 0.95) -> dict[str, Any]:
      """Load an already-cached model and generate a test response.

      The work is delegated to the remote compute-node runner when
      ``settings.use_remote_compute`` is truthy, and to the local runner
      otherwise.  All arguments are forwarded unchanged.

      Args:
          model_id: Identifier of the cached model to test.
          prompt: Text the model should continue.
          max_new_tokens: Upper bound on the number of generated tokens.
          temperature: Sampling temperature forwarded to the runner.
          top_p: Nucleus-sampling threshold forwarded to the runner.

      Returns:
          The dictionary produced by the selected runner.
      """
      # Pick the backend first, then make a single call with the shared arguments.
      runner = _test_model_remote if settings.use_remote_compute else _test_model_local
      return await runner(model_id, prompt, max_new_tokens, temperature, top_p)
  def _remote_test_script() -> str:
      """Return the source of the standalone inference script run inside the container.

      The script has no dependency on the app or its database: every input
      arrives through environment variables (MODEL_ID, PROMPT_B64, MAX_TOKENS,
      TEMPERATURE, TOP_P, DO_SAMPLE).  On success it prints one JSON object with
      a ``generated_text`` key to stdout; on failure it writes a JSON error
      object to stderr and exits with status 1.
      """
      import textwrap

      # A plain raw string (not an f-string): nothing is interpolated here, so
      # the script uses ordinary braces and a real '\n' escape.  dedent() makes
      # the script text independent of how this function itself is indented.
      return textwrap.dedent(r"""
          import os, sys, json, warnings, base64

          # Keep the original fd 1 (the docker exec stdout pipe) and point fd 1 at
          # /dev/null so C-level debug output cannot pollute the JSON result.
          _orig_fd1 = os.dup(1)
          _devnull = os.open(os.devnull, os.O_WRONLY)
          os.dup2(_devnull, 1)
          os.close(_devnull)

          warnings.filterwarnings('ignore')
          os.environ['PYTHONWARNINGS'] = 'ignore'
          os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
          os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'

          from pathlib import Path
          import torch
          from transformers import logging as tf_logging
          tf_logging.set_verbosity_error()
          from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModel


          def find_model_path(model_id):
              flat_name = model_id.replace("/", "_")
              short_name = model_id.rsplit("/", 1)[-1]
              # Directory names under which a cache may store this model; the
              # recursive fallback only accepts a config.json below one of them,
              # so an unrelated model in the same cache is never returned.
              aliases = {
                  flat_name,
                  short_name,
                  short_name.replace(".", "___"),
                  "models--" + model_id.replace("/", "--"),
              }
              aliases.discard("")
              for base in [
                  '/root/Fine-tuning/backend/data/models',
                  '/root/.cache/huggingface/hub',
                  '/root/.cache/modelscope/hub',
                  '/root/models',
              ]:
                  bp = Path(base)
                  if not bp.is_dir():
                      continue
                  if (bp / flat_name / "config.json").exists():
                      return str(bp / flat_name)
                  if (bp / model_id / "config.json").exists():
                      return str(bp / model_id)
                  try:
                      for child in bp.rglob("config.json"):
                          if aliases.intersection(child.parent.relative_to(bp).parts):
                              return str(child.parent)
                  except OSError:
                      # Best effort: an unreadable cache root is skipped.
                      continue
              return None


          model_id = os.environ.get('MODEL_ID', '')
          prompt = base64.b64decode(os.environ.get('PROMPT_B64', '')).decode('utf-8')
          max_new_tokens = int(os.environ.get('MAX_TOKENS', '128'))
          temperature = float(os.environ.get('TEMPERATURE', '0.8'))
          top_p = float(os.environ.get('TOP_P', '0.95'))
          do_sample = os.environ.get('DO_SAMPLE', 'true').lower() == 'true'

          model_path = find_model_path(model_id)
          if model_path is None:
              sys.stderr.write(json.dumps({'error': f'Model not found: {model_id}'}) + '\n')
              sys.exit(1)

          t = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
          t.pad_token = t.pad_token or t.eos_token

          has_accelerate = False
          try:
              import accelerate
              has_accelerate = True
          except ImportError:
              pass

          m = None
          load_errors = []
          device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
          # NOTE(review): `dtype=` is kept as in the original script; older
          # transformers releases spell this `torch_dtype=` - confirm the version
          # installed in the container.
          for cls, kw in [(AutoModelForCausalLM, {'trust_remote_code': True}), (AutoModel, {'trust_remote_code': True})]:
              for dtype_val, dtype_name in [(torch.float16, 'float16'), (torch.float32, 'float32')]:
                  try:
                      if has_accelerate:
                          loaded = cls.from_pretrained(model_path, dtype=dtype_val, device_map='auto', **kw)
                      else:
                          loaded = cls.from_pretrained(model_path, dtype=dtype_val, device_map=None, **kw)
                          loaded = loaded.to(device)
                      # Publish the model only once it is fully loaded and placed.
                      m = loaded
                      break
                  except Exception as e:
                      load_errors.append(f'{cls.__name__} {dtype_name}: {str(e)[:200]}')
              if m is not None:
                  break

          if m is None:
              sys.stderr.write(json.dumps({'error': 'Unable to load model', 'details': load_errors}) + '\n')
              sys.exit(1)

          # Push anything Python buffered while loading into /dev/null, then
          # restore fd 1 to the original stdout (the docker exec pipe).
          sys.stdout.flush()
          os.dup2(_orig_fd1, 1)
          os.close(_orig_fd1)

          m.eval()
          device = next(m.parameters()).device
          inp = t(prompt, return_tensors='pt').to(device)
          out = m.generate(**inp, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, do_sample=do_sample, pad_token_id=t.eos_token_id)
          gen = t.decode(out[0][inp['input_ids'].shape[1]:], skip_special_tokens=True)
          print(json.dumps({'generated_text': gen}))
      """)


  def _parse_remote_result(stdout: str) -> "dict[str, Any] | None":
      """Return the last stdout line that parses as a JSON object, or None if there is none."""
      import json

      # Scan from the end: the script prints its JSON result last, and any
      # earlier lines are treated as noise.
      for line in reversed(stdout.strip().split("\n")):
          line = line.strip()
          if not line.startswith("{"):
              continue
          try:
              return json.loads(line)
          except json.JSONDecodeError:
              continue
      return None


  async def _test_model_remote(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
      """Run the model test inside the compute node's container (SSH + docker exec).

      The inference script is base64-encoded, piped over SSH into
      ``docker exec -i``, written to a temporary file inside the container and
      executed from there, which avoids stdin pipe buffer limits.  Generation
      parameters travel as environment variables of the ``docker exec`` call.

      Args:
          model_id: Identifier of the model to look up in the container's caches.
          prompt: Text the model should continue.
          max_new_tokens: Upper bound on the number of generated tokens.
          temperature: Sampling temperature; sampling is enabled only when > 0.
          top_p: Nucleus-sampling threshold.

      Returns:
          On success, the script's JSON result extended with ``model_id`` and
          ``prompt``.  Otherwise a dictionary with a single ``error`` key.
      """
      import base64
      import re
      import shlex
      import uuid

      from app.core.remote_executor import ssh_exec

      container = settings.compute_node_docker_container
      python = settings.compute_node_python
      workdir = settings.compute_node_workdir

      # Base64 keeps quotes and other special characters in the prompt away from the shell.
      prompt_b64 = base64.b64encode(prompt.encode("utf-8")).decode()
      do_sample = str(temperature > 0).lower()
      script_b64 = base64.b64encode(_remote_test_script().encode()).decode()

      # The file name keeps only shell-safe characters from model_id and gets a
      # random suffix, so concurrent tests of one model do not share a script.
      safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", model_id)[:64]
      script_path = f"/tmp/test_model_{safe_name}_{uuid.uuid4().hex[:8]}.py"

      # Caller-supplied values are shell-quoted.  container, python and workdir
      # come from settings and are interpolated as-is, as before.
      env_flags = " ".join(
          f"-e {name}={shlex.quote(str(value))}"
          for name, value in (
              ("MODEL_ID", model_id),
              ("PROMPT_B64", prompt_b64),
              ("MAX_TOKENS", max_new_tokens),
              ("TEMPERATURE", temperature),
              ("TOP_P", top_p),
              ("DO_SAMPLE", do_sample),
          )
      )
      remote_cmd = (
          f"echo '{script_b64}' | base64 -d | "
          f"docker exec -i -w {workdir} {env_flags} "
          f"{container} bash -c 'cat > {script_path} && {python} {script_path}'"
      )

      try:
          code, stdout, stderr = await asyncio.to_thread(ssh_exec, remote_cmd, timeout=600)
      finally:
          # Remove the temporary script even when the run itself raised.  A
          # failed cleanup is logged and must not replace the test outcome.
          try:
              await asyncio.to_thread(ssh_exec, f"docker exec {container} rm -f {script_path}", timeout=5)
          except Exception as exc:
              logger.warning(f"Failed to remove remote temp script {script_path}: {exc}")

      logger.info(f"Remote test result: code={code}, stdout_len={len(stdout)}, stderr_len={len(stderr)}")
      if stdout:
          logger.info(f"stdout (first 500): {stdout[:500]}")
      if stderr:
          logger.info(f"stderr (first 500): {stderr[:500]}")

      if code != 0:
          logger.error(f"Remote model test failed: {stderr}")
          return {"error": stderr.strip() or "Remote test failed"}

      result = _parse_remote_result(stdout)
      if result is None:
          return {"error": f"Invalid response: {stdout[:500]}"}
      result["model_id"] = model_id
      result["prompt"] = prompt
      return result
  async def _test_model_local(model_id: str, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict[str, Any]:
      """Run the model test in-process (intended for development environments only).

      Args:
          model_id: Identifier passed to ``resolve_model_path`` to locate the model.
          prompt: Text the model should continue.
          max_new_tokens: Upper bound on the number of generated tokens.
          temperature: Sampling temperature forwarded to the inference helper.
          top_p: Nucleus-sampling threshold forwarded to the inference helper.

      Returns:
          The inference helper's result, extended with ``model_id`` on success,
          or a dictionary with an ``error`` key when the model cannot be located.
      """
      from app.services.model_service import resolve_model_path

      model_path = await resolve_model_path(model_id)
      if not model_path:
          return {"error": f"Model not found in cache: {model_id}"}

      model_dir = Path(model_path)
      if not (model_dir / "config.json").exists():
          return {"error": f"Model directory not found: {model_dir}"}

      # Model loading and generation block, so they run in a worker thread to
      # keep the event loop responsive.
      result = await asyncio.to_thread(
          _run_local_inference, model_dir, prompt, max_new_tokens, temperature, top_p
      )

      # The remote runner reports model_id next to the prompt on success; do the
      # same here so the result shape does not depend on the compute mode.
      if "error" not in result:
          result.setdefault("model_id", model_id)
      return result
  def _run_local_inference(model_dir: Path, prompt: str, max_new_tokens: int, temperature: float, top_p: float) -> dict:
      """Synchronously load a local model and generate a completion for ``prompt``.

      Args:
          model_dir: Directory handed to the tokenizer and model loaders.
          prompt: Text the model should continue.
          max_new_tokens: Upper bound on the number of generated tokens.
          temperature: Sampling temperature; sampling is enabled only when > 0.
          top_p: Nucleus-sampling threshold.

      Returns:
          ``{"prompt": ..., "generated_text": ...}`` on success.  If no loader
          can load the model, a dictionary with ``error`` and a ``details`` list
          holding one message per failed loader.
      """
      import torch
      from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer

      tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
      if tokenizer.pad_token is None:
          # No pad token defined: reuse the end-of-sequence token.
          tokenizer.pad_token = tokenizer.eos_token

      model = None
      load_errors: list[str] = []
      # Try the causal-LM loader first and fall back to the generic loader.
      for loader_cls in (AutoModelForCausalLM, AutoModel):
          try:
              model = loader_cls.from_pretrained(
                  model_dir,
                  torch_dtype=torch.float16,
                  device_map="auto",
                  trust_remote_code=True,
              )
              break
          except Exception as exc:
              # Record the failure instead of discarding it, so an unsupported
              # or broken model can be diagnosed from the logs and the response.
              load_errors.append(f"{loader_cls.__name__}: {str(exc)[:200]}")
              logger.warning(f"{loader_cls.__name__} could not load {model_dir}: {exc}")

      if model is None:
          return {
              "error": "Unable to load model with any available loader. Model type may not be supported yet.",
              "details": load_errors,
          }

      model.eval()
      inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
      with torch.no_grad():
          outputs = model.generate(
              **inputs,
              max_new_tokens=max_new_tokens,
              temperature=temperature,
              top_p=top_p,
              do_sample=temperature > 0,
              pad_token_id=tokenizer.eos_token_id,
          )

      # Drop the prompt tokens so only the newly generated text is decoded.
      prompt_length = inputs["input_ids"].shape[1]
      generated_text = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
      return {
          "prompt": prompt,
          "generated_text": generated_text,
      }