import asyncio
import json
import shutil
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any

from sqlalchemy import select

from app.config import get_settings
from app.core.db import async_session, DeployTaskModel
from app.core.logging import logger

settings = get_settings()


async def export_adapter(job_id: str, config: dict[str, Any]) -> dict[str, Any]:
    """Export a trained adapter, optionally merged into its base model and/or as GGUF.

    Args:
        job_id: Name of the adapter directory under ``settings.adapters_dir``.
        config: Export options. Recognised keys:
            ``merge_with_base`` (default ``False``) - merge the adapter weights
            into the base model instead of copying the adapter files;
            ``export_format`` (default ``"safetensors"``) - ``"gguf"`` triggers
            an additional best-effort GGUF conversion.

    Returns:
        A dict with ``job_id``, ``status`` (``"completed"`` or ``"failed"``) and
        ``output_path``; failed results also carry ``error``.

    This function does not raise for export errors: any failure is logged,
    recorded on the task row, and reported through the returned dict.
    """
    task_id = str(uuid.uuid4())
    merge_with_base = config.get("merge_with_base", False)
    export_format = config.get("export_format", "safetensors")

    adapter_path = settings.adapters_dir / job_id
    if not adapter_path.exists():
        # No task row is created for an adapter that does not exist.
        return {"job_id": job_id, "status": "failed", "output_path": None, "error": "Adapter not found"}

    output_path = settings.adapters_dir / f"{job_id}_merged"

    # Persist the task before starting the export.
    # NOTE(review): task_id is generated here but never returned, so callers
    # cannot pass it to get_deploy_status() - confirm whether the response
    # should expose it.
    # NOTE(review): utcnow() yields a naive datetime; kept because the column's
    # timezone handling is not visible from this module.
    task = DeployTaskModel(
        id=task_id,
        job_id=job_id,
        status="pending",
        created_at=datetime.utcnow(),
    )
    async with async_session() as session:
        session.add(task)
        await session.commit()

    try:
        # Model loading, file copies and the GGUF subprocess are blocking;
        # run them in a worker thread so the event loop stays responsive.
        await asyncio.to_thread(
            _run_export, job_id, adapter_path, output_path, merge_with_base, export_format
        )
        await _update_task(task_id, status="completed", output_path=str(output_path))
        return {"job_id": job_id, "status": "completed", "output_path": str(output_path)}
    except Exception as e:
        logger.error(f"Export failed for job {job_id}: {e}")
        try:
            await _update_task(task_id, status="failed", error=str(e))
        except Exception as db_error:
            # A database error here must not hide the original export failure.
            logger.error(f"Could not record failure for deploy task {task_id}: {db_error}")
        return {"job_id": job_id, "status": "failed", "output_path": None, "error": str(e)}


def _run_export(
    job_id: str,
    adapter_path: Path,
    output_path: Path,
    merge_with_base: bool,
    export_format: str,
) -> None:
    """Do the blocking part of an export: merge or copy, then optional GGUF."""
    if merge_with_base:
        _merge_adapter(job_id, adapter_path, output_path)
        logger.info(f"Adapter merged and saved to {output_path}")
    else:
        # dirs_exist_ok lets the same job be re-exported without failing on
        # the directory left behind by a previous run.
        shutil.copytree(adapter_path, output_path, dirs_exist_ok=True)
        logger.info(f"Adapter copied to {output_path}")

    if export_format == "gguf":
        # Append the extension rather than using with_suffix(), which would
        # cut the name at the last dot if job_id contains one.
        gguf_path = output_path.with_name(f"{output_path.name}.gguf")
        # GGUF conversion is best-effort: failures are logged by the helper
        # and do not fail the export.
        _export_to_gguf(output_path, gguf_path)


def _merge_adapter(job_id: str, adapter_path: Path, output_path: Path) -> None:
    """Merge the adapter into its base model and save model and tokenizer.

    Raises:
        ValueError: If ``adapter_config.json`` is missing or does not name a
            base model, since there is nothing to merge into.
    """
    # Imported lazily so copy-only exports work without these heavy packages.
    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer

    base_model_id = _get_base_model_id(job_id)
    if not base_model_id:
        raise ValueError(
            "Cannot merge adapter: base_model_name_or_path not found in adapter_config.json"
        )

    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_id, torch_dtype=torch.float16, device_map="auto"
    )
    # Attach the adapter to the base model, then fold its weights in.
    peft_model = PeftModel.from_pretrained(base_model, adapter_path)
    merged = peft_model.merge_and_unload()
    merged.save_pretrained(output_path)

    tokenizer = AutoTokenizer.from_pretrained(adapter_path)
    tokenizer.save_pretrained(output_path)


async def _update_task(
    task_id: str,
    *,
    status: str,
    output_path: str | None = None,
    error: str | None = None,
) -> None:
    """Set the status of a task row, plus output_path/error when provided.

    Does nothing if no row with ``task_id`` exists.
    """
    async with async_session() as session:
        result = await session.execute(
            select(DeployTaskModel).where(DeployTaskModel.id == task_id)
        )
        record = result.scalar_one_or_none()
        if record is None:
            return
        record.status = status
        if output_path is not None:
            record.output_path = output_path
        if error is not None:
            record.error = error
        await session.commit()


async def get_deploy_status(task_id: str) -> dict[str, Any]:
    """Return the stored state of a deploy task.

    Returns:
        A dict with ``job_id``, ``status``, ``output_path`` and ``error`` taken
        from the task row, or a ``"not_found"`` placeholder (empty ``job_id``,
        ``None`` for the rest) when no row matches ``task_id``.
    """
    async with async_session() as session:
        result = await session.execute(
            select(DeployTaskModel).where(DeployTaskModel.id == task_id)
        )
        record = result.scalar_one_or_none()
        if record:
            return {
                "job_id": record.job_id,
                "status": record.status,
                "output_path": record.output_path,
                "error": record.error,
            }
        return {"job_id": "", "status": "not_found", "output_path": None, "error": None}


def _get_base_model_id(job_id: str) -> str | None:
    """Read ``base_model_name_or_path`` from the adapter's ``adapter_config.json``.

    Returns ``None`` when the config file does not exist or lacks the key.
    """
    config_path = settings.adapters_dir / job_id / "adapter_config.json"
    if not config_path.exists():
        return None
    with open(config_path, encoding="utf-8") as f:
        cfg = json.load(f)
    return cfg.get("base_model_name_or_path")


def _export_to_gguf(model_path: Path, output_path: Path) -> bool:
    """Convert the model directory to a GGUF file (best-effort).

    Failures are logged rather than raised.

    Returns:
        ``True`` if the converter exited with status 0, ``False`` otherwise.
    """
    try:
        # Imported only as an availability probe for llama-cpp-python.
        from llama_cpp import Llama  # noqa: F401

        # Use the running interpreter so the converter runs in the same
        # environment as this process instead of whatever "python" is on PATH.
        # NOTE(review): confirm the installed llama-cpp-python really exposes a
        # `llama_cpp.convert_hf_to_gguf` module.
        result = subprocess.run(
            [
                sys.executable or "python",
                "-m",
                "llama_cpp.convert_hf_to_gguf",
                str(model_path),
                "--outfile",
                str(output_path),
            ],
            capture_output=True,
            text=True,
            timeout=600,
        )
    except Exception as e:
        logger.warning(f"GGUF export not available: {e}")
        return False

    if result.returncode != 0:
        logger.error(f"GGUF export failed: {result.stderr}")
        return False
    return True