Commit 238dbf2

dataflow: improve waiting functions (#7323)
## Description

Fixes #7175, #7377

Improves the waiting functions and makes the streaming sample wait until the output table is created. Also replaces `subprocess.run` with `subprocess.check_call` to get better error messages.

Related to #7140 and #7180

## Checklist
- [ ] I have followed [Sample Guidelines from AUTHORING_GUIDE.MD](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md)
- [ ] README is updated to include [all relevant information](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#readme-file)
- [ ] **Tests** pass: `nox -s py-3.6` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] **Lint** pass: `nox -s lint` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] These samples need a new **API enabled** in testing projects to pass (let us know which ones)
- [ ] These samples need a new/updated **env vars** in testing projects set to pass (let us know which ones)
- [ ] Please **merge** this PR for me once it is approved.
- [ ] This sample adds a new sample directory, and I updated the [CODEOWNERS file](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/.github/CODEOWNERS) with the codeowners for this sample
1 parent 9a97472 commit 238dbf2

File tree

4 files changed: +134 -132 lines changed

dataflow/conftest.py

Lines changed: 119 additions & 107 deletions
@@ -19,9 +19,8 @@
 import platform
 import re
 import subprocess
-import sys
 import time
-from typing import Any, Callable, Dict, Iterable, Optional
+from typing import Any, Callable, Dict, Iterable, Optional, Set
 import uuid
 
 import pytest
@@ -33,6 +32,7 @@
 
 TIMEOUT_SEC = 30 * 60  # 30 minutes in seconds
 POLL_INTERVAL_SEC = 60  # 1 minute in seconds
+LIST_PAGE_SIZE = 100
 
 HYPHEN_NAME_RE = re.compile(r"[^\w\d-]+")
 UNDERSCORE_NAME_RE = re.compile(r"[^\w\d_]+")
@@ -55,6 +55,18 @@ def hyphen_name(name: str) -> str:
     def underscore_name(name: str) -> str:
         return UNDERSCORE_NAME_RE.sub("_", Utils.hyphen_name(name))
 
+    @staticmethod
+    def wait_until(
+        is_done: Callable[[], bool],
+        timeout_sec: int = TIMEOUT_SEC,
+        poll_interval_sec: int = POLL_INTERVAL_SEC,
+    ) -> bool:
+        for _ in range(0, timeout_sec, poll_interval_sec):
+            if is_done():
+                return True
+            time.sleep(poll_interval_sec)
+        return False
+
     @staticmethod
     def storage_bucket(name: str) -> str:
         from google.cloud import storage
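The new `Utils.wait_until` helper above simply polls a boolean predicate until it returns True or the timeout elapses. A minimal usage sketch, assuming `Utils` from this conftest is in scope; the predicate, path, and timeouts below are illustrative, not part of this commit:

```python
import os

# Hypothetical predicate -- any zero-argument callable returning bool works.
def output_file_exists() -> bool:
    return os.path.exists("/tmp/output.txt")

# Polls every 30 seconds for up to 10 minutes; returns False if it times out.
if not Utils.wait_until(output_file_exists, timeout_sec=600, poll_interval_sec=30):
    raise TimeoutError("output file was not created in time")
```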
@@ -84,25 +96,40 @@ def bigquery_dataset(name: str, project: str = PROJECT) -> str:
 
         bigquery_client = bigquery.Client()
 
+        dataset_name = Utils.underscore_name(name)
         dataset = bigquery_client.create_dataset(
-            bigquery.Dataset(f"{project}.{Utils.underscore_name(name)}")
+            bigquery.Dataset(f"{project}.{dataset_name}")
         )
 
         logging.info(f"Created bigquery_dataset: {dataset.full_dataset_id}")
-        yield dataset.full_dataset_id
+        yield dataset_name
 
         bigquery_client.delete_dataset(
-            dataset.full_dataset_id.replace(":", "."), delete_contents=True
+            f"{project}.{dataset_name}", delete_contents=True
        )
         logging.info(f"Deleted bigquery_dataset: {dataset.full_dataset_id}")
 
     @staticmethod
-    def bigquery_query(query: str) -> Iterable[Dict[str, Any]]:
+    def bigquery_table_exists(
+        dataset_name: str, table_name: str, project: str = PROJECT
+    ) -> bool:
+        from google.cloud import bigquery
+        from google.cloud.exceptions import NotFound
+
+        bigquery_client = bigquery.Client()
+        try:
+            bigquery_client.get_table(f"{project}.{dataset_name}.{table_name}")
+            return True
+        except NotFound:
+            return False
+
+    @staticmethod
+    def bigquery_query(query: str, region: str = REGION) -> Iterable[Dict[str, Any]]:
         from google.cloud import bigquery
 
         bigquery_client = bigquery.Client()
         logging.info(f"Bigquery query: {query}")
-        for row in bigquery_client.query(query):
+        for row in bigquery_client.query(query, location=region):
             yield dict(row)
 
     @staticmethod
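The new `bigquery_table_exists` helper is what lets the streaming sample's test block until its output table appears, per #7175/#7377. A sketch of the intended pattern, combining it with `wait_until`; the dataset and table names are illustrative, not from this commit:

```python
# Illustrative names, not from this commit.
dataset = "beam_samples"
table = "streaming_output"

# Block until the pipeline has created its output table, then read it back
# with the new region-aware query helper.
if not Utils.wait_until(lambda: Utils.bigquery_table_exists(dataset, table)):
    raise RuntimeError(f"BigQuery table was never created: {dataset}.{table}")

rows = list(Utils.bigquery_query(f"SELECT * FROM {dataset}.{table}"))
```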
@@ -122,7 +149,7 @@ def pubsub_topic(name: str, project: str = PROJECT) -> str:
         # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4492
         cmd = ["gcloud", "pubsub", "--project", project, "topics", "delete", topic.name]
         logging.info(f"{cmd}")
-        subprocess.run(cmd, check=True)
+        subprocess.check_call(cmd)
         logging.info(f"Deleted pubsub_topic: {topic.name}")
 
     @staticmethod
@@ -156,7 +183,7 @@ def pubsub_subscription(
             subscription.name,
         ]
         logging.info(f"{cmd}")
-        subprocess.run(cmd, check=True)
+        subprocess.check_call(cmd)
         logging.info(f"Deleted pubsub_subscription: {subscription.name}")
 
     @staticmethod
@@ -207,7 +234,7 @@ def cloud_build_submit(
         """Sends a Cloud Build job, if an image_name is provided it will be deleted at teardown."""
         cmd = ["gcloud", "auth", "configure-docker"]
         logging.info(f"{cmd}")
-        subprocess.run(cmd, check=True)
+        subprocess.check_call(cmd)
 
         if substitutions:
             cmd_substitutions = [
@@ -229,7 +256,7 @@ def cloud_build_submit(
                         source,
                     ]
                     logging.info(f"{cmd}")
-                    subprocess.run(cmd, check=True)
+                    subprocess.check_call(cmd)
                     logging.info(f"Cloud build finished successfully: {config}")
                     yield f.read()
             except Exception as e:
@@ -247,7 +274,7 @@ def cloud_build_submit(
                 source,
             ]
             logging.info(f"{cmd}")
-            subprocess.run(cmd, check=True)
+            subprocess.check_call(cmd)
             logging.info(f"Created image: gcr.io/{project}/{image_name}:{UUID}")
             yield f"{image_name}:{UUID}"
         else:
@@ -265,9 +292,17 @@ def cloud_build_submit(
                 "--quiet",
             ]
             logging.info(f"{cmd}")
-            subprocess.run(cmd, check=True)
+            subprocess.check_call(cmd)
             logging.info(f"Deleted image: gcr.io/{project}/{image_name}:{UUID}")
 
+    @staticmethod
+    def dataflow_job_url(
+        job_id: str,
+        project: str = PROJECT,
+        region: str = REGION,
+    ) -> str:
+        return f"https://console.cloud.google.com/dataflow/jobs/{region}/{job_id}?project={project}"
+
     @staticmethod
     def dataflow_jobs_list(
         project: str = PROJECT, page_size: int = 30
@@ -294,103 +329,83 @@ def dataflow_jobs_list(
             yield job
 
     @staticmethod
-    def dataflow_jobs_get(
-        job_id: Optional[str] = None,
-        job_name: Optional[str] = None,
-        project: str = PROJECT,
-        list_page_size: int = 30,
-    ) -> Optional[Dict[str, Any]]:
+    def dataflow_job_id(
+        job_name: str, project: str = PROJECT, list_page_size: int = LIST_PAGE_SIZE
+    ) -> str:
+        for job in Utils.dataflow_jobs_list(project, list_page_size):
+            if job["name"] == job_name:
+                logging.info(f"Found Dataflow job: {job}")
+                return job["id"]
+        raise ValueError(f"Dataflow job not found: job_name={job_name}")
+
+    @staticmethod
+    def dataflow_jobs_get(job_id: str, project: str = PROJECT) -> Dict[str, Any]:
         from googleapiclient.discovery import build
 
         dataflow = build("dataflow", "v1b3")
 
-        if job_id:
-            # For more info see:
-            # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/get
-            request = (
-                dataflow.projects()
-                .jobs()
-                .get(
-                    projectId=project,
-                    jobId=job_id,
-                    view="JOB_VIEW_SUMMARY",
-                )
+        # For more info see:
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/get
+        request = (
+            dataflow.projects()
+            .jobs()
+            .get(
+                projectId=project,
+                jobId=job_id,
+                view="JOB_VIEW_SUMMARY",
            )
-            # If the job is not found, this throws an HttpError exception.
-            job = request.execute()
-            logging.info(f"Found Dataflow job: {job}")
-            return job
-
-        elif job_name:
-            for job in Utils.dataflow_jobs_list(project, list_page_size):
-                if job["name"] == job_name:
-                    logging.info(f"Found Dataflow job: {job}")
-                    return job
-            raise ValueError(f"Dataflow job not found: job_name={job_name}")
-
-        else:
-            raise ValueError("must specify either `job_id` or `job_name`")
+        )
+        # If the job is not found, this throws an HttpError exception.
+        return request.execute()
 
     @staticmethod
     def dataflow_jobs_wait(
-        job_id: Optional[str] = None,
-        job_name: Optional[str] = None,
+        job_id: str = None,
+        job_name: str = None,
         project: str = PROJECT,
         region: str = REGION,
-        until_status: str = "JOB_STATE_DONE",
-        list_page_size: int = 100,
+        target_states: Set[str] = {"JOB_STATE_DONE"},
+        list_page_size: int = LIST_PAGE_SIZE,
         timeout_sec: str = TIMEOUT_SEC,
         poll_interval_sec: int = POLL_INTERVAL_SEC,
     ) -> Optional[str]:
         """For a list of all the valid states:
         https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
         """
 
-        # Wait until we reach the desired status, or the job finished in some way.
-        target_status = {
-            until_status,
+        assert job_id or job_name, "required to pass either a job_id or a job_name"
+        if not job_id:
+            job_id = Utils.dataflow_job_id(job_name, project, list_page_size)
+
+        finish_states = {
             "JOB_STATE_DONE",
             "JOB_STATE_FAILED",
             "JOB_STATE_CANCELLED",
             "JOB_STATE_DRAINED",
         }
         logging.info(
-            f"Waiting for Dataflow job until {target_status}: job_id={job_id}, job_name={job_name}"
+            f"Waiting for Dataflow job {job_id} until {target_states}\n"
+            + Utils.dataflow_job_url(job_id, project, region)
         )
-        status = None
-        for _ in range(0, timeout_sec, poll_interval_sec):
+
+        def job_is_done() -> bool:
             try:
-                job = Utils.dataflow_jobs_get(
-                    job_id=job_id,
-                    job_name=job_name,
-                    project=project,
-                    list_page_size=list_page_size,
-                )
-                status = job["currentState"]
-                if status in target_status:
-                    logging.info(
-                        f"Job status {status} in {target_status}, done waiting"
-                    )
-                    return status
-                elif status == "JOB_STATE_FAILED":
+                job = Utils.dataflow_jobs_get(job_id, project)
+                state = job["currentState"]
+                if state in target_states:
+                    logging.info(f"Dataflow job found with state {state}")
+                    return True
+                elif state in finish_states:
                     raise RuntimeError(
-                        "Dataflow job failed:\n"
-                        f"https://console.cloud.google.com/dataflow/jobs/{region}/{job_id}?project={project}"
+                        f"Dataflow job finished with state {state}, but we were expecting {target_states}\n"
+                        + Utils.dataflow_job_url(job_id, project, region)
                     )
-                logging.info(
-                    f"Job status {status} not in {target_status}, retrying in {poll_interval_sec} seconds"
-                )
+                return False
             except Exception as e:
                 logging.exception(e)
-            time.sleep(poll_interval_sec)
-        if status is None:
-            raise RuntimeError(
-                f"Dataflow job not found: timeout_sec={timeout_sec}, target_status={target_status}, job_id={job_id}, job_name={job_name}"
-            )
-        else:
-            raise RuntimeError(
-                f"Dataflow job finished in status {status} but expected {target_status}: job_id={job_id}, job_name={job_name}"
-            )
+            return False
+
+        Utils.wait_until(job_is_done, timeout_sec, poll_interval_sec)
 
     @staticmethod
     def dataflow_jobs_cancel(
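With `target_states` now a set, callers can wait for intermediate states instead of only completion. For example, a streaming test could wait until its job starts running before checking output; a hedged sketch, with an illustrative job name:

```python
# Waits (polling every POLL_INTERVAL_SEC) until the job reports one of the
# target states, or TIMEOUT_SEC elapses.
Utils.dataflow_jobs_wait(
    job_name="streaming-beam",  # illustrative name, resolved to a job_id internally
    target_states={"JOB_STATE_RUNNING"},
)
```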
@@ -416,10 +431,20 @@ def dataflow_jobs_cancel(
                 f"--region={region}",
             ]
             logging.info(f"{cmd}")
-            subprocess.run(cmd, check=True)
+            subprocess.check_call(cmd)
 
             # After draining the job, we must wait until the job has actually finished.
-            Utils.dataflow_jobs_wait(job_id, project=project, region=region)
+            Utils.dataflow_jobs_wait(
+                job_id,
+                target_states={
+                    "JOB_STATE_DONE",
+                    "JOB_STATE_FAILED",
+                    "JOB_STATE_CANCELLED",
+                    "JOB_STATE_DRAINED",
+                },
+                project=project,
+                region=region,
+            )
 
         else:
             # https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/cancel
@@ -433,7 +458,7 @@ def dataflow_jobs_cancel(
                 f"--region={region}",
             ]
             logging.info(f"{cmd}")
-            subprocess.run(cmd, check=True)
+            subprocess.check_call(cmd)
 
         logging.info(f"Cancelled Dataflow job: {job_id}")
 
@@ -459,7 +484,7 @@ def dataflow_flex_template_build(
             f"--metadata-file={metadata_file}",
         ]
         logging.info(f"{cmd}")
-        subprocess.run(cmd, check=True)
+        subprocess.check_call(cmd)
 
         logging.info(f"dataflow_flex_template_build: {template_gcs_path}")
         yield template_gcs_path
@@ -497,32 +522,19 @@ def dataflow_flex_template_run(
         ]
         logging.info(f"{cmd}")
 
-        try:
-            # The `capture_output` option was added in Python 3.7, so we must
-            # pass the `stdout` and `stderr` options explicitly to support 3.6.
-            # https://docs.python.org/3/library/subprocess.html#subprocess.run
-            p = subprocess.run(
-                cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-            )
-            stdout = p.stdout.decode("utf-8")
-            stderr = p.stderr.decode("utf-8")
-            logging.info(f"Launched Dataflow Flex Template job: {unique_job_name}")
-        except subprocess.CalledProcessError as e:
-            logging.info(e, file=sys.stderr)
-            stdout = e.stdout.decode("utf-8")
-            stderr = e.stderr.decode("utf-8")
-        finally:
-            logging.info("--- stderr ---")
-            logging.info(stderr)
-            logging.info("--- stdout ---")
-            logging.info(stdout)
-            logging.info("--- end ---")
-            return yaml.safe_load(stdout)["job"]["id"]
+        stdout = subprocess.check_output(cmd).decode("utf-8")
+        logging.info(f"Launched Dataflow Flex Template job: {unique_job_name}")
+        job_id = yaml.safe_load(stdout)["job"]["id"]
+        logging.info(f"Dataflow Flex Template job id: {job_id}")
+        logging.info(f">> {Utils.dataflow_job_url(job_id, project, region)}")
+        yield job_id
+
+        Utils.dataflow_jobs_cancel(job_id)
 
 
 @pytest.fixture(scope="session")
 def utils() -> Utils:
     logging.getLogger().setLevel(logging.INFO)
     logging.info(f"Test unique identifier: {UUID}")
-    subprocess.run(["gcloud", "version"])
+    subprocess.check_call(["gcloud", "version"])
     return Utils()
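On the `subprocess.run` to `subprocess.check_call` change: `check_call` and `check_output` raise `CalledProcessError`, carrying the failing command and exit code, on any non-zero exit, whereas the bare `subprocess.run(["gcloud", "version"])` they replace never checked the exit code at all. A small illustration, not from this commit:

```python
import subprocess

try:
    subprocess.check_call(["gcloud", "version"])
except subprocess.CalledProcessError as e:
    # e.cmd and e.returncode identify exactly which command failed and how.
    print(f"{e.cmd} failed with exit code {e.returncode}")
```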
