[Review] Add fast path for multi-column sorting #229
Conversation
Codecov Report
```diff
@@            Coverage Diff            @@
##              main     #229      +/-   ##
===========================================
- Coverage   100.00%   99.61%    -0.39%
===========================================
  Files           64       64
  Lines         2590     2609       +19
  Branches       361      367        +6
===========================================
+ Hits          2590     2599        +9
- Misses           0        8        +8
- Partials         0        2        +2
```
Continue to review full report at Codecov.
Cool, I like that! Once we are sure about the testing (as you have said) we can merge.
Regarding the multi-column sorting in dask: one problem is (as you mentioned) the quantile method; another one, however, is how to handle the index, as Dask currently does not support multi-index (multi-column) divisions (as far as I recall). How do you handle this in dask-cudf?
Sorry, stupid me. The code of course works for a single column.
I'm going to be taking this PR over from @quasiben - could I get workflow approval?
Thanks @charlesbluca, I approved so you should be able to get CI runs now.
@nils-braun if you have time I think this is ready for another review.
FYI I am currently working on implementing descending sort support in dask-cudf's `sort_values`; if that gets merged first, we could update the sorting logic here to reflect this change.
In some cases, dask-cudf's sorting doesn't match up with dask-sql's; for example:

```python
import cudf
import pandas as pd
from dask_sql import Context

df = pd.DataFrame({"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]})

c = Context()
c.create_table("gdf", df, gpu=True, npartitions=2)
c.create_table("pdf", df, gpu=False, npartitions=2)

got, expect = (c.sql(
    """
    SELECT
        *
    FROM %s
    ORDER BY user_id
    """ % s
).compute() for s in ("gdf", "pdf"))

print(got)
print(expect)
```
```
   user_id  b
0        1  3
0        2  3
1        2  1
2        3  3
   user_id  b
0        1  3
0        2  1
1        2  3
2        3  3
```
I'm not sure if dask-sql's sorting of rows with identical values in the sort-by column is intentional, but for now I worked around this by testing the SQL-sorted dask-cudf table against the same table sorted by calling
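A minimal sketch of that workaround (the exact call is elided above; the table setup and `assert_eq` usage here are assumptions, not the PR's actual test code):

```python
import cudf
import dask_cudf
from dask.dataframe.utils import assert_eq
from dask_sql import Context

# Sketch: compare the SQL-sorted result against the *same* dask-cudf table
# sorted directly with sort_values, so both sides order tied rows the same way.
gdf = dask_cudf.from_cudf(
    cudf.DataFrame({"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]}), npartitions=2
)

c = Context()
c.create_table("gdf", gdf)

got = c.sql("SELECT * FROM gdf ORDER BY user_id").compute()
expect = gdf.sort_values(by="user_id").compute()

assert_eq(got, expect, check_index=False)
```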
```python
if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):
    if (all(sort_ascending) or not any(sort_ascending)) and not any(
        sort_null_first[1:]
    ):
```
Really not a fan of these single partition checks, but this is reflective of cuDF's `sort_values` behavior, which can only do:

- a list for `ascending` with no support for `na_position`
- a single boolean for `ascending` with support for `na_position`

Ideally we would want cuDF to match Pandas' behavior and support `na_position` when providing a list for `ascending`, which would simplify this whole block to:

```python
if df.npartitions == 1 and not any(sort_null_first[1:]):
    return df.map_partitions(
        M.sort_values,
        by=sort_columns,
        ascending=sort_ascending,
        na_position="first" if sort_null_first[0] else "last",
    )
```
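For reference, pandas already accepts this combination today, which is what the comment above is asking cuDF to match (a toy illustration, not code from this PR):

```python
import pandas as pd

# pandas supports a per-column `ascending` list together with `na_position`.
df = pd.DataFrame({"a": [2.0, 1.0, None, 2.0], "b": [1, 3, 2, 0]})
print(df.sort_values(by=["a", "b"], ascending=[True, False], na_position="first"))
```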
Can you include these comments in the code so we can go back and fix at a later date? Also, can you file an issue with cuDF to help track support for `na_position` with a list for `ascending`? Hmm, are these cudf or dask-cudf issues?
These are cuDF-specific - right now ascending sort / null positioning are still WIP in dask-cudf.
Ah right. Ok, then maybe comments with links to those PRs and we can continue iterating
Also something that might be worth discussing is the eventual upstreaming of this logic - in the long run, it might make sense to have these `map_partitions` calls done implicitly in `df.sort_values` if we see that `df.npartitions == 1`. This would allow us to simplify the logic here to just a single `sort_values` call that is optimal regardless of partition count.
I discussed that possibility for Dask briefly here:
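A rough sketch of what that implicit handling could look like inside a `sort_values` implementation (hypothetical code, not the actual dask or dask-cudf source):

```python
from dask.utils import M


def sort_values(df, by, ascending=True, na_position="last"):
    # Hypothetical sketch: a single-partition collection is already globally
    # sorted by a plain partition-wise sort, so the shuffle path can be skipped.
    if df.npartitions == 1:
        return df.map_partitions(
            M.sort_values, by=by, ascending=ascending, na_position=na_position
        )
    # The multi-partition case would go through the usual divisions/shuffle
    # logic, omitted in this sketch.
    raise NotImplementedError("multi-partition path omitted in this sketch")
```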
rerun tests
Closes #9158. Adds support for descending sort in dask-cudf's `sort_values`, which should open up the use cases for it in dask-contrib/dask-sql#229.

Authors:
- Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #9250
This PR adds an optional fast path for multi-column sorting for libraries like dask-cudf, which has a working implementation. The implementation currently does not support the ascending/na_position kwargs -- we might be able to do something similar to what is currently implemented in `_sort_first_column`, where we check for NAs/descending. For now, though, if the user needs those options we fall back to the default implementation.
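A rough sketch of the dispatch described above, assuming hypothetical helper names (`apply_fast_sort` and `default_sort` are placeholders, not the PR's actual functions):

```python
try:
    import dask_cudf
except ImportError:  # GPU support is optional; the fast path only applies to dask-cudf
    dask_cudf = None


def apply_fast_sort(df, sort_columns, sort_ascending, sort_null_first, default_sort):
    # Sketch: take the multi-column fast path only when dask-cudf can handle it,
    # i.e. a plain all-ascending sort with NULLs in their default position.
    if (
        dask_cudf is not None
        and isinstance(df, dask_cudf.DataFrame)
        and all(sort_ascending)
        and not any(sort_null_first)
    ):
        return df.sort_values(by=sort_columns)
    # Descending columns or NULLs-first requests fall back to the default
    # (first-column based) implementation, as described above.
    return default_sort(df, sort_columns, sort_ascending, sort_null_first)
```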
We can add tests for this PR after #227 is resolved.
Lastly, if we want to resolve multi-column sorting for Dask itself, we could use a similar implementation to dask-cudf's, which relies on multi-column quantiles. This would mean adding a `quantiles` method to pandas.

cc @rjzamora
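A small pandas sketch of the quantile idea, just to illustrate the missing piece (the helper name and exact approach are assumptions; a real distributed implementation would compute approximate per-partition quantiles rather than sorting the whole frame):

```python
import numpy as np
import pandas as pd


def multi_column_divisions(df, by, npartitions):
    # Sketch: take the rows sitting at evenly spaced quantile positions of the
    # lexicographic ordering of the sort columns and use them as division
    # boundaries for a shuffle. pandas has no built-in multi-column quantile
    # helper, which is the gap mentioned above.
    ordered = df.sort_values(by=by).reset_index(drop=True)
    positions = np.linspace(0, len(ordered) - 1, npartitions + 1).round().astype(int)
    return ordered.loc[positions, by]


df = pd.DataFrame({"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]})
print(multi_column_divisions(df, ["user_id", "b"], npartitions=2))
```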