From 1ca4fa2d3cd0f4986096480e2a6bffac511bbfb4 Mon Sep 17 00:00:00 2001 From: Scott Young Date: Fri, 13 Dec 2024 12:51:50 -0800 Subject: [PATCH] add bufferPool for nimble parallel writer (#103) Summary: change the buffer class to a bufferPool to handle multihreaded buffers without mutexes Differential Revision: D64774959 --- dwio/nimble/common/Buffer.h | 8 +- dwio/nimble/common/tests/BufferPoolTests.cpp | 101 +++++++++++++++++++ dwio/nimble/velox/FieldWriter.cpp | 65 +++++++++++- dwio/nimble/velox/FieldWriter.h | 58 ++++++++--- dwio/nimble/velox/VeloxWriter.cpp | 20 ++-- dwio/nimble/velox/VeloxWriter.h | 1 - dwio/nimble/velox/VeloxWriterOptions.h | 1 + dwio/nimble/velox/tests/VeloxWriterTests.cpp | 14 ++- 8 files changed, 231 insertions(+), 37 deletions(-) create mode 100644 dwio/nimble/common/tests/BufferPoolTests.cpp diff --git a/dwio/nimble/common/Buffer.h b/dwio/nimble/common/Buffer.h index e6850d8..d3e08e8 100644 --- a/dwio/nimble/common/Buffer.h +++ b/dwio/nimble/common/Buffer.h @@ -34,7 +34,7 @@ namespace facebook::nimble { -// Internally manages memory in chunks. Releases memory only upon destruction. +// Internally manages memory in chunks. releases memory when destroyed // Buffer is NOT threadsafe: external locking is required. class Buffer { using MemoryPool = facebook::velox::memory::MemoryPool; @@ -52,7 +52,6 @@ class Buffer { // to, and guarantees for the lifetime of *this that that region will remain // valid. Does NOT guarantee that the region is initially 0'd. char* reserve(uint64_t bytes) { - std::scoped_lock l(mutex_); if (reserveEnd_ + bytes <= chunkEnd_) { pos_ = reserveEnd_; reserveEnd_ += bytes; @@ -98,11 +97,6 @@ class Buffer { char* reserveEnd_; std::vector chunks_; MemoryPool& memoryPool_; - // NOTE: this is temporary fix, to quickly enable parallel access to the - // buffer class. In the near future, we are going to templetize this class to - // produce a concurrent and a non-concurrent variants, and change the call - // sites to use each variant when needed. - std::mutex mutex_; }; } // namespace facebook::nimble diff --git a/dwio/nimble/common/tests/BufferPoolTests.cpp b/dwio/nimble/common/tests/BufferPoolTests.cpp new file mode 100644 index 0000000..5fdd9bb --- /dev/null +++ b/dwio/nimble/common/tests/BufferPoolTests.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/velox/FieldWriter.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/dwio/common/ExecutorBarrier.h" + +namespace facebook::nimble::test { +using MemoryPool = velox::memory::MemoryPool; +using ExecutorBarrier = velox::dwio::common::ExecutorBarrier; + +class BufferPoolTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + velox::memory::SharedArbitrator::registerFactory(); + velox::memory::MemoryManager::testingSetInstance( + {.arbitratorKind = "SHARED"}); + } + + void SetUp() override { + rootPool_ = velox::memory::memoryManager()->addRootPool("default_root"); + leafPool_ = rootPool_->addLeafChild("default_leaf"); + } + + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; +}; + +TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) { + try { + auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 0}; + FAIL(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "max pool size must be > 0"); + } +} + +TEST_F(BufferPoolTest, ReserveBuffer) { + auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 10}; + EXPECT_EQ(bufferPool.size(), 10); + { + auto buffer = bufferPool.reserveBuffer(); + EXPECT_EQ(bufferPool.size(), 9); + } + EXPECT_EQ(bufferPool.size(), 10); +} + +TEST_F(BufferPoolTest, EmptyFillBufferPool) { + size_t iterations = 10; + auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ iterations}; + + for (auto i = 0; i < iterations; ++i) { + { + auto buffer1 = bufferPool.reserveBuffer(); + auto buffer2 = bufferPool.reserveBuffer(); + auto buffer3 = bufferPool.reserveBuffer(); + + EXPECT_EQ(bufferPool.size(), iterations - 3); + } + EXPECT_EQ(bufferPool.size(), iterations); + } +} + +TEST_F(BufferPoolTest, ParallelFillPool) { + auto parallelismFactor = std::thread::hardware_concurrency(); + auto executor = + std::make_shared(parallelismFactor); + ExecutorBarrier barrier{executor}; + auto bufferPool = BufferPool{*leafPool_}; + EXPECT_EQ(bufferPool.size(), parallelismFactor); + + for (auto i = 0; i < parallelismFactor; ++i) { + barrier.add([&]() { + for (auto j = 0; j < 100000; ++j) { + auto buffer = bufferPool.reserveBuffer(); + } + }); + } + + barrier.waitAll(); + EXPECT_LE(bufferPool.size(), parallelismFactor); +} +} // namespace facebook::nimble::test diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 9035ab8..486acc0 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -289,9 +289,8 @@ class SimpleFieldWriter : public FieldWriter { const OrderedRanges& ranges, folly::Executor*) override { auto size = ranges.size(); - auto& buffer = context_.stringBuffer(); auto& data = valuesStream_.mutableData(); - + auto bufferObject = context_.bufferPool().reserveBuffer(); if (auto flat = vector->asFlatVector()) { valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size); bool rangeCopied = false; @@ -331,6 +330,7 @@ class SimpleFieldWriter : public FieldWriter { valuesStream_.mutableNonNulls(), Flat{vector}, [&](SourceType value) { + auto& buffer = bufferObject.get(); data.push_back( C::convert(value, buffer, valuesStream_.extraMemory())); }); @@ -344,6 +344,7 @@ class SimpleFieldWriter : public FieldWriter { valuesStream_.mutableNonNulls(), Decoded{decoded}, [&](SourceType value) { + auto& buffer = bufferObject.get(); data.push_back( C::convert(value, buffer, valuesStream_.extraMemory())); }); @@ -1590,6 +1591,66 @@ size_t DecodingContextPool::size() const { return pool_.size(); } +BufferPool::BufferObject::BufferObject( + BufferPool& pool, + std::unique_ptr buffer) + : pool_{pool}, buffer_{std::move(buffer)} {} + +BufferPool::BufferObject::~BufferObject() { + pool_.addBuffer(std::move(buffer_)); +} + +Buffer& BufferPool::BufferObject::get() { + return *buffer_; +} + +BufferPool::BufferPool( + facebook::velox::memory::MemoryPool& memoryPool, + size_t maxPoolSize, + uint64_t initialChunkSize) + : defaultInitialChunkSize_{initialChunkSize}, + semaphore_{0}, + memoryPool_{memoryPool} { + NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0") + pool_.reserve(maxPoolSize); + for (size_t i = 0; i < maxPoolSize; ++i) { + pool_.emplace_back(newBuffer()); + semaphore_.release(); + } +} + +facebook::velox::memory::MemoryPool& BufferPool::getMemoryPool() { + return memoryPool_; +} + +// buffer back to the pool. +void BufferPool::addBuffer(std::unique_ptr buffer) { + std::scoped_lock lock(mutex_); + pool_.push_back(std::move(buffer)); + semaphore_.release(); +} + +// Reserves a buffer from the pool. Adds a new buffer to the pool +// while there are buffers available +BufferPool::BufferObject BufferPool::reserveBuffer() { + semaphore_.acquire(); + + std::scoped_lock lock(mutex_); + auto buffer = std::move(pool_.back()); + pool_.pop_back(); + + return BufferPool::BufferObject{*this, std::move(buffer)}; +} + +// Returns estimated number of buffers in the pool +size_t BufferPool::size() { + return pool_.size(); +} + +std::unique_ptr BufferPool::newBuffer() { + return std::make_unique(memoryPool_, defaultInitialChunkSize_); +} + std::unique_ptr FieldWriter::create( FieldWriterContext& context, const std::shared_ptr& type, diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 8d8d413..b3da471 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -76,20 +76,59 @@ class DecodingContextPool { std::unique_ptr selectivityVector); }; +// Manages a pool of buffers. Buffers are returned to the pool when released. +// maxPoolSize should be set to at least 90% of capacity for performance +class BufferPool { + public: + class BufferObject { + public: + explicit BufferObject(BufferPool& pool, std::unique_ptr buffer); + + ~BufferObject(); + Buffer& get(); + + private: + BufferPool& pool_; + std::unique_ptr buffer_; + }; + + explicit BufferPool( + facebook::velox::memory::MemoryPool& memoryPool, + size_t maxPoolSize = std::thread::hardware_concurrency(), + uint64_t initialChunkSize = kMinChunkSize); + + facebook::velox::memory::MemoryPool& getMemoryPool(); + BufferObject reserveBuffer(); + size_t size(); + + private: + static const uint64_t kMinChunkSize = 1LL << 20; + const uint64_t defaultInitialChunkSize_; + + std::mutex mutex_; + std::counting_semaphore<> semaphore_; + std::vector> pool_; + facebook::velox::memory::MemoryPool& memoryPool_; + + void addBuffer(std::unique_ptr buffer); + std::unique_ptr newBuffer(); +}; + struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, std::unique_ptr reclaimer = nullptr, - std::function vectorDecoderVisitor = []() {}) + std::function vectorDecoderVisitor = []() {}, + size_t maxPoolSize = std::thread::hardware_concurrency()) : bufferMemoryPool{memoryPool.addLeafChild( "field_writer_buffer", true, std::move(reclaimer))}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, - decodingContextPool_{std::move(vectorDecoderVisitor)} { - resetStringBuffer(); - } + bufferPool_{ + std::make_unique(*bufferMemoryPool, maxPoolSize)}, + decodingContextPool_{std::move(vectorDecoderVisitor)} {} std::shared_ptr bufferMemoryPool; std::mutex flatMapSchemaMutex; @@ -112,13 +151,8 @@ struct FieldWriterContext { return decodingContextPool_.reserveContext(); } - Buffer& stringBuffer() { - return *buffer_; - } - - // Reset writer context for use by next stripe. - void resetStringBuffer() { - buffer_ = std::make_unique(*bufferMemoryPool); + BufferPool& bufferPool() { + return *bufferPool_; } const std::vector>& streams() { @@ -148,7 +182,7 @@ struct FieldWriterContext { } private: - std::unique_ptr buffer_; + std::unique_ptr bufferPool_; DecodingContextPool decodingContextPool_; std::vector> streams_; }; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 0222178..cbae115 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor}, + : FieldWriterContext{ + memoryPool, + options.reclaimerFactory(), + options.vectorDecoderVisitor, + options.maxPoolSize + }, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); @@ -622,9 +627,6 @@ void VeloxWriter::writeChunk(bool lastChunk) { LoggingScope scope{*context_->logger}; velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming}; - if (!encodingBuffer_) { - encodingBuffer_ = std::make_unique(*encodingMemoryPool_); - } streams_.resize(context_->schemaBuilder.nodeCount()); // When writing null streams, we write the nulls as data, and the stream @@ -668,9 +670,11 @@ void VeloxWriter::writeChunk(bool lastChunk) { auto encode = [&](StreamData& streamData) { const auto offset = streamData.descriptor().offset(); - auto encoded = encodeStream(*context_, *encodingBuffer_, streamData); + auto bufferObject = context_->bufferPool().reserveBuffer(); + auto& buffer = bufferObject.get(); + auto encoded = encodeStream(*context_, buffer, streamData); if (!encoded.empty()) { - ChunkedStreamWriter chunkWriter{*encodingBuffer_}; + ChunkedStreamWriter chunkWriter{buffer}; NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range."); auto& stream = streams_[offset]; for (auto& buffer : chunkWriter.encode(encoded)) { @@ -782,10 +786,6 @@ uint32_t VeloxWriter::writeStripe() { uint64_t startSize = writer_.size(); writer_.writeStripe(context_->rowsInStripe, std::move(streams_)); stripeSize = writer_.size() - startSize; - encodingBuffer_.reset(); - // TODO: once chunked string fields are supported, move string buffer - // reset to writeChunk() - context_->resetStringBuffer(); } NIMBLE_ASSERT( diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h index 5ff5137..0f2f18e 100644 --- a/dwio/nimble/velox/VeloxWriter.h +++ b/dwio/nimble/velox/VeloxWriter.h @@ -77,7 +77,6 @@ class VeloxWriter { TabletWriter writer_; std::unique_ptr root_; - std::unique_ptr encodingBuffer_; std::vector streams_; std::exception_ptr lastException_; const velox::common::SpillConfig* const spillConfig_; diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 2a1f202..cdbff22 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -126,6 +126,7 @@ struct VeloxWriterOptions { const velox::common::SpillConfig* spillConfig{nullptr}; + size_t maxPoolSize = std::thread::hardware_concurrency(); // If provided, internal encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index 485fb1f..0999860 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -319,11 +319,15 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) { std::string file; auto writeFile = std::make_unique(&file); std::atomic_bool reclaimEntered = false; - nimble::VeloxWriterOptions writerOptions{.reclaimerFactory = [&]() { - auto reclaimer = std::make_unique(); - reclaimer->setEnterArbitrationFunc([&]() { reclaimEntered = true; }); - return reclaimer; - }}; + nimble::VeloxWriterOptions writerOptions{ + .reclaimerFactory = + [&]() { + auto reclaimer = std::make_unique(); + reclaimer->setEnterArbitrationFunc( + [&]() { reclaimEntered = true; }); + return reclaimer; + }, + .maxPoolSize = 2}; nimble::VeloxWriter writer( *writerPool, type, std::move(writeFile), std::move(writerOptions)); auto batches = generateBatches(type, 100, 4000, 20221110, *leafPool_);