feat: add raw aggregate udf planner #11371

Closed
wants to merge 1 commit into from

@tshauck tshauck commented Jul 9, 2024

Which issue does this PR close?

Related to #11228

Rationale for this change

This builds on the other work around plannable expressions (see: #11180).

This may also be used in #11229, but it's not yet clear what the best implementation pattern is, so I wanted to peel this part off after the name change to ExprPlanner.

What changes are included in this PR?

Adds an AggregateUDFPlanner which can plan aggregated expressions.
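
To make the shape of this extension point concrete, here is a minimal, self-contained sketch. It is an illustration under stated assumptions, not the code in this PR: `Expr`, `RawAggregate`, `AggregatePlanner`, and `NoopPlanner` are simplified, hypothetical stand-ins, and only the `Planned` / `Original` split mirrors the `PlannerResult` used in the diff.

```rust
// Simplified stand-ins for illustration; none of these are DataFusion's real definitions.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Literal(i64),
    Aggregate { name: String, args: Vec<Expr> },
}

/// An aggregate call that has been parsed but not yet turned into an `Expr`.
#[derive(Debug, Clone, PartialEq)]
pub struct RawAggregate {
    pub name: String,
    pub args: Vec<Expr>,
}

/// Either a finished expression, or the untouched input handed back to the caller.
#[derive(Debug, PartialEq)]
pub enum PlannerResult<T> {
    Planned(Expr),
    Original(T),
}

pub trait AggregatePlanner {
    /// Default behavior: decline to plan and return the input unchanged.
    fn plan_aggregate(&self, raw: RawAggregate) -> PlannerResult<RawAggregate> {
        PlannerResult::Original(raw)
    }
}

/// A planner that relies entirely on the default behavior.
pub struct NoopPlanner;

impl AggregatePlanner for NoopPlanner {}
```

Returning the input by value in `Original` is what lets a caller offer the same aggregate to the next planner without cloning it.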

Are these changes tested?

Manually, yes. I couldn't find a good example to copy from the other planners, so I'm open to suggestions for a unit test.

Are there any user-facing changes?

No

@github-actions github-actions bot added the sql (SQL Planner), logical-expr (Logical plan and expressions), and core (Core DataFusion crate) labels Jul 9, 2024

// An `AggregateUDF` to be planned.
#[derive(Debug, Clone)]
pub struct RawAggregateUDF {

Contributor Author

@tshauck tshauck Jul 9, 2024

One thought I had during this is it might be nice to also update the naming pattern for these intermediate structs... maybe from Raw* to Plannable*?

Contributor

I agree that Plannable sounds better (perhaps we can propose the change as a follow-on PR?).

What do you think @samuelcolvin and @jayzhan211 ?

@tshauck tshauck marked this pull request as ready for review July 9, 2024 22:00
&self,
aggregate_function: RawAggregateUDF,
) -> Result<PlannerResult<RawAggregateUDF>> {
Ok(PlannerResult::Original(aggregate_function))

Contributor

If we agree to convert count() to count(1) in the planner, the conversion would look like this:

    fn plan_aggregate_udf(
        &self,
        aggregate_function: RawAggregateUDF,
    ) -> datafusion_common::Result<PlannerResult<RawAggregateUDF>> {
        let RawAggregateUDF { udf, args, distinct, filter, order_by, null_treatment } =
            aggregate_function;

        // Rewrite a zero-argument `count()` into `count(1)`.
        if udf.name() == "count" && args.is_empty() {
            let expr = Expr::AggregateFunction(AggregateFunction {
                func_def: datafusion_expr::expr::AggregateFunctionDefinition::UDF(udf),
                args: vec![lit(1)],
                distinct,
                filter,
                order_by,
                null_treatment,
            });
            return Ok(PlannerResult::Planned(expr));
        }

        // Otherwise hand the function back unchanged.
        Ok(PlannerResult::Original(RawAggregateUDF {
            udf, args, distinct, filter, order_by, null_treatment,
        }))
    }

Contributor Author

I'm not sure what the path is, after that, for getting from count(1) to count() as we're discussing in #11229 (in a way that also works in .select). Until there's a clear path to that, I'm hesitant to expand the scope of this PR.

Contributor

I think the conversion from count(1) to count() is more like an optimization (rather than something for the SQL planner), as it would apply equally to SQL and to the dataframe APIs.

Perhaps we could add it as part of https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.AggregateUDFImpl.html#method.simplify 🤔

Contributor

What I'm thinking for the dataframe API is that we provide only one count_star() function, so we don't need to deal with function rewrites for the dataframe APIs.

For SQL, count(), count(1) and count(*) are equivalent and can all map to one single Expr, so I think it is possible to do the conversion in the planner.

Contributor

For SQL, count(), count(1) and count(*) are equivalent and can all map to one single Expr, so I think it is possible to do the conversion in the planner.

I agree they are equivalent things. But I am thinking it may be hard to teach everyone who builds Exprs to call count_star rather than count(lit(1)) or count(Expr::Wildcard)

To be clear I don't have a massively strong opinion, but it seems like since we don't control people's creation of Expr we aren't going to be able to prevent stuff like count(1)

Contributor

@jayzhan211 jayzhan211 Jul 13, 2024

The rewrite could be simple enough that we don't need to push it down to the optimizer.

pub fn count_star() -> Expr {
    Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
        count_udaf(),
        vec![Expr::Literal(COUNT_STAR_EXPANSION)],
        false,
        None,
        None,
        None,
    ))
}

pub fn count(expr: Expr) -> Expr {
    // For backward compatibility; callers could use count_star instead
    if let Expr::Wildcard { qualifier: _ } = expr {
        count_star()
    } else {
        Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
            count_udaf(),
            vec![expr],
            false,
            None,
            None,
            None,
        ))
    }
}

I think we need a clear division of roles between expression rewrites in the planner and rewrites in the optimizer.
ExprPlanner is the first place where we take expressions from SQLExpr and build up a datafusion::Expr. Syntax rewrites such as dict {a: b} or compound identifiers like a.b, and equivalent-argument rewrites such as count(*), are all good candidates for determining the expected datafusion::Expr in ExprPlanner. Other rewrites should be pushed down to the optimizer, since they may need constant evaluation or multiple passes.

If the reason for rewriting expressions in the optimizer is that it benefits both the SQL and dataframe APIs, maybe redesigning the dataframe API is what we need 🤔?

Contributor

I think we need a clear division of roles between expression rewrites in the planner and rewrites in the optimizer. ExprPlanner is the first place where we take expressions from SQLExpr and build up a datafusion::Expr. Syntax rewrites such as dict {a: b} or compound identifiers like a.b, and equivalent-argument rewrites such as count(*), are all good candidates for determining the expected datafusion::Expr in ExprPlanner. Other rewrites should be pushed down to the optimizer, since they may need constant evaluation or multiple passes.

If the reason for rewriting expressions in the optimizer is that it benefits both the SQL and dataframe APIs, maybe redesigning the dataframe API is what we need 🤔?

I guess in my mind the distinction is

  1. the planner is a mechanical transformation to something that could be run
  2. the optimizer rewrites expressions / plans to keep the exact same semantics (output results) but faster.

So in my mind the transformation of count(1) --> count() is an optimization as it potentially avoids having to accumulate a known column in the execution. Maybe there is a better way to think about it

This might be a good distinction to clarify in the documentation 🤔

Contributor

I guess in my mind the distinction is

  1. the planner is a mechanical transformation to something that could be run
  2. the optimizer rewrites expressions / plans to keep the exact same semantics (output results) but faster.

I agree with this. A rewrite in the logical optimizer should result in a better plan.

So in my mind the transformation of count(1) --> count() is an optimization as it potentially avoids having to accumulate a known column in the execution. Maybe there is a better way to think about it

In the case of count, count(*), count(1), and count() are equivalent; none of them is faster than the others.

The optimization that counts the number of rows is not in the logical optimizer but in the physical optimizer. Therefore, the transformation from count() and count(*) to count(1) does not really fit the second point, IMO. What we are doing before the physical optimizer is transforming the equivalent exprs into a single one (count(1)), so that we only need to process that one in the physical optimizer. Note that we could process those three expressions individually in the physical optimizer, but standardizing on a single expression as early as possible might reduce the complexity of downstream query processing. To me, it is more like rewriting expressions into an equivalent expression.

This might be a good distinction to clarify in the documentation 🤔

It would be nice to do so; we should clarify the roles of these two.
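
The "standardize early" argument above can be sketched in a few lines. This is only an illustration under assumptions: `Arg`, `canonicalize_count_args`, and `is_row_count` are hypothetical names, not DataFusion APIs, and the real planner and physical optimizer work on much richer types.

```rust
// Hypothetical stand-ins for illustration; not DataFusion's real types or rules.
#[derive(Debug, Clone, PartialEq)]
pub enum Arg {
    Wildcard,
    Literal(i64),
    Column(String),
}

/// Planner-side step: map the argument lists of `count()` and `count(*)`
/// to the canonical `count(1)` form. Anything else passes through untouched.
pub fn canonicalize_count_args(args: Vec<Arg>) -> Vec<Arg> {
    let is_star_like = matches!(args.as_slice(), [] | [Arg::Wildcard]);
    if is_star_like {
        vec![Arg::Literal(1)]
    } else {
        args
    }
}

/// Later-stage step: thanks to the canonical form, only one pattern
/// has to be recognized as "count all rows".
pub fn is_row_count(args: &[Arg]) -> bool {
    matches!(args, [Arg::Literal(1)])
}
```

With this split, a later stage needs a single match arm instead of three; the trade-off discussed in this thread is whether that normalization belongs in the planner or in an optimizer pass.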

Contributor

@alamb alamb left a comment

Thank you @tshauck

I think this is an interesting PR. I don't fully understand whether it is needed to support COUNT().

I tried running a query with count():

> select count() from values (1);
Error during planning: Error during planning: count does not support zero arguments. No function matches the given name and argument types 'count()'. You might need to add explicit type casts.
	Candidate functions:
	count(Any, .., Any)

It seems to me that the issue is the signature supplied by the implementation of count: https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/count.rs

Which appears to be Signature::VariadicAny:

signature: Signature::variadic_any(Volatility::Immutable),

For some reason, that signature does not support zero arguments:

pub fn supports_zero_argument(&self) -> bool {
    match &self {
        TypeSignature::Exact(vec) => vec.is_empty(),
        TypeSignature::Uniform(0, _) | TypeSignature::Any(0) => true,
        TypeSignature::OneOf(types) => types
            .iter()
            .any(|type_sig| type_sig.supports_zero_argument()),
        _ => false,
    }
}

Perhaps we just need to update the definition of count to allow zero arguments, and the existing planner would "just work".
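
One way to picture that suggestion is the sketch below. It uses a cut-down, local copy of the enum with only three variants, so it is a model of the quoted check rather than the real `TypeSignature`. The `count_signature_allowing_zero_args` helper is hypothetical; whether a `OneOf` combination is the right fix for count is an assumption, not something settled in this thread.

```rust
// Cut-down local model of the signature enum quoted above; not the real definition.
#[derive(Debug, Clone, PartialEq)]
pub enum TypeSignature {
    /// Variadic arguments of any type. The quoted check reports that
    /// this variant does not accept zero arguments.
    VariadicAny,
    /// Exactly `n` arguments of any type.
    Any(usize),
    /// Matches if any of the alternatives matches.
    OneOf(Vec<TypeSignature>),
}

impl TypeSignature {
    /// Mirrors the quoted check for the variants modeled here.
    pub fn supports_zero_argument(&self) -> bool {
        match self {
            TypeSignature::Any(0) => true,
            TypeSignature::OneOf(types) => {
                types.iter().any(|t| t.supports_zero_argument())
            }
            // `VariadicAny` lands here, which is why `count()` is rejected.
            _ => false,
        }
    }
}

/// Hypothetical signature for `count` that also admits zero arguments.
pub fn count_signature_allowing_zero_args() -> TypeSignature {
    TypeSignature::OneOf(vec![TypeSignature::VariadicAny, TypeSignature::Any(0)])
}
```

In this model, `VariadicAny` alone falls through to the catch-all arm, while the combined signature passes the zero-argument check through its `Any(0)` alternative.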

args,
distinct,
filter,
order_by,
null_treatment,
};

for planner in self.planners.iter() {

Contributor

This means that any planner extensions would take precedence over the built-in aggregate functions from the Registry. I think that is OK.

Contributor

We can avoid the clone like this:

for planner in self.planners.iter() {
    match planner.plan_aggregate_udf(raw_aggregate_function)? {
        PlannerResult::Planned(e) => return Ok(e),
        PlannerResult::Original(e) => {
            raw_aggregate_function = e;
        }
    }
}

return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
    raw_aggregate_function.udf,
    raw_aggregate_function.args,
    raw_aggregate_function.distinct,
    raw_aggregate_function.filter,
    raw_aggregate_function.order_by,
    raw_aggregate_function.null_treatment,
)));
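
The ownership pattern in that suggestion can be checked in isolation with a small, self-contained sketch. Everything here is a hypothetical stand-in (`Expr`, `RawAggregate`, `AggregatePlanner`, `plan_with`, `Decline`, `CountNoArgs`), and `String` is used as a placeholder error type; the point is only that the value moves into each planner and comes back through `Original`, so the binding has to be `mut` but never cloned.

```rust
// Hypothetical stand-ins for illustration; not DataFusion's real types.
#[derive(Debug, PartialEq)]
pub enum Expr {
    Literal(i64),
    Aggregate { name: String, args: Vec<Expr> },
}

pub struct RawAggregate {
    pub name: String,
    pub args: Vec<Expr>,
}

pub enum PlannerResult {
    Planned(Expr),
    Original(RawAggregate),
}

pub trait AggregatePlanner {
    fn plan_aggregate(&self, raw: RawAggregate) -> Result<PlannerResult, String>;
}

/// Offers `raw` to each planner in order. Ownership moves into the planner
/// and comes back through `Original`, so nothing is cloned.
pub fn plan_with(
    planners: &[Box<dyn AggregatePlanner>],
    mut raw: RawAggregate,
) -> Result<Expr, String> {
    for planner in planners {
        match planner.plan_aggregate(raw)? {
            PlannerResult::Planned(expr) => return Ok(expr),
            PlannerResult::Original(returned) => raw = returned,
        }
    }
    // No planner claimed it: fall back to the default construction.
    Ok(Expr::Aggregate { name: raw.name, args: raw.args })
}

/// Declines everything.
pub struct Decline;

impl AggregatePlanner for Decline {
    fn plan_aggregate(&self, raw: RawAggregate) -> Result<PlannerResult, String> {
        Ok(PlannerResult::Original(raw))
    }
}

/// Rewrites a zero-argument `count` into `count(1)`; declines anything else.
pub struct CountNoArgs;

impl AggregatePlanner for CountNoArgs {
    fn plan_aggregate(&self, raw: RawAggregate) -> Result<PlannerResult, String> {
        if raw.name == "count" && raw.args.is_empty() {
            let expr = Expr::Aggregate { name: raw.name, args: vec![Expr::Literal(1)] };
            return Ok(PlannerResult::Planned(expr));
        }
        Ok(PlannerResult::Original(raw))
    }
}
```

Because the first planner that returns `Planned` wins, the order of the planner list decides precedence, which matches the earlier observation that extensions are consulted before the default construction.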

@tshauck tshauck closed this Jul 15, 2024
@tshauck tshauck deleted the add-raw-aggregate-udf branch July 15, 2024 21:24
Contributor Author

tshauck commented Jul 15, 2024

I'm going to close this, as I don't think it is necessary for count(), though it seemed to be an incremental step towards supporting planned aggregate UDF expressions.

For count(), I actually did try that in the first commit of the other PR (#11229). I tried turning it into a plannable expression, which resulted in this PR, but it ends up being quite complex and has some reduced functionality with respect to the way the dataframe API works. I'm going to revert that PR back to the original approach, though I may do so in a new PR to keep things cleaner.
