Skip to content

Commit 22583c1

Browse files
authored
fix: implement lazy evaluation in Coalesce function (#2270)
1 parent a74fea1 commit 22583c1

File tree

3 files changed

+56
-6
lines changed

3 files changed

+56
-6
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
194194
classOf[Unhex] -> CometUnhex,
195195
classOf[Pow] -> CometScalarFunction[Pow]("pow"),
196196
classOf[If] -> CometIf,
197-
classOf[CaseWhen] -> CometCaseWhen)
197+
classOf[CaseWhen] -> CometCaseWhen,
198+
classOf[Coalesce] -> CometCoalesce)
198199

199200
/**
200201
* Mapping of Spark aggregate expression class to Comet expression handler.
@@ -984,10 +985,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
984985
None
985986
}
986987

987-
case a @ Coalesce(_) =>
988-
val exprChildren = a.children.map(exprToProtoInternal(_, inputs, binding))
989-
scalarFunctionExprToProto("coalesce", exprChildren: _*)
990-
991988
// With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
992989
// char types.
993990
// See https://github.com/apache/spark/pull/38151

spark/src/main/scala/org/apache/comet/serde/conditional.scala

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.comet.serde
2121

2222
import scala.collection.JavaConverters._
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, CaseWhen, Expression, If}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, CaseWhen, Coalesce, Expression, If, IsNotNull}
2525

2626
import org.apache.comet.CometSparkSessionExtensions.withInfo
2727
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal
@@ -91,3 +91,42 @@ object CometCaseWhen extends CometExpressionSerde[CaseWhen] {
9191
}
9292
}
9393
}
94+
95+
object CometCoalesce extends CometExpressionSerde[Coalesce] {
96+
override def convert(
97+
expr: Coalesce,
98+
inputs: Seq[Attribute],
99+
binding: Boolean): Option[ExprOuterClass.Expr] = {
100+
val branches = expr.children.dropRight(1).map { child =>
101+
(IsNotNull(child), child)
102+
}
103+
val elseValue = expr.children.last
104+
val whenSeq = branches.map(elements => {
105+
exprToProtoInternal(elements._1, inputs, binding)
106+
})
107+
val thenSeq = branches.map(elements => {
108+
exprToProtoInternal(elements._2, inputs, binding)
109+
})
110+
assert(whenSeq.length == thenSeq.length)
111+
if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) {
112+
val builder = ExprOuterClass.CaseWhen.newBuilder()
113+
builder.addAllWhen(whenSeq.map(_.get).asJava)
114+
builder.addAllThen(thenSeq.map(_.get).asJava)
115+
val elseValueExpr = exprToProtoInternal(elseValue, inputs, binding)
116+
if (elseValueExpr.isDefined) {
117+
builder.setElseExpr(elseValueExpr.get)
118+
} else {
119+
withInfo(expr, elseValue)
120+
return None
121+
}
122+
Some(
123+
ExprOuterClass.Expr
124+
.newBuilder()
125+
.setCaseWhen(builder)
126+
.build())
127+
} else {
128+
withInfo(expr, branches.map(_._2): _*)
129+
None
130+
}
131+
}
132+
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,20 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
394394
}
395395
}
396396

397+
test("test coalesce lazy eval") {
398+
withSQLConf(
399+
SQLConf.ANSI_ENABLED.key -> "true",
400+
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
401+
val data = Seq((9999999999999L, 0))
402+
withParquetTable(data, "t1") {
403+
val res = spark.sql("""
404+
|SELECT coalesce(_1, CAST(_1 AS TINYINT)) from t1;
405+
| """.stripMargin)
406+
checkSparkAnswerAndOperator(res)
407+
}
408+
}
409+
}
410+
397411
test("dictionary arithmetic") {
398412
// TODO: test ANSI mode
399413
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") {

0 commit comments

Comments
 (0)