Skip to content

Commit

Permalink
Add pool name regex in TestScopedSpillInjection (#9413)
Browse files Browse the repository at this point in the history
Summary:
Add a pool name regex in TestScopedSpillInjection to control
the spill injection for target operators.

Pull Request resolved: #9413

Reviewed By: tanjialiang

Differential Revision: D56005393

Pulled By: xiaoxmeng

fbshipit-source-id: b3d904a1e1c2ba6025bb990520e1bd23623f3cd0
  • Loading branch information
duanmeng authored and facebook-github-bot committed Apr 11, 2024
1 parent efb0213 commit 0643556
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 16 deletions.
4 changes: 2 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
const int64_t flatBytes = input->estimateFlatSize();

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool_.name())) {
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::testingRunArbitration(&pool_);
return;
Expand Down Expand Up @@ -905,7 +905,7 @@ void GroupingSet::ensureOutputFits() {
}

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool_.name())) {
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::testingRunArbitration(&pool_);
return;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) {

if (numRows != 0) {
// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool()->name())) {
Operator::ReclaimableSectionGuard guard(this);
memory::testingRunArbitration(pool());
return;
Expand Down Expand Up @@ -777,7 +777,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
}

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool()->name())) {
Operator::ReclaimableSectionGuard guard(this);
memory::testingRunArbitration(pool());
return;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,7 @@ void HashProbe::ensureOutputFits() {
return;
}

if (testingTriggerSpill()) {
if (testingTriggerSpill(pool()->name())) {
Operator::ReclaimableSectionGuard guard(this);
memory::testingRunArbitration(pool());
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void RowNumber::ensureInputFits(const RowVectorPtr& input) {
const auto outOfLineBytesPerRow = outOfLineBytes / numDistinct;

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool()->name())) {
Operator::ReclaimableSectionGuard guard(this);
memory::testingRunArbitration(pool());
return;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) {
const int64_t flatInputBytes = input->estimateFlatSize();

// Test-only spill path.
if (numRows > 0 && testingTriggerSpill()) {
if (numRows > 0 && testingTriggerSpill(pool_->name())) {
spill();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) {
}

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool_->name())) {
spill();
return;
}
Expand Down
23 changes: 19 additions & 4 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,25 +300,33 @@ tsan_atomic<uint32_t>& maxSpillInjections() {
static tsan_atomic<uint32_t> maxInjections{0};
return maxInjections;
}
} // namespace

/// Returns a reference to the function-local static that stores the spill
/// injection percentage read by testingTriggerSpill(). The stored value
/// starts at 0.
tsan_atomic<uint32_t>& testingSpillPct() {
  static tsan_atomic<uint32_t> injectionPct{0};
  return injectionPct;
}

/// Returns a reference to the function-local static that stores the regular
/// expression used to select the pools targeted by spill injection. The
/// stored pattern starts as ".*".
///
/// NOTE(review): std::string is not trivially copyable, so if
/// tsan_atomic<T> resolves to std::atomic<T> in some build mode this
/// instantiation would be rejected there -- confirm against the definition of
/// tsan_atomic.
tsan_atomic<std::string>& testingSpillPoolRegExp() {
  static tsan_atomic<std::string> poolPattern{".*"};
  return poolPattern;
}
} // namespace

/// Turns on test-only spill injection.
/// 'spillPct' is stored as the injection percentage, 'poolRegExp' as the pool
/// name pattern and 'maxInjections' as the cap on the number of injections.
/// The injected spill counter is set to zero.
TestScopedSpillInjection::TestScopedSpillInjection(
int32_t spillPct,
const std::string& poolRegExp,
uint32_t maxInjections) {
// The injected spill counter is expected to be zero on entry.
VELOX_CHECK_EQ(injectedSpillCount(), 0);
// NOTE(review): 'spillPct' is signed but the stored percentage is a
// uint32_t; a negative argument would wrap to a large value -- confirm no
// caller passes one.
testingSpillPct() = spillPct;
testingSpillPoolRegExp() = poolRegExp;
maxSpillInjections() = maxInjections;
injectedSpillCount() = 0;
}

/// Turns spill injection back off: the injection percentage, the injected
/// spill counter and the injection cap are set to zero, and the pool name
/// pattern is set back to ".*".
TestScopedSpillInjection::~TestScopedSpillInjection() {
testingSpillPct() = 0;
injectedSpillCount() = 0;
testingSpillPoolRegExp() = ".*";
maxSpillInjections() = 0;
}

Expand All @@ -327,17 +335,24 @@ tsan_atomic<uint32_t>& injectedSpillCount() {
return injectedCount;
}

bool testingTriggerSpill() {
bool testingTriggerSpill(const std::string& pool) {
// Do not evaluate further if trigger is not set.
if (testingSpillPct() <= 0) {
if (!pool.empty() && !RE2::FullMatch(pool, testingSpillPoolRegExp())) {
return false;
}
if (folly::Random::rand32() % 100 > testingSpillPct()) {

if (testingSpillPct() <= 0) {
return false;
}

if (injectedSpillCount() >= maxSpillInjections()) {
return false;
}

if (folly::Random::rand32() % 100 > testingSpillPct()) {
return false;
}

++injectedSpillCount();
return true;
}
Expand Down
10 changes: 7 additions & 3 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <folly/container/F14Set.h>

#include <re2/re2.h>
#include "velox/common/base/SpillConfig.h"
#include "velox/common/base/SpillStats.h"
#include "velox/common/compression/Compression.h"
Expand Down Expand Up @@ -452,22 +453,25 @@ SpillPartitionIdSet toSpillPartitionIdSet(
/// triggered spill.
/// 'spillPct' indicates the chance of triggering spilling. 100% means spill
/// will always be triggered.
/// 'poolRegExp' is a regular expression string used to define the specific
/// memory pools targeted for injecting spilling.
/// 'maxInjections' indicates the max number of actual triggering. e.g. when
/// 'spillPct' is 20 and 'maxInjections' is 10, continuous calls to
/// testingTriggerSpill() will keep rolling the dice that has a chance of 20%
/// triggering until 10 triggers have been invoked.
/// testingTriggerSpill(poolName) will keep rolling the dice that has a
/// chance of 20% triggering until 10 triggers have been invoked.
class TestScopedSpillInjection {
public:
/// Starts spill injection with the given 'spillPct'. When omitted,
/// 'poolRegExp' is ".*" and 'maxInjections' is the largest uint32_t value.
explicit TestScopedSpillInjection(
int32_t spillPct,
const std::string& poolRegExp = ".*",
uint32_t maxInjections = std::numeric_limits<uint32_t>::max());

/// Ends the injection started by the constructor.
~TestScopedSpillInjection();
};

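/// Test utility that returns true if triggered spill is evaluated to happen,
/// false otherwise. A non-empty 'poolName' must fully match the pool regular
/// expression configured through TestScopedSpillInjection; an empty
/// 'poolName' skips that check.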
bool testingTriggerSpill();
bool testingTriggerSpill(const std::string& poolName = "");

tsan_atomic<uint32_t>& injectedSpillCount();

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ void TopNRowNumber::ensureInputFits(const RowVectorPtr& input) {
}

// Test-only spill path.
if (testingTriggerSpill()) {
if (testingTriggerSpill(pool()->name())) {
spill();
return;
}
Expand Down
41 changes: 41 additions & 0 deletions velox/exec/tests/SpillTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,47 @@ TEST(SpillTest, removeEmptyPartitions) {
}
}

// Checks which pool names testingTriggerSpill() accepts for several pool
// regular expressions passed to TestScopedSpillInjection, always with a spill
// percentage of 100. Each case lists the probed names in call order together
// with the expected result.
TEST(SpillTest, scopedSpillInjectionRegex) {
  struct PoolExpectation {
    const char* poolName;
    bool triggers;
  };

  // Single operator name anywhere in the pool name.
  {
    TestScopedSpillInjection scopedSpillInjection(100, ".*?(TableWrite).*");
    const PoolExpectation expectations[] = {
        {"op.1.0.0.TableWrite", true},
        {"op.1.0.0.TableWrite.hive-xyz", true},
        {"op.1.0.0.TableWrite-hive-xyz", true},
        {"op.1.0.0.RowNumber", false},
        {"", true},
    };
    for (const auto& expectation : expectations) {
      ASSERT_EQ(
          testingTriggerSpill(expectation.poolName), expectation.triggers)
          << "pool: " << expectation.poolName;
    }
  }

  // Alternation of several operator names.
  {
    TestScopedSpillInjection scopedSpillInjection(
        100, ".*?(RowNumber|TableWrite|HashBuild).*");
    const PoolExpectation expectations[] = {
        {"op.1.0.RowNumber", true},
        {"op.1.0.0.TableWrite.hive-xyz", true},
        {"op.1.0.0.TableWrite-hive-xyz", true},
        {"op.1..0.HashBuild", true},
        {"op.1.0.0.Aggregation", false},
        {"", true},
    };
    for (const auto& expectation : expectations) {
      ASSERT_EQ(
          testingTriggerSpill(expectation.poolName), expectation.triggers)
          << "pool: " << expectation.poolName;
    }
  }

  // Operator name must end the pool name or be followed by a '.' suffix.
  {
    TestScopedSpillInjection scopedSpillInjection(
        100, R"(.*?(RowNumber|TableWrite|HashBuild)(?:\..*)?)");
    const PoolExpectation expectations[] = {
        {"op.1.RowNumber", true},
        {"op.1.0.0.TableWrite.hive-xyz", true},
        {"op.1.0.0.TableWrite-hive-xyz", false},
        {"op.1.0.0.Aggregation", false},
        {"", true},
    };
    for (const auto& expectation : expectations) {
      ASSERT_EQ(
          testingTriggerSpill(expectation.poolName), expectation.triggers)
          << "pool: " << expectation.poolName;
    }
  }

  // Default pool regular expression and default pool name argument.
  {
    TestScopedSpillInjection scopedSpillInjection(100);
    const char* const poolNames[] = {
        "op.1.RowNumber",
        "op.1.0.0.TableWrite.hive-xyz",
        "op.1.0.0.TableWrite-hive-xyz",
        "op.1.0.0.Aggregation",
    };
    for (const char* poolName : poolNames) {
      ASSERT_TRUE(testingTriggerSpill(poolName)) << "pool: " << poolName;
    }
    ASSERT_TRUE(testingTriggerSpill());
  }
}

INSTANTIATE_TEST_SUITE_P(
SpillTestSuite,
SpillTest,
Expand Down

0 comments on commit 0643556

Please sign in to comment.