Add new dask_cudf.read_parquet API (#17250)
It's time to clean up the `dask_cudf.read_parquet` API and prioritize GPU-specific optimizations. To this end, it makes sense to expose our own `read_parquet` API within Dask cuDF. 
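
For illustration, a hedged usage sketch of the new API (the path is hypothetical; `filesystem` and `blocksize` behave as described in the notes below):

```python
# Hypothetical usage of the new dask_cudf.read_parquet API
# (query planning enabled, which is the default).
import dask_cudf

ddf = dask_cudf.read_parquet(
    "s3://bucket/dataset/",  # hypothetical path
    filesystem="arrow",      # use PyArrow filesystem objects for remote IO
    blocksize=0.125,         # ~1/8 of the smallest visible device's memory
)
```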

**Notes**:

- The "new" `dask_cudf.read_parquet` API is only relevant when query-planning is enabled (the default).
- With `filesystem="arrow"`, reads from local storage now use `cudf.read_parquet` (rather than PyArrow).
- (specific to Dask cuDF): The default `blocksize` argument is now derived from the "smallest" NVIDIA device detected within the active Dask cluster (or from the first device visible to the client). More specifically, we use `pynvml` to find this representative device size, and we set `blocksize` to 1/32 of that size (see the sketch after this list).
  - The user may also pass in something like `blocksize=0.125` to use 1/8 of the minimum device size (or `blocksize='1GiB'` to bypass the default logic altogether).
- (specific to Dask cuDF): When `blocksize` is `None`, we disable partition fusion at optimization time.
- (specific to Dask cuDF): When `blocksize` is **not** `None`, we use the parquet metadata from the first few files to inform partition fusion at optimization time (instead of a rough column-count ratio).
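
A minimal sketch of the device-size heuristic described above, assuming `pynvml` is available. The function name is hypothetical, and the actual implementation (which also inspects devices across the Dask cluster) may differ:

```python
# Hypothetical sketch of the default-blocksize heuristic: take a fraction
# (1/32 by default) of the smallest visible NVIDIA device's total memory.
import pynvml


def _default_blocksize(fraction: float = 1 / 32) -> int:
    pynvml.nvmlInit()
    try:
        sizes = [
            pynvml.nvmlDeviceGetMemoryInfo(
                pynvml.nvmlDeviceGetHandleByIndex(i)
            ).total
            for i in range(pynvml.nvmlDeviceGetCount())
        ]
        return int(min(sizes) * fraction)
    finally:
        pynvml.nvmlShutdown()
```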

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #17250
rjzamora authored Nov 20, 2024
1 parent a2a62a1 commit 3111aa4
Showing 6 changed files with 784 additions and 193 deletions.

python/cudf/cudf/io/parquet.py (8 additions & 0 deletions)

@@ -368,6 +368,14 @@ def _process_dataset(
     file_list = paths
     if len(paths) == 1 and ioutils.is_directory(paths[0]):
         paths = ioutils.stringify_pathlike(paths[0])
+    elif (
+        filters is None
+        and isinstance(dataset_kwargs, dict)
+        and dataset_kwargs.get("partitioning") is None
+    ):
+        # Skip dataset processing if we have no filters
+        # or hive/directory partitioning to deal with.
+        return paths, row_groups, [], {}
 
     # Convert filters to ds.Expression
     if filters is not None:
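
A hedged usage sketch of the fast path added here (paths are hypothetical): with `filters=None` and no `partitioning` entry in `dataset_kwargs`, PyArrow dataset discovery is skipped entirely.

```python
# Hypothetical example: no filters and no hive/directory partitioning,
# so the early return above avoids building a PyArrow dataset.
import cudf

df = cudf.read_parquet(
    ["data/part.0.parquet", "data/part.1.parquet"],  # hypothetical paths
)
```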
python/dask_cudf/dask_cudf/_legacy/io/parquet.py (2 additions & 1 deletion)

@@ -86,7 +86,8 @@ def _read_paths(
         )
 
     dataset_kwargs = dataset_kwargs or {}
-    dataset_kwargs["partitioning"] = partitioning or "hive"
+    if partitions:
+        dataset_kwargs["partitioning"] = partitioning or "hive"
 
     # Use cudf to read in data
     try:
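
My reading of why this matters (hedged, not stated in the diff): when no partition columns are discovered, `dataset_kwargs` no longer carries a `"partitioning"` entry, so the early-return fast path added to `cudf/io/parquet.py` above can apply.

```python
# Hypothetical illustration of the interaction between the two hunks above.
filters = None
partitions = []  # no hive partition columns discovered
dataset_kwargs = {}
if partitions:  # new behavior: only request hive partitioning when needed
    dataset_kwargs["partitioning"] = "hive"

# Condition checked by the early return in _process_dataset:
assert filters is None and dataset_kwargs.get("partitioning") is None
```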
python/dask_cudf/dask_cudf/backends.py (3 additions & 133 deletions)

@@ -700,140 +700,10 @@ def from_dict(
         )
 
     @staticmethod
-    def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs):
-        import dask_expr as dx
-        import fsspec
-
-        if (
-            isinstance(filesystem, fsspec.AbstractFileSystem)
-            or isinstance(filesystem, str)
-            and filesystem.lower() == "fsspec"
-        ):
-            # Default "fsspec" filesystem
-            from dask_cudf._legacy.io.parquet import CudfEngine
+    def read_parquet(*args, **kwargs):
+        from dask_cudf.io.parquet import read_parquet as read_parquet_expr
 
-            _raise_unsupported_parquet_kwargs(**kwargs)
-            return _default_backend(
-                dx.read_parquet,
-                path,
-                *args,
-                filesystem=filesystem,
-                engine=CudfEngine,
-                **kwargs,
-            )
-
-        else:
-            # EXPERIMENTAL filesystem="arrow" support.
-            # This code path uses PyArrow for IO, which is only
-            # beneficial for remote storage (e.g. S3)
-
-            from fsspec.utils import stringify_path
-            from pyarrow import fs as pa_fs
-
-            # CudfReadParquetPyarrowFS requires import of distributed beforehand
-            # (See: https://github.com/dask/dask/issues/11352)
-            import distributed  # noqa: F401
-            from dask.core import flatten
-            from dask.dataframe.utils import pyarrow_strings_enabled
-
-            from dask_cudf.io.parquet import CudfReadParquetPyarrowFS
-
-            if args:
-                raise ValueError(f"Unexpected positional arguments: {args}")
-
-            if not (
-                isinstance(filesystem, pa_fs.FileSystem)
-                or isinstance(filesystem, str)
-                and filesystem.lower() in ("arrow", "pyarrow")
-            ):
-                raise ValueError(f"Unexpected filesystem value: {filesystem}.")
-
-            if not PYARROW_GE_15:
-                raise NotImplementedError(
-                    "Experimental Arrow filesystem support requires pyarrow>=15"
-                )
-
-            if not isinstance(path, str):
-                path = stringify_path(path)
-
-            # Extract kwargs
-            columns = kwargs.pop("columns", None)
-            filters = kwargs.pop("filters", None)
-            categories = kwargs.pop("categories", None)
-            index = kwargs.pop("index", None)
-            storage_options = kwargs.pop("storage_options", None)
-            dtype_backend = kwargs.pop("dtype_backend", None)
-            calculate_divisions = kwargs.pop("calculate_divisions", False)
-            ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
-            metadata_task_size = kwargs.pop("metadata_task_size", None)
-            split_row_groups = kwargs.pop("split_row_groups", "infer")
-            blocksize = kwargs.pop("blocksize", "default")
-            aggregate_files = kwargs.pop("aggregate_files", None)
-            parquet_file_extension = kwargs.pop(
-                "parquet_file_extension", (".parq", ".parquet", ".pq")
-            )
-            arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
-            open_file_options = kwargs.pop("open_file_options", None)
-
-            # Validate and normalize kwargs
-            kwargs["dtype_backend"] = dtype_backend
-            if arrow_to_pandas is not None:
-                raise ValueError(
-                    "arrow_to_pandas not supported for the 'cudf' backend."
-                )
-            if open_file_options is not None:
-                raise ValueError(
-                    "The open_file_options argument is no longer supported "
-                    "by the 'cudf' backend."
-                )
-            if filters is not None:
-                for filter in flatten(filters, container=list):
-                    _, op, val = filter
-                    if op == "in" and not isinstance(val, (set, list, tuple)):
-                        raise TypeError(
-                            "Value of 'in' filter must be a list, set or tuple."
-                        )
-            if metadata_task_size is not None:
-                raise NotImplementedError(
-                    "metadata_task_size is not supported when using the pyarrow filesystem."
-                )
-            if split_row_groups != "infer":
-                raise NotImplementedError(
-                    "split_row_groups is not supported when using the pyarrow filesystem."
-                )
-            if parquet_file_extension != (".parq", ".parquet", ".pq"):
-                raise NotImplementedError(
-                    "parquet_file_extension is not supported when using the pyarrow filesystem."
-                )
-            if blocksize is not None and blocksize != "default":
-                warnings.warn(
-                    "blocksize is not supported when using the pyarrow filesystem."
-                    "blocksize argument will be ignored."
-                )
-            if aggregate_files is not None:
-                warnings.warn(
-                    "aggregate_files is not supported when using the pyarrow filesystem. "
-                    "Please use the 'dataframe.parquet.minimum-partition-size' config."
-                    "aggregate_files argument will be ignored."
-                )
-
-            return dx.new_collection(
-                CudfReadParquetPyarrowFS(
-                    path,
-                    columns=dx._util._convert_to_list(columns),
-                    filters=filters,
-                    categories=categories,
-                    index=index,
-                    calculate_divisions=calculate_divisions,
-                    storage_options=storage_options,
-                    filesystem=filesystem,
-                    ignore_metadata_file=ignore_metadata_file,
-                    arrow_to_pandas=arrow_to_pandas,
-                    pyarrow_strings_enabled=pyarrow_strings_enabled(),
-                    kwargs=kwargs,
-                    _series=isinstance(columns, str),
-                )
-            )
+        return read_parquet_expr(*args, **kwargs)
 
     @staticmethod
     def read_csv(
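
A hedged usage sketch of the simplified dispatch: with the `"cudf"` dataframe backend active, Dask's generic `read_parquet` should now route straight to `dask_cudf`'s own implementation (path is hypothetical).

```python
# Hypothetical example: the backend entrypoint above simply forwards to
# dask_cudf's read_parquet implementation.
import dask
import dask.dataframe as dd

with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.read_parquet("dataset/", filesystem="arrow")  # hypothetical path
```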
python/dask_cudf/dask_cudf/io/__init__.py (9 additions & 4 deletions)

@@ -1,6 +1,6 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 
-from dask_cudf import _deprecated_api
+from dask_cudf import _deprecated_api, QUERY_PLANNING_ON
 
 from . import csv, orc, json, parquet, text  # noqa: F401
 
@@ -22,9 +22,14 @@
 read_text = _deprecated_api(
     "dask_cudf.io.read_text", new_api="dask_cudf.read_text"
 )
-read_parquet = _deprecated_api(
-    "dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet"
-)
+if QUERY_PLANNING_ON:
+    read_parquet = parquet.read_parquet
+else:
+    read_parquet = _deprecated_api(
+        "The legacy dask_cudf.io.read_parquet API",
+        new_api="dask_cudf.read_parquet",
+        rec="",
+    )
 to_parquet = _deprecated_api(
     "dask_cudf.io.to_parquet",
     new_api="dask_cudf._legacy.io.parquet.to_parquet",
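
A hedged sketch of the resulting behavior: with query planning enabled (the default), `dask_cudf.io.read_parquet` is the new expression-based reader rather than a deprecation shim.

```python
# Hypothetical check of the gating above.
import dask_cudf.io
from dask_cudf import QUERY_PLANNING_ON

if QUERY_PLANNING_ON:
    # dask_cudf.io.read_parquet resolves to the new reader.
    ddf = dask_cudf.io.read_parquet("dataset/")  # hypothetical path
else:
    # Legacy mode: prefer the top-level dask_cudf.read_parquet instead.
    ddf = dask_cudf.read_parquet("dataset/")  # hypothetical path
```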
(Diffs for the remaining changed files are not shown.)
