Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Add thread_safe to several VeloxRuntime classes #6526

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co
}
}

/*
NOTE: the class must be thread safe
*/

class SparkAllocationListener final : public gluten::AllocationListener {
public:
SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID jReserveMethod, jmethodID jUnreserveMethod)
Expand Down Expand Up @@ -399,8 +403,9 @@ class SparkAllocationListener final : public gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
bytesReserved_ += size;
maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
// atomic operation is enough here, no need to use mutex
bytesReserved_.fetch_add(size);
maxBytesReserved_.store(std::max(bytesReserved_.load(), maxBytesReserved_.load()));
}

int64_t currentBytes() override {
Expand All @@ -414,10 +419,10 @@ class SparkAllocationListener final : public gluten::AllocationListener {
private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
jmethodID jReserveMethod_;
jmethodID jUnreserveMethod_;
int64_t bytesReserved_ = 0L;
int64_t maxBytesReserved_ = 0L;
const jmethodID jReserveMethod_;
const jmethodID jUnreserveMethod_;
std::atomic_int64_t bytesReserved_{0L};
std::atomic_int64_t maxBytesReserved_{0L};
};

class BacktraceAllocationListener final : public gluten::AllocationListener {
Expand Down
17 changes: 13 additions & 4 deletions cpp/core/memory/AllocationListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>
#include <memory>
#include <mutex>

namespace gluten {

Expand Down Expand Up @@ -46,6 +47,7 @@ class AllocationListener {
};

/// Memory changes will be round to specified block size which aim to decrease delegated listener calls.
// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
Expand All @@ -55,19 +57,24 @@ class BlockAllocationListener final : public AllocationListener {
if (diff == 0) {
return;
}
std::unique_lock<std::mutex> guard{mutex_};
if (diff > 0) {
if (reservationBytes_ - usedBytes_ < diff) {
auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
delegated_->allocationChanged(roundSize);
reservationBytes_ += roundSize;
peakBytes_ = std::max(peakBytes_, reservationBytes_);
guard.unlock();
// unnecessary to lock the delegated listener, assume it's thread safe
delegated_->allocationChanged(roundSize);
}
usedBytes_ += diff;
} else {
usedBytes_ += diff;
auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_;
delegated_->allocationChanged(-unreservedSize);
reservationBytes_ -= unreservedSize;
guard.unlock();
// unnecessary to lock the delegated listener
delegated_->allocationChanged(-unreservedSize);
}
}

Expand All @@ -80,11 +87,13 @@ class BlockAllocationListener final : public AllocationListener {
}

private:
AllocationListener* delegated_;
uint64_t blockSize_{0L};
AllocationListener* const delegated_;
const uint64_t blockSize_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
uint64_t reservationBytes_{0L};

mutable std::mutex mutex_;
};

} // namespace gluten
4 changes: 2 additions & 2 deletions cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ int64_t ListenableMemoryAllocator::peakBytes() const {

void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
usedBytes_ += size;
peakBytes_ = std::max(peakBytes_, usedBytes_);
usedBytes_.fetch_add(size);
peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load()));
}

bool StdMemoryAllocator::allocate(int64_t size, void** out) {
Expand Down
9 changes: 5 additions & 4 deletions cpp/core/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MemoryAllocator {
virtual int64_t peakBytes() const = 0;
};

// The class must be thread safe
class ListenableMemoryAllocator final : public MemoryAllocator {
public:
explicit ListenableMemoryAllocator(MemoryAllocator* delegated, AllocationListener* listener)
Expand All @@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator {

private:
void updateUsage(int64_t size);
MemoryAllocator* delegated_;
AllocationListener* listener_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
MemoryAllocator* const delegated_;
AllocationListener* const listener_;
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};

class StdMemoryAllocator final : public MemoryAllocator {
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/memory/VeloxMemoryManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

namespace gluten {

// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
Expand Down
Loading