diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 2b0111353f89..15dea54cc809 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -303,7 +303,7 @@ std::optional HashProbe::estimatedRowSize( varSizeListColumnsStats.reserve(varSizedColumns.size()); for (uint32_t i = 0; i < varSizedColumns.size(); ++i) { auto statsOpt = columnStats(varSizedColumns[i]); - if (!statsOpt.has_value()) { + if (!statsOpt.has_value() || !statsOpt->isMinMaxColumnStatsValid()) { return std::nullopt; } varSizeListColumnsStats.push_back(statsOpt.value()); diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 74d49266b0fd..e8d4469dcf1a 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -188,7 +188,6 @@ RowContainer::RowContainer( if (nullableKeys_) { ++nullOffset; } - columnHasNulls_.push_back(false); } // Make offset at least sizeof pointer so that there is space for a // free list next pointer below the bit at 'freeFlagOffset_'. @@ -217,7 +216,6 @@ RowContainer::RowContainer( nullOffsets_.push_back(nullOffset); ++nullOffset; isVariableWidth |= !type->isFixedWidth(); - columnHasNulls_.push_back(false); } if (hasProbedFlag) { nullOffsets_.push_back(nullOffset); @@ -336,6 +334,27 @@ char* RowContainer::initializeRow(char* row, bool reuse) { return row; } +/** + * Removes the data of a given row from the column stats by iterating + * over each column in the row and updates the column stats. + * + * @param row - The row from which the column stats are to be updated. + */ +void RowContainer::removeRowFromColumnStats(const char* row) { + // Update row column stats accordingly + for (auto c = 0; c < keyTypes_.size(); ++c) { + auto byteSize = typeKindSize(typeKinds_[c]); + if (isNullAt(row, columnAt(c))) { + rowColumnsStats_[c].removeCellSize(0, true); + } else if (types_[c]->isFixedWidth()) { + rowColumnsStats_[c].removeCellSize(fixedSizeAt(c), false); + } else { + rowColumnsStats_[c].removeCellSize(variableSizeAt(row, c), false); + } + } + invalidateMinMaxColumnStats(); +} + void RowContainer::eraseRows(folly::Range rows) { freeRowsExtraMemory(rows, /*freeNextRowVector=*/true); for (auto* row : rows) { @@ -343,9 +362,10 @@ void RowContainer::eraseRows(folly::Range rows) { bits::setBit(row, freeFlagOffset_); nextFree(row) = firstFreeRow_; firstFreeRow_ = row; + + removeRowFromColumnStats(row); } numFreeRows_ += rows.size(); - invalidateColumnStats(); } int32_t RowContainer::findRows(folly::Range rows, char** result) const { @@ -466,13 +486,6 @@ void RowContainer::freeRowsExtraMemory( numRows_ -= rows.size(); } -void RowContainer::invalidateColumnStats() { - if (rowColumnsStats_.empty()) { - return; - } - rowColumnsStats_.clear(); -} - // static RowColumn::Stats RowColumn::Stats::merge( const std::vector& statsList) { @@ -816,7 +829,6 @@ void RowContainer::storeComplexType( if (decoded.isNullAt(index)) { VELOX_DCHECK(nullMask); row[nullByte] |= nullMask; - updateColumnHasNulls(column, true); return; } RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_); diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 72cd08e276dd..330232bd30c1 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -187,6 +187,19 @@ class RowColumn { ++nullCount_; } + void removeCellSize(int32_t bytes, bool isNull) { + // we only update nullCount, nonNullCount, and numBytes + // when the cell is removed. Because min/max need the + // full column data and not recorded in stats. + if (isNull) { + --nullCount_; + } else { + --nonNullCount_; + sumBytes_ -= bytes; + } + invalidateMinMaxColumnStats(); + } + int32_t maxBytes() const { return maxBytes_; } @@ -218,6 +231,14 @@ class RowColumn { return nullCount_ + nonNullCount_; } + void invalidateMinMaxColumnStats() { + minMaxStatsValid_ = false; + } + + bool isMinMaxColumnStatsValid() const { + return minMaxStatsValid_; + } + /// Merges multiple aggregated stats of the same column into a single one. static Stats merge(const std::vector& statsList); @@ -229,6 +250,7 @@ class RowColumn { uint32_t nonNullCount_{0}; uint32_t nullCount_{0}; + bool minMaxStatsValid_{true}; }; private: @@ -316,6 +338,8 @@ class RowContainer { : 0); } + void removeRowFromColumnStats(const char* row); + /// Sets all fields, aggregates, keys and dependents to null. Used when making /// a row with uninitialized keys for aggregates with no-op partial /// aggregation. @@ -324,7 +348,7 @@ class RowContainer { memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size()); bits::clearBit(row, freeFlagOffset_); } - invalidateColumnStats(); + removeRowFromColumnStats(row); } /// The row size excluding any out-of-line stored variable length values. @@ -817,13 +841,22 @@ class RowContainer { /// invalidated. Any row erase operations will invalidate column stats. std::optional columnStats(int32_t columnIndex) const; + uint32_t columnNullCount(int32_t columnIndex) { + return rowColumnsStats_[columnIndex].nullCount(); + } + + bool isMinMaxColumnStatsValid() const { + return rowColumnsStatsMinMaxValid_; + } + const auto& keyTypes() const { return keyTypes_; } - /// Returns true if specified column may have nulls, false otherwise. + /// Returns true if specified column has nulls, false otherwise. inline bool columnHasNulls(int32_t columnIndex) const { - return columnHasNulls_[columnIndex]; + return columnStats(columnIndex).has_value() && + columnStats(columnIndex)->nullCount() > 0; } const std::vector& accumulators() const { @@ -1015,7 +1048,6 @@ class RowContainer { // Do not leave an uninitialized value in the case of a // null. This is an error with valgrind/asan. *reinterpret_cast(row + offset) = T(); - updateColumnHasNulls(columnIndex, true); return; } if constexpr (std::is_same_v) { @@ -1466,14 +1498,10 @@ class RowContainer { char* row, int32_t columnIndex); - // Light weight aggregated column stats does not support row erasures. This + // Min/max column stats do not support row erasures. This // method is called whenever a row is erased. - void invalidateColumnStats(); - - // Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true. - // columnHasNulls_ flag is false by default. - inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) { - columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls; + void invalidateMinMaxColumnStats() { + rowColumnsStatsMinMaxValid_ = false; } const std::vector keyTypes_; @@ -1484,8 +1512,6 @@ class RowContainer { const std::unique_ptr stringAllocator_; - std::vector columnHasNulls_; - // Indicates if we can add new row to this row container. It is set to false // after user calls 'getRowPartitions()' to create 'rowPartitions' object for // parallel join build. @@ -1510,9 +1536,12 @@ class RowContainer { // Offset and null indicator offset of non-aggregate fields as a single word. // Corresponds pairwise to 'types_'. std::vector rowColumns_; - // Optional aggregated column stats(e.g. min/max size) for non-aggregate + // Aggregated column stats(e.g. min/max size) for non-aggregate // fields. Index aligns with 'rowColumns_'. std::vector rowColumnsStats_; + // Indicates if the min/max column stats are valid. This is set to false + // whenever a row is erased. + bool rowColumnsStatsMinMaxValid_ = true; // Bit offset of the probed flag for a full or right outer join payload. 0 if // not applicable. int32_t probedFlagOffset_ = 0; @@ -1640,7 +1669,6 @@ inline void RowContainer::storeWithNulls( if (decoded.isNullAt(rowIndex)) { row[nullByte] |= nullMask; memset(row + offset, 0, sizeof(int128_t)); - updateColumnHasNulls(columnIndex, true); return; } HugeInt::serialize(decoded.valueAt(rowIndex), row + offset); diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index a2040721f664..8e8cf35120bb 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -90,8 +90,10 @@ class RowContainerTestHelper { if (rowContainer_->types_[i]->isFixedWidth()) { continue; } - VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes()); - VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes()); + if (storedStats.isMinMaxColumnStatsValid()) { + VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes()); + VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes()); + } VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes()); VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes()); VELOX_CHECK_EQ( @@ -2596,6 +2598,22 @@ TEST_F(RowContainerTest, rowColumnStats) { EXPECT_EQ(stats.nonNullCount(), 6); EXPECT_EQ(stats.nullCount(), 4); EXPECT_EQ(stats.numCells(), 10); + + stats.removeCellSize(25, false); + EXPECT_EQ(stats.isMinMaxColumnStatsValid(), false); + EXPECT_EQ(stats.sumBytes(), 60); + EXPECT_EQ(stats.avgBytes(), 12); + EXPECT_EQ(stats.numCells(), 9); + EXPECT_EQ(stats.nonNullCount(), 5); + EXPECT_EQ(stats.nullCount(), 4); + + stats.removeCellSize(0, true); + EXPECT_EQ(stats.isMinMaxColumnStatsValid(), false); + EXPECT_EQ(stats.sumBytes(), 60); + EXPECT_EQ(stats.avgBytes(), 12); + EXPECT_EQ(stats.numCells(), 8); + EXPECT_EQ(stats.nonNullCount(), 5); + EXPECT_EQ(stats.nullCount(), 3); } TEST_F(RowContainerTest, storeAndCollectColumnStats) { @@ -2636,5 +2654,18 @@ TEST_F(RowContainerTest, storeAndCollectColumnStats) { EXPECT_EQ(stats.avgBytes(), 13); } } + + rowContainer->eraseRows(folly::Range(rows.data(), 10)); // there are 2 nulls + EXPECT_EQ(rowContainer->isMinMaxColumnStatsValid(), false); + for (int i = 0; i < rowContainer->columnTypes().size(); ++i) { + const auto stats = rowContainer->columnStats(i).value(); + EXPECT_EQ(stats.nonNullCount(), 849); + EXPECT_EQ(stats.nullCount(), 141); + EXPECT_EQ(stats.numCells(), kNumRows - 10); + if (rowVector->childAt(i)->typeKind() == TypeKind::VARCHAR) { + EXPECT_EQ(stats.sumBytes(), 11809); + EXPECT_EQ(stats.avgBytes(), 13); + } + } } } // namespace facebook::velox::exec::test