Skip to content

Commit

Permalink
misc: Improve OutputBufferManager initialization
Browse files Browse the repository at this point in the history
Starting from C++11, the C++ standard guarantees that the initialization
of function-local static variables is thread-safe. This is better than
using a global mutex, especially for subsequent getInstance() calls. This
is because the overhead of using a static local variable only needs to do
a simple check to see if the variable has already been initialized, while
for the global mutex case, all getInstance() calls need to aquire this
lock exclusively.
  • Loading branch information
yingsu00 committed Dec 4, 2024
1 parent 03f0894 commit fdfa1fd
Show file tree
Hide file tree
Showing 15 changed files with 29 additions and 37 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Exchange : public SourceOperator {
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstanceRef()->compressionKind();
}

~Exchange() override {
Expand Down
19 changes: 8 additions & 11 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

// static
void OutputBufferManager::initialize(const Options& options) {
std::lock_guard<std::mutex> l(initMutex_);
VELOX_CHECK(
instance_ == nullptr, "May initialize OutputBufferManager only once");
instance_ = std::make_shared<OutputBufferManager>(options);
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
return getInstanceRef();
}

// static
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
std::lock_guard<std::mutex> l(initMutex_);
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(Options());
}
return instance_;
const std::shared_ptr<OutputBufferManager>&
OutputBufferManager::getInstanceRef() {
static const std::shared_ptr<OutputBufferManager> instance =
std::make_shared<OutputBufferManager>(Options());
return instance;
}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
9 changes: 2 additions & 7 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();

static const std::shared_ptr<OutputBufferManager>& getInstanceRef();

uint64_t numBuffers() const;

// Returns a new stream listener if a listener factory has been set.
Expand Down Expand Up @@ -157,8 +155,5 @@ class OutputBufferManager {

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

inline static std::shared_ptr<OutputBufferManager> instance_;
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ BlockingReason Destination::advance(
if (serde_->kind() == VectorSerde::Kind::kPresto) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstanceRef()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
} else {
Expand Down Expand Up @@ -165,7 +165,7 @@ PartitionedOutput::PartitionedOutput(
planNode->inputType(),
planNode->outputType(),
planNode->outputType())),
bufferManager_(OutputBufferManager::getInstance()),
bufferManager_(OutputBufferManager::getInstanceRef()),
// NOTE: 'bufferReleaseFn_' holds a reference on the associated task to
// prevent it from deleting while there are output buffers being accessed
// out of the partitioned output buffer manager such as in Prestissimo,
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ Task::Task(
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
splitsStates_(buildSplitStates(planFragment_.planNode)),
bufferManager_(OutputBufferManager::getInstance()) {
bufferManager_(OutputBufferManager::getInstanceRef()) {
// NOTE: the executor must not be folly::InlineLikeExecutor for parallel
// execution.
if (mode_ == Task::ExecutionMode::kParallel) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ExchangeClientTest
if (!isRegisteredVectorSerde()) {
velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstanceRef();

common::testutil::TestValue::enable();
}
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F(

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/LimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) {
params.planNode = builder.partitionedOutput({}, 1).planNode();
auto cursor = TaskCursor::create(params);
ASSERT_FALSE(cursor->moveNext());
auto bufferManager = exec::OutputBufferManager::getInstance().lock();
auto bufferManager = exec::OutputBufferManager::getInstanceRef();
auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract<int>();
ASSERT_TRUE(bufferManager->getData(
cursor->task()->taskId(),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class MultiFragmentTest
std::vector<std::shared_ptr<TempFilePath>> filePaths_;
std::vector<RowVectorPtr> vectors_;
std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

TEST_P(MultiFragmentTest, aggregationSingleKey) {
Expand Down Expand Up @@ -2160,7 +2160,7 @@ class DataFetcher {
std::vector<std::vector<std::size_t>> packetPageSizes_;

std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

/// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test {

void SetUp() override {
pool_ = facebook::velox::memory::memoryManager()->addLeafPool();
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstanceRef();
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/PartitionedOutputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class PartitionedOutputTest

private:
const std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

TEST_P(PartitionedOutputTest, flush) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) {
.project({"c0 % 10"})
.partitionedOutputBroadcast({})
.planFragment();
auto bufferManager = OutputBufferManager::getInstance().lock();
auto bufferManager = OutputBufferManager::getInstanceRef();
{
auto task = Task::create(
"t0",
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource {

promise_ = std::move(promise);

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
VELOX_CHECK(requestPending_);
auto requestedSequence = sequence_;
Expand Down Expand Up @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void pause() override {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::pause", nullptr);
auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
int64_t ackSequence;
{
Expand All @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void close() override {
checkSetRequestPromise();

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
buffers->deleteResults(taskId_, destination_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
const core::PartitionedOutputNode* const originalNode_;
const VectorSerde::Kind serdeKind_;
const std::shared_ptr<exec::OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstanceRef()};
const std::unique_ptr<folly::Executor> executor_{
std::make_unique<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency(),
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest
}

const std::shared_ptr<OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstanceRef()};
};

TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {
Expand Down

0 comments on commit fdfa1fd

Please sign in to comment.