Skip to content

Commit

Permalink
Avoid small batches in Exchange
Browse files Browse the repository at this point in the history
Summary:
Prevent exchange client from unblocking to early. Unblocking to early impedes
effectiveness of page merging. When the cost of creating a vector is high (for
example for data sets with high number of columns) creating small pages can
make queries significantly less efficient.

For example it was observed that when network is congested and Exchange buffers
are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421

Differential Revision: D67615570
  • Loading branch information
arhimondr authored and facebook-github-bot committed Jan 3, 2025
1 parent 9da5fa7 commit d634e88
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 20 deletions.
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ class QueryConfig {
static constexpr const char* kMaxMergeExchangeBufferSize =
"merge_exchange.max_buffer_size";

/// The minimum number of bytes to accumulate in the ExchangeQueue
/// before unblocking a consumer. This is used to avoid creating tiny
/// batches which may have a negative impact on performance when the
/// cost of creating vectors is high (for example, when there are many
/// columns). To avoid latency degradation, the exchange client unblocks a
/// consumer when 1% of the data size observed so far is accumulated.
static constexpr const char* kMinExchangeOutputBatchBytes =
"min_exchange_output_batch_bytes";

static constexpr const char* kMaxPartialAggregationMemory =
"max_partial_aggregation_memory";

Expand Down Expand Up @@ -594,6 +603,11 @@ class QueryConfig {
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
}

uint64_t minExchangeOutputBatchBytes() const {
static constexpr uint64_t kDefault = 2UL << 20;
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
}

uint64_t preferredOutputBatchBytes() const {
static constexpr uint64_t kDefault = 10UL << 20;
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace facebook::velox::exec {
class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
public:
static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB.
static constexpr int32_t kDefaultMinExchangeOutputBatchBytes{
2 << 20}; // 2 MB.
static constexpr std::chrono::seconds kRequestDataSizesMaxWait{10};
static constexpr std::chrono::milliseconds kRequestDataMaxWait{100};
static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs";
Expand All @@ -33,14 +35,15 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
std::string taskId,
int destination,
int64_t maxQueuedBytes,
uint64_t minOutputBatchBytes,
memory::MemoryPool* pool,
folly::Executor* executor)
: taskId_{std::move(taskId)},
destination_(destination),
maxQueuedBytes_{maxQueuedBytes},
pool_(pool),
executor_(executor),
queue_(std::make_shared<ExchangeQueue>()) {
queue_(std::make_shared<ExchangeQueue>(minOutputBatchBytes)) {
VELOX_CHECK_NOT_NULL(pool_);
VELOX_CHECK_NOT_NULL(executor_);
// NOTE: the executor is used to run async response callback from the
Expand Down
25 changes: 24 additions & 1 deletion velox/exec/ExchangeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/ExchangeQueue.h"
#include <algorithm>

namespace facebook::velox::exec {

Expand Down Expand Up @@ -64,6 +65,15 @@ void ExchangeQueue::close() {
clearPromises(promises);
}

int64_t ExchangeQueue::getMinOutputBatchBytesLocked() const {
// always allow to unblock when at end
if (atEnd_) {
return 0;
}
// At most 1% of received bytes so far to minimize latency for small exchanges
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
}

void ExchangeQueue::enqueueLocked(
std::unique_ptr<SerializedPage>&& page,
std::vector<ContinuePromise>& promises) {
Expand All @@ -86,10 +96,13 @@ void ExchangeQueue::enqueueLocked(
receivedBytes_ += page->size();

queue_.push_back(std::move(page));
if (!promises_.empty()) {
const auto minBatchSize = getMinOutputBatchBytesLocked();
while (!promises_.empty() &&
(totalBytes_ - (inflightConsumers * minBatchSize)) >= minBatchSize) {
// Resume one of the waiting drivers.
promises.push_back(std::move(promises_.back()));
promises_.pop_back();
inflightConsumers++;
}
}

Expand All @@ -105,6 +118,16 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(

*atEnd = false;

if (inflightConsumers > 0) {
inflightConsumers--;
}

if (totalBytes_ < getMinOutputBatchBytesLocked()) {
promises_.emplace_back("ExchangeQueue::dequeue");
*future = promises_.back().getSemiFuture();
return {};
}

std::vector<std::unique_ptr<SerializedPage>> pages;
uint32_t pageBytes = 0;
for (;;) {
Expand Down
9 changes: 9 additions & 0 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class SerializedPage {
/// for input.
class ExchangeQueue {
public:
explicit ExchangeQueue(uint64_t minOutputBatchBytes)
: minOutputBatchBytes_{minOutputBatchBytes} {}

~ExchangeQueue() {
clearAllPromises();
}
Expand Down Expand Up @@ -185,6 +188,10 @@ class ExchangeQueue {
}
}

int64_t getMinOutputBatchBytesLocked() const;

const uint64_t minOutputBatchBytes_;

int numCompleted_{0};
int numSources_{0};
bool noMoreSources_{false};
Expand All @@ -205,5 +212,7 @@ class ExchangeQueue {
int64_t receivedBytes_{0};
// Maximum value of totalBytes_.
int64_t peakBytes_{0};
// Number of unblocked consumers expected to consume data shortly
int64_t inflightConsumers{0};
};
} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class MergeExchangeSource : public MergeSource {
mergeExchange->taskId(),
destination,
maxQueuedBytes,
1,
pool,
executor)) {
client_->addRemoteTaskId(taskId);
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2997,6 +2997,7 @@ void Task::createExchangeClientLocked(
taskId_,
destination_,
queryCtx()->queryConfig().maxExchangeBufferSize(),
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
addExchangeClientPool(planNodeId, pipelineId),
queryCtx()->executor());
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);
Expand Down
Loading

0 comments on commit d634e88

Please sign in to comment.