From 652858f7a9c92936b78fbf22188188aa9c4daadb Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 10:11:14 -0400
Subject: [PATCH 1/6] add retries for subnetwork not ready errors

---
 dataproc/snippets/create_cluster_test.py | 5 +++--
 dataproc/snippets/submit_job_test.py     | 4 +++-
 dataproc/snippets/update_cluster_test.py | 4 +++-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/dataproc/snippets/create_cluster_test.py b/dataproc/snippets/create_cluster_test.py
index c180c4abd5d..ebdb0e11210 100644
--- a/dataproc/snippets/create_cluster_test.py
+++ b/dataproc/snippets/create_cluster_test.py
@@ -17,7 +17,7 @@
 
 import backoff
 from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable)
+                                        ServiceUnavailable, InvalidArgument)
 from google.cloud import dataproc_v1 as dataproc
 import pytest
 
@@ -50,7 +50,8 @@ def teardown():
         print("Cluster already deleted")
 
 
-@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
+# InvalidArgument is thrown when the subnetwork is not ready
+@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable, InvalidArgument), max_tries=5)
 def test_cluster_create(capsys):
     # Wrapper function for client library function
     create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index a3eef340478..37e29cac66e 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -17,7 +17,7 @@
 
 import backoff
 from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable)
+                                        ServiceUnavailable, InvalidArgument)
 from google.cloud import dataproc_v1 as dataproc
 import pytest
 
@@ -36,7 +36,9 @@
 }
 
 
+# Retry on InvalidArgument subnetwork not ready error
 @pytest.fixture(autouse=True)
+@backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
 def setup_teardown():
     try:
         cluster_client = dataproc.ClusterControllerClient(
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 3b35e3e5b3a..8a57d108909 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -20,7 +20,7 @@
 
 import backoff
 from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable)
+                                        ServiceUnavailable, InvalidArgument)
 from google.cloud.dataproc_v1.services.cluster_controller.client import \
     ClusterControllerClient
 import pytest
@@ -49,7 +49,9 @@ def cluster_client():
     return cluster_client
 
 
+# InvalidArgument is thrown when the subnetwork is not ready
 @pytest.fixture(autouse=True)
+@backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
 def setup_teardown(cluster_client):
     try:
         # Create the cluster.
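The fix applied across all three test files above is the same pattern: wrap the flaky call in backoff.on_exception so that transient google.api_core errors, including the InvalidArgument raised while a freshly created subnetwork is still provisioning, are retried with exponential backoff instead of failing the test immediately. A minimal sketch of that pattern, assuming a hypothetical create_cluster_with_retry helper in place of the real test code (it is not part of this patch series):

import backoff
from google.api_core.exceptions import (InternalServerError, InvalidArgument,
                                        ServiceUnavailable)


# Re-run the decorated function with exponential backoff whenever one of the
# listed transient exceptions escapes it; give up after five attempts.
@backoff.on_exception(
    backoff.expo,
    (InternalServerError, ServiceUnavailable, InvalidArgument),
    max_tries=5,
)
def create_cluster_with_retry():
    # Hypothetical stand-in for a call such as
    # cluster_client.create_cluster(request={...}).result(), which may raise
    # InvalidArgument until the subnetwork is ready.
    ...

Each failed attempt sleeps for an exponentially growing, jittered interval before the next try, so a cluster creation that lands in the "subnetwork not ready" window usually succeeds on a later attempt.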
From 34fa21524cbe83b192caed036dee7dfa8812475e Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 10:19:29 -0400
Subject: [PATCH 2/6] fix lint

---
 dataproc/snippets/create_cluster_test.py | 4 ++--
 dataproc/snippets/submit_job_test.py     | 4 ++--
 dataproc/snippets/update_cluster_test.py | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/dataproc/snippets/create_cluster_test.py b/dataproc/snippets/create_cluster_test.py
index ebdb0e11210..fb0d4bb7e97 100644
--- a/dataproc/snippets/create_cluster_test.py
+++ b/dataproc/snippets/create_cluster_test.py
@@ -16,8 +16,8 @@
 import uuid
 
 import backoff
-from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable, InvalidArgument)
+from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
+                                        ServiceUnavailable)
 from google.cloud import dataproc_v1 as dataproc
 import pytest
 
diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 37e29cac66e..3ed7f644aef 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -16,8 +16,8 @@
 import uuid
 
 import backoff
-from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable, InvalidArgument)
+from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
+                                        ServiceUnavailable)
 from google.cloud import dataproc_v1 as dataproc
 import pytest
 
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 8a57d108909..702957c0eee 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -19,8 +19,8 @@
 import uuid
 
 import backoff
-from google.api_core.exceptions import (InternalServerError, NotFound,
-                                        ServiceUnavailable, InvalidArgument)
+from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
+                                        ServiceUnavailable)
 from google.cloud.dataproc_v1.services.cluster_controller.client import \
     ClusterControllerClient
 import pytest

From 1e865259e65235c682b04dc0a4c3044794bb267e Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 11:49:25 -0400
Subject: [PATCH 3/6] experiment: remove autouse

---
 dataproc/snippets/submit_job_test.py     | 4 ++--
 dataproc/snippets/update_cluster_test.py | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 3ed7f644aef..6d4b7e413da 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -37,7 +37,7 @@
 
 # Retry on InvalidArgument subnetwork not ready error
-@pytest.fixture(autouse=True)
+@pytest.fixture(scope="module")
 @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
 def setup_teardown():
     try:
         cluster_client = dataproc.ClusterControllerClient(
@@ -71,7 +71,7 @@ def setup_teardown():
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
-def test_submit_job(capsys):
+def test_submit_job(capsys, setup_teardown):
     submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
 
     out, _ = capsys.readouterr()
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 702957c0eee..1f8752d3a48 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -50,7 +50,7 @@ def cluster_client():
 
 
 # InvalidArgument is thrown when the subnetwork is not ready
-@pytest.fixture(autouse=True)
+@pytest.fixture(scope="module")
 @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
 def setup_teardown(cluster_client):
     try:
@@ -76,7 +76,7 @@ def setup_teardown(cluster_client):
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
-def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
+def test_update_cluster(capsys, cluster_client: ClusterControllerClient, setup_teardown):
     # Wrapper function for client library function
     update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES)
     new_num_cluster = cluster_client.get_cluster(

From 5cac160da7dbfab61d91e7a79f1aeff096503d7a Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 12:15:39 -0400
Subject: [PATCH 4/6] chain decorators

---
 dataproc/snippets/submit_job_test.py     | 58 +++++++++++++-----------
 dataproc/snippets/update_cluster_test.py | 44 +++++++++---------
 2 files changed, 54 insertions(+), 48 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 6d4b7e413da..6b3f6f4ff6e 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -36,42 +36,46 @@
 }
 
 
-# Retry on InvalidArgument subnetwork not ready error
-@pytest.fixture(scope="module")
-@backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
-def setup_teardown():
-    try:
-        cluster_client = dataproc.ClusterControllerClient(
-            client_options={
-                "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)
-            }
-        )
-
-        # Create the cluster.
-        operation = cluster_client.create_cluster(
-            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
-        )
-        operation.result()
-        yield
-
-    finally:
+@pytest.fixture(autouse=True)
+def setup_teardown():
+    # Retry on InvalidArgument subnetwork not ready error
+    @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
+    def eventually_consistent_operation():
         try:
-            operation = cluster_client.delete_cluster(
-                request={
-                    "project_id": PROJECT_ID,
-                    "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
+            cluster_client = dataproc.ClusterControllerClient(
+                client_options={
+                    "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)
                 }
             )
+
+            # Create the cluster.
+            operation = cluster_client.create_cluster(
+                request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+            )
+            print("hi")
             operation.result()
+            print("yes")
+            yield
+
+        finally:
+            try:
+                operation = cluster_client.delete_cluster(
+                    request={
+                        "project_id": PROJECT_ID,
+                        "region": REGION,
+                        "cluster_name": CLUSTER_NAME,
+                    }
+                )
+                operation.result()
 
-        except NotFound:
-            print("Cluster already deleted")
+            except NotFound:
+                print("Cluster already deleted")
+    eventually_consistent_operation()
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
-def test_submit_job(capsys, setup_teardown):
+def test_submit_job(capsys):
     submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
 
     out, _ = capsys.readouterr()
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 1f8752d3a48..7be5e60729e 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -49,34 +49,36 @@ def cluster_client():
     return cluster_client
 
 
-# InvalidArgument is thrown when the subnetwork is not ready
-@pytest.fixture(scope="module")
-@backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
+@pytest.fixture(autouse=True)
 def setup_teardown(cluster_client):
-    try:
-        # Create the cluster.
-        operation = cluster_client.create_cluster(
-            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
-        )
-        operation.result()
-
-        yield
-    finally:
+    # InvalidArgument is thrown when the subnetwork is not ready
+    @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
+    def eventually_consistent_operation():
         try:
-            operation = cluster_client.delete_cluster(
-                request={
-                    "project_id": PROJECT_ID,
-                    "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
-                }
+            # Create the cluster.
+            operation = cluster_client.create_cluster(
+                request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
             )
             operation.result()
-        except NotFound:
-            print("Cluster already deleted")
+
+            yield
+        finally:
+            try:
+                operation = cluster_client.delete_cluster(
+                    request={
+                        "project_id": PROJECT_ID,
+                        "region": REGION,
+                        "cluster_name": CLUSTER_NAME,
+                    }
+                )
+                operation.result()
+            except NotFound:
+                print("Cluster already deleted")
+    eventually_consistent_operation()
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
-def test_update_cluster(capsys, cluster_client: ClusterControllerClient, setup_teardown):
+def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
     # Wrapper function for client library function
     update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES)
     new_num_cluster = cluster_client.get_cluster(

From 2756a9b38f8610877bfa58192e766f60c7e45a4f Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 12:20:18 -0400
Subject: [PATCH 5/6] fix lint

---
 dataproc/snippets/submit_job_test.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 6b3f6f4ff6e..caae19e791b 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -36,7 +36,6 @@
 }
 
 
-
 @pytest.fixture(autouse=True)
 def setup_teardown():
     # Retry on InvalidArgument subnetwork not ready error

From f51d1e4db5e80bad6cdf41a8bbb8cbe538095963 Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Fri, 7 Apr 2023 13:49:22 -0400
Subject: [PATCH 6/6] refactor fixtures with inner functions

---
 dataproc/snippets/submit_job_test.py     | 52 ++++++++++++------------
 dataproc/snippets/update_cluster_test.py | 40 +++++++++---------
 2 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index caae19e791b..e9395659c75 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -38,39 +38,39 @@
 
 @pytest.fixture(autouse=True)
 def setup_teardown():
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={
+            "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)
+        }
+    )
+
     # Retry on InvalidArgument subnetwork not ready error
     @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
-    def eventually_consistent_operation():
+    def setup():
+        # Create the cluster.
+        operation = cluster_client.create_cluster(
+            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+        )
+        operation.result()
+
+    def teardown():
         try:
-            cluster_client = dataproc.ClusterControllerClient(
-                client_options={
-                    "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)
+            operation = cluster_client.delete_cluster(
+                request={
+                    "project_id": PROJECT_ID,
+                    "region": REGION,
+                    "cluster_name": CLUSTER_NAME,
                 }
             )
-
-            # Create the cluster.
-            operation = cluster_client.create_cluster(
-                request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
-            )
-            print("hi")
             operation.result()
-            print("yes")
-            yield
-
-        finally:
-            try:
-                operation = cluster_client.delete_cluster(
-                    request={
-                        "project_id": PROJECT_ID,
-                        "region": REGION,
-                        "cluster_name": CLUSTER_NAME,
-                    }
-                )
-                operation.result()
-
-            except NotFound:
-                print("Cluster already deleted")
-    eventually_consistent_operation()
+        except NotFound:
+            print("Cluster already deleted")
+    try:
+        setup()
+        yield
+    finally:
+        teardown()
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 7be5e60729e..df421e67071 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -53,28 +53,30 @@ def cluster_client():
 def setup_teardown(cluster_client):
     # InvalidArgument is thrown when the subnetwork is not ready
     @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
-    def eventually_consistent_operation():
+    def setup():
+        # Create the cluster.
+        operation = cluster_client.create_cluster(
+            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+        )
+        operation.result()
+
+    def teardown():
         try:
-            # Create the cluster.
-            operation = cluster_client.create_cluster(
-                request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+            operation = cluster_client.delete_cluster(
+                request={
+                    "project_id": PROJECT_ID,
+                    "region": REGION,
+                    "cluster_name": CLUSTER_NAME,
+                }
             )
             operation.result()
-
-            yield
-        finally:
-            try:
-                operation = cluster_client.delete_cluster(
-                    request={
-                        "project_id": PROJECT_ID,
-                        "region": REGION,
-                        "cluster_name": CLUSTER_NAME,
-                    }
-                )
-                operation.result()
-            except NotFound:
-                print("Cluster already deleted")
-    eventually_consistent_operation()
+        except NotFound:
+            print("Cluster already deleted")
+    try:
+        setup()
+        yield
+    finally:
+        teardown()
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
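The shape the last patch converges on is worth spelling out: the fixture stays an ordinary generator, so pytest still drives the yield and the teardown, and only the setup step is wrapped in backoff. Applying the decorator directly to the generator fixture, as the earlier patches in the series try, does not retry its body, because calling a decorated generator function only builds the generator object; the InvalidArgument is raised later, when pytest iterates it, outside the decorator's reach. A self-contained sketch of the working pattern follows, with a simulated transient error standing in for the real Dataproc calls (the _flaky_create helper and the handle value are illustrative, not part of the patch):

import backoff
import pytest
from google.api_core.exceptions import InvalidArgument

_attempts = {"create": 0}


def _flaky_create():
    # Simulate the "subnetwork not ready" behaviour: fail once, then succeed.
    _attempts["create"] += 1
    if _attempts["create"] < 2:
        raise InvalidArgument("subnetwork not ready")
    return "cluster-handle"


@pytest.fixture(autouse=True)
def setup_teardown():
    # Only the flaky setup step is retried; the fixture itself stays a plain
    # generator so pytest runs the yield and the teardown exactly once.
    @backoff.on_exception(backoff.expo, InvalidArgument, max_tries=3)
    def setup():
        return _flaky_create()

    handle = setup()
    try:
        yield handle
    finally:
        # Teardown runs even if the test body fails.
        _attempts["create"] = 0


def test_cluster_is_ready(setup_teardown):
    assert setup_teardown == "cluster-handle"

Splitting setup and teardown into named inner functions also keeps the retry scoped to creation only, so teardown is never re-run just because creation was flaky.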