
Commit 161c596

Revert "[SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning"
This reverts commit 4b93435.
1 parent b37defe commit 161c596

3 files changed: 8 additions & 58 deletions
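
For context, the "Prune unrequired child index" branch of ColumnPruning rewrites a Project on top of a Generate: attributes that only the generator needs are recorded in Generate.unrequiredChildIndex instead of being projected through, and attributes nobody needs are pruned from the child. A minimal sketch of that plan shape, runnable in spark-shell (the data and column names below are illustrative, not taken from this commit):

// Hypothetical demo data: `items` feeds the generator only, `extra` is never referenced.
import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._   // `spark` is the SparkSession provided by spark-shell

val df = Seq((1, Seq("a", "b"), "unused")).toDF("id", "items", "extra")

val result = df
  .select(col("id"), explode(col("items")).as("item"))
  .select("item")

// The optimized plan should show `extra` pruned below the Generate and `items`
// tracked via unrequiredChildIndex rather than carried through the output.
result.explain(true)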

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala

Lines changed: 0 additions & 19 deletions
@@ -314,25 +314,6 @@ object NestedColumnAliasing {
   }
 }
 
-object GeneratorUnrequiredChildrenPruning {
-  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
-    case p @ Project(_, g: Generate) =>
-      val requiredAttrs = p.references ++ g.generator.references
-      val newChild = ColumnPruning.prunedChild(g.child, requiredAttrs)
-      val unrequired = g.generator.references -- p.references
-      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
-        .map(_._2)
-      if (!newChild.fastEquals(g.child) ||
-        unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
-        Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
-      } else {
-        None
-      }
-    case _ => None
-  }
-}
-
-
 /**
  * This prunes unnecessary nested columns from [[Generate]], or [[Project]] -> [[Generate]]
  */
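
The GeneratorUnrequiredChildrenPruning object deleted above is a plain Scala extractor: its unapply returns Some(rewrittenPlan) only when the rewrite would actually change the plan, so the matching case in ColumnPruning fires exactly when there is work to do and the optimizer batch does not loop on a no-op. A minimal, self-contained sketch of that extractor pattern, with hypothetical names:

// Hypothetical example: an extractor whose unapply encodes both the match condition
// and the rewrite, returning None when no change is needed.
object HalveIfEven {
  def unapply(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None
}

def step(n: Int): Int = n match {
  case HalveIfEven(half) => half   // fires only when unapply returns Some(...)
  case other             => other  // unchanged otherwise
}

The `!newChild.fastEquals(g.child) || unrequiredIndices.toSet != g.unrequiredChildIndex.toSet` guard plays the `n % 2 == 0` role above: it is what makes the extractor return None once the plan is already pruned.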

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 8 additions & 7 deletions
@@ -862,12 +862,13 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    // There are 2 types of pruning here:
-    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
-    //    we directly remove it from the output list of g.child.
-    // 2. For attributes that is not used by the project but it is used by the generator, we put
-    //    it in g.unrequiredChildIndex to save memory usage.
-    case GeneratorUnrequiredChildrenPruning(rewrittenPlan) => rewrittenPlan
+    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
+      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+      val newChild = prunedChild(g.child, requiredAttrs)
+      val unrequired = g.generator.references -- p.references
+      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
+        .map(_._2)
+      p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
 
     // prune unrequired nested fields from `Generate`.
     case GeneratorNestedColumnAliasing(rewrittenPlan) => rewrittenPlan
@@ -928,7 +929,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     })
 
   /** Applies a projection only when the child is producing unnecessary attributes */
-  def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
+  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
     if (!c.outputSet.subsetOf(allReferences)) {
       Project(c.output.filter(allReferences.contains), c)
     } else {
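
prunedChild, which this revert makes private again, wraps the child in a Project only when the child produces attributes outside allReferences. A short sketch of that behavior against a toy relation, assuming the Catalyst test DSL (the relation and the choice of referenced attributes are illustrative):

// Illustrative only: mirrors the prunedChild logic shown in the hunk above.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}

val child = LocalRelation('a.int, 'b.int, 'c.int)
val allReferences = AttributeSet(child.output.take(2))   // only 'a and 'b are needed

val pruned: LogicalPlan =
  if (!child.outputSet.subsetOf(allReferences)) {
    Project(child.output.filter(allReferences.contains), child)   // keeps 'a, 'b; drops 'c
  } else {
    child   // already minimal, no extra Project
  }

With all three attributes referenced, the else branch would return the relation unchanged, which is why the rule never stacks redundant Projects.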

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 0 additions & 32 deletions
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.optimizer.NestedColumnAliasingSuite.collectGeneratedAliases
 import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -465,37 +464,6 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }
 
-  test("SPARK-38531: Nested field pruning for Project and PosExplode") {
-    val name = StructType.fromDDL("first string, middle string, last string")
-    val employer = StructType.fromDDL("id int, company struct<name:string, address:string>")
-    val contact = LocalRelation(
-      'id.int,
-      'name.struct(name),
-      'address.string,
-      'friends.array(name),
-      'relatives.map(StringType, name),
-      'employer.struct(employer))
-
-    val query = contact
-      .select('id, 'friends)
-      .generate(PosExplode('friends))
-      .select('col.getField("middle"))
-      .analyze
-    val optimized = Optimize.execute(query)
-
-    val aliases = collectGeneratedAliases(optimized)
-
-    val expected = contact
-      // GetStructField is pushed down, unused id column is pruned.
-      .select(
-        'friends.getField("middle").as(aliases(0)))
-      .generate(PosExplode($"${aliases(0)}"),
-        unrequiredChildIndex = Seq(0)) // unrequiredChildIndex is added.
-      .select('col.as("col.middle"))
-      .analyze
-    comparePlans(optimized, expected)
-  }
-
   test("SPARK-39445: Remove the window if windowExpressions is empty in column pruning") {
     object CustomOptimize extends RuleExecutor[LogicalPlan] {
       val batches = Batch("Column pruning", FixedPoint(10),
