From 5ad586d208f1d54c111365d873baa9c3a6ea0214 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Fri, 27 Aug 2021 06:51:52 -0700 Subject: [PATCH 01/15] add fast path for multi-column sorting --- dask_sql/physical/utils/sort.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 1170ba07c..31932577f 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -5,6 +5,15 @@ from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +def multi_col_sort( + df: dd.DataFrame, + sort_columns: List[str], + sort_ascending: List[bool], + sort_null_first: List[bool], +) -> dd.DataFrame: + + df = df.sort_values(sort_columns) + return df.persist() def apply_sort( df: dd.DataFrame, @@ -12,6 +21,19 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: + + # Try fast path for multi-column sorting before falling back to + # sort_partition_func. Tools like dask-cudf have a limited but fast + # multi-column sort implementation. We check if any sorting/null sorting + # is required. If so, we fall back to default sorting implementation + if any(sort_null_first) is False and all(sort_ascending) is True: + try: + return multi_col_sort(df, sort_columns, sort_ascending, + sort_null_first) + except NotImplementedError: + pass + + # Split the first column. We need to handle this one with set_index first_sort_column = sort_columns[0] first_sort_ascending = sort_ascending[0] From 8fc9a7ba2f744da80c7a63b300a149e0adcd5807 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Fri, 27 Aug 2021 07:26:50 -0700 Subject: [PATCH 02/15] lint --- dask_sql/physical/utils/sort.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 31932577f..6909b0998 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -5,6 +5,7 @@ from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column + def multi_col_sort( df: dd.DataFrame, sort_columns: List[str], @@ -15,6 +16,7 @@ def multi_col_sort( df = df.sort_values(sort_columns) return df.persist() + def apply_sort( df: dd.DataFrame, sort_columns: List[str], @@ -28,12 +30,10 @@ def apply_sort( # is required. If so, we fall back to default sorting implementation if any(sort_null_first) is False and all(sort_ascending) is True: try: - return multi_col_sort(df, sort_columns, sort_ascending, - sort_null_first) + return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) except NotImplementedError: pass - # Split the first column. We need to handle this one with set_index first_sort_column = sort_columns[0] first_sort_ascending = sort_ascending[0] From c86cdab09ffea861bd18134a2c12bfc67cfff7fa Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 31 Aug 2021 09:13:21 -0700 Subject: [PATCH 03/15] Prevent single column Dask dataframes from calling sort_values --- dask_sql/physical/utils/sort.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 6909b0998..9b6833597 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -1,6 +1,7 @@ from typing import List import dask.dataframe as dd +import dask_cudf import pandas as pd from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column @@ -28,10 +29,14 @@ def apply_sort( # sort_partition_func. Tools like dask-cudf have a limited but fast # multi-column sort implementation. We check if any sorting/null sorting # is required. If so, we fall back to default sorting implementation - if any(sort_null_first) is False and all(sort_ascending) is True: + if ( + isinstance(df, dask_cudf.DataFrame) + and all(sort_ascending) + and not any(sort_null_first) + ): try: return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) - except NotImplementedError: + except ValueError: pass # Split the first column. We need to handle this one with set_index From d321ca3e3fe100287f8c892111b018fef24119a8 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:01:46 -0700 Subject: [PATCH 04/15] Wrap dask_cudf import in try/except block --- dask_sql/physical/utils/sort.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 9b6833597..675e17f60 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -1,11 +1,15 @@ from typing import List import dask.dataframe as dd -import dask_cudf import pandas as pd from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +try: + import dask_cudf +except ImportError: + dask_cudf = None + def multi_col_sort( df: dd.DataFrame, @@ -30,7 +34,8 @@ def apply_sort( # multi-column sort implementation. We check if any sorting/null sorting # is required. If so, we fall back to default sorting implementation if ( - isinstance(df, dask_cudf.DataFrame) + dask_cudf is not None + and isinstance(df, dask_cudf.DataFrame) and all(sort_ascending) and not any(sort_null_first) ): From ed6522883dc48b42833edf3a8fabd2a60c4a3eca Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:09:36 -0700 Subject: [PATCH 05/15] Add test for fast multi column sort --- dask_sql/physical/utils/sort.py | 1 - tests/integration/fixtures.py | 8 ++++++++ tests/integration/test_dask_cudf.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_dask_cudf.py diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 675e17f60..e59e6540d 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -28,7 +28,6 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: - # Try fast path for multi-column sorting before falling back to # sort_partition_func. Tools like dask-cudf have a limited but fast # multi-column sort implementation. We check if any sorting/null sorting diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 4566d3690..8129aed61 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -9,6 +9,11 @@ from dask.distributed import Client from pandas.testing import assert_frame_equal +try: + import dask_cudf +except ImportError: + dask_cudf = None + @pytest.fixture() def timeseries_df(c): @@ -117,6 +122,9 @@ def c( for df_name, df in dfs.items(): dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) + if dask_cudf is not None: + cudf_df = dask_cudf.from_dask_dataframe(dask_df) + c.create_table("cudf_" + df_name, cudf_df) yield c diff --git a/tests/integration/test_dask_cudf.py b/tests/integration/test_dask_cudf.py new file mode 100644 index 000000000..add15f1ce --- /dev/null +++ b/tests/integration/test_dask_cudf.py @@ -0,0 +1,22 @@ +import pandas as pd +import pytest +from pandas.testing import assert_frame_equal + +pytest.importorskip("dask_cudf") + + +def test_cudf_order_by(c): + df = c.sql( + """ + SELECT + * + FROM cudf_user_table_1 + ORDER BY user_id + """ + ) + df = df.compute().to_pandas() + + expected_df = pd.DataFrame( + {"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]} + ).sort_values(by="user_id") + assert_frame_equal(df, expected_df) From 76eb2aa4313e49d743b2155dc90ad10865d1e19d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 2 Sep 2021 07:39:45 -0700 Subject: [PATCH 06/15] Move multi_col_sort contents to apply_sort --- dask_sql/physical/utils/sort.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index e59e6540d..ae6a9f7f7 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -3,7 +3,7 @@ import dask.dataframe as dd import pandas as pd -from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column +from dask_sql.utils import make_pickable_without_dask_sql try: import dask_cudf @@ -11,17 +11,6 @@ dask_cudf = None -def multi_col_sort( - df: dd.DataFrame, - sort_columns: List[str], - sort_ascending: List[bool], - sort_null_first: List[bool], -) -> dd.DataFrame: - - df = df.sort_values(sort_columns) - return df.persist() - - def apply_sort( df: dd.DataFrame, sort_columns: List[str], @@ -39,7 +28,8 @@ def apply_sort( and not any(sort_null_first) ): try: - return multi_col_sort(df, sort_columns, sort_ascending, sort_null_first) + df = df.sort_values(sort_columns) + return df.persist() except ValueError: pass From 927c6185751be62855299cd02e7ea5fc881a9608 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 10 Sep 2021 06:51:38 -0700 Subject: [PATCH 07/15] Ignore index for dask-cudf sorting --- dask_sql/physical/utils/sort.py | 2 +- tests/integration/test_dask_cudf.py | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index ae6a9f7f7..85075b0bb 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -28,7 +28,7 @@ def apply_sort( and not any(sort_null_first) ): try: - df = df.sort_values(sort_columns) + df = df.sort_values(sort_columns, ignore_index=True) return df.persist() except ValueError: pass diff --git a/tests/integration/test_dask_cudf.py b/tests/integration/test_dask_cudf.py index add15f1ce..d8ec7bb66 100644 --- a/tests/integration/test_dask_cudf.py +++ b/tests/integration/test_dask_cudf.py @@ -1,9 +1,9 @@ -import pandas as pd import pytest -from pandas.testing import assert_frame_equal pytest.importorskip("dask_cudf") +from cudf.testing._utils import assert_eq + def test_cudf_order_by(c): df = c.sql( @@ -13,10 +13,18 @@ def test_cudf_order_by(c): FROM cudf_user_table_1 ORDER BY user_id """ + ).compute() + + expected_df = ( + c.sql( + """ + SELECT + * + FROM cudf_user_table_1 + """ + ) + .sort_values(by="user_id", ignore_index=True) + .compute() ) - df = df.compute().to_pandas() - expected_df = pd.DataFrame( - {"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]} - ).sort_values(by="user_id") - assert_frame_equal(df, expected_df) + assert_eq(df, expected_df) From 963ad5e2f16a2303a92492c96423fe2b173818f4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 10 Sep 2021 06:55:42 -0700 Subject: [PATCH 08/15] Fix show tables test for cudf enabled fixture --- tests/integration/test_show.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index 2165699ca..aeb1af7b5 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -2,6 +2,11 @@ import pytest from pandas.testing import assert_frame_equal +try: + import dask_cudf +except ImportError: + dask_cudf = None + def test_schemas(c): df = c.sql("SHOW SCHEMAS") @@ -36,6 +41,27 @@ def test_tables(c): "string_table", "datetime_table", ] + if dask_cudf is None + else [ + "df_simple", + "cudf_df_simple", + "df", + "cudf_df", + "user_table_1", + "cudf_user_table_1", + "user_table_2", + "cudf_user_table_2", + "long_table", + "cudf_long_table", + "user_table_inf", + "cudf_user_table_inf", + "user_table_nan", + "cudf_user_table_nan", + "string_table", + "cudf_string_table", + "datetime_table", + "cudf_datetime_table", + ] } ) From 5fb3c41ab574cf44057d445981d5e7fc47660561 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 30 Sep 2021 08:59:31 -0700 Subject: [PATCH 09/15] Trigger CI From 645eebf8f37171f5d11acb1f28db44c8e76a24dd Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 6 Oct 2021 07:09:25 -0700 Subject: [PATCH 10/15] Add single partition sort case --- dask_sql/physical/utils/sort.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 85075b0bb..387b31e3c 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -2,6 +2,8 @@ import dask.dataframe as dd import pandas as pd +from dask.array.routines import isin +from dask.utils import M from dask_sql.utils import make_pickable_without_dask_sql @@ -17,6 +19,30 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: + # single partition cases for cudf/pandas + if df.npartitions == 1: + if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame): + if (all(sort_ascending) or not any(sort_ascending)) and not any( + sort_null_first[1:] + ): + return df.map_partitions( + M.sort_values, + by=sort_columns, + ascending=all(sort_ascending), + na_position="first" if sort_null_first[0] else "last", + ) + if not any(sort_null_first): + return df.map_partitions( + M.sort_values, by=sort_columns, ascending=sort_ascending + ) + elif not any(sort_null_first[1:]): + return df.map_partitions( + M.sort_values, + by=sort_columns, + ascending=sort_ascending, + na_position="first" if sort_null_first[0] else "last", + ) + # Try fast path for multi-column sorting before falling back to # sort_partition_func. Tools like dask-cudf have a limited but fast # multi-column sort implementation. We check if any sorting/null sorting From 2e65ca80325272d83d0e659e75f5a5364fa220e4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 6 Oct 2021 07:28:25 -0700 Subject: [PATCH 11/15] Return cudf sorted dataframe without persisting --- dask_sql/physical/utils/sort.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 387b31e3c..bf46f936d 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -54,8 +54,7 @@ def apply_sort( and not any(sort_null_first) ): try: - df = df.sort_values(sort_columns, ignore_index=True) - return df.persist() + return df.sort_values(sort_columns, ignore_index=True) except ValueError: pass From 49d6cf2a4615ced3468a886ba200ea4f330cfc8e Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Oct 2021 09:01:28 -0700 Subject: [PATCH 12/15] Update nan sort test to reflect Pandas' sort_values ordering --- tests/integration/test_sort.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py index 34dc94e4f..065deca88 100644 --- a/tests/integration/test_sort.py +++ b/tests/integration/test_sort.py @@ -164,7 +164,7 @@ def test_sort_with_nan(): assert_frame_equal( df_result, pd.DataFrame( - {"a": [float("nan"), 2, 2, 1], "b": [5, float("inf"), float("nan"), 4]} + {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]} ), ) @@ -176,7 +176,7 @@ def test_sort_with_nan(): assert_frame_equal( df_result, pd.DataFrame( - {"a": [float("nan"), 2, 2, 1], "b": [5, float("inf"), float("nan"), 4]} + {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]} ), ) @@ -188,7 +188,7 @@ def test_sort_with_nan(): assert_frame_equal( df_result, pd.DataFrame( - {"a": [2, 2, 1, float("nan")], "b": [float("inf"), float("nan"), 4, 5]} + {"a": [2, 2, 1, float("nan")], "b": [float("nan"), float("inf"), 4, 5]} ), ) From d55fc8426ea412d77efe1e6ce3fa9287b2f874c9 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Oct 2021 10:57:44 -0700 Subject: [PATCH 13/15] Add comments tracking relevant [dask-]cudf issues --- dask_sql/physical/utils/sort.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index bf46f936d..138b7d99a 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -19,9 +19,11 @@ def apply_sort( sort_ascending: List[bool], sort_null_first: List[bool], ) -> dd.DataFrame: - # single partition cases for cudf/pandas + # if we have a single partition, we can sometimes sort with map_partitions if df.npartitions == 1: if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame): + # cudf only supports null positioning if `ascending` is a single boolean: + # https://github.com/rapidsai/cudf/issues/9400 if (all(sort_ascending) or not any(sort_ascending)) and not any( sort_null_first[1:] ): @@ -43,10 +45,9 @@ def apply_sort( na_position="first" if sort_null_first[0] else "last", ) - # Try fast path for multi-column sorting before falling back to - # sort_partition_func. Tools like dask-cudf have a limited but fast - # multi-column sort implementation. We check if any sorting/null sorting - # is required. If so, we fall back to default sorting implementation + # dask-cudf only supports ascending sort / nulls last: + # https://github.com/rapidsai/cudf/pull/9250 + # https://github.com/rapidsai/cudf/pull/9264 if ( dask_cudf is not None and isinstance(df, dask_cudf.DataFrame) From a1857553e4c0ff547af39744990dfa7aef3fe6f0 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Oct 2021 12:16:28 -0700 Subject: [PATCH 14/15] Move GPU sorting tests to test_sort.py --- tests/integration/fixtures.py | 30 +++- tests/integration/test_dask_cudf.py | 30 ---- tests/integration/test_show.py | 20 +-- tests/integration/test_sort.py | 224 ++++++++++++++++++++++------ 4 files changed, 212 insertions(+), 92 deletions(-) delete mode 100644 tests/integration/test_dask_cudf.py diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 8129aed61..7fc3ff32b 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -10,9 +10,9 @@ from pandas.testing import assert_frame_equal try: - import dask_cudf + import cudf except ImportError: - dask_cudf = None + cudf = None @pytest.fixture() @@ -91,6 +91,21 @@ def datetime_table(): ) +@pytest.fixture() +def gpu_user_table_1(user_table_1): + return cudf.from_pandas(user_table_1) if cudf else None + + +@pytest.fixture() +def gpu_df(df): + return cudf.from_pandas(df) if cudf else None + + +@pytest.fixture() +def gpu_long_table(long_table): + return cudf.from_pandas(long_table) if cudf else None + + @pytest.fixture() def c( df_simple, @@ -102,6 +117,9 @@ def c( user_table_nan, string_table, datetime_table, + gpu_user_table_1, + gpu_df, + gpu_long_table, ): dfs = { "df_simple": df_simple, @@ -113,6 +131,9 @@ def c( "user_table_nan": user_table_nan, "string_table": string_table, "datetime_table": datetime_table, + "gpu_user_table_1": gpu_user_table_1, + "gpu_df": gpu_df, + "gpu_long_table": gpu_long_table, } # Lazy import, otherwise the pytest framework has problems @@ -120,11 +141,10 @@ def c( c = Context() for df_name, df in dfs.items(): + if df is None: + continue dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) - if dask_cudf is not None: - cudf_df = dask_cudf.from_dask_dataframe(dask_df) - c.create_table("cudf_" + df_name, cudf_df) yield c diff --git a/tests/integration/test_dask_cudf.py b/tests/integration/test_dask_cudf.py deleted file mode 100644 index d8ec7bb66..000000000 --- a/tests/integration/test_dask_cudf.py +++ /dev/null @@ -1,30 +0,0 @@ -import pytest - -pytest.importorskip("dask_cudf") - -from cudf.testing._utils import assert_eq - - -def test_cudf_order_by(c): - df = c.sql( - """ - SELECT - * - FROM cudf_user_table_1 - ORDER BY user_id - """ - ).compute() - - expected_df = ( - c.sql( - """ - SELECT - * - FROM cudf_user_table_1 - """ - ) - .sort_values(by="user_id", ignore_index=True) - .compute() - ) - - assert_eq(df, expected_df) diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index aeb1af7b5..eb9c18337 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -3,9 +3,9 @@ from pandas.testing import assert_frame_equal try: - import dask_cudf + import cudf except ImportError: - dask_cudf = None + cudf = None def test_schemas(c): @@ -41,26 +41,20 @@ def test_tables(c): "string_table", "datetime_table", ] - if dask_cudf is None + if cudf is None else [ - "df_simple", - "cudf_df_simple", "df", - "cudf_df", + "df_simple", "user_table_1", - "cudf_user_table_1", "user_table_2", - "cudf_user_table_2", "long_table", - "cudf_long_table", "user_table_inf", - "cudf_user_table_inf", "user_table_nan", - "cudf_user_table_nan", "string_table", - "cudf_string_table", "datetime_table", - "cudf_datetime_table", + "gpu_user_table_1", + "gpu_df", + "gpu_long_table", ] } ) diff --git a/tests/integration/test_sort.py b/tests/integration/test_sort.py index 065deca88..fb8af592c 100644 --- a/tests/integration/test_sort.py +++ b/tests/integration/test_sort.py @@ -15,12 +15,12 @@ def test_sort(c, user_table_1, df): ORDER BY b, user_id DESC """ ) - df_result = df_result.compute().reset_index(drop=True) - df_expected = user_table_1.sort_values( - ["b", "user_id"], ascending=[True, False] - ).reset_index(drop=True) + df_result = df_result.compute() + df_expected = user_table_1.sort_values(["b", "user_id"], ascending=[True, False]) - assert_frame_equal(df_result, df_expected) + assert_frame_equal( + df_result.reset_index(drop=True), df_expected.reset_index(drop=True) + ) df_result = c.sql( """ @@ -87,17 +87,23 @@ def test_sort_by_alias(c, user_table_1): assert_frame_equal(df_result, df_expected) -def test_sort_with_nan(): +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_with_nan(gpu): + if gpu: + xd = pytest.importorskip("cudf") + else: + xd = pd + c = Context() - df = pd.DataFrame( + df = xd.DataFrame( {"a": [1, 2, float("nan"), 2], "b": [4, float("nan"), 5, float("inf")]} ) c.create_table("df", df) df_result = c.sql("SELECT * FROM df ORDER BY a").compute().reset_index(drop=True) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]} ), ) @@ -107,9 +113,9 @@ def test_sort_with_nan(): .compute() .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [float("nan"), 1, 2, 2], "b": [5, 4, float("nan"), float("inf")]} ), ) @@ -117,9 +123,9 @@ def test_sort_with_nan(): df_result = ( c.sql("SELECT * FROM df ORDER BY a NULLS LAST").compute().reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]} ), ) @@ -127,9 +133,9 @@ def test_sort_with_nan(): df_result = ( c.sql("SELECT * FROM df ORDER BY a ASC").compute().reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]} ), ) @@ -139,9 +145,9 @@ def test_sort_with_nan(): .compute() .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [float("nan"), 1, 2, 2], "b": [5, 4, float("nan"), float("inf")]} ), ) @@ -151,9 +157,9 @@ def test_sort_with_nan(): .compute() .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [1, 2, 2, float("nan")], "b": [4, float("nan"), float("inf"), 5]} ), ) @@ -161,9 +167,9 @@ def test_sort_with_nan(): df_result = ( c.sql("SELECT * FROM df ORDER BY a DESC").compute().reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]} ), ) @@ -173,9 +179,9 @@ def test_sort_with_nan(): .compute() .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [float("nan"), 2, 2, 1], "b": [5, float("nan"), float("inf"), 4]} ), ) @@ -185,17 +191,23 @@ def test_sort_with_nan(): .compute() .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( {"a": [2, 2, 1, float("nan")], "b": [float("nan"), float("inf"), 4, 5]} ), ) -def test_sort_with_nan_more_columns(): +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_with_nan_more_columns(gpu): + if gpu: + xd = pytest.importorskip("cudf") + else: + xd = pd + c = Context() - df = pd.DataFrame( + df = xd.DataFrame( { "a": [1, 1, 2, 2, float("nan"), float("nan")], "b": [1, 1, 2, float("nan"), float("inf"), 5], @@ -211,9 +223,7 @@ def test_sort_with_nan_more_columns(): .c.compute() .reset_index(drop=True) ) - assert_series_equal( - df_result, pd.Series([5, 6, float("nan"), 1, 3, 4]), check_names=False - ) + dd.assert_eq(df_result, xd.Series([5, 6, float("nan"), 1, 3, 4]), check_names=False) df_result = ( c.sql( @@ -222,14 +232,18 @@ def test_sort_with_nan_more_columns(): .c.compute() .reset_index(drop=True) ) - assert_series_equal( - df_result, pd.Series([1, float("nan"), 4, 3, 5, 6]), check_names=False - ) + dd.assert_eq(df_result, xd.Series([1, float("nan"), 4, 3, 5, 6]), check_names=False) + +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_with_nan_many_partitions(gpu): + if gpu: + xd = pytest.importorskip("cudf") + else: + xd = pd -def test_sort_with_nan_many_partitions(): c = Context() - df = pd.DataFrame({"a": [float("nan"), 1] * 30, "b": [1, 2, 3] * 20,}) + df = xd.DataFrame({"a": [float("nan"), 1] * 30, "b": [1, 2, 3] * 20,}) c.create_table("df", dd.from_pandas(df, npartitions=10)) df_result = ( @@ -238,9 +252,9 @@ def test_sort_with_nan_many_partitions(): .reset_index(drop=True) ) - assert_frame_equal( + dd.assert_eq( df_result, - pd.DataFrame( + xd.DataFrame( { "a": [float("nan")] * 30 + [1] * 30, "b": [1] * 10 + [2] * 10 + [3] * 10 + [1] * 10 + [2] * 10 + [3] * 10, @@ -248,16 +262,22 @@ def test_sort_with_nan_many_partitions(): ), ) - df = pd.DataFrame({"a": [float("nan"), 1] * 30}) + df = xd.DataFrame({"a": [float("nan"), 1] * 30}) c.create_table("df", dd.from_pandas(df, npartitions=10)) df_result = c.sql("SELECT * FROM df ORDER BY a").compute().reset_index(drop=True) - assert_frame_equal(df_result, pd.DataFrame({"a": [1] * 30 + [float("nan")] * 30,})) + dd.assert_eq(df_result, xd.DataFrame({"a": [1] * 30 + [float("nan")] * 30,})) -def test_sort_strings(c): - string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]}) +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_strings(c, gpu): + if gpu: + xd = pytest.importorskip("cudf") + else: + xd = pd + + string_table = xd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]}) c.create_table("string_table", string_table) df_result = c.sql( @@ -271,13 +291,19 @@ def test_sort_strings(c): df_result = df_result.compute().reset_index(drop=True) df_expected = string_table.sort_values(["a"], ascending=True).reset_index(drop=True) - assert_frame_equal(df_result, df_expected) + dd.assert_eq(df_result, df_expected) + +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_sort_not_allowed(c, gpu): + if gpu: + table_name = "gpu_user_table_1" + else: + table_name = "user_table_1" -def test_sort_not_allowed(c): # Wrong column with pytest.raises(Exception): - c.sql("SELECT * FROM user_table_1 ORDER BY 42") + c.sql(f"SELECT * FROM {table_name} ORDER BY 42") def test_limit(c, long_table): @@ -310,3 +336,113 @@ def test_limit(c, long_table): df_result = df_result.compute() assert_frame_equal(df_result, long_table.iloc[101 : 101 + 101]) + + +@pytest.mark.gpu +def test_sort_gpu(c, gpu_user_table_1, gpu_df): + df_result = c.sql( + """ + SELECT + * + FROM gpu_user_table_1 + ORDER BY b, user_id DESC + """ + ) + df_result = df_result.compute() + df_expected = gpu_user_table_1.sort_values( + ["b", "user_id"], ascending=[True, False] + ) + + dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True)) + + df_result = c.sql( + """ + SELECT + * + FROM gpu_df + ORDER BY b DESC, a DESC + """ + ) + df_result = df_result.compute() + df_expected = gpu_df.sort_values(["b", "a"], ascending=[False, False]) + + dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True)) + + df_result = c.sql( + """ + SELECT + * + FROM gpu_df + ORDER BY a DESC, b + """ + ) + df_result = df_result.compute() + df_expected = gpu_df.sort_values(["a", "b"], ascending=[False, True]) + + dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True)) + + df_result = c.sql( + """ + SELECT + * + FROM gpu_df + ORDER BY b, a + """ + ) + df_result = df_result.compute() + df_expected = gpu_df.sort_values(["b", "a"], ascending=[True, True]) + + dd.assert_eq(df_result.reset_index(drop=True), df_expected.reset_index(drop=True)) + + +@pytest.mark.gpu +def test_sort_gpu_by_alias(c, gpu_user_table_1): + df_result = c.sql( + """ + SELECT + b AS my_column + FROM gpu_user_table_1 + ORDER BY my_column, user_id DESC + """ + ) + df_result = ( + df_result.compute().reset_index(drop=True).rename(columns={"my_column": "b"}) + ) + df_expected = gpu_user_table_1.sort_values( + ["b", "user_id"], ascending=[True, False] + ).reset_index(drop=True)[["b"]] + + dd.assert_eq(df_result, df_expected) + + +@pytest.mark.gpu +def test_limit_gpu(c, gpu_long_table): + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 101") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[:101]) + + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 200") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[:200]) + + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[:100]) + + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100 OFFSET 99") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[99 : 99 + 100]) + + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 100 OFFSET 100") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[100 : 100 + 100]) + + df_result = c.sql("SELECT * FROM gpu_long_table LIMIT 101 OFFSET 101") + df_result = df_result.compute() + + dd.assert_eq(df_result, gpu_long_table.iloc[101 : 101 + 101]) From 9f4868c730518d045ef7f80a9bbeb773fa23f307 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 7 Oct 2021 12:17:34 -0700 Subject: [PATCH 15/15] Remove unnecessary isin import --- dask_sql/physical/utils/sort.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 138b7d99a..5857a3321 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -2,7 +2,6 @@ import dask.dataframe as dd import pandas as pd -from dask.array.routines import isin from dask.utils import M from dask_sql.utils import make_pickable_without_dask_sql