From 9f6acc6fe431ac0fe51842451e80422a43696b43 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 1 Sep 2023 12:27:29 -0700 Subject: [PATCH 01/13] Support negative preceding/following for ROW window functions This commit adds support for "offset" ROW windows, where the preceding and following window bounds are allowed to have negative values. This allows window definitions to exclude the current row entirely. Prior to this change, ROW-based windows *had* to include the current row, causing `preceding` and `following` to support only non-negative values. Additionally, the current row was always included in the window, so it always counted towards the `min_periods` check. The following is an example of the new "negative" semantics. Consider the input: ```c++ auto const row = ints_column{1, 2, 3, 4}; ``` If the window bounds are specified as (preceding=3, following=-1), then the window for the third row (`3`) is `{1, 2}`. `following=-1` indicates a "following" row *before* the current row. The `preceding` bound keeps the existing convention of counting the current row, which makes its small and negative values slightly more involved: 1. `preceding=2` indicates *one* row before the current row. 2. `preceding=1` indicates the current row. 3. `preceding=0` indicates one row past (i.e. after) the current row. 4. `preceding=-1` indicates two rows after the current row. Et cetera. `min_periods` checks continue to be honoured as before, but the requirement for positive `min_periods` is dropped. `min_periods` need only be non-negative. 
Signed-off-by: MithunR --- cpp/src/rolling/detail/rolling.cuh | 10 +- .../rolling/detail/rolling_fixed_window.cu | 6 +- cpp/src/rolling/grouped_rolling.cu | 5 +- cpp/src/rolling/rolling.cu | 4 +- cpp/tests/CMakeLists.txt | 1 + cpp/tests/rolling/grouped_rolling_test.cpp | 2 +- cpp/tests/rolling/offset_row_window_test.cpp | 164 ++++++++++++++++++ cpp/tests/rolling/rolling_test.cpp | 23 +-- 8 files changed, 187 insertions(+), 28 deletions(-) create mode 100644 cpp/tests/rolling/offset_row_window_test.cpp diff --git a/cpp/src/rolling/detail/rolling.cuh b/cpp/src/rolling/detail/rolling.cuh index 3b6d53f43c4..6992f4f0a27 100644 --- a/cpp/src/rolling/detail/rolling.cuh +++ b/cpp/src/rolling/detail/rolling.cuh @@ -91,14 +91,14 @@ struct DeviceRolling { // operations we do support template - DeviceRolling(size_type _min_periods, std::enable_if_t()>* = nullptr) + explicit DeviceRolling(size_type _min_periods, std::enable_if_t()>* = nullptr) : min_periods(_min_periods) { } // operations we don't support template - DeviceRolling(size_type _min_periods, std::enable_if_t()>* = nullptr) + explicit DeviceRolling(size_type _min_periods, std::enable_if_t()>* = nullptr) : min_periods(_min_periods) { CUDF_FAIL("Invalid aggregation/type pair"); @@ -111,7 +111,7 @@ struct DeviceRolling { mutable_column_device_view& output, size_type start_index, size_type end_index, - size_type current_index) + size_type current_index) const { using AggOp = typename corresponding_operator::type; AggOp agg_op; @@ -144,7 +144,7 @@ struct DeviceRolling { template struct DeviceRollingArgMinMaxBase { size_type min_periods; - DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {} + explicit DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {} static constexpr bool is_supported() { @@ -162,7 +162,7 @@ struct DeviceRollingArgMinMaxBase { */ template struct DeviceRollingArgMinMaxString : DeviceRollingArgMinMaxBase { - DeviceRollingArgMinMaxString(size_type 
_min_periods) + explicit DeviceRollingArgMinMaxString(size_type _min_periods) : DeviceRollingArgMinMaxBase(_min_periods) { } diff --git a/cpp/src/rolling/detail/rolling_fixed_window.cu b/cpp/src/rolling/detail/rolling_fixed_window.cu index fb7b1b5f590..93f9ee3a206 100644 --- a/cpp/src/rolling/detail/rolling_fixed_window.cu +++ b/cpp/src/rolling/detail/rolling_fixed_window.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ #include #include -#include namespace cudf::detail { @@ -43,6 +42,9 @@ std::unique_ptr rolling_window(column_view const& input, CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()), "Defaults column must be either empty or have as many rows as the input column."); + CUDF_EXPECTS(-(preceding_window - 1) <= following_window, + "Preceding window bounds must precede the following window bounds."); + if (agg.kind == aggregation::CUDA || agg.kind == aggregation::PTX) { // TODO: In future, might need to clamp preceding/following to column boundaries. 
return cudf::detail::rolling_window_udf(input, diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index ca5c04d1c4f..654358dee1e 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -111,7 +111,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, CUDF_EXPECTS((group_keys.num_columns() == 0 || group_keys.num_rows() == input.size()), "Size mismatch between group_keys and input vector."); - CUDF_EXPECTS((min_periods > 0), "min_periods must be positive"); + CUDF_EXPECTS((min_periods >= 0), "min_periods must be non-negative"); CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()), "Defaults column must be either empty or have as many rows as the input column."); @@ -127,6 +127,9 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, auto const preceding_window = preceding_window_bounds.value(); auto const following_window = following_window_bounds.value(); + CUDF_EXPECTS(-(preceding_window - 1) <= following_window, + "Preceding window bounds must precede the following window bounds."); + if (group_keys.num_columns() == 0) { // No Groupby columns specified. Treat as one big group. return rolling_window( diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index d699d7bea85..5c78cc4382d 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,8 +20,6 @@ #include #include -#include - namespace cudf { // Applies a fixed-size rolling window function to the values in a column, with default output diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 8a0aa27b175..33a7b3225c2 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -453,6 +453,7 @@ ConfigureTest( rolling/grouped_rolling_test.cpp rolling/lead_lag_test.cpp rolling/nth_element_test.cpp + rolling/offset_row_window_test.cpp rolling/range_comparator_test.cu rolling/range_rolling_window_test.cpp rolling/range_window_bounds_test.cpp diff --git a/cpp/tests/rolling/grouped_rolling_test.cpp b/cpp/tests/rolling/grouped_rolling_test.cpp index 774f2f7fc40..af19c4e7e84 100644 --- a/cpp/tests/rolling/grouped_rolling_test.cpp +++ b/cpp/tests/rolling/grouped_rolling_test.cpp @@ -637,7 +637,7 @@ TYPED_TEST(GroupedRollingTest, ZeroWindow) key_1_vec.end()); const cudf::table_view grouping_keys{std::vector{key_0, key_1}}; - cudf::size_type preceding_window = 0; + cudf::size_type preceding_window = 1; cudf::size_type following_window = 0; std::vector expected_group_offsets{0, 4, 8, DATA_SIZE}; diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp new file mode 100644 index 00000000000..58ecbd6255a --- /dev/null +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +template +using fwcw = cudf::test::fixed_width_column_wrapper; +template +using decimals_column = cudf::test::fixed_point_column_wrapper; +using ints_column = fwcw; +using bigints_column = fwcw; +using strings_column = cudf::test::strings_column_wrapper; +using lists_column = cudf::test::lists_column_wrapper; +using column_ptr = std::unique_ptr; +using cudf::test::iterators::no_nulls; +using cudf::test::iterators::nulls_at; + +auto constexpr null = int32_t{0}; // NULL representation for int32_t; + +struct OffsetRowWindowTest : public cudf::test::BaseFixture { + static ints_column const _keys; // {0, 0, 0, 0, 0, 0, 1, 1, 1, 1}; + static ints_column const _values; // {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + struct rolling_runner { + cudf::window_bounds _preceding, _following; + cudf::size_type _min_periods; + + rolling_runner(cudf::window_bounds const& preceding, + cudf::window_bounds const& following, + cudf::size_type min_periods_ = 1) + : _preceding{preceding}, _following{following}, _min_periods{min_periods_} + { + } + + rolling_runner& min_periods(cudf::size_type min_periods_) + { + _min_periods = min_periods_; + return *this; + } + + std::unique_ptr operator()(cudf::rolling_aggregation const& agg) const + { + return cudf::grouped_rolling_window( + cudf::table_view{{_keys}}, _values, _preceding, _following, _min_periods, agg); + } + }; +}; + +ints_column const OffsetRowWindowTest::_keys{0, 0, 0, 0, 0, 0, 1, 1, 1, 1}; +ints_column const OffsetRowWindowTest::_values{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + +TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) +{ + auto const preceding = cudf::window_bounds::get(3); + auto const following = cudf::window_bounds::get(-1); + auto run_rolling = rolling_runner{preceding, following, 1}; + + 
CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_min_aggregation()), + ints_column{{null, 0, 0, 1, 2, 3, null, 6, 6, 7}, nulls_at({0, 6})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_max_aggregation()), + ints_column{{null, 0, 1, 2, 3, 4, null, 6, 7, 8}, nulls_at({0, 6})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_sum_aggregation()), + bigints_column{{null, 0, 1, 3, 5, 7, null, 6, 13, 15}, nulls_at({0, 6})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_collect_list_aggregation()), + lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {}, {6}, {6, 7}, {7, 8}}, + nulls_at({0, 6})}); + + run_rolling.min_periods(0); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_collect_list_aggregation()), + lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {}, {6}, {6, 7}, {7, 8}}, no_nulls()}); +} + +TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) +{ + auto const preceding = cudf::window_bounds::get(0); + auto const following = cudf::window_bounds::get(2); + auto run_rolling = rolling_runner{preceding, following, 1}; + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, 
nulls_at({5, 9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, nulls_at({5, 9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_min_aggregation()), + ints_column{{1, 2, 3, 4, 5, null, 7, 8, 9, null}, nulls_at({5, 9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_max_aggregation()), + ints_column{{2, 3, 4, 5, 5, null, 8, 9, 9, null}, nulls_at({5, 9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_sum_aggregation()), + bigints_column{{3, 5, 7, 9, 5, null, 15, 17, 9, null}, nulls_at({5, 9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*cudf::make_collect_list_aggregation()), + lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5}, {}, {7, 8}, {8, 9}, {9}, {}}, + nulls_at({5, 9})}); + + run_rolling.min_periods(0); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling( + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); +} diff --git a/cpp/tests/rolling/rolling_test.cpp b/cpp/tests/rolling/rolling_test.cpp index e410e2488b3..d0181974479 100644 --- a/cpp/tests/rolling/rolling_test.cpp +++ b/cpp/tests/rolling/rolling_test.cpp @@ -148,20 +148,6 @@ TEST_F(RollingStringTest, MinPeriods) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, got_count_all->view()); } -TEST_F(RollingStringTest, ZeroWindowSize) -{ - cudf::test::strings_column_wrapper input( - {"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"}, - {1, 0, 0, 1, 0, 1, 1, 1, 0}); - cudf::test::fixed_width_column_wrapper expected_count( - {0, 0, 0, 0, 0, 0, 0, 0, 0}, {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}); - - auto got_count = cudf::rolling_window( - input, 0, 0, 0, *cudf::make_count_aggregation()); - - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, got_count->view()); -} - // 
========================================================================================= class RollingStructTest : public cudf::test::BaseFixture {}; @@ -970,6 +956,7 @@ TEST_F(RollingtVarStdTestUntyped, SimpleStaticVarianceStdInfNaN) #undef XXX } +/* // negative sizes TYPED_TEST(RollingTest, NegativeWindowSizes) { @@ -980,10 +967,12 @@ TYPED_TEST(RollingTest, NegativeWindowSizes) std::vector window{3}; std::vector negative_window{-2}; + this->run_test_col_agg(input, negative_window, window, 1); this->run_test_col_agg(input, window, negative_window, 1); this->run_test_col_agg(input, negative_window, negative_window, 1); } + */ // simple example from Pandas docs: TYPED_TEST(RollingTest, SimpleDynamic) @@ -1033,6 +1022,7 @@ TYPED_TEST(RollingTest, AllInvalid) } // window = following_window = 0 +// Note: Preceding includes current row, so its value is set to 1. TYPED_TEST(RollingTest, ZeroWindow) { cudf::size_type num_rows = 1000; @@ -1042,10 +1032,11 @@ TYPED_TEST(RollingTest, ZeroWindow) cudf::test::fixed_width_column_wrapper input( col_data.begin(), col_data.end(), col_mask.begin()); - std::vector window({0}); + std::vector preceding({0}); + std::vector following({1}); cudf::size_type periods = num_rows; - this->run_test_col_agg(input, window, window, periods); + this->run_test_col_agg(input, preceding, following, periods); } // min_periods = 0 From 7e4ae8bb05bc220449444e8c862779925a03dba4 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Sep 2023 15:09:46 -0700 Subject: [PATCH 02/13] Minor refactor to shorten agg operators. 
--- cpp/tests/rolling/offset_row_window_test.cpp | 50 ++++++++++++-------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index 58ecbd6255a..956aabb2bbd 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -72,6 +72,13 @@ struct OffsetRowWindowTest : public cudf::test::BaseFixture { ints_column const OffsetRowWindowTest::_keys{0, 0, 0, 0, 0, 0, 1, 1, 1, 1}; ints_column const OffsetRowWindowTest::_values{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; +auto const AGG_COUNT_NON_NULL = cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); +auto const AGG_COUNT_ALL = cudf::make_count_aggregation(cudf::null_policy::INCLUDE); +auto const AGG_MIN = cudf::make_min_aggregation(); +auto const AGG_MAX = cudf::make_max_aggregation(); +auto const AGG_SUM = cudf::make_sum_aggregation(); +auto const AGG_COLLECT_LIST = cudf::make_collect_list_aggregation(); + TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) { auto const preceding = cudf::window_bounds::get(3); @@ -79,29 +86,27 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) auto run_rolling = rolling_runner{preceding, following, 1}; CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + *run_rolling(*AGG_COUNT_NON_NULL), ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + *run_rolling(*AGG_COUNT_ALL), ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_min_aggregation()), + *run_rolling(*AGG_MIN), ints_column{{null, 0, 0, 1, 2, 3, null, 6, 6, 7}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_max_aggregation()), + *run_rolling(*AGG_MAX), ints_column{{null, 0, 1, 2, 3, 4, null, 6, 
7, 8}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_sum_aggregation()), + *run_rolling(*AGG_SUM), bigints_column{{null, 0, 1, 3, 5, 7, null, 6, 13, 15}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_collect_list_aggregation()), + *run_rolling(*AGG_COLLECT_LIST), lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {}, {6}, {6, 7}, {7, 8}}, nulls_at({0, 6})}); @@ -109,16 +114,16 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + *AGG_COUNT_NON_NULL), ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + *AGG_COUNT_ALL), ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_collect_list_aggregation()), + *run_rolling(*AGG_COLLECT_LIST), lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {}, {6}, {6, 7}, {7, 8}}, no_nulls()}); } @@ -130,35 +135,42 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::EXCLUDE)), + *AGG_COUNT_NON_NULL), ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + *AGG_COUNT_ALL), ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_min_aggregation()), + *run_rolling(*AGG_MIN), ints_column{{1, 2, 3, 4, 5, null, 7, 8, 9, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_max_aggregation()), + *run_rolling(*AGG_MAX), ints_column{{2, 3, 4, 5, 5, null, 8, 9, 9, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - 
*run_rolling(*cudf::make_sum_aggregation()), + *run_rolling(*AGG_SUM), bigints_column{{3, 5, 7, 9, 5, null, 15, 17, 9, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*cudf::make_collect_list_aggregation()), + *run_rolling(*AGG_COLLECT_LIST), lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5}, {}, {7, 8}, {8, 9}, {9}, {}}, nulls_at({5, 9})}); run_rolling.min_periods(0); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)), + *run_rolling(*AGG_COUNT_NON_NULL), ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COUNT_ALL), + ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COLLECT_LIST), + lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5}, {}, {7, 8}, {8, 9}, {9}, {}}, no_nulls}); } From a577d2e5f2e552adad9ba60779fd8d351765c735 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Sep 2023 15:45:22 -0700 Subject: [PATCH 03/13] Refactor to support ungrouped case. 
--- cpp/tests/rolling/offset_row_window_test.cpp | 79 ++++++++++---------- 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index 956aabb2bbd..dbc6d6c8168 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -47,6 +47,7 @@ struct OffsetRowWindowTest : public cudf::test::BaseFixture { struct rolling_runner { cudf::window_bounds _preceding, _following; cudf::size_type _min_periods; + bool _grouped = true; rolling_runner(cudf::window_bounds const& preceding, cudf::window_bounds const& following, @@ -61,10 +62,18 @@ struct OffsetRowWindowTest : public cudf::test::BaseFixture { return *this; } + rolling_runner& grouped(bool grouped_) + { + _grouped = grouped_; + return *this; + } + std::unique_ptr operator()(cudf::rolling_aggregation const& agg) const { + auto const grouping_keys = + _grouped ? std::vector{_keys} : std::vector{}; return cudf::grouped_rolling_window( - cudf::table_view{{_keys}}, _values, _preceding, _following, _min_periods, agg); + cudf::table_view{grouping_keys}, _values, _preceding, _following, _min_periods, agg); } }; }; @@ -72,34 +81,32 @@ struct OffsetRowWindowTest : public cudf::test::BaseFixture { ints_column const OffsetRowWindowTest::_keys{0, 0, 0, 0, 0, 0, 1, 1, 1, 1}; ints_column const OffsetRowWindowTest::_values{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; -auto const AGG_COUNT_NON_NULL = cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); -auto const AGG_COUNT_ALL = cudf::make_count_aggregation(cudf::null_policy::INCLUDE); -auto const AGG_MIN = cudf::make_min_aggregation(); -auto const AGG_MAX = cudf::make_max_aggregation(); -auto const AGG_SUM = cudf::make_sum_aggregation(); -auto const AGG_COLLECT_LIST = cudf::make_collect_list_aggregation(); +auto const AGG_COUNT_NON_NULL = + cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); +auto const AGG_COUNT_ALL = + 
cudf::make_count_aggregation(cudf::null_policy::INCLUDE); +auto const AGG_MIN = cudf::make_min_aggregation(); +auto const AGG_MAX = cudf::make_max_aggregation(); +auto const AGG_SUM = cudf::make_sum_aggregation(); +auto const AGG_COLLECT_LIST = cudf::make_collect_list_aggregation(); TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) { auto const preceding = cudf::window_bounds::get(3); auto const following = cudf::window_bounds::get(-1); - auto run_rolling = rolling_runner{preceding, following, 1}; + auto run_rolling = rolling_runner{preceding, following}.min_periods(1).grouped(true); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_COUNT_NON_NULL), - ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_COUNT_ALL), - ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_MIN), - ints_column{{null, 0, 0, 1, 2, 3, null, 6, 6, 7}, nulls_at({0, 6})}); + *run_rolling(*AGG_MIN), ints_column{{null, 0, 0, 1, 2, 3, null, 6, 6, 7}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_MAX), - ints_column{{null, 0, 1, 2, 3, 4, null, 6, 7, 8}, nulls_at({0, 6})}); + *run_rolling(*AGG_MAX), ints_column{{null, 0, 1, 2, 3, 4, null, 6, 7, 8}, nulls_at({0, 6})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling(*AGG_SUM), @@ -112,15 +119,11 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) run_rolling.min_periods(0); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *AGG_COUNT_NON_NULL), - ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, 
no_nulls()}); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *AGG_COUNT_ALL), - ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{0, 1, 2, 2, 2, 2, 0, 1, 2, 2}, no_nulls()}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling(*AGG_COLLECT_LIST), @@ -131,25 +134,21 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) { auto const preceding = cudf::window_bounds::get(0); auto const following = cudf::window_bounds::get(2); - auto run_rolling = rolling_runner{preceding, following, 1}; + auto run_rolling = rolling_runner{preceding, following}.min_periods(1).grouped(true); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *AGG_COUNT_NON_NULL), + *run_rolling(*AGG_COUNT_NON_NULL), ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling( - *AGG_COUNT_ALL), + *run_rolling(*AGG_COUNT_ALL), ints_column{{2, 2, 2, 2, 1, null, 2, 2, 1, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_MIN), - ints_column{{1, 2, 3, 4, 5, null, 7, 8, 9, null}, nulls_at({5, 9})}); + *run_rolling(*AGG_MIN), ints_column{{1, 2, 3, 4, 5, null, 7, 8, 9, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_MAX), - ints_column{{2, 3, 4, 5, 5, null, 8, 9, 9, null}, nulls_at({5, 9})}); + *run_rolling(*AGG_MAX), ints_column{{2, 3, 4, 5, 5, null, 8, 9, 9, null}, nulls_at({5, 9})}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling(*AGG_SUM), @@ -162,13 +161,11 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) run_rolling.min_periods(0); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_COUNT_NON_NULL), - ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); - CUDF_TEST_EXPECT_COLUMNS_EQUAL( - *run_rolling(*AGG_COUNT_ALL), - ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, 
no_nulls()}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{2, 2, 2, 2, 1, 0, 2, 2, 1, 0}, no_nulls()}); CUDF_TEST_EXPECT_COLUMNS_EQUAL( *run_rolling(*AGG_COLLECT_LIST), From f2a9e1eb0c20c8d3ae178a24a4e9f8269b998c4b Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Sep 2023 16:19:45 -0700 Subject: [PATCH 04/13] Tests for ungrouped cases. --- cpp/tests/rolling/offset_row_window_test.cpp | 84 +++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index dbc6d6c8168..fa0eba6c1b6 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -90,7 +90,7 @@ auto const AGG_MAX = cudf::make_max_aggregation(); auto const AGG_COLLECT_LIST = cudf::make_collect_list_aggregation(); -TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) +TEST_F(OffsetRowWindowTest, OffsetRowWindow_Grouped_3_to_Minus_1) { auto const preceding = cudf::window_bounds::get(3); auto const following = cudf::window_bounds::get(-1); @@ -130,7 +130,47 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_3_to_Minus_1) lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {}, {6}, {6, 7}, {7, 8}}, no_nulls()}); } -TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) +TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_3_to_Minus_1) +{ + auto const preceding = cudf::window_bounds::get(3); + auto const following = cudf::window_bounds::get(-1); + auto run_rolling = rolling_runner{preceding, following}.min_periods(1).grouped(false); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{0, 1, 2, 2, 2, 2, 2, 2, 2, 2}, nulls_at({0})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{0, 1, 2, 2, 2, 2, 2, 2, 2, 2}, nulls_at({0})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_MIN), + ints_column{{null, 0, 0, 1, 2, 3, 4, 5, 6, 7}, nulls_at({0})}); 
+ + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_MAX), + ints_column{{null, 0, 1, 2, 3, 4, 5, 6, 7, 8}, nulls_at({0})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_SUM), bigints_column{{null, 0, 1, 3, 5, 7, 9, 11, 13, 15}, nulls_at({0})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COLLECT_LIST), + lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}}, + nulls_at({0})}); + + run_rolling.min_periods(0); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{0, 1, 2, 2, 2, 2, 2, 2, 2, 2}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{0, 1, 2, 2, 2, 2, 2, 2, 2, 2}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COLLECT_LIST), + lists_column{{{}, {0}, {0, 1}, {1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}}, + no_nulls()}); +} + +TEST_F(OffsetRowWindowTest, OffsetRowWindow_Grouped_0_to_2) { auto const preceding = cudf::window_bounds::get(0); auto const following = cudf::window_bounds::get(2); @@ -171,3 +211,43 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_0_to_2) *run_rolling(*AGG_COLLECT_LIST), lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5}, {}, {7, 8}, {8, 9}, {9}, {}}, no_nulls}); } + +TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_0_to_2) +{ + auto const preceding = cudf::window_bounds::get(0); + auto const following = cudf::window_bounds::get(2); + auto run_rolling = rolling_runner{preceding, following}.min_periods(1).grouped(false); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{2, 2, 2, 2, 2, 2, 2, 2, 1, null}, nulls_at({9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{2, 2, 2, 2, 2, 2, 2, 2, 1, null}, nulls_at({9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_MIN), + ints_column{{1, 2, 3, 4, 5, 6, 7, 8, 9, null}, nulls_at({9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_MAX), + 
ints_column{{2, 3, 4, 5, 6, 7, 8, 9, 9, null}, nulls_at({9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_SUM), bigints_column{{3, 5, 7, 9, 11, 13, 15, 17, 9, null}, nulls_at({9})}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COLLECT_LIST), + lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}, {8, 9}, {9}, {}}, + nulls_at({9})}); + + run_rolling.min_periods(0); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_NON_NULL), + ints_column{{2, 2, 2, 2, 2, 2, 2, 2, 1, 0}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*run_rolling(*AGG_COUNT_ALL), + ints_column{{2, 2, 2, 2, 2, 2, 2, 2, 1, 0}, no_nulls()}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + *run_rolling(*AGG_COLLECT_LIST), + lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}, {8, 9}, {9}, {}}, + no_nulls}); +} From e616ecc877f3c1e3f1b4e55e56d1204922d6f093 Mon Sep 17 00:00:00 2001 From: MithunR Date: Sat, 9 Sep 2023 20:41:08 -0700 Subject: [PATCH 05/13] Removed unused includes. --- cpp/tests/rolling/grouped_rolling_test.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/tests/rolling/grouped_rolling_test.cpp b/cpp/tests/rolling/grouped_rolling_test.cpp index af19c4e7e84..7dd72ace53c 100644 --- a/cpp/tests/rolling/grouped_rolling_test.cpp +++ b/cpp/tests/rolling/grouped_rolling_test.cpp @@ -33,9 +33,6 @@ #include #include -#include -#include - const std::string cuda_func{ R"***( template From 6d65898ad5ecefe914d76f988d2b028c025ab7df Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 11 Sep 2023 10:40:26 -0700 Subject: [PATCH 06/13] Tentative fix for off-by-1 reads of the next group. 
--- .../rolling/detail/rolling_fixed_window.cu | 39 +++++++++++ cpp/src/rolling/grouped_rolling.cu | 69 +++++++++++++------ cpp/tests/rolling/offset_row_window_test.cpp | 32 +++++++++ 3 files changed, 119 insertions(+), 21 deletions(-) diff --git a/cpp/src/rolling/detail/rolling_fixed_window.cu b/cpp/src/rolling/detail/rolling_fixed_window.cu index 93f9ee3a206..b43973abddc 100644 --- a/cpp/src/rolling/detail/rolling_fixed_window.cu +++ b/cpp/src/rolling/detail/rolling_fixed_window.cu @@ -19,8 +19,31 @@ #include #include +#include + #include +// TODO: DELETEME! +namespace +{ + using namespace cudf; + template + std::unique_ptr expand_to_column(Calculator const& calc, + size_type const& num_rows, + rmm::cuda_stream_view stream) + { + auto window_column = cudf::make_numeric_column( + cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); + + auto begin = cudf::detail::make_counting_transform_iterator(0, calc); + + thrust::copy_n( + rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); + + return window_column; + } +} + namespace cudf::detail { // Applies a fixed-size rolling window function to the values in a column. @@ -60,6 +83,7 @@ std::unique_ptr rolling_window(column_view const& input, // Clamp preceding/following to column boundaries. // E.g. If preceding_window == 2, then for a column of 5 elements, preceding_window will be: // [1, 2, 2, 2, 1] + // TODO: Handle capping preceding/following for negative values. 
auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator( 0, [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); }); @@ -68,6 +92,21 @@ std::unique_ptr rolling_window(column_view const& input, return thrust::min(col_size - i - 1, following_window); }); + // TODO: DELETEME + auto const preceding_calc = + [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); }; + auto const tmp_preceding = expand_to_column(preceding_calc, input.size(), stream); + std::cout << "Ungrouped: Preceding: " << std::endl; + cudf::test::print(*tmp_preceding); + + auto const following_calc = + [col_size = input.size(), following_window] __device__(size_type i) { + return thrust::min(col_size - i - 1, following_window); + }; + auto const tmp_following = expand_to_column(following_calc, input.size(), stream); + std::cout << "Ungrouped: Following: " << std::endl; + cudf::test::print(*tmp_following); + return cudf::detail::rolling_window(input, default_outputs, preceding_window_begin, diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 654358dee1e..4919c9f659e 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -29,6 +29,8 @@ #include #include +#include + #include #include #include @@ -92,6 +94,24 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, mr); } +namespace { +template +std::unique_ptr expand_to_column(Calculator const& calc, + size_type const& num_rows, + rmm::cuda_stream_view stream) +{ + auto window_column = cudf::make_numeric_column( + cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); + + auto begin = cudf::detail::make_counting_transform_iterator(0, calc); + + thrust::copy_n( + rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); + + return window_column; +} +} + namespace detail { std::unique_ptr 
grouped_rolling_window(table_view const& group_keys, @@ -164,19 +184,41 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, d_group_labels = group_labels.data(), preceding_window] __device__(size_type idx) { auto group_label = d_group_labels[idx]; - auto group_start = d_group_offsets[group_label]; - return thrust::minimum{}(preceding_window, - idx - group_start + 1); // Preceding includes current row. + if (preceding_window < 1) { // where 1 indicates only the current row. + // 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + auto group_end = d_group_offsets[group_label + 1]; + return thrust::maximum{}(preceding_window, +// -(group_end - 1 - idx) + 1); + -(group_end - 1 - idx)); + } + else { + auto group_start = d_group_offsets[group_label]; + return thrust::minimum{}(preceding_window, + idx - group_start + 1); // Preceding includes current row. + } }; + std::cout << "Preceding column: " << std::endl; + auto const tmp_preceding = expand_to_column(preceding_calculator, input.size(), stream); + cudf::test::print(*tmp_preceding); + auto following_calculator = [d_group_offsets = group_offsets.data(), d_group_labels = group_labels.data(), following_window] __device__(size_type idx) { auto group_label = d_group_labels[idx]; - auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets - // is capped with `input.size()`. - return thrust::minimum{}(following_window, (group_end - 1) - idx); + if (following_window < 0) { + auto group_start = d_group_offsets[group_label]; + return thrust::maximum{}(following_window, -(idx - group_start)-1); + } + else { + auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets + // is capped with `input.size()`. 
+ return thrust::minimum{}(following_window, (group_end - 1) - idx); + } }; + std::cout << "Following column: " << std::endl; + auto const tmp_following = expand_to_column(following_calculator, input.size(), stream); + cudf::test::print(*tmp_following); if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { cudf::detail::preceding_window_wrapper grouped_preceding_window{ @@ -324,21 +366,6 @@ std::tuple get_null_bounds_for_orderby_column( : std::make_tuple(num_rows - num_nulls, num_rows); } -template -std::unique_ptr expand_to_column(Calculator const& calc, - size_type const& num_rows, - rmm::cuda_stream_view stream) -{ - auto window_column = cudf::make_numeric_column( - cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); - - auto begin = cudf::detail::make_counting_transform_iterator(0, calc); - - thrust::copy_n( - rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); - - return window_column; -} /// Range window computation, with /// 1. 
no grouping keys specified diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index fa0eba6c1b6..0f86b7c8f19 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -251,3 +251,35 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_0_to_2) lists_column{{{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}, {8, 9}, {9}, {}}, no_nulls}); } + +TEST_F(OffsetRowWindowTest, Problematic) +{ + auto grp_iter = thrust::make_transform_iterator(thrust::make_counting_iterator(0), [](auto const& i) { + if (i < 10) return 1; + if (i < 20) return 2; + return 3; + }); + auto const grp = ints_column(grp_iter, grp_iter + 30); + auto const agg = ints_column(grp_iter, grp_iter + 30); + { + auto const results = cudf::grouped_rolling_window( + cudf::table_view{{grp}}, + agg, + -1, 4, 1, *cudf::make_max_aggregation() + ); + std::cout << "Max(-1, 4): " << std::endl; + cudf::test::print(*results); + std::cout << std::endl; + } + { + auto const results = cudf::grouped_rolling_window( + cudf::table_view{{grp}}, + agg, + -1, 4, 1, *cudf::make_min_aggregation() + ); + + std::cout << "Min(-1, 4): " << std::endl; + cudf::test::print(*results); + std::cout << std::endl; + } +} From eaff7dd7b4215da821427f455f190b60ea2e7c5f Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 11 Sep 2023 12:59:32 -0700 Subject: [PATCH 07/13] Materialized preceding/following offsets for fixed windows. Also, removed prints. 
--- cpp/src/rolling/detail/rolling.cuh | 21 ++++++-- .../rolling/detail/rolling_fixed_window.cu | 41 ++------------- cpp/src/rolling/grouped_rolling.cu | 52 +++++-------------- 3 files changed, 33 insertions(+), 81 deletions(-) diff --git a/cpp/src/rolling/detail/rolling.cuh b/cpp/src/rolling/detail/rolling.cuh index 6992f4f0a27..75ff8eb3519 100644 --- a/cpp/src/rolling/detail/rolling.cuh +++ b/cpp/src/rolling/detail/rolling.cuh @@ -70,7 +70,22 @@ namespace cudf { namespace detail { -namespace { // anonymous +/// Helper function to materialize preceding/following offsets. +template +std::unique_ptr expand_to_column(Calculator const& calc, + size_type const& num_rows, + rmm::cuda_stream_view stream) +{ + auto window_column = cudf::make_numeric_column( + cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); + + auto begin = cudf::detail::make_counting_transform_iterator(0, calc); + + thrust::copy_n( + rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); + + return window_column; +} /** * @brief Operator for applying a generic (non-specialized) rolling aggregation on a single window. @@ -461,7 +476,7 @@ struct agg_specific_empty_output { } }; -std::unique_ptr empty_output_for_rolling_aggregation(column_view const& input, +static std::unique_ptr empty_output_for_rolling_aggregation(column_view const& input, rolling_aggregation const& agg) { // TODO: @@ -1215,8 +1230,6 @@ struct dispatch_rolling { } }; -} // namespace - // Applies a user-defined rolling window function to the values in a column. template std::unique_ptr rolling_window_udf(column_view const& input, diff --git a/cpp/src/rolling/detail/rolling_fixed_window.cu b/cpp/src/rolling/detail/rolling_fixed_window.cu index b43973abddc..e70c1cf428f 100644 --- a/cpp/src/rolling/detail/rolling_fixed_window.cu +++ b/cpp/src/rolling/detail/rolling_fixed_window.cu @@ -23,27 +23,6 @@ #include -// TODO: DELETEME! 
-namespace -{ - using namespace cudf; - template - std::unique_ptr expand_to_column(Calculator const& calc, - size_type const& num_rows, - rmm::cuda_stream_view stream) - { - auto window_column = cudf::make_numeric_column( - cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); - - auto begin = cudf::detail::make_counting_transform_iterator(0, calc); - - thrust::copy_n( - rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); - - return window_column; - } -} - namespace cudf::detail { // Applies a fixed-size rolling window function to the values in a column. @@ -84,33 +63,21 @@ std::unique_ptr rolling_window(column_view const& input, // E.g. If preceding_window == 2, then for a column of 5 elements, preceding_window will be: // [1, 2, 2, 2, 1] // TODO: Handle capping preceding/following for negative values. - auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator( - 0, - [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); }); - auto const following_window_begin = cudf::detail::make_counting_transform_iterator( - 0, [col_size = input.size(), following_window] __device__(size_type i) { - return thrust::min(col_size - i - 1, following_window); - }); - // TODO: DELETEME auto const preceding_calc = [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); }; - auto const tmp_preceding = expand_to_column(preceding_calc, input.size(), stream); - std::cout << "Ungrouped: Preceding: " << std::endl; - cudf::test::print(*tmp_preceding); auto const following_calc = [col_size = input.size(), following_window] __device__(size_type i) { return thrust::min(col_size - i - 1, following_window); }; - auto const tmp_following = expand_to_column(following_calc, input.size(), stream); - std::cout << "Ungrouped: Following: " << std::endl; - cudf::test::print(*tmp_following); + auto const preceding_column = expand_to_column(preceding_calc, 
input.size(), stream); + auto const following_column = expand_to_column(following_calc, input.size(), stream); return cudf::detail::rolling_window(input, default_outputs, - preceding_window_begin, - following_window_begin, + preceding_column->view().begin(), + following_column->view().begin(), min_periods, agg, stream, diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 4919c9f659e..d68b21f1b3a 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -29,10 +29,7 @@ #include #include -#include - #include -#include #include #include #include @@ -94,24 +91,6 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, mr); } -namespace { -template -std::unique_ptr expand_to_column(Calculator const& calc, - size_type const& num_rows, - rmm::cuda_stream_view stream) -{ - auto window_column = cudf::make_numeric_column( - cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); - - auto begin = cudf::detail::make_counting_transform_iterator(0, calc); - - thrust::copy_n( - rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); - - return window_column; -} -} - namespace detail { std::unique_ptr grouped_rolling_window(table_view const& group_keys, @@ -185,10 +164,8 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, preceding_window] __device__(size_type idx) { auto group_label = d_group_labels[idx]; if (preceding_window < 1) { // where 1 indicates only the current row. 
- // 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 auto group_end = d_group_offsets[group_label + 1]; return thrust::maximum{}(preceding_window, -// -(group_end - 1 - idx) + 1); -(group_end - 1 - idx)); } else { @@ -198,10 +175,6 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, } }; - std::cout << "Preceding column: " << std::endl; - auto const tmp_preceding = expand_to_column(preceding_calculator, input.size(), stream); - cudf::test::print(*tmp_preceding); - auto following_calculator = [d_group_offsets = group_offsets.data(), d_group_labels = group_labels.data(), following_window] __device__(size_type idx) { @@ -216,9 +189,6 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, return thrust::minimum{}(following_window, (group_end - 1) - idx); } }; - std::cout << "Following column: " << std::endl; - auto const tmp_following = expand_to_column(following_calculator, input.size(), stream); - cudf::test::print(*tmp_following); if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { cudf::detail::preceding_window_wrapper grouped_preceding_window{ @@ -237,11 +207,13 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, stream, mr); } else { + auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, default_outputs, - cudf::detail::make_counting_transform_iterator(0, preceding_calculator), - cudf::detail::make_counting_transform_iterator(0, following_calculator), + preceding_column->view().begin(), + following_column->view().begin(), min_periods, aggr, stream, @@ -420,7 +392,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. 
}; - auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [nulls_begin_idx = h_nulls_begin_idx, @@ -455,7 +427,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto const following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -600,7 +572,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [d_group_offsets = group_offsets.data(), @@ -646,7 +618,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto const following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -705,7 +677,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. 
}; - auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [nulls_begin_idx = h_nulls_begin_idx, @@ -740,7 +712,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto const following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -804,7 +776,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [d_group_offsets = group_offsets.data(), @@ -847,7 +819,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto const following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { CUDF_FAIL("Ranged rolling window does NOT (yet) support UDF."); From c3e7f8b106236660e620fdebc4d419ebde08b6b4 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 11 Sep 2023 13:00:29 -0700 Subject: [PATCH 08/13] Formatting. 
--- cpp/src/rolling/detail/rolling.cuh | 18 +++--- .../rolling/detail/rolling_fixed_window.cu | 13 ++-- cpp/src/rolling/grouped_rolling.cu | 63 ++++++++++--------- cpp/tests/rolling/offset_row_window_test.cpp | 35 ++++++----- 4 files changed, 70 insertions(+), 59 deletions(-) diff --git a/cpp/src/rolling/detail/rolling.cuh b/cpp/src/rolling/detail/rolling.cuh index 75ff8eb3519..0648ef3d30f 100644 --- a/cpp/src/rolling/detail/rolling.cuh +++ b/cpp/src/rolling/detail/rolling.cuh @@ -73,18 +73,18 @@ namespace detail { /// Helper function to materialize preceding/following offsets. template std::unique_ptr expand_to_column(Calculator const& calc, - size_type const& num_rows, - rmm::cuda_stream_view stream) + size_type const& num_rows, + rmm::cuda_stream_view stream) { - auto window_column = cudf::make_numeric_column( - cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); + auto window_column = cudf::make_numeric_column( + cudf::data_type{type_to_id()}, num_rows, cudf::mask_state::UNALLOCATED, stream); - auto begin = cudf::detail::make_counting_transform_iterator(0, calc); + auto begin = cudf::detail::make_counting_transform_iterator(0, calc); - thrust::copy_n( - rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); + thrust::copy_n( + rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data()); - return window_column; + return window_column; } /** @@ -477,7 +477,7 @@ struct agg_specific_empty_output { }; static std::unique_ptr empty_output_for_rolling_aggregation(column_view const& input, - rolling_aggregation const& agg) + rolling_aggregation const& agg) { // TODO: // Ideally, for UDF aggregations, the returned column would match diff --git a/cpp/src/rolling/detail/rolling_fixed_window.cu b/cpp/src/rolling/detail/rolling_fixed_window.cu index e70c1cf428f..c34a4051cb1 100644 --- a/cpp/src/rolling/detail/rolling_fixed_window.cu +++ b/cpp/src/rolling/detail/rolling_fixed_window.cu @@ -64,13 
+64,14 @@ std::unique_ptr rolling_window(column_view const& input, // [1, 2, 2, 2, 1] // TODO: Handle capping preceding/following for negative values. - auto const preceding_calc = - [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); }; + auto const preceding_calc = [preceding_window] __device__(size_type i) { + return thrust::min(i + 1, preceding_window); + }; - auto const following_calc = - [col_size = input.size(), following_window] __device__(size_type i) { - return thrust::min(col_size - i - 1, following_window); - }; + auto const following_calc = [col_size = input.size(), + following_window] __device__(size_type i) { + return thrust::min(col_size - i - 1, following_window); + }; auto const preceding_column = expand_to_column(preceding_calc, input.size(), stream); auto const following_column = expand_to_column(following_calc, input.size(), stream); diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index d68b21f1b3a..fdb38b2ec07 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -163,12 +163,10 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, d_group_labels = group_labels.data(), preceding_window] __device__(size_type idx) { auto group_label = d_group_labels[idx]; - if (preceding_window < 1) { // where 1 indicates only the current row. + if (preceding_window < 1) { // where 1 indicates only the current row. auto group_end = d_group_offsets[group_label + 1]; - return thrust::maximum{}(preceding_window, - -(group_end - 1 - idx)); - } - else { + return thrust::maximum{}(preceding_window, -(group_end - 1 - idx)); + } else { auto group_start = d_group_offsets[group_label]; return thrust::minimum{}(preceding_window, idx - group_start + 1); // Preceding includes current row. 
@@ -181,11 +179,10 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, auto group_label = d_group_labels[idx]; if (following_window < 0) { auto group_start = d_group_offsets[group_label]; - return thrust::maximum{}(following_window, -(idx - group_start)-1); - } - else { + return thrust::maximum{}(following_window, -(idx - group_start) - 1); + } else { auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets - // is capped with `input.size()`. + // is capped with `input.size()`. return thrust::minimum{}(following_window, (group_end - 1) - idx); } }; @@ -207,17 +204,18 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, stream, mr); } else { - auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); - auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); - return cudf::detail::rolling_window( - input, - default_outputs, - preceding_column->view().begin(), - following_column->view().begin(), - min_periods, - aggr, - stream, - mr); + auto const preceding_column = + cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const following_column = + cudf::detail::expand_to_column(following_calculator, input.size(), stream); + return cudf::detail::rolling_window(input, + default_outputs, + preceding_column->view().begin(), + following_column->view().begin(), + min_periods, + aggr, + stream, + mr); } } @@ -338,7 +336,6 @@ std::tuple get_null_bounds_for_orderby_column( : std::make_tuple(num_rows - num_nulls, num_rows); } - /// Range window computation, with /// 1. no grouping keys specified /// 2. rows in ASCENDING order. @@ -392,7 +389,8 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. 
}; - auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = + cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [nulls_begin_idx = h_nulls_begin_idx, @@ -427,7 +425,8 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); + auto const following_column = + cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -572,7 +571,8 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = + cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [d_group_offsets = group_offsets.data(), @@ -618,7 +618,8 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); + auto const following_column = + cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -677,7 +678,8 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. 
}; - auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = + cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [nulls_begin_idx = h_nulls_begin_idx, @@ -712,7 +714,8 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); + auto const following_column = + cudf::detail::expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -776,7 +779,8 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto const preceding_column = cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = + cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); auto const following_calculator = [d_group_offsets = group_offsets.data(), @@ -819,7 +823,8 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto const following_column = cudf::detail::expand_to_column(following_calculator, input.size(), stream); + auto const following_column = + cudf::detail::expand_to_column(following_calculator, input.size(), stream); if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { CUDF_FAIL("Ranged rolling window does NOT (yet) support UDF."); diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index 0f86b7c8f19..05de2e9024b 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -254,29 +254,34 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_0_to_2) TEST_F(OffsetRowWindowTest, Problematic) { - auto grp_iter = 
thrust::make_transform_iterator(thrust::make_counting_iterator(0), [](auto const& i) { - if (i < 10) return 1; - if (i < 20) return 2; - return 3; - }); + auto grp_iter = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), [](auto const& i) { + if (i < 10) return 1; + if (i < 20) return 2; + return 3; + }); auto const grp = ints_column(grp_iter, grp_iter + 30); auto const agg = ints_column(grp_iter, grp_iter + 30); { - auto const results = cudf::grouped_rolling_window( - cudf::table_view{{grp}}, - agg, - -1, 4, 1, *cudf::make_max_aggregation() - ); + auto const results = + cudf::grouped_rolling_window(cudf::table_view{{grp}}, + agg, + -1, + 4, + 1, + *cudf::make_max_aggregation()); std::cout << "Max(-1, 4): " << std::endl; cudf::test::print(*results); std::cout << std::endl; } { - auto const results = cudf::grouped_rolling_window( - cudf::table_view{{grp}}, - agg, - -1, 4, 1, *cudf::make_min_aggregation() - ); + auto const results = + cudf::grouped_rolling_window(cudf::table_view{{grp}}, + agg, + -1, + 4, + 1, + *cudf::make_min_aggregation()); std::cout << "Min(-1, 4): " << std::endl; cudf::test::print(*results); From 589f2a57c4a979c832cf733d030b51cdc097ac54 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 18 Sep 2023 13:13:51 -0700 Subject: [PATCH 09/13] Moved if-check for negative windows to constexpr. --- .../rolling/detail/rolling_fixed_window.cu | 1 - cpp/src/rolling/grouped_rolling.cu | 135 ++++++++++++++---- 2 files changed, 105 insertions(+), 31 deletions(-) diff --git a/cpp/src/rolling/detail/rolling_fixed_window.cu b/cpp/src/rolling/detail/rolling_fixed_window.cu index c34a4051cb1..e951db955e5 100644 --- a/cpp/src/rolling/detail/rolling_fixed_window.cu +++ b/cpp/src/rolling/detail/rolling_fixed_window.cu @@ -62,7 +62,6 @@ std::unique_ptr rolling_window(column_view const& input, // Clamp preceding/following to column boundaries. // E.g. 
If preceding_window == 2, then for a column of 5 elements, preceding_window will be: // [1, 2, 2, 2, 1] - // TODO: Handle capping preceding/following for negative values. auto const preceding_calc = [preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index fdb38b2ec07..6e69b5157c2 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -93,6 +93,109 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, namespace detail { +/// Preceding window calculation functor. +template +struct row_based_preceding_calc { + cudf::size_type const* _group_offsets_begin; + cudf::size_type const* _group_labels_begin; + cudf::size_type const _preceding_window; + + row_based_preceding_calc(rmm::device_uvector const& group_offsets, + rmm::device_uvector const& group_labels, + cudf::size_type const& preceding_window) + : _group_offsets_begin(group_offsets.data()), + _group_labels_begin(group_labels.data()), + _preceding_window(preceding_window) + { + } + + __device__ cudf::size_type operator()(cudf::size_type const& idx) const + { + auto group_label = _group_labels_begin[idx]; + if constexpr (preceding_less_than_1) { // where 1 indicates only the current row. + auto group_end = _group_offsets_begin[group_label + 1]; + return thrust::maximum{}(_preceding_window, -(group_end - 1 - idx)); + } else { + auto group_start = _group_offsets_begin[group_label]; + return thrust::minimum{}(_preceding_window, + idx - group_start + 1); // Preceding includes current row. + } + } +}; + +/// Helper to materialize preceding-window column, corrected to respect group boundaries. +/// E.g. If preceding window == 5, then, +/// 1. For the first row in the group, the preceding is set to 1, +/// 2. For the next row in the group, preceding is set to 2, etc. 
+std::unique_ptr make_preceding_column( + rmm::device_uvector const& group_offsets, + rmm::device_uvector const& group_labels, + cudf::size_type const& preceding_window, + cudf::size_type const& num_rows, + rmm::cuda_stream_view stream) +{ + if (preceding_window < 1) { + auto const calc = row_based_preceding_calc(group_offsets, group_labels, preceding_window); + return cudf::detail::expand_to_column(calc, num_rows, stream); + } else { + auto const calc = + row_based_preceding_calc(group_offsets, group_labels, preceding_window); + return cudf::detail::expand_to_column(calc, num_rows, stream); + } +} + +/// Following window calculation functor. +template +struct row_based_following_calc { + cudf::size_type const* _group_offsets_begin; + cudf::size_type const* _group_labels_begin; + cudf::size_type const _following_window; + + row_based_following_calc(rmm::device_uvector const& group_offsets, + rmm::device_uvector const& group_labels, + cudf::size_type const& following_window) + : _group_offsets_begin(group_offsets.data()), + _group_labels_begin(group_labels.data()), + _following_window(following_window) + { + } + + __device__ cudf::size_type operator()(cudf::size_type const& idx) const + { + auto group_label = _group_labels_begin[idx]; + if constexpr (following_less_than_0) { + auto group_start = _group_offsets_begin[group_label]; + return thrust::maximum{}(_following_window, -(idx - group_start) - 1); + } else { + auto group_end = + _group_offsets_begin[group_label + 1]; // Cannot fall off the end, since offsets + // is capped with `input.size()`. + return thrust::minimum{}(_following_window, (group_end - 1) - idx); + } + } +}; + +/// Helper to materialize following-window column, corrected to respect group boundaries. +/// i.e. If following window == 5, then: +/// 1. For the last row in the group, the following is set to 0. +/// 2. For the second last row in the group, following is set to 1, etc. 
+std::unique_ptr make_following_column( + rmm::device_uvector const& group_offsets, + rmm::device_uvector const& group_labels, + cudf::size_type const& following_window, + cudf::size_type const& num_rows, + rmm::cuda_stream_view stream) +{ + if (following_window < 0) { + auto const calc = row_based_following_calc(group_offsets, group_labels, following_window); + return cudf::detail::expand_to_column(calc, num_rows, stream); + } else { + auto const calc = + row_based_following_calc(group_offsets, group_labels, following_window); + return cudf::detail::expand_to_column(calc, num_rows, stream); + } +} + std::unique_ptr grouped_rolling_window(table_view const& group_keys, column_view const& input, column_view const& default_outputs, @@ -159,34 +262,6 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, group_offsets.element(group_offsets.size() - 1, stream) == input.size() && "Must have at least one group."); - auto preceding_calculator = [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - preceding_window] __device__(size_type idx) { - auto group_label = d_group_labels[idx]; - if (preceding_window < 1) { // where 1 indicates only the current row. - auto group_end = d_group_offsets[group_label + 1]; - return thrust::maximum{}(preceding_window, -(group_end - 1 - idx)); - } else { - auto group_start = d_group_offsets[group_label]; - return thrust::minimum{}(preceding_window, - idx - group_start + 1); // Preceding includes current row. 
- } - }; - - auto following_calculator = [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - following_window] __device__(size_type idx) { - auto group_label = d_group_labels[idx]; - if (following_window < 0) { - auto group_start = d_group_offsets[group_label]; - return thrust::maximum{}(following_window, -(idx - group_start) - 1); - } else { - auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets - // is capped with `input.size()`. - return thrust::minimum{}(following_window, (group_end - 1) - idx); - } - }; - if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { cudf::detail::preceding_window_wrapper grouped_preceding_window{ group_offsets.data(), group_labels.data(), preceding_window}; @@ -205,9 +280,9 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, mr); } else { auto const preceding_column = - cudf::detail::expand_to_column(preceding_calculator, input.size(), stream); + make_preceding_column(group_offsets, group_labels, preceding_window, input.size(), stream); auto const following_column = - cudf::detail::expand_to_column(following_calculator, input.size(), stream); + make_following_column(group_offsets, group_labels, following_window, input.size(), stream); return cudf::detail::rolling_window(input, default_outputs, preceding_column->view().begin(), From 5f310a2f9dafa6b2f16878a1c677d20c576029ee Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 18 Sep 2023 13:30:03 -0700 Subject: [PATCH 10/13] Fixed up test. 
--- cpp/tests/rolling/offset_row_window_test.cpp | 22 ++++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index 05de2e9024b..384177a67a6 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -35,6 +35,7 @@ using bigints_column = fwcw; using strings_column = cudf::test::strings_column_wrapper; using lists_column = cudf::test::lists_column_wrapper; using column_ptr = std::unique_ptr; +using cudf::test::iterators::all_nulls; using cudf::test::iterators::no_nulls; using cudf::test::iterators::nulls_at; @@ -252,7 +253,7 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_0_to_2) no_nulls}); } -TEST_F(OffsetRowWindowTest, Problematic) +TEST_F(OffsetRowWindowTest, CheckGroupBoundaries) { auto grp_iter = thrust::make_transform_iterator(thrust::make_counting_iterator(0), [](auto const& i) { @@ -266,13 +267,14 @@ TEST_F(OffsetRowWindowTest, Problematic) auto const results = cudf::grouped_rolling_window(cudf::table_view{{grp}}, agg, - -1, - 4, + -80, + 100, 1, *cudf::make_max_aggregation()); - std::cout << "Max(-1, 4): " << std::endl; - cudf::test::print(*results); - std::cout << std::endl; + auto const null_iter = thrust::make_constant_iterator(null); + auto const expected = ints_column(null_iter, null_iter + 30, all_nulls()); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); } { auto const results = @@ -282,9 +284,11 @@ TEST_F(OffsetRowWindowTest, Problematic) 4, 1, *cudf::make_min_aggregation()); + auto const expected = + ints_column{{1, 1, 1, 1, 1, 1, 1, 1, null, null, 2, 2, 2, 2, 2, + 2, 2, 2, null, null, 3, 3, 3, 3, 3, 3, 3, 3, null, null}, + nulls_at({8, 9, 18, 19, 28, 29})}; - std::cout << "Min(-1, 4): " << std::endl; - cudf::test::print(*results); - std::cout << std::endl; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(results->view(), expected); } } From 
ada90c2c77566451d52d8bcc7e35ca0fcf2c90a7 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 19 Sep 2023 12:25:17 -0700 Subject: [PATCH 11/13] Proper test for clamping window bounds at group boundaries. --- cpp/tests/rolling/offset_row_window_test.cpp | 49 ++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/cpp/tests/rolling/offset_row_window_test.cpp b/cpp/tests/rolling/offset_row_window_test.cpp index 384177a67a6..ec726878b34 100644 --- a/cpp/tests/rolling/offset_row_window_test.cpp +++ b/cpp/tests/rolling/offset_row_window_test.cpp @@ -253,6 +253,55 @@ TEST_F(OffsetRowWindowTest, OffsetRowWindow_Ungrouped_0_to_2) no_nulls}); } +// To test that negative preceding/following bounds are clamped correctly at group boundaries. +TEST_F(OffsetRowWindowTest, TestNegativeBoundsClamp) +{ + auto const grp_iter = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), [](auto const& i) { + return i / 10; // 0-9 in the first group, 10-19 in the second, etc. + }); + auto const agg_iter = thrust::make_constant_iterator(1); + + auto const grp = ints_column(grp_iter, grp_iter + 30); + auto const agg = ints_column(agg_iter, agg_iter + 30); + + auto const min_periods = 0; + auto const rolling_sum = [&](auto const preceding, auto const following) { + return cudf::grouped_rolling_window( + cudf::table_view{{grp}}, agg, preceding, following, min_periods, *AGG_SUM); + }; + + // Testing negative preceding.
+ for (auto const preceding : {0, -1, -2, -5, -10, -20, -50}) { + auto const results = rolling_sum(preceding, 100); + auto const expected_fun = [&](auto const& i) { + assert(preceding < 1); + auto const index_in_group = i % 10; + auto const start = std::min(-(preceding - 1) + index_in_group, 10); + return int64_t{10 - start}; + }; + auto const expected_iter = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), expected_fun); + auto const expected = bigints_column(expected_iter, expected_iter + 30, no_nulls()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + } + + // Testing negative following. + for (auto const following : {-1, -2, -5, -10, -20, -50}) { + auto const results = rolling_sum(100, following); + auto const expected_fun = [&](auto const& i) { + assert(following < 0); + auto const index_in_group = i % 10; + auto const end = std::max(index_in_group + following, -1); + return int64_t{end + 1}; + }; + auto const expected_iter = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), expected_fun); + auto const expected = bigints_column(expected_iter, expected_iter + 30, no_nulls()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + } +} + TEST_F(OffsetRowWindowTest, CheckGroupBoundaries) { auto grp_iter = From da78c8e8c70b59caa6f4605998cd4fa76e061fbe Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 20 Sep 2023 12:26:20 -0700 Subject: [PATCH 12/13] Updated grouped_rolling_window documentation: This now explains the semantics for negative window bounds. --- cpp/include/cudf/rolling.hpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index efdb85691bd..b8eb3d3a444 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -199,6 +199,24 @@ struct window_bounds { * column of the same type as the input. 
Therefore it is suggested to convert integer column types * (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`. * + * Note: `preceding_window` and `following_window` could well have negative values. This yields + * windows where the current row might not be included at all. For instance, consider a window + * defined as (preceding=3, following=-1). This produces a window from 2 (i.e. 3-1) rows preceding + * the current row, to 1 row *preceding* the current row. For the example above, the window for + * row#3 is: + * + * [ 10, 20, 10, 50, 60, 20, 30, 80, 40 ] + * <--window--> ^ + * | + * current_row + * + * Similarly, `preceding` could have a negative value, indicating that the window begins at a + * position after the current row. It differs slightly from the semantics for `following`, because + * `preceding` includes the current row. Therefore: + * 1. preceding=1 => Window starts at the current row. + * 2. preceding=0 => Window starts at 1 past the current row. + * 3. preceding=-1 => Window starts at 2 past the current row. Etc. + * * @param[in] group_keys The (pre-sorted) grouping columns * @param[in] input The input column (to be aggregated) * @param[in] preceding_window The static rolling window size in the backward direction From 46de25047a3fc97545732c3f394c482515700f14 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 20 Sep 2023 15:12:06 -0700 Subject: [PATCH 13/13] Adjusted description for preceding/following.
--- cpp/include/cudf/rolling.hpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index b8eb3d3a444..ec93c709163 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -219,8 +219,10 @@ struct window_bounds { * * @param[in] group_keys The (pre-sorted) grouping columns * @param[in] input The input column (to be aggregated) - * @param[in] preceding_window The static rolling window size in the backward direction - * @param[in] following_window The static rolling window size in the forward direction + * @param[in] preceding_window The static rolling window size in the backward direction (for + * positive values), or forward direction (for negative values) + * @param[in] following_window The static rolling window size in the forward direction (for positive + * values), or backward direction (for negative values) * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.)