diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index f0fd1d964a3e..8dcf41ccc7a8 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -19,6 +19,38 @@ namespace facebook::velox::exec { +namespace { +std::unique_ptr getVectorSerdeOptions( + VectorSerde::Kind kind) { + std::unique_ptr options = + kind == VectorSerde::Kind::kPresto + ? std::make_unique() + : std::make_unique(); + options->compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + return options; +} +} // namespace + +Exchange::Exchange( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& exchangeNode, + std::shared_ptr exchangeClient, + const std::string& operatorType) + : SourceOperator( + driverCtx, + exchangeNode->outputType(), + operatorId, + exchangeNode->id(), + operatorType), + preferredOutputBatchBytes_{ + driverCtx->queryConfig().preferredOutputBatchBytes()}, + serdeKind_(exchangeNode->serdeKind()), + options_{getVectorSerdeOptions(serdeKind_)}, + processSplits_{operatorCtx_->driverCtx()->driverId == 0}, + exchangeClient_{std::move(exchangeClient)} {} + void Exchange::addTaskIds(std::vector& taskIds) { std::shuffle(std::begin(taskIds), std::end(taskIds), rng_); for (const std::string& taskId : taskIds) { diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index cab700f2076d..45cdce8ceb59 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -42,27 +42,7 @@ class Exchange : public SourceOperator { DriverCtx* driverCtx, const std::shared_ptr& exchangeNode, std::shared_ptr exchangeClient, - const std::string& operatorType = "Exchange") - : SourceOperator( - driverCtx, - exchangeNode->outputType(), - operatorId, - exchangeNode->id(), - operatorType), - preferredOutputBatchBytes_{ - driverCtx->queryConfig().preferredOutputBatchBytes()}, - serdeKind_(exchangeNode->serdeKind()), - processSplits_{operatorCtx_->driverCtx()->driverId == 0}, - exchangeClient_{std::move(exchangeClient)} { - if (serdeKind_ == VectorSerde::Kind::kPresto) { - options_ = std::make_unique< - serializer::presto::PrestoVectorSerde::PrestoOptions>(); - } else { - options_ = std::make_unique(); - } - options_->compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); - } + const std::string& operatorType = "Exchange"); ~Exchange() override { close(); @@ -82,16 +62,16 @@ class Exchange : public SourceOperator { private: // Invoked to create exchange client for remote tasks. // The function shuffles the source task ids first to randomize the source - // tasks we fetch data from. This helps to avoid different tasks fetching from - // the same source task in a distributed system. + // tasks we fetch data from. This helps to avoid different tasks fetching + // from the same source task in a distributed system. void addTaskIds(std::vector& taskIds); /// Fetches splits from the task until there are no more splits or task /// returns a future that will be complete when more splits arrive. Adds - /// splits to exchangeClient_. Returns true if received a future from the task - /// and sets the 'future' parameter. Returns false if fetched all splits or if - /// this operator is not the first operator in the pipeline and therefore is - /// not responsible for fetching splits and adding them to the + /// splits to exchangeClient_. Returns true if received a future from the + /// task and sets the 'future' parameter. Returns false if fetched all + /// splits or if this operator is not the first operator in the pipeline and + /// therefore is not responsible for fetching splits and adding them to the /// exchangeClient_. bool getSplits(ContinueFuture* future); @@ -103,16 +83,19 @@ class Exchange : public SourceOperator { const VectorSerde::Kind serdeKind_; - /// True if this operator is responsible for fetching splits from the Task and - /// passing these to ExchangeClient. + const std::unique_ptr options_; + + /// True if this operator is responsible for fetching splits from the Task + /// and passing these to ExchangeClient. const bool processSplits_; bool noMoreSplits_ = false; std::shared_ptr exchangeClient_; - /// A future received from Task::getSplitOrFuture(). It will be complete when - /// there are more splits available or no-more-splits signal has arrived. + /// A future received from Task::getSplitOrFuture(). It will be complete + /// when there are more splits available or no-more-splits signal has + /// arrived. ContinueFuture splitFuture_{ContinueFuture::makeEmpty()}; // Reusable result vector. @@ -121,7 +104,6 @@ class Exchange : public SourceOperator { std::vector> currentPages_; bool atEnd_{false}; std::default_random_engine rng_{std::random_device{}()}; - std::unique_ptr options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 59794de58289..88663c9ad1e5 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -38,13 +38,14 @@ Destination::Destination( const std::string& taskId, int destination, VectorSerde* serde, + VectorSerde::Options* options, memory::MemoryPool* pool, bool eagerFlush, std::function recordEnqueued) : taskId_(taskId), destination_(destination), serde_(serde), - options_(getVectorSerdeOptions(serde_->kind())), + options_(options), pool_(pool), eagerFlush_(eagerFlush), recordEnqueued_(std::move(recordEnqueued)) { @@ -88,7 +89,7 @@ BlockingReason Destination::advance( if (current_ == nullptr) { current_ = std::make_unique(pool_, serde_); const auto rowType = asRowType(output->type()); - current_->createStreamTree(rowType, rowsInCurrent_, options_.get()); + current_->createStreamTree(rowType, rowsInCurrent_, options_); } const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow); @@ -198,7 +199,8 @@ PartitionedOutput::PartitionedOutput( ->queryConfig() .maxPartitionedOutputBufferSize()), eagerFlush_(eagerFlush), - serde_(getNamedVectorSerde(planNode->serdeKind())) { + serde_(getNamedVectorSerde(planNode->serdeKind())), + options_(getVectorSerdeOptions(planNode->serdeKind())) { if (!planNode->isPartitioned()) { VELOX_USER_CHECK_EQ(numDestinations_, 1); } @@ -254,6 +256,7 @@ void PartitionedOutput::initializeDestinations() { taskId, i, serde_, + options_.get(), pool(), eagerFlush_, [&](uint64_t bytes, uint64_t rows) { diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 1b3f6ba96bd4..699af6d65590 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -33,6 +33,7 @@ class Destination { const std::string& taskId, int destination, VectorSerde* serde, + VectorSerde::Options* options, memory::MemoryPool* pool, bool eagerFlush, std::function recordEnqueued); @@ -104,7 +105,7 @@ class Destination { const std::string taskId_; const int destination_; VectorSerde* const serde_; - const std::unique_ptr options_; + VectorSerde::Options* const options_; memory::MemoryPool* const pool_; const bool eagerFlush_; const std::function recordEnqueued_; @@ -219,6 +220,7 @@ class PartitionedOutput : public Operator { const int64_t maxBufferedBytes_; const bool eagerFlush_; VectorSerde* const serde_; + const std::unique_ptr options_; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index f2d3b07db759..35786be53032 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -178,7 +178,7 @@ uint64_t SpillWriter::write( kDefaultUseLosslessTimestamp, compressionKind_, 0.8, - true /*nullsFirst*/}; + /*nullsFirst=*/true}; batch_ = std::make_unique(pool_, serde_); batch_->createStreamTree( std::static_pointer_cast(rows->type()), diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp index 0730ca546e33..d75fdb068573 100644 --- a/velox/serializers/CompactRowSerializer.cpp +++ b/velox/serializers/CompactRowSerializer.cpp @@ -23,15 +23,6 @@ namespace facebook::velox::serializer { namespace { using TRowSize = uint32_t; -void writeInt32(OutputStream* out, int32_t value) { - out->write(reinterpret_cast(&value), sizeof(value)); -} - -void writeBool(OutputStream* out, bool value) { - char writeValue = value ? 1 : 0; - out->write(reinterpret_cast(&writeValue), sizeof(char)); -} - VectorSerde::Options toValidOptions(const VectorSerde::Options* options) { if (options == nullptr) { return VectorSerde::Options(); @@ -39,8 +30,7 @@ VectorSerde::Options toValidOptions(const VectorSerde::Options* options) { return *options; } -std::unique_ptr buffersToIOBuf( - const std::vector& buffers) { +std::unique_ptr toIOBuf(const std::vector& buffers) { std::unique_ptr iobuf; for (const auto& buffer : buffers) { auto newBuf = @@ -71,6 +61,13 @@ struct CompactRowHeader { return header; } + void write(OutputStream* out) { + out->write(reinterpret_cast(&uncompressedSize), sizeof(int32_t)); + out->write(reinterpret_cast(&compressedSize), sizeof(int32_t)); + char writeValue = compressed ? 1 : 0; + out->write(reinterpret_cast(&writeValue), sizeof(char)); + } + static size_t size() { return sizeof(int32_t) * 2 + sizeof(char); } @@ -210,9 +207,9 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer { ++stats_.numCompressionSkipped; } else { // Compress the buffer if satisfied condition. - const auto toCompress = buffersToIOBuf(buffers_); + const auto toCompress = toIOBuf(buffers_); const auto compressedBuffer = codec_->compress(toCompress.get()); - const auto compressedSize = compressedBuffer->length(); + const int32_t compressedSize = compressedBuffer->length(); stats_.compressionInputBytes += size; stats_.compressedBytes += compressedSize; if (compressedSize > opts_.minCompressionRatio * size) { @@ -222,9 +219,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer { flushUncompressed(size, stream); } else { // Do the compression. - writeInt32(stream, size); - writeInt32(stream, compressedSize); - writeBool(stream, true); + CompactRowHeader header = {size, compressedSize, true}; + header.write(stream); for (auto range : *compressedBuffer) { stream->write( reinterpret_cast(range.data()), range.size()); @@ -265,9 +261,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer { } void flushUncompressed(int32_t size, OutputStream* stream) { - writeInt32(stream, size); - writeInt32(stream, size); - writeBool(stream, false); + CompactRowHeader header = {size, size, false}; + header.write(stream); for (const auto& buffer : buffers_) { stream->write(buffer->as(), buffer->size()); }