diff --git a/dwio/nimble/common/Buffer.h b/dwio/nimble/common/Buffer.h index 6408ca5..455279b 100644 --- a/dwio/nimble/common/Buffer.h +++ b/dwio/nimble/common/Buffer.h @@ -15,6 +15,10 @@ */ #pragma once +#include "dwio/nimble/common/Exceptions.h" +#include "folly/concurrency/DynamicBoundedQueue.h" +#include "folly/coro/AsyncScope.h" +#include "folly/coro/BlockingWait.h" #include "velox/buffer/Buffer.h" #include "velox/common/memory/Memory.h" @@ -33,12 +37,13 @@ // and so on namespace facebook::nimble { +using MemoryPool = facebook::velox::memory::MemoryPool; +using AsyncScope = folly::coro::AsyncScope; -// Internally manages memory in chunks. Releases memory only upon destruction. +// Internally manages memory in chunks. releases memory to the pool when +// cleared is called // Buffer is NOT threadsafe: external locking is required. class Buffer { - using MemoryPool = facebook::velox::memory::MemoryPool; - public: explicit Buffer( MemoryPool& memoryPool, @@ -52,7 +57,6 @@ class Buffer { // to, and guarantees for the lifetime of *this that that region will remain // valid. Does NOT guarantee that the region is initially 0'd. char* reserve(uint64_t bytes) { - std::scoped_lock l(mutex_); if (reserveEnd_ + bytes <= chunkEnd_) { pos_ = reserveEnd_; reserveEnd_ += bytes; @@ -98,11 +102,123 @@ class Buffer { char* reserveEnd_; std::vector chunks_; MemoryPool& memoryPool_; - // NOTE: this is temporary fix, to quickly enable parallel access to the - // buffer class. In the near future, we are going to templetize this class to - // produce a concurrent and a non-concurrent variants, and change the call - // sites to use each variant when needed. - std::mutex mutex_; +}; + +using BufferPtr = std::unique_ptr; + +// Manages a pool of buffers. Buffers are returned to the pool when released. +// maxPoolSize should be set to at least 90% of capacity for performance +class BufferPool { + public: + explicit BufferPool( + MemoryPool& memoryPool, + std::shared_ptr executor, + std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), + size_t maxPoolSize = std::thread::hardware_concurrency(), + size_t initialBufferCount = 1, + uint64_t initialChunkSize = kMinChunkSize) + : defaultInitialChunkSize_{initialChunkSize}, + timeout_{timeout}, + executor_{std::move(executor)}, + bufferQueue_{maxPoolSize}, + backgroundScope_{std::make_unique()}, + memoryPool_{memoryPool} { + NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0") + NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0") + NIMBLE_CHECK(0 < initialBufferCount, "initial pool size must be > 0"); + NIMBLE_CHECK( + initialBufferCount <= maxPoolSize, + "initial pool size must be <= max pool size") + + for (size_t i = 0; i < initialBufferCount; ++i) { + addBuffer(); + } + } + ~BufferPool() { + folly::coro::blockingWait(backgroundScope_->joinAsync()); + deleteBufferPool(); + backgroundScope_.reset(); + } + + MemoryPool& getMemoryPool() { + return memoryPool_; + } + + void co_addBuffer(BufferPtr buffer) { + backgroundScope_->add(folly::coro::co_invoke( + [this, bufferRef = std::move(buffer)]() mutable + -> folly::coro::Task { + addBuffer(std::move(bufferRef)); + co_return; + }) + .scheduleOn(executor_.get())); + } + + // Releases the buffer back to the pool. + void addBuffer(BufferPtr buffer) { + const auto status = + bufferQueue_.try_enqueue_for(std::move(buffer), timeout_); + if (!status) { + NIMBLE_UNKNOWN( + "Timed out enqueuing for buffer. Timeout set to " + + std::to_string(timeout_.count()) + " ms"); + } + } + + // Reserves a buffer from the pool. Adds a new buffer to the pool + // while there are buffers available + BufferPtr reserveBuffer() { + if (bufferQueue_.empty()) { + return newBuffer(); + } + + BufferPtr buffer; + const auto status = bufferQueue_.try_dequeue_for(buffer, timeout_); + if (!status) { + NIMBLE_UNREACHABLE( + "Timed out dequeuing for buffer. Timeout set to " + + std::to_string(timeout_.count()) + " ms"); + } + return buffer; + } + + // Returns estimated number of buffers in the pool + size_t size() { + return bufferQueue_.size(); + } + + private: + static const uint64_t kMinChunkSize = 1LL << 20; + const uint64_t defaultInitialChunkSize_ = kMinChunkSize; + const std::chrono::milliseconds timeout_ = std::chrono::milliseconds(1000); + + std::shared_ptr executor_; + folly::DMPMCQueue bufferQueue_; + std::unique_ptr backgroundScope_; + + MemoryPool& memoryPool_; + + BufferPtr newBuffer() { + return std::make_unique(memoryPool_, defaultInitialChunkSize_); + } + + void addBuffer() { + auto status = bufferQueue_.try_enqueue_for(newBuffer(), timeout_); + if (!status) { + NIMBLE_UNKNOWN( + "Timed out enqueuing for buffer. Timeout set to " + + std::to_string(timeout_.count()) + " ms"); + } + } + + // clears all buffers in the pool + void deleteBufferPool() { + while (!bufferQueue_.empty()) { + BufferPtr buffer; + bufferQueue_.dequeue(buffer); + buffer.reset(); + } + } }; } // namespace facebook::nimble diff --git a/dwio/nimble/common/tests/BufferPoolTests.cpp b/dwio/nimble/common/tests/BufferPoolTests.cpp new file mode 100644 index 0000000..78f16b4 --- /dev/null +++ b/dwio/nimble/common/tests/BufferPoolTests.cpp @@ -0,0 +1,204 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "dwio/nimble/common/Buffer.h" +#include "dwio/nimble/common/Exceptions.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "velox/common/memory/Memory.h" +#include "velox/dwio/common/ExecutorBarrier.h" + +namespace facebook::nimble::test { +using MemoryPool = velox::memory::MemoryPool; +using ExecutorBarrier = velox::dwio::common::ExecutorBarrier; + +class BufferPoolTest : public ::testing::Test { + protected: + static void SetUpTestCase() {} + + void SetUp() override { + memPool_ = velox::memory::deprecatedAddDefaultLeafMemoryPool(); + executor_ = std::make_unique(1); + } + + std::shared_ptr memPool_; + std::unique_ptr executor_; +}; + +TEST_F(BufferPoolTest, CreateBufferPool) { + auto bufferPool = BufferPool{*memPool_, std::move(executor_)}; + EXPECT_NO_THROW(bufferPool); +} + +TEST_F(BufferPoolTest, CreateBufferPoolWithInitialSize) { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 10, 10}; + EXPECT_EQ(bufferPool.size(), 10); +} + +TEST_F(BufferPoolTest, CreateBufferPoolBadTimeout) { + auto throwLambda = [&]() { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{0}}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "timeout must be > 0"); + } +} + +TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) { + auto throwLambda = [&]() { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 0}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "max pool size must be > 0"); + } +} + +TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSize) { + auto throwLambda = [&]() { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 10, 0}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "initial pool size must be > 0"); + } +} + +TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSizeMaxSize) { + auto throwLambda = [&]() { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 1, 2}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "initial pool size must be <= max pool size"); + } +} + +TEST_F(BufferPoolTest, ReserveBuffer) { + auto bufferPool = BufferPool{*memPool_, std::move(executor_)}; + auto buffer = bufferPool.reserveBuffer(); + EXPECT_NE(buffer, nullptr); +} + +TEST_F(BufferPoolTest, AddBuffer) { + auto bufferPool = BufferPool{*memPool_, std::move(executor_)}; + auto buffer = bufferPool.reserveBuffer(); + EXPECT_NE(buffer, nullptr); + EXPECT_EQ(bufferPool.size(), 0); + bufferPool.addBuffer(std::move(buffer)); + EXPECT_EQ(bufferPool.size(), 1); +} + +TEST_F(BufferPoolTest, DestroyBufferPool) { + auto bufferPoolPtr = std::make_unique(*memPool_, std::move(executor_)); + bufferPoolPtr.reset(); + EXPECT_NO_THROW(); +} + +TEST_F(BufferPoolTest, emptyBufferPool) { + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1}; + EXPECT_EQ(bufferPool.size(), 1); + + auto buffer1 = bufferPool.reserveBuffer(); + EXPECT_EQ(bufferPool.size(), 0); +} + +TEST_F(BufferPoolTest, OverfillBufferPool) { + // not guarenteed to fail at size of 2 due to DMPMCQueue + auto bufferPool = BufferPool{ + *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1}; + auto throwLambda = [&]() { + for (auto i = 0; i < 5; i++) { + auto buffer = std::make_unique(*memPool_); + bufferPool.addBuffer(std::move(buffer)); + } + }; + + EXPECT_THROW(throwLambda(), NimbleInternalError); +} + +TEST_F(BufferPoolTest, FillEmptyFillBufferPool) { + size_t iterations = 10; + std::vector buffers; + + auto bufferPool = BufferPool{ + *memPool_, + std::move(executor_), + std::chrono::milliseconds{10}, + iterations}; + for (auto i = 0; i < iterations; i++) { + auto buffer = bufferPool.reserveBuffer(); + buffers.push_back(std::move(buffer)); + } + EXPECT_EQ(bufferPool.size(), 0); + EXPECT_EQ(buffers.size(), iterations); + for (auto& buffer : buffers) { + bufferPool.addBuffer(std::move(buffer)); + } + EXPECT_EQ(bufferPool.size(), iterations); + buffers.clear(); + + for (auto i = 0; i < iterations; i++) { + auto buffer = bufferPool.reserveBuffer(); + buffers.push_back(std::move(buffer)); + } + EXPECT_EQ(bufferPool.size(), 0); + EXPECT_EQ(buffers.size(), iterations); +} + +TEST_F(BufferPoolTest, ParallelFillPool) { + folly::CPUThreadPoolExecutor executor{10}; + ExecutorBarrier barrier{executor}; + auto bufferPool = BufferPool{ + *memPool_, + std::move(executor_), + std::chrono::milliseconds{1000 * 10}, + 100}; + auto fillPool = [&]() { + for (auto i = 0; i < 10; i++) { + EXPECT_NO_THROW( + auto buffer = bufferPool.reserveBuffer(); + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + bufferPool.addBuffer(std::move(buffer));); + } + }; + + for (auto i = 0; i < 10; i++) { + barrier.add(fillPool); + } + + barrier.waitAll(); +} + +} // namespace facebook::nimble::test diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index d0b42bb..7e72c63 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -19,6 +19,7 @@ #include "dwio/nimble/velox/DeduplicationUtils.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" +#include "folly/coro/AsyncScope.h" #include "velox/common/base/CompareFlags.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" @@ -287,7 +288,6 @@ class SimpleFieldWriter : public FieldWriter { void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) override { auto size = ranges.size(); - auto& buffer = context_.stringBuffer(); auto& data = valuesStream_.mutableData(); if (auto flat = vector->asFlatVector()) { @@ -329,8 +329,11 @@ class SimpleFieldWriter : public FieldWriter { valuesStream_.mutableNonNulls(), Flat{vector}, [&](SourceType value) { + auto bufferPtr = context_.bufferPool().reserveBuffer(); + auto& buffer = *bufferPtr; data.push_back( C::convert(value, buffer, valuesStream_.extraMemory())); + context_.bufferPool().co_addBuffer(std::move(bufferPtr)); }); } } else { @@ -342,8 +345,11 @@ class SimpleFieldWriter : public FieldWriter { valuesStream_.mutableNonNulls(), Decoded{decoded}, [&](SourceType value) { + auto bufferPtr = context_.bufferPool().reserveBuffer(); + auto& buffer = *bufferPtr; data.push_back( C::convert(value, buffer, valuesStream_.extraMemory())); + context_.bufferPool().co_addBuffer(std::move(bufferPtr)); }); context_.decodingPairPool().addPair(std::move(pair)); } diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index aad6b45..03b3adb 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -119,22 +119,27 @@ class DecodingPairPool { struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, + std::shared_ptr executor, std::unique_ptr reclaimer = nullptr, std::function vectorDecoderVisitor = []() {}, std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), - size_t maxPoolSize = std::thread::hardware_concurrency()) + size_t maxPoolSize = std::thread::hardware_concurrency(), + size_t initialBufferCount = 10) : bufferMemoryPool{memoryPool.addLeafChild( "field_writer_buffer", true, std::move(reclaimer))}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, - decodingPairPool_{std::make_unique( - std::move(vectorDecoderVisitor), + bufferPool_{std::make_unique( + *bufferMemoryPool, + std::move(executor), timeout, - maxPoolSize)} { - resetStringBuffer(); - } + maxPoolSize, + initialBufferCount)}, + decodingPairPool_{ + std::make_unique( + std::move(vectorDecoderVisitor), timeout, maxPoolSize)} {} std::shared_ptr bufferMemoryPool; SchemaBuilder schemaBuilder; @@ -156,13 +161,8 @@ struct FieldWriterContext { return *decodingPairPool_; } - Buffer& stringBuffer() { - return *buffer_; - } - - // Reset writer context for use by next stripe. - void resetStringBuffer() { - buffer_ = std::make_unique(*bufferMemoryPool); + BufferPool& bufferPool() { + return *bufferPool_; } const std::vector>& streams() { @@ -192,7 +192,7 @@ struct FieldWriterContext { } private: - std::unique_ptr buffer_; + std::unique_ptr bufferPool_; std::unique_ptr decodingPairPool_; std::vector> streams_; }; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 50fac4f..ad1bf90 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,15 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor, options.poolTimeout, options.maxPoolSize}, + : FieldWriterContext{ + memoryPool, + options.vectorDecoderVisitor, + options.bufferExecutor, + options.reclaimerFactory(), + options.poolTimeout, + options.maxPoolSize, + options.initialBufferCount + }, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); @@ -615,9 +623,6 @@ void VeloxWriter::writeChunk(bool lastChunk) { LoggingScope scope{*context_->logger}; velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming}; - if (!encodingBuffer_) { - encodingBuffer_ = std::make_unique(*encodingMemoryPool_); - } streams_.resize(context_->schemaBuilder.nodeCount()); // When writing null streams, we write the nulls as data, and the stream @@ -661,9 +666,11 @@ void VeloxWriter::writeChunk(bool lastChunk) { auto encode = [&](StreamData& streamData) { const auto offset = streamData.descriptor().offset(); - auto encoded = encodeStream(*context_, *encodingBuffer_, streamData); + auto bufferPtr = context_->bufferPool().reserveBuffer(); + auto& buffer = *bufferPtr; + auto encoded = encodeStream(*context_, buffer, streamData); if (!encoded.empty()) { - ChunkedStreamWriter chunkWriter{*encodingBuffer_}; + ChunkedStreamWriter chunkWriter{buffer}; NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range."); auto& stream = streams_[offset]; for (auto& buffer : chunkWriter.encode(encoded)) { @@ -672,6 +679,8 @@ void VeloxWriter::writeChunk(bool lastChunk) { } } streamData.reset(); + + context_->bufferPool().co_addBuffer(std::move(bufferPtr)); }; auto processStream = [&](StreamData& streamData, @@ -775,10 +784,6 @@ uint32_t VeloxWriter::writeStripe() { uint64_t startSize = writer_.size(); writer_.writeStripe(context_->rowsInStripe, std::move(streams_)); stripeSize = writer_.size() - startSize; - encodingBuffer_.reset(); - // TODO: once chunked string fields are supported, move string buffer - // reset to writeChunk() - context_->resetStringBuffer(); } NIMBLE_ASSERT( diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h index adb1e5b..d67b2ca 100644 --- a/dwio/nimble/velox/VeloxWriter.h +++ b/dwio/nimble/velox/VeloxWriter.h @@ -77,7 +77,6 @@ class VeloxWriter { TabletWriter writer_; std::unique_ptr root_; - std::unique_ptr encodingBuffer_; std::vector streams_; std::exception_ptr lastException_; const velox::common::SpillConfig* const spillConfig_; diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 68021e7..a3f875d 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -128,7 +128,10 @@ struct VeloxWriterOptions { std::chrono::milliseconds poolTimeout = std::chrono::milliseconds{1000 * 10}; size_t maxPoolSize = std::thread::hardware_concurrency(); - + size_t initialBufferCount = 20; // should be enough for 1 stripe + std::shared_ptr bufferExecutor = + std::make_shared( + std::thread::hardware_concurrency()); // If provided, internal encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index a39830e..4f3dc74 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -321,11 +321,15 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) { std::string file; auto writeFile = std::make_unique(&file); std::atomic_bool reclaimEntered = false; - nimble::VeloxWriterOptions writerOptions{.reclaimerFactory = [&]() { - auto reclaimer = std::make_unique(); - reclaimer->setEnterArbitrationFunc([&]() { reclaimEntered = true; }); - return reclaimer; - }}; + nimble::VeloxWriterOptions writerOptions{ + .reclaimerFactory = + [&]() { + auto reclaimer = std::make_unique(); + reclaimer->setEnterArbitrationFunc( + [&]() { reclaimEntered = true; }); + return reclaimer; + }, + .initialBufferCount = 2}; nimble::VeloxWriter writer( *writerPool, type, std::move(writeFile), std::move(writerOptions)); auto batches = generateBatches(type, 100, 4000, 20221110, *leafPool_);