
[SPARK-1808] Route bin/pyspark through Spark submit #799


Closed
wants to merge 15 commits
35 changes: 31 additions & 4 deletions bin/pyspark
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"

SCALA_VERSION=2.10

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./bin/pyspark [options]"
./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

# Exit if the user hasn't compiled Spark
if [ ! -f "$FWDIR/RELEASE" ]; then
# Exit if the user hasn't compiled Spark
@@ -52,13 +58,34 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

# If IPython options are specified, assume user wants to run IPython
if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
# Build up arguments list manually to preserve quotes and backslashes.
# We export Spark submit arguments as an environment variable because shell.py must run as a
# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.

PYSPARK_SUBMIT_ARGS=""
whitespace="[[:space:]]"
for i in "$@"; do
if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
done
export PYSPARK_SUBMIT_ARGS

# If a python file is provided, directly run spark-submit.
if [[ "$1" =~ \.py$ ]]; then
echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
exec $FWDIR/bin/spark-submit "$@"
else
exec "$PYSPARK_PYTHON" "$@"
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
fi
fi
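
As a rough Python sketch of what the argument-forwarding loop above produces (illustrative only; the helper name build_submit_args and the sample arguments are made up, not part of this patch): each argument containing whitespace is wrapped in double quotes and embedded quotes are escaped, so the exported PYSPARK_SUBMIT_ARGS string can later be split back into the original arguments.

import re

def build_submit_args(args):
    # Mimic the quoting loop in bin/pyspark: escape embedded double quotes,
    # then wrap whitespace-containing arguments in quotes.
    out = []
    for arg in args:
        arg = arg.replace('"', '\\"')
        if re.search(r"\s", arg):
            arg = '"%s"' % arg
        out.append(arg)
    return " ".join(out)

# Example: ./bin/pyspark --master local[4] --name "my app" would export roughly
print(build_submit_args(["--master", "local[4]", "--name", "my app"]))
# --master local[4] --name "my app"
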
21 changes: 18 additions & 3 deletions bin/pyspark2.cmd
@@ -31,7 +31,7 @@ set FOUND_JAR=0
for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
set FOUND_JAR=1
)
if "%FOUND_JAR%"=="0" (
if [%FOUND_JAR%] == [0] (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
goto exit
@@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
set PYSPARK_SUBMIT_ARGS=%*

echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%

"%PYSPARK_PYTHON%" %*
rem Check whether the argument is a file
for /f %%i in ('echo %1^| findstr /R "\.py"') do (
set PYTHON_FILE=%%i
)

if [%PYTHON_FILE%] == [] (
%PYSPARK_PYTHON%
) else (
echo.
echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
echo Use ./bin/spark-submit ^<python file^>
echo.
"%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
)

:exit
6 changes: 3 additions & 3 deletions bin/spark-shell
@@ -28,7 +28,7 @@ esac
# Enter posix mode for bash
set -o posix

if [[ "$@" == *--help* ]]; then
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./bin/spark-shell [options]"
./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
@@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
fi
}

2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
@@ -42,7 +42,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= pyFiles.split(",")
pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
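
The one-line change above makes the pyFiles string null-safe before splitting it. A minimal Python sketch of the equivalent PYTHONPATH assembly, assuming hypothetical helper names (merge_python_paths stands in for PythonUtils.mergePythonPaths and is not the project's API):

import os

def merge_python_paths(*elements):
    # Join the non-empty elements with the platform path separator.
    return os.pathsep.join(e for e in elements if e)

def build_python_path(py_files, spark_python_path):
    # Treat a missing py-files value as empty, mirroring
    # Option(pyFiles).getOrElse("") in the Scala change above.
    elements = (py_files or "").split(",")
    elements.append(spark_python_path)
    elements.append(os.environ.get("PYTHONPATH", ""))
    return merge_python_paths(*elements)

print(build_python_path(None, "/opt/spark/python"))
# /opt/spark/python, plus any existing PYTHONPATH
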
55 changes: 41 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL

/**
* A special jar name that indicates the class being run is inside of Spark itself,
* and therefore no user jar is needed.
* Special primary resource names that represent shells rather than application jars.
*/
private val RESERVED_JAR_NAME = "spark-internal"
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
val pyFiles = Option(args.pyFiles).getOrElse("")
args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
args.primaryResource = RESERVED_JAR_NAME
args.files = mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
@@ -293,7 +299,7 @@
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(new URI(localJar).getPath())
val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists()) {
printWarning(s"Jar $localJar does not exist, skipping.")
}
@@ -302,6 +308,27 @@
loader.addURL(url)
}

/**
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
}

/**
* Return whether the given primary resource represents a shell.
*/
private def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

/**
* Return whether the given primary resource requires running python.
*/
private[spark] def isPython(primaryResource: String): Boolean = {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
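
The new helpers classify the primary resource with a couple of string checks; anything that is neither a reserved shell name nor a Python file is treated as a user jar. A Python rendering of the same rules, purely for illustration (the real logic is the Scala above):

SPARK_SHELL = "spark-shell"
PYSPARK_SHELL = "pyspark-shell"

def is_shell(primary_resource):
    # Shell sessions are identified by reserved resource names, not jars.
    return primary_resource in (SPARK_SHELL, PYSPARK_SHELL)

def is_python(primary_resource):
    # Python apps are .py files or the interactive pyspark shell.
    return primary_resource.endswith(".py") or primary_resource == PYSPARK_SHELL

def is_user_jar(primary_resource):
    # Anything that is neither a shell nor Python goes on the classpath as a user jar.
    return not is_shell(primary_resource) and not is_python(primary_resource)

assert is_user_jar("my-app.jar")
assert is_python("job.py")
assert is_shell("pyspark-shell") and is_python("pyspark-shell")
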
@@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
isPython = v.endsWith(".py")
isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
childArgs += value
if (!value.isEmpty) {
childArgs += value
}
parse(tail)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging {
* Strip the directory from a path name
*/
def stripDirectory(path: String): String = {
path.split(File.separator).last
new File(path).getName
}

/**
10 changes: 6 additions & 4 deletions python/pyspark/java_gateway.py
@@ -18,12 +18,12 @@
import os
import sys
import signal
import shlex
import platform
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

@@ -34,9 +34,11 @@ def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
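
Putting the two halves together: bin/pyspark exports PYSPARK_SUBMIT_ARGS, and launch_gateway() splits it back apart with shlex and prepends the reserved pyspark-shell resource. A standalone sketch of that round trip (SPARK_HOME and the sample arguments are made-up values for illustration):

import os
import shlex

# Pretend bin/pyspark exported this (see the quoting loop earlier in the diff).
os.environ["PYSPARK_SUBMIT_ARGS"] = '--master local[4] --name "my app"'

SPARK_HOME = "/opt/spark"            # assumed install location
script = "./bin/spark-submit"

submit_args = shlex.split(os.environ.get("PYSPARK_SUBMIT_ARGS") or "")
command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
print(command)
# ['/opt/spark/./bin/spark-submit', 'pyspark-shell', '--master', 'local[4]', '--name', 'my app']
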
2 changes: 1 addition & 1 deletion python/pyspark/shell.py
@@ -40,7 +40,7 @@
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)

print("""Welcome to
____ __