Skip to content

Commit

Permalink
Global stream pool (#13922)
Browse files Browse the repository at this point in the history
#13637 added a static stream pool object for use by the Parquet reader. This PR expands upon that by:

- Moving the stream pool to the `cudf::detail` namespace.
- Adding a debugging implementation that always returns the default stream.
- Hiding implementation details behind a more streamlined interface.
- Using cuda events for synchronization.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Mark Harris (https://github.com/harrism)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Mark Harris (https://github.com/harrism)

URL: #13922
  • Loading branch information
etseidl authored Sep 13, 2023
1 parent 258e0fe commit 3be772f
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 71 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ add_library(
src/utilities/linked_column.cpp
src/utilities/logger.cpp
src/utilities/stacktrace.cpp
src/utilities/stream_pool.cpp
src/utilities/traits.cpp
src/utilities/type_checks.cpp
src/utilities/type_dispatcher.cpp
Expand Down
64 changes: 64 additions & 0 deletions cpp/include/cudf/detail/utilities/stream_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cstddef>
#include <vector>

namespace cudf::detail {

/**
* @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another
* stream.
*
* By default an underlying `rmm::cuda_stream_pool` is used to obtain the streams. The only other
* implementation at present is a debugging version that always returns the stream returned by
* `cudf::get_default_stream()`. To use this debugging version, set the environment variable
* `LIBCUDF_USE_DEBUG_STREAM_POOL`.
*
* Example usage:
* @code{.cpp}
* auto stream = cudf::get_default_stream();
* auto const num_streams = 2;
* // do work on stream
* // allocate streams and wait for an event on stream before executing on any of streams
* auto streams = cudf::detail::fork_stream(stream, num_streams);
* // do work on streams[0] and streams[1]
* // wait for event on streams before continuing to do work on stream
* cudf::detail::join_streams(streams, stream);
* @endcode
*
* @param stream Stream that the returned streams will wait on.
* @param count The number of `cuda_stream_view` objects to return.
* @return Vector containing `count` stream views.
*/
[[nodiscard]] std::vector<rmm::cuda_stream_view> fork_streams(rmm::cuda_stream_view stream,
std::size_t count);

/**
* @brief Synchronize a stream to an event on a set of streams.
*
* @param streams Streams to wait on.
* @param stream Joined stream that synchronizes with the waited-on streams.
*/
void join_streams(host_span<rmm::cuda_stream_view const> streams, rmm::cuda_stream_view stream);

} // namespace cudf::detail
43 changes: 13 additions & 30 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,15 @@

#include <cudf/detail/stream_compaction.hpp>
#include <cudf/detail/transform.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <rmm/cuda_stream_pool.hpp>

#include <bitset>
#include <numeric>

namespace cudf::io::detail::parquet {

namespace {

int constexpr NUM_DECODERS = 3; // how many decode kernels are there to run
int constexpr APPROX_NUM_THREADS = 4; // guestimate from DaveB
int constexpr STREAM_POOL_SIZE = NUM_DECODERS * APPROX_NUM_THREADS;

auto& get_stream_pool()
{
// TODO: creating this on the heap because there were issues with trying to call the
// stream pool destructor during cuda shutdown that lead to a segmentation fault in
// nvbench. this allocation is being deliberately leaked to avoid the above, but still
// results in non-fatal warnings when running nvbench in cuda-gdb.
static auto pool = new rmm::cuda_stream_pool{STREAM_POOL_SIZE};
return *pool;
}

} // namespace

void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
{
auto& chunks = _file_itm_data.chunks;
Expand Down Expand Up @@ -178,34 +162,33 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunks.host_to_device_async(_stream);
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);
_stream.synchronize();

auto const level_type_size = _file_itm_data.level_type_size;
// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
auto streams = cudf::detail::fork_streams(_stream, nkernels);

// vector of launched streams
std::vector<rmm::cuda_stream_view> streams;
auto const level_type_size = _file_itm_data.level_type_size;

// launch string decoder
int s_idx = 0;
if (has_strings) {
streams.push_back(get_stream_pool().get_stream());
chunk_nested_str_data.host_to_device_async(streams.back());
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
auto& stream = streams[s_idx++];
chunk_nested_str_data.host_to_device_async(stream);
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, stream);
}

// launch delta binary decoder
if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) {
streams.push_back(get_stream_pool().get_stream());
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
}

// launch the catch-all page decoder
if ((kernel_mask & gpu::KERNEL_MASK_GENERAL) != 0) {
streams.push_back(get_stream_pool().get_stream());
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]);
}

// synchronize the streams
std::for_each(streams.begin(), streams.end(), [](auto& stream) { stream.synchronize(); });
cudf::detail::join_streams(streams, _stream);

pages.device_to_host_async(_stream);
page_nesting.device_to_host_async(_stream);
Expand Down
48 changes: 7 additions & 41 deletions cpp/src/io/text/multibyte_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/io/text/byte_range_info.hpp>
#include <cudf/io/text/data_chunk_source.hpp>
#include <cudf/io/text/detail/multistate.hpp>
Expand All @@ -32,7 +33,6 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
Expand Down Expand Up @@ -301,44 +301,12 @@ namespace io {
namespace text {
namespace detail {

void fork_stream(std::vector<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream)
{
cudaEvent_t event;
CUDF_CUDA_TRY(cudaEventCreate(&event));
CUDF_CUDA_TRY(cudaEventRecord(event, stream));
for (uint32_t i = 0; i < streams.size(); i++) {
CUDF_CUDA_TRY(cudaStreamWaitEvent(streams[i], event, 0));
}
CUDF_CUDA_TRY(cudaEventDestroy(event));
}

void join_stream(std::vector<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream)
{
cudaEvent_t event;
CUDF_CUDA_TRY(cudaEventCreate(&event));
for (uint32_t i = 0; i < streams.size(); i++) {
CUDF_CUDA_TRY(cudaEventRecord(event, streams[i]));
CUDF_CUDA_TRY(cudaStreamWaitEvent(stream, event, 0));
}
CUDF_CUDA_TRY(cudaEventDestroy(event));
}

std::vector<rmm::cuda_stream_view> get_streams(int32_t count, rmm::cuda_stream_pool& stream_pool)
{
auto streams = std::vector<rmm::cuda_stream_view>();
for (int32_t i = 0; i < count; i++) {
streams.emplace_back(stream_pool.get_stream());
}
return streams;
}

std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source const& source,
std::string const& delimiter,
byte_range_info byte_range,
bool strip_delimiters,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr,
rmm::cuda_stream_pool& stream_pool)
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

Expand All @@ -365,8 +333,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
CUDF_EXPECTS(delimiter.size() < multistate::max_segment_value,
"delimiter contains too many total tokens to produce a deterministic result.");

auto concurrency = 2;
auto streams = get_streams(concurrency, stream_pool);
auto const concurrency = 2;

// must be at least 32 when using warp-reduce on partials
// must be at least 1 more than max possible concurrent tiles
Expand Down Expand Up @@ -411,7 +378,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
output_builder<byte_offset> row_offset_storage(ITEMS_PER_CHUNK, max_growth, stream);
output_builder<char> char_storage(ITEMS_PER_CHUNK, max_growth, stream);

fork_stream(streams, stream);
auto streams = cudf::detail::fork_streams(stream, concurrency);

cudaEvent_t last_launch_event;
CUDF_CUDA_TRY(cudaEventCreate(&last_launch_event));
Expand Down Expand Up @@ -532,7 +499,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source

CUDF_CUDA_TRY(cudaEventDestroy(last_launch_event));

join_stream(streams, stream);
cudf::detail::join_streams(streams, stream);

// if the input was empty, we didn't find a delimiter at all,
// or the first delimiter was also the last: empty output
Expand Down Expand Up @@ -602,11 +569,10 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
parse_options options,
rmm::mr::device_memory_resource* mr)
{
auto stream = cudf::get_default_stream();
auto stream_pool = rmm::cuda_stream_pool(2);
auto stream = cudf::get_default_stream();

auto result = detail::multibyte_split(
source, delimiter, options.byte_range, options.strip_delimiters, stream, mr, stream_pool);
source, delimiter, options.byte_range, options.strip_delimiters, stream, mr);

return result;
}
Expand Down
Loading

0 comments on commit 3be772f

Please sign in to comment.