diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 0aa69fa9f950..a8d1066022b0 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -55,7 +55,7 @@ class Exchange : public SourceOperator { processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} { options_.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstanceRef()->compressionKind(); } ~Exchange() override { diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 0425218d175a..a78a26b257cc 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,21 +17,18 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { + // static -void OutputBufferManager::initialize(const Options& options) { - std::lock_guard l(initMutex_); - VELOX_CHECK( - instance_ == nullptr, "May initialize OutputBufferManager only once"); - instance_ = std::make_shared(options); +std::weak_ptr OutputBufferManager::getInstance() { + return getInstanceRef(); } // static -std::weak_ptr OutputBufferManager::getInstance() { - std::lock_guard l(initMutex_); - if (!instance_) { - instance_ = std::make_shared(Options()); - } - return instance_; +const std::shared_ptr& +OutputBufferManager::getInstanceRef() { + static const std::shared_ptr instance = + std::make_shared(Options()); + return instance; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 847532cd554e..4b0f24259ff5 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -98,12 +98,10 @@ class OutputBufferManager { void removeTask(const std::string& taskId); - /// Initializes singleton with 'options'. May be called once before - /// getInstance(). - static void initialize(const Options& options); - static std::weak_ptr getInstance(); + static const std::shared_ptr& getInstanceRef(); + uint64_t numBuffers() const; // Returns a new stream listener if a listener factory has been set. @@ -157,8 +155,5 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; - - inline static std::shared_ptr instance_; - inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 67504891ec9e..e7525262877f 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -60,7 +60,7 @@ BlockingReason Destination::advance( if (serde_->kind() == VectorSerde::Kind::kPresto) { serializer::presto::PrestoVectorSerde::PrestoOptions options; options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstanceRef()->compressionKind(); options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); current_->createStreamTree(rowType, rowsInCurrent_, &options); } else { @@ -165,7 +165,7 @@ PartitionedOutput::PartitionedOutput( planNode->inputType(), planNode->outputType(), planNode->outputType())), - bufferManager_(OutputBufferManager::getInstance()), + bufferManager_(OutputBufferManager::getInstanceRef()), // NOTE: 'bufferReleaseFn_' holds a reference on the associated task to // prevent it from deleting while there are output buffers being accessed // out of the partitioned output buffer manager such as in Prestissimo, diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 7b10948b53a0..2726395cb112 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -296,7 +296,7 @@ Task::Task( consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), splitsStates_(buildSplitStates(planFragment_.planNode)), - bufferManager_(OutputBufferManager::getInstance()) { + bufferManager_(OutputBufferManager::getInstanceRef()) { // NOTE: the executor must not be folly::InlineLikeExecutor for parallel // execution. if (mode_ == Task::ExecutionMode::kParallel) { diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f..f0dfaeac99cf 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -61,7 +61,7 @@ class ExchangeClientTest if (!isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceRef(); common::testutil::TestValue::enable(); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index e2e788506a89..2c75ac3dfc0e 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F( // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index 0c1cae8fe15e..f15fa605384e 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) { params.planNode = builder.partitionedOutput({}, 1).planNode(); auto cursor = TaskCursor::create(params); ASSERT_FALSE(cursor->moveNext()); - auto bufferManager = exec::OutputBufferManager::getInstance().lock(); + auto bufferManager = exec::OutputBufferManager::getInstanceRef(); auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract(); ASSERT_TRUE(bufferManager->getData( cursor->task()->taskId(), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index b57a55e8fee9..772b6b5ef140 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -208,7 +208,7 @@ class MultiFragmentTest std::vector> filePaths_; std::vector vectors_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; TEST_P(MultiFragmentTest, aggregationSingleKey) { @@ -2160,7 +2160,7 @@ class DataFetcher { std::vector> packetPageSizes_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; /// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 3902ead5ffc4..b84cea9a46f7 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test { void SetUp() override { pool_ = facebook::velox::memory::memoryManager()->addLeafPool(); - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceRef(); if (!isRegisteredVectorSerde()) { facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 4ce97e38d02e..be921e80c913 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -91,7 +91,7 @@ class PartitionedOutputTest private: const std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; TEST_P(PartitionedOutputTest, flush) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 0dc1fd057bc7..d8f153a45558 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1045,7 +1045,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { .project({"c0 % 10"}) .partitionedOutputBroadcast({}) .planFragment(); - auto bufferManager = OutputBufferManager::getInstance().lock(); + auto bufferManager = OutputBufferManager::getInstanceRef(); { auto task = Task::create( "t0", diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95..3bb497907951 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise_ = std::move(promise); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); VELOX_CHECK(requestPending_); auto requestedSequence = sequence_; @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void pause() override { common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::pause", nullptr); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); int64_t ackSequence; { @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void close() override { checkSetRequestPromise(); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); buffers->deleteResults(taskId_, destination_); } diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 0855cbe483d9..dfa9c1a793ce 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -61,7 +61,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const core::PartitionedOutputNode* const originalNode_; const VectorSerde::Kind serdeKind_; const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceRef()}; const std::unique_ptr executor_{ std::make_unique( std::thread::hardware_concurrency(), diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index 28b821cadea3..4c758dad4a28 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest } const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceRef()}; }; TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {