diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index d66e4b34..36d5aeb7 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -111,6 +111,8 @@ jobs: kubectl create clusterrolebinding sdk-user-service-reader --clusterrole=service-reader --user=sdk-user kubectl create clusterrole port-forward-pods --verb=create --resource=pods/portforward kubectl create clusterrolebinding sdk-user-port-forward-pods-binding --clusterrole=port-forward-pods --user=sdk-user + kubectl create clusterrole rayjob-creator --verb=get,list,create,delete,patch --resource=rayjobs --resource=rayjobs/status + kubectl create clusterrolebinding sdk-user-rayjob-creator --clusterrole=rayjob-creator --user=sdk-user kubectl config use-context sdk-user - name: Run e2e tests @@ -122,7 +124,8 @@ jobs: pip install poetry poetry install --with test,docs echo "Running e2e tests..." - poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + # poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + poetry run pytest -v -s ./tests/e2e/rayjob_existing_cluster_kind_test.py::TestRayJobExistingClusterKind::test_rayjob_ray_cluster_sdk_kind_nvidia_gpu > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 env: GRPC_DNS_RESOLVER: "native" diff --git a/docs/sphinx/user-docs/e2e.rst b/docs/sphinx/user-docs/e2e.rst index 6f3d1462..b584cc29 100644 --- a/docs/sphinx/user-docs/e2e.rst +++ b/docs/sphinx/user-docs/e2e.rst @@ -66,12 +66,28 @@ instructions `__. 
# Add RBAC permissions to sdk-user kubectl create clusterrole list-ingresses --verb=get,list --resource=ingresses kubectl create clusterrolebinding sdk-user-list-ingresses --clusterrole=list-ingresses --user=sdk-user - kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers - kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user kubectl create clusterrole namespace-creator --verb=get,list,create,delete,patch --resource=namespaces kubectl create clusterrolebinding sdk-user-namespace-creator --clusterrole=namespace-creator --user=sdk-user - kubectl create clusterrole list-rayclusters --verb=get,list --resource=rayclusters - kubectl create clusterrolebinding sdk-user-list-rayclusters --clusterrole=list-rayclusters --user=sdk-user + kubectl create clusterrole raycluster-creator --verb=get,list,create,delete,patch --resource=rayclusters + kubectl create clusterrolebinding sdk-user-raycluster-creator --clusterrole=raycluster-creator --user=sdk-user + kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers + kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user + kubectl create clusterrole resourceflavor-creator --verb=get,list,create,delete --resource=resourceflavors + kubectl create clusterrolebinding sdk-user-resourceflavor-creator --clusterrole=resourceflavor-creator --user=sdk-user + kubectl create clusterrole clusterqueue-creator --verb=get,list,create,delete,patch --resource=clusterqueues + kubectl create clusterrolebinding sdk-user-clusterqueue-creator --clusterrole=clusterqueue-creator --user=sdk-user + kubectl create clusterrole localqueue-creator --verb=get,list,create,delete,patch --resource=localqueues + kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user + kubectl create clusterrole 
list-secrets --verb=get,list --resource=secrets + kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user + kubectl create clusterrole pod-creator --verb=get,list,watch --resource=pods + kubectl create clusterrolebinding sdk-user-pod-creator --clusterrole=pod-creator --user=sdk-user + kubectl create clusterrole service-reader --verb=get,list,watch --resource=services + kubectl create clusterrolebinding sdk-user-service-reader --clusterrole=service-reader --user=sdk-user + kubectl create clusterrole port-forward-pods --verb=create --resource=pods/portforward + kubectl create clusterrolebinding sdk-user-port-forward-pods-binding --clusterrole=port-forward-pods --user=sdk-user + kubectl create clusterrole rayjob-creator --verb=get,list,create,delete,patch --resource=rayjobs --resource=rayjobs/status + kubectl create clusterrolebinding sdk-user-rayjob-creator --clusterrole=rayjob-creator --user=sdk-user kubectl config use-context sdk-user - Install the latest development version of kueue diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index a1577d91..16bc8d44 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -54,6 +54,9 @@ def __init__( shutdown_after_job_finishes: Optional[bool] = None, ttl_seconds_after_finished: int = 0, active_deadline_seconds: Optional[int] = None, + entrypoint_num_cpus: Optional[int] = None, + entrypoint_num_gpus: Optional[int] = None, + backoff_limit: int = 3, ): """ Initialize a RayJob instance. 
@@ -100,6 +103,9 @@ def __init__( self.runtime_env = runtime_env self.ttl_seconds_after_finished = ttl_seconds_after_finished self.active_deadline_seconds = active_deadline_seconds + self.entrypoint_num_cpus = entrypoint_num_cpus + self.entrypoint_num_gpus = entrypoint_num_gpus + self.backoff_limit = backoff_limit # Auto-set shutdown_after_job_finishes based on cluster_config presence # If cluster_config is provided, we want to clean up the cluster after job finishes @@ -182,6 +188,7 @@ def _build_rayjob_cr(self) -> Dict[str, Any]: "entrypoint": self.entrypoint, "shutdownAfterJobFinishes": self.shutdown_after_job_finishes, "ttlSecondsAfterFinished": self.ttl_seconds_after_finished, + "backoffLimit": self.backoff_limit, }, } @@ -189,6 +196,12 @@ def _build_rayjob_cr(self) -> Dict[str, Any]: if self.active_deadline_seconds: rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds + # Add entrypoint resource requirements if specified + if self.entrypoint_num_cpus is not None: + rayjob_cr["spec"]["entrypointNumCpus"] = self.entrypoint_num_cpus + if self.entrypoint_num_gpus is not None: + rayjob_cr["spec"]["entrypointNumGpus"] = self.entrypoint_num_gpus + # Add runtime environment if specified if self.runtime_env: rayjob_cr["spec"]["runtimeEnvYAML"] = str(self.runtime_env) diff --git a/tests/e2e/rayjob_existing_cluster_kind_test.py b/tests/e2e/rayjob_existing_cluster_kind_test.py new file mode 100644 index 00000000..3722c3aa --- /dev/null +++ b/tests/e2e/rayjob_existing_cluster_kind_test.py @@ -0,0 +1,152 @@ +from time import sleep + +from codeflare_sdk import Cluster, ClusterConfiguration +from codeflare_sdk.ray.rayjobs import RayJob +from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus + +import pytest + +from support import * + +# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on Kind Cluster + + +@pytest.mark.kind +class TestRayJobExistingClusterKind: + def setup_method(self): + 
initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + delete_kueue_resources(self) + + def test_rayjob_ray_cluster_sdk_kind(self): + self.setup_method() + create_namespace(self) + create_kueue_resources(self) + self.run_rayjob_against_existing_cluster_kind(accelerator="cpu") + + @pytest.mark.nvidia_gpu + def test_rayjob_ray_cluster_sdk_kind_nvidia_gpu(self): + self.setup_method() + create_namespace(self) + create_kueue_resources(self) + # self.run_rayjob_against_existing_cluster_kind( + # accelerator="gpu", number_of_gpus=1 + # ) + self.run_rayjob_against_existing_cluster_kind( + accelerator="cpu", number_of_gpus=0 + ) + + def run_rayjob_against_existing_cluster_kind( + self, accelerator, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0 + ): + cluster_name = "existing-cluster" + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="500m", + worker_cpu_requests="500m", + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=4, + # worker_extended_resource_requests={gpu_resource_name: number_of_gpus}, + # image="rayproject/ray:2.47.1", + write_to_file=True, + verify_tls=False, + ) + ) + + cluster.apply() + cluster.status() + cluster.wait_ready() + cluster.status() + cluster.details() + + print(f"โœ… Ray cluster '{cluster_name}' is ready") + + # test RayJob submission against the existing cluster + self.assert_rayjob_submit_against_existing_cluster( + cluster, accelerator, number_of_gpus + ) + + # Cleanup - manually tear down the cluster since job won't do it + print("๐Ÿงน Cleaning up Ray cluster") + cluster.down() + + def assert_rayjob_submit_against_existing_cluster( + self, cluster, accelerator, number_of_gpus + ): + """ + Test RayJob submission against an existing Ray cluster. 
+ """ + cluster_name = cluster.config.name + job_name = f"mnist-rayjob-{accelerator}" + + print(f"๐Ÿš€ Testing RayJob submission against existing cluster '{cluster_name}'") + + # Create RayJob targeting the existing cluster + rayjob = RayJob( + job_name=job_name, + cluster_name=cluster_name, + namespace=self.namespace, + entrypoint="python mnist.py", + runtime_env={ + "working_dir": "./tests/e2e/", + "pip": "./tests/e2e/mnist_pip_requirements.txt", + "env_vars": get_setup_env_variables(ACCELERATOR=accelerator), + }, + shutdown_after_job_finishes=False, # Don't shutdown the existing cluster + ) + + # Submit the job + submission_result = rayjob.submit() + assert ( + submission_result == job_name + ), f"Job submission failed, expected {job_name}, got {submission_result}" + print(f"โœ… Successfully submitted RayJob '{job_name}' against existing cluster") + + # Monitor the job status until completion + self.monitor_rayjob_completion(rayjob, timeout=900) + + print(f"โœ… RayJob '{job_name}' completed successfully against existing cluster!") + + def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900): + """ + Monitor a RayJob until it completes or fails. 
+ Args: + rayjob: The RayJob instance to monitor + timeout: Maximum time to wait in seconds (default: 15 minutes) + """ + print(f"โณ Monitoring RayJob '{rayjob.name}' status...") + + elapsed_time = 0 + check_interval = 10 # Check every 10 seconds + + while elapsed_time < timeout: + status, ready = rayjob.status(print_to_console=True) + + # Check if job has completed (either successfully or failed) + if status == CodeflareRayJobStatus.COMPLETE: + print(f"โœ… RayJob '{rayjob.name}' completed successfully!") + return + elif status == CodeflareRayJobStatus.FAILED: + raise AssertionError(f"โŒ RayJob '{rayjob.name}' failed!") + elif status == CodeflareRayJobStatus.RUNNING: + print(f"๐Ÿƒ RayJob '{rayjob.name}' is still running...") + elif status == CodeflareRayJobStatus.UNKNOWN: + print(f"โ“ RayJob '{rayjob.name}' status is unknown") + + # Wait before next check + sleep(check_interval) + elapsed_time += check_interval + + # If we reach here, the job has timed out + final_status, _ = rayjob.status(print_to_console=True) + raise TimeoutError( + f"โฐ RayJob '{rayjob.name}' did not complete within {timeout} seconds. 
" + f"Final status: {final_status}" + ) diff --git a/tests/e2e/rayjob_existing_cluster_oauth_test.py b/tests/e2e/rayjob_existing_cluster_oauth_test.py new file mode 100644 index 00000000..8647d745 --- /dev/null +++ b/tests/e2e/rayjob_existing_cluster_oauth_test.py @@ -0,0 +1,135 @@ +import pytest +from time import sleep + +from codeflare_sdk import ( + Cluster, + ClusterConfiguration, + TokenAuthentication, +) +from codeflare_sdk.ray.rayjobs import RayJob +from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus + +from support import * + +# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on OpenShift + + +@pytest.mark.openshift +class TestRayJobExistingClusterOauth: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + delete_kueue_resources(self) + + def test_rayjob_against_existing_cluster_oauth(self): + self.setup_method() + create_namespace(self) + create_kueue_resources(self) + self.run_rayjob_against_existing_cluster_oauth() + + def run_rayjob_against_existing_cluster_oauth(self): + ray_image = get_ray_image() + + auth = TokenAuthentication( + token=run_oc_command(["whoami", "--show-token=true"]), + server=run_oc_command(["whoami", "--show-server=true"]), + skip_tls=True, + ) + auth.login() + + cluster_name = "existing-cluster" + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="500m", + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=4, + image=ray_image, + write_to_file=True, + verify_tls=False, + ) + ) + + cluster.apply() + cluster.status() + cluster.wait_ready() + cluster.status() + cluster.details() + + print(f"โœ… Ray cluster '{cluster_name}' is ready") + + job_name = "existing-cluster-rayjob" + + rayjob = RayJob( + job_name=job_name, + cluster_name=cluster_name, + 
namespace=self.namespace, + entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"", + runtime_env={ + "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"], + "env_vars": get_setup_env_variables(ACCELERATOR="cpu"), + }, + shutdown_after_job_finishes=False, + ) + + # Submit the job + print( + f"๐Ÿš€ Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'" + ) + submission_result = rayjob.submit() + assert ( + submission_result == job_name + ), f"Job submission failed, expected {job_name}, got {submission_result}" + print(f"โœ… Successfully submitted RayJob '{job_name}'") + + # Monitor the job status until completion + self.monitor_rayjob_completion(rayjob) + + # Cleanup - manually tear down the cluster since job won't do it + print("๐Ÿงน Cleaning up Ray cluster") + cluster.down() + + def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 300): + """ + Monitor a RayJob until it completes or fails. 
+ Args: + rayjob: The RayJob instance to monitor + timeout: Maximum time to wait in seconds (default: 5 minutes) + """ + print(f"โณ Monitoring RayJob '{rayjob.name}' status...") + + elapsed_time = 0 + check_interval = 10 # Check every 10 seconds + + while elapsed_time < timeout: + status, ready = rayjob.status(print_to_console=True) + + # Check if job has completed (either successfully or failed) + if status == CodeflareRayJobStatus.COMPLETE: + print(f"โœ… RayJob '{rayjob.name}' completed successfully!") + return + elif status == CodeflareRayJobStatus.FAILED: + raise AssertionError(f"โŒ RayJob '{rayjob.name}' failed!") + elif status == CodeflareRayJobStatus.RUNNING: + print(f"๐Ÿƒ RayJob '{rayjob.name}' is still running...") + elif status == CodeflareRayJobStatus.UNKNOWN: + print(f"โ“ RayJob '{rayjob.name}' status is unknown") + + # Wait before next check + sleep(check_interval) + elapsed_time += check_interval + + # If we reach here, the job has timed out + final_status, _ = rayjob.status(print_to_console=True) + raise TimeoutError( + f"โฐ RayJob '{rayjob.name}' did not complete within {timeout} seconds. " + f"Final status: {final_status}" + )