Skip to content

Commit

Permalink
[GLUTEN-1583][VL] Fix the memory leak of shuffle reader (apache#1602)
Browse files: browse the repository at this point in the history
  • Loading branch information
kerwin-zk authored May 10, 2023
1 parent 2daae78 commit 4bec2eb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType,
.contextInstance()
.newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue)

// JNI-facing wrapper around the input stream `in`. It is held in a field (rather than
// created inline at the ShuffleReaderJniWrapper.make call site) so that the very same
// instance handed to the shuffle reader can be closed explicitly in the cleanup path,
// next to `cb` and `allocator`.
private val jniByteInputStream = JniByteInputStreams.create(in)

private lazy val shuffleReaderHandle = {
val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
val arrowSchema =
SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = ShuffleReaderJniWrapper.make(
JniByteInputStreams.create(in), cSchema.memoryAddress(),
val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(),
NativeMemoryAllocators.contextInstance.getNativeInstanceId)
cSchema.close()
// Close shuffle reader instance as late as the end of task processing,
Expand Down Expand Up @@ -175,6 +176,7 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType,
readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
}
numOutputRows += numRowsTotal
jniByteInputStream.close()
if (cb != null) cb.close()
allocator.close()
isClosed = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType,
.contextInstance()
.newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue)

// JNI-facing wrapper around the input stream `in`. It is held in a field (rather than
// created inline at the ShuffleReaderJniWrapper.make call site) so that the very same
// instance handed to the shuffle reader can be closed explicitly in the cleanup path,
// next to `cb` and `allocator`.
private val jniByteInputStream = JniByteInputStreams.create(in)

private val shuffleReaderHandle = {
val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
val arrowSchema =
SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
val handle = ShuffleReaderJniWrapper.make(
JniByteInputStreams.create(in), cSchema.memoryAddress(),
val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(),
NativeMemoryAllocators.contextInstance.getNativeInstanceId)
cSchema.close()
// Close shuffle reader instance as lately as the end of task processing,
Expand Down Expand Up @@ -141,6 +142,7 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType,
readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
}
numOutputRows += numRowsTotal
jniByteInputStream.close()
if (cb != null) cb.close()
allocator.close()
isClosed = true
Expand Down

0 comments on commit 4bec2eb

Please sign in to comment.