
Commit 8bfa093

nicain and leahecole authored
fix: changes default region for dataproc quickstart test to us-central2 (#9764)
* fix: changes default region for dataproc quickstart test to us-central2
* Update dataproc/snippets/quickstart/quickstart_test.py
  Co-authored-by: Leah E. Cole <[email protected]>
* changing to regions with unlimited persistent disk quota
* refactor test_submit_job to utilize backoff retries correctly, and set disk_config to smaller values for several tests
* reverting to us-central1
---------
Co-authored-by: Leah E. Cole <[email protected]>
1 parent 460e5a3 commit 8bfa093
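
The heart of the change is in dataproc/snippets/submit_job_test.py (full diff below): cluster creation and deletion move out of an autouse per-test fixture and into module-level helpers that backoff retries on ServiceUnavailable, and the cluster is shared across the whole module through a cluster_name fixture. A minimal sketch of that pattern, with a placeholder resource standing in for the real Dataproc cluster calls:

import uuid

import backoff
from google.api_core.exceptions import ServiceUnavailable
import pytest


@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
def create_resource(name):
    # Stand-in for cluster_client.create_cluster(...); transient
    # ServiceUnavailable errors are retried with exponential backoff.
    print("creating", name)


@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
def delete_resource(name):
    # Stand-in for cluster_client.delete_cluster(...); the real helper below
    # additionally swallows NotFound so an already-deleted cluster is not an error.
    print("deleting", name)


@pytest.fixture(scope="module")
def resource_name():
    # One resource per test module: created before the first test that requests
    # the fixture, torn down after the last one, even if a test fails.
    name = "py-test-{}".format(uuid.uuid4())
    try:
        create_resource(name)
        yield name
    finally:
        delete_resource(name)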

File tree

3 files changed, +52 -43 lines changed

dataproc/snippets/submit_job_test.py
dataproc/snippets/update_cluster_test.py
pubsublite/spark-connector/spark_streaming_test.py

dataproc/snippets/submit_job_test.py

Lines changed: 48 additions & 39 deletions
@@ -16,7 +16,7 @@
 import uuid
 
 import backoff
-from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
+from google.api_core.exceptions import (InternalServerError, NotFound,
                                         ServiceUnavailable)
 from google.cloud import dataproc_v1 as dataproc
 import pytest
@@ -25,57 +25,66 @@
 
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 REGION = "us-central1"
-CLUSTER_NAME = "py-sj-test-{}".format(str(uuid.uuid4()))
-CLUSTER = {
-    "project_id": PROJECT_ID,
-    "cluster_name": CLUSTER_NAME,
-    "config": {
-        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
-        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
-    },
-}
-
-
-@pytest.fixture(autouse=True)
-def setup_teardown():
-    cluster_client = dataproc.ClusterControllerClient(
+
+
+@pytest.fixture(scope='module')
+def cluster_client():
+    return dataproc.ClusterControllerClient(
         client_options={
             "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)
         }
     )
 
-    # Retry on InvalidArgument subnetwork not ready error
-    @backoff.on_exception(backoff.expo, (InvalidArgument), max_tries=3)
-    def setup():
-        # Create the cluster.
-        operation = cluster_client.create_cluster(
-            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def setup_cluster(cluster_client, curr_cluster_name):
+
+    CLUSTER = {
+        "project_id": PROJECT_ID,
+        "cluster_name": curr_cluster_name,
+        "config": {
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
+        },
+    }
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+    )
+    operation.result()
+
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def teardown_cluster(cluster_client, curr_cluster_name):
+    try:
+        operation = cluster_client.delete_cluster(
+            request={
+                "project_id": PROJECT_ID,
+                "region": REGION,
+                "cluster_name": curr_cluster_name,
+            }
         )
         operation.result()
 
-    def teardown():
-        try:
-            operation = cluster_client.delete_cluster(
-                request={
-                    "project_id": PROJECT_ID,
-                    "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
-                }
-            )
-            operation.result()
-
-        except NotFound:
-            print("Cluster already deleted")
+    except NotFound:
+        print("Cluster already deleted")
+
+
+@pytest.fixture(scope='module')
+def cluster_name(cluster_client):
+    curr_cluster_name = "py-sj-test-{}".format(str(uuid.uuid4()))
+
     try:
-        setup()
-        yield
+        setup_cluster(cluster_client, curr_cluster_name)
+        yield curr_cluster_name
     finally:
-        teardown()
+        teardown_cluster(cluster_client, curr_cluster_name)
 
 
 @backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
-def test_submit_job(capsys):
-    submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
+def test_submit_job(capsys, cluster_name):
+    submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
     out, _ = capsys.readouterr()
 
     assert "Job finished successfully" in out
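
The effect of the refactor above: test_submit_job keeps a backoff decorator, but it now retries only the job submission itself. Cluster creation and deletion carry their own ServiceUnavailable retries in setup_cluster and teardown_cluster, and the module-scoped cluster_client and cluster_name fixtures provision a single cluster for the whole module instead of one per test, as the old autouse setup_teardown fixture did.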

dataproc/snippets/update_cluster_test.py

Lines changed: 2 additions & 2 deletions
@@ -41,8 +41,8 @@
     "project_id": PROJECT_ID,
     "cluster_name": CLUSTER_NAME,
     "config": {
-        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
-        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
+        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
     },
 }

pubsublite/spark-connector/spark_streaming_test.py

Lines changed: 2 additions & 2 deletions
@@ -124,8 +124,8 @@ def dataproc_cluster() -> Generator[dataproc_v1.Cluster, None, None]:
         "project_id": PROJECT_ID,
         "cluster_name": CLUSTER_ID,
         "config": {
-            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
-            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
             "config_bucket": BUCKET,
             "temp_bucket": BUCKET,
             "software_config": {"image_version": "2.0-debian10"},
