Forráskód Böngészése

修复进度读取不到错误

lxylxy123321 1 hete
szülő
commit
6e08d74dd5
3 módosított fájl, 74 hozzáadás és 48 törlés
  1. 12 3
      backend/app/config.py
  2. 22 32
      backend/app/core/job_queue.py
  3. 40 13
      result.txt

+ 12 - 3
backend/app/config.py

@@ -6,12 +6,13 @@ from pydantic import Field, field_validator
 from pydantic_settings import (
     BaseSettings,
     EnvSettingsSource,
+    DotEnvSettingsSource,
     SettingsConfigDict,
 )
 
 
-class EnvSettingsSourceWithCommaLists(EnvSettingsSource):
-    """Override decode_complex_value to handle comma-separated lists."""
+class _CommaListMixin:
+    """Mixin for handling comma-separated list values in env/dotenv sources."""
 
     def decode_complex_value(self, field_name, field, value):
         if field_name == "backend_cors_origins" and isinstance(value, str):
@@ -19,6 +20,14 @@ class EnvSettingsSourceWithCommaLists(EnvSettingsSource):
         return super().decode_complex_value(field_name, field, value)
 
 
+class DotEnvSourceWithCommaLists(_CommaListMixin, DotEnvSettingsSource):
+    """Dotenv source that handles comma-separated lists."""
+
+
+class EnvSettingsSourceWithCommaLists(_CommaListMixin, EnvSettingsSource):
+    """Env source that handles comma-separated lists."""
+
+
 class Settings(BaseSettings):
     model_config = SettingsConfigDict(
         env_file=str(Path(__file__).resolve().parent.parent / ".env"),
@@ -33,7 +42,7 @@ class Settings(BaseSettings):
     ):
         return (
             init_settings,
-            dotenv_settings,
+            DotEnvSourceWithCommaLists(settings_cls),
             EnvSettingsSourceWithCommaLists(settings_cls),
             file_secret_settings,
         )

+ 22 - 32
backend/app/core/job_queue.py

@@ -276,21 +276,19 @@ class JobQueue:
             return text_engine
 
     async def _poll_remote_progress(self, job_id: str, pid: str):
-        """轮询共享日志文件,解析远程训练进度并通过 WebSocket 推送。"""
+        """通过 SSH 读取远程日志文件,解析训练进度。"""
         from app.config import get_settings
         from app.core.websocket import send_progress, send_epoch_done, send_completed, send_error
+        from app.core.remote_executor import ssh_exec
 
         settings = get_settings()
-        log_file = settings.data_dir / "logs" / f"{job_id}.jsonl"
-        last_offset = 0
-        poll_interval = 5  # 每 5 秒轮询一次
-        max_polls = 8640  # 最多轮询 12 小时 (8640 * 5s)
+        remote_log = f"{settings.compute_node_remote_data_dir}/logs/{job_id}.jsonl"
+        last_bytes = 0
+        poll_interval = 5
+        max_polls = 8640
 
         for _ in range(max_polls):
             if self.is_cancelled(job_id):
-                # 取消容器内的远程进程
-                from app.core.remote_executor import ssh_exec
-                from app.config import get_settings
                 _s = get_settings()
                 ssh_exec(f"docker exec {_s.compute_node_docker_container} bash -c 'kill {pid} 2>/dev/null'", timeout=10)
                 self.update_job(job_id, status=JobStatus.CANCELLED)
@@ -302,15 +300,20 @@ class JobQueue:
             from app.core.remote_executor import is_process_running
             process_alive = is_process_running(pid)
 
-            # 读取新的日志行
-            if log_file.exists():
-                try:
-                    with open(log_file, "r", encoding="utf-8") as f:
-                        f.seek(last_offset)
-                        new_lines = f.readlines()
-                        last_offset = f.tell()
+            # 通过 SSH 远程读取日志文件(追加部分)
+            cat_cmd = f"docker exec {settings.compute_node_docker_container} bash -c 'wc -c < {remote_log} 2>/dev/null || echo 0'"
+            code, size_out, _ = ssh_exec(cat_cmd, timeout=30)
+            try:
+                file_size = int(size_out.strip()) if code == 0 and size_out.strip() else 0
+            except ValueError:
+                file_size = 0
+
+            if file_size > last_bytes:
+                read_cmd = f"docker exec {settings.compute_node_docker_container} bash -c 'tail -c +{last_bytes + 1} {remote_log} 2>/dev/null'"
+                code, log_content, _ = ssh_exec(read_cmd, timeout=30)
 
-                    for line in new_lines:
+                if code == 0 and log_content.strip():
+                    for line in log_content.strip().split("\n"):
                         line = line.strip()
                         if not line:
                             continue
@@ -322,7 +325,6 @@ class JobQueue:
                         entry_type = entry.get("type")
                         if entry_type == "progress":
                             self.update_job(job_id,
-                                            epoch=entry.get("epoch", 0),
                                             current_step=entry.get("step", 0),
                                             total_steps=entry.get("total_steps", 0),
                                             loss=entry.get("loss"),
@@ -355,25 +357,13 @@ class JobQueue:
                             await self._notify_callbacks()
                             await send_error(job_id, entry.get("message", "Unknown error"))
                             return
-                except Exception as e:
-                    logger.warning(f"Error reading remote log file: {e}")
 
-            # 进程已退出但日志里没有 completed/error,可能异常退出
+                    last_bytes = file_size
+
+            # 进程已退出但日志里没有 completed/error
             if not process_alive:
-                # 再等一轮确认
                 await asyncio.sleep(2)
                 if not is_process_running(pid):
-                    # 检查日志里是否有最终状态
-                    if log_file.exists():
-                        try:
-                            with open(log_file, "r", encoding="utf-8") as f:
-                                content = f.read()
-                                if "completed" in content or "error" in content:
-                                    # 上面已经处理过了
-                                    continue
-                        except Exception:
-                            pass
-                    # 进程退出但没有最终状态,视为失败
                     self.update_job(job_id,
                                     status=JobStatus.FAILED,
                                     error_message=f"Remote process exited unexpectedly (pid={pid})")

+ 40 - 13
result.txt

@@ -1,15 +1,13 @@
-(base) [root@localhost ~]# docker exec finetune-trainer cat /tmp/train_7bcbc0bb-72c7-408f-a4c6-c38fb05b8382.log
+(base) [root@localhost ~]# docker exec -w /root/Fine-tuning/backend finetune-trainer /opt/conda/bin/python -m app.engines.remote_train cce886de-4dd5-460a-b0ac-2404731cd9f8 Qwen/Qwen3.5-0.8B text yanalong/yanalong /root/Fine-tuning/backend/data/config_cce886de-4dd5-460a-b0ac-2404731cd9f8.json
 Traceback (most recent call last):
-  File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
-    return _run_code(code, main_globals, None,
-  File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
-    exec(code, run_globals)
-  File "/root/Fine-tuning/backend/app/engines/remote_train.py", line 179, in <module>
-    main()
-  File "/root/Fine-tuning/backend/app/engines/remote_train.py", line 173, in main
-    config = json.load(f)
-  File "/opt/conda/lib/python3.10/json/__init__.py", line 293, in load
-    return loads(fp.read(),
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/base.py", line 551, in __call__
+    field_value = self.prepare_field_value(field_name, field, field_value, value_is_complex)
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/providers/env.py", line 134, in prepare_field_value
+    raise e
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/providers/env.py", line 131, in prepare_field_value
+    value = self.decode_complex_value(field_name, field, value)
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/base.py", line 194, in decode_complex_value
+    return json.loads(value)
   File "/opt/conda/lib/python3.10/json/__init__.py", line 346, in loads
     return _default_decoder.decode(s)
   File "/opt/conda/lib/python3.10/json/decoder.py", line 337, in decode
@@ -17,5 +15,34 @@ Traceback (most recent call last):
   File "/opt/conda/lib/python3.10/json/decoder.py", line 355, in raw_decode
     raise JSONDecodeError("Expecting value", s, err.value) from None
 json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
-(base) [root@localhost ~]# docker exec finetune-trainer /opt/conda/bin/python -c "import sys; sys.path.insert(0, '/root/Fine-tuning/backend'); from app.engines.remote_train import run_training; print('ok')"
-ok
+
+The above exception was the direct cause of the following exception:
+
+Traceback (most recent call last):
+  File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
+    return _run_code(code, main_globals, None,
+  File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
+    exec(code, run_globals)
+  File "/root/Fine-tuning/backend/app/engines/remote_train.py", line 179, in <module>
+    main()
+  File "/root/Fine-tuning/backend/app/engines/remote_train.py", line 175, in main
+    asyncio.run(run_training(job_id, model_id, model_type, dataset_id, config))
+  File "/opt/conda/lib/python3.10/asyncio/runners.py", line 44, in run
+    return loop.run_until_complete(main)
+  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
+    return future.result()
+  File "/root/Fine-tuning/backend/app/engines/remote_train.py", line 78, in run_training
+    from app.core.logging import logger
+  File "/root/Fine-tuning/backend/app/core/logging.py", line 5, in <module>
+    settings = get_settings()
+  File "/root/Fine-tuning/backend/app/config.py", line 141, in get_settings
+    settings = Settings()
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/main.py", line 247, in __init__
+    super().__init__(**__pydantic_self__.__class__._settings_build_values(sources, init_kwargs))
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/main.py", line 470, in _settings_build_values
+    source_state = source()
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/providers/dotenv.py", line 112, in __call__
+    data: dict[str, Any] = super().__call__()
+  File "/opt/conda/lib/python3.10/site-packages/pydantic_settings/sources/base.py", line 553, in __call__
+    raise SettingsError(
+pydantic_settings.exceptions.SettingsError: error parsing value for field "backend_cors_origins" from source "DotEnvSettingsSource"