Support hyper log log plus plus (HLL++) #17133

Draft: wants to merge 11 commits into base: branch-25.02
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -421,6 +421,7 @@ add_library(
src/groupby/sort/group_correlation.cu
src/groupby/sort/group_count.cu
src/groupby/sort/group_histogram.cu
src/groupby/sort/group_hyper_log_log_plus_plus.cu
src/groupby/sort/group_m2.cu
src/groupby/sort/group_max.cu
src/groupby/sort/group_min.cu
82 changes: 45 additions & 37 deletions cpp/include/cudf/aggregation.hpp
@@ -84,43 +84,45 @@ class aggregation {
* @brief Possible aggregation operations
*/
enum Kind {
SUM, ///< sum reduction
PRODUCT, ///< product reduction
MIN, ///< min reduction
MAX, ///< max reduction
COUNT_VALID, ///< count number of valid elements
COUNT_ALL, ///< count number of elements
ANY, ///< any reduction
ALL, ///< all reduction
SUM_OF_SQUARES, ///< sum of squares reduction
MEAN, ///< arithmetic mean reduction
M2, ///< sum of squares of differences from the mean
VARIANCE, ///< variance
STD, ///< standard deviation
MEDIAN, ///< median reduction
QUANTILE, ///< compute specified quantile(s)
ARGMAX, ///< Index of max element
ARGMIN, ///< Index of min element
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
COVARIANCE, ///< covariance between two sets of elements
CORRELATION, ///< correlation between two sets of elements
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation,
SUM, ///< sum reduction
PRODUCT, ///< product reduction
MIN, ///< min reduction
MAX, ///< max reduction
COUNT_VALID, ///< count number of valid elements
COUNT_ALL, ///< count number of elements
ANY, ///< any reduction
ALL, ///< all reduction
SUM_OF_SQUARES, ///< sum of squares reduction
MEAN, ///< arithmetic mean reduction
M2, ///< sum of squares of differences from the mean
VARIANCE, ///< variance
STD, ///< standard deviation
MEDIAN, ///< median reduction
QUANTILE, ///< compute specified quantile(s)
ARGMAX, ///< Index of max element
ARGMIN, ///< Index of min element
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
COVARIANCE, ///< covariance between two sets of elements
CORRELATION, ///< correlation between two sets of elements
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
MERGE_HISTOGRAM, ///< merge partial values of HISTOGRAM aggregation
SUM,             ///< approximate the number of distinct items using HyperLogLogPlusPlus (HLLPP)
MERGE_HLLPP ///< merge partial values of HyperLogLogPlusPlus aggregation
};

aggregation() = delete;
@@ -770,5 +772,11 @@ std::unique_ptr<Base> make_tdigest_aggregation(int max_centroids = 1000);
template <typename Base>
std::unique_ptr<Base> make_merge_tdigest_aggregation(int max_centroids = 1000);

/**
 * @brief Factory to create a HLLPP aggregation, which approximates the number of distinct
 * items using HyperLogLogPlusPlus (HLL++) sketches.
 */
template <typename Base = aggregation>
std::unique_ptr<Base> make_hyper_log_log_aggregation(int num_registers_per_sketch);

/**
 * @brief Factory to create a MERGE_HLLPP aggregation, which merges partial HyperLogLogPlusPlus
 * sketches.
 */
template <typename Base = aggregation>
std::unique_ptr<Base> make_merge_hyper_log_log_aggregation(int const num_registers_per_sketch);

/** @} */ // end of group
} // namespace CUDF_EXPORT cudf
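The two factory declarations above mirror the existing make_tdigest_aggregation / make_merge_tdigest_aggregation pattern. Below is a minimal usage sketch under that assumption; the wrapper hllpp_per_group and its keys/values/num_registers_per_sketch parameters are illustrative and not part of this PR, and it assumes the HLLPP factory is instantiated for cudf::groupby_aggregation like the tdigest factories.

#include <cudf/aggregation.hpp>
#include <cudf/column/column.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <utility>
#include <vector>

// Build one HLL++ sketch per group. Per target_type_impl<_, aggregation::HLLPP>
// later in this PR, the result column of this aggregation is a list (of longs).
std::unique_ptr<cudf::column> hllpp_per_group(cudf::table_view const& keys,
                                              cudf::column_view const& values,
                                              int num_registers_per_sketch)
{
  cudf::groupby::groupby gb(keys);

  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = values;
  // Assumption: the factory is explicitly instantiated for groupby_aggregation,
  // matching make_tdigest_aggregation<cudf::groupby_aggregation>(...).
  requests[0].aggregations.push_back(
    cudf::make_hyper_log_log_aggregation<cudf::groupby_aggregation>(num_registers_per_sketch));

  auto [group_keys, results] = gb.aggregate(requests);  // group_keys holds the unique keys
  return std::move(results[0].results[0]);               // one sketch list per group
}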
70 changes: 70 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -104,6 +104,10 @@ class simple_aggregations_collector { // Declares the interface for the simple
class tdigest_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class merge_tdigest_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class hyper_log_log_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class merge_hyper_log_log_aggregation const& agg);
};

class aggregation_finalizer { // Declares the interface for the finalizer
@@ -144,6 +148,8 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class tdigest_aggregation const& agg);
virtual void visit(class merge_tdigest_aggregation const& agg);
virtual void visit(class ewma_aggregation const& agg);
virtual void visit(class hyper_log_log_aggregation const& agg);
virtual void visit(class merge_hyper_log_log_aggregation const& agg);
};

@@ -1186,6 +1192,54 @@ class merge_tdigest_aggregation final : public groupby_aggregation, public reduc
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
 * @brief Derived aggregation class for specifying HLLPP aggregation
 */
class hyper_log_log_aggregation final : public groupby_aggregation, public reduce_aggregation {
public:
explicit hyper_log_log_aggregation(int const precision_)
: aggregation{HLLPP}, precision(precision_)
{
}

int const precision;

[[nodiscard]] std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<hyper_log_log_aggregation>(*this);
}
std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
 * @brief Derived aggregation class for specifying MERGE_HLLPP aggregation
 */
class merge_hyper_log_log_aggregation final : public groupby_aggregation,
public reduce_aggregation {
public:
explicit merge_hyper_log_log_aggregation(int const precision_)
: aggregation{MERGE_HLLPP}, precision(precision_)
{
}
int const precision;

[[nodiscard]] std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<merge_hyper_log_log_aggregation>(*this);
}
std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Sentinel value used for `ARGMAX` aggregation.
*
@@ -1319,6 +1373,12 @@ struct target_type_impl<SourceType, aggregation::M2> {
using type = double;
};

// Always use list for HLLPP
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::HLLPP> {
using type = list_view;
};

// Always use `double` for VARIANCE
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::VARIANCE> {
@@ -1426,6 +1486,12 @@ struct target_type_impl<SourceType, aggregation::MERGE_M2> {
using type = struct_view;
};

// Always use list for MERGE_HLLPP
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::MERGE_HLLPP> {
using type = list_view;
};

// Use list for MERGE_HISTOGRAM
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::MERGE_HISTOGRAM> {
@@ -1579,6 +1645,10 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind
return f.template operator()<aggregation::MERGE_TDIGEST>(std::forward<Ts>(args)...);
case aggregation::EWMA:
return f.template operator()<aggregation::EWMA>(std::forward<Ts>(args)...);
case aggregation::HLLPP:
return f.template operator()<aggregation::HLLPP>(std::forward<Ts>(args)...);
case aggregation::MERGE_HLLPP:
return f.template operator()<aggregation::MERGE_HLLPP>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/column/column_view.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>

namespace cudf {
namespace groupby::detail {

/**
 * Compute the hashes of the input column, then generate a scalar that is a sketch in long-array
 * format.
 */
std::unique_ptr<scalar> reduce_hyper_log_log_plus_plus(column_view const& input,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
 * Merge sketches in long-array format and compute the estimated number of distinct values (a
 * long). The input is a struct column with multiple long columns, which is consistent with Spark.
 */
std::unique_ptr<scalar> reduce_merge_hyper_log_log_plus_plus(column_view const& input,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

} // namespace groupby::detail
} // namespace cudf
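For context on the "sketch in long-array format" above, the snippet below shows the core HyperLogLog update and raw-estimate math that such a sketch encodes. This is a host-side illustration of the standard algorithm, not code from this PR: register packing into 64-bit longs (Spark stores several 6-bit registers per long), HLL++ bias correction, and the linear-counting fallback are all omitted, and the names hll_update / hll_raw_estimate are hypothetical.

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <vector>

// Update one register from a 64-bit hash: the top `precision` bits pick the
// register, and the rank is 1 + the number of leading zeros in the remaining bits.
void hll_update(std::vector<uint8_t>& registers, uint64_t hash, int precision)
{
  auto const idx   = static_cast<size_t>(hash >> (64 - precision));
  uint64_t const w = hash << precision;  // bits left after removing the index
  int const rank   = (w == 0) ? (64 - precision + 1)
                              : (__builtin_clzll(w) + 1);  // GCC/Clang builtin
  registers[idx] = std::max<uint8_t>(registers[idx], static_cast<uint8_t>(rank));
}

// Raw HyperLogLog estimate: alpha_m * m^2 / sum(2^-register). HLL++ refines this
// with bias correction for mid-range counts and linear counting for small counts.
double hll_raw_estimate(std::vector<uint8_t> const& registers)
{
  double const m = static_cast<double>(registers.size());
  double sum     = 0.0;
  for (auto r : registers) { sum += std::ldexp(1.0, -static_cast<int>(r)); }
  double const alpha = 0.7213 / (1.0 + 1.079 / m);  // asymptotic alpha_m (m >= 128)
  return alpha * m * m / sum;
}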
94 changes: 94 additions & 0 deletions cpp/include/cudf/hashing/detail/xxhash_64_for_hllpp.cuh
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/algorithm.cuh>
#include <cudf/hashing/detail/hashing.hpp>
#include <cudf/hashing/detail/xxhash_64.cuh>
#include <cudf/table/table_device_view.cuh>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/tabulate.h>

/**
 * This file is for HyperLogLogPlusPlus; the hasher returns the seed when the input is null.
 * This is a temporary file. TODO: use xxhash_64 from the JNI repo to handle NaN/Inf like Spark
 * does.
 */
namespace cudf::hashing::detail {

using hash_value_type = uint64_t;

/**
* @brief Computes the hash value of a row in the given table.
*
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
*/
template <typename Nullate>
class xxhash_64_hllpp_row_hasher {
public:
xxhash_64_hllpp_row_hasher(Nullate nulls, table_device_view const& t, hash_value_type seed)
: _check_nulls(nulls), _table(t), _seed(seed)
{
}

__device__ auto operator()(size_type row_index) const noexcept
{
return cudf::detail::accumulate(
_table.begin(),
_table.end(),
_seed,
[row_index, nulls = _check_nulls] __device__(auto hash, auto column) {
return cudf::type_dispatcher(
column.type(), element_hasher_adapter{}, column, row_index, nulls, hash);
});
}

/**
* @brief Computes the hash value of an element in the given column.
*/
class element_hasher_adapter {
public:
template <typename T, CUDF_ENABLE_IF(column_device_view::has_element_accessor<T>())>
__device__ hash_value_type operator()(column_device_view const& col,
size_type const row_index,
Nullate const _check_nulls,
hash_value_type const _seed) const noexcept
{
if (_check_nulls && col.is_null(row_index)) { return _seed; }
auto const hasher = XXHash_64<T>{_seed};
return hasher(col.element<T>(row_index));
}

template <typename T, CUDF_ENABLE_IF(not column_device_view::has_element_accessor<T>())>
__device__ hash_value_type operator()(column_device_view const&,
size_type const,
Nullate const,
hash_value_type const) const noexcept
{
CUDF_UNREACHABLE("Unsupported type for XXHash_64");
}
};

Nullate const _check_nulls;
table_device_view const _table;
hash_value_type const _seed;
};

} // namespace cudf::hashing::detail
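A sketch of how a row hasher like this is typically driven (the header already pulls in thrust/tabulate.h and rmm/exec_policy.hpp). The wrapper hash_rows and the d_hashes buffer are illustrative and not part of this PR; the cudf/rmm/thrust calls are the existing public APIs. Because the hasher uses an extended device lambda, this must live in a .cu translation unit compiled by nvcc.

#include <cudf/hashing/detail/xxhash_64_for_hllpp.cuh>
#include <cudf/table/table_device_view.cuh>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/tabulate.h>

#include <cstdint>

// Hash every row of `input` into a device vector of 64-bit values.
// Null rows hash to `seed`, matching element_hasher_adapter above.
rmm::device_uvector<uint64_t> hash_rows(cudf::table_view const& input,
                                        uint64_t seed,
                                        rmm::cuda_stream_view stream)
{
  auto const d_table   = cudf::table_device_view::create(input, stream);
  auto const has_nulls = cudf::has_nested_nulls(input);

  rmm::device_uvector<uint64_t> d_hashes(input.num_rows(), stream);
  thrust::tabulate(
    rmm::exec_policy(stream),
    d_hashes.begin(),
    d_hashes.end(),
    cudf::hashing::detail::xxhash_64_hllpp_row_hasher<cudf::nullate::DYNAMIC>{
      cudf::nullate::DYNAMIC{has_nulls}, *d_table, seed});
  return d_hashes;
}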