Skip to content

Commit 655bb3d

Browse files
committed
refactor: migrate GroupValuesColumn to HashTable
For #13433.
1 parent f517ed2 commit 655bb3d

File tree

1 file changed

+53
-57
lines changed
  • datafusion/physical-plan/src/aggregates/group_values/multi_group_by

1 file changed

+53
-57
lines changed

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

Lines changed: 53 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ use arrow_array::{Array, ArrayRef};
4242
use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
4343
use datafusion_common::hash_utils::create_hashes;
4444
use datafusion_common::{not_impl_err, DataFusionError, Result};
45-
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
45+
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
4646
use datafusion_expr::EmitTo;
4747
use datafusion_physical_expr::binary_map::OutputType;
4848

49-
use hashbrown::raw::RawTable;
49+
use hashbrown::hash_table::HashTable;
5050

5151
const NON_INLINED_FLAG: u64 = 0x8000000000000000;
5252
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
@@ -180,7 +180,7 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
180180
/// And we use [`GroupIndexView`] to represent such `group indices` in table.
181181
///
182182
///
183-
map: RawTable<(u64, GroupIndexView)>,
183+
map: HashTable<(u64, GroupIndexView)>,
184184

185185
/// The size of `map` in bytes
186186
map_size: usize,
@@ -261,7 +261,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
261261

262262
/// Create a new instance of GroupValuesColumn if supported for the specified schema
263263
pub fn try_new(schema: SchemaRef) -> Result<Self> {
264-
let map = RawTable::with_capacity(0);
264+
let map = HashTable::with_capacity(0);
265265
Ok(Self {
266266
schema,
267267
map,
@@ -338,7 +338,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
338338
for (row, &target_hash) in batch_hashes.iter().enumerate() {
339339
let entry = self
340340
.map
341-
.get_mut(target_hash, |(exist_hash, group_idx_view)| {
341+
.find_mut(target_hash, |(exist_hash, group_idx_view)| {
342342
// It is ensured to be inlined in `scalarized_intern`
343343
debug_assert!(!group_idx_view.is_non_inlined());
344344

@@ -506,7 +506,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
506506
for (row, &target_hash) in batch_hashes.iter().enumerate() {
507507
let entry = self
508508
.map
509-
.get(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
509+
.find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
510510

511511
let Some((_, group_index_view)) = entry else {
512512
// 1. Bucket not found case
@@ -733,7 +733,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
733733

734734
for &row in &self.vectorized_operation_buffers.remaining_row_indices {
735735
let target_hash = batch_hashes[row];
736-
let entry = map.get_mut(target_hash, |(exist_hash, _)| {
736+
let entry = map.find_mut(target_hash, |(exist_hash, _)| {
737737
// Somewhat surprisingly, this closure can be called even if the
738738
// hash doesn't match, so check the hash first with an integer
739739
// comparison to avoid the more expensive comparison with
@@ -852,7 +852,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
852852
/// Return the group indices found for `hash`, together with its `GroupIndexView` (which tells whether it is non-inlined)
853853
#[cfg(test)]
854854
fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
855-
let entry = self.map.get(hash, |(exist_hash, _)| hash == *exist_hash);
855+
let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);
856856

857857
match entry {
858858
Some((_, group_index_view)) => {
@@ -1083,67 +1083,63 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
10831083
.collect::<Vec<_>>();
10841084
let mut next_new_list_offset = 0;
10851085

1086-
// SAFETY: self.map outlives iterator and is not modified concurrently
1087-
unsafe {
1088-
for bucket in self.map.iter() {
1089-
// In non-streaming case, we need to check if the `group index view`
1090-
// is `inlined` or `non-inlined`
1091-
if !STREAMING && bucket.as_ref().1.is_non_inlined() {
1092-
// Non-inlined case
1093-
// We take `group_index_list` from `old_group_index_lists`
1094-
1095-
// list_offset is incrementally
1096-
self.emit_group_index_list_buffer.clear();
1097-
let list_offset = bucket.as_ref().1.value() as usize;
1098-
for group_index in self.group_index_lists[list_offset].iter()
1099-
{
1100-
if let Some(remaining) = group_index.checked_sub(n) {
1101-
self.emit_group_index_list_buffer.push(remaining);
1102-
}
1086+
self.map.retain(|(_exist_hash, group_idx_view)| {
1087+
// In non-streaming case, we need to check if the `group index view`
1088+
// is `inlined` or `non-inlined`
1089+
if !STREAMING && group_idx_view.is_non_inlined() {
1090+
// Non-inlined case
1091+
// We take `group_index_list` from `old_group_index_lists`
1092+
1093+
// list_offset is incrementally
1094+
self.emit_group_index_list_buffer.clear();
1095+
let list_offset = group_idx_view.value() as usize;
1096+
for group_index in self.group_index_lists[list_offset].iter() {
1097+
if let Some(remaining) = group_index.checked_sub(n) {
1098+
self.emit_group_index_list_buffer.push(remaining);
11031099
}
1104-
1105-
// The possible results:
1106-
// - `new_group_index_list` is empty, we should erase this bucket
1107-
// - only one value in `new_group_index_list`, switch the `view` to `inlined`
1108-
// - still multiple values in `new_group_index_list`, build and set the new `unlined view`
1109-
if self.emit_group_index_list_buffer.is_empty() {
1110-
self.map.erase(bucket);
1111-
} else if self.emit_group_index_list_buffer.len() == 1 {
1112-
let group_index =
1113-
self.emit_group_index_list_buffer.first().unwrap();
1114-
bucket.as_mut().1 =
1115-
GroupIndexView::new_inlined(*group_index as u64);
1116-
} else {
1117-
let group_index_list =
1118-
&mut self.group_index_lists[next_new_list_offset];
1119-
group_index_list.clear();
1120-
group_index_list
1121-
.extend(self.emit_group_index_list_buffer.iter());
1122-
bucket.as_mut().1 = GroupIndexView::new_non_inlined(
1123-
next_new_list_offset as u64,
1124-
);
1125-
next_new_list_offset += 1;
1126-
}
1127-
1128-
continue;
11291100
}
11301101

1102+
// The possible results:
1103+
// - `new_group_index_list` is empty, we should erase this bucket
1104+
// - only one value in `new_group_index_list`, switch the `view` to `inlined`
1105+
// - still multiple values in `new_group_index_list`, build and set the new `non-inlined view`
1106+
if self.emit_group_index_list_buffer.is_empty() {
1107+
false
1108+
} else if self.emit_group_index_list_buffer.len() == 1 {
1109+
let group_index =
1110+
self.emit_group_index_list_buffer.first().unwrap();
1111+
*group_idx_view =
1112+
GroupIndexView::new_inlined(*group_index as u64);
1113+
true
1114+
} else {
1115+
let group_index_list =
1116+
&mut self.group_index_lists[next_new_list_offset];
1117+
group_index_list.clear();
1118+
group_index_list
1119+
.extend(self.emit_group_index_list_buffer.iter());
1120+
*group_idx_view = GroupIndexView::new_non_inlined(
1121+
next_new_list_offset as u64,
1122+
);
1123+
next_new_list_offset += 1;
1124+
true
1125+
}
1126+
} else {
11311127
// In `streaming case`, the `group index view` is ensured to be `inlined`
1132-
debug_assert!(!bucket.as_ref().1.is_non_inlined());
1128+
debug_assert!(!group_idx_view.is_non_inlined());
11331129

11341130
// Inlined case, we just decrement group index by n
1135-
let group_index = bucket.as_ref().1.value() as usize;
1131+
let group_index = group_idx_view.value() as usize;
11361132
match group_index.checked_sub(n) {
11371133
// Group index was >= n, shift value down
11381134
Some(sub) => {
1139-
bucket.as_mut().1 =
1140-
GroupIndexView::new_inlined(sub as u64)
1135+
*group_idx_view = GroupIndexView::new_inlined(sub as u64);
1136+
true
11411137
}
11421138
// Group index was < n, so remove from table
1143-
None => self.map.erase(bucket),
1139+
None => false,
11441140
}
11451141
}
1146-
}
1142+
});
11471143

11481144
if !STREAMING {
11491145
self.group_index_lists.truncate(next_new_list_offset);
@@ -1234,7 +1230,7 @@ mod tests {
12341230
use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
12351231
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
12361232
use arrow_schema::{DataType, Field, Schema, SchemaRef};
1237-
use datafusion_common::utils::proxy::RawTableAllocExt;
1233+
use datafusion_common::utils::proxy::HashTableAllocExt;
12381234
use datafusion_expr::EmitTo;
12391235

12401236
use crate::aggregates::group_values::{

0 commit comments

Comments
 (0)