From 62b0a128702f4092b1680c4387d892d1d6829d20 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Tue, 5 Nov 2024 03:46:47 -0800 Subject: [PATCH] Let arbitrator system use nano second (#11415) Summary: Use nano seconds in arbitrator system to avoid flakiness in time dependent tests. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11415 Reviewed By: xiaoxmeng Differential Revision: D65385102 Pulled By: tanjialiang fbshipit-source-id: f1dc631d69a4e17850a9b90ebf0d5e29fb5aec86 --- velox/common/memory/ArbitrationOperation.cpp | 66 +++++------ velox/common/memory/ArbitrationOperation.h | 28 ++--- .../common/memory/ArbitrationParticipant.cpp | 12 +- velox/common/memory/ArbitrationParticipant.h | 22 ++-- velox/common/memory/SharedArbitrator.cpp | 103 ++++++++-------- velox/common/memory/SharedArbitrator.h | 14 +-- .../tests/ArbitrationParticipantTest.cpp | 84 ++++++------- .../memory/tests/MockSharedArbitratorTest.cpp | 112 +++++++++++++----- .../memory/tests/SharedArbitratorTestUtil.h | 4 +- velox/common/testutil/ScopedTestTime.cpp | 42 ++++--- velox/common/testutil/ScopedTestTime.h | 17 +-- .../testutil/tests/TestScopedTestTime.cpp | 17 +++ velox/common/time/Timer.cpp | 23 +++- velox/common/time/Timer.h | 10 +- 14 files changed, 327 insertions(+), 227 deletions(-) diff --git a/velox/common/memory/ArbitrationOperation.cpp b/velox/common/memory/ArbitrationOperation.cpp index b9cdd4917efc..b5c173c0dbcf 100644 --- a/velox/common/memory/ArbitrationOperation.cpp +++ b/velox/common/memory/ArbitrationOperation.cpp @@ -31,10 +31,10 @@ using namespace facebook::velox::memory; ArbitrationOperation::ArbitrationOperation( ScopedArbitrationParticipant&& participant, uint64_t requestBytes, - uint64_t timeoutMs) + uint64_t timeoutNs) : requestBytes_(requestBytes), - timeoutMs_(timeoutMs), - createTimeMs_(getCurrentTimeMs()), + timeoutNs_(timeoutNs), + createTimeNs_(getCurrentTimeNano()), participant_(std::move(participant)) { VELOX_CHECK_GT(requestBytes_, 0); } @@ -84,14 +84,14 @@ void ArbitrationOperation::start() { VELOX_CHECK_EQ(state_, State::kInit); participant_->startArbitration(this); setState(ArbitrationOperation::State::kRunning); - VELOX_CHECK_EQ(startTimeMs_, 0); - startTimeMs_ = getCurrentTimeMs(); + VELOX_CHECK_EQ(startTimeNs_, 0); + startTimeNs_ = getCurrentTimeNano(); } void ArbitrationOperation::finish() { setState(State::kFinished); - VELOX_CHECK_EQ(finishTimeMs_, 0); - finishTimeMs_ = getCurrentTimeMs(); + VELOX_CHECK_EQ(finishTimeNs_, 0); + finishTimeNs_ = getCurrentTimeNano(); participant_->finishArbitration(this); } @@ -99,30 +99,30 @@ bool ArbitrationOperation::aborted() const { return participant_->aborted(); } -size_t ArbitrationOperation::executionTimeMs() const { +uint64_t ArbitrationOperation::executionTimeNs() const { if (state_ == State::kFinished) { - VELOX_CHECK_GE(finishTimeMs_, createTimeMs_); - return finishTimeMs_ - createTimeMs_; + VELOX_CHECK_GE(finishTimeNs_, createTimeNs_); + return finishTimeNs_ - createTimeNs_; } else { - const auto currentTimeMs = getCurrentTimeMs(); - VELOX_CHECK_GE(currentTimeMs, createTimeMs_); - return currentTimeMs - createTimeMs_; + const auto currentTimeNs = getCurrentTimeNano(); + VELOX_CHECK_GE(currentTimeNs, createTimeNs_); + return currentTimeNs - createTimeNs_; } } bool ArbitrationOperation::hasTimeout() const { - return state_ != State::kFinished && timeoutMs() <= 0; + return state_ != State::kFinished && timeoutNs() <= 0; } -size_t ArbitrationOperation::timeoutMs() const { +uint64_t ArbitrationOperation::timeoutNs() const { if (state_ == State::kFinished) { return 0; } - const auto execTimeMs = executionTimeMs(); - if (execTimeMs >= timeoutMs_) { + const auto execTimeNs = executionTimeNs(); + if (execTimeNs >= timeoutNs_) { return 0; } - return timeoutMs_ - execTimeMs; + return timeoutNs_ - execTimeNs; } void ArbitrationOperation::setGrowTargets() { @@ -139,28 +139,28 @@ void ArbitrationOperation::setGrowTargets() { ArbitrationOperation::Stats ArbitrationOperation::stats() const { VELOX_CHECK_EQ(state_, State::kFinished); - VELOX_CHECK_NE(startTimeMs_, 0); + VELOX_CHECK_NE(startTimeNs_, 0); - const uint64_t executionTimeMs = this->executionTimeMs(); + const uint64_t executionTimeNs = this->executionTimeNs(); - VELOX_CHECK_GE(startTimeMs_, createTimeMs_); - const uint64_t localArbitrationWaitTimeMs = startTimeMs_ - createTimeMs_; - if (globalArbitrationStartTimeMs_ == 0) { + VELOX_CHECK_GE(startTimeNs_, createTimeNs_); + const uint64_t localArbitrationWaitTimeNs = startTimeNs_ - createTimeNs_; + if (globalArbitrationStartTimeNs_ == 0) { return { - localArbitrationWaitTimeMs, - finishTimeMs_ - startTimeMs_, + localArbitrationWaitTimeNs, + finishTimeNs_ - startTimeNs_, 0, - executionTimeMs}; + executionTimeNs}; } - VELOX_CHECK_GE(globalArbitrationStartTimeMs_, startTimeMs_); - const uint64_t localArbitrationExecTimeMs = - globalArbitrationStartTimeMs_ - startTimeMs_; + VELOX_CHECK_GE(globalArbitrationStartTimeNs_, startTimeNs_); + const uint64_t localArbitrationExecTimeNs = + globalArbitrationStartTimeNs_ - startTimeNs_; return { - localArbitrationWaitTimeMs, - localArbitrationExecTimeMs, - finishTimeMs_ - globalArbitrationStartTimeMs_, - executionTimeMs}; + localArbitrationWaitTimeNs, + localArbitrationExecTimeNs, + finishTimeNs_ - globalArbitrationStartTimeNs_, + executionTimeNs}; } std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state) { diff --git a/velox/common/memory/ArbitrationOperation.h b/velox/common/memory/ArbitrationOperation.h index 0096d15ee057..e494d93d2b85 100644 --- a/velox/common/memory/ArbitrationOperation.h +++ b/velox/common/memory/ArbitrationOperation.h @@ -31,7 +31,7 @@ class ArbitrationOperation { ArbitrationOperation( ScopedArbitrationParticipant&& pool, uint64_t requestBytes, - uint64_t timeoutMs); + uint64_t timeoutNs); ~ArbitrationOperation(); @@ -95,28 +95,28 @@ class ArbitrationOperation { /// Returns the remaining execution time for this operation before time out. /// If the operation has already finished, this returns zero. - size_t timeoutMs() const; + uint64_t timeoutNs() const; /// Returns true if this operation has timed out. bool hasTimeout() const; /// Returns the execution time of this arbitration operation since creation. - size_t executionTimeMs() const; + uint64_t executionTimeNs() const; /// Invoked to mark the start of global arbitration. This is used to measure /// how much time spent in waiting for global arbitration. void recordGlobalArbitrationStartTime() { - VELOX_CHECK_EQ(globalArbitrationStartTimeMs_, 0); + VELOX_CHECK_EQ(globalArbitrationStartTimeNs_, 0); VELOX_CHECK_EQ(state_, State::kRunning); - globalArbitrationStartTimeMs_ = getCurrentTimeMs(); + globalArbitrationStartTimeNs_ = getCurrentTimeNano(); } /// The execution stats of this arbitration operation after completion. struct Stats { - uint64_t localArbitrationWaitTimeMs{0}; - uint64_t localArbitrationExecTimeMs{0}; - uint64_t globalArbitrationWaitTimeMs{0}; - uint64_t executionTimeMs{0}; + uint64_t localArbitrationWaitTimeNs{0}; + uint64_t localArbitrationExecTimeNs{0}; + uint64_t globalArbitrationWaitTimeNs{0}; + uint64_t executionTimeNs{0}; }; /// NOTE: should only called after this arbitration operation finishes. @@ -126,22 +126,22 @@ class ArbitrationOperation { void setState(State state); const uint64_t requestBytes_; - const uint64_t timeoutMs_; + const uint64_t timeoutNs_; // The start time of this arbitration operation. - const uint64_t createTimeMs_; + const uint64_t createTimeNs_; const ScopedArbitrationParticipant participant_; State state_{State::kInit}; - uint64_t startTimeMs_{0}; - uint64_t finishTimeMs_{0}; + uint64_t startTimeNs_{0}; + uint64_t finishTimeNs_{0}; uint64_t maxGrowBytes_{0}; uint64_t minGrowBytes_{0}; // The time that starts global arbitration wait - uint64_t globalArbitrationStartTimeMs_{}; + uint64_t globalArbitrationStartTimeNs_{}; friend class ArbitrationParticipant; }; diff --git a/velox/common/memory/ArbitrationParticipant.cpp b/velox/common/memory/ArbitrationParticipant.cpp index 5a03ebb5033b..8ea63db8b485 100644 --- a/velox/common/memory/ArbitrationParticipant.cpp +++ b/velox/common/memory/ArbitrationParticipant.cpp @@ -102,7 +102,7 @@ ArbitrationParticipant::ArbitrationParticipant( pool_(pool.get()), config_(config), maxCapacity_(pool_->maxCapacity()), - createTimeUs_(getCurrentTimeMicro()) { + createTimeNs_(getCurrentTimeNano()) { VELOX_CHECK_LE( config_->minCapacity, maxCapacity_, @@ -261,7 +261,7 @@ void ArbitrationParticipant::finishArbitration(ArbitrationOperation* op) { uint64_t ArbitrationParticipant::reclaim( uint64_t targetBytes, - uint64_t maxWaitTimeMs, + uint64_t maxWaitTimeNs, MemoryReclaimer::Stats& stats) noexcept { targetBytes = std::max(targetBytes, config_->minReclaimBytes); if (targetBytes == 0) { @@ -275,7 +275,7 @@ uint64_t ArbitrationParticipant::reclaim( ++numReclaims_; VELOX_MEM_LOG(INFO) << "Reclaiming from memory pool " << pool_->name() << " with target " << succinctBytes(targetBytes); - pool_->reclaim(targetBytes, maxWaitTimeMs, stats); + pool_->reclaim(targetBytes, maxWaitTimeNs / 1'000'000, stats); reclaimedBytes = shrink(/*reclaimAll=*/false); } catch (const std::exception& e) { VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " @@ -354,9 +354,9 @@ uint64_t ArbitrationParticipant::abortLocked( } bool ArbitrationParticipant::waitForReclaimOrAbort( - uint64_t maxWaitTimeMs) const { + uint64_t maxWaitTimeNs) const { std::unique_lock l( - reclaimLock_, std::chrono::milliseconds(maxWaitTimeMs)); + reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs)); return l.owns_lock(); } @@ -380,7 +380,7 @@ std::string ArbitrationParticipant::Stats::toString() const { succinctBytes(reclaimedBytes), succinctBytes(growBytes), aborted, - succinctMicros(durationUs)); + succinctNanos(durationNs)); } ScopedArbitrationParticipant::ScopedArbitrationParticipant( diff --git a/velox/common/memory/ArbitrationParticipant.h b/velox/common/memory/ArbitrationParticipant.h index 26c20b320160..8d4c677ad94b 100644 --- a/velox/common/memory/ArbitrationParticipant.h +++ b/velox/common/memory/ArbitrationParticipant.h @@ -144,10 +144,10 @@ class ArbitrationParticipant } /// Returns the duration of this arbitration participant since its creation. - uint64_t durationUs() const { - const auto now = getCurrentTimeMicro(); - VELOX_CHECK_GE(now, createTimeUs_); - return now - createTimeUs_; + uint64_t durationNs() const { + const auto now = getCurrentTimeNano(); + VELOX_CHECK_GE(now, createTimeNs_); + return now - createTimeNs_; } /// Invoked to acquire a shared reference to this arbitration participant @@ -206,11 +206,11 @@ class ArbitrationParticipant /// restriction. uint64_t shrink(bool reclaimAll = false); - // Invoked to reclaim used memory from this memory pool with specified - // 'targetBytes'. The function returns the actually freed capacity. + /// Invoked to reclaim used memory from this memory pool with specified + /// 'targetBytes'. The function returns the actually freed capacity. uint64_t reclaim( uint64_t targetBytes, - uint64_t maxWaitTimeMs, + uint64_t maxWaitTimeNs, MemoryReclaimer::Stats& stats) noexcept; /// Invoked to abort the query memory pool and returns the reclaimed bytes @@ -226,7 +226,7 @@ class ArbitrationParticipant /// Invoked to wait for the pending memory reclaim or abort operation to /// complete within a 'maxWaitTimeMs' time window. The function returns false /// if the wait has timed out. - bool waitForReclaimOrAbort(uint64_t maxWaitTimeMs) const; + bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const; /// Invoked to start arbitration operation 'op'. The operation needs to wait /// for the prior arbitration operations to finish first before executing to @@ -246,7 +246,7 @@ class ArbitrationParticipant size_t numWaitingOps() const; struct Stats { - uint64_t durationUs{0}; + uint64_t durationNs{0}; uint32_t numRequests{0}; uint32_t numReclaims{0}; uint32_t numShrinks{0}; @@ -260,7 +260,7 @@ class ArbitrationParticipant Stats stats() const { Stats stats; - stats.durationUs = durationUs(); + stats.durationNs = durationNs(); stats.aborted = aborted_; stats.numRequests = numRequests_; stats.numGrows = numGrows_; @@ -310,7 +310,7 @@ class ArbitrationParticipant MemoryPool* const pool_; const Config* const config_; const uint64_t maxCapacity_; - const size_t createTimeUs_; + const uint64_t createTimeNs_; mutable std::mutex stateLock_; bool aborted_{false}; diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index c9c8a1fd9700..2dc13af45901 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -103,9 +103,9 @@ uint64_t SharedArbitrator::ExtraConfig::memoryPoolReservedCapacity( config::CapacityUnit::BYTE); } -uint64_t SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs( +uint64_t SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeNs( const std::unordered_map& configs) { - return std::chrono::duration_cast( + return std::chrono::duration_cast( config::toDuration(getConfig( configs, kMemoryReclaimMaxWaitTime, @@ -214,8 +214,8 @@ SharedArbitrator::SharedArbitrator(const Config& config) : MemoryArbitrator(config), reservedCapacity_(ExtraConfig::reservedCapacity(config.extraConfigs)), checkUsageLeak_(ExtraConfig::checkUsageLeak(config.extraConfigs)), - maxArbitrationTimeMs_( - ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)), + maxArbitrationTimeNs_( + ExtraConfig::memoryReclaimMaxWaitTimeNs(config.extraConfigs)), participantConfig_( ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs), ExtraConfig::memoryPoolReservedCapacity(config.extraConfigs), @@ -241,7 +241,7 @@ SharedArbitrator::SharedArbitrator(const Config& config) VELOX_CHECK_EQ(kind_, config.kind); VELOX_CHECK_LE(reservedCapacity_, capacity_); VELOX_CHECK_GT( - maxArbitrationTimeMs_, 0, "maxArbitrationTimeMs can't be zero"); + maxArbitrationTimeNs_, 0, "maxArbitrationTimeNs can't be zero"); VELOX_CHECK_LE( globalArbitrationMemoryReclaimPct_, @@ -270,7 +270,7 @@ SharedArbitrator::SharedArbitrator(const Config& config) << " reserved capacity"; if (globalArbitrationEnabled_) { VELOX_MEM_LOG(INFO) << "Arbitration config: max arbitration time " - << succinctMillis(maxArbitrationTimeMs_) + << succinctNanos(maxArbitrationTimeNs_) << ", global memory reclaim percentage " << globalArbitrationMemoryReclaimPct_ << ", global arbitration abort time ratio " @@ -385,39 +385,34 @@ void SharedArbitrator::finishArbitration(ArbitrationOperation* op) { op->finish(); const auto stats = op->stats(); - if (stats.executionTimeMs != 0) { + if (stats.executionTimeNs != 0) { RECORD_HISTOGRAM_METRIC_VALUE( - kMetricArbitratorOpExecTimeMs, stats.executionTimeMs); + kMetricArbitratorOpExecTimeMs, stats.executionTimeNs / 1'000'000); addThreadLocalRuntimeStat( kMemoryArbitrationWallNanos, - RuntimeCounter( - stats.executionTimeMs * 1'000 * 1'000, - RuntimeCounter::Unit::kNanos)); + RuntimeCounter(stats.executionTimeNs, RuntimeCounter::Unit::kNanos)); } - if (stats.localArbitrationWaitTimeMs != 0) { + if (stats.localArbitrationWaitTimeNs != 0) { addThreadLocalRuntimeStat( kLocalArbitrationWaitWallNanos, RuntimeCounter( - stats.localArbitrationWaitTimeMs * 1'000 * 1'000, - RuntimeCounter::Unit::kNanos)); + stats.localArbitrationWaitTimeNs, RuntimeCounter::Unit::kNanos)); } - if (stats.localArbitrationExecTimeMs != 0) { + if (stats.localArbitrationExecTimeNs != 0) { addThreadLocalRuntimeStat( kLocalArbitrationExecutionWallNanos, RuntimeCounter( - stats.localArbitrationExecTimeMs * 1'000 * 1'000, - RuntimeCounter::Unit::kNanos)); + stats.localArbitrationExecTimeNs, RuntimeCounter::Unit::kNanos)); } - if (stats.globalArbitrationWaitTimeMs != 0) { + if (stats.globalArbitrationWaitTimeNs != 0) { addThreadLocalRuntimeStat( kGlobalArbitrationWaitWallNanos, RuntimeCounter( - stats.globalArbitrationWaitTimeMs * 1'000 * 1'000, - RuntimeCounter::Unit::kNanos)); + stats.globalArbitrationWaitTimeNs, RuntimeCounter::Unit::kNanos)); RECORD_HISTOGRAM_METRIC_VALUE( kMetricArbitratorGlobalArbitrationWaitTimeMs, - stats.globalArbitrationWaitTimeMs); + stats.globalArbitrationWaitTimeNs / 1'000'000); } } @@ -655,7 +650,7 @@ uint64_t SharedArbitrator::shrinkCapacity( const uint64_t targetBytes = requestBytes == 0 ? capacity_ : requestBytes; ScopedMemoryArbitrationContext abitrationCtx{}; - const uint64_t startTimeMs = getCurrentTimeMs(); + const uint64_t startTimeNs = getCurrentTimeNano(); uint64_t totalReclaimedBytes{0}; if (allowSpill) { @@ -675,13 +670,13 @@ uint64_t SharedArbitrator::shrinkCapacity( } } - const uint64_t reclaimTimeMs = getCurrentTimeMs() - startTimeMs; + const uint64_t reclaimTimeNs = getCurrentTimeNano() - startTimeNs; VELOX_MEM_LOG(INFO) << "External shrink reclaimed " << succinctBytes(totalReclaimedBytes) << ", spent " - << succinctMillis(reclaimTimeMs) << ", spill " + << succinctNanos(reclaimTimeNs) << ", spill " << (allowSpill ? "allowed" : "not allowed") << ", abort " << (allowSpill ? "allowed" : "not allowed"); - updateGlobalArbitrationStats(reclaimTimeMs, totalReclaimedBytes); + updateGlobalArbitrationStats(reclaimTimeNs, totalReclaimedBytes); return totalReclaimedBytes; } @@ -694,7 +689,7 @@ ArbitrationOperation SharedArbitrator::createArbitrationOperation( auto participant = getParticipant(pool->name()); VELOX_CHECK(participant.has_value()); return ArbitrationOperation( - std::move(participant.value()), requestBytes, maxArbitrationTimeMs_); + std::move(participant.value()), requestBytes, maxArbitrationTimeNs_); } bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) { @@ -756,7 +751,7 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) { reclaim( op.participant(), op.requestBytes(), - op.timeoutMs(), + op.timeoutNs(), /*localArbitration=*/true); checkIfAborted(op); RETURN_IF_TRUE(maybeGrowFromSelf(op)); @@ -803,13 +798,14 @@ bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) { op.recordGlobalArbitrationStartTime(); wakeupGlobalArbitrationThread(); - const bool timeout = !std::move(arbitrationWaitFuture) - .wait(std::chrono::milliseconds(op.timeoutMs())); + const bool timeout = + !std::move(arbitrationWaitFuture) + .wait(std::chrono::microseconds(op.timeoutNs() / 1'000)); if (timeout) { VELOX_MEM_LOG(ERROR) << op.participant()->name() << " wait for memory arbitration timed out after running " - << succinctMillis(op.executionTimeMs()); + << succinctNanos(op.executionTimeNs()); removeGlobalArbitrationWaiter(op.participant()->id()); } @@ -826,16 +822,16 @@ bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) { } void SharedArbitrator::updateGlobalArbitrationStats( - uint64_t arbitrationTimeMs, + uint64_t arbitrationTimeNs, uint64_t arbitrationBytes) { - globalArbitrationTimeMs_ += arbitrationTimeMs; + globalArbitrationTimeNs_ += arbitrationTimeNs; ++globalArbitrationRuns_; globalArbitrationBytes_ += arbitrationBytes; RECORD_METRIC_VALUE(kMetricArbitratorGlobalArbitrationCount); RECORD_HISTOGRAM_METRIC_VALUE( kMetricArbitratorGlobalArbitrationBytes, arbitrationBytes); RECORD_HISTOGRAM_METRIC_VALUE( - kMetricArbitratorGlobalArbitrationTimeMs, arbitrationTimeMs); + kMetricArbitratorGlobalArbitrationTimeMs, arbitrationTimeNs / 1'000'000); } void SharedArbitrator::globalArbitrationMain() { @@ -858,19 +854,19 @@ void SharedArbitrator::globalArbitrationMain() { } bool SharedArbitrator::globalArbitrationShouldReclaimByAbort( - uint64_t globalRunElapsedTimeMs, + uint64_t globalRunElapsedTimeNs, bool hasReclaimedByAbort, bool allParticipantsReclaimed, uint64_t lastReclaimedBytes) const { return globalArbitrationWithoutSpill_ || - (globalRunElapsedTimeMs > - maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_ && + (globalRunElapsedTimeNs > + maxArbitrationTimeNs_ * globalArbitrationAbortTimeRatio_ && (hasReclaimedByAbort || (allParticipantsReclaimed && lastReclaimedBytes == 0))); } void SharedArbitrator::runGlobalArbitration() { - const uint64_t startTimeMs = getCurrentTimeMs(); + const uint64_t startTimeNs = getCurrentTimeNano(); uint64_t totalReclaimedBytes{0}; bool shouldReclaimByAbort{false}; uint64_t reclaimedBytes{0}; @@ -883,9 +879,9 @@ void SharedArbitrator::runGlobalArbitration() { size_t round{0}; for (;; ++round) { - uint64_t arbitrationTimeUs{0}; + uint64_t arbitrationTimeNs{0}; { - MicrosecondTimer timer(&arbitrationTimeUs); + NanosecondTimer timer(&arbitrationTimeNs); const uint64_t targetBytes = getGlobalArbitrationTarget(); if (targetBytes == 0) { break; @@ -894,7 +890,7 @@ void SharedArbitrator::runGlobalArbitration() { // Check if we need to abort participant to reclaim used memory to // accelerate global arbitration. shouldReclaimByAbort = globalArbitrationShouldReclaimByAbort( - getCurrentTimeMs() - startTimeMs, + getCurrentTimeNano() - startTimeNs, shouldReclaimByAbort, allParticipantsReclaimed, reclaimedBytes); @@ -911,12 +907,12 @@ void SharedArbitrator::runGlobalArbitration() { reclaimUnusedCapacity(); } - updateGlobalArbitrationStats(arbitrationTimeUs / 1'000, reclaimedBytes); + updateGlobalArbitrationStats(arbitrationTimeNs, reclaimedBytes); } VELOX_MEM_LOG(INFO) << "Global arbitration reclaimed " << succinctBytes(totalReclaimedBytes) << " " << reclaimedParticipants.size() << " victims, spent " - << succinctMillis(getCurrentTimeMs() - startTimeMs) + << succinctNanos(getCurrentTimeNano() - startTimeNs) << " with " << round << " rounds"; } @@ -953,7 +949,7 @@ void SharedArbitrator::checkIfTimeout(ArbitrationOperation& op) { VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format( "Memory arbitration timed out on memory pool: {} after running {}", op.participant()->name(), - succinctMillis(op.executionTimeMs()))); + succinctNanos(op.executionTimeNs()))); } } @@ -1005,7 +1001,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation& op) { reclaim( op.participant(), op.requestBytes(), - op.timeoutMs(), + op.timeoutNs(), /*localArbitration=*/true); // Checks if the requestor has been aborted in reclaim above. checkIfAborted(op); @@ -1122,7 +1118,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryBySpill( const uint64_t reclaimedBytes = reclaim( participant, victim.reclaimableUsedCapacity, - maxArbitrationTimeMs_, + maxArbitrationTimeNs_, /*localArbitration=*/false); return std::make_unique( participant->id(), reclaimedBytes); @@ -1183,14 +1179,14 @@ uint64_t SharedArbitrator::shrink( uint64_t SharedArbitrator::reclaim( const ScopedArbitrationParticipant& participant, uint64_t targetBytes, - uint64_t timeoutMs, + uint64_t timeoutNs, bool localArbitration) noexcept { - uint64_t reclaimTimeUs{0}; + uint64_t reclaimTimeNs{0}; uint64_t reclaimedBytes{0}; MemoryReclaimer::Stats stats; { - MicrosecondTimer reclaimTimer(&reclaimTimeUs); - reclaimedBytes = participant->reclaim(targetBytes, timeoutMs, stats); + NanosecondTimer reclaimTimer(&reclaimTimeNs); + reclaimedBytes = participant->reclaim(targetBytes, timeoutNs, stats); } // NOTE: if memory reclaim fails, then the participant is also aborted. If // it happens, we shall first fail the arbitration operation from the @@ -1201,11 +1197,11 @@ uint64_t SharedArbitrator::reclaim( freeCapacity(reclaimedBytes); updateMemoryReclaimStats( - reclaimedBytes, reclaimTimeUs / 1'000, localArbitration, stats); + reclaimedBytes, reclaimTimeNs, localArbitration, stats); VELOX_MEM_LOG(INFO) << "Reclaimed from memory pool " << participant->name() << " with target of " << succinctBytes(targetBytes) << ", reclaimed " << succinctBytes(reclaimedBytes) - << ", spent " << succinctMicros(reclaimTimeUs) + << ", spent " << succinctNanos(reclaimTimeNs) << ", local arbitration: " << localArbitration << " stats " << succinctBytes(stats.reclaimedBytes) << " numNonReclaimableAttempts " @@ -1223,7 +1219,7 @@ uint64_t SharedArbitrator::reclaim( void SharedArbitrator::updateMemoryReclaimStats( uint64_t reclaimedBytes, - uint64_t reclaimTimeMs, + uint64_t reclaimTimeNs, bool localArbitration, const MemoryReclaimer::Stats& stats) { if (localArbitration) { @@ -1232,7 +1228,8 @@ void SharedArbitrator::updateMemoryReclaimStats( reclaimedUsedBytes_ += reclaimedBytes; numNonReclaimableAttempts_ += stats.numNonReclaimableAttempts; RECORD_METRIC_VALUE(kMetricQueryMemoryReclaimCount); - RECORD_HISTOGRAM_METRIC_VALUE(kMetricQueryMemoryReclaimTimeMs, reclaimTimeMs); + RECORD_HISTOGRAM_METRIC_VALUE( + kMetricQueryMemoryReclaimTimeMs, reclaimTimeNs / 1'000'000); RECORD_HISTOGRAM_METRIC_VALUE( kMetricQueryMemoryReclaimedBytes, reclaimedBytes); } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 14f97d81b4e1..1ed5569ca9dd 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -84,7 +84,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { static constexpr std::string_view kMemoryReclaimMaxWaitTime{ "memory-reclaim-max-wait-time"}; static constexpr std::string_view kDefaultMemoryReclaimMaxWaitTime{"5m"}; - static uint64_t memoryReclaimMaxWaitTimeMs( + static uint64_t memoryReclaimMaxWaitTimeNs( const std::unordered_map& configs); /// When shrinking capacity, the shrink bytes will be adjusted in a way such @@ -400,7 +400,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { // iteration of global run should directly reclaim capacity by aborting // queries. bool globalArbitrationShouldReclaimByAbort( - uint64_t globalRunElapsedTimeMs, + uint64_t globalRunElapsedTimeNs, bool hasReclaimedByAbort, bool allParticipantsReclaimed, uint64_t lastReclaimedBytes) const; @@ -503,7 +503,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { uint64_t reclaim( const ScopedArbitrationParticipant& participant, uint64_t targetBytes, - uint64_t timeoutMs, + uint64_t timeoutNs, bool localArbitration) noexcept; uint64_t shrink( @@ -574,7 +574,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { void updateMemoryReclaimStats( uint64_t reclaimedBytes, - uint64_t reclaimTimeMs, + uint64_t reclaimTimeNs, bool localArbitration, const MemoryReclaimer::Stats& stats); @@ -583,12 +583,12 @@ class SharedArbitrator : public memory::MemoryArbitrator { void updateArbitrationFailureStats(); void updateGlobalArbitrationStats( - uint64_t arbitrationTimeMs, + uint64_t arbitrationTimeNs, uint64_t arbitrationBytes); const uint64_t reservedCapacity_; const bool checkUsageLeak_; - const uint64_t maxArbitrationTimeMs_; + const uint64_t maxArbitrationTimeNs_; const ArbitrationParticipant::Config participantConfig_; const double memoryReclaimThreadsHwMultiplier_; const bool globalArbitrationEnabled_; @@ -646,7 +646,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { std::map globalArbitrationWaiters_; tsan_atomic globalArbitrationRuns_{0}; - tsan_atomic globalArbitrationTimeMs_{0}; + tsan_atomic globalArbitrationTimeNs_{0}; tsan_atomic globalArbitrationBytes_{0}; std::atomic_uint64_t numRequests_{0}; diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 981416308e28..26330ec03b59 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -895,7 +895,7 @@ TEST_F(ArbitrationParticipantTest, reclaimableFreeCapacityAndShrink) { ASSERT_EQ(scopedParticipant->stats().numShrinks, 2); ASSERT_EQ(scopedParticipant->stats().numReclaims, 0); ASSERT_EQ(scopedParticipant->stats().numGrows, 1); - ASSERT_GE(scopedParticipant->stats().durationUs, 0); + ASSERT_GE(scopedParticipant->stats().durationNs, 0); ASSERT_FALSE(scopedParticipant->stats().aborted); if (buffer != nullptr) { @@ -1486,7 +1486,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, reclaimLock) { DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, waitForReclaimOrAbort) { struct { - uint64_t waitTimeUs; + uint64_t waitTimeNs; bool pendingReclaim; uint64_t reclaimWaitMs{0}; bool expectedTimeout; @@ -1494,7 +1494,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, waitForReclaimOrAbort) { std::string debugString() const { return fmt::format( "waitTime {}, pendingReclaim {}, reclaimWait {}, expectedTimeout {}", - succinctMicros(waitTimeUs), + succinctNanos(waitTimeNs), pendingReclaim, succinctMillis(reclaimWaitMs), expectedTimeout); @@ -1502,8 +1502,8 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, waitForReclaimOrAbort) { } testSettings[] = { {0, true, 1'000, true}, {0, false, 1'000, true}, - {1'000'000, true, 1'000, false}, - {1'000'000, true, 1'000, false}}; + {1'000'000'000'000UL, true, 1'000, false}, + {1'000'000'000'000UL, true, 1'000, false}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); @@ -1540,7 +1540,8 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, waitForReclaimOrAbort) { std::thread reclaimThread([&]() { if (testData.pendingReclaim) { memory::MemoryReclaimer::Stats stats; - ASSERT_EQ(scopedParticipant->reclaim(MB, 1'000'000, stats), MB); + ASSERT_EQ( + scopedParticipant->reclaim(MB, 1'000'000'000'000UL, stats), MB); } else { const std::string abortReason = "test abort"; try { @@ -1552,7 +1553,7 @@ DEBUG_ONLY_TEST_F(ArbitrationParticipantTest, waitForReclaimOrAbort) { }); reclaimWait.await([&]() { return reclaimWaitFlag.load(); }); ASSERT_EQ( - scopedParticipant->waitForReclaimOrAbort(testData.waitTimeUs), + scopedParticipant->waitForReclaimOrAbort(testData.waitTimeNs), !testData.expectedTimeout); reclaimThread.join(); } @@ -1657,20 +1658,20 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperation) { ArbitrationParticipant::create(participantId, task->pool(), &config); auto scopedParticipant = participant->lock().value(); const int requestBytes = 1 << 20; - const int opTimeoutMs = 1'000'000; + const uint64_t opTimeoutNs = 1'000'000'000'000UL; ArbitrationOperation op( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); VELOX_ASSERT_THROW( - ArbitrationOperation(participant->lock().value(), 0, opTimeoutMs), ""); + ArbitrationOperation(participant->lock().value(), 0, opTimeoutNs), ""); VELOX_ASSERT_THROW(op.stats(), "(init vs. finished)"); ASSERT_EQ(op.requestBytes(), requestBytes); ASSERT_FALSE(op.aborted()); ASSERT_FALSE(op.hasTimeout()); - ASSERT_LE(op.timeoutMs(), opTimeoutMs); + ASSERT_LE(op.timeoutNs(), opTimeoutNs); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT - ASSERT_GE(op.executionTimeMs(), 200); - ASSERT_LE(op.timeoutMs(), opTimeoutMs - 200); + ASSERT_GE(op.executionTimeNs(), 200'000'000UL); + ASSERT_LE(op.timeoutNs(), opTimeoutNs - 200'000'000UL); ASSERT_EQ(op.maxGrowBytes(), 0); ASSERT_EQ(op.minGrowBytes(), 0); ASSERT_FALSE(op.hasTimeout()); @@ -1711,14 +1712,14 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperation) { ASSERT_EQ(scopedParticipant->numWaitingOps(), 0); VELOX_ASSERT_THROW(op.recordGlobalArbitrationStartTime(), ""); ASSERT_FALSE(op.hasTimeout()); - const auto execTimeMs = op.executionTimeMs(); + const auto execTimeNs = op.executionTimeNs(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT - ASSERT_EQ(op.executionTimeMs(), execTimeMs); + ASSERT_EQ(op.executionTimeNs(), execTimeNs); ASSERT_FALSE(op.hasTimeout()); - ASSERT_GE(op.stats().localArbitrationWaitTimeMs, 200); - ASSERT_GE(op.stats().localArbitrationExecTimeMs, 200); - ASSERT_GE(op.stats().globalArbitrationWaitTimeMs, 200); - ASSERT_GE(op.stats().executionTimeMs, 600); + ASSERT_GE(op.stats().localArbitrationWaitTimeNs, 200'000'000UL); + ASSERT_GE(op.stats().localArbitrationExecTimeNs, 200'000'000UL); + ASSERT_GE(op.stats().globalArbitrationWaitTimeNs, 200'000'000UL); + ASSERT_GE(op.stats().executionTimeNs, 600'000'000UL); // Operation timeout. { @@ -1759,25 +1760,25 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationStats) { ArbitrationParticipant::create(participantId, task->pool(), &config); auto scopedParticipant = participant->lock().value(); const int requestBytes = 1 << 20; - const int opTimeoutMs = 1'000'000; + const uint64_t opTimeoutNs = 1'000'000'000'000UL; // Operation stats without global arbitration. { ArbitrationOperation op( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT op.start(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT op.finish(); const auto stats = op.stats(); - ASSERT_GE(stats.localArbitrationWaitTimeMs, 200); - ASSERT_GE(stats.localArbitrationExecTimeMs, 200); - ASSERT_GE(stats.globalArbitrationWaitTimeMs, 0); - ASSERT_GE(stats.executionTimeMs, 400); + ASSERT_GE(stats.localArbitrationWaitTimeNs, 200'000'000UL); + ASSERT_GE(stats.localArbitrationExecTimeNs, 200'000'000UL); + ASSERT_GE(stats.globalArbitrationWaitTimeNs, 0UL); + ASSERT_GE(stats.executionTimeNs, 400'000'000UL); } // Operation stats with global arbitration. { ArbitrationOperation op( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT op.start(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT @@ -1785,16 +1786,16 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationStats) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT op.finish(); const auto stats = op.stats(); - ASSERT_GE(stats.localArbitrationWaitTimeMs, 200); - ASSERT_GE(stats.localArbitrationExecTimeMs, 200); - ASSERT_GE(stats.globalArbitrationWaitTimeMs, 200); - ASSERT_GE(stats.executionTimeMs, 600); + ASSERT_GE(stats.localArbitrationWaitTimeNs, 200'000'000UL); + ASSERT_GE(stats.localArbitrationExecTimeNs, 200'000'000UL); + ASSERT_GE(stats.globalArbitrationWaitTimeNs, 200'000'000UL); + ASSERT_GE(stats.executionTimeNs, 600'000'000UL); } // Operation stats not started. { ArbitrationOperation op( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // NOLINT VELOX_ASSERT_THROW(op.finish(), ""); VELOX_ASSERT_THROW(op.stats(), ""); @@ -1807,14 +1808,15 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationWait) { auto participant = ArbitrationParticipant::create(10, task->pool(), &config); auto scopedParticipant = participant->lock().value(); const int requestBytes = 1 << 20; - const int opTimeoutMs = 1'000'000; + const uint64_t opTimeoutNs = 1'000'000'000'000UL; ArbitrationOperation op1( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); ArbitrationOperation op2( - participant->lock().value(), requestBytes, opTimeoutMs); + participant->lock().value(), requestBytes, opTimeoutNs); ArbitrationOperation op3( - participant->lock().value(), requestBytes, opTimeoutMs); - ArbitrationOperation op4(participant->lock().value(), requestBytes, 1'000); + participant->lock().value(), requestBytes, opTimeoutNs); + ArbitrationOperation op4( + participant->lock().value(), requestBytes, 1'000'000'000UL); ASSERT_FALSE(scopedParticipant->hasRunningOp()); ASSERT_EQ(scopedParticipant->numWaitingOps(), 0); @@ -1879,19 +1881,21 @@ TEST_F(ArbitrationParticipantTest, arbitrationOperationWait) { op1.finish(); ASSERT_EQ(op1.state(), ArbitrationOperation::State::kFinished); ASSERT_FALSE(op1.hasTimeout()); - ASSERT_GE(op1.executionTimeMs(), 1'000); + ASSERT_GE(op1.executionTimeNs(), 1'000'000'000UL); op2Thread.join(); ASSERT_EQ(op2.state(), ArbitrationOperation::State::kFinished); - ASSERT_GE(op2.executionTimeMs(), 1'000 + 500); + ASSERT_GE(op2.executionTimeNs(), 1'000'000'000UL + 500'000'000UL); op3Thread.join(); ASSERT_EQ(op3.state(), ArbitrationOperation::State::kFinished); - ASSERT_GE(op3.executionTimeMs(), 1'000 + 500 + 500); + ASSERT_GE( + op3.executionTimeNs(), 1'000'000'000UL + 500'000'000UL + 500'000'000UL); op4Thread.join(); ASSERT_EQ(op4.state(), ArbitrationOperation::State::kFinished); - ASSERT_GE(op4.executionTimeMs(), 1'000 + 500 + 500); + ASSERT_GE( + op4.executionTimeNs(), 1'000'000'000UL + 500'000'000UL + 500'000'000UL); ASSERT_FALSE(scopedParticipant->hasRunningOp()); ASSERT_EQ(scopedParticipant->numWaitingOps(), 0); diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 75ad335eb23a..d9d3b68f68d6 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -22,7 +22,6 @@ #include #include #include "folly/experimental/EventCount.h" -#include "folly/futures/Barrier.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/Memory.h" @@ -450,7 +449,7 @@ class MockSharedArbitrationTest : public testing::Test { kMemoryReclaimThreadsHwMultiplier, std::function arbitrationStateCheckCb = nullptr, bool globalArtbitrationEnabled = true, - uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000, + uint64_t arbitrationTimeoutNs = 5 * 60 * 1'000'000'000UL, bool globalArbitrationWithoutSpill = false, double globalArbitrationAbortTimeRatio = 0.5) { MemoryManagerOptions options; @@ -483,7 +482,7 @@ class MockSharedArbitrationTest : public testing::Test { {std::string(ExtraConfig::kMemoryReclaimThreadsHwMultiplier), folly::to(memoryReclaimThreadsHwMultiplier)}, {std::string(ExtraConfig::kMemoryReclaimMaxWaitTime), - folly::to(arbitrationTimeoutMs) + "ms"}, + folly::to(arbitrationTimeoutNs) + "ns"}, {std::string(ExtraConfig::kGlobalArbitrationEnabled), folly::to(globalArtbitrationEnabled)}, {std::string(ExtraConfig::kGlobalArbitrationWithoutSpill), @@ -577,8 +576,8 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(emptyConfigs), 256 << 20); ASSERT_EQ( - SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(emptyConfigs), - 300'000); + SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeNs(emptyConfigs), + 300'000'000'000UL); ASSERT_EQ( SharedArbitrator::ExtraConfig::globalArbitrationEnabled(emptyConfigs), SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationEnabled); @@ -644,7 +643,8 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { ASSERT_EQ( SharedArbitrator::ExtraConfig::memoryPoolReservedCapacity(configs), 200); ASSERT_EQ( - SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(configs), 5000); + SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeNs(configs), + 5'000'000'000); ASSERT_TRUE(SharedArbitrator::ExtraConfig::globalArbitrationEnabled(configs)); ASSERT_FALSE(SharedArbitrator::ExtraConfig::checkUsageLeak(configs)); ASSERT_EQ( @@ -707,7 +707,7 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { SharedArbitrator::ExtraConfig::memoryPoolReservedCapacity(configs), "Invalid capacity string 'invalid'"); VELOX_ASSERT_THROW( - SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(configs), + SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeNs(configs), "Invalid duration 'invalid'"); VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::globalArbitrationEnabled(configs), @@ -748,7 +748,7 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { VELOX_ASSERT_THROW( setupMemory( kMemoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, nullptr, false, 0), - "(0 vs. 0) maxArbitrationTimeMs can't be zero"); + "(0 vs. 0) maxArbitrationTimeNs can't be zero"); } TEST_F(MockSharedArbitrationTest, constructor) { @@ -1377,7 +1377,7 @@ DEBUG_ONLY_TEST_F( // This test verifies the global arbitration can switch to reclaim the other // query or abort when one query claims to be reclaimable but can't actually -// reclaim.h +// reclaim. TEST_F(MockSharedArbitrationTest, badNonReclaimableQuery) { const int64_t memoryCapacity = 256 << 20; const ReclaimInjectionCallback badReclaimInjectCallback = @@ -1715,10 +1715,10 @@ DEBUG_ONLY_TEST_F( DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) { const int64_t memoryCapacity = 512 << 20; const uint64_t memoryPoolInitCapacity = memoryCapacity / 2; - const int64_t maxArbitrationTimeMs = 2'000; + const uint64_t maxArbitrationTimeNs = 2'000'000'000UL; const double globalArbitrationAbortTimeRatio = 0.5; - const int64_t abortTimeThresholdMs = - maxArbitrationTimeMs * globalArbitrationAbortTimeRatio; + const uint64_t abortTimeThresholdNs = + maxArbitrationTimeNs * globalArbitrationAbortTimeRatio; setupMemory( memoryCapacity, 0, @@ -1734,15 +1734,15 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) { kMemoryReclaimThreadsHwMultiplier, nullptr, true, - maxArbitrationTimeMs, + maxArbitrationTimeNs, false, globalArbitrationAbortTimeRatio); test::SharedArbitratorTestHelper arbitratorHelper(arbitrator_); - for (uint64_t pauseTimeMs : - {abortTimeThresholdMs / 2, - (maxArbitrationTimeMs - abortTimeThresholdMs) / 2}) { + for (auto pauseTimeNs : + {abortTimeThresholdNs / 2, + (maxArbitrationTimeNs + abortTimeThresholdNs) / 2}) { auto task1 = addTask(memoryCapacity); auto* op1 = task1->addMemoryOp(false); op1->allocate(memoryCapacity / 2); @@ -1756,7 +1756,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) { std::function( ([&](const SharedArbitrator* /*unused*/) { std::this_thread::sleep_for( - std::chrono::milliseconds(pauseTimeMs)); + std::chrono::nanoseconds(pauseTimeNs)); }))); std::unordered_map runtimeStats; @@ -1784,7 +1784,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) { const auto deltaGlobalArbitrationRuns = arbitratorHelper.globalArbitrationRuns() - prevGlobalArbitrationRuns; - if (pauseTimeMs < abortTimeThresholdMs) { + if (pauseTimeNs < abortTimeThresholdNs) { ASSERT_GT(deltaGlobalArbitrationRuns, 2); } else { // In SharedArbitrator::runGlobalArbitration() @@ -1814,7 +1814,7 @@ TEST_F(MockSharedArbitrationTest, globalArbitrationWithoutSpill) { kMemoryReclaimThreadsHwMultiplier, nullptr, true, - 5 * 60 * 1'000, + 5 * 60 * 1'000'000'000UL, true); auto triggerTask = addTask(memoryCapacity); auto* triggerOp = triggerTask->addMemoryOp(false); @@ -2460,7 +2460,21 @@ TEST_F(MockSharedArbitrationTest, shutdown) { DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, shutdownWait) { uint64_t memoryCapacity = 256 * MB; setupMemory( - memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 2'000); + memoryCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1.0, + nullptr, + true, + 2'000'000'000UL); std::shared_ptr task1 = addTask(memoryCapacity); auto* op1 = task1->addMemoryOp(true); op1->allocate(memoryCapacity / 2); @@ -2716,7 +2730,21 @@ DEBUG_ONLY_TEST_F( DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationTimeout) { uint64_t memoryCapacity = 256 * MB; setupMemory( - memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 1'000); + memoryCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1.0, + nullptr, + true, + 1'000'000'000UL); std::shared_ptr task1 = addTask(memoryCapacity); auto* op1 = task1->addMemoryOp(true); op1->allocate(memoryCapacity / 2); @@ -2759,7 +2787,21 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationTimeout) { DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, localArbitrationTimeout) { uint64_t memoryCapacity = 256 * MB; setupMemory( - memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 1'000); + memoryCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1.0, + nullptr, + true, + 1'000'000'000UL); std::shared_ptr task = addTask(memoryCapacity); ASSERT_EQ(task->capacity(), 0); auto* op = task->addMemoryOp(true); @@ -2787,7 +2829,21 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, localArbitrationTimeout) { DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, localArbitrationQueueTimeout) { uint64_t memoryCapacity = 256 * MB; setupMemory( - memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, true, 1'000); + memoryCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1.0, + nullptr, + true, + 1'000'000'000UL); std::shared_ptr task = addTask(memoryCapacity); ASSERT_EQ(task->capacity(), 0); auto* op = task->addMemoryOp(true); @@ -2798,7 +2854,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, localArbitrationQueueTimeout) { ([&](const SharedArbitrator* arbitrator) { test::SharedArbitratorTestHelper arbitratorHelper( const_cast(arbitrator)); - ASSERT_EQ(arbitratorHelper.maxArbitrationTimeMs(), 1'000); + ASSERT_EQ(arbitratorHelper.maxArbitrationTimeNs(), 1'000'000'000UL); std::this_thread::sleep_for(std::chrono::seconds(2)); // NOLINT }))); try { @@ -3324,7 +3380,7 @@ TEST_F(MockSharedArbitrationTest, arbitrateWithMemoryReclaim) { // gone. DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, abortWithNoCandidate) { const uint64_t memoryCapacity = 256 * MB; - const uint64_t maxArbitrationTimeMs = 1'000; + const uint64_t maxArbitrationTimeNs = 1'000'000'000UL; setupMemory( memoryCapacity, 0, @@ -3340,7 +3396,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, abortWithNoCandidate) { 1.0, nullptr, true, - maxArbitrationTimeMs); + maxArbitrationTimeNs); auto* reclaimedOp1 = addMemoryOp(nullptr, false); reclaimedOp1->allocate(memoryCapacity / 2); auto* reclaimedOp2 = addMemoryOp(nullptr, false); @@ -3390,7 +3446,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, abortWithNoCandidate) { // gone. DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, reclaimWithNoCandidate) { const uint64_t memoryCapacity = 256 * MB; - const uint64_t maxArbitrationTimeMs = 1'000; + const uint64_t maxArbitrationTimeNs = 1'000'000'000UL; setupMemory( memoryCapacity, 0, @@ -3406,7 +3462,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, reclaimWithNoCandidate) { 1.0, nullptr, true, - maxArbitrationTimeMs); + maxArbitrationTimeNs); auto* reclaimedOp1 = addMemoryOp(nullptr, true); reclaimedOp1->allocate(memoryCapacity / 2); auto* reclaimedOp2 = addMemoryOp(nullptr, true); diff --git a/velox/common/memory/tests/SharedArbitratorTestUtil.h b/velox/common/memory/tests/SharedArbitratorTestUtil.h index 9ab095e50e21..536a36f33ea7 100644 --- a/velox/common/memory/tests/SharedArbitratorTestUtil.h +++ b/velox/common/memory/tests/SharedArbitratorTestUtil.h @@ -53,8 +53,8 @@ class SharedArbitratorTestHelper { } } - uint64_t maxArbitrationTimeMs() const { - return arbitrator_->maxArbitrationTimeMs_; + uint64_t maxArbitrationTimeNs() const { + return arbitrator_->maxArbitrationTimeNs_; } folly::CPUThreadPoolExecutor* memoryReclaimExecutor() const { diff --git a/velox/common/testutil/ScopedTestTime.cpp b/velox/common/testutil/ScopedTestTime.cpp index ec25f40cfdb5..fe79f78e10d7 100644 --- a/velox/common/testutil/ScopedTestTime.cpp +++ b/velox/common/testutil/ScopedTestTime.cpp @@ -20,7 +20,7 @@ namespace facebook::velox::common::testutil { bool ScopedTestTime::enabled_ = false; -std::optional ScopedTestTime::testTimeUs_ = {}; +std::optional ScopedTestTime::testTimeNs_ = {}; ScopedTestTime::ScopedTestTime() { #ifndef NDEBUG @@ -32,32 +32,42 @@ ScopedTestTime::ScopedTestTime() { } ScopedTestTime::~ScopedTestTime() { - testTimeUs_.reset(); + testTimeNs_.reset(); enabled_ = false; } -void ScopedTestTime::setCurrentTestTimeSec(size_t currentTimeSec) { - setCurrentTestTimeMicro(currentTimeSec * 1000000); +void ScopedTestTime::setCurrentTestTimeSec(uint64_t currentTimeSec) { + setCurrentTestTimeNano(currentTimeSec * 1'000'000'000UL); } -void ScopedTestTime::setCurrentTestTimeMs(size_t currentTimeMs) { - setCurrentTestTimeMicro(currentTimeMs * 1000); +void ScopedTestTime::setCurrentTestTimeMs(uint64_t currentTimeMs) { + setCurrentTestTimeNano(currentTimeMs * 1'000'000UL); } -void ScopedTestTime::setCurrentTestTimeMicro(size_t currentTimeUs) { - testTimeUs_ = currentTimeUs; +void ScopedTestTime::setCurrentTestTimeMicro(uint64_t currentTimeUs) { + setCurrentTestTimeNano(currentTimeUs * 1'000UL); } -std::optional ScopedTestTime::getCurrentTestTimeSec() { - return testTimeUs_.has_value() ? std::make_optional(*testTimeUs_ / 1000000L) - : testTimeUs_; +void ScopedTestTime::setCurrentTestTimeNano(uint64_t currentTimeNs) { + testTimeNs_ = currentTimeNs; } -std::optional ScopedTestTime::getCurrentTestTimeMs() { - return testTimeUs_.has_value() ? std::make_optional(*testTimeUs_ / 1000L) - : testTimeUs_; + +std::optional ScopedTestTime::getCurrentTestTimeSec() { + return testTimeNs_.has_value() + ? std::make_optional(*testTimeNs_ / 1'000'000'000L) + : testTimeNs_; +} +std::optional ScopedTestTime::getCurrentTestTimeMs() { + return testTimeNs_.has_value() ? std::make_optional(*testTimeNs_ / 1000'000L) + : testTimeNs_; +} + +std::optional ScopedTestTime::getCurrentTestTimeMicro() { + return testTimeNs_.has_value() ? std::make_optional(*testTimeNs_ / 1000L) + : testTimeNs_; } -std::optional ScopedTestTime::getCurrentTestTimeMicro() { - return testTimeUs_; +std::optional ScopedTestTime::getCurrentTestTimeNano() { + return testTimeNs_; } } // namespace facebook::velox::common::testutil diff --git a/velox/common/testutil/ScopedTestTime.h b/velox/common/testutil/ScopedTestTime.h index 3a03a603fb31..dd635df98ee5 100644 --- a/velox/common/testutil/ScopedTestTime.h +++ b/velox/common/testutil/ScopedTestTime.h @@ -15,6 +15,7 @@ */ #pragma once +#include #include namespace facebook::velox::common::testutil { @@ -24,18 +25,20 @@ class ScopedTestTime { ScopedTestTime(); ~ScopedTestTime(); - void setCurrentTestTimeSec(size_t currentTimeSec); - void setCurrentTestTimeMs(size_t currentTimeMs); - void setCurrentTestTimeMicro(size_t currentTimeUs); + void setCurrentTestTimeSec(uint64_t currentTimeSec); + void setCurrentTestTimeMs(uint64_t currentTimeMs); + void setCurrentTestTimeMicro(uint64_t currentTimeUs); + void setCurrentTestTimeNano(uint64_t currentTimeNs); - static std::optional getCurrentTestTimeSec(); - static std::optional getCurrentTestTimeMs(); - static std::optional getCurrentTestTimeMicro(); + static std::optional getCurrentTestTimeSec(); + static std::optional getCurrentTestTimeMs(); + static std::optional getCurrentTestTimeMicro(); + static std::optional getCurrentTestTimeNano(); private: // Used to verify only one instance of ScopedTestTime exists at a time. static bool enabled_; // The overridden value of current time only. - static std::optional testTimeUs_; + static std::optional testTimeNs_; }; } // namespace facebook::velox::common::testutil diff --git a/velox/common/testutil/tests/TestScopedTestTime.cpp b/velox/common/testutil/tests/TestScopedTestTime.cpp index 4052ea256729..9bf9f1b6783f 100644 --- a/velox/common/testutil/tests/TestScopedTestTime.cpp +++ b/velox/common/testutil/tests/TestScopedTestTime.cpp @@ -61,6 +61,23 @@ DEBUG_ONLY_TEST(TestScopedTestTime, testSetCurrentTimeMicro) { ASSERT_NE(getCurrentTimeMicro(), 2000); } +DEBUG_ONLY_TEST(TestScopedTestTime, testSetCurrentTimeNano) { + { + ScopedTestTime scopedTestTime; + scopedTestTime.setCurrentTestTimeNano(1000); + ASSERT_EQ(getCurrentTimeMicro(), 1); + ASSERT_EQ(getCurrentTimeNano(), 1000); + scopedTestTime.setCurrentTestTimeNano(2000); + ASSERT_EQ(getCurrentTimeMicro(), 2); + ASSERT_EQ(getCurrentTimeNano(), 2000); + } + + // This should be the actual time, so we don't know what it is, but it + // shouldn't be equal to the overridden value. + ASSERT_NE(getCurrentTimeMicro(), 2); + ASSERT_NE(getCurrentTimeNano(), 2000); +} + DEBUG_ONLY_TEST(TestScopedTestTime, multipleScopedTestTimes) { { ScopedTestTime scopedTestTime; diff --git a/velox/common/time/Timer.cpp b/velox/common/time/Timer.cpp index 5598fb0ee6db..78416f03e67e 100644 --- a/velox/common/time/Timer.cpp +++ b/velox/common/time/Timer.cpp @@ -25,37 +25,48 @@ using common::testutil::ScopedTestTime; #ifndef NDEBUG -size_t getCurrentTimeSec() { +uint64_t getCurrentTimeSec() { return ScopedTestTime::getCurrentTestTimeSec().value_or( duration_cast(system_clock::now().time_since_epoch()).count()); } -size_t getCurrentTimeMs() { +uint64_t getCurrentTimeMs() { return ScopedTestTime::getCurrentTestTimeMs().value_or( duration_cast(system_clock::now().time_since_epoch()) .count()); } -size_t getCurrentTimeMicro() { +uint64_t getCurrentTimeMicro() { return ScopedTestTime::getCurrentTestTimeMicro().value_or( duration_cast(system_clock::now().time_since_epoch()) .count()); } + +uint64_t getCurrentTimeNano() { + return ScopedTestTime::getCurrentTestTimeNano().value_or( + duration_cast(system_clock::now().time_since_epoch()) + .count()); +} #else -size_t getCurrentTimeSec() { +uint64_t getCurrentTimeSec() { return duration_cast(system_clock::now().time_since_epoch()).count(); } -size_t getCurrentTimeMs() { +uint64_t getCurrentTimeMs() { return duration_cast(system_clock::now().time_since_epoch()) .count(); } -size_t getCurrentTimeMicro() { +uint64_t getCurrentTimeMicro() { return duration_cast(system_clock::now().time_since_epoch()) .count(); } + +uint64_t getCurrentTimeNano() { + return duration_cast(system_clock::now().time_since_epoch()) + .count(); +} #endif } // namespace facebook::velox diff --git a/velox/common/time/Timer.h b/velox/common/time/Timer.h index ce3d8ceb0386..af4ec4e7a24b 100644 --- a/velox/common/time/Timer.h +++ b/velox/common/time/Timer.h @@ -88,13 +88,15 @@ class ClockTimer { uint64_t start_; }; -// Returns the current epoch time in seconds. -size_t getCurrentTimeSec(); +/// Returns the current epoch time in seconds. +uint64_t getCurrentTimeSec(); /// Returns the current epoch time in milliseconds. -size_t getCurrentTimeMs(); +uint64_t getCurrentTimeMs(); /// Returns the current epoch time in microseconds. -size_t getCurrentTimeMicro(); +uint64_t getCurrentTimeMicro(); +/// Returns the current epoch time in nanoseconds. +uint64_t getCurrentTimeNano(); } // namespace facebook::velox