Skip to content

Commit fc70323

Browse files
authored
refactor: migrate LinearSearch to HashTable (#13658)
For #13433.
1 parent ce330ec commit fc70323

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
6565

6666
use futures::stream::Stream;
6767
use futures::{ready, StreamExt};
68-
use hashbrown::raw::RawTable;
68+
use hashbrown::hash_table::HashTable;
6969
use indexmap::IndexMap;
7070
use log::debug;
7171

@@ -442,16 +442,16 @@ pub struct LinearSearch {
442442
/// is ordered by a, b and the window expression contains a PARTITION BY b, a
443443
/// clause, this attribute stores [1, 0].
444444
ordered_partition_by_indices: Vec<usize>,
445-
/// We use this [`RawTable`] to calculate unique partitions for each new
445+
/// We use this [`HashTable`] to calculate unique partitions for each new
446446
/// RecordBatch. First entry in the tuple is the hash value, the second
447447
/// entry is the unique ID for each partition (increments from 0 to n).
448-
row_map_batch: RawTable<(u64, usize)>,
449-
/// We use this [`RawTable`] to calculate the output columns that we can
448+
row_map_batch: HashTable<(u64, usize)>,
449+
/// We use this [`HashTable`] to calculate the output columns that we can
450450
/// produce at each cycle. First entry in the tuple is the hash value, the
451451
/// second entry is the unique ID for each partition (increments from 0 to n).
452452
/// The third entry stores how many new outputs are calculated for the
453453
/// corresponding partition.
454-
row_map_out: RawTable<(u64, usize, usize)>,
454+
row_map_out: HashTable<(u64, usize, usize)>,
455455
input_schema: SchemaRef,
456456
}
457457

@@ -610,8 +610,8 @@ impl LinearSearch {
610610
input_buffer_hashes: VecDeque::new(),
611611
random_state: Default::default(),
612612
ordered_partition_by_indices,
613-
row_map_batch: RawTable::with_capacity(256),
614-
row_map_out: RawTable::with_capacity(256),
613+
row_map_batch: HashTable::with_capacity(256),
614+
row_map_out: HashTable::with_capacity(256),
615615
input_schema,
616616
}
617617
}
@@ -631,7 +631,7 @@ impl LinearSearch {
631631
// `result` stores the PartitionKey and the row indices (the indices where this partition occurs in the `batch`) for each partition.
632632
let mut result: Vec<(PartitionKey, Vec<u32>)> = vec![];
633633
for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) {
634-
let entry = self.row_map_batch.get_mut(hash, |(_, group_idx)| {
634+
let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| {
635635
// We can safely get the first index of the partition indices
636636
// since partition indices has one element during initialization.
637637
let row = get_row_at_idx(columns, row_idx as usize).unwrap();
@@ -641,8 +641,11 @@ impl LinearSearch {
641641
if let Some((_, group_idx)) = entry {
642642
result[*group_idx].1.push(row_idx)
643643
} else {
644-
self.row_map_batch
645-
.insert(hash, (hash, result.len()), |(hash, _)| *hash);
644+
self.row_map_batch.insert_unique(
645+
hash,
646+
(hash, result.len()),
647+
|(hash, _)| *hash,
648+
);
646649
let row = get_row_at_idx(columns, row_idx as usize)?;
647650
// This is a new partition; its only index is row_idx for now.
648651
result.push((row, vec![row_idx]));
@@ -667,7 +670,7 @@ impl LinearSearch {
667670
self.row_map_out.clear();
668671
let mut partition_indices: Vec<(PartitionKey, Vec<u32>)> = vec![];
669672
for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) {
670-
let entry = self.row_map_out.get_mut(*hash, |(_, group_idx, _)| {
673+
let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| {
671674
let row =
672675
get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap();
673676
row == partition_indices[*group_idx].0
@@ -693,7 +696,7 @@ impl LinearSearch {
693696
if min_out == 0 {
694697
break;
695698
}
696-
self.row_map_out.insert(
699+
self.row_map_out.insert_unique(
697700
*hash,
698701
(*hash, partition_indices.len(), min_out),
699702
|(hash, _, _)| *hash,

0 commit comments

Comments
 (0)