Skip to content

Commit

Permalink
add bufferPool for nimble parallel writer (#103)
Browse files Browse the repository at this point in the history
Summary:

change the buffer class to a bufferPool to handle multihreaded buffers without mutexes

Differential Revision: D64774959
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 7, 2024
1 parent a3be2f8 commit d558355
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 39 deletions.
136 changes: 127 additions & 9 deletions dwio/nimble/common/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
#pragma once

#include "dwio/nimble/common/Exceptions.h"
#include "folly/concurrency/DynamicBoundedQueue.h"
#include "folly/coro/AsyncScope.h"
#include "folly/coro/BlockingWait.h"
#include "velox/buffer/Buffer.h"
#include "velox/common/memory/Memory.h"

Expand All @@ -33,12 +37,12 @@
// and so on

namespace facebook::nimble {
using MemoryPool = facebook::velox::memory::MemoryPool;
using AsyncScope = folly::coro::AsyncScope;

// Internally manages memory in chunks. Releases memory only upon destruction.
// Internally manages memory in chunks. releases memory when destroyed
// Buffer is NOT threadsafe: external locking is required.
class Buffer {
using MemoryPool = facebook::velox::memory::MemoryPool;

public:
explicit Buffer(
MemoryPool& memoryPool,
Expand All @@ -52,7 +56,6 @@ class Buffer {
// to, and guarantees for the lifetime of *this that that region will remain
// valid. Does NOT guarantee that the region is initially 0'd.
char* reserve(uint64_t bytes) {
std::scoped_lock<std::mutex> l(mutex_);
if (reserveEnd_ + bytes <= chunkEnd_) {
pos_ = reserveEnd_;
reserveEnd_ += bytes;
Expand Down Expand Up @@ -98,11 +101,126 @@ class Buffer {
char* reserveEnd_;
std::vector<velox::BufferPtr> chunks_;
MemoryPool& memoryPool_;
// NOTE: this is temporary fix, to quickly enable parallel access to the
// buffer class. In the near future, we are going to templetize this class to
// produce a concurrent and a non-concurrent variants, and change the call
// sites to use each variant when needed.
std::mutex mutex_;
};

using BufferPtr = std::unique_ptr<Buffer>;

// Manages a pool of buffers. Buffers are returned to the pool when released.
// maxPoolSize should be set to at least 90% of capacity for performance
class BufferPool {
public:
explicit BufferPool(
MemoryPool& memoryPool,
std::shared_ptr<folly::Executor> executor = nullptr,
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
size_t maxPoolSize = std::thread::hardware_concurrency(),
size_t initialBufferCount = 1,
uint64_t initialChunkSize = kMinChunkSize)
: defaultInitialChunkSize_{initialChunkSize},
timeout_{timeout},
bufferQueue_{maxPoolSize},
memoryPool_{memoryPool} {
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0")
NIMBLE_CHECK(0 < initialBufferCount, "initial pool size must be > 0");
NIMBLE_CHECK(
initialBufferCount <= maxPoolSize,
"initial pool size must be <= max pool size")

if (executor) {
backgroundScope_ = std::make_unique<AsyncScope>(executor);
}

for (size_t i = 0; i < initialBufferCount; ++i) {
addBuffer();
}
}
~BufferPool() {
if (backgroundScope_) {
folly::coro::blockingWait(backgroundScope_->joinAsync());
}
deleteBufferPool();
}

MemoryPool& getMemoryPool() {
return memoryPool_;
}

void co_addBuffer(BufferPtr buffer) {
backgroundScope_->add(folly::coro::co_invoke(
[this, bufferRef = std::move(buffer)]() mutable
-> folly::coro::Task<void> {
addBuffer(std::move(bufferRef));
co_return;
})
.scheduleOn(executor_.get()));
}

// Releases the buffer back to the pool.
void addBuffer(BufferPtr buffer) {
const auto status =
bufferQueue_.try_enqueue_for(std::move(buffer), timeout_);
if (!status) {
NIMBLE_UNKNOWN(
"Timed out enqueuing for buffer. Timeout set to " +
std::to_string(timeout_.count()) + " ms");
}
}

// Reserves a buffer from the pool. Adds a new buffer to the pool
// while there are buffers available
BufferPtr reserveBuffer() {
if (bufferQueue_.empty()) {
return newBuffer();
}

BufferPtr buffer;
const auto status = bufferQueue_.try_dequeue_for(buffer, timeout_);
if (!status) {
NIMBLE_UNREACHABLE(
"Timed out dequeuing for buffer. Timeout set to " +
std::to_string(timeout_.count()) + " ms");
}
return buffer;
}

// Returns estimated number of buffers in the pool
size_t size() {
return bufferQueue_.size();
}

private:
static const uint64_t kMinChunkSize = 1LL << 20;
const uint64_t defaultInitialChunkSize_ = kMinChunkSize;
const std::chrono::milliseconds timeout_ = std::chrono::milliseconds(1000);

std::shared_ptr<folly::Executor> executor_;
folly::DMPMCQueue<BufferPtr, false> bufferQueue_;
std::unique_ptr<AsyncScope> backgroundScope_;

MemoryPool& memoryPool_;

BufferPtr newBuffer() {
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
}

void addBuffer() {
auto status = bufferQueue_.try_enqueue_for(newBuffer(), timeout_);
if (!status) {
NIMBLE_UNKNOWN(
"Timed out enqueuing for buffer. Timeout set to " +
std::to_string(timeout_.count()) + " ms");
}
}

// clears all buffers in the pool
void deleteBufferPool() {
while (!bufferQueue_.empty()) {
BufferPtr buffer;
bufferQueue_.dequeue(buffer);
buffer.reset();
}
}
};

} // namespace facebook::nimble
205 changes: 205 additions & 0 deletions dwio/nimble/common/tests/BufferPoolTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include "dwio/nimble/common/Buffer.h"
#include "dwio/nimble/common/Exceptions.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::nimble::test {
using MemoryPool = velox::memory::MemoryPool;
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;

class BufferPoolTest : public ::testing::Test {
protected:
static void SetUpTestCase() {}

void SetUp() override {
memPool_ = velox::memory::deprecatedAddDefaultLeafMemoryPool();
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
}

std::shared_ptr<velox::memory::MemoryPool> memPool_;
std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
};

TEST_F(BufferPoolTest, CreateBufferPool) {
auto bufferPool = BufferPool{*memPool_, std::move(executor_)};
EXPECT_NO_THROW(bufferPool);
}

TEST_F(BufferPoolTest, CreateBufferPoolWithInitialSize) {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{10}, 10, 10};
EXPECT_EQ(bufferPool.size(), 10);
}

TEST_F(BufferPoolTest, CreateBufferPoolBadTimeout) {
auto throwLambda = [&]() {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{0}};
};
EXPECT_THROW(throwLambda(), NimbleUserError);

try {
throwLambda();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "timeout must be > 0");
}
}

TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
auto throwLambda = [&]() {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{1}, 0};
};
EXPECT_THROW(throwLambda(), NimbleUserError);

try {
throwLambda();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
}
}

TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSize) {
auto throwLambda = [&]() {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{1}, 10, 0};
};
EXPECT_THROW(throwLambda(), NimbleUserError);
try {
throwLambda();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "initial pool size must be > 0");
}
}

TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSizeMaxSize) {
auto throwLambda = [&]() {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{1}, 1, 2};
};
EXPECT_THROW(throwLambda(), NimbleUserError);

try {
throwLambda();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "initial pool size must be <= max pool size");
}
}

TEST_F(BufferPoolTest, ReserveBuffer) {
auto bufferPool = BufferPool{*memPool_, std::move(executor_)};
auto buffer = bufferPool.reserveBuffer();
EXPECT_NE(buffer, nullptr);
}

TEST_F(BufferPoolTest, AddBuffer) {
auto bufferPool = BufferPool{*memPool_, std::move(executor_)};
auto buffer = bufferPool.reserveBuffer();
EXPECT_NE(buffer, nullptr);
EXPECT_EQ(bufferPool.size(), 0);
bufferPool.addBuffer(std::move(buffer));
EXPECT_EQ(bufferPool.size(), 1);
}

TEST_F(BufferPoolTest, DestroyBufferPool) {
auto bufferPoolPtr =
std::make_unique<BufferPool>(*memPool_, std::move(executor_));
bufferPoolPtr.reset();
EXPECT_NO_THROW();
}

TEST_F(BufferPoolTest, emptyBufferPool) {
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1};
EXPECT_EQ(bufferPool.size(), 1);

auto buffer1 = bufferPool.reserveBuffer();
EXPECT_EQ(bufferPool.size(), 0);
}

TEST_F(BufferPoolTest, OverfillBufferPool) {
// not guarenteed to fail at size of 2 due to DMPMCQueue
auto bufferPool = BufferPool{
*memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1};
auto throwLambda = [&]() {
for (auto i = 0; i < 5; i++) {
auto buffer = std::make_unique<Buffer>(*memPool_);
bufferPool.addBuffer(std::move(buffer));
}
};

EXPECT_THROW(throwLambda(), NimbleInternalError);
}

TEST_F(BufferPoolTest, FillEmptyFillBufferPool) {
size_t iterations = 10;
std::vector<BufferPtr> buffers;

auto bufferPool = BufferPool{
*memPool_,
std::move(executor_),
std::chrono::milliseconds{10},
iterations};
for (auto i = 0; i < iterations; i++) {
auto buffer = bufferPool.reserveBuffer();
buffers.push_back(std::move(buffer));
}
EXPECT_EQ(bufferPool.size(), 0);
EXPECT_EQ(buffers.size(), iterations);
for (auto& buffer : buffers) {
bufferPool.addBuffer(std::move(buffer));
}
EXPECT_EQ(bufferPool.size(), iterations);
buffers.clear();

for (auto i = 0; i < iterations; i++) {
auto buffer = bufferPool.reserveBuffer();
buffers.push_back(std::move(buffer));
}
EXPECT_EQ(bufferPool.size(), 0);
EXPECT_EQ(buffers.size(), iterations);
}

TEST_F(BufferPoolTest, ParallelFillPool) {
folly::CPUThreadPoolExecutor executor{10};
ExecutorBarrier barrier{executor};
auto bufferPool = BufferPool{
*memPool_,
std::move(executor_),
std::chrono::milliseconds{1000 * 10},
100};
auto fillPool = [&]() {
for (auto i = 0; i < 10; i++) {
EXPECT_NO_THROW(
auto buffer = bufferPool.reserveBuffer();
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
bufferPool.addBuffer(std::move(buffer)););
}
};

for (auto i = 0; i < 10; i++) {
barrier.add(fillPool);
}

barrier.waitAll();
}

} // namespace facebook::nimble::test
Loading

0 comments on commit d558355

Please sign in to comment.