Skip to content

Commit be5b89e

Browse files
committed
add more error handling
1 parent e3cd4e4 commit be5b89e

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

dataproc/snippets/create_cluster_test.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
1516
import os
1617
import uuid
1718

1819
import backoff
19-
from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
20+
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
2021
ServiceUnavailable)
2122
from google.cloud import dataproc_v1 as dataproc
2223

@@ -26,12 +27,21 @@
2627
REGION = "us-west1"
2728
CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))
2829

30+
cluster_client = dataproc.ClusterControllerClient(
31+
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
32+
)
33+
2934

3035
@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
3136
def teardown():
37+
<<<<<<< HEAD
3238
cluster_client = dataproc.ClusterControllerClient(
3339
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
3440
)
41+
=======
42+
yield
43+
44+
>>>>>>> ceedc468a (add more error handling)
3545
# Client library function
3646
try:
3747
operation = cluster_client.delete_cluster(
@@ -53,8 +63,21 @@ def test_cluster_create(capsys):
5363
# Wrapper function for client library function
5464
try:
5565
create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
56-
finally:
57-
teardown()
66+
out, _ = capsys.readouterr()
67+
assert CLUSTER_NAME in out
68+
except AlreadyExists:
69+
request = dataproc.GetClusterRequest(project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME)
70+
response = cluster_client.get_cluster(request=request)
71+
assert response.status.state == dataproc.ClusterStatus.State(2) # verify the cluster is in the RUNNING state
5872

59-
out, _ = capsys.readouterr()
60-
assert CLUSTER_NAME in out
73+
status_start = response.status.state_start_time # when the cluster started being in the RUNNING state
74+
now = datetime.datetime.now(datetime.timezone.utc)
75+
diff = now-status_start
76+
# check that it's been running for less than 20 min
77+
# this means we probably did a backoff during the creation for a different reason
78+
# and that this cluster was created as part of this test, so we can continue
79+
assert diff.seconds/60 < 20
80+
out, _ = capsys.readouterr()
81+
assert CLUSTER_NAME in out
82+
finally:
83+
teardown()

dataproc/snippets/submit_job_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717

1818
import backoff
19-
from google.api_core.exceptions import (AlreadyExists, InternalServerError, NotFound,
19+
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
2020
ServiceUnavailable)
2121
from google.cloud import dataproc_v1 as dataproc
2222
import pytest
@@ -85,7 +85,8 @@ def cluster_name(cluster_client):
8585
teardown_cluster(cluster_client, curr_cluster_name)
8686

8787

88-
@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
88+
# InvalidArgument is thrown when the subnetwork is not ready
89+
@backoff.on_exception(backoff.expo, (InvalidArgument, InternalServerError, ServiceUnavailable), max_tries=5)
8990
def test_submit_job(capsys, cluster_name):
9091
submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
9192
out, _ = capsys.readouterr()

dataproc/snippets/update_cluster_test.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import backoff
2222
from google.api_core.exceptions import (
23+
AlreadyExists,
2324
Cancelled,
2425
InternalServerError,
2526
InvalidArgument,
@@ -56,7 +57,7 @@ def cluster_client():
5657

5758

5859
@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
59-
def setup_cluster(cluster_client):
60+
def setup(cluster_client):
6061
# Create the cluster.
6162
operation = cluster_client.create_cluster(
6263
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
@@ -75,8 +76,28 @@ def teardown_cluster(cluster_client):
7576
}
7677
)
7778
operation.result()
78-
except NotFound:
79-
print("Cluster already deleted")
79+
80+
def teardown():
81+
try:
82+
operation = cluster_client.delete_cluster(
83+
request={
84+
"project_id": PROJECT_ID,
85+
"region": REGION,
86+
"cluster_name": CLUSTER_NAME,
87+
}
88+
)
89+
operation.result()
90+
except NotFound:
91+
print("Cluster already deleted")
92+
93+
try:
94+
setup()
95+
yield
96+
except AlreadyExists:
97+
print("Cluster exists, utilizing existing cluster")
98+
yield
99+
finally:
100+
teardown()
80101

81102

82103
@backoff.on_exception(

0 commit comments

Comments
 (0)