diff --git a/dataproc/snippets/create_cluster_test.py b/dataproc/snippets/create_cluster_test.py index 9cebef3b7fe..c72ef2a55c4 100644 --- a/dataproc/snippets/create_cluster_test.py +++ b/dataproc/snippets/create_cluster_test.py @@ -16,7 +16,7 @@ import uuid import backoff -from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound, +from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound, ServiceUnavailable) from google.cloud import dataproc_v1 as dataproc @@ -27,11 +27,13 @@ CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4())) +cluster_client = dataproc.ClusterControllerClient( + client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"} +) + + @backoff.on_exception(backoff.expo, (Exception), max_tries=5) def teardown(): - cluster_client = dataproc.ClusterControllerClient( - client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"} - ) # Client library function try: operation = cluster_client.delete_cluster( @@ -53,8 +55,13 @@ def test_cluster_create(capsys): # Wrapper function for client library function try: create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME) + out, _ = capsys.readouterr() + assert CLUSTER_NAME in out + except AlreadyExists: + request = dataproc.GetClusterRequest(project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME) + response = cluster_client.get_cluster(request=request) + assert response.status.state == dataproc.ClusterStatus.State.RUNNING # verify the cluster is in the RUNNING state + out, _ = capsys.readouterr() + assert CLUSTER_NAME in out finally: teardown() - - out, _ = capsys.readouterr() - assert CLUSTER_NAME in out diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py index 06a4f5382c9..807ac88b457 100644 --- a/dataproc/snippets/submit_job_test.py +++ b/dataproc/snippets/submit_job_test.py @@ -16,7 +16,7 @@ import uuid import backoff -from google.api_core.exceptions import (AlreadyExists, InternalServerError, NotFound, +from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound, ServiceUnavailable) from google.cloud import dataproc_v1 as dataproc import pytest @@ -36,7 +36,7 @@ def cluster_client(): ) -@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5) +@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5) def setup_cluster(cluster_client, curr_cluster_name): CLUSTER = { @@ -85,7 +85,8 @@ def cluster_name(cluster_client): teardown_cluster(cluster_client, curr_cluster_name) -@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5) +# InvalidArgument is thrown when the subnetwork is not ready +@backoff.on_exception(backoff.expo, (InvalidArgument, InternalServerError, ServiceUnavailable), max_tries=5) def test_submit_job(capsys, cluster_name): submit_job.submit_job(PROJECT_ID, REGION, cluster_name) out, _ = capsys.readouterr() diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py index 26cb3ddedec..cbcc7bf671b 100644 --- a/dataproc/snippets/update_cluster_test.py +++ b/dataproc/snippets/update_cluster_test.py @@ -20,6 +20,7 @@ import backoff from google.api_core.exceptions import ( + AlreadyExists, Cancelled, InternalServerError, InvalidArgument, @@ -57,11 +58,14 @@ def cluster_client(): @backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5) def setup_cluster(cluster_client): - # Create the cluster. - operation = cluster_client.create_cluster( - request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER} - ) - operation.result() + try: + # Create the cluster. + operation = cluster_client.create_cluster( + request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER} + ) + operation.result() + except AlreadyExists: + print("Cluster already exists, utilize existing cluster") @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)