Skip to content

Unnest Correlated Subquery #17110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 204 commits into
base: main
Choose a base branch
from
Draft

Conversation

irenjj
Copy link
Contributor

@irenjj irenjj commented Aug 10, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate common Related to common crate proto Related to proto crate labels Aug 10, 2025
@github-actions github-actions bot added physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate labels Aug 10, 2025
@irenjj
Copy link
Contributor Author

irenjj commented Aug 10, 2025

Decorrelate Subqueries

Complete decorrelation consists of three parts:

  • rewrite dependent join: rewrite_dependent_join.rs
  • decorrelate dependent join: decorrelate_dependent_join.rs
  • eliminate duplicated delim scan: deliminator.rs

1. Rewrite Dependent Join

Rewrite dependent join will rewrite plan_nodes containing subquery expressions into dependent joins, to facilitate subsequent processing of dependent joins, with the ultimate goal of eliminating dependent joins.

DependentJoinRewriter structure:

pub struct DependentJoinRewriter {
    // ID of the currently traversed node, incremented on each f_down(), decremented on each f_up().
    current_id: usize,
    // Depth of the current DependentJoin,
    // incremented by 1 when encountering nested DependentJoin in f_down(),
    // decremented by 1 when processing DependentJoin in f_up().
    subquery_depth: usize,
    // current_id -> Node
    // Maintains an IndexMap structure that stores all plan_nodes currently traversed.
    nodes: IndexMap<usize, Node>,
    // all the node ids from root to the current node
    stack: Vec<usize>,
    // track for each column, the nodes plan that reference to its within the tree
    all_outer_ref_columns: IndexMap<Column, Vec<ColumnAccess>>,
}

It performs pre-order traversal of the operator tree, then rewrites the traversed operators through f_down() and f_up(). It assigns a unique current_id to each plan_node when traversing.

1.1 f_down

f_down checks whether there are Subqueries in the expressions of each plan_node, and marks plan_nodes containing Subqueries as is_dependent_join_node.
Subsequently, after traversing to the plan_node of the Subquery, it obtains the Subquery type.

2. Decorrelate Dependent Join

Decorrelate dependent join performs decorrelation on the results generated by rewrite dependent join.

Decorrelate dependent join is basically implemented according to the code in the paper, but with slight differences. The entry function is DecorrelateDepednentJoin::rewrite(), which constructs a DependentJoinDecorrelator in rewrite() and calls the decorrelate() function.

In the decorrelate() function, it performs pre-order traversal of the plan_node tree until it finds a DependentJoin:

fn decorrelate(plan: &LogicalPlan, ...) -> Result<LogicalPlan> {
    if let LogicalPlan::DependentJoin(djoin) = plan {
        // handle dependent join
    } else {
        Ok(plan
            .clone()
            .map_children(|n| Ok(Transformed::yes(self.decorrelate(&n, true, 0)?)))?
            .data)
    }
}

Processing DependentJoin consists of several major steps:

  • Handle Left Child
  • Handle Right Child
  • Construct Join Condition

DependentJoinDecorrelator structure:

pub struct DependentJoinDecorrelator {
    // All correlated column information for the current depth.
    domains: IndexSet<CorrelatedColumnInfo>,
    // outer table column -> delim scan table column
    correlated_map: IndexMap<Column, Column>,
    // Whether the current DependentJoin is a nested DependentJoin
    is_initial: bool,

    // all correlated columns in current depth and downward
    correlated_columns: Vec<CorrelatedColumnInfo>,
    // check if we have to replace any COUNT aggregates into "CASE WHEN X IS NULL THEN 0 ELSE COUNT END"
    // store a mapping between a expr and its original index in the logplan output
    replacement_map: IndexMap<String, Expr>,
    // if during the top down traversal, we observe any operator that requires
    // joining all rows from the lhs with nullable rows on the rhs
    any_join: bool,
    delim_scan_id: usize,
    // All columns of the previously constructed delim scan.
    dscan_cols: Vec<Column>,
}

2.1. Handle Left Child

First handle the left child, divided into two cases:

  • The currently processed DependentJoin is not a nested DependentJoin.
  • The currently processed DependentJoin is a nested DependentJoin.

Then based on the mapping relationship in self.correlated_map, rewrite outer table columns to delim scan table columns.

2.1.1. Non-Nesting DependentJoin

For non-nested DependentJoin, it recursively calls decorrelate(left), with the purpose of inheriting the information of the current DependentJoinDecorrelator. Since it's a non-nesting DependentJoin, no changes need to be made to the correlated column information.

2.1.2. Nesting DependentJoin

For Nesting DependentJoin, it determines whether the LHS has correlated columns at the current depth (corresponding to accessing in the paper). The logic for determining the existence of correlated columns is in detect_correlated_expressions, which traverses the plan, finds all outer ref expressions for each plan_node, and checks whether they are in the domains of the current DecorrelateDependentJoin. The domains record all correlated columns of the current depth.

  • If the LHS has no correlated columns at the current depth, a new DependentJoinDecorrelator can be constructed to decorrelate the left child, because subsequent LHS processing is independent of the information (mainly correlated columns) in the current DependentJoinDecorrelator.
  • If the LHS has correlated columns at the current depth, the DependentJoin in the LHS needs to be pushed down to eliminate the DependentJoin of the LHS at the current depth.

2.2. Handle Right Child

Processing the RHS constructs a new DependentJoinDecorrelator. The domains of the new DependentJoinDecorrelator are parent correlated columns of current level + dependent join of current level. By inheriting the correlated columns of the parent node, it handles multi-level dependent joins.

Since the RHS of DependentJoin must have correlated columns, the right child is directly processed through push_down_dependent_join.

2.3. Join Condition

In delim_join_conditions, different join types are constructed based on the subquery type in DependentJoin, and then Join Conditions of correlated column IsNotDistinctFrom delim scan column are constructed.

2.4 Unnest Function

All Unnest logic is implemented in push_down_dependent_join, with different processing for each type of plan_node.

3. Eliminate Duplicated DelimGet

The above two steps completed the decorrelation operation and introduced two new logical operators DelimJoin and DelimGet to fully construct the logical plan. At this stage, redundant DelimJoin and DelimGet will be eliminated.

Construct a new use case to illustrate how the work at this stage is carried out:

SELECT s.name
FROM students s
WHERE s.grade > (
    -- subquery 1
    SELECT AVG(e1.score)
    FROM exams e1
    WHERE e1.sid = s.id
    AND e1.score > (
        -- subquery 2
        SELECT MIN(e2.score)
        FROM exams e2
        WHERE e2.sid = s.id
        AND e2.type = 'final'
    )
);

After decorrelating the above SQL, the result is generated:

            Projection
                |
              Filter
                |
            DelimJoin1
          /            \
       Get           Projection
                           |
                       ComparisonJoin
                     /               \
                  DelimGet1         Aggregate
                                       |
                                     Filter
                                       |
                                    DelimJoin2
                                 /              \
                         CrossProduct          Projection
                         /      \                  |
                      Get    DelimGet2       ComparisonJoin
                                             /            \
                                        DelimGet3        Aggregate
                                                             |
                                                           Filter
                                                             |
                                                       CrossProduct
                                                       /           \
                                                     Get        DelimGet4

First, all DelimJoins and DelimGets under DelimJoins are collected as candidates. For the joins array under each candidate, they are sorted by depth from largest to smallest, finding the DelimGet with the deepest depth.

candidate1: {
    DelimJoin2,
    joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}

candidate2: {
    DelimJoin1,
    joins: [DelimGet2(depth=5), DelimGet1(depth=1)],
}

     Projection
         |
       Filter
         |
     DelimJoin1
   /    ^       \
Get     |     Projection
        |           |
        |       ComparisonJoin
        |     /               \
        |  DelimGet1         Aggregate
        |      |                |
        +------+              Filter
        |                       |
        |                    DelimJoin2
        |                 /      ^       \
        |         CrossProduct   |      Projection
        |         /      \       |          |
        |      Get    DelimGet2  |    ComparisonJoin
        |                 |      |    /            \
        + ----------------+      |  DelimGet3     Aggregate
                                 |      |             |
                                 +------+           Filter
                                 |                    |
                                 |              CrossProduct
                                 |              /           \
                                 |            Get        DelimGet4
                                 |                          |
                                 +--------------------------+

Process the deduplication of DelimGet under each candidate separately. Taking candidate1 as an example:

candidate1: {
    DelimJoin2,
    joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}

The situation is divided into two categories:

  1. If DelimJoin has selection conditions (filter/where clauses), DelimGet can be retained. This is because selection conditions can greatly reduce the amount of data in the right subtree of DelimJoin. This is an optimization trade-off that sacrifices certain deduplication optimization opportunities (retaining one DelimGet) but gains better query performance (through early filtering of selection conditions). This is why the code deliberately retains the deepest join with DelimGet when there are selection conditions. In this case:
    • Retaining the deepest join with DelimGet is meaningful because this selection condition can filter out a large amount of data early
    • Removing other shallower joins is still safe because the deepest one has already guaranteed data correctness
    • It also avoids duplicate join operations
  2. For cases where DelimJoin has no selection conditions, it's necessary to continue determining whether DelimGet can be removed. The ultimate goal is to remove all correlated columns.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants