Commit

address comments
jinchengchenghh committed Dec 13, 2024
1 parent cdd52de commit 6c02824
Showing 6 changed files with 70 additions and 56 deletions.
32 changes: 32 additions & 0 deletions velox/exec/Exchange.cpp
@@ -19,6 +19,38 @@

namespace facebook::velox::exec {

namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
return options;
}
} // namespace

Exchange::Exchange(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType)
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
options_{getVectorSerdeOptions(serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
std::shuffle(std::begin(taskIds), std::end(taskIds), rng_);
for (const std::string& taskId : taskIds) {
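The new file-local getVectorSerdeOptions() helper centralizes what the constructor previously did inline in Exchange.h: choose the concrete options type from the serde kind, then stamp the OutputBufferManager's compression kind onto it. Because the helper returns a base-class pointer, Presto-specific fields stay reachable only through a downcast. A hedged sketch of that access pattern follows; the header paths and the tunePrestoOptions function are illustrative assumptions, not part of this commit.

```cpp
// Sketch only: reach Presto-specific knobs through the base pointer that
// getVectorSerdeOptions() returns. Assumes the Velox headers named below.
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::exec {

void tunePrestoOptions(VectorSerde::Kind kind, VectorSerde::Options& options) {
  if (kind != VectorSerde::Kind::kPresto) {
    return;  // the base Options only carries the shared fields
  }
  // Safe: getVectorSerdeOptions() constructs PrestoOptions whenever the
  // serde kind is kPresto.
  auto& prestoOptions =
      static_cast<serializer::presto::PrestoVectorSerde::PrestoOptions&>(
          options);
  // Presto-only settings (e.g. lossless timestamps) would be applied here.
  (void)prestoOptions;
}

} // namespace facebook::velox::exec
```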
46 changes: 14 additions & 32 deletions velox/exec/Exchange.h
@@ -42,27 +42,7 @@ class Exchange : public SourceOperator {
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType = "Exchange")
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
if (serdeKind_ == VectorSerde::Kind::kPresto) {
options_ = std::make_unique<
serializer::presto::PrestoVectorSerde::PrestoOptions>();
} else {
options_ = std::make_unique<VectorSerde::Options>();
}
options_->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}
const std::string& operatorType = "Exchange");

~Exchange() override {
close();
@@ -82,16 +62,16 @@
private:
// Invoked to create exchange client for remote tasks.
// The function shuffles the source task ids first to randomize the source
// tasks we fetch data from. This helps to avoid different tasks fetching from
// the same source task in a distributed system.
// tasks we fetch data from. This helps to avoid different tasks fetching
// from the same source task in a distributed system.
void addTaskIds(std::vector<std::string>& taskIds);

/// Fetches splits from the task until there are no more splits or task
/// returns a future that will be complete when more splits arrive. Adds
/// splits to exchangeClient_. Returns true if received a future from the task
/// and sets the 'future' parameter. Returns false if fetched all splits or if
/// this operator is not the first operator in the pipeline and therefore is
/// not responsible for fetching splits and adding them to the
/// splits to exchangeClient_. Returns true if received a future from the
/// task and sets the 'future' parameter. Returns false if fetched all
/// splits or if this operator is not the first operator in the pipeline and
/// therefore is not responsible for fetching splits and adding them to the
/// exchangeClient_.
bool getSplits(ContinueFuture* future);

@@ -103,16 +83,19 @@

const VectorSerde::Kind serdeKind_;

/// True if this operator is responsible for fetching splits from the Task and
/// passing these to ExchangeClient.
const std::unique_ptr<VectorSerde::Options> options_;

/// True if this operator is responsible for fetching splits from the Task
/// and passing these to ExchangeClient.
const bool processSplits_;

bool noMoreSplits_ = false;

std::shared_ptr<ExchangeClient> exchangeClient_;

/// A future received from Task::getSplitOrFuture(). It will be complete when
/// there are more splits available or no-more-splits signal has arrived.
/// A future received from Task::getSplitOrFuture(). It will be complete
/// when there are more splits available or no-more-splits signal has
/// arrived.
ContinueFuture splitFuture_{ContinueFuture::makeEmpty()};

// Reusable result vector.
@@ -121,7 +104,6 @@
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
std::unique_ptr<VectorSerde::Options> options_;
};

} // namespace facebook::velox::exec
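Moving the constructor body into Exchange.cpp also lets options_ become a const member built from serdeKind_ in the initializer list, which is why it is now declared immediately after serdeKind_ and the old mutable options_ at the bottom of the class is gone: C++ initializes members in declaration order, not initializer-list order. A self-contained toy illustrating that constraint (placeholder types, not the Velox classes):

```cpp
// Toy illustration: options_ is initialized from serdeKind_, so it must be
// declared after serdeKind_ or it would read an uninitialized value.
#include <memory>

enum class Kind { kPresto, kCompactRow };
struct Options {
  int compression = 0;
};

std::unique_ptr<Options> makeOptions(Kind kind) {
  auto options = std::make_unique<Options>();
  options->compression = (kind == Kind::kPresto) ? 1 : 0;
  return options;
}

class ExchangeLike {
 public:
  explicit ExchangeLike(Kind kind)
      : serdeKind_(kind), options_(makeOptions(serdeKind_)) {}

 private:
  const Kind serdeKind_;                    // must precede options_
  const std::unique_ptr<Options> options_;  // built from serdeKind_
};
```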
9 changes: 6 additions & 3 deletions velox/exec/PartitionedOutput.cpp
@@ -38,13 +38,14 @@ Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
options_(getVectorSerdeOptions(serde_->kind())),
options_(options),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
@@ -88,7 +89,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_, options_.get());
current_->createStreamTree(rowType, rowsInCurrent_, options_);
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
@@ -198,7 +199,8 @@ PartitionedOutput::PartitionedOutput(
->queryConfig()
.maxPartitionedOutputBufferSize()),
eagerFlush_(eagerFlush),
serde_(getNamedVectorSerde(planNode->serdeKind())) {
serde_(getNamedVectorSerde(planNode->serdeKind())),
options_(getVectorSerdeOptions(planNode->serdeKind())) {
if (!planNode->isPartitioned()) {
VELOX_USER_CHECK_EQ(numDestinations_, 1);
}
@@ -254,6 +256,7 @@ void PartitionedOutput::initializeDestinations() {
taskId,
i,
serde_,
options_.get(),
pool(),
eagerFlush_,
[&](uint64_t bytes, uint64_t rows) {
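The net effect of the PartitionedOutput.cpp change: each Destination used to build its own serde options, while now PartitionedOutput constructs a single options object and hands every Destination a raw pointer, so all destinations of the operator share one configuration and the allocation happens once. A minimal, self-contained sketch of that ownership shape (toy types, not the Velox classes):

```cpp
// Toy sketch: one operator-owned Options object observed by many
// destinations through raw pointers; the owner outlives the observers.
#include <memory>
#include <vector>

struct Options {
  int compression = 0;
};

class DestinationLike {
 public:
  explicit DestinationLike(const Options* options) : options_(options) {}

  int compression() const {
    return options_->compression;
  }

 private:
  const Options* const options_;  // borrowed, not owned
};

class PartitionedOutputLike {
 public:
  PartitionedOutputLike(int numDestinations, int compression)
      : options_(std::make_unique<Options>(Options{compression})) {
    for (int i = 0; i < numDestinations; ++i) {
      destinations_.emplace_back(options_.get());
    }
  }

 private:
  const std::unique_ptr<Options> options_;     // single owner
  std::vector<DestinationLike> destinations_;  // observers
};
```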
4 changes: 3 additions & 1 deletion velox/exec/PartitionedOutput.h
@@ -33,6 +33,7 @@ class Destination {
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued);
@@ -104,7 +105,7 @@
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;
VectorSerde::Options* const options_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
@@ -219,6 +220,7 @@ class PartitionedOutput : public Operator {
const int64_t maxBufferedBytes_;
const bool eagerFlush_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
2 changes: 1 addition & 1 deletion velox/exec/SpillFile.cpp
@@ -178,7 +178,7 @@ uint64_t SpillWriter::write(
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
true /*nullsFirst*/};
/*nullsFirst=*/true};
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
batch_->createStreamTree(
std::static_pointer_cast<const RowType>(rows->type()),
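The SpillFile.cpp change is purely stylistic: the boolean argument comment moves from true /*nullsFirst*/ to /*nullsFirst=*/true. The leading /*name=*/value form is the one that tooling such as clang-tidy's bugprone-argument-comment check can match against the parameter name; the trailing form is not checked. A small, self-contained illustration with a made-up function:

```cpp
// Toy example of the two comment placements; only the first is verifiable
// by argument-comment tooling.
#include <algorithm>
#include <vector>

std::vector<int> sortKeys(std::vector<int> keys, bool descending) {
  std::sort(keys.begin(), keys.end());
  if (descending) {
    std::reverse(keys.begin(), keys.end());
  }
  return keys;
}

int main() {
  sortKeys({3, 1, 2}, /*descending=*/true);  // checkable argument comment
  sortKeys({3, 1, 2}, true /*descending*/);  // older trailing-comment style
  return 0;
}
```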
33 changes: 14 additions & 19 deletions velox/serializers/CompactRowSerializer.cpp
@@ -23,24 +23,14 @@ namespace facebook::velox::serializer {
namespace {
using TRowSize = uint32_t;

void writeInt32(OutputStream* out, int32_t value) {
out->write(reinterpret_cast<char*>(&value), sizeof(value));
}

void writeBool(OutputStream* out, bool value) {
char writeValue = value ? 1 : 0;
out->write(reinterpret_cast<char*>(&writeValue), sizeof(char));
}

VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
if (options == nullptr) {
return VectorSerde::Options();
}
return *options;
}

std::unique_ptr<folly::IOBuf> buffersToIOBuf(
const std::vector<BufferPtr>& buffers) {
std::unique_ptr<folly::IOBuf> toIOBuf(const std::vector<BufferPtr>& buffers) {
std::unique_ptr<folly::IOBuf> iobuf;
for (const auto& buffer : buffers) {
auto newBuf =
@@ -71,6 +61,13 @@ struct CompactRowHeader {
return header;
}

void write(OutputStream* out) {
out->write(reinterpret_cast<char*>(&uncompressedSize), sizeof(int32_t));
out->write(reinterpret_cast<char*>(&compressedSize), sizeof(int32_t));
char writeValue = compressed ? 1 : 0;
out->write(reinterpret_cast<char*>(&writeValue), sizeof(char));
}

static size_t size() {
return sizeof(int32_t) * 2 + sizeof(char);
}
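Folding the old writeInt32/writeBool helpers into CompactRowHeader::write() keeps the wire layout next to its definition: two int32 fields in host byte order followed by a one-byte flag, matching size() = 2 * 4 + 1 = 9 bytes. A self-contained round-trip sketch of that layout, using standard streams in place of Velox's OutputStream (toy code, not the serializer itself):

```cpp
// Toy round trip of the 9-byte header layout: uncompressedSize (int32),
// compressedSize (int32), compressed flag (1 byte), all in host byte order.
#include <cassert>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>

struct Header {
  int32_t uncompressedSize;
  int32_t compressedSize;
  bool compressed;

  void write(std::ostream& out) const {
    out.write(
        reinterpret_cast<const char*>(&uncompressedSize), sizeof(int32_t));
    out.write(reinterpret_cast<const char*>(&compressedSize), sizeof(int32_t));
    const char flag = compressed ? 1 : 0;
    out.write(&flag, sizeof(char));
  }

  static Header read(std::istream& in) {
    Header header;
    in.read(reinterpret_cast<char*>(&header.uncompressedSize), sizeof(int32_t));
    in.read(reinterpret_cast<char*>(&header.compressedSize), sizeof(int32_t));
    char flag = 0;
    in.read(&flag, sizeof(char));
    header.compressed = flag != 0;
    return header;
  }
};

int main() {
  std::stringstream buffer;
  Header{1024, 512, true}.write(buffer);
  const Header roundTripped = Header::read(buffer);
  assert(roundTripped.uncompressedSize == 1024);
  assert(roundTripped.compressedSize == 512);
  assert(roundTripped.compressed);
  return 0;
}
```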
@@ -210,9 +207,9 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
++stats_.numCompressionSkipped;
} else {
// Compress the buffer if satisfied condition.
const auto toCompress = buffersToIOBuf(buffers_);
const auto toCompress = toIOBuf(buffers_);
const auto compressedBuffer = codec_->compress(toCompress.get());
const auto compressedSize = compressedBuffer->length();
const int32_t compressedSize = compressedBuffer->length();
stats_.compressionInputBytes += size;
stats_.compressedBytes += compressedSize;
if (compressedSize > opts_.minCompressionRatio * size) {
@@ -222,9 +219,8 @@
flushUncompressed(size, stream);
} else {
// Do the compression.
writeInt32(stream, size);
writeInt32(stream, compressedSize);
writeBool(stream, true);
CompactRowHeader header = {size, compressedSize, true};
header.write(stream);
for (auto range : *compressedBuffer) {
stream->write(
reinterpret_cast<const char*>(range.data()), range.size());
@@ -265,9 +261,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
}

void flushUncompressed(int32_t size, OutputStream* stream) {
writeInt32(stream, size);
writeInt32(stream, size);
writeBool(stream, false);
CompactRowHeader header = {size, size, false};
header.write(stream);
for (const auto& buffer : buffers_) {
stream->write(buffer->as<char>(), buffer->size());
}
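One detail worth spelling out from the flush path above: compression is kept only when it pays off, since a compressedSize greater than opts_.minCompressionRatio * size falls back to flushUncompressed(). With a ratio of 0.8, for example, a 1000-byte serialized buffer ships compressed only if the codec gets it to 800 bytes or fewer. A tiny sketch of that decision (keepCompressed is a made-up name, not a Velox function):

```cpp
// Mirrors the acceptance branch in flush(): true means keep the compressed
// bytes, false means write the uncompressed buffers instead.
#include <cstdint>

bool keepCompressed(
    int32_t uncompressedSize,
    int32_t compressedSize,
    float minCompressionRatio) {
  return !(compressedSize > minCompressionRatio * uncompressedSize);
}

int main() {
  const bool kept = keepCompressed(1000, 780, 0.8f);       // ratio 0.78: keep
  const bool flushed = !keepCompressed(1000, 900, 0.8f);   // ratio 0.90: flush
  return (kept && flushed) ? 0 : 1;
}
```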
