[https://nvbugs/5461980][draft] torch.cuda.event.synchronize() hangs #7027
Conversation
…eadlock Signed-off-by: Lizhi Zhou <[email protected]>
Signed-off-by: Lizhi Zhou <[email protected]>
📝 Walkthrough

Adds GPU tracing, NVTX ranges, detailed runtime logging, and shutdown handling to the PyExecutor; introduces a DebugEvent CUDA event wrapper in the sampler; adds optional NSYS profiling and relaxes output checks in a disaggregated test; and removes host cache volume mounting from Docker run for LOCAL_USER.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant PrevPP as Prev PP Rank
participant CurrPP as Current PP Rank
participant NextPP as Next PP Rank
participant Sampler as Sampler (CUDA)
participant Tracer as Tracing/NVTX
participant MPI as MPI (isend/irecv)
Tracer->>CurrPP: Activate GPU tracing (env-gated)
loop Executor loop (pp_size > 1)
MPI-->>CurrPP: irecv logits for prev_batch
Tracer->>CurrPP: NVTX _handle_new_tokens_inter_pp...
CurrPP->>Sampler: Record DebugEvent, sync prior events as needed
Sampler-->>CurrPP: Sample tokens (assert non-None at last PP)
alt not last PP
CurrPP->>MPI: Wait/reset prior send handle
CurrPP->>MPI: isend_object(new logits + host state)
Tracer->>CurrPP: NVTX _send_new_tokens_...
else last PP
Tracer->>CurrPP: NVTX _sync_new_tokens_last_pp_...
CurrPP->>Sampler: synchronize(sampler_event)
end
end
note over CurrPP,MPI: On shutdown, wait on all outstanding send handles
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tensorrt_llm/_torch/pyexecutor/py_executor.py (1)
1126-1134: Use DebugEvent for inter-PP forward step to keep sampler_event type uniform

This path still returns a plain `torch.cuda.Event`, which breaks callers that assume `DebugEvent`. Align with the new type.

```diff
 def _forward_step_inter_pp(self, scheduled_batch) -> SampleState:
     self._forward_step(scheduled_batch)
-    sampler_event = torch.cuda.Event()
+    from .sampler import DebugEvent  # local import to avoid cycles
+    sampler_event = DebugEvent()
     sampler_event.record()
     self._update_request_states(scheduled_batch)
     sampler_event.synchronize()
     return self.sampler.SampleState(
         scheduled_requests=scheduled_batch,
         sampler_event=sampler_event,
     )
```
🧹 Nitpick comments (6)
tensorrt_llm/_torch/pyexecutor/sampler.py (2)
1-1: Missing NVIDIA copyright header

Per repo guidelines, prepend the current year NVIDIA copyright/SPDX header to this source file. Apply at the very top of the file (outside the shown range):

```diff
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
```
5-5: Unused import: traceback

`traceback` is imported but not used. Please remove to keep the module clean.

```diff
-import traceback
```
tests/integration/defs/disaggregated/test_disaggregated.py (1)
269-269: Intentional: content checks disabled for this path

Resetting `expected_strings` to an empty list effectively disables content-based asserts. If this is temporary for profiling/debugging, consider guarding via an env flag to keep some validation in CI, as sketched below.
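A minimal sketch of that guard, assuming a hypothetical env var name:

```python
import os

# Hypothetical flag: skip content asserts only when explicitly profiling/debugging.
if os.getenv("TLLM_DISAGG_SKIP_OUTPUT_CHECK") == "1":
    expected_strings = []  # profiling runs: no content-based validation
# otherwise keep the default expected_strings so CI still validates outputs
```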
tensorrt_llm/_torch/pyexecutor/py_executor.py (3)

1-1: Missing NVIDIA copyright header

Per repo guidelines, prepend the current year NVIDIA copyright/SPDX header to this source file. Apply at the very top of the file (outside the shown range):

```diff
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
```
412-414: Reduce log noise in hot path; fix long line (E501)

`should_stop_processing` is polled frequently; logging at INFO every iteration is noisy and expensive. Use DEBUG and shorten to satisfy E501.

```diff
-        logger.info(
-            f"rank {self.dist.pp_rank} should_stop_processing: {self.is_shutdown} {len(self.active_requests)} {self.executor_request_queue.get_waiting_queue_size()} handle {len([h for h in self.send_handles if h is not None])}"
-        )
+        logger.debug(
+            "rank %s should_stop_processing: %s actv=%d waitq=%d handles=%d",
+            self.dist.pp_rank, self.is_shutdown, len(self.active_requests),
+            self.executor_request_queue.get_waiting_queue_size(),
+            sum(h is not None for h in self.send_handles),
+        )
```
1647-1650: Tighten logging and satisfy E501

Reduce verbosity and wrap to meet line length.

```diff
-        logger.info(
-            f"rank {self.dist.pp_rank} _handle_responses: {len(self.active_requests)} {len(new_active_requests)} {len(requests_to_terminate)}"
-        )
+        logger.debug(
+            "rank %s _handle_responses: active=%d new=%d done=%d",
+            self.dist.pp_rank, len(self.active_requests),
+            len(new_active_requests), len(requests_to_terminate),
+        )
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- docker/Makefile (0 hunks)
- tensorrt_llm/_torch/pyexecutor/py_executor.py (7 hunks)
- tensorrt_llm/_torch/pyexecutor/sampler.py (4 hunks)
- tests/integration/defs/disaggregated/test_disaggregated.py (2 hunks)
💤 Files with no reviewable changes (1)
- docker/Makefile
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit Inference Engine (CODING_GUIDELINES.md)
**/*.py: Python code must target Python 3.8+
Python indentation: 4 spaces, no tabs
Maintain module namespace in imports (from package.subpackage import foo; then use foo.SomeClass())
Python file names use snake_case
Python class names use PascalCase
Python functions/methods and local variables use snake_case; variables starting with a number get k_ prefix (e.g., k_99th_percentile)
Global variables use G_ prefixed UPPER_SNAKE_CASE (e.g., G_MY_GLOBAL)
Constants use UPPER_SNAKE_CASE in Python
Avoid shadowing variables from outer scopes in Python
Initialize all externally visible members of a Python class in __init__
Prefer docstrings for interfaces used outside a file; comments for local code
Use Google-style docstrings for classes and functions (Sphinx-parsable)
Document attributes/variables inline with short docstrings
Avoid reflection when simple alternatives exist (e.g., prefer explicit parameters over dict(**locals()))
In try/except, catch the narrowest exceptions possible
For duck-typing with try/except, keep try body minimal and put logic in else
Files:
tensorrt_llm/_torch/pyexecutor/sampler.py
tests/integration/defs/disaggregated/test_disaggregated.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
**/*.{cpp,cxx,cc,cu,h,hpp,hxx,hh,cuh,py}
📄 CodeRabbit Inference Engine (CODING_GUIDELINES.md)
Prepend NVIDIA copyright header (current year) to all source files
Files:
tensorrt_llm/_torch/pyexecutor/sampler.py
tests/integration/defs/disaggregated/test_disaggregated.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
🪛 Ruff (0.12.2)
tensorrt_llm/_torch/pyexecutor/py_executor.py
44-44: Line too long (128 > 120)
(E501)
46-46: Line too long (125 > 120)
(E501)
47-47: Line too long (121 > 120)
(E501)
413-413: Line too long (227 > 120)
(E501)
1648-1648: Line too long (142 > 120)
(E501)
🔇 Additional comments (3)
tensorrt_llm/_torch/pyexecutor/sampler.py (1)
396-403: LGTM: Instrumented sampler_event creation

Using `DebugEvent()` here is consistent with the new instrumentation and enables the NVTX naming in the executor.

tests/integration/defs/disaggregated/test_disaggregated.py (1)
174-178: LGTM: Prefixing workers with NSYS command

Once gating and path usage are corrected above, prefixing `mpirun` with `nsys_cmd` is a clean way to toggle profiling via envs.

tensorrt_llm/_torch/pyexecutor/py_executor.py (1)
646-651: LGTM: Proactive cleanup of outstanding send handles on shutdown

Waiting on outstanding send handles eliminates a class of shutdown deadlocks between PP ranks; a sketch of the pattern follows.
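A minimal sketch of the shutdown-time drain, with names assumed from the PR context (`self.send_handles` holding in-flight `isend` handles, one per microbatch slot):

```python
def _drain_send_handles(self):
    """Best-effort wait on all outstanding isend handles before shutdown."""
    for i, handle in enumerate(self.send_handles):
        if handle is not None:
            handle.wait()            # block until the peer has received the send
            self.send_handles[i] = None
```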
```python
torch._C._activate_gpu_trace()
torch.cuda._gpu_trace.register_callback_for_event_synchronization(lambda event: logger.info(f"TorchEvent {event} synchronized"))
torch.cuda._gpu_trace.register_callback_for_event_creation(lambda event: logger.info(f"TorchEvent {event} created"))
torch.cuda._gpu_trace.register_callback_for_event_record(lambda event, t: logger.info(f"TorchEvent {event} recorded at {t}"))
torch.cuda._gpu_trace.register_callback_for_event_wait(lambda event, t: logger.info(f"TorchEvent {event} waited at {t}"))
torch.cuda._gpu_trace.register_callback_for_event_deletion(lambda event: logger.info(f"TorchEvent {event} destroyed"))
```
🛠️ Refactor suggestion
Guard and wrap private PyTorch GPU trace hooks; fix long lines (E501)
- This unconditionally activates private APIs (`torch._C._activate_gpu_trace`, `torch.cuda._gpu_trace.*`) at import time. Risky and costly by default.
- Gate with an env var and try/except; downgrade to a warning on failure.
- Split long lines per Ruff E501.
```diff
-torch._C._activate_gpu_trace()
-torch.cuda._gpu_trace.register_callback_for_event_synchronization(lambda event: logger.info(f"TorchEvent {event} synchronized"))
-torch.cuda._gpu_trace.register_callback_for_event_creation(lambda event: logger.info(f"TorchEvent {event} created"))
-torch.cuda._gpu_trace.register_callback_for_event_record(lambda event, t: logger.info(f"TorchEvent {event} recorded at {t}"))
-torch.cuda._gpu_trace.register_callback_for_event_wait(lambda event, t: logger.info(f"TorchEvent {event} waited at {t}"))
-torch.cuda._gpu_trace.register_callback_for_event_deletion(lambda event: logger.info(f"TorchEvent {event} destroyed"))
+if os.environ.get("TLLM_TORCH_GPU_TRACE") == "1":
+    try:
+        torch._C._activate_gpu_trace()
+        torch.cuda._gpu_trace.register_callback_for_event_synchronization(
+            lambda event: logger.info(f"TorchEvent {event} synchronized"))
+        torch.cuda._gpu_trace.register_callback_for_event_creation(
+            lambda event: logger.info(f"TorchEvent {event} created"))
+        torch.cuda._gpu_trace.register_callback_for_event_record(
+            lambda event, t: logger.info(f"TorchEvent {event} recorded at {t}"))
+        torch.cuda._gpu_trace.register_callback_for_event_wait(
+            lambda event, t: logger.info(f"TorchEvent {event} waited at {t}"))
+        torch.cuda._gpu_trace.register_callback_for_event_deletion(
+            lambda event: logger.info(f"TorchEvent {event} destroyed"))
+    except Exception as e:
+        logger.warning("Failed to activate Torch GPU tracing: %s", e)
```
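With this guard, tracing stays off by default; a debugging run would opt in by setting `TLLM_TORCH_GPU_TRACE=1` (the env var name proposed in this suggestion) before launching the executor.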
🧰 Tools
🪛 Ruff (0.12.2)
44-44: Line too long (128 > 120)
(E501)
46-46: Line too long (125 > 120)
(E501)
47-47: Line too long (121 > 120)
(E501)
🤖 Prompt for AI Agents
In tensorrt_llm/_torch/pyexecutor/py_executor.py around lines 43 to 49, the code
unconditionally calls private PyTorch GPU trace APIs and uses very long lines;
wrap activation and registrations behind an environment variable check (e.g.,
read a specific TRITON/TORCH_TRACE env var) and perform attribute existence
checks before calling, then enclose the activation and each register_* call in a
try/except that logs a warning (logger.warning) on any Exception instead of
raising; also break the logging callback expressions into shorter lines to
satisfy line-length rules and avoid long f-strings in single lines.
```diff
+                with torch.cuda.nvtx.range(
+                        f"_handle_new_tokens_inter_pp{self.dist.pp_rank}_pr{self.dist.prev_pp_rank}_mb{prev_microbatch_id}"):
                     # Receive tokens from previous pp rank (w.r.t model forward direction)
-                    (
-                        logits,
-                        sample_state.host,
-                    ) = self.dist.recv_object(
-                        src=self.dist.prev_pp_rank,
-                        tag=prev_microbatch_id,
-                    )
-                    if logits is not None:
-                        logits_host = torch.from_numpy(logits)
-                        sample_state.host.logits = logits_host
-                        sample_state.device.logits = logits_host.to(
-                            self.device_id)
+                    (
+                        logits,
+                        previous_batch.sample_state.host,
+                    ) = self.dist.recv_object(
+                        src=self.dist.prev_pp_rank,
+                        tag=prev_microbatch_id,
+                    )
+                    if logits is not None:
+                        logits_host = torch.from_numpy(logits)
+                        previous_batch.sample_state.host.logits = logits_host
+                        previous_batch.sample_state.device.logits = logits_host.to(
+                            self.device_id)
                 else:
-                    torch.cuda.nvtx.range_push("_handle_new_tokens_last_pp")
-                    sample_state.sampler_event.synchronize()
+                    with torch.cuda.nvtx.range(
+                            f"_sync_new_tokens_last_pp_{previous_batch.sample_state.sampler_event.counter}"):
+                        previous_batch.sample_state.sampler_event.synchronize()

                 # Send tokens to next pp rank (w.r.t model forward direction)
                 # Second last rank does not need to since last rank has original decoded tokens
                 if not self.dist.is_second_last_pp_rank:
-                    if self.send_handles[prev_microbatch_id] is not None:
-                        self.send_handles[prev_microbatch_id].wait()
-                    needs_logits = (
-                        self._need_return_logits(scheduled_batch)
-                        or (self._need_return_log_probs(scheduled_batch)
-                            and sample_state.host.log_probs is not None))
-                    serialized_logits = sample_state.host.logits.numpy(
-                    ) if needs_logits else None
-                    self.send_handles[
-                        prev_microbatch_id] = self.dist.isend_object(
-                            (
-                                serialized_logits,
-                                sample_state.host,
-                            ),
-                            dest=self.dist.next_pp_rank,
-                            tag=prev_microbatch_id)
-                torch.cuda.nvtx.range_pop()
+                    with torch.cuda.nvtx.range(
+                            f"_send_new_tokens_{self.dist.pp_rank}_pr{self.dist.next_pp_rank}_mb{prev_microbatch_id}"):
+                        if self.send_handles[prev_microbatch_id] is not None:
+                            self.send_handles[prev_microbatch_id].wait()
+                            self.send_handles[prev_microbatch_id] = None
+                        needs_logits = (
+                            self._need_return_logits(scheduled_batch)
+                            or (self._need_return_log_probs(scheduled_batch)
+                                and sample_state.host.log_probs is not None))
+                        serialized_logits = sample_state.host.logits.numpy(
+                        ) if needs_logits else None
+                        self.send_handles[
+                            prev_microbatch_id] = self.dist.isend_object(
+                                (
+                                    serialized_logits,
+                                    sample_state.host,
+                                ),
+                                dest=self.dist.next_pp_rank,
+                                tag=prev_microbatch_id)
```
🛠️ Refactor suggestion
Make last-PP NVTX naming robust; fix logits send logic to reference the correct batch
- `previous_batch.sample_state.sampler_event` may be a plain `torch.cuda.Event` (e.g., TRTLLMSampler), so `.counter` may not exist.
- `needs_logits` is computed from the current batch, but you're sending tokens/logits for the previous microbatch. Use `previous_batch`'s host state.
- When serializing logits, use the previous batch's `host.logits`, not the current `sample_state.host.logits`.
```diff
-                    with torch.cuda.nvtx.range(
-                            f"_sync_new_tokens_last_pp_{previous_batch.sample_state.sampler_event.counter}"):
-                        previous_batch.sample_state.sampler_event.synchronize()
+                    _ev = previous_batch.sample_state.sampler_event
+                    _ctr = getattr(_ev, "counter", -1)
+                    with torch.cuda.nvtx.range(
+                            f"_sync_new_tokens_last_pp_{_ctr}"):
+                        _ev.synchronize()
@@
                         if self.send_handles[prev_microbatch_id] is not None:
                             self.send_handles[prev_microbatch_id].wait()
                             self.send_handles[prev_microbatch_id] = None
-                        needs_logits = (
-                            self._need_return_logits(scheduled_batch)
-                            or (self._need_return_log_probs(scheduled_batch)
-                                and sample_state.host.log_probs is not None))
-                        serialized_logits = sample_state.host.logits.numpy(
-                        ) if needs_logits else None
+                        prev_host = previous_batch.sample_state.host
+                        needs_logits = (
+                            (getattr(prev_host, "logits", None) is not None)
+                            or (getattr(prev_host, "log_probs", None) is not None)
+                        )
+                        serialized_logits = (
+                            prev_host.logits.numpy() if needs_logits else None
+                        )
                         self.send_handles[
                             prev_microbatch_id] = self.dist.isend_object(
                                 (
                                     serialized_logits,
-                                    sample_state.host,
+                                    prev_host,
                                 ),
                                 dest=self.dist.next_pp_rank,
                                 tag=prev_microbatch_id)
```
🤖 Prompt for AI Agents
In tensorrt_llm/_torch/pyexecutor/py_executor.py around lines 801 to 843, make
the last-PP NVTX naming and logits-send logic robust: replace the direct access
to previous_batch.sample_state.sampler_event.counter with a safe fallback (e.g.,
check hasattr(event, "counter") and use it, otherwise use a stable identifier
such as previous_microbatch_id or omit the counter) so plain torch.cuda.Event
instances don't break; compute needs_logits using previous_batch.sample_state
(use previous_batch.sample_state.host.log_probs when checking for log-probs)
because you are sending the previous microbatch, and serialize/send
previous_batch.sample_state.host.logits (and convert to device from
previous_batch.sample_state.device if needed) instead of using the current
sample_state.host.logits; keep existing send/handle logic but ensure you
reference previous_batch consistently for both the condition and the serialized
payload.
```python
class DebugEvent(torch.cuda.Event):
    counter = 0

    def __init__(self):
        super().__init__()
        self.counter = DebugEvent.counter
        DebugEvent.counter += 1

    def __del__(self):
        logger.info(f"DebugEvent {self.counter} destroyed")

    def synchronize(self):
        logger.info(f"DebugEvent {self.counter} synchronized")
        super().synchronize()
```
🛠️ Refactor suggestion
DebugEvent destructor and logging may cause shutdown-time issues; make logging safe and less chatty
- Logging in `__del__` can run during interpreter shutdown when logging infra is partially torn down.
- Use debug level to avoid flooding logs.
- Wrap destructor logging in try/except to avoid noisy exceptions at teardown.
```diff
 class DebugEvent(torch.cuda.Event):
     counter = 0

     def __init__(self):
         super().__init__()
         self.counter = DebugEvent.counter
         DebugEvent.counter += 1

     def __del__(self):
-        logger.info(f"DebugEvent {self.counter} destroyed")
+        try:
+            logger.debug(f"DebugEvent {self.counter} destroyed")
+        except Exception:
+            # Best-effort logging; ignore errors during interpreter shutdown.
+            pass

     def synchronize(self):
-        logger.info(f"DebugEvent {self.counter} synchronized")
+        logger.debug(f"DebugEvent {self.counter} synchronized")
         super().synchronize()
```
🤖 Prompt for AI Agents
In tensorrt_llm/_torch/pyexecutor/sampler.py around lines 41 to 55, the
DebugEvent __del__ and synchronize logging can cause shutdown-time issues and
are too chatty; change both logging calls to logger.debug and wrap the __del__
logging in a try/except that suppresses all exceptions (silently ignore
failures) so nothing raises during interpreter teardown, i.e., perform
logger.debug inside a try block and except Exception: pass to ensure safe,
non-throwing, low-verbosity teardown logging.
```diff
@@ -44,7 +61,7 @@
     device: SampleStateTensors = None
     host: SampleStateTensors = None

-    sampler_event: torch.cuda.Event = None
+    sampler_event: DebugEvent = None
```
Type now forces DebugEvent, but TRTLLMSampler still returns torch.cuda.Event
`SampleState.sampler_event` is now `DebugEvent`, yet TRTLLMSampler (below) constructs a plain `torch.cuda.Event`. Callers (py_executor) access `.counter`, which will raise AttributeError when used with TRTLLMSampler. Align TRTLLMSampler to also use `DebugEvent`.

Outside the shown range (in this same file), update TRTLLMSampler.sample_async:
```diff
@@
-        sampler_event = torch.cuda.Event()
+        sampler_event = DebugEvent()
         sampler_event.record()
```
```python
nsys_path = os.getenv("NSYS_PATH", "nsys")
nsys_file = os.getenv("NSYS_FILE", f"/tmp/disagg_test_{test_desc}")
nsys_cmd = [
    "nsys",
    "profile",
    "--trace",
    "cuda,cublas,nvtx",
    "--output=" + nsys_file,
    "--force-overwrite=true",
    "--duration=120",
] if nsys_path and nsys_file else []
```
NSYS profiling is always enabled and ignores NSYS_PATH; fix gating and path usage
- `NSYS_PATH` is defaulted to "nsys", making the condition truthy even when unset; profiling will always run.
- The command uses the literal "nsys" instead of `nsys_path`.

Gate profiling strictly on both env vars being set, and use the provided NSYS binary. Optionally make duration configurable.
```diff
-    nsys_path = os.getenv("NSYS_PATH", "nsys")
-    nsys_file = os.getenv("NSYS_FILE", f"/tmp/disagg_test_{test_desc}")
-    nsys_cmd = [
-        "nsys",
-        "profile",
-        "--trace",
-        "cuda,cublas,nvtx",
-        "--output=" + nsys_file,
-        "--force-overwrite=true",
-        "--duration=120",
-    ] if nsys_path and nsys_file else []
+    nsys_path = os.getenv("NSYS_PATH")
+    nsys_file = os.getenv("NSYS_FILE")
+    nsys_duration = os.getenv("NSYS_DURATION", "120")
+    nsys_cmd = []
+    if nsys_path and nsys_file:
+        nsys_cmd = [
+            nsys_path,
+            "profile",
+            "--trace",
+            "cuda,cublas,nvtx",
+            f"--output={nsys_file}",
+            "--force-overwrite=true",
+            f"--duration={nsys_duration}",
+        ]
```
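With this gating, profiling stays off unless both variables are exported for the run, e.g. `NSYS_PATH=/opt/nvidia/nsight-systems/bin/nsys NSYS_FILE=/tmp/disagg_trace` (paths illustrative) in the test environment.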
🤖 Prompt for AI Agents
In tests/integration/defs/disaggregated/test_disaggregated.py around lines 159
to 170, NSYS profiling is incorrectly always enabled because NSYS_PATH is
defaulted to "nsys" and the command uses the literal "nsys"; change the logic to
only enable profiling when both NSYS_PATH and NSYS_FILE environment variables
are explicitly set (no default for NSYS_PATH), use the resolved nsys_path
variable in the command array instead of the string "nsys", and optionally read
a NSYS_DURATION env var (with a sensible default) to make the duration
configurable.
Summary by CodeRabbit
Description
Test Coverage
GitHub Bot Help
/bot [-h] ['run', 'kill', 'skip', 'reuse-pipeline'] ...
Provide a user friendly way for developers to interact with a Jenkins server.
Run /bot [-h|--help] to print this help message. See details below for each supported subcommand.
run [--reuse-test (optional)pipeline-id --disable-fail-fast --skip-test --stage-list "A10-PyTorch-1, xxx" --gpu-type "A30, H100_PCIe" --test-backend "pytorch, cpp" --add-multi-gpu-test --only-multi-gpu-test --disable-multi-gpu-test --post-merge --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" --detailed-log --debug(experimental)]
Launch build/test pipelines. All previously running jobs will be killed.
--reuse-test (optional)pipeline-id (OPTIONAL) : Allow the new pipeline to reuse build artifacts and skip successful test stages from a specified pipeline or the last pipeline if no pipeline-id is indicated. If the Git commit ID has changed, this option will be always ignored. The DEFAULT behavior of the bot is to reuse build artifacts and successful test results from the last pipeline.

--disable-reuse-test (OPTIONAL) : Explicitly prevent the pipeline from reusing build artifacts and skipping successful test stages from a previous pipeline. Ensure that all builds and tests are run regardless of previous successes.

--disable-fail-fast (OPTIONAL) : Disable fail fast on build/tests/infra failures.

--skip-test (OPTIONAL) : Skip all test stages, but still run build stages, package stages and sanity check stages. Note: Does NOT update GitHub check status.

--stage-list "A10-PyTorch-1, xxx" (OPTIONAL) : Only run the specified test stages. Examples: "A10-PyTorch-1, xxx". Note: Does NOT update GitHub check status.

--gpu-type "A30, H100_PCIe" (OPTIONAL) : Only run the test stages on the specified GPU types. Examples: "A30, H100_PCIe". Note: Does NOT update GitHub check status.

--test-backend "pytorch, cpp" (OPTIONAL) : Skip test stages which don't match the specified backends. Only support [pytorch, cpp, tensorrt, triton]. Examples: "pytorch, cpp" (does not run test stages with tensorrt or triton backend). Note: Does NOT update GitHub pipeline status.

--only-multi-gpu-test (OPTIONAL) : Only run the multi-GPU tests. Note: Does NOT update GitHub check status.

--disable-multi-gpu-test (OPTIONAL) : Disable the multi-GPU tests. Note: Does NOT update GitHub check status.

--add-multi-gpu-test (OPTIONAL) : Force run the multi-GPU tests in addition to running L0 pre-merge pipeline.

--post-merge (OPTIONAL) : Run the L0 post-merge pipeline instead of the ordinary L0 pre-merge pipeline.

--extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" (OPTIONAL) : Run the ordinary L0 pre-merge pipeline and specified test stages. Examples: --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx".

--detailed-log (OPTIONAL) : Enable flushing out all logs to the Jenkins console. This will significantly increase the log volume and may slow down the job.

--debug (OPTIONAL) : Experimental feature. Enable access to the CI container for debugging purpose. Note: Specify exactly one stage in the stage-list parameter to access the appropriate container environment. Note: Does NOT update GitHub check status.

For guidance on mapping tests to stage names, see docs/source/reference/ci-overview.md and the scripts/test_to_stage_mapping.py helper. An example invocation follows.
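For example, an illustrative invocation combining the documented flags: /bot run --disable-fail-fast --stage-list "A10-PyTorch-1".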
kill

kill

Kill all running builds associated with pull request.

skip

skip --comment COMMENT

Skip testing for latest commit on pull request. --comment "Reason for skipping build/test" is required. IMPORTANT NOTE: This is dangerous since lack of user care and validation can cause top of tree to break.

reuse-pipeline

reuse-pipeline

Reuse a previous pipeline to validate current commit. This action will also kill all currently running builds associated with the pull request. IMPORTANT NOTE: This is dangerous since lack of user care and validation can cause top of tree to break.