|
@@ -135,7 +135,7 @@ async def _execute_export(task_id: str, job_id: str, merge_with_base: bool, expo
|
|
|
|
|
|
|
|
# 把 inference_worker.py 和启动脚本复制到输出目录
|
|
# 把 inference_worker.py 和启动脚本复制到输出目录
|
|
|
if output_path and settings.use_remote_compute:
|
|
if output_path and settings.use_remote_compute:
|
|
|
- _copy_worker_template_remote(output_path)
|
|
|
|
|
|
|
+ await _copy_worker_template_remote(output_path)
|
|
|
|
|
|
|
|
await _update_deploy_status(task_id, "completed", output_path=output_path)
|
|
await _update_deploy_status(task_id, "completed", output_path=output_path)
|
|
|
return {"output_path": output_path}
|
|
return {"output_path": output_path}
|
|
@@ -227,7 +227,7 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c 'fuser -k {port}/tcp 2>/dev/null; sleep 1; true'"
|
|
f"bash -c 'fuser -k {port}/tcp 2>/dev/null; sleep 1; true'"
|
|
|
)
|
|
)
|
|
|
- ssh_exec(kill_cmd, timeout=15)
|
|
|
|
|
|
|
+ await asyncio.to_thread(ssh_exec, kill_cmd, timeout=15)
|
|
|
|
|
|
|
|
# worker 脚本在容器内的路径
|
|
# worker 脚本在容器内的路径
|
|
|
worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
|
|
worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
|
|
@@ -237,15 +237,15 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c 'cp {worker_template} {model_path}/inference_worker.py'"
|
|
f"bash -c 'cp {worker_template} {model_path}/inference_worker.py'"
|
|
|
)
|
|
)
|
|
|
- code, _, stderr = ssh_exec(copy_cmd, timeout=30)
|
|
|
|
|
|
|
+ code, _, stderr = await asyncio.to_thread(ssh_exec, copy_cmd, timeout=30)
|
|
|
if code != 0:
|
|
if code != 0:
|
|
|
raise RuntimeError(f"复制 inference_worker.py 失败: {stderr}")
|
|
raise RuntimeError(f"复制 inference_worker.py 失败: {stderr}")
|
|
|
|
|
|
|
|
- # 在容器内后台启动 worker
|
|
|
|
|
|
|
+ # 在容器内后台启动 worker(多卡推理:CUDA_VISIBLE_DEVICES 使用配置项)
|
|
|
launch_cmd = (
|
|
launch_cmd = (
|
|
|
f"docker exec "
|
|
f"docker exec "
|
|
|
f"-e MACA_MPS_MODE=1 "
|
|
f"-e MACA_MPS_MODE=1 "
|
|
|
- f"-e CUDA_VISIBLE_DEVICES=3 "
|
|
|
|
|
|
|
+ f"-e CUDA_VISIBLE_DEVICES={settings.inference_cuda_devices} "
|
|
|
f"-w {model_path} "
|
|
f"-w {model_path} "
|
|
|
f"{settings.compute_node_docker_container} "
|
|
f"{settings.compute_node_docker_container} "
|
|
|
f"bash -c '"
|
|
f"bash -c '"
|
|
@@ -256,7 +256,7 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
f" echo $!'"
|
|
f" echo $!'"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- code, stdout, stderr = ssh_exec(launch_cmd, timeout=30)
|
|
|
|
|
|
|
+ code, stdout, stderr = await asyncio.to_thread(ssh_exec, launch_cmd, timeout=30)
|
|
|
if code != 0:
|
|
if code != 0:
|
|
|
raise RuntimeError(f"启动推理 worker 失败: {stderr}")
|
|
raise RuntimeError(f"启动推理 worker 失败: {stderr}")
|
|
|
|
|
|
|
@@ -265,9 +265,8 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
|
|
|
|
|
# 等待模型加载(可能需要较长时间),检查 READY 标记
|
|
# 等待模型加载(可能需要较长时间),检查 READY 标记
|
|
|
# 每次轮询只用一次 SSH 连接,同时检查 READY 和进程状态
|
|
# 每次轮询只用一次 SSH 连接,同时检查 READY 和进程状态
|
|
|
- import asyncio as _aio
|
|
|
|
|
for attempt in range(60): # 最多等 5 分钟(60 * 5s)
|
|
for attempt in range(60): # 最多等 5 分钟(60 * 5s)
|
|
|
- await _aio.sleep(5)
|
|
|
|
|
|
|
+ await asyncio.sleep(5)
|
|
|
check_cmd = (
|
|
check_cmd = (
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c '"
|
|
f"bash -c '"
|
|
@@ -277,7 +276,7 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
f" echo \"ALIVE\"; "
|
|
f" echo \"ALIVE\"; "
|
|
|
f"'"
|
|
f"'"
|
|
|
)
|
|
)
|
|
|
- code, stdout, stderr = ssh_exec(check_cmd, timeout=30)
|
|
|
|
|
|
|
+ code, stdout, stderr = await asyncio.to_thread(ssh_exec, check_cmd, timeout=30)
|
|
|
if code == 0:
|
|
if code == 0:
|
|
|
result = stdout.strip()
|
|
result = stdout.strip()
|
|
|
if result.startswith("READY:"):
|
|
if result.startswith("READY:"):
|
|
@@ -289,7 +288,7 @@ async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c 'tail -20 /tmp/serve_{task_id}.log 2>/dev/null'"
|
|
f"bash -c 'tail -20 /tmp/serve_{task_id}.log 2>/dev/null'"
|
|
|
)
|
|
)
|
|
|
- _, log_stdout, _ = ssh_exec(log_cmd, timeout=30)
|
|
|
|
|
|
|
+ _, log_stdout, _ = await asyncio.to_thread(ssh_exec, log_cmd, timeout=30)
|
|
|
raise RuntimeError(f"Worker 进程已退出: {log_stdout}")
|
|
raise RuntimeError(f"Worker 进程已退出: {log_stdout}")
|
|
|
# result == "ALIVE" → 继续等待
|
|
# result == "ALIVE" → 继续等待
|
|
|
|
|
|
|
@@ -332,20 +331,23 @@ async def stop_serving(task_id: str, user_id: str = "") -> dict[str, Any]:
|
|
|
return {"error": "无权操作此任务"}
|
|
return {"error": "无权操作此任务"}
|
|
|
|
|
|
|
|
pid = record.pid
|
|
pid = record.pid
|
|
|
|
|
+ port = record.port
|
|
|
|
|
+ output_path = record.output_path
|
|
|
|
|
+
|
|
|
if pid and settings.use_remote_compute:
|
|
if pid and settings.use_remote_compute:
|
|
|
# 方式1: kill -9 主进程及其子进程
|
|
# 方式1: kill -9 主进程及其子进程
|
|
|
kill_cmd = (
|
|
kill_cmd = (
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c 'kill -9 {pid} 2>/dev/null; pkill -9 -P {pid} 2>/dev/null; true'"
|
|
f"bash -c 'kill -9 {pid} 2>/dev/null; pkill -9 -P {pid} 2>/dev/null; true'"
|
|
|
)
|
|
)
|
|
|
- code, _, _ = ssh_exec(kill_cmd, timeout=15)
|
|
|
|
|
|
|
+ code, _, _ = await asyncio.to_thread(ssh_exec, kill_cmd, timeout=15)
|
|
|
# 方式2: fuser 兜底清理端口(防止进程 kill 失败仍占着端口)
|
|
# 方式2: fuser 兜底清理端口(防止进程 kill 失败仍占着端口)
|
|
|
- if record.port:
|
|
|
|
|
|
|
+ if port:
|
|
|
fuser_cmd = (
|
|
fuser_cmd = (
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
- f"bash -c 'fuser -k {record.port}/tcp 2>/dev/null; sleep 1; true'"
|
|
|
|
|
|
|
+ f"bash -c 'fuser -k {port}/tcp 2>/dev/null; sleep 1; true'"
|
|
|
)
|
|
)
|
|
|
- ssh_exec(fuser_cmd, timeout=15)
|
|
|
|
|
|
|
+ await asyncio.to_thread(ssh_exec, fuser_cmd, timeout=15)
|
|
|
logger.info(f"Stop serving: task={task_id} pid={pid} kill_code={code}")
|
|
logger.info(f"Stop serving: task={task_id} pid={pid} kill_code={code}")
|
|
|
|
|
|
|
|
record.status = "stopped"
|
|
record.status = "stopped"
|
|
@@ -357,6 +359,63 @@ async def stop_serving(task_id: str, user_id: str = "") -> dict[str, Any]:
|
|
|
return {"task_id": task_id, "status": "stopped"}
|
|
return {"task_id": task_id, "status": "stopped"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+async def restart_serving(task_id: str, user_id: str = "") -> dict[str, Any]:
|
|
|
|
|
+ """重启已停止的在线服务(不重新导出模型,只启动 worker)。"""
|
|
|
|
|
+ async with async_session() as session:
|
|
|
|
|
+ result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
|
|
|
|
|
+ record = result.scalar_one_or_none()
|
|
|
|
|
+ if not record:
|
|
|
|
|
+ return {"error": "任务不存在"}
|
|
|
|
|
+ if record.deploy_mode != "serve":
|
|
|
|
|
+ return {"error": "该任务不是在线服务"}
|
|
|
|
|
+ if record.status != "stopped":
|
|
|
|
|
+ return {"error": f"只能重启已停止的服务(当前状态: {record.status})"}
|
|
|
|
|
+ if user_id and record.user_id and record.user_id != user_id:
|
|
|
|
|
+ return {"error": "无权操作此任务"}
|
|
|
|
|
+ if not record.output_path:
|
|
|
|
|
+ return {"error": "模型文件路径丢失,无法重启,请重新部署"}
|
|
|
|
|
+
|
|
|
|
|
+ output_path = record.output_path
|
|
|
|
|
+
|
|
|
|
|
+ # 分配新端口
|
|
|
|
|
+ port = await _allocate_port()
|
|
|
|
|
+
|
|
|
|
|
+ # 更新状态为 pending,标记正在重启
|
|
|
|
|
+ await _update_deploy_status(task_id, "pending", port=port)
|
|
|
|
|
+
|
|
|
|
|
+ background_task_manager.register_task(task_id, "deployment", {"mode": "restart"})
|
|
|
|
|
+ await background_task_manager.run(
|
|
|
|
|
+ task_id, "deployment", _execute_restart(task_id, output_path, port)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"Restart serving: task={task_id} output_path={output_path} port={port}")
|
|
|
|
|
+ return {"task_id": task_id, "status": "pending", "deploy_mode": "serve", "port": port}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _execute_restart(task_id: str, output_path: str, port: int) -> dict:
|
|
|
|
|
+ """后台执行重启:只启动 worker,不重新导出。"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ if settings.use_remote_compute:
|
|
|
|
|
+ pid = await _launch_remote_worker(task_id, output_path, port)
|
|
|
|
|
+ else:
|
|
|
|
|
+ pid = await _launch_local_worker(task_id, output_path, port)
|
|
|
|
|
+
|
|
|
|
|
+ endpoint_url = f"/api/v1/deployment/proxy/{task_id}/v1"
|
|
|
|
|
+ await _update_deploy_status(
|
|
|
|
|
+ task_id, "running",
|
|
|
|
|
+ output_path=output_path,
|
|
|
|
|
+ endpoint_url=endpoint_url,
|
|
|
|
|
+ port=port,
|
|
|
|
|
+ pid=pid,
|
|
|
|
|
+ )
|
|
|
|
|
+ return {"endpoint_url": endpoint_url, "port": port, "pid": pid}
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"Restart failed for task {task_id}: {e}")
|
|
|
|
|
+ await _update_deploy_status(task_id, "failed", error=str(e))
|
|
|
|
|
+ return {"error": str(e)}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
async def list_deployed_services(user_id: str = "") -> list[dict[str, Any]]:
|
|
async def list_deployed_services(user_id: str = "") -> list[dict[str, Any]]:
|
|
|
"""列出 serve 模式的部署任务(按用户过滤)。"""
|
|
"""列出 serve 模式的部署任务(按用户过滤)。"""
|
|
|
async with async_session() as session:
|
|
async with async_session() as session:
|
|
@@ -373,7 +432,7 @@ async def list_deployed_services(user_id: str = "") -> list[dict[str, Any]]:
|
|
|
# 对 running 状态,检查远程进程是否还活着
|
|
# 对 running 状态,检查远程进程是否还活着
|
|
|
if status == "running" and r.pid and settings.use_remote_compute:
|
|
if status == "running" and r.pid and settings.use_remote_compute:
|
|
|
from app.core.remote_executor import is_process_running
|
|
from app.core.remote_executor import is_process_running
|
|
|
- if not is_process_running(r.pid):
|
|
|
|
|
|
|
+ if not await asyncio.to_thread(is_process_running, r.pid):
|
|
|
status = "stopped"
|
|
status = "stopped"
|
|
|
await _update_deploy_status(r.id, "stopped", error="进程已退出")
|
|
await _update_deploy_status(r.id, "stopped", error="进程已退出")
|
|
|
# 释放端口和 PID,确保下次分配时可用
|
|
# 释放端口和 PID,确保下次分配时可用
|
|
@@ -452,7 +511,7 @@ async def _run_remote_export(task_id: str, job_id: str, merge_with_base: bool, e
|
|
|
"print(json.dumps(result, ensure_ascii=False))\" 2>&1"
|
|
"print(json.dumps(result, ensure_ascii=False))\" 2>&1"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
|
|
|
|
|
|
|
+ code, stdout, stderr = await asyncio.to_thread(ssh_exec, remote_cmd, timeout=600)
|
|
|
|
|
|
|
|
if code != 0:
|
|
if code != 0:
|
|
|
raise RuntimeError(f"Remote export failed: {stderr}")
|
|
raise RuntimeError(f"Remote export failed: {stderr}")
|
|
@@ -515,14 +574,14 @@ async def _run_local_export(task_id: str, job_id: str, merge_with_base: bool) ->
|
|
|
return {"output_path": str(output_path)}
|
|
return {"output_path": str(output_path)}
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _copy_worker_template_remote(output_path: str):
|
|
|
|
|
|
|
+async def _copy_worker_template_remote(output_path: str):
|
|
|
"""把 inference_worker.py 和启动脚本复制到远程模型目录。"""
|
|
"""把 inference_worker.py 和启动脚本复制到远程模型目录。"""
|
|
|
worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
|
|
worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
|
|
|
copy_cmd = (
|
|
copy_cmd = (
|
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
f"docker exec {settings.compute_node_docker_container} "
|
|
|
f"bash -c 'cp {worker_template} {output_path}/inference_worker.py'"
|
|
f"bash -c 'cp {worker_template} {output_path}/inference_worker.py'"
|
|
|
)
|
|
)
|
|
|
- code, _, stderr = ssh_exec(copy_cmd, timeout=30)
|
|
|
|
|
|
|
+ code, _, stderr = await asyncio.to_thread(ssh_exec, copy_cmd, timeout=30)
|
|
|
if code != 0:
|
|
if code != 0:
|
|
|
logger.warning(f"复制 inference_worker.py 到 {output_path} 失败: {stderr}")
|
|
logger.warning(f"复制 inference_worker.py 到 {output_path} 失败: {stderr}")
|
|
|
|
|
|
|
@@ -530,7 +589,7 @@ def _copy_worker_template_remote(output_path: str):
|
|
|
start_script = (
|
|
start_script = (
|
|
|
f"#!/bin/bash\n"
|
|
f"#!/bin/bash\n"
|
|
|
f"cd {output_path}\n"
|
|
f"cd {output_path}\n"
|
|
|
- f"CUDA_VISIBLE_DEVICES=3 MACA_MPS_MODE=1 "
|
|
|
|
|
|
|
+ f"CUDA_VISIBLE_DEVICES={settings.inference_cuda_devices} MACA_MPS_MODE=1 "
|
|
|
f"{settings.compute_node_python} inference_worker.py "
|
|
f"{settings.compute_node_python} inference_worker.py "
|
|
|
f"--model-path . --port 8100\n"
|
|
f"--model-path . --port 8100\n"
|
|
|
)
|
|
)
|
|
@@ -539,7 +598,7 @@ def _copy_worker_template_remote(output_path: str):
|
|
|
f"bash -c 'cat > {output_path}/start.sh << \"EOF\"\n{start_script}EOF\n"
|
|
f"bash -c 'cat > {output_path}/start.sh << \"EOF\"\n{start_script}EOF\n"
|
|
|
f"chmod +x {output_path}/start.sh'"
|
|
f"chmod +x {output_path}/start.sh'"
|
|
|
)
|
|
)
|
|
|
- code, _, _ = ssh_exec(script_cmd, timeout=15)
|
|
|
|
|
|
|
+ code, _, _ = await asyncio.to_thread(ssh_exec, script_cmd, timeout=15)
|
|
|
if code != 0:
|
|
if code != 0:
|
|
|
logger.warning(f"生成 start.sh 失败")
|
|
logger.warning(f"生成 start.sh 失败")
|
|
|
|
|
|
|
@@ -574,6 +633,10 @@ async def _update_deploy_status(
|
|
|
record.pid = pid
|
|
record.pid = pid
|
|
|
if status in ("completed", "failed", "stopped"):
|
|
if status in ("completed", "failed", "stopped"):
|
|
|
record.finished_at = datetime.utcnow()
|
|
record.finished_at = datetime.utcnow()
|
|
|
|
|
+ if status == "pending":
|
|
|
|
|
+ # 重启时清除完成时间和错误信息
|
|
|
|
|
+ record.finished_at = None
|
|
|
|
|
+ record.error = None
|
|
|
await session.commit()
|
|
await session.commit()
|
|
|
|
|
|
|
|
background_task_manager.update_task(
|
|
background_task_manager.update_task(
|