Skip to content

Commit

Permalink
[VL] Add reader process to shuffle benchmark (#6682)
Browse files: browse the repository at this point in the history
  • Loading branch information
marin-ma authored Aug 6, 2024
1 parent 5cedfd1 commit d746f0f
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 91 deletions.
18 changes: 15 additions & 3 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -609,11 +609,23 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<

if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill(true));
lastEvictPid_ = -1;
}
lastEvictPid_ = partitionId;

RETURN_NOT_OK(requestSpill(stop));
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));

if (!stop) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/Payload.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -293,7 +293,6 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> BlockPayload::readBufferAt(uint32_

arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> BlockPayload::deserialize(
arrow::io::InputStream* inputStream,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
arrow::MemoryPool* pool,
uint32_t& numRows,
Expand Down
1 change: 0 additions & 1 deletion cpp/core/shuffle/Payload.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -88,7 +88,6 @@ class BlockPayload final : public Payload {

static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
arrow::io::InputStream* inputStream,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
arrow::MemoryPool* pool,
uint32_t& numRows,
Expand Down
Loading

0 comments on commit d746f0f

Please sign in to comment.