Skip to content

Commit

Permalink
fix(analytical): Fix memory leak for FFI Objects (#4185)
Browse files Browse the repository at this point in the history
As titled.
  • Loading branch information
zhanglei1949 authored Aug 27, 2024
1 parent d934a83 commit bbd05f8
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 15 deletions.
23 changes: 21 additions & 2 deletions analytical_engine/core/context/java_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,27 @@ class JavaContextBase : public grape::ContextBase {
}
JNIEnvMark m;
if (m.env()) {
m.env()->DeleteGlobalRef(url_class_loader_object_);
VLOG(1) << "Delete URL class loader";
// Delete the java objects
if (app_object_) {
m.env()->DeleteGlobalRef(app_object_);
VLOG(1) << "Delete app object";
}
if (context_object_) {
m.env()->DeleteGlobalRef(context_object_);
VLOG(1) << "Delete context object";
}
if (fragment_object_) {
m.env()->DeleteGlobalRef(fragment_object_);
VLOG(1) << "Delete fragment object";
}
if (mm_object_) {
m.env()->DeleteGlobalRef(mm_object_);
VLOG(1) << "Delete message manager object";
}
if (url_class_loader_object_) {
m.env()->DeleteGlobalRef(url_class_loader_object_);
VLOG(1) << "Delete URL class loader object";
}
} else {
LOG(ERROR) << "JNI env not available.";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void receiveMessages() {
"Frag [{}] totally Received [{}] bytes from others starting deserialization",
fragId,
bytesOfReceivedMsg);
tmpVector.delete();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,26 @@ private static String[] actAsCoordinator(
outputStream.writeUTF(res[i]);
info(workerId, "from worker: " + i + ": " + res[i]);
}
outputStream.writeLong(0, outputStream.bytesWriten() - 8);
outputStream.finishSetting();

// Distribute to others;
for (int dstWorkerId = 1; dstWorkerId < workerNum; ++dstWorkerId) {
FFIByteVector tempVec = (FFIByteVector) FFIByteVectorFactory.INSTANCE.create();
tempVec.appendVector(0, outputStream.getVector());
info(workerId, " sending to worker: [" + dstWorkerId + "] " + tempVec.size());
communicator.sendTo(dstWorkerId, tempVec);
info(
workerId,
" Successfully send to worker: [" + dstWorkerId + "] " + tempVec.size());
tempVec.delete();
}
vec.delete();
outputStream.close();
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
outputStream.writeLong(0, outputStream.bytesWriten() - 8);
outputStream.finishSetting();

// Distribute to others;
for (int dstWorkerId = 1; dstWorkerId < workerNum; ++dstWorkerId) {
FFIByteVector tempVec = (FFIByteVector) FFIByteVectorFactory.INSTANCE.create();
tempVec.appendVector(0, outputStream.getVector());
info(workerId, " sending to worker: [" + dstWorkerId + "] " + tempVec.size());
communicator.sendTo(dstWorkerId, tempVec);
info(workerId, " Successfully send to worker: [" + dstWorkerId + "] " + tempVec.size());
}
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ private void receiveMessage() {
projectedFragment.fid(),
bytesOfReceivedMsg);
countDownLatch.countDown();
messageInBuffer.delete();
tmpVector.delete();
});
}
try {
Expand Down Expand Up @@ -513,6 +515,7 @@ private void receiveEdgeMessage() {
projectedFragment.fid(),
receivedMsgSize);
countDownLatch.countDown();
messageInBuffer.delete();
});
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import static com.alibaba.graphscope.utils.CppHeaderName.GRAPE_PARALLEL_MESSAGE_IN_BUFFER_H;

import com.alibaba.fastffi.CXXHead;
import com.alibaba.fastffi.CXXPointer;
import com.alibaba.fastffi.CXXReference;
import com.alibaba.fastffi.FFIFactory;
import com.alibaba.fastffi.FFIGen;
import com.alibaba.fastffi.FFINameAlias;
import com.alibaba.fastffi.FFIPointer;
import com.alibaba.fastffi.FFISkip;
import com.alibaba.fastffi.FFITypeAlias;
import com.alibaba.graphscope.ds.Vertex;
Expand All @@ -50,7 +50,7 @@
CORE_JAVA_TYPE_ALIAS_H,
CORE_JAVA_JAVA_MESSAGES_H
})
public interface MessageInBuffer extends FFIPointer {
public interface MessageInBuffer extends CXXPointer {
default <OID_T, VID_T, VDATA_T, EDATA_T, MSG_T, @FFISkip UNUSED_T> boolean getMessage(
@CXXReference IFragment<OID_T, VID_T, VDATA_T, EDATA_T> frag,
@CXXReference Vertex<VID_T> vertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ public void run() {
}
}
countDownLatch.countDown();
messageInBuffer.delete();
vertex.delete();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public void run() {
}
}
countDownLatch.countDown();
messageInBuffer.delete();
vertex.delete();
}
});
}
Expand Down Expand Up @@ -190,6 +192,8 @@ public void run() {
}
}
countDownLatch.countDown();
messageInBuffer.delete();
vertex.delete();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,9 @@ public int available() throws IOException {
public long longAvailable() {
return readableLimit - offset;
}

@Override
public void close() throws IOException {
vector.delete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,9 @@ public void write(int b) throws IOException {
vector.setRawByte(offset, (byte) b);
offset += 1;
}

@Override
public void close() throws IOException {
vector.delete();
}
}

0 comments on commit bbd05f8

Please sign in to comment.