From 58fcd43ea7735b135a3a385a117b094e21e4d5ca Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 12 Aug 2024 13:44:45 +0000 Subject: [PATCH] Unnest respect kPreferredOutputBatchRows strictly --- velox/exec/Unnest.cpp | 170 ++++++++++++++++++--- velox/exec/Unnest.h | 20 ++- velox/exec/tests/UnnestTest.cpp | 141 ++++++++++------- velox/exec/tests/utils/OperatorTestBase.h | 6 + velox/exec/tests/utils/QueryAssertions.cpp | 8 + velox/exec/tests/utils/QueryAssertions.h | 3 + 6 files changed, 268 insertions(+), 80 deletions(-) diff --git a/velox/exec/Unnest.cpp b/velox/exec/Unnest.cpp index af39ffa7f4e7..86aae114367d 100644 --- a/velox/exec/Unnest.cpp +++ b/velox/exec/Unnest.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Unnest.h" +#include #include "velox/common/base/Nulls.h" #include "velox/vector/FlatVector.h" @@ -29,7 +30,8 @@ Unnest::Unnest( operatorId, unnestNode->id(), "Unnest"), - withOrdinality_(unnestNode->withOrdinality()) { + withOrdinality_(unnestNode->withOrdinality()), + maxOutputSize_(outputBatchRows()) { const auto& inputType = unnestNode->sources()[0]->outputType(); const auto& unnestVariables = unnestNode->unnestVariables(); for (const auto& variable : unnestVariables) { @@ -111,20 +113,61 @@ RowVectorPtr Unnest::getOutput() { } const auto size = input_->size(); - const auto maxOutputSize = outputBatchRows(); // Limit the number of input rows to keep output batch size within - // 'maxOutputSize' if possible. Process each input row fully. Do not break - // single row's output into multiple batches. + // 'maxOutputSize' if possible. Not process each input row fully when single + // row's output exceeds maxOutputSize. Single row's output maybe into + // multiple batches. vector_size_t numInput = 0; vector_size_t numElements = 0; - for (auto row = nextInputRow_; row < size; ++row) { - numElements += rawMaxSizes_[row]; + vector_size_t partialProcessRowStartSize = -1; + vector_size_t firstRowEndSize = -1; + // Process first row. 
+ if (nextInputRow_ < size) { + auto firstRow = nextInputRow_; + firstRowEndSize = rawMaxSizes_[firstRow]; + vector_size_t remainingSize = firstRowEndSize - firstRowStartSize_; + if (numElements + remainingSize > maxOutputSize_) { + // Single row's output is into multiple batches. + // Read the size range from them, not use 0 to rawMaxSizes_[row]. + firstRowEndSize = firstRowStartSize_ + maxOutputSize_ - numElements; + // Process maxOutputSize_ in this getOutput. + numElements = maxOutputSize_; + partialProcessRowStartSize = firstRowEndSize; + } else { + // Not need to split this row + numElements += remainingSize; + } ++numInput; - - if (numElements >= maxOutputSize) { + } + // Not split middle row. + // If there is only 1 row, the end row will not take effect, its startSize is + // always 0. + vector_size_t endRowEndSize = -1; + for (auto row = nextInputRow_ + 1; row < size; ++row) { + if (numElements >= maxOutputSize_) { break; } + vector_size_t remainingSize = rawMaxSizes_[row]; + if (numElements + remainingSize > maxOutputSize_) { + // This is the end row. + // Single row's output is into multiple batches. + // read the size range from them, not use 0 to rawMaxSizes_[row]. + endRowEndSize = maxOutputSize_ - numElements; + // Process maxOutputSize_ in this getOutput. + numElements = maxOutputSize_; + partialProcessRowStartSize = endRowEndSize; + ++numInput; + break; + } else { + // Not split this row. + numElements += remainingSize; + ++numInput; + } + } + // The end row is not partial, set it to the maxSize. 
+ if (endRowEndSize == -1 && numInput > 1) { + endRowEndSize = rawMaxSizes_[nextInputRow_ + numInput - 1]; } if (numElements == 0) { @@ -134,9 +177,20 @@ RowVectorPtr Unnest::getOutput() { return nullptr; } - auto output = generateOutput(nextInputRow_, numInput, numElements); - - nextInputRow_ += numInput; + std::cout << "row1 startSize " << firstRowStartSize_ << " endSize " + << firstRowEndSize << " end row endSize " << endRowEndSize + << std::endl; + + auto output = generateOutput( + nextInputRow_, numInput, numElements, firstRowEndSize, endRowEndSize); + std::cout << "generate output" << output->toString(0, 300) << std::endl; + if (partialProcessRowStartSize != -1) { + firstRowStartSize_ = partialProcessRowStartSize; + nextInputRow_ += numInput - 1; + } else { + firstRowStartSize_ = 0; + nextInputRow_ += numInput; + } if (nextInputRow_ >= size) { input_ = nullptr; @@ -150,17 +204,29 @@ void Unnest::generateRepeatedColumns( vector_size_t start, vector_size_t size, vector_size_t numElements, - std::vector& outputs) { + std::vector& outputs, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize) { // Create "indices" buffer to repeat rows as many times as there are elements // in the array (or map) in unnestDecoded. auto repeatedIndices = allocateIndices(numElements, pool()); auto* rawRepeatedIndices = repeatedIndices->asMutable(); vector_size_t index = 0; - for (auto row = start; row < start + size; ++row) { + if (size > 0) { + for (auto i = firstRowStartSize_; i < firstRowEndSize; i++) { + rawRepeatedIndices[index++] = start; + } + } + for (auto row = start + 1; row < start + size - 1; ++row) { for (auto i = 0; i < rawMaxSizes_[row]; i++) { rawRepeatedIndices[index++] = row; } } + if (size > 1) { + for (auto i = 0; i < endRowEndSize; i++) { + rawRepeatedIndices[index++] = start + size - 1; + } + } // Wrap "replicated" columns in a dictionary using 'repeatedIndices'. 
for (const auto& projection : identityProjections_) { @@ -176,7 +242,9 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel( column_index_t channel, vector_size_t start, vector_size_t size, - vector_size_t numElements) { + vector_size_t numElements, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize) { BufferPtr elementIndices = allocateIndices(numElements, pool()); auto* rawElementIndices = elementIndices->asMutable(); @@ -191,7 +259,45 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel( // Make dictionary index for elements column since they may be out of order. vector_size_t index = 0; bool identityMapping = true; - for (auto row = start; row < start + size; ++row) { + if (firstRowStartSize_ != 0) { + identityMapping = false; + } + + auto firstEndRowGenerator = + [&](vector_size_t row, vector_size_t startSize, vector_size_t endSize) { + if (!currentDecoded.isNullAt(row)) { + const auto offset = currentOffsets[currentIndices[row]]; + const auto unnestSize = currentSizes[currentIndices[row]]; + if (index != offset || endSize != rawMaxSizes_[row] || + unnestSize < endSize) { + identityMapping = false; + } + auto currentUnnestSize = std::min(endSize, unnestSize); + std::cout << "for channel " << channel << " numElements " + << numElements << " for row " << row << "startSize " + << startSize << "endSize " << endSize << " offset " + << offset << " unnestSize " << unnestSize << std::endl; + for (auto i = startSize; i < currentUnnestSize; i++) { + rawElementIndices[index++] = offset + i; + } + + for (auto i = currentUnnestSize; i < endSize; ++i) { + bits::setNull(rawNulls, index++, true); + } + } else if (endSize - startSize > 0) { + identityMapping = false; + + for (auto i = startSize; i < endSize; ++i) { + bits::setNull(rawNulls, index++, true); + } + } + }; + + if (size > 0) { + firstEndRowGenerator(start, firstRowStartSize_, firstRowEndSize); + } + + for (auto row = start + 1; row < start + size - 1; ++row) { 
const auto maxSize = rawMaxSizes_[row]; if (!currentDecoded.isNullAt(row)) { @@ -217,24 +323,40 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel( } } } + + if (size > 1) { + firstEndRowGenerator(start + size - 1, 0, endRowEndSize); + } + return {elementIndices, nulls, identityMapping}; } VectorPtr Unnest::generateOrdinalityVector( vector_size_t start, vector_size_t size, - vector_size_t numElements) { + vector_size_t numElements, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize) { auto ordinalityVector = BaseVector::create>(BIGINT(), numElements, pool()); // Set the ordinality at each result row to be the index of the element in // the original array (or map) plus one. auto* rawOrdinality = ordinalityVector->mutableRawValues(); - for (auto row = start; row < start + size; ++row) { + if (size > 0) { + const auto maxSize = firstRowEndSize - firstRowStartSize_; + std::iota(rawOrdinality, rawOrdinality + maxSize, firstRowStartSize_ + 1); + rawOrdinality += maxSize; + } + for (auto row = start + 1; row < start + size - 1; ++row) { const auto maxSize = rawMaxSizes_[row]; std::iota(rawOrdinality, rawOrdinality + maxSize, 1); rawOrdinality += maxSize; } + if (size > 1) { + std::iota(rawOrdinality, rawOrdinality + endRowEndSize, 1); + rawOrdinality += endRowEndSize; + } return ordinalityVector; } @@ -242,15 +364,18 @@ VectorPtr Unnest::generateOrdinalityVector( RowVectorPtr Unnest::generateOutput( vector_size_t start, vector_size_t size, - vector_size_t numElements) { + vector_size_t numElements, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize) { std::vector outputs(outputType_->size()); - generateRepeatedColumns(start, size, numElements, outputs); + generateRepeatedColumns( + start, size, numElements, outputs, firstRowEndSize, endRowEndSize); // Create unnest columns. 
vector_size_t outputsIndex = identityProjections_.size(); for (auto channel = 0; channel < unnestChannels_.size(); ++channel) { - const auto unnestChannelEncoding = - generateEncodingForChannel(channel, start, size, numElements); + const auto unnestChannelEncoding = generateEncodingForChannel( + channel, start, size, numElements, firstRowEndSize, endRowEndSize); auto& currentDecoded = unnestDecoded_[channel]; if (currentDecoded.base()->typeKind() == TypeKind::ARRAY) { @@ -272,7 +397,8 @@ RowVectorPtr Unnest::generateOutput( if (withOrdinality_) { // Ordinality column is always at the end. - outputs.back() = generateOrdinalityVector(start, size, numElements); + outputs.back() = generateOrdinalityVector( + start, size, numElements, firstRowEndSize, endRowEndSize); } return std::make_shared( diff --git a/velox/exec/Unnest.h b/velox/exec/Unnest.h index f1ae04e1137a..f824961fb7b4 100644 --- a/velox/exec/Unnest.h +++ b/velox/exec/Unnest.h @@ -40,6 +40,7 @@ class Unnest : public Operator { private: // Generate output for 'size' input rows starting from 'start' input row. + // Get the firstRowStartSize from class member `firstRowStartSize_`. // // @param start First input row to include in the output. // @param size Number of input rows to include in the output. @@ -47,7 +48,9 @@ class Unnest : public Operator { RowVectorPtr generateOutput( vector_size_t start, vector_size_t size, - vector_size_t outputSize); + vector_size_t outputSize, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize); // Invoked by generateOutput function above to generate the repeated output // columns. 
@@ -55,7 +58,9 @@ class Unnest : public Operator { vector_size_t start, vector_size_t size, vector_size_t numElements, - std::vector& outputs); + std::vector& outputs, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize); struct UnnestChannelEncoding { BufferPtr indices; @@ -71,22 +76,29 @@ class Unnest : public Operator { column_index_t channel, vector_size_t start, vector_size_t size, - vector_size_t numElements); + vector_size_t numElements, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize); // Invoked by generateOutput for the ordinality column. VectorPtr generateOrdinalityVector( vector_size_t start, vector_size_t size, - vector_size_t numElements); + vector_size_t numElements, + vector_size_t firstRowEndSize, + vector_size_t endRowEndSize); const bool withOrdinality_; std::vector unnestChannels_; std::vector unnestDecoded_; + const uint32_t maxOutputSize_; BufferPtr maxSizes_; vector_size_t* rawMaxSizes_{nullptr}; + vector_size_t firstRowStartSize_ = 0; + std::vector rawSizes_; std::vector rawOffsets_; std::vector rawIndices_; diff --git a/velox/exec/tests/UnnestTest.cpp b/velox/exec/tests/UnnestTest.cpp index afaad1b3530f..16da9f4f19b9 100644 --- a/velox/exec/tests/UnnestTest.cpp +++ b/velox/exec/tests/UnnestTest.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include "velox/common/base/tests/GTestUtils.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/OperatorTestBase.h" @@ -21,9 +22,29 @@ using namespace facebook::velox; using namespace facebook::velox::exec::test; -class UnnestTest : public OperatorTestBase {}; +class UnnestTest : public OperatorTestBase, + public testing::WithParamInterface { + void SetUp() override { + OperatorTestBase::SetUp(); + } + + void TearDown() override { + OperatorTestBase::TearDown(); + } + + protected: + const int32_t batchSize_{GetParam()}; + const CursorParameters makeCursorParameters( + const core::PlanNodePtr planNode) const { + CursorParameters params; + params.planNode = planNode; + params.queryConfigs[core::QueryConfig::kPreferredOutputBatchRows] = + std::to_string(batchSize_); + return params; + } +}; -TEST_F(UnnestTest, basicArray) { +TEST_P(UnnestTest, basicArray) { auto vector = makeRowVector({ makeFlatVector(100, [](auto row) { return row; }), makeArrayVector( @@ -38,10 +59,12 @@ TEST_F(UnnestTest, basicArray) { // TODO Add tests with empty arrays. This requires better support in DuckDB. auto op = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}).planNode(); - assertQuery(op, "SELECT c0, UNNEST(c1) FROM tmp WHERE c0 % 7 > 0"); + assertQuery( + makeCursorParameters(op), + "SELECT c0, UNNEST(c1) FROM tmp WHERE c0 % 7 > 0"); } -TEST_F(UnnestTest, arrayWithOrdinality) { +TEST_P(UnnestTest, arrayWithOrdinality) { auto array = vectorMaker_.arrayVectorNullable( {{{1, 2, std::nullopt, 4}}, std::nullopt, @@ -73,7 +96,7 @@ TEST_F(UnnestTest, arrayWithOrdinality) { makeNullableFlatVector( {1, 2, std::nullopt, 4, 5, 6, std::nullopt, 7, 8, 9}), makeNullableFlatVector({1, 2, 3, 4, 1, 2, 1, 1, 2, 3})}); - assertQuery(op, expected); + assertQuery(makeCursorParameters(op), expected); // Test with array wrapped in dictionary. 
auto reversedIndices = makeIndicesInReverse(6); @@ -100,10 +123,10 @@ TEST_F(UnnestTest, arrayWithOrdinality) { makeNullableFlatVector( {7, 8, 9, std::nullopt, 5, 6, 1, 2, std::nullopt, 4}), makeNullableFlatVector({1, 2, 3, 1, 1, 2, 1, 2, 3, 4})}); - assertQuery(op, expectedInDict); + assertQuery(makeCursorParameters(op), expectedInDict); } -TEST_F(UnnestTest, basicMap) { +TEST_P(UnnestTest, basicMap) { auto vector = makeRowVector( {makeFlatVector(100, [](auto row) { return row; }), makeMapVector( @@ -125,10 +148,11 @@ TEST_F(UnnestTest, basicMap) { [](auto /* row */) { return 2; }, [](auto /* row */, auto index) { return index + 1; })}); createDuckDbTable({duckDbVector}); - assertQuery(op, "SELECT c0, UNNEST(c1), UNNEST(c2) FROM tmp"); + assertQuery( + makeCursorParameters(op), "SELECT c0, UNNEST(c1), UNNEST(c2) FROM tmp"); } -TEST_F(UnnestTest, mapWithOrdinality) { +TEST_P(UnnestTest, mapWithOrdinality) { auto map = makeMapVector( {{{1, 1.1}, {2, std::nullopt}}, {{3, 3.3}, {4, 4.4}, {5, 5.5}}, @@ -147,7 +171,7 @@ TEST_F(UnnestTest, mapWithOrdinality) { makeNullableFlatVector( {1.1, std::nullopt, 3.3, 4.4, 5.5, std::nullopt}), makeNullableFlatVector({1, 2, 1, 2, 3, 1})}); - assertQuery(op, expected); + assertQuery(makeCursorParameters(op), expected); // Test with map wrapped in dictionary. 
auto reversedIndices = makeIndicesInReverse(3); @@ -165,10 +189,26 @@ TEST_F(UnnestTest, mapWithOrdinality) { makeNullableFlatVector( {std::nullopt, 3.3, 4.4, 5.5, 1.1, std::nullopt}), makeNullableFlatVector({1, 1, 2, 3, 1, 2})}); - assertQuery(op, expectedInDict); + assertQuery(makeCursorParameters(op), expectedInDict); } -TEST_F(UnnestTest, multipleColumns) { +TEST_P(UnnestTest, offsets) { + std::vector offsets(100, 0); + for (int i = 1; i < 100; ++i) { + offsets[i] = offsets[i - 1] + i % 11 + 1; + } + + auto vector = makeRowVector({ + makeFlatVector(100, [](auto row) { return row; }), + makeArrayVector(offsets, makeConstant(7, 700)), + }); + + createDuckDbTable({vector}); + auto op = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}).planNode(); + assertQuery(makeCursorParameters(op), "SELECT c0, UNNEST(c1) FROM tmp"); +} + +TEST_P(UnnestTest, multipleColumns) { std::vector offsets(100, 0); for (int i = 1; i < 100; ++i) { offsets[i] = offsets[i - 1] + i % 11 + 1; @@ -216,10 +256,11 @@ TEST_F(UnnestTest, multipleColumns) { makeArrayVector(offsets, makeConstant(7, 700))}); createDuckDbTable({duckDbVector}); assertQuery( - op, "SELECT c0, UNNEST(c1), UNNEST(c2), UNNEST(c3), UNNEST(c4) FROM tmp"); + makeCursorParameters(op), + "SELECT c0, UNNEST(c1), UNNEST(c2), UNNEST(c3), UNNEST(c4) FROM tmp"); } -TEST_F(UnnestTest, multipleColumnsWithOrdinality) { +TEST_P(UnnestTest, multipleColumnsWithOrdinality) { std::vector offsets(100, 0); for (int i = 1; i < 100; ++i) { offsets[i] = offsets[i - 1] + i % 11 + 1; @@ -285,7 +326,7 @@ TEST_F(UnnestTest, multipleColumnsWithOrdinality) { })}); createDuckDbTable({duckDbVector}); assertQuery( - op, + makeCursorParameters(op), "SELECT c0, UNNEST(c1), UNNEST(c2), UNNEST(c3), UNNEST(c4), UNNEST(c5) FROM tmp"); // Test with empty arrays and maps. 
@@ -351,10 +392,10 @@ TEST_F(UnnestTest, multipleColumnsWithOrdinality) { std::nullopt, std::nullopt}), makeNullableFlatVector({1, 2, 3, 4, 1, 2, 3, 1, 2, 1})}); - assertQuery(op, expected); + assertQuery(makeCursorParameters(op), expected); } -TEST_F(UnnestTest, allEmptyOrNullArrays) { +TEST_P(UnnestTest, allEmptyOrNullArrays) { auto vector = makeRowVector( {makeFlatVector(100, [](auto row) { return row; }), makeArrayVector( @@ -370,16 +411,16 @@ TEST_F(UnnestTest, allEmptyOrNullArrays) { auto op = PlanBuilder().values({vector}).unnest({"c0"}, {"c1", "c2"}).planNode(); - assertQueryReturnsEmptyResult(op); + assertQueryReturnsEmptyResult(makeCursorParameters(op)); op = PlanBuilder() .values({vector}) .unnest({"c0"}, {"c1", "c2"}, "ordinal") .planNode(); - assertQueryReturnsEmptyResult(op); + assertQueryReturnsEmptyResult(makeCursorParameters(op)); } -TEST_F(UnnestTest, allEmptyOrNullMaps) { +TEST_P(UnnestTest, allEmptyOrNullMaps) { auto vector = makeRowVector( {makeFlatVector(100, [](auto row) { return row; }), makeMapVector( @@ -397,21 +438,25 @@ TEST_F(UnnestTest, allEmptyOrNullMaps) { auto op = PlanBuilder().values({vector}).unnest({"c0"}, {"c1", "c2"}).planNode(); - assertQueryReturnsEmptyResult(op); + assertQueryReturnsEmptyResult(makeCursorParameters(op)); op = PlanBuilder() .values({vector}) .unnest({"c0"}, {"c1", "c2"}, "ordinal") .planNode(); - assertQueryReturnsEmptyResult(op); + assertQueryReturnsEmptyResult(makeCursorParameters(op)); } -TEST_F(UnnestTest, batchSize) { +TEST_P(UnnestTest, batchSize) { + // Only test once. + if (batchSize_ != 2) { + return; + } auto data = makeRowVector({ - makeFlatVector(10'000, [](auto row) { return row; }), + makeFlatVector(1'000, [](auto row) { return row; }), }); - // Unnest 10K rows into 30K rows. + // Unnest 1K rows into 3K rows. 
core::PlanNodeId unnestId; auto plan = PlanBuilder() .values({data}) @@ -421,40 +466,28 @@ TEST_F(UnnestTest, batchSize) { .planNode(); auto expected = makeRowVector({ - makeFlatVector(10'000 * 3, [](auto row) { return 1 + row % 3; }), + makeFlatVector(1'000 * 3, [](auto row) { return 1 + row % 3; }), }); - // 17 rows per output allows to unnest 6 input rows at a time. - { - auto task = AssertQueryBuilder(plan) - .config(core::QueryConfig::kPreferredOutputBatchRows, "17") - .assertResults({expected}); - auto stats = exec::toPlanStats(task->taskStats()); - - ASSERT_EQ(30'000, stats.at(unnestId).outputRows); - ASSERT_EQ(1 + 10'000 / 6, stats.at(unnestId).outputVectors); - } - - // 2 rows per output allows to unnest 1 input row at a time. - { + auto testBatchSize = [&](uint32_t batchSize, + vector_size_t expectedNumVector) { auto task = AssertQueryBuilder(plan) - .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config( + core::QueryConfig::kPreferredOutputBatchRows, + std::to_string(batchSize)) .assertResults({expected}); auto stats = exec::toPlanStats(task->taskStats()); - ASSERT_EQ(30'000, stats.at(unnestId).outputRows); - ASSERT_EQ(10'000, stats.at(unnestId).outputVectors); - } - - // 100K rows per output allows to unnest all at once. 
- { - auto task = - AssertQueryBuilder(plan) - .config(core::QueryConfig::kPreferredOutputBatchRows, "100000") - .assertResults({expected}); - auto stats = exec::toPlanStats(task->taskStats()); + ASSERT_EQ(3'000, stats.at(unnestId).outputRows); + ASSERT_EQ(expectedNumVector, stats.at(unnestId).outputVectors); + }; - ASSERT_EQ(30'000, stats.at(unnestId).outputRows); - ASSERT_EQ(1, stats.at(unnestId).outputVectors); - } + testBatchSize(17, 1 + 3'000 / 17); + testBatchSize(2, 3'000 / 2); + testBatchSize(100'000, 1); } + +VELOX_INSTANTIATE_TEST_SUITE_P( + UnnestTest, + UnnestTest, + testing::ValuesIn({2, 17, 1024})); diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index 15141fbc0938..916c92418940 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -118,6 +118,12 @@ class OperatorTestBase : public testing::Test, return test::assertQuery(plan, {expectedResults}); } + std::shared_ptr assertQuery( + const CursorParameters& params, + const RowVectorPtr& expectedResults) { + return test::assertQuery(params, {expectedResults}); + } + /// Assumes plan has a single leaf node. All splits are added to that node. 
std::shared_ptr assertQuery( const core::PlanNodePtr& plan, diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index 581ba4b31908..cce6a18e71f7 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -992,6 +992,14 @@ std::shared_ptr assertQueryReturnsEmptyResult( return cursor->task(); } +std::shared_ptr assertQueryReturnsEmptyResult( + const CursorParameters& params) { + VELOX_DCHECK_NOT_NULL(params.planNode); + auto [cursor, results] = readCursor(params, [](Task*) {}); + assertEmptyResults(results); + return cursor->task(); +} + void assertEmptyResults(const std::vector& results) { size_t totalCount = 0; for (const auto& vector : results) { diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index 217d12351e32..b2f2febf8d9c 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -242,6 +242,9 @@ std::shared_ptr assertQuery( std::shared_ptr assertQueryReturnsEmptyResult( const core::PlanNodePtr& plan); +std::shared_ptr assertQueryReturnsEmptyResult( + const CursorParameters& params); + void assertEmptyResults(const std::vector& results); void assertResults(