From 0f5ec0375443236aff2265841fb7c5dbee25dff9 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Tue, 5 Aug 2025 20:33:59 +0200
Subject: [PATCH] [SPARK-53094][SQL][4.0] Fix CUBE with aggregate containing HAVING clauses

### What changes were proposed in this pull request?

This is an alternative PR to https://github.com/apache/spark/pull/51810 to fix
a regression introduced in Spark 3.2 with
https://github.com/apache/spark/pull/32470.

This PR defers the resolution of `UnresolvedHaving` nodes that are not yet
fully resolved from `ResolveGroupingAnalytics`:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics ===
 'Sort ['s DESC NULLS LAST], true                                                                                              'Sort ['s DESC NULLS LAST], true
!+- 'UnresolvedHaving ('count('product) > 2)                                                                                   +- 'UnresolvedHaving ('count(tempresolvedcolumn(product#261, product, false)) > 2)
!   +- 'Aggregate [cube(Vector(0), Vector(1), product#261, region#262)], [product#261, region#262, sum(amount#263) AS s#264L]     +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L]
!      +- SubqueryAlias t                                                                                                            +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]
!         +- LocalRelation [product#261, region#262, amount#263]                                                                        +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]
!                                                                                                                                          +- SubqueryAlias t
!                                                                                                                                             +- LocalRelation [product#261, region#262, amount#263]
```

to `ResolveAggregateFunctions`, which adds the correct aggregate expressions
(`count(product#261)`):

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions ===
 'Sort ['s DESC NULLS LAST], true                                                                                              'Sort ['s DESC NULLS LAST], true
!+- 'UnresolvedHaving (count(tempresolvedcolumn(product#261, product, false)) > cast(2 as bigint))                             +- Project [product#269, region#270, s#264L]
!   +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L]          +- Filter (count(product)#272L > cast(2 as bigint))
!      +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]      +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L, count(product#261) AS count(product)#272L]
!         +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]                         +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]
!            +- SubqueryAlias t                                                                                                             +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]
!               +- LocalRelation [product#261, region#262, amount#263]                                                                         +- SubqueryAlias t
!                                                                                                                                                 +- LocalRelation [product#261, region#262, amount#263]
```

### Why are the changes needed?

It fixes a correctness issue described in
https://github.com/apache/spark/pull/51810.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a correctness issue.
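To make the impact concrete, here is a minimal repro, lifted from the unit test
added below; the `spark-shell` rendering is illustrative, while the three
surviving rows are exactly what the new test asserts:

```scala
// SPARK-53094 repro: the HAVING aggregate (count(product)) is not part of the
// SELECT list, so before this fix it was resolved incorrectly under CUBE and
// the wrong groups could survive the filter.
spark.sql(
  """SELECT product, region, sum(amount) AS s
    |FROM VALUES
    |  ('a', 'east', 100),
    |  ('b', 'east', 200),
    |  ('a', 'west', 150),
    |  ('b', 'west', 250),
    |  ('a', 'east', 120) AS t(product, region, amount)
    |GROUP BY product, region WITH CUBE
    |HAVING count(product) > 2
    |ORDER BY s DESC""".stripMargin).show()
// With the fix applied, only the CUBE groups covering more than two input
// rows pass the HAVING clause:
//   (null, null, 820), (null, east, 420), (a, null, 370)
```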
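The mechanics of the deferral may be worth spelling out. Below is a
self-contained sketch of the pattern, using toy stand-in types rather than
Spark's real `LogicalPlan` classes: a rule returns the `UnresolvedHaving` node
unchanged while its condition still contains unresolved aggregates (mirroring
the `if (!cond.resolved) return colResolved` guard added in `Analyzer.scala`
below), and a later rule rewrites it into a `Filter` once resolution succeeds.
In the real analyzer the retry comes for free from the fixed-point
rule-execution loop.

```scala
// Toy model of "defer to a later rule": hypothetical types, not Spark's API.
object DeferralSketch {
  sealed trait Expr { def resolved: Boolean }
  case class UnresolvedCount(col: String) extends Expr { val resolved = false }
  case class ResolvedCount(col: String) extends Expr { val resolved = true }

  sealed trait Plan
  case class UnresolvedHaving(cond: Expr, child: Plan) extends Plan
  case class Filter(cond: Expr, child: Plan) extends Plan
  case class Aggregate(child: Plan) extends Plan
  case object Relation extends Plan

  // Analogue of the patched guard in ResolveGroupingAnalytics: only rewrite
  // UnresolvedHaving into Filter once its condition is fully resolved;
  // otherwise keep the node as-is so a later rule (the analogue of
  // ResolveAggregateFunctions) can finish the job.
  def resolveHaving(plan: Plan, resolve: Expr => Expr): Plan = plan match {
    case UnresolvedHaving(cond, child) =>
      val newCond = resolve(cond)
      val newChild = resolveHaving(child, resolve)
      if (!newCond.resolved) UnresolvedHaving(newCond, newChild) // defer
      else Filter(newCond, newChild)                             // rewrite
    case Filter(c, child) => Filter(c, resolveHaving(child, resolve))
    case Aggregate(child) => Aggregate(resolveHaving(child, resolve))
    case Relation         => Relation
  }

  def main(args: Array[String]): Unit = {
    val plan = UnresolvedHaving(UnresolvedCount("product"), Aggregate(Relation))
    // First pass cannot resolve the aggregate yet, so the node is kept.
    println(resolveHaving(plan, identity))
    // A later pass resolves the aggregate and rewrites HAVING into Filter.
    println(resolveHaving(plan, _ => ResolvedCount("product")))
  }
}
```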
### How was this patch tested?

Added a new UT from https://github.com/apache/spark/pull/51810.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51820 from peter-toth/SPARK-53094-fix-cube-having.

Lead-authored-by: Peter Toth
Co-authored-by: harris233 <1657417742@qq.com>
Signed-off-by: Peter Toth
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 ++++
 .../analyzer-results/grouping_set.sql.out          |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala       | 16 ++++++++++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bde89c7b5fa16..1045e73f22423 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -768,6 +768,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       } else {
         colResolved.havingCondition
       }
+      // `cond` might contain unresolved aggregate functions so defer its resolution to
+      // `ResolveAggregateFunctions` rule if needed.
+      if (!cond.resolved) return colResolved
+
       // Try resolving the condition of the filter as though it is in the aggregate clause
       val (extraAggExprs, Seq(resolvedHavingCond)) =
         ResolveAggregateFunctions.resolveExprsWithAggregate(Seq(cond), aggForResolving)
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/grouping_set.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/grouping_set.sql.out
index b73ee16c8bdef..5fbf678bfdbf3 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/grouping_set.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/grouping_set.sql.out
@@ -86,7 +86,7 @@ FROM (VALUES ('x', 'a', 10), ('y', 'b', 20) ) AS t (c1, c2, c3)
 GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) )
 HAVING GROUPING__ID > 1
 -- !query analysis
-Filter (grouping__id#xL > cast(1 as bigint))
+Filter (GROUPING__ID#xL > cast(1 as bigint))
 +- Aggregate [c1#x, c2#x, spark_grouping_id#xL], [c1#x, c2#x, sum(c3#x) AS sum(c3)#xL, spark_grouping_id#xL AS grouping__id#xL]
    +- Expand [[c1#x, c2#x, c3#x, c1#x, null, 1], [c1#x, c2#x, c3#x, null, c2#x, 2]], [c1#x, c2#x, c3#x, c1#x, c2#x, spark_grouping_id#xL]
       +- Project [c1#x, c2#x, c3#x, c1#x AS c1#x, c2#x AS c2#x]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b3fce19979e86..f294ff81021d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4921,6 +4921,22 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       Row(Array(0), Array(0)), Row(Array(1), Array(1)), Row(Array(2), Array(2)))
     checkAnswer(df, expectedAnswer)
   }
+
+  test("SPARK-53094: Fix cube-related data quality problem") {
+    val df = sql(
+      """SELECT product, region, sum(amount) AS s
+        |FROM VALUES
+        |  ('a', 'east', 100),
+        |  ('b', 'east', 200),
+        |  ('a', 'west', 150),
+        |  ('b', 'west', 250),
+        |  ('a', 'east', 120) AS t(product, region, amount)
+        |GROUP BY product, region WITH CUBE
+        |HAVING count(product) > 2
+        |ORDER BY s DESC""".stripMargin)
+
+    checkAnswer(df, Seq(Row(null, null, 820), Row(null, "east", 420), Row("a", null, 370)))
+  }
 }
 
 case class Foo(bar: Option[String])