[Review] Add fast path for multi-column sorting #229

Merged 17 commits on Oct 7, 2021

48 changes: 47 additions & 1 deletion dask_sql/physical/utils/sort.py
@@ -2,8 +2,14 @@

import dask.dataframe as dd
import pandas as pd
from dask.utils import M

from dask_sql.utils import make_pickable_without_dask_sql, new_temporary_column
from dask_sql.utils import make_pickable_without_dask_sql

try:
    import dask_cudf
except ImportError:
    dask_cudf = None


def apply_sort(
@@ -12,6 +18,46 @@ def apply_sort(
    sort_ascending: List[bool],
    sort_null_first: List[bool],
) -> dd.DataFrame:
    # if we have a single partition, we can sometimes sort with map_partitions
    if df.npartitions == 1:
        if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):
            # cudf only supports null positioning if `ascending` is a single boolean:
            # https://github.com/rapidsai/cudf/issues/9400
            if (all(sort_ascending) or not any(sort_ascending)) and not any(
                sort_null_first[1:]
            ):

Collaborator
Really not a fan of these single-partition checks, but this is reflective of cuDF's sort_values behavior, which can only do:

  • a list for ascending with no support for na_position
  • a single boolean for ascending with support for na_position

Ideally we would want cuDF to match Pandas' behavior and support na_position when providing a list for ascending, which would simplify this whole block to:

     if df.npartitions == 1 and not any(sort_null_first[1:]):
         return df.map_partitions(
             M.sort_values,
             by=sort_columns,
             ascending=sort_ascending,
             na_position="first" if sort_null_first[0] else "last",
         )
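
As an aside for readers without a GPU at hand, here is a minimal sketch of the two supported call shapes described above. The DataFrame contents and the gdf name are illustrative only, and the commented-out call reflects the cuDF limitation at the time of this PR (rapidsai/cudf#9400):

    import cudf

    gdf = cudf.DataFrame({"a": [2, None, 1], "b": [1.0, None, 3.0]})

    # Supported: a single boolean for `ascending`, combined with `na_position`
    gdf.sort_values(by=["a", "b"], ascending=False, na_position="first")

    # Supported: a list for `ascending`, but only the default null positioning
    gdf.sort_values(by=["a", "b"], ascending=[True, False])

    # Not supported at the time of this PR (see rapidsai/cudf#9400):
    # gdf.sort_values(by=["a", "b"], ascending=[True, False], na_position="first")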

Contributor Author
Can you include these comments in the code so we can go back and fix this at a later date? Also, can you file an issue with cuDF to help track support for na_position with a list for ascending? Hmm, are these cudf or dask-cudf issues?

Collaborator
These are cuDF-specific - right now, ascending sort / null positioning are still WIP in dask-cudf.

Contributor Author
Ah right. OK, then maybe add comments with links to those PRs and we can continue iterating.

Collaborator
Also, something that might be worth discussing is the eventual upstreaming of this logic - in the long run, it might make sense to have these map_partitions calls done implicitly in df.sort_values if we see that df.npartitions == 1. This would allow us to simplify the logic here to just a single sort_values call that is optimal regardless of partition count.

I discussed that possibility for Dask briefly here:

dask/dask#8225 (comment)
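
A rough sketch of what that implicit fast path could look like; the helper name is hypothetical, and Dask's actual sort_values signature and dispatch may differ:

    import dask.dataframe as dd
    from dask.utils import M


    def sort_values_with_fast_path(df: dd.DataFrame, by, **kwargs):
        # Hypothetical helper illustrating the idea above: with a single
        # partition, sorting inside map_partitions already yields a globally
        # sorted result, so the distributed sort machinery can be skipped.
        if df.npartitions == 1:
            return df.map_partitions(M.sort_values, by=by, **kwargs)
        # Multiple partitions: fall back to the regular distributed sort
        return df.sort_values(by=by, **kwargs)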

                return df.map_partitions(
                    M.sort_values,
                    by=sort_columns,
                    ascending=all(sort_ascending),
                    na_position="first" if sort_null_first[0] else "last",
                )
            if not any(sort_null_first):
                return df.map_partitions(
                    M.sort_values, by=sort_columns, ascending=sort_ascending
                )
        elif not any(sort_null_first[1:]):
            return df.map_partitions(
                M.sort_values,
                by=sort_columns,
                ascending=sort_ascending,
                na_position="first" if sort_null_first[0] else "last",
            )

    # dask-cudf only supports ascending sort / nulls last:
    # https://github.com/rapidsai/cudf/pull/9250
    # https://github.com/rapidsai/cudf/pull/9264
    if (
        dask_cudf is not None
        and isinstance(df, dask_cudf.DataFrame)
        and all(sort_ascending)
        and not any(sort_null_first)
    ):
        try:
            return df.sort_values(sort_columns, ignore_index=True)
        except ValueError:
            pass

    # Split the first column. We need to handle this one with set_index
    first_sort_column = sort_columns[0]
    first_sort_ascending = sort_ascending[0]
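
To put the new fast path in context, a small usage sketch (not part of the diff; the column names and data are made up, and the call assumes the apply_sort signature shown above):

    import pandas as pd
    import dask.dataframe as dd

    from dask_sql.physical.utils.sort import apply_sort

    pdf = pd.DataFrame({"a": [3, 1, None, 2], "b": [4.0, 2.0, 9.0, 1.0]})
    ddf = dd.from_pandas(pdf, npartitions=1)

    # Single partition and nulls-first only on the leading column:
    # this should take the new map_partitions path instead of set_index.
    result = apply_sort(ddf, ["a", "b"], [True, False], [True, False])
    print(result.compute())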
28 changes: 28 additions & 0 deletions tests/integration/fixtures.py
@@ -9,6 +9,11 @@
from dask.distributed import Client
from pandas.testing import assert_frame_equal

try:
    import cudf
except ImportError:
    cudf = None


@pytest.fixture()
def timeseries_df(c):
@@ -86,6 +91,21 @@ def datetime_table():
    )


@pytest.fixture()
def gpu_user_table_1(user_table_1):
    return cudf.from_pandas(user_table_1) if cudf else None


@pytest.fixture()
def gpu_df(df):
    return cudf.from_pandas(df) if cudf else None


@pytest.fixture()
def gpu_long_table(long_table):
    return cudf.from_pandas(long_table) if cudf else None


@pytest.fixture()
def c(
    df_simple,
@@ -97,6 +117,9 @@ def c(
    user_table_nan,
    string_table,
    datetime_table,
    gpu_user_table_1,
    gpu_df,
    gpu_long_table,
):
    dfs = {
        "df_simple": df_simple,
@@ -108,13 +131,18 @@ def c(
        "user_table_nan": user_table_nan,
        "string_table": string_table,
        "datetime_table": datetime_table,
        "gpu_user_table_1": gpu_user_table_1,
        "gpu_df": gpu_df,
        "gpu_long_table": gpu_long_table,
    }

    # Lazy import, otherwise the pytest framework has problems
    from dask_sql.context import Context

    c = Context()
    for df_name, df in dfs.items():
        if df is None:
            continue
        dask_df = dd.from_pandas(df, npartitions=3)
        c.create_table(df_name, dask_df)

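As a usage note, a hedged sketch of how a test could consume one of the new GPU-backed tables registered by the c fixture; the test name and query are hypothetical and not part of this PR:

    import pytest

    try:
        import cudf
    except ImportError:
        cudf = None


    @pytest.mark.skipif(cudf is None, reason="cudf not installed")
    def test_gpu_table_roundtrip(c):
        # `gpu_df` is only registered in the context when cudf is importable
        result = c.sql("SELECT * FROM gpu_df").compute()
        assert len(result) > 0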
20 changes: 20 additions & 0 deletions tests/integration/test_show.py
@@ -2,6 +2,11 @@
import pytest
from pandas.testing import assert_frame_equal

try:
    import cudf
except ImportError:
    cudf = None


def test_schemas(c):
    df = c.sql("SHOW SCHEMAS")
@@ -36,6 +41,21 @@ def test_tables(c):
                "string_table",
                "datetime_table",
            ]
            if cudf is None
            else [
                "df",
                "df_simple",
                "user_table_1",
                "user_table_2",
                "long_table",
                "user_table_inf",
                "user_table_nan",
                "string_table",
                "datetime_table",
                "gpu_user_table_1",
                "gpu_df",
                "gpu_long_table",
            ]
        }
    )
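
As a side note on the shape of this assertion, an equivalent way to build the expected table list without repeating the CPU entries would be the following; this is purely illustrative, and the PR keeps the explicit two-list form shown above:

    cpu_tables = [
        "df",
        "df_simple",
        "user_table_1",
        "user_table_2",
        "long_table",
        "user_table_inf",
        "user_table_nan",
        "string_table",
        "datetime_table",
    ]
    gpu_tables = ["gpu_user_table_1", "gpu_df", "gpu_long_table"]

    expected_tables = cpu_tables if cudf is None else cpu_tables + gpu_tables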
