[SPARK-53127][SQL] Enable LIMIT ALL to override recursion row limit #51847

Conversation
@Pajaraja Do you need to do the comparison tests against Snowflake and PostgreSQL? The current changes modify how limits are handled in recursive CTEs, which could affect query results and compatibility with other databases.
I think this won't affect compatibility since LIMIT ALL should be a complete no-op in those systems, but they don't have the same row limit as Spark.
LimitAll(withOffset)
} else {
  withOffset.optional(limit) {
    if (forPipeOperators && clause.nonEmpty && clause != PipeOperators.offsetClause) {
to avoid duplicated code:
if (ctx.LIMIT != null) {
  if (forPipeOperators ...) ...
}
Changed!
@@ -1659,6 +1659,15 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
    copy(child = newChild)
}

case class LimitAll(child: LogicalPlan) extends OrderPreservingUnaryNode {
let's add a code comment to explain it.
it should be a regular unary node, not OrderPreservingUnaryNode
Added comment, changed parent class.
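For reference, a minimal sketch of roughly where this lands after the review; the doc comment wording, the maxRows pass-through, and the imports are assumptions rather than the PR's exact code:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

/**
 * Marker node for LIMIT ALL. It passes every row of the child through
 * unchanged; its only purpose is to signal that any recursion row limit
 * in the subtree below should be lifted.
 */
case class LimitAll(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  // LIMIT ALL does not bound the row count, so defer to the child.
  override def maxRows: Option[Long] = child.maxRows
  override protected def withNewChildInternal(newChild: LogicalPlan): LimitAll =
    copy(child = newChild)
}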
applyLimitAllToPlan(la.child, isInLimitAll = true)
case cteRef: CTERelationRef if isInLimitAll =>
  cteRef.copy(isUnlimitedRecursion = Some(true))
case other =>
shall we define an allowlist? It seems wrong to propagate LimitAll all the way down to CTERelationRef, even though there are plan nodes like Sort in the middle that break Limit semantics.
I made an allowlist based on the LimitPushDown optimizer rule and added Filter. Are there any other nodes we should add?
cteRef.copy(isUnlimitedRecursion = Some(true))
// Allow-list for pushing down Limit All.
case _: Project | _: Filter | _: Join | _: Union | _: Offset |
     _: BatchEvalPython | _: ArrowEvalPython | _: SubqueryAlias =>
SubqueryAlias is removed before the optimizer kicks in.
But this happens in the analyzer, right after CTESubstitution, so we have to include it.
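Piecing the fragments above together, the analyzer-side traversal presumably looks something like this sketch (not the PR's exact code; it still uses the Option[Boolean] flag shown in this diff, which a later review comment turns into a plain Boolean):

def applyLimitAllToPlan(plan: LogicalPlan, isInLimitAll: Boolean = false): LogicalPlan =
  plan match {
    // Unwrap the LIMIT ALL marker and start propagating below it.
    case la: LimitAll =>
      applyLimitAllToPlan(la.child, isInLimitAll = true)
    // A CTE reference reached under LIMIT ALL: lift its recursion row limit.
    case cteRef: CTERelationRef if isInLimitAll =>
      cteRef.copy(isUnlimitedRecursion = Some(true))
    // Allowlist of nodes LIMIT ALL may be pushed through; SubqueryAlias is
    // included because this runs in the analyzer, right after CTESubstitution.
    case _: Project | _: Filter | _: Join | _: Union | _: Offset |
         _: BatchEvalPython | _: ArrowEvalPython | _: SubqueryAlias =>
      plan.mapChildren(applyLimitAllToPlan(_, isInLimitAll))
    // Any other node breaks the Limit semantics, so reset the flag but keep
    // descending to pick up LIMIT ALL markers deeper in the tree.
    case other =>
      other.mapChildren(applyLimitAllToPlan(_, isInLimitAll = false))
  }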
throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
  ctx, clause, PipeOperators.limitClause)
}
if (forPipeOperators && clause.nonEmpty && clause != PipeOperators.offsetClause) {
we should only fail if limit != null
Changed.
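That is, something along these lines (a sketch of the requested guard, reusing the identifiers visible in the excerpt above):

// Only raise the pipe-operator error when a LIMIT clause is actually present.
if (limit != null) {
  if (forPipeOperators && clause.nonEmpty && clause != PipeOperators.offsetClause) {
    throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
      ctx, clause, PipeOperators.limitClause)
  }
}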
@@ -194,7 +194,8 @@ case class CTERelationRef(
     override val isStreaming: Boolean,
     statsOpt: Option[Statistics] = None,
     recursive: Boolean = false,
-    override val maxRows: Option[Long] = None) extends LeafNode with MultiInstanceRelation {
+    override val maxRows: Option[Long] = None,
+    isUnlimitedRecursion: Option[Boolean] = None) extends LeafNode with MultiInstanceRelation {
this means we have 3 values: None, Some(true), Some(false). Is it necessary and what does each of them mean?
I initially used an Option to avoid changing golden files. I've now changed it to a single boolean.
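With the Option dropped, the signature would end up roughly as below; the leading parameters come from the existing class rather than this diff, and the default value is an assumption chosen to keep golden files unchanged:

case class CTERelationRef(
    cteId: Long,
    _resolved: Boolean,
    output: Seq[Attribute],
    override val isStreaming: Boolean,
    statsOpt: Option[Statistics] = None,
    recursive: Boolean = false,
    override val maxRows: Option[Long] = None,
    isUnlimitedRecursion: Boolean = false) // was: Option[Boolean] = None
  extends LeafNode with MultiInstanceRelation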
@@ -4281,6 +4282,29 @@ object RemoveTempResolvedColumn extends Rule[LogicalPlan] {
  }
}

object ApplyLimitAll extends Rule[LogicalPlan] {
can we move it to a new file?
@@ -4281,6 +4282,29 @@ object RemoveTempResolvedColumn extends Rule[LogicalPlan] {
  }
}

object ApplyLimitAll extends Rule[LogicalPlan] {
  def applyLimitAllToPlan(plan: LogicalPlan, isInLimitAll: Boolean = false): LogicalPlan = {
- def applyLimitAllToPlan(plan: LogicalPlan, isInLimitAll: Boolean = false): LogicalPlan = {
+ private def applyLimitAllToPlan(plan: LogicalPlan, isInLimitAll: Boolean = false): LogicalPlan = {
@@ -189,14 +189,20 @@ case class InlineCTE(

      case ref: CTERelationRef =>
        val refInfo = cteMap(ref.cteId)

        val cteBody = if (ref.isUnlimitedRecursion) {
          setUnlimitedRecursion(refInfo.cteDef.child, ref.cteId)
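setUnlimitedRecursion itself isn't shown in the excerpt; a purely illustrative guess at its shape, with the traversal and the predicate as assumptions:

// Walk the inlined CTE definition body and mark self-references of the given
// cteId as unlimited, so the recursion runs without a row limit.
private def setUnlimitedRecursion(plan: LogicalPlan, cteId: Long): LogicalPlan =
  plan.transform {
    case ref: CTERelationRef if ref.recursive && ref.cteId == cteId =>
      ref.copy(isUnlimitedRecursion = true)
  }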
@@ -97,6 +97,37 @@ SELECT * FROM t LIMIT 60;

DROP VIEW ZeroAndOne;

-- limited recursion allowed to stop from failing by putting LIMIT ALL
WITH RECURSIVE t(n) MAX RECURSION LEVEL 100 AS (
so LIMIT ALL can override a user-specified MAX RECURSION LEVEL?
@@ -786,6 +786,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-50892: parameterized identifier inside a recursive CTE") {
  spark.conf.set("spark.sql.cteRecursionRowLimit", "50")
use withSQLConf
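That is, roughly the following pattern; unlike a bare spark.conf.set, withSQLConf restores the previous value once the block finishes, even if the test fails:

test("SPARK-50892: parameterized identifier inside a recursive CTE") {
  withSQLConf("spark.sql.cteRecursionRowLimit" -> "50") {
    // ... test body runs with the lowered recursion row limit ...
  }
}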
What changes were proposed in this pull request?
Introduce a LimitAll node for LIMIT ALL that gets pushed down into the recursion's UnionAll so that the recursion can return an unbounded number of rows.
Why are the changes needed?
LIMIT should override the recursion row limit, so LIMIT ALL should remove this limit entirely. Currently LIMIT ALL is a complete no-op (it doesn't create any node). We introduce this new node and propagate it through its subtree into any UnionLoop.
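As a hypothetical illustration of the intended behavior (the query and the limit values here are made up for this example):

// Without LIMIT ALL, a deep recursion fails once it produces more rows than
// spark.sql.cteRecursionRowLimit allows. With LIMIT ALL, the limit is lifted
// and the recursion runs until its own termination condition is met.
spark.sql("""
  WITH RECURSIVE t(n) AS (
    SELECT 1
    UNION ALL
    SELECT n + 1 FROM t WHERE n < 10000000
  )
  SELECT * FROM t LIMIT ALL
""").count()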
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New tests in LimitPushdownSuite and a golden file test in cte-recursion; existing golden file tests also cover the change.
Was this patch authored or co-authored using generative AI tooling?
No.