Skip to content

Commit

Permalink
Unnest respect kPrefferenOutputBatchRows strictly
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Aug 12, 2024
1 parent c6c67f1 commit 58fcd43
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 80 deletions.
170 changes: 148 additions & 22 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/Unnest.h"
#include <iostream>
#include "velox/common/base/Nulls.h"
#include "velox/vector/FlatVector.h"

Expand All @@ -29,7 +30,8 @@ Unnest::Unnest(
operatorId,
unnestNode->id(),
"Unnest"),
withOrdinality_(unnestNode->withOrdinality()) {
withOrdinality_(unnestNode->withOrdinality()),
maxOutputSize_(outputBatchRows()) {
const auto& inputType = unnestNode->sources()[0]->outputType();
const auto& unnestVariables = unnestNode->unnestVariables();
for (const auto& variable : unnestVariables) {
Expand Down Expand Up @@ -111,20 +113,61 @@ RowVectorPtr Unnest::getOutput() {
}

const auto size = input_->size();
const auto maxOutputSize = outputBatchRows();

// Limit the number of input rows to keep output batch size within
// 'maxOutputSize' if possible. Process each input row fully. Do not break
// single row's output into multiple batches.
// 'maxOutputSize' if possible. Not process each input row fully when single
// row's output exceeds maxOutputSize. Single row's output maybe into
// multiple batches.
vector_size_t numInput = 0;
vector_size_t numElements = 0;
for (auto row = nextInputRow_; row < size; ++row) {
numElements += rawMaxSizes_[row];
vector_size_t partialProcessRowStartSize = -1;
vector_size_t firstRowEndSize = -1;
// Process first row.
if (nextInputRow_ < size) {
auto firstRow = nextInputRow_;
firstRowEndSize = rawMaxSizes_[firstRow];
vector_size_t remainingSize = firstRowEndSize - firstRowStartSize_;
if (numElements + remainingSize > maxOutputSize_) {
// Single row's output is into multiple batches.
// Read the size range from them, not use 0 to rawMaxSizes_[row].
firstRowEndSize = firstRowStartSize_ + maxOutputSize_ - numElements;
// Process maxOutputSize_ in this getOutput.
numElements = maxOutputSize_;
partialProcessRowStartSize = firstRowEndSize;
} else {
// Not need to split this row
numElements += remainingSize;
}
++numInput;

if (numElements >= maxOutputSize) {
}
// Not split middle row.
// If there is only 1 row, the end row will not take effect, its startSize is
// always 0.
vector_size_t endRowEndSize = -1;
for (auto row = nextInputRow_ + 1; row < size; ++row) {
if (numElements >= maxOutputSize_) {
break;
}
vector_size_t remainingSize = rawMaxSizes_[row];
if (numElements + remainingSize > maxOutputSize_) {
// This is the end row.
// Single row's output is into multiple batches.
// read the size range from them, not use 0 to rawMaxSizes_[row].
endRowEndSize = maxOutputSize_ - numElements;
// Process maxOutputSize_ in this getOutput.
numElements = maxOutputSize_;
partialProcessRowStartSize = endRowEndSize;
++numInput;
break;
} else {
// Not split this row.
numElements += remainingSize;
++numInput;
}
}
// The end row is not partial, set it to the maxSize.
if (endRowEndSize == -1 && numInput > 1) {
endRowEndSize = rawMaxSizes_[nextInputRow_ + numInput - 1];
}

if (numElements == 0) {
Expand All @@ -134,9 +177,20 @@ RowVectorPtr Unnest::getOutput() {
return nullptr;
}

auto output = generateOutput(nextInputRow_, numInput, numElements);

nextInputRow_ += numInput;
std::cout << "row1 startSize " << firstRowStartSize_ << " endSize "
<< firstRowEndSize << " end row endSize " << endRowEndSize
<< std::endl;

auto output = generateOutput(
nextInputRow_, numInput, numElements, firstRowEndSize, endRowEndSize);
std::cout << "generate output" << output->toString(0, 300) << std::endl;
if (partialProcessRowStartSize != -1) {
firstRowStartSize_ = partialProcessRowStartSize;
nextInputRow_ += numInput - 1;
} else {
firstRowStartSize_ = 0;
nextInputRow_ += numInput;
}

if (nextInputRow_ >= size) {
input_ = nullptr;
Expand All @@ -150,17 +204,29 @@ void Unnest::generateRepeatedColumns(
vector_size_t start,
vector_size_t size,
vector_size_t numElements,
std::vector<VectorPtr>& outputs) {
std::vector<VectorPtr>& outputs,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
// Create "indices" buffer to repeat rows as many times as there are elements
// in the array (or map) in unnestDecoded.
auto repeatedIndices = allocateIndices(numElements, pool());
auto* rawRepeatedIndices = repeatedIndices->asMutable<vector_size_t>();
vector_size_t index = 0;
for (auto row = start; row < start + size; ++row) {
if (size > 0) {
for (auto i = firstRowStartSize_; i < firstRowEndSize; i++) {
rawRepeatedIndices[index++] = start;
}
}
for (auto row = start + 1; row < start + size - 1; ++row) {
for (auto i = 0; i < rawMaxSizes_[row]; i++) {
rawRepeatedIndices[index++] = row;
}
}
if (size > 1) {
for (auto i = 0; i < endRowEndSize; i++) {
rawRepeatedIndices[index++] = start + size - 1;
}
}

// Wrap "replicated" columns in a dictionary using 'repeatedIndices'.
for (const auto& projection : identityProjections_) {
Expand All @@ -176,7 +242,9 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
column_index_t channel,
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
BufferPtr elementIndices = allocateIndices(numElements, pool());
auto* rawElementIndices = elementIndices->asMutable<vector_size_t>();

Expand All @@ -191,7 +259,45 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
// Make dictionary index for elements column since they may be out of order.
vector_size_t index = 0;
bool identityMapping = true;
for (auto row = start; row < start + size; ++row) {
if (firstRowStartSize_ != 0) {
identityMapping = false;
}

auto firstEndRowGenerator =
[&](vector_size_t row, vector_size_t startSize, vector_size_t endSize) {
if (!currentDecoded.isNullAt(row)) {
const auto offset = currentOffsets[currentIndices[row]];
const auto unnestSize = currentSizes[currentIndices[row]];
if (index != offset || endSize != rawMaxSizes_[row] ||
unnestSize < endSize) {
identityMapping = false;
}
auto currentUnnestSize = std::min(endSize, unnestSize);
std::cout << "for channel " << channel << " numElements "
<< numElements << " for row " << row << "startSize "
<< startSize << "endSize " << endSize << " offset "
<< offset << " unnestSize " << unnestSize << std::endl;
for (auto i = startSize; i < currentUnnestSize; i++) {
rawElementIndices[index++] = offset + i;
}

for (auto i = currentUnnestSize; i < endSize; ++i) {
bits::setNull(rawNulls, index++, true);
}
} else if (endSize - startSize > 0) {
identityMapping = false;

for (auto i = startSize; i < endSize; ++i) {
bits::setNull(rawNulls, index++, true);
}
}
};

if (size > 0) {
firstEndRowGenerator(start, firstRowStartSize_, firstRowEndSize);
}

for (auto row = start + 1; row < start + size - 1; ++row) {
const auto maxSize = rawMaxSizes_[row];

if (!currentDecoded.isNullAt(row)) {
Expand All @@ -217,40 +323,59 @@ const Unnest::UnnestChannelEncoding Unnest::generateEncodingForChannel(
}
}
}

if (size > 1) {
firstEndRowGenerator(start + size - 1, 0, endRowEndSize);
}

return {elementIndices, nulls, identityMapping};
}

VectorPtr Unnest::generateOrdinalityVector(
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
auto ordinalityVector =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), numElements, pool());

// Set the ordinality at each result row to be the index of the element in
// the original array (or map) plus one.
auto* rawOrdinality = ordinalityVector->mutableRawValues();
for (auto row = start; row < start + size; ++row) {
if (size > 0) {
const auto maxSize = firstRowEndSize - firstRowStartSize_;
std::iota(rawOrdinality, rawOrdinality + maxSize, firstRowStartSize_ + 1);
rawOrdinality += maxSize;
}
for (auto row = start + 1; row < start + size - 1; ++row) {
const auto maxSize = rawMaxSizes_[row];
std::iota(rawOrdinality, rawOrdinality + maxSize, 1);
rawOrdinality += maxSize;
}
if (size > 1) {
std::iota(rawOrdinality, rawOrdinality + endRowEndSize, 1);
rawOrdinality += endRowEndSize;
}

return ordinalityVector;
}

RowVectorPtr Unnest::generateOutput(
vector_size_t start,
vector_size_t size,
vector_size_t numElements) {
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize) {
std::vector<VectorPtr> outputs(outputType_->size());
generateRepeatedColumns(start, size, numElements, outputs);
generateRepeatedColumns(
start, size, numElements, outputs, firstRowEndSize, endRowEndSize);

// Create unnest columns.
vector_size_t outputsIndex = identityProjections_.size();
for (auto channel = 0; channel < unnestChannels_.size(); ++channel) {
const auto unnestChannelEncoding =
generateEncodingForChannel(channel, start, size, numElements);
const auto unnestChannelEncoding = generateEncodingForChannel(
channel, start, size, numElements, firstRowEndSize, endRowEndSize);

auto& currentDecoded = unnestDecoded_[channel];
if (currentDecoded.base()->typeKind() == TypeKind::ARRAY) {
Expand All @@ -272,7 +397,8 @@ RowVectorPtr Unnest::generateOutput(

if (withOrdinality_) {
// Ordinality column is always at the end.
outputs.back() = generateOrdinalityVector(start, size, numElements);
outputs.back() = generateOrdinalityVector(
start, size, numElements, firstRowEndSize, endRowEndSize);
}

return std::make_shared<RowVector>(
Expand Down
20 changes: 16 additions & 4 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@ class Unnest : public Operator {

private:
// Generate output for 'size' input rows starting from 'start' input row.
// Get the firstRowStartSize from class member `firstRowStartSize_`.
//
// @param start First input row to include in the output.
// @param size Number of input rows to include in the output.
// @param outputSize Pre-computed number of output rows.
RowVectorPtr generateOutput(
vector_size_t start,
vector_size_t size,
vector_size_t outputSize);
vector_size_t outputSize,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

// Invoked by generateOutput function above to generate the repeated output
// columns.
void generateRepeatedColumns(
vector_size_t start,
vector_size_t size,
vector_size_t numElements,
std::vector<VectorPtr>& outputs);
std::vector<VectorPtr>& outputs,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

struct UnnestChannelEncoding {
BufferPtr indices;
Expand All @@ -71,22 +76,29 @@ class Unnest : public Operator {
column_index_t channel,
vector_size_t start,
vector_size_t size,
vector_size_t numElements);
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

// Invoked by generateOutput for the ordinality column.
VectorPtr generateOrdinalityVector(
vector_size_t start,
vector_size_t size,
vector_size_t numElements);
vector_size_t numElements,
vector_size_t firstRowEndSize,
vector_size_t endRowEndSize);

const bool withOrdinality_;
std::vector<column_index_t> unnestChannels_;

std::vector<DecodedVector> unnestDecoded_;

const uint32_t maxOutputSize_;
BufferPtr maxSizes_;
vector_size_t* rawMaxSizes_{nullptr};

vector_size_t firstRowStartSize_ = 0;

std::vector<const vector_size_t*> rawSizes_;
std::vector<const vector_size_t*> rawOffsets_;
std::vector<const vector_size_t*> rawIndices_;
Expand Down
Loading

0 comments on commit 58fcd43

Please sign in to comment.