From c0eda05880b055bedb85c97a0e7187746794c32e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 23 Jul 2024 10:12:37 +0800 Subject: [PATCH 1/3] fixup --- cpp/core/jni/JniCommon.h | 12 ++++--- cpp/core/memory/AllocationListener.h | 47 +++++++++++++++------------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 8f8002b2c216..601a0d11cb39 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -403,9 +403,9 @@ class SparkAllocationListener final : public gluten::AllocationListener { env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size); checkException(env); } - // atomic operation is enough here, no need to use mutex - bytesReserved_.fetch_add(size); - maxBytesReserved_.store(std::max(bytesReserved_.load(), maxBytesReserved_.load())); + std::lock_guard lock(mutex_); + bytesReserved_ += size; + maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_); } int64_t currentBytes() override { @@ -421,8 +421,10 @@ class SparkAllocationListener final : public gluten::AllocationListener { jobject jListenerGlobalRef_; const jmethodID jReserveMethod_; const jmethodID jUnreserveMethod_; - std::atomic_int64_t bytesReserved_{0L}; - std::atomic_int64_t maxBytesReserved_{0L}; + int64_t bytesReserved_{0L}; + int64_t maxBytesReserved_{0L}; + + mutable std::mutex mutex_; }; class BacktraceAllocationListener final : public gluten::AllocationListener { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index 695552cefbe3..a4e549cacc9d 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -50,32 +50,18 @@ class AllocationListener { // The class must be thread safe class BlockAllocationListener final : public AllocationListener { public: - BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize) + BlockAllocationListener(AllocationListener* delegated, int64_t blockSize) : delegated_(delegated), blockSize_(blockSize) {} void allocationChanged(int64_t diff) override { if (diff == 0) { return; } - std::unique_lock guard{mutex_}; - if (diff > 0) { - if (reservationBytes_ - usedBytes_ < diff) { - auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_; - reservationBytes_ += roundSize; - peakBytes_ = std::max(peakBytes_, reservationBytes_); - guard.unlock(); - // unnecessary to lock the delegated listener, assume it's thread safe - delegated_->allocationChanged(roundSize); - } - usedBytes_ += diff; - } else { - usedBytes_ += diff; - auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_; - reservationBytes_ -= unreservedSize; - guard.unlock(); - // unnecessary to lock the delegated listener - delegated_->allocationChanged(-unreservedSize); + int64_t granted = reserve(diff); + if (granted == 0) { + return; } + delegated_->allocationChanged(granted); } int64_t currentBytes() override { @@ -87,11 +73,28 @@ class BlockAllocationListener final : public AllocationListener { } private: + int64_t reserve(int64_t diff) { + std::lock_guard lock(mutex_); + usedBytes_ += diff; + int64_t newBlockCount; + if (usedBytes_ == 0) { + newBlockCount = 0; + } else { + // ceil to get the required block number + newBlockCount = (usedBytes_ - 1) / blockSize_ + 1; + } + int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_; + blocksReserved_ = newBlockCount; + peakBytes_ = std::max(peakBytes_, usedBytes_); + return bytesGranted; + } + AllocationListener* const delegated_; const uint64_t blockSize_; - uint64_t usedBytes_{0L}; - uint64_t peakBytes_{0L}; - uint64_t reservationBytes_{0L}; + int64_t blocksReserved_{0L}; + int64_t usedBytes_{0L}; + int64_t peakBytes_{0L}; + int64_t reservationBytes_{0L}; mutable std::mutex mutex_; }; From 8325fdabe5dc83bdbcd81292a792b2535b53e427 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 23 Jul 2024 10:20:13 +0800 Subject: [PATCH 2/3] fixup --- cpp/core/jni/JniCommon.h | 12 ++++++------ cpp/core/memory/MemoryAllocator.cc | 5 +++-- cpp/core/memory/MemoryAllocator.h | 6 ++++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 601a0d11cb39..8477f587ab17 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -404,16 +404,16 @@ class SparkAllocationListener final : public gluten::AllocationListener { checkException(env); } std::lock_guard lock(mutex_); - bytesReserved_ += size; - maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_); + usedBytes_ += size; + peakBytes_ = std::max(usedBytes_, peakBytes_); } int64_t currentBytes() override { - return bytesReserved_; + return usedBytes_; } int64_t peakBytes() override { - return maxBytesReserved_; + return peakBytes_; } private: @@ -421,8 +421,8 @@ class SparkAllocationListener final : public gluten::AllocationListener { jobject jListenerGlobalRef_; const jmethodID jReserveMethod_; const jmethodID jUnreserveMethod_; - int64_t bytesReserved_{0L}; - int64_t maxBytesReserved_{0L}; + int64_t usedBytes_{0L}; + int64_t peakBytes_{0L}; mutable std::mutex mutex_; }; diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index 01818636aa52..4dd9de02827d 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -92,8 +92,9 @@ int64_t ListenableMemoryAllocator::peakBytes() const { void ListenableMemoryAllocator::updateUsage(int64_t size) { listener_->allocationChanged(size); - usedBytes_.fetch_add(size); - peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load())); + std::lock_guard lock(mutex_); + usedBytes_ += size; + peakBytes_ = std::max(peakBytes_, usedBytes_); } bool StdMemoryAllocator::allocate(int64_t size, void** out) { diff --git a/cpp/core/memory/MemoryAllocator.h b/cpp/core/memory/MemoryAllocator.h index 01271cc94673..ffb932aa3efd 100644 --- a/cpp/core/memory/MemoryAllocator.h +++ b/cpp/core/memory/MemoryAllocator.h @@ -72,8 +72,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator { void updateUsage(int64_t size); MemoryAllocator* const delegated_; AllocationListener* const listener_; - std::atomic_int64_t usedBytes_{0L}; - std::atomic_int64_t peakBytes_{0L}; + int64_t usedBytes_{0L}; + int64_t peakBytes_{0L}; + + mutable std::mutex mutex_; }; class StdMemoryAllocator final : public MemoryAllocator { From dbf95b4b3835bc0f6dd8a3909f65cea893271306 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 24 Jul 2024 10:17:58 +0800 Subject: [PATCH 3/3] fixup --- cpp/core/jni/JniCommon.h | 18 ++++++++++++------ cpp/core/memory/AllocationListener.h | 2 +- cpp/core/memory/MemoryAllocator.cc | 12 ++++++++++-- cpp/core/memory/MemoryAllocator.h | 6 ++---- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 8477f587ab17..1d784f3a5eda 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -403,9 +403,17 @@ class SparkAllocationListener final : public gluten::AllocationListener { env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size); checkException(env); } - std::lock_guard lock(mutex_); usedBytes_ += size; - peakBytes_ = std::max(usedBytes_, peakBytes_); + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } int64_t currentBytes() override { @@ -421,10 +429,8 @@ class SparkAllocationListener final : public gluten::AllocationListener { jobject jListenerGlobalRef_; const jmethodID jReserveMethod_; const jmethodID jUnreserveMethod_; - int64_t usedBytes_{0L}; - int64_t peakBytes_{0L}; - - mutable std::mutex mutex_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; }; class BacktraceAllocationListener final : public gluten::AllocationListener { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index a4e549cacc9d..41797641fe14 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -73,7 +73,7 @@ class BlockAllocationListener final : public AllocationListener { } private: - int64_t reserve(int64_t diff) { + inline int64_t reserve(int64_t diff) { std::lock_guard lock(mutex_); usedBytes_ += diff; int64_t newBlockCount; diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index 4dd9de02827d..c637c6a9c13d 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -92,9 +92,17 @@ int64_t ListenableMemoryAllocator::peakBytes() const { void ListenableMemoryAllocator::updateUsage(int64_t size) { listener_->allocationChanged(size); - std::lock_guard lock(mutex_); usedBytes_ += size; - peakBytes_ = std::max(peakBytes_, usedBytes_); + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } bool StdMemoryAllocator::allocate(int64_t size, void** out) { diff --git a/cpp/core/memory/MemoryAllocator.h b/cpp/core/memory/MemoryAllocator.h index ffb932aa3efd..01271cc94673 100644 --- a/cpp/core/memory/MemoryAllocator.h +++ b/cpp/core/memory/MemoryAllocator.h @@ -72,10 +72,8 @@ class ListenableMemoryAllocator final : public MemoryAllocator { void updateUsage(int64_t size); MemoryAllocator* const delegated_; AllocationListener* const listener_; - int64_t usedBytes_{0L}; - int64_t peakBytes_{0L}; - - mutable std::mutex mutex_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; }; class StdMemoryAllocator final : public MemoryAllocator {