From 5b90d3a7785e8ad59966c7a55c0d513e663eb00b Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 07:17:27 -0700 Subject: [PATCH 01/11] Implementing POWER function --- datafusion/core/src/logical_plan/mod.rs | 2 +- .../core/src/physical_plan/functions.rs | 20 +++++ datafusion/core/tests/sql/functions.rs | 77 +++++++++++++++++++ datafusion/expr/src/built_in_function.rs | 4 + datafusion/expr/src/expr_fn.rs | 2 + .../physical-expr/src/math_expressions.rs | 48 +++++++++++- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/from_proto.rs | 3 +- datafusion/proto/src/to_proto.rs | 1 + dev/build-ballista-docker.sh | 2 +- dev/docker/ballista.dockerfile | 3 +- pv.yaml | 26 +++++++ 12 files changed, 184 insertions(+), 5 deletions(-) create mode 100644 pv.yaml diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index cc5023008f53..d73f89a642ea 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -47,7 +47,7 @@ pub use expr::{ count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, - octet_length, or, random, regexp_match, regexp_replace, repeat, replace, reverse, + octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, diff --git a/datafusion/core/src/physical_plan/functions.rs b/datafusion/core/src/physical_plan/functions.rs index ae7a2bd7bbd7..79d85ed9a071 100644 --- a/datafusion/core/src/physical_plan/functions.rs +++ b/datafusion/core/src/physical_plan/functions.rs @@ -221,6 +221,11 @@ pub fn return_type( } }), + BuiltinScalarFunction::Power => match &input_expr_types[0] { + DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), + _ => Ok(DataType::Float64), + }, + BuiltinScalarFunction::Abs | BuiltinScalarFunction::Acos | BuiltinScalarFunction::Asin @@ -623,6 +628,16 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { fun.volatility(), ), BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()), + + BuiltinScalarFunction::Power => { + Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]), + ], + fun.volatility(), + ) + } // math expressions expect 1 argument of type f64 or f32 // priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we // return the best approximation for it (in f64). @@ -762,6 +777,11 @@ pub fn create_physical_fun( BuiltinScalarFunction::Sqrt => Arc::new(math_expressions::sqrt), BuiltinScalarFunction::Tan => Arc::new(math_expressions::tan), BuiltinScalarFunction::Trunc => Arc::new(math_expressions::trunc), + + BuiltinScalarFunction::Power => { + Arc::new(|args| make_scalar_function(math_expressions::power)(args)) + } + // string functions BuiltinScalarFunction::Array => Arc::new(array_expressions::array), BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() { diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index 226bb8159d78..a7b8b1a1e1ca 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -357,3 +357,80 @@ async fn coalesce_mul_with_default_value() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn test_power() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("i32", DataType::Int32, true), + Field::new("i64", DataType::Int64, true), + Field::new("f32", DataType::Float32, true), + Field::new("f64", DataType::Float64, true), + ])); + + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![ + Some(2), + Some(5), + Some(0), + Some(-14), + None, + ])), + Arc::new(Int64Array::from(vec![ + Some(2), + Some(5), + Some(0), + Some(-14), + None, + ])), + Arc::new(Float32Array::from(vec![ + Some(1.0), + Some(2.5), + Some(0.0), + Some(-14.5), + None, + ])), + Arc::new(Float64Array::from(vec![ + Some(1.0), + Some(2.5), + Some(0.0), + Some(-14.5), + None, + ])), + ], + )?; + + let table = MemTable::try_new(schema, vec![vec![data]])?; + + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table))?; + let sql = + r"SELECT power(i32, 3) as power_i32, + power(i64, 3) as power_i64, + power(f32, 3) as power_f32, + power(f64, 3) as power_f64, + power(2, 3) as power_int_scalar, + power(2.5, 3) as power_float_scalar + FROM test"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-----------+-----------+-----------+-----------+------------------+--------------------+", + "| power_i32 | power_i64 | power_f32 | power_f64 | power_int_scalar | power_float_scalar |", + "+-----------+-----------+-----------+-----------+------------------+--------------------+", + "| 8 | 8 | 1 | 1 | 8 | 15.625 |", + "| 125 | 125 | 15.625 | 15.625 | 8 | 15.625 |", + "| 0 | 0 | 0 | 0 | 8 | 15.625 |", + "| -2744 | -2744 | -3048.625 | -3048.625 | 8 | 15.625 |", + "| | | | | 8 | 15.625 |", + "+-----------+-----------+-----------+-----------+------------------+--------------------+", + ]; + assert_batches_eq!(expected, &actual); + //dbg!(actual[0].schema().fields()); + assert_eq!(actual[0].schema().field_with_name("power_i32").unwrap().data_type().to_owned(), DataType::Int64); + assert_eq!(actual[0].schema().field_with_name("power_i64").unwrap().data_type().to_owned(), DataType::Int64); + assert_eq!(actual[0].schema().field_with_name("power_f32").unwrap().data_type().to_owned(), DataType::Float64); + assert_eq!(actual[0].schema().field_with_name("power_f64").unwrap().data_type().to_owned(), DataType::Float64); + + Ok(()) +} \ No newline at end of file diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 7cc03546131e..17df179ed400 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -54,6 +54,8 @@ pub enum BuiltinScalarFunction { Log10, /// log2 Log2, + /// power + Power, /// round Round, /// signum @@ -184,6 +186,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Log => Volatility::Immutable, BuiltinScalarFunction::Log10 => Volatility::Immutable, BuiltinScalarFunction::Log2 => Volatility::Immutable, + BuiltinScalarFunction::Power => Volatility::Immutable, BuiltinScalarFunction::Round => Volatility::Immutable, BuiltinScalarFunction::Signum => Volatility::Immutable, BuiltinScalarFunction::Sin => Volatility::Immutable, @@ -267,6 +270,7 @@ impl FromStr for BuiltinScalarFunction { "log" => BuiltinScalarFunction::Log, "log10" => BuiltinScalarFunction::Log10, "log2" => BuiltinScalarFunction::Log2, + "power" => BuiltinScalarFunction::Power, "round" => BuiltinScalarFunction::Round, "signum" => BuiltinScalarFunction::Signum, "sin" => BuiltinScalarFunction::Sin, diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index d9cd6a552acb..89d5675149e4 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -240,6 +240,8 @@ unary_scalar_expr!(Log2, log2); unary_scalar_expr!(Log10, log10); unary_scalar_expr!(Ln, ln); unary_scalar_expr!(NullIf, nullif); +scalar_expr!(Power, power, exponent); + // string functions scalar_expr!(Ascii, ascii, string); diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index b16a59634f50..dd44ecbabe4d 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -17,7 +17,7 @@ //! Math expressions -use arrow::array::{Float32Array, Float64Array}; +use arrow::array::{Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; @@ -25,6 +25,8 @@ use datafusion_expr::ColumnarValue; use rand::{thread_rng, Rng}; use std::iter; use std::sync::Arc; +use arrow::array::ArrayRef; +use std::any::type_name; macro_rules! downcast_compute_op { ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident) => {{ @@ -86,6 +88,18 @@ macro_rules! math_unary_function { }; } +macro_rules! downcast_arg { + ($ARG:expr, $NAME:expr, $ARRAY_TYPE:ident) => {{ + $ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast {} to {}", + $NAME, + type_name::<$ARRAY_TYPE>() + )) + })? + }}; +} + math_unary_function!("sqrt", sqrt); math_unary_function!("sin", sin); math_unary_function!("cos", cos); @@ -120,6 +134,38 @@ pub fn random(args: &[ColumnarValue]) -> Result { Ok(ColumnarValue::Array(Arc::new(array))) } +macro_rules! make_function_inputs2 { + ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ + let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); + let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); + + arg1 + .iter() + .zip(arg2.iter()) + .map(|(a1, a2)| match (a1, a2) { + (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), + _ => None, + }) + .collect::<$ARRAY_TYPE>() + }}; +} + +pub fn power(args: &[ArrayRef]) -> Result { + match args[0].data_type() { + DataType::Float32 | DataType::Float64 => + Ok(Arc::new(make_function_inputs2!(&args[0], &args[1], "base", "exponent", Float64Array, {f64::powf})) as ArrayRef), + + DataType::Int32 | DataType::Int64 => + Ok(Arc::new(make_function_inputs2!(&args[0], &args[1], "base", "exponent", Int64Array, {i64::pow})) as ArrayRef), + + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {:?} for function power", + other + ))) + } +} + + #[cfg(test)] mod tests { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 1e5a797cede0..1651a70191ea 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -184,6 +184,7 @@ enum ScalarFunction { Trim=61; Upper=62; Coalesce=63; + Power=64; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 0aa24dd8ea0b..9b638c63ee65 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -30,7 +30,7 @@ use datafusion::{ logical_plan::{ abs, acos, ascii, asin, atan, ceil, character_length, chr, concat_expr, concat_ws_expr, cos, digest, exp, floor, left, ln, log10, log2, now_expr, nullif, - random, regexp_replace, repeat, replace, reverse, right, round, signum, sin, + power, random, regexp_replace, repeat, replace, reverse, right, round, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trunc, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -468,6 +468,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Translate => Self::Translate, ScalarFunction::RegexpMatch => Self::RegexpMatch, ScalarFunction::Coalesce => Self::Coalesce, + ScalarFunction::Power => Self::Power, } } } diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 1e05f80472ad..0a99eda57cd1 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1072,6 +1072,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Translate => Self::Translate, BuiltinScalarFunction::RegexpMatch => Self::RegexpMatch, BuiltinScalarFunction::Coalesce => Self::Coalesce, + BuiltinScalarFunction::Power => Self::Power, }; Ok(scalar_function) diff --git a/dev/build-ballista-docker.sh b/dev/build-ballista-docker.sh index bc028da9e716..f8412e212c31 100755 --- a/dev/build-ballista-docker.sh +++ b/dev/build-ballista-docker.sh @@ -21,4 +21,4 @@ set -e . ./dev/build-set-env.sh docker build -t ballista-base:$BALLISTA_VERSION -f dev/docker/ballista-base.dockerfile . -docker build -t ballista:$BALLISTA_VERSION -f dev/docker/ballista.dockerfile . +DOCKER_BUILDKIT=0 docker build --progress=plain -t ballista:$BALLISTA_VERSION -f dev/docker/ballista.dockerfile . diff --git a/dev/docker/ballista.dockerfile b/dev/docker/ballista.dockerfile index a0a6ac94ad7c..eaaed0f3db2b 100644 --- a/dev/docker/ballista.dockerfile +++ b/dev/docker/ballista.dockerfile @@ -25,7 +25,7 @@ ARG RELEASE_FLAG=--release FROM ballista-base:0.6.0 AS base WORKDIR /tmp/ballista RUN apt-get -y install cmake -RUN cargo install cargo-chef --version 0.1.23 +RUN cargo install cargo-chef --version 0.1.34 FROM base as planner ADD Cargo.toml . @@ -75,6 +75,7 @@ ARG RELEASE_FLAG=--release # force build.rs to run to generate configure_me code. ENV FORCE_REBUILD='true' +RUN echo $(rustc --version) RUN cargo build $RELEASE_FLAG # put the executor on /executor (need to be copied from different places depending on FLAG) diff --git a/pv.yaml b/pv.yaml new file mode 100644 index 000000000000..328cb0b58a01 --- /dev/null +++ b/pv.yaml @@ -0,0 +1,26 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: data-pv + labels: + type: local +spec: + storageClassName: manual + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + hostPath: + path: "/mnt" +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: data-pv-claim +spec: + storageClassName: manual + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 3Gi \ No newline at end of file From 2484973c5dcba5f3b13ecf504b9883dd7a77ace0 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 07:39:33 -0700 Subject: [PATCH 02/11] Delete pv.yaml --- pv.yaml | 26 -------------------------- 1 file changed, 26 deletions(-) delete mode 100644 pv.yaml diff --git a/pv.yaml b/pv.yaml deleted file mode 100644 index 328cb0b58a01..000000000000 --- a/pv.yaml +++ /dev/null @@ -1,26 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - name: data-pv - labels: - type: local -spec: - storageClassName: manual - capacity: - storage: 10Gi - accessModes: - - ReadWriteOnce - hostPath: - path: "/mnt" ---- -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: data-pv-claim -spec: - storageClassName: manual - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 3Gi \ No newline at end of file From 668700989faa81324da9f2bc7d88353c4ad53d10 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 10:34:20 -0700 Subject: [PATCH 03/11] Delete build-ballista-docker.sh --- dev/build-ballista-docker.sh | 24 ------------------------ 1 file changed, 24 deletions(-) delete mode 100755 dev/build-ballista-docker.sh diff --git a/dev/build-ballista-docker.sh b/dev/build-ballista-docker.sh deleted file mode 100755 index f8412e212c31..000000000000 --- a/dev/build-ballista-docker.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -set -e - -. ./dev/build-set-env.sh -docker build -t ballista-base:$BALLISTA_VERSION -f dev/docker/ballista-base.dockerfile . -DOCKER_BUILDKIT=0 docker build --progress=plain -t ballista:$BALLISTA_VERSION -f dev/docker/ballista.dockerfile . From 07e7f59a1c0142a5566c61f6e29c7661639603db Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 10:34:30 -0700 Subject: [PATCH 04/11] Delete ballista.dockerfile --- dev/docker/ballista.dockerfile | 109 --------------------------------- 1 file changed, 109 deletions(-) delete mode 100644 dev/docker/ballista.dockerfile diff --git a/dev/docker/ballista.dockerfile b/dev/docker/ballista.dockerfile deleted file mode 100644 index eaaed0f3db2b..000000000000 --- a/dev/docker/ballista.dockerfile +++ /dev/null @@ -1,109 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Turn .dockerignore to .dockerallow by excluding everything and explicitly -# allowing specific files and directories. This enables us to quickly add -# dependency files to the docker content without scanning the whole directory. -# This setup requires to all of our docker containers have arrow's source -# as a mounted directory. - -ARG RELEASE_FLAG=--release -FROM ballista-base:0.6.0 AS base -WORKDIR /tmp/ballista -RUN apt-get -y install cmake -RUN cargo install cargo-chef --version 0.1.34 - -FROM base as planner -ADD Cargo.toml . -COPY ballista ./ballista/ -COPY ballista-examples ./ballista-examples/ -COPY benchmarks ./benchmarks/ -COPY datafusion ./datafusion/ -COPY datafusion-cli ./datafusion-cli/ -COPY datafusion-common ./datafusion-common/ -COPY datafusion-expr ./datafusion-expr/ -COPY datafusion-physical-expr ./datafusion-physical-expr/ -COPY datafusion-jit ./datafusion-jit/ -COPY datafusion-proto ./datafusion-proto/ -COPY datafusion-examples ./datafusion-examples/ -RUN cargo chef prepare --recipe-path recipe.json - -FROM base as cacher -COPY --from=planner /tmp/ballista/recipe.json recipe.json -RUN cargo chef cook $RELEASE_FLAG --recipe-path recipe.json - -FROM base as builder -RUN mkdir /tmp/ballista/ballista -RUN mkdir /tmp/ballista/ballista-examples -RUN mkdir /tmp/ballista/benchmarks -RUN mkdir /tmp/ballista/datafusion -RUN mkdir /tmp/ballista/datafusion-cli -RUN mkdir /tmp/ballista/datafusion-common -RUN mkdir /tmp/ballista/datafusion-expr -RUN mkdir /tmp/ballista/datafusion-physical-expr -RUN mkdir /tmp/ballista/datafusion-jit -RUN mkdir /tmp/ballista/datafusion-proto -RUN mkdir /tmp/ballista/datafusion-examples -ADD Cargo.toml . -COPY ballista ./ballista/ -COPY ballista-examples ./ballista-examples/ -COPY benchmarks ./benchmarks/ -COPY datafusion ./datafusion/ -COPY datafusion-cli ./datafusion-cli/ -COPY datafusion-common ./datafusion-common/ -COPY datafusion-expr ./datafusion-expr/ -COPY datafusion-physical-expr ./datafusion-physical-expr/ -COPY datafusion-jit ./datafusion-jit/ -COPY datafusion-proto ./datafusion-proto/ -COPY datafusion-examples ./datafusion-examples/ -COPY --from=cacher /tmp/ballista/target target -ARG RELEASE_FLAG=--release - -# force build.rs to run to generate configure_me code. -ENV FORCE_REBUILD='true' -RUN echo $(rustc --version) -RUN cargo build $RELEASE_FLAG - -# put the executor on /executor (need to be copied from different places depending on FLAG) -ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-executor /executor; else mv /tmp/ballista/target/release/ballista-executor /executor; fi - -# put the scheduler on /scheduler (need to be copied from different places depending on FLAG) -ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/target/release/ballista-scheduler /scheduler; fi - -# put the tpch on /tpch (need to be copied from different places depending on FLAG) -ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi - -# Copy the binary into a new container for a smaller docker image -FROM ballista-base:0.6.0 - -COPY --from=builder /executor / - -COPY --from=builder /scheduler / - -COPY --from=builder /tpch / - -ADD benchmarks/run.sh / -RUN mkdir /queries -COPY benchmarks/queries/ /queries/ - -ENV RUST_LOG=info -ENV RUST_BACKTRACE=full - -CMD ["/executor"] From d8c73c1ac6242992a96b45a78d880e4c43bfcafd Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 12:31:28 -0700 Subject: [PATCH 05/11] aligining with latest upstream changes --- .../core/src/physical_plan/functions.rs | 486 +----------------- datafusion/expr/src/expr_fn.rs | 2 +- datafusion/expr/src/function.rs | 14 + .../physical-expr/src/math_expressions.rs | 33 +- datafusion/proto/src/from_proto.rs | 4 + 5 files changed, 37 insertions(+), 502 deletions(-) diff --git a/datafusion/core/src/physical_plan/functions.rs b/datafusion/core/src/physical_plan/functions.rs index 2b73300d39dd..57928f27c104 100644 --- a/datafusion/core/src/physical_plan/functions.rs +++ b/datafusion/core/src/physical_plan/functions.rs @@ -52,200 +52,6 @@ use datafusion_physical_expr::math_expressions; use datafusion_physical_expr::string_expressions; use std::sync::Arc; -macro_rules! make_utf8_to_return_type { - ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => { - fn $FUNC(arg_type: &DataType, name: &str) -> Result { - Ok(match arg_type { - DataType::LargeUtf8 => $largeUtf8Type, - DataType::Utf8 => $utf8Type, - _ => { - // this error is internal as `data_types` should have captured this. - return Err(DataFusionError::Internal(format!( - "The {:?} function can only accept strings.", - name - ))); - } - }) - } - }; -} - -make_utf8_to_return_type!(utf8_to_str_type, DataType::LargeUtf8, DataType::Utf8); -make_utf8_to_return_type!(utf8_to_int_type, DataType::Int64, DataType::Int32); -make_utf8_to_return_type!(utf8_to_binary_type, DataType::Binary, DataType::Binary); - -/// Returns the datatype of the scalar function -pub fn return_type( - fun: &BuiltinScalarFunction, - input_expr_types: &[DataType], -) -> Result { - // Note that this function *must* return the same type that the respective physical expression returns - // or the execution panics. - - if input_expr_types.is_empty() && !fun.supports_zero_argument() { - return Err(DataFusionError::Internal(format!( - "Builtin scalar function {} does not support empty arguments", - fun - ))); - } - - // verify that this is a valid set of data types for this function - data_types(input_expr_types, &signature(fun))?; - - // the return type of the built in function. - // Some built-in functions' return type depends on the incoming type. - match fun { - BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList( - Box::new(Field::new("item", input_expr_types[0].clone(), true)), - input_expr_types.len() as i32, - )), - BuiltinScalarFunction::Ascii => Ok(DataType::Int32), - BuiltinScalarFunction::BitLength => { - utf8_to_int_type(&input_expr_types[0], "bit_length") - } - BuiltinScalarFunction::Btrim => utf8_to_str_type(&input_expr_types[0], "btrim"), - BuiltinScalarFunction::CharacterLength => { - utf8_to_int_type(&input_expr_types[0], "character_length") - } - BuiltinScalarFunction::Chr => Ok(DataType::Utf8), - BuiltinScalarFunction::Coalesce => { - // COALESCE has multiple args and they might get coerced, get a preview of this - let coerced_types = data_types(input_expr_types, &signature(fun)); - coerced_types.map(|types| types[0].clone()) - } - BuiltinScalarFunction::Concat => Ok(DataType::Utf8), - BuiltinScalarFunction::ConcatWithSeparator => Ok(DataType::Utf8), - BuiltinScalarFunction::DatePart => Ok(DataType::Int32), - BuiltinScalarFunction::DateTrunc => { - Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)) - } - BuiltinScalarFunction::InitCap => { - utf8_to_str_type(&input_expr_types[0], "initcap") - } - BuiltinScalarFunction::Left => utf8_to_str_type(&input_expr_types[0], "left"), - BuiltinScalarFunction::Lower => utf8_to_str_type(&input_expr_types[0], "lower"), - BuiltinScalarFunction::Lpad => utf8_to_str_type(&input_expr_types[0], "lpad"), - BuiltinScalarFunction::Ltrim => utf8_to_str_type(&input_expr_types[0], "ltrim"), - BuiltinScalarFunction::MD5 => utf8_to_str_type(&input_expr_types[0], "md5"), - BuiltinScalarFunction::NullIf => { - // NULLIF has two args and they might get coerced, get a preview of this - let coerced_types = data_types(input_expr_types, &signature(fun)); - coerced_types.map(|typs| typs[0].clone()) - } - BuiltinScalarFunction::OctetLength => { - utf8_to_int_type(&input_expr_types[0], "octet_length") - } - BuiltinScalarFunction::Random => Ok(DataType::Float64), - BuiltinScalarFunction::RegexpReplace => { - utf8_to_str_type(&input_expr_types[0], "regex_replace") - } - BuiltinScalarFunction::Repeat => utf8_to_str_type(&input_expr_types[0], "repeat"), - BuiltinScalarFunction::Replace => { - utf8_to_str_type(&input_expr_types[0], "replace") - } - BuiltinScalarFunction::Reverse => { - utf8_to_str_type(&input_expr_types[0], "reverse") - } - BuiltinScalarFunction::Right => utf8_to_str_type(&input_expr_types[0], "right"), - BuiltinScalarFunction::Rpad => utf8_to_str_type(&input_expr_types[0], "rpad"), - BuiltinScalarFunction::Rtrim => utf8_to_str_type(&input_expr_types[0], "rtrimp"), - BuiltinScalarFunction::SHA224 => { - utf8_to_binary_type(&input_expr_types[0], "sha224") - } - BuiltinScalarFunction::SHA256 => { - utf8_to_binary_type(&input_expr_types[0], "sha256") - } - BuiltinScalarFunction::SHA384 => { - utf8_to_binary_type(&input_expr_types[0], "sha384") - } - BuiltinScalarFunction::SHA512 => { - utf8_to_binary_type(&input_expr_types[0], "sha512") - } - BuiltinScalarFunction::Digest => { - utf8_to_binary_type(&input_expr_types[0], "digest") - } - BuiltinScalarFunction::SplitPart => { - utf8_to_str_type(&input_expr_types[0], "split_part") - } - BuiltinScalarFunction::StartsWith => Ok(DataType::Boolean), - BuiltinScalarFunction::Strpos => utf8_to_int_type(&input_expr_types[0], "strpos"), - BuiltinScalarFunction::Substr => utf8_to_str_type(&input_expr_types[0], "substr"), - BuiltinScalarFunction::ToHex => Ok(match input_expr_types[0] { - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { - DataType::Utf8 - } - _ => { - // this error is internal as `data_types` should have captured this. - return Err(DataFusionError::Internal( - "The to_hex function can only accept integers.".to_string(), - )); - } - }), - BuiltinScalarFunction::ToTimestamp => { - Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)) - } - BuiltinScalarFunction::ToTimestampMillis => { - Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) - } - BuiltinScalarFunction::ToTimestampMicros => { - Ok(DataType::Timestamp(TimeUnit::Microsecond, None)) - } - BuiltinScalarFunction::ToTimestampSeconds => { - Ok(DataType::Timestamp(TimeUnit::Second, None)) - } - BuiltinScalarFunction::Now => Ok(DataType::Timestamp( - TimeUnit::Nanosecond, - Some("UTC".to_owned()), - )), - BuiltinScalarFunction::Translate => { - utf8_to_str_type(&input_expr_types[0], "translate") - } - BuiltinScalarFunction::Trim => utf8_to_str_type(&input_expr_types[0], "trim"), - BuiltinScalarFunction::Upper => utf8_to_str_type(&input_expr_types[0], "upper"), - BuiltinScalarFunction::RegexpMatch => Ok(match input_expr_types[0] { - DataType::LargeUtf8 => { - DataType::List(Box::new(Field::new("item", DataType::LargeUtf8, true))) - } - DataType::Utf8 => { - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))) - } - _ => { - // this error is internal as `data_types` should have captured this. - return Err(DataFusionError::Internal( - "The regexp_extract function can only accept strings.".to_string(), - )); - } - }), - - BuiltinScalarFunction::Power => match &input_expr_types[0] { - DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), - _ => Ok(DataType::Float64), - }, - - BuiltinScalarFunction::Abs - | BuiltinScalarFunction::Acos - | BuiltinScalarFunction::Asin - | BuiltinScalarFunction::Atan - | BuiltinScalarFunction::Ceil - | BuiltinScalarFunction::Cos - | BuiltinScalarFunction::Exp - | BuiltinScalarFunction::Floor - | BuiltinScalarFunction::Log - | BuiltinScalarFunction::Ln - | BuiltinScalarFunction::Log10 - | BuiltinScalarFunction::Log2 - | BuiltinScalarFunction::Round - | BuiltinScalarFunction::Signum - | BuiltinScalarFunction::Sin - | BuiltinScalarFunction::Sqrt - | BuiltinScalarFunction::Tan - | BuiltinScalarFunction::Trunc => match input_expr_types[0] { - DataType::Float32 => Ok(DataType::Float32), - _ => Ok(DataType::Float64), - }, - } -} - /// Create a physical (function) expression. /// This function errors when `args`' can't be coerced to a valid argument type of the function. pub fn create_physical_expr( @@ -361,293 +167,6 @@ pub fn create_physical_expr( ))) } -/// the signatures supported by the function `fun`. -fn signature(fun: &BuiltinScalarFunction) -> Signature { - // note: the physical expression must accept the type returned by this function or the execution panics. - - // for now, the list is small, as we do not have many built-in functions. - match fun { - BuiltinScalarFunction::Array => Signature::variadic( - array_expressions::SUPPORTED_ARRAY_TYPES.to_vec(), - fun.volatility(), - ), - BuiltinScalarFunction::Concat | BuiltinScalarFunction::ConcatWithSeparator => { - Signature::variadic(vec![DataType::Utf8], fun.volatility()) - } - BuiltinScalarFunction::Coalesce => Signature::variadic( - conditional_expressions::SUPPORTED_COALESCE_TYPES.to_vec(), - fun.volatility(), - ), - BuiltinScalarFunction::Ascii - | BuiltinScalarFunction::BitLength - | BuiltinScalarFunction::CharacterLength - | BuiltinScalarFunction::InitCap - | BuiltinScalarFunction::Lower - | BuiltinScalarFunction::MD5 - | BuiltinScalarFunction::OctetLength - | BuiltinScalarFunction::Reverse - | BuiltinScalarFunction::SHA224 - | BuiltinScalarFunction::SHA256 - | BuiltinScalarFunction::SHA384 - | BuiltinScalarFunction::SHA512 - | BuiltinScalarFunction::Trim - | BuiltinScalarFunction::Upper => Signature::uniform( - 1, - vec![DataType::Utf8, DataType::LargeUtf8], - fun.volatility(), - ), - BuiltinScalarFunction::Btrim - | BuiltinScalarFunction::Ltrim - | BuiltinScalarFunction::Rtrim => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8]), - TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]), - ], - fun.volatility(), - ), - BuiltinScalarFunction::Chr | BuiltinScalarFunction::ToHex => { - Signature::uniform(1, vec![DataType::Int64], fun.volatility()) - } - BuiltinScalarFunction::Lpad | BuiltinScalarFunction::Rpad => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::Int64]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Int64, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::Int64, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Int64, - DataType::LargeUtf8, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::Int64, - DataType::LargeUtf8, - ]), - ], - fun.volatility(), - ), - BuiltinScalarFunction::Left - | BuiltinScalarFunction::Repeat - | BuiltinScalarFunction::Right => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::Int64]), - ], - fun.volatility(), - ), - BuiltinScalarFunction::ToTimestamp => Signature::uniform( - 1, - vec![ - DataType::Utf8, - DataType::Int64, - DataType::Timestamp(TimeUnit::Millisecond, None), - DataType::Timestamp(TimeUnit::Microsecond, None), - DataType::Timestamp(TimeUnit::Second, None), - ], - fun.volatility(), - ), - BuiltinScalarFunction::ToTimestampMillis => Signature::uniform( - 1, - vec![ - DataType::Utf8, - DataType::Int64, - DataType::Timestamp(TimeUnit::Nanosecond, None), - DataType::Timestamp(TimeUnit::Microsecond, None), - DataType::Timestamp(TimeUnit::Second, None), - ], - fun.volatility(), - ), - BuiltinScalarFunction::ToTimestampMicros => Signature::uniform( - 1, - vec![ - DataType::Utf8, - DataType::Int64, - DataType::Timestamp(TimeUnit::Nanosecond, None), - DataType::Timestamp(TimeUnit::Millisecond, None), - DataType::Timestamp(TimeUnit::Second, None), - ], - fun.volatility(), - ), - BuiltinScalarFunction::ToTimestampSeconds => Signature::uniform( - 1, - vec![ - DataType::Utf8, - DataType::Int64, - DataType::Timestamp(TimeUnit::Nanosecond, None), - DataType::Timestamp(TimeUnit::Microsecond, None), - DataType::Timestamp(TimeUnit::Millisecond, None), - ], - fun.volatility(), - ), - BuiltinScalarFunction::Digest => { - Signature::exact(vec![DataType::Utf8, DataType::Utf8], fun.volatility()) - } - BuiltinScalarFunction::DateTrunc => Signature::exact( - vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), - ], - fun.volatility(), - ), - BuiltinScalarFunction::DatePart => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]), - TypeSignature::Exact(vec![DataType::Utf8, DataType::Date64]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Second, None), - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Microsecond, None), - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Millisecond, None), - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), - ]), - ], - fun.volatility(), - ), - BuiltinScalarFunction::SplitPart => Signature::one_of( - vec![ - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Utf8, - DataType::Int64, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::Utf8, - DataType::Int64, - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::LargeUtf8, - DataType::Int64, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::LargeUtf8, - DataType::Int64, - ]), - ], - fun.volatility(), - ), - - BuiltinScalarFunction::Strpos | BuiltinScalarFunction::StartsWith => { - Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::Utf8, DataType::LargeUtf8]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::LargeUtf8]), - ], - fun.volatility(), - ) - } - - BuiltinScalarFunction::Substr => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::Int64]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Int64, - DataType::Int64, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::Int64, - DataType::Int64, - ]), - ], - fun.volatility(), - ), - - BuiltinScalarFunction::Replace | BuiltinScalarFunction::Translate => { - Signature::one_of( - vec![TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Utf8, - DataType::Utf8, - ])], - fun.volatility(), - ) - } - BuiltinScalarFunction::RegexpReplace => Signature::one_of( - vec![ - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Utf8, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Utf8, - DataType::Utf8, - DataType::Utf8, - ]), - ], - fun.volatility(), - ), - - BuiltinScalarFunction::NullIf => { - Signature::uniform(2, SUPPORTED_NULLIF_TYPES.to_vec(), fun.volatility()) - } - BuiltinScalarFunction::RegexpMatch => Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::LargeUtf8, DataType::Utf8]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Utf8, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::LargeUtf8, - DataType::Utf8, - DataType::Utf8, - ]), - ], - fun.volatility(), - ), - BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()), - - BuiltinScalarFunction::Power => { - Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), - TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]), - ], - fun.volatility(), - ) - } - // math expressions expect 1 argument of type f64 or f32 - // priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we - // return the best approximation for it (in f64). - // We accept f32 because in this case it is clear that the best approximation - // will be as good as the number of digits in the number - _ => Signature::uniform( - 1, - vec![DataType::Float64, DataType::Float32], - fun.volatility(), - ), - } -} - pub use datafusion_physical_expr::ScalarFunctionExpr; #[cfg(feature = "crypto_expressions")] @@ -774,11 +293,10 @@ pub fn create_physical_fun( BuiltinScalarFunction::Sqrt => Arc::new(math_expressions::sqrt), BuiltinScalarFunction::Tan => Arc::new(math_expressions::tan), BuiltinScalarFunction::Trunc => Arc::new(math_expressions::trunc), - BuiltinScalarFunction::Power => { Arc::new(|args| make_scalar_function(math_expressions::power)(args)) } - + // string functions BuiltinScalarFunction::Array => Arc::new(array_expressions::array), BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() { @@ -3649,4 +3167,4 @@ mod tests { Ok(()) } -} +} \ No newline at end of file diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a8139e2db004..9b024f3e5eeb 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -241,7 +241,7 @@ unary_scalar_expr!(Log2, log2); unary_scalar_expr!(Log10, log10); unary_scalar_expr!(Ln, ln); unary_scalar_expr!(NullIf, nullif); -scalar_expr!(Power, power, exponent); +scalar_expr!(Power, power, base, exponent); // string functions diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 93c5d0e12fce..57d6d228b53e 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -216,6 +216,11 @@ pub fn return_type( )); } }), + + BuiltinScalarFunction::Power => match &input_expr_types[0] { + DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), + _ => Ok(DataType::Float64), + }, BuiltinScalarFunction::Abs | BuiltinScalarFunction::Acos @@ -505,6 +510,15 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature { fun.volatility(), ), BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()), + BuiltinScalarFunction::Power => { + Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]), + ], + fun.volatility(), + ) + }, // math expressions expect 1 argument of type f64 or f32 // priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we // return the best approximation for it (in f64). diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index dd44ecbabe4d..516aaa4b2317 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -100,6 +100,22 @@ macro_rules! downcast_arg { }}; } +macro_rules! make_function_inputs2 { + ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ + let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); + let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); + + arg1 + .iter() + .zip(arg2.iter()) + .map(|(a1, a2)| match (a1, a2) { + (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), + _ => None, + }) + .collect::<$ARRAY_TYPE>() + }}; +} + math_unary_function!("sqrt", sqrt); math_unary_function!("sin", sin); math_unary_function!("cos", cos); @@ -134,22 +150,6 @@ pub fn random(args: &[ColumnarValue]) -> Result { Ok(ColumnarValue::Array(Arc::new(array))) } -macro_rules! make_function_inputs2 { - ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ - let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); - let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); - - arg1 - .iter() - .zip(arg2.iter()) - .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), - _ => None, - }) - .collect::<$ARRAY_TYPE>() - }}; -} - pub fn power(args: &[ArrayRef]) -> Result { match args[0].data_type() { DataType::Float32 | DataType::Float64 => @@ -165,7 +165,6 @@ pub fn power(args: &[ArrayRef]) -> Result { } } - #[cfg(test)] mod tests { diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index d65efadb304b..11ad812e57f3 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -1244,6 +1244,10 @@ pub fn parse_expr( .map(|expr| parse_expr(expr, registry)) .collect::, _>>()?, )), + ScalarFunction::Power => Ok(power( + parse_expr(&args[0], registry)?, + parse_expr(&args[1], registry)?, + )), _ => Err(proto_error( "Protobuf deserialization error: Unsupported scalar function", )), From e2fd3392931160eaafdb3abb6cafe2c3b79422ec Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 23 Apr 2022 12:37:58 -0700 Subject: [PATCH 06/11] Readding docker files --- dev/build-ballista-docker.sh | 24 ++++++++ dev/docker/ballista.dockerfile | 108 +++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 dev/build-ballista-docker.sh create mode 100644 dev/docker/ballista.dockerfile diff --git a/dev/build-ballista-docker.sh b/dev/build-ballista-docker.sh new file mode 100644 index 000000000000..7add135d9c18 --- /dev/null +++ b/dev/build-ballista-docker.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +. ./dev/build-set-env.sh +docker build -t ballista-base:$BALLISTA_VERSION -f dev/docker/ballista-base.dockerfile . +docker build -t ballista:$BALLISTA_VERSION -f dev/docker/ballista.dockerfile . \ No newline at end of file diff --git a/dev/docker/ballista.dockerfile b/dev/docker/ballista.dockerfile new file mode 100644 index 000000000000..c452e4684844 --- /dev/null +++ b/dev/docker/ballista.dockerfile @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Turn .dockerignore to .dockerallow by excluding everything and explicitly +# allowing specific files and directories. This enables us to quickly add +# dependency files to the docker content without scanning the whole directory. +# This setup requires to all of our docker containers have arrow's source +# as a mounted directory. + +ARG RELEASE_FLAG=--release +FROM ballista-base:0.6.0 AS base +WORKDIR /tmp/ballista +RUN apt-get -y install cmake +RUN cargo install cargo-chef --version 0.1.34 + +FROM base as planner +ADD Cargo.toml . +COPY ballista ./ballista/ +COPY ballista-examples ./ballista-examples/ +COPY benchmarks ./benchmarks/ +COPY datafusion ./datafusion/ +COPY datafusion-cli ./datafusion-cli/ +COPY datafusion-common ./datafusion-common/ +COPY datafusion-expr ./datafusion-expr/ +COPY datafusion-physical-expr ./datafusion-physical-expr/ +COPY datafusion-jit ./datafusion-jit/ +COPY datafusion-proto ./datafusion-proto/ +COPY datafusion-examples ./datafusion-examples/ +RUN cargo chef prepare --recipe-path recipe.json + +FROM base as cacher +COPY --from=planner /tmp/ballista/recipe.json recipe.json +RUN cargo chef cook $RELEASE_FLAG --recipe-path recipe.json + +FROM base as builder +RUN mkdir /tmp/ballista/ballista +RUN mkdir /tmp/ballista/ballista-examples +RUN mkdir /tmp/ballista/benchmarks +RUN mkdir /tmp/ballista/datafusion +RUN mkdir /tmp/ballista/datafusion-cli +RUN mkdir /tmp/ballista/datafusion-common +RUN mkdir /tmp/ballista/datafusion-expr +RUN mkdir /tmp/ballista/datafusion-physical-expr +RUN mkdir /tmp/ballista/datafusion-jit +RUN mkdir /tmp/ballista/datafusion-proto +RUN mkdir /tmp/ballista/datafusion-examples +ADD Cargo.toml . +COPY ballista ./ballista/ +COPY ballista-examples ./ballista-examples/ +COPY benchmarks ./benchmarks/ +COPY datafusion ./datafusion/ +COPY datafusion-cli ./datafusion-cli/ +COPY datafusion-common ./datafusion-common/ +COPY datafusion-expr ./datafusion-expr/ +COPY datafusion-physical-expr ./datafusion-physical-expr/ +COPY datafusion-jit ./datafusion-jit/ +COPY datafusion-proto ./datafusion-proto/ +COPY datafusion-examples ./datafusion-examples/ +COPY --from=cacher /tmp/ballista/target target +ARG RELEASE_FLAG=--release + +# force build.rs to run to generate configure_me code. +ENV FORCE_REBUILD='true' +RUN cargo build $RELEASE_FLAG + +# put the executor on /executor (need to be copied from different places depending on FLAG) +ENV RELEASE_FLAG=${RELEASE_FLAG} +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-executor /executor; else mv /tmp/ballista/target/release/ballista-executor /executor; fi + +# put the scheduler on /scheduler (need to be copied from different places depending on FLAG) +ENV RELEASE_FLAG=${RELEASE_FLAG} +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/target/release/ballista-scheduler /scheduler; fi + +# put the tpch on /tpch (need to be copied from different places depending on FLAG) +ENV RELEASE_FLAG=${RELEASE_FLAG} +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi + +# Copy the binary into a new container for a smaller docker image +FROM ballista-base:0.6.0 + +COPY --from=builder /executor / + +COPY --from=builder /scheduler / + +COPY --from=builder /tpch / + +ADD benchmarks/run.sh / +RUN mkdir /queries +COPY benchmarks/queries/ /queries/ + +ENV RUST_LOG=info +ENV RUST_BACKTRACE=full + +CMD ["/executor"] \ No newline at end of file From 6e0cf104cb5a469268e7e6485e0290fdacbb7293 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 24 Apr 2022 00:59:46 -0700 Subject: [PATCH 07/11] Formatting --- datafusion/core/src/logical_plan/mod.rs | 6 +-- .../core/src/physical_plan/functions.rs | 2 +- datafusion/core/tests/sql/functions.rs | 47 +++++++++++++++---- datafusion/expr/src/expr_fn.rs | 1 - datafusion/expr/src/function.rs | 18 ++++--- .../physical-expr/src/math_expressions.rs | 47 ++++++++++++------- datafusion/proto/src/from_proto.rs | 6 +-- 7 files changed, 83 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index fdfd080a5303..fb8d1e10a1b4 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -43,9 +43,9 @@ pub use expr::{ count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, - octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace, reverse, - right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, - sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, + octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace, + reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, + split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal, }; diff --git a/datafusion/core/src/physical_plan/functions.rs b/datafusion/core/src/physical_plan/functions.rs index 57928f27c104..20917fa9b4d1 100644 --- a/datafusion/core/src/physical_plan/functions.rs +++ b/datafusion/core/src/physical_plan/functions.rs @@ -3167,4 +3167,4 @@ mod tests { Ok(()) } -} \ No newline at end of file +} diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index a7b8b1a1e1ca..092a11e2a6a5 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -405,8 +405,7 @@ async fn test_power() -> Result<()> { let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table))?; - let sql = - r"SELECT power(i32, 3) as power_i32, + let sql = r"SELECT power(i32, 3) as power_i32, power(i64, 3) as power_i64, power(f32, 3) as power_f32, power(f64, 3) as power_f64, @@ -427,10 +426,42 @@ async fn test_power() -> Result<()> { ]; assert_batches_eq!(expected, &actual); //dbg!(actual[0].schema().fields()); - assert_eq!(actual[0].schema().field_with_name("power_i32").unwrap().data_type().to_owned(), DataType::Int64); - assert_eq!(actual[0].schema().field_with_name("power_i64").unwrap().data_type().to_owned(), DataType::Int64); - assert_eq!(actual[0].schema().field_with_name("power_f32").unwrap().data_type().to_owned(), DataType::Float64); - assert_eq!(actual[0].schema().field_with_name("power_f64").unwrap().data_type().to_owned(), DataType::Float64); - + assert_eq!( + actual[0] + .schema() + .field_with_name("power_i32") + .unwrap() + .data_type() + .to_owned(), + DataType::Int64 + ); + assert_eq!( + actual[0] + .schema() + .field_with_name("power_i64") + .unwrap() + .data_type() + .to_owned(), + DataType::Int64 + ); + assert_eq!( + actual[0] + .schema() + .field_with_name("power_f32") + .unwrap() + .data_type() + .to_owned(), + DataType::Float64 + ); + assert_eq!( + actual[0] + .schema() + .field_with_name("power_f64") + .unwrap() + .data_type() + .to_owned(), + DataType::Float64 + ); + Ok(()) -} \ No newline at end of file +} diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 9b024f3e5eeb..36c3a8f1c077 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -243,7 +243,6 @@ unary_scalar_expr!(Ln, ln); unary_scalar_expr!(NullIf, nullif); scalar_expr!(Power, power, base, exponent); - // string functions scalar_expr!(Ascii, ascii, string); scalar_expr!(BitLength, bit_length, string); diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 57d6d228b53e..ae6fb060437f 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -216,7 +216,7 @@ pub fn return_type( )); } }), - + BuiltinScalarFunction::Power => match &input_expr_types[0] { DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), _ => Ok(DataType::Float64), @@ -510,15 +510,13 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature { fun.volatility(), ), BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()), - BuiltinScalarFunction::Power => { - Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), - TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]), - ], - fun.volatility(), - ) - }, + BuiltinScalarFunction::Power => Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]), + ], + fun.volatility(), + ), // math expressions expect 1 argument of type f64 or f32 // priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we // return the best approximation for it (in f64). diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 516aaa4b2317..bd164d29b712 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -17,16 +17,16 @@ //! Math expressions +use arrow::array::ArrayRef; use arrow::array::{Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; use rand::{thread_rng, Rng}; +use std::any::type_name; use std::iter; use std::sync::Arc; -use arrow::array::ArrayRef; -use std::any::type_name; macro_rules! downcast_compute_op { ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident) => {{ @@ -104,15 +104,14 @@ macro_rules! make_function_inputs2 { ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); - - arg1 - .iter() - .zip(arg2.iter()) - .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), - _ => None, - }) - .collect::<$ARRAY_TYPE>() + + arg1.iter() + .zip(arg2.iter()) + .map(|(a1, a2)| match (a1, a2) { + (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), + _ => None, + }) + .collect::<$ARRAY_TYPE>() }}; } @@ -152,16 +151,28 @@ pub fn random(args: &[ColumnarValue]) -> Result { pub fn power(args: &[ArrayRef]) -> Result { match args[0].data_type() { - DataType::Float32 | DataType::Float64 => - Ok(Arc::new(make_function_inputs2!(&args[0], &args[1], "base", "exponent", Float64Array, {f64::powf})) as ArrayRef), - - DataType::Int32 | DataType::Int64 => - Ok(Arc::new(make_function_inputs2!(&args[0], &args[1], "base", "exponent", Int64Array, {i64::pow})) as ArrayRef), - + DataType::Float32 | DataType::Float64 => Ok(Arc::new(make_function_inputs2!( + &args[0], + &args[1], + "base", + "exponent", + Float64Array, + { f64::powf } + )) as ArrayRef), + + DataType::Int32 | DataType::Int64 => Ok(Arc::new(make_function_inputs2!( + &args[0], + &args[1], + "base", + "exponent", + Int64Array, + { i64::pow } + )) as ArrayRef), + other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?} for function power", other - ))) + ))), } } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 11ad812e57f3..37466dae207d 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -31,9 +31,9 @@ use datafusion::{ logical_plan::{ abs, acos, ascii, asin, atan, ceil, character_length, chr, concat_expr, concat_ws_expr, cos, digest, exp, floor, left, ln, log10, log2, now_expr, nullif, - power, random, regexp_replace, repeat, replace, reverse, right, round, signum, sin, - split_part, sqrt, starts_with, strpos, substr, tan, to_hex, to_timestamp_micros, - to_timestamp_millis, to_timestamp_seconds, translate, trunc, + power, random, regexp_replace, repeat, replace, reverse, right, round, signum, + sin, split_part, sqrt, starts_with, strpos, substr, tan, to_hex, + to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trunc, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, Column, DFField, DFSchema, DFSchemaRef, Expr, Operator, }, From caf24139e82d123a2c08093807b0a0a6a9466367 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 25 Apr 2022 08:10:44 -0700 Subject: [PATCH 08/11] Leaving only 64bit types --- datafusion/core/tests/sql/functions.rs | 22 +++++++++++++++++-- datafusion/expr/src/function.rs | 2 +- .../physical-expr/src/math_expressions.rs | 4 ++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index 092a11e2a6a5..6424381c7f51 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -361,7 +361,7 @@ async fn coalesce_mul_with_default_value() -> Result<()> { #[tokio::test] async fn test_power() -> Result<()> { let schema = Arc::new(Schema::new(vec![ - Field::new("i32", DataType::Int32, true), + Field::new("i32", DataType::Int16, true), Field::new("i64", DataType::Int64, true), Field::new("f32", DataType::Float32, true), Field::new("f64", DataType::Float64, true), @@ -370,7 +370,7 @@ async fn test_power() -> Result<()> { let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Int32Array::from(vec![ + Arc::new(Int16Array::from(vec![ Some(2), Some(5), Some(0), @@ -462,6 +462,24 @@ async fn test_power() -> Result<()> { .to_owned(), DataType::Float64 ); + assert_eq!( + actual[0] + .schema() + .field_with_name("power_int_scalar") + .unwrap() + .data_type() + .to_owned(), + DataType::Int64 + ); + assert_eq!( + actual[0] + .schema() + .field_with_name("power_float_scalar") + .unwrap() + .data_type() + .to_owned(), + DataType::Float64 + ); Ok(()) } diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index ae6fb060437f..385e247bd3a6 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -218,7 +218,7 @@ pub fn return_type( }), BuiltinScalarFunction::Power => match &input_expr_types[0] { - DataType::Int32 | DataType::Int64 => Ok(DataType::Int64), + DataType::Int64 => Ok(DataType::Int64), _ => Ok(DataType::Float64), }, diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index bd164d29b712..99ce9735899d 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -151,7 +151,7 @@ pub fn random(args: &[ColumnarValue]) -> Result { pub fn power(args: &[ArrayRef]) -> Result { match args[0].data_type() { - DataType::Float32 | DataType::Float64 => Ok(Arc::new(make_function_inputs2!( + DataType::Float64 => Ok(Arc::new(make_function_inputs2!( &args[0], &args[1], "base", @@ -160,7 +160,7 @@ pub fn power(args: &[ArrayRef]) -> Result { { f64::powf } )) as ArrayRef), - DataType::Int32 | DataType::Int64 => Ok(Arc::new(make_function_inputs2!( + DataType::Int64 => Ok(Arc::new(make_function_inputs2!( &args[0], &args[1], "base", From 8faa7f2423ed944856003fa275acf788edd28d48 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 27 Apr 2022 01:57:10 -0700 Subject: [PATCH 09/11] Adding tests, remove type conversion --- datafusion/core/src/logical_plan/mod.rs | 6 +++--- datafusion/core/tests/sql/functions.rs | 14 +++++++------- datafusion/physical-expr/src/math_expressions.rs | 14 ++++++++------ 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index 1abfdbb94364..7da1ba03bcc7 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -43,9 +43,9 @@ pub use expr::{ count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, - now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat, - replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, - sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, + now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace, + repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, + signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal, }; diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index 8319abf5f7d2..01101ff892fd 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -533,13 +533,13 @@ async fn test_power() -> Result<()> { let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table))?; - let sql = r"SELECT power(i32, 3) as power_i32, - power(i64, 3) as power_i64, - power(f32, 3) as power_f32, - power(f64, 3) as power_f64, + let sql = r"SELECT power(i32, exp_i) as power_i32, + power(i64, exp_f) as power_i64, + power(f32, exp_i) as power_f32, + power(f64, exp_f) as power_f64, power(2, 3) as power_int_scalar, - power(2.5, 3) as power_float_scalar - FROM test"; + power(2.5, 3.0) as power_float_scalar + FROM (select test.*, 3 as exp_i, 3.0 as exp_f from test) a"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ "+-----------+-----------+-----------+-----------+------------------+--------------------+", @@ -570,7 +570,7 @@ async fn test_power() -> Result<()> { .unwrap() .data_type() .to_owned(), - DataType::Int64 + DataType::Float64 ); assert_eq!( actual[0] diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 99ce9735899d..26b6a8b352bc 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -17,7 +17,7 @@ //! Math expressions -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, UInt32Array}; use arrow::array::{Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use datafusion_common::ScalarValue; @@ -101,17 +101,17 @@ macro_rules! downcast_arg { } macro_rules! make_function_inputs2 { - ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ - let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); - let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); + ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE1:ident, $ARRAY_TYPE2:ident, $FUNC: block) => {{ + let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE1); + let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE2); arg1.iter() .zip(arg2.iter()) .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().unwrap())), + (Some(a1), Some(a2)) => Some($FUNC(a1, a2)), _ => None, }) - .collect::<$ARRAY_TYPE>() + .collect::<$ARRAY_TYPE1>() }}; } @@ -157,6 +157,7 @@ pub fn power(args: &[ArrayRef]) -> Result { "base", "exponent", Float64Array, + Float64Array, { f64::powf } )) as ArrayRef), @@ -166,6 +167,7 @@ pub fn power(args: &[ArrayRef]) -> Result { "base", "exponent", Int64Array, + UInt32Array, { i64::pow } )) as ArrayRef), From d60d217c62952a3148819f93dfb3ec8c2643d2d9 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 27 Apr 2022 05:14:25 -0700 Subject: [PATCH 10/11] fix for cast --- datafusion/physical-expr/src/math_expressions.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 26b6a8b352bc..7f41268154a9 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -17,7 +17,7 @@ //! Math expressions -use arrow::array::{ArrayRef, UInt32Array}; +use arrow::array::ArrayRef; use arrow::array::{Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; use datafusion_common::ScalarValue; @@ -101,17 +101,17 @@ macro_rules! downcast_arg { } macro_rules! make_function_inputs2 { - ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE1:ident, $ARRAY_TYPE2:ident, $FUNC: block) => {{ - let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE1); - let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE2); + ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{ + let arg1 = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE); + let arg2 = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE); arg1.iter() .zip(arg2.iter()) .map(|(a1, a2)| match (a1, a2) { - (Some(a1), Some(a2)) => Some($FUNC(a1, a2)), + (Some(a1), Some(a2)) => Some($FUNC(a1, a2.try_into().ok()?)), _ => None, }) - .collect::<$ARRAY_TYPE1>() + .collect::<$ARRAY_TYPE>() }}; } @@ -157,7 +157,6 @@ pub fn power(args: &[ArrayRef]) -> Result { "base", "exponent", Float64Array, - Float64Array, { f64::powf } )) as ArrayRef), @@ -167,7 +166,6 @@ pub fn power(args: &[ArrayRef]) -> Result { "base", "exponent", Int64Array, - UInt32Array, { i64::pow } )) as ArrayRef), From e3b234f8ec056839522c429077579ce8ef0b27c6 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 27 Apr 2022 12:38:53 -0700 Subject: [PATCH 11/11] Update functions.rs --- datafusion/core/tests/sql/functions.rs | 40 -------------------------- 1 file changed, 40 deletions(-) diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs index 01101ff892fd..857781aa35a3 100644 --- a/datafusion/core/tests/sql/functions.rs +++ b/datafusion/core/tests/sql/functions.rs @@ -446,46 +446,6 @@ async fn case_builtin_math_expression() { } } -#[tokio::test] -async fn case_sensitive_identifiers_aggregates() { - let ctx = SessionContext::new(); - ctx.register_table("t", table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let expected = vec![ - "+----------+", - "| MAX(t.i) |", - "+----------+", - "| 1 |", - "+----------+", - ]; - - let results = plan_and_collect(&ctx, "SELECT max(i) FROM t") - .await - .unwrap(); - - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - // Using double quotes allows specifying the function name with capitalization - let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t") - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - "Error during planning: Invalid function 'MAX'" - ); - - let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); -} - #[tokio::test] async fn test_power() -> Result<()> { let schema = Arc::new(Schema::new(vec![