|
|
@@ -291,6 +291,8 @@ class JobQueue:
|
|
|
last_bytes = 0
|
|
|
poll_interval = 5
|
|
|
max_polls = 8640
|
|
|
+ consecutive_empty_polls = 0
|
|
|
+ max_consecutive_empty = 12 # 60 秒无响应就开始检查 stderr
|
|
|
|
|
|
for _ in range(max_polls):
|
|
|
if self.is_cancelled(job_id):
|
|
|
@@ -312,11 +314,14 @@ class JobQueue:
|
|
|
except ValueError:
|
|
|
file_size = 0
|
|
|
|
|
|
+ has_new_log = False
|
|
|
if file_size > last_bytes:
|
|
|
read_cmd = f"docker exec {settings.compute_node_docker_container} bash -c 'tail -c +{last_bytes + 1} {remote_log} 2>/dev/null'"
|
|
|
code, log_content, _ = await asyncio.to_thread(ssh_exec, read_cmd, timeout=30)
|
|
|
|
|
|
if code == 0 and log_content.strip():
|
|
|
+ has_new_log = True
|
|
|
+ consecutive_empty_polls = 0
|
|
|
for line in log_content.strip().split("\n"):
|
|
|
line = line.strip()
|
|
|
if not line:
|
|
|
@@ -366,11 +371,24 @@ class JobQueue:
|
|
|
|
|
|
last_bytes = file_size
|
|
|
|
|
|
+ if not has_new_log:
|
|
|
+ consecutive_empty_polls += 1
|
|
|
+
|
|
|
# The process has exited but the log contains no completed/error entry
|
|
|
if not process_alive:
|
|
|
+ # Wait a few more seconds so the log can finish being written
|
|
|
await asyncio.sleep(2)
|
|
|
if not await asyncio.to_thread(is_process_running, pid):
|
|
|
+ # The process exited without writing a completed/error log entry; fall back to reading the stderr log
|
|
|
error_msg = f"Remote process exited unexpectedly (pid={pid})"
|
|
|
+ try:
|
|
|
+ from app.core.remote_executor import get_remote_stderr
|
|
|
+ stderr_content = await asyncio.to_thread(get_remote_stderr, job_id)
|
|
|
+ if stderr_content:
|
|
|
+ error_msg = stderr_content[-1000:]
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
logger.error(f"Remote job {job_id} failed: {error_msg}")
|
|
|
self.update_job(job_id,
|
|
|
status=JobStatus.FAILED,
|
|
|
@@ -379,6 +397,15 @@ class JobQueue:
|
|
|
await send_error(job_id, error_msg)
|
|
|
return
|
|
|
|
|
|
+ # No log output for a long time and the process is not healthy: also mark the job as failed
|
|
|
+ if consecutive_empty_polls >= max_consecutive_empty and not process_alive:
|
|
|
+ error_msg = f"Remote process exited unexpectedly (pid={pid}), no error log found"
|
|
|
+ logger.error(f"Remote job {job_id} failed: {error_msg}")
|
|
|
+ self.update_job(job_id, status=JobStatus.FAILED, error_message=error_msg)
|
|
|
+ await self._notify_callbacks()
|
|
|
+ await send_error(job_id, error_msg)
|
|
|
+ return
|
|
|
+
|
|
|
await asyncio.sleep(poll_interval)
|
|
|
|
|
|
# Timed out
|