diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index dea346773355a..fd79ab68a002a 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -54,7 +54,7 @@ class Exchange : public SourceOperator { processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} { options_.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstance()->compressionKind(); } ~Exchange() override { diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 0425218d175a5..e34bd6b06b059 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,21 +17,12 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -// static -void OutputBufferManager::initialize(const Options& options) { - std::lock_guard l(initMutex_); - VELOX_CHECK( - instance_ == nullptr, "May initialize OutputBufferManager only once"); - instance_ = std::make_shared(options); -} // static -std::weak_ptr OutputBufferManager::getInstance() { - std::lock_guard l(initMutex_); - if (!instance_) { - instance_ = std::make_shared(Options()); - } - return instance_; +const std::shared_ptr& OutputBufferManager::getInstance() { + static const std::shared_ptr instance = + std::make_shared(Options()); + return instance; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 847532cd554e9..4368059d248f3 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -98,11 +98,7 @@ class OutputBufferManager { void removeTask(const std::string& taskId); - /// Initializes singleton with 'options'. May be called once before - /// getInstance(). - static void initialize(const Options& options); - - static std::weak_ptr getInstance(); + static const std::shared_ptr& getInstance(); uint64_t numBuffers() const; @@ -157,8 +153,5 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; - - inline static std::shared_ptr instance_; - inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 6b49fe22efa8e..7b49d1d1e9fca 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -56,7 +56,7 @@ BlockingReason Destination::advance( auto rowType = asRowType(output->type()); serializer::presto::PrestoVectorSerde::PrestoOptions options; options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstance()->compressionKind(); options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); current_->createStreamTree(rowType, rowsInCurrent_, &options); } diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index cf0a9eabe5571..7cb0584cbd995 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -47,7 +47,7 @@ class ExchangeClientTest : public testing::Test, if (!isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstance(); common::testutil::TestValue::enable(); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index e2e788506a89c..4379567e33f83 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F( // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstance(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index 0c1cae8fe15e4..fdc9fb10c1b91 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) { params.planNode = builder.partitionedOutput({}, 1).planNode(); auto cursor = TaskCursor::create(params); ASSERT_FALSE(cursor->moveNext()); - auto bufferManager = exec::OutputBufferManager::getInstance().lock(); + auto bufferManager = exec::OutputBufferManager::getInstance(); auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract(); ASSERT_TRUE(bufferManager->getData( cursor->task()->taskId(), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index dfb4efb15eeeb..6d98f4d0d2a36 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -197,7 +197,7 @@ class MultiFragmentTest : public HiveConnectorTestBase { std::vector> filePaths_; std::vector vectors_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; TEST_F(MultiFragmentTest, aggregationSingleKey) { @@ -1940,7 +1940,7 @@ class DataFetcher { int64_t totalBytes_{0}; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; /// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index c8414c3b2ecc5..5561bd4164873 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -43,7 +43,7 @@ class OutputBufferManagerTest : public testing::Test { void SetUp() override { pool_ = facebook::velox::memory::memoryManager()->addLeafPool(); - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstance(); if (!isRegisteredVectorSerde()) { facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 37f44de6f6f47..409c190b510f6 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -78,7 +78,7 @@ class PartitionedOutputTest : public OperatorTestBase { private: const std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstance()}; }; TEST_F(PartitionedOutputTest, flush) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 474c1e4ca5032..7b408bdd5032b 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1044,7 +1044,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { .project({"c0 % 10"}) .partitionedOutputBroadcast({}) .planFragment(); - auto bufferManager = OutputBufferManager::getInstance().lock(); + auto bufferManager = OutputBufferManager::getInstance(); { auto task = Task::create( "t0", diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95e..28e49a9342abb 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise_ = std::move(promise); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); VELOX_CHECK(requestPending_); auto requestedSequence = sequence_; @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void pause() override { common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::pause", nullptr); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); int64_t ackSequence; { @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void close() override { checkSetRequestPromise(); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstance(); buffers->deleteResults(taskId_, destination_); } diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index a90c222dbf657..c8e6193911bfb 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -59,7 +59,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const core::PartitionedOutputNode* const originalNode_; const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstance()}; const std::unique_ptr executor_{ std::make_unique( std::thread::hardware_concurrency(), diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index d2d9c2c4e49f3..fc8b35fc4193f 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -111,7 +111,7 @@ class PartitionedOutputReplayerTest : public HiveConnectorTestBase { } const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstance()}; }; TEST_F(PartitionedOutputReplayerTest, defaultConsumer) {