From 5363ade4add494b836e19dd56ec2f6ca01087814 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 3 Sep 2021 09:21:59 -0700 Subject: [PATCH 01/14] added extra check for datetime --- dask_sql/physical/rel/logical/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 171e20c79..3a8e9bddb 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -304,7 +304,7 @@ def _collect_aggregations( ) if isinstance(aggregation_function, AggregationSpecification): dtype = df[input_col].dtype - if pd.api.types.is_numeric_dtype(dtype): + if pd.api.types.is_numeric_dtype(dtype) or pd.api.types.is_datetime64_any_dtype(dtype): aggregation_function = aggregation_function.numerical_aggregation else: aggregation_function = ( From 07ad74a51e0e4cad289f8e51290bd1573e7ba015 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 3 Sep 2021 11:32:33 -0700 Subject: [PATCH 02/14] cleaned_up_code --- dask_sql/physical/rel/logical/aggregate.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 3a8e9bddb..f5c02fe4c 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -61,6 +61,19 @@ def __init__(self, numerical_aggregation, non_numerical_aggregation=None): non_numerical_aggregation or numerical_aggregation ) + def get_dtype_relevant_aggregation(self, dtype): + numeric_agg = self.numerical_aggregation + + if pd.api.types.is_numeric_dtype(dtype): + return numeric_agg + + # Todo: Add Categorical and String Checks + if numeric_agg in ["min", "max", "first"]: + if pd.api.types.is_datetime64_any_dtype(dtype): + return numeric_agg + + return self.non_numerical_aggregation + class LogicalAggregatePlugin(BaseRelPlugin): """ @@ -304,12 +317,9 @@ def _collect_aggregations( ) if isinstance(aggregation_function, AggregationSpecification): dtype = df[input_col].dtype - if pd.api.types.is_numeric_dtype(dtype) or pd.api.types.is_datetime64_any_dtype(dtype): - aggregation_function = aggregation_function.numerical_aggregation - else: - aggregation_function = ( - aggregation_function.non_numerical_aggregation - ) + aggregation_function = aggregation_function.get_dtype_relevant_aggregation( + dtype + ) # Finally, extract the output column name output_col = str(agg_call.getValue()) From 4239b4ec694b88213ef249df14ee69622cee2e2f Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 3 Sep 2021 11:55:58 -0700 Subject: [PATCH 03/14] added check for string --- dask_sql/physical/rel/logical/aggregate.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index f5c02fe4c..4e212ca23 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -67,9 +67,11 @@ def get_dtype_relevant_aggregation(self, dtype): if pd.api.types.is_numeric_dtype(dtype): return numeric_agg - # Todo: Add Categorical and String Checks + # Todo: Add Categorical when support comes to dask-sql if numeric_agg in ["min", "max", "first"]: - if pd.api.types.is_datetime64_any_dtype(dtype): + if pd.api.types.is_datetime64_any_dtype( + dtype + ) or pd.api.types.is_string_dtype(dtype): return numeric_agg return self.non_numerical_aggregation From 86b3132f28e366524fb33cd499bc0b223307d04d 
Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 22 Sep 2021 15:50:58 -0700 Subject: [PATCH 04/14] added check for cudfDtype and PandasDtype --- dask_sql/physical/rel/logical/aggregate.py | 60 ++++++++++++++-------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 4e212ca23..1c183ef65 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -7,6 +7,13 @@ import dask.dataframe as dd import pandas as pd +try: + import cudf + import dask_cudf +except ImportError: + dask_cudf = None + cudf = None + from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rex.core.call import IsNullOperation @@ -48,33 +55,43 @@ class AggregationSpecification: """ Most of the aggregations in SQL are already implemented 1:1 in dask and can just be called via their name - (e.g. AVG is the mean). However sometimes those already - implemented functions only work well for numerical - functions. This small container class therefore + (e.g. AVG is the mean). However sometimes those + implemented functions only work well for some native data types. + This small container class therefore can have an additional aggregation function, which is - valid for non-numerical types. + valid for non-supported native types. """ - def __init__(self, numerical_aggregation, non_numerical_aggregation=None): - self.numerical_aggregation = numerical_aggregation - self.non_numerical_aggregation = ( - non_numerical_aggregation or numerical_aggregation - ) + def __init__(self, native_aggregation, custom_aggregation=None): + self.native_aggregation = native_aggregation + self.custom_aggregation = native_aggregation or custom_aggregation - def get_dtype_relevant_aggregation(self, dtype): - numeric_agg = self.numerical_aggregation + def get_supported_aggregation(self, series): + native_agg = self.native_aggregation - if pd.api.types.is_numeric_dtype(dtype): - return numeric_agg + # native_aggreagations work well for all numeric types + if pd.api.types.is_numeric_dtype(series.dtype): + return native_agg # Todo: Add Categorical when support comes to dask-sql - if numeric_agg in ["min", "max", "first"]: - if pd.api.types.is_datetime64_any_dtype( - dtype - ) or pd.api.types.is_string_dtype(dtype): - return numeric_agg + if native_agg in ["min", "max", "first"]: + if pd.api.types.is_datetime64_any_dtype(series.dtype): + return native_agg + + if pd.api.types.is_string_dtype(series.dtype): + ## If dask_cudf strings dtype return native aggregation + if isinstance(series, dask_cudf.Series): + return native_agg + + ## If pandas StringDtype return native aggregation + if isinstance(series, dd.Series) and isinstance( + series.dtype, pd.StringDtype + ): + return native_agg + + return native_agg - return self.non_numerical_aggregation + return self.custom_aggregation class LogicalAggregatePlugin(BaseRelPlugin): @@ -318,9 +335,8 @@ def _collect_aggregations( f"Aggregation function {aggregation_name} not implemented (yet)." 
) if isinstance(aggregation_function, AggregationSpecification): - dtype = df[input_col].dtype - aggregation_function = aggregation_function.get_dtype_relevant_aggregation( - dtype + aggregation_function = aggregation_function.get_supported_aggregation( + df[input_col] ) # Finally, extract the output column name From b014dc5a1d6cf71ff6850cf79f2aed9c7437e34b Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 22 Sep 2021 16:04:44 -0700 Subject: [PATCH 05/14] fixed preference for native vs custom agg --- dask_sql/physical/rel/logical/aggregate.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 1c183ef65..dc682e586 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -64,7 +64,7 @@ class AggregationSpecification: def __init__(self, native_aggregation, custom_aggregation=None): self.native_aggregation = native_aggregation - self.custom_aggregation = native_aggregation or custom_aggregation + self.custom_aggregation = custom_aggregation or native_aggregation def get_supported_aggregation(self, series): native_agg = self.native_aggregation @@ -80,7 +80,7 @@ def get_supported_aggregation(self, series): if pd.api.types.is_string_dtype(series.dtype): ## If dask_cudf strings dtype return native aggregation - if isinstance(series, dask_cudf.Series): + if dask_cudf is not None and isinstance(series, dask_cudf.Series): return native_agg ## If pandas StringDtype return native aggregation @@ -89,8 +89,6 @@ def get_supported_aggregation(self, series): ): return native_agg - return native_agg - return self.custom_aggregation From 89822d92dd9436262d4096fcbecbb371207c6bfe Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 22 Sep 2021 18:00:51 -0700 Subject: [PATCH 06/14] removed first from agg --- dask_sql/physical/rel/logical/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index dc682e586..cd40e68e8 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -74,7 +74,7 @@ def get_supported_aggregation(self, series): return native_agg # Todo: Add Categorical when support comes to dask-sql - if native_agg in ["min", "max", "first"]: + if native_agg in ["min", "max"]: if pd.api.types.is_datetime64_any_dtype(series.dtype): return native_agg From ea48c7a08bef1ec216ebc930e86808566a067b57 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 22 Sep 2021 19:23:16 -0700 Subject: [PATCH 07/14] added StringDtype to test_compatibility.py --- tests/integration/test_compatibility.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py index 6a8a334cc..e562353ac 100644 --- a/tests/integration/test_compatibility.py +++ b/tests/integration/test_compatibility.py @@ -54,6 +54,11 @@ def make_rand_df(size: int, **kwargs): r = [f"ssssss{x}" for x in range(10)] c = np.random.randint(10, size=size) s = np.array([r[x] for x in c]) + elif dt is pd.StringDtype: + r = [f"ssssss{x}" for x in range(10)] + c = np.random.randint(10, size=size) + s = np.array([r[x] for x in c]) + s = pd.array(s, dtype="string") elif dt is datetime: rt = [datetime(2020, 1, 1) + timedelta(days=x) for x in range(10)] c = np.random.randint(10, size=size) @@ -337,7 +342,13 @@ def test_agg_sum_avg(): def 
test_agg_min_max_no_group_by(): a = make_rand_df( - 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + 100, + a=(int, 50), + b=(str, 50), + c=(int, 30), + d=(str, 40), + e=(float, 40), + f=(pd.StringDtype, 40), ) eq_sqlite( """ @@ -352,6 +363,8 @@ def test_agg_min_max_no_group_by(): MAX(d) AS max_d, MIN(e) AS min_e, MAX(e) AS max_e, + MIN(f) as min_f, + MAX(f) as max_f, MIN(a+e) AS mix_1, MIN(a)+MIN(e) AS mix_2 FROM a @@ -362,7 +375,13 @@ def test_agg_min_max_no_group_by(): def test_agg_min_max(): a = make_rand_df( - 100, a=(int, 50), b=(str, 50), c=(int, 30), d=(str, 40), e=(float, 40) + 100, + a=(int, 50), + b=(str, 50), + c=(int, 30), + d=(str, 40), + e=(float, 40), + f=(pd.StringDtype, 40), ) eq_sqlite( """ @@ -374,6 +393,8 @@ def test_agg_min_max(): MAX(d) AS max_d, MIN(e) AS min_e, MAX(e) AS max_e, + MIN(f) AS min_f, + MAX(f) AS max_f, MIN(a+e) AS mix_1, MIN(a)+MIN(e) AS mix_2 FROM a GROUP BY a, b From bb511258b459cd75edd68db0819dae39c6b3c2a7 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 22 Sep 2021 19:42:26 -0700 Subject: [PATCH 08/14] added better comment for why we take a different code path with StringsDtype --- dask_sql/physical/rel/logical/aggregate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index cd40e68e8..f6cc0a837 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -83,7 +83,8 @@ def get_supported_aggregation(self, series): if dask_cudf is not None and isinstance(series, dask_cudf.Series): return native_agg - ## If pandas StringDtype return native aggregation + ## If pandas StringDtype native aggregation works + ## and with ObjectDtype and Nulls native aggregation can fail if isinstance(series, dd.Series) and isinstance( series.dtype, pd.StringDtype ): From 65982596cf5d5791bf4d3606ea57ec07518dc101 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 23 Sep 2021 10:10:46 -0700 Subject: [PATCH 09/14] added test for datetime --- tests/integration/test_compatibility.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py index e562353ac..760857356 100644 --- a/tests/integration/test_compatibility.py +++ b/tests/integration/test_compatibility.py @@ -19,6 +19,14 @@ from dask_sql import Context +def cast_datetime_to_string(df): + cols = df.select_dtypes(include=["datetime64[ns]"]).columns + # Casting to object first as + # directly converting to string looses second precision + df[cols] = df[cols].astype("object").astype("string") + return df + + def eq_sqlite(sql, **dfs): c = Context() engine = sqlite3.connect(":memory:") @@ -30,6 +38,10 @@ def eq_sqlite(sql, **dfs): dask_result = c.sql(sql).compute().reset_index(drop=True) sqlite_result = pd.read_sql(sql, engine).reset_index(drop=True) + # casting to object to ensure equality with sql-lite + # which returns object dtype for datetime inputs + dask_result = cast_datetime_to_string(dask_result) + # Make sure SQL and Dask use the same "NULL" value dask_result = dask_result.fillna(np.NaN) sqlite_result = sqlite_result.fillna(np.NaN) @@ -349,6 +361,7 @@ def test_agg_min_max_no_group_by(): d=(str, 40), e=(float, 40), f=(pd.StringDtype, 40), + g=(datetime, 40), ) eq_sqlite( """ @@ -365,6 +378,8 @@ def test_agg_min_max_no_group_by(): MAX(e) AS max_e, MIN(f) as min_f, MAX(f) as max_f, + MIN(g) as min_g, + MAX(g) as max_g, MIN(a+e) AS mix_1, 
MIN(a)+MIN(e) AS mix_2 FROM a @@ -382,6 +397,7 @@ def test_agg_min_max(): d=(str, 40), e=(float, 40), f=(pd.StringDtype, 40), + g=(datetime, 40), ) eq_sqlite( """ @@ -395,6 +411,8 @@ def test_agg_min_max(): MAX(e) AS max_e, MIN(f) AS min_f, MAX(f) AS max_f, + MIN(g) AS min_g, + MAX(g) AS max_g, MIN(a+e) AS mix_1, MIN(a)+MIN(e) AS mix_2 FROM a GROUP BY a, b From 9dccdc4fc2f7fc9ffa53d3cfbdf08d79ab269c44 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 23 Sep 2021 15:38:49 -0700 Subject: [PATCH 10/14] changed native to built in --- dask_sql/physical/rel/logical/aggregate.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index f6cc0a837..6157223d8 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -56,39 +56,39 @@ class AggregationSpecification: Most of the aggregations in SQL are already implemented 1:1 in dask and can just be called via their name (e.g. AVG is the mean). However sometimes those - implemented functions only work well for some native data types. + implemented functions only work well for some built-in datatypes. This small container class therefore can have an additional aggregation function, which is - valid for non-supported native types. + valid for non-supported built-in types. """ - def __init__(self, native_aggregation, custom_aggregation=None): - self.native_aggregation = native_aggregation - self.custom_aggregation = custom_aggregation or native_aggregation + def __init__(self, built_in_aggregation, custom_aggregation=None): + self.built_in_aggregation = built_in_aggregation + self.custom_aggregation = custom_aggregation or built_in_aggregation def get_supported_aggregation(self, series): - native_agg = self.native_aggregation + built_in_aggregation = self.built_in_aggregation # native_aggreagations work well for all numeric types if pd.api.types.is_numeric_dtype(series.dtype): - return native_agg + return built_in_aggregation # Todo: Add Categorical when support comes to dask-sql - if native_agg in ["min", "max"]: + if built_in_aggregation in ["min", "max"]: if pd.api.types.is_datetime64_any_dtype(series.dtype): - return native_agg + return built_in_aggregation if pd.api.types.is_string_dtype(series.dtype): ## If dask_cudf strings dtype return native aggregation if dask_cudf is not None and isinstance(series, dask_cudf.Series): - return native_agg + return built_in_aggregation ## If pandas StringDtype native aggregation works ## and with ObjectDtype and Nulls native aggregation can fail if isinstance(series, dd.Series) and isinstance( series.dtype, pd.StringDtype ): - return native_agg + return built_in_aggregation return self.custom_aggregation From 57be03da40ef1daed8d96f05fadfb8c8dfd4630e Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 23 Sep 2021 15:41:58 -0700 Subject: [PATCH 11/14] made docstring cleaner --- dask_sql/physical/rel/logical/aggregate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 6157223d8..911fe19af 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -56,10 +56,10 @@ class AggregationSpecification: Most of the aggregations in SQL are already implemented 1:1 in dask and can just be called via their name (e.g. AVG is the mean). 
However sometimes those - implemented functions only work well for some built-in datatypes. + implemented functions only work well for some datatypes. This small container class therefore - can have an additional aggregation function, which is - valid for non-supported built-in types. + can have an custom aggregation function, which is + valid for not supported dtypes. """ def __init__(self, built_in_aggregation, custom_aggregation=None): From 8c8c305f2399e1d368a307d754a8f77f5c422ec5 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 27 Sep 2021 09:06:02 -0700 Subject: [PATCH 12/14] removed unused import --- dask_sql/physical/rel/logical/aggregate.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 911fe19af..f2d853764 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -8,11 +8,9 @@ import pandas as pd try: - import cudf import dask_cudf except ImportError: dask_cudf = None - cudf = None from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin @@ -69,7 +67,7 @@ def __init__(self, built_in_aggregation, custom_aggregation=None): def get_supported_aggregation(self, series): built_in_aggregation = self.built_in_aggregation - # native_aggreagations work well for all numeric types + # built-in_aggreagations work well for numeric types if pd.api.types.is_numeric_dtype(series.dtype): return built_in_aggregation @@ -79,12 +77,12 @@ def get_supported_aggregation(self, series): return built_in_aggregation if pd.api.types.is_string_dtype(series.dtype): - ## If dask_cudf strings dtype return native aggregation + # If dask_cudf strings dtype, return built-in aggregation if dask_cudf is not None and isinstance(series, dask_cudf.Series): return built_in_aggregation - ## If pandas StringDtype native aggregation works - ## and with ObjectDtype and Nulls native aggregation can fail + # With pandas StringDtype built-in aggregations work + # while with pandas ObjectDtype and Nulls built-in aggregations fail if isinstance(series, dd.Series) and isinstance( series.dtype, pd.StringDtype ): From 879870444affa6fa65055a37d001b46e6cda9a9d Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 27 Sep 2021 15:30:40 -0700 Subject: [PATCH 13/14] Trigger Build From 3eb6f4bebe2771425600c0eef11916df73899301 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 12 Oct 2021 11:06:26 -0400 Subject: [PATCH 14/14] Minor typo --- dask_sql/physical/rel/logical/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index f2d853764..cfaf1a76a 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -67,7 +67,7 @@ def __init__(self, built_in_aggregation, custom_aggregation=None): def get_supported_aggregation(self, series): built_in_aggregation = self.built_in_aggregation - # built-in_aggreagations work well for numeric types + # built-in aggregations work well for numeric types if pd.api.types.is_numeric_dtype(series.dtype): return built_in_aggregation
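
For reference, the dispatch logic this series converges on (patches 04 through 14) is easier to read outside diff form. The sketch below is a minimal, self-contained reconstruction of the final AggregationSpecification.get_supported_aggregation behaviour, not the exact file contents; it assumes only pandas and dask.dataframe are installed and treats dask_cudf as optional, as the patches do:

    import pandas as pd
    import dask.dataframe as dd

    try:
        import dask_cudf  # optional GPU backend; left as None when not installed
    except ImportError:
        dask_cudf = None


    class AggregationSpecification:
        """Pairs a built-in dask aggregation with a custom fallback for dtypes
        where the built-in one is unreliable (e.g. object-dtype strings with nulls)."""

        def __init__(self, built_in_aggregation, custom_aggregation=None):
            self.built_in_aggregation = built_in_aggregation
            self.custom_aggregation = custom_aggregation or built_in_aggregation

        def get_supported_aggregation(self, series):
            built_in = self.built_in_aggregation

            # Numeric columns always take the fast built-in path.
            if pd.api.types.is_numeric_dtype(series.dtype):
                return built_in

            # "min"/"max" are additionally safe for datetimes, cuDF string columns
            # and pandas' nullable StringDtype; everything else (notably object-dtype
            # strings containing nulls) falls back to the custom aggregation.
            if built_in in ("min", "max"):
                if pd.api.types.is_datetime64_any_dtype(series.dtype):
                    return built_in
                if pd.api.types.is_string_dtype(series.dtype):
                    if dask_cudf is not None and isinstance(series, dask_cudf.Series):
                        return built_in
                    if isinstance(series, dd.Series) and isinstance(
                        series.dtype, pd.StringDtype
                    ):
                        return built_in

            return self.custom_aggregation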
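
A quick end-to-end check of what the series enables, using dask_sql's Context API; the table and column names below are made up for illustration, and exact output formatting may vary between versions:

    import pandas as pd
    import dask.dataframe as dd
    from dask_sql import Context

    pdf = pd.DataFrame(
        {
            "grp": [1, 1, 2],
            "ts": pd.to_datetime(["2020-01-03", "2020-01-01", None]),
            "name": pd.array(["b", "a", None], dtype="string"),
        }
    )

    c = Context()
    c.create_table("t", dd.from_pandas(pdf, npartitions=1))

    # MIN/MAX over datetime and nullable-string columns now resolve to dask's
    # built-in min/max instead of the slower custom aggregation path.
    result = c.sql(
        "SELECT grp, MIN(ts) AS min_ts, MAX(name) AS max_name FROM t GROUP BY grp"
    )
    print(result.compute())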