#!/usr/bin/env bash
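# Creates a Google Cloud Dataproc cluster running a Flink ($FLINK_VERSION)
# YARN session, stages the Beam Python SDK worker image on every node, and
# tunnels the Flink master to localhost.
#
# Usage: ./create_cluster.sh
# Run from the directory containing this script, since upload_resources
# copies ./docker-init.sh and ./flink-init-dataproc.sh relative to the
# working directory.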

# set -x
set -e
USER=$(whoami)
CLUSTER_NAME="$USER-flink-156"
NUM_WORKERS=2
FLINK_VERSION=1.5.6
WORK_DIR="gs://clouddfe-$USER/tmp"
CLOUD_WORKER_IMAGE="gcr.io/dataflow-build/$USER/beam_fnapi_python:latest"
TASK_MANAGER_MEM=10240
FLINK_LOCAL_PORT=8081
TASK_MANAGER_SLOTS=1
DATAPROC_VERSION=1.2

MASTER_NAME="$CLUSTER_NAME-m"
FLINK_INIT="$WORK_DIR/flink/flink-init-dataproc.sh"
DOCKER_INIT="$WORK_DIR/flink/docker-init.sh"
LOCAL_WORKER_IMAGE="$USER-docker-apache.bintray.io/beam/python:latest"
FLINK_DOWNLOAD_URL="http://archive.apache.org/dist/flink/flink-$FLINK_VERSION/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"

# Populated by get_leader once the YARN session is running.
YARN_APPLICATION=""
YARN_APPLICATION_MASTER=""
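
# Succeeds (returns 0) only on the Dataproc master node, as reported by the
# instance metadata server. Not called from main below; kept as a helper.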
function is_master() {
  local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
  if [[ "$role" == 'Master' ]]; then
    true
  else
    false
  fi
}
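
# Lists YARN applications on the master and captures the application id and
# application master (host:port) of the running Flink session. Exits with an
# error if more than one application is found.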
function get_leader() {
  local i=0
  local -A application_ids
  local -A application_masters
  # gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME"
  echo "Yarn Applications"
  while read line; do
    echo $line
    application_ids[$i]=$(echo $line | sed "s/ .*//")
    application_masters[$i]=$(echo $line | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//")
    i=$((i+1))
  done <<< "$(gcloud compute ssh yarn@$MASTER_NAME --command="yarn application -list" | grep "$CLUSTER_NAME")"

  if [ $i != 1 ]; then
    echo "Multiple applications found. Make sure that only 1 application is running on the cluster."
    for app in ${application_ids[*]}; do
      echo $app
    done

    echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill <APP_NAME>\"' to kill the yarn application."
    exit 1
  fi

  YARN_APPLICATION=${application_ids[0]}
  YARN_APPLICATION_MASTER=${application_masters[0]}
  echo "Using Yarn Application $YARN_APPLICATION $YARN_APPLICATION_MASTER"
}
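
# Tags the locally built SDK worker image for GCR and pushes it so the
# Dataproc VMs can pull it.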
function upload_worker_image() {
  echo "Tagging worker image $LOCAL_WORKER_IMAGE to $CLOUD_WORKER_IMAGE"
  docker tag $LOCAL_WORKER_IMAGE $CLOUD_WORKER_IMAGE
  echo "Pushing worker image $CLOUD_WORKER_IMAGE to GCR"
  docker push $CLOUD_WORKER_IMAGE
}
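
# Pre-pulls the worker image on the master and all worker VMs (via xargs,
# up to 100 SSH sessions in parallel) so the first job does not block on
# the image download.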
function pull_worker_image() {
  echo "Pulling worker image $CLOUD_WORKER_IMAGE on workers $(gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)")"
  gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)" | xargs -I INSTANCE -P 100 gcloud compute ssh yarn@INSTANCE --command="docker pull $CLOUD_WORKER_IMAGE"
}
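
# Starts a detached Flink YARN session on the master with the configured
# number of task managers, memory, and slots.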
function start_yarn_application() {
  echo "Starting yarn application on $MASTER_NAME"
  execute_on_master "/usr/lib/flink/bin/yarn-session.sh -n $NUM_WORKERS -tm $TASK_MANAGER_MEM -s $TASK_MANAGER_SLOTS -d -nm flink_yarn"
}
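
# Runs the given command on the master node as the yarn user via SSH.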
function execute_on_master() {
  gcloud compute ssh yarn@$MASTER_NAME --command="$1"
}
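
# Downloads the Flink distribution into a temp folder and stages it on GCS
# under $WORK_DIR/flink together with this script and the two init actions.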
function upload_resources() {
  local TMP_FOLDER=$(mktemp -d -t flink_tmp_XXXX)

  echo "Downloading flink at $TMP_FOLDER"
  wget -P $TMP_FOLDER $FLINK_DOWNLOAD_URL

  echo "Uploading resources to GCS $WORK_DIR"
  cp ./create_cluster.sh $TMP_FOLDER
  cp ./docker-init.sh $TMP_FOLDER
  cp ./flink-init-dataproc.sh $TMP_FOLDER

  gsutil cp -r $TMP_FOLDER/* $WORK_DIR/flink

  rm -r $TMP_FOLDER
}
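
# Reads the jobmanager RPC port from the Flink REST config endpoint, then
# opens an SSH tunnel forwarding the Flink UI/REST port ($FLINK_LOCAL_PORT)
# and the RPC port to localhost, plus a SOCKS proxy on 1080. The tunnel
# blocks until the shell exits; the YARN application keeps running.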
function start_tunnel() {
  local job_server_config=$(execute_on_master "curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\"")
  local key="jobmanager.rpc.port"
  local yarn_application_master_host=$(echo $YARN_APPLICATION_MASTER | cut -d ":" -f1)

  jobmanager_rpc_port=$(echo $job_server_config | python -c "import sys, json; print [e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0]")
  local tunnel_command="gcloud compute ssh $MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080"
  local kill_command="gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill $YARN_APPLICATION\""
  echo "===================Closing the shell does not stop the yarn application==================="
  echo "Execute \"$kill_command\" to kill the yarn application."
  echo "Starting tunnel \"$tunnel_command\""
  echo "Exposing flink jobserver at localhost:$FLINK_LOCAL_PORT"
  gcloud compute ssh yarn@$MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080
  echo "===================Closing the shell does not stop the yarn application==================="
  echo "Execute \"$kill_command\" to kill the yarn application."
  echo "To re-establish tunnel use \"$tunnel_command\""
}
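
# Creates the Dataproc cluster with the Flink and Docker init actions, then
# waits briefly for the services to come up.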
function create_cluster() {
  echo "Starting dataproc cluster."
  gcloud dataproc clusters create $CLUSTER_NAME --num-workers=$NUM_WORKERS --initialization-actions $FLINK_INIT,$DOCKER_INIT --metadata flink_version=$FLINK_VERSION,work_dir=$WORK_DIR/flink --image-version=$DATAPROC_VERSION
  echo "Sleeping for 30 sec"
  sleep 30s
}
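
# Full flow: stage resources, create the cluster, start the Flink session,
# push and pre-pull the worker image, and open the tunnel.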
function main() {
  upload_resources
  create_cluster # Comment this line to use an existing cluster.
  start_yarn_application # Comment this line if a yarn application is already running on the cluster.
  get_leader
  upload_worker_image
  pull_worker_image
  start_tunnel
}

main "$@"