diff --git a/velox/common/base/PrefixSortConfig.h b/velox/common/base/PrefixSortConfig.h index 27048174d857..46ac49391554 100644 --- a/velox/common/base/PrefixSortConfig.h +++ b/velox/common/base/PrefixSortConfig.h @@ -24,14 +24,16 @@ namespace facebook::velox::common { struct PrefixSortConfig { PrefixSortConfig() = default; - PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold) - : maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {} + PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows) + : maxNormalizedKeyBytes(_maxNormalizedKeyBytes), + minNumRows(_minNumRows) {} - /// Max number of bytes can store normalized keys in prefix-sort buffer per - /// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes. - int64_t maxNormalizedKeySize{128}; + /// Maximum bytes that can be used to store normalized keys in prefix-sort + /// buffer per entry. Same as QueryConfig kPrefixSortNormalizedKeyMaxBytes. + uint32_t maxNormalizedKeyBytes{128}; - /// PrefixSort will have performance regression when the dateset is too small. - int32_t threshold{130}; + /// Minimum number of rows to apply prefix sort. Prefix sort does not perform + /// well with small datasets. + uint32_t minNumRows{128}; }; } // namespace facebook::velox::common diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index 09790dff2d7d..aeca392f3878 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -75,6 +75,11 @@ struct SpillConfig { /// Checks if the given 'startBitOffset' has exceeded the max spill limit. bool exceedSpillLevelLimit(uint8_t startBitOffset) const; + /// Returns true if prefix sort is enabled. + bool prefixSortEnabled() const { + return prefixSortConfig.has_value(); + } + /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning.
GetSpillDirectoryPathCB getSpillDirPathCb; diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 263feb95e8a3..36c115b6a2c0 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + return (isFinal() || isSingle()) && !groupingKeys().empty() && + preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 8bdcd72ebc67..7bceb06d117b 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -836,12 +836,12 @@ class QueryConfig { return get(kDriverCpuTimeSliceLimitMs, 0); } - int64_t prefixSortNormalizedKeyMaxBytes() const { - return get(kPrefixSortNormalizedKeyMaxBytes, 128); + uint32_t prefixSortNormalizedKeyMaxBytes() const { + return get(kPrefixSortNormalizedKeyMaxBytes, 128); } - int32_t prefixSortMinRows() const { - return get(kPrefixSortMinRows, 130); + uint32_t prefixSortMinRows() const { + return get(kPrefixSortMinRows, 128); } double scaleWriterRebalanceMaxMemoryUsageRatio() const { diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index e6ce8423b28b..f25b89a5518e 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -142,7 +142,7 @@ Generic Configuration - Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort. * - prefixsort_min_rows - integer - - 130 + - 128 - Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking. .. _expression-evaluation-conf: diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index f77d2ba824ca..53538d854974 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -190,3 +190,18 @@ These stats are reported by shuffle operators. - Indicates the vector serde kind used by an operator for shuffle with 1 for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange, MergeExchange and PartitionedOutput operators for now. + +PrefixSort +---------- +These stats are reported by prefix sort. + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numPrefixSortKeys + - + - The number of columns sorted using prefix sort. 
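Side note on the config rename above: the spill path decides whether prefix sort applies by checking the optional prefix-sort config carried by the spill config, and the renamed fields come straight from the query config entries documented above. A minimal standalone sketch of that wiring follows; the struct and makeSpillPrefixSortConfig() below are simplified illustrations, not the patch's API.

#include <cstdint>
#include <optional>

namespace sketch {

// Simplified stand-in for velox::common::PrefixSortConfig after the rename.
struct PrefixSortConfig {
  uint32_t maxNormalizedKeyBytes{128}; // was maxNormalizedKeySize
  uint32_t minNumRows{128};            // was threshold (130)
};

// Hypothetical helper: the spill config holds an optional prefix-sort config,
// present only when prefix sort is enabled for spill.
std::optional<PrefixSortConfig> makeSpillPrefixSortConfig(
    bool spillPrefixSortEnabled,
    uint32_t maxNormalizedKeyBytes,
    uint32_t minNumRows) {
  if (!spillPrefixSortEnabled) {
    return std::nullopt; // prefixSortEnabled() would then report false.
  }
  return PrefixSortConfig{maxNormalizedKeyBytes, minNumRows};
}

} // namespace sketch

With this shape, SpillConfig::prefixSortEnabled() reduces to has_value() on the optional, which is exactly what the new accessor added to SpillConfig.h returns.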
diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 31d1d3e2ad0d..b0ba52cd9828 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -44,6 +44,7 @@ GroupingSet::GroupingSet( const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -55,6 +56,7 @@ GroupingSet::GroupingSet( OperatorCtx* operatorCtx, folly::Synchronized* spillStats) : preGroupedKeyChannels_(std::move(preGroupedKeys)), + groupingKeyOutputProjections_(std::move(groupingKeyOutputProjections)), hashers_(std::move(hashers)), isGlobal_(hashers_.empty()), isPartial_(isPartial), @@ -74,9 +76,21 @@ GroupingSet::GroupingSet( spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); + for (auto& hasher : hashers_) { keyChannels_.push_back(hasher->channel()); } + + if (groupingKeyOutputProjections_.empty()) { + groupingKeyOutputProjections_.resize(keyChannels_.size()); + std::iota( + groupingKeyOutputProjections_.begin(), + groupingKeyOutputProjections_.end(), + 0); + } else { + VELOX_CHECK_EQ(groupingKeyOutputProjections_.size(), keyChannels_.size()); + } + std::unordered_map channelUseCount; for (const auto& aggregate : aggregates_) { for (auto channel : aggregate.inputs) { @@ -124,17 +138,18 @@ std::unique_ptr GroupingSet::createForMarkDistinct( return std::make_unique( inputType, std::move(hashers), - /*preGroupedKeys*/ std::vector{}, - /*aggregates*/ std::vector{}, - /*ignoreNullKeys*/ false, - /*isPartial*/ false, - /*isRawInput*/ false, - /*globalGroupingSets*/ std::vector{}, - /*groupIdColumn*/ std::nullopt, - /*spillConfig*/ nullptr, + /*preGroupedKeys=*/std::vector{}, + /*groupingKeyOutputProjections=*/std::vector{}, + /*aggregates=*/std::vector{}, + /*ignoreNullKeys=*/false, + /*isPartial=*/false, + /*isRawInput=*/false, + /*globalGroupingSets=*/std::vector{}, + /*groupIdColumn=*/std::nullopt, + /*spillConfig=*/nullptr, nonReclaimableSection, operatorCtx, - /*spillStats_*/ nullptr); + /*spillStats=*/nullptr); }; namespace { @@ -302,7 +317,6 @@ void GroupingSet::addRemainingInput() { } namespace { - void initializeAggregates( const std::vector& aggregates, RowContainer& rows, @@ -758,10 +772,14 @@ void GroupingSet::extractGroups( return; } RowContainer& rows = *table_->rows(); - auto totalKeys = rows.keyTypes().size(); + const auto totalKeys = rows.keyTypes().size(); for (int32_t i = 0; i < totalKeys; ++i) { - auto keyVector = result->childAt(i); - rows.extractColumn(groups.data(), groups.size(), i, keyVector); + auto& keyVector = result->childAt(i); + rows.extractColumn( + groups.data(), + groups.size(), + groupingKeyOutputProjections_[i], + keyVector); } for (int32_t i = 0; i < aggregates_.size(); ++i) { if (!aggregates_[i].sortingKeys.empty()) { @@ -1330,7 +1348,8 @@ void GroupingSet::toIntermediate( } for (auto i = 0; i < keyChannels_.size(); ++i) { - result->childAt(i) = input->childAt(keyChannels_[i]); + const auto inputKeyChannel = keyChannels_[groupingKeyOutputProjections_[i]]; + result->childAt(i) = input->childAt(inputKeyChannel); } for (auto i = 0; i < aggregates_.size(); ++i) { auto& function = aggregates_[i].function; diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index a80d223c8520..fda8f4eea02f 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -32,6 +32,7 @@ class GroupingSet { const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& 
preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -272,9 +273,14 @@ class GroupingSet { std::vector keyChannels_; - /// A subset of grouping keys on which the input is clustered. + // A subset of grouping keys on which the input is clustered. const std::vector preGroupedKeyChannels_; + // Provides the column projections for extracting the grouping keys from + // 'table_' for output. The vector index is the output channel and the value + // is the corresponding column index stored in 'table_'. + std::vector groupingKeyOutputProjections_; + std::vector> hashers_; const bool isGlobal_; const bool isPartial_; diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index ef072d059d2c..14e87f336ec2 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ #include "velox/exec/HashAggregation.h" + #include +#include "velox/exec/PrefixSort.h" #include "velox/exec/Task.h" #include "velox/expression/Expr.h" @@ -54,9 +56,13 @@ void HashAggregation::initialize() { VELOX_CHECK(pool()->trackUsage()); const auto& inputType = aggregationNode_->sources()[0]->outputType(); - auto hashers = - createVectorHashers(inputType, aggregationNode_->groupingKeys()); - auto numHashers = hashers.size(); + std::vector groupingKeyInputChannels; + std::vector groupingKeyOutputChannels; + setupGroupingKeyChannelProjections( + groupingKeyInputChannels, groupingKeyOutputChannels); + + auto hashers = createVectorHashers(inputType, groupingKeyInputChannels); + const auto numHashers = hashers.size(); std::vector preGroupedChannels; preGroupedChannels.reserve(aggregationNode_->preGroupedKeys().size()); @@ -82,7 +88,8 @@ } for (auto i = 0; i < hashers.size(); ++i) { - identityProjections_.emplace_back(hashers[i]->channel(), i); + identityProjections_.emplace_back( + hashers[groupingKeyOutputChannels[i]]->channel(), i); } std::optional groupIdChannel; @@ -96,6 +103,7 @@ inputType, std::move(hashers), std::move(preGroupedChannels), + std::move(groupingKeyOutputChannels), std::move(aggregateInfos), aggregationNode_->ignoreNullKeys(), isPartialOutput_, @@ -110,6 +118,54 @@ aggregationNode_.reset(); } +void HashAggregation::setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const { + VELOX_CHECK(groupingKeyInputChannels.empty()); + VELOX_CHECK(groupingKeyOutputChannels.empty()); + + const auto& inputType = aggregationNode_->sources()[0]->outputType(); + const auto& groupingKeys = aggregationNode_->groupingKeys(); + // The map from the grouping key output channel to the input channel. + // + // NOTE: grouping key output order is specified as 'groupingKeys' in + // 'aggregationNode_'. + std::vector groupingKeyProjections; + groupingKeyProjections.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyProjections.emplace_back( + exprToChannel(groupingKeys[i].get(), inputType), i); + } + + const bool reorderGroupingKeys = + canSpill() && spillConfig()->prefixSortEnabled(); + // If prefix sort is enabled, we sort the grouping key layout in the + // grouping set to maximize prefix sort acceleration if spill is + // triggered. The reordering places grouping keys with smaller prefix sort + // encoded sizes first.
+ if (reorderGroupingKeys) { + PrefixSortLayout::optimizeSortKeysOrder(inputType, groupingKeyProjections); + } + + groupingKeyInputChannels.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyInputChannels.push_back(groupingKeyProjections[i].inputChannel); + } + + groupingKeyOutputChannels.resize(groupingKeys.size()); + if (!reorderGroupingKeys) { + // If there is no reorder, then grouping key output channels are the same as + // the column index order in the grouping set. + std::iota( + groupingKeyOutputChannels.begin(), groupingKeyOutputChannels.end(), 0); + return; + } + + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyOutputChannels[groupingKeyProjections[i].outputChannel] = i; + } +} + bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { VELOX_CHECK(isPartialOutput_ && !isGlobal_); return numInputRows_ > abandonPartialAggregationMinRows_ && @@ -328,7 +384,7 @@ RowVectorPtr HashAggregation::getDistinctOutput() { auto& lookup = groupingSet_->hashLookup(); const auto size = lookup.newGroups.size(); BufferPtr indices = allocateIndices(size, operatorCtx_->pool()); - auto indicesPtr = indices->asMutable(); + auto* indicesPtr = indices->asMutable(); std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr); newDistincts_ = false; auto output = fillOutput(size, indices); diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index 1bf28c43428a..5cd44f77cb4b 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -72,6 +72,16 @@ class HashAggregation : public Operator { RowVectorPtr getDistinctOutput(); + // Sets up the projections for accessing grouping keys stored in the grouping + // set. + // For 'groupingKeyInputChannels', the index is the key column index from + // the grouping set, and the value is the key column channel from the input. + // For 'groupingKeyOutputChannels', the index is the key column channel from + // the output, and the value is the key column index from the grouping set. + void setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const; + void updateEstimatedOutputRowSize(); std::shared_ptr aggregationNode_; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index db48b26df11a..de06d78da65a 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -27,16 +27,16 @@ namespace facebook::velox::exec { -// Represents a column that is copied from input to output, possibly -// with cardinality change, i.e. values removed or duplicated. +/// Represents a column that is copied from input to output, possibly +/// with cardinality change, i.e. values removed or duplicated. struct IdentityProjection { IdentityProjection( column_index_t _inputChannel, column_index_t _outputChannel) : inputChannel(_inputChannel), outputChannel(_outputChannel) {} - const column_index_t inputChannel; - const column_index_t outputChannel; + column_index_t inputChannel; + column_index_t outputChannel; }; struct MemoryStats { diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index 8488194c72e8..b4dfb1f6e0d1 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -126,6 +126,7 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) { } // namespace +// static.
PrefixSortLayout PrefixSortLayout::makeSortLayout( const std::vector& types, const std::vector& compareFlags, @@ -169,6 +170,42 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout( numPaddingBytes}; } +// static. +void PrefixSortLayout::optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections) { + std::vector> encodedKeySizes( + rowType->size(), std::nullopt); + for (const auto& projection : keyColumnProjections) { + encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize( + rowType->childAt(projection.inputChannel)->kind()); + } + + std::sort( + keyColumnProjections.begin(), + keyColumnProjections.end(), + [&](const IdentityProjection& lhs, const IdentityProjection& rhs) { + const auto& lhsEncodedSize = encodedKeySizes[lhs.inputChannel]; + const auto& rhsEncodedSize = encodedKeySizes[rhs.inputChannel]; + if (lhsEncodedSize.has_value() && !rhsEncodedSize.has_value()) { + return true; + } + if (!lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + return false; + } + if (lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + if (lhsEncodedSize.value() < rhsEncodedSize.value()) { + return true; + } + if (lhsEncodedSize.value() > rhsEncodedSize.value()) { + return false; + } + } + // Tie breaks with the original key column order. + return lhs.outputChannel < rhs.outputChannel; + }); +} + FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys( char* left, char* right) { @@ -236,12 +273,12 @@ uint32_t PrefixSort::maxRequiredBytes( const std::vector& compareFlags, const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { return 0; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); if (!sortLayout.hasNormalizedKeys) { return 0; } @@ -303,6 +340,12 @@ void PrefixSort::sortInternal( PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable()); auto* prefixBufferStart = prefixBuffer; auto* prefixBufferEnd = prefixBuffer + numRows * entrySize; + if (sortLayout_.numNormalizedKeys > 0) { + addThreadLocalRuntimeStat( + PrefixSort::kNumPrefixSortKeys, + RuntimeCounter( + sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone)); + } if (sortLayout_.hasNonNormalizedKey) { sortRunner.quickSort( prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) { diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 2a8235c6e701..7ac34b430551 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/base/PrefixSortConfig.h" +#include "velox/exec/Operator.h" #include "velox/exec/RowContainer.h" #include "velox/exec/prefixsort/PrefixSortAlgorithm.h" #include "velox/exec/prefixsort/PrefixSortEncoder.h" @@ -67,6 +68,17 @@ struct PrefixSortLayout { const std::vector& types, const std::vector& compareFlags, uint32_t maxNormalizedKeySize); + + /// Optimizes the order of sort key columns to maximize the number of prefix + /// sort keys for acceleration. This only applies to use cases that don't + /// need a total order, such as spill sort for hash aggregation. + /// 'keyColumnProjections' provides the mapping from the original key column + /// order to its channel in 'rowType'.
The function reorders + /// 'keyColumnProjections' based on the prefix sort encoded size of each key + /// column type, with smaller sizes first. + static void optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections); }; class PrefixSort { @@ -105,14 +117,14 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool, std::vector>& rows) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { stdSort(rows, rowContainer, compareFlags); return; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); // All keys can not normalize, skip the binary string compare opt. // Putting this outside sort-internal helps with stdSort. if (!sortLayout.hasNormalizedKeys) { @@ -133,6 +145,10 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool); + /// The runtime stats name collected for prefix sort. + /// The number of prefix sort keys. + static inline const std::string kNumPrefixSortKeys{"numPrefixSortKeys"}; + private: /// Fallback to stdSort when prefix sort conditions such as config and memory /// are not satisfied. stdSort provides >2X performance win than std::sort for diff --git a/velox/exec/VectorHasher.cpp b/velox/exec/VectorHasher.cpp index a98e3d9b3d46..dc4c921fb035 100644 --- a/velox/exec/VectorHasher.cpp +++ b/velox/exec/VectorHasher.cpp @@ -896,14 +896,25 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys) { const auto numKeys = keys.size(); - - std::vector> hashers; - hashers.reserve(numKeys); + std::vector keyChannels; + keyChannels.reserve(numKeys); for (const auto& key : keys) { const auto channel = exprToChannel(key.get(), rowType); - hashers.push_back(VectorHasher::create(key->type(), channel)); + keyChannels.push_back(channel); } + return createVectorHashers(rowType, keyChannels); +} +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels) { + const auto numKeys = keyChannels.size(); + std::vector> hashers; + hashers.reserve(numKeys); + for (const auto& keyChannel : keyChannels) { + hashers.push_back( + VectorHasher::create(rowType->childAt(keyChannel), keyChannel)); + } return hashers; } diff --git a/velox/exec/VectorHasher.h b/velox/exec/VectorHasher.h index 425fe267bd8a..5ef4899673e5 100644 --- a/velox/exec/VectorHasher.h +++ b/velox/exec/VectorHasher.h @@ -709,6 +709,10 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys); +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels); + } // namespace facebook::velox::exec #include "velox/exec/VectorHasher-inl.h" diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 37e304ceaa78..dc4724ace32b 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -28,7 +28,9 @@ #include "velox/exec/Aggregate.h" #include "velox/exec/GroupingSet.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/PrefixSort.h" #include "velox/exec/Values.h" +#include "velox/exec/prefixsort/PrefixSortEncoder.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include
"velox/exec/tests/utils/OperatorTestBase.h" @@ -960,6 +962,42 @@ TEST_F(AggregationTest, partialDistinctWithAbandon) { .assertResults("SELECT distinct c0, sum(c0) FROM tmp group by c0"); } +TEST_F(AggregationTest, distinctWithGroupingKeysReordered) { + rowType_ = ROW( + {"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), VARCHAR(), VARCHAR()}); + + const int vectorSize = 2'000; + VectorFuzzer::Options options; + options.vectorSize = vectorSize; + options.stringVariableLength = false; + options.stringLength = 128; + VectorFuzzer fuzzer(options, pool()); + const int numVectors{5}; + std::vector vectors; + for (int i = 0; i < numVectors; ++i) { + vectors.push_back(fuzzer.fuzzRow(rowType_)); + } + + createDuckDbTable(vectors); + + // Distinct aggregation with grouping key with larger prefix encoded size + // first. + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .config(QueryConfig::kAbandonPartialAggregationMinRows, 100) + .config(QueryConfig::kAbandonPartialAggregationMinPct, 50) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config(QueryConfig::kSpillPrefixSortEnabled, true) + .config("max_drivers_per_task", 1) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c2", "c0"}, {}) + .planNode()) + .assertResults("SELECT distinct c2, c0 FROM tmp"); +} + TEST_F(AggregationTest, largeValueRangeArray) { // We have keys that map to integer range. The keys are // a little under max array hash table size apart. This wastes 16MB of @@ -1957,6 +1995,201 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1"); } +TEST_F(AggregationTest, spillPrefixSortOptimization) { + const RowTypePtr rowType{ + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {BIGINT(), + SMALLINT(), + VARCHAR(), + TINYINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + VARCHAR()})}; + auto vectors = makeVectors(rowType, 1024, 2); + int64_t groupingKeyValue{0}; + for (auto& vector : vectors) { + auto groupingVector = BaseVector::create( + vector->childAt(0)->type(), vector->childAt(0)->size(), pool_.get()); + auto* flatGroupingKeyVector = groupingVector->asFlatVector(); + for (auto i = 0; i < flatGroupingKeyVector->size(); ++i) { + flatGroupingKeyVector->set(i, groupingKeyValue++); + } + vector->childAt(0) = groupingVector; + } + + createDuckDbTable(vectors); + struct { + bool prefixSortSpillEnabled; + uint32_t maxNormalizedKeyBytes; + uint32_t minNumRows; + uint32_t expectedNumPrefixSortKeys; + + std::string debugString() const { + return fmt::format( + "prefixSortSpillEnabled {}, maxNormalizedKeyBytes {}, minNumRows {}, expectedNumPrefixSortKeys {}", + prefixSortSpillEnabled, + maxNormalizedKeyBytes, + minNumRows, + expectedNumPrefixSortKeys); + } + } testSettings[] = { + {true, 0, 0, 0}, + {false, 0, 0, 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 1}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 0}, + {true, 1'000'000, 0, 3}, + {false, 1'000'000, 0, 0}, + {true, 1'000'000, 1'000'000, 0}, + {false, 1'000'000, 1'000'000, 0}, + {true, + 
prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 3}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}}; + + for (const auto& testData : testSettings) { + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + core::PlanNodeId aggrNodeId; + + auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) { + SCOPED_TRACE(sql); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config( + QueryConfig::kSpillPrefixSortEnabled, + testData.prefixSortSpillEnabled) + .config( + QueryConfig::kPrefixSortMinRows, + std::to_string(testData.minNumRows)) + .config( + QueryConfig::kPrefixSortNormalizedKeyMaxBytes, + std::to_string(testData.maxNormalizedKeyBytes)) + .plan(plan) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(aggrNodeId); + checkSpillStats(stats, true); + if (testData.expectedNumPrefixSortKeys > 0) { + ASSERT_GE( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).sum, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).max, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).min, + testData.expectedNumPrefixSortKeys); + } else { + ASSERT_EQ(stats.customStats.count(PrefixSort::kNumPrefixSortKeys), 0); + } + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + }; + + auto plan = PlanBuilder() + 
.values(vectors) + .singleAggregation( + {"c8", "c2", "c1", "c0", "c4", "c3"}, + {"max(c8)", + "max(c2)", + "sum(c1)", + "sum(c0)", + "min(c4)", + "max(c3)"}, + {}) + .capturePlanNodeId(aggrNodeId) + .planNode(); + testPlan( + plan, + "SELECT c8, c2, c1, c0, c4, c3, max(c8), max(c2), sum(c1), sum(c0), min(c4), max(c3) FROM tmp GROUP BY 1, 2, 3, 4, 5, 6"); + } +} + TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { std::vector vectors; int64_t val = 0; diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 248d1473bd0d..9bad8aeb035c 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -325,5 +325,51 @@ TEST_F(PrefixSortTest, checkMaxNormalizedKeySizeForMultipleKeys) { ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[0], 0); ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[1], 9); } + +TEST_F(PrefixSortTest, optimizeSortKeysOrder) { + struct { + RowTypePtr inputType; + std::vector keyChannels; + std::vector expectedSortedKeyChannels; + + std::string debugString() const { + return fmt::format( + "inputType {}, keyChannels {}, expectedSortedKeyChannels {}", + inputType->toString(), + fmt::join(keyChannels, ":"), + fmt::join(expectedSortedKeyChannels, ":")); + } + } testSettings[] = { + {ROW({BIGINT(), BIGINT()}), {0, 1}, {0, 1}}, + {ROW({BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), BIGINT()}), {0, 1}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), VARCHAR()}), {0, 1, 2}, {1, 0, 2}}, + {ROW({TINYINT(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {2, 1, 0, 4, 5, 3}, + {4, 1, 2, 0, 5, 3}}, + {ROW({INTEGER(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {5, 4, 3, 2, 1, 0}, + {4, 0, 1, 5, 3, 2}}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + std::vector projections; + for (auto i = 0; i < testData.keyChannels.size(); ++i) { + projections.emplace_back(testData.keyChannels[i], i); + } + PrefixSortLayout::optimizeSortKeysOrder(testData.inputType, projections); + std::unordered_set outputChannelSet; + for (auto i = 0; i < projections.size(); ++i) { + ASSERT_EQ( + projections[i].inputChannel, testData.expectedSortedKeyChannels[i]); + ASSERT_EQ( + testData.keyChannels[projections[i].outputChannel], + projections[i].inputChannel); + } + } +} } // namespace } // namespace facebook::velox::exec::prefixsort::test diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 8ebb04f9b0fb..f021ce5c5c24 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -31,6 +31,23 @@ using namespace facebook::velox; using namespace facebook::velox::memory; namespace facebook::velox::functions::test { +namespace { +// Class to write runtime stats in the tests to the stats container. 
+class TestRuntimeStatWriter : public BaseRuntimeStatWriter { + public: + explicit TestRuntimeStatWriter( + std::unordered_map& stats) + : stats_{stats} {} + + void addRuntimeStat(const std::string& name, const RuntimeCounter& value) + override { + addOperatorRuntimeStats(name, value, stats_); + } + + private: + std::unordered_map& stats_; +}; +} // namespace class SortBufferTest : public OperatorTestBase, public testing::WithParamInterface { @@ -39,6 +56,8 @@ class SortBufferTest : public OperatorTestBase, OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); rng_.seed(123); + statWriter_ = std::make_unique(stats_); + setThreadLocalRunTimeStatWriter(statWriter_.get()); } void TearDown() override { @@ -69,7 +88,9 @@ class SortBufferTest : public OperatorTestBase, const bool enableSpillPrefixSort_{GetParam()}; const velox::common::PrefixSortConfig prefixSortConfig_ = - velox::common::PrefixSortConfig{std::numeric_limits::max(), 130}; + velox::common::PrefixSortConfig{ + std::numeric_limits::max(), + GetParam() ? 8 : std::numeric_limits::max()}; const std::optional spillPrefixSortConfig_ = enableSpillPrefixSort_ ? std::optional(prefixSortConfig_) @@ -101,9 +122,32 @@ class SortBufferTest : public OperatorTestBase, tsan_atomic nonReclaimableSection_{false}; folly::Random::DefaultGenerator rng_; + std::unordered_map stats_; + std::unique_ptr statWriter_; }; TEST_P(SortBufferTest, singleKey) { + const RowVectorPtr data = makeRowVector( + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}), // sorted column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "great", + "today"})}); + struct { std::vector sortCompareFlags; std::vector expectedResult; @@ -124,12 +168,12 @@ TEST_P(SortBufferTest, singleKey) { true, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Ascending - {1, 2, 3, 4, 5}}, + {3, 4, 7, 8, 10, 13, 14, 15, 16, 17}}, {{{true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Descending - {5, 4, 3, 2, 1}}}; + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}}}; // Specifies the sort columns ["c1"]. 
sortColumnIndices_ = {1}; @@ -143,25 +187,30 @@ &nonReclaimableSection_, prefixSortConfig_); - RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), - makeFlatVector( - {"hello", "world", "today", "is", "great"})}); - sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); + ASSERT_EQ(output->size(), 10); int resultIndex = 0; for (int expectedValue : testData.expectedResult) { ASSERT_EQ( output->childAt(1)->asFlatVector()->valueAt(resultIndex++), expectedValue); } + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } } @@ -175,23 +224,54 @@ prefixSortConfig_); RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted-2 column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), // sorted-1 column + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {15, 12, 9, 8, 7, 6, 5, 4, 3, 1}), // sorted-2 column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 7.7, 8.1, 8, 8.1, 8.1}), // sorted-1 + // column makeFlatVector( - {"hello", "world", "today", "is", "great"})}); + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "sort", + "sorted"})}); sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 3); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 4); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 1); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 2); + ASSERT_EQ(output->size(), 10); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 15); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 9); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 12); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 7); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 8); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(5), 6); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(6), 4); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(7), 1); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(8), 3); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(9), 5); + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } } //
TODO: enable it later with test utility to compare the sorted result. @@ -267,6 +347,7 @@ TEST_P(SortBufferTest, DISABLED_randomData) { inputVectors.push_back(input); } sortBuffer->noMoreInput(); + stats_.clear(); // todo: have a utility function buildExpectedSortResult and verify the // sorting result for random data. } @@ -469,6 +550,20 @@ TEST_P(SortBufferTest, spill) { memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); } } + if (GetParam()) { + ASSERT_GE( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } }
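To summarize the reordering scheme that setupGroupingKeyChannelProjections() and PrefixSortLayout::optimizeSortKeysOrder() implement, here is a standalone sketch with simplified types; Projection, sortBySize() and buildOutputProjection() are illustrative names only, and the encoded key sizes are passed in rather than derived from Velox types. Keys whose types have a prefix-sort encoded size are placed first, smaller sizes before larger, ties keep the declared key order, and the inverse mapping restores the declared output order.

#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

struct Projection {
  uint32_t inputChannel;  // column channel in the input row type
  uint32_t outputChannel; // position in the declared grouping-key order
};

// Reorder keys by encoded size; keys without an encoded size (not
// normalizable) go last. Ties keep the declared key order.
void sortBySize(
    std::vector<Projection>& keys,
    const std::vector<std::optional<uint32_t>>& encodedSize) {
  std::sort(
      keys.begin(), keys.end(), [&](const Projection& l, const Projection& r) {
        const auto& ls = encodedSize[l.inputChannel];
        const auto& rs = encodedSize[r.inputChannel];
        if (ls.has_value() != rs.has_value()) {
          return ls.has_value();
        }
        if (ls.has_value() && *ls != *rs) {
          return *ls < *rs;
        }
        return l.outputChannel < r.outputChannel;
      });
}

// Build the output projection used to extract keys for output: the index is
// the output channel, the value is the key's column index in the reordered
// grouping set (mirrors groupingKeyOutputProjections_ in the patch).
std::vector<uint32_t> buildOutputProjection(
    const std::vector<Projection>& keys) {
  std::vector<uint32_t> out(keys.size());
  for (uint32_t i = 0; i < keys.size(); ++i) {
    out[keys[i].outputChannel] = i;
  }
  return out;
}

For example, with keys declared as (VARCHAR c2, BIGINT c0) as in the distinctWithGroupingKeysReordered test above, the BIGINT key moves to the front of the grouping set while the output projection still emits c2 first.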