inference_service.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. """推理服务 — 支持本地执行和 SSH 远程执行两种模式。"""
  2. import json
  3. from pathlib import Path
  4. from typing import Any
  5. from app.config import get_settings
  6. from app.core.logging import logger
  7. settings = get_settings()
  8. async def generate(
  9. adapter_path: str,
  10. prompt: str,
  11. max_new_tokens: int = 256,
  12. temperature: float = 0.8,
  13. top_p: float = 0.95,
  14. repetition_penalty: float = 1.1,
  15. do_sample: bool = True,
  16. ) -> dict[str, Any]:
  17. """使用已训练的 adapter 生成文本。"""
  18. # 从 adapter config 中获取 base model ID
  19. base_model_id = _get_base_model_id(adapter_path)
  20. if not base_model_id:
  21. return {"error": "无法找到基础模型信息,请确保训练任务已完成"}
  22. if settings.use_remote_compute:
  23. # 远程执行模式
  24. from app.core.remote_executor import run_inference_remote
  25. adapter_dir = Path(adapter_path)
  26. adapter_id = adapter_dir.name
  27. result = run_inference_remote(
  28. model_id=base_model_id,
  29. adapter_id=adapter_id,
  30. prompt=prompt,
  31. max_new_tokens=max_new_tokens,
  32. temperature=temperature,
  33. top_p=top_p,
  34. repetition_penalty=repetition_penalty,
  35. do_sample=do_sample,
  36. )
  37. if result:
  38. return result
  39. return {"error": "Remote inference failed"}
  40. # 本地执行模式
  41. return _generate_local(
  42. adapter_path=adapter_path,
  43. base_model_id=base_model_id,
  44. prompt=prompt,
  45. max_new_tokens=max_new_tokens,
  46. temperature=temperature,
  47. top_p=top_p,
  48. repetition_penalty=repetition_penalty,
  49. do_sample=do_sample,
  50. )
  51. def _generate_local(
  52. adapter_path: str,
  53. base_model_id: str,
  54. prompt: str,
  55. max_new_tokens: int,
  56. temperature: float,
  57. top_p: float,
  58. repetition_penalty: float,
  59. do_sample: bool,
  60. ) -> dict[str, Any]:
  61. """本地执行推理。"""
  62. try:
  63. import torch
  64. from transformers import AutoModelForCausalLM, AutoTokenizer
  65. from peft import PeftModel
  66. tokenizer = AutoTokenizer.from_pretrained(adapter_path, trust_remote_code=True)
  67. if tokenizer.pad_token is None:
  68. tokenizer.pad_token = tokenizer.eos_token
  69. base_model = AutoModelForCausalLM.from_pretrained(
  70. base_model_id,
  71. torch_dtype=torch.float16,
  72. device_map="auto",
  73. )
  74. model = PeftModel.from_pretrained(base_model, adapter_path)
  75. model.eval()
  76. inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
  77. with torch.no_grad():
  78. outputs = model.generate(
  79. **inputs,
  80. max_new_tokens=max_new_tokens,
  81. temperature=temperature,
  82. top_p=top_p,
  83. repetition_penalty=repetition_penalty,
  84. do_sample=do_sample,
  85. pad_token_id=tokenizer.eos_token_id,
  86. )
  87. generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
  88. generated_only = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
  89. return {
  90. "prompt": prompt,
  91. "generated_text": generated_text,
  92. "generated_only": generated_only,
  93. "tokens_generated": int(outputs.shape[1] - inputs["input_ids"].shape[1]),
  94. }
  95. except Exception as e:
  96. logger.error(f"Inference failed: {e}")
  97. return {"error": str(e)}
  98. def _get_base_model_id(adapter_path: str) -> str | None:
  99. """从 adapter config 中获取 base model ID。"""
  100. config_path = Path(adapter_path) / "adapter_config.json"
  101. if config_path.exists():
  102. with open(config_path) as f:
  103. cfg = json.load(f)
  104. return cfg.get("base_model_name_or_path")
  105. return None
  106. async def get_available_adapters() -> list[dict[str, Any]]:
  107. """列出所有已训练的 adapter。"""
  108. adapters_dir = settings.adapters_dir
  109. if not adapters_dir.exists():
  110. return []
  111. result = []
  112. for d in sorted(adapters_dir.iterdir()):
  113. if not d.is_dir():
  114. continue
  115. adapter_config = d / "adapter_config.json"
  116. if adapter_config.exists():
  117. with open(adapter_config) as f:
  118. cfg = json.load(f)
  119. result.append({
  120. "id": d.name,
  121. "path": str(d),
  122. "base_model": cfg.get("base_model_name_or_path", "unknown"),
  123. "peft_type": cfg.get("peft_type", "unknown"),
  124. })
  125. return result
  126. async def run_inference_single(
  127. model_id: str,
  128. adapter_id: str,
  129. prompt: str,
  130. max_new_tokens: int,
  131. temperature: float,
  132. top_p: float,
  133. repetition_penalty: float,
  134. do_sample: bool,
  135. ) -> dict[str, Any]:
  136. """供远程 SSH 调用的单条推理入口。"""
  137. adapter_path = str(settings.adapters_dir / adapter_id)
  138. return _generate_local(
  139. adapter_path=adapter_path,
  140. base_model_id=model_id,
  141. prompt=prompt,
  142. max_new_tokens=max_new_tokens,
  143. temperature=temperature,
  144. top_p=top_p,
  145. repetition_penalty=repetition_penalty,
  146. do_sample=do_sample,
  147. )