diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 826e0c8a7cf..d2201c15230 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -12,106 +12,122 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 import os
 import uuid
 
 import backoff
 from google.api_core.exceptions import (
     AlreadyExists,
+    Cancelled,
     InternalServerError,
     InvalidArgument,
     NotFound,
     ServiceUnavailable,
 )
-from google.cloud import dataproc_v1 as dataproc
+from google.cloud.dataproc_v1 import ClusterStatus, GetClusterRequest
+from google.cloud.dataproc_v1.services.cluster_controller.client import (
+    ClusterControllerClient,
+)
 import pytest
 
 import submit_job
 
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 REGION = "us-central1"
+CLUSTER_NAME = f"py-sj-test-{str(uuid.uuid4())}"
+NEW_NUM_INSTANCES = 3
+CLUSTER = {
+    "project_id": PROJECT_ID,
+    "cluster_name": CLUSTER_NAME,
+    "config": {
+        "master_config": {
+            "num_instances": 1,
+            "machine_type_uri": "n1-standard-2",
+            "disk_config": {"boot_disk_size_gb": 100},
+        },
+        "worker_config": {
+            "num_instances": 2,
+            "machine_type_uri": "n1-standard-2",
+            "disk_config": {"boot_disk_size_gb": 100},
+        },
+    },
+}
 
 
 @pytest.fixture(scope="module")
 def cluster_client():
-    return dataproc.ClusterControllerClient(
+    cluster_client = ClusterControllerClient(
         client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
     )
+    return cluster_client
 
 
 @backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
-def setup_cluster(cluster_client, curr_cluster_name):
-    CLUSTER = {
-        "project_id": PROJECT_ID,
-        "cluster_name": curr_cluster_name,
-        "config": {
-            "master_config": {
-                "num_instances": 1,
-                "machine_type_uri": "n1-standard-2",
-                "disk_config": {"boot_disk_size_gb": 100},
-            },
-            "worker_config": {
-                "num_instances": 2,
-                "machine_type_uri": "n1-standard-2",
-                "disk_config": {"boot_disk_size_gb": 100},
-            },
-        },
-    }
-
-    # Create the cluster.
-    operation = cluster_client.create_cluster(
-        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
-    )
-    operation.result()
+def setup_cluster(cluster_client):
+    try:
+        # Create the cluster.
+        operation = cluster_client.create_cluster(
+            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+        )
+        operation.result()
+    except AlreadyExists:
+        print("Cluster already exists, using existing cluster")
 
 
 @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
-def teardown_cluster(cluster_client, curr_cluster_name):
+def teardown_cluster(cluster_client):
     try:
         operation = cluster_client.delete_cluster(
             request={
                 "project_id": PROJECT_ID,
                 "region": REGION,
-                "cluster_name": curr_cluster_name,
+                "cluster_name": CLUSTER_NAME,
             }
         )
         operation.result()
-
     except NotFound:
         print("Cluster already deleted")
 
 
-@pytest.fixture(scope="module")
-def cluster_name(cluster_client):
-    curr_cluster_name = f"py-sj-test-{str(uuid.uuid4())}"
-
-    try:
-        setup_cluster(cluster_client, curr_cluster_name)
-        yield curr_cluster_name
-    except (
-        AlreadyExists
-    ):  # 409 can happen when we backoff on service errors during submission
-        print("Already exists, skipping cluster creation")
-        yield curr_cluster_name
-    finally:
-        teardown_cluster(cluster_client, curr_cluster_name)
-
-
-# InvalidArgument is thrown when the subnetwork is not ready
 @backoff.on_exception(
-    backoff.expo,
-    (InvalidArgument, InternalServerError, ServiceUnavailable),
-    max_tries=5,
+    backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5
 )
-def test_submit_job(capsys, cluster_name, cluster_client):
-    request = dataproc.GetClusterRequest(
-        project_id=PROJECT_ID, region=REGION, cluster_name=cluster_name
-    )
-    response = cluster_client.get_cluster(request=request)
-    # verify the cluster is in the RUNNING state before proceeding
-    # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
-    assert response.status.state == dataproc.ClusterStatus.State.RUNNING
-    submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
-    out, _ = capsys.readouterr()
-
-    assert "Job finished successfully" in out
+def test_submit_job(capsys, cluster_client: ClusterControllerClient):
+    # using this inner function instead of backoff to retry on an ERROR in the created cluster
+    # means that we can retry on the AssertionError of an errored-out cluster but not on other
+    # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur
+    # when submitting the job if the cluster were in an error state
+    def test_submit_job_inner(
+        cluster_client: ClusterControllerClient, submit_retries: int
+    ):
+        try:
+            setup_cluster(cluster_client)
+            request = GetClusterRequest(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            response = cluster_client.get_cluster(request=request)
+            # verify the cluster is in the RUNNING state before proceeding
+            # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
+            assert response.status.state == ClusterStatus.State.RUNNING
+            submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
+            out, _ = capsys.readouterr()
+
+            assert "Job finished successfully" in out
+        except AssertionError as e:
+            if (
+                submit_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
+                teardown_cluster(cluster_client)
+                test_submit_job_inner(
+                    cluster_client=cluster_client, submit_retries=submit_retries + 1
+                )
+            else:
+                # if we have exceeded the number of retries or the assertion error
+                # is not related to the cluster being in error, raise it
+                raise e
+        finally:
+            teardown_cluster(cluster_client)
+
+    test_submit_job_inner(cluster_client=cluster_client, submit_retries=0)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 91b0afb93ce..09fba7dce31 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -96,26 +96,50 @@ def teardown_cluster(cluster_client):
     backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5
 )
 def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
-    try:
-        setup_cluster(cluster_client)
-        request = GetClusterRequest(
-            project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
-        )
-        response = cluster_client.get_cluster(request=request)
-        # verify the cluster is in the RUNNING state before proceeding
-        # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
-        assert response.status.state == ClusterStatus.State.RUNNING
-
-        # Wrapper function for client library function
-        update_cluster.update_cluster(
-            PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES
-        )
-        new_num_cluster = cluster_client.get_cluster(
-            project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
-        )
-        out, _ = capsys.readouterr()
-        assert CLUSTER_NAME in out
-        assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+    # using this inner function instead of backoff to retry on an ERROR in the created cluster
+    # means that we can retry on the AssertionError of an errored-out cluster but not on other
+    # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur
+    # when updating the cluster if the cluster were in an error state
+    def test_update_cluster_inner(
+        cluster_client: ClusterControllerClient, update_retries: int
+    ):
+        try:
+            setup_cluster(cluster_client)
+            request = GetClusterRequest(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            response = cluster_client.get_cluster(request=request)
+
+            # verify the cluster is in the RUNNING state before proceeding
+            # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
+            assert response.status.state == ClusterStatus.State.RUNNING
+
+            # Wrapper function for client library function
+            update_cluster.update_cluster(
+                PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES
+            )
+            new_num_cluster = cluster_client.get_cluster(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            out, _ = capsys.readouterr()
+            assert CLUSTER_NAME in out
+            assert (
+                new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            )
+        except AssertionError as e:
+            if (
+                update_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
+                teardown_cluster(cluster_client)
+                test_update_cluster_inner(
+                    cluster_client=cluster_client, update_retries=update_retries + 1
+                )
+            else:
+                # if we have exceeded the number of retries or the assertion error
+                # is not related to the cluster being in error, raise it
+                raise e
+        finally:
+            teardown_cluster(cluster_client)
-
-    finally:
-        teardown_cluster(cluster_client)
+
+    test_update_cluster_inner(cluster_client=cluster_client, update_retries=0)
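Reviewer note: the sketch below restates the retry pattern that both tests now share, stripped of pytest and the Dataproc client so it can be read and run in isolation. It is an illustration only; MAX_RETRIES, create_cluster_stub, delete_cluster_stub, and run_with_error_retries are hypothetical stand-ins for the real setup_cluster/teardown_cluster calls and are not part of this change.

import random

MAX_RETRIES = 3  # mirrors the "retries < 3" check used in both tests


def create_cluster_stub() -> dict:
    # Stand-in for setup_cluster(); sometimes lands in ERROR to exercise the retry path.
    return {"state": random.choice(["RUNNING", "ERROR"])}


def delete_cluster_stub(cluster: dict) -> None:
    # Stand-in for teardown_cluster(); safe to call more than once.
    cluster["state"] = "DELETED"


def run_with_error_retries(retries: int = 0) -> None:
    cluster = create_cluster_stub()
    try:
        # Proceed only when the cluster is healthy; an ERROR state trips the retry branch below.
        assert cluster["state"] == "RUNNING"
        print("work submitted against a RUNNING cluster")
    except AssertionError:
        if retries < MAX_RETRIES and cluster["state"] == "ERROR":
            # Recreate the cluster and try again, but only for the errored-cluster case.
            delete_cluster_stub(cluster)
            run_with_error_retries(retries + 1)
        else:
            # Out of retries, or the assertion was unrelated to cluster health: surface it.
            raise
    finally:
        delete_cluster_stub(cluster)


if __name__ == "__main__":
    run_with_error_retries()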