feat: Add example for implementing SAMPLE using extension APIs #17633
This looks amazing -- I will try and review it soon
Thank you @theirix -- this is an amazing PR that shows how to "support your own SQL" dialect. I am sorry for the delay in reviewing.
I recommend we call this example something more general, like "custom_sql_dialect" (we can rename it in a follow-on PR).
I also had several suggestions on how we can improve / simplify this example / improve the underlying APIs as well, though they can also be done as follow ons I think, if you prefer.
What I suggest is:
- Implement some of the simpler changes (move main up, add comments, simplify Eq, cleanup imports, etc)
- Merge this PR
- Work on improvements to API and comments as follow on PRs (you don't have to do it)
Again, this is really really cool. I have dreamed about this type of example for years 💤
    seed: u64,
}

impl Hash for TableSamplePlanNode {
I think you can derive these implementations automatically, as in
#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
struct TableSamplePlanNode {
...
Rather than provide the implementations directly
Update: I tried that and found it was due to the f64
error[E0277]: the trait bound `f64: std::hash::Hash` is not satisfied
--> datafusion-examples/examples/table_sample.rs:117:5
|
113 | #[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
| ---- in this derive macro expansion
...
117 | lower_bound: f64,
| ^^^^^^^^^^^^^^^^ the trait `std::hash::Hash` is not implemented for `f64`
|
Here is one proposal
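The linked proposal is not reproduced on this page. As a rough sketch of the newtype approach that the PR later adopted (see the commit "Use newtype to get hash/eq for f64"), the idea can look like the following. The names `HashableF64`, `SampleNode`, and `hash_of` are hypothetical and chosen for illustration only; the actual type in the PR may differ.

```rust
use std::cmp::Ordering;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical wrapper (not the PR's actual type): compares and hashes an
/// `f64` by its bit pattern, so `Eq` and `Hash` can be implemented soundly.
/// Note: under this scheme `0.0 != -0.0`, and a NaN equals a NaN with the
/// same bits.
#[derive(Debug, Clone, Copy)]
struct HashableF64(f64);

impl PartialEq for HashableF64 {
    fn eq(&self, other: &Self) -> bool {
        self.0.to_bits() == other.0.to_bits()
    }
}

impl Eq for HashableF64 {}

impl Hash for HashableF64 {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.to_bits().hash(state);
    }
}

impl PartialOrd for HashableF64 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // `total_cmp` is a total order that is `Equal` only for identical bits,
        // which keeps ordering consistent with the `PartialEq` impl above.
        Some(self.0.total_cmp(&other.0))
    }
}

/// Hypothetical stand-in for a plan node: with the wrapper in place,
/// every trait in the derive list from the review comment is available.
#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
struct SampleNode {
    lower_bound: HashableF64,
    upper_bound: HashableF64,
    seed: u64,
}

/// Helper for illustration: hash any `Hash` value with the std default hasher.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
```

With this in place, two nodes built from the same bounds and seed compare equal and hash identically, which is what hand-written `Hash`/`Eq` implementations would otherwise have to guarantee.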
// specific language governing permissions and limitations
// under the License.

#![allow(unused_imports)]
Is this needed? I think the code will get much simpler if it is removed
Agree, I forgot to remove all these experimental includes
    expr: &ast::Expr,
) -> Option<T> {
    match expr {
        ast::Expr::BinaryOp { left, op, right } => {
Most of the rest of DataFusion would use an Expr here and reuse the existing simplification implementation, e.g.
let expr = col("ts").eq(to_timestamp(vec![lit("2020-09-08T12:00:00+00:00")]));
I tried to rework this functionality using a more generic expression simplifier. Since this change is quite large, I would prefer to submit it as a separate follow-up PR.

// Context provider for tests

struct MockContextProvider<'a> {
What do you think about making SessionContextProvider public so it can be more easily reused? Having to define a very similar one here is a fairly large usability papercut in my opinion
That's what I initially thought, but I decided not to touch the core. All in-crate tests were able to reuse SessionContextProvider. It would be reasonable and much more ergonomic to provide this functionality to the example and extensions as well.
I changed the visibility of this struct and added a public constructor while keeping the internals private. Please let me know if I should do it in a different PR.
    )
    .await?;

let table_source = provider_as_source(ctx.table_provider("alltypes_plain").await?);
If we made SessionContextProvider public, this would be much simpler, I think.
    }
}

#[tokio::main]
I suggest we move this main to the top of the file so people can see what this example does / how it can be used, before having to wade through all the implementation details
match poll {
    Poll::Ready(Some(Ok(batch))) => {
        let start = baseline_metrics.elapsed_compute().clone();
        let result = self.sampler.sample(&batch);
        let _timer = start.timer();
        Poll::Ready(Some(result))
    }
    Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
    Poll::Ready(None) => Poll::Ready(None),
    Poll::Pending => Poll::Pending,
}
You can use the ready! macro here to make this more concise if you want
Something like
match ready!(poll) {
Some(Ok(..))
...
}
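A self-contained sketch of that pattern, for readers unfamiliar with it. It uses `std::task::ready!`, which behaves like the `futures::ready!` macro the example imports. The types `Batch` and `BatchResult` and the functions `sample` and `map_poll` are hypothetical stand-ins, not the PR's code; the real stream works on Arrow record batches and also records metrics.

```rust
use std::task::{ready, Poll};

// Hypothetical stand-ins for a record batch and its fallible result.
type Batch = Vec<i32>;
type BatchResult = Result<Batch, String>;

/// Hypothetical sampler: keeps only the even values of a batch.
fn sample(batch: &Batch) -> BatchResult {
    Ok(batch.iter().copied().filter(|v| v % 2 == 0).collect())
}

/// Maps the result of polling an input stream to the sampled output.
/// `ready!` returns `Poll::Pending` from this function early, so the
/// `match` only has to handle the three `Poll::Ready` cases.
fn map_poll(poll: Poll<Option<BatchResult>>) -> Poll<Option<BatchResult>> {
    match ready!(poll) {
        Some(Ok(batch)) => Poll::Ready(Some(sample(&batch))),
        Some(Err(e)) => Poll::Ready(Some(Err(e))),
        None => Poll::Ready(None),
    }
}
```

Compared with the four-arm `match` in the diff, the `Poll::Pending` arm disappears and the remaining arms lose one level of `Poll::Ready(...)` nesting in their patterns.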
It's really useful, thank you. Switched to it and recorded additional metrics
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Poisson};

/// This example demonstrates the table sample support.
I think this undersells somewhat what this example does
How about something more like this:
- /// This example demonstrates the table sample support.
+ /// This example demonstrates how to extend DataFusion's SQL parser to recognize
+ /// other syntax.
+ ///
+ /// This example shows how to extend the DataFusion SQL planner to support the
+ /// `TABLESAMPLE` clause in SQL queries and then use a custom user defined node
+ /// to implement the sampling logic in the physical plan.
Use newtype to get hash/eq for f64
Co-authored-by: Andrew Lamb <[email protected]>
Signed-off-by: theirix <[email protected]>
Thank you! I am glad it helps showcase DataFusion's extensibility. It wasn't immediately clear to me how to approach it, so it could be helpful for developers as a reference or a starting point. I have attempted to address most of the review suggestions, including a small core API change. I would prefer to move in smaller iterations, with follow-ups.
use futures::{ready, TryStreamExt};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Poisson};
Drive-by comment: Would these imports be tidier if they were in the same block? There are several sections of datafusion, several sections of arrow.
Thank you for reviewing! I cleaned up comments and reordered imports - now it'd make much more sense!
Just noting some stuff from reading through this out of interest. Sorry if you're already on top of these changes!
// Classical way
// let sql_to_rel = SqlToRel::new(&context_provider);
// let logical_plan = sql_to_rel.sql_statement_to_plan(statement.clone())?;
Classical way to do what?
// Execute via standard sql call - doesn't work
// let df = ctx.sql(sql).await?;
// let batches = df.collect().await?;
Remove?
info!("Batches: {:?}", &batches);

let result_string = pretty_format_batches(&batches)
// pretty_format_batches_with_schema(table_source.schema(), &batches)
remove?
    Ok(())
}

/// Hashable and comparible f64 for sampling bounds
- /// Hashable and comparible f64 for sampling bounds
+ /// Hashable and comparable f64 for sampling bounds
I am hoping to help push this PR along this week. Thank you for your patience, as always, @theirix
Hi @theirix! For the next steps, I have a work-in-progress implementation of a similar idea, but one which allows injecting any number of custom relation planners and supports nested custom relations as well. I've added your [...]. If you want to give it a look, the draft PR is #17843.
Hello @geoffreyclaude! I reckon it was only tested for the root sample clause so far. Let me check it soon
Which issue does this PR close?
Rationale for this change
The rationale is explained in #13563 in detail with known syntax examples.
This is the third design for the table sample support.
My first design added an explicit rewrite function baked into the select logical plan - Support data source sampling with TABLESAMPLE #16325
The second design introduced dedicated, flexible logical and physical plans, but was tied to datafusion core - feat: support table sample #16505
This third design moves the second design out of datafusion core and into extensions.
What changes are included in this PR?
All changes are bundled into an example file since it is a PoC of extensibility - as discussed with @alamb in #13563 (comment).
If the idea is viable, the code could be modularised, which is not possible in a datafusion-examples crate.
It adds several components:
- TableSamplePlanNode trait for a sampling logical plan
- TableSampleQueryPlanner (trait QueryPlanner)
- SampleExec - mostly adapted from the second design, all kudos and thanks to @chenkovsky!
- an ExtensionPlanner implementation to build a physical plan
The setup, as seen in main, is a bit cumbersome, but it works. Building a SQL extension with access to AST and without introducing a whole new statement (as large projects like arroyo, greptime or cube introduce new syntax) is complicated.
For full modularity I would propose a few extension points to SqlToRel. It could help to avoid manual parsing / logical plan / physical plan / execution conversions and would keep the concise client syntax.
Are these changes tested?
Are there any user-facing changes?
No