Skip to content

Commit

Permalink
Let arbitrator system use nano second (facebookincubator#11415)
Browse files Browse the repository at this point in the history
Summary:
Use nano seconds in arbitrator system to avoid flakiness in time dependent tests.

Pull Request resolved: facebookincubator#11415

Reviewed By: xiaoxmeng

Differential Revision: D65385102

Pulled By: tanjialiang

fbshipit-source-id: f1dc631d69a4e17850a9b90ebf0d5e29fb5aec86
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Nov 5, 2024
1 parent 1e22de8 commit 62b0a12
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 227 deletions.
66 changes: 33 additions & 33 deletions velox/common/memory/ArbitrationOperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ using namespace facebook::velox::memory;
ArbitrationOperation::ArbitrationOperation(
ScopedArbitrationParticipant&& participant,
uint64_t requestBytes,
uint64_t timeoutMs)
uint64_t timeoutNs)
: requestBytes_(requestBytes),
timeoutMs_(timeoutMs),
createTimeMs_(getCurrentTimeMs()),
timeoutNs_(timeoutNs),
createTimeNs_(getCurrentTimeNano()),
participant_(std::move(participant)) {
VELOX_CHECK_GT(requestBytes_, 0);
}
Expand Down Expand Up @@ -84,45 +84,45 @@ void ArbitrationOperation::start() {
VELOX_CHECK_EQ(state_, State::kInit);
participant_->startArbitration(this);
setState(ArbitrationOperation::State::kRunning);
VELOX_CHECK_EQ(startTimeMs_, 0);
startTimeMs_ = getCurrentTimeMs();
VELOX_CHECK_EQ(startTimeNs_, 0);
startTimeNs_ = getCurrentTimeNano();
}

void ArbitrationOperation::finish() {
setState(State::kFinished);
VELOX_CHECK_EQ(finishTimeMs_, 0);
finishTimeMs_ = getCurrentTimeMs();
VELOX_CHECK_EQ(finishTimeNs_, 0);
finishTimeNs_ = getCurrentTimeNano();
participant_->finishArbitration(this);
}

bool ArbitrationOperation::aborted() const {
return participant_->aborted();
}

size_t ArbitrationOperation::executionTimeMs() const {
uint64_t ArbitrationOperation::executionTimeNs() const {
if (state_ == State::kFinished) {
VELOX_CHECK_GE(finishTimeMs_, createTimeMs_);
return finishTimeMs_ - createTimeMs_;
VELOX_CHECK_GE(finishTimeNs_, createTimeNs_);
return finishTimeNs_ - createTimeNs_;
} else {
const auto currentTimeMs = getCurrentTimeMs();
VELOX_CHECK_GE(currentTimeMs, createTimeMs_);
return currentTimeMs - createTimeMs_;
const auto currentTimeNs = getCurrentTimeNano();
VELOX_CHECK_GE(currentTimeNs, createTimeNs_);
return currentTimeNs - createTimeNs_;
}
}

bool ArbitrationOperation::hasTimeout() const {
return state_ != State::kFinished && timeoutMs() <= 0;
return state_ != State::kFinished && timeoutNs() <= 0;
}

size_t ArbitrationOperation::timeoutMs() const {
uint64_t ArbitrationOperation::timeoutNs() const {
if (state_ == State::kFinished) {
return 0;
}
const auto execTimeMs = executionTimeMs();
if (execTimeMs >= timeoutMs_) {
const auto execTimeNs = executionTimeNs();
if (execTimeNs >= timeoutNs_) {
return 0;
}
return timeoutMs_ - execTimeMs;
return timeoutNs_ - execTimeNs;
}

void ArbitrationOperation::setGrowTargets() {
Expand All @@ -139,28 +139,28 @@ void ArbitrationOperation::setGrowTargets() {

ArbitrationOperation::Stats ArbitrationOperation::stats() const {
VELOX_CHECK_EQ(state_, State::kFinished);
VELOX_CHECK_NE(startTimeMs_, 0);
VELOX_CHECK_NE(startTimeNs_, 0);

const uint64_t executionTimeMs = this->executionTimeMs();
const uint64_t executionTimeNs = this->executionTimeNs();

VELOX_CHECK_GE(startTimeMs_, createTimeMs_);
const uint64_t localArbitrationWaitTimeMs = startTimeMs_ - createTimeMs_;
if (globalArbitrationStartTimeMs_ == 0) {
VELOX_CHECK_GE(startTimeNs_, createTimeNs_);
const uint64_t localArbitrationWaitTimeNs = startTimeNs_ - createTimeNs_;
if (globalArbitrationStartTimeNs_ == 0) {
return {
localArbitrationWaitTimeMs,
finishTimeMs_ - startTimeMs_,
localArbitrationWaitTimeNs,
finishTimeNs_ - startTimeNs_,
0,
executionTimeMs};
executionTimeNs};
}

VELOX_CHECK_GE(globalArbitrationStartTimeMs_, startTimeMs_);
const uint64_t localArbitrationExecTimeMs =
globalArbitrationStartTimeMs_ - startTimeMs_;
VELOX_CHECK_GE(globalArbitrationStartTimeNs_, startTimeNs_);
const uint64_t localArbitrationExecTimeNs =
globalArbitrationStartTimeNs_ - startTimeNs_;
return {
localArbitrationWaitTimeMs,
localArbitrationExecTimeMs,
finishTimeMs_ - globalArbitrationStartTimeMs_,
executionTimeMs};
localArbitrationWaitTimeNs,
localArbitrationExecTimeNs,
finishTimeNs_ - globalArbitrationStartTimeNs_,
executionTimeNs};
}

std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state) {
Expand Down
28 changes: 14 additions & 14 deletions velox/common/memory/ArbitrationOperation.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ArbitrationOperation {
ArbitrationOperation(
ScopedArbitrationParticipant&& pool,
uint64_t requestBytes,
uint64_t timeoutMs);
uint64_t timeoutNs);

~ArbitrationOperation();

Expand Down Expand Up @@ -95,28 +95,28 @@ class ArbitrationOperation {

/// Returns the remaining execution time for this operation before time out.
/// If the operation has already finished, this returns zero.
size_t timeoutMs() const;
uint64_t timeoutNs() const;

/// Returns true if this operation has timed out.
bool hasTimeout() const;

/// Returns the execution time of this arbitration operation since creation.
size_t executionTimeMs() const;
uint64_t executionTimeNs() const;

/// Invoked to mark the start of global arbitration. This is used to measure
/// how much time spent in waiting for global arbitration.
void recordGlobalArbitrationStartTime() {
VELOX_CHECK_EQ(globalArbitrationStartTimeMs_, 0);
VELOX_CHECK_EQ(globalArbitrationStartTimeNs_, 0);
VELOX_CHECK_EQ(state_, State::kRunning);
globalArbitrationStartTimeMs_ = getCurrentTimeMs();
globalArbitrationStartTimeNs_ = getCurrentTimeNano();
}

/// The execution stats of this arbitration operation after completion.
struct Stats {
uint64_t localArbitrationWaitTimeMs{0};
uint64_t localArbitrationExecTimeMs{0};
uint64_t globalArbitrationWaitTimeMs{0};
uint64_t executionTimeMs{0};
uint64_t localArbitrationWaitTimeNs{0};
uint64_t localArbitrationExecTimeNs{0};
uint64_t globalArbitrationWaitTimeNs{0};
uint64_t executionTimeNs{0};
};

/// NOTE: should only called after this arbitration operation finishes.
Expand All @@ -126,22 +126,22 @@ class ArbitrationOperation {
void setState(State state);

const uint64_t requestBytes_;
const uint64_t timeoutMs_;
const uint64_t timeoutNs_;

// The start time of this arbitration operation.
const uint64_t createTimeMs_;
const uint64_t createTimeNs_;
const ScopedArbitrationParticipant participant_;

State state_{State::kInit};

uint64_t startTimeMs_{0};
uint64_t finishTimeMs_{0};
uint64_t startTimeNs_{0};
uint64_t finishTimeNs_{0};

uint64_t maxGrowBytes_{0};
uint64_t minGrowBytes_{0};

// The time that starts global arbitration wait
uint64_t globalArbitrationStartTimeMs_{};
uint64_t globalArbitrationStartTimeNs_{};

friend class ArbitrationParticipant;
};
Expand Down
12 changes: 6 additions & 6 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ ArbitrationParticipant::ArbitrationParticipant(
pool_(pool.get()),
config_(config),
maxCapacity_(pool_->maxCapacity()),
createTimeUs_(getCurrentTimeMicro()) {
createTimeNs_(getCurrentTimeNano()) {
VELOX_CHECK_LE(
config_->minCapacity,
maxCapacity_,
Expand Down Expand Up @@ -261,7 +261,7 @@ void ArbitrationParticipant::finishArbitration(ArbitrationOperation* op) {

uint64_t ArbitrationParticipant::reclaim(
uint64_t targetBytes,
uint64_t maxWaitTimeMs,
uint64_t maxWaitTimeNs,
MemoryReclaimer::Stats& stats) noexcept {
targetBytes = std::max(targetBytes, config_->minReclaimBytes);
if (targetBytes == 0) {
Expand All @@ -275,7 +275,7 @@ uint64_t ArbitrationParticipant::reclaim(
++numReclaims_;
VELOX_MEM_LOG(INFO) << "Reclaiming from memory pool " << pool_->name()
<< " with target " << succinctBytes(targetBytes);
pool_->reclaim(targetBytes, maxWaitTimeMs, stats);
pool_->reclaim(targetBytes, maxWaitTimeNs / 1'000'000, stats);
reclaimedBytes = shrink(/*reclaimAll=*/false);
} catch (const std::exception& e) {
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
Expand Down Expand Up @@ -354,9 +354,9 @@ uint64_t ArbitrationParticipant::abortLocked(
}

bool ArbitrationParticipant::waitForReclaimOrAbort(
uint64_t maxWaitTimeMs) const {
uint64_t maxWaitTimeNs) const {
std::unique_lock<std::timed_mutex> l(
reclaimLock_, std::chrono::milliseconds(maxWaitTimeMs));
reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs));
return l.owns_lock();
}

Expand All @@ -380,7 +380,7 @@ std::string ArbitrationParticipant::Stats::toString() const {
succinctBytes(reclaimedBytes),
succinctBytes(growBytes),
aborted,
succinctMicros(durationUs));
succinctNanos(durationNs));
}

ScopedArbitrationParticipant::ScopedArbitrationParticipant(
Expand Down
22 changes: 11 additions & 11 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ class ArbitrationParticipant
}

/// Returns the duration of this arbitration participant since its creation.
uint64_t durationUs() const {
const auto now = getCurrentTimeMicro();
VELOX_CHECK_GE(now, createTimeUs_);
return now - createTimeUs_;
uint64_t durationNs() const {
const auto now = getCurrentTimeNano();
VELOX_CHECK_GE(now, createTimeNs_);
return now - createTimeNs_;
}

/// Invoked to acquire a shared reference to this arbitration participant
Expand Down Expand Up @@ -206,11 +206,11 @@ class ArbitrationParticipant
/// restriction.
uint64_t shrink(bool reclaimAll = false);

// Invoked to reclaim used memory from this memory pool with specified
// 'targetBytes'. The function returns the actually freed capacity.
/// Invoked to reclaim used memory from this memory pool with specified
/// 'targetBytes'. The function returns the actually freed capacity.
uint64_t reclaim(
uint64_t targetBytes,
uint64_t maxWaitTimeMs,
uint64_t maxWaitTimeNs,
MemoryReclaimer::Stats& stats) noexcept;

/// Invoked to abort the query memory pool and returns the reclaimed bytes
Expand All @@ -226,7 +226,7 @@ class ArbitrationParticipant
/// Invoked to wait for the pending memory reclaim or abort operation to
/// complete within a 'maxWaitTimeMs' time window. The function returns false
/// if the wait has timed out.
bool waitForReclaimOrAbort(uint64_t maxWaitTimeMs) const;
bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const;

/// Invoked to start arbitration operation 'op'. The operation needs to wait
/// for the prior arbitration operations to finish first before executing to
Expand All @@ -246,7 +246,7 @@ class ArbitrationParticipant
size_t numWaitingOps() const;

struct Stats {
uint64_t durationUs{0};
uint64_t durationNs{0};
uint32_t numRequests{0};
uint32_t numReclaims{0};
uint32_t numShrinks{0};
Expand All @@ -260,7 +260,7 @@ class ArbitrationParticipant

Stats stats() const {
Stats stats;
stats.durationUs = durationUs();
stats.durationNs = durationNs();
stats.aborted = aborted_;
stats.numRequests = numRequests_;
stats.numGrows = numGrows_;
Expand Down Expand Up @@ -310,7 +310,7 @@ class ArbitrationParticipant
MemoryPool* const pool_;
const Config* const config_;
const uint64_t maxCapacity_;
const size_t createTimeUs_;
const uint64_t createTimeNs_;

mutable std::mutex stateLock_;
bool aborted_{false};
Expand Down
Loading

0 comments on commit 62b0a12

Please sign in to comment.