Skip to content

feat: support logical plan for EXECUTE statement #13194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner {
// statement can be prepared)
return not_impl_err!("Unsupported logical plan: Prepare");
}
LogicalPlan::Execute(_) => {
return not_impl_err!("Unsupported logical plan: Execute");
}
LogicalPlan::Dml(dml) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl NamePreserver {
| LogicalPlan::Join(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Execute(_)
),
}
}
Expand Down
17 changes: 13 additions & 4 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::collections::HashMap;
use std::fmt;

use crate::{
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
Unnest, Values, Window,
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
};

use crate::dml::CopyTo;
Expand Down Expand Up @@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Data Types": format!("{:?}", data_types)
})
}
LogicalPlan::Execute(Execute {
name, parameters, ..
}) => {
json!({
"Node Type": "Execute",
"Name": name,
"Parameters": expr_vec_fmt!(parameters),
})
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
json!({
"Node Type": "DescribeTable"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use ddl::{
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
39 changes: 39 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ pub enum LogicalPlan {
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
/// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
Execute(Execute),
/// Data Manipulation Language (DML): Insert / Update / Delete
Dml(DmlStatement),
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
Expand Down Expand Up @@ -314,6 +316,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
LogicalPlan::Execute(Execute { schema, .. }) => schema,
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
Expand Down Expand Up @@ -457,6 +460,7 @@ impl LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => vec![],
}
}
Expand Down Expand Up @@ -560,6 +564,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => Ok(None),
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
Expand Down Expand Up @@ -712,6 +717,7 @@ impl LogicalPlan {
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::Execute(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
Expand Down Expand Up @@ -1072,6 +1078,14 @@ impl LogicalPlan {
input: Arc::new(input),
}))
}
LogicalPlan::Execute(Execute { name, schema, .. }) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::Execute(Execute {
name: name.clone(),
schema: Arc::clone(schema),
parameters: expr,
}))
}
LogicalPlan::TableScan(ts) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::TableScan(TableScan {
Expand Down Expand Up @@ -1330,6 +1344,7 @@ impl LogicalPlan {
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Extension(_) => None,
}
Expand Down Expand Up @@ -1933,6 +1948,9 @@ impl LogicalPlan {
}) => {
write!(f, "Prepare: {name:?} {data_types:?} ")
}
LogicalPlan::Execute(Execute { name, parameters, .. }) => {
write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
Expand Down Expand Up @@ -2599,6 +2617,27 @@ pub struct Prepare {
pub input: Arc<LogicalPlan>,
}

/// Execute a prepared statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Execute {
/// The name of the prepared statement to execute
pub name: String,
/// The execute parameters
pub parameters: Vec<Expr>,
/// Dummy schema
pub schema: DFSchemaRef,
}

// Comparison excludes the `schema` field.
impl PartialOrd for Execute {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.name.partial_cmp(&other.name) {
Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
cmp => cmp,
}
}
}

/// Describe the schema of table
///
/// # Example output:
Expand Down
26 changes: 22 additions & 4 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
Window,
Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
Values, Window,
};
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => Transformed::no(self),
})
}
Expand Down Expand Up @@ -505,6 +506,9 @@ impl LogicalPlan {
.chain(fetch.iter())
.map(|e| e.deref())
.apply_until_stop(f),
LogicalPlan::Execute(Execute { parameters, .. }) => {
parameters.iter().apply_until_stop(f)
}
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::RecursiveQuery(_)
Expand Down Expand Up @@ -734,6 +738,20 @@ impl LogicalPlan {
})
})
}
LogicalPlan::Execute(Execute {
parameters,
name,
schema,
}) => parameters
.into_iter()
.map_until_stop_and_collect(f)?
.update_data(|parameters| {
LogicalPlan::Execute(Execute {
parameters,
name,
schema,
})
}),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Unnest(_)
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Prepare(_) => {
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_) => {
// This rule handles recursion itself in a `ApplyOrder::TopDown` like
// manner.
plan.map_children(|c| self.rewrite(c, config))?
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ fn optimize_projections(
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::DescribeTable(_) => {
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Execute(_) => {
// These operators have no inputs, so stop the optimization process.
return Ok(Transformed::no(plan));
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for RecursiveQuery",
)),
LogicalPlan::Execute(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Execute",
)),
}
}
}
26 changes: 25 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion_expr::{
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
Expand Down Expand Up @@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: Arc::new(plan),
}))
}
Statement::Execute {
name,
parameters,
using,
} => {
// `USING` is a MySQL-specific syntax and currently not supported.
if !using.is_empty() {
return not_impl_err!(
"Execute statement with USING is not supported"
);
}

let empty_schema = DFSchema::empty();
let parameters = parameters
.into_iter()
.map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
.collect::<Result<Vec<Expr>>>()?;

Ok(LogicalPlan::Execute(Execute {
name: ident_to_string(&name),
parameters,
schema: DFSchemaRef::new(empty_schema),
}))
}

Statement::ShowTables {
extended,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl Unparser<'_> {
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
Expand Down
18 changes: 18 additions & 0 deletions datafusion/sqllogictest/test_files/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person

statement error
PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);

# test creating logical plan for EXECUTE statements
query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]

query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]

query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);

# TODO: support EXECUTE queries
query error DataFusion error: This feature is not implemented: Unsupported logical plan: Execute
EXECUTE my_plan;