diff --git a/analytical_engine/core/context/java_context_base.h b/analytical_engine/core/context/java_context_base.h index cec36bc86619..6c4601d2a0f6 100644 --- a/analytical_engine/core/context/java_context_base.h +++ b/analytical_engine/core/context/java_context_base.h @@ -86,8 +86,27 @@ class JavaContextBase : public grape::ContextBase { } JNIEnvMark m; if (m.env()) { - m.env()->DeleteGlobalRef(url_class_loader_object_); - VLOG(1) << "Delete URL class loader"; + // Delete the java objects + if (app_object_) { + m.env()->DeleteGlobalRef(app_object_); + VLOG(1) << "Delete app object"; + } + if (context_object_) { + m.env()->DeleteGlobalRef(context_object_); + VLOG(1) << "Delete context object"; + } + if (fragment_object_) { + m.env()->DeleteGlobalRef(fragment_object_); + VLOG(1) << "Delete fragment object"; + } + if (mm_object_) { + m.env()->DeleteGlobalRef(mm_object_); + VLOG(1) << "Delete message manager object"; + } + if (url_class_loader_object_) { + m.env()->DeleteGlobalRef(url_class_loader_object_); + VLOG(1) << "Delete URL class loader object"; + } } else { LOG(ERROR) << "JNI env not available."; } diff --git a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/mm/impl/GiraphMpiMessageManager.java b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/mm/impl/GiraphMpiMessageManager.java index a24cbcc670a1..cb5fbd0476e1 100644 --- a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/mm/impl/GiraphMpiMessageManager.java +++ b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/mm/impl/GiraphMpiMessageManager.java @@ -117,6 +117,7 @@ public void receiveMessages() { "Frag [{}] totally Received [{}] bytes from others starting deserialization", fragId, bytesOfReceivedMsg); + tmpVector.delete(); } /** diff --git a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/utils/Utils.java b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/utils/Utils.java index 6e4e5126934e..38f0399b2610 100644 --- a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/utils/Utils.java +++ b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/utils/Utils.java @@ -78,20 +78,26 @@ private static String[] actAsCoordinator( outputStream.writeUTF(res[i]); info(workerId, "from worker: " + i + ": " + res[i]); } + outputStream.writeLong(0, outputStream.bytesWriten() - 8); + outputStream.finishSetting(); + + // Distribute to others; + for (int dstWorkerId = 1; dstWorkerId < workerNum; ++dstWorkerId) { + FFIByteVector tempVec = (FFIByteVector) FFIByteVectorFactory.INSTANCE.create(); + tempVec.appendVector(0, outputStream.getVector()); + info(workerId, " sending to worker: [" + dstWorkerId + "] " + tempVec.size()); + communicator.sendTo(dstWorkerId, tempVec); + info( + workerId, + " Successfully send to worker: [" + dstWorkerId + "] " + tempVec.size()); + tempVec.delete(); + } + vec.delete(); + outputStream.close(); + inputStream.close(); } catch (IOException e) { e.printStackTrace(); } - outputStream.writeLong(0, outputStream.bytesWriten() - 8); - outputStream.finishSetting(); - - // Distribute to others; - for (int dstWorkerId = 1; dstWorkerId < workerNum; ++dstWorkerId) { - FFIByteVector tempVec = (FFIByteVector) FFIByteVectorFactory.INSTANCE.create(); - tempVec.appendVector(0, outputStream.getVector()); - info(workerId, " sending to worker: [" + dstWorkerId + "] " + tempVec.size()); - communicator.sendTo(dstWorkerId, tempVec); - info(workerId, " Successfully send to worker: [" + dstWorkerId + "] " + tempVec.size()); - } return res; } diff --git a/analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/graphx/GraphXParallelPIE.java b/analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/graphx/GraphXParallelPIE.java index a6dfb7e3a7a8..a25b33e35feb 100644 --- a/analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/graphx/GraphXParallelPIE.java +++ b/analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/graphx/GraphXParallelPIE.java @@ -481,6 +481,8 @@ private void receiveMessage() { projectedFragment.fid(), bytesOfReceivedMsg); countDownLatch.countDown(); + messageInBuffer.delete(); + tmpVector.delete(); }); } try { @@ -513,6 +515,7 @@ private void receiveEdgeMessage() { projectedFragment.fid(), receivedMsgSize); countDownLatch.countDown(); + messageInBuffer.delete(); }); } try { diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/MessageInBuffer.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/MessageInBuffer.java index 59ca7443d884..ce6f9f0192fc 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/MessageInBuffer.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/MessageInBuffer.java @@ -25,11 +25,11 @@ import static com.alibaba.graphscope.utils.CppHeaderName.GRAPE_PARALLEL_MESSAGE_IN_BUFFER_H; import com.alibaba.fastffi.CXXHead; +import com.alibaba.fastffi.CXXPointer; import com.alibaba.fastffi.CXXReference; import com.alibaba.fastffi.FFIFactory; import com.alibaba.fastffi.FFIGen; import com.alibaba.fastffi.FFINameAlias; -import com.alibaba.fastffi.FFIPointer; import com.alibaba.fastffi.FFISkip; import com.alibaba.fastffi.FFITypeAlias; import com.alibaba.graphscope.ds.Vertex; @@ -50,7 +50,7 @@ CORE_JAVA_TYPE_ALIAS_H, CORE_JAVA_JAVA_MESSAGES_H }) -public interface MessageInBuffer extends FFIPointer { +public interface MessageInBuffer extends CXXPointer { default boolean getMessage( @CXXReference IFragment frag, @CXXReference Vertex vertex, diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java index 52fdf3d5f0ce..18cd3985d138 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelMessageManager.java @@ -509,6 +509,8 @@ public void run() { } } countDownLatch.countDown(); + messageInBuffer.delete(); + vertex.delete(); } }); } diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelPropertyMessageManager.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelPropertyMessageManager.java index 314cf640e6b9..772a4d7d593f 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelPropertyMessageManager.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/parallel/ParallelPropertyMessageManager.java @@ -138,6 +138,8 @@ public void run() { } } countDownLatch.countDown(); + messageInBuffer.delete(); + vertex.delete(); } }); } @@ -190,6 +192,8 @@ public void run() { } } countDownLatch.countDown(); + messageInBuffer.delete(); + vertex.delete(); } }); } diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorInputStream.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorInputStream.java index 81e9f21b8711..3ce90e1b9293 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorInputStream.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorInputStream.java @@ -643,4 +643,9 @@ public int available() throws IOException { public long longAvailable() { return readableLimit - offset; } + + @Override + public void close() throws IOException { + vector.delete(); + } } diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorOutputStream.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorOutputStream.java index 8956d8bd87b4..543f32db60da 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorOutputStream.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/serialization/FFIByteVectorOutputStream.java @@ -412,4 +412,9 @@ public void write(int b) throws IOException { vector.setRawByte(offset, (byte) b); offset += 1; } + + @Override + public void close() throws IOException { + vector.delete(); + } }