Request batching #392

Closed
kylebarron opened this issue Nov 17, 2023 · 22 comments

@kylebarron
Owner

Currently the async requests are really slow because IIUC they do all fetches sequentially. See here for a painful example. If the data isn't cached, it'll slowly fetch in:

[Screen recording: Screen.Recording.2023-11-17.at.4.24.32.PM.mov]

You should take a closer look at the individual request ranges, but I think it's fetching every column independently. There are 7 record batches and 7 columns. Each field other than geometry is pretty small.
[Image attachment]

And indeed most of the fetches are only a few KB in size, other than some 1.5-2 MB fetches, which are probably for the geometry column.

It looks like the way to fix this is to implement a custom AsyncFileReader which will implement get_byte_ranges to combine range requests.

https://github.com/pka/http-range-client may also be helpful. cc @H-Plus-Time in case you have interest/insight

@kylebarron
Owner Author

Ok yeah so the default implementation just calls self.get_bytes(range).await?; sequentially.
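
For illustration, a minimal synchronous sketch of that behaviour. The `RangeReader` trait and `CountingReader` type below are hypothetical stand-ins (the real AsyncFileReader is async and returns futures); the only point is that a loop over get_bytes costs one request per range.

```rust
use std::ops::Range;

/// Synchronous stand-in for an async range reader.
/// Hypothetical trait for illustration, not the parquet crate's API.
trait RangeReader {
    /// Fetch one byte range; each call models one HTTP range request.
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8>;

    /// Default behaviour: one request per range, issued one after another.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> Vec<Vec<u8>> {
        let mut out = Vec::with_capacity(ranges.len());
        for range in ranges {
            out.push(self.get_bytes(range));
        }
        out
    }
}

/// In-memory "remote file" that counts how many requests it has served.
struct CountingReader {
    data: Vec<u8>,
    requests: usize,
}

impl RangeReader for CountingReader {
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8> {
        self.requests += 1;
        self.data[range].to_vec()
    }
}
```

With 7 row groups of 7 columns each, a reader like this would count 49 requests, which is why overriding get_byte_ranges is the natural place to merge them.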

From georust discord:

There's two different kinds of merging - one for index traversal and one for feature traversal and they have different strategies reflecting their different layouts.

Here's the index request merging: flatgeobuf/flatgeobuf#93
And here's the feature request merging: flatgeobuf/flatgeobuf#319

Though you should probably refer to the latest version of that same code since some bugs have been fixed.

It also looks like it might be good to try to use the ObjectStore API directly.

Also object store exposes this publicly: https://docs.rs/object_store/latest/object_store/fn.coalesce_ranges.html

@H-Plus-Time
Contributor

H-Plus-Time commented Nov 20, 2023

Yeah, I think if it were possible to amp up the coalescing to multiple row_groups (I'd say on an average connection, the UScounties.parquet file would be great to serve with 1 meta/head + 2x 3-4 row group requests), the streaming workflow would make total sense. I've played around with one builder per row group + StreamExt::buffered to reasonable effect (~30% reduction vs. one sequential request per row group). With just that and a sufficiently large file (12MB is small enough that Cloudflare's ~300ms/request lag is noticeable), you could probably saturate your network link, especially if the row group sizes are bigger: swapping to 2 row groups for the same UScounties.parquet file dropped the execution time penalty from 40% to 14%.

@H-Plus-Time
Contributor

Ah :|, it really is a caching problem - with caching configured properly (I just used an 'all parquet files are eligible for caching' rule in Cloudflare and left it at that), the per row group version (the content of #393) is IO bound (saturates my network link), and the per column, per row group version is moderately slower (~20%).

@kylebarron
Owner Author

possible to amp up the coalescing to multiple row_groups

That should be possible too. Under the hood we can pass a vector of row group indexes.

I'm not sure I want to set the default to fetching multiple row groups per request (though you could implement some heuristics based on the row group byte size, which you learn from the file metadata).

@kylebarron
Owner Author

In the existing code, we also create a new client per request, IIUC, whereas the new content in #393 reuses the same client, and maybe that helps with connection keep-alive?

@H-Plus-Time
Contributor

possible to amp up the coalescing to multiple row_groups

That should be possible too. Under the hood we can pass a vector of row group indexes.

I'm not sure I want to set the default to fetching multiple row groups per request (though you could implement some heuristics based on the row group byte size, which you learn from the file metadata).

Yeah, it looks like it's particularly difficult to do via the parquet streaming interface (even when passing a vector of row groups, I still end up with serialized row group requests). Tinkering with the AsyncFileReader seems to be how the ParquetObjectStoreReader and datafusion's physical plan manage to do it.

Yeah, the moment we expose things like with_row_filter or with_projection, the multi-row group request merging would need to be skipped (the intra-row group coalescing still makes sense).

I think I'd be comfortable with a desired_request_size parameter in the stream interface, and bulk coalescing of contiguous row groups (minimum of 1 row group per request, maximum of 2x desired_request_size per request). That would open the door for an easy usability win in the form of row group filtering, and a very clearly marked "skip messing with the requests if any of these conditions are met" point.
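
A rough sketch of how such a planner could look, assuming the row group byte sizes are already known from the file metadata. The function name `plan_requests` and the exact greedy policy (close a request once it reaches desired_request_size, or when the next row group would push it past 2x that size) are assumptions for illustration, not code from this repo.

```rust
/// Group contiguous row groups into requests.
/// Hypothetical policy: at least one row group per request, and at most
/// 2x `desired_request_size` bytes unless a single row group is larger.
fn plan_requests(row_group_sizes: &[u64], desired_request_size: u64) -> Vec<Vec<usize>> {
    let max_size = desired_request_size.saturating_mul(2);
    let mut plans: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0u64;

    for (i, &size) in row_group_sizes.iter().enumerate() {
        // The open request is "full" if it already reached the target size,
        // or if adding this row group would exceed the 2x cap.
        let full = current_bytes >= desired_request_size
            || current_bytes.saturating_add(size) > max_size;
        if !current.is_empty() && full {
            plans.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(i);
        current_bytes = current_bytes.saturating_add(size);
    }
    // Flush the last, possibly undersized, request.
    if !current.is_empty() {
        plans.push(current);
    }
    plans
}
```

Each inner vector is a run of contiguous row group indexes that would be fetched together; an oversized row group simply ends up in a request of its own.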

@kylebarron
Owner Author

Unless/until we can send concurrent requests, it seems like merging requests is always going to be a better option. At the moment I took away something like desired_request_size in favor of coalesce_size: the allowed number of bytes between two request ranges to merge and send as one request.
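
As a sketch of what a coalesce_size threshold means in practice: sort the requested ranges, then fold each one into the previous range whenever the gap between them is at most coalesce_size bytes. The `coalesce` helper below is hypothetical and only covers the merge step; it is not the object_store function or the code in #393.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap is at most `coalesce_size` bytes.
/// Overlapping and adjacent ranges are merged as well.
fn coalesce(mut ranges: Vec<Range<u64>>, coalesce_size: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate request.
            if r.start <= last.end.saturating_add(coalesce_size) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}
```

The trade-off is that the bytes in the gaps are downloaded and thrown away, so a larger coalesce_size means fewer requests but more wasted transfer. After fetching, each original range still has to be sliced back out of the merged response.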

@H-Plus-Time
Contributor

Yes, merged intra-row-group, concurrent requests appear to be optimal.

Alright, results from the week of experiments:

  • It is in fact possible to use ParquetObjectStore if you're willing to implement your own (almost entirely stubbed) impl ObjectStore for FoobarObjectStore. All told, ~250-300 lines for an equally fault-tolerant equivalent to HttpObjectStore.
    • The upside is that object_store emits exactly the same coalesced range requests
    • Probably identical copy count compared to the current fetch code (pretty sure).
    • Downside - I would have liked to get this into object_store itself, but the full-fat version of HttpObjectStore is far more feature-rich than this (the point about decoupling it from hyper, reqwest still stands though).
    • Downside - the timing and range requests are no better than the custom HttpFileReader. Potential for that to change with upstream modifications I suppose.
  • Concurrent requests ended up being straightforward once I admitted defeat trying to get ParquetRecordBatchStream to behave concurrently, and opted for one per row group:
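      // Needs futures::{stream, StreamExt, TryStreamExt} in scope.
      // One stream builder per row group; each future collects that row group's batches.
      let outer_stream = (0..num_row_groups).map(move |i| {
          let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
              reader.clone(),
              meta.clone(),
          );
          builder.with_row_groups(vec![i]).build().unwrap().try_collect::<Vec<_>>()
      });
      // Run up to `concurrency` row group fetches at once, yielding results in row group order.
      let buffered = stream::iter(outer_stream).buffered(concurrency);
      // Flatten each row group's Vec of batches back into one stream of record batches.
      let out_stream = buffered.flat_map(|maybe_record_batches| {
          stream::iter(maybe_record_batches.unwrap()).map(|record_batch| {
              Ok(RecordBatch::new(record_batch).into())
          })
      });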
  • (the above kicks off $n$ requests immediately, and likely boosts the peak memory usage by a bit (decompressed row groups, mind, so concurrency above 8 with beefy row groups would probably be... a bad idea)).
  • Bulk chunking wasn't worth it (as in requests spanning multiple row groups)
    • On cached, h2/h3 hosts, no advantage over coalesced, concurrent requests.
    • Uncached, h2/h3 hosts, slightly higher variance.
    • Cached h1.1 hosts (older CDNs) - better than coalesced, serial requests, but still unacceptably slow (with the added downside that the first record batch arrives later than the non-chunked equivalent).
    • Uncached, h1.1 - no improvement at all.

The sweet spot for concurrency on h1.1 endpoints is ~4 (accounting for the two briefly used up by the HEAD and GET), but I'd stress that the concurrent partial read story for http/1.1 is never going to be a pleasant one in browser environments (because of the connection cap and lack of multiplexing, mixed with the tendency for object stores to have large fixed latencies).

@H-Plus-Time
Contributor

It also looks like Cloudflare freezes the protocol of R2 custom domains at creation - anything before ~April this year is stuck on 1.1 (a lot more recent than I'd assumed), and up to somewhere around October they're all http2 (apparently R2 is under extremely active development, a custom domain created today is set to http3).
Only way to change it is to remove the domain in R2 settings and re-add it (https://community.cloudflare.com/t/how-can-i-enable-http-2-on-my-r2-bucket/501520).

@kylebarron
Owner Author

Thanks!! This is really awesome stuff!

  • It is in fact possible to use ParquetObjectStore if you're willing to implement your own (almost entirely stubbed) impl ObjectStore for FoobarObjectStore

Do you have a preference for using a stubbed ObjectStore vs a manual implementation? I'm not sure which would be more maintainable.

It seems like a lot of work to upstream full wasm support into object-store, which is why I was leaning towards a fully custom implementation.

  • Concurrent requests ended up being straightforward once I admitted defeat trying to get ParquetRecordBatchStream to behave concurrently, and opted for one per row group:

That looks amazing!

Oh I see, you create a different reader for each record batch index. That seems fine to me.

  • the above kicks off $n$ requests immediately, and likely boosts the peak memory usage by a bit (decompressed row groups, mind, so concurrency above 8 with beefy row groups would probably be... a bad idea)).

I don't fully understand some of how stream processing works in JS, but if you have a stream reader on the JS side, as soon as the batch is returned, the memory should clear in wasm?

Does this maintain ordering? If you want record batches [5, 2, 4], will it ensure it's always emitted in that order? If the first record batch is for some reason way larger than the others, will the others buffer in memory?

@kylebarron
Owner Author

Would concurrency also work from the JS side? If it created $n$ different promises for $n$ different row groups and then called Promise.all(), would that still fetch them all at the same time?

@H-Plus-Time
Contributor

H-Plus-Time commented Nov 30, 2023

Maybe object_store? The extension points would probably be quite handy when getting to things like partitioned datasets (well, for WebDAV at least). I suppose the biggest upshot would be that eventually, the 1st party request code largely goes away.

I could certainly see circa Q2 2024 wasm-compatible object stores for:

I'll put up a PR for it though, mainly to demonstrate what was involved.

Oh I see, you create a different reader for each record batch index. That seems fine to me.

Each row group, yep.

Yeah, I think they'll only be very brief spikes, and it would be very hard to push it into OOM territory; just an observation really.

Does this maintain ordering?

Yep, ordered both within row groups and between them - there is actually a StreamExt::buffer_unordered function that'd help in a situation like that (highly variable row group sizes), but tbh those use-cases are so rare I wouldn't bother allowing it.
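
To make the memory side of ordered delivery concrete, here is a toy model, assuming results are identified by their position in the requested sequence (so for row groups [5, 2, 4], position 0 is row group 5). `OrderedEmitter` is a hypothetical type for illustration and not how the futures crate implements ordered buffering; it only shows that results finishing early have to wait in memory until everything ahead of them has been emitted.

```rust
use std::collections::BTreeMap;

/// Toy model of ordered delivery of concurrently fetched results.
struct OrderedEmitter<T> {
    /// Position of the next result that is allowed to be emitted.
    next: usize,
    /// Finished results that are still waiting for an earlier one.
    pending: BTreeMap<usize, T>,
}

impl<T> OrderedEmitter<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    /// Record that the request at `index` finished and return everything
    /// that can now be emitted without breaking the order.
    fn complete(&mut self, index: usize, item: T) -> Vec<T> {
        self.pending.insert(index, item);
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            ready.push(item);
            self.next += 1;
        }
        ready
    }

    /// Number of finished results currently held in memory.
    fn buffered(&self) -> usize {
        self.pending.len()
    }
}
```

If the first request is much slower than the rest, `buffered()` climbs towards the concurrency limit until it completes, which is the brief spike discussed above. An unordered variant would hand each result over immediately, at the cost of losing the ordering guarantee.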

Would concurrency also work from the JS side

Yep, if the target server/browser/network conditions allow it (I've noticed h2 connections sometimes delaying requests, though the browser always schedules them immediately).

@kylebarron
Owner Author

Yeah, I think they'll only be very brief spikes, and it would be very hard to push it into OOM territory; just an observation really

One consideration here is that I think Wasm memory gets resized upwards but not downwards. So if the peak memory is short but high, you'll be sitting with a very large but mostly unused ArrayBuffer when it ends, right?

Maybe object_store

One other thing to keep in mind with the object store APIs is that they don't have a way to fetch a suffix range request, because apparently azure doesn't support that 😭 apache/arrow-rs#4611

This means that if you use object store, you always first need a HEAD request to get the content length, right? Ref #272

In my prototype in #393 I'm able to avoid any HEAD request for the length because I fetch the metadata manually.
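
One way the HEAD request can be avoided, sketched under assumptions: an HTTP suffix range (`bytes=-N`) asks for the last N bytes without knowing the content length, and a Parquet file ends with a 4-byte little-endian metadata length followed by the magic bytes `PAR1`. The helper names below are hypothetical, and this is not necessarily how #393 does it.

```rust
use std::ops::Range;

/// Format an HTTP `Range` header value for a non-empty half-open byte range.
fn range_header(range: &Range<u64>) -> String {
    debug_assert!(range.start < range.end);
    // HTTP byte ranges are inclusive at both ends, hence `end - 1`.
    format!("bytes={}-{}", range.start, range.end - 1)
}

/// Format a suffix range: "the last `n` bytes", no content length required.
fn suffix_range_header(n: u64) -> String {
    format!("bytes=-{}", n)
}

/// Given the tail of a Parquet file, read the metadata length from the
/// 8-byte footer (4-byte little-endian length + magic `PAR1`).
/// Returns None if the tail is too short or the magic does not match.
fn footer_metadata_len(tail: &[u8]) -> Option<u32> {
    if tail.len() < 8 {
        return None;
    }
    let footer = &tail[tail.len() - 8..];
    if footer[4..] != b"PAR1"[..] {
        return None;
    }
    Some(u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]))
}
```

A reader could request a generous suffix up front; if the reported metadata length is larger than what was fetched, a second suffix request of `metadata_len + 8` bytes covers the rest.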

@kylebarron
Owner Author

I could certainly see circa Q2 2024 wasm-compatible object stores for:

That's a lot of different target systems... I'm somewhat inclined to only support HTTP to keep the scope down and maintenance minimal, and users can fetch signed URLs to support other object stores.

@H-Plus-Time
Contributor

Mm, good point, that makes it a very easy decision - custom implementation.

Going by the suffix thread, maintainers are receptive, I'll queue the From changes up for advent of code (ahead/concurrently with the wasm changes).

Fair re scope. HTTP, a ReadableStream output, and a file input are sufficient for users to handle the rest in JS land (that last one is largely to support Response and FileSystemAccessHandle, both of which have cheap file methods (it turns out that's all that's really needed to emulate AsyncRead + AsyncSeek) and integrate into ~3-4 core browser APIs).

@kylebarron
Owner Author

  • It is in fact possible to use ParquetObjectStore if you're willing to implement your own (almost entirely stubbed) impl ObjectStore for FoobarObjectStore. All told, ~250-300 lines for an equally fault-tolerant equivalent to HttpObjectStore.

Do you still have this experiment?

@H-Plus-Time
Contributor

H-Plus-Time commented Apr 3, 2024

I do, yep, it's over at object-store-wasm - it still uses the collect stream -> wrap up as a single item stream trick (up to date with object-store 0.9 though).

Tinkering with ehttp atm (which seems to have markedly less mature header constants), I'm hoping that it'll make it a bit more tractable to do actual streaming (I suspect it's a moot point anyway wrt the parquet reader, but worth it for more general purpose use elsewhere)

@kylebarron
Owner Author

that repo appears to be private 🥲

@H-Plus-Time
Contributor

Whoops, forgot the protocol in the link 😬

@kylebarron
Owner Author

That's cool! Are you able to add a license to the repo? I started an attempt to connect to object store here but hit a wall and stopped. At present to get things working I vendored an impl of AsyncFileReader, but I'm guessing that using ObjectStore would be more maintainable. I have some time available to get GeoParquet working for remote datasets in JS.

Also note https://github.com/JanKaul/object_store_s3_wasm if you haven't seen it.

@H-Plus-Time
Contributor

Yep, license added. Yeah, that was the main bit of prior art that convinced me to just re-implement the client parts. I'll be doing a bit of internal shuffling, but stuff like HttpStore should still be a top-level export.

@kylebarron
Owner Author

I think this is all resolved in 0.6?
