From bb2bfc2d88e55a557d49aefe0299f5cfe97f6a4b Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Sat, 12 Oct 2024 17:54:53 +0800 Subject: [PATCH] Support hyper log log plus plus(HLL++) Signed-off-by: Chong Gao --- cpp/CMakeLists.txt | 2 + cpp/include/cudf/aggregation.hpp | 82 ++- .../cudf/detail/aggregation/aggregation.hpp | 70 ++ cpp/src/aggregation/aggregation.cpp | 48 ++ cpp/src/groupby/sort/aggregate.cpp | 39 + cpp/src/groupby/sort/group_hyper_log_log.cu | 667 ++++++++++++++++++ .../groupby/sort/group_merge_hyper_log_log.cu | 422 +++++++++++ cpp/src/groupby/sort/group_reductions.hpp | 13 + cpp/tests/CMakeLists.txt | 1 + cpp/tests/groupby/hll_tests.cpp | 75 ++ .../main/java/ai/rapids/cudf/Aggregation.java | 74 +- .../ai/rapids/cudf/GroupByAggregation.java | 8 + .../ai/rapids/cudf/ReductionAggregation.java | 8 + java/src/main/native/src/AggregationJni.cpp | 28 +- .../test/java/ai/rapids/cudf/TableTest.java | 15 +- 15 files changed, 1512 insertions(+), 40 deletions(-) create mode 100644 cpp/src/groupby/sort/group_hyper_log_log.cu create mode 100644 cpp/src/groupby/sort/group_merge_hyper_log_log.cu create mode 100644 cpp/tests/groupby/hll_tests.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index e4b9cbf8921..ae9bff04676 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -382,6 +382,7 @@ add_library( src/groupby/sort/group_correlation.cu src/groupby/sort/group_count.cu src/groupby/sort/group_histogram.cu + src/groupby/sort/group_hyper_log_log.cu src/groupby/sort/group_m2.cu src/groupby/sort/group_max.cu src/groupby/sort/group_min.cu @@ -396,6 +397,7 @@ add_library( src/groupby/sort/scan.cpp src/groupby/sort/group_count_scan.cu src/groupby/sort/group_max_scan.cu + src/groupby/sort/group_merge_hyper_log_log.cu src/groupby/sort/group_min_scan.cu src/groupby/sort/group_product_scan.cu src/groupby/sort/group_rank_scan.cu diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index f5f514d26d9..355e4f59f60 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -84,43 +84,45 @@ class aggregation { * @brief Possible aggregation operations */ enum Kind { - SUM, ///< sum reduction - PRODUCT, ///< product reduction - MIN, ///< min reduction - MAX, ///< max reduction - COUNT_VALID, ///< count number of valid elements - COUNT_ALL, ///< count number of elements - ANY, ///< any reduction - ALL, ///< all reduction - SUM_OF_SQUARES, ///< sum of squares reduction - MEAN, ///< arithmetic mean reduction - M2, ///< sum of squares of differences from the mean - VARIANCE, ///< variance - STD, ///< standard deviation - MEDIAN, ///< median reduction - QUANTILE, ///< compute specified quantile(s) - ARGMAX, ///< Index of max element - ARGMIN, ///< Index of min element - NUNIQUE, ///< count number of unique elements - NTH_ELEMENT, ///< get the nth element - ROW_NUMBER, ///< get row-number of current index (relative to rolling window) - EWMA, ///< get exponential weighted moving average at current index - RANK, ///< get rank of current index - COLLECT_LIST, ///< collect values into a list - COLLECT_SET, ///< collect values into a list without duplicate entries - LEAD, ///< window function, accesses row at specified offset following current row - LAG, ///< window function, accesses row at specified offset preceding current row - PTX, ///< PTX UDF based reduction - CUDA, ///< CUDA UDF based reduction - MERGE_LISTS, ///< merge multiple lists values into one list - MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate 
entries - MERGE_M2, ///< merge partial values of M2 aggregation, - COVARIANCE, ///< covariance between two sets of elements - CORRELATION, ///< correlation between two sets of elements - TDIGEST, ///< create a tdigest from a set of input values - MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together - HISTOGRAM, ///< compute frequency of each element - MERGE_HISTOGRAM ///< merge partial values of HISTOGRAM aggregation, + SUM, ///< sum reduction + PRODUCT, ///< product reduction + MIN, ///< min reduction + MAX, ///< max reduction + COUNT_VALID, ///< count number of valid elements + COUNT_ALL, ///< count number of elements + ANY, ///< any reduction + ALL, ///< all reduction + SUM_OF_SQUARES, ///< sum of squares reduction + MEAN, ///< arithmetic mean reduction + M2, ///< sum of squares of differences from the mean + VARIANCE, ///< variance + STD, ///< standard deviation + MEDIAN, ///< median reduction + QUANTILE, ///< compute specified quantile(s) + ARGMAX, ///< Index of max element + ARGMIN, ///< Index of min element + NUNIQUE, ///< count number of unique elements + NTH_ELEMENT, ///< get the nth element + ROW_NUMBER, ///< get row-number of current index (relative to rolling window) + EWMA, ///< get exponential weighted moving average at current index + RANK, ///< get rank of current index + COLLECT_LIST, ///< collect values into a list + COLLECT_SET, ///< collect values into a list without duplicate entries + LEAD, ///< window function, accesses row at specified offset following current row + LAG, ///< window function, accesses row at specified offset preceding current row + PTX, ///< PTX UDF based reduction + CUDA, ///< CUDA UDF based reduction + MERGE_LISTS, ///< merge multiple lists values into one list + MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries + MERGE_M2, ///< merge partial values of M2 aggregation, + COVARIANCE, ///< covariance between two sets of elements + CORRELATION, ///< correlation between two sets of elements + TDIGEST, ///< create a tdigest from a set of input values + MERGE_TDIGEST, ///< create a tdigest by merging multiple tdigests together + HISTOGRAM, ///< compute frequency of each element + MERGE_HISTOGRAM, ///< merge partial values of HISTOGRAM aggregation + HLLPP, ///< approximating the number of distinct items by using hyper log log plus plus (HLLPP) + MERGE_HLLPP ///< merge partial values of HLLPP aggregation }; aggregation() = delete; @@ -770,5 +772,11 @@ std::unique_ptr make_tdigest_aggregation(int max_centroids = 1000); template std::unique_ptr make_merge_tdigest_aggregation(int max_centroids = 1000); +template +std::unique_ptr make_hyper_log_log_aggregation(int num_registers_per_sketch); + +template +std::unique_ptr make_merge_hyper_log_log_aggregation(int const num_registers_per_sketch); + /** @} */ // end of group } // namespace CUDF_EXPORT cudf diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 6661a461b8b..01424cac766 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -104,6 +104,10 @@ class simple_aggregations_collector { // Declares the interface for the simple class tdigest_aggregation const& agg); virtual std::vector> visit( data_type col_type, class merge_tdigest_aggregation const& agg); + virtual std::vector> visit( + data_type col_type, class hyper_log_log_aggregation const& agg); + virtual std::vector> visit( + data_type col_type, class 
merge_hyper_log_log_aggregation const& agg);
 };
 
 class aggregation_finalizer {  // Declares the interface for the finalizer
@@ -144,6 +148,8 @@ class aggregation_finalizer {  // Declares the interface for the finalizer
   virtual void visit(class tdigest_aggregation const& agg);
   virtual void visit(class merge_tdigest_aggregation const& agg);
   virtual void visit(class ewma_aggregation const& agg);
+  virtual void visit(class hyper_log_log_aggregation const& agg);
+  virtual void visit(class merge_hyper_log_log_aggregation const& agg);
 };
 
 /**
@@ -1186,6 +1192,54 @@ class merge_tdigest_aggregation final : public groupby_aggregation, public reduc
   void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
 };
 
+/**
+ * @brief Derived aggregation class for specifying HLLPP aggregation
+ */
+class hyper_log_log_aggregation final : public groupby_aggregation, public reduce_aggregation {
+ public:
+  explicit hyper_log_log_aggregation(int const num_registers_per_sketch_)
+    : aggregation{HLLPP}, num_registers_per_sketch(num_registers_per_sketch_)
+  {
+  }
+
+  int const num_registers_per_sketch;
+
+  [[nodiscard]] std::unique_ptr clone() const override
+  {
+    return std::make_unique(*this);
+  }
+  std::vector> get_simple_aggregations(
+    data_type col_type, simple_aggregations_collector& collector) const override
+  {
+    return collector.visit(col_type, *this);
+  }
+  void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
+/**
+ * @brief Derived aggregation class for specifying MERGE_HLLPP aggregation
+ */
+class merge_hyper_log_log_aggregation final : public groupby_aggregation,
+                                              public reduce_aggregation {
+ public:
+  explicit merge_hyper_log_log_aggregation(int const num_registers_per_sketch_)
+    : aggregation{MERGE_HLLPP}, num_registers_per_sketch(num_registers_per_sketch_)
+  {
+  }
+  int const num_registers_per_sketch;
+
+  [[nodiscard]] std::unique_ptr clone() const override
+  {
+    return std::make_unique(*this);
+  }
+  std::vector> get_simple_aggregations(
+    data_type col_type, simple_aggregations_collector& collector) const override
+  {
+    return collector.visit(col_type, *this);
+  }
+  void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
 /**
  * @brief Sentinel value used for `ARGMAX` aggregation. 
* @@ -1319,6 +1373,12 @@ struct target_type_impl { using type = double; }; +// Always use list for HLLPP +template +struct target_type_impl { + using type = list_view; +}; + // Always use `double` for VARIANCE template struct target_type_impl { @@ -1426,6 +1486,12 @@ struct target_type_impl { using type = struct_view; }; +// Always use list for MERGE_HLLPP +template +struct target_type_impl { + using type = list_view; +}; + // Use list for MERGE_HISTOGRAM template struct target_type_impl { @@ -1579,6 +1645,10 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::EWMA: return f.template operator()(std::forward(args)...); + case aggregation::HLLPP: + return f.template operator()(std::forward(args)...); + case aggregation::MERGE_HLLPP: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index a60a7f63882..561fad1742d 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -237,6 +237,18 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, hyper_log_log_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + +std::vector> simple_aggregations_collector::visit( + data_type col_type, merge_hyper_log_log_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + // aggregation_finalizer ---------------------------------------- void aggregation_finalizer::visit(aggregation const& agg) {} @@ -410,6 +422,16 @@ void aggregation_finalizer::visit(merge_tdigest_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(hyper_log_log_aggregation const& agg) +{ + visit(static_cast(agg)); +} + +void aggregation_finalizer::visit(merge_hyper_log_log_aggregation const& agg) +{ + visit(static_cast(agg)); +} + } // namespace detail std::vector> aggregation::get_simple_aggregations( @@ -917,6 +939,32 @@ make_merge_tdigest_aggregation(int max_centroids); template CUDF_EXPORT std::unique_ptr make_merge_tdigest_aggregation(int max_centroids); +/// Factory to create a HLLPP aggregation +template +std::unique_ptr make_hyper_log_log_aggregation(int const num_registers_per_sketch) +{ + return std::make_unique(num_registers_per_sketch); +} +template CUDF_EXPORT std::unique_ptr make_hyper_log_log_aggregation( + int num_registers_per_sketch); +template CUDF_EXPORT std::unique_ptr +make_hyper_log_log_aggregation(int num_registers_per_sketch); +template CUDF_EXPORT std::unique_ptr +make_hyper_log_log_aggregation(int num_registers_per_sketch); + +/// Factory to create a MERGE_HLLPP aggregation +template +std::unique_ptr make_merge_hyper_log_log_aggregation(int const num_registers_per_sketch) +{ + return std::make_unique(num_registers_per_sketch); +} +template CUDF_EXPORT std::unique_ptr make_merge_hyper_log_log_aggregation( + int const num_registers_per_sketch); +template CUDF_EXPORT std::unique_ptr +make_merge_hyper_log_log_aggregation(int const num_registers_per_sketch); +template CUDF_EXPORT std::unique_ptr +make_merge_hyper_log_log_aggregation(int const num_registers_per_sketch); + namespace detail { namespace { struct target_type_functor { diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 
a9085a1f1fd..8afa9bbe09a 100644
--- a/cpp/src/groupby/sort/aggregate.cpp
+++ b/cpp/src/groupby/sort/aggregate.cpp
@@ -750,6 +750,26 @@ void aggregate_result_functor::operator()(aggregation cons
                      mr));
 }
 
+template <>
+void aggregate_result_functor::operator()(aggregation const& agg)
+{
+  if (cache.has_result(values, agg)) { return; }
+
+  int const num_registers_per_sketch =
+    dynamic_cast(agg).num_registers_per_sketch;
+
+  cache.add_result(values,
+                   agg,
+                   detail::group_hyper_log_log(get_grouped_values(),
+                                               helper.num_groups(stream),
+                                               helper.group_labels(stream),
+                                               num_registers_per_sketch,
+                                               stream,
+                                               mr));
+}
+
 /**
  * @brief Generate a merged tdigest column from a grouped set of input tdigest columns.
  *
@@ -792,6 +812,25 @@ void aggregate_result_functor::operator()(aggregatio
                      mr));
 }
 
+template <>
+void aggregate_result_functor::operator()(aggregation const& agg)
+{
+  if (cache.has_result(values, agg)) { return; }
+
+  int const num_registers_per_sketch =
+    dynamic_cast(agg)
+      .num_registers_per_sketch;
+
+  cache.add_result(values,
+                   agg,
+                   detail::group_merge_hyper_log_log(get_grouped_values(),
+                                                     helper.num_groups(stream),
+                                                     helper.group_labels(stream),
+                                                     num_registers_per_sketch,
+                                                     stream,
+                                                     mr));
+}
+
 }  // namespace detail
 
 // Sort-based groupby
diff --git a/cpp/src/groupby/sort/group_hyper_log_log.cu b/cpp/src/groupby/sort/group_hyper_log_log.cu
new file mode 100644
index 00000000000..bc4e761cbd8
--- /dev/null
+++ b/cpp/src/groupby/sort/group_hyper_log_log.cu
@@ -0,0 +1,667 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include  // TODO #include once available
+#include
+#include
+#include
+#include
+
+namespace cudf {
+namespace groupby {
+namespace detail {
+namespace {
+
+// The number of bits required per register.
+constexpr int REGISTER_VALUE_BITS = 6;
+constexpr int REGISTERS_PER_LONG  = 64 / REGISTER_VALUE_BITS;  // 10
+constexpr int64_t SEED            = 42L;                       // XXHash seed
+
+/**
+ * Normalization of floating point NaNs, passthrough for all other values.
+ */
+template
+T __device__ inline normalize_nans(T const& key)
+{
+  if constexpr (cudf::is_floating_point()) {
+    if (std::isnan(key)) { return std::numeric_limits::quiet_NaN(); }
+  }
+  return key;
+}
+
+/**
+ * Normalization of floating point NaNs and zeros, passthrough for all other values.
+ */
+template
+T __device__ inline normalize_nans_and_zeros(T const& key)
+{
+  if constexpr (cudf::is_floating_point()) {
+    if (key == T{0.0}) { return T{0.0}; }
+  }
+  return normalize_nans(key);
+}
+
+/**
+ * @brief Converts a cudf decimal128 value to a java bigdecimal value. 
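+ *
+ * For example (an illustrative value, not taken from the code): the unscaled
+ * value 300 (0x12C) is stored little-endian as bytes 2c 01 00 ... 00; the 14
+ * trailing zero bytes are truncated, the remaining two bytes are reversed to
+ * big endian, and only those two bytes are hashed, matching
+ * BigDecimal.valueOf(300).unscaledValue().toByteArray(), which is {0x01, 0x2c}.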
+ * + * @param key The cudf decimal value + * + * @returns A 128 bit value containing the converted decimal bits and a length + * representing the relevant number of bytes in the value. + * + */ +__device__ __inline__ std::pair<__int128_t, cudf::size_type> to_java_bigdecimal( + numeric::decimal128 key) +{ + // java.math.BigDecimal.valueOf(unscaled_value, _scale).unscaledValue().toByteArray() + // https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L381 + __int128_t const val = key.value(); + constexpr cudf::size_type key_size = sizeof(__int128_t); + std::byte const* data = reinterpret_cast(&val); + + // Small negative values start with 0xff..., small positive values start with 0x00... + bool const is_negative = val < 0; + std::byte const zero_value = is_negative ? std::byte{0xff} : std::byte{0x00}; + + // If the value can be represented with a shorter than 16-byte integer, the + // leading bytes of the little-endian value are truncated and are not hashed. + auto const reverse_begin = thrust::reverse_iterator(data + key_size); + auto const reverse_end = thrust::reverse_iterator(data); + auto const first_nonzero_byte = + thrust::find_if_not(thrust::seq, reverse_begin, reverse_end, [zero_value](std::byte const& v) { + return v == zero_value; + }).base(); + // Max handles special case of 0 and -1 which would shorten to 0 length otherwise + cudf::size_type length = + std::max(1, static_cast(thrust::distance(data, first_nonzero_byte))); + + // Preserve the 2's complement sign bit by adding a byte back on if necessary. + // e.g. 0x0000ff would shorten to 0x00ff. The 0x00 byte is retained to + // preserve the sign bit, rather than leaving an "f" at the front which would + // change the sign bit. However, 0x00007f would shorten to 0x7f. No extra byte + // is needed because the leftmost bit matches the sign bit. Similarly for + // negative values: 0xffff00 --> 0xff00 and 0xffff80 --> 0x80. + if ((length < key_size) && (is_negative ^ bool(data[length - 1] & std::byte{0x80}))) { ++length; } + + // Convert to big endian by reversing the range of nonzero bytes. Only those bytes are hashed. + __int128_t big_endian_value = 0; + auto big_endian_data = reinterpret_cast(&big_endian_value); + thrust::reverse_copy(thrust::seq, data, data + length, big_endian_data); + + return {big_endian_value, length}; +} + +using hash_value_type = int64_t; +using half_size_type = int32_t; + +constexpr __device__ inline int64_t rotate_bits_left_signed(hash_value_type h, int8_t r) +{ + return (h << r) | (h >> (64 - r)) & ~(-1 << r); +} + +template +struct XXHash_64 { + using result_type = hash_value_type; + + constexpr XXHash_64() = delete; + constexpr XXHash_64(hash_value_type seed) : m_seed(seed) {} + + template + __device__ inline T getblock32(std::byte const* data, cudf::size_type offset) const + { + // Read a 4-byte value from the data pointer as individual bytes for safe + // unaligned access (very likely for string types). 
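+    // (Note: this is equivalent to a memcpy-style little-endian load; assembling
+    // the bytes manually avoids undefined behavior from misaligned 4-byte loads,
+    // and the compiler may still fuse the reads into one load when alignment
+    // can be proven.)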
+ auto block = reinterpret_cast(data + offset); + uint32_t result = static_cast(block[0]) | (static_cast(block[1]) << 8) | + (static_cast(block[2]) << 16) | + (static_cast(block[3]) << 24); + return reinterpret_cast(&result)[0]; + } + + __device__ inline hash_value_type getblock64(std::byte const* data, cudf::size_type offset) const + { + uint64_t result = static_cast(getblock32(data, offset)) | + static_cast(getblock32(data, offset + 4)) << 32; + return reinterpret_cast(&result)[0]; + } + + result_type __device__ inline operator()(Key const& key) const { return compute(key); } + + template + result_type __device__ inline compute(T const& key) const + { + return compute_bytes(reinterpret_cast(&key), sizeof(T)); + } + + result_type __device__ inline compute_remaining_bytes(std::byte const* data, + cudf::size_type const nbytes, + cudf::size_type offset, + result_type h64) const + { + // remaining data can be processed in 8-byte chunks + if ((nbytes % 32) >= 8) { + for (; offset <= nbytes - 8; offset += 8) { + hash_value_type k1 = getblock64(data, offset) * prime2; + k1 = rotate_bits_left_signed(k1, 31) * prime1; + h64 ^= k1; + h64 = rotate_bits_left_signed(h64, 27) * prime1 + prime4; + } + } + + // remaining data can be processed in 4-byte chunks + if (((nbytes % 32) % 8) >= 4) { + for (; offset <= nbytes - 4; offset += 4) { + h64 ^= (getblock32(data, offset) & 0xffffffffL) * prime1; + h64 = rotate_bits_left_signed(h64, 23) * prime2 + prime3; + } + } + + // and the rest + if (nbytes % 4) { + while (offset < nbytes) { + h64 ^= (static_cast(data[offset]) & 0xff) * prime5; + h64 = rotate_bits_left_signed(h64, 11) * prime1; + ++offset; + } + } + return h64; + } + + result_type __device__ compute_bytes(std::byte const* data, cudf::size_type const nbytes) const + { + uint64_t offset = 0; + hash_value_type h64; + // data can be processed in 32-byte chunks + if (nbytes >= 32) { + auto limit = nbytes - 32; + hash_value_type v1 = m_seed + prime1 + prime2; + hash_value_type v2 = m_seed + prime2; + hash_value_type v3 = m_seed; + hash_value_type v4 = m_seed - prime1; + + do { + // pipeline 4*8byte computations + v1 += getblock64(data, offset) * prime2; + v1 = rotate_bits_left_signed(v1, 31); + v1 *= prime1; + offset += 8; + v2 += getblock64(data, offset) * prime2; + v2 = rotate_bits_left_signed(v2, 31); + v2 *= prime1; + offset += 8; + v3 += getblock64(data, offset) * prime2; + v3 = rotate_bits_left_signed(v3, 31); + v3 *= prime1; + offset += 8; + v4 += getblock64(data, offset) * prime2; + v4 = rotate_bits_left_signed(v4, 31); + v4 *= prime1; + offset += 8; + } while (offset <= limit); + + h64 = rotate_bits_left_signed(v1, 1) + rotate_bits_left_signed(v2, 7) + + rotate_bits_left_signed(v3, 12) + rotate_bits_left_signed(v4, 18); + + v1 *= prime2; + v1 = rotate_bits_left_signed(v1, 31); + v1 *= prime1; + h64 ^= v1; + h64 = h64 * prime1 + prime4; + + v2 *= prime2; + v2 = rotate_bits_left_signed(v2, 31); + v2 *= prime1; + h64 ^= v2; + h64 = h64 * prime1 + prime4; + + v3 *= prime2; + v3 = rotate_bits_left_signed(v3, 31); + v3 *= prime1; + h64 ^= v3; + h64 = h64 * prime1 + prime4; + + v4 *= prime2; + v4 = rotate_bits_left_signed(v4, 31); + v4 *= prime1; + h64 ^= v4; + h64 = h64 * prime1 + prime4; + } else { + h64 = m_seed + prime5; + } + + h64 += nbytes; + h64 = compute_remaining_bytes(data, nbytes, offset, h64); + + return finalize(h64); + } + + constexpr __host__ __device__ hash_value_type finalize(hash_value_type h) const noexcept + { + h ^= static_cast(static_cast(h) >> 33); + h *= prime2; + h ^= 
static_cast(static_cast(h) >> 29); + h *= prime3; + h ^= static_cast(static_cast(h) >> 32); + return h; + } + + private: + hash_value_type m_seed{}; + + static constexpr hash_value_type prime1 = 0x9E3779B185EBCA87L; + static constexpr hash_value_type prime2 = 0xC2B2AE3D27D4EB4FL; + static constexpr hash_value_type prime3 = 0x165667B19E3779F9L; + static constexpr hash_value_type prime4 = 0x85EBCA77C2B2AE63L; + static constexpr hash_value_type prime5 = 0x27D4EB2F165667C5L; +}; + +template <> +hash_value_type __device__ inline XXHash_64::operator()(bool const& key) const +{ + return compute(key); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(int8_t const& key) const +{ + return compute(key); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(uint8_t const& key) const +{ + return compute(key); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(int16_t const& key) const +{ + return compute(key); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(uint16_t const& key) const +{ + return compute(key); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(float const& key) const +{ + return compute(normalize_nans_and_zeros(key)); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()(double const& key) const +{ + return compute(normalize_nans_and_zeros(key)); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()( + cudf::string_view const& key) const +{ + auto const data = reinterpret_cast(key.data()); + auto const len = key.size_bytes(); + return compute_bytes(data, len); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()( + numeric::decimal32 const& key) const +{ + return compute(key.value()); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()( + numeric::decimal64 const& key) const +{ + return compute(key.value()); +} + +template <> +hash_value_type __device__ inline XXHash_64::operator()( + numeric::decimal128 const& key) const +{ + auto [java_d, length] = to_java_bigdecimal(key); + auto bytes = reinterpret_cast(&java_d); + return compute_bytes(bytes, length); +} + +/** + * @brief Computes the hash value of a value in the given column. + * + * @tparam Nullate A cudf::nullate type describing whether to check for nulls. + */ +template +struct device_row_hasher { + public: + Nullate const check_nulls; + cudf::column_device_view const d_input; + + device_row_hasher(Nullate check_nulls_, cudf::column_device_view const& d_input_) + : check_nulls(check_nulls_), d_input(d_input_) + { + } + + __device__ auto operator()(cudf::size_type row_index) const noexcept + { + return cudf::type_dispatcher( + d_input.type(), element_hasher_adapter{}, d_input, row_index, check_nulls); + } + + /** + * @brief Computes the hash value of an element in the given column. + */ + struct element_hasher_adapter { + template ())> + __device__ int64_t operator()(cudf::column_device_view const& d_input, + cudf::size_type row_index, + Nullate const check_nulls) const noexcept + { + if (check_nulls && d_input.is_null(row_index)) { return SEED; } + auto const hasher = XXHash_64{SEED}; + return hasher(d_input.element(row_index)); + } + + template ())> + __device__ int64_t operator()(cudf::column_device_view const&, + cudf::size_type, + Nullate const) const noexcept + { + CUDF_UNREACHABLE("Only atomic types are supported in HLLPP groupby aggregation"); + } + }; +}; + +/** + * Only supports block_size = 256 now. 
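+ *
+ * HLL++ recap (illustration for p = 9, i.e. m = 512): the top 9 bits of a
+ * 64-bit hash select one of the 512 registers, and that register keeps the
+ * maximum of "leading-zero count of the remaining 55 bits, plus one" over all
+ * hashes routed to it.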
+ * Sketch contains `m` registers.
+ * Each thread consumes `num_hashs_per_thread` hashes, maintains a partial sketch, and flushes
+ * the sketch of the previous group whenever a new group starts.
+ * `m` must be 1 or 2 times block_size.
+ * Supports p = 8 or 9.
+ */
+template
+CUDF_KERNEL void group_sketches_kernel(
+  column_device_view hashs,
+  cudf::device_span group_lables,
+  unsigned int const m,                // number of registers per sketch, e.g.: 2^9 = 512
+  int* const sketch_output,            // size is num_groups * m; an int holds a 6-bit register
+  int* const sketches_cache,           // size is num_threads * m; an int holds a 6-bit register
+  size_type* const group_lables_cache  // the last group label seen by each thread
+)
+{
+  auto const tid           = cudf::detail::grid_1d::global_thread_id();
+  int64_t const num_hashs  = hashs.size();
+  if (tid * num_hashs_per_thread >= num_hashs) { return; }
+
+  int const p               = cuda::std::countr_zero(m);  // register-index bits, e.g.: 9, 2^p = m
+  uint64_t const w_padding  = 1L << (p - 1);  // e.g. for p = 9, in binary: 1 0000 0000
+  int const idx_shift       = 64 - p;         // e.g.: 64 - 9 = 55
+
+  auto const hash_first = tid * num_hashs_per_thread;
+  auto const hash_end   = cuda::std::min((tid + 1) * num_hashs_per_thread, num_hashs);
+
+  // init sketches for each thread
+  int* const sketch_ptr = sketches_cache + tid * m;
+  for (auto i = 0; i < m; i++) {
+    sketch_ptr[i] = 0;
+  }
+
+  size_type prev_group = group_lables[hash_first];
+  for (auto hash_idx = hash_first; hash_idx < hash_end; hash_idx++) {
+    size_type curr_group = group_lables[hash_idx];
+    // reinterpret to unsigned so that >> shifts without preserving the sign bit
+    uint64_t const hash = static_cast(hashs.element(hash_idx));
+    auto const reg_idx  = hash >> idx_shift;
+    // the max number of leading zeros in a long is 64, which fits in an int
+    int const reg_v = static_cast(cuda::std::countl_zero((hash << p) | w_padding) + 1ULL);
+    if (curr_group == prev_group) {
+      // still in the same group, update the max value
+      if (reg_v > sketch_ptr[reg_idx]) { sketch_ptr[reg_idx] = reg_v; }
+    } else {
+      // meets a new group, save the output for the previous group
+      for (auto i = 0; i < m; i++) {
+        sketch_output[prev_group * m + i] = sketch_ptr[i];
+      }
+      // reset the thread cache
+      for (auto i = 0; i < m; i++) {
+        sketch_ptr[i] = 0;
+      }
+      // save the max value
+      sketch_ptr[reg_idx] = reg_v;
+    }
+
+    prev_group = curr_group;
+  }
+
+  // save the last group label
+  group_lables_cache[tid] = group_lables[hash_end - 1];
+
+  // sync the whole block
+  __syncthreads();
+
+  // scan all the sketches vertically across threads:
+  // split each sketch (`m` registers) into slots, one thread handles one slot;
+  // each slot contains 1 (p = 8) or 2 (p = 9) registers
+  int const num_sketches   = block_size;  // each thread generates one sketch
+  int const regs_in_slot   = m / block_size;
+  int const reg_slot_begin = threadIdx.x * regs_in_slot;
+  prev_group               = group_lables_cache[0];
+  int max_register[]       = {0, 0};  // supports p = 8 or 9
+
+  for (auto sketch_idx = 0; sketch_idx < num_sketches; sketch_idx++) {
+    int curr_group = group_lables_cache[sketch_idx];
+    if (curr_group != prev_group or sketch_idx == num_sketches - 1) {
+      // meets a new group or reached the last sketch
+      for (auto reg_in_slot = 0; reg_in_slot < regs_in_slot; reg_in_slot++) {
+        // store to output
+        cuda::atomic_ref register_ref(
+          sketch_output[prev_group * m + reg_slot_begin + reg_in_slot]);
+        register_ref.fetch_max(max_register[reg_in_slot], cuda::memory_order_relaxed);
+      }
+      // reset
+      max_register[0] = max_register[1] = 0;
+    }
+
+    // accumulate the max register value of the current group for this thread's slot
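+    // (Illustration: with block_size = 256 and m = 512, regs_in_slot is 2, so
+    // each thread owns two register positions and scans the 256 thread-local
+    // sketches produced above; only the flush at a group boundary needs an
+    // atomic fetch_max.)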
+    for (auto reg_in_slot = 0; reg_in_slot < regs_in_slot; reg_in_slot++) {
+      int curr_register = sketches_cache[sketch_idx * m + reg_slot_begin + reg_in_slot];
+      if (curr_register > max_register[reg_in_slot]) { max_register[reg_in_slot] = curr_register; }
+    }
+
+    prev_group = curr_group;
+  }
+}
+
+struct compact_functor {
+  int const num_sketches;
+  int const num_registers_per_sketch;
+  int* const uncompressed_sketches;
+  cudf::device_span d_results;
+
+  __device__ void operator()(size_type row_idx)
+  {
+    int num_longs_per_sketch = num_registers_per_sketch / REGISTERS_PER_LONG + 1;
+    for (auto i = 0; i < num_longs_per_sketch; i++) {
+      // the last long holds only the leftover registers; do not read past the sketch end
+      int num_regs = (i == num_longs_per_sketch - 1)
+                       ? num_registers_per_sketch % REGISTERS_PER_LONG
+                       : REGISTERS_PER_LONG;
+      long packed = 0;
+      for (auto j = 0; j < num_regs; j++) {
+        long reg =
+          uncompressed_sketches[row_idx * num_registers_per_sketch + i * REGISTERS_PER_LONG + j];
+        packed |= (reg << (j * REGISTER_VALUE_BITS));
+      }
+      d_results[i][row_idx] = packed;
+    }
+  }
+};
+
+/**
+ * Compresses HLLPP sketches the way Spark does.
+ * The input is an int column; each sketch holds `num_registers_per_sketch` integers.
+ * Each register value occupies 6 bits, which is enough to hold 64 (the max number of leading
+ * zeros in a long).
+ * Spark's result layout is multiple long columns, each long packing REGISTERS_PER_LONG (10)
+ * register values; the packed longs are written into the pre-allocated columns behind
+ * `d_results`.
+ *
+ * e.g.:
+ * three register values are, in binary: 100001, 100001, 100001
+ *
+ * The input integers are (in binary):
+ * | 3 zero bytes - 00100001 | 3 zero bytes - 00100001 | 3 zero bytes - 00100001 |
+ * Each integer holds one register (6 bits), so most of the bits are wasted.
+ *
+ * The Spark bits in a long are:
+ * | leading zeros-100001-100001-100001|
+ */
+void compact(int const num_sketches,
+             int const num_registers_per_sketch,
+             int* const uncompressed_sketches,
+             cudf::device_span d_results,
+             rmm::cuda_stream_view stream)
+{
+  thrust::for_each_n(
+    rmm::exec_policy(stream),
+    thrust::make_counting_iterator(0),
+    num_sketches,
+    compact_functor{num_sketches, num_registers_per_sketch, uncompressed_sketches, d_results});
+}
+
+std::unique_ptr compute_hll(column_view const& input,
+                            size_type num_groups,
+                            cudf::device_span group_lables,
+                            int const num_registers_per_sketch,
+                            rmm::cuda_stream_view stream,
+                            rmm::device_async_resource_ref mr)
+{
+  // 1. compute all the hashes
+  auto hash_col =
+    make_numeric_column(data_type{type_id::INT64}, input.size(), mask_state::ALL_VALID, stream, mr);
+  auto d_input        = cudf::column_device_view::create(input, stream);
+  bool const nullable = input.has_nulls();
+  thrust::tabulate(rmm::exec_policy(stream),
+                   hash_col->mutable_view().begin(),
+                   hash_col->mutable_view().end(),
+                   device_row_hasher(nullable, *d_input));
+  auto d_hashs = cudf::column_device_view::create(hash_col->view(), stream);
+
+  // 2. 
compute and merge the sketches per group
+  constexpr int block_size           = 256;
+  constexpr int num_hashs_per_thread = 32;  // handles 32 items per thread
+  cudf::detail::grid_1d grid{input.size(), block_size, num_hashs_per_thread};
+  int64_t total_threads = cudf::util::div_rounding_up_safe(input.size(), num_hashs_per_thread);
+  auto sketches_output =
+    rmm::device_uvector(static_cast(num_groups) * num_registers_per_sketch, stream);
+  auto sketches_cache = rmm::device_uvector(total_threads * num_registers_per_sketch, stream);
+  auto group_lables_cache = rmm::device_uvector(total_threads, stream);
+  group_sketches_kernel<<>>(
+    *d_hashs,
+    group_lables,
+    static_cast(num_registers_per_sketch),
+    sketches_output.begin(),
+    sketches_cache.begin(),
+    group_lables_cache.begin());
+
+  // 3. create output columns
+  auto num_long_columns   = num_registers_per_sketch / REGISTERS_PER_LONG + 1;
+  auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
+    return make_numeric_column(
+      data_type{type_id::INT64}, num_groups, mask_state::ALL_VALID, stream, mr);
+  });
+  auto children =
+    std::vector>(results_iter, results_iter + num_long_columns);
+  auto d_results = [&] {
+    auto host_results_pointer_iter =
+      thrust::make_transform_iterator(children.begin(), [](auto const& results_column) {
+        return results_column->mutable_view().template data();
+      });
+    auto host_results_pointers =
+      std::vector(host_results_pointer_iter, host_results_pointer_iter + children.size());
+    return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
+  }();
+  auto result = cudf::make_structs_column(num_groups,
+                                          std::move(children),
+                                          0,                     // null count
+                                          rmm::device_buffer{},  // null mask
+                                          stream);
+
+  // 4. compact the sketches into the output columns
+  compact(num_groups, num_registers_per_sketch, sketches_output.begin(), d_results, stream);
+
+  return result;
+}
+
+}  // namespace
+
+/**
+ * Computes a hyper log log sketch of the input values and merges the sketches within the same
+ * group.
+ * The output is a struct column of multiple long columns, consistent with Spark.
+ */
+std::unique_ptr group_hyper_log_log(column_view const& input,
+                                    size_type num_groups,
+                                    cudf::device_span group_lables,
+                                    int const num_registers_per_sketch,
+                                    rmm::cuda_stream_view stream,
+                                    rmm::device_async_resource_ref mr)
+{
+  return compute_hll(input, num_groups, group_lables, num_registers_per_sketch, stream, mr);
+}
+
+}  // namespace detail
+}  // namespace groupby
+}  // namespace cudf
diff --git a/cpp/src/groupby/sort/group_merge_hyper_log_log.cu b/cpp/src/groupby/sort/group_merge_hyper_log_log.cu
new file mode 100644
index 00000000000..8a13e353109
--- /dev/null
+++ b/cpp/src/groupby/sort/group_merge_hyper_log_log.cu
@@ -0,0 +1,422 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include  // TODO #include once available
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace cudf {
+namespace groupby {
+namespace detail {
+namespace {
+
+// Number of bits per register value; the max leading-zero count for a long is 64,
+// so six bits are sufficient.
+constexpr uint64_t REGISTER_VALUE_BITS = 6;
+
+// saves 10 register values per long
+constexpr uint64_t REGISTERS_PER_LONG = 10;
+
+// MASK binary: 111111
+constexpr uint64_t MASK = (1L << REGISTER_VALUE_BITS) - 1L;
+
+__device__ inline int get_register_value(uint64_t const long_10_registers, int reg_idx)
+{
+  uint64_t shift_mask = MASK << (REGISTER_VALUE_BITS * reg_idx);
+  uint64_t v          = (long_10_registers & shift_mask) >> (REGISTER_VALUE_BITS * reg_idx);
+  return static_cast(v);
+}
+
+/**
+ * Phase 1 function to group sketches.
+ *
+ * Only supports register addressing bits: 8 or 9.
+ *
+ * Each sketch contains `num_registers_per_sketch` registers.
+ * The max register value is 64, the leading-zero count of a long hash value.
+ * So six bits can hold a register value, and each long can hold 10 register values.
+ *
+ * e.g.: num_registers_per_sketch = 512.
+ * Each sketch uses 52 (512 / 10 + 1) longs.
+ *
+ * Input:
+ *           col_0  col_1      col_51
+ * sketch_0: long,  long, ..., long
+ * sketch_1: long,  long, ..., long
+ * sketch_2: long,  long, ..., long
+ *
+ * num_threads = 52 * num_sketches_input / num_longs_per_threads, rounded up.
+ * Each thread scans and merges 32 longs, and outputs the running max register value whenever a
+ * new group starts. For the last long in a thread, the result is written to
+ * `registers_thread_cache`.
+ *
+ * Output e.g.:
+ *
+ * group_lables_thread_cache:
+ * [
+ *   g0
+ *   g0
+ *   g1
+ *   ...
+ *   gN
+ * ]
+ * Has num_threads rows.
+ *
+ * registers_thread_cache:
+ * [
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g1, r1_g1, r2_g1, r3_g1, ... , r511_g1  // register values for group 1
+ *   ...
+ *   r0_gN, r1_gN, r2_gN, r3_gN, ... , r511_gN  // register values for group N
+ * ]
+ * Has num_threads rows; each row corresponds to a row of `group_lables_thread_cache`.
+ *
+ * registers_output_cache:
+ * [
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g1, r1_g1, r2_g1, r3_g1, ... , r511_g1  // register values for group 1
+ *   ...
+ *   r0_gN, r1_gN, r2_gN, r3_gN, ... , r511_gN  // register values for group N
+ * ]
+ * Has num_groups rows.
+ *
+ */
+template
+CUDF_KERNEL void group_sketches_p1_kernel(cudf::device_span sketches_input,
+                                          int64_t const num_sketches_input,
+                                          int64_t const num_registers_per_sketch,
+                                          int64_t const num_groups,
+                                          cudf::device_span group_lables,
+                                          // num_groups * num_registers_per_sketch integers
+                                          int* const registers_output_cache,
+                                          // num_threads * num_registers_per_sketch integers
+                                          int* const registers_thread_cache,
+                                          // num_threads integers
+                                          size_type* const group_lables_thread_cache)
+{
+  auto const tid           = cudf::detail::grid_1d::global_thread_id();
+  auto num_threads_per_col = num_sketches_input / num_longs_per_threads;
+  auto left_tmp            = num_sketches_input % num_longs_per_threads;
+  num_threads_per_col      = left_tmp == 0 ? 
num_threads_per_col : (num_threads_per_col + 1);
+  auto const num_long_cols = sketches_input.size();
+  if (tid >= num_threads_per_col * num_long_cols) { return; }
+
+  auto const long_idx             = tid / num_threads_per_col;
+  auto const thread_idx_in_cols   = tid % num_threads_per_col;
+  int64_t const* const longs_ptr  = sketches_input[long_idx];
+  int* const registers_thread_ptr = registers_thread_cache + tid * num_registers_per_sketch;
+  auto const sketch_first         = thread_idx_in_cols * num_longs_per_threads;
+  auto const sketch_end = cuda::std::min(sketch_first + num_longs_per_threads, num_sketches_input);
+
+  int num_regs = REGISTERS_PER_LONG;
+  // the last long column holds only the leftover registers
+  if (long_idx == num_long_cols - 1) { num_regs = num_registers_per_sketch % REGISTERS_PER_LONG; }
+
+  for (auto i = 0; i < num_regs; i++) {
+    size_type prev_group  = group_lables[sketch_first];
+    int max_reg_v         = 0;
+    int reg_idx_in_sketch = long_idx * REGISTERS_PER_LONG + i;
+    for (auto sketch_idx = sketch_first; sketch_idx < sketch_end; sketch_idx++) {
+      size_type curr_group = group_lables[sketch_idx];
+      int64_t output_idx_for_prev_group = num_registers_per_sketch * prev_group + reg_idx_in_sketch;
+      int64_t output_idx_for_curr_group = num_registers_per_sketch * curr_group + reg_idx_in_sketch;
+      int curr_reg_v = get_register_value(longs_ptr[sketch_idx], i);
+      if (curr_group == prev_group) {
+        // still in the same group, update the max value
+        if (curr_reg_v > max_reg_v) { max_reg_v = curr_reg_v; }
+      } else {
+        // meets a new group, save the output for the previous group
+        registers_output_cache[output_idx_for_prev_group] = max_reg_v;
+
+        // reset the cache
+        max_reg_v = curr_reg_v;
+      }
+
+      // special logic for the last sketch in this thread
+      if (sketch_idx == sketch_end - 1) {
+        if (sketch_idx == num_sketches_input - 1) {
+          // last sketch overall, always output
+          registers_output_cache[output_idx_for_curr_group] = max_reg_v;
+          max_reg_v = curr_reg_v;
+        }
+        if (sketch_idx < num_sketches_input - 1 && curr_group != group_lables[sketch_idx + 1]) {
+          // look one more forward
+          registers_output_cache[output_idx_for_curr_group] = max_reg_v;
+          max_reg_v = curr_reg_v;
+        }
+      }
+
+      prev_group = curr_group;
+    }
+
+    // For each thread, output the remaining partial max for this register
+    registers_thread_ptr[reg_idx_in_sketch] = max_reg_v;
+  }
+  if (long_idx == 0) {
+    group_lables_thread_cache[thread_idx_in_cols] = group_lables[sketch_end - 1];
+  }
+}
+
+/*
+ *
+ * Phase 2 function to group sketches.
+ *
+ * For all registers at the same index, one thread merges the max values across the phase-1
+ * partial sketches; num_threads = num_registers_per_sketch.
+ *
+ * Input e.g.:
+ *
+ * group_lables_thread_cache:
+ * [
+ *   g0
+ *   g0
+ *   g1
+ *   ...
+ *   gN
+ * ]
+ * Has num_threads rows.
+ *
+ * registers_thread_cache:
+ * [
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g1, r1_g1, r2_g1, r3_g1, ... , r511_g1  // register values for group 1
+ *   ...
+ *   r0_gN, r1_gN, r2_gN, r3_gN, ... , r511_gN  // register values for group N
+ * ]
+ * Has num_threads rows; each row corresponds to a row of `group_lables_thread_cache`.
+ *
+ * registers_output_cache:
+ * [
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g1, r1_g1, r2_g1, r3_g1, ... , r511_g1  // register values for group 1
+ *   ...
+ *   r0_gN, r1_gN, r2_gN, r3_gN, ... , r511_gN  // register values for group N
+ * ]
+ * Has num_groups rows.
+ *
+ * Logic:
+ * Merges the register values vertically. 
+ * e.g.:
+ * thread_0 merges [r0_g0, r0_g0, r0_g1, ..., r0_gN]
+ */
+CUDF_KERNEL void group_sketches_p2_kernel(
+  int64_t num_sketches,
+  int64_t num_registers_per_sketch,
+  int* const registers_output_cache,
+  int const* const registers_thread_cache,
+  cudf::device_span group_lables_thread_cache)
+{
+  auto const tid     = cudf::detail::grid_1d::global_thread_id();
+  auto const reg_idx = tid;
+  if (reg_idx >= num_registers_per_sketch) { return; }
+  int reg_max    = 0;
+  int prev_group = group_lables_thread_cache[0];
+  for (auto i = 0; i < num_sketches; i++) {
+    int curr_group = group_lables_thread_cache[i];
+    // read this register from the i-th phase-1 partial sketch (row i of the cache)
+    int curr_reg_v = registers_thread_cache[i * num_registers_per_sketch + reg_idx];
+    if (curr_group == prev_group) {
+      if (curr_reg_v > reg_max) { reg_max = curr_reg_v; }
+    } else {
+      // meets a new group, store the result for the previous group
+      cuda::atomic_ref register_ref(
+        registers_output_cache[prev_group * num_registers_per_sketch + reg_idx]);
+      register_ref.fetch_max(reg_max);
+      reg_max = curr_reg_v;
+    }
+
+    if (i == num_sketches - 1) {
+      // handle the last register in this thread
+      cuda::atomic_ref register_ref(
+        registers_output_cache[curr_group * num_registers_per_sketch + reg_idx]);
+      register_ref.fetch_max(reg_max);
+    }
+
+    prev_group = curr_group;
+  }
+}
+
+/**
+ *
+ * Phase 3 function to group sketches.
+ * Compacts the register values: packs REGISTERS_PER_LONG (10) six-bit register values into each
+ * long. Number of threads is num_groups * num_longs_per_sketch.
+ *
+ * e.g.:
+ * Input:
+ * registers_output_cache:
+ * [
+ *   r0_g0, r1_g0, r2_g0, r3_g0, ... , r511_g0  // register values for group 0
+ *   r0_g1, r1_g1, r2_g1, r3_g1, ... , r511_g1  // register values for group 1
+ *   ...
+ *   r0_gN, r1_gN, r2_gN, r3_gN, ... , r511_gN  // register values for group N
+ * ]
+ * Has num_groups rows. 
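+ *
+ * Output packing (illustrative values): register values 33, 33, 33 (binary 100001)
+ * end up in the low 18 bits of one long as ...100001 100001 100001, six bits per
+ * register, with register 0 in the least-significant bits.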
+ *
+ */
+CUDF_KERNEL void group_sketches_p3_kernel(int64_t const num_groups,
+                                          int64_t const num_registers_per_sketch,
+                                          int64_t const num_longs_per_sketch,
+                                          cudf::device_span sketches_output,
+                                          // num_groups * num_registers_per_sketch integers
+                                          cudf::device_span registers_output_cache)
+{
+  auto const tid = cudf::detail::grid_1d::global_thread_id();
+  if (tid >= num_groups * num_longs_per_sketch) { return; }
+
+  auto const group_idx = tid / num_longs_per_sketch;
+  auto const long_idx  = tid % num_longs_per_sketch;
+
+  auto const reg_begin_idx = group_idx * num_registers_per_sketch + long_idx * REGISTERS_PER_LONG;
+  int64_t num_regs         = REGISTERS_PER_LONG;
+  if (long_idx == num_longs_per_sketch - 1) {
+    num_regs = num_registers_per_sketch % REGISTERS_PER_LONG;
+  }
+
+  uint64_t ten_registers = 0;
+  for (auto i = 0; i < num_regs; i++) {
+    // cast before shifting: a plain int shifted by up to 54 bits would overflow
+    uint64_t tmp = static_cast<uint64_t>(registers_output_cache[reg_begin_idx + i])
+                   << (REGISTER_VALUE_BITS * i);
+    ten_registers = ten_registers | tmp;
+  }
+
+  sketches_output[long_idx][group_idx] = ten_registers;
+}
+
+std::unique_ptr merge_hyper_log_log(
+  column_view const& hll_input,  // struct column
+  int64_t const num_groups,
+  cudf::device_span group_lables,
+  int64_t const num_registers_per_sketch,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
+{
+  int64_t const num_sketches  = hll_input.size();
+  int64_t const num_long_cols = num_registers_per_sketch / REGISTERS_PER_LONG + 1;
+  constexpr int64_t num_longs_per_threads = 32;
+  constexpr int64_t block_size            = 256;
+  int64_t num_threads =
+    cudf::util::div_rounding_up_safe(num_sketches, num_longs_per_threads) * num_long_cols;
+  int64_t num_blocks = cudf::util::div_rounding_up_safe(num_threads, block_size);
+
+  auto registers_output_cache =
+    rmm::device_uvector(num_registers_per_sketch * num_groups, stream, mr);
+  {
+    auto registers_thread_cache =
+      rmm::device_uvector(num_registers_per_sketch * num_threads, stream, mr);
+    // one entry per phase-1 thread, matching the kernel docs above
+    auto group_lables_thread_cache = rmm::device_uvector(num_threads, stream, mr);
+
+    auto const input_iter = cudf::detail::make_counting_transform_iterator(
+      0, [&](int i) { return hll_input.child(i).begin(); });
+    auto input_cols = std::vector(input_iter, input_iter + num_long_cols);
+    auto d_inputs   = cudf::detail::make_device_uvector_async(input_cols, stream, mr);
+
+    group_sketches_p1_kernel
+      <<>>(d_inputs,
+                  num_sketches,
+                  num_registers_per_sketch,
+                  num_groups,
+                  group_lables,
+                  registers_output_cache.begin(),
+                  registers_thread_cache.begin(),
+                  group_lables_thread_cache.begin());
+
+    // one phase-2 thread per register index
+    auto num_phase2_threads = num_registers_per_sketch;
+    auto num_phase2_blocks  = cudf::util::div_rounding_up_safe(num_phase2_threads, block_size);
+    group_sketches_p2_kernel<<>>(
+      num_sketches,
+      num_registers_per_sketch,
+      registers_output_cache.begin(),
+      registers_thread_cache.begin(),
+      group_lables_thread_cache);
+  }
+
+  // create output columns
+  auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
+    return make_numeric_column(
+      data_type{type_id::INT64}, num_groups, mask_state::ALL_VALID, stream, mr);
+  });
+  auto results = std::vector>(results_iter, results_iter + num_long_cols);
+  auto sketches_output = [&] {
+    auto host_results_pointer_iter =
+      thrust::make_transform_iterator(results.begin(), [](auto const& results_column) {
+        return results_column->mutable_view().template data();
+      });
+    auto host_results_pointers =
+      std::vector(host_results_pointer_iter, host_results_pointer_iter + 
results.size());
+    return cudf::detail::make_device_uvector_async(host_results_pointers, stream, mr);
+  }();
+
+  // compact
+  auto num_phase3_threads = num_groups * num_long_cols;
+  auto num_phase3_blocks  = cudf::util::div_rounding_up_safe(num_phase3_threads, block_size);
+
+  group_sketches_p3_kernel<<>>(
+    num_groups, num_registers_per_sketch, num_long_cols, sketches_output, registers_output_cache);
+
+  return make_structs_column(num_groups, std::move(results), 0, rmm::device_buffer{}, stream, mr);
+}
+
+}  // namespace
+
+std::unique_ptr group_merge_hyper_log_log(column_view const& values,
+                                          int64_t const num_groups,
+                                          cudf::device_span group_lables,
+                                          int64_t const num_registers_per_sketch,
+                                          rmm::cuda_stream_view stream,
+                                          rmm::device_async_resource_ref mr)
+{
+  CUDF_EXPECTS(values.type().id() == type_id::STRUCT,
+               "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
+  for (auto i = 0; i < values.num_children(); i++) {
+    CUDF_EXPECTS(values.child(i).type().id() == type_id::INT64,
+                 "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
+  }
+
+  return merge_hyper_log_log(
+    values, num_groups, group_lables, num_registers_per_sketch, stream, mr);
+}
+
+}  // namespace detail
+}  // namespace groupby
+}  // namespace cudf
diff --git a/cpp/src/groupby/sort/group_reductions.hpp b/cpp/src/groupby/sort/group_reductions.hpp
index f8a531094c6..13600a598bc 100644
--- a/cpp/src/groupby/sort/group_reductions.hpp
+++ b/cpp/src/groupby/sort/group_reductions.hpp
@@ -539,6 +539,19 @@ std::unique_ptr group_correlation(column_view const& covariance,
                                   rmm::cuda_stream_view stream,
                                   rmm::device_async_resource_ref mr);
 
+/**
+ * @brief Internal API to compute per-group HyperLogLogPlusPlus(HLLPP) sketches.
+ */
+std::unique_ptr group_hyper_log_log(column_view const& values,
+                                    size_type num_groups,
+                                    cudf::device_span group_lables,
+                                    int const num_registers_per_sketch,
+                                    rmm::cuda_stream_view stream,
+                                    rmm::device_async_resource_ref mr);
+
+/**
+ * @brief Internal API to merge per-group HyperLogLogPlusPlus(HLLPP) sketches.
+ */
+std::unique_ptr group_merge_hyper_log_log(column_view const& values,
+                                          int64_t const num_groups,
+                                          cudf::device_span group_lables,
+                                          int64_t const num_registers_per_sketch,
+                                          rmm::cuda_stream_view stream,
+                                          rmm::device_async_resource_ref mr);
+
 }  // namespace detail
 }  // namespace groupby
 }  // namespace cudf
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index a4213dcbe94..3a5b884ea61 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -133,6 +133,7 @@ ConfigureTest(
   groupby/groupby_test_util.cpp
   groupby/groups_tests.cpp
   groupby/histogram_tests.cpp
+  groupby/hll_tests.cpp
   groupby/keys_tests.cpp
   groupby/lists_tests.cpp
   groupby/m2_tests.cpp
diff --git a/cpp/tests/groupby/hll_tests.cpp b/cpp/tests/groupby/hll_tests.cpp
new file mode 100644
index 00000000000..02300c6e32a
--- /dev/null
+++ b/cpp/tests/groupby/hll_tests.cpp
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */ + +#include +#include +#include +#include + +#include +#include +#include + +using namespace cudf::test::iterators; + +namespace { +constexpr cudf::test::debug_output_level verbosity{cudf::test::debug_output_level::FIRST_ERROR}; +constexpr int32_t null{0}; // Mark for null elements +constexpr double NaN{std::numeric_limits::quiet_NaN()}; // Mark for NaN double elements + +template +using keys_col = cudf::test::fixed_width_column_wrapper; + +template +using vals_col = cudf::test::fixed_width_column_wrapper; + +template +using M2s_col = cudf::test::fixed_width_column_wrapper; + +auto compute_HLL(cudf::column_view const& keys, cudf::column_view const& values) +{ + std::vector requests; + requests.emplace_back(); + requests[0].values = values; + requests[0].aggregations.emplace_back( + // TODO + cudf::make_hyper_log_log_aggregation(416)); + + auto gb_obj = cudf::groupby::groupby(cudf::table_view({keys})); + auto result = gb_obj.aggregate(requests); + return std::pair(std::move(result.first->release()[0]), std::move(result.second[0].results[0])); +} +} // namespace + +template +struct GroupbyHLLTypedTest : public cudf::test::BaseFixture {}; + +using TestTypes = cudf::test::Concat, + cudf::test::FloatingPointTypes>; +TYPED_TEST_SUITE(GroupbyHLLTypedTest, TestTypes); + +TYPED_TEST(GroupbyHLLTypedTest, SimpleInput) +{ + using T = TypeParam; + + // key = 1: vals = [0, 3, 6] + // key = 2: vals = [1, 4, 5, 9] + // key = 3: vals = [2, 7, 8] + auto const keys = keys_col{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}; + auto const vals = vals_col{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + compute_HLL(keys, vals); +} diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 379750bb0b7..754c1e7b594 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -70,7 +70,9 @@ enum Kind { TDIGEST(31), // This can take a delta argument for accuracy level MERGE_TDIGEST(32), // This can take a delta argument for accuracy level HISTOGRAM(33), - MERGE_HISTOGRAM(34); + MERGE_HISTOGRAM(34), + HLLPP(35), + MERGE_HLLPP(36); final int nativeId; @@ -912,6 +914,66 @@ public boolean equals(Object other) { } } + private static final class HLLAggregation extends Aggregation { + private final int num_registers_per_sketch; + + public HLLAggregation(Kind kind, int num_registers_per_sketch) { + super(kind); + this.num_registers_per_sketch = num_registers_per_sketch; + } + + @Override + long createNativeInstance() { + return Aggregation.createHLLAgg(kind.nativeId, num_registers_per_sketch); + } + + @Override + public int hashCode() { + return 31 * kind.hashCode() + num_registers_per_sketch; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other instanceof HLLAggregation) { + HLLAggregation o = (HLLAggregation) other; + return o.num_registers_per_sketch == this.num_registers_per_sketch; + } + return false; + } + } + + static final class MergeHLLAggregation extends Aggregation { + private final int num_registers_per_sketch; + + public MergeHLLAggregation(Kind kind, int num_registers_per_sketch) { + super(kind); + this.num_registers_per_sketch = num_registers_per_sketch; + } + + @Override + long createNativeInstance() { + return Aggregation.createHLLAgg(kind.nativeId, num_registers_per_sketch); + } + + @Override + public int hashCode() { + return 31 * kind.hashCode() + num_registers_per_sketch; + } + + @Override + public boolean equals(Object other) { + if (this == 
other) {
+        return true;
+      } else if (other instanceof MergeHLLAggregation) {
+        MergeHLLAggregation o = (MergeHLLAggregation) other;
+        return o.numRegistersPerSketch == this.numRegistersPerSketch;
+      }
+      return false;
+    }
+  }
+
   static TDigestAggregation createTDigest(int delta) {
     return new TDigestAggregation(Kind.TDIGEST, delta);
   }
@@ -940,6 +1002,14 @@ static MergeHistogramAggregation mergeHistogram() {
     return new MergeHistogramAggregation();
   }
 
+  static HLLAggregation HLLPP(int numRegistersPerSketch) {
+    return new HLLAggregation(Kind.HLLPP, numRegistersPerSketch);
+  }
+
+  static MergeHLLAggregation mergeHLLPP(int numRegistersPerSketch) {
+    return new MergeHLLAggregation(Kind.MERGE_HLLPP, numRegistersPerSketch);
+  }
+
   /**
    * Create one of the aggregations that only needs a kind, no other parameters. This does not
    * work for all types and for code safety reasons each kind is added separately.
@@ -990,4 +1060,6 @@ static MergeHistogramAggregation mergeHistogram() {
    * Create a TDigest aggregation.
    */
   private static native long createTDigestAgg(int kind, int delta);
+
+  private static native long createHLLAgg(int kind, int numRegistersPerSketch);
 }
diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java
index 0fae33927b6..5fcba0c1619 100644
--- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java
+++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java
@@ -337,4 +337,12 @@ public static GroupByAggregation histogram() {
   public static GroupByAggregation mergeHistogram() {
     return new GroupByAggregation(Aggregation.mergeHistogram());
   }
+
+  /**
+   * Compute approximate distinct counts as HyperLogLogPlusPlus(HLLPP) sketches.
+   */
+  public static GroupByAggregation HLLPP(int numRegistersPerSketch) {
+    return new GroupByAggregation(Aggregation.HLLPP(numRegistersPerSketch));
+  }
+
+  /**
+   * Merge HyperLogLogPlusPlus(HLLPP) sketches produced by the HLLPP aggregation.
+   */
+  public static GroupByAggregation mergeHLLPP(int numRegistersPerSketch) {
+    return new GroupByAggregation(Aggregation.mergeHLLPP(numRegistersPerSketch));
+  }
 }
diff --git a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java
index ba8ae379bae..02dc2e33c0b 100644
--- a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java
+++ b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java
@@ -304,4 +304,12 @@ public static ReductionAggregation histogram() {
   public static ReductionAggregation mergeHistogram() {
     return new ReductionAggregation(Aggregation.mergeHistogram());
   }
+
+  /**
+   * Compute an approximate distinct count as a HyperLogLogPlusPlus(HLLPP) sketch.
+   */
+  public static ReductionAggregation HLLPP(int numRegistersPerSketch) {
+    return new ReductionAggregation(Aggregation.HLLPP(numRegistersPerSketch));
+  }
+
+  /**
+   * Merge HyperLogLogPlusPlus(HLLPP) sketches.
+   */
+  public static ReductionAggregation mergeHLLPP(int numRegistersPerSketch) {
+    return new ReductionAggregation(Aggregation.mergeHLLPP(numRegistersPerSketch));
+  }
 }
diff --git a/java/src/main/native/src/AggregationJni.cpp b/java/src/main/native/src/AggregationJni.cpp
index c40f1c55500..26e8c958430 100644
--- a/java/src/main/native/src/AggregationJni.cpp
+++ b/java/src/main/native/src/AggregationJni.cpp
@@ -296,4 +296,31 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createMergeSetsAgg(JNIEn
   CATCH_STD(env, 0);
 }
 
+JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHLLAgg(JNIEnv* env,
+                                                                     jclass 
class_object,
+                                                                     jint kind,
+                                                                     jint num_registers_per_sketch)
+{
+  try {
+    cudf::jni::auto_set_device(env);
+
+    std::unique_ptr ret;
+    // These numbers come from Aggregation.java and must stay in sync
+    switch (kind) {
+      case 35:  // HLLPP
+        ret = cudf::make_hyper_log_log_aggregation(num_registers_per_sketch);
+        break;
+      case 36:  // MERGE_HLLPP
+        ret = cudf::make_merge_hyper_log_log_aggregation(num_registers_per_sketch);
+        break;
+      default: throw std::logic_error("Unsupported HyperLogLog++ Aggregation Operation");
+    }
+    return reinterpret_cast(ret.release());
+  }
+  CATCH_STD(env, 0);
+}
+
 }  // extern "C"
diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index c7fcb1756b6..5a0a6b5cea4 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -10016,4 +10016,15 @@ void testSample() {
       }
     }
   }
+
+  @Test
+  void testGroupByHLL() {
+    // A trivial smoke test, using 512 registers per sketch (precision p = 9):
+    try (Table input = new Table.TestBuilder().column(1, 2, 3, 1, 2, 2, 1, 3, 3, 2)
+                                              .column(0, 1, -2, 3, -4, -5, -6, 7, -8, 9)
+                                              .build();
+         Table result = input.groupBy(0).aggregate(GroupByAggregation.HLLPP(512)
+             .onColumn(1))) {
+    }
+  }
 }
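For reviewers, here is a minimal C++ usage sketch of the new API. It mirrors the hll_tests.cpp test above; the 512-register count (precision p = 9) and the function name `hll_per_group` are illustrative, not part of the patch:

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <utility>
#include <vector>

// Compute one HLL++ sketch per key group. 512 registers per sketch
// corresponds to precision p = 9 (2^9 registers), one of the two
// precisions the kernels above support.
std::unique_ptr<cudf::column> hll_per_group(cudf::column_view const& keys,
                                            cudf::column_view const& values)
{
  cudf::groupby::groupby gb_obj(cudf::table_view({keys}));

  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = values;
  requests[0].aggregations.push_back(
    cudf::make_hyper_log_log_aggregation<cudf::groupby_aggregation>(512));

  // result.first holds the unique keys; result.second[0].results[0] is a
  // STRUCT column of INT64 children per group, in Spark's sketch layout.
  auto result = gb_obj.aggregate(requests);
  return std::move(result.second[0].results[0]);
}
```

On the Java side the equivalent call is `GroupByAggregation.HLLPP(512).onColumn(1)`, as exercised by the `testGroupByHLL` case in TableTest.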