|
@@ -275,35 +275,34 @@ class JobQueue:
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
async def _cleanup_remote_processes(self):
    """Kill leftover python processes in the compute container over SSH.

    Removes every remaining python process (zombies included) inside the
    configured Docker container so that the GPU ring buffer is released.
    All steps are packed into a single SSH command to avoid the timeouts
    caused by opening several connections.

    Returns:
        None. The outcome is only logged: a warning when the SSH command
        exits non-zero, otherwise an info line carrying the remote stdout
        ("cleaned N processes", "no python processes" or
        "container not running").
    """
    from app.config import get_settings
    from app.core.remote_executor import ssh_exec

    settings = get_settings()
    container = settings.compute_node_docker_container

    # One remote script: check container -> list python PIDs -> kill each -> report.
    cmd = (
        # Inspect once and keep the answer in a shell variable; "{{{{" renders
        # as the "{{" that docker's Go template syntax expects.
        f"running=$(docker inspect -f '{{{{.State.Running}}}}' {container} 2>/dev/null); "
        "if [ \"$running\" = 'true' ]; then "
        # The single quotes are intentional here: the pipeline (and awk's $2)
        # must be evaluated by the bash running inside the container.
        f"pids=$(docker exec {container} bash -c "
        "'ps aux 2>/dev/null | grep \"[p]ython\" | grep -v grep | awk \"{print \\$2}\"'); "
        "if [ -n \"$pids\" ]; then "
        "echo \"$pids\" | while read pid; do "
        # Double quotes are required here: $pid only exists in the outer
        # (host) shell, so it has to be expanded before the string reaches
        # the container. With single quotes `kill -9` received no PID at all.
        # `wait` cannot reap a process that is not a child of that bash; it
        # is kept from the original command and its error is discarded.
        f"docker exec {container} bash -c \"kill -9 $pid 2>/dev/null; wait $pid 2>/dev/null\"; "
        "done; "
        "echo \"cleaned $(echo \"$pids\" | wc -l) processes\"; "
        "else echo 'no python processes'; fi; "
        "else echo 'container not running'; fi"
    )

    # ssh_exec is called through a worker thread so the event loop stays free.
    code, stdout, stderr = await asyncio.to_thread(ssh_exec, cmd, timeout=60)
    if code != 0:
        logger.warning(f"Remote cleanup failed: code={code}, stderr={stderr}")
    else:
        logger.info(f"Remote cleanup result: {stdout.strip()}")
|
|
|
|
|
|
|
|
async def _lookup_dataset_db(self, dataset_id: str) -> str | None:
|
|
async def _lookup_dataset_db(self, dataset_id: str) -> str | None:
|
|
|
"""从数据库查找数据集路径。"""
|
|
"""从数据库查找数据集路径。"""
|
|
@@ -363,8 +362,8 @@ class JobQueue:
|
|
|
)
|
|
)
|
|
|
logger.info(f"Killed remote process {pid} via docker exec")
|
|
logger.info(f"Killed remote process {pid} via docker exec")
|
|
|
return
|
|
return
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"Failed to kill process {pid} via docker exec: {e}")
|
|
|
|
|
|
|
|
# 方式2: nsenter 从宿主机直接进入进程 namespace 发信号
|
|
# 方式2: nsenter 从宿主机直接进入进程 namespace 发信号
|
|
|
try:
|
|
try:
|
|
@@ -375,8 +374,8 @@ class JobQueue:
|
|
|
)
|
|
)
|
|
|
logger.info(f"Killed remote process {pid} via nsenter")
|
|
logger.info(f"Killed remote process {pid} via nsenter")
|
|
|
return
|
|
return
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"Failed to kill process {pid} via nsenter: {e}")
|
|
|
|
|
|
|
|
# 方式3: 终极方案 — 重启整个容器(释放所有 GPU 资源)
|
|
# 方式3: 终极方案 — 重启整个容器(释放所有 GPU 资源)
|
|
|
try:
|
|
try:
|
|
@@ -386,8 +385,8 @@ class JobQueue:
|
|
|
timeout=30,
|
|
timeout=30,
|
|
|
)
|
|
)
|
|
|
logger.warning(f"Force restarted container {container} to release GPU resources")
|
|
logger.warning(f"Force restarted container {container} to release GPU resources")
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"Failed to restart container {container}: {e}")
|
|
|
|
|
|
|
|
async def _mark_failed(error_msg: str):
|
|
async def _mark_failed(error_msg: str):
|
|
|
"""统一标记失败:先 kill 远程进程,再更新状态。"""
|
|
"""统一标记失败:先 kill 远程进程,再更新状态。"""
|