Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
WindowsSubstitution,
EliminateUnions,
EliminateLazyExpression),
Batch("Apply Limit All", Once, ApplyLimitAll),
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
Batch("Hints", fixedPoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, BaseEvalPython, CTERelationRef, Filter, Join, LimitAll, LogicalPlan, Offset, Project, SubqueryAlias, Union, Window}
import org.apache.spark.sql.catalyst.rules.Rule

object ApplyLimitAll extends Rule[LogicalPlan] {
private def applyLimitAllToPlan(plan: LogicalPlan, isInLimitAll: Boolean = false): LogicalPlan = {
plan match {
case la: LimitAll =>
applyLimitAllToPlan(la.child, isInLimitAll = true)
case cteRef: CTERelationRef if isInLimitAll =>
cteRef.copy(isUnlimitedRecursion = true)
// Allow-list for pushing down Limit All.
case _: Project | _: Filter | _: Join | _: Union | _: Offset |
_: BaseEvalPython | _: Aggregate | _: Window | _: SubqueryAlias =>
plan.withNewChildren(plan.children
.map(child => applyLimitAllToPlan(child, isInLimitAll)))
case other =>
other.withNewChildren(plan.children
.map(child => applyLimitAllToPlan(child, isInLimitAll = false)))
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
applyLimitAllToPlan(plan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDefMap.get(ref.cteId).map { cteDef =>
// cteDef is certainly resolved, otherwise it would not have been in the map.
CTERelationRef(
cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming, maxRows = cteDef.maxRows)
cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming, maxRows = cteDef.maxRows,
isUnlimitedRecursion = ref.isUnlimitedRecursion)
}.getOrElse {
ref
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ package object dsl extends SQLConfHelper {

def globalLimit(limitExpr: Expression): LogicalPlan = GlobalLimit(limitExpr, logicalPlan)

def limitAll(): LogicalPlan = LimitAll(logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.analysis.DeduplicateRelations
import org.apache.spark.sql.catalyst.expressions.{Alias, OuterReference, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, Join, JoinHint, LogicalPlan, Project, Subquery, WithCTE}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, Join, JoinHint, LogicalPlan, Project, Subquery, UnionLoop, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION}

Expand Down Expand Up @@ -189,14 +189,20 @@ case class InlineCTE(

case ref: CTERelationRef =>
val refInfo = cteMap(ref.cteId)

val cteBody = if (ref.isUnlimitedRecursion) {
setUnlimitedRecursion(refInfo.cteDef.child, ref.cteId)
} else {
refInfo.cteDef.child
}
if (refInfo.shouldInline) {
if (ref.outputSet == refInfo.cteDef.outputSet) {
refInfo.cteDef.child
cteBody
} else {
val ctePlan = DeduplicateRelations(
Join(
refInfo.cteDef.child,
refInfo.cteDef.child,
cteBody,
cteBody,
Inner,
None,
JoinHint(None, None)
Expand Down Expand Up @@ -226,6 +232,18 @@ case class InlineCTE(
case _ => plan
}
}

// Helper function to set unlimited recursion.
private def setUnlimitedRecursion(plan: LogicalPlan, id: Long): LogicalPlan = {
plan match {
case ul: UnionLoop if ul.id == id =>
// Since there is exactly one UnionLoop node with this id in the CTE body, we can stop the
// recursion here.
ul.copy(limit = Some(-1))
case other =>
other.withNewChildren(plan.children.map(child => setUnlimitedRecursion(child, id)))
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
cteDef
}

case cteRef @ CTERelationRef(cteId, _, output, _, _, _, _) =>
case cteRef @ CTERelationRef(cteId, _, output, _, _, _, _, _) =>
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
if (needsPruning(cteDef.child, newAttrSet)) {
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,15 +1346,23 @@ class AstBuilder extends DataTypeAstBuilder
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withOffset.optional(limit) {
if (forPipeOperators && clause.nonEmpty && clause != PipeOperators.offsetClause) {
throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
ctx, clause, PipeOperators.limitClause)
}
if (forPipeOperators && clause.nonEmpty
&& clause != PipeOperators.offsetClause && limit != null) {
throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
ctx, clause, PipeOperators.limitClause)
}
// LIMIT ALL creates LimitAll node which can be used for infinite recursions in recursive CTEs.
if (ctx.ALL() != null) {
clause = PipeOperators.limitClause
Limit(typedVisit(limit), withOffset)
LimitAll(withOffset)
} else {
withOffset.optional(limit) {
clause = PipeOperators.limitClause
Limit(typedVisit(limit), withOffset)
}
}


}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,22 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
copy(child = newChild)
}

/**
* Logical node that represents the LIMIT ALL operation. This operation is usually no-op and exists
* to provide compatability with other databases. However, in case of recursive CTEs, Limit nodes
* serve another purpose, to override the default row limit which is determined by a flag. As a
* result, LIMIT ALL should also be used to completely negate the row limit, which is exactly what
* this node is used for.
*/
case class LimitAll(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

final override val nodePatterns: Seq[TreePattern] = Seq(LIMIT)

override protected def withNewChildInternal(newChild: LogicalPlan): LimitAll =
copy(child = newChild)
}

object OffsetAndLimit {
def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
p match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ case class CTERelationDef(
override def output: Seq[Attribute] = if (resolved) child.output else Nil

lazy val hasSelfReferenceAsCTERef: Boolean = child.collectFirstWithSubqueries {
case CTERelationRef(this.id, _, _, _, _, true, _) => true
case CTERelationRef(this.id, _, _, _, _, true, _, _) => true
}.getOrElse(false)
lazy val hasSelfReferenceInAnchor: Boolean = {
val unionNode: Option[Union] = child match {
Expand All @@ -144,7 +144,7 @@ case class CTERelationDef(
}
if (unionNode.isDefined) {
unionNode.get.children.head.collectFirstWithSubqueries {
case CTERelationRef(this.id, _, _, _, _, true, _) => true
case CTERelationRef(this.id, _, _, _, _, true, _, _) => true
}.getOrElse(false)
} else {
false
Expand All @@ -160,7 +160,7 @@ case class CTERelationDef(
}
if (withCTENode.isDefined) {
withCTENode.exists(_.cteDefs.exists(_.collectFirstWithSubqueries {
case CTERelationRef(this.id, _, _, _, _, true, _) => true
case CTERelationRef(this.id, _, _, _, _, true, _, _) => true
}.isDefined))
} else {
false
Expand Down Expand Up @@ -194,7 +194,8 @@ case class CTERelationRef(
override val isStreaming: Boolean,
statsOpt: Option[Statistics] = None,
recursive: Boolean = false,
override val maxRows: Option[Long] = None) extends LeafNode with MultiInstanceRelation {
override val maxRows: Option[Long] = None,
isUnlimitedRecursion: Boolean = false) extends LeafNode with MultiInstanceRelation {

final override val nodePatterns: Seq[TreePattern] = Seq(CTE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ case class UnionLoopExec(
private def executeAndCacheAndCount(plan: LogicalPlan, currentLimit: Int) = {
// In case limit is defined, we create a (local) limit node above the plan and execute
// the newly created plan.
val planWithLimit = if (limit.isDefined) {
val planWithLimit = if (limit.isDefined && limit.get >= 0) {
LocalLimit(Literal(currentLimit), plan)
} else {
plan
Expand Down Expand Up @@ -167,6 +167,8 @@ case class UnionLoopExec(
// the user knows they aren't getting all the rows they requested.
var currentLimit = limit.getOrElse(rowLimit)

val unlimitedRecursion = currentLimit == -1

val userSpecifiedLimit = limit.isDefined

val unionChildren = mutable.ArrayBuffer.empty[LogicalPlan]
Expand Down Expand Up @@ -240,7 +242,7 @@ case class UnionLoopExec(

unionChildren += prevPlan

if (rowLimit != -1) {
if (!unlimitedRecursion) {
currentLimit -= prevCount.toInt
if (currentLimit <= 0) {
if (userSpecifiedLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ WithCTE
: +- Relation spark_catalog.default.t1[s#x,utf8_binary#x,utf8_lcase#x] parquet
+- Project [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x]
+- SubqueryAlias cte
+- CTERelationRef xxxx, true, [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x], false, false
+- CTERelationRef xxxx, true, [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x], false, false, false


-- !query
Expand Down Expand Up @@ -232,7 +232,7 @@ Project [scalar-subquery#x [] AS scalarsubquery()#x]
: +- LocalLimit 1
: +- Project [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x]
: +- SubqueryAlias cte
: +- CTERelationRef xxxx, true, [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x], false, false
: +- CTERelationRef xxxx, true, [concat_ws(' ' collate UTF8_LCASE, utf8_lcase, utf8_lcase)#x], false, false, false
+- OneRowRelation


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorI
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -32,7 +32,7 @@ CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, fals
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -49,7 +49,7 @@ Project [col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -64,7 +64,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias S
+- CTERelationRef xxxx, true, [col#x], false, false, 1
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand All @@ -86,7 +86,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false, false, 1
+- CTERelationRef xxxx, true, [col#x], false, false, 1, false


-- !query
Expand Down
Loading