Skip to content

Commit 6b2240e

Browse files
authored
feat: Stream DGXC logs (#377)
* feat: Stream DGXC logs Signed-off-by: oliver könig <[email protected]> * revert Signed-off-by: oliver könig <[email protected]> * revert Signed-off-by: oliver könig <[email protected]> * remove Signed-off-by: oliver könig <[email protected]> * fix tests Signed-off-by: oliver könig <[email protected]> * remove Signed-off-by: oliver könig <[email protected]> * fix tests Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * add test for executor Signed-off-by: oliver könig <[email protected]> * add test for scheduler Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * remove Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * add date Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * token Signed-off-by: oliver könig <[email protected]> * test Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * test Signed-off-by: oliver könig <[email protected]> * test Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * newline Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * revert Signed-off-by: oliver könig <[email protected]> * revert Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * test Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * verify=False Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * decode_unicode=True Signed-off-by: oliver könig <[email protected]> * headers Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * newline Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * fix Signed-off-by: oliver könig <[email protected]> * more tests Signed-off-by: oliver könig <[email protected]> * disable flake8 Signed-off-by: oliver könig <[email protected]> * remove unused import Signed-off-by: oliver könig <[email protected]> * test stream_url_async Signed-off-by: oliver könig <[email protected]> * cleanup Signed-off-by: oliver könig <[email protected]> * use native typing Signed-off-by: oliver könig <[email protected]> --------- Signed-off-by: oliver könig <[email protected]>
1 parent e6b5d43 commit 6b2240e

File tree

6 files changed

+381
-29
lines changed

6 files changed

+381
-29
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"python.testing.pytestArgs": [
2020
"test"
2121
],
22+
"flake8.enabled": false,
2223
"python.testing.unittestEnabled": false,
2324
"python.testing.pytestEnabled": true
2425
}

nemo_run/core/execution/dgxcloud.py

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
import json
1818
import logging
1919
import os
20+
import queue
2021
import subprocess
2122
import tempfile
23+
import threading
2224
import time
2325
from dataclasses import dataclass, field
2426
from enum import Enum
2527
from pathlib import Path
26-
from typing import Any, Optional, Type
28+
from typing import Any, Iterable, Optional
2729

2830
import requests
2931
from invoke.context import Context
@@ -65,6 +67,7 @@ class DGXCloudExecutor(Executor):
6567
"""
6668

6769
base_url: str
70+
kube_apiserver_url: str
6871
app_id: str
6972
app_secret: str
7073
project_name: str
@@ -359,6 +362,92 @@ def status(self, job_id: str) -> Optional[DGXCloudState]:
359362
r_json = response.json()
360363
return DGXCloudState(r_json["phase"])
361364

365+
def _stream_url_sync(self, url: str, headers: dict, q: queue.Queue):
366+
"""Stream a single URL using requests and put chunks into the queue"""
367+
try:
368+
with requests.get(url, stream=True, headers=headers, verify=False) as response:
369+
for line in response.iter_lines(decode_unicode=True):
370+
q.put((url, f"{line}\n"))
371+
except Exception as e:
372+
logger.error(f"Error streaming URL {url}: {e}")
373+
374+
finally:
375+
q.put((url, None))
376+
377+
def fetch_logs(
378+
self,
379+
job_id: str,
380+
stream: bool,
381+
stderr: Optional[bool] = None,
382+
stdout: Optional[bool] = None,
383+
) -> Iterable[str]:
384+
token = self.get_auth_token()
385+
if not token:
386+
logger.error("Failed to retrieve auth token for fetch logs request.")
387+
yield ""
388+
389+
response = requests.get(
390+
f"{self.base_url}/workloads", headers=self._default_headers(token=token)
391+
)
392+
workload_name = next(
393+
(
394+
workload["name"]
395+
for workload in response.json()["workloads"]
396+
if workload["id"] == job_id
397+
),
398+
None,
399+
)
400+
if workload_name is None:
401+
logger.error(f"No workload found with id {job_id}")
402+
yield ""
403+
404+
urls = [
405+
f"{self.kube_apiserver_url}/api/v1/namespaces/runai-{self.project_name}/pods/{workload_name}-worker-{i}/log?container=pytorch"
406+
for i in range(self.nodes)
407+
]
408+
409+
if stream:
410+
urls = [url + "&follow=true" for url in urls]
411+
412+
while self.status(job_id) != DGXCloudState.RUNNING:
413+
logger.info("Waiting for job to start...")
414+
time.sleep(15)
415+
416+
time.sleep(10)
417+
418+
q = queue.Queue()
419+
active_urls = set(urls)
420+
421+
# Start threads
422+
threads = [
423+
threading.Thread(
424+
target=self._stream_url_sync, args=(url, self._default_headers(token=token), q)
425+
)
426+
for url in urls
427+
]
428+
for t in threads:
429+
t.start()
430+
431+
# Yield chunks as they arrive
432+
while active_urls:
433+
url, item = q.get()
434+
if item is None or self.status(job_id) in [
435+
DGXCloudState.DELETING,
436+
DGXCloudState.STOPPED,
437+
DGXCloudState.STOPPING,
438+
DGXCloudState.DEGRADED,
439+
DGXCloudState.FAILED,
440+
DGXCloudState.COMPLETED,
441+
DGXCloudState.TERMINATING,
442+
]:
443+
active_urls.discard(url)
444+
else:
445+
yield item
446+
447+
# Wait for threads
448+
for t in threads:
449+
t.join()
450+
362451
def cancel(self, job_id: str):
363452
# Retrieve the authentication token for the REST calls
364453
token = self.get_auth_token()
@@ -385,12 +474,6 @@ def cancel(self, job_id: str):
385474
response.text,
386475
)
387476

388-
@classmethod
389-
def logs(cls: Type["DGXCloudExecutor"], app_id: str, fallback_path: Optional[str]):
390-
logger.warning(
391-
"Logs not available for DGXCloudExecutor based jobs. Please visit the cluster UI to view the logs."
392-
)
393-
394477
def cleanup(self, handle: str): ...
395478

396479
def assign(

nemo_run/run/logs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
from nemo_run.core.execution.base import LogSupportedExecutor
3131
from nemo_run.core.frontend.console.api import CONSOLE
3232
from nemo_run.run.torchx_backend.runner import Runner, get_runner
33-
from nemo_run.run.torchx_backend.schedulers.api import (
34-
REVERSE_EXECUTOR_MAPPING,
35-
)
33+
from nemo_run.run.torchx_backend.schedulers.api import REVERSE_EXECUTOR_MAPPING
3634

3735
logger: logging.Logger = logging.getLogger(__name__)
3836

@@ -60,6 +58,8 @@ def print_log_lines(
6058
role_name,
6159
replica_id,
6260
regex,
61+
None,
62+
None,
6363
should_tail=should_tail,
6464
streams=streams,
6565
):

nemo_run/run/torchx_backend/schedulers/dgxcloud.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
import shutil
2020
import tempfile
2121
from dataclasses import dataclass
22+
from datetime import datetime
2223
from pathlib import Path
23-
from typing import Any, Optional
24+
from typing import Any, Iterable, Optional
2425

2526
import fiddle as fdl
2627
import fiddle._src.experimental.dataclasses as fdl_dc
@@ -29,15 +30,10 @@
2930
DescribeAppResponse,
3031
ListAppResponse,
3132
Scheduler,
33+
Stream,
34+
split_lines,
3235
)
33-
from torchx.specs import (
34-
AppDef,
35-
AppState,
36-
ReplicaStatus,
37-
Role,
38-
RoleStatus,
39-
runopts,
40-
)
36+
from torchx.specs import AppDef, AppState, ReplicaStatus, Role, RoleStatus, runopts
4137

4238
from nemo_run.config import get_nemorun_home
4339
from nemo_run.core.execution.base import Executor
@@ -189,6 +185,36 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
189185
ui_url=f"{executor.base_url}/workloads/distributed/{job_id}",
190186
)
191187

188+
def log_iter(
189+
self,
190+
app_id: str,
191+
role_name: str,
192+
k: int = 0,
193+
regex: Optional[str] = None,
194+
since: Optional[datetime] = None,
195+
until: Optional[datetime] = None,
196+
should_tail: bool = False,
197+
streams: Optional[Stream] = None,
198+
) -> Iterable[str]:
199+
stored_data = _get_job_dirs()
200+
job_info = stored_data.get(app_id)
201+
_, _, job_id = app_id.split("___")
202+
executor: Optional[DGXCloudExecutor] = job_info.get("executor", None) # type: ignore
203+
if not executor:
204+
return [""]
205+
206+
logs = executor.fetch_logs(
207+
job_id=job_id,
208+
stream=should_tail,
209+
) # type: ignore
210+
if isinstance(logs, str):
211+
if len(logs) == 0:
212+
logs = []
213+
else:
214+
logs = split_lines(logs)
215+
216+
return logs
217+
192218
def _cancel_existing(self, app_id: str) -> None:
193219
"""
194220
Cancels the job by calling the DGXExecutor's cancel method.

0 commit comments

Comments
 (0)