Skip to content

Commit

Permalink
Add fs fault injection in memory arbitrator fuzzer spill (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#11432)

Summary:
In the memory arbitrator fuzzer, add file system fault injection that randomly injects failures on spill files across the read, readv and write paths and combinations of them. The newly introduced gflags for tuning the trigger conditions are:
- spill_faulty_fs_ratio
- spill_fs_fault_injection_ratio

Pull Request resolved: facebookincubator#11432

Reviewed By: xiaoxmeng

Differential Revision: D65773917

Pulled By: tanjialiang

fbshipit-source-id: dd95212439e45d2b850a5fa96c77a22042a89cc9
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Nov 13, 2024
1 parent d4bdc3b commit 117b5df
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 15 deletions.
97 changes: 85 additions & 12 deletions velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@
#include <boost/random/uniform_int_distribution.hpp>

#include "velox/common/file/FileSystems.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h" // @manual
#include "velox/dwio/dwrf/RegisterDwrfWriter.h" // @manual
#include "velox/exec/MemoryReclaimer.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/functions/lib/aggregates/AverageAggregateBase.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
Expand Down Expand Up @@ -84,6 +82,20 @@ DEFINE_int32(
"Each second, the percentage chance of triggering global arbitration by "
"calling shrinking pools globally.");

// Command-line knob for how likely the spill file system is to be made
// faulty. Per its help text the value is a probability in [0, 1]; the default
// is 0.1.
DEFINE_double(
spill_faulty_fs_ratio,
0.1,
"Chance of spill filesystem being faulty(expressed as double from 0 to 1)");

// Probability that the spill file system fault injection hook actually fails
// a file operation it is asked about.
//
// This must be a double flag: it holds a fractional probability that is passed
// to coinToss(). Declared as an int32 flag, the 0.01 default is truncated to 0
// (so no fault is ever injected by default) and fractional values cannot be
// supplied on the command line.
DEFINE_double(
    spill_fs_fault_injection_ratio,
    0.01,
    "The chance of actually injecting fault in file operations for spill "
    "filesystem. This is only applicable when 'spill_faulty_fs_ratio' is "
    "larger than 0");

using namespace facebook::velox::tests::utils;

namespace facebook::velox::exec::test {
namespace {

Expand Down Expand Up @@ -131,6 +143,8 @@ class MemoryArbitrationFuzzer {
return boost::random::uniform_int_distribution<int32_t>(min, max)(rng_);
}

// Returns the temp directory to use for spilling. The directory is randomly
// created either as a regular temp directory or with fault injection enabled,
// in which case a hook that randomly fails file operations is installed on
// its file system.
std::shared_ptr<TempDirectoryPath> maybeGenerateFaultySpillDirectory();

// Returns a list of randomly generated key types for join and aggregation.
std::vector<TypePtr> generateKeyTypes(int32_t numKeys);

Expand Down Expand Up @@ -684,15 +698,52 @@ MemoryArbitrationFuzzer::orderByPlans(const std::string& tableDir) {

// Counters of injected failures, kept separately for each thread (see the
// thread_local instance declared after this struct).
struct ThreadLocalStats {
  // Number of task aborts counted on this thread. The code that bumps it is
  // not visible here; verify() compares it before and after running a query.
  uint64_t taskAbortCount = 0;
  // Number of spill file system faults raised by the fault injection hook on
  // this thread.
  uint64_t spillFsFaultCount = 0;
};

// Stats that keep track of the per-thread execution status in verify(). Each
// thread gets its own instance, so a query thread can compare the counters
// before and after running a query.
thread_local ThreadLocalStats threadLocalStats;

std::shared_ptr<TempDirectoryPath>
MemoryArbitrationFuzzer::maybeGenerateFaultySpillDirectory() {
FuzzerGenerator fsRng(rng_());
const auto injectFsFault =
coinToss(fsRng, FLAGS_spill_fs_fault_injection_ratio);
if (!injectFsFault) {
return exec::test::TempDirectoryPath::create(false);
}
using OpType = FaultFileOperation::Type;
static const std::vector<std::unordered_set<OpType>> opTypes{
{OpType::kRead},
{OpType::kReadv},
{OpType::kWrite},
{OpType::kRead, OpType::kReadv},
{OpType::kRead, OpType::kWrite},
{OpType::kReadv, OpType::kWrite}};

const auto directory = exec::test::TempDirectoryPath::create(true);
auto faultyFileSystem = std::dynamic_pointer_cast<FaultyFileSystem>(
filesystems::getFileSystem(directory->getPath(), nullptr));
faultyFileSystem->setFileInjectionHook(
[this, injectTypes = opTypes[getRandomIndex(fsRng, opTypes.size() - 1)]](
FaultFileOperation* op) {
if (injectTypes.count(op->type) == 0) {
return;
}
FuzzerGenerator fsRng(rng_());
if (coinToss(fsRng, FLAGS_spill_fs_fault_injection_ratio)) {
++threadLocalStats.spillFsFaultCount;
VELOX_FAIL(
"Fault file injection on {}",
FaultFileOperation::typeString(op->type));
}
});
return directory;
}

void MemoryArbitrationFuzzer::verify() {
const auto outputDirectory = TempDirectoryPath::create();
const auto spillDirectory = exec::test::TempDirectoryPath::create();
const auto tableScanDir = exec::test::TempDirectoryPath::create();
auto spillDirectory = maybeGenerateFaultySpillDirectory();
const auto tableScanDir = exec::test::TempDirectoryPath::create(false);

// Set a percentage chance for the task to be externally aborted.
TestScopedAbortInjection scopedAbortInjection(
Expand All @@ -716,6 +767,10 @@ void MemoryArbitrationFuzzer::verify() {

SCOPE_EXIT {
waitForAllTasksToBeDeleted();
if (auto faultyFileSystem = std::dynamic_pointer_cast<FaultyFileSystem>(
filesystems::getFileSystem(spillDirectory->getPath(), nullptr))) {
faultyFileSystem->clearFileFaultInjections();
}
};

const auto numThreads = FLAGS_num_threads;
Expand All @@ -724,10 +779,11 @@ void MemoryArbitrationFuzzer::verify() {
queryThreads.reserve(numThreads);
for (int i = 0; i < numThreads; ++i) {
auto seed = rng_();
queryThreads.emplace_back([&, i, seed]() {
queryThreads.emplace_back([&, spillDirectory, i, seed]() {
FuzzerGenerator rng(seed);
while (!stop) {
const auto prevAbortCount = threadLocalStats.taskAbortCount;
const auto prevSpillFsFaultCount = threadLocalStats.spillFsFaultCount;
try {
const auto queryCtx = newQueryCtx(
memory::memoryManager(),
Expand All @@ -751,18 +807,35 @@ void MemoryArbitrationFuzzer::verify() {
.copyResults(pool_.get());
}
++stats_.wlock()->successCount;
VELOX_CHECK_EQ(
threadLocalStats.spillFsFaultCount, prevSpillFsFaultCount);
} catch (const VeloxException& e) {
auto lockedStats = stats_.wlock();
if (e.errorCode() == error_code::kMemCapExceeded.c_str()) {
++lockedStats->oomCount;
} else if (e.errorCode() == error_code::kMemAborted.c_str()) {
++lockedStats->abortCount;
} else if (e.errorCode() == error_code::kInvalidState.c_str()) {
// Triggered abort on task.
VELOX_CHECK_GT(threadLocalStats.taskAbortCount, prevAbortCount);
VELOX_CHECK(
e.message().find("Aborted for external error") !=
std::string::npos);
const auto injectedAbort =
threadLocalStats.taskAbortCount > prevAbortCount;
const auto injectedSpillFsFault =
threadLocalStats.spillFsFaultCount > prevSpillFsFaultCount;
VELOX_CHECK(injectedAbort || injectedSpillFsFault);
if (injectedAbort && !injectedSpillFsFault) {
VELOX_CHECK(
e.message().find("Aborted for external error") !=
std::string::npos);
} else if (!injectedAbort && injectedSpillFsFault) {
VELOX_CHECK(
e.message().find("Fault file injection on") !=
std::string::npos);
} else {
VELOX_CHECK(
e.message().find("Fault file injection on") !=
std::string::npos ||
e.message().find("Aborted for external error") !=
std::string::npos);
}
} else {
LOG(ERROR) << "Unexpected exception:\n" << e.what();
std::rethrow_exception(std::current_exception());
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/MemoryArbitrationFuzzerRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "velox/common/file/FileSystems.h"

#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/exec/fuzzer/MemoryArbitrationFuzzer.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
Expand All @@ -31,6 +32,7 @@ class MemoryArbitrationFuzzerRunner {
static int run(size_t seed) {
serializer::presto::PrestoVectorSerde::registerVectorSerde();
filesystems::registerLocalFileSystem();
tests::utils::registerFaultyFileSystem();
functions::prestosql::registerAllScalarFunctions();
aggregate::prestosql::registerAllAggregateFunctions();
memoryArbitrationFuzzer(seed);
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/utils/TempDirectoryPath.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class TempDirectoryPath {
TempDirectoryPath(const TempDirectoryPath&) = delete;
TempDirectoryPath& operator=(const TempDirectoryPath&) = delete;

/// If fault injection is enabled, the returned the file path has the faulty
/// If fault injection is enabled, the returned file path will have the faulty
/// file system prefix scheme. The velox fs then opens the directory through
/// the faulty file system. The actual file operation might either fails or
/// delegate to the actual file.
/// the faulty file system. The file operation will then either fail or be
/// delegated to the actual file.
const std::string& getPath() const {
return path_;
}
Expand Down

0 comments on commit 117b5df

Please sign in to comment.