Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Following #6526, minor fixes and improvements #6554

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,26 +403,34 @@ class SparkAllocationListener final : public gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
// atomic operation is enough here, no need to use mutex
bytesReserved_.fetch_add(size);
maxBytesReserved_.store(std::max(bytesReserved_.load(), maxBytesReserved_.load()));
usedBytes_ += size;
while (true) {
int64_t savedPeakBytes = peakBytes_;
if (usedBytes_ <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With C++26 in future, we can use fetch_max directly. https://en.cppreference.com/w/cpp/atomic/atomic/fetch_max

}

int64_t currentBytes() override {
return bytesReserved_;
return usedBytes_;
}

int64_t peakBytes() override {
return maxBytesReserved_;
return peakBytes_;
}

private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
const jmethodID jReserveMethod_;
const jmethodID jUnreserveMethod_;
std::atomic_int64_t bytesReserved_{0L};
std::atomic_int64_t maxBytesReserved_{0L};
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};

class BacktraceAllocationListener final : public gluten::AllocationListener {
Expand Down
47 changes: 25 additions & 22 deletions cpp/core/memory/AllocationListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,18 @@ class AllocationListener {
// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
BlockAllocationListener(AllocationListener* delegated, int64_t blockSize)
: delegated_(delegated), blockSize_(blockSize) {}

void allocationChanged(int64_t diff) override {
if (diff == 0) {
return;
}
std::unique_lock<std::mutex> guard{mutex_};
if (diff > 0) {
if (reservationBytes_ - usedBytes_ < diff) {
auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
reservationBytes_ += roundSize;
peakBytes_ = std::max(peakBytes_, reservationBytes_);
guard.unlock();
// unnecessary to lock the delegated listener, assume it's thread safe
delegated_->allocationChanged(roundSize);
}
usedBytes_ += diff;
} else {
usedBytes_ += diff;
auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_;
reservationBytes_ -= unreservedSize;
guard.unlock();
// unnecessary to lock the delegated listener
delegated_->allocationChanged(-unreservedSize);
int64_t granted = reserve(diff);
if (granted == 0) {
return;
}
delegated_->allocationChanged(granted);
}

int64_t currentBytes() override {
Expand All @@ -87,11 +73,28 @@ class BlockAllocationListener final : public AllocationListener {
}

private:
inline int64_t reserve(int64_t diff) {
std::lock_guard<std::mutex> lock(mutex_);
usedBytes_ += diff;
int64_t newBlockCount;
if (usedBytes_ == 0) {
newBlockCount = 0;
} else {
// ceil to get the required block number
newBlockCount = (usedBytes_ - 1) / blockSize_ + 1;
}
int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
blocksReserved_ = newBlockCount;
peakBytes_ = std::max(peakBytes_, usedBytes_);
return bytesGranted;
}

AllocationListener* const delegated_;
const uint64_t blockSize_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
uint64_t reservationBytes_{0L};
int64_t blocksReserved_{0L};
int64_t usedBytes_{0L};
int64_t peakBytes_{0L};
int64_t reservationBytes_{0L};

mutable std::mutex mutex_;
};
Expand Down
13 changes: 11 additions & 2 deletions cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,17 @@ int64_t ListenableMemoryAllocator::peakBytes() const {

void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
usedBytes_.fetch_add(size);
peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load()));
usedBytes_ += size;
while (true) {
int64_t savedPeakBytes = peakBytes_;
if (usedBytes_ <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With C++26 in future, we can use fetch_max directly. https://en.cppreference.com/w/cpp/atomic/atomic/fetch_max

}

bool StdMemoryAllocator::allocate(int64_t size, void** out) {
Expand Down
Loading