diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index d5c9f2b3b18b4..667c3bc804845 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -399,25 +399,41 @@ class SparkAllocationListener final : public gluten::AllocationListener { env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size); checkException(env); } - bytesReserved_ += size; - maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_); + usedBytes_ += size; + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } int64_t currentBytes() override { - return bytesReserved_; + return usedBytes_; } int64_t peakBytes() override { - return maxBytesReserved_; + return peakBytes_; } private: JavaVM* vm_; jobject jListenerGlobalRef_; +<<<<<<< HEAD jmethodID jReserveMethod_; jmethodID jUnreserveMethod_; int64_t bytesReserved_ = 0L; int64_t maxBytesReserved_ = 0L; +======= + const jmethodID jReserveMethod_; + const jmethodID jUnreserveMethod_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; +>>>>>>> 33d2f2d31... [VL] Following #6526, minor fixes and improvements (#6554) }; class BacktraceAllocationListener final : public gluten::AllocationListener { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index d43a621de9a06..1435ee9bea95a 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -48,27 +48,18 @@ class AllocationListener { /// Memory changes will be round to specified block size which aim to decrease delegated listener calls. class BlockAllocationListener final : public AllocationListener { public: - BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize) + BlockAllocationListener(AllocationListener* delegated, int64_t blockSize) : delegated_(delegated), blockSize_(blockSize) {} void allocationChanged(int64_t diff) override { if (diff == 0) { return; } - if (diff > 0) { - if (reservationBytes_ - usedBytes_ < diff) { - auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_; - delegated_->allocationChanged(roundSize); - reservationBytes_ += roundSize; - peakBytes_ = std::max(peakBytes_, reservationBytes_); - } - usedBytes_ += diff; - } else { - usedBytes_ += diff; - auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_; - delegated_->allocationChanged(-unreservedSize); - reservationBytes_ -= unreservedSize; + int64_t granted = reserve(diff); + if (granted == 0) { + return; } + delegated_->allocationChanged(granted); } int64_t currentBytes() override { @@ -80,11 +71,38 @@ class BlockAllocationListener final : public AllocationListener { } private: +<<<<<<< HEAD AllocationListener* delegated_; uint64_t blockSize_{0L}; uint64_t usedBytes_{0L}; uint64_t peakBytes_{0L}; uint64_t reservationBytes_{0L}; +======= + inline int64_t reserve(int64_t diff) { + std::lock_guard lock(mutex_); + usedBytes_ += diff; + int64_t newBlockCount; + if (usedBytes_ == 0) { + newBlockCount = 0; + } else { + // ceil to get the required block number + newBlockCount = (usedBytes_ - 1) / blockSize_ + 1; + } + int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_; + blocksReserved_ = newBlockCount; + peakBytes_ = std::max(peakBytes_, usedBytes_); + return bytesGranted; + } + + AllocationListener* const delegated_; + const uint64_t blockSize_; + int64_t blocksReserved_{0L}; + int64_t usedBytes_{0L}; + int64_t peakBytes_{0L}; + int64_t reservationBytes_{0L}; + + mutable std::mutex mutex_; +>>>>>>> 33d2f2d31... [VL] Following #6526, minor fixes and improvements (#6554) }; } // namespace gluten diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index ac869219d5c13..c637c6a9c13d9 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -93,7 +93,16 @@ int64_t ListenableMemoryAllocator::peakBytes() const { void ListenableMemoryAllocator::updateUsage(int64_t size) { listener_->allocationChanged(size); usedBytes_ += size; - peakBytes_ = std::max(peakBytes_, usedBytes_); + while (true) { + int64_t savedPeakBytes = peakBytes_; + if (usedBytes_ <= savedPeakBytes) { + break; + } + // usedBytes_ > savedPeakBytes, update peak + if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) { + break; + } + } } bool StdMemoryAllocator::allocate(int64_t size, void** out) {