Skip to content

Commit

Permalink
Improve OutputBufferManager initialization
Browse files Browse the repository at this point in the history
Starting from C++11, the C++ standard guarantees that the initialization
of function-local static variables is thread-safe. This is better than
using a global mutex, especially for subsequent getInstance() calls. This
is because the overhead of using a static local variable only needs to do
a simple check to see if the variable has already been initialized, while
for the global mutex case, all getInstance() calls need to aquire this
lock exclusively.
  • Loading branch information
yingsu00 committed Nov 2, 2024
1 parent 6c7bcd7 commit 67d13ea
Show file tree
Hide file tree
Showing 14 changed files with 22 additions and 38 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Exchange : public SourceOperator {
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstance()->compressionKind();
}

~Exchange() override {
Expand Down
17 changes: 4 additions & 13 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,12 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
// static
void OutputBufferManager::initialize(const Options& options) {
std::lock_guard<std::mutex> l(initMutex_);
VELOX_CHECK(
instance_ == nullptr, "May initialize OutputBufferManager only once");
instance_ = std::make_shared<OutputBufferManager>(options);
}

// static
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
std::lock_guard<std::mutex> l(initMutex_);
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(Options());
}
return instance_;
const std::shared_ptr<OutputBufferManager>& OutputBufferManager::getInstance() {
static const std::shared_ptr<OutputBufferManager> instance =
std::make_shared<OutputBufferManager>(Options());
return instance;
}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
9 changes: 1 addition & 8 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();
static const std::shared_ptr<OutputBufferManager>& getInstance();

uint64_t numBuffers() const;

Expand Down Expand Up @@ -157,8 +153,5 @@ class OutputBufferManager {

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

inline static std::shared_ptr<OutputBufferManager> instance_;
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
2 changes: 1 addition & 1 deletion velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ BlockingReason Destination::advance(
auto rowType = asRowType(output->type());
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstance()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ExchangeClientTest : public testing::Test,
if (!isRegisteredVectorSerde()) {
velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstance();

common::testutil::TestValue::enable();
}
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstance();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F(

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstance();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstance();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/LimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) {
params.planNode = builder.partitionedOutput({}, 1).planNode();
auto cursor = TaskCursor::create(params);
ASSERT_FALSE(cursor->moveNext());
auto bufferManager = exec::OutputBufferManager::getInstance().lock();
auto bufferManager = exec::OutputBufferManager::getInstance();
auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract<int>();
ASSERT_TRUE(bufferManager->getData(
cursor->task()->taskId(),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class MultiFragmentTest : public HiveConnectorTestBase {
std::vector<std::shared_ptr<TempFilePath>> filePaths_;
std::vector<RowVectorPtr> vectors_;
std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstance()};
};

TEST_F(MultiFragmentTest, aggregationSingleKey) {
Expand Down Expand Up @@ -1940,7 +1940,7 @@ class DataFetcher {
int64_t totalBytes_{0};

std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstance()};
};

/// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OutputBufferManagerTest : public testing::Test {

void SetUp() override {
pool_ = facebook::velox::memory::memoryManager()->addLeafPool();
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstance();
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/PartitionedOutputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class PartitionedOutputTest : public OperatorTestBase {

private:
const std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstance()};
};

TEST_F(PartitionedOutputTest, flush) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) {
.project({"c0 % 10"})
.partitionedOutputBroadcast({})
.planFragment();
auto bufferManager = OutputBufferManager::getInstance().lock();
auto bufferManager = OutputBufferManager::getInstance();
{
auto task = Task::create(
"t0",
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource {

promise_ = std::move(promise);

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstance();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
VELOX_CHECK(requestPending_);
auto requestedSequence = sequence_;
Expand Down Expand Up @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void pause() override {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::pause", nullptr);
auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstance();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
int64_t ackSequence;
{
Expand All @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void close() override {
checkSetRequestPromise();

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstance();
buffers->deleteResults(taskId_, destination_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {

const core::PartitionedOutputNode* const originalNode_;
const std::shared_ptr<exec::OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstance()};
const std::unique_ptr<folly::Executor> executor_{
std::make_unique<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency(),
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class PartitionedOutputReplayerTest : public HiveConnectorTestBase {
}

const std::shared_ptr<OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstance()};
};

TEST_F(PartitionedOutputReplayerTest, defaultConsumer) {
Expand Down

0 comments on commit 67d13ea

Please sign in to comment.