From 3cf194824e64952bac314762a35d0746ce5c4e68 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 10:04:52 -0700 Subject: [PATCH 01/19] Refactor `hash_reduce_by_row` --- cpp/src/reductions/hash_reduce_by_row.cuh | 171 ++++++++++++++++++ cpp/src/stream_compaction/distinct.cu | 20 +- cpp/src/stream_compaction/distinct_reduce.cu | 114 ++++-------- cpp/src/stream_compaction/distinct_reduce.cuh | 2 +- 4 files changed, 218 insertions(+), 89 deletions(-) create mode 100644 cpp/src/reductions/hash_reduce_by_row.cuh diff --git a/cpp/src/reductions/hash_reduce_by_row.cuh b/cpp/src/reductions/hash_reduce_by_row.cuh new file mode 100644 index 00000000000..2566cee6c7f --- /dev/null +++ b/cpp/src/reductions/hash_reduce_by_row.cuh @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace cudf::detail { + +/** + * @brief Perform a reduction on groups of rows that are compared equal. + * + * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared + * equal. A hash table is used to find groups of equal rows. + * + * Depending on the `keep` parameter, the reduction operation for each row group is: + * - If `keep == KEEP_FIRST`: min of row indices in the group. + * - If `keep == KEEP_LAST`: max of row indices in the group. + * - If `keep == KEEP_NONE`: count of equivalent rows (group size). + * + * At the beginning of the operation, the entire output array is filled with a value given by + * the `reduction_init_value()` function. Then, the reduction result for each row group is written + * into the output array at the index of an unspecified row in the group. + * + * @param map The auxiliary map to perform reduction + * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row + * comparisons + * @param num_rows The number of all input rows + * @param has_nulls Indicate whether the input rows has any nulls at any nested levels + * @param has_nested_columns Indicates whether the input table has any nested columns + * @param keep The parameter to determine what type of reduction to perform + * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vector + * @return A device_uvector containing the reduction results + */ +rmm::device_uvector hash_reduce_by_row( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + +/** + * @brief A functor to perform reduce-by-key with keys are rows that compared equal. + * + * TODO: We need to switch to use `static_reduction_map` when it is ready + * (https://github.com/NVIDIA/cuCollections/pull/98). + */ +template +struct reduce_by_row_fn_base { + MapView const d_map; + KeyHasher const d_hasher; + KeyEqual const d_equal; + OutputType* const d_output; + + reduce_by_row_fn_base(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + OutputType* const d_output) + : d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, d_output{d_output} + { + } + + protected: + __device__ OutputType* get_output_ptr(size_type const idx) const + { + auto const iter = d_map.find(idx, d_hasher, d_equal); + + if (iter != d_map.end()) { + // Only one index value of the duplicate rows could be inserted into the map. + // As such, looking up for all indices of duplicate rows always returns the same value. + auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed); + + // All duplicate rows will have concurrent access to this same output slot. + return &d_output[inserted_idx]; + } else { + // All input `idx` values have been inserted into the map before. + // Thus, searching for an `idx` key resulting in the `end()` iterator only happens if + // `d_equal(idx, idx) == false`. + // Such situations are due to comparing nulls or NaNs which are considered as always unequal. + // In those cases, all rows containing nulls or NaNs are distinct. Just return their direct + // output slot. + return &d_output[idx]; + } + } +}; + +template +rmm::device_uvector hash_reduce_by_row( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + null_equality nulls_equal, + nan_equality nans_equal, + ReduceFuncBuilder func_builder, + OutputType init, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto reduction_results = rmm::device_uvector(num_rows, stream, mr); + + thrust::uninitialized_fill( + rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init); + + auto const map_dview = map.get_device_view(); + auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); + auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + + auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); + + auto const reduce_by_row = [&](auto const value_comp) { + if (has_nested_columns) { + auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); + thrust::for_each( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin())); + } else { + auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); + thrust::for_each( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin())); + } + }; + + if (nans_equal == nan_equality::ALL_EQUAL) { + using nan_equal_comparator = + cudf::experimental::row::equality::nan_equal_physical_equality_comparator; + reduce_by_row(nan_equal_comparator{}); + } else { + using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; + reduce_by_row(nan_unequal_comparator{}); + } + + return reduction_results; +} + +} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index cc60b2a12ea..8b0710372a6 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -96,16 +96,16 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = hash_reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const reduction_results = distinct_reduce(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu index 020e6a495bc..0b621f87fbf 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -16,6 +16,8 @@ #include "distinct_reduce.cuh" +#include + #include #include #include @@ -24,31 +26,27 @@ namespace cudf::detail { namespace { /** - * @brief A functor to perform reduce-by-key with keys are rows that compared equal. + * @brief * - * TODO: We need to switch to use `static_reduction_map` when it is ready - * (https://github.com/NVIDIA/cuCollections/pull/98). */ template -struct reduce_by_row_fn { - MapView const d_map; - KeyHasher const d_hasher; - KeyEqual const d_equal; +struct distinct_reduce_fn : reduce_by_row_fn_base { duplicate_keep_option const keep; - size_type* const d_output; - reduce_by_row_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, keep{keep}, d_output{d_output} + distinct_reduce_fn(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + duplicate_keep_option const keep, + size_type* const d_output) + : reduce_by_row_fn_base( + d_map, d_hasher, d_equal, d_output), + keep{keep} { } __device__ void operator()(size_type const idx) const { - auto const out_ptr = get_output_ptr(idx); + auto const out_ptr = this->get_output_ptr(idx); if (keep == duplicate_keep_option::KEEP_FIRST) { // Store the smallest index of all rows that are equal. @@ -61,34 +59,25 @@ struct reduce_by_row_fn { atomicAdd(out_ptr, size_type{1}); } } +}; - private: - __device__ size_type* get_output_ptr(size_type const idx) const - { - auto const iter = d_map.find(idx, d_hasher, d_equal); - - if (iter != d_map.end()) { - // Only one index value of the duplicate rows could be inserted into the map. - // As such, looking up for all indices of duplicate rows always returns the same value. - auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed); +struct reduce_func_builder { + duplicate_keep_option keep; - // All duplicate rows will have concurrent access to this same output slot. - return &d_output[inserted_idx]; - } else { - // All input `idx` values have been inserted into the map before. - // Thus, searching for an `idx` key resulting in the `end()` iterator only happens if - // `d_equal(idx, idx) == false`. - // Such situations are due to comparing nulls or NaNs which are considered as always unequal. - // In those cases, all rows containing nulls or NaNs are distinct. Just return their direct - // output slot. - return &d_output[idx]; - } + template + auto build(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + size_type* const d_output) + { + return distinct_reduce_fn{ + d_map, d_hasher, d_equal, keep, d_output}; } }; } // namespace -rmm::device_uvector hash_reduce_by_row( +rmm::device_uvector distinct_reduce( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, @@ -103,48 +92,17 @@ rmm::device_uvector hash_reduce_by_row( CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, "This function should not be called with KEEP_ANY"); - auto reduction_results = rmm::device_uvector(num_rows, stream, mr); - - thrust::uninitialized_fill(rmm::exec_policy(stream), - reduction_results.begin(), - reduction_results.end(), - reduction_init_value(keep)); - - auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); - - auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); - - auto const reduce_by_row = [&](auto const value_comp) { - if (has_nested_columns) { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - thrust::for_each( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - reduce_by_row_fn{ - map.get_device_view(), key_hasher, key_equal, keep, reduction_results.begin()}); - } else { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - thrust::for_each( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - reduce_by_row_fn{ - map.get_device_view(), key_hasher, key_equal, keep, reduction_results.begin()}); - } - }; - - if (nans_equal == nan_equality::ALL_EQUAL) { - using nan_equal_comparator = - cudf::experimental::row::equality::nan_equal_physical_equality_comparator; - reduce_by_row(nan_equal_comparator{}); - } else { - using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; - reduce_by_row(nan_unequal_comparator{}); - } - - return reduction_results; + return hash_reduce_by_row(map, + preprocessed_input, + num_rows, + has_nulls, + has_nested_columns, + nulls_equal, + nans_equal, + reduce_func_builder{keep}, + reduction_init_value(keep), + stream, + mr); } } // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_reduce.cuh b/cpp/src/stream_compaction/distinct_reduce.cuh index 8ec1fa18205..74fba8196f4 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cuh +++ b/cpp/src/stream_compaction/distinct_reduce.cuh @@ -72,7 +72,7 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results */ -rmm::device_uvector hash_reduce_by_row( +rmm::device_uvector distinct_reduce( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, From 84886467639b5303572f2e85402b3db075e36cbd Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 10:54:46 -0700 Subject: [PATCH 02/19] Rewrite `hash_reduce_by_row.cuh` --- cpp/src/reductions/hash_reduce_by_row.cuh | 91 ++++++++++------------- 1 file changed, 40 insertions(+), 51 deletions(-) diff --git a/cpp/src/reductions/hash_reduce_by_row.cuh b/cpp/src/reductions/hash_reduce_by_row.cuh index 2566cee6c7f..d30e96bc9d2 100644 --- a/cpp/src/reductions/hash_reduce_by_row.cuh +++ b/cpp/src/reductions/hash_reduce_by_row.cuh @@ -16,8 +16,6 @@ #include -#include -#include #include #include @@ -25,57 +23,22 @@ #include #include -#include +#include +#include +#include namespace cudf::detail { /** - * @brief Perform a reduction on groups of rows that are compared equal. - * - * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared - * equal. A hash table is used to find groups of equal rows. - * - * Depending on the `keep` parameter, the reduction operation for each row group is: - * - If `keep == KEEP_FIRST`: min of row indices in the group. - * - If `keep == KEEP_LAST`: max of row indices in the group. - * - If `keep == KEEP_NONE`: count of equivalent rows (group size). - * - * At the beginning of the operation, the entire output array is filled with a value given by - * the `reduction_init_value()` function. Then, the reduction result for each row group is written - * into the output array at the index of an unspecified row in the group. - * - * @param map The auxiliary map to perform reduction - * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row - * comparisons - * @param num_rows The number of all input rows - * @param has_nulls Indicate whether the input rows has any nulls at any nested levels - * @param has_nested_columns Indicates whether the input table has any nested columns - * @param keep The parameter to determine what type of reduction to perform - * @param nulls_equal Flag to specify whether null elements should be considered as equal - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the returned vector - * @return A device_uvector containing the reduction results - */ -rmm::device_uvector hash_reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - -/** - * @brief A functor to perform reduce-by-key with keys are rows that compared equal. + * @brief The base struct for customized reduction functor to perform reduce-by-key with keys are + * rows that compared equal. * * TODO: We need to switch to use `static_reduction_map` when it is ready * (https://github.com/NVIDIA/cuCollections/pull/98). */ template struct reduce_by_row_fn_base { + protected: MapView const d_map; KeyHasher const d_hasher; KeyEqual const d_equal; @@ -89,13 +52,18 @@ struct reduce_by_row_fn_base { { } - protected: + /** + * @brief Return a pointer to the output array at the given index. + * + * @param idx The access index + * @return A pointer to the given index in the output array + */ __device__ OutputType* get_output_ptr(size_type const idx) const { auto const iter = d_map.find(idx, d_hasher, d_equal); if (iter != d_map.end()) { - // Only one index value of the duplicate rows could be inserted into the map. + // Only one (undetermined) index value of the duplicate rows could be inserted into the map. // As such, looking up for all indices of duplicate rows always returns the same value. auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed); @@ -113,6 +81,29 @@ struct reduce_by_row_fn_base { } }; +/** + * @brief Perform a reduction on groups of rows that are compared equal. + * + * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared + * equal. A hash table is used to find groups of equal rows. + * + * At the beginning of the operation, the entire output array is filled with a value given by + * the `init` parameter. Then, the reduction result for each row group is written into the output + * array at the index of an unspecified row in the group. + * + * @param map The auxiliary map to perform reduction + * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row + * comparisons + * @param num_rows The number of all input rows + * @param has_nulls Indicate whether the input rows has any nulls at any nested levels + * @param has_nested_columns Indicates whether the input table has any nested columns + * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN values in floating point column should be + * considered equal. + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vector + * @return A device_uvector containing the reduction results + */ template rmm::device_uvector hash_reduce_by_row( hash_map_type const& map, @@ -127,16 +118,14 @@ rmm::device_uvector hash_reduce_by_row( rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - auto reduction_results = rmm::device_uvector(num_rows, stream, mr); - - thrust::uninitialized_fill( - rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init); - auto const map_dview = map.get_device_view(); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); - auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); + auto reduction_results = rmm::device_uvector(num_rows, stream, mr); + thrust::uninitialized_fill( + rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init); auto const reduce_by_row = [&](auto const value_comp) { if (has_nested_columns) { From 1994684b77c43fc81dd33e4b564a87b7c3b84a9c Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 10:55:57 -0700 Subject: [PATCH 03/19] Rename and rewrite `distinct_reduce.hpp` --- .../{distinct_reduce.cuh => distinct_reduce.hpp} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename cpp/src/stream_compaction/{distinct_reduce.cuh => distinct_reduce.hpp} (93%) diff --git a/cpp/src/stream_compaction/distinct_reduce.cuh b/cpp/src/stream_compaction/distinct_reduce.hpp similarity index 93% rename from cpp/src/stream_compaction/distinct_reduce.cuh rename to cpp/src/stream_compaction/distinct_reduce.hpp index 74fba8196f4..236b6c860c3 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cuh +++ b/cpp/src/stream_compaction/distinct_reduce.hpp @@ -14,18 +14,14 @@ * limitations under the License. */ -#include "stream_compaction_common.cuh" +#include "stream_compaction_common.hpp" -#include #include #include #include #include #include -#include - -#include namespace cudf::detail { @@ -56,6 +52,8 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * - If `keep == KEEP_LAST`: max of row indices in the group. * - If `keep == KEEP_NONE`: count of equivalent rows (group size). * + * Note that this function is not needed when `keep == KEEP_NONE`. + * * At the beginning of the operation, the entire output array is filled with a value given by * the `reduction_init_value()` function. Then, the reduction result for each row group is written * into the output array at the index of an unspecified row in the group. @@ -68,6 +66,8 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * @param has_nested_columns Indicates whether the input table has any nested columns * @param keep The parameter to determine what type of reduction to perform * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN values in floating point column should be + * considered equal. * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results From 5dcbac9dec96e4525369029bb23774020f9b5c1e Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 11:00:16 -0700 Subject: [PATCH 04/19] Rewrite `distinct.cu` --- cpp/src/stream_compaction/distinct.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index 8b0710372a6..b551df96765 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -14,7 +14,8 @@ * limitations under the License. */ -#include "distinct_reduce.cuh" +#include "distinct_reduce.hpp" +#include "stream_compaction_common.cuh" #include #include From 6236fcc9551685b8da497c0b86eda08769615f61 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 11:10:14 -0700 Subject: [PATCH 05/19] Rewrite `distinct_reduce.cu` --- cpp/src/stream_compaction/distinct_reduce.cu | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu index 0b621f87fbf..24926cdbd4a 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -14,20 +14,15 @@ * limitations under the License. */ -#include "distinct_reduce.cuh" +#include "distinct_reduce.hpp" #include -#include -#include -#include - namespace cudf::detail { namespace { /** - * @brief - * + * @brief The functor to find the first/last/none duplicate row for rows that compared equal. */ template struct distinct_reduce_fn : reduce_by_row_fn_base { @@ -61,6 +56,10 @@ struct distinct_reduce_fn : reduce_by_row_fn_base Date: Tue, 12 Sep 2023 11:10:23 -0700 Subject: [PATCH 06/19] Rewrite `hash_reduce_by_row.cuh` --- cpp/src/reductions/hash_reduce_by_row.cuh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/reductions/hash_reduce_by_row.cuh b/cpp/src/reductions/hash_reduce_by_row.cuh index d30e96bc9d2..1cff009b17b 100644 --- a/cpp/src/reductions/hash_reduce_by_row.cuh +++ b/cpp/src/reductions/hash_reduce_by_row.cuh @@ -91,6 +91,9 @@ struct reduce_by_row_fn_base { * the `init` parameter. Then, the reduction result for each row group is written into the output * array at the index of an unspecified row in the group. * + * @tparam ReduceFuncBuilder The builder class that must have a `build()` method returning a + * reduction functor derived from `reduce_by_row_fn_base` + * @tparam OutputType Type of the reduction results * @param map The auxiliary map to perform reduction * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row * comparisons @@ -100,6 +103,7 @@ struct reduce_by_row_fn_base { * @param nulls_equal Flag to specify whether null elements should be considered as equal * @param nans_equal Flag to specify whether NaN values in floating point column should be * considered equal. + * @param init The initial value for reduction of each row group * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results From 584ff8dc600a6c6d13f5f5adadae719c0aa7eb2f Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 11:22:06 -0700 Subject: [PATCH 07/19] Minor changes --- cpp/src/stream_compaction/distinct_reduce.cu | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu index 24926cdbd4a..a451643794d 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -33,8 +33,8 @@ struct distinct_reduce_fn : reduce_by_row_fn_base( - d_map, d_hasher, d_equal, d_output), + : reduce_by_row_fn_base{ + d_map, d_hasher, d_equal, d_output}, keep{keep} { } @@ -61,7 +61,7 @@ struct distinct_reduce_fn : reduce_by_row_fn_base auto build(MapView const& d_map, From 4a3d60d62598c6180431b99f3ab03c3787fd445f Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 11:28:51 -0700 Subject: [PATCH 08/19] Fix style --- cpp/src/stream_compaction/distinct_reduce.cu | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu index a451643794d..8cfb7b93515 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -33,8 +33,10 @@ struct distinct_reduce_fn : reduce_by_row_fn_base{ - d_map, d_hasher, d_equal, d_output}, + : reduce_by_row_fn_base{d_map, + d_hasher, + d_equal, + d_output}, keep{keep} { } From 34cb488c27880f40a579686ebd18d447f734f240 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 11:32:09 -0700 Subject: [PATCH 09/19] Fix comment --- cpp/src/stream_compaction/distinct_reduce.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu index 8cfb7b93515..64d29ae2ff0 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -22,7 +22,7 @@ namespace cudf::detail { namespace { /** - * @brief The functor to find the first/last/none duplicate row for rows that compared equal. + * @brief The functor to find the first/last/all duplicate row for rows that compared equal. */ template struct distinct_reduce_fn : reduce_by_row_fn_base { From 95e4463262aa72b250df41a33367f9f66237a825 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 12:41:51 -0700 Subject: [PATCH 10/19] Move file --- .../reductions => include/cudf/detail}/hash_reduce_by_row.cuh | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cpp/{src/reductions => include/cudf/detail}/hash_reduce_by_row.cuh (100%) diff --git a/cpp/src/reductions/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh similarity index 100% rename from cpp/src/reductions/hash_reduce_by_row.cuh rename to cpp/include/cudf/detail/hash_reduce_by_row.cuh From 723ae4c720c3fc4a5f950c230657abeac60644c5 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 12:45:16 -0700 Subject: [PATCH 11/19] Merge `distinct_reduce.*` into `distinct.cu` --- cpp/CMakeLists.txt | 1 - cpp/src/stream_compaction/distinct.cu | 101 ++++++++++++++-- cpp/src/stream_compaction/distinct_reduce.cu | 109 ------------------ cpp/src/stream_compaction/distinct_reduce.hpp | 87 -------------- 4 files changed, 90 insertions(+), 208 deletions(-) delete mode 100644 cpp/src/stream_compaction/distinct_reduce.cu delete mode 100644 cpp/src/stream_compaction/distinct_reduce.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 516865e5782..5703318592f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,7 +530,6 @@ add_library( src/stream_compaction/apply_boolean_mask.cu src/stream_compaction/distinct.cu src/stream_compaction/distinct_count.cu - src/stream_compaction/distinct_reduce.cu src/stream_compaction/drop_nans.cu src/stream_compaction/drop_nulls.cu src/stream_compaction/stable_distinct.cu diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index b551df96765..8a7f6daa193 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -14,11 +14,11 @@ * limitations under the License. */ -#include "distinct_reduce.hpp" #include "stream_compaction_common.cuh" #include #include +#include #include #include #include @@ -39,6 +39,80 @@ namespace cudf { namespace detail { +namespace { +/** + * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`. + * + * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`. + * @return The initial reduction value. + */ +auto constexpr reduction_init_value(duplicate_keep_option keep) +{ + switch (keep) { + case duplicate_keep_option::KEEP_FIRST: return std::numeric_limits::max(); + case duplicate_keep_option::KEEP_LAST: return std::numeric_limits::min(); + case duplicate_keep_option::KEEP_NONE: return size_type{0}; + default: CUDF_UNREACHABLE("This function should not be called with KEEP_ANY"); + } +} + +/** + * @brief The functor to find the first/last/all duplicate row for rows that compared equal. + */ +template +struct distinct_reduce_fn : reduce_by_row_fn_base { + duplicate_keep_option const keep; + + distinct_reduce_fn(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + duplicate_keep_option const keep, + size_type* const d_output) + : reduce_by_row_fn_base{d_map, + d_hasher, + d_equal, + d_output}, + keep{keep} + { + } + + __device__ void operator()(size_type const idx) const + { + auto const out_ptr = this->get_output_ptr(idx); + + if (keep == duplicate_keep_option::KEEP_FIRST) { + // Store the smallest index of all rows that are equal. + atomicMin(out_ptr, idx); + } else if (keep == duplicate_keep_option::KEEP_LAST) { + // Store the greatest index of all rows that are equal. + atomicMax(out_ptr, idx); + } else { + // Count the number of rows in each group of rows that are compared equal. + atomicAdd(out_ptr, size_type{1}); + } + } +}; + +/** + * @brief The builder to construct an instance of `distinct_reduce_fn` functor base on the given + * value of the `duplicate_keep_option` member variable. + */ +struct reduce_func_builder { + duplicate_keep_option const keep; + + template + auto build(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + size_type* const d_output) + { + return distinct_reduce_fn{ + d_map, d_hasher, d_equal, keep, d_output}; + } +}; + +} // namespace + rmm::device_uvector get_distinct_indices(table_view const& input, duplicate_keep_option keep, null_equality nulls_equal, @@ -97,16 +171,21 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = distinct_reduce(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + // Depending on the `keep` parameter, the reduction operation for each row group is: + // - If `keep == KEEP_FIRST`: min of row indices in the group. + // - If `keep == KEEP_LAST`: max of row indices in the group. + // - If `keep == KEEP_NONE`: count of equivalent rows (group size). + auto const reduction_results = hash_reduce_by_row(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + nulls_equal, + nans_equal, + reduce_func_builder{keep}, + reduction_init_value(keep), + stream, + mr); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu deleted file mode 100644 index 64d29ae2ff0..00000000000 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "distinct_reduce.hpp" - -#include - -namespace cudf::detail { - -namespace { -/** - * @brief The functor to find the first/last/all duplicate row for rows that compared equal. - */ -template -struct distinct_reduce_fn : reduce_by_row_fn_base { - duplicate_keep_option const keep; - - distinct_reduce_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : reduce_by_row_fn_base{d_map, - d_hasher, - d_equal, - d_output}, - keep{keep} - { - } - - __device__ void operator()(size_type const idx) const - { - auto const out_ptr = this->get_output_ptr(idx); - - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - atomicMin(out_ptr, idx); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - atomicMax(out_ptr, idx); - } else { - // Count the number of rows in each group of rows that are compared equal. - atomicAdd(out_ptr, size_type{1}); - } - } -}; - -/** - * @brief The builder to construct an instance of `distinct_reduce_fn` functor base on the given - * value of the `duplicate_keep_option` member variable. - */ -struct reduce_func_builder { - duplicate_keep_option const keep; - - template - auto build(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - size_type* const d_output) - { - return distinct_reduce_fn{ - d_map, d_hasher, d_equal, keep, d_output}; - } -}; - -} // namespace - -rmm::device_uvector distinct_reduce( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, - "This function should not be called with KEEP_ANY"); - - return hash_reduce_by_row(map, - preprocessed_input, - num_rows, - has_nulls, - has_nested_columns, - nulls_equal, - nans_equal, - reduce_func_builder{keep}, - reduction_init_value(keep), - stream, - mr); -} - -} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_reduce.hpp b/cpp/src/stream_compaction/distinct_reduce.hpp deleted file mode 100644 index 236b6c860c3..00000000000 --- a/cpp/src/stream_compaction/distinct_reduce.hpp +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "stream_compaction_common.hpp" - -#include -#include -#include - -#include -#include - -namespace cudf::detail { - -/** - * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`. - * - * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`. - * @return The initial reduction value. - */ -auto constexpr reduction_init_value(duplicate_keep_option keep) -{ - switch (keep) { - case duplicate_keep_option::KEEP_FIRST: return std::numeric_limits::max(); - case duplicate_keep_option::KEEP_LAST: return std::numeric_limits::min(); - case duplicate_keep_option::KEEP_NONE: return size_type{0}; - default: CUDF_UNREACHABLE("This function should not be called with KEEP_ANY"); - } -} - -/** - * @brief Perform a reduction on groups of rows that are compared equal. - * - * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared - * equal. A hash table is used to find groups of equal rows. - * - * Depending on the `keep` parameter, the reduction operation for each row group is: - * - If `keep == KEEP_FIRST`: min of row indices in the group. - * - If `keep == KEEP_LAST`: max of row indices in the group. - * - If `keep == KEEP_NONE`: count of equivalent rows (group size). - * - * Note that this function is not needed when `keep == KEEP_NONE`. - * - * At the beginning of the operation, the entire output array is filled with a value given by - * the `reduction_init_value()` function. Then, the reduction result for each row group is written - * into the output array at the index of an unspecified row in the group. - * - * @param map The auxiliary map to perform reduction - * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row - * comparisons - * @param num_rows The number of all input rows - * @param has_nulls Indicate whether the input rows has any nulls at any nested levels - * @param has_nested_columns Indicates whether the input table has any nested columns - * @param keep The parameter to determine what type of reduction to perform - * @param nulls_equal Flag to specify whether null elements should be considered as equal - * @param nans_equal Flag to specify whether NaN values in floating point column should be - * considered equal. - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the returned vector - * @return A device_uvector containing the reduction results - */ -rmm::device_uvector distinct_reduce( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - -} // namespace cudf::detail From 8fb7a9e7124a3bfcac780c108b6cc7e629c47219 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 13:21:57 -0700 Subject: [PATCH 12/19] Revert "Merge `distinct_reduce.*` into `distinct.cu`" This reverts commit 723ae4c720c3fc4a5f950c230657abeac60644c5. --- cpp/CMakeLists.txt | 1 + cpp/src/stream_compaction/distinct.cu | 101 ++-------------- cpp/src/stream_compaction/distinct_reduce.cu | 109 ++++++++++++++++++ cpp/src/stream_compaction/distinct_reduce.hpp | 87 ++++++++++++++ 4 files changed, 208 insertions(+), 90 deletions(-) create mode 100644 cpp/src/stream_compaction/distinct_reduce.cu create mode 100644 cpp/src/stream_compaction/distinct_reduce.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 5703318592f..516865e5782 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,6 +530,7 @@ add_library( src/stream_compaction/apply_boolean_mask.cu src/stream_compaction/distinct.cu src/stream_compaction/distinct_count.cu + src/stream_compaction/distinct_reduce.cu src/stream_compaction/drop_nans.cu src/stream_compaction/drop_nulls.cu src/stream_compaction/stable_distinct.cu diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index 8a7f6daa193..b551df96765 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -14,11 +14,11 @@ * limitations under the License. */ +#include "distinct_reduce.hpp" #include "stream_compaction_common.cuh" #include #include -#include #include #include #include @@ -39,80 +39,6 @@ namespace cudf { namespace detail { -namespace { -/** - * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`. - * - * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`. - * @return The initial reduction value. - */ -auto constexpr reduction_init_value(duplicate_keep_option keep) -{ - switch (keep) { - case duplicate_keep_option::KEEP_FIRST: return std::numeric_limits::max(); - case duplicate_keep_option::KEEP_LAST: return std::numeric_limits::min(); - case duplicate_keep_option::KEEP_NONE: return size_type{0}; - default: CUDF_UNREACHABLE("This function should not be called with KEEP_ANY"); - } -} - -/** - * @brief The functor to find the first/last/all duplicate row for rows that compared equal. - */ -template -struct distinct_reduce_fn : reduce_by_row_fn_base { - duplicate_keep_option const keep; - - distinct_reduce_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : reduce_by_row_fn_base{d_map, - d_hasher, - d_equal, - d_output}, - keep{keep} - { - } - - __device__ void operator()(size_type const idx) const - { - auto const out_ptr = this->get_output_ptr(idx); - - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - atomicMin(out_ptr, idx); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - atomicMax(out_ptr, idx); - } else { - // Count the number of rows in each group of rows that are compared equal. - atomicAdd(out_ptr, size_type{1}); - } - } -}; - -/** - * @brief The builder to construct an instance of `distinct_reduce_fn` functor base on the given - * value of the `duplicate_keep_option` member variable. - */ -struct reduce_func_builder { - duplicate_keep_option const keep; - - template - auto build(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - size_type* const d_output) - { - return distinct_reduce_fn{ - d_map, d_hasher, d_equal, keep, d_output}; - } -}; - -} // namespace - rmm::device_uvector get_distinct_indices(table_view const& input, duplicate_keep_option keep, null_equality nulls_equal, @@ -171,21 +97,16 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - // Depending on the `keep` parameter, the reduction operation for each row group is: - // - If `keep == KEEP_FIRST`: min of row indices in the group. - // - If `keep == KEEP_LAST`: max of row indices in the group. - // - If `keep == KEEP_NONE`: count of equivalent rows (group size). - auto const reduction_results = hash_reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - nulls_equal, - nans_equal, - reduce_func_builder{keep}, - reduction_init_value(keep), - stream, - mr); + auto const reduction_results = distinct_reduce(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu new file mode 100644 index 00000000000..64d29ae2ff0 --- /dev/null +++ b/cpp/src/stream_compaction/distinct_reduce.cu @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "distinct_reduce.hpp" + +#include + +namespace cudf::detail { + +namespace { +/** + * @brief The functor to find the first/last/all duplicate row for rows that compared equal. + */ +template +struct distinct_reduce_fn : reduce_by_row_fn_base { + duplicate_keep_option const keep; + + distinct_reduce_fn(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + duplicate_keep_option const keep, + size_type* const d_output) + : reduce_by_row_fn_base{d_map, + d_hasher, + d_equal, + d_output}, + keep{keep} + { + } + + __device__ void operator()(size_type const idx) const + { + auto const out_ptr = this->get_output_ptr(idx); + + if (keep == duplicate_keep_option::KEEP_FIRST) { + // Store the smallest index of all rows that are equal. + atomicMin(out_ptr, idx); + } else if (keep == duplicate_keep_option::KEEP_LAST) { + // Store the greatest index of all rows that are equal. + atomicMax(out_ptr, idx); + } else { + // Count the number of rows in each group of rows that are compared equal. + atomicAdd(out_ptr, size_type{1}); + } + } +}; + +/** + * @brief The builder to construct an instance of `distinct_reduce_fn` functor base on the given + * value of the `duplicate_keep_option` member variable. + */ +struct reduce_func_builder { + duplicate_keep_option const keep; + + template + auto build(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + size_type* const d_output) + { + return distinct_reduce_fn{ + d_map, d_hasher, d_equal, keep, d_output}; + } +}; + +} // namespace + +rmm::device_uvector distinct_reduce( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, + "This function should not be called with KEEP_ANY"); + + return hash_reduce_by_row(map, + preprocessed_input, + num_rows, + has_nulls, + has_nested_columns, + nulls_equal, + nans_equal, + reduce_func_builder{keep}, + reduction_init_value(keep), + stream, + mr); +} + +} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_reduce.hpp b/cpp/src/stream_compaction/distinct_reduce.hpp new file mode 100644 index 00000000000..236b6c860c3 --- /dev/null +++ b/cpp/src/stream_compaction/distinct_reduce.hpp @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "stream_compaction_common.hpp" + +#include +#include +#include + +#include +#include + +namespace cudf::detail { + +/** + * @brief Return the reduction identity used to initialize results of `hash_reduce_by_row`. + * + * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`. + * @return The initial reduction value. + */ +auto constexpr reduction_init_value(duplicate_keep_option keep) +{ + switch (keep) { + case duplicate_keep_option::KEEP_FIRST: return std::numeric_limits::max(); + case duplicate_keep_option::KEEP_LAST: return std::numeric_limits::min(); + case duplicate_keep_option::KEEP_NONE: return size_type{0}; + default: CUDF_UNREACHABLE("This function should not be called with KEEP_ANY"); + } +} + +/** + * @brief Perform a reduction on groups of rows that are compared equal. + * + * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared + * equal. A hash table is used to find groups of equal rows. + * + * Depending on the `keep` parameter, the reduction operation for each row group is: + * - If `keep == KEEP_FIRST`: min of row indices in the group. + * - If `keep == KEEP_LAST`: max of row indices in the group. + * - If `keep == KEEP_NONE`: count of equivalent rows (group size). + * + * Note that this function is not needed when `keep == KEEP_NONE`. + * + * At the beginning of the operation, the entire output array is filled with a value given by + * the `reduction_init_value()` function. Then, the reduction result for each row group is written + * into the output array at the index of an unspecified row in the group. + * + * @param map The auxiliary map to perform reduction + * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row + * comparisons + * @param num_rows The number of all input rows + * @param has_nulls Indicate whether the input rows has any nulls at any nested levels + * @param has_nested_columns Indicates whether the input table has any nested columns + * @param keep The parameter to determine what type of reduction to perform + * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN values in floating point column should be + * considered equal. + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vector + * @return A device_uvector containing the reduction results + */ +rmm::device_uvector distinct_reduce( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + +} // namespace cudf::detail From 65427c8211f5e4f63b2f3174f3fad284cf17f258 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 13:31:16 -0700 Subject: [PATCH 13/19] Rename function --- cpp/CMakeLists.txt | 2 +- cpp/src/stream_compaction/distinct.cu | 22 ++++++++--------- ...distinct_reduce.cu => distinct_helpers.cu} | 24 +++++++++---------- ...stinct_reduce.hpp => distinct_helpers.hpp} | 2 +- 4 files changed, 25 insertions(+), 25 deletions(-) rename cpp/src/stream_compaction/{distinct_reduce.cu => distinct_helpers.cu} (82%) rename cpp/src/stream_compaction/{distinct_reduce.hpp => distinct_helpers.hpp} (98%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 516865e5782..ca6444bd2f7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,7 +530,7 @@ add_library( src/stream_compaction/apply_boolean_mask.cu src/stream_compaction/distinct.cu src/stream_compaction/distinct_count.cu - src/stream_compaction/distinct_reduce.cu + src/stream_compaction/distinct_helpers.cu src/stream_compaction/drop_nans.cu src/stream_compaction/drop_nulls.cu src/stream_compaction/stable_distinct.cu diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index b551df96765..de2cd6da0dd 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "distinct_reduce.hpp" +#include "distinct_helpers.hpp" #include "stream_compaction_common.cuh" #include @@ -97,16 +97,16 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = distinct_reduce(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const reduction_results = indices_reduce_by_row(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_helpers.cu similarity index 82% rename from cpp/src/stream_compaction/distinct_reduce.cu rename to cpp/src/stream_compaction/distinct_helpers.cu index 64d29ae2ff0..5d31e87943a 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -14,9 +14,9 @@ * limitations under the License. */ -#include "distinct_reduce.hpp" +#include "distinct_helpers.hpp" -#include +#include namespace cudf::detail { @@ -25,14 +25,14 @@ namespace { * @brief The functor to find the first/last/all duplicate row for rows that compared equal. */ template -struct distinct_reduce_fn : reduce_by_row_fn_base { +struct reduce_fn : reduce_by_row_fn_base { duplicate_keep_option const keep; - distinct_reduce_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) + reduce_fn(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + duplicate_keep_option const keep, + size_type* const d_output) : reduce_by_row_fn_base{d_map, d_hasher, d_equal, @@ -59,7 +59,7 @@ struct distinct_reduce_fn : reduce_by_row_fn_base{ - d_map, d_hasher, d_equal, keep, d_output}; + return reduce_fn{d_map, d_hasher, d_equal, keep, d_output}; } }; } // namespace -rmm::device_uvector distinct_reduce( +// This function is split from `distinct.cu` to improve compile time. +rmm::device_uvector indices_reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, diff --git a/cpp/src/stream_compaction/distinct_reduce.hpp b/cpp/src/stream_compaction/distinct_helpers.hpp similarity index 98% rename from cpp/src/stream_compaction/distinct_reduce.hpp rename to cpp/src/stream_compaction/distinct_helpers.hpp index 236b6c860c3..9ae29783ca4 100644 --- a/cpp/src/stream_compaction/distinct_reduce.hpp +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -72,7 +72,7 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results */ -rmm::device_uvector distinct_reduce( +rmm::device_uvector indices_reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, From 0c0c7ac8eb66d2e4192ef8499cbff1ef0b385014 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 15:56:14 -0700 Subject: [PATCH 14/19] Fix output type --- cpp/include/cudf/detail/hash_reduce_by_row.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cudf/detail/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh index 1cff009b17b..35654b90bc0 100644 --- a/cpp/include/cudf/detail/hash_reduce_by_row.cuh +++ b/cpp/include/cudf/detail/hash_reduce_by_row.cuh @@ -109,7 +109,7 @@ struct reduce_by_row_fn_base { * @return A device_uvector containing the reduction results */ template -rmm::device_uvector hash_reduce_by_row( +rmm::device_uvector hash_reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, From 01cc1c2bf82924c0f239ea90a6e360602ee34a60 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 19:28:35 -0700 Subject: [PATCH 15/19] Move file --- .../cudf/detail => src/reductions}/hash_reduce_by_row.cuh | 0 cpp/src/stream_compaction/distinct_helpers.cu | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename cpp/{include/cudf/detail => src/reductions}/hash_reduce_by_row.cuh (100%) diff --git a/cpp/include/cudf/detail/hash_reduce_by_row.cuh b/cpp/src/reductions/hash_reduce_by_row.cuh similarity index 100% rename from cpp/include/cudf/detail/hash_reduce_by_row.cuh rename to cpp/src/reductions/hash_reduce_by_row.cuh diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index 5d31e87943a..cb0dc4b1c50 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -16,7 +16,7 @@ #include "distinct_helpers.hpp" -#include +#include namespace cudf::detail { From f5a6a1a66841b82b1da0ff75a21d0faa98440847 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 12 Sep 2023 19:30:15 -0700 Subject: [PATCH 16/19] Rename function --- cpp/src/stream_compaction/distinct.cu | 20 +++++++++---------- cpp/src/stream_compaction/distinct_helpers.cu | 2 +- .../stream_compaction/distinct_helpers.hpp | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index de2cd6da0dd..e031727c21a 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -97,16 +97,16 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = indices_reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const reduction_results = reduce_by_row(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index cb0dc4b1c50..a9df0bc98b8 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -78,7 +78,7 @@ struct reduce_func_builder { } // namespace // This function is split from `distinct.cu` to improve compile time. -rmm::device_uvector indices_reduce_by_row( +rmm::device_uvector reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, diff --git a/cpp/src/stream_compaction/distinct_helpers.hpp b/cpp/src/stream_compaction/distinct_helpers.hpp index 9ae29783ca4..b667d0b04f0 100644 --- a/cpp/src/stream_compaction/distinct_helpers.hpp +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -72,7 +72,7 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results */ -rmm::device_uvector indices_reduce_by_row( +rmm::device_uvector reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, From a4de11bf0c4b7348536f0fe276bcc71f45d6e01b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 13 Sep 2023 13:55:27 -0700 Subject: [PATCH 17/19] Revert "Move file" This reverts commit 01cc1c2bf82924c0f239ea90a6e360602ee34a60. --- .../reductions => include/cudf/detail}/hash_reduce_by_row.cuh | 0 cpp/src/stream_compaction/distinct_helpers.cu | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename cpp/{src/reductions => include/cudf/detail}/hash_reduce_by_row.cuh (100%) diff --git a/cpp/src/reductions/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh similarity index 100% rename from cpp/src/reductions/hash_reduce_by_row.cuh rename to cpp/include/cudf/detail/hash_reduce_by_row.cuh diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index a9df0bc98b8..8f36ec98f4a 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -16,7 +16,7 @@ #include "distinct_helpers.hpp" -#include +#include namespace cudf::detail { From bdbb1dbe4403ad42fb3f69a8e955067216c96a35 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 13 Sep 2023 14:09:09 -0700 Subject: [PATCH 18/19] Get rid of `compaction_hash` and sentinel values --- .../cudf/detail/hash_reduce_by_row.cuh | 9 +++++--- cpp/src/stream_compaction/distinct.cu | 6 ++--- cpp/src/stream_compaction/distinct_count.cu | 4 ++-- .../stream_compaction_common.cuh | 22 ------------------- .../stream_compaction_common.hpp | 5 ----- 5 files changed, 11 insertions(+), 35 deletions(-) diff --git a/cpp/include/cudf/detail/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh index 35654b90bc0..2d2b43f1d4a 100644 --- a/cpp/include/cudf/detail/hash_reduce_by_row.cuh +++ b/cpp/include/cudf/detail/hash_reduce_by_row.cuh @@ -14,8 +14,6 @@ * limitations under the License. */ -#include - #include #include @@ -27,8 +25,13 @@ #include #include +#include + namespace cudf::detail { +using hash_map_type = + cuco::static_map; + /** * @brief The base struct for customized reduction functor to perform reduce-by-key with keys are * rows that compared equal. @@ -124,7 +127,7 @@ rmm::device_uvector hash_reduce_by_row( { auto const map_dview = map.get_device_view(); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const key_hasher = row_hasher.device_hasher(has_nulls); auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); auto reduction_results = rmm::device_uvector(num_rows, stream, mr); diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index e031727c21a..8d2b12ab141 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -51,8 +51,8 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } auto map = hash_map_type{compute_hash_table_size(input.num_rows()), - cuco::empty_key{COMPACTION_EMPTY_KEY_SENTINEL}, - cuco::empty_value{COMPACTION_EMPTY_VALUE_SENTINEL}, + cuco::empty_key{-1}, + cuco::empty_value{std::numeric_limits::min()}, detail::hash_table_allocator_type{default_allocator{}, stream}, stream.value()}; @@ -62,7 +62,7 @@ rmm::device_uvector get_distinct_indices(table_view const& input, auto const has_nested_columns = cudf::detail::has_nested_columns(input); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const key_hasher = row_hasher.device_hasher(has_nulls); auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); diff --git a/cpp/src/stream_compaction/distinct_count.cu b/cpp/src/stream_compaction/distinct_count.cu index 4bca0827efe..ac4811ad279 100644 --- a/cpp/src/stream_compaction/distinct_count.cu +++ b/cpp/src/stream_compaction/distinct_count.cu @@ -136,14 +136,14 @@ cudf::size_type distinct_count(table_view const& keys, auto const preprocessed_input = cudf::experimental::row::hash::preprocessed_table::create(keys, stream); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const hash_key = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const hash_key = row_hasher.device_hasher(has_nulls); auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); auto const comparator_helper = [&](auto const row_equal) { using hasher_type = decltype(hash_key); auto key_set = cuco::experimental::static_set{ cuco::experimental::extent{compute_hash_table_size(num_rows)}, - cuco::empty_key{COMPACTION_EMPTY_KEY_SENTINEL}, + cuco::empty_key{-1}, row_equal, cuco::experimental::linear_probing<1, hasher_type>{hash_key}, detail::hash_table_allocator_type{default_allocator{}, stream}, diff --git a/cpp/src/stream_compaction/stream_compaction_common.cuh b/cpp/src/stream_compaction/stream_compaction_common.cuh index 4779cd990fd..839672d6a56 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.cuh +++ b/cpp/src/stream_compaction/stream_compaction_common.cuh @@ -29,28 +29,6 @@ namespace cudf { namespace detail { -namespace experimental { - -/** - * @brief Device callable to hash a given row. - */ -template -class compaction_hash { - public: - compaction_hash(RowHash row_hasher) : _hash{row_hasher} {} - - __device__ inline auto operator()(size_type i) const noexcept - { - auto hash = _hash(i); - return (hash == COMPACTION_EMPTY_KEY_SENTINEL) ? (hash - 1) : hash; - } - - private: - RowHash _hash; -}; - -} // namespace experimental - /**  * @brief Device functor to determine if a row is valid.  */ diff --git a/cpp/src/stream_compaction/stream_compaction_common.hpp b/cpp/src/stream_compaction/stream_compaction_common.hpp index 0cd2d8f4b14..58d958d2ff4 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.hpp +++ b/cpp/src/stream_compaction/stream_compaction_common.hpp @@ -30,11 +30,6 @@ namespace cudf { namespace detail { -constexpr auto COMPACTION_EMPTY_KEY_SENTINEL = std::numeric_limits::max(); -constexpr auto COMPACTION_EMPTY_VALUE_SENTINEL = std::numeric_limits::min(); - -using hash_type = cuco::murmurhash3_32; - using hash_table_allocator_type = rmm::mr::stream_allocator_adaptor>; using hash_map_type = From 458d75ac0cd7909fbe6790b01402fd99cd3d64fd Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 13 Sep 2023 14:12:09 -0700 Subject: [PATCH 19/19] Remove unused header --- cpp/src/stream_compaction/distinct.cu | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index 8d2b12ab141..cc1e3423d42 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -15,7 +15,6 @@ */ #include "distinct_helpers.hpp" -#include "stream_compaction_common.cuh" #include #include