Skip to content

Commit

Permalink
feat: Separate null count and minmax from column stats (#11860)
Browse files Browse the repository at this point in the history
Summary:

PR to address #11741

- Removed the use of columnHasNulls in RowContainer and replaced them with row stats
- Separate null count/sumBytes from minmax. In the case of rows erasure, only min/max is invalidated.

Differential Revision: D67229925
  • Loading branch information
zation99 authored and facebook-github-bot committed Dec 14, 2024
1 parent 19c5771 commit 329f9a9
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 29 deletions.
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ std::optional<uint64_t> HashProbe::estimatedRowSize(
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
if (!statsOpt.has_value() || !statsOpt->isMinMaxColumnStatsValid()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
Expand Down
34 changes: 23 additions & 11 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ RowContainer::RowContainer(
if (nullableKeys_) {
++nullOffset;
}
columnHasNulls_.push_back(false);
}
// Make offset at least sizeof pointer so that there is space for a
// free list next pointer below the bit at 'freeFlagOffset_'.
Expand Down Expand Up @@ -217,7 +216,6 @@ RowContainer::RowContainer(
nullOffsets_.push_back(nullOffset);
++nullOffset;
isVariableWidth |= !type->isFixedWidth();
columnHasNulls_.push_back(false);
}
if (hasProbedFlag) {
nullOffsets_.push_back(nullOffset);
Expand Down Expand Up @@ -336,16 +334,38 @@ char* RowContainer::initializeRow(char* row, bool reuse) {
return row;
}

/**
* Removes the data of a given row from the column stats by iterating
* over each column in the row and updates the column stats.
*
* @param row - The row from which the column stats are to be updated.
*/
void RowContainer::removeRowFromColumnStats(const char* row) {
// Update row column stats accordingly
for (auto c = 0; c < keyTypes_.size(); ++c) {
auto byteSize = typeKindSize(typeKinds_[c]);
if (isNullAt(row, columnAt(c))) {
rowColumnsStats_[c].removeCellSize(0, true);
} else if (types_[c]->isFixedWidth()) {
rowColumnsStats_[c].removeCellSize(fixedSizeAt(c), false);
} else {
rowColumnsStats_[c].removeCellSize(variableSizeAt(row, c), false);
}
}
invalidateMinMaxColumnStats();
}

void RowContainer::eraseRows(folly::Range<char**> rows) {
freeRowsExtraMemory(rows, /*freeNextRowVector=*/true);
for (auto* row : rows) {
VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_), "Double free of row");
bits::setBit(row, freeFlagOffset_);
nextFree(row) = firstFreeRow_;
firstFreeRow_ = row;

removeRowFromColumnStats(row);
}
numFreeRows_ += rows.size();
invalidateColumnStats();
}

int32_t RowContainer::findRows(folly::Range<char**> rows, char** result) const {
Expand Down Expand Up @@ -466,13 +486,6 @@ void RowContainer::freeRowsExtraMemory(
numRows_ -= rows.size();
}

void RowContainer::invalidateColumnStats() {
if (rowColumnsStats_.empty()) {
return;
}
rowColumnsStats_.clear();
}

// static
RowColumn::Stats RowColumn::Stats::merge(
const std::vector<RowColumn::Stats>& statsList) {
Expand Down Expand Up @@ -816,7 +829,6 @@ void RowContainer::storeComplexType(
if (decoded.isNullAt(index)) {
VELOX_DCHECK(nullMask);
row[nullByte] |= nullMask;
updateColumnHasNulls(column, true);
return;
}
RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
Expand Down
58 changes: 43 additions & 15 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ class RowColumn {
++nullCount_;
}

void removeCellSize(int32_t bytes, bool isNull) {
// we only update nullCount, nonNullCount, and numBytes
// when the cell is removed. Because min/max need the
// full column data and not recorded in stats.
if (isNull) {
--nullCount_;
} else {
--nonNullCount_;
sumBytes_ -= bytes;
}
invalidateMinMaxColumnStats();
}

int32_t maxBytes() const {
return maxBytes_;
}
Expand Down Expand Up @@ -218,6 +231,14 @@ class RowColumn {
return nullCount_ + nonNullCount_;
}

void invalidateMinMaxColumnStats() {
minMaxStatsValid_ = false;
}

bool isMinMaxColumnStatsValid() const {
return minMaxStatsValid_;
}

/// Merges multiple aggregated stats of the same column into a single one.
static Stats merge(const std::vector<Stats>& statsList);

Expand All @@ -229,6 +250,7 @@ class RowColumn {

uint32_t nonNullCount_{0};
uint32_t nullCount_{0};
bool minMaxStatsValid_{true};
};

private:
Expand Down Expand Up @@ -316,6 +338,8 @@ class RowContainer {
: 0);
}

void removeRowFromColumnStats(const char* row);

/// Sets all fields, aggregates, keys and dependents to null. Used when making
/// a row with uninitialized keys for aggregates with no-op partial
/// aggregation.
Expand All @@ -324,7 +348,7 @@ class RowContainer {
memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size());
bits::clearBit(row, freeFlagOffset_);
}
invalidateColumnStats();
removeRowFromColumnStats(row);
}

/// The row size excluding any out-of-line stored variable length values.
Expand Down Expand Up @@ -817,13 +841,22 @@ class RowContainer {
/// invalidated. Any row erase operations will invalidate column stats.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

uint32_t columnNullCount(int32_t columnIndex) {
return rowColumnsStats_[columnIndex].nullCount();
}

bool isMinMaxColumnStatsValid() const {
return rowColumnsStatsMinMaxValid_;
}

const auto& keyTypes() const {
return keyTypes_;
}

/// Returns true if specified column may have nulls, false otherwise.
/// Returns true if specified column has nulls, false otherwise.
inline bool columnHasNulls(int32_t columnIndex) const {
return columnHasNulls_[columnIndex];
return columnStats(columnIndex).has_value() &&
columnStats(columnIndex)->nullCount() > 0;
}

const std::vector<Accumulator>& accumulators() const {
Expand Down Expand Up @@ -1015,7 +1048,6 @@ class RowContainer {
// Do not leave an uninitialized value in the case of a
// null. This is an error with valgrind/asan.
*reinterpret_cast<T*>(row + offset) = T();
updateColumnHasNulls(columnIndex, true);
return;
}
if constexpr (std::is_same_v<T, StringView>) {
Expand Down Expand Up @@ -1466,14 +1498,10 @@ class RowContainer {
char* row,
int32_t columnIndex);

// Light weight aggregated column stats does not support row erasures. This
// Min/max column stats do not support row erasures. This
// method is called whenever a row is erased.
void invalidateColumnStats();

// Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true.
// columnHasNulls_ flag is false by default.
inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) {
columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls;
void invalidateMinMaxColumnStats() {
rowColumnsStatsMinMaxValid_ = false;
}

const std::vector<TypePtr> keyTypes_;
Expand All @@ -1484,8 +1512,6 @@ class RowContainer {

const std::unique_ptr<HashStringAllocator> stringAllocator_;

std::vector<bool> columnHasNulls_;

// Indicates if we can add new row to this row container. It is set to false
// after user calls 'getRowPartitions()' to create 'rowPartitions' object for
// parallel join build.
Expand All @@ -1510,9 +1536,12 @@ class RowContainer {
// Offset and null indicator offset of non-aggregate fields as a single word.
// Corresponds pairwise to 'types_'.
std::vector<RowColumn> rowColumns_;
// Optional aggregated column stats(e.g. min/max size) for non-aggregate
// Aggregated column stats(e.g. min/max size) for non-aggregate
// fields. Index aligns with 'rowColumns_'.
std::vector<RowColumn::Stats> rowColumnsStats_;
// Indicates if the min/max column stats are valid. This is set to false
// whenever a row is erased.
bool rowColumnsStatsMinMaxValid_ = true;
// Bit offset of the probed flag for a full or right outer join payload. 0 if
// not applicable.
int32_t probedFlagOffset_ = 0;
Expand Down Expand Up @@ -1640,7 +1669,6 @@ inline void RowContainer::storeWithNulls<TypeKind::HUGEINT>(
if (decoded.isNullAt(rowIndex)) {
row[nullByte] |= nullMask;
memset(row + offset, 0, sizeof(int128_t));
updateColumnHasNulls(columnIndex, true);
return;
}
HugeInt::serialize(decoded.valueAt<int128_t>(rowIndex), row + offset);
Expand Down
35 changes: 33 additions & 2 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ class RowContainerTestHelper {
if (rowContainer_->types_[i]->isFixedWidth()) {
continue;
}
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
if (storedStats.isMinMaxColumnStatsValid()) {
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
}
VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes());
VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes());
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -2596,6 +2598,22 @@ TEST_F(RowContainerTest, rowColumnStats) {
EXPECT_EQ(stats.nonNullCount(), 6);
EXPECT_EQ(stats.nullCount(), 4);
EXPECT_EQ(stats.numCells(), 10);

stats.removeCellSize(25, false);
EXPECT_EQ(stats.isMinMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 9);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 4);

stats.removeCellSize(0, true);
EXPECT_EQ(stats.isMinMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 8);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 3);
}

TEST_F(RowContainerTest, storeAndCollectColumnStats) {
Expand Down Expand Up @@ -2636,5 +2654,18 @@ TEST_F(RowContainerTest, storeAndCollectColumnStats) {
EXPECT_EQ(stats.avgBytes(), 13);
}
}

rowContainer->eraseRows(folly::Range(rows.data(), 10)); // there are 2 nulls
EXPECT_EQ(rowContainer->isMinMaxColumnStatsValid(), false);
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
const auto stats = rowContainer->columnStats(i).value();
EXPECT_EQ(stats.nonNullCount(), 849);
EXPECT_EQ(stats.nullCount(), 141);
EXPECT_EQ(stats.numCells(), kNumRows - 10);
if (rowVector->childAt(i)->typeKind() == TypeKind::VARCHAR) {
EXPECT_EQ(stats.sumBytes(), 11809);
EXPECT_EQ(stats.avgBytes(), 13);
}
}
}
} // namespace facebook::velox::exec::test

0 comments on commit 329f9a9

Please sign in to comment.