Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc: Improve OutputBufferManager initialization #11350

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Exchange : public SourceOperator {
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstanceRef()->compressionKind();
}

~Exchange() override {
Expand Down
19 changes: 8 additions & 11 deletions velox/exec/OutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

// static
void OutputBufferManager::initialize(const Options& options) {
std::lock_guard<std::mutex> l(initMutex_);
VELOX_CHECK(
instance_ == nullptr, "May initialize OutputBufferManager only once");
instance_ = std::make_shared<OutputBufferManager>(options);
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
return getInstanceRef();
}

// static
std::weak_ptr<OutputBufferManager> OutputBufferManager::getInstance() {
std::lock_guard<std::mutex> l(initMutex_);
if (!instance_) {
instance_ = std::make_shared<OutputBufferManager>(Options());
}
return instance_;
const std::shared_ptr<OutputBufferManager>&
OutputBufferManager::getInstanceRef() {
static const std::shared_ptr<OutputBufferManager> instance =
std::make_shared<OutputBufferManager>(Options());
return instance;
}

std::shared_ptr<OutputBuffer> OutputBufferManager::getBuffer(
Expand Down
9 changes: 2 additions & 7 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ class OutputBufferManager {

void removeTask(const std::string& taskId);

/// Initializes singleton with 'options'. May be called once before
/// getInstance().
static void initialize(const Options& options);

static std::weak_ptr<OutputBufferManager> getInstance();

static const std::shared_ptr<OutputBufferManager>& getInstanceRef();

uint64_t numBuffers() const;

// Returns a new stream listener if a listener factory has been set.
Expand Down Expand Up @@ -157,8 +155,5 @@ class OutputBufferManager {

std::function<std::unique_ptr<OutputStreamListener>()> listenerFactory_{
nullptr};

inline static std::shared_ptr<OutputBufferManager> instance_;
inline static std::mutex initMutex_;
};
} // namespace facebook::velox::exec
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ BlockingReason Destination::advance(
if (serde_->kind() == VectorSerde::Kind::kPresto) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
OutputBufferManager::getInstanceRef()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
} else {
Expand Down Expand Up @@ -165,7 +165,7 @@ PartitionedOutput::PartitionedOutput(
planNode->inputType(),
planNode->outputType(),
planNode->outputType())),
bufferManager_(OutputBufferManager::getInstance()),
bufferManager_(OutputBufferManager::getInstanceRef()),
// NOTE: 'bufferReleaseFn_' holds a reference on the associated task to
// prevent it from deleting while there are output buffers being accessed
// out of the partitioned output buffer manager such as in Prestissimo,
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ Task::Task(
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
splitsStates_(buildSplitStates(planFragment_.planNode)),
bufferManager_(OutputBufferManager::getInstance()) {
bufferManager_(OutputBufferManager::getInstanceRef()) {
// NOTE: the executor must not be folly::InlineLikeExecutor for parallel
// execution.
if (mode_ == Task::ExecutionMode::kParallel) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ExchangeClientTest
if (!isRegisteredVectorSerde()) {
velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
}
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstanceRef();

common::testutil::TestValue::enable();
}
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F(

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down Expand Up @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) {

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
auto outputBufferManager = exec::OutputBufferManager::getInstanceRef();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/LimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) {
params.planNode = builder.partitionedOutput({}, 1).planNode();
auto cursor = TaskCursor::create(params);
ASSERT_FALSE(cursor->moveNext());
auto bufferManager = exec::OutputBufferManager::getInstance().lock();
auto bufferManager = exec::OutputBufferManager::getInstanceRef();
auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract<int>();
ASSERT_TRUE(bufferManager->getData(
cursor->task()->taskId(),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class MultiFragmentTest
std::vector<std::shared_ptr<TempFilePath>> filePaths_;
std::vector<RowVectorPtr> vectors_;
std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

TEST_P(MultiFragmentTest, aggregationSingleKey) {
Expand Down Expand Up @@ -2160,7 +2160,7 @@ class DataFetcher {
std::vector<std::vector<std::size_t>> packetPageSizes_;

std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

/// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test {

void SetUp() override {
pool_ = facebook::velox::memory::memoryManager()->addLeafPool();
bufferManager_ = OutputBufferManager::getInstance().lock();
bufferManager_ = OutputBufferManager::getInstanceRef();
if (!isRegisteredVectorSerde()) {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/PartitionedOutputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class PartitionedOutputTest

private:
const std::shared_ptr<OutputBufferManager> bufferManager_{
OutputBufferManager::getInstance().lock()};
OutputBufferManager::getInstanceRef()};
};

TEST_P(PartitionedOutputTest, flush) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) {
.project({"c0 % 10"})
.partitionedOutputBroadcast({})
.planFragment();
auto bufferManager = OutputBufferManager::getInstance().lock();
auto bufferManager = OutputBufferManager::getInstanceRef();
{
auto task = Task::create(
"t0",
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource {

promise_ = std::move(promise);

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
VELOX_CHECK(requestPending_);
auto requestedSequence = sequence_;
Expand Down Expand Up @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void pause() override {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::pause", nullptr);
auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager");
int64_t ackSequence;
{
Expand All @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
void close() override {
checkSetRequestPromise();

auto buffers = OutputBufferManager::getInstance().lock();
auto buffers = OutputBufferManager::getInstanceRef();
buffers->deleteResults(taskId_, destination_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
const core::PartitionedOutputNode* const originalNode_;
const VectorSerde::Kind serdeKind_;
const std::shared_ptr<exec::OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstanceRef()};
const std::unique_ptr<folly::Executor> executor_{
std::make_unique<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency(),
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest
}

const std::shared_ptr<OutputBufferManager> bufferManager_{
exec::OutputBufferManager::getInstance().lock()};
exec::OutputBufferManager::getInstanceRef()};
};

TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {
Expand Down
Loading