From 61ed96ec71221ed3365747dee4d8038e9a054c4a Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 24 Oct 2024 23:38:27 -0700 Subject: [PATCH] Improve OutputBufferManager initialization Starting from C++11, the C++ standard guarantees that the initialization of function-local static variables is thread-safe. This is better than using a global mutex, especially for subsequent getInstance() calls. This is because the overhead of using a static local variable only needs to do a simple check to see if the variable has already been initialized, while for the global mutex case, all getInstance() calls need to aquire this lock exclusively. --- velox/exec/Exchange.h | 2 +- velox/exec/OutputBufferManager.cpp | 17 ++++------------- velox/exec/OutputBufferManager.h | 9 +-------- velox/exec/PartitionedOutput.cpp | 2 +- velox/exec/tests/ExchangeClientTest.cpp | 2 +- velox/exec/tests/GroupedExecutionTest.cpp | 6 +++--- velox/exec/tests/LimitTest.cpp | 2 +- velox/exec/tests/MultiFragmentTest.cpp | 4 ++-- velox/exec/tests/OutputBufferManagerTest.cpp | 2 +- velox/exec/tests/PartitionedOutputTest.cpp | 2 +- velox/exec/tests/TaskTest.cpp | 2 +- velox/exec/tests/utils/LocalExchangeSource.cpp | 6 +++--- velox/tool/trace/PartitionedOutputReplayer.h | 2 +- .../tests/PartitionedOutputReplayerTest.cpp | 2 +- 14 files changed, 22 insertions(+), 38 deletions(-) diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 0aa69fa9f9509..ff231098be449 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -55,7 +55,7 @@ class Exchange : public SourceOperator { processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} { options_.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstance()->compressionKind(); } ~Exchange() override { diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 0425218d175a5..e34bd6b06b059 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,21 +17,12 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -// static -void OutputBufferManager::initialize(const Options& options) { - std::lock_guard l(initMutex_); - VELOX_CHECK( - instance_ == nullptr, "May initialize OutputBufferManager only once"); - instance_ = std::make_shared(options); -} // static -std::weak_ptr OutputBufferManager::getInstance() { - std::lock_guard l(initMutex_); - if (!instance_) { - instance_ = std::make_shared(Options()); - } - return instance_; +const std::shared_ptr& OutputBufferManager::getInstance() { + static const std::shared_ptr instance = + std::make_shared(Options()); + return instance; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 847532cd554e9..4368059d248f3 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -98,11 +98,7 @@ class OutputBufferManager { void removeTask(const std::string& taskId); - /// Initializes singleton with 'options'. May be called once before - /// getInstance(). - static void initialize(const Options& options); - - static std::weak_ptr getInstance(); + static const std::shared_ptr& getInstance(); uint64_t numBuffers() const; @@ -157,8 +153,5 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; - - inline static std::shared_ptr instance_; - inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 67504891ec9e2..8deb7aa861fd5 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -60,7 +60,7 @@ BlockingReason Destination::advance( if (serde_->kind() == VectorSerde::Kind::kPresto) { serializer::presto::PrestoVectorSerde::PrestoOptions options; options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstance()->compressionKind(); options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); current_->createStreamTree(rowType, rowsInCurrent_, &options); } else { diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f4..4ad7f9d5a02c7 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -61,7 +61,7 @@ class ExchangeClientTest if (!isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstance(); common::testutil::TestValue::enable(); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index e2e788506a89c..4379567e33f83 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F( // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index 0c1cae8fe15e4..fdc9fb10c1b91 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) { params.planNode = builder.partitionedOutput({}, 1).planNode(); auto cursor = TaskCursor::create(params); ASSERT_FALSE(cursor->moveNext()); - auto bufferManager = exec::OutputBufferManager::getInstance().lock(); + auto bufferManager = exec::OutputBufferManager::getInstance(); auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract(); ASSERT_TRUE(bufferManager->getData( cursor->task()->taskId(), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index f6ef796693911..83c4b17d3af77 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -206,7 +206,7 @@ class MultiFragmentTest std::vector> filePaths_; std::vector vectors_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; TEST_P(MultiFragmentTest, aggregationSingleKey) { @@ -2057,7 +2057,7 @@ class DataFetcher { int64_t totalBytes_{0}; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; /// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index e341ae4329fe0..1e711f32f33cf 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test { void SetUp() override { pool_ = facebook::velox::memory::memoryManager()->addLeafPool(); - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstance(); if (!isRegisteredVectorSerde()) { facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 4ce97e38d02eb..154f71bf3f0d2 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -91,7 +91,7 @@ class PartitionedOutputTest private: const std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; TEST_P(PartitionedOutputTest, flush) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 474c1e4ca5032..7b408bdd5032b 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1044,7 +1044,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { .project({"c0 % 10"}) .partitionedOutputBroadcast({}) .planFragment(); - auto bufferManager = OutputBufferManager::getInstance().lock(); + auto bufferManager = OutputBufferManager::getInstance(); { auto task = Task::create( "t0", diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95e..28e49a9342abb 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise_ = std::move(promise); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); VELOX_CHECK(requestPending_); auto requestedSequence = sequence_; @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void pause() override { common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::pause", nullptr); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); int64_t ackSequence; { @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void close() override { checkSetRequestPromise(); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); buffers->deleteResults(taskId_, destination_); } diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 873bb185addc3..ff3cf67fde729 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -60,7 +60,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const core::PartitionedOutputNode* const originalNode_; const VectorSerde::Kind serdeKind_; const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstance()}; const std::unique_ptr executor_{ std::make_unique( std::thread::hardware_concurrency(), diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index c9fd1498eef73..b3b7bb460cc88 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest } const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstance()}; }; TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {