From d634e88094bdd1e3e217e75d57a1b74ba49d0495 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 2 Jan 2025 19:36:32 -0800 Subject: [PATCH] Avoid small batches in Exchange Summary: Prevent exchange client from unblocking to early. Unblocking to early impedes effectiveness of page merging. When the cost of creating a vector is high (for example for data sets with high number of columns) creating small pages can make queries significantly less efficient. For example it was observed that when network is congested and Exchange buffers are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421 Differential Revision: D67615570 --- velox/core/QueryConfig.h | 14 + velox/exec/ExchangeClient.h | 5 +- velox/exec/ExchangeQueue.cpp | 25 +- velox/exec/ExchangeQueue.h | 9 + velox/exec/MergeSource.cpp | 1 + velox/exec/Task.cpp | 1 + velox/exec/tests/ExchangeClientTest.cpp | 263 +++++++++++++++++-- velox/exec/tests/OutputBufferManagerTest.cpp | 4 +- 8 files changed, 302 insertions(+), 20 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 630bebdc8fc0..2f6c3e55cfd6 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -113,6 +113,15 @@ class QueryConfig { static constexpr const char* kMaxMergeExchangeBufferSize = "merge_exchange.max_buffer_size"; + /// The minimum number of bytes to accumulate in the ExchangeQueue + /// before unblocking a consumer. This is used to avoid creating tiny + /// batches which may have a negative impact on performance when the + /// cost of creating vectors is high (for example, when there are many + /// columns). To avoid latency degradation, the exchange client unblocks a + /// consumer when 1% of the data size observed so far is accumulated. + static constexpr const char* kMinExchangeOutputBatchBytes = + "min_exchange_output_batch_bytes"; + static constexpr const char* kMaxPartialAggregationMemory = "max_partial_aggregation_memory"; @@ -594,6 +603,11 @@ class QueryConfig { return get(kMaxMergeExchangeBufferSize, kDefault); } + uint64_t minExchangeOutputBatchBytes() const { + static constexpr uint64_t kDefault = 2UL << 20; + return get(kMinExchangeOutputBatchBytes, kDefault); + } + uint64_t preferredOutputBatchBytes() const { static constexpr uint64_t kDefault = 10UL << 20; return get(kPreferredOutputBatchBytes, kDefault); diff --git a/velox/exec/ExchangeClient.h b/velox/exec/ExchangeClient.h index 29bc3b884fdd..c25585236513 100644 --- a/velox/exec/ExchangeClient.h +++ b/velox/exec/ExchangeClient.h @@ -25,6 +25,8 @@ namespace facebook::velox::exec { class ExchangeClient : public std::enable_shared_from_this { public: static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB. + static constexpr int32_t kDefaultMinExchangeOutputBatchBytes{ + 2 << 20}; // 2 MB. static constexpr std::chrono::seconds kRequestDataSizesMaxWait{10}; static constexpr std::chrono::milliseconds kRequestDataMaxWait{100}; static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs"; @@ -33,6 +35,7 @@ class ExchangeClient : public std::enable_shared_from_this { std::string taskId, int destination, int64_t maxQueuedBytes, + uint64_t minOutputBatchBytes, memory::MemoryPool* pool, folly::Executor* executor) : taskId_{std::move(taskId)}, @@ -40,7 +43,7 @@ class ExchangeClient : public std::enable_shared_from_this { maxQueuedBytes_{maxQueuedBytes}, pool_(pool), executor_(executor), - queue_(std::make_shared()) { + queue_(std::make_shared(minOutputBatchBytes)) { VELOX_CHECK_NOT_NULL(pool_); VELOX_CHECK_NOT_NULL(executor_); // NOTE: the executor is used to run async response callback from the diff --git a/velox/exec/ExchangeQueue.cpp b/velox/exec/ExchangeQueue.cpp index f19745191d47..13b8d57ed0c5 100644 --- a/velox/exec/ExchangeQueue.cpp +++ b/velox/exec/ExchangeQueue.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/exec/ExchangeQueue.h" +#include namespace facebook::velox::exec { @@ -64,6 +65,15 @@ void ExchangeQueue::close() { clearPromises(promises); } +int64_t ExchangeQueue::getMinOutputBatchBytesLocked() const { + // always allow to unblock when at end + if (atEnd_) { + return 0; + } + // At most 1% of received bytes so far to minimize latency for small exchanges + return std::min(minOutputBatchBytes_, receivedBytes_ / 100); +} + void ExchangeQueue::enqueueLocked( std::unique_ptr&& page, std::vector& promises) { @@ -86,10 +96,13 @@ void ExchangeQueue::enqueueLocked( receivedBytes_ += page->size(); queue_.push_back(std::move(page)); - if (!promises_.empty()) { + const auto minBatchSize = getMinOutputBatchBytesLocked(); + while (!promises_.empty() && + (totalBytes_ - (inflightConsumers * minBatchSize)) >= minBatchSize) { // Resume one of the waiting drivers. promises.push_back(std::move(promises_.back())); promises_.pop_back(); + inflightConsumers++; } } @@ -105,6 +118,16 @@ std::vector> ExchangeQueue::dequeueLocked( *atEnd = false; + if (inflightConsumers > 0) { + inflightConsumers--; + } + + if (totalBytes_ < getMinOutputBatchBytesLocked()) { + promises_.emplace_back("ExchangeQueue::dequeue"); + *future = promises_.back().getSemiFuture(); + return {}; + } + std::vector> pages; uint32_t pageBytes = 0; for (;;) { diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h index 91e3a663aa06..b2c4c093eac3 100644 --- a/velox/exec/ExchangeQueue.h +++ b/velox/exec/ExchangeQueue.h @@ -81,6 +81,9 @@ class SerializedPage { /// for input. class ExchangeQueue { public: + explicit ExchangeQueue(uint64_t minOutputBatchBytes) + : minOutputBatchBytes_{minOutputBatchBytes} {} + ~ExchangeQueue() { clearAllPromises(); } @@ -185,6 +188,10 @@ class ExchangeQueue { } } + int64_t getMinOutputBatchBytesLocked() const; + + const uint64_t minOutputBatchBytes_; + int numCompleted_{0}; int numSources_{0}; bool noMoreSources_{false}; @@ -205,5 +212,7 @@ class ExchangeQueue { int64_t receivedBytes_{0}; // Maximum value of totalBytes_. int64_t peakBytes_{0}; + // Number of unblocked consumers expected to consume data shortly + int64_t inflightConsumers{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index 786bd40045cc..1bf433d18ac7 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -128,6 +128,7 @@ class MergeExchangeSource : public MergeSource { mergeExchange->taskId(), destination, maxQueuedBytes, + 1, pool, executor)) { client_->addRemoteTaskId(taskId); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 587c571b76f9..fb827ffc2db6 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2997,6 +2997,7 @@ void Task::createExchangeClientLocked( taskId_, destination_, queryCtx()->queryConfig().maxExchangeBufferSize(), + queryCtx()->queryConfig().minExchangeOutputBatchBytes(), addExchangeClientPool(planNodeId, pipelineId), queryCtx()->executor()); exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]); diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 2ffcd78cbd7d..db27ea401bf9 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -110,10 +110,10 @@ class ExchangeClientTest int32_t numPages) { std::vector> allPages; for (auto i = 0; i < numPages; ++i) { - bool atEnd; + bool atEnd{false}; ContinueFuture future; auto pages = client.next(1, &atEnd, &future); - if (pages.empty()) { + while (!atEnd && pages.empty()) { auto& exec = folly::QueuedImmediateExecutor::instance(); std::move(future).via(&exec).wait(); pages = client.next(1, &atEnd, &future); @@ -170,7 +170,7 @@ TEST_P(ExchangeClientTest, nonVeloxCreateExchangeSourceException) { }); auto client = std::make_shared( - "t", 1, ExchangeClient::kDefaultMaxQueuedBytes, pool(), executor()); + "t", 1, ExchangeClient::kDefaultMaxQueuedBytes, 1, pool(), executor()); VELOX_ASSERT_THROW( client->addRemoteTaskId("task.1.2.3"), @@ -199,7 +199,12 @@ TEST_P(ExchangeClientTest, stats) { task, core::PartitionedOutputNode::Kind::kPartitioned, 100, 16); auto client = std::make_shared( - "t", 17, ExchangeClient::kDefaultMaxQueuedBytes, pool(), executor()); + "t", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); client->addRemoteTaskId(taskId); // Enqueue 3 pages. @@ -239,7 +244,12 @@ TEST_P(ExchangeClientTest, flowControl) { // Set limit at 3.5 pages. auto client = std::make_shared( - "flow.control", 17, page->size() * 3.5, pool(), executor()); + "flow.control", + 17, + page->size() * 3.5, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); // Make 10 tasks. std::vector> tasks; @@ -277,10 +287,16 @@ TEST_P(ExchangeClientTest, flowControl) { TEST_P(ExchangeClientTest, largeSinglePage) { auto data = { makeRowVector({makeFlatVector(10000, folly::identity)}), - makeRowVector({makeFlatVector(1, folly::identity)}), + // second page is >1% of total payload size + makeRowVector({makeFlatVector(150, folly::identity)}), }; - auto client = - std::make_shared("test", 1, 1000, pool(), executor()); + auto client = std::make_shared( + "test", + 1, + 1000, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); auto task = makeTask("local://producer"); bufferManager_->initializeTask( task, core::PartitionedOutputNode::Kind::kArbitrary, 1, 1); @@ -290,18 +306,23 @@ TEST_P(ExchangeClientTest, largeSinglePage) { client->addRemoteTaskId(task->taskId()); auto pages = fetchPages(*client, 1); ASSERT_EQ(pages.size(), 1); - ASSERT_GT(pages[0]->size(), 1000); + ASSERT_GT(pages[0]->size(), 80000); pages = fetchPages(*client, 1); ASSERT_EQ(pages.size(), 1); - ASSERT_LT(pages[0]->size(), 1000); + ASSERT_LT(pages[0]->size(), 4000); task->requestCancel(); bufferManager_->removeTask(task->taskId()); client->close(); } TEST_P(ExchangeClientTest, multiPageFetch) { - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); { bool atEnd; @@ -353,8 +374,13 @@ TEST_P(ExchangeClientTest, multiPageFetch) { TEST_P(ExchangeClientTest, sourceTimeout) { constexpr int32_t kNumSources = 3; - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); bool atEnd; ContinueFuture future; @@ -434,8 +460,13 @@ TEST_P(ExchangeClientTest, sourceTimeout) { TEST_P(ExchangeClientTest, callNextAfterClose) { constexpr int32_t kNumSources = 3; common::testutil::TestValue::enable(); - auto client = - std::make_shared("test", 17, 1 << 20, pool(), executor()); + auto client = std::make_shared( + "test", + 17, + 1 << 20, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, + pool(), + executor()); bool atEnd; ContinueFuture future; @@ -497,6 +528,7 @@ TEST_P(ExchangeClientTest, acknowledge) { "local://test-acknowledge-client-task", 1, clientBufferSize, + ExchangeClient::kDefaultMinExchangeOutputBatchBytes, pool(), executor()); auto clientCloseGuard = folly::makeGuard([client]() { client->close(); }); @@ -615,6 +647,205 @@ TEST_P(ExchangeClientTest, acknowledge) { ASSERT_TRUE(atEnd); } +TEST_P(ExchangeClientTest, minOutputBatchBytesInitialBatches) { + // Initial batches should not block to avoid impacting latency of small + // exchanges + + const auto minOutputBatchBytes = 10000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + ContinueFuture future = ContinueFuture::makeEmpty(); + + // first page should unblock right away + auto pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(2000)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + // page larger than 1% of total should unblock right away + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(100)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + // small page (<1% of total) should block + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(15)); + ASSERT_FALSE(future.isReady()); + // one more small page should unblock now + enqueue(*queue, makePage(10)); + ASSERT_TRUE(future.isReady()); + pages = client->next(100, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + + pages = client->next(10'000, &atEnd, &future); + ASSERT_EQ(0, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + +TEST_P(ExchangeClientTest, minOutputBatchBytesSingleConsumer) { + const auto minOutputBatchBytes = 1000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + ContinueFuture future = ContinueFuture::makeEmpty(); + + // first page should unblock right away + auto pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes * 150)); + ASSERT_TRUE(future.isReady()); + pages = client->next(1, &atEnd, &future); + ASSERT_EQ(1, pages.size()); + + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 3)); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 3)); + ASSERT_TRUE(future.isReady()); + + pages = client->next(minOutputBatchBytes, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + + pages = client->next(1, &atEnd, &future); + ASSERT_FALSE(future.isReady()); + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(future.isReady()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + ASSERT_TRUE(future.isReady()); + + pages = client->next(10'000, &atEnd, &future); + ASSERT_EQ(2, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + +TEST_P(ExchangeClientTest, minOutputBatchBytesMultipleConsumers) { + const auto minOutputBatchBytes = 1000; + + auto client = std::make_shared( + "test", + 17, + ExchangeClient::kDefaultMaxQueuedBytes, + minOutputBatchBytes, + pool(), + executor()); + + const auto& queue = client->queue(); + addSources(*queue, 1); + + bool atEnd; + + ContinueFuture consumer1 = ContinueFuture::makeEmpty(); + ContinueFuture consumer2 = ContinueFuture::makeEmpty(); + ContinueFuture consumer3 = ContinueFuture::makeEmpty(); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + // first page should unblock right away + + enqueue(*queue, makePage(minOutputBatchBytes * 150)); + ASSERT_TRUE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + auto pages = client->next(1, &atEnd, &consumer1); + ASSERT_EQ(1, pages.size()); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + pages = client->next(1, &atEnd, &consumer3); + ASSERT_EQ(1, pages.size()); + pages = client->next(1, &atEnd, &consumer2); + ASSERT_EQ(1, pages.size()); + + client->next(1, &atEnd, &consumer1); + ASSERT_FALSE(consumer1.isReady()); + client->next(1, &atEnd, &consumer2); + ASSERT_FALSE(consumer2.isReady()); + client->next(1, &atEnd, &consumer3); + ASSERT_FALSE(consumer3.isReady()); + + enqueue(*queue, makePage(minOutputBatchBytes / 2)); + ASSERT_FALSE(consumer1.isReady()); + ASSERT_FALSE(consumer2.isReady()); + ASSERT_FALSE(consumer3.isReady()); + + // Signal no-more-data. + enqueue(*queue, nullptr); + ASSERT_TRUE(consumer1.isReady()); + ASSERT_TRUE(consumer2.isReady()); + ASSERT_TRUE(consumer3.isReady()); + + pages = client->next(10'000, &atEnd, &consumer1); + ASSERT_EQ(1, pages.size()); + ASSERT_TRUE(atEnd); + + client->close(); +} + VELOX_INSTANTIATE_TEST_SUITE_P( ExchangeClientTest, ExchangeClientTest, diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 3902ead5ffc4..b5ab64c1e037 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -1451,7 +1451,7 @@ TEST_P(OutputBufferManagerWithDifferentSerdeKindsTest, outOfOrderAcks) { } TEST_F(OutputBufferManagerTest, errorInQueue) { - auto queue = std::make_shared(); + auto queue = std::make_shared(1); queue->setError("Forced failure"); std::lock_guard l(queue->mutex()); @@ -1473,7 +1473,7 @@ TEST_P( auto page = std::make_unique(std::move(iobuf)); - auto queue = std::make_shared(); + auto queue = std::make_shared(1); std::vector promises; { std::lock_guard l(queue->mutex());