Skip to content

Commit 07f1e34

Browse files
committed
refactor: migrate LinearSearch to HashTable
For apache#13433.
1 parent 9fbd87f commit 07f1e34

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use datafusion_physical_expr::PhysicalExpr;
6464
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
6565
use futures::stream::Stream;
6666
use futures::{ready, StreamExt};
67-
use hashbrown::raw::RawTable;
67+
use hashbrown::hash_table::HashTable;
6868
use log::debug;
6969

7070
/// Window execution plan
@@ -440,16 +440,16 @@ pub struct LinearSearch {
440440
/// is ordered by a, b and the window expression contains a PARTITION BY b, a
441441
/// clause, this attribute stores [1, 0].
442442
ordered_partition_by_indices: Vec<usize>,
443-
/// We use this [`RawTable`] to calculate unique partitions for each new
443+
/// We use this [`HashTable`] to calculate unique partitions for each new
444444
/// RecordBatch. First entry in the tuple is the hash value, the second
445445
/// entry is the unique ID for each partition (increments from 0 to n).
446-
row_map_batch: RawTable<(u64, usize)>,
447-
/// We use this [`RawTable`] to calculate the output columns that we can
446+
row_map_batch: HashTable<(u64, usize)>,
447+
/// We use this [`HashTable`] to calculate the output columns that we can
448448
/// produce at each cycle. First entry in the tuple is the hash value, the
449449
/// second entry is the unique ID for each partition (increments from 0 to n).
450450
/// The third entry stores how many new outputs are calculated for the
451451
/// corresponding partition.
452-
row_map_out: RawTable<(u64, usize, usize)>,
452+
row_map_out: HashTable<(u64, usize, usize)>,
453453
input_schema: SchemaRef,
454454
}
455455

@@ -608,8 +608,8 @@ impl LinearSearch {
608608
input_buffer_hashes: VecDeque::new(),
609609
random_state: Default::default(),
610610
ordered_partition_by_indices,
611-
row_map_batch: RawTable::with_capacity(256),
612-
row_map_out: RawTable::with_capacity(256),
611+
row_map_batch: HashTable::with_capacity(256),
612+
row_map_out: HashTable::with_capacity(256),
613613
input_schema,
614614
}
615615
}
@@ -629,7 +629,7 @@ impl LinearSearch {
629629
// res stores PartitionKey and row indices (indices where these partition occurs in the `batch`) for each partition.
630630
let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
631631
for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
632-
let entry = self.row_map_batch.get_mut(hash, |(_, group_idx)| {
632+
let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
633633
// We can safely get the first index of the partition indices
634634
// since partition indices has one element during initialization.
635635
let row = get_row_at_idx(columns, row_idx as usize).unwrap();
@@ -639,8 +639,11 @@ impl LinearSearch {
639639
if let Some((_, group_idx)) = entry {
640640
result[*group_idx].1.push(row_idx)
641641
} else {
642-
self.row_map_batch
643-
.insert(hash, (hash, result.len()), |(hash, _)| *hash);
642+
self.row_map_batch.insert_unique(
643+
hash,
644+
(hash, result.len()),
645+
|(hash, _)| *hash,
646+
);
644647
let row = get_row_at_idx(columns, row_idx as usize)?;
645648
// This is a new partition its only index is row_idx for now.
646649
result.push((row, vec![row_idx]));
@@ -665,7 +668,7 @@ impl LinearSearch {
665668
self.row_map_out.clear();
666669
let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
667670
for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
668-
let entry = self.row_map_out.get_mut(*hash, |(_, group_idx, _)| {
671+
let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
669672
let row =
670673
get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
671674
row == partition_indices[*group_idx].0
@@ -691,7 +694,7 @@ impl LinearSearch {
691694
if min_out == 0 {
692695
break;
693696
}
694-
self.row_map_out.insert(
697+
self.row_map_out.insert_unique(
695698
*hash,
696699
(*hash, partition_indices.len(), min_out),
697700
|(hash, _, _)| *hash,

0 commit comments

Comments
 (0)