From b55b9b667504bd2b50104dee5c39ce80df704922 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 3 Mar 2025 01:58:11 -0500
Subject: [PATCH 1/8] fix: multi threaded query execution

add multi threaded execution to the query execute method
allows datafusion to use multiple threads to perform parallel execution of plans
improves query performance
---
 src/handlers/airplane.rs   | 12 ++++++++----
 src/handlers/http/query.rs |  9 +++++++--
 src/query/mod.rs           |  4 +++-
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index 47f2acb1b..f3e0bfcb8 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
             Status::permission_denied("User Does not have permission to access this")
         })?;
         let time = Instant::now();
-        let (records, _) = query
-            .execute(stream_name.clone())
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+
+        let stream_name_clone = stream_name.clone();
+        let (records, _) =
+            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
+                Ok(Ok((records, fields))) => (records, fields),
+                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
+                Err(err) => return Err(Status::internal(err.to_string())),
+            };
 
         /*
          * INFO: No returning the schema with the data.
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 3b6f4dedf..88fe21e79 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -130,8 +130,13 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
+    let (records, fields) =
+        match tokio::task::spawn_blocking(move || query.execute(table_name.clone())).await
+        {
+            Ok(Ok((records, fields))) => (records, fields),
+            Ok(Err(e)) => return Err(QueryError::Execute(e)),
+            Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
+        };
     let response = QueryResponse {
         records,
         fields,
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 44dcca003..31d79f58e 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -91,7 +91,8 @@ impl Query {
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
-            .with_round_robin_repartition(true);
+            .with_round_robin_repartition(true)
+            .with_batch_size(8192);
 
         // For more details refer https://datafusion.apache.org/user-guide/configs.html
 
@@ -135,6 +136,7 @@ impl Query {
         SessionContext::new_with_state(state)
     }
 
+    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
         stream_name: String,

From a09e56a611b123566b03049d91b60b0cd101de78 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 3 Mar 2025 02:20:08 -0500
Subject: [PATCH 2/8] separated code for execute query in tokio task
---
 src/handlers/http/query.rs | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 88fe21e79..0d7d0b340 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,6 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -130,13 +131,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
-    let (records, fields) =
-        match tokio::task::spawn_blocking(move || query.execute(table_name.clone())).await
-        {
-            Ok(Ok((records, fields))) => (records, fields),
-            Ok(Err(e)) => return Err(QueryError::Execute(e)),
-            Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-        };
+    let (records, fields) = execute_query(query, table_name.clone()).await?;
+
     let response = QueryResponse {
         records,
         fields,
@@ -154,6 +150,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
+async fn execute_query(
+    query: LogicalQuery,
+    stream_name: String,
+) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
+    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
+        Ok(Ok(result)) => Ok(result),
+        Ok(Err(e)) => Err(QueryError::Execute(e)),
+        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
+    }
+}
+
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,

From fe492e7150932d3fd33677651d42a1115c9ead1f Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 3 Mar 2025 07:14:10 -0500
Subject: [PATCH 3/8] binary as string
---
 src/query/mod.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index 31d79f58e..5dd8e08b1 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -114,6 +114,8 @@ impl Query {
             .parquet
             .schema_force_view_types = true;
 
+        config.options_mut().execution.parquet.binary_as_string = true;
+
         let state = SessionStateBuilder::new()
             .with_default_features()
             .with_config(config)

From 475d1a700cab4e1e8d3c9d609442a833e824aa99 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 6 Mar 2025 02:29:01 -0500
Subject: [PATCH 4/8] config options updated
---
 src/query/mod.rs | 23 +++++++++--------------
 1 file changed, 9 insertions(+), 14 deletions(-)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index 5dd8e08b1..8b799ac00 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -91,30 +91,23 @@ impl Query {
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
-            .with_round_robin_repartition(true)
-            .with_batch_size(8192);
+            .with_information_schema(true)
+            .with_batch_size(1000000)
+            .with_coalesce_batches(true);
 
         // For more details refer https://datafusion.apache.org/user-guide/configs.html
 
-        // Reduce the number of rows read (if possible)
-        config.options_mut().execution.parquet.enable_page_index = true;
-
         // Pushdown filters allows DF to push the filters as far down in the plan as possible
         // and thus, reducing the number of rows decoded
         config.options_mut().execution.parquet.pushdown_filters = true;
 
         // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
        config.options_mut().execution.parquet.reorder_filters = true;
-
-        // Enable StringViewArray
-        // https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
+        config.options_mut().execution.parquet.binary_as_string = true;
         config
             .options_mut()
             .execution
-            .parquet
-            .schema_force_view_types = true;
-
-        config.options_mut().execution.parquet.binary_as_string = true;
+            .use_row_number_estimates_to_optimize_partitioning = true;
 
         let state = SessionStateBuilder::new()
             .with_default_features()
@@ -149,7 +142,9 @@ impl Query {
             .execute_logical_plan(self.final_logical_plan(&time_partition))
             .await?;
 
-        let fields = df
+        let optimised_df = df.repartition(Partitioning::RoundRobinBatch(16))?;
+
+        let fields = optimised_df
             .schema()
             .fields()
             .iter()
@@ -161,7 +156,7 @@ impl Query {
             return Ok((vec![], fields));
         }
 
-        let results = df.collect().await?;
+        let results = optimised_df.collect().await?;
 
         Ok((results, fields))
     }

From a09765dd6a577a677785dfaf6e903563cebd5369 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 6 Mar 2025 06:37:41 -0500
Subject: [PATCH 5/8] removed hardcoded repartition size
---
 src/query/mod.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index 8b799ac00..55e80a31e 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -91,7 +91,6 @@ impl Query {
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
-            .with_information_schema(true)
             .with_batch_size(1000000)
             .with_coalesce_batches(true);
 
         // For more details refer https://datafusion.apache.org/user-guide/configs.html
@@ -142,9 +141,7 @@ impl Query {
             .execute_logical_plan(self.final_logical_plan(&time_partition))
             .await?;
 
-        let optimised_df = df.repartition(Partitioning::RoundRobinBatch(16))?;
-
-        let fields = optimised_df
+        let fields = df
             .schema()
             .fields()
             .iter()
@@ -156,7 +153,7 @@ impl Query {
             return Ok((vec![], fields));
         }
 
-        let results = optimised_df.collect().await?;
+        let results = df.collect().await?;
 
         Ok((results, fields))
     }

From e31e6550ca352741b3a48aa36ae7775bc114cd05 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 6 Mar 2025 06:40:16 -0500
Subject: [PATCH 6/8] reduced row group size
---
 src/cli.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/cli.rs b/src/cli.rs
index 281f60f51..10723b73e 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -264,7 +264,7 @@ pub struct Options {
     #[arg(
         long,
         env = "P_PARQUET_ROW_GROUP_SIZE",
-        default_value = "1048576",
+        default_value = "262144",
         help = "Number of rows in a row group"
     )]
     pub row_group_size: usize,

From 51d3ac223fd88a52b1af8534b0bb365aae0e0225 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 6 Mar 2025 07:31:33 -0500
Subject: [PATCH 7/8] removed coalesce batches as it is default behaviour
---
 src/query/mod.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index 55e80a31e..fa0b155f5 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -91,9 +91,7 @@ impl Query {
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
-            .with_batch_size(1000000)
-            .with_coalesce_batches(true);
-
+            .with_batch_size(1000000);
         // For more details refer https://datafusion.apache.org/user-guide/configs.html
 
         // Pushdown filters allows DF to push the filters as far down in the plan as possible

From 5a5dc0c6da28ae815dc408f1ce5c37c4f632edf6 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Thu, 6 Mar 2025 10:10:35 -0500
Subject: [PATCH 8/8] added comments to the updated configs
---
 src/cli.rs       | 3 ++-
 src/query/mod.rs | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/cli.rs b/src/cli.rs
index 10723b73e..ffb3a80eb 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -260,7 +260,8 @@ pub struct Options {
         help = "Set a fixed memory limit for query in GiB"
     )]
     pub query_memory_pool_size: Option,
-
+    // reduced the max row group size from 1048576
+    // smaller row groups help improve query performance in multi threaded queries
     #[arg(
         long,
         env = "P_PARQUET_ROW_GROUP_SIZE",
diff --git a/src/query/mod.rs b/src/query/mod.rs
index fa0b155f5..fa422e9a1 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -88,11 +88,12 @@ impl Query {
         let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
         let runtime = Arc::new(runtime_config.build().unwrap());
 
+        // All the config options are explained here -
+        // https://datafusion.apache.org/user-guide/configs.html
         let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
             .with_batch_size(1000000);
-        // For more details refer https://datafusion.apache.org/user-guide/configs.html
 
         // Pushdown filters allows DF to push the filters as far down in the plan as possible
         // and thus, reducing the number of rows decoded
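
Taken together, patches 1 and 2 converge on a single execution pattern: Query::execute is turned into a blocking entry point that builds its own multi-threaded Tokio runtime via the tokio::main(flavor = "multi_thread") attribute, and async callers (the HTTP and Flight handlers) move the call onto Tokio's blocking pool with tokio::task::spawn_blocking, so their own runtime keeps serving requests while DataFusion spreads the plan across worker threads. The sketch below only illustrates that shape; the Query struct, handler function, and return types are simplified stand-ins, not the parseable types.

    use tokio::task;

    struct Query; // stand-in for the real query type

    impl Query {
        // tokio::main on an async fn rewrites it into a sync fn that builds a
        // fresh runtime and blocks on the body; with the multi_thread flavor,
        // DataFusion-style work inside can be spread across worker threads.
        #[tokio::main(flavor = "multi_thread")]
        async fn execute(&self, stream_name: String) -> Result<Vec<String>, String> {
            // placeholder for the real logical-plan execution
            Ok(vec![stream_name])
        }
    }

    // Async caller (e.g. an HTTP or Flight handler): spawn_blocking moves the
    // now-blocking execute onto Tokio's blocking pool so the caller's runtime
    // stays free to serve other requests.
    async fn handler() -> Result<Vec<String>, String> {
        let query = Query;
        let stream_name = "demo".to_string();
        task::spawn_blocking(move || query.execute(stream_name))
            .await
            .map_err(|join_err| join_err.to_string())?
    }

    #[tokio::main]
    async fn main() {
        match handler().await {
            Ok(records) => println!("{} record(s)", records.len()),
            Err(e) => eprintln!("query failed: {e}"),
        }
    }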