deploy_service.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. """部署服务 —— 导出模型 / 部署为在线推理服务。
  2. 架构:
  3. - 253 算力节点运行轻量 inference_worker.py(纯 stdlib + torch/transformers,不需要 fastapi/uvicorn)
  4. - 151 主节点对外提供 OpenAI 兼容代理 API,通过 TCP 转发请求到 253
  5. """
  6. import asyncio
  7. import json
  8. import socket
  9. import struct
  10. import uuid
  11. from datetime import datetime, timezone
  12. from pathlib import Path
  13. from typing import Any
  14. from app.config import get_settings
  15. from app.core.background_tasks import background_task_manager
  16. from app.core.db import async_session, DeployTaskModel
  17. from app.core.logging import logger
  18. from app.core.remote_executor import ssh_exec
  19. from sqlalchemy import select
settings = get_settings()
# TCP port range used by the inference workers on the compute node (253).
_SERVE_PORT_MIN = 8100
_SERVE_PORT_MAX = 8199
  24. # ---------------------------------------------------------------------------
  25. # TCP 代理:151 → 253 inference_worker
  26. # ---------------------------------------------------------------------------
  27. async def proxy_to_worker(task_id: str, request: dict) -> dict:
  28. """通过 TCP 把推理请求转发到 253 的 inference_worker,返回响应。
  29. 协议:4 字节大端长度前缀 + JSON body
  30. """
  31. # 查 DB 获取 worker 监听的端口
  32. async with async_session() as session:
  33. result = await session.execute(
  34. select(DeployTaskModel).where(DeployTaskModel.id == task_id)
  35. )
  36. record = result.scalar_one_or_none()
  37. if not record:
  38. return {"error": "部署任务不存在"}
  39. if record.status != "running":
  40. return {"error": f"服务未运行(当前状态: {record.status})"}
  41. port = record.port
  42. if not port:
  43. return {"error": "未找到 worker 端口"}
  44. # 通过 asyncio 在线程池中执行同步 TCP 操作
  45. return await asyncio.to_thread(_tcp_request, settings.compute_node_host, port, request)
  46. def _tcp_request(host: str, port: int, request: dict) -> dict:
  47. """同步 TCP 请求:连接到 worker,发送请求,接收响应。"""
  48. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  49. sock.settimeout(120) # 推理可能耗时较长
  50. try:
  51. sock.connect((host, port))
  52. # 发送:4 字节长度 + JSON
  53. body = json.dumps(request, ensure_ascii=False).encode("utf-8")
  54. sock.sendall(struct.pack(">I", len(body)))
  55. sock.sendall(body)
  56. # 接收:4 字节长度 + JSON
  57. len_data = _recv_exact(sock, 4)
  58. resp_len = struct.unpack(">I", len_data)[0]
  59. resp_data = _recv_exact(sock, resp_len)
  60. return json.loads(resp_data.decode("utf-8"))
  61. except socket.timeout:
  62. return {"error": "推理超时(120s)"}
  63. except ConnectionRefusedError:
  64. return {"error": f"无法连接到推理 worker({host}:{port}),服务可能已停止"}
  65. except Exception as e:
  66. return {"error": f"代理请求失败: {e}"}
  67. finally:
  68. sock.close()
  69. def _recv_exact(sock: socket.socket, n: int) -> bytes:
  70. """确保接收恰好 n 字节。"""
  71. buf = bytearray()
  72. while len(buf) < n:
  73. chunk = sock.recv(n - len(buf))
  74. if not chunk:
  75. raise ConnectionError("Connection closed while reading")
  76. buf.extend(chunk)
  77. return bytes(buf)
  78. # ---------------------------------------------------------------------------
  79. # 导出 Adapter(导出文件模式)
  80. # ---------------------------------------------------------------------------
  81. async def export_adapter(job_id: str, config: dict[str, Any], user_id: str = "") -> dict[str, Any]:
  82. """启动导出后台任务,立即返回 task_id。"""
  83. task_id = str(uuid.uuid4())
  84. merge_with_base = config.get("merge_with_base", False)
  85. export_format = config.get("export_format", "safetensors")
  86. task = DeployTaskModel(
  87. id=task_id,
  88. job_id=job_id,
  89. user_id=user_id or None,
  90. status="pending",
  91. deploy_mode="export",
  92. )
  93. async with async_session() as session:
  94. session.add(task)
  95. await session.commit()
  96. background_task_manager.register_task(task_id, "deployment", {"job_id": job_id})
  97. await background_task_manager.run(
  98. task_id, "deployment", _execute_export(task_id, job_id, merge_with_base, export_format)
  99. )
  100. logger.info(f"Deploy task started: job={job_id} (task_id={task_id})")
  101. return {"task_id": task_id, "job_id": job_id, "status": "pending", "deploy_mode": "export"}
  102. async def _execute_export(task_id: str, job_id: str, merge_with_base: bool, export_format: str) -> dict:
  103. """后台执行导出。"""
  104. try:
  105. if settings.use_remote_compute:
  106. result = await _run_remote_export(task_id, job_id, merge_with_base, export_format)
  107. else:
  108. result = await _run_local_export(task_id, job_id, merge_with_base)
  109. output_path = result.get("output_path")
  110. # 把 inference_worker.py 和启动脚本复制到输出目录
  111. if output_path and settings.use_remote_compute:
  112. _copy_worker_template_remote(output_path)
  113. await _update_deploy_status(task_id, "completed", output_path=output_path)
  114. return {"output_path": output_path}
  115. except Exception as e:
  116. logger.error(f"Export failed for job {job_id}: {e}")
  117. await _update_deploy_status(task_id, "failed", error=str(e))
  118. return {"error": str(e)}
  119. # ---------------------------------------------------------------------------
  120. # 部署为在线服务(serve 模式)
  121. # ---------------------------------------------------------------------------
  122. async def start_serving(job_id: str, config: dict[str, Any], user_id: str = "") -> dict[str, Any]:
  123. """部署为在线推理服务,151 代理对外,253 worker 做推理。"""
  124. task_id = str(uuid.uuid4())
  125. merge_with_base = config.get("merge_with_base", True)
  126. port = config.get("port")
  127. if not port:
  128. port = await _allocate_port()
  129. task = DeployTaskModel(
  130. id=task_id,
  131. job_id=job_id,
  132. user_id=user_id or None,
  133. status="pending",
  134. deploy_mode="serve",
  135. port=port,
  136. )
  137. async with async_session() as session:
  138. session.add(task)
  139. await session.commit()
  140. background_task_manager.register_task(task_id, "deployment", {"job_id": job_id, "mode": "serve"})
  141. await background_task_manager.run(
  142. task_id, "deployment", _execute_serve(task_id, job_id, merge_with_base, port)
  143. )
  144. logger.info(f"Serve task started: job={job_id} port={port} (task_id={task_id})")
  145. return {"task_id": task_id, "job_id": job_id, "status": "pending", "deploy_mode": "serve", "port": port}
  146. async def _execute_serve(task_id: str, job_id: str, merge_with_base: bool, port: int) -> dict:
  147. """后台执行:导出模型 → 复制 worker → 启动 TCP 推理 worker。"""
  148. try:
  149. # 第一步:导出(合并 adapter)
  150. if settings.use_remote_compute:
  151. export_result = await _run_remote_export(task_id, job_id, merge_with_base, "safetensors")
  152. output_path = export_result.get("output_path")
  153. else:
  154. export_result = await _run_local_export(task_id, job_id, merge_with_base)
  155. output_path = export_result.get("output_path")
  156. if not output_path:
  157. raise RuntimeError("导出失败,无法获取输出路径")
  158. # 第二步:启动推理 worker
  159. if settings.use_remote_compute:
  160. pid = await _launch_remote_worker(task_id, output_path, port)
  161. else:
  162. pid = await _launch_local_worker(task_id, output_path, port)
  163. # endpoint_url 是 151 上的代理路径(相对路径,前端拼接 origin)
  164. endpoint_url = f"/api/v1/deployment/proxy/{task_id}/v1"
  165. await _update_deploy_status(
  166. task_id, "running",
  167. output_path=output_path,
  168. endpoint_url=endpoint_url,
  169. port=port,
  170. pid=pid,
  171. )
  172. return {"endpoint_url": endpoint_url, "port": port, "pid": pid}
  173. except Exception as e:
  174. logger.error(f"Serve failed for job {job_id}: {e}")
  175. await _update_deploy_status(task_id, "failed", error=str(e))
  176. return {"error": str(e)}
  177. async def _launch_remote_worker(task_id: str, model_path: str, port: int) -> str:
  178. """在远程 253 容器里启动 inference_worker.py,返回进程 PID。
  179. 只依赖 torch + transformers(不需要 fastapi/uvicorn)。
  180. """
  181. # worker 脚本在容器内的路径
  182. worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
  183. # 复制 worker 到模型目录
  184. copy_cmd = (
  185. f"docker exec {settings.compute_node_docker_container} "
  186. f"bash -c 'cp {worker_template} {model_path}/inference_worker.py'"
  187. )
  188. code, _, stderr = ssh_exec(copy_cmd, timeout=30)
  189. if code != 0:
  190. raise RuntimeError(f"复制 inference_worker.py 失败: {stderr}")
  191. # 在容器内后台启动 worker
  192. # 使用 exec 让 Python 进程直接占用 PID,避免 setsid session leader PID 不匹配
  193. launch_cmd = (
  194. f"docker exec "
  195. f"-e MACA_MPS_MODE=1 "
  196. f"-e CUDA_VISIBLE_DEVICES=3 "
  197. f"-w {model_path} "
  198. f"{settings.compute_node_docker_container} "
  199. f"bash -c '"
  200. f"nohup {settings.compute_node_python} inference_worker.py "
  201. f"--model-path {model_path} "
  202. f"--port {port} "
  203. f"</dev/null >/tmp/serve_{task_id}.log 2>&1 &"
  204. f" echo $!'"
  205. )
  206. code, stdout, stderr = ssh_exec(launch_cmd, timeout=30)
  207. if code != 0:
  208. raise RuntimeError(f"启动推理 worker 失败: {stderr}")
  209. pid = stdout.strip()
  210. logger.info(f"Remote worker launched: task={task_id} port={port} pid={pid}")
  211. # 等待模型加载(可能需要较长时间),检查 READY 标记
  212. # 每次轮询只用一次 SSH 连接,同时检查 READY 和进程状态
  213. import asyncio as _aio
  214. for attempt in range(60): # 最多等 5 分钟(60 * 5s)
  215. await _aio.sleep(5)
  216. check_cmd = (
  217. f"docker exec {settings.compute_node_docker_container} "
  218. f"bash -c '"
  219. f" ready=$(grep -c READY /tmp/serve_{task_id}.log 2>/dev/null || echo 0); "
  220. f" if [ \"$ready\" != \"0\" ]; then echo \"READY:$ready\"; exit 0; fi; "
  221. f" if ! kill -0 {pid} 2>/dev/null; then echo \"DEAD\"; exit 0; fi; "
  222. f" echo \"ALIVE\"; "
  223. f"'"
  224. )
  225. code, stdout, stderr = ssh_exec(check_cmd, timeout=30)
  226. if code == 0:
  227. result = stdout.strip()
  228. if result.startswith("READY:"):
  229. logger.info(f"Worker ready: task={task_id} (after ~{(attempt+1)*5}s)")
  230. return pid
  231. elif result == "DEAD":
  232. # 读取日志看什么错了
  233. log_cmd = (
  234. f"docker exec {settings.compute_node_docker_container} "
  235. f"bash -c 'tail -20 /tmp/serve_{task_id}.log 2>/dev/null'"
  236. )
  237. _, log_stdout, _ = ssh_exec(log_cmd, timeout=30)
  238. raise RuntimeError(f"Worker 进程已退出: {log_stdout}")
  239. # result == "ALIVE" → 继续等待
  240. logger.warning(f"Worker not ready after 5min: task={task_id}, proceeding anyway")
  241. return pid
  242. async def _launch_local_worker(task_id: str, model_path: str, port: int) -> str:
  243. """在本地启动推理 worker(开发用)。"""
  244. import subprocess
  245. import shutil
  246. import sys
  247. worker_src = Path(__file__).resolve().parent.parent / "core" / "inference_worker.py"
  248. shutil.copy(worker_src, Path(model_path) / "inference_worker.py")
  249. proc = subprocess.Popen(
  250. [sys.executable, "inference_worker.py", "--model-path", model_path, "--port", str(port)],
  251. cwd=model_path,
  252. stdout=subprocess.DEVNULL,
  253. stderr=subprocess.DEVNULL,
  254. )
  255. return str(proc.pid)
  256. # ---------------------------------------------------------------------------
  257. # 停止服务 / 列表 / 状态
  258. # ---------------------------------------------------------------------------
  259. async def stop_serving(task_id: str, user_id: str = "") -> dict[str, Any]:
  260. """停止已部署的在线服务。"""
  261. async with async_session() as session:
  262. result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
  263. record = result.scalar_one_or_none()
  264. if not record:
  265. return {"error": "任务不存在"}
  266. if record.deploy_mode != "serve":
  267. return {"error": "该任务不是在线服务"}
  268. if user_id and record.user_id and record.user_id != user_id:
  269. return {"error": "无权操作此任务"}
  270. pid = record.pid
  271. if pid and settings.use_remote_compute:
  272. # 杀掉远程 worker 进程及其子线程
  273. kill_cmd = (
  274. f"docker exec {settings.compute_node_docker_container} "
  275. f"bash -c 'kill {pid} 2>/dev/null; pkill -P {pid} 2>/dev/null; true'"
  276. )
  277. code, _, _ = ssh_exec(kill_cmd, timeout=15)
  278. logger.info(f"Stop serving: task={task_id} pid={pid} kill_code={code}")
  279. record.status = "stopped"
  280. record.pid = None
  281. record.finished_at = datetime.utcnow()
  282. await session.commit()
  283. background_task_manager.update_task(task_id, status="stopped")
  284. return {"task_id": task_id, "status": "stopped"}
  285. async def list_deployed_services(user_id: str = "") -> list[dict[str, Any]]:
  286. """列出 serve 模式的部署任务(按用户过滤)。"""
  287. async with async_session() as session:
  288. query = select(DeployTaskModel).where(DeployTaskModel.deploy_mode == "serve")
  289. if user_id:
  290. query = query.where(DeployTaskModel.user_id == user_id)
  291. query = query.order_by(DeployTaskModel.created_at.desc())
  292. result = await session.execute(query)
  293. records = result.scalars().all()
  294. services = []
  295. for r in records:
  296. status = r.status
  297. # 对 running 状态,检查远程进程是否还活着
  298. if status == "running" and r.pid and settings.use_remote_compute:
  299. from app.core.remote_executor import is_process_running
  300. if not is_process_running(r.pid):
  301. status = "stopped"
  302. await _update_deploy_status(r.id, "stopped", error="进程已退出")
  303. services.append({
  304. "task_id": r.id,
  305. "job_id": r.job_id,
  306. "status": status,
  307. "endpoint_url": r.endpoint_url,
  308. "base_url": r.endpoint_url,
  309. "port": r.port,
  310. "output_path": r.output_path,
  311. "created_at": r.created_at.isoformat() if r.created_at else None,
  312. "error": r.error,
  313. })
  314. return services
  315. async def get_deploy_status(task_id: str) -> dict[str, Any]:
  316. """获取部署任务状态。"""
  317. async with async_session() as session:
  318. result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
  319. record = result.scalar_one_or_none()
  320. if record:
  321. return {
  322. "task_id": record.id,
  323. "job_id": record.job_id,
  324. "status": record.status,
  325. "deploy_mode": record.deploy_mode or "export",
  326. "progress": record.progress,
  327. "output_path": record.output_path,
  328. "endpoint_url": record.endpoint_url,
  329. "port": record.port,
  330. "error": record.error,
  331. }
  332. return {"task_id": None, "job_id": "", "status": "not_found", "deploy_mode": "export",
  333. "progress": 0.0, "output_path": None, "endpoint_url": None, "port": None, "error": None}
  334. # ---------------------------------------------------------------------------
  335. # 辅助函数
  336. # ---------------------------------------------------------------------------
  337. async def _allocate_port() -> int:
  338. """从端口池里分配一个未使用的端口。"""
  339. async with async_session() as session:
  340. result = await session.execute(
  341. select(DeployTaskModel.port).where(
  342. DeployTaskModel.deploy_mode == "serve",
  343. DeployTaskModel.status.in_(["pending", "running"]),
  344. DeployTaskModel.port.isnot(None),
  345. )
  346. )
  347. used = {row[0] for row in result.all()}
  348. for port in range(_SERVE_PORT_MIN, _SERVE_PORT_MAX + 1):
  349. if port not in used:
  350. return port
  351. raise RuntimeError(f"无可用端口({_SERVE_PORT_MIN}-{_SERVE_PORT_MAX} 全部占用)")
  352. async def _run_remote_export(task_id: str, job_id: str, merge_with_base: bool, export_format: str) -> dict:
  353. """通过 SSH 在远程容器执行模型合并/导出。"""
  354. remote_cmd = (
  355. f"docker exec "
  356. f"-e MACA_MPS_MODE=1 "
  357. f"-e CUDA_VISIBLE_DEVICES=3 "
  358. f"-w {settings.compute_node_workdir} "
  359. f"{settings.compute_node_docker_container} "
  360. f"{settings.compute_node_python} -c \""
  361. "import asyncio, json; "
  362. "from app.core.remote_deploy import run_remote_export; "
  363. f"result = asyncio.run(run_remote_export('{job_id}', {merge_with_base}, '{export_format}')); "
  364. "print(json.dumps(result, ensure_ascii=False))\" 2>&1"
  365. )
  366. code, stdout, stderr = ssh_exec(remote_cmd, timeout=600)
  367. if code != 0:
  368. raise RuntimeError(f"Remote export failed: {stderr}")
  369. for line in reversed(stdout.strip().split("\n")):
  370. line = line.strip()
  371. if line.startswith("{"):
  372. try:
  373. result = json.loads(line)
  374. if "error" in result:
  375. raise RuntimeError(result["error"])
  376. return result
  377. except json.JSONDecodeError:
  378. continue
  379. raise RuntimeError(f"Invalid response: {stdout[:500]}")
  380. async def _run_local_export(task_id: str, job_id: str, merge_with_base: bool) -> dict:
  381. """本地执行导出(开发用)。"""
  382. adapter_path = settings.adapters_dir / job_id
  383. if not adapter_path.exists():
  384. raise ValueError("Adapter not found")
  385. output_path = settings.adapters_dir / f"{job_id}_merged"
  386. if merge_with_base:
  387. import torch
  388. from transformers import AutoModelForCausalLM, AutoTokenizer
  389. base_model_id = _get_base_model_id_local(job_id)
  390. if base_model_id:
  391. from peft import PeftModel
  392. base_model = AutoModelForCausalLM.from_pretrained(
  393. base_model_id, torch_dtype=torch.float16, device_map="auto"
  394. )
  395. peft_model = PeftModel.from_pretrained(base_model, adapter_path)
  396. merged = peft_model.merge_and_unload()
  397. merged.save_pretrained(output_path)
  398. tokenizer = AutoTokenizer.from_pretrained(adapter_path)
  399. tokenizer.save_pretrained(output_path)
  400. else:
  401. from peft import PeftModel
  402. merged = PeftModel.from_pretrained(
  403. AutoModelForCausalLM.from_pretrained(
  404. str(adapter_path), torch_dtype=torch.float16
  405. ),
  406. str(adapter_path),
  407. )
  408. merged = merged.merge_and_unload()
  409. merged.save_pretrained(output_path)
  410. tokenizer = AutoTokenizer.from_pretrained(adapter_path)
  411. tokenizer.save_pretrained(output_path)
  412. else:
  413. import shutil
  414. if output_path.exists():
  415. shutil.rmtree(output_path)
  416. shutil.copytree(adapter_path, output_path)
  417. return {"output_path": str(output_path)}
  418. def _copy_worker_template_remote(output_path: str):
  419. """把 inference_worker.py 和启动脚本复制到远程模型目录。"""
  420. worker_template = f"{settings.compute_node_workdir}/app/core/inference_worker.py"
  421. copy_cmd = (
  422. f"docker exec {settings.compute_node_docker_container} "
  423. f"bash -c 'cp {worker_template} {output_path}/inference_worker.py'"
  424. )
  425. code, _, stderr = ssh_exec(copy_cmd, timeout=30)
  426. if code != 0:
  427. logger.warning(f"复制 inference_worker.py 到 {output_path} 失败: {stderr}")
  428. # 生成快捷启动脚本
  429. start_script = (
  430. f"#!/bin/bash\n"
  431. f"cd {output_path}\n"
  432. f"CUDA_VISIBLE_DEVICES=3 MACA_MPS_MODE=1 "
  433. f"{settings.compute_node_python} inference_worker.py "
  434. f"--model-path . --port 8100\n"
  435. )
  436. script_cmd = (
  437. f"docker exec {settings.compute_node_docker_container} "
  438. f"bash -c 'cat > {output_path}/start.sh << \"EOF\"\n{start_script}EOF\n"
  439. f"chmod +x {output_path}/start.sh'"
  440. )
  441. code, _, _ = ssh_exec(script_cmd, timeout=15)
  442. if code != 0:
  443. logger.warning(f"生成 start.sh 失败")
  444. def _get_base_model_id_local(job_id: str):
  445. config_path = settings.adapters_dir / job_id / "adapter_config.json"
  446. if config_path.exists():
  447. with open(config_path) as f:
  448. return json.load(f).get("base_model_name_or_path")
  449. return None
  450. async def _update_deploy_status(
  451. task_id: str, status: str,
  452. output_path: str = None, error: str = None,
  453. endpoint_url: str = None, port: int = None, pid: str = None,
  454. ):
  455. async with async_session() as session:
  456. result = await session.execute(select(DeployTaskModel).where(DeployTaskModel.id == task_id))
  457. record = result.scalar_one_or_none()
  458. if record:
  459. record.status = status
  460. if output_path:
  461. record.output_path = output_path
  462. if error:
  463. record.error = error
  464. if endpoint_url:
  465. record.endpoint_url = endpoint_url
  466. if port:
  467. record.port = port
  468. if pid:
  469. record.pid = pid
  470. if status in ("completed", "failed", "stopped"):
  471. record.finished_at = datetime.utcnow()
  472. await session.commit()
  473. background_task_manager.update_task(
  474. task_id, status=status, output_path=output_path, error=error,
  475. endpoint_url=endpoint_url,
  476. )
  477. async def recover_stale_deploys() -> None:
  478. async with async_session() as session:
  479. result = await session.execute(
  480. select(DeployTaskModel).where(
  481. DeployTaskModel.status.in_(["pending", "running"])
  482. )
  483. )
  484. records = result.scalars().all()
  485. for record in records:
  486. if record.deploy_mode == "export":
  487. record.status = "failed"
  488. record.error = "Server restarted, task interrupted"
  489. elif record.deploy_mode == "serve":
  490. if record.pid and settings.use_remote_compute:
  491. from app.core.remote_executor import is_process_running
  492. if not is_process_running(record.pid):
  493. record.status = "stopped"
  494. record.error = "Server restarted, process no longer running"
  495. else:
  496. continue # 进程还在,保持 running
  497. else:
  498. record.status = "stopped"
  499. record.error = "Server restarted, process state unknown"
  500. record.finished_at = datetime.utcnow()
  501. if records:
  502. await session.commit()
  503. logger.info(f"Recovered {len(records)} stale deploy tasks")