add bufferPool for nimble parallel writer (#103)
Summary:

Replace the single mutex-guarded Buffer with a BufferPool so that multithreaded writers can reserve per-thread buffers without mutex contention.

Differential Revision: D64774959
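
For context, a minimal usage sketch of the new API (BufferPool, BufferObject, and Buffer::reserve are taken from the diff below; the memory-pool setup is illustrative and assumes a velox MemoryManager has already been initialized, as in the added tests):

auto rootPool = facebook::velox::memory::memoryManager()->addRootPool("example_root");
auto leafPool = rootPool->addLeafChild("example_leaf");

facebook::nimble::BufferPool bufferPool{*leafPool, /* maxPoolSize */ 4};
{
  // reserveBuffer() blocks until a pooled Buffer is free; no lock is held while using it.
  auto bufferObject = bufferPool.reserveBuffer();
  facebook::nimble::Buffer& buffer = bufferObject.get();
  char* scratch = buffer.reserve(64);  // scratch region owned by this Buffer
  // ... write into scratch ...
}  // ~BufferObject() returns the Buffer to the pool here.

Each writer task holds its own Buffer for the duration of its work, so Buffer itself no longer needs internal locking.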
Scott Young authored and facebook-github-bot committed Dec 13, 2024
1 parent 2fa1587 commit 1ca4fa2
Showing 8 changed files with 231 additions and 37 deletions.
8 changes: 1 addition & 7 deletions dwio/nimble/common/Buffer.h
@@ -34,7 +34,7 @@

namespace facebook::nimble {

// Internally manages memory in chunks. Releases memory only upon destruction.
// Internally manages memory in chunks. Releases memory when destroyed.
// Buffer is NOT threadsafe: external locking is required.
class Buffer {
using MemoryPool = facebook::velox::memory::MemoryPool;
@@ -52,7 +52,6 @@ class Buffer {
// to, and guarantees for the lifetime of *this that that region will remain
// valid. Does NOT guarantee that the region is initially 0'd.
char* reserve(uint64_t bytes) {
std::scoped_lock<std::mutex> l(mutex_);
if (reserveEnd_ + bytes <= chunkEnd_) {
pos_ = reserveEnd_;
reserveEnd_ += bytes;
@@ -98,11 +97,6 @@ class Buffer {
char* reserveEnd_;
std::vector<velox::BufferPtr> chunks_;
MemoryPool& memoryPool_;
// NOTE: this is temporary fix, to quickly enable parallel access to the
// buffer class. In the near future, we are going to templetize this class to
// produce a concurrent and a non-concurrent variants, and change the call
// sites to use each variant when needed.
std::mutex mutex_;
};

} // namespace facebook::nimble
101 changes: 101 additions & 0 deletions dwio/nimble/common/tests/BufferPoolTests.cpp
@@ -0,0 +1,101 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/velox/FieldWriter.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::nimble::test {
using MemoryPool = velox::memory::MemoryPool;
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;

class BufferPoolTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
velox::memory::SharedArbitrator::registerFactory();
velox::memory::MemoryManager::testingSetInstance(
{.arbitratorKind = "SHARED"});
}

void SetUp() override {
rootPool_ = velox::memory::memoryManager()->addRootPool("default_root");
leafPool_ = rootPool_->addLeafChild("default_leaf");
}

std::shared_ptr<velox::memory::MemoryPool> rootPool_;
std::shared_ptr<velox::memory::MemoryPool> leafPool_;
};

TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
try {
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 0};
FAIL();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
}
}

TEST_F(BufferPoolTest, ReserveBuffer) {
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 10};
EXPECT_EQ(bufferPool.size(), 10);
{
auto buffer = bufferPool.reserveBuffer();
EXPECT_EQ(bufferPool.size(), 9);
}
EXPECT_EQ(bufferPool.size(), 10);
}

TEST_F(BufferPoolTest, EmptyFillBufferPool) {
size_t iterations = 10;
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ iterations};

for (auto i = 0; i < iterations; ++i) {
{
auto buffer1 = bufferPool.reserveBuffer();
auto buffer2 = bufferPool.reserveBuffer();
auto buffer3 = bufferPool.reserveBuffer();

EXPECT_EQ(bufferPool.size(), iterations - 3);
}
EXPECT_EQ(bufferPool.size(), iterations);
}
}

TEST_F(BufferPoolTest, ParallelFillPool) {
auto parallelismFactor = std::thread::hardware_concurrency();
auto executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
ExecutorBarrier barrier{executor};
auto bufferPool = BufferPool{*leafPool_};
EXPECT_EQ(bufferPool.size(), parallelismFactor);

for (auto i = 0; i < parallelismFactor; ++i) {
barrier.add([&]() {
for (auto j = 0; j < 100000; ++j) {
auto buffer = bufferPool.reserveBuffer();
}
});
}

barrier.waitAll();
EXPECT_LE(bufferPool.size(), parallelismFactor);
}
} // namespace facebook::nimble::test
65 changes: 63 additions & 2 deletions dwio/nimble/velox/FieldWriter.cpp
@@ -289,9 +289,8 @@ class SimpleFieldWriter : public FieldWriter {
const OrderedRanges& ranges,
folly::Executor*) override {
auto size = ranges.size();
auto& buffer = context_.stringBuffer();
auto& data = valuesStream_.mutableData();

auto bufferObject = context_.bufferPool().reserveBuffer();
if (auto flat = vector->asFlatVector<SourceType>()) {
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
bool rangeCopied = false;
@@ -331,6 +330,7 @@
valuesStream_.mutableNonNulls(),
Flat<SourceType>{vector},
[&](SourceType value) {
auto& buffer = bufferObject.get();
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
@@ -344,6 +344,7 @@
valuesStream_.mutableNonNulls(),
Decoded<SourceType>{decoded},
[&](SourceType value) {
auto& buffer = bufferObject.get();
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
@@ -1590,6 +1591,66 @@ size_t DecodingContextPool::size() const {
return pool_.size();
}

BufferPool::BufferObject::BufferObject(
BufferPool& pool,
std::unique_ptr<Buffer> buffer)
: pool_{pool}, buffer_{std::move(buffer)} {}

BufferPool::BufferObject::~BufferObject() {
pool_.addBuffer(std::move(buffer_));
}

Buffer& BufferPool::BufferObject::get() {
return *buffer_;
}

BufferPool::BufferPool(
facebook::velox::memory::MemoryPool& memoryPool,
size_t maxPoolSize,
uint64_t initialChunkSize)
: defaultInitialChunkSize_{initialChunkSize},
semaphore_{0},
memoryPool_{memoryPool} {
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
pool_.reserve(maxPoolSize);
for (size_t i = 0; i < maxPoolSize; ++i) {
pool_.emplace_back(newBuffer());
semaphore_.release();
}
}

facebook::velox::memory::MemoryPool& BufferPool::getMemoryPool() {
return memoryPool_;
}

// Returns the buffer back to the pool.
void BufferPool::addBuffer(std::unique_ptr<Buffer> buffer) {
std::scoped_lock<std::mutex> lock(mutex_);
pool_.push_back(std::move(buffer));
semaphore_.release();
}

// Reserves a buffer from the pool. Blocks until a buffer becomes available.
BufferPool::BufferObject BufferPool::reserveBuffer() {
semaphore_.acquire();

std::scoped_lock<std::mutex> lock(mutex_);
auto buffer = std::move(pool_.back());
pool_.pop_back();

return BufferPool::BufferObject{*this, std::move(buffer)};
}

// Returns the estimated number of buffers currently available in the pool.
size_t BufferPool::size() {
return pool_.size();
}

std::unique_ptr<Buffer> BufferPool::newBuffer() {
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
}

std::unique_ptr<FieldWriter> FieldWriter::create(
FieldWriterContext& context,
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type,
58 changes: 46 additions & 12 deletions dwio/nimble/velox/FieldWriter.h
@@ -76,20 +76,59 @@ class DecodingContextPool {
std::unique_ptr<velox::SelectivityVector> selectivityVector);
};

// Manages a pool of buffers. A reserved buffer is returned to the pool when its
// BufferObject handle is destroyed. For good performance, maxPoolSize should be set
// to at least ~90% of the expected number of concurrently reserved buffers.
class BufferPool {
public:
class BufferObject {
public:
explicit BufferObject(BufferPool& pool, std::unique_ptr<Buffer> buffer);

~BufferObject();
Buffer& get();

private:
BufferPool& pool_;
std::unique_ptr<Buffer> buffer_;
};

explicit BufferPool(
facebook::velox::memory::MemoryPool& memoryPool,
size_t maxPoolSize = std::thread::hardware_concurrency(),
uint64_t initialChunkSize = kMinChunkSize);

facebook::velox::memory::MemoryPool& getMemoryPool();
BufferObject reserveBuffer();
size_t size();

private:
static const uint64_t kMinChunkSize = 1LL << 20;
const uint64_t defaultInitialChunkSize_;

std::mutex mutex_;
std::counting_semaphore<> semaphore_;
std::vector<std::unique_ptr<Buffer>> pool_;
facebook::velox::memory::MemoryPool& memoryPool_;

void addBuffer(std::unique_ptr<Buffer> buffer);
std::unique_ptr<Buffer> newBuffer();
};

struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
std::function<void(void)> vectorDecoderVisitor = []() {},
size_t maxPoolSize = std::thread::hardware_concurrency())
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
decodingContextPool_{std::move(vectorDecoderVisitor)} {
resetStringBuffer();
}
bufferPool_{
std::make_unique<BufferPool>(*bufferMemoryPool, maxPoolSize)},
decodingContextPool_{std::move(vectorDecoderVisitor)} {}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::mutex flatMapSchemaMutex;
Expand All @@ -112,13 +151,8 @@ struct FieldWriterContext {
return decodingContextPool_.reserveContext();
}

Buffer& stringBuffer() {
return *buffer_;
}

// Reset writer context for use by next stripe.
void resetStringBuffer() {
buffer_ = std::make_unique<Buffer>(*bufferMemoryPool);
BufferPool& bufferPool() {
return *bufferPool_;
}

const std::vector<std::unique_ptr<StreamData>>& streams() {
@@ -148,7 +182,7 @@ struct FieldWriterContext {
}

private:
std::unique_ptr<Buffer> buffer_;
std::unique_ptr<BufferPool> bufferPool_;
DecodingContextPool decodingContextPool_;
std::vector<std::unique_ptr<StreamData>> streams_;
};
20 changes: 10 additions & 10 deletions dwio/nimble/velox/VeloxWriter.cpp
@@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
: FieldWriterContext{
memoryPool,
options.reclaimerFactory(),
options.vectorDecoderVisitor,
options.maxPoolSize
},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
@@ -622,9 +627,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
LoggingScope scope{*context_->logger};
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};

if (!encodingBuffer_) {
encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
}
streams_.resize(context_->schemaBuilder.nodeCount());

// When writing null streams, we write the nulls as data, and the stream
@@ -668,9 +670,11 @@

auto encode = [&](StreamData& streamData) {
const auto offset = streamData.descriptor().offset();
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
auto bufferObject = context_->bufferPool().reserveBuffer();
auto& buffer = bufferObject.get();
auto encoded = encodeStream(*context_, buffer, streamData);
if (!encoded.empty()) {
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
ChunkedStreamWriter chunkWriter{buffer};
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
auto& stream = streams_[offset];
for (auto& buffer : chunkWriter.encode(encoded)) {
@@ -782,10 +786,6 @@ uint32_t VeloxWriter::writeStripe() {
uint64_t startSize = writer_.size();
writer_.writeStripe(context_->rowsInStripe, std::move(streams_));
stripeSize = writer_.size() - startSize;
encodingBuffer_.reset();
// TODO: once chunked string fields are supported, move string buffer
// reset to writeChunk()
context_->resetStringBuffer();
}

NIMBLE_ASSERT(
1 change: 0 additions & 1 deletion dwio/nimble/velox/VeloxWriter.h
@@ -77,7 +77,6 @@ class VeloxWriter {
TabletWriter writer_;
std::unique_ptr<FieldWriter> root_;

std::unique_ptr<Buffer> encodingBuffer_;
std::vector<Stream> streams_;
std::exception_ptr lastException_;
const velox::common::SpillConfig* const spillConfig_;
1 change: 1 addition & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
@@ -126,6 +126,7 @@ struct VeloxWriterOptions {

const velox::common::SpillConfig* spillConfig{nullptr};

size_t maxPoolSize = std::thread::hardware_concurrency();
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
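
A hedged illustration of how the new maxPoolSize option can be combined with the existing encodingExecutor option (the sizing below is an assumption for illustration, not a recommendation made by this change):

facebook::nimble::VeloxWriterOptions options;
const auto threads = std::thread::hardware_concurrency();
// Hypothetical sizing: one pooled Buffer per encoding thread.
options.encodingExecutor = std::make_shared<folly::CPUThreadPoolExecutor>(threads);
options.maxPoolSize = threads;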
14 changes: 9 additions & 5 deletions dwio/nimble/velox/tests/VeloxWriterTests.cpp
@@ -319,11 +319,15 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) {
std::string file;
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
std::atomic_bool reclaimEntered = false;
nimble::VeloxWriterOptions writerOptions{.reclaimerFactory = [&]() {
auto reclaimer = std::make_unique<MockReclaimer>();
reclaimer->setEnterArbitrationFunc([&]() { reclaimEntered = true; });
return reclaimer;
}};
nimble::VeloxWriterOptions writerOptions{
.reclaimerFactory =
[&]() {
auto reclaimer = std::make_unique<MockReclaimer>();
reclaimer->setEnterArbitrationFunc(
[&]() { reclaimEntered = true; });
return reclaimer;
},
.maxPoolSize = 2};
nimble::VeloxWriter writer(
*writerPool, type, std::move(writeFile), std::move(writerOptions));
auto batches = generateBatches(type, 100, 4000, 20221110, *leafPool_);
