diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index d5c9f2b3b18b..8f8002b2c216 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -362,6 +362,10 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co } } +/* +NOTE: the class must be thread safe + */ + class SparkAllocationListener final : public gluten::AllocationListener { public: SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID jReserveMethod, jmethodID jUnreserveMethod) @@ -399,8 +403,9 @@ class SparkAllocationListener final : public gluten::AllocationListener { env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size); checkException(env); } - bytesReserved_ += size; - maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_); + // atomic operation is enough here, no need to use mutex + bytesReserved_.fetch_add(size); + maxBytesReserved_.store(std::max(bytesReserved_.load(), maxBytesReserved_.load())); } int64_t currentBytes() override { @@ -414,10 +419,10 @@ class SparkAllocationListener final : public gluten::AllocationListener { private: JavaVM* vm_; jobject jListenerGlobalRef_; - jmethodID jReserveMethod_; - jmethodID jUnreserveMethod_; - int64_t bytesReserved_ = 0L; - int64_t maxBytesReserved_ = 0L; + const jmethodID jReserveMethod_; + const jmethodID jUnreserveMethod_; + std::atomic_int64_t bytesReserved_{0L}; + std::atomic_int64_t maxBytesReserved_{0L}; }; class BacktraceAllocationListener final : public gluten::AllocationListener { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index d43a621de9a0..695552cefbe3 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -19,6 +19,7 @@ #include #include +#include namespace gluten { @@ -46,6 +47,7 @@ class AllocationListener { }; /// Memory changes will be round to specified block size which aim to decrease delegated listener calls. +// The class must be thread safe class BlockAllocationListener final : public AllocationListener { public: BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize) @@ -55,19 +57,24 @@ class BlockAllocationListener final : public AllocationListener { if (diff == 0) { return; } + std::unique_lock guard{mutex_}; if (diff > 0) { if (reservationBytes_ - usedBytes_ < diff) { auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_; - delegated_->allocationChanged(roundSize); reservationBytes_ += roundSize; peakBytes_ = std::max(peakBytes_, reservationBytes_); + guard.unlock(); + // unnecessary to lock the delegated listener, assume it's thread safe + delegated_->allocationChanged(roundSize); } usedBytes_ += diff; } else { usedBytes_ += diff; auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_; - delegated_->allocationChanged(-unreservedSize); reservationBytes_ -= unreservedSize; + guard.unlock(); + // unnecessary to lock the delegated listener + delegated_->allocationChanged(-unreservedSize); } } @@ -80,11 +87,13 @@ class BlockAllocationListener final : public AllocationListener { } private: - AllocationListener* delegated_; - uint64_t blockSize_{0L}; + AllocationListener* const delegated_; + const uint64_t blockSize_; uint64_t usedBytes_{0L}; uint64_t peakBytes_{0L}; uint64_t reservationBytes_{0L}; + + mutable std::mutex mutex_; }; } // namespace gluten diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index ac869219d5c1..01818636aa52 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -92,8 +92,8 @@ int64_t ListenableMemoryAllocator::peakBytes() const { void ListenableMemoryAllocator::updateUsage(int64_t size) { listener_->allocationChanged(size); - usedBytes_ += size; - peakBytes_ = std::max(peakBytes_, usedBytes_); + usedBytes_.fetch_add(size); + peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load())); } bool StdMemoryAllocator::allocate(int64_t size, void** out) { diff --git a/cpp/core/memory/MemoryAllocator.h b/cpp/core/memory/MemoryAllocator.h index bc8f9de1815e..01271cc94673 100644 --- a/cpp/core/memory/MemoryAllocator.h +++ b/cpp/core/memory/MemoryAllocator.h @@ -45,6 +45,7 @@ class MemoryAllocator { virtual int64_t peakBytes() const = 0; }; +// The class must be thread safe class ListenableMemoryAllocator final : public MemoryAllocator { public: explicit ListenableMemoryAllocator(MemoryAllocator* delegated, AllocationListener* listener) @@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator { private: void updateUsage(int64_t size); - MemoryAllocator* delegated_; - AllocationListener* listener_; - uint64_t usedBytes_{0L}; - uint64_t peakBytes_{0L}; + MemoryAllocator* const delegated_; + AllocationListener* const listener_; + std::atomic_int64_t usedBytes_{0L}; + std::atomic_int64_t peakBytes_{0L}; }; class StdMemoryAllocator final : public MemoryAllocator { diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h index 3607ca793f3e..7a96b87e16be 100644 --- a/cpp/velox/memory/VeloxMemoryManager.h +++ b/cpp/velox/memory/VeloxMemoryManager.h @@ -25,6 +25,7 @@ namespace gluten { +// Make sure the class is thread safe class VeloxMemoryManager final : public MemoryManager { public: VeloxMemoryManager(std::unique_ptr listener);