Skip to content

Commit

Permalink
Issue #12 initial version of a split_temporal function
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentVerelst committed Jan 30, 2024
1 parent 2ea5f5f commit ed0a02c
Show file tree
Hide file tree
Showing 20 changed files with 333 additions and 272 deletions.
4 changes: 1 addition & 3 deletions src/openeo_gfmap/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ def vito_connection(capfd: Optional = None) -> openeo.Connection:

def cdse_connection(capfd: Optional = None) -> openeo.Connection:
"""Performs a connection to the CDSE backend using oidc authentication."""
connection = openeo.connect(
"https://openeo.dataspace.copernicus.eu/openeo/1.2"
)
connection = openeo.connect("https://openeo.dataspace.copernicus.eu/openeo/1.2")
if capfd is not None:
with capfd.disabled():
# Temporarily disable output capturing, to make sure that OIDC device
Expand Down
6 changes: 4 additions & 2 deletions src/openeo_gfmap/fetching/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from .s2 import build_sentinel2_l2a_extractor

__all__ = [
"build_sentinel2_l2a_extractor", "CollectionFetcher", "FetchType",
"build_sentinel1_grd_extractor"
"build_sentinel2_l2a_extractor",
"CollectionFetcher",
"FetchType",
"build_sentinel1_grd_extractor",
]
3 changes: 1 addition & 2 deletions src/openeo_gfmap/fetching/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ def load_collection(
pre_mask = params.get("pre_mask", None)
if pre_mask is not None:
assert isinstance(pre_mask, openeo.DataCube), (
f"The 'pre_mask' parameter must be an openeo datacube, "
f"got {pre_mask}."
f"The 'pre_mask' parameter must be an openeo datacube, " f"got {pre_mask}."
)
cube = cube.mask(pre_mask.resample_cube_spatial(cube))

Expand Down
26 changes: 9 additions & 17 deletions src/openeo_gfmap/fetching/s1.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get_s1_grd_default_processor(
def s1_grd_default_processor(cube: openeo.DataCube, **params):
"""Default collection preprocessing method.
This method performs:
* Compute the backscatter of all the S1 products. By default, the
"sigma0-ellipsoid" method is used with "COPERNICUS_30" DEM, but those
can be changed by specifying "coefficient" and "elevation_model" in
Expand All @@ -113,9 +113,7 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
)

cube = resample_reproject(
cube,
params.get("target_resolution", 10.0),
params.get("target_crs", None)
cube, params.get("target_resolution", 10.0), params.get("target_crs", None)
)

cube = rename_bands(cube, BASE_SENTINEL1_GRD_MAPPING)
Expand All @@ -127,35 +125,29 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):

SENTINEL1_GRD_BACKEND_MAP = {
Backend.TERRASCOPE: {
"default": partial(
get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"
),
"default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
"preprocessor": partial(
get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"
)
),
},
Backend.CDSE: {
"default": partial(
get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"
),
"default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
"preprocessor": partial(
get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"
)
}
),
},
}


def build_sentinel1_grd_extractor(
backend_context: BackendContext, bands: list, fetch_type: FetchType, **params
) -> CollectionFetcher:
""" Creates a S1 GRD collection extractor for the given backend."""
"""Creates a S1 GRD collection extractor for the given backend."""
backend_functions = SENTINEL1_GRD_BACKEND_MAP.get(backend_context.backend)

fetcher, preprocessor = (
backend_functions["default"](fetch_type=fetch_type),
backend_functions["preprocessor"](fetch_type=fetch_type),
)

return CollectionFetcher(
backend_context, bands, fetcher, preprocessor, **params
)
return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params)
2 changes: 1 addition & 1 deletion src/openeo_gfmap/preprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
"get_bap_score",
"get_bap_mask",
"bap_masking",
]
]
65 changes: 34 additions & 31 deletions src/openeo_gfmap/preprocessing/cloudmasking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
SCL_HARMONIZED_NAME: str = "S2-SCL"
BAPSCORE_HARMONIZED_NAME: str = "S2-BAPSCORE"


def mask_scl_dilation(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
"""Creates a mask from the SCL, dilates it and applies the mask to the optical
bands of the datacube. The other bands such as DEM, SAR and METEO will not
be affected by the mask.
"""
# Asserts if the SCL layer exists
assert SCL_HARMONIZED_NAME in cube.metadata.band_names, (
f"The SCL band ({SCL_HARMONIZED_NAME}) is not present in the datacube."
)
assert (
SCL_HARMONIZED_NAME in cube.metadata.band_names
), f"The SCL band ({SCL_HARMONIZED_NAME}) is not present in the datacube."

kernel1_size = params.get("kernel1_size", 17)
kernel2_size = params.get("kernel2_size", 3)
Expand All @@ -28,9 +29,7 @@ def mask_scl_dilation(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:

# Only applies the filtering to the optical part of the cube
optical_cube = cube.filter_bands(
bands=list(
filter(lambda band: band.startswith("S2"), cube.metadata.band_names)
)
bands=list(filter(lambda band: band.startswith("S2"), cube.metadata.band_names))
)

nonoptical_cube = cube.filter_bands(
Expand All @@ -47,14 +46,15 @@ def mask_scl_dilation(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
kernel2_size=kernel2_size,
mask1_values=[2, 4, 5, 6, 7],
mask2_values=[3, 8, 9, 10, 11],
erosion_kernel_size=erosion_kernel_size
erosion_kernel_size=erosion_kernel_size,
)

if len(nonoptical_cube.metadata.band_names) == 0:
return optical_cube

return optical_cube.merge_cubes(nonoptical_cube)


def get_bap_score(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
"""Calculates the Best Available Pixel (BAP) score for the given datacube,
computed from the SCL layer.
Expand All @@ -70,7 +70,7 @@ def get_bap_score(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
* Coverage Score: Per date, the percentage of all pixels that are classified
as a cloud over the entire spatial extent is calculated. The Coverage
Score is then equal to 1 - the cloud percentage.
* Date Score: In order to favor pixels that are observed in the middle of a
* Date Score: In order to favor pixels that are observed in the middle of a
month, a date score is calculated, which follows a Gaussian shape. I.e.
the largest scores are given for days in the middle of the month, the
lowest scores are given for days at the beginning and end of the month.
Expand Down Expand Up @@ -113,23 +113,30 @@ def get_bap_score(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
kernel2_size=kernel2_size,
mask1_values=[2, 4, 5, 6, 7],
mask2_values=[3, 8, 9, 10, 11],
erosion_kernel_size=erosion_kernel_size
erosion_kernel_size=erosion_kernel_size,
)

# Replace NaN to 0 to avoid issues in the UDF
scl_cube = scl_cube.apply(lambda x: if_(is_nan(x), 0, x))

score = scl_cube.apply_neighborhood(
process=openeo.UDF.from_file(str(udf_path)),
size=[{'dimension': 'x', 'unit': 'px', 'value': 256}, {'dimension': 'y', 'unit': 'px', 'value': 256}],
overlap=[{'dimension': 'x', 'unit': 'px', 'value': 16}, {'dimension': 'y', 'unit': 'px', 'value': 16}],
size=[
{"dimension": "x", "unit": "px", "value": 256},
{"dimension": "y", "unit": "px", "value": 256},
],
overlap=[
{"dimension": "x", "unit": "px", "value": 16},
{"dimension": "y", "unit": "px", "value": 16},
],
)

score = score.rename_labels('bands', [BAPSCORE_HARMONIZED_NAME])
score = score.rename_labels("bands", [BAPSCORE_HARMONIZED_NAME])

# Merge the score to the scl cube
return score


def get_bap_mask(cube: openeo.DataCube, period: Union[str, list], **params: dict):
"""Computes the bap score and masks the optical bands of the datacube using
the best scores for each pixel on a given time period. This method both
Expand All @@ -155,13 +162,14 @@ def get_bap_mask(cube: openeo.DataCube, period: Union[str, list], **params: dict
The datacube with the BAP mask applied.
"""
# Checks if the S2-SCL band is present in the datacube
assert SCL_HARMONIZED_NAME in cube.metadata.band_names, (
f"The {SCL_HARMONIZED_NAME} band is not present in the datacube."
)
assert (
SCL_HARMONIZED_NAME in cube.metadata.band_names
), f"The {SCL_HARMONIZED_NAME} band is not present in the datacube."

bap_score = get_bap_score(cube, **params)

if isinstance(period, str):

def max_score_selection(score):
max_score = score.max()
return score.array_apply(lambda x: x != max_score)
Expand All @@ -171,27 +179,26 @@ def max_score_selection(score):
size=[
{"dimension": "x", "unit": "px", "value": 1},
{"dimension": "y", "unit": "px", "value": 1},
{"dimension": "t", "value": period}
{"dimension": "t", "value": period},
],
overlap=[]
overlap=[],
)
elif isinstance(period, list):
udf_path = Path(__file__).parent / "udf_rank.py"
rank_mask = bap_score.apply_neighborhood(
process=openeo.UDF.from_file(
str(udf_path),
context={"intervals": period}
),
process=openeo.UDF.from_file(str(udf_path), context={"intervals": period}),
size=[
{'dimension': 'x', 'unit': 'px', 'value': 256},
{'dimension': 'y', 'unit': 'px', 'value': 256}
{"dimension": "x", "unit": "px", "value": 256},
{"dimension": "y", "unit": "px", "value": 256},
],
overlap=[],
)
else:
raise ValueError(f"'period' must be a string or a list of dates (in YYYY-mm-dd format), got {period}.")
raise ValueError(
f"'period' must be a string or a list of dates (in YYYY-mm-dd format), got {period}."
)

return rank_mask.rename_labels('bands', ['S2-BAPMASK'])
return rank_mask.rename_labels("bands", ["S2-BAPMASK"])


def bap_masking(cube: openeo.DataCube, period: Union[str, list], **params: dict):
Expand All @@ -213,9 +220,7 @@ def bap_masking(cube: openeo.DataCube, period: Union[str, list], **params: dict)
The datacube with the BAP mask applied.
"""
optical_cube = cube.filter_bands(
bands=list(
filter(lambda band: band.startswith("S2"), cube.metadata.band_names)
)
bands=list(filter(lambda band: band.startswith("S2"), cube.metadata.band_names))
)

nonoptical_cube = cube.filter_bands(
Expand All @@ -226,9 +231,7 @@ def bap_masking(cube: openeo.DataCube, period: Union[str, list], **params: dict)

rank_mask = get_bap_mask(optical_cube, period, **params)

optical_cube = optical_cube.mask(
rank_mask.resample_cube_spatial(cube)
)
optical_cube = optical_cube.mask(rank_mask.resample_cube_spatial(cube))

# Do not merge if bands are empty!
if len(nonoptical_cube.metadata.band_names) == 0:
Expand Down
17 changes: 13 additions & 4 deletions src/openeo_gfmap/preprocessing/compositing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@
import openeo


def median_compositing(cube: openeo.DataCube, period: Union[str, list]) -> openeo.DataCube:
def median_compositing(
cube: openeo.DataCube, period: Union[str, list]
) -> openeo.DataCube:
"""Perfrom median compositing on the given datacube."""
if isinstance(period, str):
return cube.aggregate_temporal_period(period=period, reducer="median", dimension="t")
return cube.aggregate_temporal_period(
period=period, reducer="median", dimension="t"
)
elif isinstance(period, list):
return cube.aggregate_temporal(intervals=period, reducer="median", dimension="t")
return cube.aggregate_temporal(
intervals=period, reducer="median", dimension="t"
)


def mean_compositing(cube: openeo.DataCube, period: str) -> openeo.DataCube:
"""Perfrom mean compositing on the given datacube."""
if isinstance(period, str):
return cube.aggregate_temporal_period(period=period, reducer="mean", dimension="t")
return cube.aggregate_temporal_period(
period=period, reducer="mean", dimension="t"
)
elif isinstance(period, list):
return cube.aggregate_temporal(intervals=period, reducer="mean", dimension="t")
8 changes: 4 additions & 4 deletions src/openeo_gfmap/preprocessing/interpolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import openeo


def linear_interpolation(cube: openeo.DataCube,) -> openeo.DataCube:
def linear_interpolation(
cube: openeo.DataCube,
) -> openeo.DataCube:
"""Perform linear interpolation on the given datacube."""
return cube.apply_dimension(
dimension="t", process="array_interpolate_linear"
)
return cube.apply_dimension(dimension="t", process="array_interpolate_linear")
9 changes: 3 additions & 6 deletions src/openeo_gfmap/preprocessing/udf_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,23 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
"""
# First check if the period is defined in the context
intervals = context.get("intervals", None)
array = cube.get_array().transpose('t', 'bands', 'y', 'x')
array = cube.get_array().transpose("t", "bands", "y", "x")

bap_score = array.sel(bands="S2-BAPSCORE")

def select_maximum(score: xr.DataArray):
max_score = score.max(dim="t")
return score == max_score


if isinstance(intervals, str):
raise NotImplementedError(
"Period as string is not implemented yet, please provide a list of interval tuples."
)
elif isinstance(intervals, list):
# Convert YYYY-mm-dd to datetime64 objects
time_bins = [np.datetime64(interval[0]) for interval in intervals]

rank_mask = bap_score.groupby_bins('t', bins=time_bins).map(
select_maximum
)

rank_mask = bap_score.groupby_bins("t", bins=time_bins).map(select_maximum)
else:
raise ValueError("Period is not defined in the UDF. Cannot run it.")

Expand Down
Loading

0 comments on commit ed0a02c

Please sign in to comment.