Support dask>=2024.11.2 in Dask cuDF (#17439)
This updates Dask cuDF so that we can unpin from dask 2024.11.2 in rapids-dask-dependency for 25.02 development.

**TODO**: Create a branch in [rapids-dask-dependency](https://github.com/rapidsai/rapids-dask-dependency) to unpin from 2024.11.2 (to demonstrate that CI passes).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Matthew Murray (https://github.com/Matt711)

URL: #17439
rjzamora authored Nov 27, 2024
1 parent 9db132a commit 2c89dba
Showing 8 changed files with 63 additions and 47 deletions.
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/series.rst
@@ -22,6 +22,7 @@ Attributes
    Series.values
    Series.data
    Series.dtype
+   Series.dtypes
    Series.shape
    Series.ndim
    Series.nullable
9 changes: 9 additions & 0 deletions python/cudf/cudf/core/series.py
@@ -1598,6 +1598,15 @@ def dtype(self):
         """The dtype of the Series."""
         return self._column.dtype
 
+    @property  # type: ignore
+    @_performance_tracking
+    def dtypes(self):
+        """The dtype of the Series.
+
+        This is an alias for `Series.dtype`.
+        """
+        return self.dtype
+
     @classmethod
     @_performance_tracking
     def _concat(cls, objs, axis=0, index: bool = True):
6 changes: 6 additions & 0 deletions python/cudf/cudf/tests/test_series.py
@@ -2934,3 +2934,9 @@ def test_empty_astype_always_castable(type1, type2, as_dtype, copy):
         assert ser._column is result._column
     else:
         assert ser._column is not result._column
+
+
+def test_dtype_dtypes_equal():
+    ser = cudf.Series([0])
+    assert ser.dtype is ser.dtypes
+    assert ser.dtypes is ser.to_pandas().dtypes
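
The new alias matches pandas, where Series.dtypes likewise returns the series' single dtype. A quick usage sketch (values illustrative; needs a CUDA GPU with cudf installed):

import cudf

ser = cudf.Series([0.0, 1.5])
print(ser.dtype)   # float64
print(ser.dtypes)  # float64 -- the same object, via the new alias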
6 changes: 1 addition & 5 deletions python/dask_cudf/dask_cudf/_expr/collection.py
@@ -156,16 +156,12 @@ def to_orc(self, *args, **kwargs):
         from dask_cudf._legacy.io import to_orc
 
         return to_orc(self, *args, **kwargs)
-        # return self.to_legacy_dataframe().to_orc(*args, **kwargs)
 
     @staticmethod
     def read_text(*args, **kwargs):
-        from dask_expr import from_legacy_dataframe
-
         from dask_cudf._legacy.io.text import read_text as legacy_read_text
 
-        ddf = legacy_read_text(*args, **kwargs)
-        return from_legacy_dataframe(ddf)
+        return legacy_read_text(*args, **kwargs)
 
 
 class Series(DXSeries, CudfFrameBase):
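
With the from_legacy_dataframe round trip removed, read_text hands back the legacy reader's collection directly. Usage is unchanged (file name and delimiter illustrative):

import dask_cudf

ser = dask_cudf.read_text("records.txt", delimiter="\n")  # hypothetical file
print(ser.npartitions)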
32 changes: 14 additions & 18 deletions python/dask_cudf/dask_cudf/_legacy/io/orc.py
@@ -7,14 +7,14 @@
 from pyarrow import orc as orc
 
 from dask import dataframe as dd
-from dask.base import tokenize
 from dask.dataframe.io.utils import _get_pyarrow_dtypes
 
 import cudf
 
 
-def _read_orc_stripe(fs, path, stripe, columns, kwargs=None):
+def _read_orc_stripe(source, fs, columns=None, kwargs=None):
     """Pull out specific columns from specific stripe"""
+    path, stripe = source
     if kwargs is None:
         kwargs = {}
     with fs.open(path, "rb") as f:
@@ -67,7 +67,7 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
     """
 
     storage_options = storage_options or {}
-    fs, fs_token, paths = get_fs_token_paths(
+    fs, _, paths = get_fs_token_paths(
         path, mode="rb", storage_options=storage_options
     )
     schema = None
@@ -100,27 +100,23 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
         **kwargs,
     )
 
-    name = "read-orc-" + tokenize(fs_token, path, columns, filters, **kwargs)
-    dsk = {}
-    N = 0
+    sources = []
     for path, n in zip(paths, nstripes_per_file):
         for stripe in (
             range(n)
             if filters is None
             else cudf.io.orc._filter_stripes(filters, path)
        ):
-            dsk[(name, N)] = (
-                _read_orc_stripe,
-                fs,
-                path,
-                stripe,
-                columns,
-                kwargs,
-            )
-            N += 1
-
-    divisions = [None] * (len(dsk) + 1)
-    return dd.core.new_dd_object(dsk, name, meta, divisions)
+            sources.append((path, stripe))
+
+    return dd.from_map(
+        _read_orc_stripe,
+        sources,
+        args=[fs],
+        columns=columns,
+        kwargs=kwargs,
+        meta=meta,
+    )
 
 
 def write_orc_partition(df, path, fs, filename, compression="snappy"):
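
The rewrite above follows the dd.from_map pattern that these legacy readers now share: each element of the input iterable becomes one output partition, and extra keywords are broadcast to every call, so no hand-built dsk graph or new_dd_object is needed. A minimal, self-contained sketch of the pattern (the toy loader and ranges are illustrative, not part of this PR):

import dask.dataframe as dd
import pandas as pd


def load_chunk(source, scale=1):
    # Each `source` tuple produces exactly one partition
    start, stop = source
    return pd.DataFrame({"x": range(start, stop)}) * scale


ddf = dd.from_map(load_chunk, [(0, 5), (5, 10)], scale=2)
assert ddf.npartitions == 2
print(ddf.compute())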
40 changes: 21 additions & 19 deletions python/dask_cudf/dask_cudf/_legacy/io/text.py
@@ -4,13 +4,18 @@
 from glob import glob
 
 import dask.dataframe as dd
-from dask.base import tokenize
-from dask.utils import apply, parse_bytes
+from dask.utils import parse_bytes
 
 import cudf
 
 
-def read_text(path, chunksize="256 MiB", **kwargs):
+def _read_text(source, **kwargs):
+    # Wrapper for cudf.read_text operation
+    fn, byte_range = source
+    return cudf.read_text(fn, byte_range=byte_range, **kwargs)
+
+
+def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs):
     if isinstance(chunksize, str):
         chunksize = parse_bytes(chunksize)
 
@@ -27,28 +32,25 @@ def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs):
             msg = f"A file in: {filenames} does not exist."
             raise FileNotFoundError(msg)
 
-    name = "read-text-" + tokenize(path, tokenize, **kwargs)
+    if chunksize and byte_range:
+        raise ValueError("Cannot specify both chunksize and byte_range.")
 
     if chunksize:
-        dsk = {}
-        i = 0
+        sources = []
         for fn in filenames:
             size = os.path.getsize(fn)
             for start in range(0, size, chunksize):
-                kwargs1 = kwargs.copy()
-                kwargs1["byte_range"] = (
+                byte_range = (
                     start,
                     chunksize,
                 )  # specify which chunk of the file we care about
-
-                dsk[(name, i)] = (apply, cudf.read_text, [fn], kwargs1)
-                i += 1
+                sources.append((fn, byte_range))
     else:
-        dsk = {
-            (name, i): (apply, cudf.read_text, [fn], kwargs)
-            for i, fn in enumerate(filenames)
-        }
-
-    meta = cudf.Series([], dtype="O")
-    divisions = [None] * (len(dsk) + 1)
-    return dd.core.new_dd_object(dsk, name, meta, divisions)
+        sources = [(fn, byte_range) for fn in filenames]
+
+    return dd.from_map(
+        _read_text,
+        sources,
+        meta=cudf.Series([], dtype="O"),
+        **kwargs,
+    )
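
The chunksize branch above creates one partition per (offset, length) byte range of each file. A sketch of that arithmetic (file name illustrative; the last range may run past end-of-file, which the original loop already relies on the reader to tolerate):

import os
from dask.utils import parse_bytes

chunksize = parse_bytes("256 MiB")  # 268435456 bytes
size = os.path.getsize("records.txt")  # hypothetical input file
byte_ranges = [(start, chunksize) for start in range(0, size, chunksize)]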
5 changes: 1 addition & 4 deletions python/dask_cudf/dask_cudf/backends.py
@@ -738,9 +738,6 @@ def read_json(*args, **kwargs):
 
     @staticmethod
     def read_orc(*args, **kwargs):
-        from dask_expr import from_legacy_dataframe
-
         from dask_cudf._legacy.io.orc import read_orc as legacy_read_orc
 
-        ddf = legacy_read_orc(*args, **kwargs)
-        return from_legacy_dataframe(ddf)
+        return legacy_read_orc(*args, **kwargs)
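
As with read_text above, the backend now returns the legacy reader's output directly, since that reader already builds a query-planning collection with dd.from_map. Usage is unchanged (glob path illustrative):

import dask_cudf

ddf = dask_cudf.read_orc("data/*.orc")  # hypothetical files
print(ddf.npartitions)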
11 changes: 10 additions & 1 deletion python/dask_cudf/dask_cudf/io/parquet.py
@@ -26,6 +26,15 @@
 from dask.tokenize import tokenize
 from dask.utils import parse_bytes
 
+try:
+    # TODO: Remove try/except when dask>2024.11.2
+    from dask._task_spec import List as TaskList
+except ImportError:
+
+    def TaskList(*x):
+        return list(x)
+
+
 import cudf
 
 from dask_cudf import QUERY_PLANNING_ON, _deprecated_api
@@ -447,7 +456,7 @@ def _task(self, name, index: int):
             return Task(
                 name,
                 cudf.concat,
-                [expr._filtered_task(name, i) for i in bucket],
+                TaskList(*(expr._filtered_task(name, i) for i in bucket)),
             )
 
     pieces = []
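
The TaskList shim covers a task-spec change: on dask releases after 2024.11.2, a plain Python list passed as a Task argument appears to be treated as a literal, so subtasks nested inside it would not be resolved; wrapping them in dask._task_spec.List keeps them visible to the graph machinery, while the ImportError fallback preserves the old plain-list behavior. A construction-only sketch (assuming a newer dask where these names exist, as the TODO comment implies):

from dask._task_spec import List as TaskList, Task

# Subtasks (plain ints here for brevity) are wrapped in a task-spec
# List so the scheduler can traverse them
t = Task("concat-0", sum, TaskList(1, 2, 3))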
