diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000..c1293afc00
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,85 @@
+# NOTE: if you modify this file, you probably need to modify the file set that
+# is an input to 'maven-assembly-plugin' that generates source distribution.
+# This is typically in files named 'src.xml' throughout this repository.
+
+# Ignore any offline repositories the user may have created.
+**/offline-repository/**/*
+
+# Ignore files generated by the Gradle build process.
+**/.gradle/**/*
+**/.gogradle/**/*
+**/gogradle.lock
+**/build/**/*
+sdks/**/vendor/**/*
+runners/**/vendor/**/*
+**/.gradletasknamecache
+
+# Ignore files generated by the Maven build process.
+**/bin/**/*
+**/dependency-reduced-pom.xml
+**/target/**/*
+
+# Ignore generated archetypes
+sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/
+sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/
+
+# Ignore files generated by the Python build process.
+**/*.pyc
+**/*.pyo
+**/*.pyd
+**/*.egg-info/
+**/.eggs/
+**/nose-*.egg/
+**/.tox/**/*
+**/dist/**/*
+**/distribute-*/**/*
+**/env/**/*
+sdks/python/**/*.c
+sdks/python/**/*.so
+sdks/python/**/*.egg
+sdks/python/LICENSE
+sdks/python/NOTICE
+sdks/python/README.md
+sdks/python/apache_beam/portability/api/*pb2*.*
+sdks/python/nosetests.xml
+
+# Ignore data files
+**/*data/
+**/data/
+
+# Ignore IntelliJ files.
+**/.idea/**/*
+**/*.iml
+**/*.ipr
+**/*.iws
+**/out/**/*
+
+# Ignore Eclipse files.
+**/.classpath
+**/.project
+**/.factorypath
+**/.checkstyle
+**/.fbExcludeFilterFile
+**/.apt_generated/**/*
+**/.settings/**/*
+
+# Ignore Visual Studio Code files.
+**/.vscode/**/*
+
+# Hotspot VM leaves this log in a non-target directory when java crashes
+**/hs_err_pid*.log
+
+# Ignore files that end with '~', since they are most likely auto-save files
+# produced by a text editor.
+**/*~
+
+# Ignore MacOSX files.
+**/.DS_Store/**/*
+**/.DS_Store
+
+# Ignore Jupyter notebook checkpoints.
+**/.ipynb_checkpoints/**/*
+
+# NOTE: if you modify this file, you probably need to modify the file set that
+# is an input to 'maven-assembly-plugin' that generates source distribution.
+# This is typically in files named 'src.xml' throughout this repository.
diff --git a/examples/chicago_taxi/custom_requirements.txt b/examples/chicago_taxi/custom_requirements.txt
new file mode 100644
index 0000000000..399fec0735
--- /dev/null
+++ b/examples/chicago_taxi/custom_requirements.txt
@@ -0,0 +1,114 @@
+absl-py==0.6.1
+apache-beam==2.10.0.dev0
+astor==0.7.1
+avro==1.8.2
+backports-abc==0.5
+backports.shutil-get-terminal-size==1.0.0
+backports.weakref==1.0.post1
+bleach==3.0.2
+cachetools==3.0.0
+certifi==2018.10.15
+chardet==3.0.4
+configparser==3.5.0
+crcmod==1.7
+decorator==4.3.0
+defusedxml==0.5.0
+dill==0.2.8.2
+docopt==0.6.2
+entrypoints==0.2.3
+enum34==1.1.6
+fastavro==0.21.13
+fasteners==0.14.1
+funcsigs==1.0.2
+functools32==3.2.3.post2
+future==0.17.1
+futures==3.2.0
+gast==0.2.0
+google-api-core==1.5.2
+google-apitools==0.5.24
+google-auth==1.6.1
+google-auth-httplib2==0.0.3
+google-cloud-bigquery==1.6.0
+google-cloud-core==0.28.1
+google-cloud-dataproc==0.2.0
+google-cloud-pubsub==0.35.4
+google-resumable-media==0.3.1
+googleapis-common-protos==1.5.3
+googledatastore==7.0.1
+grpc==0.3.post19
+grpc-google-iam-v1==0.11.1
+grpcio==1.8.6
+hdfs==2.1.0
+httplib2==0.9.2
+idna==2.7
+ipaddress==1.0.22
+ipykernel==4.10.0
+ipython==5.8.0
+ipython-genutils==0.2.0
+ipywidgets==7.4.2
+Jinja2==2.10
+jsonschema==2.6.0
+jupyter==1.0.0
+jupyter-client==5.2.3
+jupyter-console==5.2.0
+jupyter-core==4.4.0
+Markdown==3.0.1
+MarkupSafe==1.1.0
+mistune==0.8.4
+mock==2.0.0
+monotonic==1.5
+msgpack-python==0.5.6
+nbconvert==5.4.0
+nbformat==4.4.0
+notebook==5.7.0
+numpy==1.13.3
+oauth2client==3.0.0
+pandas==0.23.4
+pandocfilters==1.4.2
+pathlib2==2.3.2
+pbr==5.1.1
+pexpect==4.6.0
+pickleshare==0.7.5
+prometheus-client==0.4.2
+prompt-toolkit==1.0.15
+proto-google-cloud-datastore-v1==0.90.4
+proto-google-cloud-pubsub-v1==0.15.4
+protobuf==3.6.0
+ptyprocess==0.6.0
+pyasn1==0.4.4
+pyasn1-modules==0.2.2
+pydot==1.2.4
+Pygments==2.2.0
+pyparsing==2.3.0
+python-dateutil==2.7.5
+pytz==2018.4
+PyVCF==0.6.8
+PyYAML==3.13
+pyzmq==17.1.2
+qtconsole==4.4.3
+requests==2.20.1
+rsa==4.0
+scandir==1.9.0
+Send2Trash==1.5.0
+simplegeneric==0.8.1
+singledispatch==3.4.0.3
+six==1.11.0
+tensorboard==1.9.0
+tensorflow==1.9.0
+tensorflow-data-validation==0.9.0
+tensorflow-metadata==0.9.0
+tensorflow-model-analysis==0.9.2
+tensorflow-serving-api==1.9.0
+tensorflow-transform==0.9.0
+termcolor==1.1.0
+terminado==0.8.1
+testpath==0.4.2
+tfx-chicago-taxi==0.9.2
+tornado==5.1.1
+traitlets==4.3.2
+typing==3.6.6
+urllib3==1.24.1
+wcwidth==0.1.7
+webencodings==0.5.1
+Werkzeug==0.14.1
+widgetsnbextension==3.4.2
diff --git a/examples/chicago_taxi/dataproc/create_cluster.sh b/examples/chicago_taxi/dataproc/create_cluster.sh
new file mode 100755
index 0000000000..538c9ba5a3
--- /dev/null
+++ b/examples/chicago_taxi/dataproc/create_cluster.sh
@@ -0,0 +1,136 @@
+#!/usr/bin/env bash
+
+#set -x
+set -e
+USER=`whoami`
+CLUSTER_NAME="$USER-flink-156"
+NUM_WORKERS=2
+FLINK_VERSION=1.5.6
+WORK_DIR="gs://clouddfe-$USER/tmp"
+CLOUD_WORKER_IMAGE="gcr.io/dataflow-build/$USER/beam_fnapi_python:latest"
+TASK_MANAGER_MEM=10240
+FLINK_LOCAL_PORT=8081
+TASK_MANAGER_SLOTS=1
+DATAPROC_VERSION=1.2
+
+MASTER_NAME="$CLUSTER_NAME-m"
+FLINK_INIT="$WORK_DIR/flink/flink-init-dataproc.sh"
+DOCKER_INIT="$WORK_DIR/flink/docker-init.sh"
+LOCAL_WORKER_IMAGE="$USER-docker-apache.bintray.io/beam/python:latest"
+FLINK_DOWNLOAD_URL="http://archive.apache.org/dist/flink/flink-$FLINK_VERSION/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"
+
+YARN_APPLICATION=""
+YARN_APPLICATION_MASTER=""
+
+
+function is_master() {
+  local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)" + if [[ "$role" == 'Master' ]] ; then + true + else + false + fi +} + +function get_leader() { + local i=0 + local -A application_ids + local -A application_masters + #gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME" + echo "Yarn Applications" + while read line; do + echo $line + application_ids[$i]=`echo $line | sed "s/ .*//"` + application_masters[$i]=`echo $line | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//"` + i=$((i+1)) + done <<< $(gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME") + + if [ $i != 1 ]; then + echo "Multiple applications found. Make sure that only 1 application is running on the cluster." + for app in ${application_ids[*]}; + do + echo $app + done + + echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill \"' to kill the yarn application." + exit 1 + fi + + YARN_APPLICATION=${application_ids[0]} + YARN_APPLICATION_MASTER=${application_masters[0]} + echo "Using Yarn Application $YARN_APPLICATION $YARN_APPLICATION_MASTER" +} + +function upload_worker_image() { + echo "Tagging worker image $LOCAL_WORKER_IMAGE to $CLOUD_WORKER_IMAGE" + docker tag $LOCAL_WORKER_IMAGE $CLOUD_WORKER_IMAGE + echo "Pushing worker image $CLOUD_WORKER_IMAGE to GCR" + docker push $CLOUD_WORKER_IMAGE +} + +function pull_worker_image() { + echo "Pulling worker image $CLOUD_WORKER_IMAGE on workers $(gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)")" + gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)" | xargs -I INSTANCE -P 100 gcloud compute ssh yarn@INSTANCE --command="docker pull $CLOUD_WORKER_IMAGE" +} + +function start_yarn_application() { + echo "Starting yarn application on $MASTER_NAME" + execute_on_master "/usr/lib/flink/bin/yarn-session.sh -n $NUM_WORKERS -tm $TASK_MANAGER_MEM -s $TASK_MANAGER_SLOTS -d -nm flink_yarn" +} + +function execute_on_master() { + gcloud compute ssh yarn@$MASTER_NAME --command="$1" +} + +function upload_resources() { + local TMP_FOLDER=`mktemp -d -t flink_tmp_XXXX` + + echo "Downloading flink at $TMP_FOLDER" + wget -P $TMP_FOLDER $FLINK_DOWNLOAD_URL + + echo "Uploading resources to GCS $WORK_DIR" + cp ./create_cluster.sh $TMP_FOLDER + cp ./docker-init.sh $TMP_FOLDER + cp ./flink-init-dataproc.sh $TMP_FOLDER + + gsutil cp -r $TMP_FOLDER/* $WORK_DIR/flink + + rm -r $TMP_FOLDER +} + +function start_tunnel() { + local job_server_config=`execute_on_master "curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\""` + local key="jobmanager.rpc.port" + local yarn_application_master_host=`echo $YARN_APPLICATION_MASTER | cut -d ":" -f1` + + jobmanager_rpc_port=`echo $job_server_config | python -c "import sys, json; print [ e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0]"` + local tunnel_command="gcloud compute ssh $MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080" + local kill_command="gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill $YARN_APPLICATION\"" + echo "===================Closing the shell does not stop the yarn application===================" + echo "Execute \"$kill_command\" to kill the yarn application." 
+ echo "Starting tunnel \"$tunnel_command\"" + echo "Exposing flink jobserver at localhost:$FLINK_LOCAL_PORT" + gcloud compute ssh yarn@$MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080 + echo "===================Closing the shell does not stop the yarn application===================" + echo "Execute \"$kill_command\" to kill the yarn application." + echo "To re-establish tunnel use \"$tunnel_command\"" +} + +function create_cluster() { + echo "Starting dataproc cluster." + gcloud dataproc clusters create $CLUSTER_NAME --num-workers=$NUM_WORKERS --initialization-actions $FLINK_INIT,$DOCKER_INIT --metadata flink_version=$FLINK_VERSION,work_dir=$WORK_DIR/flink --image-version=$DATAPROC_VERSION + echo "Sleeping for 30 sec" + sleep 30s +} + +function main() { + upload_resources + create_cluster # Comment this line to use existing cluster. + start_yarn_application # Comment this line if yarn application is already running on the cluster. + get_leader + upload_worker_image + pull_worker_image + start_tunnel +} + +main "$@" \ No newline at end of file diff --git a/examples/chicago_taxi/dataproc/docker-init.sh b/examples/chicago_taxi/dataproc/docker-init.sh new file mode 100755 index 0000000000..a2dfaaf017 --- /dev/null +++ b/examples/chicago_taxi/dataproc/docker-init.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash + +set -euxo pipefail + +# TODO: Allow this to be configured by metadata. +readonly DOCKER_VERSION='18.06.0~ce~3-0~debian' +readonly CREDENTIAL_HELPER_VERSION='1.5.0' + + +function is_master() { + local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)" + if [[ "$role" == 'Master' ]] ; then + true + else + false + fi +} + +function get_docker_gpg() { + curl -fsSL https://download.docker.com/linux/debian/gpg +} + +function update_apt_get() { + for ((i = 0; i < 10; i++)) ; do + if apt-get update; then + return 0 + fi + sleep 5 + done + return 1 +} + +function install_docker() { + update_apt_get + apt-get install -y apt-transport-https ca-certificates curl gnupg2 + get_docker_gpg | apt-key add - + echo "deb [arch=amd64] https://download.docker.com/linux/debian $(lsb_release -cs) stable" >/etc/apt/sources.list.d/docker.list + update_apt_get + apt-get install -y docker-ce="${DOCKER_VERSION}" +} + +function configure_gcr() { + # this standalone method is recommended here: + # https://cloud.google.com/container-registry/docs/advanced-authentication#standalone_docker_credential_helper + curl -fsSL "https://github.com/GoogleCloudPlatform/docker-credential-gcr/releases/download/v${CREDENTIAL_HELPER_VERSION}/docker-credential-gcr_linux_amd64-${CREDENTIAL_HELPER_VERSION}.tar.gz" \ + | tar xz --to-stdout ./docker-credential-gcr \ + > /usr/local/bin/docker-credential-gcr && chmod +x /usr/local/bin/docker-credential-gcr + + # this command configures docker on a per-user basis. Therefore we configure + # the root user, as well as the yarn user which is part of the docker group. + # If additional users are added to the docker group later, this command will + # need to be run for them as well. + docker-credential-gcr configure-docker + su yarn --command "docker-credential-gcr configure-docker" +} + +function configure_docker() { + # The installation package should create `docker` group. + usermod -aG docker yarn + # configure docker to use Google Cloud Registry + configure_gcr + + systemctl enable docker + # Restart YARN daemons to pick up new group without restarting nodes. 
+  if is_master ; then
+    systemctl restart hadoop-yarn-resourcemanager
+  else
+    systemctl restart hadoop-yarn-nodemanager
+  fi
+}
+
+function main() {
+  install_docker
+  configure_docker
+}
+
+main "$@"
diff --git a/examples/chicago_taxi/dataproc/flink-init-dataproc.sh b/examples/chicago_taxi/dataproc/flink-init-dataproc.sh
new file mode 100644
index 0000000000..e0ae830a7f
--- /dev/null
+++ b/examples/chicago_taxi/dataproc/flink-init-dataproc.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+set -euxo pipefail
+#time HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -c com.google.cloud.flink.sandbox.BigShuffle -p 396 -m yarn-cluster -ys 4 -yn 99 -ytm 12000 ~/flink-sandbox-assembly-0.1.0-SNAPSHOT.jar 'gs://sidhom-scratch-us-central1/teragen/100tb/ascii_sort_1GB_input.0000*' gs://sidhom-scratch-us-central1/bigshuffle/result
+#time gcloud dataproc clusters create sidhom-flink --master-machine-type n1-standard-4 --worker-machine-type n1-standard-4 --num-workers 100 --initialization-actions=gs://dataproc-initialization-actions/flink/flink.sh --metadata=flink-start-yarn-session=false --zone us-central1-b
+#time gcloud dataproc clusters create goenka-flink-155 --num-workers=2 --initialization-actions gs://clouddfe-goenka/flink/flink-init-155.sh,gs://clouddfe-goenka/flink/docker-init.sh
+#time gcloud compute instances list | grep goenka-flink-155-16 | sed "s/ .*//" | xargs -I INSTANCE -P 100 gcloud compute ssh yarn@INSTANCE --command="docker pull gcr.io/dataflow-build/goenka/beam_fnapi_python:latest"
+#time /usr/lib/flink/bin/yarn-session.sh -n 2 -tm 10240 -s 4 -d -nm flink_yarn
+
+#readonly FLINK_VERSION=1.5.5
+readonly FLINK_VERSION=$(/usr/share/google/get_metadata_value attributes/flink_version)
+readonly FLINK_TOPLEVEL="flink-$FLINK_VERSION"
+#readonly FLINK_GCS="gs://clouddfe-goenka/flink/dist/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"
+readonly FLINK_GCS=$(/usr/share/google/get_metadata_value attributes/work_dir)/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz
+readonly FLINK_LOCAL="/tmp/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"
+readonly FLINK_INSTALL_DIR='/usr/lib/flink'
+function is_master() {
+  local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
+  if [[ "$role" == 'Master' ]] ; then
+    true
+  else
+    false
+  fi
+}
+function primary_master() {
+  local primary="$(/usr/share/google/get_metadata_value attributes/dataproc-master)"
+  echo -n "$primary"
+}
+function install_flink() {
+  gsutil cp "$FLINK_GCS" "$FLINK_LOCAL"
+  tar -xvf "$FLINK_LOCAL" -C /tmp
+  mv "/tmp/$FLINK_TOPLEVEL" "$FLINK_INSTALL_DIR"
+  rm "$FLINK_LOCAL"
+}
+function configure_flink() {
+  local hdfs_master="$(primary_master)"
+  # TODO(sidhom): How do we get HDFS port from config?
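+  # Assumption: 8020 is the default NameNode RPC port on Dataproc images; if
+  # fs.defaultFS points elsewhere, adjust the history directory URI below.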
+ local history_dir="hdfs://$hdfs_master:8020/user/yarn/flink-history" + hdfs dfs -mkdir "$history_dir" + #hdfs dfs -chown yarn "$history_dir" + mkdir /var/log/flink + chmod a+rwxt /var/log/flink + cat >>"$FLINK_INSTALL_DIR/conf/flink-conf.yaml" </dev/null) +SCHEMA_PATH=./data/flink_tfdv_output/schema.pbtxt + +echo Using GCP project: $MYPROJECT +echo Job input path: $JOB_INPUT_PATH +echo Job output path: $JOB_OUTPUT_PATH +echo TFT output path: $TFT_OUTPUT_PATH + +: << 'END' +JOB_ID="chicago-taxi-preprocess-$(date +%Y%m%d-%H%M%S)" +JOB_INPUT_PATH=$(pwd)/data +JOB_OUTPUT_PATH=$(pwd)/flink_output +export TFT_OUTPUT_PATH=$JOB_OUTPUT_PATH/tft_output +TEMP_PATH=/tmp/flink-tfx +MYPROJECT=$(gcloud config list --format 'value(core.project)' 2>/dev/null) + +echo Using GCP project: $MYPROJECT +echo Job input path: $JOB_INPUT_PATH +echo Job output path: $JOB_OUTPUT_PATH +echo TFT output path: $TFT_OUTPUT_PATH +END + +# move data to gcs +echo Uploading data to GCS +gsutil cp -r ./data/eval/ ./data/train/ $JOB_INPUT_PATH/ + + +# Preprocess the eval files +echo Preprocessing eval data... +rm -R -f $(pwd)/data/eval/local_chicago_taxi_output + +#image="$(whoami)-docker-apache.bintray.io/beam/python" +image="gcr.io/dataflow-build/goenka/beam_fnapi_python" + + +#input=gs://clouddfe-goenka/chicago_taxi_data/taxi_trips_000000000000.csv +input=$JOB_INPUT_PATH/eval/data.csv +#input=$JOB_INPUT_PATH/eval/data_medium.csv +#input=$JOB_INPUT_PATH/eval/data_133M.csv + + +threads=100 +#sdk=--sdk_location=/usr/local/google/home/goenka/d/work/beam/beam/sdks/python/build/apache-beam-2.9.0.dev0.tar.gz +sdk="" + +#extra_args="--retain_docker_containers=true" +extra_args="" + +python preprocess.py \ + --output_dir $JOB_OUTPUT_PATH/eval/local_chicago_taxi_output \ + --schema_file $SCHEMA_PATH \ + --outfile_prefix eval_transformed \ + --input $input \ + --setup_file ./setup.py \ + --experiments=beam_fn_api \ + --runner PortableRunner \ + --job_endpoint=localhost:8099 \ + --experiments=worker_threads=$threads \ + $sdk \ + $extra_args \ + --environment_type=DOCKER \ + --environment_config=$image \ + --execution_mode_for_batch=BATCH_FORCED + + +# Preprocess the train files, keeping the transform functions +echo Preprocessing train data... +rm -R -f $(pwd)/data/train/local_chicago_taxi_output +python preprocess.py \ + --output_dir $JOB_OUTPUT_PATH/train/local_chicago_taxi_output \ + --schema_file $SCHEMA_PATH \ + --outfile_prefix train_transformed \ + --input $JOB_INPUT_PATH/train/data.csv \ + --setup_file ./setup.py \ + --experiments=beam_fn_api \ + --runner PortableRunner \ + --job_endpoint=localhost:8099 \ + --experiments=worker_threads=$threads \ + $sdk \ + $extra_args \ + --environment_type=DOCKER \ + --environment_config=$image \ + --execution_mode_for_batch=BATCH_FORCED + diff --git a/examples/chicago_taxi/requirements.txt b/examples/chicago_taxi/requirements.txt index cd8c47d3d7..7d1e0773b5 100644 --- a/examples/chicago_taxi/requirements.txt +++ b/examples/chicago_taxi/requirements.txt @@ -1,3 +1,3 @@ --index-url https://pypi.python.org/simple/ --e . +-e . 
\ No newline at end of file
diff --git a/examples/chicago_taxi/setup.py b/examples/chicago_taxi/setup.py
index ca96afd7ec..8c0722afa6 100644
--- a/examples/chicago_taxi/setup.py
+++ b/examples/chicago_taxi/setup.py
@@ -24,7 +24,7 @@
     install_requires=[
         'apache-beam[gcp]==2.8.0',
         'jupyter==1.0',
-        'numpy==1.13.3',
+        'numpy==1.15.4',
         'protobuf==3.6.0',
         'tensorflow==' + TF_VERSION,
         'tensorflow-data-validation==0.11.0',
@@ -32,4 +32,5 @@
         'tensorflow-model-analysis==0.11.0',
         'tensorflow-serving-api==1.11.0',
         'tensorflow-transform==0.11.0',
+        'grpcio==1.17.1',
     ])
diff --git a/examples/chicago_taxi/tfdv_analyze_and_validate_dataflow_portable.sh b/examples/chicago_taxi/tfdv_analyze_and_validate_dataflow_portable.sh
new file mode 100644
index 0000000000..c4336e58b2
--- /dev/null
+++ b/examples/chicago_taxi/tfdv_analyze_and_validate_dataflow_portable.sh
@@ -0,0 +1,91 @@
+#!/bin/bash
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+set -u
+
+echo Starting distributed TFDV stats computation and schema generation...
+
+if [ "${MYBUCKET:-unset}" = "unset" ]; then
+  echo MYBUCKET was not set
+  echo Please set MYBUCKET to your GCP bucket using: export MYBUCKET=gs://bucket
+  exit 1
+fi
+
+JOB_ID="chicago-taxi-tfdv-$(date +%Y%m%d-%H%M%S)"
+JOB_INPUT_PATH=$MYBUCKET/$JOB_ID/chicago_taxi_input
+JOB_OUTPUT_PATH=$MYBUCKET/$JOB_ID/chicago_taxi_output
+TEMP_PATH=$MYBUCKET/$JOB_ID/tmp/
+MYPROJECT=$(gcloud config list --format 'value(core.project)' 2>/dev/null)
+
+# Variables needed for subsequent stages.
+export TFDV_OUTPUT_PATH=$JOB_OUTPUT_PATH/tfdv_output
+export SCHEMA_PATH=$TFDV_OUTPUT_PATH/schema.pbtxt
+
+echo Using GCP project: $MYPROJECT
+echo Job input path: $JOB_INPUT_PATH
+echo Job output path: $JOB_OUTPUT_PATH
+echo TFDV output path: $TFDV_OUTPUT_PATH
+
+# move data to gcs
+echo Uploading data to GCS
+gsutil cp -r ./data/eval/ ./data/train/ $JOB_INPUT_PATH/
+
+# eval_input=bigquery-public-data.chicago_taxi_trips.taxi_trips
+eval_input=$JOB_INPUT_PATH/eval/data.csv
+# train_input=bigquery-public-data.chicago_taxi_trips.taxi_trips
+train_input=$JOB_INPUT_PATH/train/data.csv
+
+#sdk=--sdk_location=/usr/local/google/home/goenka/d/work/beam/beam/sdks/python/build/apache-beam-2.9.0.dev0.tar.gz
+sdk=""
+
+# Compute stats and generate a schema based on the stats.
+python tfdv_analyze_and_validate.py \
+  --input $train_input \
+  --infer_schema \
+  --stats_path $TFDV_OUTPUT_PATH/train_stats.tfrecord \
+  --schema_path $SCHEMA_PATH \
+  --project $MYPROJECT \
+  --temp_location $TEMP_PATH \
+  --job_name $JOB_ID \
+  --setup_file ./setup.py \
+  --save_main_session True \
+  --runner DataflowRunner \
+  --experiments=beam_fn_api \
+  $sdk
+
+EVAL_JOB_ID=$JOB_ID-eval
+
+# Compute stats for eval data and validate stats against the schema.
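+# This second run reads the eval split, writes its stats to
+# eval_stats.tfrecord, and records any violations of the schema inferred
+# above to anomalies.pbtxt.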
+python tfdv_analyze_and_validate.py \
+  --input $eval_input \
+  --for_eval \
+  --schema_path $SCHEMA_PATH \
+  --validate_stats \
+  --stats_path $TFDV_OUTPUT_PATH/eval_stats.tfrecord \
+  --anomalies_path $TFDV_OUTPUT_PATH/anomalies.pbtxt \
+  --project $MYPROJECT \
+  --temp_location $TEMP_PATH \
+  --job_name $EVAL_JOB_ID \
+  --setup_file ./setup.py \
+  --save_main_session True \
+  --runner DataflowRunner \
+  --experiments=beam_fn_api \
+  $sdk
+
+
+echo
+echo
+echo "  TFDV_OUTPUT_PATH=$TFDV_OUTPUT_PATH"
+echo "  SCHEMA_PATH=$SCHEMA_PATH"
+echo
diff --git a/examples/chicago_taxi/tfdv_analyze_and_validate_flink.sh b/examples/chicago_taxi/tfdv_analyze_and_validate_flink.sh
new file mode 100644
index 0000000000..e8fd296928
--- /dev/null
+++ b/examples/chicago_taxi/tfdv_analyze_and_validate_flink.sh
@@ -0,0 +1,125 @@
+#!/bin/bash
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+set -u
+set -x
+
+echo Starting distributed TFDV stats computation and schema generation...
+
+if [ "${MYBUCKET:-unset}" = "unset" ]; then
+  echo MYBUCKET was not set
+  echo Please set MYBUCKET to your GCP bucket using: export MYBUCKET=gs://bucket
+  exit 1
+fi
+
+JOB_ID="chicago-taxi-tfdv-$(date +%Y%m%d-%H%M%S)"
+JOB_INPUT_PATH=$MYBUCKET/$JOB_ID/chicago_taxi_input
+JOB_OUTPUT_PATH=$MYBUCKET/$JOB_ID/chicago_taxi_output
+TEMP_PATH=$MYBUCKET/$JOB_ID/tmp/
+MYPROJECT=$(gcloud config list --format 'value(core.project)' 2>/dev/null)
+
+# Variables needed for subsequent stages.
+export TFDV_OUTPUT_PATH=$JOB_OUTPUT_PATH/tfdv_output
+export SCHEMA_PATH=$TFDV_OUTPUT_PATH/schema.pbtxt
+
+echo Using GCP project: $MYPROJECT
+echo Job input path: $JOB_INPUT_PATH
+echo Job output path: $JOB_OUTPUT_PATH
+echo TFDV output path: $TFDV_OUTPUT_PATH
+
+# move data to gcs
+echo Uploading data to GCS
+gsutil cp -r ./data/eval/ ./data/train/ $JOB_INPUT_PATH/
+
+#image="goenka-docker-apache.bintray.io/beam/python"
+image="gcr.io/dataflow-build/goenka/beam_fnapi_python"
+
+#input=bigquery-public-data.chicago_taxi_trips.taxi_trips
+eval_input=$JOB_INPUT_PATH/eval/data.csv
+
+train_input=$JOB_INPUT_PATH/train/data.csv
+
+threads=100
+sdk=""
+
+extra_args=""
+#extra_args="--retain_docker_containers=true"
+
+environment_type=DOCKER
+environment_config=$image
+
+if [ "${BEAM_SDK:-unset}" != "unset" ]; then
+  sdk="--sdk_location=$BEAM_SDK"
+fi
+
+if [ "${EXTRA_ARGS:-unset}" != "unset" ]; then
+  extra_args=$EXTRA_ARGS
+fi
+
+if [ "${ENVIRONMENT_TYPE:-unset}" != "unset" ]; then
+  environment_type=$ENVIRONMENT_TYPE
+fi
+
+if [ "${ENVIRONMENT_CONFIG:-unset}" != "unset" ]; then
+  environment_config=$ENVIRONMENT_CONFIG
+fi
+
+
+# Compute stats and generate a schema based on the stats.
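+# Both pipeline runs below use the Beam PortableRunner and expect a job
+# server to be reachable at localhost:8099 (see --job_endpoint); start one
+# before running this script.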
+python tfdv_analyze_and_validate.py \
+  --infer_schema \
+  --stats_path $TFDV_OUTPUT_PATH/train_stats.tfrecord \
+  --schema_path $SCHEMA_PATH \
+  --setup_file ./setup.py \
+  --save_main_session True \
+  --input $train_input \
+  --runner PortableRunner \
+  --job_endpoint=localhost:8099 \
+  --experiments=worker_threads=$threads \
+  $sdk \
+  $extra_args \
+  --environment_type=$environment_type \
+  --environment_config="$environment_config"
+
+EVAL_JOB_ID=$JOB_ID-eval
+
+# Compute stats for eval data and validate stats against the schema.
+python tfdv_analyze_and_validate.py \
+  --for_eval \
+  --schema_path $SCHEMA_PATH \
+  --validate_stats \
+  --stats_path $TFDV_OUTPUT_PATH/eval_stats.tfrecord \
+  --anomalies_path $TFDV_OUTPUT_PATH/anomalies.pbtxt \
+  --setup_file ./setup.py \
+  --save_main_session True \
+  --input $eval_input \
+  --experiments=beam_fn_api \
+  --runner PortableRunner \
+  --job_endpoint=localhost:8099 \
+  --experiments=worker_threads=$threads \
+  $sdk \
+  $extra_args \
+  --environment_type=$environment_type \
+  --environment_config="$environment_config"
+
+
+echo
+echo
+echo "  TFDV_OUTPUT_PATH=$TFDV_OUTPUT_PATH"
+echo "  SCHEMA_PATH=$SCHEMA_PATH"
+mkdir -p data/flink_tfdv_output
+# copy the TFDV output back from GCS
+echo Downloading data from GCS
+gsutil cp -r $TFDV_OUTPUT_PATH/* data/flink_tfdv_output/
+echo
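+# The schema copied into data/flink_tfdv_output/ above is the
+# ./data/flink_tfdv_output/schema.pbtxt that the Flink preprocessing script
+# in this directory reads as its SCHEMA_PATH.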