From fee04b886797032b3b67cbadf8cc9d9220c7d25d Mon Sep 17 00:00:00 2001 From: Pramod Date: Mon, 28 Oct 2024 10:36:14 -0700 Subject: [PATCH] Extract common `toSerializedPage` test function (#9462) Summary: Refactors `toSerializedPage` function to add a common helper function for serializing `RowVector` to `PrestoPage` format, in file `SerializedPageUtil.cpp`. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9462 Reviewed By: xiaoxmeng Differential Revision: D64872524 Pulled By: kagamiori fbshipit-source-id: b306f61cde1bb21b60a97d93efe9cefef7eca6a4 --- velox/exec/tests/ExchangeClientTest.cpp | 17 ++------- velox/exec/tests/MultiFragmentTest.cpp | 15 +------- velox/exec/tests/OutputBufferManagerTest.cpp | 17 +-------- velox/exec/tests/utils/CMakeLists.txt | 3 +- velox/exec/tests/utils/SerializedPageUtil.cpp | 38 +++++++++++++++++++ velox/exec/tests/utils/SerializedPageUtil.h | 31 +++++++++++++++ 6 files changed, 78 insertions(+), 43 deletions(-) create mode 100644 velox/exec/tests/utils/SerializedPageUtil.cpp create mode 100644 velox/exec/tests/utils/SerializedPageUtil.h diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 8376b58ab38d..cf0a9eabe557 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -24,6 +24,7 @@ #include "velox/exec/tests/utils/LocalExchangeSource.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h" +#include "velox/exec/tests/utils/SerializedPageUtil.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -56,18 +57,6 @@ class ExchangeClientTest : public testing::Test, test::testingShutdownLocalExchangeSource(); } - std::unique_ptr toSerializedPage(const RowVectorPtr& vector) { - auto data = std::make_unique(pool()); - auto size = vector->size(); - auto range = IndexRange{0, size}; - data->createStreamTree(asRowType(vector->type()), size); - data->append(vector, folly::Range(&range, 1)); - auto listener = bufferManager_->newListener(); - IOBufOutputStream stream(*pool(), listener.get(), data->size()); - data->flush(&stream); - return std::make_unique(stream.getIOBuf(), nullptr, size); - } - std::shared_ptr makeTask( const std::string& taskId, const std::optional maxOutputBufferSizeInBytes = {}) { @@ -93,7 +82,7 @@ class ExchangeClientTest : public testing::Test, const std::string& taskId, int32_t destination, const RowVectorPtr& data) { - auto page = toSerializedPage(data); + auto page = test::toSerializedPage(data, bufferManager_, pool()); const auto pageSize = page->size(); ContinueFuture unused; auto blocked = @@ -231,7 +220,7 @@ TEST_F(ExchangeClientTest, flowControl) { makeFlatVector(10'000, [](auto row) { return row; }), }); - auto page = toSerializedPage(data); + auto page = test::toSerializedPage(data, bufferManager_, pool()); // Set limit at 3.5 pages. auto client = std::make_shared( diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index e1785b8d99cc..dfb4efb15eee 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -29,6 +29,7 @@ #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/LocalExchangeSource.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/SerializedPageUtil.h" using namespace facebook::velox::exec::test; @@ -154,23 +155,11 @@ class MultiFragmentTest : public HiveConnectorTestBase { createDuckDbTable(vectors_); } - std::unique_ptr toSerializedPage(const RowVectorPtr& vector) { - auto data = std::make_unique(pool()); - auto size = vector->size(); - auto range = IndexRange{0, size}; - data->createStreamTree(asRowType(vector->type()), size); - data->append(vector, folly::Range(&range, 1)); - auto listener = bufferManager_->newListener(); - IOBufOutputStream stream(*pool(), listener.get(), data->size()); - data->flush(&stream); - return std::make_unique(stream.getIOBuf(), nullptr, size); - } - int32_t enqueue( const std::string& taskId, int32_t destination, const RowVectorPtr& data) { - auto page = toSerializedPage(data); + auto page = toSerializedPage(data, bufferManager_, pool()); const auto pageSize = page->size(); ContinueFuture unused; diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index f61e4eb8258b..c8414c3b2ecc 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -20,6 +20,7 @@ #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/SerializedPageUtil.h" #include "velox/serializers/PrestoSerializer.h" using namespace facebook::velox; @@ -90,21 +91,7 @@ class OutputBufferManagerTest : public testing::Test { vector_size_t size) { auto vector = std::dynamic_pointer_cast( BatchMaker::createBatch(rowType, size, *pool_)); - return toSerializedPage(vector); - } - - std::unique_ptr toSerializedPage(VectorPtr vector) { - auto data = std::make_unique(pool_.get()); - auto size = vector->size(); - auto range = IndexRange{0, size}; - data->createStreamTree( - std::dynamic_pointer_cast(vector->type()), size); - data->append( - std::dynamic_pointer_cast(vector), folly::Range(&range, 1)); - auto listener = bufferManager_->newListener(); - IOBufOutputStream stream(*pool_, listener.get(), data->size()); - data->flush(&stream); - return std::make_unique(stream.getIOBuf(), nullptr, size); + return exec::test::toSerializedPage(vector, bufferManager_, pool_.get()); } void enqueue( diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index 8e28b4dcdc89..c2d227410c1e 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -30,7 +30,8 @@ add_library( SumNonPODAggregate.cpp TpchQueryBuilder.cpp VectorTestUtil.cpp - PortUtil.cpp) + PortUtil.cpp + SerializedPageUtil.cpp) target_link_libraries( velox_exec_test_lib diff --git a/velox/exec/tests/utils/SerializedPageUtil.cpp b/velox/exec/tests/utils/SerializedPageUtil.cpp new file mode 100644 index 000000000000..512c59457c79 --- /dev/null +++ b/velox/exec/tests/utils/SerializedPageUtil.cpp @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/SerializedPageUtil.h" + +using namespace facebook::velox; + +namespace facebook::velox::exec::test { + +std::unique_ptr toSerializedPage( + const RowVectorPtr& vector, + const std::shared_ptr& bufferManager, + memory::MemoryPool* pool) { + auto data = std::make_unique(pool); + auto size = vector->size(); + auto range = IndexRange{0, size}; + data->createStreamTree(asRowType(vector->type()), size); + data->append(vector, folly::Range(&range, 1)); + auto listener = bufferManager->newListener(); + IOBufOutputStream stream(*pool, listener.get(), data->size()); + data->flush(&stream); + return std::make_unique(stream.getIOBuf(), nullptr, size); +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/SerializedPageUtil.h b/velox/exec/tests/utils/SerializedPageUtil.h new file mode 100644 index 000000000000..3dddd2206169 --- /dev/null +++ b/velox/exec/tests/utils/SerializedPageUtil.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/ExchangeQueue.h" +#include "velox/exec/OutputBufferManager.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/VectorStream.h" + +namespace facebook::velox::exec::test { + +/// Helper function for serializing RowVector to PrestoPage format. +std::unique_ptr toSerializedPage( + const RowVectorPtr& vector, + const std::shared_ptr& bufferManager, + memory::MemoryPool* pool); + +} // namespace facebook::velox::exec::test