Skip to content

Commit

Permalink
add aggregate temporal and tests (#287)
Browse files Browse the repository at this point in the history
* add aggregate temporal and tests

* update xarray groupby function

* update tests

* update tests

* update tests

* update tests

* convert pandas to numpy datetime to fix test

* fix test
  • Loading branch information
ValentinaHutter authored Oct 25, 2024
1 parent c001517 commit 1011a6c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
38 changes: 33 additions & 5 deletions openeo_processes_dask/process_implementations/cubes/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def aggregate_temporal(
raise DimensionNotAvailable(
f"A dimension with the specified name: {dimension} does not exist."
)
applicable_temporal_dimension = dimension
t = dimension
else:
if not temporal_dims:
raise DimensionNotAvailable(
Expand All @@ -53,13 +53,41 @@ def aggregate_temporal(
raise TooManyDimensions(
f"The data cube contains multiple temporal dimensions: {temporal_dims}. The parameter `dimension` must be specified."
)
applicable_temporal_dimension = temporal_dims[0]
t = temporal_dims[0]

intervals_np = np.array(intervals, dtype=np.datetime64).astype(float)
intervals_flat = np.reshape(
intervals_np, np.shape(intervals_np)[0] * np.shape(intervals_np)[1]
)

aggregated_data = data.groupby_bins(
group=applicable_temporal_dimension, labels=labels
if not labels:
labels = np.array(intervals, dtype="datetime64[s]").astype(str)[:, 0]
if (intervals_np[1:, 0] < intervals_np[:-1, 1]).any():
raise NotImplementedError(
"Aggregating data for overlapping time ranges is not implemented. "
)

mask = np.zeros((len(labels) * 2) - 2).astype(bool)
mask[1::2] = np.isin(intervals_np[1:, 0], intervals_np[:-1, 1])
mask = np.append(mask, np.array([False, True]))

labels_nans = np.arange(len(labels) * 2).astype(str)
labels_nans[::2] = labels
labels_nans = labels_nans[~mask]

intervals_flat = np.unique(intervals_flat)
t_coords = data[t].values.astype(str)
data[t] = np.array(t_coords, dtype="datetime64[s]").astype(float)
grouped_data = data.groupby_bins(t, bins=intervals_flat)
positional_parameters = {"data": 0}
groups = grouped_data.reduce(
reducer, keep_attrs=True, positional_parameters=positional_parameters
)
groups[t + "_bins"] = labels_nans
data_agg_temp = groups.sel({t + "_bins": labels})
data_agg_temp = data_agg_temp.rename({t + "_bins": t})

raise NotImplementedError("aggregate_temporal is currently not implemented")
return data_agg_temp


def aggregate_temporal_period(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "openeo-processes-dask"
version = "2024.10.1"
version = "2024.10.2"
description = "Python implementations of many OpenEO processes, dask-friendly by default."
authors = ["Lukas Weidenholzer <[email protected]>", "Sean Hoyal <[email protected]>", "Valentina Hutter <[email protected]>"]
maintainers = ["EODC Staff <[email protected]>"]
Expand Down
58 changes: 54 additions & 4 deletions tests/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,66 @@
import xvec
from openeo_pg_parser_networkx.pg_schema import ParameterReference, TemporalInterval

from openeo_processes_dask.process_implementations.cubes.aggregate import (
aggregate_spatial,
aggregate_temporal_period,
)
from openeo_processes_dask.process_implementations.cubes.aggregate import *
from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension
from openeo_processes_dask.process_implementations.math import mean
from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks
from tests.mockdata import create_fake_rastercube


@pytest.mark.parametrize("size", [(6, 5, 100, 4)])
@pytest.mark.parametrize("dtype", [np.float64])
@pytest.mark.parametrize(
    "temporal_extent,intervals,labels, expected",
    [
        (
            ["2018-01-01T00:00:00", "2019-01-01T00:00:00"],
            [
                ["2018-01-01T12:00:00", "2018-06-01T12:00:00"],
                ["2018-07-01T12:00:00", "2018-12-01T12:00:00"],
            ],
            ["half-1", "half-2"],
            2,
        )
    ],
)
def test_aggregate_temporal(
    temporal_extent,
    intervals,
    labels,
    expected,
    bounding_box,
    random_raster_data,
    process_registry,
):
    """Check aggregate_temporal with a mean reducer over labelled intervals.

    A fake raster cube spanning ``temporal_extent`` is aggregated over
    ``intervals``. The result must pass the general output checks, expose
    ``expected`` entries along ``t``, and carry string values on ``t``.
    """
    band_names = ["B02", "B03", "B04", "B08"]
    extent = TemporalInterval.parse_obj(temporal_extent)
    cube = create_fake_rastercube(
        data=random_raster_data,
        spatial_extent=bounding_box,
        temporal_extent=extent,
        bands=band_names,
    )

    # Bind the registry's "mean" implementation to the `data` parameter
    # reference so it can be handed over as the reducer.
    mean_impl = process_registry["mean"].implementation
    mean_reducer = partial(
        mean_impl,
        data=ParameterReference(from_parameter="data"),
    )

    aggregated = aggregate_temporal(
        data=cube,
        intervals=intervals,
        reducer=mean_reducer,
        labels=labels,
    )

    general_output_checks(
        input_cube=cube,
        output_cube=aggregated,
        verify_attrs=True,
        verify_crs=True,
    )

    time_coord = aggregated.t
    assert len(time_coord) == expected
    assert isinstance(time_coord.values[0], str)


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float64])
@pytest.mark.parametrize(
Expand Down

0 comments on commit 1011a6c

Please sign in to comment.