From 4bec2eb2f9a62ca50757b4413eca99d4ab1fa68d Mon Sep 17 00:00:00 2001 From: Kerwin Zhang Date: Wed, 10 May 2023 20:56:12 +0800 Subject: [PATCH] [GLUTEN-1583][VL] Fix the memory leak of shuffle reader (#1602) --- .../spark/shuffle/CelebornColumnarBatchSerializer.scala | 6 ++++-- .../vectorized/GlutenColumnarBatchSerializer.scala | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala index 858aa9c21417..ea1d10a65b1e 100644 --- a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala @@ -61,13 +61,14 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType, .contextInstance() .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue) + private val jniByteInputStream = JniByteInputStreams.create(in) + private lazy val shuffleReaderHandle = { val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance()) val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone) GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) - val handle = ShuffleReaderJniWrapper.make( - JniByteInputStreams.create(in), cSchema.memoryAddress(), + val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(), NativeMemoryAllocators.contextInstance.getNativeInstanceId) cSchema.close() // Close shuffle reader instance as lately as the end of task processing, @@ -175,6 +176,7 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType, readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal) } numOutputRows += numRowsTotal + jniByteInputStream.close() if (cb != null) cb.close() allocator.close() isClosed = true diff --git a/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala b/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala index 9deb59092723..4ce974ef54f8 100644 --- a/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala +++ b/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala @@ -59,13 +59,14 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType, .contextInstance() .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue) + private val jniByteInputStream = JniByteInputStreams.create(in) + private val shuffleReaderHandle = { val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance()) val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone) GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) - val handle = ShuffleReaderJniWrapper.make( - JniByteInputStreams.create(in), cSchema.memoryAddress(), + val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(), NativeMemoryAllocators.contextInstance.getNativeInstanceId) cSchema.close() // Close shuffle reader instance as lately as the end of task processing, @@ -141,6 +142,7 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType, readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal) } numOutputRows += numRowsTotal + jniByteInputStream.close() if (cb != null) cb.close() allocator.close() isClosed = true