Enable async chunk reading #106
Merged: 9 commits into master on Apr 12, 2023

Conversation

@meggart (Collaborator) commented Feb 16, 2023

This is an attempt at implementing asynchronous reading when multiple chunks are accessed at the same time, as a follow-up to #105. @agoodm and @alex-s-gardner, could you maybe give this branch a try and check if you see a speed-up as well? I also added an in-place method for Blosc decompression, which reduces a few allocations and speeds up reading time by another second, so that I am now very close to Python reading times. Here are my tests:

Zarr.Blosc.set_num_threads(8) # To match blosc defaults for python
g = zopen("https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1", consolidated=true)
s = g["analysed_sst"]

Zarr.concurrent_io_tasks[] = 1
@time res = s[1:3600, 1:1799, 1:500];

Zarr.concurrent_io_tasks[] = 10
@time res = s[1:3600, 1:1799, 1:500];

Zarr.concurrent_io_tasks[] = 20
@time res = s[1:3600, 1:1799, 1:500];

returns

  1 task:   42.317771 seconds (241.77 k allocations: 6.330 GiB, 0.76% gc time, 0.13% compilation time: 100% of which was recompilation)
  10 tasks: 13.646421 seconds (156.78 k allocations: 6.885 GiB, 6.23% gc time)
  20 tasks:  9.090681 seconds (171.24 k allocations: 7.475 GiB, 18.24% gc time)

@coveralls commented Feb 16, 2023

Pull Request Test Coverage Report for Build 4616813855

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 145 of 159 (91.19%) changed or added relevant lines in 8 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.7%) to 86.483%

Files with missing coverage (covered / changed or added lines, %):
  • src/ZGroup.jl: 1 of 2 (50.0%)
  • src/ZArray.jl: 109 of 112 (97.32%)
  • src/Storage/Storage.jl: 28 of 38 (73.68%)
Totals (change from base Build 3711894192: 0.7%):
  • Covered Lines: 723
  • Relevant Lines: 836

💛 - Coveralls

@agoodm commented Feb 16, 2023

@meggart Many thanks for attempting to address this so quickly!

I just gave it a quick test. Indeed, this is a massive improvement compared to before. Note that on the machine I am testing on, I got the best results when setting the number of concurrent tasks to just 4. I guess this number would depend on your bandwidth?

julia> @time res = s[1:3600, 1:1799, 1:500];
  6.620273 seconds (87.20 k allocations: 6.523 GiB, 8.24% gc time)

Anyways we are now much closer to bridging the gap, but this is still about 2x slower than the results I was getting with python. However this particular dataset has very small chunk sizes (< 1 MB). Many other datasets (including the one I'll eventually be using for my use-case) should have much larger chunk sizes than this, so it's possible that the results will be more comparable for those cases.

@meggart (Collaborator, Author) commented Feb 21, 2023

It is really hard for me to reproduce, since for me your Python version takes 11 s, so I could already beat it on my system. I am not sure what else to optimize. An open question remains what we should set as the default for Zarr.concurrent_io_tasks, since the optimum here is determined by many factors such as available memory, GC pressure, bandwidth, chunk sizes, etc. Maybe some benchmarks from people with different systems would be good. I will perform a few tests on a computing cluster with a shared file system, but also on a local workstation.

Also pinging @gdkrmr @twinGu @felixcremer in case they are interested in doing some tests.

@martin-gutwin (Contributor) commented Feb 21, 2023

Hi all,
I am currently setting up my new personal laptop anyway, so I would be happy to run the tests from #105 (Python) and #106. But I will only have proper time towards the end of the week, I'm afraid.

@gdkrmr (Contributor) commented Feb 21, 2023

On a 100 Mbit connection I can confirm that it gets much faster with more tasks. When loading from a local SSD, loading times increase a bit with more tasks; there, the number of threads for Blosc is much more important.

  • It would be nice if we could set the number of tasks via an environment variable.
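
For illustration, a minimal sketch of what that could look like (the variable name ZARR_CONCURRENT_IO_TASKS is hypothetical, not an existing setting):

# Sketch: pick the task limit from an environment variable at startup,
# falling back to whatever default the package ships with.
using Zarr

default_tasks = Zarr.concurrent_io_tasks[]
env_value = tryparse(Int, get(ENV, "ZARR_CONCURRENT_IO_TASKS", ""))
Zarr.concurrent_io_tasks[] = something(env_value, default_tasks)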

@agoodm commented Feb 21, 2023

@meggart On the Python side I am testing in a Python 3.10 conda environment with the latest versions of zarr and fsspec installed. My Julia installation is also fairly recent (1.8.5). One thing I have noticed in my testing is that changing the number of threads in Blosc makes a much more dramatic difference for the Python version: when changing from 8 to 1, it is 2-3x slower, which brings it closer to (but, for me at least, still a little faster than) Julia. Changing this to any number in the Julia version, on the other hand, seems to make practically no difference for me.

@alex-s-gardner (Contributor):
@meggart Great to see this moving forward! Thanks for all of your efforts on this.

Here I've tested 2 different files with differing chunking schemes, both hosted on S3.

  • 10 concurrent tasks much faster than 1
  • 20 concurrent tasks the same or slower than 10
  • Zarr.jl's fastest time is consistently 10-40% slower than Python's
using Zarr;
Zarr.Blosc.set_num_threads(8);

zarr_url = "https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1"
var = "analysed_sst";

dc = Zarr.zopen(zarr_url, consolidated=true);
s = dc[var];

Zarr.concurrent_io_tasks[] = 1
@time res = s[1:3600, 1:1799, 1:500]; # 100 chunks

Zarr.concurrent_io_tasks[] = 10
@time res = s[1:3600, 1:1799, 1:500]; # 100 chunks

Zarr.concurrent_io_tasks[] = 20
@time res = s[1:3600, 1:1799, 1:500]; # 100 chunks

zarr_url = "https://its-live-data.s3.amazonaws.com/test_datacubes/v02_latest/ALA/ITS_LIVE_vel_EPSG3413_G0120_X-3250000_Y250000.zarr"
var = "v";

dc = Zarr.zopen(zarr_url, consolidated=true);
s = dc[var];

Zarr.concurrent_io_tasks[] = 1
@time res = s[1:100, 1:100, :]; # 500 chunks

Zarr.concurrent_io_tasks[] = 10
@time res = s[1:100, 1:100, :];

Zarr.concurrent_io_tasks[] = 20
@time res = s[1:100, 1:100, :];

returns

"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1"
  1 task:   41.100506 seconds (410.80 k allocations: 6.329 GiB, 0.93% gc time)
  10 tasks: 14.547571 seconds (347.21 k allocations: 6.875 GiB, 0.86% gc time)
  20 tasks: 13.853891 seconds (317.82 k allocations: 7.472 GiB, 16.33% gc time)

"https://its-live-data.s3.amazonaws.com/test_datacubes/v02_latest/ALA/ITS_LIVE_vel_EPSG3413_G0120_X-3250000_Y250000.zarr"
  1 task:   98.220884 seconds (551.15 k allocations: 1.907 GiB, 0.58% gc time)
  10 tasks: 16.954888 seconds (461.55 k allocations: 1.941 GiB)
  20 tasks: 18.104497 seconds (455.68 k allocations: 1.976 GiB, 0.70% gc time)

import zarr
import time

zarr_url = "https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1"
print(zarr_url)
g = zarr.open_consolidated(zarr_url)
s = g.analysed_sst
tic = time.perf_counter()
s[:500, :1799, :3600] # 100 chunks
toc = time.perf_counter()
print(f"{toc - tic:0.4f}")

zarr_url = "https://its-live-data.s3.amazonaws.com/test_datacubes/v02_latest/ALA/ITS_LIVE_vel_EPSG3413_G0120_X-3250000_Y250000.zarr"
print(zarr_url)
g = zarr.open_consolidated(zarr_url)
s = g.v
tic = time.perf_counter()
s[:, :100, :100] # 500 chunks
toc = time.perf_counter()
print(f"{toc - tic:0.4f}")

return

https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1
13.0594
https://its-live-data.s3.amazonaws.com/test_datacubes/v02_latest/ALA/ITS_LIVE_vel_EPSG3413_G0120_X-3250000_Y250000.zarr
11.3875

@gdkrmr (Contributor) commented Feb 22, 2023

I see that the default number of tasks is currently set to 10; maybe leave it at 1, as this seems to be the most performant setting on local disks. I am not sure whether reading from disk or from object store is more common, though.

@alex-s-gardner (Contributor):
Zarr is a format specifically designed for object store / distributed storage, so I would leave the default number of concurrent tasks at 10.

@agoodm commented Feb 22, 2023

I agree with @alex-s-gardner; at least in Earth Science, most of us using zarr intend to get our data from the cloud rather than reading a file locally. However, rather than changing the default to 10 globally, I think it would make sense to have separate sync and async implementations that are dispatched on the type of storage (remote storage like GCS/AWS/HTTP should be async).
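
A minimal sketch of what such storage-based dispatch could look like (the trait and helper names below are hypothetical, not part of Zarr.jl):

using Zarr

# Hypothetical read-strategy trait attached to the storage type.
abstract type ReadStrategy end
struct SequentialRead <: ReadStrategy end
struct ConcurrentRead <: ReadStrategy
    ntasks::Int
end

# Fallback: local stores read chunks one after another.
read_strategy(::Any) = SequentialRead()
# Remote, HTTP-backed stores benefit from concurrent requests.
read_strategy(::Zarr.HTTPStore) = ConcurrentRead(10)

# `readkey` stands in for however a store returns the raw bytes of one chunk key.
getchunks(::SequentialRead, readkey, keys) = map(readkey, keys)
getchunks(c::ConcurrentRead, readkey, keys) = asyncmap(readkey, keys; ntasks = c.ntasks)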

@gdkrmr (Contributor) commented Feb 23, 2023

Stupid question: is AWS throttling connections, or why is it so much faster to have simultaneous downloads?

@martin-gutwin (Contributor):
For me julia-zarr (21.537 s on 10 tasks, 16.459 s on 20 tasks) beats the python-zarr default (22.62 s) by a very small margin.
This was for the mur-sst dataset. Do I understand correctly that mur-sst is hosted on a US server? Or are we sure that there is a cache on a European server as well?
My suspicion is that latency could somehow make a difference, i.e. Julia performs better with latency and Python performs better without, which would explain our different outcomes here.
If so, does anyone know of a dataset hosted on a European server?

@agoodm commented Feb 23, 2023

Latency can certainly make a big difference, but throttling is probably not playing a role. S3 can process 5500 GET requests per second per prefix, which is far more than the number needed for this benchmark (100). The async really helps, though, because otherwise a lot of time is wasted waiting on the responses to individual requests.

It's true that I am doing my testing on a server with fairly high bandwidth (~3 Gbps) that is in the same region as the MUR-SST dataset so latency is very low. These two factors probably explain why I am getting faster results than others here. The chunks are rather small also (< 1 MB). My guess is that for a different dataset with larger chunks the differences between Zarr.jl and zarr-python will be less noticeable.

@agoodm left a review comment:

function zuncompress!(data, compressed, c::BloscCompressor)
Blosc.decompress!(vec(data), compressed)
end

Even though this is calling the Blosc decompression in-place, it still allocates the chunks individually and then copies them to the larger output array. It would be much better to pass a view of the chunk-sized slice of the output array to avoid having to copy later.

@agoodm left a review comment:

Zarr.jl/src/ZArray.jl

Lines 158 to 176 in a47d449

asyncmap(blockr,ntasks=concurrent_io_tasks[]) do bI
a = get!(()->getchunkarray(z),bufferdict,current_task())
curchunk = OffsetArray(a,map((s,i)->s*(i-1),size(a),Tuple(bI))...)
inds = CartesianIndices(map(boundint,r.indices,axes(curchunk)))
# Uncompress current chunk
if !readmode && !(isinitialized(z.storage, z.path, bI) && (size(inds) != size(a)))
resetbuffer!(z.metadata.fill_value,a)
else
readchunk!(maybeinner(a), z, bI)
end
if readmode
# Read data
copyto!(aout,inds,curchunk,inds)
else
copyto!(curchunk,inds,aout,inds)
writechunk!(maybeinner(a), z, bI)

Really only the retrieval of the raw compressed chunks over HTTP should be done async, while the allocations/decompression are done sync, so this part should be portioned out into two separate loops. I thought it was strange that explicitly setting the number of concurrent tasks for asyncmap was required for better performance. If the async is done on the HTTP requests only, then doing this shouldn't be necessary, as it is not the case in the Python version. I suspect this is also why increasing Zarr.Blosc.set_num_threads() was effectively doing nothing. I also think it would be better if the async logic were implemented differently depending on the storage; this could be done by defining a separate AsyncableStorage subtype and then dispatching a getitems() function to retrieve all the compressed chunks in a loop, either async or sync.
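
For illustration, a minimal sketch of the proposed two-loop split (all names here are placeholders, not Zarr.jl functions):

# Hypothetical two-phase read: fetch every raw compressed chunk concurrently,
# then decompress sequentially so Blosc's own threads can do the heavy lifting.
function read_chunks(fetch_chunk, decompress_into!, chunk_keys, out)
    # Phase 1: I/O-bound, so overlap the HTTP requests.
    raw = asyncmap(fetch_chunk, chunk_keys)
    # Phase 2: CPU-bound, run as a plain loop.
    for (key, bytes) in zip(chunk_keys, raw)
        decompress_into!(out, key, bytes)
    end
    return out
end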

@agoodm commented Feb 25, 2023

@meggart @alex-s-gardner Apologies for sending a ping on the weekend but I am very excited to report that with a bit of detective work I have figured out how to close the gap with the python version! The working code snippet will be at the bottom of this post but I would also like to explain in detail how I was able to figure this all out.

There are two main issues. The first thing I noticed is that the implementation in this PR runs the entire process of loading each chunk (both the HTTP get and the array allocations) asynchronously through asyncmap. The other is that although the Blosc decompression operates on the destination arrays in place, those arrays are not part of one big contiguous output, so they still need to be copied; really, a view needs to be passed in instead. I described these issues in more detail in the review comments I just posted. I also looked at how this all works in zarr-python just to see if our approaches were consistent, and sure enough this is pretty much how it's done there: the storage layers (in this case through fsspec) individually implement the async logic to retrieve all chunks, then the chunks are processed synchronously in one loop. Relevant sections of code can be found here:

Putting this all together, I was able to implement something that now runs effectively as fast (it still seems to be slightly slower on average, but the differences are negligible) and it now scales properly with Blosc.set_num_threads(). Without further ado:

function is_partial_chunk(crange, chunks)
    if length(crange) != length(chunks)
        return true
    end
    for i in 1:length(chunks)
        if length(crange[i]) != chunks[i]
            return true
        end
    end
    return false
end

function decompress_partial_chunk!(result, cdata, crange)
    data = Zarr.Blosc.decompress(eltype(result), cdata)
    ind = CartesianIndices(crange)
    for j in 1:length(ind)
        result[ind[j]] = data[j]
    end
end

function zread_async(z, ind...)
    ci = CartesianIndices(ind)
    v = view(z, ind...)
    result = similar(v)
    if length(ci) == 1
        cranges = [ind]
    else
        cranges = Zarr.DiskArrays.eachchunk(v)
    end
    cinds = CartesianIndices(map(Zarr.trans_ind, ci.indices, z.metadata.chunks))
    # Retrieve all raw chunks first async
    cdata = asyncmap(ci->z.storage[z.path, Zarr.citostring(ci)], cinds)
    # Separate loop to decompress data
    for i in 1:length(cranges)
        if is_partial_chunk(cranges[i], z.metadata.chunks)
            decompress_partial_chunk!(result, cdata[i], cranges[i])
        else
            vi = view(result, cranges[i]...)
            Zarr.Blosc.blosc_decompress(cdata[i], vi, sizeof(vi))
        end
    end
    if length(result) == 1
        return result[1]
    end
    return result
end

And the benchmark result now gives:

# 1 thread
@btime zread_async(s, 1:3600, 1:1799, 1:500);
  8.296 s (92076 allocations: 6.24 GiB)

# 8 threads
Zarr.Blosc.set_num_threads(8)
@btime zread_async(s, 1:3600, 1:1799, 1:500);
  2.568 s (87833 allocations: 6.20 GiB)

And to confirm the outputs match:

x1 = zread_async(s, 1:3600, 1:1799, 1:500);
x2 = s[1:3600, 1:1799, 1:500];
all(x1 .== x2)
true

Lastly, you may notice I needed to use the lower-level Blosc.blosc_decompress() rather than Blosc.decompress!() because, for whatever reason, the function signature is very inflexible and only allows vectors, not array views. Maybe some work upstream should be done to implement a version that works on arrays with arbitrary dimensions, but I digress...

Hopefully this is enough information, but if you are busy I would be very happy to help incorporate the necessary changes into Zarr.jl myself. Just let me know.

@alex-s-gardner (Contributor):
@agoodm this is some fantastic sleuthing

@agoodm commented Feb 27, 2023

Sorry, I realized I jumped the gun a bit. My previous example would only work if you requested complete chunks; partial chunks would be left empty. I have updated my code so it now accounts for that. Unfortunately Blosc.jl doesn't support decompressing chunks partially in place, but it's not a huge deal, since when a sufficient number of chunks are loaded, only the very last chunk is partial.

@meggart (Collaborator, Author) commented Feb 27, 2023

Thanks a lot for digging into this, this is great progress. Two things still concern me with this implementation:

  1. There are many use cases where all chunks are accessed only partially, not just the last chunk. As an extreme scenario: imagine someone wanted to extract a single full time series from your dataset, s[200,200,:]. Given your implementation, all compressed chunks would need to be held in memory at the same time, which might lead to unnecessary out-of-memory errors for large arrays. I think a combination of an asyncmap with a limited number of tasks, streaming results into a channel where a single process takes care of decompression, might be a way to go (see the sketch after this list).
  2. It is unsafe to directly call into the C library via Zarr.Blosc.blosc_decompress without making sure that the view we are decompressing into is contiguous. This is because we cannot assume that the view into your output array is a contiguous view into the data, and I am almost sure unit tests would fail or return segfaults with the current implementation. This limitation, that compression libraries are C libraries which need contiguous data, was the main reason to introduce these temporary worker arrays for uncompressed data, which I tried to re-use as much as possible through that dict approach. I don't see a way to circumvent these temp arrays, except special-casing for whenever we have a situation in which all views are contiguous.
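
A rough sketch of the channel-based variant described in point 1 (the function names and arguments are illustrative only, not the PR's actual code):

# Hypothetical bounded producer/consumer: a limited number of download tasks feed raw
# chunks into a channel, and a single consumer decompresses them, so at most roughly
# `ntasks` plus the channel capacity of compressed chunks are held in memory at once.
function read_chunks_channel(fetch_chunk, decompress_into!, chunk_keys, out; ntasks = 10)
    ch = Channel{Tuple{eltype(chunk_keys), Vector{UInt8}}}(ntasks)
    downloader = @async try
        asyncmap(chunk_keys; ntasks = ntasks) do key
            put!(ch, (key, fetch_chunk(key)))
        end
    finally
        close(ch)
    end
    for (key, bytes) in ch
        decompress_into!(out, key, bytes)  # a single task handles decompression
    end
    wait(downloader)  # surface any download errors
    return out
end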

@agoodm commented Feb 27, 2023

  1. There are many use cases where all chunks are accessed only partially, not just the last chunk. As an extreme scenario: imagine someone wanted to extract a single full time series from your dataset, s[200,200,:]. Given your implementation, all compressed chunks would need to be held in memory at the same time, which might lead to unnecessary out-of-memory errors for large arrays. I think a combination of an asyncmap with a limited number of tasks, streaming results into a channel where a single process takes care of decompression, might be a way to go.

I think this concern is a little off the mark in a number of ways. First, in this particular example the raw chunks are heavily compressed (both through scale_factor and add_offset packing and lz4 compression), so even if they were all loaded into memory at the same time you would still be using under 3 GB of memory: certainly inefficient relative to the actual size of the data that's needed, but not enough to cause an out-of-memory error. More importantly, regardless of the memory usage, this would be an extremely inefficient data access pattern in terms of loading time, which would only really make it suitable for some brief exploratory analysis rather than processing at scale. Weighing these two types of use cases, I think the latter is far more important, especially because such processing would be done on cloud compute resources, where both bandwidth and memory can be made plentiful and running time is among the greatest sources of expense incurred during processing.

Furthermore it's generally the responsibility of the groups archiving the data to be chunking in such a way that lines up well with the expected use-cases. To illustrate this point, the MUR-SST I am using in my benchmark is actually stored in two archives with completely different chunking schemes, one for general use and another specifically designed for time series analysis. The former (https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1) has chunk size (3600, 1799, 5), while the latter (https://mur-sst.s3.us-west-2.amazonaws.com/zarr) has chunk size (100, 100, 6443) which would have virtually no issues with memory usage for the access pattern you described.

Finally, if it were really necessary to access the data in such a suboptimal way to begin with, it's quite easy for users to work around this by writing a simple loop to access one partial chunk at a time. Conversely, in the opposite case where aggressively optimizing memory usage is done at the expense of access times, it's a lot less straightforward for the end user to apply the async logic themselves in the optimal way (i.e. they would have to write something like the code I have just written). What you are proposing sounds somewhat complicated (it may require more configurable parameters for the end user to tune, like the number of concurrent tasks) and I worry that it may still be considerably less performant. As far as I can tell, the zarr-python library doesn't go out of its way to protect users from excessive memory consumption and does retrieve all raw chunks at once if the storage class supports concurrent retrieval of chunks (https://github.com/zarr-developers/zarr-python/blob/main/zarr/core.py#L1275-L1291). So I think it would make more sense to do the retrieval in one go for asyncable storage (HTTP/GCS/S3) and then do it sequentially, like we currently do, for the others.

  1. It is unsafe to directly call into the C library via Zarr.Blosc.blosc_decompress without making sure that the view we are decompressing into is contiguous. This is because we cannot assume that the view into your output array is a contiguous view into the data, and I am almost sure unit tests would fail or return segfaults with the current implementation. This limitation, that compression libraries are C libraries which need contiguous data, was the main reason to introduce these temporary worker arrays for uncompressed data, which I tried to re-use as much as possible through that dict approach. I don't see a way to circumvent these temp arrays, except special-casing for whenever we have a situation in which all views are contiguous.

You are absolutely right in that we should ensure the views are contiguous before deciding to call to the lower level blosc_decompress, but this should be a fairly trivial check to implement (taken from Blosc.jl but applied to SubArrays):

function iscontiguous(A)
    p = sortperm([strides(A)...])
    s = 1
    for k = 1:ndims(A)
        if stride(A,p[k]) != s
            return false
        end
        s *= size(A,p[k])
    end
    return true
end

Again I will point out that the Python library has figured this problem out (and in fact, it too checks whether the view is contiguous before deciding whether or not to do the Blosc decompression in place, see https://github.com/zarr-developers/zarr-python/blob/main/zarr/core.py#L1877-L1884). I don't see why we can't just do a quick check to see if the slice of the output array is contiguous when iterating through each chunk. What am I missing here?
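
For illustration, a hypothetical helper showing how such a check could gate the in-place path, reusing the calls from the snippets further up (it assumes the view covers a complete chunk; partial chunks would still need the separate partial path):

function decompress_chunk!(out_view, cdata)
    if iscontiguous(out_view)
        # Safe to hand the view's memory directly to the C routine.
        Zarr.Blosc.blosc_decompress(cdata, out_view, sizeof(out_view))
    else
        # Fall back to a temporary buffer and copy into the strided view.
        tmp = Zarr.Blosc.decompress(eltype(out_view), cdata)
        copyto!(out_view, reshape(tmp, size(out_view)))
    end
    return out_view
end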

@agoodm commented Mar 8, 2023

@meggart Have you had some time to give this some additional thought?

@meggart (Collaborator, Author) commented Mar 13, 2023

Thanks a lot for the feedback. Yes, I tend to agree with your first point; probably there won't be many real-life scenarios where this would be a bottleneck. I am just trying to come up with a good way forward for implementing the suggested changes. I think the following points would be necessary:

  1. Disentangle reading of raw chunk data from the decompression into separate steps.
  2. Add a vectorized getindex method for AbstractStore, which would be the equivalent of zarr-python's getitems (e.g. getindex(s::AbstractStore, i::Vector{CartesianIndex})). This way, backends like HTTP and S3 can parallelize access to different chunks, while a sequential fallback would be implemented for other stores.
  3. Whenever views into the output array are contiguous, decompress directly into the output; this would be done through an extra iscontig check. Maybe even a PR to Blosc.jl relaxing their signature to accept views might help, so we don't have to deal with the Blosc C API, which I would prefer to avoid.

With these ingredients, I think we could make the reading inside Zarr.jl equivalent to what you posted in your example. I will be quite busy for the next 1-2 weeks; afterwards I would like to tackle these.

@agoodm commented Mar 13, 2023

Great! Glad to see this all moving forward.

Some other possible ideas:

  1. The Python API indeed allows for concurrency by checking that the store contains a getitems() method. But rather than implementing this individually in each storage class, it may be cleaner and more in line with Julia best practices to dispatch the asyncmap chunk retrieval through an Asyncable trait.
  2. I believe fsspec also has separate synchronous implementations of some remote FS's, so perhaps we could support an optional global config setting to disable it?

@meggart (Collaborator, Author) commented Mar 24, 2023

Ok, I made a first attempt at implementing this and pushed it to this branch. Currently writing data is broken, so please use this only for benchmarking. The problem is that I am still unable to match the timings in Python.

For me it looks like the following:

  1. Zarr-Python: ~5 seconds
  2. This branch: ~12 seconds
  3. Suggestion by @agoodm in this thread (zread_async): ~13 seconds

I really don't know how to reach the Python speed here. I tend to think that we are somehow limited by HTTP.jl. Look at this simple example, which just downloads 100 chunks concurrently (without any Zarr.jl usage and without decompression):

import HTTP
function http_only()
    r = Any[nothing for _ in 1:100]
    @sync for i in 0:99
        @async r[i+1] = HTTP.request("GET","https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.0.0",status_exception = false);
    end
end
@time http_only()

I ran this very often and never got it to run faster than 11 seconds, so this is nowhere close to the timings we get from the Python implementation. @agoodm, could you try this code snippet and report whether the download itself can match fsspec?

@agoodm left a review comment:

I have also pulled these changes and tested them with my previous example and noticed some other things that need to be addressed:

  1. The running time doesn't seem to be scaling with Blosc.num_threads()
  2. I am not getting the correct result; instead, reading returns a big array of 0s.

src/Storage/http.jl (resolved review thread)
@@ -29,6 +29,7 @@ push!(storageregexlist,r"^https://"=>HTTPStore)
push!(storageregexlist,r"^http://"=>HTTPStore)
storefromstring(::Type{<:HTTPStore}, s,_) = ConsolidatedStore(HTTPStore(s),""),""

store_read_strategy(::HTTPStore) = ConcurrentRead(concurrent_io_tasks[])
Review comment:

If we are using channels, then I think it would be better to change the default concurrent_io_tasks value to around 100 rather than the current default (10). This corresponds to the maximum number of concurrent connections in aiohttp, for example, and I think it would be better to prioritize runtime over memory usage; users would then lower this only if they need to do suboptimal chunk access and want to limit their memory usage.

@meggart (Collaborator, Author) replied:

Ok, I have increased this to 50 for now. Do you still see speedups for 100 tasks compared to 50? If yes, then I am happy to increase this further.

@agoodm commented Mar 27, 2023

So initially I was unable to replicate the big discrepancies you were seeing. I replicated your code snippet in Python with fsspec and found that both were within a second of each other (with running times averaging around 2 s). Here is what the fsspec version looks like:

import fsspec
keys = [f"analysed_sst/{i}.0.0" for i in range(100)]
fs = fsspec.filesystem("http")
m = fs.get_mapper("https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/")
cdatas = m.getitems(keys)

However, as I have stated multiple times, I am pretty sure I have access to a substantially faster internet connection than you do, which is why we have been getting different numbers. In fact, the story changes completely if I use a different set of chunks that are larger in size. For that dataset, the given chunks are for the bottom-left corner of the grid, so there are many values set to the fill value, which probably greatly increases the compression ratios of these chunks; they are generally under 1 MB. Changing the above code to use:

keys = [f"analysed_sst/{i}.1.0" for i in range(100)]

now takes 10 s, as this time the compressed sizes of the chunks are many times larger. When retrieving these chunks in Julia via HTTP.jl, it takes around 35 s, about 3x slower! I spent a lot of time trying to figure out some possible explanations and I think your initial hunch is correct: this is likely due to limitations of HTTP.jl. fsspec is actually retrieving the chunks using aiohttp, which is a very mature and heavily optimized HTTP library designed specifically for asynchronous use. The details of why it's so much faster are beyond my expertise on the subject, but I think this page in the aiohttp documentation provides some possible insights. Maybe the fact that every single component of loading the response is async (including the separation of loading the headers and the payload) could explain the big differences I have observed between downloading smaller vs larger chunks, since with a fast connection the download times are negligible for the small-chunk case.

I still think these changes are very much worthwhile even with these limitations from HTTP.jl because it's still much faster than purely retrieving each chunk synchronously. One possible workaround to fully match zarr-python would be to use PythonCall.jl or PyCall.jl to wrap the chunk retrieval part with fsspec (or maybe even asyncio / aiohttp if we really wanted to minimize python dependency requirements). I have actually tested this and found with PythonCall we can even pass the raw compressed chunk data loaded through fsspec directly to Blosc with very little overhead as long as the result is wrapped in a PyArray object.

Resorting to calling Python code from Zarr.jl may be controversial, so I could understand why that would be undesirable. It would also beg the (naive) question of why not just call zarr-python through Py(thon)Call, but I think it's worth pointing out that doing so has the downside of adding potentially substantial overhead from converting entire arrays back and forth between Python and Julia, which is why using only native Julia code is always ideal (and one reason why Zarr.jl exists!). However, the nice thing about this method is that the raw compressed chunk data can be passed directly to Blosc decompression with little to no overhead whether it lives in Python or Julia arrays, and since the rest of the work is done in Julia you don't get the overhead from copying a Python array to Julia either.
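
As a rough, untested sketch of the fsspec-through-PythonCall retrieval described above (it assumes fsspec is importable in the linked Python environment):

using PythonCall

fsspec = pyimport("fsspec")
m = fsspec.get_mapper("https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/")
keys = ["analysed_sst/$i.0.0" for i in 0:99]
cdatas = m.getitems(pylist(keys))   # concurrent retrieval via aiohttp under the hood

# Wrap one chunk's bytes as a Julia-visible UInt8 array without copying the payload;
# this is the object that would then be handed to the Julia-side Blosc decompression.
raw = PyArray(cdatas[keys[1]])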

@meggart (Collaborator, Author) commented Mar 28, 2023

Thanks a lot for the feedback. I agree that, although we currently cannot match the Python reading speeds, we should merge some version of this PR to enable async reading. However, to me this PR has become too conflated for now. In particular, I am not sure if these checks on the contiguity of the array couldn't be done better, and (at least for my test cases) they were not causing any speedups. So I will step back a bit in this PR and only introduce the async read trait for now. The other change, which optimises the decompression of a chunk directly into the output array, I would move to a separate PR.

In addition, I think it might be good to document the pure async HTTP benchmarks (HTTP.jl vs aiohttp) in an issue over at HTTP.jl and ask if there is any chance we might match the aiohttp performance in the future. Only if the authors of HTTP.jl think this is infeasible would I think about PyCall options to retrieve the data faster.

@agoodm commented Mar 28, 2023 via email

@mkitti (Member) commented Mar 30, 2023

Responding to @agoodm's query on Discourse, the important optimization steps for HTTP.jl I have found so far are:

  1. Raising the connection_limit via HTTP.request.
  2. Preallocating the buffers via response_stream.

Generally, making sure the Julia code compiles well was also beneficial.

Here's the full report:
https://discourse.julialang.org/t/http-jl-async-is-slow-compared-to-python-aiohttp/96736/5

@agoodm commented Apr 3, 2023

So over the course of a lot of discussion in that Discourse thread, I think we have managed to arrive at the solution. For now we just need to specify socket_type_tls=OpenSSL.SSLStream in the call to HTTP.request. Can you try applying this modification and benchmarking things on your end again?
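
For reference, the earlier http_only benchmark with that modification applied would look roughly like this (OpenSSL.jl provides the SSLStream type):

import HTTP, OpenSSL

function http_only_openssl()
    r = Any[nothing for _ in 1:100]
    @sync for i in 0:99
        @async r[i+1] = HTTP.request("GET",
            "https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.0.0";
            socket_type_tls = OpenSSL.SSLStream, status_exception = false)
    end
    return r
end
@time http_only_openssl()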

Even though this change may remove the performance gap, I think it may still be a good idea to consider supporting fsspec-compatible storage through PythonCall.jl, either as a custom Storage type itself or perhaps as a separate extension outside of Zarr.jl. This would provide support for an even greater number of storage systems, in particular kerchunk references, which I know several people have expressed interest in but for which there is no native Julia implementation yet. What are your thoughts, @meggart?

@meggart (Collaborator, Author) commented Apr 3, 2023

Thank you so much @agoodm for initiating this great discourse thread. I was following but unfortunately could not contribute much due to lack of expertise. I was experimenting a lot last Friday and got quite frustrated because I was nowhere near closing the performance gap even with the suggested changes, but I am very optimistic now that avoiding MbedTLS will help.

I think it would be a great idea to make an fsspec-store that works through PythonCall. If there are no volunteers, I can work on that next week. Since we have real optional dependencies now for Julia 1.9 I think it is also good timing to think about #76 again.

@agoodm commented Apr 3, 2023

Glad to hear that. I am definitely far from an expert myself but I managed to learn a lot from that thread too.

In addition to the performance issues with MbedTLS, there is one other outstanding issue with HTTP.jl (JuliaWeb/HTTP.jl#1033), but I think it's not too important, at least for Zarr.jl, since in all my testing so far changing that number barely made any impact (though it may be worth testing some more and considering raising the default limit a little, to say 25 or even 100, which is the case for Python, just to be safe).

Another conclusion from my benchmarks is that, once accounting for compilation and multi-threading (e.g. when using only one thread), pre-allocating the response buffers made hardly any noticeable difference. I think a low-level optimization like that is probably going to matter much more in cases where large numbers of requests are being made for small amounts of data, but I don't think that's going to be the case with zarr chunks most of the time. Maybe it is worth exploring again later if we encounter an appropriate dataset or use case.

If you don't mind I would be happy to write and develop the fsspec Storage interface for Zarr.jl. Thanks!

@meggart (Collaborator, Author) commented Apr 4, 2023

If you don't mind I would be happy to write and develop the fsspec Storage interface for Zarr.jl. Thanks!

Great! Thanks for the heads-up, so I will not start this. I would be very curious how far this can get us, especially regarding using things like kerchunk.

@meggart (Collaborator, Author) commented Apr 4, 2023

So I have cleaned up this PR a bit and tried to include the essence of the respective Discourse discussions. Hopefully this gives sufficient read performance for most people.

Unfortunately the changes in the workflow broke the LRU cache implementation for uncompressed chunks, so I will continue to work on this.

@agoodm left a review comment:

I have spent a little bit of time testing these changes with my original benchmark. I was initially curious how well this would work using channels instead of a much simpler call to asyncmap, but so far this portion works really well! I haven't noticed any real performance penalty from using the channel, and in fact, after playing around with the concurrent_io_tasks[] parameter, I found that lowering it didn't seem to have any negative impact and even made things a little faster. So it may be alright to backtrack on my previous suggestion and lower it back to 10 to be on the conservative side. I encourage you to test it on your connection just to be sure these conclusions hold for you too.

One issue is that copying the decompressed chunk data to the output remains a significant bottleneck, so implementing that special case for contiguous views would be very worthwhile. In my original benchmark:

julia> g = zopen("https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1", consolidated=true)
ZarrGroup at Consolidated Consolidated Zarr.HTTPStore("https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1") and path 
Variables: lat analysed_sst analysis_error mask time lon sea_ice_fraction 

julia> s = g["analysed_sst"]
ZArray{Int16} of size 36000 x 17999 x 6443

julia> @time s[1:3600, 1:1799, 1:500];
 18.504524 seconds (44.49 k allocations: 6.312 GiB, 0.28% gc time)

Since we have optimized the retrieval of the raw compressed chunks, nearly all of this time is due to the cost of copying. I have, however, managed to find a way to make this copying much more efficient (see my suggestion below). It gives a 2-3x speedup.

src/ZArray.jl (review suggestion, outdated and resolved)
@mkitti (Member) commented Apr 5, 2023

Part of the utility of using response_stream is that you could manage the memory buffers directly and reuse them.

import HTTP, OpenSSL

# Keep a small pool of reusable response buffers.
buffer_channel = Channel{IOBuffer}(10)
for i in 1:10
    put!(buffer_channel, IOBuffer(; sizehint = 64*1024^2))
end

asyncmap(urls) do url
    buffer = take!(buffer_channel)
    try
        seekstart(buffer)
        HTTP.get(url, response_stream = buffer, socket_type_tls = OpenSSL.SSLStream, connection_limit = 20)
        compressed_chunk = @view buffer.data[1:bytesavailable(buffer)]
        return uncompress(compressed_chunk) # pseudo-code for the decompression step
    finally
        # Hand the buffer back to the pool even if the request fails.
        put!(buffer_channel, buffer)
    end
end

@agoodm commented Apr 6, 2023

Tested it again with the latest commit, it LGTM 👍

@meggart (Collaborator, Author) commented Apr 11, 2023

Part of the utility of using response_stream is that you could manage the memory buffers directly and reuse them.

Hi @mkitti, I had this response_stream reuse in an earlier version of the PR. The main problem with it was that, done generically, it would also affect other backends. In principle I like the idea of letting storage backends write into pre-allocated and reusable IOBuffers, but I would really like to give this a bit more thought. Would it be ok to leave the PR as-is for now and record this in an issue for later optimization?

I think there should be no performance regression with this PR, but already improvements for many common use cases.

@mkitti (Member) commented Apr 11, 2023

I think there should be no performance regression with this PR, but already improvements for many common use cases.

No objections from me. I'm just explaining the potential utility and why I introduced it.

@meggart merged commit 21c1ce9 into master on Apr 12, 2023
@mkitti (Member) commented Apr 27, 2023

Update, see: JuliaWeb/HTTP.jl#1034
