Skip to content

Commit 32a523b

Browse files
beliefercloud-fan
authored andcommitted
[SPARK-34234][SQL] Remove TreeNodeException that didn't work
### What changes were proposed in this pull request? `TreeNodeException` causes the error msg not clear and it didn't work well. Because the `TreeNodeException` looks redundancy, we could remove it. There are show a case: ``` val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y") val hashAggDF = df.groupBy("x").agg(c, sum("y")) ``` The above code will use `HashAggregateExec`. In order to ensure that an exception will be thrown when executing `HashAggregateExec`, I added `throw new RuntimeException("calculate error")` into https://github.com/apache/spark/blob/72b7f8abfb60d0008f1f9bed94ce1c367a7d7cce/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L85 So, if the above code is executed, `RuntimeException("calculate error")` will be thrown. Before this PR, the error is: ``` execute, tree: ShuffleQueryStage 0 +- Exchange hashpartitioning(x#105, 5), ENSURE_REQUIREMENTS, [id=#168] +- HashAggregate(keys=[x#105], functions=[partial_sum(y#106)], output=[x#105, sum#118L]) +- Project [_1#100 AS x#105, _2#101 AS y#106] +- LocalTableScan [_1#100, _2#101] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: ShuffleQueryStage 0 +- Exchange hashpartitioning(x#105, 5), ENSURE_REQUIREMENTS, [id=#168] +- HashAggregate(keys=[x#105], functions=[partial_sum(y#106)], output=[x#105, sum#118L]) +- Project [_1#100 AS x#105, _2#101 AS y#106] +- LocalTableScan [_1#100, _2#101] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:163) at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:81) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:79) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:207) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:205) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:205) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:289) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3708) at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2977) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3699) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3697) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2977) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$3(DataFrameAggregateSuite.scala:665) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38) at org.apache.spark.sql.DataFrameAggregateSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(DataFrameAggregateSuite.scala:37) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244) at org.apache.spark.sql.DataFrameAggregateSuite.withSQLConf(DataFrameAggregateSuite.scala:37) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$2(DataFrameAggregateSuite.scala:659) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$2$adapted(DataFrameAggregateSuite.scala:655) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876) at org.apache.spark.sql.DataFrameAggregateSuite.assertNoExceptions(DataFrameAggregateSuite.scala:655) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$126(DataFrameAggregateSuite.scala:695) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$126$adapted(DataFrameAggregateSuite.scala:695) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$125(DataFrameAggregateSuite.scala:695) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.run(Runner.scala:798) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: HashAggregate(keys=[x#105], functions=[partial_sum(y#106)], output=[x#105, sum#118L]) +- Project [_1#100 AS x#105, _2#101 AS y#106] +- LocalTableScan [_1#100, _2#101] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:84) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:122) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:121) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:163) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 91 more Caused by: java.lang.RuntimeException: calculate error at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:85) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 103 more ``` After this PR, the error is: ``` calculate error java.lang.RuntimeException: calculate error at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doExecute(HashAggregateExec.scala:84) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:117) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:117) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:121) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:120) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:161) at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:80) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:78) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:207) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:205) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:205) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:289) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3708) at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2977) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3699) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3697) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2977) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$3(DataFrameAggregateSuite.scala:665) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38) at org.apache.spark.sql.DataFrameAggregateSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(DataFrameAggregateSuite.scala:37) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244) at org.apache.spark.sql.DataFrameAggregateSuite.withSQLConf(DataFrameAggregateSuite.scala:37) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$2(DataFrameAggregateSuite.scala:659) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$assertNoExceptions$2$adapted(DataFrameAggregateSuite.scala:655) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876) at org.apache.spark.sql.DataFrameAggregateSuite.assertNoExceptions(DataFrameAggregateSuite.scala:655) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$126(DataFrameAggregateSuite.scala:695) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$126$adapted(DataFrameAggregateSuite.scala:695) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.DataFrameAggregateSuite.$anonfun$new$125(DataFrameAggregateSuite.scala:695) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.run(Runner.scala:798) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) ``` ### Why are the changes needed? `TreeNodeException` didn't work well. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Jenkins test. Closes #31337 from beliefer/SPARK-34234. Lead-authored-by: gengjiaan <[email protected]> Co-authored-by: beliefer <[email protected]> Co-authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent c082c53 commit 32a523b

File tree

24 files changed

+96
-175
lines changed

24 files changed

+96
-175
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import org.apache.spark.sql.AnalysisException
2021
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
21-
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2222
import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
2424
import org.apache.spark.sql.catalyst.parser.ParserUtils
2525
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
26-
import org.apache.spark.sql.catalyst.trees.TreeNode
2726
import org.apache.spark.sql.catalyst.util.quoteIdentifier
2827
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
2928
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -34,8 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
3433
* Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
3534
* resolved.
3635
*/
37-
class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String)
38-
extends TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
36+
class UnresolvedException(function: String)
37+
extends AnalysisException(s"Invalid call to $function on unresolved object")
3938

4039
/**
4140
* Holds the name of a relation that has yet to be looked up in a catalog.
@@ -145,10 +144,10 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un
145144
def name: String =
146145
nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
147146

148-
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
149-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
150-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
151-
override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier")
147+
override def exprId: ExprId = throw new UnresolvedException("exprId")
148+
override def dataType: DataType = throw new UnresolvedException("dataType")
149+
override def nullable: Boolean = throw new UnresolvedException("nullable")
150+
override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
152151
override lazy val resolved = false
153152

154153
override def newInstance(): UnresolvedAttribute = this
@@ -235,10 +234,10 @@ object UnresolvedAttribute {
235234
case class UnresolvedGenerator(name: FunctionIdentifier, children: Seq[Expression])
236235
extends Generator {
237236

238-
override def elementSchema: StructType = throw new UnresolvedException(this, "elementTypes")
239-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
240-
override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
241-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
237+
override def elementSchema: StructType = throw new UnresolvedException("elementTypes")
238+
override def dataType: DataType = throw new UnresolvedException("dataType")
239+
override def foldable: Boolean = throw new UnresolvedException("foldable")
240+
override def nullable: Boolean = throw new UnresolvedException("nullable")
242241
override lazy val resolved = false
243242

244243
override def prettyName: String = name.unquotedString
@@ -264,8 +263,8 @@ case class UnresolvedFunction(
264263

265264
override def children: Seq[Expression] = arguments ++ filter.toSeq
266265

267-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
268-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
266+
override def dataType: DataType = throw new UnresolvedException("dataType")
267+
override def nullable: Boolean = throw new UnresolvedException("nullable")
269268
override lazy val resolved = false
270269

271270
override def prettyName: String = name.unquotedString
@@ -287,13 +286,13 @@ object UnresolvedFunction {
287286
*/
288287
abstract class Star extends LeafExpression with NamedExpression {
289288

290-
override def name: String = throw new UnresolvedException(this, "name")
291-
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
292-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
293-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
294-
override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier")
295-
override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
296-
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
289+
override def name: String = throw new UnresolvedException("name")
290+
override def exprId: ExprId = throw new UnresolvedException("exprId")
291+
override def dataType: DataType = throw new UnresolvedException("dataType")
292+
override def nullable: Boolean = throw new UnresolvedException("nullable")
293+
override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
294+
override def toAttribute: Attribute = throw new UnresolvedException("toAttribute")
295+
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
297296
override lazy val resolved = false
298297

299298
def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression]
@@ -412,19 +411,19 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens
412411
case class MultiAlias(child: Expression, names: Seq[String])
413412
extends UnaryExpression with NamedExpression with Unevaluable {
414413

415-
override def name: String = throw new UnresolvedException(this, "name")
414+
override def name: String = throw new UnresolvedException("name")
416415

417-
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
416+
override def exprId: ExprId = throw new UnresolvedException("exprId")
418417

419-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
418+
override def dataType: DataType = throw new UnresolvedException("dataType")
420419

421-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
420+
override def nullable: Boolean = throw new UnresolvedException("nullable")
422421

423-
override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier")
422+
override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
424423

425-
override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
424+
override def toAttribute: Attribute = throw new UnresolvedException("toAttribute")
426425

427-
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
426+
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
428427

429428
override lazy val resolved = false
430429

@@ -439,7 +438,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
439438
* @param expressions Expressions to expand.
440439
*/
441440
case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star with Unevaluable {
442-
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
441+
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
443442
override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = expressions
444443
override def toString: String = expressions.mkString("ResolvedStar(", ", ", ")")
445444
}
@@ -458,8 +457,8 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression)
458457
override def left: Expression = child
459458
override def right: Expression = extraction
460459

461-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
462-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
460+
override def dataType: DataType = throw new UnresolvedException("dataType")
461+
override def nullable: Boolean = throw new UnresolvedException("nullable")
463462
override lazy val resolved = false
464463

465464
override def toString: String = s"$child[$extraction]"
@@ -479,13 +478,13 @@ case class UnresolvedAlias(
479478
aliasFunc: Option[Expression => String] = None)
480479
extends UnaryExpression with NamedExpression with Unevaluable {
481480

482-
override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
483-
override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier")
484-
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
485-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
486-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
487-
override def name: String = throw new UnresolvedException(this, "name")
488-
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
481+
override def toAttribute: Attribute = throw new UnresolvedException("toAttribute")
482+
override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
483+
override def exprId: ExprId = throw new UnresolvedException("exprId")
484+
override def nullable: Boolean = throw new UnresolvedException("nullable")
485+
override def dataType: DataType = throw new UnresolvedException("dataType")
486+
override def name: String = throw new UnresolvedException("name")
487+
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
489488

490489
override lazy val resolved = false
491490
}
@@ -527,14 +526,14 @@ case class UnresolvedDeserializer(deserializer: Expression, inputAttributes: Seq
527526
require(inputAttributes.forall(_.resolved), "Input attributes must all be resolved.")
528527

529528
override def child: Expression = deserializer
530-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
531-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
529+
override def dataType: DataType = throw new UnresolvedException("dataType")
530+
override def nullable: Boolean = throw new UnresolvedException("nullable")
532531
override lazy val resolved = false
533532
}
534533

535534
case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpression
536535
with Unevaluable with NonSQLExpression {
537-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
536+
override def nullable: Boolean = throw new UnresolvedException("nullable")
538537
override lazy val resolved = false
539538
}
540539

@@ -550,8 +549,8 @@ case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpr
550549
*/
551550
case class UnresolvedOrdinal(ordinal: Int)
552551
extends LeafExpression with Unevaluable with NonSQLExpression {
553-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
554-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
552+
override def dataType: DataType = throw new UnresolvedException("dataType")
553+
override def nullable: Boolean = throw new UnresolvedException("nullable")
555554
override lazy val resolved = false
556555
}
557556

@@ -571,7 +570,7 @@ case class UnresolvedHaving(
571570
* A place holder expression used in random functions, will be replaced after analyze.
572571
*/
573572
case object UnresolvedSeed extends LeafExpression with Unevaluable {
574-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
575-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
573+
override def nullable: Boolean = throw new UnresolvedException("nullable")
574+
override def dataType: DataType = throw new UnresolvedException("dataType")
576575
override lazy val resolved = false
577576
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala

Lines changed: 0 additions & 59 deletions
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
1919

2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.catalyst.InternalRow
22-
import org.apache.spark.sql.catalyst.errors.attachTree
2322
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode}
2423
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
2524
import org.apache.spark.sql.types._
@@ -72,17 +71,16 @@ object BindReferences extends Logging {
7271
input: AttributeSeq,
7372
allowFailures: Boolean = false): A = {
7473
expression.transform { case a: AttributeReference =>
75-
attachTree(a, "Binding attribute") {
76-
val ordinal = input.indexOf(a.exprId)
77-
if (ordinal == -1) {
78-
if (allowFailures) {
79-
a
80-
} else {
81-
sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
82-
}
74+
val ordinal = input.indexOf(a.exprId)
75+
if (ordinal == -1) {
76+
if (allowFailures) {
77+
a
8378
} else {
84-
BoundReference(ordinal, a.dataType, input(ordinal).nullable)
79+
throw new IllegalStateException(
80+
s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
8581
}
82+
} else {
83+
BoundReference(ordinal, a.dataType, input(ordinal).nullable)
8684
}
8785
}.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
8886
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ case class UnresolvedNamedLambdaVariable(nameParts: Seq[String])
4040
override def name: String =
4141
nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
4242

43-
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
44-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
45-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
46-
override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier")
47-
override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
48-
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
43+
override def exprId: ExprId = throw new UnresolvedException("exprId")
44+
override def dataType: DataType = throw new UnresolvedException("dataType")
45+
override def nullable: Boolean = throw new UnresolvedException("nullable")
46+
override def qualifier: Seq[String] = throw new UnresolvedException("qualifier")
47+
override def toAttribute: Attribute = throw new UnresolvedException("toAttribute")
48+
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
4949
override lazy val resolved = false
5050

5151
override def toString: String = s"lambda '$name"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ case class UnresolvedWindowExpression(
272272
child: Expression,
273273
windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable {
274274

275-
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
276-
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
275+
override def dataType: DataType = throw new UnresolvedException("dataType")
276+
override def nullable: Boolean = throw new UnresolvedException("nullable")
277277
override lazy val resolved = false
278278
}
279279

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ case class MergeIntoTable(
390390
sealed abstract class MergeAction extends Expression with Unevaluable {
391391
def condition: Option[Expression]
392392
override def nullable: Boolean = false
393-
override def dataType: DataType = throw new UnresolvedException(this, "nullable")
393+
override def dataType: DataType = throw new UnresolvedException("nullable")
394394
override def children: Seq[Expression] = condition.toSeq
395395
}
396396

@@ -410,7 +410,7 @@ case class InsertAction(
410410

411411
case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable {
412412
override def nullable: Boolean = false
413-
override def dataType: DataType = throw new UnresolvedException(this, "nullable")
413+
override def dataType: DataType = throw new UnresolvedException("nullable")
414414
override def children: Seq[Expression] = key :: value :: Nil
415415
}
416416

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.rules
1919

2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.catalyst.QueryPlanningTracker
22-
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2322
import org.apache.spark.sql.catalyst.trees.TreeNode
2423
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
2524
import org.apache.spark.sql.catalyst.util.sideBySide
@@ -169,7 +168,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
169168
|Once strategy's idempotence is broken for batch ${batch.name}
170169
|${sideBySide(plan.treeString, reOptimized.treeString).mkString("\n")}
171170
""".stripMargin
172-
throw new TreeNodeException(reOptimized, message, null)
171+
throw new RuntimeException(message)
173172
}
174173
}
175174

@@ -199,7 +198,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
199198
if (!isPlanIntegral(plan)) {
200199
val message = "The structural integrity of the input plan is broken in " +
201200
s"${this.getClass.getName.stripSuffix("$")}."
202-
throw new TreeNodeException(plan, message, null)
201+
throw new RuntimeException(message)
203202
}
204203

205204
batches.foreach { batch =>
@@ -232,7 +231,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
232231
if (effective && !isPlanIntegral(result)) {
233232
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
234233
"the structural integrity of the plan is broken."
235-
throw new TreeNodeException(result, message, null)
234+
throw new RuntimeException(message)
236235
}
237236

238237
result
@@ -249,7 +248,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
249248
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
250249
s"$endingMsg"
251250
if (Utils.isTesting || batch.strategy.errorOnExceed) {
252-
throw new TreeNodeException(curPlan, message, null)
251+
throw new RuntimeException(message)
253252
} else {
254253
logWarning(message)
255254
}

0 commit comments

Comments
 (0)