Skip to content

Replace execution_mode with emission_type and boundedness #13823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0d37441
feat: update execution modes and add bitflags dependency
jayzhan-synnada Dec 10, 2024
ac3cb45
add exec API
jayzhan-synnada Dec 10, 2024
a669ce1
replace done but has stackoverflow
jayzhan-synnada Dec 10, 2024
7e353da
exec API done
jayzhan-synnada Dec 10, 2024
7713a55
Refactor execution plan properties to remove execution mode
jayzhan-synnada Dec 10, 2024
3f61dcf
Refactor execution plan to remove `ExecutionMode` and introduce `Emis…
jayzhan-synnada Dec 10, 2024
4c4c636
fix test
jayzhan-synnada Dec 10, 2024
b196516
Refactor join handling and emission type logic
jayzhan-synnada Dec 10, 2024
32b9632
Implement emission type for execution plans
jayzhan-synnada Dec 10, 2024
1283ecb
Enhance join type documentation and refine emission type logic
jayzhan-synnada Dec 10, 2024
2d0adf8
Refactor emission type logic in join and sort execution plans
jayzhan-synnada Dec 10, 2024
ce1c25c
Refactor emission type handling in execution plans
jayzhan-synnada Dec 10, 2024
66e036c
Enhance execution plan properties with boundedness and emission type
jayzhan-synnada Dec 12, 2024
34891e9
Refactor execution plans to enhance boundedness and emission type han…
jayzhan-synnada Dec 13, 2024
c64873a
Refactor memory handling in execution plans
jayzhan-synnada Dec 13, 2024
09c85ca
Refactor boundedness checks in execution plans
jayzhan-synnada Dec 13, 2024
8fda3a3
Remove TODO comment regarding unbounded execution plans in `Unbounded…
jayzhan-synnada Dec 13, 2024
e20b7f7
Refactor execution plan boundedness and emission type handling
jayzhan-synnada Dec 13, 2024
5ca01d0
Refactor emission type and boundedness handling in execution plans
jayzhan-synnada Dec 13, 2024
530de0e
Refactor GlobalLimitExec and LocalLimitExec to enhance boundedness ha…
jayzhan-synnada Dec 13, 2024
3557182
Merge branch 'apache_main' into exec-mode
berkaysynnada Dec 16, 2024
bd5f6e4
Review Part1
berkaysynnada Dec 16, 2024
7e8ab0a
Update sanity_checker.rs
berkaysynnada Dec 17, 2024
0613416
addressing reviews
berkaysynnada Dec 17, 2024
754201b
Review Part 1
ozankabak Dec 17, 2024
5f10f98
Update datafusion/physical-plan/src/execution_plan.rs
ozankabak Dec 17, 2024
af83ce8
Update datafusion/physical-plan/src/execution_plan.rs
ozankabak Dec 17, 2024
b0d7139
Shorten imports
ozankabak Dec 18, 2024
6074af4
Enhance documentation for JoinType and Boundedness enums
jayzhan-synnada Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ use crate::{
};

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
Expand Down Expand Up @@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
the source finishes, but the source is unbounded"
);
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
// Bounded stream; collected results are printed after all input is consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
Expand Down
8 changes: 5 additions & 3 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -214,7 +215,8 @@ impl CustomExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
25 changes: 17 additions & 8 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinType {
/// Inner Join
/// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
/// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
/// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
Inner,
/// Left Join
/// Left Join - Returns all rows from the left table and matching rows from the right table.
/// If no match, NULL values are returned for columns from the right table.
Left,
/// Right Join
/// Right Join - Returns all rows from the right table and matching rows from the left table.
/// If no match, NULL values are returned for columns from the left table.
Right,
/// Full Join
/// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
/// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
/// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
/// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
Full,
/// Left Semi Join
/// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
/// Only columns from the left table are returned.
LeftSemi,
/// Right Semi Join
/// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
/// Only columns from the right table are returned.
RightSemi,
/// Left Anti Join
/// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
LeftAnti,
/// Right Anti Join
/// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
RightAnti,
/// Left Mark join
///
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::PlanProperties;

use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -97,7 +98,8 @@ impl ArrowExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ use super::FileScanConfig;
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -81,7 +82,8 @@ impl AvroExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::csv;
Expand All @@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -327,7 +328,8 @@ impl CsvExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
Expand Down Expand Up @@ -107,7 +108,8 @@ impl NdJsonExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};

use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use itertools::Itertools;
use log::debug;
Expand Down Expand Up @@ -654,13 +655,11 @@ impl ParquetExec {
orderings: &[LexOrdering],
file_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

PlanProperties::new(
eq_properties,
EquivalenceProperties::new_with_orderings(schema, orderings),
Self::output_partitioning_helper(file_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
15 changes: 11 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
Expand Down Expand Up @@ -1161,12 +1162,17 @@ fn ensure_distribution(
let should_use_estimates = config
.execution
.use_row_number_estimates_to_optimize_partitioning;
let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
&& matches!(
dist_context.plan.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
// Use order-preserving variants if either of the following conditions is true:
// - it is desired according to the config
// - the plan is unbounded and pipeline friendly (can incrementally produce results)
let order_preserving_variants_desirable =
is_unbounded || config.optimizer.prefer_existing_sort;
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
Expand Down Expand Up @@ -1459,7 +1465,8 @@ pub(crate) mod tests {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
input.pipeline_behavior(), // Pipeline Behavior
input.boundedness(), // Boundedness
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn replace_with_partial_sort(
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = Arc::clone(sort_plan.children()[0]);
if !child.execution_mode().is_unbounded() {
if !child.boundedness().is_unbounded() {
return Ok(plan);
}

Expand Down
28 changes: 19 additions & 9 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::EmissionType;

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
Expand Down Expand Up @@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
pub type PipelineFixerSubrule =
dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;

/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
/// Converts a hash join to a symmetric hash join if both its inputs are
/// unbounded and incremental.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
Expand All @@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
) -> Result<Arc<dyn ExecutionPlan>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
let left_unbounded = hash_join.left.execution_mode().is_unbounded();
let right_unbounded = hash_join.right.execution_mode().is_unbounded();
// Process only if both left and right sides are unbounded.
if left_unbounded && right_unbounded {
let left_unbounded = hash_join.left.boundedness().is_unbounded();
let left_incremental = matches!(
hash_join.left.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
let right_unbounded = hash_join.right.boundedness().is_unbounded();
let right_incremental = matches!(
hash_join.right.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
// Process only if both the left and right sides are unbounded and emit incrementally.
if left_unbounded && right_unbounded & left_incremental & right_incremental {
// Determine the partition mode based on configuration.
let mode = if config_options.optimizer.repartition_joins {
StreamJoinPartitionMode::Partitioned
Expand Down Expand Up @@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
if hash_join.left.execution_mode().is_unbounded()
&& !hash_join.right.execution_mode().is_unbounded()
if hash_join.left.boundedness().is_unbounded()
&& !hash_join.right.boundedness().is_unbounded()
&& matches!(
*hash_join.join_type(),
JoinType::Inner
Expand Down Expand Up @@ -2025,12 +2035,12 @@ mod hash_join_tests {
assert_eq!(
(
t.case.as_str(),
if left.execution_mode().is_unbounded() {
if left.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
},
if right.execution_mode().is_unbounded() {
if right.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
// For unbounded cases, we replace with the order-preserving variant in any
// case, as doing so helps fix the pipeline. Also replace if config allows.
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
|| !requirements.plan.execution_mode().pipeline_friendly();
|| (requirements.plan.boundedness().is_unbounded()
&& requirements.plan.pipeline_behavior() == EmissionType::Final);

// Create an alternate plan with order-preserving variants:
let mut alternate_plan = plan_with_order_preserving_variants(
Expand Down
Loading
Loading