Skip to content

Commit 0cfeee2

Browse files
committed
Fix errors introduced during rebase
1 parent df0eb14 commit 0cfeee2

File tree

12 files changed

+112
-133
lines changed

12 files changed

+112
-133
lines changed

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,7 +1295,7 @@ mod tests {
12951295
Distribution, Partitioning, PhysicalExpr, PhysicalSortExpr,
12961296
PhysicalSortRequirement, ScalarFunctionExpr,
12971297
};
1298-
use datafusion_physical_plan::joins::{SymmetricHashJoinExec, SlidingHashJoinExec};
1298+
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
12991299
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
13001300
use datafusion_physical_plan::union::UnionExec;
13011301

datafusion/expr/src/interval_arithmetic.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,17 @@ impl Interval {
601601
}
602602
}
603603

604+
/// Decide if this interval is a superset of `other`. If argument `strict`
605+
/// is `true`, only returns `true` if this interval is a strict superset.
606+
///
607+
/// NOTE: This function only works with intervals of the same data type.
608+
/// Attempting to compare intervals of different data types will lead
609+
/// to an error.
610+
pub fn is_superset(&self, other: &Interval, strict: bool) -> Result<bool> {
611+
Ok(!(strict && self.eq(other))
612+
&& (self.contains(other)? == Interval::CERTAINLY_TRUE))
613+
}
614+
604615
/// Add the given interval (`other`) to this interval. Say we have intervals
605616
/// `[a1, b1]` and `[a2, b2]`, then their sum is `[a1 + a2, b1 + b2]`. Note
606617
/// that this represents all possible values the sum can take if one can

datafusion/physical-expr/src/expressions/negative.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::any::Any;
2121
use std::hash::{Hash, Hasher};
2222
use std::sync::Arc;
2323

24-
use crate::intervals::Interval;
2524
use crate::physical_expr::down_cast_any_ref;
2625
use crate::sort_properties::SortProperties;
2726
use crate::PhysicalExpr;

datafusion/physical-expr/src/intervals/cp_solver.rs

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ use std::sync::Arc;
2424
use super::utils::{
2525
convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op,
2626
};
27-
use super::IntervalBound;
2827
use crate::expressions::Literal;
29-
use crate::intervals::interval_aritmetic::{apply_operator, Interval};
3028
use crate::sort_properties::SortProperties;
3129
use crate::utils::{build_dag, ExprTreeNode};
3230
use crate::{PhysicalExpr, PhysicalSortExpr};
@@ -167,21 +165,25 @@ impl Display for ExprIntervalGraphNode {
167165
impl ExprIntervalGraphNode {
168166
/// Constructs a new DAEG node with an [-∞, ∞] range.
169167
pub fn new_unbounded(expr: Arc<dyn PhysicalExpr>, dt: &DataType) -> Result<Self> {
170-
Interval::make_unbounded(dt)
171-
.map(|interval| ExprIntervalGraphNode { expr, interval })
168+
Interval::make_unbounded(dt).map(|interval| ExprIntervalGraphNode {
169+
expr,
170+
interval,
171+
is_strict_subset: false,
172+
sort_properties: SortProperties::Unordered,
173+
})
172174
}
173175

174176
/// Constructs a new DAEG node with the given range.
175177
pub fn new_with_interval(
176178
expr: Arc<dyn PhysicalExpr>,
177179
interval: Interval,
178-
sort_option: SortProperties,
180+
sort_properties: SortProperties,
179181
) -> Self {
180182
ExprIntervalGraphNode {
181183
expr,
182184
interval,
183185
is_strict_subset: false,
184-
sort_properties: sort_option,
186+
sort_properties,
185187
}
186188
}
187189

@@ -197,8 +199,9 @@ impl ExprIntervalGraphNode {
197199
let expr = node.expression().clone();
198200
if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
199201
let value = literal.value();
200-
Interval::try_new(value.clone(), value.clone())
201-
.map(|interval| Self::new_with_interval(expr, interval))
202+
Interval::try_new(value.clone(), value.clone()).map(|interval| {
203+
Self::new_with_interval(expr, interval, SortProperties::Singleton)
204+
})
202205
} else {
203206
expr.data_type(schema)
204207
.and_then(|dt| Self::new_unbounded(expr, &dt))
@@ -659,7 +662,10 @@ impl ExprIntervalGraph {
659662
.propagate_constraints(node_interval, &children_intervals)?;
660663
if let Some(propagated_intervals) = propagated_intervals {
661664
for (child, interval) in children.into_iter().zip(propagated_intervals) {
662-
self.graph[child].interval = interval;
665+
let child_node = &mut self.graph[child];
666+
let inside = child_node.interval.is_superset(&interval, true)?;
667+
child_node.interval = interval;
668+
child_node.is_strict_subset = inside;
663669
}
664670
} else {
665671
// The constraint is infeasible, report:
@@ -669,25 +675,6 @@ impl ExprIntervalGraph {
669675
Ok(PropagationResult::Success)
670676
}
671677

672-
/// Updates intervals for all expressions in the DAEG by successive
673-
/// bottom-up and top-down traversals.
674-
pub fn update_ranges(
675-
&mut self,
676-
leaf_bounds: &mut [(usize, Interval)],
677-
) -> Result<PropagationResult> {
678-
self.assign_intervals(leaf_bounds);
679-
let bounds = self.evaluate_bounds()?;
680-
if bounds == &Interval::CERTAINLY_FALSE {
681-
Ok(PropagationResult::Infeasible)
682-
} else if bounds == &Interval::UNCERTAIN {
683-
let result = self.propagate_constraints();
684-
self.update_intervals(leaf_bounds);
685-
result
686-
} else {
687-
Ok(PropagationResult::CannotPropagate)
688-
}
689-
}
690-
691678
/// Returns the deepest pruning expressions from the graph.
692679
///
693680
/// This function traverses the graph in a depth-first search post-order manner. For each node, it checks its neighbors

datafusion/physical-plan/src/joins/partitioned_hash_join.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ use crate::joins::sliding_window_join_utils::{
1414
partitioned_join_output_partitioning, EagerWindowJoinOperations, LazyJoinStream,
1515
LazyJoinStreamState, ProbeBuffer,
1616
};
17+
use crate::joins::stream_join_utils::{
18+
get_filter_representation_of_join_side, prepare_sorted_exprs, EagerJoinStream,
19+
EagerJoinStreamState, SortedFilterExpr, StreamJoinStateResult,
20+
};
21+
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
1722
use crate::joins::utils::{
1823
apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
1924
calculate_join_output_ordering, check_join_is_valid, ColumnIndex, JoinFilter, JoinOn,
@@ -29,6 +34,7 @@ use arrow::compute::concat_batches;
2934
use arrow_array::builder::{UInt32Builder, UInt64Builder};
3035
use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array};
3136
use arrow_schema::{Field, Schema, SchemaRef};
37+
use datafusion_common::hash_utils::create_hashes;
3238
use datafusion_common::utils::{
3339
get_record_batch_at_indices, get_row_at_idx, linear_search,
3440
};
@@ -37,22 +43,17 @@ use datafusion_common::{
3743
};
3844
use datafusion_execution::memory_pool::MemoryConsumer;
3945
use datafusion_execution::TaskContext;
46+
use datafusion_expr::interval_arithmetic::Interval;
47+
use datafusion_physical_expr::equivalence::join_equivalence_properties;
4048
use datafusion_physical_expr::expressions::Column;
41-
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
49+
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
4250
use datafusion_physical_expr::window::PartitionKey;
4351
use datafusion_physical_expr::{
4452
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
4553
};
4654

47-
use crate::joins::stream_join_utils::{
48-
get_filter_representation_of_join_side, prepare_sorted_exprs, EagerJoinStream,
49-
EagerJoinStreamState, SortedFilterExpr, StreamJoinStateResult,
50-
};
51-
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
5255
use ahash::RandomState;
5356
use async_trait::async_trait;
54-
use datafusion_common::hash_utils::create_hashes;
55-
use datafusion_physical_expr::equivalence::join_equivalence_properties;
5657
use futures::Stream;
5758
use hashbrown::raw::RawTable;
5859
use parking_lot::Mutex;
@@ -604,9 +605,9 @@ impl BuildBuffer {
604605

605606
// Get the lower or upper interval based on the sort direction:
606607
let target = if options.descending {
607-
&interval.lower.value
608+
interval.lower()
608609
} else {
609-
&interval.upper.value
610+
interval.upper()
610611
}
611612
.clone();
612613

datafusion/physical-plan/src/joins/sliding_hash_join.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@ use crate::joins::{
3030
adjust_probe_side_indices_by_join_type,
3131
calculate_build_outer_indices_by_join_type,
3232
calculate_the_necessary_build_side_range_helper, joinable_probe_batch_helper,
33-
EagerWindowJoinOperations, LazyJoinStream, LazyJoinStreamState, ProbeBuffer,
33+
partitioned_join_output_partitioning, EagerWindowJoinOperations, LazyJoinStream,
34+
LazyJoinStreamState, ProbeBuffer,
3435
},
35-
symmetric_hash_join::OneSideHashJoiner,
36+
stream_join_utils::{
37+
combine_two_batches, prepare_sorted_exprs, record_visited_indices,
38+
EagerJoinStream, EagerJoinStreamState, SortedFilterExpr, StreamJoinStateResult,
39+
},
40+
symmetric_hash_join::{OneSideHashJoiner, StreamJoinMetrics},
3641
utils::{
3742
build_batch_from_indices, build_join_schema, calculate_join_output_ordering,
3843
check_join_is_valid, swap_filter, swap_join_on, swap_join_type,
@@ -52,18 +57,13 @@ use arrow::record_batch::RecordBatch;
5257
use datafusion_common::{internal_err, plan_err, JoinSide, JoinType};
5358
use datafusion_execution::memory_pool::MemoryConsumer;
5459
use datafusion_execution::TaskContext;
55-
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
60+
use datafusion_expr::interval_arithmetic::Interval;
61+
use datafusion_physical_expr::equivalence::join_equivalence_properties;
62+
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
5663
use datafusion_physical_expr::PhysicalSortRequirement;
5764

58-
use crate::joins::sliding_window_join_utils::partitioned_join_output_partitioning;
59-
use crate::joins::stream_join_utils::{
60-
combine_two_batches, prepare_sorted_exprs, record_visited_indices, EagerJoinStream,
61-
EagerJoinStreamState, SortedFilterExpr, StreamJoinStateResult,
62-
};
63-
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
6465
use ahash::RandomState;
6566
use async_trait::async_trait;
66-
use datafusion_physical_expr::equivalence::join_equivalence_properties;
6767
use futures::Stream;
6868
use parking_lot::Mutex;
6969

datafusion/physical-plan/src/joins/sliding_nested_loop_join.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use crate::joins::sliding_window_join_utils::{
1919
partitioned_join_output_partitioning, EagerWindowJoinOperations, LazyJoinStream,
2020
LazyJoinStreamState, ProbeBuffer,
2121
};
22+
use crate::joins::stream_join_utils::{
23+
calculate_side_prune_length_helper, combine_two_batches, prepare_sorted_exprs,
24+
record_visited_indices, EagerJoinStream, EagerJoinStreamState, SortedFilterExpr,
25+
StreamJoinStateResult,
26+
};
27+
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
2228
use crate::joins::utils::{
2329
apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
2430
calculate_join_output_ordering, estimate_join_statistics, swap_filter,
@@ -36,23 +42,18 @@ use crate::{
3642
use arrow::array::{UInt32Array, UInt32Builder, UInt64Array, UInt64Builder};
3743
use arrow::datatypes::{Schema, SchemaRef};
3844
use arrow::record_batch::RecordBatch;
39-
use async_trait::async_trait;
4045
use datafusion_common::{internal_err, DataFusionError, JoinSide, Result, Statistics};
4146
use datafusion_execution::memory_pool::MemoryConsumer;
4247
use datafusion_execution::TaskContext;
48+
use datafusion_expr::interval_arithmetic::Interval;
4349
use datafusion_expr::JoinType;
44-
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
50+
use datafusion_physical_expr::equivalence::join_equivalence_properties;
51+
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
4552
use datafusion_physical_expr::{
4653
EquivalenceProperties, PhysicalSortExpr, PhysicalSortRequirement,
4754
};
4855

49-
use crate::joins::stream_join_utils::{
50-
calculate_side_prune_length_helper, combine_two_batches, prepare_sorted_exprs,
51-
record_visited_indices, EagerJoinStream, EagerJoinStreamState, SortedFilterExpr,
52-
StreamJoinStateResult,
53-
};
54-
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
55-
use datafusion_physical_expr::equivalence::join_equivalence_properties;
56+
use async_trait::async_trait;
5657
use futures::Stream;
5758
use hashbrown::HashSet;
5859
use parking_lot::Mutex;

datafusion/physical-plan/src/joins/sliding_window_join_utils.rs

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
11
// Copyright (C) Synnada, Inc. - All Rights Reserved.
22
// This file does not contain any Apache Software Foundation copyrighted code.
33

4+
use std::task::Poll;
5+
46
use crate::joins::{
57
stream_join_utils::{
6-
get_pruning_anti_indices, get_pruning_semi_indices, SortedFilterExpr,
7-
StreamJoinStateResult,
8+
calculate_side_prune_length_helper, get_build_side_pruned_exprs,
9+
get_filter_representation_of_join_side,
10+
get_filter_representation_schema_of_build_side, get_pruning_anti_indices,
11+
get_pruning_semi_indices, SortedFilterExpr, StreamJoinStateResult,
812
},
9-
utils,
10-
utils::{append_right_indices, get_anti_indices, get_semi_indices, JoinFilter},
13+
symmetric_hash_join::StreamJoinMetrics,
14+
utils::{self, append_right_indices, get_anti_indices, get_semi_indices, JoinFilter},
1115
};
12-
use std::task::Poll;
16+
use crate::{handle_async_state, handle_state};
1317

18+
use arrow::compute::concat_batches;
1419
use arrow_array::{
1520
builder::{PrimitiveBuilder, UInt32Builder, UInt64Builder},
1621
types::{UInt32Type, UInt64Type},
1722
ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch, UInt32Array,
1823
UInt64Array,
1924
};
2025
use arrow_schema::SchemaRef;
21-
use async_trait::async_trait;
2226
use datafusion_common::{DataFusionError, JoinSide, JoinType, Result, ScalarValue};
2327
use datafusion_execution::SendableRecordBatchStream;
28+
use datafusion_expr::interval_arithmetic::Interval;
2429
use datafusion_physical_expr::expressions::{Column, PhysicalSortExpr};
25-
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};
26-
use futures::ready;
27-
28-
use crate::joins::stream_join_utils::{
29-
calculate_side_prune_length_helper, get_build_side_pruned_exprs,
30-
get_filter_representation_of_join_side,
31-
get_filter_representation_schema_of_build_side,
32-
};
33-
use crate::joins::symmetric_hash_join::StreamJoinMetrics;
34-
use crate::{handle_async_state, handle_state};
35-
use arrow::compute::concat_batches;
30+
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
3631
use datafusion_physical_expr::Partitioning;
37-
use futures::{FutureExt, StreamExt};
32+
33+
use async_trait::async_trait;
34+
use futures::{ready, FutureExt, StreamExt};
3835
use hashbrown::HashSet;
3936
use std::{mem, usize};
4037

@@ -937,7 +934,7 @@ pub fn calculate_the_necessary_build_side_range_helper(
937934
.collect::<Vec<_>>();
938935

939936
// Update the physical expression graph using the join filter intervals:
940-
graph.update_ranges(&mut filter_intervals)?;
937+
graph.update_ranges(&mut filter_intervals, Interval::CERTAINLY_TRUE)?;
941938

942939
let intermediate_schema = get_filter_representation_schema_of_build_side(
943940
filter.schema(),
@@ -995,12 +992,12 @@ pub fn check_if_sliding_window_condition_is_met(
995992
// Data is sorted in descending order, so check if latest value is less
996993
// than the lower bound of the interval. If it is, we must have processed
997994
// all rows that are needed from the build side for this window.
998-
latest_value < interval.lower.value
995+
&latest_value < interval.lower()
999996
} else {
1000997
// Data is sorted in ascending order, so check if latest value is greater
1001998
// than the upper bound of the interval. If it is, we must have processed
1002999
// all rows that are needed from the build side for this window.
1003-
latest_value > interval.upper.value
1000+
&latest_value > interval.upper()
10041001
})
10051002
})
10061003
.collect::<Result<Vec<_>>>()?;
@@ -1301,18 +1298,13 @@ pub(crate) fn update_filter_expr_bounds(
13011298
.intermediate_batch_filter_expr()
13021299
.data_type(&build_inner_buffer.schema())?;
13031300

1304-
// Create a null scalar value with the obtained datatype:
1305-
let null_scalar = ScalarValue::try_from(build_order_datatype)?;
13061301
// Create a null interval using the null scalar value:
1307-
let null_interval = Interval::new(
1308-
IntervalBound::new(null_scalar.clone(), true),
1309-
IntervalBound::new(null_scalar, true),
1310-
);
1302+
let unbounded_interval = Interval::make_unbounded(&build_order_datatype)?;
13111303

13121304
build_sorted_filter_exprs
13131305
.iter_mut()
13141306
.for_each(|sorted_filter_expr| {
1315-
sorted_filter_expr.set_interval(null_interval.clone());
1307+
sorted_filter_expr.set_interval(unbounded_interval.clone());
13161308
});
13171309

13181310
let first_probe_intermediate_batch = get_filter_representation_of_join_side(
@@ -1349,15 +1341,9 @@ pub(crate) fn update_filter_expr_bounds(
13491341
let right_value = ScalarValue::try_from_array(&last_array, 0)?;
13501342
// Determine the interval bounds based on sort options:
13511343
let interval = if sorted_filter_expr.order().descending {
1352-
Interval::new(
1353-
IntervalBound::new(right_value, false),
1354-
IntervalBound::new(left_value, false),
1355-
)
1344+
Interval::try_new(right_value, left_value)?
13561345
} else {
1357-
Interval::new(
1358-
IntervalBound::new(left_value, false),
1359-
IntervalBound::new(right_value, false),
1360-
)
1346+
Interval::try_new(left_value, right_value)?
13611347
};
13621348
// Set the calculated interval for the sorted filter expression:
13631349
sorted_filter_expr.set_interval(interval);
@@ -1368,6 +1354,7 @@ pub(crate) fn update_filter_expr_bounds(
13681354
#[cfg(test)]
13691355
mod tests {
13701356
use crate::joins::sliding_window_join_utils::append_probe_indices_in_order;
1357+
13711358
use arrow_array::{UInt32Array, UInt64Array};
13721359

13731360
#[test]

0 commit comments

Comments
 (0)