5 changes: 4 additions & 1 deletion .github/workflows/e2e_tests.yaml
@@ -111,6 +111,8 @@ jobs:
kubectl create clusterrolebinding sdk-user-service-reader --clusterrole=service-reader --user=sdk-user
kubectl create clusterrole port-forward-pods --verb=create --resource=pods/portforward
kubectl create clusterrolebinding sdk-user-port-forward-pods-binding --clusterrole=port-forward-pods --user=sdk-user
kubectl create clusterrole rayjob-creator --verb=get,list,create,delete,patch --resource=rayjobs --resource=rayjobs/status
kubectl create clusterrolebinding sdk-user-rayjob-creator --clusterrole=rayjob-creator --user=sdk-user
kubectl config use-context sdk-user

- name: Run e2e tests
@@ -122,7 +124,8 @@
pip install poetry
poetry install --with test,docs
echo "Running e2e tests..."
poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
# poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
poetry run pytest -v -s ./tests/e2e/rayjob_existing_cluster_kind_test.py::TestRayJobExistingClusterKind::test_rayjob_ray_cluster_sdk_kind_nvidia_gpu > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
env:
GRPC_DNS_RESOLVER: "native"

24 changes: 20 additions & 4 deletions docs/sphinx/user-docs/e2e.rst
@@ -66,12 +66,28 @@ instructions <https://www.substratus.ai/blog/kind-with-gpus>`__.
# Add RBAC permissions to sdk-user
kubectl create clusterrole list-ingresses --verb=get,list --resource=ingresses
kubectl create clusterrolebinding sdk-user-list-ingresses --clusterrole=list-ingresses --user=sdk-user
kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers
kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user
kubectl create clusterrole namespace-creator --verb=get,list,create,delete,patch --resource=namespaces
kubectl create clusterrolebinding sdk-user-namespace-creator --clusterrole=namespace-creator --user=sdk-user
kubectl create clusterrole list-rayclusters --verb=get,list --resource=rayclusters
kubectl create clusterrolebinding sdk-user-list-rayclusters --clusterrole=list-rayclusters --user=sdk-user
kubectl create clusterrole raycluster-creator --verb=get,list,create,delete,patch --resource=rayclusters
kubectl create clusterrolebinding sdk-user-raycluster-creator --clusterrole=raycluster-creator --user=sdk-user
kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers
kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user
kubectl create clusterrole resourceflavor-creator --verb=get,list,create,delete --resource=resourceflavors
kubectl create clusterrolebinding sdk-user-resourceflavor-creator --clusterrole=resourceflavor-creator --user=sdk-user
kubectl create clusterrole clusterqueue-creator --verb=get,list,create,delete,patch --resource=clusterqueues
kubectl create clusterrolebinding sdk-user-clusterqueue-creator --clusterrole=clusterqueue-creator --user=sdk-user
kubectl create clusterrole localqueue-creator --verb=get,list,create,delete,patch --resource=localqueues
kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user
kubectl create clusterrole list-secrets --verb=get,list --resource=secrets
kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user
kubectl create clusterrole pod-creator --verb=get,list,watch --resource=pods
kubectl create clusterrolebinding sdk-user-pod-creator --clusterrole=pod-creator --user=sdk-user
kubectl create clusterrole service-reader --verb=get,list,watch --resource=services
kubectl create clusterrolebinding sdk-user-service-reader --clusterrole=service-reader --user=sdk-user
kubectl create clusterrole port-forward-pods --verb=create --resource=pods/portforward
kubectl create clusterrolebinding sdk-user-port-forward-pods-binding --clusterrole=port-forward-pods --user=sdk-user
kubectl create clusterrole rayjob-creator --verb=get,list,create,delete,patch --resource=rayjobs --resource=rayjobs/status
kubectl create clusterrolebinding sdk-user-rayjob-creator --clusterrole=rayjob-creator --user=sdk-user
kubectl config use-context sdk-user

- Install the latest development version of kueue
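The RBAC grants documented above can be sanity-checked from the sdk-user context before running the tests. Below is a minimal sketch (not part of this diff) using the standard kubernetes Python client's SelfSubjectAccessReview; it assumes the sdk-user context is active and that RayJob lives in the KubeRay ``ray.io`` API group.

    from kubernetes import client, config

    config.load_kube_config()  # assumes the sdk-user context is the active one
    review = client.V1SelfSubjectAccessReview(
        spec=client.V1SelfSubjectAccessReviewSpec(
            resource_attributes=client.V1ResourceAttributes(
                group="ray.io",        # KubeRay API group for RayJob
                resource="rayjobs",
                verb="create",
            )
        )
    )
    resp = client.AuthorizationV1Api().create_self_subject_access_review(review)
    print("sdk-user can create rayjobs:", resp.status.allowed)

This is the programmatic equivalent of ``kubectl auth can-i create rayjobs``; if it prints False, revisit the rayjob-creator clusterrole and binding above.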
13 changes: 13 additions & 0 deletions src/codeflare_sdk/ray/rayjobs/rayjob.py
@@ -54,6 +54,9 @@ def __init__(
shutdown_after_job_finishes: Optional[bool] = None,
ttl_seconds_after_finished: int = 0,
active_deadline_seconds: Optional[int] = None,
entrypoint_num_cpus: Optional[int] = None,
entrypoint_num_gpus: Optional[int] = None,
backoff_limit: int = 3,
):
"""
Initialize a RayJob instance.
@@ -100,6 +103,9 @@ def __init__(
self.runtime_env = runtime_env
self.ttl_seconds_after_finished = ttl_seconds_after_finished
self.active_deadline_seconds = active_deadline_seconds
self.entrypoint_num_cpus = entrypoint_num_cpus
self.entrypoint_num_gpus = entrypoint_num_gpus
self.backoff_limit = backoff_limit

# Auto-set shutdown_after_job_finishes based on cluster_config presence
# If cluster_config is provided, we want to clean up the cluster after job finishes
@@ -182,13 +188,20 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
"entrypoint": self.entrypoint,
"shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
"ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
"backoffLimit": self.backoff_limit,
},
}

# Add active deadline if specified
if self.active_deadline_seconds:
rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds

# Add entrypoint resource requirements if specified
if self.entrypoint_num_cpus is not None:
rayjob_cr["spec"]["entrypointNumCpus"] = self.entrypoint_num_cpus
if self.entrypoint_num_gpus is not None:
rayjob_cr["spec"]["entrypointNumGpus"] = self.entrypoint_num_gpus

# Add runtime environment if specified
if self.runtime_env:
rayjob_cr["spec"]["runtimeEnvYAML"] = str(self.runtime_env)
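For context on how the new constructor arguments surface in the generated custom resource, here is a minimal sketch assuming the signature added above; the job name, cluster name, and namespace are illustrative, not from this PR. Per ``_build_rayjob_cr``, ``entrypoint_num_cpus`` and ``entrypoint_num_gpus`` map to ``spec.entrypointNumCpus`` and ``spec.entrypointNumGpus``, and ``backoff_limit`` to ``spec.backoffLimit``.

    from codeflare_sdk.ray.rayjobs import RayJob

    job = RayJob(
        job_name="demo-rayjob",             # illustrative name
        cluster_name="existing-cluster",    # target an already-running cluster
        namespace="default",                # illustrative namespace
        entrypoint="python train.py",
        entrypoint_num_cpus=1,              # -> spec["entrypointNumCpus"]
        entrypoint_num_gpus=1,              # -> spec["entrypointNumGpus"]
        backoff_limit=3,                    # -> spec["backoffLimit"]; 3 is the new default
        shutdown_after_job_finishes=False,  # keep the existing cluster alive
    )
    job.submit()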
152 changes: 152 additions & 0 deletions tests/e2e/rayjob_existing_cluster_kind_test.py
@@ -0,0 +1,152 @@
from time import sleep

from codeflare_sdk import Cluster, ClusterConfiguration
from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

import pytest

from support import *

# This test creates a Ray cluster and then submits a RayJob against the existing cluster on a Kind cluster


@pytest.mark.kind
class TestRayJobExistingClusterKind:
def setup_method(self):
initialize_kubernetes_client(self)

def teardown_method(self):
delete_namespace(self)
delete_kueue_resources(self)

def test_rayjob_ray_cluster_sdk_kind(self):
self.setup_method()
create_namespace(self)
create_kueue_resources(self)
self.run_rayjob_against_existing_cluster_kind(accelerator="cpu")

@pytest.mark.nvidia_gpu
def test_rayjob_ray_cluster_sdk_kind_nvidia_gpu(self):
self.setup_method()
create_namespace(self)
create_kueue_resources(self)
# self.run_rayjob_against_existing_cluster_kind(
# accelerator="gpu", number_of_gpus=1
# )
self.run_rayjob_against_existing_cluster_kind(
accelerator="cpu", number_of_gpus=0
)

def run_rayjob_against_existing_cluster_kind(
self, accelerator, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
):
cluster_name = "existing-cluster"
cluster = Cluster(
ClusterConfiguration(
name=cluster_name,
namespace=self.namespace,
num_workers=1,
head_cpu_requests="500m",
head_cpu_limits="500m",
worker_cpu_requests="500m",
worker_cpu_limits=1,
worker_memory_requests=1,
worker_memory_limits=4,
# worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
# image="rayproject/ray:2.47.1",
write_to_file=True,
verify_tls=False,
)
)

cluster.apply()
cluster.status()
cluster.wait_ready()
cluster.status()
cluster.details()

print(f"✅ Ray cluster '{cluster_name}' is ready")

# test RayJob submission against the existing cluster
self.assert_rayjob_submit_against_existing_cluster(
cluster, accelerator, number_of_gpus
)

# Cleanup - manually tear down the cluster since job won't do it
print("🧹 Cleaning up Ray cluster")
cluster.down()

def assert_rayjob_submit_against_existing_cluster(
self, cluster, accelerator, number_of_gpus
):
"""
Test RayJob submission against an existing Ray cluster.
"""
cluster_name = cluster.config.name
job_name = f"mnist-rayjob-{accelerator}"

print(f"🚀 Testing RayJob submission against existing cluster '{cluster_name}'")

# Create RayJob targeting the existing cluster
rayjob = RayJob(
job_name=job_name,
cluster_name=cluster_name,
namespace=self.namespace,
entrypoint="python mnist.py",
runtime_env={
"working_dir": "./tests/e2e/",
"pip": "./tests/e2e/mnist_pip_requirements.txt",
"env_vars": get_setup_env_variables(ACCELERATOR=accelerator),
},
shutdown_after_job_finishes=False, # Don't shutdown the existing cluster
)

# Submit the job
submission_result = rayjob.submit()
assert (
submission_result == job_name
), f"Job submission failed, expected {job_name}, got {submission_result}"
print(f"✅ Successfully submitted RayJob '{job_name}' against existing cluster")

# Monitor the job status until completion
self.monitor_rayjob_completion(rayjob, timeout=900)

print(f"✅ RayJob '{job_name}' completed successfully against existing cluster!")

def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
"""
Monitor a RayJob until it completes or fails.
Args:
rayjob: The RayJob instance to monitor
timeout: Maximum time to wait in seconds (default: 15 minutes)
"""
print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")

elapsed_time = 0
check_interval = 10 # Check every 10 seconds

while elapsed_time < timeout:
status, ready = rayjob.status(print_to_console=True)

# Check if job has completed (either successfully or failed)
if status == CodeflareRayJobStatus.COMPLETE:
print(f"✅ RayJob '{rayjob.name}' completed successfully!")
return
elif status == CodeflareRayJobStatus.FAILED:
raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
elif status == CodeflareRayJobStatus.RUNNING:
print(f"🏃 RayJob '{rayjob.name}' is still running...")
elif status == CodeflareRayJobStatus.UNKNOWN:
print(f"❓ RayJob '{rayjob.name}' status is unknown")

# Wait before next check
sleep(check_interval)
elapsed_time += check_interval

# If we reach here, the job has timed out
final_status, _ = rayjob.status(print_to_console=True)
raise TimeoutError(
f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
f"Final status: {final_status}"
)
135 changes: 135 additions & 0 deletions tests/e2e/rayjob_existing_cluster_oauth_test.py
@@ -0,0 +1,135 @@
import pytest
from time import sleep

from codeflare_sdk import (
Cluster,
ClusterConfiguration,
TokenAuthentication,
)
from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

from support import *

# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on OpenShift


@pytest.mark.openshift
class TestRayJobExistingClusterOauth:
def setup_method(self):
initialize_kubernetes_client(self)

def teardown_method(self):
delete_namespace(self)
delete_kueue_resources(self)

def test_rayjob_against_existing_cluster_oauth(self):
self.setup_method()
create_namespace(self)
create_kueue_resources(self)
self.run_rayjob_against_existing_cluster_oauth()

def run_rayjob_against_existing_cluster_oauth(self):
ray_image = get_ray_image()

auth = TokenAuthentication(
token=run_oc_command(["whoami", "--show-token=true"]),
server=run_oc_command(["whoami", "--show-server=true"]),
skip_tls=True,
)
auth.login()

cluster_name = "existing-cluster"

cluster = Cluster(
ClusterConfiguration(
name=cluster_name,
namespace=self.namespace,
num_workers=1,
head_cpu_requests="500m",
head_cpu_limits="500m",
worker_cpu_requests=1,
worker_cpu_limits=1,
worker_memory_requests=1,
worker_memory_limits=4,
image=ray_image,
write_to_file=True,
verify_tls=False,
)
)

cluster.apply()
cluster.status()
cluster.wait_ready()
cluster.status()
cluster.details()

print(f"✅ Ray cluster '{cluster_name}' is ready")

job_name = "existing-cluster-rayjob"

rayjob = RayJob(
job_name=job_name,
cluster_name=cluster_name,
namespace=self.namespace,
entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
runtime_env={
"pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
"env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
},
shutdown_after_job_finishes=False,
)

# Submit the job
print(
f"🚀 Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'"
)
submission_result = rayjob.submit()
assert (
submission_result == job_name
), f"Job submission failed, expected {job_name}, got {submission_result}"
print(f"✅ Successfully submitted RayJob '{job_name}'")

# Monitor the job status until completion
self.monitor_rayjob_completion(rayjob)

# Cleanup - manually tear down the cluster since job won't do it
print("🧹 Cleaning up Ray cluster")
cluster.down()

def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 300):
"""
Monitor a RayJob until it completes or fails.
Args:
rayjob: The RayJob instance to monitor
timeout: Maximum time to wait in seconds (default: 5 minutes)
"""
print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")

elapsed_time = 0
check_interval = 10 # Check every 10 seconds

while elapsed_time < timeout:
status, ready = rayjob.status(print_to_console=True)

# Check if job has completed (either successfully or failed)
if status == CodeflareRayJobStatus.COMPLETE:
print(f"✅ RayJob '{rayjob.name}' completed successfully!")
return
elif status == CodeflareRayJobStatus.FAILED:
raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
elif status == CodeflareRayJobStatus.RUNNING:
print(f"🏃 RayJob '{rayjob.name}' is still running...")
elif status == CodeflareRayJobStatus.UNKNOWN:
print(f"❓ RayJob '{rayjob.name}' status is unknown")

# Wait before next check
sleep(check_interval)
elapsed_time += check_interval

# If we reach here, the job has timed out
final_status, _ = rayjob.status(print_to_console=True)
raise TimeoutError(
f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
f"Final status: {final_status}"
)