diff --git a/src/cli.rs b/src/cli.rs
index 281f60f51..ffb3a80eb 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -260,11 +260,12 @@ pub struct Options {
         help = "Set a fixed memory limit for query in GiB"
     )]
     pub query_memory_pool_size: Option<usize>,
-
+    // Reduced the max row group size from 1048576;
+    // smaller row groups improve query performance for multi-threaded queries
     #[arg(
         long,
         env = "P_PARQUET_ROW_GROUP_SIZE",
-        default_value = "1048576",
+        default_value = "262144",
         help = "Number of rows in a row group"
     )]
     pub row_group_size: usize,
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index 47f2acb1b..f3e0bfcb8 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
                 Status::permission_denied("User Does not have permission to access this")
             })?;
         let time = Instant::now();
-        let (records, _) = query
-            .execute(stream_name.clone())
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+
+        let stream_name_clone = stream_name.clone();
+        let (records, _) =
+            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
+                Ok(Ok((records, fields))) => (records, fields),
+                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
+                Err(err) => return Err(Status::internal(err.to_string())),
+            };

         /*
         * INFO: No returning the schema with the data.
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 3b6f4dedf..0d7d0b340 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,6 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -130,7 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
+) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
+    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
+        Ok(Ok(result)) => Ok(result),
+        Ok(Err(e)) => Err(QueryError::Execute(e)),
+        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
+    }
+}
+
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 44dcca003..fa422e9a1 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -88,15 +88,12 @@ impl Query {
         let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
         let runtime = Arc::new(runtime_config.build().unwrap());
 
+        // All the config options are explained here:
+        // https://datafusion.apache.org/user-guide/configs.html
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
-            .with_round_robin_repartition(true);
-
-        // For more details refer https://datafusion.apache.org/user-guide/configs.html
-
-        // Reduce the number of rows read (if possible)
-        config.options_mut().execution.parquet.enable_page_index = true;
+            .with_batch_size(1000000);
 
         // Pushdown filters allows DF to push the filters as far down in the plan as possible
         // and thus, reducing the number of rows decoded
@@ -104,14 +101,11 @@ impl Query {
 
         // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
         config.options_mut().execution.parquet.reorder_filters = true;
-
-        // Enable StringViewArray
-        // https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
+        config.options_mut().execution.parquet.binary_as_string = true;
         config
             .options_mut()
             .execution
-            .parquet
-            .schema_force_view_types = true;
+            .use_row_number_estimates_to_optimize_partitioning = true;
 
         let state = SessionStateBuilder::new()
             .with_default_features()
@@ -135,6 +129,7 @@ impl Query {
         SessionContext::new_with_state(state)
     }
 
+    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
         stream_name: String,
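
For reviewers, a minimal sketch of the threading pattern this diff introduces: `Query::execute` becomes a blocking entry point that owns its own multi-threaded Tokio runtime via `#[tokio::main(flavor = "multi_thread")]`, and async callers hand it to `tokio::task::spawn_blocking` so the server's runtime threads are never stalled. `QueryStub` and `run_query` below are illustrative stand-ins, not Parseable types; it only needs tokio with the `macros` and `rt-multi-thread` features.

```rust
struct QueryStub;

impl QueryStub {
    // Mirrors the diff: #[tokio::main] turns this async fn into a plain
    // blocking fn that block_on()s the body inside a fresh runtime.
    #[tokio::main(flavor = "multi_thread")]
    async fn execute(&self, stream_name: String) -> Result<(Vec<u64>, Vec<String>), String> {
        // Pretend query execution: return (records, field names).
        Ok((vec![1, 2, 3], vec![stream_name]))
    }
}

// Mirrors the handler side: run the blocking call on Tokio's blocking pool
// and flatten the nested Result (join error vs. query error).
async fn run_query(
    query: QueryStub,
    stream_name: String,
) -> Result<(Vec<u64>, Vec<String>), String> {
    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(e)) => Err(e),
        Err(join_err) => Err(join_err.to_string()),
    }
}

#[tokio::main]
async fn main() {
    let (records, fields) = run_query(QueryStub, "demo".into()).await.unwrap();
    println!("{} records, fields: {:?}", records.len(), fields);
}
```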
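
Similarly, the DataFusion session tuning in src/query/mod.rs can be tried in isolation with a snippet like the one below. It is a sketch, not Parseable code: option names match the DataFusion release this PR builds against (older releases may lack `binary_as_string` or `use_row_number_estimates_to_optimize_partitioning`), and `pushdown_filters` is included because the surrounding comments reference it even though that line is outside the hunks shown above.

```rust
use datafusion::prelude::*;

fn main() {
    // Same knobs as the diff: large batches, parquet pruning, filter
    // pushdown + reordering, binary columns decoded as strings, and
    // row-number based partitioning estimates.
    let mut config = SessionConfig::default()
        .with_parquet_pruning(true)
        .with_prefer_existing_sort(true)
        .with_batch_size(1_000_000);

    config.options_mut().execution.parquet.pushdown_filters = true;
    config.options_mut().execution.parquet.reorder_filters = true;
    config.options_mut().execution.parquet.binary_as_string = true;
    config
        .options_mut()
        .execution
        .use_row_number_estimates_to_optimize_partitioning = true;

    let ctx = SessionContext::new_with_config(config);
    println!("batch size: {}", ctx.state().config().batch_size());
}
```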