
Commit a371d26

Route bin/pyspark through Spark submit
The bin/pyspark script now takes one of two pathways, depending on the application. If the application is a Python file, bin/pyspark passes the file directly to Spark submit, which launches the Python application as a sub-process from within the JVM. If the application is the PySpark shell, however, bin/pyspark starts the Python REPL as the parent process, which launches the JVM as a sub-process. A significant benefit of this arrangement is that keyboard signals are propagated to the Python interpreter first and handled properly.

The existing code already provided a code path for this; all we need to change is to launch the JVM through spark-submit instead of spark-class. This requires modifying Spark submit to handle the PySpark shell as a special case. This has been tested locally (OS X) for both cases, including with IPython.
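For illustration, here is a minimal Python sketch of the dispatch logic the reworked bin/pyspark implements. The real script is Bash; the function name and the simplified path/environment handling here are illustrative only, while PYSPARK_SUBMIT_ARGS, spark-submit, IPYTHON, and PYSPARK_PYTHON come from the diff below.

# Illustrative sketch only: the real bin/pyspark is a Bash script.
import os
import re

def launch_pyspark(argv, spark_home):
    """Mirror bin/pyspark's dispatch: .py files go straight to spark-submit,
    everything else starts the Python REPL, which launches the JVM itself."""
    # All command-line arguments are forwarded to spark-submit either way.
    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(argv)

    if argv and re.search(r"\.py$", argv[0]):
        # Python application: spark-submit (and thus the JVM) is the parent process.
        os.execv(os.path.join(spark_home, "bin", "spark-submit"),
                 ["spark-submit"] + argv)
    elif os.environ.get("IPYTHON") == "1":
        # Interactive shell with IPython: the REPL is the parent process,
        # so keyboard signals reach Python first.
        os.execvp("ipython", ["ipython"])
    else:
        # Interactive shell with the plain Python interpreter.
        os.execvp(os.environ.get("PYSPARK_PYTHON", "python"), ["python"])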
1 parent d52761d commit a371d26

7 files changed: +70 −32 lines changed

bin/pyspark

Lines changed: 10 additions & 4 deletions
@@ -51,14 +51,20 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
+export PYSPARK_SUBMIT_ARGS="$@"
 
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# Only use ipython if no command line arguments were provided [SPARK-1134]
-if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
-  exec ipython $IPYTHON_OPTS
+# If a python file is provided, directly run spark-submit
+if [[ "$1" =~ \.py$ ]]; then
+  exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
 else
-  exec "$PYSPARK_PYTHON" "$@"
+  # Only use ipython if no command line arguments were provided [SPARK-1134]
+  if [[ "$IPYTHON" = "1" ]]; then
+    exec ipython $IPYTHON_OPTS
+  else
+    exec "$PYSPARK_PYTHON"
+  fi
 fi

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= pyFiles.split(",")
+    pathElements ++= Option(pyFiles).getOrElse("").split(",")
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
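This change guards against a null pyFiles value (for example, when no extra Python files are passed). A rough Python equivalent of the guard, for illustration only; the helper name below is hypothetical:

# Illustration of the null guard; the real code is Scala.
def build_path_elements(py_files):
    # Old behaviour: py_files.split(",") fails when py_files is None.
    # New behaviour: treat a missing value as an empty string first.
    return (py_files or "").split(",")

print(build_path_elements(None))         # ['']
print(build_path_elements("a.py,b.py"))  # ['a.py', 'b.py']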

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 38 additions & 22 deletions
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
    */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
    * entries for the child, a list of system properties, a list of env vars
    * and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
    if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "py4j.GatewayServer"
+        args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0")
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = Utils.mergeFileLists(args.files, args.primaryResource)
+      }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
+      args.files = Utils.mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
@@ -293,8 +299,8 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
-    if (!localJarFile.exists()) {
+    val localJarFile = new File(new URI(localJar).getPath)
+    if (!localJarFile.exists) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
 
@@ -303,14 +309,24 @@ object SparkSubmit {
   }
 
   /**
-   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
-   * no files, into a single comma-separated string.
+   * Return whether the given primary resource represents a user jar.
+   */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
    */
-  private[spark] def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
-      .flatMap(_.split(","))
-      .mkString(",")
-    if (merged == "") null else merged
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
 }
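The new helpers classify the primary resource by name rather than comparing against a reserved jar name. A small Python rendering of the same predicates, for illustration only (the constant values match the Scala code above):

# Illustrative Python version of the new SparkSubmit predicates.
SPARK_SHELL = "spark-shell"
PYSPARK_SHELL = "pyspark-shell"

def is_shell(primary_resource):
    return primary_resource in (SPARK_SHELL, PYSPARK_SHELL)

def is_python(primary_resource):
    return primary_resource.endswith(".py") or primary_resource == PYSPARK_SHELL

def is_user_jar(primary_resource):
    # A user jar is anything that is neither a shell nor a python resource.
    return not is_shell(primary_resource) and not is_python(primary_resource)

assert is_user_jar("my-app.jar")
assert is_python("job.py") and is_python(PYSPARK_SHELL)
assert is_shell(SPARK_SHELL) and not is_user_jar(PYSPARK_SHELL)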

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 1 deletion
@@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         case v =>
           primaryResource = v
           inSparkOpts = false
-          isPython = v.endsWith(".py")
+          isPython = SparkSubmit.isPython(v)
           parse(tail)
       }
     } else {

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 12 additions & 0 deletions
@@ -1166,4 +1166,16 @@ private[spark] object Utils extends Logging {
       true
     }
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists into a single comma-separated string.
+   * The provided strings may be null or empty to indicate no files.
+   */
+  def mergeFileLists(lists: String*): String = {
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
+  }
 }
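For reference, the merging behaviour moved into Utils can be sketched in Python as follows. This is an illustration only; note that, unlike the old SparkSubmit.mergeFileLists it replaces, the new version skips empty strings and returns an empty string rather than null when nothing is left.

# Illustrative Python version of Utils.mergeFileLists.
def merge_file_lists(*lists):
    # Drop missing or empty lists, split the rest on commas, and re-join.
    items = []
    for lst in lists:
        if lst:  # skips None and ""
            items.extend(lst.split(","))
    return ",".join(items)

print(merge_file_lists("a.py,b.py", None, "", "c.py"))  # a.py,b.py,c.py
print(merge_file_lists(None, ""))                       # (empty string)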

python/pyspark/java_gateway.py

Lines changed: 7 additions & 3 deletions
@@ -34,9 +34,13 @@ def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
-    script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
-    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-               "--die-on-broken-pipe", "0"]
+    script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
+    submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
+    if submit_args is not None:
+        submit_args = submit_args.split(" ")
+    else:
+        submit_args = []
+    command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
     if not on_windows:
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
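With this change the gateway is started through spark-submit, and any options typed on the bin/pyspark command line ride along in PYSPARK_SUBMIT_ARGS. A sketch of the resulting command list; the SPARK_HOME path and the environment value here are hypothetical examples:

# Example only: what `command` would look like for a hypothetical invocation
# where PYSPARK_SUBMIT_ARGS was set to "--master local[4]" by bin/pyspark.
import os

SPARK_HOME = "/path/to/spark"  # assumption for illustration
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[4]"

submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args.split(" ") if submit_args is not None else []
command = [os.path.join(SPARK_HOME, "./bin/spark-submit"), "pyspark-shell"] + submit_args
print(command)
# ['/path/to/spark/./bin/spark-submit', 'pyspark-shell', '--master', 'local[4]']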

python/pyspark/shell.py

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@
 if os.environ.get("SPARK_EXECUTOR_URI"):
     SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 
 print("""Welcome to
       ____              __
