Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/test-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ steps:
commands:
- # the following commands are for the first node, with ip 192.168.10.10 (ray environment already set up)
- VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
- VLLM_MULTI_NODE=1 pytest -v -s distributed/test_multi_node_assignment.py
- VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py
- # the following commands are for the second node, with ip 192.168.10.11 (ray environment already set up)
- VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
Expand Down
64 changes: 64 additions & 0 deletions tests/distributed/test_multi_node_assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Make sure ray assigns GPU workers to the correct node.

Run:
```sh
cd $VLLM_PATH/tests

pytest distributed/test_multi_node_assignment.py
```
"""

import os

import pytest
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from vllm import initialize_ray_cluster
from vllm.config import ParallelConfig
from vllm.executor.ray_utils import _wait_until_pg_removed
from vllm.utils import get_ip

VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"


@pytest.mark.skipif(not VLLM_MULTI_NODE,
reason="Need at least 2 nodes to run the test.")
def test_multi_node_assignment() -> None:

# NOTE: important to keep this class definition here
# to let ray use cloudpickle to serialize it.
class Actor:

def get_ip(self):
return get_ip()

for _ in range(10):
config = ParallelConfig(1, 2)
initialize_ray_cluster(config)

current_ip = get_ip()
workers = []
for bundle_id, bundle in enumerate(
config.placement_group.bundle_specs):
if not bundle.get("GPU", 0):
continue
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=config.placement_group,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=bundle_id,
)

worker = ray.remote(
num_cpus=0,
num_gpus=1,
scheduling_strategy=scheduling_strategy,
)(Actor).remote()
worker_ip = ray.get(worker.get_ip.remote())
assert worker_ip == current_ip
workers.append(worker)

for worker in workers:
ray.kill(worker)

_wait_until_pg_removed(config.placement_group)
141 changes: 132 additions & 9 deletions vllm/executor/ray_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import List, Optional, Tuple, Union
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

import msgspec

Expand All @@ -11,9 +13,13 @@
from vllm.worker.worker_base import WorkerWrapperBase

logger = init_logger(__name__)
PG_WAIT_TIMEOUT = 1800

try:
import ray
from ray._private.state import available_resources_per_node
from ray.util import placement_group_table
from ray.util.placement_group import PlacementGroup

class RayWorkerWrapper(WorkerWrapperBase):
"""Ray wrapper for vllm.worker.Worker, allowing Worker to be
Expand Down Expand Up @@ -98,6 +104,106 @@ def assert_ray_available():
"`pip install ray`.") from ray_import_err


def _verify_bundles(placement_group: "PlacementGroup",
parallel_config: ParallelConfig, device_str: str):
"""Verify a given placement group has bundles located in the right place.

There are 2 rules.
- Warn if all tensor parallel workers cannot fit in a single node.
- Fail if driver node is not included in a placement group.
"""
assert ray.is_initialized(), (
"Ray is not initialized although distributed-executor-backend is ray.")
pg_data = placement_group_table(placement_group)
# bundle_idx -> node_id
bundle_to_node_ids = pg_data["bundles_to_node_id"]
# bundle_idx -> bundle (e.g., {"GPU": 1})
bundles = pg_data["bundles"]
# node_id -> List of bundle (e.g., {"GPU": 1})
node_id_to_bundle: Dict[str, List[Dict[str, float]]] = defaultdict(list)

for bundle_idx, node_id in bundle_to_node_ids.items():
node_id_to_bundle[node_id].append(bundles[bundle_idx])
driver_node_id = ray.get_runtime_context().get_node_id()

if driver_node_id not in node_id_to_bundle:
raise RuntimeError(
f"driver node id {driver_node_id} is not included in a placement "
f"group {placement_group.id}. Node id -> bundles "
f"{node_id_to_bundle}. "
"You don't have enough GPUs available in a current node. Check "
"`ray status` to see if you have available GPUs in a node "
f"{driver_node_id} before starting an vLLM engine.")

for node_id, bundles in node_id_to_bundle.items():
if len(bundles) < parallel_config.tensor_parallel_size:
logger.warning(
"tensor_parallel_size=%d "
"is bigger than a reserved number of %ss (%d "
"%ss) in a node %s. Tensor parallel workers can be "
"spread out to 2+ nodes which can degrade the performance "
"unless you have fast interconnect across nodes, like "
"Infiniband. To resolve this issue, make sure you have more "
"than %d GPUs available at each node.",
parallel_config.tensor_parallel_size, device_str, len(bundles),
device_str, node_id, parallel_config.tensor_parallel_size)


def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
"""Wait until a placement group is ready.

It prints the informative log messages if the placement group is
not created within time.

"""
# Wait until PG is ready - this will block until all
# requested resources are available, and will timeout
# if they cannot be provisioned.
placement_group_specs = current_placement_group.bundle_specs

s = time.time()
pg_ready_ref = current_placement_group.ready()
wait_interval = 10
while time.time() - s < PG_WAIT_TIMEOUT:
ready, _ = ray.wait([pg_ready_ref], timeout=wait_interval)
if len(ready) > 0:
break

# Exponential backoff for warning print.
wait_interval *= 2
logger.info(
"Waiting for creating a placement group of specs for "
"%d seconds. specs=%s. Check "
"`ray status` to see if you have enough resources.",
int(time.time() - s), placement_group_specs)

try:
ray.get(pg_ready_ref, timeout=0)
except ray.exceptions.GetTimeoutError:
raise ValueError(
"Cannot provide a placement group of "
f"{placement_group_specs=} within {PG_WAIT_TIMEOUT} seconds. See "
"`ray status` to make sure the cluster has enough resources."
) from None


def _wait_until_pg_removed(current_placement_group: "PlacementGroup"):
ray.util.remove_placement_group(current_placement_group)
s = time.time()
wait_interval = 10
while time.time() - s < PG_WAIT_TIMEOUT:
pg = ray.util.get_current_placement_group()
if pg is None:
break

# Exponential backoff for warning print.
wait_interval *= 2
logger.info(
"Waiting for removing a placement group of specs for "
"%d seconds.", int(time.time() - s))
time.sleep(wait_interval)


def initialize_ray_cluster(
parallel_config: ParallelConfig,
ray_address: Optional[str] = None,
Expand Down Expand Up @@ -156,15 +262,32 @@ def initialize_ray_cluster(
f"The number of required {device_str}s exceeds the total "
f"number of available {device_str}s in the placement group.")
# Create a new placement group
placement_group_specs = ([{
device_str: 1
}] * parallel_config.world_size)
placement_group_specs: List[Dict[str, float]] = ([{
device_str: 1.0
} for _ in range(parallel_config.world_size)])

# vLLM engine is also a worker to execute model with an accelerator,
# so it requires to have the device in a current node. Check if
# the current node has at least one device.
current_ip = get_ip()
current_node_id = ray.get_runtime_context().get_node_id()
current_node_resource = available_resources_per_node()[current_node_id]
if current_node_resource.get(device_str, 0) < 1:
raise ValueError(
f"Current node has no {device_str} available. "
f"{current_node_resource=}. vLLM engine cannot start without "
f"{device_str}. Make sure you have at least 1 {device_str} "
f"available in a node {current_node_id=} {current_ip=}.")
# This way, at least bundle is required to be created in a current
# node.
placement_group_specs[0][f"node:{current_ip}"] = 0.001
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the key to make sure current node is included in the placement group?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's right. I failed to find other clean way to support this unfortunately...


# By default, Ray packs resources as much as possible.
current_placement_group = ray.util.placement_group(
placement_group_specs)
# Wait until PG is ready - this will block until all
# requested resources are available, and will timeout
# if they cannot be provisioned.
ray.get(current_placement_group.ready(), timeout=1800)
placement_group_specs, strategy="PACK")
_wait_until_pg_ready(current_placement_group)

assert current_placement_group is not None
_verify_bundles(current_placement_group, parallel_config, device_str)
# Set the placement group in the parallel config
parallel_config.placement_group = current_placement_group