From 50bd444d7c8fe637f56d9e9038316451f3b13d42 Mon Sep 17 00:00:00 2001 From: David Reveman Date: Thu, 28 Nov 2024 18:10:36 -0500 Subject: [PATCH] feat: Add key/value support to radix sort algorithm in breeze --- .../breeze/breeze/algorithms/sort.h | 96 +++++-- .../perftest/algorithms/sort_perftest.cu | 261 +++++++++++++++--- .../breeze/perftest/queries/device_column.h | 15 +- .../perftest/queries/order_by_perftest.cu | 218 +++++++++++++-- .../algorithms/algorithm-kernels.template.h | 51 +++- .../test/algorithms/algorithm_test.template.h | 11 +- .../breeze/test/algorithms/kernels.clcpp | 34 ++- .../breeze/test/algorithms/kernels.metal | 46 +-- .../breeze/test/algorithms/sort_unittest.cpp | 59 +++- .../algorithms/algorithm_test-cuda.cuh | 14 +- .../algorithms/algorithm_test-openmp.h | 17 +- .../generated/algorithms/kernels-cuda.cuh | 53 ++-- .../generated/algorithms/kernels-openmp.h | 50 +++- 13 files changed, 718 insertions(+), 207 deletions(-) diff --git a/velox/experimental/breeze/breeze/algorithms/sort.h b/velox/experimental/breeze/breeze/algorithms/sort.h index c09218445c4f..7b702684bb9d 100644 --- a/velox/experimental/breeze/breeze/algorithms/sort.h +++ b/velox/experimental/breeze/breeze/algorithms/sort.h @@ -146,7 +146,20 @@ struct SortBlockType { } }; -template +template +struct KeyValueScatterType { + KeyT keys[BLOCK_ITEMS]; + ValueT values[BLOCK_ITEMS]; +}; + +// partial specialization where ValueT is NullType +template +struct KeyValueScatterType { + KeyT keys[BLOCK_ITEMS]; +}; + +template struct DeviceRadixSort { enum { BLOCK_THREADS = PlatformT::BLOCK_THREADS, @@ -164,25 +177,26 @@ struct DeviceRadixSort { unsigned global_offsets[NUM_BINS]; int block_idx; }; - struct { - T items[BLOCK_ITEMS]; - } scatter; + KeyValueScatterType scatter; }; }; - template - static ATTR void Sort(PlatformT p, const InputSlice in, + template + static ATTR void Sort(PlatformT p, const KeyInputSlice in_keys, + const ValueInputSlice in_values, const OffsetSlice in_offsets, int start_bit, - int num_pass_bits, OutputSlice out, + int num_pass_bits, KeyOutputSlice out_keys, + ValueOutputSlice out_values, BlockIdxSlice next_block_idx, BlockSlice blocks, ScratchSlice scratch, int num_items) { using namespace functions; using namespace utils; enum { - END_BIT = sizeof(T) * /*BITS_PER_BYTE=*/8, + END_BIT = sizeof(KeyT) * /*BITS_PER_BYTE=*/8, WARP_THREADS = PlatformT::WARP_THREADS, NUM_WARPS = BLOCK_THREADS / WARP_THREADS, WARP_ITEMS = WARP_THREADS * ITEMS_PER_THREAD, @@ -211,19 +225,19 @@ struct DeviceRadixSort { // load items into warp-striped arrangement after initializing all values // to all bits set as that allows us to always use the fast-path version // radix rank function - T items[ITEMS_PER_THREAD]; + KeyT keys[ITEMS_PER_THREAD]; #pragma unroll for (int i = 0; i < ITEMS_PER_THREAD; ++i) { - items[i] = NumericLimits::max(); + keys[i] = NumericLimits::max(); } - const InputSlice it = in.subslice(block.offset); + const KeyInputSlice it = in_keys.subslice(block.offset); BlockLoad( - p, it, make_slice(items), block.num_items); + p, it, make_slice(keys), block.num_items); // convert items to bit ordered representation #pragma unroll for (int i = 0; i < ITEMS_PER_THREAD; ++i) { - items[i] = RadixSortTraits::to_bit_ordered(items[i]); + keys[i] = RadixSortTraits::to_bit_ordered(keys[i]); } // determine stable rank for each item @@ -232,18 +246,31 @@ struct DeviceRadixSort { int exclusive_scan[BINS_PER_THREAD]; BlockRadixRankT::Rank( p, - make_bitfield_extractor(make_slice(items), + make_bitfield_extractor(make_slice(keys), start_bit, num_pass_bits), make_slice(ranks), make_slice(histogram), blocks.subslice(block_idx * NUM_BINS), make_slice(exclusive_scan), make_slice(&scratch->rank)); p.syncthreads(); - // scatter items by storing them in shared memory using ranks + // scatter keys by storing them in scratch using ranks BlockStoreAt( - p, make_slice(items), + p, make_slice(keys), make_slice(ranks), - make_slice(scratch->scatter.items)); + make_slice(scratch->scatter.keys)); + + // load and scatter optional values + ValueT values[ITEMS_PER_THREAD]; + if constexpr (IsDifferent::VALUE) { + const ValueInputSlice it = in_values.subslice(block.offset); + BlockLoad( + p, it, make_slice(values), block.num_items); + // scatter values by storing them in scratch using ranks + BlockStoreAt( + p, make_slice(values), + make_slice(ranks), + make_slice(scratch->scatter.values)); + } p.syncthreads(); // first block loads initial global offsets from input and other blocks @@ -334,9 +361,16 @@ struct DeviceRadixSort { global_offsets[i] -= exclusive_scan[i]; } - // gather scattered items from scratch + // gather scattered keys from scratch BlockLoad( - p, make_slice(scratch->scatter.items), make_slice(items)); + p, make_slice(scratch->scatter.keys), make_slice(keys)); + + // gather optional scattered values from scratch + if constexpr (IsDifferent::VALUE) { + // gather scattered values from scratch + BlockLoad( + p, make_slice(scratch->scatter.values), make_slice(values)); + } p.syncthreads(); // store global offsets in scratch @@ -349,7 +383,7 @@ struct DeviceRadixSort { unsigned out_offsets[ITEMS_PER_THREAD]; BlockLoadFrom( p, make_slice(scratch->global_offsets), - make_bitfield_extractor(make_slice(items), start_bit, num_pass_bits), + make_bitfield_extractor(make_slice(keys), start_bit, num_pass_bits), make_slice(out_offsets)); // add item index (same as rank after scatter/gather) to output offsets @@ -358,15 +392,23 @@ struct DeviceRadixSort { out_offsets[i] += p.thread_idx() + i * BLOCK_THREADS; } - // convert items back to original representation + // convert keys back to original representation #pragma unroll for (int i = 0; i < ITEMS_PER_THREAD; ++i) { - items[i] = RadixSortTraits::from_bit_ordered(items[i]); + keys[i] = RadixSortTraits::from_bit_ordered(keys[i]); } - // store gathered items in global memory using output offsets - BlockStoreAt( - p, make_slice(items), make_slice(out_offsets), out, block.num_items); + // store gathered keys in global memory using output offsets + BlockStoreAt(p, make_slice(keys), + make_slice(out_offsets), + out_keys, block.num_items); + + // store gathered values in global memory using output offsets + if constexpr (IsDifferent::VALUE) { + BlockStoreAt( + p, make_slice(values), make_slice(out_offsets), out_values, + block.num_items); + } } }; diff --git a/velox/experimental/breeze/perftest/algorithms/sort_perftest.cu b/velox/experimental/breeze/perftest/algorithms/sort_perftest.cu index 56513553181c..f509538c8dd8 100644 --- a/velox/experimental/breeze/perftest/algorithms/sort_perftest.cu +++ b/velox/experimental/breeze/perftest/algorithms/sort_perftest.cu @@ -43,19 +43,42 @@ namespace kernels { enum { CUDA_WARP_THREADS = 32 }; template + typename OffsetT, typename BlockT> __global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( - const T* in, const U* in_offsets, int start_bit, int num_pass_bits, T* out, + const T* in, const OffsetT* in_offsets, int start_bit, int num_pass_bits, + T* out, int* next_block_idx, BlockT* blocks, int num_items) { + CudaPlatform p; + using DeviceRadixSortT = + DeviceRadixSort; + extern __shared__ char radix_sort_scratch[]; + auto scratch = + reinterpret_cast(radix_sort_scratch); + DeviceRadixSortT::template Sort( + p, make_slice(in), breeze::utils::make_empty_slice(), + make_slice(in_offsets), start_bit, num_pass_bits, + make_slice(out), breeze::utils::make_empty_slice(), + make_slice(next_block_idx), make_slice(blocks), + make_slice(scratch).template reinterpret(), num_items); +} + +template +__global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( + const T* in_keys, const U* in_values, const OffsetT* in_offsets, + int start_bit, int num_pass_bits, T* out_keys, U* out_values, int* next_block_idx, BlockT* blocks, int num_items) { CudaPlatform p; using DeviceRadixSortT = - DeviceRadixSort; - __shared__ typename DeviceRadixSortT::Scratch scratch; + DeviceRadixSort; + extern __shared__ char radix_sort_scratch[]; + auto scratch = + reinterpret_cast(radix_sort_scratch); DeviceRadixSortT::template Sort( - p, make_slice(in), make_slice(in_offsets), start_bit, - num_pass_bits, make_slice(out), + p, make_slice(in_keys), make_slice(in_values), + make_slice(in_offsets), start_bit, num_pass_bits, + make_slice(out_keys), make_slice(out_values), make_slice(next_block_idx), make_slice(blocks), - make_slice(&scratch).template reinterpret(), num_items); + make_slice(scratch).template reinterpret(), num_items); } template > start_bit) & ((1u << num_pass_bits) - 1); } -using SortConfig = PerfTestArrayConfig<11>; - -const SortConfig kConfig = {{{"num_input_rows", "400000"}, - {"num_input_rows_short", "6400"}, - {"num_input_rows_grande", "6400000"}, - {"num_input_rows_venti", "64000000"}, - {"input_generate_method", "RANDOM"}, - {"input_random_engine", "MT19937"}, - {"input_random_shuffle", "1"}, - {"input_random_stride", "1000"}, - {"input_random_stride_short", "10"}, - {"input_random_stride_grande", "100000"}, - {"input_random_stride_venti", "100000"}}}; +using SortConfig = PerfTestArrayConfig<16>; + +const SortConfig kConfig = {{ + {"num_key_rows", "400000"}, + {"num_key_rows_short", "6400"}, + {"num_key_rows_grande", "6400000"}, + {"num_key_rows_venti", "64000000"}, + {"key_generate_method", "RANDOM"}, + {"key_random_engine", "MT19937"}, + {"key_random_shuffle", "1"}, + {"key_random_stride", "1000"}, + {"key_random_stride_short", "10"}, + {"key_random_stride_grande", "100000"}, + {"key_random_stride_venti", "100000"}, + {"num_value_rows", "400000"}, + {"num_value_rows_short", "6400"}, + {"num_value_rows_grande", "6400000"}, + {"num_value_rows_venti", "64000000"}, + {"value_generate_method", "SEQUENCE"}, +}}; template class SortPerfTest : public PerfTest, public testing::Test { @@ -153,24 +183,35 @@ using TestTypes = TYPED_TEST_SUITE(SortPerfTest, TestTypes, TestTypeNames); TYPED_TEST(SortPerfTest, RadixSort) { - using value_type = typename TypeParam::item_type::type; + using item_type = typename TypeParam::item_type::type; using size_type = unsigned; using block_type = unsigned; - auto input = this->template GetConfigColumn("input"); + auto input = this->template GetConfigColumn("key"); ASSERT_NE(input.size(), 0u); auto check_result = this->GetConfigValue("check_result", true); - device_vector items(input.size()); + device_vector items(input.size()); constexpr int kBlockThreads = TypeParam::launch_params::BLOCK_THREADS; constexpr int kItemsPerThread = TypeParam::launch_params::ITEMS_PER_THREAD; constexpr int kBlockItems = kBlockThreads * kItemsPerThread; constexpr int kRadixBits = TypeParam::RADIX_BITS; - constexpr int kEndBit = sizeof(value_type) * /*BITS_PER_BYTE=*/8; + constexpr int kEndBit = sizeof(item_type) * /*BITS_PER_BYTE=*/8; constexpr int kNumBins = 1 << kRadixBits; + constexpr int kRadixSortSharedMemorySize = + sizeof(typename DeviceRadixSort< + CudaPlatform, + kItemsPerThread, kRadixBits, item_type, NullType>::Scratch); + if (kRadixSortSharedMemorySize > this->MaxSharedMemory() && + !getenv("GTEST_ALSO_RUN_SKIPPED_TESTS")) { + GTEST_SKIP() << "skipping test that requires too much shared memory: " + << kRadixSortSharedMemorySize << " > " + << this->MaxSharedMemory(); + } + auto start_bit = this->GetConfigValue("start_bit", 0); ASSERT_LT(start_bit, kEndBit); @@ -193,7 +234,7 @@ TYPED_TEST(SortPerfTest, RadixSort) { device_vector blocks(num_blocks * kNumBins); device_vector prefix_sum(kNumBins); device_vector offsets(kNumBins); - device_vector out(input.size()); + device_vector out(input.size()); // copy input to device memory items.copy_from_host(input.data(), input.size()); @@ -201,10 +242,15 @@ TYPED_TEST(SortPerfTest, RadixSort) { // provide throughput information this->set_element_count(input.size()); - this->set_element_size(sizeof(value_type)); + this->set_element_size(sizeof(item_type)); this->set_elements_per_thread(kItemsPerThread); - this->template set_global_memory_loads(input.size()); - this->template set_global_memory_stores(input.size()); + this->template set_global_memory_loads(input.size()); + this->template set_global_memory_stores(input.size()); + + cudaFuncSetAttribute( + &kernels::RadixSort, + cudaFuncAttributeMaxDynamicSharedMemorySize, kRadixSortSharedMemorySize); this->MeasureWithSetup( kConfig, @@ -217,17 +263,17 @@ TYPED_TEST(SortPerfTest, RadixSort) { }, [&]() { kernels::RadixSort - <<>>( + <<>>( items.data(), offsets.data(), start_bit, num_pass_bits, out.data(), next_block_idx.data(), blocks.data(), items.size()); }); if (check_result) { - std::vector actual_result(out.size()); + std::vector actual_result(out.size()); out.copy_to_host(actual_result.data(), actual_result.size()); - std::vector expected_result = input; + std::vector expected_result = input; std::stable_sort(expected_result.begin(), expected_result.end(), - [start_bit, num_pass_bits](value_type a, value_type b) { + [start_bit, num_pass_bits](item_type a, item_type b) { return extract_bits(a, start_bit, num_pass_bits) < extract_bits(b, start_bit, num_pass_bits); }); @@ -235,20 +281,145 @@ TYPED_TEST(SortPerfTest, RadixSort) { } } -const SortConfig kHistogramConfig = {{{"num_input_rows", "16750000"}, - {"num_input_rows_short", "2048000"}, - {"num_input_rows_grande", "268000000"}, - {"num_input_rows_venti", "2144000000"}, - {"input_generate_method", "RANDOM"}, - {"input_random_engine", "MT19937"}, - {"input_random_shuffle", "1"}, - {"input_random_stride", "1000"}, - {"input_random_stride_short", "10"}, - {"input_random_stride_grande", "100000"}, - {"input_random_stride_venti", "100000"}}}; +TYPED_TEST(SortPerfTest, RadixSortKeyValues) { + using key_type = typename TypeParam::item_type::type; + using value_type = unsigned; + using size_type = unsigned; + using block_type = unsigned; + + auto input_keys = this->template GetConfigColumn("key"); + ASSERT_NE(input_keys.size(), 0u); + + auto input_values = this->template GetConfigColumn("value"); + ASSERT_EQ(input_values.size(), input_keys.size()); + + auto check_result = this->GetConfigValue("check_result", true); + + device_vector keys(input_keys.size()); + device_vector values(input_values.size()); + + constexpr int kBlockThreads = TypeParam::launch_params::BLOCK_THREADS; + constexpr int kItemsPerThread = TypeParam::launch_params::ITEMS_PER_THREAD; + constexpr int kBlockItems = kBlockThreads * kItemsPerThread; + constexpr int kRadixBits = TypeParam::RADIX_BITS; + constexpr int kEndBit = sizeof(key_type) * /*BITS_PER_BYTE=*/8; + constexpr int kNumBins = 1 << kRadixBits; + + constexpr int kRadixSortSharedMemorySize = + sizeof(typename DeviceRadixSort< + CudaPlatform, + kItemsPerThread, kRadixBits, key_type, value_type>::Scratch); + if (kRadixSortSharedMemorySize > this->MaxSharedMemory() && + !getenv("GTEST_ALSO_RUN_SKIPPED_TESTS")) { + GTEST_SKIP() << "skipping test that requires too much shared memory: " + << kRadixSortSharedMemorySize << " > " + << this->MaxSharedMemory(); + } + + auto start_bit = this->GetConfigValue("start_bit", 0); + ASSERT_LT(start_bit, kEndBit); + + int num_pass_bits = std::min(kRadixBits, kEndBit - start_bit); + std::vector input_histogram(kNumBins); + for (const auto& key : input_keys) { + int bin = extract_bits(to_bit_ordered(key), start_bit, num_pass_bits); + input_histogram[bin] += 1u; + } + size_type sum = 0; + std::vector input_prefix_sum(kNumBins); + for (size_t i = 0; i < kNumBins; ++i) { + input_prefix_sum[i] = sum; + sum += input_histogram[i]; + } + + int num_blocks = (input_keys.size() + kBlockItems - 1) / kBlockItems; + + device_vector next_block_idx(1); + device_vector blocks(num_blocks * kNumBins); + device_vector prefix_sum(kNumBins); + device_vector offsets(kNumBins); + device_vector out_keys(input_keys.size()); + device_vector out_values(input_values.size()); + + // copy input to device memory + keys.copy_from_host(input_keys.data(), input_keys.size()); + values.copy_from_host(input_values.data(), input_values.size()); + prefix_sum.copy_from_host(input_prefix_sum.data(), input_prefix_sum.size()); + + // provide throughput information + constexpr size_t kKVSize = sizeof(key_type) + sizeof(value_type); + this->set_element_count(input_keys.size()); + this->set_element_size(kKVSize); + this->set_elements_per_thread(kItemsPerThread); + this->template set_global_memory_loads(input_keys.size() * kKVSize); + this->template set_global_memory_stores(input_keys.size() * kKVSize); + + cudaFuncSetAttribute( + &kernels::RadixSort, + cudaFuncAttributeMaxDynamicSharedMemorySize, kRadixSortSharedMemorySize); + + this->MeasureWithSetup( + kConfig, + [&]() { + cudaMemsetAsync(next_block_idx.data(), 0, sizeof(int)); + cudaMemsetAsync(blocks.data(), 0, + sizeof(block_type) * num_blocks * kNumBins); + cudaMemcpyAsync(offsets.data(), prefix_sum.data(), + sizeof(size_type) * kNumBins, cudaMemcpyDeviceToDevice); + }, + [&]() { + kernels::RadixSort + <<>>( + keys.data(), values.data(), offsets.data(), start_bit, + num_pass_bits, out_keys.data(), out_values.data(), + next_block_idx.data(), blocks.data(), keys.size()); + }); + + if (check_result) { + std::vector indices(keys.size()); + std::iota(indices.begin(), indices.end(), 0); + std::stable_sort( + indices.begin(), indices.end(), + [&input_keys, start_bit, num_pass_bits](unsigned a, unsigned b) { + return extract_bits(to_bit_ordered(input_keys[a]), start_bit, + num_pass_bits) < + extract_bits(to_bit_ordered(input_keys[b]), start_bit, + num_pass_bits); + }); + std::vector expected_out_keys(input_keys.size()); + std::vector expected_out_values(input_values.size()); + for (size_t i = 0; i < indices.size(); ++i) { + expected_out_keys[i] = input_keys[indices[i]]; + expected_out_values[i] = input_values[indices[i]]; + } + std::vector actual_result_keys(out_keys.size()); + out_keys.copy_to_host(actual_result_keys.data(), actual_result_keys.size()); + std::vector actual_result_values(out_values.size()); + out_values.copy_to_host(actual_result_values.data(), + actual_result_values.size()); + EXPECT_EQ(expected_out_keys, actual_result_keys); + EXPECT_EQ(expected_out_values, actual_result_values); + } +} + +using SortHistogramConfig = PerfTestArrayConfig<11>; + +const SortHistogramConfig kHistogramConfig = { + {{"num_input_rows", "16750000"}, + {"num_input_rows_short", "2048000"}, + {"num_input_rows_grande", "268000000"}, + {"num_input_rows_venti", "2144000000"}, + {"input_generate_method", "RANDOM"}, + {"input_random_engine", "MT19937"}, + {"input_random_shuffle", "1"}, + {"input_random_stride", "1000"}, + {"input_random_stride_short", "10"}, + {"input_random_stride_grande", "100000"}, + {"input_random_stride_venti", "100000"}}}; template -class SortHistogramPerfTest : public PerfTest, +class SortHistogramPerfTest : public PerfTest, public testing::Test { public: template @@ -342,7 +513,7 @@ TYPED_TEST(SortHistogramPerfTest, RadixSortHistogram) { this->set_global_memory_stores(num_atomic_adds * sizeof(size_type)); this->MeasureWithSetup( - kConfig, + kHistogramConfig, [&]() { cudaMemsetAsync(histogram.data(), 0, sizeof(size_type) * kHistogramSize); diff --git a/velox/experimental/breeze/perftest/queries/device_column.h b/velox/experimental/breeze/perftest/queries/device_column.h index 12cf2f893d72..3ffed1588658 100644 --- a/velox/experimental/breeze/perftest/queries/device_column.h +++ b/velox/experimental/breeze/perftest/queries/device_column.h @@ -40,26 +40,21 @@ class device_column : public utils::device_vector { template > class device_column_buffered { typedef typename Allocator::template rebind::other PtrAllocator; - typedef typename Allocator::template rebind::other SelectorAllocator; public: explicit device_column_buffered(const Allocator& allocator = Allocator()) : buffers_{device_column(allocator), device_column(allocator)}, - ptrs_( - utils::device_vector(2, PtrAllocator(allocator))), - selector_(utils::device_vector( - 1, SelectorAllocator(allocator))) { + ptrs_(utils::device_vector(2, + PtrAllocator(allocator))) { UpdatePtrs(); } explicit device_column_buffered(utils::size_type size, const Allocator& allocator = Allocator()) : buffers_{device_column(size, allocator), device_column(size, allocator)}, - ptrs_( - utils::device_vector(2, PtrAllocator(allocator))), - selector_(utils::device_vector( - 1, SelectorAllocator(allocator))) { + ptrs_(utils::device_vector(2, + PtrAllocator(allocator))) { UpdatePtrs(); } @@ -71,7 +66,6 @@ class device_column_buffered { size_t size() const { return buffers_[0].size(); } device_column& buffer(int index) { return buffers_[index]; } utils::device_vector& ptrs() { return ptrs_; } - utils::device_vector& selector() { return selector_; } private: void UpdatePtrs() { @@ -81,7 +75,6 @@ class device_column_buffered { device_column buffers_[2]; utils::device_vector ptrs_; - utils::device_vector selector_; }; } // namespace breeze diff --git a/velox/experimental/breeze/perftest/queries/order_by_perftest.cu b/velox/experimental/breeze/perftest/queries/order_by_perftest.cu index c411401edafb..4a0a3ba32759 100644 --- a/velox/experimental/breeze/perftest/queries/order_by_perftest.cu +++ b/velox/experimental/breeze/perftest/queries/order_by_perftest.cu @@ -140,9 +140,9 @@ __global__ __launch_bounds__(BLOCK_THREADS) void UpdateBufferSelectors( } template + typename OffsetT, typename BlockT> __global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( - const int* in_buffer_selectors, const U* in_offsets, int start_bit, + const int* in_buffer_selectors, const OffsetT* in_offsets, int start_bit, int num_pass_bits, T* buffers[2], int* next_block_idx, BlockT* blocks, int num_items) { using namespace algorithms; @@ -150,8 +150,10 @@ __global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( CudaPlatform p; using RadixSortT = - DeviceRadixSort; - __shared__ typename RadixSortT::Scratch scratch; + DeviceRadixSort; + extern __shared__ char radix_sort_scratch[]; + auto scratch = + reinterpret_cast(radix_sort_scratch); // load buffer selectors int current_selector = in_buffer_selectors[0]; @@ -161,18 +163,53 @@ __global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( if (current_selector != alternate_selector) { const T* in = buffers[current_selector]; T* out = buffers[alternate_selector]; + RadixSortT::template Sort( + p, make_slice(in), make_empty_slice(), + make_slice(in_offsets), start_bit, num_pass_bits, + make_slice(out), make_empty_slice(), + make_slice(next_block_idx), make_slice(blocks), + make_slice(scratch).template reinterpret(), num_items); + } +} + +template +__global__ __launch_bounds__(BLOCK_THREADS) void RadixSort( + const int* in_buffer_selectors, const OffsetT* in_offsets, int start_bit, + int num_pass_bits, T* key_buffers[2], U* value_buffers[2], + int* next_block_idx, BlockT* blocks, int num_items) { + using namespace algorithms; + using namespace utils; + CudaPlatform p; + using RadixSortT = + DeviceRadixSort; + extern __shared__ char radix_sort_scratch[]; + auto scratch = + reinterpret_cast(radix_sort_scratch); + + // load buffer selectors + int current_selector = in_buffer_selectors[0]; + int alternate_selector = in_buffer_selectors[1]; + + // sorting pass is only needed if input and output selectors are different + if (current_selector != alternate_selector) { + const T* in_keys = key_buffers[current_selector]; + const U* in_values = value_buffers[current_selector]; + T* out_keys = key_buffers[alternate_selector]; + U* out_values = value_buffers[alternate_selector]; RadixSortT::template Sort( - p, make_slice(in), make_slice(in_offsets), start_bit, - num_pass_bits, make_slice(out), + p, make_slice(in_keys), make_slice(in_values), + make_slice(in_offsets), start_bit, num_pass_bits, + make_slice(out_keys), make_slice(out_values), make_slice(next_block_idx), make_slice(blocks), - make_slice(&scratch).template reinterpret(), num_items); + make_slice(scratch).template reinterpret(), num_items); } } } // namespace kernels -using OrderByConfig = PerfTestArrayConfig<11>; +using OrderByConfig = PerfTestArrayConfig<16>; const OrderByConfig kConfig = {{ {"num_key_rows", "400000"}, @@ -186,6 +223,11 @@ const OrderByConfig kConfig = {{ {"key_random_stride_short", "10"}, {"key_random_stride_grande", "100000"}, {"key_random_stride_venti", "100000"}, + {"num_value_rows", "400000"}, + {"num_value_rows_short", "6400"}, + {"num_value_rows_grande", "6400000"}, + {"num_value_rows_venti", "64000000"}, + {"value_generate_method", "SEQUENCE"}, }}; template @@ -236,29 +278,38 @@ struct OrderByTestType { std::to_string(RADIX_BITS); } - static size_t GlobalMemoryLoads(size_t num_keys) { + static size_t GlobalMemoryLoads(size_t num_keys, size_t kv_size) { int num_histogram_blocks = (num_keys + HISTOGRAM_TILE_ITEMS - 1) / HISTOGRAM_TILE_ITEMS; // count each atomic add as 1 load + 1 store int num_atomic_loads = HISTOGRAM_SIZE * num_histogram_blocks; // 1N global memory loads for histogram + 1N for each sorting pass - return (num_keys * (1ll + NUM_PASSES)) * sizeof(key_type) + + return (num_keys * (1ll + NUM_PASSES)) * kv_size + num_atomic_loads * sizeof(unsigned); } - static size_t GlobalMemoryStores(size_t num_keys) { + static size_t GlobalMemoryStores(size_t num_keys, size_t kv_size) { int num_histogram_blocks = (num_keys + HISTOGRAM_TILE_ITEMS - 1) / HISTOGRAM_TILE_ITEMS; // count the store of each atomic add int num_atomic_stores = HISTOGRAM_SIZE * num_histogram_blocks; // 1N global memory stores for each sorting pass - return (num_keys * NUM_PASSES) * sizeof(key_type) + + return (num_keys * NUM_PASSES) * kv_size + num_atomic_stores * sizeof(unsigned); } - template - static void SortKeys(device_column_buffered& keys, - const Allocator& allocator) { + template + static constexpr int SortSharedMemorySize() { + return sizeof(typename algorithms::DeviceRadixSort< + CudaPlatform, + ITEMS_PER_THREAD, RADIX_BITS, key_type, ValueT>::Scratch); + } + + template + static void Sort(device_column_buffered& keys, + device_column_buffered& values, + utils::device_vector& kv_selector, + const Allocator& allocator) { using namespace utils; // constant size temporary storage that needs to be zero initialized @@ -296,10 +347,16 @@ struct OrderByTestType { cudaMemsetAsync(temp_storage.data(), 0, sizeof(TempStorage)); cudaMemsetAsync(blocks.data(), 0, sizeof(unsigned) * blocks.size()); + cudaFuncSetAttribute( + &kernels::RadixSort, + cudaFuncAttributeMaxDynamicSharedMemorySize, + SortSharedMemorySize()); + kernels::BuildRadixSortHistogram <<>>( - keys.ptrs().data(), keys.selector().data(), + keys.ptrs().data(), kv_selector.data(), temp_storage.data()->histogram, keys.size()); // exclusive scan of histogram and set buffer advancements @@ -314,7 +371,7 @@ struct OrderByTestType { /*BLOCK_THREADS=*/1, /*ITEMS_PER_THREAD=*/NUM_PASSES> <<>>(buffer_advancements.data(), - keys.selector().data(), + kv_selector.data(), buffer_selectors.data()); // start from lsb and loop until no bits are left @@ -331,10 +388,10 @@ struct OrderByTestType { // radix sorting pass kernels::RadixSort - <<>>( + <<()>>>( pass_buffer_selectors, pass_offsets, start_bit, num_pass_bits, - keys.ptrs().data(), pass_next_block_idx, pass_blocks, - keys.size()); + keys.ptrs().data(), values.ptrs().data(), pass_next_block_idx, + pass_blocks, keys.size()); // advance start bit for next pass start_bit += RADIX_BITS; @@ -360,16 +417,28 @@ TYPED_TEST(OrderByPerfTest, SelectKeysOrderByKeys) { using key_type = typename TypeParam::key_type; using indices_type = utils::size_type; + constexpr int kSortSharedMemorySize = + TypeParam::template SortSharedMemorySize(); + if (kSortSharedMemorySize > this->MaxSharedMemory() && + !getenv("GTEST_ALSO_RUN_SKIPPED_TESTS")) { + GTEST_SKIP() << "skipping test that requires too much shared memory: " + << kSortSharedMemorySize << " > " << this->MaxSharedMemory(); + } + auto items = this->template GetConfigColumn("key"); ASSERT_NE(items.size(), 0u); auto check_result = this->GetConfigValue("check_result", true); auto result_file = this->GetConfigValue("result_file", std::string()); - device_column_buffered d_items(items.size()); int input_selector = 0; + utils::device_vector kv_selector(1); + kv_selector.copy_from_host(&input_selector, 1); + + device_column_buffered d_items(items.size()); d_items.buffer(input_selector).copy_from_host(items.data(), items.size()); - d_items.selector().copy_from_host(&input_selector, 1); + + device_column_buffered d_ignored_values; auto free_list = std::make_shared< caching_device_allocator::free_list_type>(); @@ -379,14 +448,18 @@ TYPED_TEST(OrderByPerfTest, SelectKeysOrderByKeys) { this->set_element_count(items.size()); this->set_element_size(sizeof(key_type)); this->set_elements_per_thread(TypeParam::launch_params::ITEMS_PER_THREAD); - this->set_global_memory_loads(TypeParam::GlobalMemoryLoads(items.size())); - this->set_global_memory_stores(TypeParam::GlobalMemoryStores(items.size())); + this->set_global_memory_loads( + TypeParam::GlobalMemoryLoads(items.size(), sizeof(key_type))); + this->set_global_memory_stores( + TypeParam::GlobalMemoryStores(items.size(), sizeof(key_type))); - this->Measure(kConfig, [&]() { TypeParam::SortKeys(d_items, allocator); }); + this->Measure(kConfig, [&]() { + TypeParam::Sort(d_items, d_ignored_values, kv_selector, allocator); + }); if (check_result) { int output_selector; - d_items.selector().copy_to_host(&output_selector, 1); + kv_selector.copy_to_host(&output_selector, 1); std::vector h_sorted_items(items.size()); d_items.buffer(output_selector) .copy_to_host(h_sorted_items.data(), items.size()); @@ -398,7 +471,7 @@ TYPED_TEST(OrderByPerfTest, SelectKeysOrderByKeys) { if (!result_file.empty()) { int output_selector; - d_items.selector().copy_to_host(&output_selector, 1); + kv_selector.copy_to_host(&output_selector, 1); std::vector h_sorted_items(items.size()); d_items.buffer(output_selector) .copy_to_host(h_sorted_items.data(), items.size()); @@ -419,5 +492,96 @@ TYPED_TEST(OrderByPerfTest, SelectKeysOrderByKeys) { } } +TYPED_TEST(OrderByPerfTest, SelectValuesOrderByKeys) { + using key_type = typename TypeParam::key_type; + using value_type = unsigned; + using indices_type = utils::size_type; + + constexpr int kSortSharedMemorySize = + TypeParam::template SortSharedMemorySize(); + if (kSortSharedMemorySize > this->MaxSharedMemory() && + !getenv("GTEST_ALSO_RUN_SKIPPED_TESTS")) { + GTEST_SKIP() << "skipping test that requires too much shared memory: " + << kSortSharedMemorySize << " > " << this->MaxSharedMemory(); + } + + auto keys = this->template GetConfigColumn("key"); + ASSERT_NE(keys.size(), 0u); + + auto values = this->template GetConfigColumn("value"); + ASSERT_EQ(keys.size(), values.size()); + + auto check_result = this->GetConfigValue("check_result", true); + auto result_file = this->GetConfigValue("result_file", std::string()); + + int input_selector = 0; + utils::device_vector kv_selector(1); + kv_selector.copy_from_host(&input_selector, 1); + + device_column_buffered d_keys(keys.size()); + d_keys.buffer(input_selector).copy_from_host(keys.data(), keys.size()); + + device_column_buffered d_values(values.size()); + d_values.buffer(input_selector).copy_from_host(values.data(), values.size()); + + auto free_list = std::make_shared< + caching_device_allocator::free_list_type>(); + caching_device_allocator allocator(free_list); + + // provide throughput information + constexpr size_t kKVSize = sizeof(key_type) + sizeof(value_type); + this->set_element_count(keys.size()); + this->set_element_size(kKVSize); + this->set_elements_per_thread(TypeParam::launch_params::ITEMS_PER_THREAD); + this->set_global_memory_loads( + TypeParam::GlobalMemoryLoads(keys.size(), kKVSize)); + this->set_global_memory_stores( + TypeParam::GlobalMemoryStores(keys.size(), kKVSize)); + + this->Measure(kConfig, [&]() { + TypeParam::Sort(d_keys, d_values, kv_selector, allocator); + }); + + if (check_result) { + int output_selector; + kv_selector.copy_to_host(&output_selector, 1); + std::vector h_sorted_values(values.size()); + d_values.buffer(output_selector) + .copy_to_host(h_sorted_values.data(), values.size()); + std::vector indices(keys.size()); + std::iota(indices.begin(), indices.end(), 0); + std::stable_sort( + indices.begin(), indices.end(), + [&keys](unsigned a, unsigned b) { return keys[a] < keys[b]; }); + std::vector expected_sorted_values(values.size()); + for (size_t i = 0; i < indices.size(); ++i) { + expected_sorted_values[i] = values[indices[i]]; + } + EXPECT_EQ(expected_sorted_values, h_sorted_values); + } + + if (!result_file.empty()) { + int output_selector; + kv_selector.copy_to_host(&output_selector, 1); + std::vector h_sorted_values(values.size()); + d_values.buffer(output_selector) + .copy_to_host(h_sorted_values.data(), values.size()); + + std::ofstream result_out; + result_out.open(result_file); + ASSERT_TRUE(result_out.is_open()) + << "failed to open result file: " << result_file; + result_out << "sorted_item" << std::endl; + for (size_t i = 0; i < h_sorted_values.size(); ++i) { + result_out << h_sorted_values[i] << std::endl; + } + result_out.close(); + } + + for (auto entry : *free_list) { + cudaFree(entry.second); + } +} + } // namespace test } // namespace breeze diff --git a/velox/experimental/breeze/test/algorithms/algorithm-kernels.template.h b/velox/experimental/breeze/test/algorithms/algorithm-kernels.template.h index e2a22132e819..0cb83b871928 100644 --- a/velox/experimental/breeze/test/algorithms/algorithm-kernels.template.h +++ b/velox/experimental/breeze/test/algorithms/algorithm-kernels.template.h @@ -89,24 +89,45 @@ void RadixSortHistogram(const T* in, unsigned* out, int num_items) { breeze::utils::make_slice(scratch), num_items); } -template +template PLATFORM("p") SHARED_MEM( - "typename breeze::algorithms::DeviceRadixSort::Scratch", + "typename breeze::algorithms::DeviceRadixSort::Scratch", "scratch") -void RadixSort(const T* in, const unsigned* in_offsets, const int* start_bit, - const int* num_pass_bits, T* out, int* next_block_idx, - unsigned* blocks, int num_items) { - breeze::algorithms::DeviceRadixSort:: - template Sort( - p, breeze::utils::make_slice(in), - breeze::utils::make_slice(in_offsets), - *start_bit, *num_pass_bits, - breeze::utils::make_slice(out), - breeze::utils::make_slice(next_block_idx), - breeze::utils::make_slice(blocks), - breeze::utils::make_slice(scratch), num_items); +void RadixSort(const T* in_keys, const U* in_values, const unsigned* in_offsets, + const int* start_bit, const int* num_pass_bits, T* out_keys, + U* out_values, int* next_block_idx, unsigned* blocks, + int num_items) { + if constexpr (breeze::utils::IsSame::VALUE) { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } else { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_slice(in_values), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_slice(out_values), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } } } // namespace kernels diff --git a/velox/experimental/breeze/test/algorithms/algorithm_test.template.h b/velox/experimental/breeze/test/algorithms/algorithm_test.template.h index b181258a1e12..8c0b6c97e8d7 100644 --- a/velox/experimental/breeze/test/algorithms/algorithm_test.template.h +++ b/velox/experimental/breeze/test/algorithms/algorithm_test.template.h @@ -63,12 +63,13 @@ class AlgorithmTest : public ::testing::Test { void RadixSortHistogram(USE_AS_SIZE const std::vector& in, std::vector& out, BLOCK_COUNT int num_blocks); - template + template SHARED_MEM_TYPE( - "typename breeze::algorithms::DeviceRadixSort::Scratch") - void RadixSort(USE_AS_SIZE const std::vector& in, + "typename breeze::algorithms::DeviceRadixSort::Scratch") + void RadixSort(USE_AS_SIZE const std::vector& in_keys, + const std::vector& in_values, const std::vector& in_offsets, int start_bit, - int num_pass_bits, std::vector& out, - std::vector& next_block_idx, + int num_pass_bits, std::vector& out_keys, + std::vector& out_values, std::vector& next_block_idx, std::vector& blocks, BLOCK_COUNT int num_blocks); }; diff --git a/velox/experimental/breeze/test/algorithms/kernels.clcpp b/velox/experimental/breeze/test/algorithms/kernels.clcpp index 30b2607f0e12..c12798a75b34 100644 --- a/velox/experimental/breeze/test/algorithms/kernels.clcpp +++ b/velox/experimental/breeze/test/algorithms/kernels.clcpp @@ -27,11 +27,13 @@ enum { OPENCL_WARP_THREADS = 32 }; // kernel specializations using namespace breeze::algorithms; +using namespace breeze::utils; #define _C(X, Y) X##Y #define C(X, Y) _C(X, Y) #define NAME(F, T, BT, IPT) C(, F##_##T##_##BT##x##IPT) +#define NAME2(F, T, U, BT, IPT) C(, F##_##U##_##T##_##BT##x##IPT) #define add_reduce_op ReduceOpAdd #define min_reduce_op ReduceOpMin @@ -87,17 +89,25 @@ GEN_SCAN(add) GEN_RADIX_SORT_HISTOGRAM(int) GEN_RADIX_SORT_HISTOGRAM(uint) -#define GEN_RADIX_SORT(T, BT, IPT, RB) \ - kernel void NAME(radix_sort, T, BT, IPT##x##RB)( \ - const global T *in, const global uint *in_offsets, \ - const global int *start_bit, const global int *num_pass_bits, \ - global T *out, global int *next_block_idx, global uint *blocks, \ - const global int *num_items) { \ - using PlatformT = OpenCLPlatform; \ - local DeviceRadixSort::Scratch scratch; \ - radix_sort(in, in_offsets, start_bit, num_pass_bits, out, \ - next_block_idx, blocks, &scratch, *num_items); \ +#define null_value_type NullType +#define uint_value_type uint + +#define GEN_RADIX_SORT(KT, VT, BT, IPT, RB) \ + kernel void NAME2(radix_sort, KT, VT, BT, IPT##x##RB)( \ + const global KT *in_keys, const global VT##_value_type *in_values, \ + const global uint *in_offsets, const global int *start_bit, \ + const global int *num_pass_bits, global KT *out_keys, \ + global VT##_value_type *out_values, global int *next_block_idx, \ + global uint *blocks, const global int *num_items) { \ + using PlatformT = OpenCLPlatform; \ + local DeviceRadixSort::Scratch \ + scratch; \ + radix_sort(in_keys, in_values, in_offsets, start_bit, \ + num_pass_bits, out_keys, out_values, \ + next_block_idx, blocks, &scratch, *num_items); \ } -GEN_RADIX_SORT(int, 64, 2, 6) -GEN_RADIX_SORT(uint, 64, 2, 6) +GEN_RADIX_SORT(int, null, 64, 2, 6) +GEN_RADIX_SORT(int, uint, 64, 2, 6) +GEN_RADIX_SORT(uint, null, 64, 2, 6) +GEN_RADIX_SORT(uint, uint, 64, 2, 6) diff --git a/velox/experimental/breeze/test/algorithms/kernels.metal b/velox/experimental/breeze/test/algorithms/kernels.metal index c6363c5e250f..07a91a7161bd 100644 --- a/velox/experimental/breeze/test/algorithms/kernels.metal +++ b/velox/experimental/breeze/test/algorithms/kernels.metal @@ -25,11 +25,13 @@ // kernel specializations using namespace breeze::algorithms; +using namespace breeze::utils; #define _C(X, Y) X##Y #define C(X, Y) _C(X, Y) #define NAME(F, T, BT, IPT) C(, F##_##T##_##BT##x##IPT) +#define NAME2(F, T, U, BT, IPT) C(, F##_##U##_##T##_##BT##x##IPT) #define add_reduce_op ReduceOpAdd #define min_reduce_op ReduceOpMin @@ -97,22 +99,32 @@ GEN_SCAN(add) GEN_RADIX_SORT_HISTOGRAM(int) GEN_RADIX_SORT_HISTOGRAM(uint) -#define GEN_RADIX_SORT(T, BT, IPT, RB) \ - kernel void NAME(radix_sort, T, BT, IPT##x##RB)( \ - const device T *in [[buffer(0)]], \ - const device uint *in_offsets [[buffer(1)]], \ - const device int *start_bit [[buffer(2)]], \ - const device int *num_pass_bits [[buffer(3)]], \ - device T *out [[buffer(4)]], device int *next_block_idx [[buffer(5)]], \ - device uint *blocks [[buffer(6)]], \ - const device int *num_items [[buffer(7)]], \ - uint thread_idx [[thread_index_in_threadgroup]], \ - uint block_idx [[threadgroup_position_in_grid]]) { \ - MetalPlatform p{thread_idx, block_idx}; \ - threadgroup DeviceRadixSort::Scratch scratch; \ - radix_sort(p, in, in_offsets, start_bit, num_pass_bits, out, \ - next_block_idx, blocks, &scratch, *num_items); \ +#define null_value_type NullType +#define uint_value_type uint + +#define GEN_RADIX_SORT(KT, VT, BT, IPT, RB) \ + kernel void NAME2(radix_sort, KT, VT, BT, IPT##x##RB)( \ + const device KT *in_keys [[buffer(0)]], \ + const device VT##_value_type *in_values [[buffer(1)]], \ + const device uint *in_offsets [[buffer(2)]], \ + const device int *start_bit [[buffer(3)]], \ + const device int *num_pass_bits [[buffer(4)]], \ + device KT *out_keys [[buffer(5)]], \ + device VT##_value_type *out_values [[buffer(6)]], \ + device int *next_block_idx [[buffer(7)]], \ + device uint *blocks [[buffer(8)]], \ + const device int *num_items [[buffer(9)]], \ + uint thread_idx [[thread_index_in_threadgroup]], \ + uint block_idx [[threadgroup_position_in_grid]]) { \ + MetalPlatform p{thread_idx, block_idx}; \ + threadgroup DeviceRadixSort::Scratch scratch; \ + radix_sort(p, in_keys, in_values, in_offsets, start_bit, \ + num_pass_bits, out_keys, out_values, \ + next_block_idx, blocks, &scratch, *num_items); \ } -GEN_RADIX_SORT(int, 64, 2, 6) -GEN_RADIX_SORT(uint, 64, 2, 6) +GEN_RADIX_SORT(int, null, 64, 2, 6) +GEN_RADIX_SORT(int, uint, 64, 2, 6) +GEN_RADIX_SORT(uint, null, 64, 2, 6) +GEN_RADIX_SORT(uint, uint, 64, 2, 6) diff --git a/velox/experimental/breeze/test/algorithms/sort_unittest.cpp b/velox/experimental/breeze/test/algorithms/sort_unittest.cpp index ffddd0b18029..1611cec20c91 100644 --- a/velox/experimental/breeze/test/algorithms/sort_unittest.cpp +++ b/velox/experimental/breeze/test/algorithms/sort_unittest.cpp @@ -157,10 +157,11 @@ TYPED_TEST(AlgorithmTest, RadixSort) { int num_blocks = (in.size() + kBlockItems - 1) / kBlockItems; std::vector next_block_idx(1); std::vector blocks(num_blocks * kNumBins); + std::vector ignored_values(1); this->template RadixSort( - in, in_offsets, kStartBit, kRadixBits, out, next_block_idx, blocks, - num_blocks); + in, ignored_values, in_offsets, kStartBit, kRadixBits, out, + ignored_values, next_block_idx, blocks, num_blocks); std::vector expected_result = in; std::stable_sort( @@ -172,3 +173,57 @@ TYPED_TEST(AlgorithmTest, RadixSort) { }); EXPECT_EQ(expected_result, out); } + +TYPED_TEST(AlgorithmTest, RadixSortKeyValues) { + constexpr int kBlockItems = kBlockThreads * kItemsPerThread; + constexpr int kRadixBits = 6; + constexpr int kStartBit = 0; + constexpr int kNumBins = 1 << kRadixBits; + + std::vector in_keys(400, 0); + std::iota(in_keys.begin(), in_keys.end(), + std::is_signed::value ? -199 : 1); + static std::minstd_rand rng; + std::shuffle(in_keys.begin(), in_keys.end(), rng); + + std::vector in_histogram(kNumBins); + for (const auto& value : in_keys) { + int bin = extract_bits(to_bit_ordered(value), kStartBit, kRadixBits); + in_histogram[bin] += 1u; + } + unsigned sum = 0; + std::vector in_offsets(kNumBins); + for (size_t i = 0; i < kNumBins; ++i) { + in_offsets[i] = sum; + sum += in_histogram[i]; + } + std::vector indices(in_keys.size()); + std::iota(indices.begin(), indices.end(), 0); + std::vector out_keys(in_keys.size(), 0); + std::vector out_values(in_keys.size(), 0); + int num_blocks = (in_keys.size() + kBlockItems - 1) / kBlockItems; + std::vector next_block_idx(1); + std::vector blocks(num_blocks * kNumBins); + + this->template RadixSort( + in_keys, indices, in_offsets, kStartBit, kRadixBits, out_keys, out_values, + next_block_idx, blocks, num_blocks); + + std::vector sorted_indices = indices; + std::stable_sort(sorted_indices.begin(), sorted_indices.end(), + [&in_keys, start_bit = kStartBit, + num_pass_bits = kRadixBits](unsigned a, unsigned b) { + return extract_bits(to_bit_ordered(in_keys[a]), start_bit, + num_pass_bits) < + extract_bits(to_bit_ordered(in_keys[b]), start_bit, + num_pass_bits); + }); + std::vector expected_out_keys(in_keys.size()); + std::vector expected_out_values(in_keys.size()); + for (size_t i = 0; i < sorted_indices.size(); ++i) { + expected_out_keys[i] = in_keys[sorted_indices[i]]; + expected_out_values[i] = indices[sorted_indices[i]]; + } + EXPECT_EQ(expected_out_keys, out_keys); + EXPECT_EQ(expected_out_values, out_values); +} diff --git a/velox/experimental/breeze/test/generated/algorithms/algorithm_test-cuda.cuh b/velox/experimental/breeze/test/generated/algorithms/algorithm_test-cuda.cuh index 2f96445832d4..27a644f52c27 100644 --- a/velox/experimental/breeze/test/generated/algorithms/algorithm_test-cuda.cuh +++ b/velox/experimental/breeze/test/generated/algorithms/algorithm_test-cuda.cuh @@ -71,18 +71,18 @@ class AlgorithmTest : public ::testing::Test { in, out, in.size()); } - template - void RadixSort(const std::vector& in, + template + void RadixSort(const std::vector& in_keys, const std::vector& in_values, const std::vector& in_offsets, int start_bit, - int num_pass_bits, std::vector& out, - std::vector& next_block_idx, + int num_pass_bits, std::vector& out_keys, + std::vector& out_values, std::vector& next_block_idx, std::vector& blocks, int num_blocks) { const std::vector vec_start_bit(1, start_bit); const std::vector vec_num_pass_bits(1, num_pass_bits); CudaTestLaunch( num_blocks, - &kernels::RadixSort, in, - in_offsets, vec_start_bit, vec_num_pass_bits, out, next_block_idx, - blocks, in.size()); + &kernels::RadixSort, + in_keys, in_values, in_offsets, vec_start_bit, vec_num_pass_bits, + out_keys, out_values, next_block_idx, blocks, in_keys.size()); } }; diff --git a/velox/experimental/breeze/test/generated/algorithms/algorithm_test-openmp.h b/velox/experimental/breeze/test/generated/algorithms/algorithm_test-openmp.h index 662442b84f08..7af59c6e81aa 100644 --- a/velox/experimental/breeze/test/generated/algorithms/algorithm_test-openmp.h +++ b/velox/experimental/breeze/test/generated/algorithms/algorithm_test-openmp.h @@ -82,21 +82,22 @@ class AlgorithmTest : public ::testing::Test { in.data(), out.data(), in.size()); } - template - void RadixSort(const std::vector& in, + template + void RadixSort(const std::vector& in_keys, const std::vector& in_values, const std::vector& in_offsets, int start_bit, - int num_pass_bits, std::vector& out, - std::vector& next_block_idx, + int num_pass_bits, std::vector& out_keys, + std::vector& out_values, std::vector& next_block_idx, std::vector& blocks, int num_blocks) { using PlatformT = OpenMPPlatform; using SharedMemType = typename breeze::algorithms::DeviceRadixSort< - PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T>::Scratch; + PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T, U>::Scratch; OpenMPTestLaunch( num_blocks, - &kernels::RadixSort, - in.data(), in_offsets.data(), &start_bit, &num_pass_bits, out.data(), - next_block_idx.data(), blocks.data(), in.size()); + in_keys.data(), in_values.data(), in_offsets.data(), &start_bit, + &num_pass_bits, out_keys.data(), out_values.data(), + next_block_idx.data(), blocks.data(), in_keys.size()); } }; diff --git a/velox/experimental/breeze/test/generated/algorithms/kernels-cuda.cuh b/velox/experimental/breeze/test/generated/algorithms/kernels-cuda.cuh index e3e864b90698..b9583abe2501 100644 --- a/velox/experimental/breeze/test/generated/algorithms/kernels-cuda.cuh +++ b/velox/experimental/breeze/test/generated/algorithms/kernels-cuda.cuh @@ -92,29 +92,50 @@ __global__ void RadixSortHistogram(const T* in, unsigned* out, int num_items) { breeze::utils::make_slice(scratch), num_items); } -template -__global__ void RadixSort(const T* in, const unsigned* in_offsets, - const int* start_bit, const int* num_pass_bits, - T* out, int* next_block_idx, unsigned* blocks, +template +__global__ void RadixSort(const T* in_keys, const U* in_values, + const unsigned* in_offsets, const int* start_bit, + const int* num_pass_bits, T* out_keys, U* out_values, + int* next_block_idx, unsigned* blocks, int num_items) { using PlatformT = CudaPlatform; PlatformT p; __shared__ typename breeze::algorithms::DeviceRadixSort< - PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T>::Scratch scratch_; + PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T, U>::Scratch scratch_; auto scratch = (typename breeze::algorithms::DeviceRadixSort< - PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T>::Scratch*)&scratch_; + PlatformT, ITEMS_PER_THREAD, RADIX_BITS, T, U>::Scratch*)&scratch_; - breeze::algorithms::DeviceRadixSort:: - template Sort( - p, breeze::utils::make_slice(in), - breeze::utils::make_slice(in_offsets), - *start_bit, *num_pass_bits, - breeze::utils::make_slice(out), - breeze::utils::make_slice(next_block_idx), - breeze::utils::make_slice(blocks), - breeze::utils::make_slice(scratch), num_items); + if constexpr (breeze::utils::IsSame::VALUE) { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } else { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_slice(in_values), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_slice(out_values), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } } } // namespace kernels diff --git a/velox/experimental/breeze/test/generated/algorithms/kernels-openmp.h b/velox/experimental/breeze/test/generated/algorithms/kernels-openmp.h index f44a6a497f00..7be9ebcc12ea 100644 --- a/velox/experimental/breeze/test/generated/algorithms/kernels-openmp.h +++ b/velox/experimental/breeze/test/generated/algorithms/kernels-openmp.h @@ -75,22 +75,42 @@ void RadixSortHistogram(PlatformT p, SharedMemType* scratch, const T* in, } template > -void RadixSort(PlatformT p, SharedMemType* scratch, const T* in, - const unsigned* in_offsets, const int* start_bit, - const int* num_pass_bits, T* out, int* next_block_idx, - unsigned* blocks, int num_items) { - breeze::algorithms::DeviceRadixSort:: - template Sort( - p, breeze::utils::make_slice(in), - breeze::utils::make_slice(in_offsets), - *start_bit, *num_pass_bits, - breeze::utils::make_slice(out), - breeze::utils::make_slice(next_block_idx), - breeze::utils::make_slice(blocks), - breeze::utils::make_slice(scratch), num_items); +void RadixSort(PlatformT p, SharedMemType* scratch, const T* in_keys, + const U* in_values, const unsigned* in_offsets, + const int* start_bit, const int* num_pass_bits, T* out_keys, + U* out_values, int* next_block_idx, unsigned* blocks, + int num_items) { + if constexpr (breeze::utils::IsSame::VALUE) { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_empty_slice(), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } else { + breeze::algorithms::DeviceRadixSort:: + template Sort( + p, breeze::utils::make_slice(in_keys), + breeze::utils::make_slice(in_values), + breeze::utils::make_slice(in_offsets), + *start_bit, *num_pass_bits, + breeze::utils::make_slice(out_keys), + breeze::utils::make_slice(out_values), + breeze::utils::make_slice(next_block_idx), + breeze::utils::make_slice(blocks), + breeze::utils::make_slice(scratch), + num_items); + } } } // namespace kernels