Add flink scripts for preprocess and tfdv in chicago taxi #38

Open: wants to merge 1 commit into master
85 changes: 85 additions & 0 deletions .gitignore
@@ -0,0 +1,85 @@
# NOTE: if you modify this file, you probably need to modify the file set that
# is an input to 'maven-assembly-plugin' that generates source distribution.
# This is typically in files named 'src.xml' throughout this repository.

# Ignore any offline repositories the user may have created.
**/offline-repository/**/*

# Ignore files generated by the Gradle build process.
**/.gradle/**/*
**/.gogradle/**/*
**/gogradle.lock
**/build/**/*
sdks/**/vendor/**/*
runners/**/vendor/**/*
**/.gradletasknamecache

# Ignore files generated by the Maven build process.
**/bin/**/*
**/dependency-reduced-pom.xml
**/target/**/*

# Ignore generated archetypes
sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/
sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/

# Ignore files generated by the Python build process.
**/*.pyc
**/*.pyo
**/*.pyd
**/*.egg-info/
**/.eggs/
**/nose-*.egg/
**/.tox/**/*
**/dist/**/*
**/distribute-*/**/*
**/env/**/*
sdks/python/**/*.c
sdks/python/**/*.so
sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/portability/api/*pb2*.*
sdks/python/nosetests.xml

# Ignore data files
**/*data/
**/data/

# Ignore IntelliJ files.
**/.idea/**/*
**/*.iml
**/*.ipr
**/*.iws
**/out/**/*

# Ignore Eclipse files.
**/.classpath
**/.project
**/.factorypath
**/.checkstyle
**/.fbExcludeFilterFile
**/.apt_generated/**/*
**/.settings/**/*

# Ignore Visual Studio Code files.
**/.vscode/**/*

# Hotspot VM leaves this log in a non-target directory when java crashes
**/hs_err_pid*.log

# Ignore files that end with '~', since they are most likely auto-save files
# produced by a text editor.
**/*~

# Ignore MacOSX files.
**/.DS_Store/**/*
**/.DS_Store

# Ignore Jupyter notebook checkpoints.
**/.ipynb_checkpoints/**/*

# NOTE: if you modify this file, you probably need to modify the file set that
# is an input to 'maven-assembly-plugin' that generates source distribution.
# This is typically in files named 'src.xml' throughout this repository.
114 changes: 114 additions & 0 deletions examples/chicago_taxi/custom_requirements.txt
@@ -0,0 +1,114 @@
absl-py==0.6.1
apache-beam==2.10.0.dev0
astor==0.7.1
avro==1.8.2
backports-abc==0.5
backports.shutil-get-terminal-size==1.0.0
backports.weakref==1.0.post1
bleach==3.0.2
cachetools==3.0.0
certifi==2018.10.15
chardet==3.0.4
configparser==3.5.0
crcmod==1.7
decorator==4.3.0
defusedxml==0.5.0
dill==0.2.8.2
docopt==0.6.2
entrypoints==0.2.3
enum34==1.1.6
fastavro==0.21.13
fasteners==0.14.1
funcsigs==1.0.2
functools32==3.2.3.post2
future==0.17.1
futures==3.2.0
gast==0.2.0
google-api-core==1.5.2
google-apitools==0.5.24
google-auth==1.6.1
google-auth-httplib2==0.0.3
google-cloud-bigquery==1.6.0
google-cloud-core==0.28.1
google-cloud-dataproc==0.2.0
google-cloud-pubsub==0.35.4
google-resumable-media==0.3.1
googleapis-common-protos==1.5.3
googledatastore==7.0.1
grpc==0.3.post19
grpc-google-iam-v1==0.11.1
grpcio==1.8.6
hdfs==2.1.0
httplib2==0.9.2
idna==2.7
ipaddress==1.0.22
ipykernel==4.10.0
ipython==5.8.0
ipython-genutils==0.2.0
ipywidgets==7.4.2
Jinja2==2.10
jsonschema==2.6.0
jupyter==1.0.0
jupyter-client==5.2.3
jupyter-console==5.2.0
jupyter-core==4.4.0
Markdown==3.0.1
MarkupSafe==1.1.0
mistune==0.8.4
mock==2.0.0
monotonic==1.5
msgpack-python==0.5.6
nbconvert==5.4.0
nbformat==4.4.0
notebook==5.7.0
numpy==1.13.3
oauth2client==3.0.0
pandas==0.23.4
pandocfilters==1.4.2
pathlib2==2.3.2
pbr==5.1.1
pexpect==4.6.0
pickleshare==0.7.5
prometheus-client==0.4.2
prompt-toolkit==1.0.15
proto-google-cloud-datastore-v1==0.90.4
proto-google-cloud-pubsub-v1==0.15.4
protobuf==3.6.0
ptyprocess==0.6.0
pyasn1==0.4.4
pyasn1-modules==0.2.2
pydot==1.2.4
Pygments==2.2.0
pyparsing==2.3.0
python-dateutil==2.7.5
pytz==2018.4
PyVCF==0.6.8
PyYAML==3.13
pyzmq==17.1.2
qtconsole==4.4.3
requests==2.20.1
rsa==4.0
scandir==1.9.0
Send2Trash==1.5.0
simplegeneric==0.8.1
singledispatch==3.4.0.3
six==1.11.0
tensorboard==1.9.0
tensorflow==1.9.0
tensorflow-data-validation==0.9.0
tensorflow-metadata==0.9.0
tensorflow-model-analysis==0.9.2
tensorflow-serving-api==1.9.0
tensorflow-transform==0.9.0
termcolor==1.1.0
terminado==0.8.1
testpath==0.4.2
tfx-chicago-taxi==0.9.2
tornado==5.1.1
traitlets==4.3.2
typing==3.6.6
urllib3==1.24.1
wcwidth==0.1.7
webencodings==0.5.1
Werkzeug==0.14.1
widgetsnbextension==3.4.2
136 changes: 136 additions & 0 deletions examples/chicago_taxi/dataproc/create_cluster.sh
@@ -0,0 +1,136 @@
#!/usr/bin/env bash

#set -x
set -e
USER=`whoami`
CLUSTER_NAME="$USER-flink-156"
NUM_WORKERS=2
FLINK_VERSION=1.5.6
WORK_DIR="gs://clouddfe-$USER/tmp"
CLOUD_WORKER_IMAGE="gcr.io/dataflow-build/$USER/beam_fnapi_python:latest"
TASK_MANAGER_MEM=10240
FLINK_LOCAL_PORT=8081
TASK_MANAGER_SLOTS=1
DATAPROC_VERSION=1.2

MASTER_NAME="$CLUSTER_NAME-m"
FLINK_INIT="$WORK_DIR/flink/flink-init-dataproc.sh"
DOCKER_INIT="$WORK_DIR/flink/docker-init.sh"
LOCAL_WORKER_IMAGE="$USER-docker-apache.bintray.io/beam/python:latest"
FLINK_DOWNLOAD_URL="http://archive.apache.org/dist/flink/flink-$FLINK_VERSION/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"

YARN_APPLICATION=""
YARN_APPLICATION_MASTER=""


function is_master() {
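# Succeed only when this node's Dataproc metadata role is 'Master'.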
local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "$role" == 'Master' ]] ; then
true
else
false
fi
}

function get_leader() {
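# Find the single YARN application running on the cluster and record its
# application id and application master host:port; fail if there is not
# exactly one application.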
local i=0
local -A application_ids
local -A application_masters
#gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME"
echo "Yarn Applications"
while read -r line; do
echo "$line"
application_ids[$i]=`echo $line | sed "s/ .*//"`
application_masters[$i]=`echo $line | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//"`
i=$((i+1))
done <<< "$(gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME")"

if [ $i != 1 ]; then
echo "Multiple applications found. Make sure that only 1 application is running on the cluster."
for app in ${application_ids[*]};
do
echo $app
done

echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill <APP_NAME>\"' to kill the yarn application."
exit 1
fi

YARN_APPLICATION=${application_ids[0]}
YARN_APPLICATION_MASTER=${application_masters[0]}
echo "Using Yarn Application $YARN_APPLICATION $YARN_APPLICATION_MASTER"
}

function upload_worker_image() {
echo "Tagging worker image $LOCAL_WORKER_IMAGE to $CLOUD_WORKER_IMAGE"
docker tag $LOCAL_WORKER_IMAGE $CLOUD_WORKER_IMAGE
echo "Pushing worker image $CLOUD_WORKER_IMAGE to GCR"
docker push $CLOUD_WORKER_IMAGE
}

function pull_worker_image() {
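# Pre-pull the worker image on the master and every worker node in parallel
# (up to 100 concurrent SSH sessions) so SDK harness containers start quickly.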
echo "Pulling worker image $CLOUD_WORKER_IMAGE on workers $(gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)")"
gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)" | xargs -I INSTANCE -P 100 gcloud compute ssh yarn@INSTANCE --command="docker pull $CLOUD_WORKER_IMAGE"
}

function start_yarn_application() {
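# Start a detached Flink session on YARN: -n workers, -tm task manager
# memory in MB, -s task slots per task manager, -nm session name.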
echo "Starting yarn application on $MASTER_NAME"
execute_on_master "/usr/lib/flink/bin/yarn-session.sh -n $NUM_WORKERS -tm $TASK_MANAGER_MEM -s $TASK_MANAGER_SLOTS -d -nm flink_yarn"
}

function execute_on_master() {
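# Run the given command on the cluster master node over SSH.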
gcloud compute ssh yarn@$MASTER_NAME --command="$1"
}

function upload_resources() {
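# Download the Flink distribution and stage it, together with the init
# scripts, in $WORK_DIR/flink on GCS for the Dataproc initialization actions.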
local TMP_FOLDER=`mktemp -d -t flink_tmp_XXXX`

echo "Downloading flink at $TMP_FOLDER"
wget -P $TMP_FOLDER $FLINK_DOWNLOAD_URL

echo "Uploading resources to GCS $WORK_DIR"
cp ./create_cluster.sh $TMP_FOLDER
cp ./docker-init.sh $TMP_FOLDER
cp ./flink-init-dataproc.sh $TMP_FOLDER

gsutil cp -r $TMP_FOLDER/* $WORK_DIR/flink

rm -r $TMP_FOLDER
}

function start_tunnel() {
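# Look up the jobmanager RPC port via the Flink REST API, then open SSH
# tunnels so the Flink web UI and RPC endpoint are reachable on localhost.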
local job_server_config=`execute_on_master "curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\""`
local key="jobmanager.rpc.port"
local yarn_application_master_host=`echo $YARN_APPLICATION_MASTER | cut -d ":" -f1`

# Extract the jobmanager RPC port from the JSON config returned by the Flink REST API.
jobmanager_rpc_port=`echo "$job_server_config" | python -c "import sys, json; print [ e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0]"`
local tunnel_command="gcloud compute ssh $MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080"
local kill_command="gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill $YARN_APPLICATION\""
echo "===================Closing the shell does not stop the yarn application==================="
echo "Execute \"$kill_command\" to kill the yarn application."
echo "Starting tunnel \"$tunnel_command\""
echo "Exposing flink jobserver at localhost:$FLINK_LOCAL_PORT"
gcloud compute ssh yarn@$MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080
echo "===================Closing the shell does not stop the yarn application==================="
echo "Execute \"$kill_command\" to kill the yarn application."
echo "To re-establish tunnel use \"$tunnel_command\""
}

function create_cluster() {
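# Create the Dataproc cluster; the metadata entries pass the Flink version
# and the GCS staging dir to the initialization actions.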
echo "Starting dataproc cluster."
gcloud dataproc clusters create $CLUSTER_NAME --num-workers=$NUM_WORKERS --initialization-actions $FLINK_INIT,$DOCKER_INIT --metadata flink_version=$FLINK_VERSION,work_dir=$WORK_DIR/flink --image-version=$DATAPROC_VERSION
echo "Sleeping for 30 sec"
sleep 30s
}

function main() {
upload_resources
create_cluster # Comment this line to use existing cluster.
start_yarn_application # Comment this line if yarn application is already running on the cluster.
get_leader
upload_worker_image
pull_worker_image
start_tunnel
}

main "$@"
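A minimal usage sketch, assuming a configured gcloud SDK and Docker, a GCS bucket at gs://clouddfe-$USER, and a locally built worker image tagged $USER-docker-apache.bintray.io/beam/python:latest. The cluster name follows the script's default; the teardown commands are standard gcloud calls, not part of this script:

cd examples/chicago_taxi/dataproc
./create_cluster.sh
# Tear down when finished: kill the YARN session, then delete the cluster.
gcloud compute ssh yarn@$USER-flink-156-m --command="yarn application -kill <APPLICATION_ID>"
gcloud dataproc clusters delete $USER-flink-156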