diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 516865e5782..ca6444bd2f7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -530,7 +530,7 @@ add_library( src/stream_compaction/apply_boolean_mask.cu src/stream_compaction/distinct.cu src/stream_compaction/distinct_count.cu - src/stream_compaction/distinct_reduce.cu + src/stream_compaction/distinct_helpers.cu src/stream_compaction/drop_nans.cu src/stream_compaction/drop_nulls.cu src/stream_compaction/stable_distinct.cu diff --git a/cpp/include/cudf/detail/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh new file mode 100644 index 00000000000..2d2b43f1d4a --- /dev/null +++ b/cpp/include/cudf/detail/hash_reduce_by_row.cuh @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include + +namespace cudf::detail { + +using hash_map_type = + cuco::static_map; + +/** + * @brief The base struct for customized reduction functor to perform reduce-by-key with keys are + * rows that compared equal. + * + * TODO: We need to switch to use `static_reduction_map` when it is ready + * (https://github.com/NVIDIA/cuCollections/pull/98). + */ +template +struct reduce_by_row_fn_base { + protected: + MapView const d_map; + KeyHasher const d_hasher; + KeyEqual const d_equal; + OutputType* const d_output; + + reduce_by_row_fn_base(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + OutputType* const d_output) + : d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, d_output{d_output} + { + } + + /** + * @brief Return a pointer to the output array at the given index. + * + * @param idx The access index + * @return A pointer to the given index in the output array + */ + __device__ OutputType* get_output_ptr(size_type const idx) const + { + auto const iter = d_map.find(idx, d_hasher, d_equal); + + if (iter != d_map.end()) { + // Only one (undetermined) index value of the duplicate rows could be inserted into the map. + // As such, looking up for all indices of duplicate rows always returns the same value. + auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed); + + // All duplicate rows will have concurrent access to this same output slot. + return &d_output[inserted_idx]; + } else { + // All input `idx` values have been inserted into the map before. + // Thus, searching for an `idx` key resulting in the `end()` iterator only happens if + // `d_equal(idx, idx) == false`. + // Such situations are due to comparing nulls or NaNs which are considered as always unequal. + // In those cases, all rows containing nulls or NaNs are distinct. Just return their direct + // output slot. + return &d_output[idx]; + } + } +}; + +/** + * @brief Perform a reduction on groups of rows that are compared equal. + * + * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared + * equal. A hash table is used to find groups of equal rows. + * + * At the beginning of the operation, the entire output array is filled with a value given by + * the `init` parameter. Then, the reduction result for each row group is written into the output + * array at the index of an unspecified row in the group. + * + * @tparam ReduceFuncBuilder The builder class that must have a `build()` method returning a + * reduction functor derived from `reduce_by_row_fn_base` + * @tparam OutputType Type of the reduction results + * @param map The auxiliary map to perform reduction + * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row + * comparisons + * @param num_rows The number of all input rows + * @param has_nulls Indicate whether the input rows has any nulls at any nested levels + * @param has_nested_columns Indicates whether the input table has any nested columns + * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN values in floating point column should be + * considered equal. + * @param init The initial value for reduction of each row group + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vector + * @return A device_uvector containing the reduction results + */ +template +rmm::device_uvector hash_reduce_by_row( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + null_equality nulls_equal, + nan_equality nans_equal, + ReduceFuncBuilder func_builder, + OutputType init, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto const map_dview = map.get_device_view(); + auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); + auto const key_hasher = row_hasher.device_hasher(has_nulls); + auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); + + auto reduction_results = rmm::device_uvector(num_rows, stream, mr); + thrust::uninitialized_fill( + rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init); + + auto const reduce_by_row = [&](auto const value_comp) { + if (has_nested_columns) { + auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); + thrust::for_each( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin())); + } else { + auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); + thrust::for_each( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin())); + } + }; + + if (nans_equal == nan_equality::ALL_EQUAL) { + using nan_equal_comparator = + cudf::experimental::row::equality::nan_equal_physical_equality_comparator; + reduce_by_row(nan_equal_comparator{}); + } else { + using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; + reduce_by_row(nan_unequal_comparator{}); + } + + return reduction_results; +} + +} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index cc60b2a12ea..cc1e3423d42 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "distinct_reduce.cuh" +#include "distinct_helpers.hpp" #include #include @@ -50,8 +50,8 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } auto map = hash_map_type{compute_hash_table_size(input.num_rows()), - cuco::empty_key{COMPACTION_EMPTY_KEY_SENTINEL}, - cuco::empty_value{COMPACTION_EMPTY_VALUE_SENTINEL}, + cuco::empty_key{-1}, + cuco::empty_value{std::numeric_limits::min()}, detail::hash_table_allocator_type{default_allocator{}, stream}, stream.value()}; @@ -61,7 +61,7 @@ rmm::device_uvector get_distinct_indices(table_view const& input, auto const has_nested_columns = cudf::detail::has_nested_columns(input); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const key_hasher = row_hasher.device_hasher(has_nulls); auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); @@ -96,16 +96,16 @@ rmm::device_uvector get_distinct_indices(table_view const& input, } // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = hash_reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const reduction_results = reduce_by_row(map, + std::move(preprocessed_input), + input.num_rows(), + has_nulls, + has_nested_columns, + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // Extract the desired output indices from reduction results. auto const map_end = [&] { diff --git a/cpp/src/stream_compaction/distinct_count.cu b/cpp/src/stream_compaction/distinct_count.cu index 4bca0827efe..ac4811ad279 100644 --- a/cpp/src/stream_compaction/distinct_count.cu +++ b/cpp/src/stream_compaction/distinct_count.cu @@ -136,14 +136,14 @@ cudf::size_type distinct_count(table_view const& keys, auto const preprocessed_input = cudf::experimental::row::hash::preprocessed_table::create(keys, stream); auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const hash_key = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); + auto const hash_key = row_hasher.device_hasher(has_nulls); auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); auto const comparator_helper = [&](auto const row_equal) { using hasher_type = decltype(hash_key); auto key_set = cuco::experimental::static_set{ cuco::experimental::extent{compute_hash_table_size(num_rows)}, - cuco::empty_key{COMPACTION_EMPTY_KEY_SENTINEL}, + cuco::empty_key{-1}, row_equal, cuco::experimental::linear_probing<1, hasher_type>{hash_key}, detail::hash_table_allocator_type{default_allocator{}, stream}, diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu new file mode 100644 index 00000000000..8f36ec98f4a --- /dev/null +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "distinct_helpers.hpp" + +#include + +namespace cudf::detail { + +namespace { +/** + * @brief The functor to find the first/last/all duplicate row for rows that compared equal. + */ +template +struct reduce_fn : reduce_by_row_fn_base { + duplicate_keep_option const keep; + + reduce_fn(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + duplicate_keep_option const keep, + size_type* const d_output) + : reduce_by_row_fn_base{d_map, + d_hasher, + d_equal, + d_output}, + keep{keep} + { + } + + __device__ void operator()(size_type const idx) const + { + auto const out_ptr = this->get_output_ptr(idx); + + if (keep == duplicate_keep_option::KEEP_FIRST) { + // Store the smallest index of all rows that are equal. + atomicMin(out_ptr, idx); + } else if (keep == duplicate_keep_option::KEEP_LAST) { + // Store the greatest index of all rows that are equal. + atomicMax(out_ptr, idx); + } else { + // Count the number of rows in each group of rows that are compared equal. + atomicAdd(out_ptr, size_type{1}); + } + } +}; + +/** + * @brief The builder to construct an instance of `reduce_fn` functor base on the given + * value of the `duplicate_keep_option` member variable. + */ +struct reduce_func_builder { + duplicate_keep_option const keep; + + template + auto build(MapView const& d_map, + KeyHasher const& d_hasher, + KeyEqual const& d_equal, + size_type* const d_output) + { + return reduce_fn{d_map, d_hasher, d_equal, keep, d_output}; + } +}; + +} // namespace + +// This function is split from `distinct.cu` to improve compile time. +rmm::device_uvector reduce_by_row( + hash_map_type const& map, + std::shared_ptr const preprocessed_input, + size_type num_rows, + cudf::nullate::DYNAMIC has_nulls, + bool has_nested_columns, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, + "This function should not be called with KEEP_ANY"); + + return hash_reduce_by_row(map, + preprocessed_input, + num_rows, + has_nulls, + has_nested_columns, + nulls_equal, + nans_equal, + reduce_func_builder{keep}, + reduction_init_value(keep), + stream, + mr); +} + +} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_reduce.cuh b/cpp/src/stream_compaction/distinct_helpers.hpp similarity index 92% rename from cpp/src/stream_compaction/distinct_reduce.cuh rename to cpp/src/stream_compaction/distinct_helpers.hpp index 8ec1fa18205..b667d0b04f0 100644 --- a/cpp/src/stream_compaction/distinct_reduce.cuh +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -14,18 +14,14 @@ * limitations under the License. */ -#include "stream_compaction_common.cuh" +#include "stream_compaction_common.hpp" -#include #include #include #include #include #include -#include - -#include namespace cudf::detail { @@ -56,6 +52,8 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * - If `keep == KEEP_LAST`: max of row indices in the group. * - If `keep == KEEP_NONE`: count of equivalent rows (group size). * + * Note that this function is not needed when `keep == KEEP_NONE`. + * * At the beginning of the operation, the entire output array is filled with a value given by * the `reduction_init_value()` function. Then, the reduction result for each row group is written * into the output array at the index of an unspecified row in the group. @@ -68,11 +66,13 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * @param has_nested_columns Indicates whether the input table has any nested columns * @param keep The parameter to determine what type of reduction to perform * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN values in floating point column should be + * considered equal. * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the reduction results */ -rmm::device_uvector hash_reduce_by_row( +rmm::device_uvector reduce_by_row( hash_map_type const& map, std::shared_ptr const preprocessed_input, size_type num_rows, diff --git a/cpp/src/stream_compaction/distinct_reduce.cu b/cpp/src/stream_compaction/distinct_reduce.cu deleted file mode 100644 index 020e6a495bc..00000000000 --- a/cpp/src/stream_compaction/distinct_reduce.cu +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "distinct_reduce.cuh" - -#include -#include -#include - -namespace cudf::detail { - -namespace { -/** - * @brief A functor to perform reduce-by-key with keys are rows that compared equal. - * - * TODO: We need to switch to use `static_reduction_map` when it is ready - * (https://github.com/NVIDIA/cuCollections/pull/98). - */ -template -struct reduce_by_row_fn { - MapView const d_map; - KeyHasher const d_hasher; - KeyEqual const d_equal; - duplicate_keep_option const keep; - size_type* const d_output; - - reduce_by_row_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, keep{keep}, d_output{d_output} - { - } - - __device__ void operator()(size_type const idx) const - { - auto const out_ptr = get_output_ptr(idx); - - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - atomicMin(out_ptr, idx); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - atomicMax(out_ptr, idx); - } else { - // Count the number of rows in each group of rows that are compared equal. - atomicAdd(out_ptr, size_type{1}); - } - } - - private: - __device__ size_type* get_output_ptr(size_type const idx) const - { - auto const iter = d_map.find(idx, d_hasher, d_equal); - - if (iter != d_map.end()) { - // Only one index value of the duplicate rows could be inserted into the map. - // As such, looking up for all indices of duplicate rows always returns the same value. - auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed); - - // All duplicate rows will have concurrent access to this same output slot. - return &d_output[inserted_idx]; - } else { - // All input `idx` values have been inserted into the map before. - // Thus, searching for an `idx` key resulting in the `end()` iterator only happens if - // `d_equal(idx, idx) == false`. - // Such situations are due to comparing nulls or NaNs which are considered as always unequal. - // In those cases, all rows containing nulls or NaNs are distinct. Just return their direct - // output slot. - return &d_output[idx]; - } - } -}; - -} // namespace - -rmm::device_uvector hash_reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, - "This function should not be called with KEEP_ANY"); - - auto reduction_results = rmm::device_uvector(num_rows, stream, mr); - - thrust::uninitialized_fill(rmm::exec_policy(stream), - reduction_results.begin(), - reduction_results.end(), - reduction_init_value(keep)); - - auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls)); - - auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); - - auto const reduce_by_row = [&](auto const value_comp) { - if (has_nested_columns) { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - thrust::for_each( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - reduce_by_row_fn{ - map.get_device_view(), key_hasher, key_equal, keep, reduction_results.begin()}); - } else { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - thrust::for_each( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_rows), - reduce_by_row_fn{ - map.get_device_view(), key_hasher, key_equal, keep, reduction_results.begin()}); - } - }; - - if (nans_equal == nan_equality::ALL_EQUAL) { - using nan_equal_comparator = - cudf::experimental::row::equality::nan_equal_physical_equality_comparator; - reduce_by_row(nan_equal_comparator{}); - } else { - using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; - reduce_by_row(nan_unequal_comparator{}); - } - - return reduction_results; -} - -} // namespace cudf::detail diff --git a/cpp/src/stream_compaction/stream_compaction_common.cuh b/cpp/src/stream_compaction/stream_compaction_common.cuh index 4779cd990fd..839672d6a56 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.cuh +++ b/cpp/src/stream_compaction/stream_compaction_common.cuh @@ -29,28 +29,6 @@ namespace cudf { namespace detail { -namespace experimental { - -/** - * @brief Device callable to hash a given row. - */ -template -class compaction_hash { - public: - compaction_hash(RowHash row_hasher) : _hash{row_hasher} {} - - __device__ inline auto operator()(size_type i) const noexcept - { - auto hash = _hash(i); - return (hash == COMPACTION_EMPTY_KEY_SENTINEL) ? (hash - 1) : hash; - } - - private: - RowHash _hash; -}; - -} // namespace experimental - /**  * @brief Device functor to determine if a row is valid.  */ diff --git a/cpp/src/stream_compaction/stream_compaction_common.hpp b/cpp/src/stream_compaction/stream_compaction_common.hpp index 0cd2d8f4b14..58d958d2ff4 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.hpp +++ b/cpp/src/stream_compaction/stream_compaction_common.hpp @@ -30,11 +30,6 @@ namespace cudf { namespace detail { -constexpr auto COMPACTION_EMPTY_KEY_SENTINEL = std::numeric_limits::max(); -constexpr auto COMPACTION_EMPTY_VALUE_SENTINEL = std::numeric_limits::min(); - -using hash_type = cuco::murmurhash3_32; - using hash_table_allocator_type = rmm::mr::stream_allocator_adaptor>; using hash_map_type =