Skip to content

Commit

Permalink
update metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 24, 2024
1 parent d892f09 commit 4655d31
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
12 changes: 8 additions & 4 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
auto payload = prepareToEvict(begin, end);
RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
}

std::unique_ptr<InMemoryPayload> VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) {
ScopedTimer timer(&sortTime_);
// Serialize [begin, end)
uint32_t numRows = end - begin;
uint64_t rawSize = numRows * sizeof(RowSizeType);
Expand Down Expand Up @@ -264,10 +271,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
buffers.push_back(std::make_shared<arrow::Buffer>(rawData, rawSize));

auto payload = std::make_unique<InMemoryPayload>(numRows, nullptr, std::move(buffers));
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
return std::make_unique<InMemoryPayload>(numRows, nullptr, std::move(buffers));
}

uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);

std::unique_ptr<InMemoryPayload> prepareToEvict(size_t begin, size_t end);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);

void acquireNewBuffer(int64_t memLimit, uint64_t minSizeRequired);
Expand Down

0 comments on commit 4655d31

Please sign in to comment.