inference_service.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """推理服务 — 支持本地执行和 SSH 远程执行两种模式。"""
  2. import json
  3. from pathlib import Path
  4. from typing import Any
  5. from app.config import get_settings
  6. from app.core.logging import logger
  7. settings = get_settings()
  8. async def generate(
  9. adapter_path: str,
  10. prompt: str,
  11. max_new_tokens: int = 256,
  12. temperature: float = 0.8,
  13. top_p: float = 0.95,
  14. repetition_penalty: float = 1.1,
  15. do_sample: bool = True,
  16. ) -> dict[str, Any]:
  17. """使用已训练的 adapter 生成文本。"""
  18. # 从 adapter config 中获取 base model ID
  19. base_model_id = _get_base_model_id(adapter_path)
  20. if not base_model_id:
  21. return {"error": "无法找到基础模型信息,请确保训练任务已完成"}
  22. if settings.use_remote_compute:
  23. # 远程执行模式
  24. from app.core.remote_executor import run_inference_remote
  25. adapter_dir = Path(adapter_path)
  26. adapter_id = adapter_dir.name
  27. result = run_inference_remote(
  28. model_id=base_model_id,
  29. adapter_id=adapter_id,
  30. prompt=prompt,
  31. max_new_tokens=max_new_tokens,
  32. temperature=temperature,
  33. top_p=top_p,
  34. repetition_penalty=repetition_penalty,
  35. do_sample=do_sample,
  36. )
  37. if result:
  38. return result
  39. return {"error": "Remote inference failed"}
  40. # 本地执行模式
  41. return _generate_local(
  42. adapter_path=adapter_path,
  43. base_model_id=base_model_id,
  44. prompt=prompt,
  45. max_new_tokens=max_new_tokens,
  46. temperature=temperature,
  47. top_p=top_p,
  48. repetition_penalty=repetition_penalty,
  49. do_sample=do_sample,
  50. )
  51. def _generate_local(
  52. adapter_path: str,
  53. base_model_id: str,
  54. prompt: str,
  55. max_new_tokens: int,
  56. temperature: float,
  57. top_p: float,
  58. repetition_penalty: float,
  59. do_sample: bool,
  60. ) -> dict[str, Any]:
  61. """本地执行推理。"""
  62. try:
  63. import os
  64. import torch
  65. from transformers import AutoModelForCausalLM, AutoTokenizer
  66. from peft import PeftModel
  67. tokenizer = AutoTokenizer.from_pretrained(adapter_path, trust_remote_code=True)
  68. if tokenizer.pad_token is None:
  69. tokenizer.pad_token = tokenizer.eos_token
  70. # 沐曦 MPS 模式下固定用第一张物理 GPU
  71. visible_devices = os.environ.get("METAX_VISIBLE_DEVICES", "0")
  72. first_gpu = int(visible_devices.split(",")[0])
  73. device_map = {"": first_gpu}
  74. base_model = AutoModelForCausalLM.from_pretrained(
  75. base_model_id,
  76. torch_dtype=torch.float16,
  77. device_map=device_map,
  78. )
  79. model = PeftModel.from_pretrained(base_model, adapter_path)
  80. model.eval()
  81. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  82. with torch.no_grad():
  83. outputs = model.generate(
  84. **inputs,
  85. max_new_tokens=max_new_tokens,
  86. temperature=temperature,
  87. top_p=top_p,
  88. repetition_penalty=repetition_penalty,
  89. do_sample=do_sample,
  90. pad_token_id=tokenizer.eos_token_id,
  91. )
  92. generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
  93. generated_only = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  94. return {
  95. "prompt": prompt,
  96. "generated_text": generated_text,
  97. "generated_only": generated_only,
  98. "tokens_generated": int(outputs.shape[1] - inputs["input_ids"].shape[1]),
  99. }
  100. except Exception as e:
  101. logger.error(f"Inference failed: {e}")
  102. return {"error": str(e)}
  103. def _get_base_model_id(adapter_path: str) -> str | None:
  104. """从 adapter config 中获取 base model ID。"""
  105. config_path = Path(adapter_path) / "adapter_config.json"
  106. if config_path.exists():
  107. with open(config_path) as f:
  108. cfg = json.load(f)
  109. return cfg.get("base_model_name_or_path")
  110. return None
  111. async def get_available_adapters() -> list[dict[str, Any]]:
  112. """列出所有已训练的 adapter。"""
  113. adapters_dir = settings.adapters_dir
  114. if not adapters_dir.exists():
  115. return []
  116. result = []
  117. for d in sorted(adapters_dir.iterdir()):
  118. if not d.is_dir():
  119. continue
  120. adapter_config = d / "adapter_config.json"
  121. if adapter_config.exists():
  122. with open(adapter_config) as f:
  123. cfg = json.load(f)
  124. result.append({
  125. "id": d.name,
  126. "path": str(d),
  127. "base_model": cfg.get("base_model_name_or_path", "unknown"),
  128. "peft_type": cfg.get("peft_type", "unknown"),
  129. })
  130. return result
  131. async def run_inference_single(
  132. model_id: str,
  133. adapter_id: str,
  134. prompt: str,
  135. max_new_tokens: int,
  136. temperature: float,
  137. top_p: float,
  138. repetition_penalty: float,
  139. do_sample: bool,
  140. ) -> dict[str, Any]:
  141. """供远程 SSH 调用的单条推理入口。"""
  142. adapter_path = str(settings.adapters_dir / adapter_id)
  143. return _generate_local(
  144. adapter_path=adapter_path,
  145. base_model_id=model_id,
  146. prompt=prompt,
  147. max_new_tokens=max_new_tokens,
  148. temperature=temperature,
  149. top_p=top_p,
  150. repetition_penalty=repetition_penalty,
  151. do_sample=do_sample,
  152. )