Skip to content

make local_queue optional #831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/codeflare_sdk/common/kueue/kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from codeflare_sdk.common.kubernetes_cluster.auth import config_check, get_api_client
from kubernetes import client
from kubernetes.client.exceptions import ApiException
import warnings


def get_default_kueue_name(namespace: str) -> Optional[str]:
Expand Down Expand Up @@ -157,18 +158,24 @@ def add_queue_label(item: dict, namespace: str, local_queue: Optional[str]):
The namespace of the local queue.
local_queue (str, optional):
The name of the local queue to use. Defaults to None.

Raises:
ValueError:
If the provided or default local queue does not exist in the namespace.
"""
lq_name = local_queue or get_default_kueue_name(namespace)
if lq_name == None:
return
elif not local_queue_exists(namespace, lq_name):
raise ValueError(
"local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
available_queues = list_local_queues(namespace)
if available_queues is None:
warnings.warn(
f"Local queue '{local_queue}' does not exist in namespace '{namespace}'. "
"Unable to retrieve list of available queues."
)
return
available_queue_names = [q["name"] for q in available_queues]
warnings.warn(
f"Local queue '{local_queue}' does not exist in namespace '{namespace}'. "
f"Available queues are: {', '.join(available_queue_names)}"
)
return
if not "labels" in item["metadata"]:
item["metadata"]["labels"] = {}
item["metadata"]["labels"].update({"kueue.x-k8s.io/queue-name": lq_name})
114 changes: 107 additions & 7 deletions src/codeflare_sdk/common/kueue/test_kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ def test_get_local_queue_exists_fail(mocker):
)
mocker.patch(
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
return_value={
"items": [
{
"metadata": {"name": "local-queue-default"},
"status": {"flavors": [{"name": "default"}]},
}
]
},
)
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
Expand Down Expand Up @@ -267,15 +274,105 @@ def test_add_queue_label_with_valid_local_queue(mocker):
assert item["metadata"]["labels"] == {"kueue.x-k8s.io/queue-name": "valid-queue"}


def test_add_queue_label_with_invalid_local_queue(mocker):
def test_add_queue_label_with_invalid_local_queue_shows_available_queues(mocker):
# Mock the kubernetes.client.CustomObjectsApi and its response
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
"items": [
{"metadata": {"name": "valid-queue"}},
{
"metadata": {"name": "queue1"},
"status": {"flavors": [{"name": "default"}]},
},
{
"metadata": {"name": "queue2"},
"status": {"flavors": [{"name": "default"}]},
},
]
}

# Mock the local_queue_exists function to return False
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=False)

# Define input item and parameters
item = {"metadata": {}}
namespace = "test-namespace"
local_queue = "invalid-queue"

# Call the function and expect a warning with available queues in the message
with pytest.warns(
UserWarning,
match=f"Local queue '{local_queue}' does not exist in namespace '{namespace}'. Available queues are: queue1, queue2",
):
add_queue_label(item, namespace, local_queue)

# Assert that no label is added
assert "labels" not in item["metadata"]


def test_add_queue_label_with_no_local_queue(mocker):
    """Check that add_queue_label adds no label when no queue can be resolved.

    No local_queue is passed and the default-queue lookup is stubbed to
    return None, so the item's metadata must be left without a "labels" key.
    """
    # Stub the Kubernetes custom-objects API so the namespace reports no queues
    mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
    mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
        "items": []
    }

    # Patch get_default_kueue_name in the module where add_queue_label looks it
    # up (kueue/kueue.py). Patching the package-level name would leave the call
    # inside add_queue_label pointing at the real function.
    mocker.patch(
        "codeflare_sdk.common.kueue.kueue.get_default_kueue_name",
        return_value=None,
    )

    # Define input item and parameters
    item = {"metadata": {}}
    namespace = "test-namespace"
    local_queue = None

    # Call the function
    add_queue_label(item, namespace, local_queue)

    # Assert that no label is added
    assert "labels" not in item["metadata"]


def test_add_queue_label_with_default_queue(mocker):
    """Check that add_queue_label falls back to the namespace's default queue.

    No local_queue is passed; the default-queue lookup is stubbed to return
    "default-queue", which must end up as the item's queue-name label.
    """
    # Stub the Kubernetes custom-objects API with a single queue that carries
    # the default-queue annotation
    mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
    mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
        "items": [
            {
                "metadata": {
                    "name": "default-queue",
                    "annotations": {"kueue.x-k8s.io/default-queue": "true"},
                }
            }
        ]
    }

    # Patch get_default_kueue_name in the module where add_queue_label looks it
    # up (kueue/kueue.py). Patching the package-level name would leave the call
    # inside add_queue_label pointing at the real function.
    mocker.patch(
        "codeflare_sdk.common.kueue.kueue.get_default_kueue_name",
        return_value="default-queue",
    )

    # Define input item and parameters
    item = {"metadata": {}}
    namespace = "test-namespace"
    local_queue = None

    # Call the function
    add_queue_label(item, namespace, local_queue)

    # Assert that the default queue label is added
    assert item["metadata"]["labels"] == {"kueue.x-k8s.io/queue-name": "default-queue"}


def test_add_queue_label_with_invalid_local_queue_and_no_available_queues(mocker):
# Mock the kubernetes.client.CustomObjectsApi and its response
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
"items": [] # Empty list instead of None
}

# Mock the local_queue_exists function to return False
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=False)

Expand All @@ -284,13 +381,16 @@ def test_add_queue_label_with_invalid_local_queue(mocker):
namespace = "test-namespace"
local_queue = "invalid-queue"

# Call the function and expect a ValueError
with pytest.raises(
ValueError,
match="local_queue provided does not exist or is not in this namespace",
# Call the function and expect a warning about unavailable queues
with pytest.warns(
UserWarning,
match=f"Local queue '{local_queue}' does not exist in namespace '{namespace}'. Available queues are:",
):
add_queue_label(item, namespace, local_queue)

# Assert that no label is added
assert "labels" not in item["metadata"]


# Make sure to always keep this function last
def test_cleanup():
Expand Down
23 changes: 18 additions & 5 deletions src/codeflare_sdk/ray/cluster/build_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
(in the cluster sub-module) for RayCluster/AppWrapper generation.
"""
from typing import List, Union, Tuple, Dict

from ...common.kueue.kueue import list_local_queues
from ...common import _kube_api_error_handling
from ...common.kubernetes_cluster import get_api_client, config_check
from kubernetes.client.exceptions import ApiException
Expand Down Expand Up @@ -482,15 +484,26 @@ def head_worker_extended_resources_from_cluster(
# Local Queue related functions
def add_queue_label(cluster: "codeflare_sdk.ray.cluster.Cluster", labels: dict):
"""
The add_queue_label() function updates the given base labels with the local queue label if Kueue exists on the Cluster
The add_queue_label() function updates the given base labels with the local queue label if Kueue exists on the Cluster.
If no local_queue is provided, no queue label will be added.
"""
lq_name = cluster.config.local_queue or get_default_local_queue(cluster, labels)
if lq_name == None:
if lq_name is None:
return
elif not local_queue_exists(cluster):
raise ValueError(
"local_queue provided does not exist or is not in this namespace. Please provide the correct local_queue name in Cluster Configuration"
elif cluster.config.local_queue and not local_queue_exists(cluster):
available_queues = list_local_queues(cluster.config.namespace)
if available_queues is None:
print(
f"WARNING: Local queue '{cluster.config.local_queue}' does not exist in namespace '{cluster.config.namespace}'. "
"Unable to retrieve list of available queues."
)
return
available_queue_names = [q["name"] for q in available_queues]
print(
f"WARNING: Local queue '{cluster.config.local_queue}' does not exist in namespace '{cluster.config.namespace}'. "
f"Available queues are: {', '.join(available_queue_names)}"
)
return
labels.update({"kueue.x-k8s.io/queue-name": lq_name})


Expand Down
Loading