Skip to content

Commit

Permalink
First working prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
PointKernel committed Nov 27, 2024
1 parent 6e3ab5f commit 849fe60
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 74 deletions.
42 changes: 15 additions & 27 deletions cpp/include/cudf/table/primitive_row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ class element_equality_comparator {
__device__ bool operator()(size_type lhs_element_index,
size_type rhs_element_index) const noexcept
{
return equality_compare(lhs.element<Element>(lhs_element_index),
rhs.element<Element>(rhs_element_index));
return primitive::equality_compare(lhs.element<Element>(lhs_element_index),
rhs.element<Element>(rhs_element_index));
}

// @cond
Expand Down Expand Up @@ -183,10 +183,11 @@ class row_equality_comparator {
*/
__device__ bool operator()(size_type lhs_row_index, size_type rhs_row_index) const noexcept
{
return cudf::type_dispatcher(lhs.begin().type(),
element_equality_comparator{lhs.begin(), rhs.begin()},
lhs_row_index,
rhs_row_index);
return cudf::type_dispatcher<dispatch_storage_type>(
lhs.begin()->type(),
element_equality_comparator{*lhs.begin(), *rhs.begin()},
lhs_row_index,
rhs_row_index);
}

private:
Expand All @@ -213,18 +214,6 @@ class element_relational_comparator {
{
}

/**
* @brief Construct type-dispatched function object for performing a relational comparison between
* two elements in two columns.
*
* @param lhs The column containing the first element
* @param rhs The column containing the second element (may be the same as lhs)
*/
__host__ __device__ element_relational_comparator(column_device_view lhs, column_device_view rhs)
: lhs{lhs}, rhs{rhs}
{
}

/**
* @brief Performs a relational comparison between the specified elements
*
Expand All @@ -233,13 +222,12 @@ class element_relational_comparator {
* @return Indicates the relationship between the elements in
* the `lhs` and `rhs` columns.
*/
template <typename Element,
CUDF_ENABLE_IF(cudf::is_relationally_comparable<Element, Element>() >)>
template <typename Element, CUDF_ENABLE_IF(cudf::is_relationally_comparable<Element, Element>())>
__device__ weak_ordering operator()(size_type lhs_element_index,
size_type rhs_element_index) const noexcept
{
return relational_compare(lhs.element<Element>(lhs_element_index),
rhs.element<Element>(rhs_element_index));
return primitive::relational_compare(lhs.element<Element>(lhs_element_index),
rhs.element<Element>(rhs_element_index));
}

// @cond
Expand Down Expand Up @@ -288,7 +276,7 @@ class row_lexicographic_comparator {
row_lexicographic_comparator(table_device_view lhs,
table_device_view rhs,
order const* column_order = nullptr)
: _lhs{lhs}, _rhs{rhs}, _column_order{column_order},
: _lhs{lhs}, _rhs{rhs}, _column_order{column_order}
{
CUDF_EXPECTS(_lhs.num_columns() == _rhs.num_columns(), "Mismatched number of columns.");
}
Expand All @@ -309,8 +297,8 @@ class row_lexicographic_comparator {

auto comparator = element_relational_comparator{_lhs.column(i), _rhs.column(i)};

weak_ordering state =
cudf::type_dispatcher(_lhs.column(i).type(), comparator, lhs_index, rhs_index);
weak_ordering state = cudf::type_dispatcher<dispatch_storage_type>(
_lhs.column(i).type(), comparator, lhs_index, rhs_index);

if (state == weak_ordering::EQUIVALENT) { continue; }

Expand All @@ -330,7 +318,7 @@ class row_lexicographic_comparator {
*
* @tparam Hash Hash functor to use for hashing elements
*/
template <class Hash>
template <template <typename> class Hash>
class element_hasher {
public:
/**
Expand Down Expand Up @@ -377,7 +365,7 @@ class element_hasher {
*
* @tparam Hash Hash functor to use for hashing elements.
*/
template <class Hash>
template <template <typename> class Hash>
class row_hasher {
public:
row_hasher() = delete;
Expand Down
52 changes: 35 additions & 17 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/hashing/detail/xxhash_64.cuh>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/primitive_row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
Expand Down Expand Up @@ -55,20 +57,21 @@ rmm::device_uvector<cudf::size_type> dispatch_row_equal(
nan_equality compare_nans,
bool has_nulls,
cudf::experimental::row::equality::self_comparator row_equal,
distinct_hasher_t const& d_hash,
Func&& func)
{
if (compare_nans == nan_equality::ALL_EQUAL) {
auto const d_equal = row_equal.equal_to<HasNested>(
nullate::DYNAMIC{has_nulls},
compare_nulls,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator{});
return func(d_equal);
return func(d_equal, d_hash);
} else {
auto const d_equal = row_equal.equal_to<HasNested>(
nullate::DYNAMIC{has_nulls},
compare_nulls,
cudf::experimental::row::equality::physical_equality_comparator{});
return func(d_equal);
return func(d_equal, d_hash);
}
}
} // namespace
Expand All @@ -86,34 +89,49 @@ rmm::device_uvector<size_type> distinct_indices(table_view const& input,
return rmm::device_uvector<size_type>(0, stream, mr);
}

auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(input, stream);
auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
auto const has_nested_columns = cudf::detail::has_nested_columns(input);

auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const helper_func = [&](auto const& d_equal) {
auto const helper_func = [&](auto const& d_equal, auto const& d_hash) {
using RowEqual = std::decay_t<decltype(d_equal)>;
using RowHasher = std::decay_t<decltype(row_hash.device_hasher(has_nulls))>;
auto set = hash_set_type<RowEqual, RowHasher>{
using RowHasher = std::decay_t<decltype(d_hash)>;
auto set = distinct_set_t<RowEqual, RowHasher>{
num_rows,
0.5, // desired load factor
CUCO_DESIRED_LOAD_FACTOR, // 50% load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_equal,
{row_hash.device_hasher(has_nulls)},
d_hash,
{},
{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};
return detail::reduce_by_row(set, num_rows, keep, stream, mr);
};

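// fast code-path for a single numeric column; as written it is taken only when
// cudf::nullable(input) is true
// NOTE(review): nulls_equal and nans_equal are not forwarded to the primitive comparator on this
// path -- confirm the nullable check is not inverted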
if (cudf::nullable(input) and input.num_columns() == 1 and
cudf::is_numeric(input.column(0).type())) {
auto d_input_view = cudf::table_device_view::create(input, stream);
auto const d_equal =
cudf::row::primitive::row_equality_comparator{*d_input_view, *d_input_view};
auto const d_hasher =
cudf::row::primitive::row_hasher<cudf::hashing::detail::XXHash_64>{*d_input_view};

return helper_func(d_equal, d_hasher);
}

auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(input, stream);
auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
auto const has_nested_columns = cudf::detail::has_nested_columns(input);

auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const d_hash = row_hash.device_hasher(has_nulls);
if (cudf::detail::has_nested_columns(input)) {
return dispatch_row_equal<true>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
return dispatch_row_equal<true>(
nulls_equal, nans_equal, has_nulls, row_equal, d_hash, helper_func);
} else {
return dispatch_row_equal<false>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
return dispatch_row_equal<false>(
nulls_equal, nans_equal, has_nulls, row_equal, d_hash, helper_func);
}
}

Expand Down
53 changes: 30 additions & 23 deletions cpp/src/stream_compaction/distinct_helpers.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include "distinct_helpers.hpp"

#include <cudf/hashing/detail/xxhash_64.cuh>
#include <cudf/table/primitive_row_operators.cuh>

#include <cuda/functional>
#include <cuda/std/atomic>

Expand Down Expand Up @@ -99,49 +102,53 @@ rmm::device_uvector<size_type> reduce_by_row(HashSet& set,
return output_indices;
}

using row_hash_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>;
// Explicit instantiation of `reduce_by_row` for a `distinct_set_t` whose key equality is
// `cudf::row::primitive::row_equality_comparator` and whose hasher is
// `cudf::row::primitive::row_hasher` parameterized on `XXHash_64`.
template rmm::device_uvector<size_type> reduce_by_row(
distinct_set_t<cudf::row::primitive::row_equality_comparator,
cudf::row::primitive::row_hasher<cudf::hashing::detail::XXHash_64>>& set,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
hash_set_type<cudf::experimental::row::equality::device_row_comparator<
false,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator>,
row_hash_t>& set,
distinct_set_t<cudf::experimental::row::equality::device_row_comparator<
false,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator>,
distinct_hasher_t>& set,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
hash_set_type<cudf::experimental::row::equality::device_row_comparator<
true,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator>,
row_hash_t>& set,
distinct_set_t<cudf::experimental::row::equality::device_row_comparator<
true,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator>,
distinct_hasher_t>& set,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
hash_set_type<cudf::experimental::row::equality::device_row_comparator<
false,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::physical_equality_comparator>,
row_hash_t>& set,
distinct_set_t<cudf::experimental::row::equality::device_row_comparator<
false,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::physical_equality_comparator>,
distinct_hasher_t>& set,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
hash_set_type<cudf::experimental::row::equality::device_row_comparator<
true,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::physical_equality_comparator>,
row_hash_t>& set,
distinct_set_t<cudf::experimental::row::equality::device_row_comparator<
true,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::physical_equality_comparator>,
distinct_hasher_t>& set,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
Expand Down
18 changes: 11 additions & 7 deletions cpp/src/stream_compaction/distinct_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ auto constexpr reduction_init_value(duplicate_keep_option keep)
}
}

/**
 * @brief Row hasher type shared by the distinct helpers: the experimental
 * `device_row_hasher` instantiated with `default_hash` and `nullate::DYNAMIC`.
 */
using distinct_hasher_t =
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>;

template <typename RowEqual, typename RowHasher>
using hash_set_type = cuco::static_set<size_type,
cuco::extent<int64_t>,
cuda::thread_scope_device,
RowEqual,
cuco::linear_probing<1, RowHasher>,
cudf::detail::cuco_allocator<char>,
cuco::storage<1>>;
using distinct_set_t = cuco::static_set<size_type,
cuco::extent<int64_t>,
cuda::thread_scope_device,
RowEqual,
cuco::linear_probing<1, RowHasher>,
cudf::detail::cuco_allocator<char>,
cuco::storage<1>>;

/**
* @brief Perform a reduction on groups of rows that are compared equal and returns output indices
Expand Down

0 comments on commit 849fe60

Please sign in to comment.