diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index 5962362d7681..5e2a9daaa6f3 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -15,76 +15,40 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ - builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, - RecordBatchOptions, -}; -use arrow::compute::concat_batches; +use arrow::array::RecordBatch; +use arrow::compute::BatchCoalescer; use arrow::datatypes::SchemaRef; -use std::sync::Arc; +use datafusion_common::{internal_err, Result}; -/// Concatenate multiple [`RecordBatch`]es -/// -/// `BatchCoalescer` concatenates multiple small [`RecordBatch`]es, produced by -/// operations such as `FilterExec` and `RepartitionExec`, into larger ones for -/// more efficient processing by subsequent operations. -/// -/// # Background -/// -/// Generally speaking, larger [`RecordBatch`]es are more efficient to process -/// than smaller record batches (until the CPU cache is exceeded) because there -/// is fixed processing overhead per batch. DataFusion tries to operate on -/// batches of `target_batch_size` rows to amortize this overhead -/// -/// ```text -/// ┌────────────────────┐ -/// │ RecordBatch │ -/// │ num_rows = 23 │ -/// └────────────────────┘ ┌────────────────────┐ -/// │ │ -/// ┌────────────────────┐ Coalesce │ │ -/// │ │ Batches │ │ -/// │ RecordBatch │ │ │ -/// │ num_rows = 50 │ ─ ─ ─ ─ ─ ─ ▶ │ │ -/// │ │ │ RecordBatch │ -/// │ │ │ num_rows = 106 │ -/// └────────────────────┘ │ │ -/// │ │ -/// ┌────────────────────┐ │ │ -/// │ │ │ │ -/// │ RecordBatch │ │ │ -/// │ num_rows = 33 │ └────────────────────┘ -/// │ │ -/// └────────────────────┘ -/// ``` -/// -/// # Notes: -/// -/// 1. Output rows are produced in the same order as the input rows -/// -/// 2. The output is a sequence of batches, with all but the last being at least -/// `target_batch_size` rows. -/// -/// 3. Eventually this may also be able to handle other optimizations such as a -/// combined filter/coalesce operation. +/// Concatenate multiple [`RecordBatch`]es and apply a limit /// +/// See [`BatchCoalescer`] for more details on how this works. #[derive(Debug)] -pub struct BatchCoalescer { - /// The input schema - schema: SchemaRef, - /// Minimum number of rows for coalesces batches - target_batch_size: usize, +pub struct LimitedBatchCoalescer { + /// The arrow structure that builds the output batches + inner: BatchCoalescer, /// Total number of rows returned so far total_rows: usize, - /// Buffered batches - buffer: Vec, - /// Buffered row count - buffered_rows: usize, /// Limit: maximum number of rows to fetch, `None` means fetch all rows fetch: Option, + /// Indicates if the coalescer is finished + finished: bool, + /// The biggest size of the coalesced batch + biggest_coalesce_size: usize, +} + +/// Status returned by [`LimitedBatchCoalescer::push_batch`] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PushBatchStatus { + /// The limit has **not** been reached, and more batches can be pushed + Continue, + /// The limit **has** been reached after processing this batch + /// The caller should call [`LimitedBatchCoalescer::finish`] + /// to flush any buffered rows and stop pushing more batches. + LimitReached, } -impl BatchCoalescer { +impl LimitedBatchCoalescer { /// Create a new `BatchCoalescer` /// /// # Arguments @@ -92,203 +56,108 @@ impl BatchCoalescer { /// - `target_batch_size` - the minimum number of rows for each /// output batch (until limit reached) /// - `fetch` - the maximum number of rows to fetch, `None` means fetch all rows + /// - `biggest_coalesce_size` - the max size of the batch to coalesce, now it's fixed to `target_batch_size / 2` pub fn new( schema: SchemaRef, target_batch_size: usize, fetch: Option, ) -> Self { Self { - schema, - target_batch_size, + inner: BatchCoalescer::new(schema, target_batch_size), total_rows: 0, - buffer: vec![], - buffered_rows: 0, fetch, + finished: false, + biggest_coalesce_size: target_batch_size / 2, } } /// Return the schema of the output batches pub fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) + self.inner.schema() } - /// Push next batch, and returns [`CoalescerState`] indicating the current - /// state of the buffer. - pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState { - let batch = gc_string_view_batch(&batch); - if self.limit_reached(&batch) { - CoalescerState::LimitReached - } else if self.target_reached(batch) { - CoalescerState::TargetReached - } else { - CoalescerState::Continue + /// Pushes the next [`RecordBatch`] into the coalescer and returns its status. + /// + /// # Arguments + /// * `batch` - The [`RecordBatch`] to append. + /// + /// # Returns + /// * [`PushBatchStatus::Continue`] - More batches can still be pushed. + /// * [`PushBatchStatus::LimitReached`] - The row limit was reached after processing + /// this batch. The caller should call [`Self::finish`] before retrieving the + /// remaining buffered batches. + /// + /// # Errors + /// Returns an error if called after [`Self::finish`] or if the internal push + /// operation fails. + pub fn push_batch(&mut self, batch: RecordBatch) -> Result { + if self.finished { + return internal_err!( + "LimitedBatchCoalescer: cannot push batch after finish" + ); } - } - /// Return true if the there is no data buffered - pub fn is_empty(&self) -> bool { - self.buffer.is_empty() - } + // if we are at the limit, return LimitReached + if let Some(fetch) = self.fetch { + // limit previously reached + if self.total_rows >= fetch { + return Ok(PushBatchStatus::LimitReached); + } - /// Checks if the buffer will reach the specified limit after getting - /// `batch`. - /// - /// If fetch would be exceeded, slices the received batch, updates the - /// buffer with it, and returns `true`. - /// - /// Otherwise: does nothing and returns `false`. - fn limit_reached(&mut self, batch: &RecordBatch) -> bool { - match self.fetch { - Some(fetch) if self.total_rows + batch.num_rows() >= fetch => { + // limit now reached + if self.total_rows + batch.num_rows() >= fetch { // Limit is reached let remaining_rows = fetch - self.total_rows; debug_assert!(remaining_rows > 0); - let batch = batch.slice(0, remaining_rows); - self.buffered_rows += batch.num_rows(); - self.total_rows = fetch; - self.buffer.push(batch); - true + let batch_head = batch.slice(0, remaining_rows); + self.total_rows += batch_head.num_rows(); + self.inner.push_batch(batch_head)?; + return Ok(PushBatchStatus::LimitReached); } - _ => false, } - } - /// Updates the buffer with the given batch. - /// - /// If the target batch size is reached, returns `true`. Otherwise, returns - /// `false`. - fn target_reached(&mut self, batch: RecordBatch) -> bool { - if batch.num_rows() == 0 { - false - } else { - self.total_rows += batch.num_rows(); - self.buffered_rows += batch.num_rows(); - self.buffer.push(batch); - self.buffered_rows >= self.target_batch_size + let num_rows = batch.num_rows(); + // Limit not reached, push the entire batch + self.total_rows += batch.num_rows(); + self.inner.push_batch(batch)?; + + // If the number of rows in the current batch exceeds the coalesce size, + // we emit the buffered batch early to avoid coalescing for large batches. + if num_rows > self.biggest_coalesce_size { + self.inner.finish_buffered_batch()?; } + Ok(PushBatchStatus::Continue) } - /// Concatenates and returns all buffered batches, and clears the buffer. - pub fn finish_batch(&mut self) -> datafusion_common::Result { - let batch = concat_batches(&self.schema, &self.buffer)?; - self.buffer.clear(); - self.buffered_rows = 0; - Ok(batch) + /// Return true if there is no data buffered + pub fn is_empty(&self) -> bool { + self.inner.is_empty() } -} -/// Indicates the state of the [`BatchCoalescer`] buffer after the -/// [`BatchCoalescer::push_batch()`] operation. -/// -/// The caller should take different actions, depending on the variant returned. -pub enum CoalescerState { - /// Neither the limit nor the target batch size is reached. + /// Complete the current buffered batch and finish the coalescer /// - /// Action: continue pushing batches. - Continue, - /// The limit has been reached. - /// - /// Action: call [`BatchCoalescer::finish_batch()`] to get the final - /// buffered results as a batch and finish the query. - LimitReached, - /// The specified minimum number of rows a batch should have is reached. - /// - /// Action: call [`BatchCoalescer::finish_batch()`] to get the current - /// buffered results as a batch and then continue pushing batches. - TargetReached, -} - -/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed -/// -/// Decides when to consolidate the StringView into a new buffer to reduce -/// memory usage and improve string locality for better performance. -/// -/// This differs from `StringViewArray::gc` because: -/// 1. It may not compact the array depending on a heuristic. -/// 2. It uses a precise block size to reduce the number of buffers to track. -/// -/// # Heuristic -/// -/// If the average size of each view is larger than 32 bytes, we compact the array. -/// -/// `StringViewArray` include pointers to buffer that hold the underlying data. -/// One of the great benefits of `StringViewArray` is that many operations -/// (e.g., `filter`) can be done without copying the underlying data. -/// -/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the -/// `StringViewArray` may only refer to a small portion of the buffer, -/// significantly increasing memory usage. -fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { - let new_columns: Vec = batch - .columns() - .iter() - .map(|c| { - // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. - let Some(s) = c.as_string_view_opt() else { - return Arc::clone(c); - }; - - // Fast path: if the data buffers are empty, we can return the original array - if s.data_buffers().is_empty() { - return Arc::clone(c); - } - - let ideal_buffer_size: usize = s - .views() - .iter() - .map(|v| { - let len = (*v as u32) as usize; - if len > 12 { - len - } else { - 0 - } - }) - .sum(); - - // We don't use get_buffer_memory_size here, because gc is for the contents of the - // data buffers, not views and nulls. - let actual_buffer_size = - s.data_buffers().iter().map(|b| b.capacity()).sum::(); - - // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse - if actual_buffer_size > (ideal_buffer_size * 2) { - // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. - // See https://github.com/apache/arrow-rs/issues/6094 for more details. - let mut builder = StringViewBuilder::with_capacity(s.len()); - if ideal_buffer_size > 0 { - builder = builder.with_fixed_block_size(ideal_buffer_size as u32); - } - - for v in s.iter() { - builder.append_option(v); - } - - let gc_string = builder.finish(); - - debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0 + /// Any subsequent calls to `push_batch()` will return an Err + pub fn finish(&mut self) -> Result<()> { + self.inner.finish_buffered_batch()?; + self.finished = true; + Ok(()) + } - Arc::new(gc_string) - } else { - Arc::clone(c) - } - }) - .collect(); - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(batch.num_rows())); - RecordBatch::try_new_with_options(batch.schema(), new_columns, &options) - .expect("Failed to re-create the gc'ed record batch") + /// Return the next completed batch, if any + pub fn next_completed_batch(&mut self) -> Option { + self.inner.next_completed_batch() + } } #[cfg(test)] mod tests { - use std::ops::Range; - use super::*; + use std::ops::Range; + use std::sync::Arc; - use arrow::array::{builder::ArrayBuilder, StringViewArray, UInt32Array}; + use arrow::array::UInt32Array; + use arrow::compute::concat_batches; use arrow::datatypes::{DataType, Field, Schema}; #[test] @@ -296,9 +165,9 @@ mod tests { let batch = uint32_batch(0..8); Test::new() .with_batches(std::iter::repeat_n(batch, 10)) - // expected output is batches of at least 20 rows (except for the final batch) + // expected output is batches of exactly 21 rows (except for the final batch) .with_target_batch_size(21) - .with_expected_output_sizes(vec![24, 24, 24, 8]) + .with_expected_output_sizes(vec![21, 21, 21, 17]) .run() } @@ -311,7 +180,7 @@ mod tests { // expected to behave the same as `test_concat_batches` .with_target_batch_size(21) .with_fetch(Some(100)) - .with_expected_output_sizes(vec![24, 24, 24, 8]) + .with_expected_output_sizes(vec![21, 21, 21, 17]) .run(); } @@ -323,7 +192,7 @@ mod tests { // input is 10 batches x 8 rows (80 rows) with fetch limit of 50 .with_target_batch_size(21) .with_fetch(Some(50)) - .with_expected_output_sizes(vec![24, 24, 2]) + .with_expected_output_sizes(vec![21, 21, 8]) .run(); } @@ -333,7 +202,7 @@ mod tests { Test::new() .with_batches(std::iter::repeat_n(batch, 10)) // input is 10 batches x 8 rows (80 rows) with fetch limit of 48 - .with_target_batch_size(21) + .with_target_batch_size(24) .with_fetch(Some(48)) .with_expected_output_sizes(vec![24, 24]) .run(); @@ -362,7 +231,7 @@ mod tests { .run() } - /// Test for [`BatchCoalescer`] + /// Test for [`LimitedBatchCoalescer`] /// /// Pushes the input batches to the coalescer and verifies that the resulting /// batches have the expected number of rows and contents. @@ -435,26 +304,32 @@ mod tests { let single_input_batch = concat_batches(&schema, &input_batches).unwrap(); let mut coalescer = - BatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch); + LimitedBatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch); let mut output_batches = vec![]; for batch in input_batches { - match coalescer.push_batch(batch) { - CoalescerState::Continue => {} - CoalescerState::LimitReached => { - output_batches.push(coalescer.finish_batch().unwrap()); - break; + match coalescer.push_batch(batch).unwrap() { + PushBatchStatus::Continue => { + // continue pushing batches } - CoalescerState::TargetReached => { - coalescer.buffered_rows = 0; - output_batches.push(coalescer.finish_batch().unwrap()); + PushBatchStatus::LimitReached => { + break; } } } - if coalescer.buffered_rows != 0 { - output_batches.extend(coalescer.buffer); + coalescer.finish().unwrap(); + while let Some(batch) = coalescer.next_completed_batch() { + output_batches.push(batch); } + let actual_output_sizes: Vec = + output_batches.iter().map(|b| b.num_rows()).collect(); + assert_eq!( + expected_output_sizes, actual_output_sizes, + "Unexpected number of rows in output batches\n\ + Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}" + ); + // make sure we got the expected number of output batches and content let mut starting_idx = 0; assert_eq!(expected_output_sizes.len(), output_batches.len()); @@ -498,110 +373,6 @@ mod tests { .unwrap() } - #[test] - fn test_gc_string_view_batch_small_no_compact() { - // view with only short strings (no buffers) --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("a"), Some("b"), Some("c")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 0); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } - - #[test] - fn test_gc_string_view_test_batch_empty() { - let schema = Schema::empty(); - let batch = RecordBatch::new_empty(schema.into()); - let output_batch = gc_string_view_batch(&batch); - assert_eq!(batch.num_columns(), output_batch.num_columns()); - assert_eq!(batch.num_rows(), output_batch.num_rows()); - } - - #[test] - fn test_gc_string_view_batch_large_no_compact() { - // view with large strings (has buffers) but full --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("This string is longer than 12 bytes")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } - - #[test] - fn test_gc_string_view_batch_large_slice_compact() { - // view with large strings (has buffers) and only partially used --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("this string is longer than 12 bytes")], - } - .build(); - - // slice only 11 rows, so most of the buffer is not used - let array = array.slice(11, 22); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer - } - - /// Compares the values of two string view arrays - fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { - assert_eq!(arr1.len(), arr2.len()); - for (s1, s2) in arr1.iter().zip(arr2.iter()) { - assert_eq!(s1, s2); - } - } - - /// runs garbage collection on string view array - /// and ensures the number of rows are the same - fn do_gc(array: StringViewArray) -> StringViewArray { - let batch = - RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); - let gc_batch = gc_string_view_batch(&batch); - assert_eq!(batch.num_rows(), gc_batch.num_rows()); - assert_eq!(batch.schema(), gc_batch.schema()); - gc_batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .clone() - } - - /// Describes parameters for creating a `StringViewArray` - struct StringViewTest { - /// The number of rows in the array - rows: usize, - /// The strings to use in the array (repeated over and over - strings: Vec>, - } - - impl StringViewTest { - /// Create a `StringViewArray` with the parameters specified in this struct - fn build(self) -> StringViewArray { - let mut builder = - StringViewBuilder::with_capacity(100).with_fixed_block_size(8192); - loop { - for &v in self.strings.iter() { - builder.append_option(v); - if builder.len() >= self.rows { - return builder.finish(); - } - } - } - } - } fn batch_to_pretty_strings(batch: &RecordBatch) -> String { arrow::util::pretty::pretty_format_batches(std::slice::from_ref(batch)) .unwrap() diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index d98530d28e91..20f4ca9d0b6a 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -34,7 +34,7 @@ use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; -use crate::coalesce::{BatchCoalescer, CoalescerState}; +use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, @@ -53,7 +53,7 @@ use futures::stream::{Stream, StreamExt}; /// buffering and returns the final batch once the number of collected rows /// reaches the `fetch` value. /// -/// See [`BatchCoalescer`] for more information +/// See [`LimitedBatchCoalescer`] for more information #[derive(Debug, Clone)] pub struct CoalesceBatchesExec { /// The input plan @@ -182,14 +182,13 @@ impl ExecutionPlan for CoalesceBatchesExec { ) -> Result { Ok(Box::pin(CoalesceBatchesStream { input: self.input.execute(partition, context)?, - coalescer: BatchCoalescer::new( + coalescer: LimitedBatchCoalescer::new( self.input.schema(), self.target_batch_size, self.fetch, ), baseline_metrics: BaselineMetrics::new(&self.metrics, partition), - // Start by pulling data - inner_state: CoalesceBatchesStreamState::Pull, + completed: false, })) } @@ -252,12 +251,11 @@ struct CoalesceBatchesStream { /// The input plan input: SendableRecordBatchStream, /// Buffer for combining batches - coalescer: BatchCoalescer, + coalescer: LimitedBatchCoalescer, /// Execution metrics baseline_metrics: BaselineMetrics, - /// The current inner state of the stream. This state dictates the current - /// action or operation to be performed in the streaming process. - inner_state: CoalesceBatchesStreamState, + /// is the input stream exhausted or limit reached? + completed: bool, } impl Stream for CoalesceBatchesStream { @@ -277,50 +275,6 @@ impl Stream for CoalesceBatchesStream { } } -/// Enumeration of possible states for `CoalesceBatchesStream`. -/// It represents different stages in the lifecycle of a stream of record batches. -/// -/// An example of state transition: -/// Notation: -/// `[3000]`: A batch with size 3000 -/// `{[2000], [3000]}`: `CoalesceBatchStream`'s internal buffer with 2 batches buffered -/// Input of `CoalesceBatchStream` will generate three batches `[2000], [3000], [4000]` -/// The coalescing procedure will go through the following steps with 4096 coalescing threshold: -/// 1. Read the first batch and get it buffered. -/// - initial state: `Pull` -/// - initial buffer: `{}` -/// - updated buffer: `{[2000]}` -/// - next state: `Pull` -/// 2. Read the second batch, the coalescing target is reached since 2000 + 3000 > 4096 -/// - initial state: `Pull` -/// - initial buffer: `{[2000]}` -/// - updated buffer: `{[2000], [3000]}` -/// - next state: `ReturnBuffer` -/// 4. Two batches in the batch get merged and consumed by the upstream operator. -/// - initial state: `ReturnBuffer` -/// - initial buffer: `{[2000], [3000]}` -/// - updated buffer: `{}` -/// - next state: `Pull` -/// 5. Read the third input batch. -/// - initial state: `Pull` -/// - initial buffer: `{}` -/// - updated buffer: `{[4000]}` -/// - next state: `Pull` -/// 5. The input is ended now. Jump to exhaustion state preparing the finalized data. -/// - initial state: `Pull` -/// - initial buffer: `{[4000]}` -/// - updated buffer: `{[4000]}` -/// - next state: `Exhausted` -#[derive(Debug, Clone, Eq, PartialEq)] -enum CoalesceBatchesStreamState { - /// State to pull a new batch from the input stream. - Pull, - /// State to return a buffered batch. - ReturnBuffer, - /// State indicating that the stream is exhausted. - Exhausted, -} - impl CoalesceBatchesStream { fn poll_next_inner( self: &mut Pin<&mut Self>, @@ -328,51 +282,39 @@ impl CoalesceBatchesStream { ) -> Poll>> { let cloned_time = self.baseline_metrics.elapsed_compute().clone(); loop { - match &self.inner_state { - CoalesceBatchesStreamState::Pull => { - // Attempt to pull the next batch from the input stream. - let input_batch = ready!(self.input.poll_next_unpin(cx)); - // Start timing the operation. The timer records time upon being dropped. - let _timer = cloned_time.timer(); - - match input_batch { - Some(Ok(batch)) => match self.coalescer.push_batch(batch) { - CoalescerState::Continue => {} - CoalescerState::LimitReached => { - self.inner_state = CoalesceBatchesStreamState::Exhausted; - } - CoalescerState::TargetReached => { - self.inner_state = - CoalesceBatchesStreamState::ReturnBuffer; - } - }, - None => { - // End of input stream, but buffered batches might still be present. - self.inner_state = CoalesceBatchesStreamState::Exhausted; + // If there is any completed batch ready, return it + if let Some(batch) = self.coalescer.next_completed_batch() { + return Poll::Ready(Some(Ok(batch))); + } + if self.completed { + // If input is done and no batches are ready, return None to signal end of stream. + return Poll::Ready(None); + } + // Attempt to pull the next batch from the input stream. + let input_batch = ready!(self.input.poll_next_unpin(cx)); + // Start timing the operation. The timer records time upon being dropped. + let _timer = cloned_time.timer(); + + match input_batch { + None => { + // Input stream is exhausted, finalize any remaining batches + self.completed = true; + self.coalescer.finish()?; + } + Some(Ok(batch)) => { + match self.coalescer.push_batch(batch)? { + PushBatchStatus::Continue => { + // Keep pushing more batches + } + PushBatchStatus::LimitReached => { + // limit was reached, so stop early + self.completed = true; + self.coalescer.finish()?; } - other => return Poll::Ready(other), } } - CoalesceBatchesStreamState::ReturnBuffer => { - let _timer = cloned_time.timer(); - // Combine buffered batches into one batch and return it. - let batch = self.coalescer.finish_batch()?; - // Set to pull state for the next iteration. - self.inner_state = CoalesceBatchesStreamState::Pull; - return Poll::Ready(Some(Ok(batch))); - } - CoalesceBatchesStreamState::Exhausted => { - // Handle the end of the input stream. - return if self.coalescer.is_empty() { - // If buffer is empty, return None indicating the stream is fully consumed. - Poll::Ready(None) - } else { - let _timer = cloned_time.timer(); - // If the buffer still contains batches, prepare to return them. - let batch = self.coalescer.finish_batch()?; - Poll::Ready(Some(Ok(batch))) - }; - } + // Error case + other => return Poll::Ready(other), } } }