Skip to content

Commit

Permalink
[VL] Add thread_safe to several VeloxRuntime classes (#6526)
Browse files Browse the repository at this point in the history
VeloxRuntime is shared by many threads, like task threads or parquet writter threads. We must make sure the objects hold by VeloxRuntime are thread safe.
  • Loading branch information
FelixYBW authored and weiting-chen committed Jul 25, 2024
1 parent 5e22be8 commit 6511cb3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
17 changes: 11 additions & 6 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co
}
}

/*
NOTE: the class must be thread safe
*/

class SparkAllocationListener final : public gluten::AllocationListener {
public:
SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID jReserveMethod, jmethodID jUnreserveMethod)
Expand Down Expand Up @@ -399,8 +403,9 @@ class SparkAllocationListener final : public gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
bytesReserved_ += size;
maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
// atomic operation is enough here, no need to use mutex
bytesReserved_.fetch_add(size);
maxBytesReserved_.store(std::max(bytesReserved_.load(), maxBytesReserved_.load()));
}

int64_t currentBytes() override {
Expand All @@ -414,10 +419,10 @@ class SparkAllocationListener final : public gluten::AllocationListener {
private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
jmethodID jReserveMethod_;
jmethodID jUnreserveMethod_;
int64_t bytesReserved_ = 0L;
int64_t maxBytesReserved_ = 0L;
const jmethodID jReserveMethod_;
const jmethodID jUnreserveMethod_;
std::atomic_int64_t bytesReserved_{0L};
std::atomic_int64_t maxBytesReserved_{0L};
};

class BacktraceAllocationListener final : public gluten::AllocationListener {
Expand Down
17 changes: 13 additions & 4 deletions cpp/core/memory/AllocationListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>
#include <memory>
#include <mutex>

namespace gluten {

Expand Down Expand Up @@ -46,6 +47,7 @@ class AllocationListener {
};

/// Memory changes will be round to specified block size which aim to decrease delegated listener calls.
// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
Expand All @@ -55,19 +57,24 @@ class BlockAllocationListener final : public AllocationListener {
if (diff == 0) {
return;
}
std::unique_lock<std::mutex> guard{mutex_};
if (diff > 0) {
if (reservationBytes_ - usedBytes_ < diff) {
auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
delegated_->allocationChanged(roundSize);
reservationBytes_ += roundSize;
peakBytes_ = std::max(peakBytes_, reservationBytes_);
guard.unlock();
// unnecessary to lock the delegated listener, assume it's thread safe
delegated_->allocationChanged(roundSize);
}
usedBytes_ += diff;
} else {
usedBytes_ += diff;
auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_;
delegated_->allocationChanged(-unreservedSize);
reservationBytes_ -= unreservedSize;
guard.unlock();
// unnecessary to lock the delegated listener
delegated_->allocationChanged(-unreservedSize);
}
}

Expand All @@ -80,11 +87,13 @@ class BlockAllocationListener final : public AllocationListener {
}

private:
AllocationListener* delegated_;
uint64_t blockSize_{0L};
AllocationListener* const delegated_;
const uint64_t blockSize_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
uint64_t reservationBytes_{0L};

mutable std::mutex mutex_;
};

} // namespace gluten
4 changes: 2 additions & 2 deletions cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ int64_t ListenableMemoryAllocator::peakBytes() const {

void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
usedBytes_ += size;
peakBytes_ = std::max(peakBytes_, usedBytes_);
usedBytes_.fetch_add(size);
peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load()));
}

bool StdMemoryAllocator::allocate(int64_t size, void** out) {
Expand Down
9 changes: 5 additions & 4 deletions cpp/core/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MemoryAllocator {
virtual int64_t peakBytes() const = 0;
};

// The class must be thread safe
class ListenableMemoryAllocator final : public MemoryAllocator {
public:
explicit ListenableMemoryAllocator(MemoryAllocator* delegated, AllocationListener* listener)
Expand All @@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator {

private:
void updateUsage(int64_t size);
MemoryAllocator* delegated_;
AllocationListener* listener_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
MemoryAllocator* const delegated_;
AllocationListener* const listener_;
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};

class StdMemoryAllocator final : public MemoryAllocator {
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/memory/VeloxMemoryManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

namespace gluten {

// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
Expand Down

0 comments on commit 6511cb3

Please sign in to comment.