Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize idxmin, idxmax with dask #9800

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions xarray/core/computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from xarray.core.merge import merge_attrs, merge_coordinates_without_align
from xarray.core.options import OPTIONS, _get_keep_attrs
from xarray.core.types import Dims, T_DataArray
from xarray.core.utils import is_dict_like, is_scalar, parse_dims_as_set, result_name
from xarray.core.utils import (
is_dict_like,
is_scalar,
parse_dims_as_set,
result_name,
)
from xarray.core.variable import Variable
from xarray.namedarray.parallelcompat import get_chunked_array_type
from xarray.namedarray.pycompat import is_chunked_array
Expand Down Expand Up @@ -2165,18 +2170,14 @@ def _calc_idxminmax(
indx = func(array, dim=dim, axis=None, keep_attrs=keep_attrs, skipna=skipna)

# Handle chunked arrays (e.g. dask).
coord = array[dim]._variable.to_base_variable()
if is_chunked_array(array.data):
chunkmanager = get_chunked_array_type(array.data)
chunks = dict(zip(array.dims, array.chunks, strict=True))
dask_coord = chunkmanager.from_array(array[dim].data, chunks=chunks[dim])
data = dask_coord[duck_array_ops.ravel(indx.data)]
res = indx.copy(data=duck_array_ops.reshape(data, indx.shape))
# we need to attach back the dim name
res.name = dim
else:
res = array[dim][(indx,)]
# The dim is gone but we need to remove the corresponding coordinate.
del res.coords[dim]
coord_array = chunkmanager.from_array(
array[dim].data, chunks=((array.sizes[dim],),)
)
coord = coord.copy(data=coord_array)
res = indx._replace(coord[(indx.variable,)]).rename(dim)

if skipna or (skipna is None and array.dtype.kind in na_dtypes):
# Put the NaN values back in after removing them
Expand Down
49 changes: 42 additions & 7 deletions xarray/core/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import enum
import functools
import math
import operator
from collections import Counter, defaultdict
from collections.abc import Callable, Hashable, Iterable, Mapping
Expand Down Expand Up @@ -472,12 +473,12 @@ def __init__(self, key: tuple[slice | np.ndarray[Any, np.dtype[np.generic]], ...
for k in key:
if isinstance(k, slice):
k = as_integer_slice(k)
elif is_duck_dask_array(k):
raise ValueError(
"Vectorized indexing with Dask arrays is not supported. "
"Please pass a numpy array by calling ``.compute``. "
"See https://github.com/dask/dask/issues/8958."
)
# elif is_duck_dask_array(k):
# raise ValueError(
# "Vectorized indexing with Dask arrays is not supported. "
# "Please pass a numpy array by calling ``.compute``. "
# "See https://github.com/dask/dask/issues/8958."
# )
elif is_duck_array(k):
if not np.issubdtype(k.dtype, np.integer):
raise TypeError(
Expand Down Expand Up @@ -1607,6 +1608,18 @@ def transpose(self, order):
return xp.permute_dims(self.array, order)


def _apply_vectorized_indexer_dask_wrapper(indices, coord):
from xarray.core.indexing import (
VectorizedIndexer,
apply_indexer,
as_indexable,
)

return apply_indexer(
as_indexable(coord), VectorizedIndexer((indices.squeeze(axis=-1),))
)


class DaskIndexingAdapter(ExplicitlyIndexedNDArrayMixin):
"""Wrap a dask array to support explicit indexing."""

Expand All @@ -1630,7 +1643,29 @@ def _oindex_get(self, indexer: OuterIndexer):
return value

def _vindex_get(self, indexer: VectorizedIndexer):
return self.array.vindex[indexer.tuple]
try:
return self.array.vindex[indexer.tuple]
except IndexError as e:
# TODO: upstream to dask
has_dask = any(is_duck_dask_array(i) for i in indexer.tuple)
if not has_dask or (has_dask and len(indexer.tuple) > 1):
raise e
if math.prod(self.array.numblocks) > 1 or self.array.ndim > 1:
raise e
(idxr,) = indexer.tuple
if idxr.ndim == 0:
return self.array[idxr.data]
else:
import dask.array

return dask.array.map_blocks(
_apply_vectorized_indexer_dask_wrapper,
idxr[..., np.newaxis],
self.array,
chunks=idxr.chunks,
drop_axis=-1,
dtype=self.array.dtype,
)

def __getitem__(self, indexer: ExplicitIndexer):
self._check_and_raise_if_non_basic_indexer(indexer)
Expand Down
13 changes: 13 additions & 0 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -1814,3 +1814,16 @@ def test_minimize_graph_size():
# all the other dimensions.
# e.g. previously for 'x', actual == numchunks['y'] * numchunks['z']
assert actual == numchunks[var], (actual, numchunks[var])


def test_idxmin_chunking():
# GH
x, y, t = 100, 100, 10
rang = np.random.randn(t * x * y)
da = xr.DataArray(
rang.reshape(t, x, y), coords={"time": range(t), "x": range(x), "y": range(y)}
)
da = da.chunk(dict(time=-1, x=25, y=25))
actual = da.idxmin("time")
assert actual.chunksizes == {k: da.chunksizes[k] for k in ["x", "y"]}
assert_identical(actual, da.compute().idxmin("time"))
15 changes: 8 additions & 7 deletions xarray/tests/test_dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4949,7 +4949,13 @@ def test_argmax(

assert_identical(result2, expected2)

@pytest.mark.parametrize("use_dask", [True, False])
@pytest.mark.parametrize(
"use_dask",
[
pytest.param(True, marks=requires_dask),
False,
],
)
def test_idxmin(
self,
x: np.ndarray,
Expand All @@ -4958,16 +4964,11 @@ def test_idxmin(
nanindex: int | None,
use_dask: bool,
) -> None:
if use_dask and not has_dask:
pytest.skip("requires dask")
if use_dask and x.dtype.kind == "M":
pytest.xfail("dask operation 'argmin' breaks when dtype is datetime64 (M)")
ar0_raw = xr.DataArray(
x, dims=["x"], coords={"x": np.arange(x.size) * 4}, attrs=self.attrs
)

if use_dask:
ar0 = ar0_raw.chunk({})
ar0 = ar0_raw.chunk()
else:
ar0 = ar0_raw

Expand Down
3 changes: 1 addition & 2 deletions xarray/tests/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,7 @@ def test_vectorized_indexing_dask_array():
coords={"y": range(4), "x": range(2)},
dims=("y", "x"),
)
with pytest.raises(ValueError, match="Vectorized indexing with Dask arrays"):
darr[indexer.chunk({"y": 2})]
darr[indexer.chunk({"y": 2})]


@requires_dask
Expand Down
Loading