Skip to content

Commit

Permalink
Extract common toSerializedPage test function (#9462)
Browse files Browse the repository at this point in the history
Summary:
Refactors `toSerializedPage` function to add a common helper function for serializing
`RowVector` to `PrestoPage` format, in file `SerializedPageUtil.cpp`.

Pull Request resolved: #9462

Reviewed By: xiaoxmeng

Differential Revision: D64872524

Pulled By: kagamiori

fbshipit-source-id: b306f61cde1bb21b60a97d93efe9cefef7eca6a4
  • Loading branch information
pramodsatya authored and facebook-github-bot committed Oct 28, 2024
1 parent 2e381e4 commit fee04b8
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 43 deletions.
17 changes: 3 additions & 14 deletions velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/exec/tests/utils/LocalExchangeSource.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

Expand Down Expand Up @@ -56,18 +57,6 @@ class ExchangeClientTest : public testing::Test,
test::testingShutdownLocalExchangeSource();
}

std::unique_ptr<SerializedPage> toSerializedPage(const RowVectorPtr& vector) {
auto data = std::make_unique<VectorStreamGroup>(pool());
auto size = vector->size();
auto range = IndexRange{0, size};
data->createStreamTree(asRowType(vector->type()), size);
data->append(vector, folly::Range(&range, 1));
auto listener = bufferManager_->newListener();
IOBufOutputStream stream(*pool(), listener.get(), data->size());
data->flush(&stream);
return std::make_unique<SerializedPage>(stream.getIOBuf(), nullptr, size);
}

std::shared_ptr<Task> makeTask(
const std::string& taskId,
const std::optional<uint64_t> maxOutputBufferSizeInBytes = {}) {
Expand All @@ -93,7 +82,7 @@ class ExchangeClientTest : public testing::Test,
const std::string& taskId,
int32_t destination,
const RowVectorPtr& data) {
auto page = toSerializedPage(data);
auto page = test::toSerializedPage(data, bufferManager_, pool());
const auto pageSize = page->size();
ContinueFuture unused;
auto blocked =
Expand Down Expand Up @@ -231,7 +220,7 @@ TEST_F(ExchangeClientTest, flowControl) {
makeFlatVector<int64_t>(10'000, [](auto row) { return row; }),
});

auto page = toSerializedPage(data);
auto page = test::toSerializedPage(data, bufferManager_, pool());

// Set limit at 3.5 pages.
auto client = std::make_shared<ExchangeClient>(
Expand Down
15 changes: 2 additions & 13 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/LocalExchangeSource.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"

using namespace facebook::velox::exec::test;

Expand Down Expand Up @@ -154,23 +155,11 @@ class MultiFragmentTest : public HiveConnectorTestBase {
createDuckDbTable(vectors_);
}

std::unique_ptr<SerializedPage> toSerializedPage(const RowVectorPtr& vector) {
auto data = std::make_unique<VectorStreamGroup>(pool());
auto size = vector->size();
auto range = IndexRange{0, size};
data->createStreamTree(asRowType(vector->type()), size);
data->append(vector, folly::Range(&range, 1));
auto listener = bufferManager_->newListener();
IOBufOutputStream stream(*pool(), listener.get(), data->size());
data->flush(&stream);
return std::make_unique<SerializedPage>(stream.getIOBuf(), nullptr, size);
}

int32_t enqueue(
const std::string& taskId,
int32_t destination,
const RowVectorPtr& data) {
auto page = toSerializedPage(data);
auto page = toSerializedPage(data, bufferManager_, pool());
const auto pageSize = page->size();

ContinueFuture unused;
Expand Down
17 changes: 2 additions & 15 deletions velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Task.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"
#include "velox/serializers/PrestoSerializer.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -90,21 +91,7 @@ class OutputBufferManagerTest : public testing::Test {
vector_size_t size) {
auto vector = std::dynamic_pointer_cast<RowVector>(
BatchMaker::createBatch(rowType, size, *pool_));
return toSerializedPage(vector);
}

std::unique_ptr<SerializedPage> toSerializedPage(VectorPtr vector) {
auto data = std::make_unique<VectorStreamGroup>(pool_.get());
auto size = vector->size();
auto range = IndexRange{0, size};
data->createStreamTree(
std::dynamic_pointer_cast<const RowType>(vector->type()), size);
data->append(
std::dynamic_pointer_cast<RowVector>(vector), folly::Range(&range, 1));
auto listener = bufferManager_->newListener();
IOBufOutputStream stream(*pool_, listener.get(), data->size());
data->flush(&stream);
return std::make_unique<SerializedPage>(stream.getIOBuf(), nullptr, size);
return exec::test::toSerializedPage(vector, bufferManager_, pool_.get());
}

void enqueue(
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ add_library(
SumNonPODAggregate.cpp
TpchQueryBuilder.cpp
VectorTestUtil.cpp
PortUtil.cpp)
PortUtil.cpp
SerializedPageUtil.cpp)

target_link_libraries(
velox_exec_test_lib
Expand Down
38 changes: 38 additions & 0 deletions velox/exec/tests/utils/SerializedPageUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/tests/utils/SerializedPageUtil.h"

using namespace facebook::velox;

namespace facebook::velox::exec::test {

std::unique_ptr<SerializedPage> toSerializedPage(
const RowVectorPtr& vector,
const std::shared_ptr<OutputBufferManager>& bufferManager,
memory::MemoryPool* pool) {
auto data = std::make_unique<VectorStreamGroup>(pool);
auto size = vector->size();
auto range = IndexRange{0, size};
data->createStreamTree(asRowType(vector->type()), size);
data->append(vector, folly::Range(&range, 1));
auto listener = bufferManager->newListener();
IOBufOutputStream stream(*pool, listener.get(), data->size());
data->flush(&stream);
return std::make_unique<SerializedPage>(stream.getIOBuf(), nullptr, size);
}

} // namespace facebook::velox::exec::test
31 changes: 31 additions & 0 deletions velox/exec/tests/utils/SerializedPageUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/exec/ExchangeQueue.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::exec::test {

/// Helper function for serializing RowVector to PrestoPage format.
std::unique_ptr<SerializedPage> toSerializedPage(
const RowVectorPtr& vector,
const std::shared_ptr<OutputBufferManager>& bufferManager,
memory::MemoryPool* pool);

} // namespace facebook::velox::exec::test

0 comments on commit fee04b8

Please sign in to comment.