From 9e32bd04fdf33f30191bd67a656314143f06cd88 Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Tue, 5 Nov 2024 20:57:18 +0530 Subject: [PATCH] Refactor TopNRowNumber::getOutputFromMemory --- velox/exec/TopNRowNumber.cpp | 83 +++++++++++++++--------------------- velox/exec/TopNRowNumber.h | 28 ++++++------ 2 files changed, 49 insertions(+), 62 deletions(-) diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 8d22ac3bb0b7..932128c48d0c 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -321,14 +321,14 @@ void TopNRowNumber::updateEstimatedOutputRowSize() { TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { if (!table_) { - if (!currentPartition_) { - currentPartition_ = 0; + if (!currentPartitionNumber_) { + currentPartitionNumber_ = 0; return singlePartition_.get(); } return nullptr; } - if (!currentPartition_) { + if (!currentPartitionNumber_) { numPartitions_ = table_->listAllRows( &partitionIt_, partitions_.size(), @@ -339,38 +339,28 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { return nullptr; } - currentPartition_ = 0; + currentPartitionNumber_ = 0; } else { - ++currentPartition_.value(); - if (currentPartition_ >= numPartitions_) { - currentPartition_.reset(); + ++currentPartitionNumber_.value(); + if (currentPartitionNumber_ >= numPartitions_) { + currentPartitionNumber_.reset(); return nextPartition(); } } - return &currentPartition(); -} - -TopNRowNumber::TopRows& TopNRowNumber::currentPartition() { - VELOX_CHECK(currentPartition_.has_value()); - - if (!table_) { - return *singlePartition_; - } - - return partitionAt(partitions_[currentPartition_.value()]); + return &partitionAt(partitions_[currentPartitionNumber_.value()]); } void TopNRowNumber::appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers) { - // Append 'size' partition rows in reverse order starting from 'start' row.
- auto rowNumber = partition.rows.size() - start; - for (auto i = 0; i < size; ++i) { - const auto index = outputOffset + size - i - 1; + // The partition.rows priority queue pops rows in order of reverse + // row numbers. + auto rowNumber = partition.rows.size(); + for (auto i = 0; i < numRows; ++i) { + auto index = outputOffset + i; if (rowNumbers) { rowNumbers->set(index, rowNumber--); } @@ -433,37 +423,34 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { } vector_size_t offset = 0; - if (remainingRowsInPartition_ > 0) { - auto& partition = currentPartition(); - auto start = partition.rows.size() - remainingRowsInPartition_; - const auto numRows = - std::min(outputBatchSize_, remainingRowsInPartition_); - appendPartitionRows(partition, start, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ -= numRows; - } - + // Continue to output as many remaining partitions as possible. while (offset < outputBatchSize_) { - auto* partition = nextPartition(); - if (!partition) { - break; + // No previous partition to output (since this is the first partition). + if (!currentPartition_) { + currentPartition_ = nextPartition(); + if (!currentPartition_) { + break; + } } - auto numRows = partition->rows.size(); - if (offset + numRows > outputBatchSize_) { - remainingRowsInPartition_ = offset + numRows - outputBatchSize_; - - // Add a subset of partition rows. - numRows -= remainingRowsInPartition_; - appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; + auto numOutputRowsLeft = outputBatchSize_ - offset; + if (currentPartition_->rows.size() > numOutputRowsLeft) { + // Only a partial partition can be output in this getOutput() call. + // Output as many rows as possible. + appendPartitionRows( + *currentPartition_, numOutputRowsLeft, offset, rowNumbers); + offset += numOutputRowsLeft; break; } // Add all partition rows. 
- appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ = 0; + auto numPartitionRows = currentPartition_->rows.size(); + appendPartitionRows( + *currentPartition_, numPartitionRows, offset, rowNumbers); + offset += numPartitionRows; + + // Move to the next partition. + currentPartition_ = nextPartition(); } if (offset == 0) { diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index a9163ab0086d..883d815b72ba 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -103,15 +103,11 @@ class TopNRowNumber : public Operator { // partitions left. TopRows* nextPartition(); - // Returns partition that was partially added to the previous output batch. - TopRows& currentPartition(); - - // Appends partition rows to outputRows_ and optionally populates row - // numbers. + // Appends numRows of partition rows to outputRows_. Note: partition.rows + // pops rows in reverse row number order. void appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers); @@ -219,29 +215,33 @@ class TopNRowNumber : public Operator { // Maximum number of rows in the output batch. vector_size_t outputBatchSize_; + // The below variables are used when outputting from memory. + // Vector of pointers to individual rows in the RowContainer for the current + // output block. std::vector outputRows_; - // Number of partitions to fetch from a HashTable in a single listAllRows // call. static const size_t kPartitionBatchSize = 100; BaseHashTable::RowsIterator partitionIt_; - std::vector partitions_{kPartitionBatchSize}; - size_t numPartitions_{0}; - std::optional currentPartition_; - - vector_size_t remainingRowsInPartition_{0}; + // This is the index of the current partition within partitions_ which is + // obtained from the HashTable iterator.
+ std::optional currentPartitionNumber_; + // This is the currentPartition being output. It is possible that the + // partition is output across multiple output blocks. + TopNRowNumber::TopRows* currentPartition_{nullptr}; + // The below variables are used when outputting from the spiller. // Spiller for contents of the 'data_'. std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; - // Row number for the first row in the next output batch. + // Row number for the first row in the next output batch from the spiller. int32_t nextRowNumber_{0}; }; } // namespace facebook::velox::exec