Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stackstac.stack to support one time coordinate per unique datetime? #66

Open
thuydotm opened this issue Aug 3, 2021 · 11 comments
Open

Comments

@thuydotm
Copy link

thuydotm commented Aug 3, 2021

Please see below an example running on the Planetary Computer using Esri 10m Land Cover data, where each STAC item is derived from a mosaic of many images. The output is a 4D cube with the time dimension is 4 and the time coordinates are just 2020-06-01T00:00:00 repeated.

The documentation clearly stated that ``time`` will be equal in length to the number of items you pass in, and indexed by STAC Item datetime. But in a more natural way, it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

import numpy as np
import planetary_computer as pc
import pystac_client
import stackstac

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
)
point = {"type": "Point", "coordinates": [-97.807733, 33.2133019]}

io_lulc_search = catalog.search(collections=["io-lulc"], intersects=point)
io_lulc_items = [pc.sign(item).to_dict() for item in io_lulc_search.get_items()]
data = stackstac.stack(io_lulc_items, assets=["data"], epsg=3857)

data.shape, np.unique(data.time)

Output: ((4, 1, 185098, 134598), array(['2020-06-01T00:00:00.000000000'], dtype='datetime64[ns]'))

@TomAugspurger
Copy link
Contributor

As a bit of context, the STAC items in this global IO-LULC dataset all have the same timestamp. AFAICT, if you wanted to load 100 of these items with stackstac, memory usage would be ~100x larger than necessary, since the array would be (100, 1, width, height), rather than (1, 1, width, height).

@gjoseph92
Copy link
Owner

gjoseph92 commented Aug 9, 2021

What about stackstac.mosaic(data)? It would certainly give you the output you're looking for. But it would probably use more memory than necessary, since you'd briefly create 100 layers of unnecessary NaNs before flattening them (as Tom said above). However, these NaNs would maybe not actually take 100x the memory, since we use this broadcast trick optimization for the out-of-bounds chunks:

# no dataset, or we didn't overlap it: return empty data.
# use the broadcast trick for even fewer memz
return np.broadcast_to(np.nan, (1, 1) + windows.shape(current_window))

I haven't tested the memory usage; I'd be curious what actually happens.

it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

We could think about it, but it might be tricky to implement during the stacking itself. If a chunk of the array could be sourced from multiple GDAL datasets, that would require rethinking a lot of the dask graph generation logic.

I might prefer to implement this through optimized mosaic logic instead. The thing we want to skip is generating that all-NaN array for out-of-bounds chunks when we know it will immediately get dropped by a mosaic. There's already logic that avoids even opening an asset for spatial chunks that don't overlap with that asset:

# Drop asset if it doesn't overlap with the output bounds at all
if asset_bbox_proj is not None and not geom_utils.bounds_overlap(
asset_bbox_proj, bounds
):
# I've got a blank space in my ndarray, baby / And I'll write your name
continue

If we did #13, I have a feeling mosaic could look at each spatial chunk, pluck out only the assets that overlap with that spatial chunk, and mosaic only those together.
However, I'd want to test the current mosaic performance, because between the broadcast trick and short-circuiting the dataset open, performance might already be pretty good.


So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for
arr.groupby("time").apply(stackstac.mosaic) or something like that.

@TomAugspurger
Copy link
Contributor

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for
arr.groupby("time").apply(stackstac.mosaic) or something like that.

👍 either of those sound perfect to me.

@gjoseph92
Copy link
Owner

@TomAugspurger or @thuydotm, any interest in trying out mosaic on these cases and seeing how performance is right now?

@TomAugspurger
Copy link
Contributor

TomAugspurger commented Aug 16, 2021

I think we can conclude that stackstac(...).pipe(stackstac.mosaic) works well enough. I was able to build a DataArray with this property (all the same datetime) whose size in memory would be 860.26 GiB. I ran it through stackstac.mosaic, which cut things down to 215.06 GiB and then persisted the result on a cluster with ~256GB of memory. Here's a performance report.

https://gistcdn.githack.com/TomAugspurger/587bd0e00860b52be9c3830e3bbca35c/raw/17323077f808cfae02ef7dad79d1648cd8591499/stackstac-mosaic.html

There is a decent amount of communication, but I'm not sure how concerned to be about that... IIUC, mosaic might need to transfer chunks from one worker to another. I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well distributed's worker assignment. When distributed is in decide_worker, do we know what it thinks the size of the all-NaN chunk is? Hopefully it's the reduced size, and not the size of the full array.

Here's the code I ran

import stackstac
from pystac_client import Client
import planetary_computer as pc

bounds = (120.9, 14.8, 120.91, 14.81)

catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

search = catalog.search(collections=["io-lulc"], bbox=bounds)

# Check how many items were returned
items = list(search.get_items())
print(f"Returned {len(items)} Items")

signed_items = [pc.sign(item).to_dict() for item in items]

ds = stackstac.stack(signed_items, epsg=32650, chunksize=4096)

# cluster
from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.scale(28)
client = cluster.get_client()

ds2 = stackstac.mosaic(ds).persist()

@gjoseph92
Copy link
Owner

Nice to not see any spilling to disk in that performance report!

IIUC, mosaic might need to transfer chunks from one worker to another

It definitely would, just like any other reduction (mean, sum, etc.). I haven't looked at the graphs it makes yet, but they may be a little weird, since it's not a tree-reduce graph, but conceptually more like functools.reduce (since a mosaic is order-dependent, you have to have the mosaic of the previous layers in order to do the next one). There might be a way to improve the parallelism by redesigning it to be tree-like, but preserving order. This isn't memory-related, just another optimization idea. (Not that it looks like that task stream needs any more parallelism?)

I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well distributed's worker assignment

Seems like it does!

In [1]: import numpy as np

In [2]: from dask.sizeof import sizeof

In [3]: trick = np.broadcast_to(np.nan, (2048, 2048))

In [4]: sizeof(trick)
Out[4]: 8

In [5]: full = np.full_like(trick, np.nan)

In [6]: sizeof(full)
Out[6]: 33554432

So hopefully decide_worker would schedule the where on the worker that already had one of the real input layers, instead of the one where the all-NaN chunk lived.

One interesting thing to look at would be how the initial chunks of the dataset get prioritized by dask.order. With dask/distributed#4967, if we can get chunks that are spatially next to each other to also have similar priorities, they'll mostly get scheduled to the same worker, which will reduce the transfer necessary for the mosaic. This could actually be an argument for an optimization that skips the all-NaN chunks when building the mosaic graph, since I think dask.order would end up better prioritizing that graph since those irrelevant tasks won't take up space.

@RichardScottOZ
Copy link
Contributor

Thanks! I had been looking at that landcover dataset a few weeks ago and wondering about an approach.

@gjoseph92
Copy link
Owner

Following up; is there anything we want to do here?

Do we want to add any convenience functionality:

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

Or maybe some documentation about this?

Or just close?

@RichardScottOZ
Copy link
Contributor

Convenience functions are good I think Gabe.

@hrodmn
Copy link

hrodmn commented Jun 10, 2022

+1 for a flatten_dates arg to stackstac.stack, though it will be important to make sure the the raster nodata and stackstac.stack fill_value parameters are being used correctly.

@gjoseph92
Copy link
Owner

I'm leaning towards that too. I think that with #116, some of the hard part is already done. The process would basically be:

  • Calculate the grouping based on STAC metadata here. You want to end up with something that could be a chunks argument for the time dimension (like (3, 1, 1, 2, 5, 4, ...) where each number is the number of subsequent items that belong to the same group, like a run-length encoding)
  • Pass that into items_to_dask as flatten_time=
  • Use it as the normalized time chunks. There will need to be some validation that you can't specify chunking for the time dimension if you also specify flatten_dates
  • If flatten_time is passed, before the fetch_raster_window blockwise part, replace chunks with a copy of the tuple where the first element is (1,) * len(chunks[0]) (assuming chunks[0] == flatten_time currently). This will track the fact that unlike the asset table, where each chunk contains N entries along the time dimension, every chunk in the final array will have 1 entry along time (because they'll all be flattened)
  • Pass a flatten=bool(flatten_time) argument into the fetch_raster_window blockwise generation. Also make sure to include it in the tokenization!
  • Inside fetch_raster_window will be the biggest changes. It may make sense to have two different functions, or at least two different helper functions that are called within the loop. You'll need to accept the flatten: bool argument, and handle it. Rather than allocating the full-size array, it'll obviously only be 1-length. And when looping for assets, you'll keep mosaicing the new result with the previous, and obviously short-circuiting and not loading any more assets as soon as every pixel is filled in.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants