Commit 6fba412

Deal with quotes + address various comments
Parent: fe4c8a7

4 files changed: 63 insertions(+), 24 deletions(-)

bin/pyspark

Lines changed: 20 additions & 4 deletions
@@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR"
 SCALA_VERSION=2.10
 
 if [[ "$@" == *--help* ]]; then
-  echo "Usage: ./bin/pyspark [python file] [options]"
+  echo "Usage: ./bin/pyspark [options]"
   ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
 fi
@@ -57,15 +57,31 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
-export PYSPARK_SUBMIT_ARGS="$@"
 
+# If IPython options are specified, assume user wants to run IPython
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# If a python file is provided, directly run spark-submit
+# Build up arguments list manually to preserve quotes. We export Spark submit arguments as an
+# environment variable because shell.py must run as a PYTHONSTARTUP script, which does not take
+# in arguments. This is required mainly for IPython notebooks.
+
+PYSPARK_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "$@"; do
+  if [[ $i =~ $whitespace ]]; then
+    i=\"$i\"
+  fi
+  PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
+done
+export PYSPARK_SUBMIT_ARGS
+
+# If a python file is provided, directly run spark-submit.
 if [[ "$1" =~ \.py$ ]]; then
-  exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
+  echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0."
+  echo -e "Use ./bin/spark-submit <python file>\n"
+  exec $FWDIR/bin/spark-submit "$@"
 else
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 15 additions & 3 deletions
@@ -134,10 +134,10 @@ object SparkSubmit {
       // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
       args.mainClass = "org.apache.spark.deploy.PythonRunner"
       args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-      args.files = Utils.mergeFileLists(args.files, args.primaryResource)
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
     val pyFiles = Option(args.pyFiles).getOrElse("")
-    args.files = Utils.mergeFileLists(args.files, pyFiles)
+    args.files = mergeFileLists(args.files, pyFiles)
     sysProps("spark.submit.pyFiles") = pyFiles
   }
 
@@ -300,7 +300,7 @@ object SparkSubmit {
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
     val localJarFile = new File(new URI(localJar).getPath)
-    if (!localJarFile.exists) {
+    if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
 
@@ -328,6 +328,18 @@ object SparkSubmit {
   private[spark] def isPython(primaryResource: String): Boolean = {
     primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  private[spark] def mergeFileLists(lists: String*): String = {
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
+  }
 }
 
 /**
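
For reference, a hypothetical Python mirror (not part of the commit) of the mergeFileLists semantics shown above: null and empty entries are dropped, and the remaining comma-separated lists are flattened into one comma-separated string.

def merge_file_lists(*lists):
    # Python stand-in for the Scala helper; None plays the role of null here.
    merged = []
    for entry in lists:
        if entry:  # skip None and empty strings
            merged.extend(entry.split(","))
    return ",".join(merged)

# The primary .py resource is folded into the distributed file list:
print(merge_file_lists("a.txt,b.txt", None, "app.py"))
# a.txt,b.txt,app.py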

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 12 deletions
@@ -1166,16 +1166,4 @@ private[spark] object Utils extends Logging {
       true
     }
   }
-
-  /**
-   * Merge a sequence of comma-separated file lists into a single comma-separated string.
-   * The provided strings may be null or empty to indicate no files.
-   */
-  def mergeFileLists(lists: String*): String = {
-    lists
-      .filter(_ != null)
-      .filter(_ != "")
-      .flatMap(_.split(","))
-      .mkString(",")
-  }
 }

python/pyspark/java_gateway.py

Lines changed: 28 additions & 5 deletions
@@ -23,7 +23,6 @@
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
-
 def launch_gateway():
     SPARK_HOME = os.environ["SPARK_HOME"]
 
@@ -36,10 +35,7 @@ def launch_gateway():
     on_windows = platform.system() == "Windows"
     script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
     submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
-    if submit_args is not None:
-        submit_args = submit_args.split(" ")
-    else:
-        submit_args = []
+    submit_args = split_preserve_quotes(submit_args)
     command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
     if not on_windows:
         # Don't send ctrl-c / SIGINT to the Java gateway:
@@ -80,3 +76,30 @@ def run(self):
     java_import(gateway.jvm, "scala.Tuple2")
 
     return gateway
+
+def split_preserve_quotes(args):
+    """
+    Given a string of space-delimited arguments with quotes,
+    split it into a list while preserving the quote boundaries.
+    """
+    if args is None:
+        return []
+    split_list = []
+    quoted_string = ""
+    wait_for_quote = False
+    for arg in args.split(" "):
+        if not wait_for_quote:
+            if arg.startswith("\""):
+                wait_for_quote = True
+                quoted_string = arg
+            else:
+                split_list.append(arg)
+        else:
+            quoted_string += " " + arg
+            if quoted_string.endswith("\""):
+                # Strip quotes
+                quoted_string = quoted_string[1:-1]
+                split_list.append(quoted_string)
+                quoted_string = ""
+                wait_for_quote = False
+    return split_list
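
A quick usage sketch, assuming split_preserve_quotes is defined as in the diff above; the input string is a hypothetical PYSPARK_SUBMIT_ARGS value.

# Quoted values are re-joined into single arguments; everything else splits on spaces.
args = '--master local[4] --name "My App" --driver-memory 2g'
print(split_preserve_quotes(args))
# ['--master', 'local[4]', '--name', 'My App', '--driver-memory', '2g']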
