[SPARK-53074][SQL] Avoid partial clustering in SPJ to meet a child's required distribution #51818
base: master
Conversation
@szehon-ho @sunchao Could you take a look at this PR? Thanks!
@chirag-s-db Should you be adding the tests to cover the following scenarios?
    |WHERE p.RN = 1
    |""".stripMargin)
val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(shuffles.isEmpty, "should not contain any shuffle")
should we check number of partitions to make sure that partial cluster distribution replication did not kick in?
Good point, done.
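To illustrate why a partition count is a useful signal here, the following is a minimal sketch in plain Scala (no Spark dependency). It assumes a toy model in which a partition is a `(key, rows)` pair; the object name, the helper names, and the sample data are all hypothetical and are not taken from the PR or from Spark. In this model, a key-grouped layout has one partition per distinct key, while a partially clustered layout keeps same-key splits separate and replicates the other side to match.

```scala
object PartitionCountSketch {
  // Toy model (assumption): a partition is a (partition key, rows) pair.
  type Partition = (Int, Seq[String])

  // Key-grouped layout: all splits sharing a key are merged into one partition.
  def keyGrouped(splits: Seq[Partition]): Seq[Partition] =
    splits.groupBy(_._1).toSeq.sortBy(_._1).map { case (k, ps) => (k, ps.flatMap(_._2)) }

  // Partially clustered layout: splits sharing a key stay separate.
  def partiallyClustered(splits: Seq[Partition]): Seq[Partition] =
    splits.sortBy(_._1)

  // Other join side: each key's partition is copied once per split on the clustered side.
  def replicated(other: Seq[Partition], clustered: Seq[Partition]): Seq[Partition] = {
    val copies = clustered.groupBy(_._1).map { case (k, ps) => k -> ps.size }
    keyGrouped(other).flatMap { case (k, rows) =>
      Seq.fill(copies.getOrElse(k, 1))((k, rows))
    }
  }

  // Hypothetical sample data: key 1 arrives in two input splits on the left side.
  val left: Seq[Partition] = Seq((1, Seq("a", "b")), (1, Seq("c")), (2, Seq("d")))
  val right: Seq[Partition] = Seq((1, Seq("x")), (2, Seq("y")))
}
```

In this toy model, the key-grouped layout of `left` has as many partitions as distinct keys, whereas the partially clustered layout (and the replicated `right` side) has more. An assertion on the partition count would therefore catch the case where partial clustering was applied even though no shuffle was added.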
@@ -490,9 +502,15 @@ case class EnsureRequirements(
   // whether partially clustered distribution can be applied. For instance, the
   // optimization cannot be applied to a left outer join, where the left hand
   // side is chosen as the side to replicate partitions according to stats.
   // Similarly, the partially clustered distribution cannot be applied if the
nit: can we add a comment about side-effects, like select row_number()
Added expansion to this comment.
   */
  private def canApplyPartialClusteredDistribution(plan: SparkPlan): Boolean = {
    !plan.exists {
      case u: UnaryExecNode => u.requiredChildDistribution.head != UnspecifiedDistribution
Sorry, just a couple of questions here:
- unary node is just to make sure we exclude window functions, and not joins? Should we check window function node specifically, if that's the goal? Just trying to understand the reason why unary node in particular here.
- Is it a better check to do something like:
u.child.outputPartitioning.satisfies(u.requiredChildDistribution.head)?
unary node is just to make sure we exclude window functions, and not joins? Should we check window function node specifically, if that's the goal? Just trying to understand the reason why unary node in particular here.
We don't want to restrict this only to window functions - any exec node that has a specified required distribution applies (including windows, grouping aggregates, etc.). Checking for unary nodes just generally makes this easier to reason about (since it guarantees that there's only a single required child distribution to check).
Note that we also exclude any non-unary nodes (see the line below) like JOINs. The reason that it's not safe to apply to a JOIN is that when applying the partially clustered distribution, all scans on the partially clustered side are marked as partially clustered, which would create incorrect results on the lower JOIN. For example, suppose we had 3 partitioned tables A, B, and C and joined them like A JOIN (B JOIN C) (assuming all keys lined up with the partitioning, etc.). It would be safe to apply a partially clustered distribution to A (since the tasks on B JOIN C can simply be replicated), but it would not be safe to apply a partially clustered distribution to the right side (since the results of B JOIN C would be incorrect).
Is it a better check to do something like:
u.child.outputPartitioning.satisfies(u.requiredChildDistribution.head)?
I don't think this would work - the KeyGroupedPartitioning would still (on paper) return that it satisfies the required distribution, even if the distribution actually will be partially clustered. In addition, we haven't yet actually applied the partially clustered distribution here, so the output partitioning is still the original KeyGroupedPartitioning (without any of the pushed down spjParams applied).
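The shape of the check discussed in this thread can be sketched on a toy plan tree. This is a simplified model under stated assumptions, not the Spark implementation: `Plan`, `Scan`, `Unary`, `Binary`, `Distribution`, and the local `exists` helper are hypothetical stand-ins for `SparkPlan`, `UnaryExecNode`, and friends, and treating every binary node as disqualifying while letting leaf scans pass is an assumption based on the description above.

```scala
object PartialClusteringCheckSketch {
  // Hypothetical stand-ins for Spark's distribution types.
  sealed trait Distribution
  case object Unspecified extends Distribution
  final case class Clustered(keys: Seq[String]) extends Distribution

  // Hypothetical stand-ins for physical plan nodes.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Unary(name: String, required: Distribution, child: Plan) extends Plan
  final case class Binary(name: String, left: Plan, right: Plan) extends Plan

  // True if the predicate holds for this node or any descendant.
  def exists(plan: Plan)(p: Plan => Boolean): Boolean =
    p(plan) || (plan match {
      case Scan(_)         => false
      case Unary(_, _, c)  => exists(c)(p)
      case Binary(_, l, r) => exists(l)(p) || exists(r)(p)
    })

  // Partial clustering is allowed only if no node below the JOIN depends on
  // the scan's partitioning and there is no nested binary node.
  def canApplyPartialClustering(plan: Plan): Boolean =
    !exists(plan) {
      case Unary(_, required, _) => required != Unspecified
      case Binary(_, _, _)       => true
      case Scan(_)               => false
    }
}
```

Under these assumptions, a projection over a scan passes the check, while a window or grouping aggregate with a clustered requirement, or a nested join such as B JOIN C, does not.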
What changes were proposed in this pull request?
Currently, SPJ logic can apply partial clustering (when enabled) to either side of an inner JOIN as long as the nodes between the scan and JOIN preserve partitioning. This doesn't work if one of these nodes is using the scan's key-grouped partitioning to satisfy its required distribution (for example, a grouping agg or window function).
This PR fixes the issue by not applying a partially clustered distribution to a JOIN's child if any node in that child relies on the KeyGroupedPartitioning to satisfy its required distribution, since doing so is not safe with a partially clustered distribution.
Why are the changes needed?
Without this fix, using a partially-clustered distribution with SPJ may cause correctness issues.
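As an illustration of the kind of correctness issue involved, here is a minimal sketch in plain Scala (no Spark dependency). It assumes a toy model where a window such as `row_number()` partitioned by the key is evaluated independently inside each input partition, because the operator trusts the key-grouped partitioning and adds no shuffle. The object name, helper name, and data are hypothetical.

```scala
object RowNumberSketch {
  // Toy model (assumption): a row is a (partition key, value) pair.
  type Row = (Int, Int)

  // Per-partition stand-in for "row_number() over (partition by key order by value) = 1":
  // within each partition, keep the smallest-value row of every key.
  def firstPerKey(partitions: Seq[Seq[Row]]): Seq[Row] =
    partitions.flatMap { part =>
      part.groupBy(_._1).values.map(_.minBy(_._2))
    }

  // Key-grouped layout: every row of key 1 lives in the same partition.
  val keyGrouped: Seq[Seq[Row]] =
    Seq(Seq((1, 10), (1, 20), (1, 30)), Seq((2, 5)))

  // Partially clustered layout: the rows of key 1 are spread over two partitions.
  val partiallyClustered: Seq[Seq[Row]] =
    Seq(Seq((1, 10), (1, 20)), Seq((1, 30)), Seq((2, 5)))
}
```

With the key-grouped layout, `firstPerKey` yields one row per key. With the partially clustered layout, key 1 yields a "first" row from each of its partitions, so a filter like `RN = 1` lets through more rows than it should.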
Does this PR introduce any user-facing change?
No.
How was this patch tested?
See test changes.
Was this patch authored or co-authored using generative AI tooling?
No.