Skip to content

Commit

Permalink
Merge branch 'main' of github.com:kgpai/velox-1
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpai committed Dec 20, 2023
2 parents 3559328 + a6c9936 commit e6dc44c
Show file tree
Hide file tree
Showing 53 changed files with 1,360 additions and 875 deletions.
8 changes: 4 additions & 4 deletions .circleci/dist_compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ jobs:
command: |
mkdir -p /tmp/spark_aggregate_fuzzer_repro/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
_build/debug/velox/exec/tests/spark_aggregation_fuzzer_test \
_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 60 \
--logtostderr=1 \
Expand All @@ -332,7 +332,7 @@ jobs:
mkdir -p /tmp/aggregate_fuzzer_repro/
rm -rfv /tmp/aggregate_fuzzer_repro/*
chmod -R 777 /tmp/aggregate_fuzzer_repro
_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test \
_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 1800 \
--logtostderr=1 \
Expand Down Expand Up @@ -503,7 +503,7 @@ jobs:
fuzzer_output: "/tmp/spark_aggregate_fuzzer.log"
fuzzer_repro: "/tmp/spark_aggregate_fuzzer_repro"
fuzzer_name: "SparkAggregate"
fuzzer_exe: "_build/debug/velox/exec/tests/spark_aggregation_fuzzer_test"
fuzzer_exe: "_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test"
fuzzer_args: " --seed ${RANDOM} --duration_sec 600 --logtostderr=1 --minloglevel=0 \
--repro_persist_path=/tmp/spark_aggregate_fuzzer_repro"

Expand All @@ -518,7 +518,7 @@ jobs:
fuzzer_output: "/tmp/aggregate_fuzzer.log"
fuzzer_repro: "/tmp/aggregate_fuzzer_repro"
fuzzer_name: "Aggregate"
fuzzer_exe: "_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test"
fuzzer_exe: "_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test"
fuzzer_args: " --seed ${RANDOM} --duration_sec 3600 --logtostderr=1 --minloglevel=0 \
--repro_persist_path=/tmp/aggregate_fuzzer_repro"

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ jobs:
name: spark
path: |
velox/_build/debug/velox/expression/tests/spark_expression_fuzzer_test
velox/_build/debug/velox/expression/tests/spark_aggregation_fuzzer_test
velox/_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test
- name: Upload aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: aggregation
path: velox/_build/debug/velox/exec/tests/velox_aggregation_fuzzer_test
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test

- name: Upload join fuzzer
uses: actions/upload-artifact@v3
Expand Down
28 changes: 15 additions & 13 deletions velox/benchmarks/basic/VectorFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ namespace {

using namespace facebook::velox;

std::shared_ptr<memory::MemoryPool> pool{
memory::MemoryManager::getInstance()->addLeafPool()};
memory::MemoryPool* pool() {
static auto leaf = memory::MemoryManager::getInstance()->addLeafPool();
return leaf.get();
}

VectorFuzzer::Options getOpts(size_t n, double nullRatio = 0) {
VectorFuzzer::Options opts;
Expand All @@ -39,25 +41,25 @@ VectorFuzzer::Options getOpts(size_t n, double nullRatio = 0) {
}

BENCHMARK_MULTI(flatInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatIntegerHalfNull, n) {
VectorFuzzer fuzzer(getOpts(n, 0.5), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n, 0.5), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatDouble, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(DOUBLE()));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatBool, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(BOOLEAN()));
return n;
}
Expand All @@ -66,7 +68,7 @@ BENCHMARK_RELATIVE_MULTI(flatVarcharAscii, n) {
auto opts = getOpts(n);
opts.charEncodings = {UTF8CharList::ASCII};

VectorFuzzer fuzzer(opts, pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(opts, pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(VARCHAR()));
return n;
}
Expand All @@ -75,37 +77,37 @@ BENCHMARK_RELATIVE_MULTI(flatVarcharUtf8, n) {
auto opts = getOpts(n);
opts.charEncodings = {UTF8CharList::EXTENDED_UNICODE};

VectorFuzzer fuzzer(opts, pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(opts, pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzFlat(VARCHAR()));
return n;
}

BENCHMARK_DRAW_LINE();

BENCHMARK_RELATIVE_MULTI(constantInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzConstant(BIGINT()));
return n;
}

BENCHMARK_RELATIVE_MULTI(dictionaryInteger, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
folly::doNotOptimizeAway(fuzzer.fuzzDictionary(fuzzer.fuzzFlat(BIGINT())));
return n;
}

BENCHMARK_DRAW_LINE();

BENCHMARK_RELATIVE_MULTI(flatArray, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;
folly::doNotOptimizeAway(
fuzzer.fuzzArray(fuzzer.fuzzFlat(BIGINT(), elementsSize), n));
return n;
}

BENCHMARK_RELATIVE_MULTI(flatMap, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;
folly::doNotOptimizeAway(fuzzer.fuzzMap(
fuzzer.fuzzFlat(BIGINT(), elementsSize),
Expand All @@ -115,7 +117,7 @@ BENCHMARK_RELATIVE_MULTI(flatMap, n) {
}

BENCHMARK_RELATIVE_MULTI(flatMapArrayNested, n) {
VectorFuzzer fuzzer(getOpts(n), pool.get(), FLAGS_fuzzer_seed);
VectorFuzzer fuzzer(getOpts(n), pool(), FLAGS_fuzzer_seed);
const size_t elementsSize = n * fuzzer.getOptions().containerLength;

folly::doNotOptimizeAway(fuzzer.fuzzMap(
Expand Down
15 changes: 15 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
namespace facebook::velox {

void registerVeloxMetrics() {
/// ================== Task Execution Counters =================
// The number of driver yield count when exceeds the per-driver cpu time slice
// limit if enforced.
DEFINE_METRIC(kMetricDriverYieldCount, facebook::velox::StatType::COUNT);

/// ================== Cache Counters =================

// Tracks hive handle generation latency in range of [0, 100s] and reports
// P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
Expand Down Expand Up @@ -96,6 +103,14 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricArbitratorFreeCapacityBytes, facebook::velox::StatType::AVG);

// Tracks the memory pool usage leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolUsageLeakBytes, facebook::velox::StatType::SUM);

// Tracks the memory pool reservation leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolReservationLeakBytes, facebook::velox::StatType::SUM);

/// ================== Spill related Counters =================

// The number of bytes in memory to spill.
Expand Down
9 changes: 9 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ constexpr folly::StringPiece kMetricMemoryReclaimWaitTimeoutCount{
constexpr folly::StringPiece kMetricMemoryNonReclaimableCount{
"velox.memory_non_reclaimable_count"};

constexpr folly::StringPiece kMetricMemoryPoolUsageLeakBytes{
"velox.memory_pool_usage_leak_bytes"};

constexpr folly::StringPiece kMetricMemoryPoolReservationLeakBytes{
"velox.memory_pool_reservation_leak_bytes"};

constexpr folly::StringPiece kMetricArbitratorRequestsCount{
"velox.arbitrator_requests_count"};

Expand All @@ -67,6 +73,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

constexpr folly::StringPiece kMetricDriverYieldCount{
"velox.driver_yield_count"};

constexpr folly::StringPiece kMetricSpilledInputBytes{
"velox.spill_input_bytes"};

Expand Down
2 changes: 0 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.alignment = alignment_,
.maxCapacity = kMaxMemory,
.trackUsage = options.trackDefaultUsage,
.checkUsageLeak = options.checkUsageLeak,
.debugEnabled = options.debugEnabled,
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})} {
Expand Down Expand Up @@ -161,7 +160,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
options.alignment = alignment_;
options.maxCapacity = capacity;
options.trackUsage = true;
options.checkUsageLeak = checkUsageLeak_;
options.debugEnabled = debugEnabled_;
options.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_;

Expand Down
28 changes: 20 additions & 8 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <signal.h>
#include <set>

#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
Expand Down Expand Up @@ -212,7 +214,6 @@ MemoryPool::MemoryPool(
maxCapacity_(parent_ == nullptr ? options.maxCapacity : kMaxMemory),
trackUsage_(options.trackUsage),
threadSafe_(options.threadSafe),
checkUsageLeak_(options.checkUsageLeak),
debugEnabled_(options.debugEnabled),
coreOnAllocationFailureEnabled_(options.coreOnAllocationFailureEnabled) {
VELOX_CHECK(!isRoot() || !isLeaf());
Expand Down Expand Up @@ -404,13 +405,25 @@ MemoryPoolImpl::~MemoryPoolImpl() {
if (parent_ != nullptr) {
toImpl(parent_)->dropChild(this);
}
if (checkUsageLeak_) {
VELOX_CHECK(
(usedReservationBytes_ == 0) && (reservationBytes_ == 0) &&
(minReservationBytes_ == 0),
"Bad memory usage track state: {}",
toString());

if (isLeaf()) {
if (usedReservationBytes_ > 0) {
RECORD_METRIC_VALUE(
kMetricMemoryPoolUsageLeakBytes, usedReservationBytes_);
}

if (minReservationBytes_ > 0) {
RECORD_METRIC_VALUE(
kMetricMemoryPoolReservationLeakBytes, minReservationBytes_);
}
}

VELOX_DCHECK(
(usedReservationBytes_ == 0) && (reservationBytes_ == 0) &&
(minReservationBytes_ == 0),
"Bad memory usage track state: {}",
toString());

if (destructionCb_ != nullptr) {
destructionCb_(this);
}
Expand Down Expand Up @@ -652,7 +665,6 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
.alignment = alignment_,
.trackUsage = trackUsage_,
.threadSafe = threadSafe,
.checkUsageLeak = checkUsageLeak_,
.debugEnabled = debugEnabled_,
.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_});
}
Expand Down
16 changes: 0 additions & 16 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// memory pools from the same root memory pool independently.
bool threadSafe{true};

/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
///
/// If true, checks the memory usage leak on destruction.
///
/// NOTE: user can turn on/off the memory leak check of each individual
/// memory pools from the same root memory pool independently.
bool checkUsageLeak{FLAGS_velox_memory_leak_check_enabled};

/// If true, tracks the allocation and free call stacks to detect the source
/// of memory leak for testing purpose.
bool debugEnabled{FLAGS_velox_memory_pool_debug_enabled};
Expand Down Expand Up @@ -212,12 +203,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
return threadSafe_;
}

/// Returns true if this memory pool checks memory leak on destruction.
/// Used only for test purposes.
virtual bool testingCheckUsageLeak() const {
return checkUsageLeak_;
}

/// Invoked to traverse the memory pool subtree rooted at this, and calls
/// 'visitor' on each visited child memory pool with the parent pool's
/// 'poolMutex_' reader lock held. The 'visitor' must not access the
Expand Down Expand Up @@ -531,7 +516,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
const int64_t maxCapacity_;
const bool trackUsage_;
const bool threadSafe_;
const bool checkUsageLeak_;
const bool debugEnabled_;
const bool coreOnAllocationFailureEnabled_;

Expand Down
11 changes: 0 additions & 11 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,17 +616,6 @@ TEST_F(MemoryManagerTest, quotaEnforcement) {
}
}

TEST_F(MemoryManagerTest, testCheckUsageLeak) {
FLAGS_velox_memory_leak_check_enabled = true;
auto& manager = MemoryManager::testingSetInstance(
memory::MemoryManagerOptions{.checkUsageLeak = false});

auto rootPool = manager.addRootPool("duplicateRootPool", kMaxMemory);
auto leafPool = manager.addLeafPool("duplicateLeafPool", true);
ASSERT_FALSE(rootPool->testingCheckUsageLeak());
ASSERT_FALSE(leafPool->testingCheckUsageLeak());
}

} // namespace memory
} // namespace velox
} // namespace facebook
14 changes: 12 additions & 2 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,11 @@ class QueryConfig {
static constexpr const char* kSparkBloomFilterExpectedNumItems =
"spark.bloom_filter.expected_num_items";

// The default number of bits to use for the bloom filter.
/// The default number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterNumBits =
"spark.bloom_filter.num_bits";

// The max number of bits to use for the bloom filter.
/// The max number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterMaxNumBits =
"spark.bloom_filter.max_num_bits";

Expand Down Expand Up @@ -358,6 +358,12 @@ class QueryConfig {
static constexpr const char* kMaxSplitPreloadPerDriver =
"max_split_preload_per_driver";

/// If not zero, specifies the cpu time slice limit in ms that a driver thread
/// can continuously run without yielding. If it is zero, then there is no
/// limit.
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -714,6 +720,10 @@ class QueryConfig {
return get<int32_t>(kMaxSplitPreloadPerDriver, 2);
}

uint32_t driverCpuTimeSliceLimitMs() const {
return get<uint32_t>(kDriverCpuTimeSliceLimitMs, 0);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ Generic Configuration
- true
- Whether to enable caches in expression evaluation. If set to true, optimizations including vector pools and
evalWithMemo are enabled.
* - driver_cpu_time_slice_limit_ms
- integer
- 0
- If it is not zero, specifies the time limit that a driver can continuously
run on a thread before yield. If it is zero, then it no limit.

.. _expression-evaluation-conf:

Expand Down
Loading

0 comments on commit e6dc44c

Please sign in to comment.