
fix: add more dataproc error handling #9826

Merged
merged 12 commits on May 1, 2023
21 changes: 14 additions & 7 deletions dataproc/snippets/create_cluster_test.py
@@ -16,7 +16,7 @@
import uuid

import backoff
from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
ServiceUnavailable)
from google.cloud import dataproc_v1 as dataproc

@@ -27,11 +27,13 @@
CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))


cluster_client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)


@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
def teardown():
cluster_client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)
# Client library function
try:
operation = cluster_client.delete_cluster(
@@ -53,8 +55,13 @@ def test_cluster_create(capsys):
# Wrapper function for client library function
try:
create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
out, _ = capsys.readouterr()
assert CLUSTER_NAME in out
except AlreadyExists:
request = dataproc.GetClusterRequest(project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME)
response = cluster_client.get_cluster(request=request)
assert response.status.state == dataproc.ClusterStatus.State.RUNNING # verify the cluster is in the RUNNING state
out, _ = capsys.readouterr()
assert CLUSTER_NAME in out
finally:
teardown()

out, _ = capsys.readouterr()
assert CLUSTER_NAME in out
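
For context, the create-or-reuse pattern this test now follows can be summarized as below. This is a minimal illustrative sketch, not an excerpt of the PR: the helper name create_or_reuse_cluster is hypothetical, and it assumes the same module-level cluster_client, project, region, and cluster constants used in the test file.

from google.api_core.exceptions import AlreadyExists
from google.cloud import dataproc_v1 as dataproc

def create_or_reuse_cluster(cluster_client, project_id, region, cluster_name, cluster):
    # Hypothetical helper mirroring the try/except AlreadyExists flow in the test above.
    try:
        operation = cluster_client.create_cluster(
            request={"project_id": project_id, "region": region, "cluster": cluster}
        )
        return operation.result()
    except AlreadyExists:
        # A cluster left over from an earlier run is acceptable as long as it is RUNNING.
        request = dataproc.GetClusterRequest(
            project_id=project_id, region=region, cluster_name=cluster_name
        )
        response = cluster_client.get_cluster(request=request)
        assert response.status.state == dataproc.ClusterStatus.State.RUNNING
        return response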
7 changes: 4 additions & 3 deletions dataproc/snippets/submit_job_test.py
@@ -16,7 +16,7 @@
import uuid

import backoff
from google.api_core.exceptions import (AlreadyExists, InternalServerError, NotFound,
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
ServiceUnavailable)
from google.cloud import dataproc_v1 as dataproc
import pytest
@@ -36,7 +36,7 @@ def cluster_client():
)


@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
def setup_cluster(cluster_client, curr_cluster_name):

CLUSTER = {
@@ -85,7 +85,8 @@ def cluster_name(cluster_client):
teardown_cluster(cluster_client, curr_cluster_name)


@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
# InvalidArgument is thrown when the subnetwork is not ready
@backoff.on_exception(backoff.expo, (InvalidArgument, InternalServerError, ServiceUnavailable), max_tries=5)
def test_submit_job(capsys, cluster_name):
submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
out, _ = capsys.readouterr()
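
The change in this file widens the retry policy rather than adding a fallback. A minimal sketch of the backoff usage, with the exception tuple taken from the diff (the decorated function is only a placeholder):

import backoff
from google.api_core.exceptions import InternalServerError, InvalidArgument, ServiceUnavailable

# InvalidArgument can surface while the cluster's subnetwork is still being provisioned,
# so it is treated as retryable alongside the usual transient server errors.
@backoff.on_exception(
    backoff.expo,
    (InvalidArgument, InternalServerError, ServiceUnavailable),
    max_tries=5,
)
def call_dataproc():
    ...  # placeholder for submit_job.submit_job(PROJECT_ID, REGION, cluster_name)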
14 changes: 9 additions & 5 deletions dataproc/snippets/update_cluster_test.py
@@ -20,6 +20,7 @@

import backoff
from google.api_core.exceptions import (
AlreadyExists,
Cancelled,
InternalServerError,
InvalidArgument,
@@ -57,11 +58,14 @@ def cluster_client():

@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
def setup_cluster(cluster_client):
# Create the cluster.
operation = cluster_client.create_cluster(
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
)
operation.result()
try:
# Create the cluster.
operation = cluster_client.create_cluster(
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
)
operation.result()
except AlreadyExists:
print("Cluster already exists, utilize existing cluster")


@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)