-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-53272][SQL] Refactor SPJ pushdown logic out of BatchScanExec #51979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! looks clean, only material change is the logic is encapsulated in partitionValueAccessor
} | ||
case None => | ||
spjParams.joinKeyPositions match { | ||
case Some(projectionPositions) => basePartitioning.partitionValues.map{r => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case Some(projectionPositions) => basePartitioning.partitionValues.map{r => | |
case Some(projectionPositions) => basePartitioning.partitionValues.map { r => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
*/ | ||
def getInputPartitionGrouping( | ||
p: KeyGroupedPartitioning, | ||
spjParams: StoragePartitionJoinParams, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for my curiosity: what's the relationship between p.expressions
and spjParams.keyGroupedPartitioning
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p.expressions
includes join key reordering of the expressions (ref), while spjParams.keyGroupedPartitioning
contains the partitioning expressions in their original ordering (which is why they must be reordered here if join key positions are present).
*/ | ||
def getOutputKeyGroupedPartitioning( | ||
basePartitioning: KeyGroupedPartitioning, | ||
spjParams: StoragePartitionJoinParams): KeyGroupedPartitioning = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move StoragePartitionJoinParams
to an individual file instead of BatchScanExec.scala
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done.
Also cc: @sunchao and @szehon-ho for visibility |
Let's file a JIRA and add it to the PR title. |
@chirag-s-db can you follow the instructions and set up your Github Action? https://github.com/apache/spark/pull/51979/checks?check_run_id=47918325385 |
23d337b
to
1f1c551
Compare
@cloud-fan Checks should be running now, had to rebase on latest master. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @chirag-s-db !
What changes were proposed in this pull request?
SPJ logic is currently closely coupled with the DSV2-specific BatchScanExec physical node, making it difficult for connectors to take advantage of SPJ for other types of scans. This PR refactors the SPJ-specific logic out of BatchScanExec, exposing a parameterized base class for connectors to use. This base class requires a partition value accessor (mapping from the parameterized type to an
InternalRow
).Why are the changes needed?
Allow connectors to take advantage of SPJ on existing scans.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Pure refactor - existing tests should suffice.
Was this patch authored or co-authored using generative AI tooling?
No.