
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = "us-central1"
-CLUSTER_NAME = "py-qs-test-{}".format(str(uuid.uuid4()))
-STAGING_BUCKET = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+
JOB_FILE_NAME = "sum.py"
-JOB_FILE_PATH = "gs://{}/{}".format(STAGING_BUCKET, JOB_FILE_NAME)
SORT_CODE = (
    "import pyspark\n"
    "sc = pyspark.SparkContext()\n"
)


-@pytest.fixture(autouse=True)
-def blob():
-    storage_client = storage.Client()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_bucket(bucket):
+    bucket.delete()
+
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_blob(blob):
+    blob.delete()

-    @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
-    def create_bucket():
-        return storage_client.create_bucket(STAGING_BUCKET)

-    bucket = create_bucket()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def upload_blob(bucket, contents):
    blob = bucket.blob(JOB_FILE_NAME)
-    blob.upload_from_string(SORT_CODE)
+    blob.upload_from_string(contents)
+    return blob

-    yield

-    blob.delete()
-    bucket.delete()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def create_bucket(bucket_name):
+    storage_client = storage.Client()
+    return storage_client.create_bucket(bucket_name)


-@pytest.fixture(autouse=True)
-def cluster():
-    yield
+@pytest.fixture(scope="module")
+def staging_bucket_name():
+    bucket_name = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+    bucket = create_bucket(bucket_name)
+    blob = upload_blob(bucket, SORT_CODE)
+    try:
+        yield bucket_name
+    finally:
+        delete_blob(blob)
+        delete_bucket(bucket)

+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def verify_cluster_teardown(cluster_name):
    # The quickstart sample deletes the cluster, but if the test fails
    # before cluster deletion occurs, it can be manually deleted here.
    cluster_client = dataproc.ClusterControllerClient(
@@ -69,23 +82,31 @@ def cluster():
    clusters = cluster_client.list_clusters(
        request={"project_id": PROJECT_ID, "region": REGION}
    )
-
    for cluster in clusters:
-        if cluster.cluster_name == CLUSTER_NAME:
+        if cluster.cluster_name == cluster_name:
            cluster_client.delete_cluster(
                request={
                    "project_id": PROJECT_ID,
                    "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
+                    "cluster_name": cluster_name,
                }
            )


@backoff.on_exception(backoff.expo, InvalidArgument, max_tries=3)
-def test_quickstart(capsys):
-    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
-    out, _ = capsys.readouterr()
-
-    assert "Cluster created successfully" in out
-    assert "Job finished successfully" in out
-    assert "successfully deleted" in out
+def test_quickstart(capsys, staging_bucket_name):
+    cluster_name = "py-qs-test-{}".format(str(uuid.uuid4()))
+    try:
+        quickstart.quickstart(
+            PROJECT_ID,
+            REGION,
+            cluster_name,
+            "gs://{}/{}".format(staging_bucket_name, JOB_FILE_NAME)
+        )
+        out, _ = capsys.readouterr()
+
+        assert "Cluster created successfully" in out
+        assert "Job finished successfully" in out
+        assert "successfully deleted" in out
+    finally:
+        verify_cluster_teardown(cluster_name)
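For context, the helpers introduced in this change all lean on the backoff package's on_exception decorator, which re-invokes a function with exponentially growing delays whenever the named exception is raised and lets the exception propagate once max_tries attempts are exhausted. A minimal, self-contained sketch of that retry pattern, where TransientError and flaky_fetch are hypothetical stand-ins for ServiceUnavailable and the storage/cluster calls:

import backoff


class TransientError(Exception):
    """Hypothetical stand-in for a transient API error such as ServiceUnavailable."""


attempts = {"count": 0}


# Re-invoked with exponentially growing delays each time TransientError is
# raised; after max_tries attempts the exception propagates to the caller.
@backoff.on_exception(backoff.expo, TransientError, max_tries=5)
def flaky_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientError("temporarily unavailable")
    return "ok"


print(flaky_fetch())  # succeeds on the third attempt

Wrapping create_bucket, upload_blob, delete_blob, delete_bucket, and verify_cluster_teardown this way gives every setup and teardown step the same retry behavior that the old blob fixture only applied to bucket creation.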