From 516cc9b7d8bb3a9ed4fb8926c01e4dda165fd25b Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 21 Oct 2024 22:10:34 -0700 Subject: [PATCH] Support to clear cache in expr set (#11308) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11308 Add support for clearing the buffers that an expr set caches internally and that are allocated through the velox memory pool. Memory arbitration uses this to reclaim memory held by expression evaluation. A unit test is added to verify. Reviewed By: bikramSingh91 Differential Revision: D64546974 fbshipit-source-id: fef0b2d54d9aaa7afe82d36be77545d621ad7c1e --- velox/expression/ConjunctExpr.h | 8 +- velox/expression/Expr.cpp | 11 +- velox/expression/Expr.h | 26 ++-- velox/expression/SwitchExpr.h | 5 + velox/expression/tests/ExprEncodingsTest.cpp | 118 ++++++++++++------- 5 files changed, 114 insertions(+), 54 deletions(-) diff --git a/velox/expression/ConjunctExpr.h b/velox/expression/ConjunctExpr.h index 5fb6426632fc..5385ca03b8c6 100644 --- a/velox/expression/ConjunctExpr.h +++ b/velox/expression/ConjunctExpr.h @@ -67,6 +67,12 @@ class ConjunctExpr : public SpecialForm { std::string toSql( std::vector* complexConstants = nullptr) const override; + void clearCache() override { + Expr::clearCache(); + tempValues_.reset(); + tempNulls_.reset(); + } + private: static TypePtr resolveType(const std::vector& argTypes); @@ -89,8 +95,6 @@ class ConjunctExpr : public SpecialForm { // true if conjunction (and), false if disjunction (or). const bool isAnd_; - // Errors encountered before processing the current input. 
- FlatVectorPtr errors_; // temp space for nulls and values of inputs BufferPtr tempValues_; BufferPtr tempNulls_; diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index 5f7215103903..d4d0404cea1b 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -890,7 +890,6 @@ void Expr::evaluateSharedSubexpr( } else { // Otherwise, simply evaluate it and return without caching the results. eval(rows, context, result); - return; } } @@ -932,7 +931,7 @@ void Expr::evaluateSharedSubexpr( // Identify a subset of rows that need to be computed: rows - // sharedSubexprRows_. LocalSelectivityVector missingRowsHolder(context, rows); - auto missingRows = missingRowsHolder.get(); + auto* missingRows = missingRowsHolder.get(); missingRows->deselect(*sharedSubexprRows); VELOX_DCHECK(missingRows->hasSelections()); @@ -940,7 +939,7 @@ void Expr::evaluateSharedSubexpr( // Final selection of rows need to include sharedSubexprRows_, missingRows and // current final selection of rows if set. 
LocalSelectivityVector newFinalSelectionHolder(context, *sharedSubexprRows); - auto newFinalSelection = newFinalSelectionHolder.get(); + auto* newFinalSelection = newFinalSelectionHolder.get(); newFinalSelection->select(*missingRows); if (!context.isFinalSelection()) { newFinalSelection->select(*context.finalSelection()); @@ -1964,6 +1963,12 @@ void ExprSet::clear() { multiplyReferencedFields_.clear(); } +void ExprSet::clearCache() { + for (auto& expr : exprs_) { + expr->clearCache(); + } +} + void ExprSetSimplified::eval( int32_t begin, int32_t end, diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index cc6b0391ee83..863759914b37 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -265,6 +265,14 @@ class Expr { cachedDictionaryIndices_ = nullptr; } + virtual void clearCache() { + sharedSubexprResults_.clear(); + clearMemo(); + for (auto& input : inputs_) { + input->clearCache(); + } + } + const TypePtr& type() const { return type_; } @@ -393,7 +401,7 @@ class Expr { return vectorFunctionMetadata_; } - auto& inputValues() { + std::vector& inputValues() { return inputValues_; } @@ -595,11 +603,11 @@ class Expr { // The distinct references to input columns in 'inputs_' // subtrees. Empty if this is the same as 'distinctFields_' of // parent Expr. - std::vector distinctFields_; + std::vector distinctFields_; // Fields referenced by multiple inputs, which is subset of distinctFields_. // Used to determine pre-loading of lazy vectors at current expr. - std::unordered_set multiplyReferencedFields_; + std::unordered_set multiplyReferencedFields_; // True if a null in any of 'distinctFields_' causes 'this' to be // null for the row. @@ -693,8 +701,7 @@ class Expr { }; /// Generate a selectivity vector of a single row. 
-SelectivityVector* FOLLY_NONNULL -singleRow(LocalSelectivityVector& holder, vector_size_t row); +SelectivityVector* singleRow(LocalSelectivityVector& holder, vector_size_t row); using ExprPtr = std::shared_ptr; @@ -736,6 +743,11 @@ class ExprSet { void clear(); + /// Clears the internally cached buffers used for shared sub-expressions and + /// dictionary memoization which are allocated through memory pool. This is + /// used by memory arbitration to reclaim memory. + void clearCache(); + core::ExecCtx* execCtx() const { return execCtx_; } @@ -785,10 +797,10 @@ class ExprSet { std::vector> exprs_; // The distinct references to input columns among all expressions in ExprSet. - std::vector distinctFields_; + std::vector distinctFields_; // Fields referenced by multiple expressions in ExprSet. - std::unordered_set multiplyReferencedFields_; + std::unordered_set multiplyReferencedFields_; // Distinct Exprs reachable from 'exprs_' for which reset() needs to // be called at the start of eval(). diff --git a/velox/expression/SwitchExpr.h b/velox/expression/SwitchExpr.h index 560b3a2e2e74..cddbcbce64b8 100644 --- a/velox/expression/SwitchExpr.h +++ b/velox/expression/SwitchExpr.h @@ -52,6 +52,11 @@ class SwitchExpr : public SpecialForm { return true; } + void clearCache() override { + Expr::clearCache(); + tempValues_.reset(); + } + private: static TypePtr resolveType(const std::vector& argTypes); diff --git a/velox/expression/tests/ExprEncodingsTest.cpp b/velox/expression/tests/ExprEncodingsTest.cpp index 7eb5b2e91d0c..0cb706072ee1 100644 --- a/velox/expression/tests/ExprEncodingsTest.cpp +++ b/velox/expression/tests/ExprEncodingsTest.cpp @@ -355,51 +355,83 @@ class ExprEncodingsTest /// Evaluates 'text' expression on 'testDataRow()' twice. First, evaluates the /// expression on the first 2/3 of the rows. Then, evaluates the expression on - /// the last 1/3 of the rows. + /// the last 1/3 of the rows. 
It also performs an iteration which clears all + /// caches utilized during expression evaluation, such as shared + /// sub-expressions, memoization, vector pool, and error vector, and then + /// confirms that the memory usage before and after evaluation is consistent. template void run( const std::string& text, std::function(int32_t)> reference) { - auto source = {parseExpression(text, testDataType_)}; - auto exprSet = std::make_unique(source, execCtx_.get()); - auto row = testDataRow(); - exec::EvalCtx context(execCtx_.get(), exprSet.get(), row.get()); - auto size = row->size(); - - auto expectedResult = makeFlatVector( - size, - [&](auto row) { - auto v = reference(row); - return v.has_value() ? v.value() : T(); - }, - [&](auto row) { return !reference(row).has_value(); }); - - SelectivityVector allRows(size); - *context.mutableIsFinalSelection() = false; - *context.mutableFinalSelection() = &allRows; - - { - vector_size_t begin = 0; - vector_size_t end = size / 3 * 2; - auto rows = selectRange(begin, end); - std::vector result(1); - exprSet->eval(rows, context, result); - - SCOPED_TRACE(text); - SCOPED_TRACE(fmt::format("[{} - {})", begin, end)); - assertEqualRows(expectedResult, result[0], rows); - } - - { - vector_size_t begin = size / 3; - vector_size_t end = size; - auto rows = selectRange(begin, end); - std::vector result(1); - exprSet->eval(0, 1, false, rows, context, result); + for (auto clearCache : {false, true}) { + SCOPED_TRACE(fmt::format("clearCache: {}", clearCache)); + + const auto source = {parseExpression(text, testDataType_)}; + auto exprSet = std::make_unique(source, execCtx_.get()); + const auto row = testDataRow(); + exec::EvalCtx context(execCtx_.get(), exprSet.get(), row.get()); + context.vectorPool()->clear(); + const auto size = row->size(); + + auto expectedResult = makeFlatVector( + size, + [&](auto row) { + auto v = reference(row); + return v.has_value() ? 
v.value() : T(); + }, + [&](auto row) { return !reference(row).has_value(); }); + + SelectivityVector allRows(size); + *context.mutableIsFinalSelection() = false; + *context.mutableFinalSelection() = &allRows; + + { + vector_size_t begin = 0; + vector_size_t end = size / 3 * 2; + const auto rows = selectRange(begin, end); + std::vector result(1); + const auto memoryUsage = context.pool()->usedBytes(); + exprSet->eval(rows, context, result); + + SCOPED_TRACE(text); + SCOPED_TRACE(fmt::format("[{} - {})", begin, end)); + assertEqualRows(expectedResult, result[0], rows); + + if (clearCache) { + result.clear(); + exprSet->clearCache(); + context.vectorPool()->clear(); + { + exec::EvalErrorsPtr clearErrors; + context.moveAppendErrors(clearErrors); + } + ASSERT_EQ(context.pool()->usedBytes(), memoryUsage); + } + } - SCOPED_TRACE(text); - SCOPED_TRACE(fmt::format("[{} - {})", begin, end)); - assertEqualRows(expectedResult, result[0], rows); + { + const vector_size_t begin = size / 3; + const vector_size_t end = size; + const auto rows = selectRange(begin, end); + std::vector result(1); + const auto memoryUsage = context.pool()->usedBytes(); + exprSet->eval(0, 1, false, rows, context, result); + + SCOPED_TRACE(text); + SCOPED_TRACE(fmt::format("[{} - {})", begin, end)); + assertEqualRows(expectedResult, result[0], rows); + + if (clearCache) { + result.clear(); + exprSet->clearCache(); + context.vectorPool()->clear(); + { + exec::EvalErrorsPtr clearErrors; + context.moveAppendErrors(clearErrors); + } + ASSERT_EQ(context.pool()->usedBytes(), memoryUsage); + } + } } } @@ -490,8 +522,10 @@ class ExprEncodingsTest } std::shared_ptr queryCtx_{velox::core::QueryCtx::create()}; + std::shared_ptr execCtxPool_{ + rootPool_->addLeafChild("execCtx")}; std::unique_ptr execCtx_{ - std::make_unique(pool_.get(), queryCtx_.get())}; + std::make_unique(execCtxPool_.get(), queryCtx_.get())}; TestData testData_; RowTypePtr testDataType_; std::vector> testEncodings_; @@ -585,7 +619,7 @@ 
TEST_P(ExprEncodingsTest, commonSubExpressions) { if (!IS_BIGINT1 || !IS_BIGINT2) { return std::nullopt; } else { - auto sum = BIGINT1 + BIGINT2; + const auto sum = BIGINT1 + BIGINT2; return (BIGINT1 % 2 == 0 ? 2 * sum : 3 * sum) + 4 * sum; } });