Skip to content

Commit

Permalink
Merge distinct_reduce.* into distinct.cu
Browse files Browse the repository at this point in the history
  • Loading branch information
ttnghia committed Sep 12, 2023
1 parent e73c07f commit 40e8730
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 208 deletions.
1 change: 0 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ add_library(
src/stream_compaction/apply_boolean_mask.cu
src/stream_compaction/distinct.cu
src/stream_compaction/distinct_count.cu
src/stream_compaction/distinct_reduce.cu
src/stream_compaction/drop_nans.cu
src/stream_compaction/drop_nulls.cu
src/stream_compaction/stable_distinct.cu
Expand Down
101 changes: 90 additions & 11 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

#include "distinct_reduce.hpp"
#include "stream_compaction_common.cuh"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/hash_reduce_by_row.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/stream_compaction.hpp>
Expand All @@ -39,6 +39,80 @@
namespace cudf {
namespace detail {

namespace {
/**
 * @brief Pick the value that reduction results are initialized with before calling
 * `hash_reduce_by_row`.
 *
 * The returned value is the largest `size_type` for `KEEP_FIRST`, the smallest `size_type` for
 * `KEEP_LAST`, and zero for `KEEP_NONE`.
 *
 * @param keep A value of `duplicate_keep_option` type, must not be `KEEP_ANY`.
 * @return The initial reduction value.
 */
auto constexpr reduction_init_value(duplicate_keep_option keep)
{
  // Largest index: any row index stored through a min-reduction replaces it.
  if (keep == duplicate_keep_option::KEEP_FIRST) { return std::numeric_limits<size_type>::max(); }

  // Smallest index: any row index stored through a max-reduction replaces it.
  if (keep == duplicate_keep_option::KEEP_LAST) { return std::numeric_limits<size_type>::min(); }

  // Counting starts from zero.
  if (keep == duplicate_keep_option::KEEP_NONE) { return size_type{0}; }

  CUDF_UNREACHABLE("This function should not be called with KEEP_ANY");
}

/**
 * @brief Device functor that reduces row indices over rows that compare equal.
 *
 * Depending on `keep`, each invocation contributes its row index to the output slot obtained
 * from the base class as a minimum (`KEEP_FIRST`), a maximum (`KEEP_LAST`), or as a count of
 * one (any other value).
 */
template <typename MapView, typename KeyHasher, typename KeyEqual>
struct distinct_reduce_fn : reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, size_type> {
  duplicate_keep_option const keep;

  /**
   * @brief Forward the map, hasher, comparator and output pointer to the base class and record
   * the keep option.
   */
  distinct_reduce_fn(MapView const& d_map,
                     KeyHasher const& d_hasher,
                     KeyEqual const& d_equal,
                     duplicate_keep_option const keep,
                     size_type* const d_output)
    : reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, size_type>{d_map,
                                                                     d_hasher,
                                                                     d_equal,
                                                                     d_output},
      keep{keep}
  {
  }

  /**
   * @brief Atomically fold row `idx` into its output slot.
   *
   * @param idx Index of the row being processed.
   */
  __device__ void operator()(size_type const idx) const
  {
    auto const slot = this->get_output_ptr(idx);

    switch (keep) {
      case duplicate_keep_option::KEEP_FIRST:
        // Keep the smallest index among the rows that are equal.
        atomicMin(slot, idx);
        break;
      case duplicate_keep_option::KEEP_LAST:
        // Keep the greatest index among the rows that are equal.
        atomicMax(slot, idx);
        break;
      default:
        // Tally how many rows fall into each group of rows that compare equal.
        atomicAdd(slot, size_type{1});
        break;
    }
  }
};

/**
 * @brief Builder that constructs a `distinct_reduce_fn` functor based on the stored
 * `duplicate_keep_option` value.
 */
struct reduce_func_builder {
  duplicate_keep_option const keep;

  /**
   * @brief Create the reduction functor for the given map, hasher, comparator and output buffer.
   *
   * @param d_map The map view forwarded to the functor.
   * @param d_hasher The row hasher forwarded to the functor.
   * @param d_equal The row equality comparator forwarded to the functor.
   * @param d_output Pointer to the reduction output forwarded to the functor.
   * @return A `distinct_reduce_fn` instance carrying this builder's `keep` option.
   */
  template <typename MapView, typename KeyHasher, typename KeyEqual>
  auto build(MapView const& d_map,
             KeyHasher const& d_hasher,
             KeyEqual const& d_equal,
             size_type* const d_output)
  {
    using reduce_fn_type = distinct_reduce_fn<MapView, KeyHasher, KeyEqual>;
    return reduce_fn_type{d_map, d_hasher, d_equal, keep, d_output};
  }
};

} // namespace

rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
Expand Down Expand Up @@ -97,16 +171,21 @@ rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
}

// For other keep options, reduce by row on rows that compare equal.
auto const reduction_results = distinct_reduce(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());
// Depending on the `keep` parameter, the reduction operation for each row group is:
// - If `keep == KEEP_FIRST`: min of row indices in the group.
// - If `keep == KEEP_LAST`: max of row indices in the group.
// - If `keep == KEEP_NONE`: count of equivalent rows (group size).
auto const reduction_results = hash_reduce_by_row(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
nulls_equal,
nans_equal,
reduce_func_builder{keep},
reduction_init_value(keep),
stream,
mr);

// Extract the desired output indices from reduction results.
auto const map_end = [&] {
Expand Down
109 changes: 0 additions & 109 deletions cpp/src/stream_compaction/distinct_reduce.cu

This file was deleted.

87 changes: 0 additions & 87 deletions cpp/src/stream_compaction/distinct_reduce.hpp

This file was deleted.

0 comments on commit 40e8730

Please sign in to comment.