diff --git a/dataproc/snippets/quickstart/quickstart.py b/dataproc/snippets/quickstart/quickstart.py
index c9e73002665..68c3e205b9e 100644
--- a/dataproc/snippets/quickstart/quickstart.py
+++ b/dataproc/snippets/quickstart/quickstart.py
@@ -44,8 +44,8 @@ def quickstart(project_id, region, cluster_name, job_file_path):
         "project_id": project_id,
         "cluster_name": cluster_name,
         "config": {
-            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
-            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}},
         },
     }
 
diff --git a/dataproc/snippets/quickstart/quickstart_test.py b/dataproc/snippets/quickstart/quickstart_test.py
index 47494cb10c7..db8dd854610 100644
--- a/dataproc/snippets/quickstart/quickstart_test.py
+++ b/dataproc/snippets/quickstart/quickstart_test.py
@@ -26,10 +26,8 @@
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 REGION = "us-central1"
-CLUSTER_NAME = "py-qs-test-{}".format(str(uuid.uuid4()))
-STAGING_BUCKET = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+
 JOB_FILE_NAME = "sum.py"
-JOB_FILE_PATH = "gs://{}/{}".format(STAGING_BUCKET, JOB_FILE_NAME)
 SORT_CODE = (
     "import pyspark\n"
     "sc = pyspark.SparkContext()\n"
@@ -38,28 +36,43 @@
 )
 
 
-@pytest.fixture(autouse=True)
-def blob():
-    storage_client = storage.Client()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_bucket(bucket):
+    bucket.delete()
+
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_blob(blob):
+    blob.delete()
 
-    @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
-    def create_bucket():
-        return storage_client.create_bucket(STAGING_BUCKET)
 
-    bucket = create_bucket()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def upload_blob(bucket, contents):
     blob = bucket.blob(JOB_FILE_NAME)
-    blob.upload_from_string(SORT_CODE)
+    blob.upload_from_string(contents)
+    return blob
 
-    yield
 
-    blob.delete()
-    bucket.delete()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def create_bucket(bucket_name):
+    storage_client = storage.Client()
+    return storage_client.create_bucket(bucket_name)
 
 
-@pytest.fixture(autouse=True)
-def cluster():
-    yield
+@pytest.fixture(scope="module")
+def staging_bucket_name():
+    bucket_name = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+    bucket = create_bucket(bucket_name)
+    blob = upload_blob(bucket, SORT_CODE)
+    try:
+        yield bucket_name
+    finally:
+        delete_blob(blob)
+        delete_bucket(bucket)
+
 
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def verify_cluster_teardown(cluster_name):
     # The quickstart sample deletes the cluster, but if the test fails
     # before cluster deletion occurs, it can be manually deleted here.
     cluster_client = dataproc.ClusterControllerClient(
@@ -69,23 +82,31 @@ def cluster():
     clusters = cluster_client.list_clusters(
         request={"project_id": PROJECT_ID, "region": REGION}
     )
-
     for cluster in clusters:
-        if cluster.cluster_name == CLUSTER_NAME:
+        if cluster.cluster_name == cluster_name:
             cluster_client.delete_cluster(
                 request={
                     "project_id": PROJECT_ID,
                     "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
+                    "cluster_name": cluster_name,
                 }
             )
 
 
 @backoff.on_exception(backoff.expo, InvalidArgument, max_tries=3)
-def test_quickstart(capsys):
-    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
-    out, _ = capsys.readouterr()
-
-    assert "Cluster created successfully" in out
-    assert "Job finished successfully" in out
-    assert "successfully deleted" in out
+def test_quickstart(capsys, staging_bucket_name):
+    cluster_name = "py-qs-test-{}".format(str(uuid.uuid4()))
+    try:
+        quickstart.quickstart(
+            PROJECT_ID,
+            REGION,
+            cluster_name,
+            "gs://{}/{}".format(staging_bucket_name, JOB_FILE_NAME)
+        )
+        out, _ = capsys.readouterr()
+
+        assert "Cluster created successfully" in out
+        assert "Job finished successfully" in out
+        assert "successfully deleted" in out
+    finally:
+        verify_cluster_teardown(cluster_name)
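
Reviewer note: the rewritten test provisions its own staging bucket through a module-scoped fixture and wraps every Cloud Storage call in exponential backoff on ServiceUnavailable. A minimal sketch of that fixture pattern follows, assuming only the backoff, pytest, and google-cloud-storage packages the test already imports; the names make_bucket and scratch_bucket are illustrative and not part of this change.

    import uuid

    import backoff
    import pytest
    from google.api_core.exceptions import ServiceUnavailable
    from google.cloud import storage


    @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
    def make_bucket(name):
        # Retry bucket creation with exponential backoff on transient 503s.
        return storage.Client().create_bucket(name)


    @pytest.fixture(scope="module")
    def scratch_bucket():
        # A unique name per session keeps parallel test runs from colliding.
        bucket = make_bucket("example-scratch-{}".format(uuid.uuid4()))
        try:
            yield bucket
        finally:
            # Teardown runs even if a test that uses the fixture fails.
            bucket.delete()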