Commit 05879fa

Merge branch 'master' of github.com:apache/spark into pyspark-submit

2 parents: a823661 + 032d663

12 files changed: +161 -30 lines

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3

-  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
   val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data; after TTL expires it will be cleaned up

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.spark.util.ByteBufferInputStream
  * - [[org.apache.spark.scheduler.ResultTask]]
  *
  * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
- * ResultTask's, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
+ * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
  * and sends the task output back to the driver application. A ShuffleMapTask executes the task
  * and divides the task output to multiple buckets (based on the task's partitioner).
  *

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ private[spark] class TaskSchedulerImpl(
       SchedulingMode.withName(schedulingModeConf.toUpperCase)
     } catch {
       case e: java.util.NoSuchElementException =>
-        throw new SparkException(s"Urecognized spark.scheduler.mode: $schedulingModeConf")
+        throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
     }

   // This is a var so that we can reset it for testing purposes.

docs/configuration.md

Lines changed: 3 additions & 2 deletions
@@ -390,10 +390,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.worker.cleanup.enabled</td>
-  <td>true</td>
+  <td>false</td>
   <td>
     Enable periodic cleanup of worker / application directories. Note that this only affects standalone
-    mode, as YARN works differently.
+    mode, as YARN works differently. Application directories are cleaned up regardless of whether
+    the application is still running.
   </td>
 </tr>
 <tr>
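With the default flipped to false, periodic cleanup is now opt-in. A minimal way to re-enable it for a standalone worker, assuming the common deployment where worker JVM properties are passed through SPARK_WORKER_OPTS in conf/spark-env.sh, is:

SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800"

Note that spark.worker.cleanup.interval is given in seconds (Worker.scala multiplies it by 1000), so 1800 reproduces the default half-hour sweep.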

graphx/src/main/scala/org/apache/spark/graphx/Edge.scala

Lines changed: 8 additions & 2 deletions
@@ -56,7 +56,13 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]

 object Edge {
   private[graphx] def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
-    override def compare(a: Edge[ED], b: Edge[ED]): Int =
-      (if (a.srcId != b.srcId) a.srcId - b.srcId else a.dstId - b.dstId).toInt
+    override def compare(a: Edge[ED], b: Edge[ED]): Int = {
+      if (a.srcId == b.srcId) {
+        if (a.dstId == b.dstId) 0
+        else if (a.dstId < b.dstId) -1
+        else 1
+      } else if (a.srcId < b.srcId) -1
+      else 1
+    }
   }
 }
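The replaced one-liner compared vertex IDs by subtraction; with 64-bit VertexIds the difference can overflow a Long, and the final .toInt discards the sign-carrying high bits. A quick sketch of the failure modes (hypothetical values, not part of the commit):

// Why (a.srcId - b.srcId).toInt was not a safe comparator for 64-bit vertex IDs:
val a = 0x7FEDCBA987654321L
val b = 0x2345L
(a - b).toInt               // negative after truncation to Int, so the larger ID would sort first
(0x100000000L - 0L).toInt   // == 0, so two distinct IDs would compare as equal
(Long.MaxValue - -1L).toInt // Long overflow first, then truncation: also 0

The explicit three-way comparison above avoids both problems, and the extreme IDs in the new EdgeSuite below exercise exactly these cases.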
graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+class EdgeSuite extends FunSuite {
+  test("compare") {
+    // descending order
+    val testEdges: Array[Edge[Int]] = Array(
+      Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1),
+      Edge(0x2345L, 0x1234L, 1),
+      Edge(0x1234L, 0x5678L, 1),
+      Edge(0x1234L, 0x2345L, 1),
+      Edge(-0x7FEDCBA987654321L, 0x7FEDCBA987654321L, 1)
+    )
+    // sort into ascending order
+    val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int])
+
+    for (i <- 0 until testEdges.length) {
+      assert(sortedEdges(i) == testEdges(testEdges.length - i - 1))
+    }
+  }
+}

pom.xml

Lines changed: 34 additions & 1 deletion
@@ -123,6 +123,7 @@
   <protobuf.version>2.4.1</protobuf.version>
   <yarn.version>${hadoop.version}</yarn.version>
   <hbase.version>0.94.6</hbase.version>
+  <zookeeper.version>3.4.5</zookeeper.version>
   <hive.version>0.12.0</hive.version>
   <parquet.version>1.4.3</parquet.version>
   <jblas.version>1.2.3</jblas.version>
@@ -194,6 +195,17 @@
       <enabled>false</enabled>
     </snapshots>
   </repository>
+  <repository>
+    <id>mapr-repo</id>
+    <name>MapR Repository</name>
+    <url>http://repository.mapr.com/maven</url>
+    <releases>
+      <enabled>true</enabled>
+    </releases>
+    <snapshots>
+      <enabled>false</enabled>
+    </snapshots>
+  </repository>
 </repositories>

 <dependencyManagement>
@@ -490,6 +502,14 @@
         <groupId>commons-logging</groupId>
         <artifactId>commons-logging</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.mortbay.jetty</groupId>
+        <artifactId>servlet-api-2.5</artifactId>
+      </exclusion>
+      <exclusion>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+      </exclusion>
     </exclusions>
   </dependency>
   <dependency>
@@ -979,6 +999,19 @@
   </modules>
 </profile>

+<profile>
+  <id>mapr</id>
+  <activation>
+    <activeByDefault>false</activeByDefault>
+  </activation>
+  <properties>
+    <hadoop.version>1.0.3-mapr-3.0.3</hadoop.version>
+    <yarn.version>2.3.0-mapr-4.0.0-beta</yarn.version>
+    <hbase.version>0.94.17-mapr-1403</hbase.version>
+    <zookeeper.version>3.4.5-mapr-1401</zookeeper.version>
+  </properties>
+</profile>
+
 <!-- Build without Hadoop dependencies that are included in some runtime environments. -->
 <profile>
   <id>hadoop-provided</id>
@@ -1024,7 +1057,7 @@
 <dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
-  <version>3.4.5</version>
+  <version>${zookeeper.version}</version>
   <scope>provided</scope>
 </dependency>
</dependencies>
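With activeByDefault set to false, the MapR artifact versions apply only when the profile is requested explicitly at build time, for example:

mvn -Pmapr -DskipTests clean package

The new zookeeper.version property is what lets this profile (together with the now-parameterized zookeeper dependency in the hadoop-provided profile) swap in MapR's ZooKeeper build.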

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 32 additions & 15 deletions
@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {

   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == false || r == false) {
-      false
-    } else if (l == null || r == null ) {
-      null
+    if (l == false) {
+      false
     } else {
-      true
+      val r = right.eval(input)
+      if (r == false) {
+        false
+      } else {
+        if (l != null && r != null) {
+          true
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {

   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == true || r == true) {
+    if (l == true) {
       true
-    } else if (l == null || r == null) {
-      null
     } else {
-      false
+      val r = right.eval(input)
+      if (r == true) {
+        true
+      } else {
+        if (l != null && r != null) {
+          false
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
   def symbol = "="
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == null || r == null) null else l == r
+    if (l == null) {
+      null
+    } else {
+      val r = right.eval(input)
+      if (r == null) null else l == r
+    }
   }
 }

@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
   extends Expression {

   def children = predicate :: trueValue :: falseValue :: Nil
-  def nullable = trueValue.nullable || falseValue.nullable
+  override def nullable = trueValue.nullable || falseValue.nullable
   def references = children.flatMap(_.references).toSet
   override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
   def dataType = {
@@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
   }

   type EvaluatedType = Any
+
   override def eval(input: Row): Any = {
-    if (predicate.eval(input).asInstanceOf[Boolean]) {
+    if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
       falseValue.eval(input)
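The rewrites give And, Or, and Equals SQL's three-valued null semantics while evaluating the right child only when the left one cannot already decide the result; the If change compares against true so a NULL predicate falls through to the else branch instead of relying on unboxing a null. A standalone sketch of the same AND truth rules over Option[Boolean], with None standing in for SQL NULL (an illustration, not Catalyst code):

// Short-circuiting three-valued AND: the left operand decides first,
// and the by-name right operand is only evaluated when needed.
def and3(l: Option[Boolean], r: => Option[Boolean]): Option[Boolean] = l match {
  case Some(false) => Some(false)   // false AND anything == false; r never evaluated
  case _ => r match {
    case Some(false) => Some(false) // anything AND false == false
    case Some(true)  => l           // true AND l: Some(true) stays true, None stays NULL
    case None        => None        // true/NULL AND NULL == NULL
  }
}

and3(Some(false), sys.error("not reached"))  // Some(false): right side is skipped
and3(None, Some(true))                       // None: NULL AND true is NULL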

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 16 additions & 0 deletions
@@ -233,6 +233,11 @@ private[hive] object HiveQl {
       }
     } catch {
       case e: Exception => throw new ParseException(sql, e)
+      case e: NotImplementedError => sys.error(
+        s"""
+          |Unsupported language features in query: $sql
+          |${dumpTree(getAst(sql))}
+        """.stripMargin)
     }
   }

@@ -865,6 +870,17 @@ private[hive] object HiveQl {
       IsNull(nodeToExpr(child))
     case Token("TOK_FUNCTION", Token("IN", Nil) :: value :: list) =>
       In(nodeToExpr(value), list.map(nodeToExpr))
+    case Token("TOK_FUNCTION",
+           Token("between", Nil) ::
+           Token("KW_FALSE", Nil) ::
+           target ::
+           minValue ::
+           maxValue :: Nil) =>
+
+      val targetExpression = nodeToExpr(target)
+      And(
+        GreaterThanOrEqual(targetExpression, nodeToExpr(minValue)),
+        LessThanOrEqual(targetExpression, nodeToExpr(maxValue)))

     /* Boolean Logic */
     case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right))
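With this rule, the predicate exercised by the new test below, key between 1 and 2, is translated into And(GreaterThanOrEqual(key, 1), LessThanOrEqual(key, 2)). Matching the KW_FALSE token literally (which appears to be Hive's NOT-inversion flag in the lowered state) means only the non-negated form of BETWEEN is handled here.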

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala

Lines changed: 21 additions & 7 deletions
@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
   }

+  protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({
+    new DeferredObjectAdapter
+  })
+
+  // Adapter from Catalyst ExpressionResult to Hive DeferredObject
+  class DeferredObjectAdapter extends DeferredObject {
+    private var func: () => Any = _
+    def set(func: () => Any) {
+      this.func = func
+    }
+    override def prepare(i: Int) = {}
+    override def get(): AnyRef = wrap(func())
+  }
+
   val dataType: DataType = inspectorToDataType(returnInspector)

   override def eval(input: Row): Any = {
     returnInspector // Make sure initialized.
-    val args = children.map { v =>
-      new DeferredObject {
-        override def prepare(i: Int) = {}
-        override def get(): AnyRef = wrap(v.eval(input))
-      }
-    }.toArray
-    unwrap(function.evaluate(args))
+    var i = 0
+    while (i < children.length) {
+      val idx = i
+      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
+      i += 1
+    }
+    unwrap(function.evaluate(deferedObjects))
   }
 }

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+2	val_2

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 4 additions & 0 deletions
@@ -24,6 +24,10 @@ import org.apache.spark.sql.hive.test.TestHive._
  */
 class HiveQuerySuite extends HiveComparisonTest {

+  createQueryTest("between",
+    "SELECT * FROM src WHERE key between 1 and 2"
+  )
+
   test("Query expressed in SQL") {
     assert(sql("SELECT 1").collect() === Array(Seq(1)))
   }
