diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 0aa69fa9f9509..8dbe683141540 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -55,7 +55,7 @@ class Exchange : public SourceOperator { processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} { options_.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstanceStrong()->compressionKind(); } ~Exchange() override { diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 0425218d175a5..07fde7513622d 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,21 +17,18 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { + // static -void OutputBufferManager::initialize(const Options& options) { - std::lock_guard l(initMutex_); - VELOX_CHECK( - instance_ == nullptr, "May initialize OutputBufferManager only once"); - instance_ = std::make_shared(options); +std::weak_ptr OutputBufferManager::getInstance() { + return getInstanceStrong(); } // static -std::weak_ptr OutputBufferManager::getInstance() { - std::lock_guard l(initMutex_); - if (!instance_) { - instance_ = std::make_shared(Options()); - } - return instance_; +const std::shared_ptr& +OutputBufferManager::getInstanceStrong() { + static const std::shared_ptr instance = + std::make_shared(Options()); + return instance; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 847532cd554e9..18ef7b1bbfc37 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -98,12 +98,10 @@ class OutputBufferManager { void removeTask(const std::string& taskId); - /// Initializes singleton with 'options'. May be called once before - /// getInstance(). - static void initialize(const Options& options); - static std::weak_ptr getInstance(); + static const std::shared_ptr& getInstanceStrong(); + uint64_t numBuffers() const; // Returns a new stream listener if a listener factory has been set. @@ -157,8 +155,5 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; - - inline static std::shared_ptr instance_; - inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 67504891ec9e2..e50f2fd23880d 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -60,7 +60,7 @@ BlockingReason Destination::advance( if (serde_->kind() == VectorSerde::Kind::kPresto) { serializer::presto::PrestoVectorSerde::PrestoOptions options; options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + OutputBufferManager::getInstanceStrong()->compressionKind(); options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); current_->createStreamTree(rowType, rowsInCurrent_, &options); } else { @@ -165,7 +165,7 @@ PartitionedOutput::PartitionedOutput( planNode->inputType(), planNode->outputType(), planNode->outputType())), - bufferManager_(OutputBufferManager::getInstance()), + bufferManager_(OutputBufferManager::getInstanceStrong()), // NOTE: 'bufferReleaseFn_' holds a reference on the associated task to // prevent it from deleting while there are output buffers being accessed // out of the partitioned output buffer manager such as in Prestissimo, diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 0bce65f91d14e..764b903ae8d05 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -296,7 +296,7 @@ Task::Task( consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), splitsStates_(buildSplitStates(planFragment_.planNode)), - bufferManager_(OutputBufferManager::getInstance()) { + bufferManager_(OutputBufferManager::getInstanceStrong()) { // NOTE: the executor must not be folly::InlineLikeExecutor for parallel // execution. if (mode_ == Task::ExecutionMode::kParallel) { diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f4..f4f3bd6c06ffe 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -61,7 +61,7 @@ class ExchangeClientTest if (!isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceStrong(); common::testutil::TestValue::enable(); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index e2e788506a89c..dae983bec3bcd 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceStrong(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F( // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceStrong(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceStrong(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index 0c1cae8fe15e4..352ed4abd5411 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) { params.planNode = builder.partitionedOutput({}, 1).planNode(); auto cursor = TaskCursor::create(params); ASSERT_FALSE(cursor->moveNext()); - auto bufferManager = exec::OutputBufferManager::getInstance().lock(); + auto bufferManager = exec::OutputBufferManager::getInstanceStrong(); auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract(); ASSERT_TRUE(bufferManager->getData( cursor->task()->taskId(), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index f6ef796693911..73fac96e06cc7 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -206,7 +206,7 @@ class MultiFragmentTest std::vector> filePaths_; std::vector vectors_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceStrong()}; }; TEST_P(MultiFragmentTest, aggregationSingleKey) { @@ -2057,7 +2057,7 @@ class DataFetcher { int64_t totalBytes_{0}; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceStrong()}; }; /// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index e341ae4329fe0..507cf1d47c008 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test { void SetUp() override { pool_ = facebook::velox::memory::memoryManager()->addLeafPool(); - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceStrong(); if (!isRegisteredVectorSerde()) { facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 4ce97e38d02eb..e34b6032abdbb 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -91,7 +91,7 @@ class PartitionedOutputTest private: const std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceStrong()}; }; TEST_P(PartitionedOutputTest, flush) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 474c1e4ca5032..578572f3b39f3 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1044,7 +1044,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { .project({"c0 % 10"}) .partitionedOutputBroadcast({}) .planFragment(); - auto bufferManager = OutputBufferManager::getInstance().lock(); + auto bufferManager = OutputBufferManager::getInstanceStrong(); { auto task = Task::create( "t0", diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95e..2bb05691185cd 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise_ = std::move(promise); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceStrong(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); VELOX_CHECK(requestPending_); auto requestedSequence = sequence_; @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void pause() override { common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::pause", nullptr); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceStrong(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); int64_t ackSequence; { @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void close() override { checkSetRequestPromise(); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceStrong(); buffers->deleteResults(taskId_, destination_); } diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 873bb185addc3..7ac5d779624fc 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -60,7 +60,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const core::PartitionedOutputNode* const originalNode_; const VectorSerde::Kind serdeKind_; const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceStrong()}; const std::unique_ptr executor_{ std::make_unique( std::thread::hardware_concurrency(), diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index c9fd1498eef73..8328c7c771e6b 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest } const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceStrong()}; }; TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {