inference_service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. """推理服务 — 支持本地执行和 SSH 远程执行两种模式。"""
  2. import json
  3. from pathlib import Path
  4. from typing import Any
  5. from app.config import get_settings
  6. from app.core.logging import logger
  7. settings = get_settings()
  8. async def generate(
  9. adapter_path: str,
  10. prompt: str,
  11. max_new_tokens: int = 256,
  12. temperature: float = 0.8,
  13. top_p: float = 0.95,
  14. repetition_penalty: float = 1.1,
  15. do_sample: bool = True,
  16. ) -> dict[str, Any]:
  17. """使用已训练的 adapter 生成文本。"""
  18. # 从 adapter config 中获取 base model ID
  19. base_model_id = _get_base_model_id(adapter_path)
  20. if not base_model_id:
  21. return {"error": "无法找到基础模型信息,请确保训练任务已完成"}
  22. if settings.use_remote_compute:
  23. # 远程执行模式
  24. from app.core.remote_executor import run_inference_remote
  25. adapter_dir = Path(adapter_path)
  26. adapter_id = adapter_dir.name
  27. result = run_inference_remote(
  28. model_id=base_model_id,
  29. adapter_id=adapter_id,
  30. prompt=prompt,
  31. max_new_tokens=max_new_tokens,
  32. temperature=temperature,
  33. top_p=top_p,
  34. repetition_penalty=repetition_penalty,
  35. do_sample=do_sample,
  36. )
  37. if result:
  38. return result
  39. return {"error": "Remote inference failed"}
  40. # 本地执行模式
  41. return _generate_local(
  42. adapter_path=adapter_path,
  43. base_model_id=base_model_id,
  44. prompt=prompt,
  45. max_new_tokens=max_new_tokens,
  46. temperature=temperature,
  47. top_p=top_p,
  48. repetition_penalty=repetition_penalty,
  49. do_sample=do_sample,
  50. )
  51. def _generate_local(
  52. adapter_path: str,
  53. base_model_id: str,
  54. prompt: str,
  55. max_new_tokens: int,
  56. temperature: float,
  57. top_p: float,
  58. repetition_penalty: float,
  59. do_sample: bool,
  60. ) -> dict[str, Any]:
  61. """本地执行推理。"""
  62. try:
  63. import os
  64. import torch
  65. from transformers import AutoModelForCausalLM, AutoTokenizer
  66. from peft import PeftModel
  67. tokenizer = AutoTokenizer.from_pretrained(adapter_path, trust_remote_code=True)
  68. if tokenizer.pad_token is None:
  69. tokenizer.pad_token = tokenizer.eos_token
  70. # 沐曦 MPS 模式下固定用第一张物理 GPU,兜底用 cuda:0
  71. import torch
  72. visible_devices = os.environ.get("METAX_VISIBLE_DEVICES", "")
  73. if visible_devices:
  74. first_gpu = int(visible_devices.split(",")[0])
  75. # 检查设备是否真的存在,不存在则用 cuda:0
  76. if first_gpu >= torch.cuda.device_count():
  77. first_gpu = 0
  78. else:
  79. first_gpu = 0
  80. device_map = {"": first_gpu}
  81. torch.cuda.set_device(first_gpu)
  82. base_model = AutoModelForCausalLM.from_pretrained(
  83. base_model_id,
  84. torch_dtype=torch.float16,
  85. device_map=device_map,
  86. )
  87. model = PeftModel.from_pretrained(base_model, adapter_path)
  88. model.eval()
  89. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  90. with torch.no_grad():
  91. outputs = model.generate(
  92. **inputs,
  93. max_new_tokens=max_new_tokens,
  94. temperature=temperature,
  95. top_p=top_p,
  96. repetition_penalty=repetition_penalty,
  97. do_sample=do_sample,
  98. pad_token_id=tokenizer.eos_token_id,
  99. )
  100. generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
  101. generated_only = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  102. return {
  103. "prompt": prompt,
  104. "generated_text": generated_text,
  105. "generated_only": generated_only,
  106. "tokens_generated": int(outputs.shape[1] - inputs["input_ids"].shape[1]),
  107. }
  108. except Exception as e:
  109. logger.error(f"Inference failed: {e}")
  110. return {"error": str(e)}
  111. def _get_base_model_id(adapter_path: str) -> str | None:
  112. """从 adapter config 中获取 base model ID。"""
  113. config_path = Path(adapter_path) / "adapter_config.json"
  114. if config_path.exists():
  115. with open(config_path) as f:
  116. cfg = json.load(f)
  117. return cfg.get("base_model_name_or_path")
  118. return None
  119. async def get_available_adapters() -> list[dict[str, Any]]:
  120. """列出所有已训练的 adapter。"""
  121. adapters_dir = settings.adapters_dir
  122. if not adapters_dir.exists():
  123. return []
  124. result = []
  125. for d in sorted(adapters_dir.iterdir()):
  126. if not d.is_dir():
  127. continue
  128. adapter_config = d / "adapter_config.json"
  129. if adapter_config.exists():
  130. with open(adapter_config) as f:
  131. cfg = json.load(f)
  132. result.append({
  133. "id": d.name,
  134. "path": str(d),
  135. "base_model": cfg.get("base_model_name_or_path", "unknown"),
  136. "peft_type": cfg.get("peft_type", "unknown"),
  137. })
  138. return result
  139. async def run_inference_single(
  140. model_id: str,
  141. adapter_id: str,
  142. prompt: str,
  143. max_new_tokens: int,
  144. temperature: float,
  145. top_p: float,
  146. repetition_penalty: float,
  147. do_sample: bool,
  148. ) -> dict[str, Any]:
  149. """供远程 SSH 调用的单条推理入口。"""
  150. adapter_path = str(settings.adapters_dir / adapter_id)
  151. return _generate_local(
  152. adapter_path=adapter_path,
  153. base_model_id=model_id,
  154. prompt=prompt,
  155. max_new_tokens=max_new_tokens,
  156. temperature=temperature,
  157. top_p=top_p,
  158. repetition_penalty=repetition_penalty,
  159. do_sample=do_sample,
  160. )