Skip to content

Commit 42f9060

Browse files
authored
feat(substrait): add wildcard handling to producer (#12987)
* feat(substrait): add wildcard expand rule in producer
* add comment describing need for ExpandWildcardRule
1 parent 97f7491 commit 42f9060

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

datafusion/substrait/src/logical_plan/producer.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::config::ConfigOptions;
19+
use datafusion::optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
20+
use datafusion::optimizer::AnalyzerRule;
1821
use std::sync::Arc;
1922
use substrait::proto::expression_reference::ExprType;
2023

@@ -103,9 +106,14 @@ pub fn to_substrait_plan(plan: &LogicalPlan, ctx: &SessionContext) -> Result<Box
103106
// Parse relation nodes
104107
// Generate PlanRel(s)
105108
// Note: Only 1 relation tree is currently supported
109+
110+
// We have to expand wildcard expressions first, as wildcards can't be represented in Substrait
111+
let plan = Arc::new(ExpandWildcardRule::new())
112+
.analyze(plan.clone(), &ConfigOptions::default())?;
113+
106114
let plan_rels = vec![PlanRel {
107115
rel_type: Some(plan_rel::RelType::Root(RelRoot {
108-
input: Some(*to_substrait_rel(plan, ctx, &mut extensions)?),
116+
input: Some(*to_substrait_rel(&plan, ctx, &mut extensions)?),
109117
names: to_substrait_named_struct(plan.schema(), &mut extensions)?.names,
110118
})),
111119
}];

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,13 @@ async fn simple_select() -> Result<()> {
183183

184184
// Test for `SELECT * FROM data`: the diff replaces the earlier plain `roundtrip` call with an
// assertion that the unoptimized plan, after going through `assert_expected_plan_unoptimized`,
// displays as a projection naming columns `data.a` .. `data.f` explicitly over a `TableScan`,
// with the schema check enabled (`true`).
#[tokio::test]
185185
async fn wildcard_select() -> Result<()> {
186-
roundtrip("SELECT * FROM data").await
186+
assert_expected_plan_unoptimized(
187+
"SELECT * FROM data",
188+
"Projection: data.a, data.b, data.c, data.d, data.e, data.f\
189+
\n TableScan: data",
190+
true,
191+
)
192+
.await
187193
}
188194

189195
#[tokio::test]
@@ -1174,6 +1180,32 @@ async fn verify_post_join_filter_value(proto: Box<Plan>) -> Result<()> {
11741180
Ok(())
11751181
}
11761182

1183+
/// Test helper: plans `sql` against the context from `create_context()`, takes the
/// *unoptimized* logical plan, converts it with `to_substrait_plan`, reads it back with
/// `from_substrait_plan`, and asserts that the round-tripped plan's display string equals
/// `expected_plan_str`.
///
/// When `assert_schema` is true, the schemas of the original and round-tripped plans are also
/// asserted equal. Both plans and the Substrait proto are printed to stdout for debugging.
///
/// Returns an error if context creation, SQL planning, or either conversion fails; panics
/// (via `assert_eq!`) on a schema or plan-string mismatch.
async fn assert_expected_plan_unoptimized(
1184+
sql: &str,
1185+
expected_plan_str: &str,
1186+
assert_schema: bool,
1187+
) -> Result<()> {
1188+
let ctx = create_context().await?;
1189+
let df = ctx.sql(sql).await?;
1190+
let plan = df.into_unoptimized_plan();
1191+
let proto = to_substrait_plan(&plan, &ctx)?;
1192+
let plan2 = from_substrait_plan(&ctx, &proto).await?;
1193+
1194+
println!("{plan}");
1195+
println!("{plan2}");
1196+
1197+
println!("{proto:?}");
1198+
1199+
if assert_schema {
1200+
assert_eq!(plan.schema(), plan2.schema());
1201+
}
1202+
1203+
let plan2str = format!("{plan2}");
1204+
assert_eq!(expected_plan_str, &plan2str);
1205+
1206+
Ok(())
1207+
}
1208+
11771209
async fn assert_expected_plan(
11781210
sql: &str,
11791211
expected_plan_str: &str,

0 commit comments

Comments
 (0)