
Dynamic Batching #261

Open · wants to merge 53 commits into base: branch-24.12

Conversation

@achirkin (Contributor) commented Jul 30, 2024

Non-blocking / stream-ordered dynamic batching as a new index type.

API

This PR implements dynamic batching as a new index type, mirroring the API of other indices.

  • [building is wrapping] Building the index means creating a lightweight wrapper on top of an existing index and initializing necessary components, such as IO batch buffers and synchronization primitives.
  • [type erasure] The underlying/upstream index type is erased once the dynamic_batching wrapper is created, i.e. there's no way to recover the original search index type or parameters.
  • [explicit control over batching] To allow multiple user requests to be grouped into a dynamic batch, users must use copies of the same dynamic batching index (the user-facing index type is a thin wrapper on top of a shared pointer, hence the copy is shallow and cheap). The search function is thread-safe.
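For illustration, wrapping and searching might look roughly like this. This is a minimal sketch, not verbatim from this PR: the exact fields of `index_params`/`search_params` and the CAGRA upstream types are assumptions.

```cpp
#include <cuvs/neighbors/cagra.hpp>
#include <cuvs/neighbors/dynamic_batching.hpp>

void run(raft::resources const& res,
         const cuvs::neighbors::cagra::index<float, uint32_t>& upstream,
         raft::device_matrix_view<const float, int64_t> queries,
         raft::device_matrix_view<uint32_t, int64_t> neighbors,
         raft::device_matrix_view<float, int64_t> distances)
{
  // "Building" is wrapping: create a lightweight wrapper over the upstream
  // index; the upstream type is erased after this point.
  cuvs::neighbors::cagra::search_params upstream_params;
  cuvs::neighbors::dynamic_batching::index_params batch_params;  // k, max batch size, queues, ...
  cuvs::neighbors::dynamic_batching::index<float, uint32_t> batched{
    res, batch_params, upstream, upstream_params};

  // Copies are shallow (a shared pointer under the hood), so each request
  // thread can cheaply hold its own copy; search() is thread-safe, and
  // concurrent requests may be grouped into one batch.
  cuvs::neighbors::dynamic_batching::search(
    res, search_params{}, batched, queries, neighbors, distances);
}
```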

Feature: stream-ordered dynamic batching

Non-blocking / stream-ordered dynamic batching means the batching does not involve synchronizing with a GPU stream. The control is returned to the user as soon as the necessary work is submitted to the GPU. This entails a few good-to-know features:

  1. The dynamic batching index has the same blocking properties as the upstream index: if the upstream index does not involve a stream sync during search, then the dynamic batching index does not involve one either (otherwise, the dynamic batching search naturally waits until the upstream search synchronizes under the hood).
  2. It's the user's responsibility to synchronize the stream before reading the results back - even if the upstream index search does not need it (the batch results are scattered back to the request threads in a post-processing kernel).
  3. If the upstream index does not synchronize during search, the dynamic batching index can group the queries even in a single-threaded application (try it with --no-lap-sync option in the ann-bench benchmarks).

Overall, stream-ordered dynamic batching makes it easy to wrap existing cuVS indexes, because the wrapped index has the same execution behavior as the upstream index.
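A minimal sketch of the resulting synchronization contract (continuing the assumed wrapper from the example above): control returns once the work is enqueued, so the caller owns the final sync.

```cpp
// search() returns as soon as the work is submitted to the GPU stream.
cuvs::neighbors::dynamic_batching::search(
  res, search_params{}, batched, queries, neighbors, distances);

// The batch results are scattered back to this request in a post-processing
// kernel, so the caller must sync before reading them - even if the upstream
// index itself never synchronizes during search.
raft::resource::sync_stream(res);
// Only now is it safe to consume `neighbors` and `distances`.
```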

Work-in-progress TODO

  • Add dynamic batching option to more indices in ann-bench
  • Add tests
  • (postponed to 25.02) Do proper benchmarking and possibly fine-tune the inter-thread communication
  • Review the API side (cpp/include/cuvs/neighbors/dynamic_batching.hpp) [ready for review CC @cjnolet]
  • Review the algorithm side (cpp/src/neighbors/detail/dynamic_batching.cuh) [ready for preliminary review: requests for algorithm docstring/clarifications are especially welcome]

@cjnolet cjnolet added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Jul 30, 2024
@achirkin achirkin changed the base branch from branch-24.08 to branch-24.12 September 26, 2024 09:09
@achirkin achirkin changed the title [WIP] dynamic batching Dynamic Batching Oct 23, 2024
@achirkin (Contributor Author):

Sneak-peek into performance (single-query benchmarks on a workstation):
[Charts: CAGRA multi-cta original vs dynamically batched; CAGRA single-cta original vs dynamically batched]

@achirkin (Contributor Author) commented Nov 6, 2024

Current progress note

At the moment, all PRs that this PR depends on have been merged - there are no pending fixes/blockers in cuVS or raft.

There's one known bug that leads to a deadlock in rare cases:
When many concurrent threads submit their work, they subscribe to the incoming batch slots in the batch_queue_t (ahead of the free buffers pushed back by the GPU). If they do this fast enough to loop over the whole ring buffer, they may eventually skip a slot left over from the previous pass over the buffer, which makes it impossible to push to that slot at a later point. Currently, the hardcoded buffer size is 256 slots. For benchmarks, I work around this by increasing the size to 65536 or more, but a proper fix is planned (coming early next week, probably).
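For intuition, here is a simplified illustration of the hazard and a round-counter style of fix; this is not the actual batch_queue_t code, and all names are illustrative. If a slot token also records which pass over the ring buffer it belongs to, a thread that has lapped the buffer can detect that the slot is from another round instead of silently skipping it.

```cpp
#include <cstdint>

constexpr uint32_t kSize = 256;  // ring buffer capacity (hardcoded in this PR)

struct slot_token {
  uint32_t round;  // seq_id / kSize at the time the slot was (re)initialized
  uint32_t state;  // e.g. empty / full / dispatched
};

// With plain seq_id % kSize addressing, two seq_ids a whole ring apart alias
// the same slot. Checking the recorded round disambiguates them: a mismatch
// means the thread is a full ring buffer ahead (or behind) and must wait.
inline bool slot_matches(const slot_token& t, uint64_t seq_id)
{
  return t.round == static_cast<uint32_t>(seq_id / kSize);
}
```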

@achirkin achirkin marked this pull request as ready for review November 14, 2024 11:13
@achirkin achirkin requested review from a team as code owners November 14, 2024 11:13
@achirkin (Contributor Author):

Current progress note

  • The implementation is ready for review; all known problems are resolved.
  • The API waits for more reviewers (see Dynamic Batching #261 (comment))
  • There are a few problems with the benchmark setup, which prevent dynamic batching from reaching good speedups. I suggest we merge this PR and leave further performance studies to a follow-on.

@achirkin (Contributor Author):

For the record, here is the list of identified benchmark problems, which require further study:

  1. CAGRA produces uint32_t indices, which are mapped to int64_t in a post-processing kernel. This kernel is not dynamically batched; as a result, all threads in the benchmark launch it, causing CUDA context locks that degrade performance.
  2. IVF-PQ only makes sense with refinement, but it suffers from the same problem as (1), because the refinement is performed in every thread rather than once per batch. Interestingly, the problem persists even if the refinement is performed on the CPU (maybe it's the intermediate-results copy going through CUDA, maybe it's just CPU overuse).
  3. In throughput mode, the benchmark runs an equal number of iterations in every thread. Dynamic batching loads the threads unequally and thus causes significant differences in per-thread run time. This, in turn, leads to a long-tail effect, where one or a few threads run significantly longer than the others. The dynamic batcher is then never able to gather enough threads for a full batch and is forced to wait longer (the full dispatch timeout on every iteration). As a result, the total average time/QPS can be hurt as well.

@achirkin achirkin requested a review from tfeher November 20, 2024 08:50
return index.runner->search(res, params, queries, neighbors, distances); \
}

CUVS_INST_DYNAMIC_BATCHING_INDEX(float, uint32_t, cuvs::neighbors::cagra, index<float, uint32_t>);
Member:

It's really unfortunate that we'll need to instantiate these individually for each index type. For example, Vamana is not included here. Is there any way we can remove this constraint? Can we just tie this to the search_params super class?

@achirkin (Contributor Author) commented Nov 21, 2024:

I agree, and I see a couple of solutions to this.

One is to go with class-based polymorphism.
We'd have to make the search parameters neighbors::search_params and the index type neighbors::index virtual by adding a virtual destructor. We would also need a virtual clone() method, so we can copy implementation search parameters via the base class. This goes slightly against our initial design of keeping the search parameters a POD. It also means it would be dangerous to pass the search parameters struct to kernels (but I think we haven't been doing that so far).
We would also need to add a virtual search method to the index (and also dim(), which is currently used by the dynamic batching), which goes slightly against our initial design of having search/build as plain functions.
Then there would be only one, non-templated dynamic_batching constructor taking the abstract upstream index and search parameters.
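A rough sketch of what that could look like; this is an illustrative shape only (in practice the data/index types would still need to be fixed or erased further, which is part of what makes this option disruptive):

```cpp
#include <cstdint>
#include <memory>

#include <raft/core/device_mdspan.hpp>
#include <raft/core/resources.hpp>

namespace cuvs::neighbors {

// Virtual base for search parameters (currently a POD in cuVS).
struct search_params {
  virtual ~search_params() = default;
  // Lets the batcher copy implementation-specific params via the base class:
  virtual std::unique_ptr<search_params> clone() const = 0;
};

// Virtual base for indices.
template <typename T, typename IdxT>
struct index {
  virtual ~index() = default;
  virtual int64_t dim() const = 0;  // used by the dynamic batching wrapper
  virtual void search(raft::resources const& res,
                      const search_params& params,
                      raft::device_matrix_view<const T, int64_t> queries,
                      raft::device_matrix_view<IdxT, int64_t> neighbors,
                      raft::device_matrix_view<float, int64_t> distances) const = 0;
};

}  // namespace cuvs::neighbors

// Then a single non-templated constructor would suffice:
//   dynamic_batching::index(res, params,
//                           const neighbors::index<T, IdxT>& upstream,
//                           const neighbors::search_params& upstream_params);
```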

Another solution is to go with template-based polymorphism.
I could define a template constructor in the public header file (similar to what I have in the dynamic_batching_test at the moment). It would take the index, search params, and the search function as template parameters, so that users can instantiate it on their side. I think I would have to slightly rework the detail::batch_runner and expose at least one of its constructors in the public header for that.
This obviously goes against our design of not instantiating anything on the user side (but it doesn't involve any CUDA-specific code and should be fast).
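Sketched, the template-based constructor might look like this; all names here are illustrative, and the real detail::batch_runner would need at least one public constructor for it to work:

```cpp
#include <memory>
#include <utility>

#include <raft/core/resources.hpp>

namespace cuvs::neighbors::dynamic_batching {

template <typename T, typename IdxT>
struct index {
  // Hypothetical header-only template constructor: the upstream index, its
  // search params, and the search function are all supplied (and therefore
  // instantiated) by the user, so nothing index-specific is compiled into
  // the cuVS library itself.
  template <typename Upstream, typename UpstreamParams, typename SearchFn>
  index(raft::resources const& res,
        const index_params& params,
        const Upstream& upstream,
        const UpstreamParams& upstream_params,
        SearchFn&& search_fn)  // e.g. a pointer to cagra::search
    : runner{std::make_shared<detail::batch_runner<T, IdxT>>(
        res, params, upstream, upstream_params, std::forward<SearchFn>(search_fn))}
  {
  }

  std::shared_ptr<detail::batch_runner<T, IdxT>> runner;
};

}  // namespace cuvs::neighbors::dynamic_batching
```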

@achirkin (Contributor Author):

I think the template-based solution would be slightly less disruptive to cuVS in general, but I also think we should probably take this to a follow-on PR and a separate discussion for 25.02.

std::shared_ptr<detail::batch_runner<T, IdxT>> runner;

/**
* @brief Construct a dynamic batching index by wrapping the upstream index.
Member:

Please include a usage example and add this to the API docs.

Contributor:

Please also explain how we know when the batch is finished. Is it just a sync with the stream in res?

const cuvs::neighbors::filtering::base_filter* sample_filter = nullptr);
};

void search(raft::resources const& res,
Member:

Please document these functions. I know it seems redundant, but it's important users can look these functions up in the docs. Also please add usage examples.

@@ -138,6 +138,10 @@ using list_data = ivf::list<list_spec, SizeT, ValueT, IdxT>;
*/
template <typename T, typename IdxT>
struct index : cuvs::neighbors::index {
using index_params_type = ivf_flat::index_params;
Member:

Please also include the other index types (e.g. bfknn, vamana, etc...).

NAME
NEIGHBORS_DYNAMIC_BATCHING_TEST
PATH
neighbors/dynamic_batching/test_cagra.cu
Member:

Please include the other index types here as well (e.g. bfknn, vamana, etc...).

@tfeher (Contributor) left a comment:

I have reviewed the implementation details. Thanks Artem for the additional documentation, overall the code looks great.

To achieve high throughput and low latency, one has to watch out for intricate details of queuing and synchronization, which makes the implementation complex. I have left a few comments that request additional explanation and suggest potential refactoring to make the logic easier to follow.

param.dynamic_batching_n_queues = conf.at("dynamic_batching_n_queues");
}
param.dynamic_batching_k =
uint32_t(uint32_t(conf.at("k")) * float(conf.value("refine_ratio", 1.0f)));
Contributor:

What happens if there is an inconsistency between this build param and the regular search param k? Do we throw a reasonable error message?

if (batch_sizes_.has_value()) { batch_sizes_.value()(i).store(0, kMemOrder); }
dispatch_sequence_id_[i].store(uint32_t(-1), kMemOrder);
tokens_(i).store(
batch_token{static_cast<uint32_t>(slot_state::kEmpty) * kSize + kCounterLocMask},
Contributor:

Could you explain what the token values represent? Do we have any hidden assumption on how kSize of the batch queue relates to state values like kEmpty?

private:
cuda::atomic<int32_t, cuda::thread_scope_system>* cpu_provided_remaining_time_us_;
uint64_t timestamp_ns_ = 0;
int32_t local_remaining_time_us_ = std::numeric_limits<int32_t>::max();
Contributor:

Does this initialization practically mean that the time is counted from the first call to has_time(), because only then is the local_remaining_time_us_ variable set to the CPU-provided value?

}

private:
raft::resources res_; // Sic! Store by value to copy the resource.
Contributor:

This is unusual. Why do we need to copy?

*
* The CPU threads atomically increment this counter until its size reaches `max_batch_size`.
*
* Any (CPU or GPU thread) my atomically write to the highest byte of this value, which indicates
Contributor:

May?

Suggested change
* Any (CPU or GPU thread) my atomically write to the highest byte of this value, which indicates
* Any (CPU or GPU thread) may atomically write to the highest byte of this value, which indicates

Comment on lines +868 to +876
const auto seq_id = batch_queue_.head();
const auto commit_result = try_commit(seq_id, n_queries);
// The bool (busy or not) returned if no queries were committed:
if (std::holds_alternative<bool>(commit_result)) {
// Pause if the system is busy
// (otherwise the progress is guaranteed due to update of the head counter)
if (std::get<bool>(commit_result)) { to_commit.wait(); }
continue; // Try to get a new batch token
}
Contributor:

I am unaware of the intricacies of how the work is queued, but it seems that we are doing queue state management at multiple levels: head() is checking the tail position and potentially waits, try_commit() is checking batch_status and maybe commits, maybe not, and here in the loop we are checking the status and potentially waiting and trying again.

To keep the code simple, it would be great if try_commit did not just try, but actually committed, by moving this logic there.

But if there is a good reason to organize the logic this way, that could also be fine; after all, this is an implementation detail.

Comment on lines +1039 to +1057
// The interpretation of the token status depends on the current seq_order_id and a similar
// counter in the token. This is to prevent conflicts when too many parallel requests wrap
// over the whole ring buffer (batch_queue_t).
token_status = batch_queue::batch_status(batch_token_observed, seq_id);
// Busy status means the current thread is a whole ring buffer ahead of the token.
// The thread should wait for the rest of the system.
if (token_status == slot_state::kFullBusy || token_status == slot_state::kEmptyBusy) {
return true;
}
// This branch checks if the token was recently filled or dispatched.
// This means the head counter of the ring buffer is slightly outdated.
if (token_status == slot_state::kEmptyPast || token_status == slot_state::kFullPast ||
batch_token_observed.size_committed() >= max_batch_size_) {
batch_queue_.pop(seq_id);
return false;
}
batch_token_updated = batch_token_observed;
batch_token_updated.size_committed() =
std::min(batch_token_observed.size_committed() + n_queries, max_batch_size_);
Contributor:

Does the user of the queue have to be aware of all the possible states? Can't we hide this as an implementation detail of the queue? In other words, could we have a head() function which simply returns a valid slot, and move these state-comparison details into the queue?
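For illustration, the suggested shape might be something like this (hypothetical names, not proposed API):

```cpp
// The queue resolves slot states internally and only hands out usable slots:
auto slot = batch_queue_.acquire_head();  // skips past/busy slots, waits if needed
if (slot.try_commit(n_queries)) { /* proceed with the committed batch */ }
```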

Comment on lines +886 to +906
local_waiter till_full{std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1e5)),
batch_queue_.niceness(seq_id)};
while (batch_queue::batch_status(batch_token_observed, seq_id) != slot_state::kFull) {
/* Note: waiting for batch IO buffers
The CPU threads can commit to the incoming batches in the queue in advance (this happens in
try_commit).
In this loop, a thread waits for the batch IO buffer to be released by a running search on
the GPU side (scatter_outputs kernel). Hence, this loop is engaged only if all buffers are
currently used, which suggests that the GPU is busy (or there's not enough IO buffers).
This also means the current search is not likely to meet the deadline set by the user.

The scatter kernel returns its buffer id into an acquired slot in the batch queue; in this
loop we wait for that id to arrive.

Generally, we want to waste as few CPU cycles as possible here, to let other threads
wait on dispatch_sequence_id_ref below more efficiently. At the same time, we shouldn't
use `.wait()` here, because `.notify_all()` would have to come from the GPU.
*/
till_full.wait();
batch_token_observed = batch_token_ref.load(cuda::std::memory_order_acquire);
}
Contributor:

Can this be moved to a helper function of batch_queue, to keep this state checking an internal detail of the queue?

/* The remaining time may be updated on the host side: a thread with a tighter deadline may reduce
it (but not increase). */
cuda::atomic<int32_t, cuda::thread_scope_system>* remaining_time_us,
/* The token contains the current number of queries committed and is cleared in this kernel. */
Contributor:

What does committed mean? What is fully_committed?

&params = search_params_dynb,
index = index_dynb.value(),
query_view = raft::make_device_matrix_view<data_type, int64_t>(
queries->data_handle() + i * ps.dim, 1, ps.dim),
Contributor:

Here we submit one query at a time, right? Could we test with more than one query as well?

Labels: CMake, cpp, improvement (Improves an existing functionality), non-breaking (Introduces a non-breaking change)
Projects
Status: In Progress

3 participants