Skip to content

Commit

Permalink
Support hyper log log plus plus(HLL++)
Browse files Browse the repository at this point in the history
Signed-off-by: Chong Gao <[email protected]>
  • Loading branch information
Chong Gao committed Oct 30, 2024
1 parent 074ab74 commit 171e417
Show file tree
Hide file tree
Showing 16 changed files with 1,381 additions and 306 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ add_library(
src/groupby/sort/group_correlation.cu
src/groupby/sort/group_count.cu
src/groupby/sort/group_histogram.cu
src/groupby/sort/group_hyper_log_log_plus_plus.cu
src/groupby/sort/group_m2.cu
src/groupby/sort/group_max.cu
src/groupby/sort/group_min.cu
Expand Down
82 changes: 45 additions & 37 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,43 +84,45 @@ class aggregation {
* @brief Possible aggregation operations
*/
enum Kind {
SUM, ///< sum reduction
PRODUCT, ///< product reduction
MIN, ///< min reduction
MAX, ///< max reduction
COUNT_VALID, ///< count number of valid elements
COUNT_ALL, ///< count number of elements
ANY, ///< any reduction
ALL, ///< all reduction
SUM_OF_SQUARES, ///< sum of squares reduction
MEAN, ///< arithmetic mean reduction
M2, ///< sum of squares of differences from the mean
VARIANCE, ///< variance
STD, ///< standard deviation
MEDIAN, ///< median reduction
QUANTILE, ///< compute specified quantile(s)
ARGMAX, ///< Index of max element
ARGMIN, ///< Index of min element
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation,
COVARIANCE, ///< covariance between two sets of elements
CORRELATION, ///< correlation between two sets of elements
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation,
SUM, ///< sum reduction
PRODUCT, ///< product reduction
MIN, ///< min reduction
MAX, ///< max reduction
COUNT_VALID, ///< count number of valid elements
COUNT_ALL, ///< count number of elements
ANY, ///< any reduction
ALL, ///< all reduction
SUM_OF_SQUARES, ///< sum of squares reduction
MEAN, ///< arithmetic mean reduction
M2, ///< sum of squares of differences from the mean
VARIANCE, ///< variance
STD, ///< standard deviation
MEDIAN, ///< median reduction
QUANTILE, ///< compute specified quantile(s)
ARGMAX, ///< Index of max element
ARGMIN, ///< Index of min element
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2, ///< merge partial values of M2 aggregation
COVARIANCE, ///< covariance between two sets of elements
CORRELATION, ///< correlation between two sets of elements
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together
HISTOGRAM, ///< compute frequency of each element
MERGE_HISTOGRAM, ///< merge partial values of HISTOGRAM aggregation
HLLPP, ///< approximate the number of distinct items using HyperLogLog++ (HLL++)
MERGE_HLLPP ///< merge partial values of HLLPP aggregation
};

aggregation() = delete;
Expand Down Expand Up @@ -770,5 +772,11 @@ std::unique_ptr<Base> make_tdigest_aggregation(int max_centroids = 1000);
template <typename Base>
std::unique_ptr<Base> make_merge_tdigest_aggregation(int max_centroids = 1000);

/**
 * @brief Factory to create an HLLPP (HyperLogLog++) aggregation.
 *
 * NOTE(review): the parameter is named `num_registers_per_sketch`, while the detail class
 * `hyper_log_log_aggregation` stores an `int precision`. The definition is not visible
 * here; confirm which quantity callers are expected to pass and align the names.
 *
 * @tparam Base Aggregation base type of the returned pointer; defaults to `aggregation`
 * @param num_registers_per_sketch Integer parameter of the sketch (see the note above)
 * @return A `std::unique_ptr<Base>`; presumably an aggregation of kind `HLLPP` (TODO confirm)
 */
template <typename Base = aggregation>
std::unique_ptr<Base> make_hyper_log_log_aggregation(int num_registers_per_sketch);

/**
 * @brief Factory to create a MERGE_HLLPP aggregation, i.e. one that merges partial results
 * of HLLPP (HyperLogLog++) aggregations.
 *
 * The by-value parameter is declared without a top-level `const`: in a declaration that
 * qualifier is not part of the function type, and omitting it keeps this signature
 * consistent with the sibling factory declarations.
 *
 * NOTE(review): the parameter is named `num_registers_per_sketch`, while the detail class
 * `merge_hyper_log_log_aggregation` stores an `int precision`. The definition is not
 * visible here; confirm which quantity callers are expected to pass and align the names.
 *
 * @tparam Base Aggregation base type of the returned pointer; defaults to `aggregation`
 * @param num_registers_per_sketch Integer parameter of the sketches being merged (see note)
 * @return A `std::unique_ptr<Base>`; presumably an aggregation of kind `MERGE_HLLPP`
 * (TODO confirm)
 */
template <typename Base = aggregation>
std::unique_ptr<Base> make_merge_hyper_log_log_aggregation(int num_registers_per_sketch);

/** @} */ // end of group
} // namespace CUDF_EXPORT cudf
70 changes: 70 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class simple_aggregations_collector { // Declares the interface for the simple
class tdigest_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class merge_tdigest_aggregation const& agg);
// Collector overload for an HLLPP aggregation (`hyper_log_log_aggregation`), reached via
// that class's `get_simple_aggregations`. Virtual, so derived collectors may override it;
// the default behavior is defined outside this view.
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class hyper_log_log_aggregation const& agg);
// Collector overload for a MERGE_HLLPP aggregation (`merge_hyper_log_log_aggregation`),
// reached via that class's `get_simple_aggregations`. Virtual, so derived collectors may
// override it; the default behavior is defined outside this view.
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class merge_hyper_log_log_aggregation const& agg);
};

class aggregation_finalizer { // Declares the interface for the finalizer
Expand Down Expand Up @@ -144,6 +148,8 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class tdigest_aggregation const& agg);
virtual void visit(class merge_tdigest_aggregation const& agg);
virtual void visit(class ewma_aggregation const& agg);
// Finalizer overload for an HLLPP aggregation; invoked by `hyper_log_log_aggregation::finalize`.
virtual void visit(class hyper_log_log_aggregation const& agg);
// Finalizer overload for a MERGE_HLLPP aggregation; invoked by
// `merge_hyper_log_log_aggregation::finalize`.
virtual void visit(class merge_hyper_log_log_aggregation const& agg);
};

/**
Expand Down Expand Up @@ -1186,6 +1192,54 @@ class merge_tdigest_aggregation final : public groupby_aggregation, public reduc
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
 * @brief Derived aggregation class for specifying HLLPP (HyperLogLog++) aggregation
 *
 * Holds one immutable parameter, `precision`, and forwards itself to the matching
 * collector and finalizer `visit` overloads.
 *
 * NOTE(review): no equality/hash hook is overridden here. Confirm against the base
 * `aggregation` interface (not visible in this hunk) whether two instances that differ
 * only in `precision` need to be distinguishable.
 */
class hyper_log_log_aggregation final : public groupby_aggregation, public reduce_aggregation {
 public:
  /// Precision given at construction; `const`, so it cannot change afterwards.
  int const precision;

  /**
   * @brief Construct the aggregation with kind `HLLPP`.
   *
   * @param precision_ Value copied into `precision`; it is not validated here.
   */
  explicit hyper_log_log_aggregation(int const precision_)
    : aggregation{HLLPP}, precision{precision_}
  {
  }

  /// @return A newly allocated copy of this aggregation (same `precision`).
  [[nodiscard]] std::unique_ptr<aggregation> clone() const override
  {
    return std::make_unique<hyper_log_log_aggregation>(*this);
  }

  /// Hands this aggregation to the collector's `hyper_log_log_aggregation` overload.
  std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
    data_type col_type, simple_aggregations_collector& collector) const override
  {
    return collector.visit(col_type, *this);
  }

  /// Hands this aggregation to the finalizer's `hyper_log_log_aggregation` overload.
  void finalize(aggregation_finalizer& finalizer) const override
  {
    finalizer.visit(*this);
  }
};

/**
 * @brief Derived aggregation class for specifying MERGE_HLLPP aggregation, i.e. merging
 * partial results of HLLPP (HyperLogLog++) aggregations
 *
 * Holds one immutable parameter, `precision`, and forwards itself to the matching
 * collector and finalizer `visit` overloads.
 *
 * NOTE(review): no equality/hash hook is overridden here. Confirm against the base
 * `aggregation` interface (not visible in this hunk) whether two instances that differ
 * only in `precision` need to be distinguishable.
 */
class merge_hyper_log_log_aggregation final : public groupby_aggregation,
                                              public reduce_aggregation {
 public:
  /// Precision given at construction; `const`, so it cannot change afterwards.
  int const precision;

  /**
   * @brief Construct the aggregation with kind `MERGE_HLLPP`.
   *
   * @param precision_ Value copied into `precision`; it is not validated here.
   */
  explicit merge_hyper_log_log_aggregation(int const precision_)
    : aggregation{MERGE_HLLPP}, precision{precision_}
  {
  }

  /// @return A newly allocated copy of this aggregation (same `precision`).
  [[nodiscard]] std::unique_ptr<aggregation> clone() const override
  {
    return std::make_unique<merge_hyper_log_log_aggregation>(*this);
  }

  /// Hands this aggregation to the collector's `merge_hyper_log_log_aggregation` overload.
  std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
    data_type col_type, simple_aggregations_collector& collector) const override
  {
    return collector.visit(col_type, *this);
  }

  /// Hands this aggregation to the finalizer's `merge_hyper_log_log_aggregation` overload.
  void finalize(aggregation_finalizer& finalizer) const override
  {
    finalizer.visit(*this);
  }
};

/**
* @brief Sentinel value used for `ARGMAX` aggregation.
*
Expand Down Expand Up @@ -1319,6 +1373,12 @@ struct target_type_impl<SourceType, aggregation::M2> {
using type = double;
};

// Always use list for HLLPP: the target type is `list_view` for every `SourceType`
// (the specialization does not depend on the source type).
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::HLLPP> {
using type = list_view;
};

// Always use `double` for VARIANCE
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::VARIANCE> {
Expand Down Expand Up @@ -1426,6 +1486,12 @@ struct target_type_impl<SourceType, aggregation::MERGE_M2> {
using type = struct_view;
};

// Always use list for MERGE_HLLPP: the target type is `list_view` for every `SourceType`,
// the same target type as the HLLPP specialization.
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::MERGE_HLLPP> {
using type = list_view;
};

// Use list for MERGE_HISTOGRAM
template <typename SourceType>
struct target_type_impl<SourceType, aggregation::MERGE_HISTOGRAM> {
Expand Down Expand Up @@ -1579,6 +1645,10 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind
return f.template operator()<aggregation::MERGE_TDIGEST>(std::forward<Ts>(args)...);
case aggregation::EWMA:
return f.template operator()<aggregation::EWMA>(std::forward<Ts>(args)...);
case aggregation::HLLPP:
return f.template operator()<aggregation::HLLPP>(std::forward<Ts>(args)...);
case aggregation::MERGE_HLLPP:
return f.template operator()<aggregation::MERGE_HLLPP>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
Loading

0 comments on commit 171e417

Please sign in to comment.