
Conversation

@joyang-nv (Collaborator) commented on Sep 4, 2025

Summary by CodeRabbit

  • New Features
    • Optional Ray-based orchestration for inference (multi-node, disaggregated, async).
    • Non-MPI path using PyTorch ProcessGroup, DeviceMesh-based mapping, and PG-backed collectives (allgather/allreduce/reducescatter).
    • Ray executor and GPU worker with queue-based result delivery.
    • LLM API: collective RPC and async/sync weight update helpers.
  • Documentation
    • New READMEs for Ray workflows and disaggregated serving.
  • Examples
    • Ray inference scripts, multi-node cluster launcher, disagg-serving CLI.
  • Tests
    • Ray-enabled unit/integration tests and pytest markers/flag.
  • Chores
    • Added ray dependency; build/packaging updates for new utilities.

Description

Test Coverage

PR Checklist

Please review the following before submitting your PR:

  • PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.

  • PR follows the TRT-LLM CODING GUIDELINES to the best of your knowledge.

  • Test cases are provided for new code paths (see test instructions)

  • Any new dependencies have been scanned for license and vulnerabilities

  • CODEOWNERS updated if ownership changes

  • Documentation updated as needed

  • The reviewers assigned automatically/manually are appropriate for the PR.

  • Please check this after reviewing the above items as appropriate for this PR.

GitHub Bot Help

/bot [-h] ['run', 'kill', 'skip', 'reuse-pipeline'] ...

Provide a user-friendly way for developers to interact with a Jenkins server.

Run /bot [-h|--help] to print this help message.

See details below for each supported subcommand.

run [--reuse-test (optional)pipeline-id --disable-fail-fast --skip-test --stage-list "A10-PyTorch-1, xxx" --gpu-type "A30, H100_PCIe" --test-backend "pytorch, cpp" --add-multi-gpu-test --only-multi-gpu-test --disable-multi-gpu-test --post-merge --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" --detailed-log --debug(experimental)]

Launch build/test pipelines. All previously running jobs will be killed.

--reuse-test (optional)pipeline-id (OPTIONAL) : Allow the new pipeline to reuse build artifacts and skip successful test stages from a specified pipeline, or from the last pipeline if no pipeline-id is indicated. If the Git commit ID has changed, this option will always be ignored. The DEFAULT behavior of the bot is to reuse build artifacts and successful test results from the last pipeline.

--disable-reuse-test (OPTIONAL) : Explicitly prevent the pipeline from reusing build artifacts and skipping successful test stages from a previous pipeline. Ensure that all builds and tests are run regardless of previous successes.

--disable-fail-fast (OPTIONAL) : Disable fail fast on build/tests/infra failures.

--skip-test (OPTIONAL) : Skip all test stages, but still run build stages, package stages and sanity check stages. Note: Does NOT update GitHub check status.

--stage-list "A10-PyTorch-1, xxx" (OPTIONAL) : Only run the specified test stages. Examples: "A10-PyTorch-1, xxx". Note: Does NOT update GitHub check status.

--gpu-type "A30, H100_PCIe" (OPTIONAL) : Only run the test stages on the specified GPU types. Examples: "A30, H100_PCIe". Note: Does NOT update GitHub check status.

--test-backend "pytorch, cpp" (OPTIONAL) : Skip test stages that don't match the specified backends. Only [pytorch, cpp, tensorrt, triton] are supported. Examples: "pytorch, cpp" (does not run test stages with the tensorrt or triton backend). Note: Does NOT update GitHub pipeline status.

--only-multi-gpu-test (OPTIONAL) : Only run the multi-GPU tests. Note: Does NOT update GitHub check status.

--disable-multi-gpu-test (OPTIONAL) : Disable the multi-GPU tests. Note: Does NOT update GitHub check status.

--add-multi-gpu-test (OPTIONAL) : Force run the multi-GPU tests in addition to running L0 pre-merge pipeline.

--post-merge (OPTIONAL) : Run the L0 post-merge pipeline instead of the ordinary L0 pre-merge pipeline.

--extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" (OPTIONAL) : Run the ordinary L0 pre-merge pipeline and specified test stages. Examples: --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx".

--detailed-log (OPTIONAL) : Enable flushing out all logs to the Jenkins console. This will significantly increase the log volume and may slow down the job.

--debug (OPTIONAL) : Experimental feature. Enable access to the CI container for debugging purposes. Note: Specify exactly one stage in the stage-list parameter to access the appropriate container environment. Note: Does NOT update GitHub check status.

For guidance on mapping tests to stage names, see docs/source/reference/ci-overview.md
and the scripts/test_to_stage_mapping.py helper.
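
For example, a typical comment to rerun only a single stage with fail-fast disabled (the stage name below is illustrative) would be:

/bot run --stage-list "A10-PyTorch-1" --disable-fail-fast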

kill

kill

Kill all running builds associated with the pull request.

skip

skip --comment COMMENT

Skip testing for the latest commit on the pull request. --comment "Reason for skipping build/test" is required. IMPORTANT NOTE: This is dangerous, since skipping validation without due care can break the top of tree.

reuse-pipeline

reuse-pipeline

Reuse a previous pipeline to validate the current commit. This action will also kill all currently running builds associated with the pull request. IMPORTANT NOTE: This is dangerous, since reusing a pipeline without due care and validation can break the top of tree.

@joyang-nv joyang-nv requested review from a team as code owners September 4, 2025 06:15
@hchings hchings changed the title Adding ray for new orchestrator type [TRTLLM-7349][feat] Adding new Ray orchestrator Sep 4, 2025
coderabbitai bot (Contributor) commented on Sep 4, 2025

📝 Walkthrough

Introduces a ProcessGroup-based distributed path alongside MPI across C++ and Python: new pg_utils library, CacheTransceiverComm abstraction, PG-backed collectives in thop ops, MPI gating via TLLM_DISABLE_MPI, and Ray-based orchestration (executor, workers, examples, tests). Adds bindings, build targets, packaging entries, and tests for PG/Ray flows.
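
As a rough sketch only of the TLLM_DISABLE_MPI gating mentioned above (the actual helper, mpi_disabled() in tensorrt_llm/_utils.py, may parse or cache the value differently), the backend selection boils down to an environment check:

import os

def mpi_disabled() -> bool:
    # Hypothetical sketch: TLLM_DISABLE_MPI=1 selects the PyTorch
    # ProcessGroup/Ray path instead of MPI. The real helper may accept
    # other truthy values.
    return os.environ.get("TLLM_DISABLE_MPI", "0") == "1"

if mpi_disabled():
    print("Using the PyTorch ProcessGroup (non-MPI) path")
else:
    print("Using the default MPI path")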

Changes

Cohort / File(s) Summary
Build & Packaging
cpp/tensorrt_llm/runtime/CMakeLists.txt, cpp/tensorrt_llm/runtime/utils/CMakeLists.txt, cpp/tensorrt_llm/batch_manager/CMakeLists.txt, cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/CMakeLists.txt, cpp/tensorrt_llm/thop/CMakeLists.txt, scripts/build_wheel.py, setup.py, requirements.txt, .gitignore
Adds pg_utils library and bindings, links Torch/Torch Python where needed, packages libpg_utils.so and pgBroker.cpp, includes Ray dependency, ignores pg_utils_bindings.*.so.
Runtime Utils: PG/MPI
cpp/include/tensorrt_llm/runtime/utils/pgUtils.h, cpp/tensorrt_llm/runtime/utils/pgUtils.cpp, cpp/tensorrt_llm/runtime/utils/pgUtilsBindings.cpp, tensorrt_llm/_torch/distributed/pg_utils.py, tensorrt_llm/_torch/distributed/pgBroker.cpp, cpp/include/tensorrt_llm/runtime/utils/mpiUtils.h, cpp/tensorrt_llm/runtime/utils/mpiUtils.cpp, tensorrt_llm/_utils.py
Adds ProcessGroup helper APIs, global PG broker init from Python, bindings, and Python-side split. Introduces MpiComm::couldUseMPI() and environment-driven mpi_disabled() with rank/trace adjustments.
Batch Manager Comm Abstraction
cpp/include/tensorrt_llm/batch_manager/cacheTransceiver.h, cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp, cpp/tensorrt_llm/pybind/batch_manager/cacheTransceiver.cpp
Introduces CacheTransceiverComm (MPI/PG wrapper), refactors transceiver to use unified comm, implements PG path (split, allgather/allgatherv), and adds temporary Python bindings.
UCX Cache Transmission
cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/ucxCacheCommunicator.{h,cpp}, cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/connection.cpp
Makes rank/world-size backend-agnostic (MPI or PG), updates logs to use manager rank, adds PG-based gather paths.
Torch Ops PG Variants
cpp/tensorrt_llm/thop/allgatherOp.cpp, cpp/tensorrt_llm/thop/reducescatterOp.cpp, cpp/tensorrt_llm/thop/allreduceOp.cpp
Adds ProcessGroup-backed implementations and Torch bindings for allgather/reducescatter/allreduce; dispatches between NCCL and PG.
Distributed Communicator (Python)
tensorrt_llm/_torch/distributed/communicator.py, tensorrt_llm/_torch/distributed/ops.py, tensorrt_llm/_ipc_utils.py, tensorrt_llm/_torch/pyexecutor/{py_executor.py,executor_request_queue.py,py_executor_creator.py,resource_manager.py,sampler.py}
Introduces TorchDist with cluster/local groups, PG-based pp-comm alternative, PG collectives when MPI disabled, Ray-friendly recv/send paths, timeouts, and async object transfers.
Device Mesh & Mapping
tensorrt_llm/_torch/device_mesh.py, tensorrt_llm/mapping.py
Adds DeviceMeshTopology and Mapping wrapper selecting MPI vs DeviceMesh at runtime; exposes PG group accessors and lazy mesh build.
Executor & Ray Orchestration
tensorrt_llm/executor/executor.py, tensorrt_llm/executor/ray_executor.py, tensorrt_llm/executor/ray_gpu_worker.py, tensorrt_llm/executor/result.py
Adds RayExecutor, Ray GPU worker/actor wrapper, Ray-backed result queues, and factory routing via orchestrator_type.
LLM API Extensions
tensorrt_llm/llmapi/llm.py, tensorrt_llm/llmapi/llm_args.py
Adds orchestrator_type arg, enables Ray path when set or MPI disabled, exposes collective_rpc and async/sync weight update methods.
Examples (Ray)
examples/ray/*, examples/ray/disaggregated/*, examples/ray/multi_nodes/*, examples/ray/to_delete/*
Adds Ray-based examples, scripts, and experimental tools for async/distributed inference, disaggregated serving, benchmarks, and tests.
Tests (Ray/PG)
tests/unittest/_torch/multi_gpu/*, tests/unittest/_torch/ray/*, tests/unittest/conftest.py, tests/integration/defs/{conftest.py,trt_test_alternative.py}, tests/unittest/_torch/executor/test_overlap_scheduler.py, tests/unittest/llmapi/*, tests/unittest/utils/util.py
Adds PG/Ray multi-GPU tests, Ray test controls/markers, MPI↔Ray parity parametrization, and utility for free port discovery.
Minor Edits
tensorrt_llm/_torch/models/modeling_{phi4mm,utils}.py, tensorrt_llm/_torch/auto_deploy/distributed/common.py
Formatting tweaks and internal port utility redirection.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant LLM as LLM.create()
  participant Exec as GenerationExecutor
  participant Ray as RayExecutor
  participant PG as Torch Dist (PG)
  participant W as RayGPUWorker[*]

  User->>LLM: create(model, orchestrator_type="ray", tp_size)
  LLM->>Exec: GenerationExecutor.create(**args)
  alt orchestrator_type == "ray"
    Exec->>Ray: _create_ray_executor(worker_kwargs, world_size, tp_size, ...)
    Ray->>Ray: init Ray cluster / placement group
    Ray->>PG: initialize process group(s)
    Ray->>W: create actors (world_size)
  else
    Exec->>Exec: fallback (MPI)
  end
  User->>LLM: generate/request
  LLM->>Ray: submit(request)
  Ray->>W: enqueue_request(leader)
  W-->>Ray: result stream/items
  Ray-->>LLM: GenerationResult
  LLM-->>User: outputs
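In code, the flow above is driven entirely through the LLM API. A minimal usage sketch based on the tests added in this PR (model name, prompt, and sampling settings are placeholders; see examples/ray/ for complete scripts):

from tensorrt_llm import LLM, SamplingParams

# Select the new Ray orchestrator instead of the default MPI path.
llm = LLM(model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
          orchestrator_type="ray",
          tensor_parallel_size=2)

# Requests are routed through RayExecutor to the RayGPUWorker actors.
for output in llm.generate(["Hello, my name is"],
                           SamplingParams(max_tokens=32)):
    print(output.outputs[0].text)

# New helper from this PR: invoke a method on every worker actor.
device_ids = llm.collective_rpc("report_device_id")
print(device_ids)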
sequenceDiagram
  autonumber
  participant CT as CacheTransceiver
  participant Comm as CacheTransceiverComm
  participant MPI as MPI Comm
  participant PG as ProcessGroup
  note over CT: Initialize
  CT->>Comm: construct (from MPI or PG)
  alt MPI path
    Comm->>MPI: split(color,key)
    Comm->>MPI: allgather/allgatherv
  else PG path
    Comm->>PG: split via Python helper
    Comm->>PG: allgather/allgatherv (PgHelper)
  end

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~150 minutes

Possibly related PRs

Suggested labels

Community want to contribute


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

coderabbitai bot (Contributor) left a comment


Actionable comments posted: 93

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (10)
tensorrt_llm/_torch/pyexecutor/model_engine.py (1)

973-996: Preserve MetaInit path when ModelConfig.clone() is unavailable.

Switching from deepcopy to clone() risks skipping the MetaInit path if clone() isn't implemented (AttributeError) or not universally supported across configs, causing a fallback to full materialization and potential OOM/perf regressions. Prefer clone() but fall back to deepcopy only for the copy step to retain the MetaInit flow.

             try:
-                # config will be modified in-place for some models, like Qwen2
-                config_copy = config.clone()
+                # config will be modified in-place for some models, like Qwen2
+                # Prefer lightweight clone; fall back to deepcopy to preserve MetaInit path if clone is unavailable.
+                try:
+                    if hasattr(config, "clone"):
+                        config_copy = config.clone()
+                    else:
+                        import copy as _copy
+                        config_copy = _copy.deepcopy(config)
+                except Exception:
+                    import copy as _copy
+                    config_copy = _copy.deepcopy(config)
                 with MetaInitMode():
                     model = AutoModelForCausalLM.from_config(config_copy)
 
                 memo = dict()
tensorrt_llm/_torch/pyexecutor/py_executor_creator.py (1)

255-256: Fix logger call to avoid formatting error at runtime.

logger.info("ATTENTION RUNTIME FEATURES: ", attn_runtime_features) will be formatted with % under std logging and can raise. Use %s or f-string.

-    logger.info("ATTENTION RUNTIME FEATURES: ", attn_runtime_features)
+    logger.info("ATTENTION RUNTIME FEATURES: %s", attn_runtime_features)
cpp/tensorrt_llm/runtime/utils/mpiUtils.cpp (2)

303-316: Fix incorrect printf specifiers for size_t in log messages.

size is size_t but logged with %d, which is UB on LP64 and truncates on 64-bit. Use %zu (or cast to unsigned long long with %llu).

Apply:

-    TLLM_LOG_DEBUG("start MPI_Isend with dest %d, tag %d, size %d", dest, static_cast<int>(tag), size);
+    TLLM_LOG_DEBUG("start MPI_Isend with dest %d, tag %d, size %zu", dest, static_cast<int>(tag), size);
@@
-    TLLM_LOG_DEBUG("end MPI_Isend with dest %d, tag %d, size %d", dest, static_cast<int>(tag), size);
+    TLLM_LOG_DEBUG("end MPI_Isend with dest %d, tag %d, size %zu", dest, static_cast<int>(tag), size);
@@
-    TLLM_LOG_DEBUG("start MPI_Send with dest %d, tag %d, size %d", dest, tag, size);
+    TLLM_LOG_DEBUG("start MPI_Send with dest %d, tag %d, size %zu", dest, tag, size);
@@
-    TLLM_LOG_DEBUG("end MPI_Send with dest %d, tag %d, size %d", dest, tag, size);
+    TLLM_LOG_DEBUG("end MPI_Send with dest %d, tag %d, size %zu", dest, tag, size);
@@
-    TLLM_LOG_DEBUG("start MPI_Recv with source %d, tag %d, size %d", source, tag, size);
+    TLLM_LOG_DEBUG("start MPI_Recv with source %d, tag %d, size %zu", source, tag, size);
@@
-    TLLM_LOG_DEBUG("end MPI_Recv with source %d, tag %d, size %d", source, tag, size);
+    TLLM_LOG_DEBUG("end MPI_Recv with source %d, tag %d, size %zu", source, tag, size);

Also applies to: 324-334, 348-360


429-466: Gate probe APIs with couldUseMPI() for consistent MPI-disable behavior.

mprobe/improbe/iprobe bypass the new runtime MPI guard. They should early-guard like other ops.

 void MpiComm::mprobeRawTag(int source, int tag, MPI_Message* msg, MPI_Status* status) const
 {
+    couldUseMPI();
 #if ENABLE_MULTI_DEVICE
@@
 bool MpiComm::improbe(int source, MpiTag tag, MPI_Message* msg, MPI_Status* status) const
 {
+    couldUseMPI();
 #if ENABLE_MULTI_DEVICE
@@
 bool MpiComm::iprobe(int source, MpiTag tag, MPI_Status* status) const
 {
+    couldUseMPI();
 #if ENABLE_MULTI_DEVICE
setup.py (2)

175-187: Fix undefined exception variable 'e' when raising SetupError.

e is not defined in this scope; this raises UnboundLocalError instead of the intended SetupError.

-            else:
-                raise SetupError(
-                    f"Failed to get wheel file from {precompiled_path}.") from e
+            else:
+                raise SetupError(
+                    f"Failed to get wheel file from {precompiled_path}.")

206-211: Align python_requires with actual syntax usage (PEP 604 unions).

The file uses type hints like str | None, which require Python 3.10+. Current python_requires is ">=3.7", which will break installs.

-    python_requires=">=3.7, <4")
+    python_requires=">=3.10, <4")

Also applies to: 264-264

cpp/tensorrt_llm/batch_manager/CMakeLists.txt (1)

112-116: Ensure Python3::Python target exists before linking
In cpp/tensorrt_llm/batch_manager/CMakeLists.txt before line 112, wrap the Python target usage with a guarded find_package:

+if(NOT TARGET Python3::Python)
+  find_package(Python3 REQUIRED COMPONENTS Interpreter Development)
+endif()
 find_library(TORCH_PYTHON_LIB torch_python REQUIRED
              HINTS ${TORCH_INSTALL_PREFIX}/lib)
 target_link_libraries(${BATCH_MANAGER_STATIC_TARGET}
                       PUBLIC ${TORCH_PYTHON_LIB} Python3::Python pg_utils)
tests/unittest/_torch/ray/test_placement.py (1)

54-67: Test cleanup and potential race condition concerns

The test modifies and deletes CUDA_VISIBLE_DEVICES but doesn't ensure proper cleanup if the test fails. Additionally, there's no Ray cleanup.

Apply this diff to ensure proper cleanup:

 @pytest.mark.gpu2
 def test_cuda_visible_device():
     """Placement via cuda_visible_device"""
+    original_cuda_visible = os.environ.get("CUDA_VISIBLE_DEVICES")
     os.environ["CUDA_VISIBLE_DEVICES"] = "1"
-
-    llm = LLM(model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
-              orchestrator_type="ray")
-
-    infer_actor_uuids = llm.collective_rpc("report_device_id")
-
-    del os.environ["CUDA_VISIBLE_DEVICES"]
-    assert infer_actor_uuids[0] == get_device_uuid(1)
-    print(f"{infer_actor_uuids=}")
+    try:
+        ray.init()
+        llm = LLM(model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
+                  orchestrator_type="ray")
+        
+        infer_actor_uuids = llm.collective_rpc("report_device_id")
+        assert infer_actor_uuids[0] == get_device_uuid(1)
+        print(f"{infer_actor_uuids=}")
+    finally:
+        # Restore original environment
+        if original_cuda_visible is not None:
+            os.environ["CUDA_VISIBLE_DEVICES"] = original_cuda_visible
+        else:
+            os.environ.pop("CUDA_VISIBLE_DEVICES", None)
+        ray.shutdown()
tensorrt_llm/_torch/pyexecutor/py_executor.py (2)

862-871: Disjoint tag namespaces for different inter-PP messages.

Tokens and logits both use tag=prev_microbatch_id between the same src/dst. Per prior incident, these must not share a tag space to avoid message collisions.

I used the retrieved learning from PR #7455. Suggest using distinct, documented offsets, e.g., kTOKENS_TAG_BASE=0, kLOGITS_TAG_BASE=100000, then tag=k*_BASE+prev_microbatch_id.

Also applies to: 1849-1863
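
A minimal sketch of the suggested scheme (the constant names and the 100000 offset are illustrative; any disjoint bases work as long as microbatch ids can never cross them):

# Illustrative only: disjoint tag namespaces so token and logit transfers
# between the same (src, dst) ranks can never collide on a tag.
TOKENS_TAG_BASE = 0
LOGITS_TAG_BASE = 100_000  # must exceed the maximum microbatch id

def tokens_tag(prev_microbatch_id: int) -> int:
    return TOKENS_TAG_BASE + prev_microbatch_id

def logits_tag(prev_microbatch_id: int) -> int:
    return LOGITS_TAG_BASE + prev_microbatch_id

assert tokens_tag(3) != logits_tag(3)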


518-546: Initialize or remove self.global_rank
tensorrt_llm/_torch/pyexecutor/py_executor.py logs self.global_rank (around lines 552–556) but PyExecutor.__init__ never sets it, causing an AttributeError at runtime. Either add self.global_rank = dist.rank in the constructor or drop it from the log.

Comment on lines +79 to 86
std::string error = std::string("Error in UcxConnection constructor for rank ")
+ std::to_string(mManager->getRank()) + ": " + e.what();
TLLM_THROW(error);

⚠️ Potential issue

Avoid relying on possibly-uninitialized rank in error logs.

mManager->getRank() depends on UcxConnectionManager::mRank. If not set yet, this read is UB. Please default-initialize (see header comment) and/or guard logs to fall back to a sentinel when rank is unknown.

}
else
{
TLLM_LOG_DEBUG(mRank, "WARNING: Process group is null, defaulting to single process");

⚠️ Potential issue

Fix log level inconsistency

Line 184 uses TLLM_LOG_DEBUG with mRank as the first parameter, but mRank is uninitialized at this point (it's assigned on line 185).

-                TLLM_LOG_DEBUG(mRank, "WARNING: Process group is null, defaulting to single process");
+                TLLM_LOG_WARNING("Process group is null, defaulting to single process");
🤖 Prompt for AI Agents
In
cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/ucxCacheCommunicator.cpp
around line 184, the log call uses mRank before it is assigned on line 185; move
the log statement so it runs after mRank is initialized (or alternatively
replace the first parameter with a known safe value like 0 or a local default)
and keep the same log level; ensure no use of uninitialized mRank in any logging
calls.

Comment on lines +275 to +319
char* begin = reinterpret_cast<char*>(recvBuffer.data());
for (int r = 0; r < mWorldSize; ++r)
{
std::vector<char> serBuffer(begin, begin + sizeofBuffer[r]);
begin += sizeofBuffer[r];

🛠️ Refactor suggestion

Potential buffer overflow risk in pointer arithmetic

The deserialization loop uses raw pointer arithmetic without bounds checking. While the buffer size is calculated correctly, adding explicit bounds validation would be safer.

 // deserialize
 char* begin = reinterpret_cast<char*>(recvBuffer.data());
+char* end = begin + recvBuffer.size();
 for (int r = 0; r < mWorldSize; ++r)
 {
+    TLLM_CHECK_WITH_INFO(begin + sizeofBuffer[r] <= end, 
+        "Buffer overflow detected during deserialization");
     std::vector<char> serBuffer(begin, begin + sizeofBuffer[r]);
     begin += sizeofBuffer[r];
🤖 Prompt for AI Agents
In
cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/ucxCacheCommunicator.cpp
around lines 275-279, the loop that constructs serBuffer using pointer
arithmetic can overflow because it assumes sizeofBuffer entries and cumulative
offsets are valid; add explicit bounds checks before constructing each
serBuffer: validate each sizeofBuffer[r] is non-negative and that offset +
sizeofBuffer[r] <= recvBuffer.size() (use size_t for offsets), compute remaining
bytes and reject or handle the error (log/throw/return) if any check fails, and
only then create the vector from begin to begin + sizeofBuffer[r] and advance
the offset/pointer safely.

Comment on lines +58 to 60
int mRank;
int mWorldSize;
std::atomic<UcxConnection::ConnectionIdType> mConnectionIdCounter{1};

💡 Verification agent

🧩 Analysis chain

Initialize new members to avoid UB and noisy ranks in logs.

mRank/mWorldSize are not default-initialized; getRank() is used immediately for error/log messages. Reading an uninitialized int is UB.

Apply:

-    int mRank;
-    int mWorldSize;
+    int mRank{-1};
+    int mWorldSize{0};

To ensure they’re set at runtime, verify constructor/initialization paths:


🏁 Script executed:

#!/bin/bash
rg -nP 'UcxConnectionManager::UcxConnectionManager|mRank\s*=|mWorldSize\s*=' -C3

Length of output: 26686


Initialize mRank and mWorldSize to default values
In ucxCacheCommunicator.h (lines 58–60), add default initializers to prevent undefined behavior when the null-process-group branch in UcxConnectionManager (ucxCacheCommunicator.cpp lines 183–185) logs mRank before it’s assigned:

-    int mRank;
-    int mWorldSize;
+    int mRank{-1};
+    int mWorldSize{0};
🤖 Prompt for AI Agents
In cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/ucxCacheCommunicator.h
around lines 58 to 60, mRank and mWorldSize are declared but not initialized so
logging them before assignment can cause undefined behavior; add default
initializers (e.g., int mRank = -1; int mWorldSize = -1;) so they have a defined
sentinel value until properly set, leaving the existing atomic
mConnectionIdCounter line unchanged.

Comment on lines +84 to +87
[[nodiscard]] int getRank() const
{
return mRank;
}

💡 Verification agent

🧩 Analysis chain

API symmetry: expose world size or remove the unused field.

Either add getWorldSize() or drop mWorldSize until needed to prevent -Wunused-private-field in Werror builds.

     [[nodiscard]] int getRank() const
     {
         return mRank;
     }
+    [[nodiscard]] int getWorldSize() const
+    {
+        return mWorldSize;
+    }

Check if mWorldSize is referenced anywhere:


🏁 Script executed:

#!/bin/bash
rg -n '\bmWorldSize\b' -C2

Length of output: 15032


Add getWorldSize() accessor for API symmetry.
mWorldSize is actively used in ucxCacheCommunicator.cpp (initialization, buffer sizing and loops) and must be retained—expose it alongside getRank():

     [[nodiscard]] int getRank() const
     {
         return mRank;
     }
+    [[nodiscard]] int getWorldSize() const
+    {
+        return mWorldSize;
+    }
🤖 Prompt for AI Agents
In cpp/tensorrt_llm/executor/cache_transmission/ucx_utils/ucxCacheCommunicator.h
around lines 84-87, add a symmetric accessor for mWorldSize similar to
getRank(): declare and implement a [[nodiscard]] int getWorldSize() const that
returns mWorldSize so callers in ucxCacheCommunicator.cpp can access world size
for initialization, buffer sizing and loops; ensure the method is const and
placed alongside getRank() in the class declaration.

Comment on lines +98 to +89
def physical_to_local_id(phys_id: int) -> int:
visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
if not visible_devices:
return phys_id
id_mapping = list(map(int, visible_devices.split(",")))
return id_mapping.index(phys_id)


🛠️ Refactor suggestion

Robust mapping for CUDA_VISIBLE_DEVICES.

Guard against phys_id not found to avoid ValueError.

     @staticmethod
     def physical_to_local_id(phys_id: int) -> int:
         visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
         if not visible_devices:
             return phys_id
-        id_mapping = list(map(int, visible_devices.split(",")))
-        return id_mapping.index(phys_id)
+        id_mapping = [int(x) for x in visible_devices.split(",") if x != ""]
+        for idx, val in enumerate(id_mapping):
+            if val == phys_id:
+                return idx
+        # Fallback: if phys_id not in mapping, assume single device context
+        return 0
🤖 Prompt for AI Agents
In tensorrt_llm/executor/ray_gpu_worker.py around lines 98 to 104, the
physical_to_local_id function calls id_mapping.index(phys_id) which will raise a
ValueError if phys_id is not present; to fix this, check membership first (e.g.,
if phys_id in id_mapping) and only call index when present, otherwise handle the
missing mapping gracefully—either return phys_id unchanged or raise a clearer
error with context; also defensively parse CUDA_VISIBLE_DEVICES (strip entries
and skip empty strings) before building id_mapping.

Comment on lines +325 to +284
def enqueue_request(self,
request: GenerationRequest,
result_wait_queue: Queue | None = None) -> int:
# Largely adapted from GenerationExecutorWorker. WAR: Keep in sync manually.

⚠️ Potential issue

Python 3.8 compatibility: avoid PEP 604 union syntax (second occurrence).

-    def enqueue_request(self,
-                        request: GenerationRequest,
-                        result_wait_queue: Queue | None = None) -> int:
+    def enqueue_request(self,
+                        request: GenerationRequest,
+                        result_wait_queue: Optional[Queue] = None) -> int:
🤖 Prompt for AI Agents
In tensorrt_llm/executor/ray_gpu_worker.py around lines 325 to 328, the method
signature uses PEP 604 union syntax "Queue | None" which is not compatible with
Python 3.8; change the annotation to use typing.Optional (e.g., Optional[Queue])
and ensure Optional is imported from typing (or add it to the existing typing
imports), and update any other identical occurrences in the file to maintain
Python 3.8 compatibility.

Comment on lines +336 to +297
lora_config = tllm.LoraConfig(
task_id=request.lora_request.adapter_id,
weights=self._lora_manager.cpp_lora_weights[uid]
if not adapter_in_cache else None,
config=self._lora_manager.cpp_lora_config[uid])
py_lora_path = request.lora_request.lora_path

⚠️ Potential issue

Likely API misuse: methods vs. mappings on LoraManager.

cpp_lora_weights/cpp_lora_config are methods in LoraManager; indexing without calling will raise “'method' object is not subscriptable”.

-                weights=self._lora_manager.cpp_lora_weights[uid]
+                weights=self._lora_manager.cpp_lora_weights()[uid]
                 if not adapter_in_cache else None,
-                config=self._lora_manager.cpp_lora_config[uid])
+                config=self._lora_manager.cpp_lora_config()[uid])
🤖 Prompt for AI Agents
In tensorrt_llm/executor/ray_gpu_worker.py around lines 336 to 341, the code
treats cpp_lora_weights and cpp_lora_config as mappings (using [uid]) but they
are methods on LoraManager; change the calls to invoke the methods with uid
(e.g., use self._lora_manager.cpp_lora_weights(uid) when not adapter_in_cache
else None, and config=self._lora_manager.cpp_lora_config(uid)) so you call the
functions instead of attempting to subscript method objects.

Comment on lines +530 to +495
def shutdown(self):

if self.doing_shutdown:
return
else:
self.doing_shutdown = True


⚠️ Potential issue

self.doing_shutdown used before initialization.

Accessing an undefined attribute can raise AttributeError during shutdown or GC.

     def shutdown(self):
 
-        if self.doing_shutdown:
+        if getattr(self, "doing_shutdown", False):
             return
         else:
             self.doing_shutdown = True

Also initialize it in __init__:

         self.llm_args: Optional[BaseLlmArgs] = None,
     ) -> None:
+        self.doing_shutdown = False
         postproc_config = postproc_worker_config or PostprocWorkerConfig()
🤖 Prompt for AI Agents
In tensorrt_llm/executor/ray_gpu_worker.py around lines 530-536,
self.doing_shutdown is referenced before being defined which can raise
AttributeError; add self.doing_shutdown = False to the class __init__ to ensure
it's always present and set to False by default, then keep the shutdown method
setting it to True when starting shutdown; optionally make the shutdown guard
defensive (if getattr(self, "doing_shutdown", False): return) if you want extra
safety.

Comment on lines 2332 to 2352
orchestrator_type: Optional[str] = Field(
default=None,
description=
"The orchestrator type to use. Options: 'ray'. Defaults to None, which uses MPI.",
status="prototype",
)


🛠️ Refactor suggestion

Constrain and validate orchestrator_type (avoid silent misconfig).

Limit values to a known set and normalize case; today only "ray" is supported. This prevents typos from falling back to MPI unexpectedly.

Apply:

-    orchestrator_type: Optional[str] = Field(
+    orchestrator_type: Optional[Literal["ray"]] = Field(
         default=None,
         description=
         "The orchestrator type to use. Options: 'ray'. Defaults to None, which uses MPI.",
         status="prototype",
     )

Add validator within TorchLlmArgs to normalize input:

@field_validator("orchestrator_type", mode="before")
@classmethod
def _validate_orchestrator_type(cls, v):
    if v is None:
        return None
    val = str(v).strip().lower()
    if val == "ray":
        return "ray"
    raise ValueError("Invalid orchestrator_type. Allowed: 'ray' or None.")
🤖 Prompt for AI Agents
In tensorrt_llm/llmapi/llm_args.py around lines 2332 to 2338 the
orchestrator_type Field accepts arbitrary strings and can silently fall back to
MPI; add a before-mode field validator on TorchLlmArgs named like
_validate_orchestrator_type that normalizes the input (strip and lower), returns
None when input is None, returns "ray" for any case-variation of "ray", and
raises ValueError("Invalid orchestrator_type. Allowed: 'ray' or None.") for
anything else; ensure the validator is a @field_validator("orchestrator_type",
mode="before") @classmethod so invalid values are rejected early.

coderabbitai bot (Contributor) left a comment

Review continued from previous batch...

Comment on lines +38 to 39
#include <optional>
#include <thread>

⚠️ Potential issue

Missing headers for std::string and std::runtime_error.

couldUseMPI() uses std::string and std::runtime_error, but <string> and <stdexcept> are not included. This will fail to compile.

Apply:

 #include <mutex>
 #include <optional>
+#include <stdexcept>
+#include <string>
 #include <thread>
🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/runtime/utils/mpiUtils.h around lines 38-39, the
file uses std::string and std::runtime_error in couldUseMPI() but does not
include the corresponding headers; add #include <string> and #include
<stdexcept> near the other includes so the types are declared and the file
compiles cleanly.

Comment on lines 28 to 30
#include <torch/csrc/jit/python/pybind_utils.h>
#include <torch/extension.h>


⚠️ Potential issue

Add missing header for std::accumulate.

std::accumulate is used below but <numeric> is not included, which will fail to compile.

Apply:

 #include <pybind11/stl_bind.h>
+#include <numeric>
 #include <torch/csrc/jit/python/pybind_utils.h>
🤖 Prompt for AI Agents
In cpp/tensorrt_llm/pybind/batch_manager/cacheTransceiver.cpp around lines 28 to
30, the file uses std::accumulate but does not include the <numeric> header; add
#include <numeric> alongside the existing includes so the code compiles.

Comment on lines 81 to 85
{
auto pg = torch::jit::toCustomClass<c10d::ProcessGroup>(pg_obj);
return tb::CacheTransceiverComm(pg);
}),
py::arg("process_group"))

🛠️ Refactor suggestion

Constructor may require a boxed (ScriptObject) ProcessGroup; add type check and clear error.

torch::jit::toCustomClass<c10d::ProcessGroup>(pg_obj) expects a TorchScript custom class object. If a user passes a regular torch.distributed.ProcessGroup, this will throw with a vague error.

Apply a safer conversion and explicit message:

-                [](py::object pg_obj)
+                [](py::object pg_obj)
                 {
-                    auto pg = torch::jit::toCustomClass<c10d::ProcessGroup>(pg_obj);
+                    c10::intrusive_ptr<c10d::ProcessGroup> pg;
+                    try {
+                        // Accept either boxed (ScriptObject) or regular ProcessGroup
+                        if (py::hasattr(pg_obj, "boxed")) {
+                            auto boxed = pg_obj.attr("boxed")();
+                            pg = torch::jit::toCustomClass<c10d::ProcessGroup>(boxed);
+                        } else {
+                            pg = torch::jit::toCustomClass<c10d::ProcessGroup>(pg_obj);
+                        }
+                    } catch (const std::exception& e) {
+                        throw std::invalid_argument(
+                            std::string("CacheTransceiverComm requires a torch.distributed ProcessGroup (boxed or ScriptObject). Error: ") + e.what());
+                    }
                     return tb::CacheTransceiverComm(pg);
                 }),
🤖 Prompt for AI Agents
In cpp/tensorrt_llm/pybind/batch_manager/cacheTransceiver.cpp around lines
81-85, the conversion using
torch::jit::toCustomClass<c10d::ProcessGroup>(pg_obj) assumes pg_obj is a
TorchScript boxed custom class and will throw a vague error for regular
torch.distributed.ProcessGroup objects; add an explicit type check and clearer
error handling: first verify pg_obj is a ScriptObject / custom class (or use the
appropriate torch::jit API to test instance type) and if not throw a
py::type_error with a message explaining that a boxed ScriptObject ProcessGroup
is required (and suggest how to obtain/convert one), and wrap the toCustomClass
call in a try/catch to rethrow any conversion exception as a py::type_error with
the same clear message so callers get actionable feedback.

Comment on lines +127 to +191
int total_size = std::accumulate(sizes.begin(), sizes.end(), 0);
std::vector<int64_t> output(total_size);
bool ok = self.allgatherv(std::ref(input), std::ref(output), std::cref(sizes));
return py::make_tuple(ok, output);
},

🛠️ Refactor suggestion

Use size_t and avoid int overflow in total_size calculation.

int total_size = std::accumulate(..., 0); can overflow; also the initial value selects int accumulation. Prefer size_t with a size_t{0} initializer.

Apply:

-                int total_size = std::accumulate(sizes.begin(), sizes.end(), 0);
-                std::vector<int64_t> output(total_size);
+                std::size_t total_size = std::accumulate(sizes.begin(), sizes.end(), std::size_t{0});
+                std::vector<int64_t> output(static_cast<std::size_t>(total_size));
-                int total_size = std::accumulate(sizes.begin(), sizes.end(), 0);
-                std::vector<double> output(total_size);
+                std::size_t total_size = std::accumulate(sizes.begin(), sizes.end(), std::size_t{0});
+                std::vector<double> output(static_cast<std::size_t>(total_size));
-                int total_size = std::accumulate(sizes.begin(), sizes.end(), 0);
-                std::vector<char> output(total_size);
+                std::size_t total_size = std::accumulate(sizes.begin(), sizes.end(), std::size_t{0});
+                std::vector<char> output(static_cast<std::size_t>(total_size));

Also applies to: 138-142, 149-153

🤖 Prompt for AI Agents
In cpp/tensorrt_llm/pybind/batch_manager/cacheTransceiver.cpp around lines
127-131, the total_size accumulation uses int and a 0 initializer which can
overflow; change the accumulation to use size_t total_size =
std::accumulate(sizes.begin(), sizes.end(), size_t{0}); keep output as
std::vector<int64_t>(total_size) so the size type matches. Apply the same change
to the other occurrences mentioned around lines 138-142 and 149-153 so all
total_size calculations use size_t with size_t{0} initializer to avoid integer
overflow.

Comment on lines 25 to 31
target_link_libraries(pg_utils PUBLIC ${TORCH_LIBRARIES})

# Find torch_python
find_library(TORCH_PYTHON_LIB torch_python REQUIRED
HINTS ${TORCH_INSTALL_PREFIX}/lib)

# Create Python binding module

🛠️ Refactor suggestion

Propagate TORCH_CXX_FLAGS to avoid C++ ABI/link issues.

On many setups, Torch sets critical flags (e.g., -D_GLIBCXX_USE_CXX11_ABI). Without them you can hit link/runtime errors.

 target_link_libraries(pg_utils PUBLIC ${TORCH_LIBRARIES})
 
 # Find torch_python
 find_library(TORCH_PYTHON_LIB torch_python REQUIRED
              HINTS ${TORCH_INSTALL_PREFIX}/lib)
 
+# Honor Torch CXX flags for ABI compatibility
+if(DEFINED TORCH_CXX_FLAGS)
+  target_compile_options(pg_utils PRIVATE ${TORCH_CXX_FLAGS})
+  target_compile_options(pg_utils_bindings PRIVATE ${TORCH_CXX_FLAGS})
+endif()

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +1 to +3
import copy
import os
import unittest

⚠️ Potential issue

Copyright header missing.

According to the coding guidelines, all source files should have an NVIDIA copyright header prepended.

Add the NVIDIA copyright header at the beginning of the file:

#
# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
🤖 Prompt for AI Agents
In tests/unittest/_torch/ray/test_mapping.py around lines 1 to 3, the file is
missing the required NVIDIA copyright header; prepend the standard NVIDIA
Copyright header block (including year 2025, NVIDIA CORPORATION, and the Apache
License 2.0 boilerplate shown in the review comment) at the very top of the file
before any imports so the file starts with the full header lines followed by the
existing imports.

Comment on lines +27 to +28
# 8 GPPUs with cp
(4, 1, 2, -1, -1),

⚠️ Potential issue

Fix typo in comment.

There's a typo in the comment: "GPPUs" should be "GPUs".

-            # 8 GPPUs with cp
+            # 8 GPUs with cp
🤖 Prompt for AI Agents
In tests/unittest/_torch/ray/test_mapping.py around lines 27 to 28, there's a
typo in the inline comment "GPPUs" — change it to "GPUs" so the comment reads "8
GPUs with cp" (or the intended wording), preserving punctuation and alignment.

Comment on lines +1 to +11
import os

import pytest
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from utils.llm_data import llm_models_root

from tensorrt_llm import LLM
from tensorrt_llm._torch.utils import get_device_uuid
from tensorrt_llm.llmapi import KvCacheConfig

⚠️ Potential issue

Add copyright header to comply with coding guidelines

All source files must include the NVIDIA copyright header according to the coding guidelines.

Add the following copyright header at the beginning of the file:

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
🤖 Prompt for AI Agents
In tests/unittest/_torch/ray/test_placement.py around lines 1 to 11, the file is
missing the required NVIDIA copyright header; add the provided SPDX and
Apache-2.0 license header as the very first lines of the file (above all imports
and code) exactly as given in the review comment so the file complies with the
project's licensing/coding guidelines.

Comment on lines +50 to +51
infer_worker_uuids = ray.get(llm.collective_rpc.remote("report_device_id"))
print(f"{infer_worker_uuids=}")

🛠️ Refactor suggestion

Add assertions to validate test expectations

The test prints the results but doesn't validate that the placement actually happened on the expected bundles/devices.

Add assertions to verify the test results:

     infer_worker_uuids = ray.get(llm.collective_rpc.remote("report_device_id"))
     print(f"{infer_worker_uuids=}")
+    
+    # Verify that we got responses from 2 workers (tensor_parallel_size=2)
+    assert len(infer_worker_uuids) == 2, f"Expected 2 worker responses, got {len(infer_worker_uuids)}"
+    
+    # Verify that the workers are on different GPUs
+    assert len(set(infer_worker_uuids)) == 2, "Workers should be on different GPUs"
🤖 Prompt for AI Agents
In tests/unittest/_torch/ray/test_placement.py around lines 50-51, the test
currently prints infer_worker_uuids but has no assertions to confirm placement;
add proper assertions to validate placement by checking that the returned
infer_worker_uuids match the expected devices/bundles (e.g., assert the length
equals the number of expected workers, and assert each reported device_id or
bundle identifier is in the expected set or matches the expected mapping), and
include a clear failure message for each assertion so the test fails when
placement is incorrect.

Comment on lines +169 to +180
def pytest_generate_tests(metafunc: pytest.Metafunc):
if metafunc.definition.get_closest_marker('mpi_ray_parity'):
run_ray = metafunc.config.getoption("--run-ray") or os.environ.get(
"TLLM_RUN_RAY_TESTS") == "1"
if metafunc.definition.get_closest_marker('mpi_ray_parity') and run_ray:
metafunc.parametrize(
'ray_mode',
[
pytest.param('ray', id='ray', marks=pytest.mark.ray),
],
indirect=True,
)

⚠️ Potential issue

Fix redundant conditional check in pytest_generate_tests.

There's a duplicate check for mpi_ray_parity marker on lines 170 and 173.

Apply this diff to remove the redundancy:

 def pytest_generate_tests(metafunc: pytest.Metafunc):
     if metafunc.definition.get_closest_marker('mpi_ray_parity'):
         run_ray = metafunc.config.getoption("--run-ray") or os.environ.get(
             "TLLM_RUN_RAY_TESTS") == "1"
-        if metafunc.definition.get_closest_marker('mpi_ray_parity') and run_ray:
+        if run_ray:
             metafunc.parametrize(
                 'ray_mode',
                 [
                     pytest.param('ray', id='ray', marks=pytest.mark.ray),
                 ],
                 indirect=True,
             )
🤖 Prompt for AI Agents
In tests/unittest/conftest.py around lines 169 to 180, the pytest_generate_tests
function redundantly checks for the 'mpi_ray_parity' marker twice; remove the
duplicate conditional so the code checks the marker once, evaluates run_ray, and
only if the marker is present and run_ray is true calls metafunc.parametrize to
set 'ray_mode' with the ray param; ensure indentation and logic remain unchanged
otherwise.

Contributor

@coderabbitai coderabbitai bot left a comment

Review continued from previous batch...

* limitations under the License.
*/

#pragma once
Contributor

🛠️ Refactor suggestion

Use include guards per repo guidelines (replace #pragma once).

Coding guidelines require TRTLLM_<FILENAME>_H include guards for headers. Replace #pragma once and add a closing guard.

Apply this diff:

-#pragma once
+#ifndef TRTLLM_PGUTILS_H
+#define TRTLLM_PGUTILS_H
@@
-} // namespace tensorrt_llm::pg_utils
+} // namespace tensorrt_llm::pg_utils
+
+#endif // TRTLLM_PGUTILS_H

Also applies to: 287-287

Comment on lines +20 to +27
#include <algorithm>
#include <functional>
#include <memory>
#include <numeric>
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/torch.h>
#include <vector>

Contributor

⚠️ Potential issue

Missing standard headers break compilation and exception handling.

You use std::enable_if_t/std::is_* (type_traits), std::string (string), std::getenv (cstdlib), and std::rethrow_exception/exception_ptr (exception) but don’t include the respective headers.

Apply this diff to add the missing includes:

 #include <algorithm>
 #include <functional>
 #include <memory>
 #include <numeric>
+#include <exception>
+#include <string>
+#include <type_traits>
+#include <cstdlib>
 #include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
 #include <torch/torch.h>
 #include <vector>
🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/runtime/utils/pgUtils.h around lines 20 to 27,
several standard headers are missing causing compilation and exception-handling
failures; add the appropriate includes for <type_traits> (for std::enable_if_t
and std::is_*), <string> (for std::string), <cstdlib> (for std::getenv) and
<exception> (for std::rethrow_exception and std::exception_ptr) near the top
alongside the existing headers so the code compiles and exception utilities are
available.

Comment on lines +31 to +50
// Check async op.
inline c10::intrusive_ptr<c10d::Work> pgCheckHelper(
    c10::intrusive_ptr<c10d::Work> work, char const* const file, int const line, char const* info)
{
    work->wait();
    if (work->exception())
    {
        try
        {
            std::rethrow_exception(work->exception());
        }
        catch (...)
        {
            auto msg = std::string("[TensorRT-LLM][ERROR] Torch distributed operation error: ") + info;
            std::throw_with_nested(tensorrt_llm::common::TllmException(file, line, msg.c_str()));
        }
    }

    return work;
}
Contributor

⚠️ Potential issue

Work::wait() return value ignored — failures can be silently missed.

pgCheckHelper(work) calls wait() but doesn’t check its boolean result; some backends signal failure via a false return without setting an exception.

Apply this diff to fail fast and preserve file/line context:

 inline c10::intrusive_ptr<c10d::Work> pgCheckHelper(
     c10::intrusive_ptr<c10d::Work> work, char const* const file, int const line, char const* info)
 {
-    work->wait();
+    bool const ok = work->wait();
+    if (!ok)
+    {
+        auto msg = std::string("[TensorRT-LLM][ERROR] Torch distributed operation wait failed: ") + info;
+        throw tensorrt_llm::common::TllmException(file, line, msg.c_str());
+    }
     if (work->exception())
     {
         try
         {
             std::rethrow_exception(work->exception());
         }
         catch (...)
         {
             auto msg = std::string("[TensorRT-LLM][ERROR] Torch distributed operation error: ") + info;
             std::throw_with_nested(tensorrt_llm::common::TllmException(file, line, msg.c_str()));
         }
     }
 
     return work;
 }
🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/runtime/utils/pgUtils.h around lines 31 to 50, the
call to work->wait() ignores its boolean return value so failures signaled by a
false return (without an exception) are missed; modify the helper to capture the
wait() result, and if it returns false immediately throw a
tensorrt_llm::common::TllmException built with file, line and a descriptive
message (including info) so we fail fast; keep the existing path that checks
work->exception() and rethrows with nested TllmException so both false-return
and exception cases preserve file/line context and original info.

Comment on lines +126 to +131
template <typename T>
torch::Tensor wrap_tensor(T const* data, size_t size)
{
    return wrap_tensor(const_cast<T*>(data), size);
}

Contributor

🛠️ Refactor suggestion

Don’t cast away const in wrap_tensor; use const-pointer overload of from_blob.

Casting away const may let callees write into read-only memory. Use from_blob with const data.

 template <typename T>
 torch::Tensor wrap_tensor(T const* data, size_t size)
 {
-    return wrap_tensor(const_cast<T*>(data), size);
+    if constexpr (std::is_same_v<std::decay_t<T>, char>)
+    {
+        return at::from_blob(data, {static_cast<int64_t>(size)}, c10::TensorOptions{}.dtype(torch::kChar));
+    }
+    else if constexpr (std::is_same_v<std::decay_t<T>, uint64_t>)
+    {
+        return at::from_blob(data, {static_cast<int64_t>(size)}, c10::TensorOptions{}.dtype(torch::kLong));
+    }
+    else
+    {
+        return at::from_blob(
+            data, {static_cast<int64_t>(size)},
+            c10::TensorOptions{}.dtype(torch::CppTypeToScalarType<std::decay_t<T>>::value));
+    }
 }

Note: The std::vector const& and std::reference_wrapper<std::vector const> overloads will automatically benefit from this fix.

🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/runtime/utils/pgUtils.h around lines 126-131, the
wrap_tensor overload currently casts away const and forwards to the non-const
overload; instead add/implement a const-safe overload that accepts T const* and
uses torch::from_blob (the overload that accepts a const pointer / const void*)
to create the tensor without removing constness so callers cannot mutate
read-only memory — remove the const_cast, call from_blob with the const data
pointer and the correct size/shape and default deleter, and ensure the function
signature and return correctly preserve const safety.

{
-        if (first_device_id == second_device_id
-            || visited_device.find(second_device_id) != visited_device.end())
+        if (first_device_id >= second_device_id)
Contributor

🛠️ Refactor suggestion

Fix redundant condition in p2p check.

The condition first_device_id >= second_device_id skips both the equal-device case (a p2p check between a device and itself is meaningless) and already-visited pairs, which is what the previous two-condition check guarded against.

The simplified condition correctly avoids duplicate checks and self-comparison.

🤖 Prompt for AI Agents
In cpp/tensorrt_llm/thop/allreduceOp.cpp around line 904, the p2p loop uses the
condition "if (first_device_id >= second_device_id)" which redundantly skips the
equal-device case and prevents correct pairwise checks; change the condition to
"if (first_device_id > second_device_id)" so you only skip duplicate/previous
pairs while still allowing the logic to handle self-comparison explicitly where
needed (i.e., ensure any needed self-checks remain handled elsewhere).

Comment on lines +36 to +37
os.environ['RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES'] = '1'
os.environ["RAY_DEDUP_LOGS"] = "0" # for debug

Contributor

⚠️ Potential issue

Security: Setting environment variables affects the entire process

Setting RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES and RAY_DEDUP_LOGS in os.environ affects the global process state and may conflict with user settings or other components.

Consider using Ray's runtime_env for these settings exclusively:

-        os.environ['RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES'] = '1'
-        os.environ["RAY_DEDUP_LOGS"] = "0"  # for debug

         super().__init__(model_world_size, postproc_worker_config,
                          is_llm_executor)

         self.has_start_local_cluser = False
         runtime_env = {
             "env_vars": {
-                "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "1"
+                "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "1",
+                "RAY_DEDUP_LOGS": "0"
             }
         }
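
For illustration, a minimal standalone sketch (not the executor's actual code) of how runtime_env-scoped env vars behave: the values are visible inside Ray workers, while the driver's own os.environ stays untouched.

import os

import ray

runtime_env = {
    "env_vars": {
        "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "1",
        "RAY_DEDUP_LOGS": "0",  # for debug
    }
}

ray.init(runtime_env=runtime_env)


@ray.remote
def show_env():
    # Workers inherit the runtime_env values.
    return os.environ.get("RAY_DEDUP_LOGS")


print(ray.get(show_env.remote()))        # -> "0"
print(os.environ.get("RAY_DEDUP_LOGS"))  # driver environment untouched -> None
ray.shutdown()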

Comment on lines +85 to +89
def create_actor_weak_ref(actor_handle: ray.actor.ActorHandle):
    state, _, _ = actor_handle._serialization_helper()
    return ray.actor.ActorHandle._deserialization_helper(state,
                                                         weak_ref=True)
Contributor

⚠️ Potential issue

Security: Accessing private API may break with Ray updates

The method accesses Ray's private _serialization_helper() and _deserialization_helper() APIs which are not part of the public interface and may change without notice.

Consider documenting this dependency and adding version checks:

 @staticmethod
 def create_actor_weak_ref(actor_handle: ray.actor.ActorHandle):
+    """Create a weak reference to a Ray actor handle.
+    
+    WARNING: This uses Ray's private API which may change between versions.
+    Tested with Ray version X.Y.Z.
+    """
+    # TODO: Request official Ray API for weak actor references
     state, _, _ = actor_handle._serialization_helper()
     return ray.actor.ActorHandle._deserialization_helper(state,
                                                          weak_ref=True)

Comment on lines +282 to +260
if self.world_size % tp_size:
    raise ValueError("world_size must be a multiple of tp_size")
Contributor

🛠️ Refactor suggestion

Add validation for tp_size value

The code checks if world_size is divisible by tp_size but doesn't validate that tp_size is positive.

+        if self.tp_size <= 0:
+            raise ValueError(f"tp_size must be positive, got {self.tp_size}")
         if self.world_size % tp_size:
             raise ValueError("world_size must be a multiple of tp_size")
🤖 Prompt for AI Agents
In tensorrt_llm/executor/ray_executor.py around lines 282-283, the code only
checks divisibility but does not validate tp_size itself; add a validation that
tp_size is an int > 0 (and optionally <= world_size) before performing the
modulus check, raising a ValueError with a clear message if tp_size is
non-positive or not an integer so the subsequent world_size % tp_size check is
safe and meaningful.

Comment on lines +125 to +132
if moe_cluster_size > 1:
    assert moe_ep_size == 1

Contributor

🛠️ Refactor suggestion

Verify MOE cluster and EP size constraint.

The assertion that moe_cluster_size > 1 implies moe_ep_size == 1 should have an informative error message.

 if moe_cluster_size > 1:
-    assert moe_ep_size == 1
+    assert moe_ep_size == 1, f"When moe_cluster_size > 1, moe_ep_size must be 1, but got {moe_ep_size}"
🤖 Prompt for AI Agents
In tensorrt_llm/mapping.py around lines 125 to 127, the assertion enforcing that
when moe_cluster_size > 1 then moe_ep_size == 1 currently lacks an informative
message; replace the bare assert with a check that raises an AssertionError (or
ValueError) including a clear message like "When moe_cluster_size > 1,
moe_ep_size must be 1 (got {moe_ep_size})" so callers can understand the
constraint and the actual value that violated it.

@joyang-nv joyang-nv changed the title from [TRTLLM-7349][feat] Adding new Ray orchestrator to [TRTLLM-7349][feat] Adding new orchestrator type -- ray on Sep 4, 2025
@joyang-nv joyang-nv marked this pull request as draft September 4, 2025 07:42
@tongyuantongyu
Member

/bot run --stage-list "H100_PCIe-PyTorch-1,H100_PCIe-PyTorch-Ray-1" --disable-fail-fast

@tongyuantongyu
Member

/bot run --stage-list "H100_PCIe-PyTorch-1, H100_PCIe-PyTorch-Ray-1" --disable-fail-fast

@tensorrt-cicd
Collaborator

PR_Github #17661 [ run ] triggered by Bot

@tensorrt-cicd
Collaborator

PR_Github #17676 [ run ] triggered by Bot

@NVIDIA NVIDIA deleted a comment from tensorrt-cicd Sep 14, 2025
@NVIDIA NVIDIA deleted a comment from tensorrt-cicd Sep 14, 2025
@NVIDIA NVIDIA deleted a comment from tensorrt-cicd Sep 14, 2025
@hchings
Collaborator

hchings commented Sep 14, 2025

/bot run --stage-list "GB200-4_GPUs-PyTorch-1" --disable-fail-fast

@tensorrt-cicd
Collaborator

PR_Github #18520 [ run ] triggered by Bot

@tensorrt-cicd
Collaborator

PR_Github #18520 [ run ] completed with state SUCCESS
/LLM/main/L0_MergeRequest_PR pipeline #13902 (Partly Tested) completed with status: 'SUCCESS'

self.master_port = os.environ["MASTER_PORT"]

# Ray can't pickle TensorRT logger; import/use it inside methods only.
from tensorrt_llm.logger import logger
Collaborator

Can this embedded import be moved to the beginning of this file?

Collaborator

If we import it at the beginning of the file, the Ray actor will hit a pickle issue. @shuyixiong has changed it to a global in this commit. PTAL.
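
For context, a minimal sketch of the module-level-global pattern being discussed (hypothetical worker class, not the actual file): the unpicklable object is created lazily inside the actor process and cached in a global, so it never becomes part of the actor's pickled state.

import ray

_logger = None


def _get_logger():
    # Deferred import: runs inside the actor process, so Ray never has to
    # pickle the logger object when creating or calling the actor.
    global _logger
    if _logger is None:
        from tensorrt_llm.logger import logger
        _logger = logger
    return _logger


@ray.remote
class Worker:
    def ping(self):
        _get_logger().info("hello from the Ray actor")
        return "ok"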

@@ -0,0 +1,37 @@
/*
Collaborator

Propose to put this cpp file into the cpp/tensorrt_llm folder.

Member

This file has to be shipped in the pip wheel and compiled at runtime; otherwise, using a version of pybind11 that is incompatible with PyTorch's would prevent the code from working. We will revisit whether this is still necessary after we fully switch to nanobind.

echo "Checking if ray cluster is already running..."
if ray status > /dev/null 2>&1; then
echo "Ray cluster is already running. Stopping existing cluster first..."
ray stop
Collaborator

Why this? Can one existing ray cluster support multiple client applications?

Collaborator

The Disagg + Ray example workflow we currently show uses a single Ray cluster for all clients (ctx/gen servers). This setup is recommended but not mandatory, i.e., users can have one Ray cluster per server if preferred.

Can one existing ray cluster support multiple client applications?

Yes. TRTLLM (the client application) will auto-detect whether there's an existing Ray cluster and join.
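
Roughly, the auto-detect behavior maps onto Ray's own API like this (a sketch, not the actual TRTLLM code path): address="auto" joins a running cluster, and the fallback starts a local one.

import ray

try:
    # Connect to an existing cluster (e.g. one started with `ray start --head`).
    ray.init(address="auto")
    print("Joined existing Ray cluster:", ray.cluster_resources())
except ConnectionError:
    # No cluster reachable: start a local, single-node cluster instead.
    ray.init()
    print("Started a local Ray cluster:", ray.cluster_resources())

ray.shutdown()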

@@ -0,0 +1,54 @@
# Generate text asynchronously with Ray orchestrator.
Collaborator

A nit. But +1


 @torch.library.register_fake("trtllm::allreduce")
-def _(
+def allreduce(
Collaborator

A nit. But is there an issue with the original style of using "_"?

Collaborator

@hchings hchings Sep 16, 2025

++ @tongyuantongyu @shuyixiong - any context on this part?

Member

@tongyuantongyu tongyuantongyu Sep 17, 2025

We want to reuse the code when registering allreduce_pg by calling this function. So we need a unique name to reference it.
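
To make that concrete, a toy sketch with made-up demo:: ops (not the trtllm operator signatures): giving the fake (meta) implementation a real name lets the same function be registered for a second op, which an anonymous "def _(...)" would not allow.

import torch


@torch.library.custom_op("demo::allreduce", mutates_args=())
def demo_allreduce(x: torch.Tensor) -> torch.Tensor:
    return x.clone()


@torch.library.custom_op("demo::allreduce_pg", mutates_args=())
def demo_allreduce_pg(x: torch.Tensor) -> torch.Tensor:
    return x.clone()


# Named fake impl: can be referenced again below.
@torch.library.register_fake("demo::allreduce")
def allreduce_fake(x: torch.Tensor) -> torch.Tensor:
    return torch.empty_like(x)


# Reuse the same fake implementation for the second op.
torch.library.register_fake("demo::allreduce_pg", allreduce_fake)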

pg_util_path = os.path.normpath(f"{src_path}/../../libs")
pg_broker = torch.utils.cpp_extension.load(
    name="pg_broker",
    sources=[f"{src_path}/pgBroker.cpp"],
Collaborator

Shall this pg broker extension be built during TRTLLM wheel build time, instead of on the fly?

And what happens if multiple processes are building this one?

Collaborator

@hchings hchings Sep 16, 2025

Isn't this for loading? + @tongyuantongyu to handle this one.

Member

Shall this pg broker extension be built during TRTLLM wheel build time, instead of on the fly?

See #7520 (comment) for explanation. We also have another plan to avoid packaging this .cpp source, with the cost of being slower.

And what happens if multiple processes are building this one?

PyTorch should have safeguards to build it only once and safely.
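
For reference, a minimal sketch of this JIT path (paths and the extension name are placeholders): torch.utils.cpp_extension.load compiles the source on first use, caches the build under the torch extensions directory, and, per the safeguard mentioned above, serializes concurrent builds of the same extension.

import os

import torch.utils.cpp_extension

src_path = os.path.dirname(os.path.abspath(__file__))  # placeholder location

pg_broker = torch.utils.cpp_extension.load(
    name="pg_broker",
    sources=[os.path.join(src_path, "pgBroker.cpp")],
    verbose=True,  # print compiler invocations on the first (cached) build
)

# Later calls with the same name/sources reuse the cached build, so only the
# first process pays the compile cost.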


} // namespace tensorrt_llm::pg_broker

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
Collaborator

How does this work with nanobind? I think we want to make nanobind the default in the 1.1 timeframe.

@Linda-Stadter for vis about this discussion.

Member

@tongyuantongyu tongyuantongyu Sep 17, 2025

This is JIT-ed using the pybind11 headers vendored by PyTorch. We use JIT instead of building via CMake to avoid the pybind11 headers under 3rdparty, since they are incompatible with the version PyTorch uses and prevent the custom casters defined by PyTorch from working properly.

@laikhtewari
Collaborator

There is at least one outstanding instance of "experimental" in the example disagg readme.

The to_delete dir should be deleted or renamed.

from typing import Optional

import numpy as np
import ray
Collaborator

Can we make this a conditional import that only runs when the orchestrator type is set to 'ray'? This would avoid requiring MPI users to install Ray.

Collaborator

I have removed the dependency on ray in the mpi path here and tested it with a docker image that does not have ray installed.

Collaborator

Preserving Shuyi's commit link here 976aa13 as I need to squash for rebase.
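
For illustration, one common shape for such a guarded import (hypothetical names, not the exact change that landed): ray is only required when the Ray orchestrator is actually selected.

from typing import Optional

try:
    import ray
except ImportError:  # ray is optional for MPI-only installs
    ray = None


def make_executor(orchestrator_type: Optional[str] = None):
    if orchestrator_type == "ray":
        if ray is None:
            raise ImportError(
                "orchestrator_type='ray' requires the 'ray' package to be installed")
        return "ray-executor"  # placeholder for the Ray-based executor
    return "mpi-executor"      # placeholder for the MPI-based executor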

@@ -0,0 +1,130 @@
import os

Collaborator

It seems this file is created mainly for testing Ray, so how about renaming it to something like test_allgather_ray.py?



@ray.remote(num_gpus=1)
class AllreducePGTest:
Collaborator

The newly added code shares little with the existing logic, so it may be better to create a new file test_allreduce_ray.py for it. That will highlight the orchestrator type of the tests and make it easier to triage when a test fails.

Collaborator

The same goes for other test files whose names are generic but which are Ray-dedicated. Ideally, test files without the _ray suffix are seen as mpi by default, and test files with _ray are easily identified as Ray-dedicated.

Collaborator

Agree. @shuyixiong can help move all ray multi-gpu tests into a single test_ops_ray.py.

tongyuantongyu and others added 3 commits September 17, 2025 19:24
Signed-off-by: Yuan Tong <[email protected]>

Cleanup

Signed-off-by: shuyix <[email protected]>

Fix ucxCommTest compilation

Signed-off-by: Yuan Tong <[email protected]>

cleanup ray worker

Signed-off-by: Erin Ho <[email protected]>

fix config copy

Signed-off-by: Yuan Tong <[email protected]>

refactor mapping and device mesh

Signed-off-by: Yuan Tong <[email protected]>

fix cacheTransceiverTest compilation

Signed-off-by: Yuan Tong <[email protected]>

add API desc

Signed-off-by: Yuan Tong <[email protected]>

fix orchestrator_type detect

Signed-off-by: Yuan Tong <[email protected]>

add fake impl for custom operator

Signed-off-by: Yuan Tong <[email protected]>

add back global_rank

Signed-off-by: Yuan Tong <[email protected]>

Fix cache transceiver assertion

Signed-off-by: shuyix <[email protected]>

remove pg wait

Signed-off-by: Erin Ho <[email protected]>

add PP collective ops and fix result_wait_queue

Signed-off-by: shuyix <[email protected]>

precommit

update docs per comments

Signed-off-by: Erin Ho <[email protected]>

Remove redundant local logger import

Signed-off-by: shuyix <[email protected]>

minor fixes, add pp test

Signed-off-by: Erin Ho <[email protected]>

Add multigpu ray stages

Signed-off-by: Yuan Tong <[email protected]>

Refactor result_wait_queues cleanup

Signed-off-by: shuyix <[email protected]>

change experimental to prototype disagg readme

Signed-off-by: Laikh Tewari <[email protected]>

address comments

revert lazy init of ray queues

Signed-off-by: Erin Ho <[email protected]>

Remove dependancy of ray in mpi path

Signed-off-by: shuyix <[email protected]>
Collaborator

@HuiGao-NV HuiGao-NV left a comment

LGTM
