
Commit 53fc94f

Datafusion-cli: Redesign the datafusion-cli execution and print path so results are printed as a stream, without buffering them in memory (#14877)

* Initial version for streaming batch preview
* Add more reasonable code
* Polish code
* Polish code
* Add unit test
* fmt
* Remove println
1 parent 3d64de4 commit 53fc94f
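
The heart of the change is visible in the exec.rs diff below: instead of collecting every result batch into a Vec guarded by a MemoryConsumer reservation and printing at the end, the CLI now obtains a record batch stream up front and prints batches as they arrive. A minimal sketch of that streaming idea is shown here; it is an illustration rather than code from this commit, and the helper name print_streaming, its signature, and the use of arrow's pretty_format_batches are assumptions.

```rust
use std::io::Write;

use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Illustrative sketch: write each batch as soon as the stream yields it,
/// so peak memory is roughly one RecordBatch instead of the full result set.
async fn print_streaming(
    mut stream: SendableRecordBatchStream,
    max_rows: usize,
    writer: &mut impl Write,
) -> Result<()> {
    let mut row_count = 0usize;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        if row_count >= max_rows {
            // A real implementation would truncate the output to the row
            // limit; this sketch simply stops reading the stream.
            break;
        }
        row_count += batch.num_rows();
        // Format and write immediately; the batch is dropped afterwards.
        writeln!(writer, "{}", pretty_format_batches(&[batch])?)?;
    }
    Ok(())
}
```

Both branches of the new exec_and_print follow this shape; they differ mainly in which row limit is passed and in whether a table layout has to be derived first.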

File tree

3 files changed (+678, -69 lines)


datafusion-cli/src/exec.rs

Lines changed: 35 additions & 34 deletions
@@ -26,12 +26,6 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
-use futures::StreamExt;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
@@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser::dialect::dialect_from_str;
-
-use datafusion::execution::memory_pool::MemoryConsumer;
-use datafusion::physical_plan::spill::get_record_batch_memory_size;
 use datafusion::sql::sqlparser;
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -230,18 +226,17 @@ pub(super) async fn exec_and_print(
     for statement in statements {
         let adjusted =
             AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
-
         let plan = create_plan(ctx, statement).await?;
         let adjusted = adjusted.with_plan(&plan);
 
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        // Track memory usage for the query result if it's bounded
-        let mut reservation =
-            MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+        let is_unbounded = physical_plan.boundedness().is_unbounded();
+        let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;
 
-        if physical_plan.boundedness().is_unbounded() {
+        // Both bounded and unbounded streams are streaming prints
+        if is_unbounded {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
                     "The given query can generate a valid result only once \
@@ -250,37 +245,43 @@ pub(super) async fn exec_and_print(
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            let stream = execute_stream(physical_plan, task_ctx.clone())?;
-            print_options.print_stream(stream, now).await?;
+            print_options
+                .print_stream(MaxRows::Unlimited, stream, now)
+                .await?;
         } else {
             // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
-            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
-            let mut results = vec![];
-            let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
                 MaxRows::Unlimited => usize::MAX,
                 MaxRows::Limited(n) => n,
             };
-            while let Some(batch) = stream.next().await {
-                let batch = batch?;
-                let curr_num_rows = batch.num_rows();
-                // Stop collecting results if the number of rows exceeds the limit
-                // results batch should include the last batch that exceeds the limit
-                if row_count < max_rows + curr_num_rows {
-                    // Try to grow the reservation to accommodate the batch in memory
-                    reservation.try_grow(get_record_batch_memory_size(&batch))?;
-                    results.push(batch);
-                }
-                row_count += curr_num_rows;
+            let stdout = std::io::stdout();
+            let mut writer = stdout.lock();
+
+            // If we don't want to print the table, we should use the streaming print same as above
+            if print_options.format != PrintFormat::Table
+                && print_options.format != PrintFormat::Automatic
+            {
+                print_options
+                    .print_stream(print_options.maxrows, stream, now)
+                    .await?;
+                continue;
             }
+
+            // into_inner will finalize the print options to table if it's automatic
             adjusted
                 .into_inner()
-                .print_batches(schema, &results, now, row_count)?;
-            reservation.free();
+                .print_table_batch(
+                    print_options,
+                    schema,
+                    &mut stream,
+                    max_rows,
+                    &mut writer,
+                    now,
+                )
+                .await?;
         }
     }
-
     Ok(())
 }
 
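
The bounded Table/Automatic branch delegates to print_table_batch, which lives in the other changed files and is not shown in this diff. The "streaming batch preview" mentioned in the commit message suggests the table path buffers only a small preview to settle the layout before streaming the rest. The sketch below is a guess at that shape, not the commit's implementation; PREVIEW_BATCH_COUNT, print_table_streaming, and the per-batch use of pretty_format_batches are all invented for illustration.

```rust
use std::io::Write;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Hypothetical number of batches buffered to derive the table layout.
const PREVIEW_BATCH_COUNT: usize = 3;

/// Rough "preview then stream" table printer: only the preview is ever held
/// in memory, and every later batch is written as soon as it arrives.
async fn print_table_streaming(
    mut stream: SendableRecordBatchStream,
    max_rows: usize,
    writer: &mut impl Write,
) -> Result<()> {
    let mut preview: Vec<RecordBatch> = Vec::new();
    let mut row_count = 0usize;

    while let Some(batch) = stream.next().await {
        let batch = batch?;
        row_count += batch.num_rows();

        if preview.len() < PREVIEW_BATCH_COUNT {
            // Buffer a few batches so column widths can be derived from
            // representative data before anything is printed.
            preview.push(batch);
        } else {
            // Flush the preview once, then print each later batch directly.
            // (A real implementation would reuse the preview's column widths
            // instead of re-deriving them per batch, to keep columns aligned.)
            if !preview.is_empty() {
                writeln!(writer, "{}", pretty_format_batches(&preview)?)?;
                preview.clear();
            }
            writeln!(writer, "{}", pretty_format_batches(&[batch])?)?;
        }

        if row_count >= max_rows {
            break;
        }
    }

    // Small result sets never grow past the preview; print them in one table.
    if !preview.is_empty() {
        writeln!(writer, "{}", pretty_format_batches(&preview)?)?;
    }
    Ok(())
}
```

With this shape, the memory held at any moment is bounded by the preview plus the batch currently being formatted, which matches the commit's goal of printing results without accumulating the whole result set.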
