Skip to content

Commit

Permalink
Support to clear cache in expr set (facebookincubator#11308)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#11308

Add support for clearing the internally cached buffers of an expr set that are allocated through the Velox memory pool.
Memory arbitration uses this to reclaim memory from expression evaluation. A unit test is added to verify it.

Reviewed By: bikramSingh91

Differential Revision: D64546974

fbshipit-source-id: fef0b2d54d9aaa7afe82d36be77545d621ad7c1e
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 22, 2024
1 parent eaff500 commit 516cc9b
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 54 deletions.
8 changes: 6 additions & 2 deletions velox/expression/ConjunctExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ class ConjunctExpr : public SpecialForm {
std::string toSql(
std::vector<VectorPtr>* complexConstants = nullptr) const override;

// Clears the state cached by the base Expr (see Expr::clearCache()) and
// additionally releases the temporary buffers this expression keeps for the
// nulls and values of its inputs.
void clearCache() override {
Expr::clearCache();
tempValues_.reset();
tempNulls_.reset();
}

private:
static TypePtr resolveType(const std::vector<TypePtr>& argTypes);

Expand All @@ -89,8 +95,6 @@ class ConjunctExpr : public SpecialForm {
// true if conjunction (and), false if disjunction (or).
const bool isAnd_;

// Errors encountered before processing the current input.
FlatVectorPtr<StringView> errors_;
// temp space for nulls and values of inputs
BufferPtr tempValues_;
BufferPtr tempNulls_;
Expand Down
11 changes: 8 additions & 3 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,6 @@ void Expr::evaluateSharedSubexpr(
} else {
// Otherwise, simply evaluate it and return without caching the results.
eval(rows, context, result);

return;
}
}
Expand Down Expand Up @@ -932,15 +931,15 @@ void Expr::evaluateSharedSubexpr(
// Identify a subset of rows that need to be computed: rows -
// sharedSubexprRows_.
LocalSelectivityVector missingRowsHolder(context, rows);
auto missingRows = missingRowsHolder.get();
auto* missingRows = missingRowsHolder.get();
missingRows->deselect(*sharedSubexprRows);
VELOX_DCHECK(missingRows->hasSelections());

// Fix finalSelection to avoid losing values outside missingRows.
// Final selection of rows need to include sharedSubexprRows_, missingRows and
// current final selection of rows if set.
LocalSelectivityVector newFinalSelectionHolder(context, *sharedSubexprRows);
auto newFinalSelection = newFinalSelectionHolder.get();
auto* newFinalSelection = newFinalSelectionHolder.get();
newFinalSelection->select(*missingRows);
if (!context.isFinalSelection()) {
newFinalSelection->select(*context.finalSelection());
Expand Down Expand Up @@ -1964,6 +1963,12 @@ void ExprSet::clear() {
multiplyReferencedFields_.clear();
}

void ExprSet::clearCache() {
for (auto& expr : exprs_) {
expr->clearCache();
}
}

void ExprSetSimplified::eval(
int32_t begin,
int32_t end,
Expand Down
26 changes: 19 additions & 7 deletions velox/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ class Expr {
cachedDictionaryIndices_ = nullptr;
}

virtual void clearCache() {
sharedSubexprResults_.clear();
clearMemo();
for (auto& input : inputs_) {
input->clearCache();
}
}

const TypePtr& type() const {
return type_;
}
Expand Down Expand Up @@ -393,7 +401,7 @@ class Expr {
return vectorFunctionMetadata_;
}

auto& inputValues() {
std::vector<VectorPtr>& inputValues() {
return inputValues_;
}

Expand Down Expand Up @@ -595,11 +603,11 @@ class Expr {
// The distinct references to input columns in 'inputs_'
// subtrees. Empty if this is the same as 'distinctFields_' of
// parent Expr.
std::vector<FieldReference * FOLLY_NONNULL> distinctFields_;
std::vector<FieldReference*> distinctFields_;

// Fields referenced by multiple inputs, which is subset of distinctFields_.
// Used to determine pre-loading of lazy vectors at current expr.
std::unordered_set<FieldReference * FOLLY_NONNULL> multiplyReferencedFields_;
std::unordered_set<FieldReference*> multiplyReferencedFields_;

// True if a null in any of 'distinctFields_' causes 'this' to be
// null for the row.
Expand Down Expand Up @@ -693,8 +701,7 @@ class Expr {
};

/// Generate a selectivity vector of a single row.
SelectivityVector* FOLLY_NONNULL
singleRow(LocalSelectivityVector& holder, vector_size_t row);
SelectivityVector* singleRow(LocalSelectivityVector& holder, vector_size_t row);

using ExprPtr = std::shared_ptr<Expr>;

Expand Down Expand Up @@ -736,6 +743,11 @@ class ExprSet {

void clear();

/// Clears the internally cached buffers used for shared sub-expressions and
/// dictionary memoization, which are allocated through the memory pool. This
/// is used by memory arbitration to reclaim memory.
void clearCache();

core::ExecCtx* execCtx() const {
return execCtx_;
}
Expand Down Expand Up @@ -785,10 +797,10 @@ class ExprSet {
std::vector<std::shared_ptr<Expr>> exprs_;

// The distinct references to input columns among all expressions in ExprSet.
std::vector<FieldReference * FOLLY_NONNULL> distinctFields_;
std::vector<FieldReference*> distinctFields_;

// Fields referenced by multiple expressions in ExprSet.
std::unordered_set<FieldReference * FOLLY_NONNULL> multiplyReferencedFields_;
std::unordered_set<FieldReference*> multiplyReferencedFields_;

// Distinct Exprs reachable from 'exprs_' for which reset() needs to
// be called at the start of eval().
Expand Down
5 changes: 5 additions & 0 deletions velox/expression/SwitchExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class SwitchExpr : public SpecialForm {
return true;
}

// Clears the state cached by the base Expr (see Expr::clearCache()) and
// additionally resets 'tempValues_'.
void clearCache() override {
Expr::clearCache();
tempValues_.reset();
}
private:
static TypePtr resolveType(const std::vector<TypePtr>& argTypes);

Expand Down
118 changes: 76 additions & 42 deletions velox/expression/tests/ExprEncodingsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,51 +355,83 @@ class ExprEncodingsTest

/// Evaluates 'text' expression on 'testDataRow()' twice. First, evaluates the
/// expression on the first 2/3 of the rows. Then, evaluates the expression on
/// the last 1/3 of the rows.
/// the last 1/3 of the rows. It also performs an iteration which clears all
/// caches utilized during expression evaluation, such as shared
/// sub-expressions, memoization, vector pool, and error vector, and then
/// confirms that the memory usage before and after evaluation is consistent.
template <typename T>
void run(
const std::string& text,
std::function<std::optional<T>(int32_t)> reference) {
auto source = {parseExpression(text, testDataType_)};
auto exprSet = std::make_unique<exec::ExprSet>(source, execCtx_.get());
auto row = testDataRow();
exec::EvalCtx context(execCtx_.get(), exprSet.get(), row.get());
auto size = row->size();

auto expectedResult = makeFlatVector<T>(
size,
[&](auto row) {
auto v = reference(row);
return v.has_value() ? v.value() : T();
},
[&](auto row) { return !reference(row).has_value(); });

SelectivityVector allRows(size);
*context.mutableIsFinalSelection() = false;
*context.mutableFinalSelection() = &allRows;

{
vector_size_t begin = 0;
vector_size_t end = size / 3 * 2;
auto rows = selectRange(begin, end);
std::vector<VectorPtr> result(1);
exprSet->eval(rows, context, result);

SCOPED_TRACE(text);
SCOPED_TRACE(fmt::format("[{} - {})", begin, end));
assertEqualRows(expectedResult, result[0], rows);
}

{
vector_size_t begin = size / 3;
vector_size_t end = size;
auto rows = selectRange(begin, end);
std::vector<VectorPtr> result(1);
exprSet->eval(0, 1, false, rows, context, result);
for (auto clearCache : {false, true}) {
SCOPED_TRACE(fmt::format("clearCache: {}", clearCache));

const auto source = {parseExpression(text, testDataType_)};
auto exprSet = std::make_unique<exec::ExprSet>(source, execCtx_.get());
const auto row = testDataRow();
exec::EvalCtx context(execCtx_.get(), exprSet.get(), row.get());
context.vectorPool()->clear();
const auto size = row->size();

auto expectedResult = makeFlatVector<T>(
size,
[&](auto row) {
auto v = reference(row);
return v.has_value() ? v.value() : T();
},
[&](auto row) { return !reference(row).has_value(); });

SelectivityVector allRows(size);
*context.mutableIsFinalSelection() = false;
*context.mutableFinalSelection() = &allRows;

{
vector_size_t begin = 0;
vector_size_t end = size / 3 * 2;
const auto rows = selectRange(begin, end);
std::vector<VectorPtr> result(1);
const auto memoryUsage = context.pool()->usedBytes();
exprSet->eval(rows, context, result);

SCOPED_TRACE(text);
SCOPED_TRACE(fmt::format("[{} - {})", begin, end));
assertEqualRows(expectedResult, result[0], rows);

if (clearCache) {
result.clear();
exprSet->clearCache();
context.vectorPool()->clear();
{
exec::EvalErrorsPtr clearErrors;
context.moveAppendErrors(clearErrors);
}
ASSERT_EQ(context.pool()->usedBytes(), memoryUsage);
}
}

SCOPED_TRACE(text);
SCOPED_TRACE(fmt::format("[{} - {})", begin, end));
assertEqualRows(expectedResult, result[0], rows);
{
const vector_size_t begin = size / 3;
const vector_size_t end = size;
const auto rows = selectRange(begin, end);
std::vector<VectorPtr> result(1);
const auto memoryUsage = context.pool()->usedBytes();
exprSet->eval(0, 1, false, rows, context, result);

SCOPED_TRACE(text);
SCOPED_TRACE(fmt::format("[{} - {})", begin, end));
assertEqualRows(expectedResult, result[0], rows);

if (clearCache) {
result.clear();
exprSet->clearCache();
context.vectorPool()->clear();
{
exec::EvalErrorsPtr clearErrors;
context.moveAppendErrors(clearErrors);
}
ASSERT_EQ(context.pool()->usedBytes(), memoryUsage);
}
}
}
}

Expand Down Expand Up @@ -490,8 +522,10 @@ class ExprEncodingsTest
}

std::shared_ptr<core::QueryCtx> queryCtx_{velox::core::QueryCtx::create()};
std::shared_ptr<memory::MemoryPool> execCtxPool_{
rootPool_->addLeafChild("execCtx")};
std::unique_ptr<core::ExecCtx> execCtx_{
std::make_unique<core::ExecCtx>(pool_.get(), queryCtx_.get())};
std::make_unique<core::ExecCtx>(execCtxPool_.get(), queryCtx_.get())};
TestData testData_;
RowTypePtr testDataType_;
std::vector<std::vector<EncodingOptions>> testEncodings_;
Expand Down Expand Up @@ -585,7 +619,7 @@ TEST_P(ExprEncodingsTest, commonSubExpressions) {
if (!IS_BIGINT1 || !IS_BIGINT2) {
return std::nullopt;
} else {
auto sum = BIGINT1 + BIGINT2;
const auto sum = BIGINT1 + BIGINT2;
return (BIGINT1 % 2 == 0 ? 2 * sum : 3 * sum) + 4 * sum;
}
});
Expand Down

0 comments on commit 516cc9b

Please sign in to comment.