diff --git a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
index 858aa9c21417..ea1d10a65b1e 100644
--- a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
@@ -61,13 +61,14 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType,
     .contextInstance()
     .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue)
 
+  private val jniByteInputStream = JniByteInputStreams.create(in)
+
   private lazy val shuffleReaderHandle = {
     val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
     val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
     GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-    val handle = ShuffleReaderJniWrapper.make(
-      JniByteInputStreams.create(in), cSchema.memoryAddress(),
+    val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(),
       NativeMemoryAllocators.contextInstance.getNativeInstanceId)
     cSchema.close()
     // Close shuffle reader instance as lately as the end of task processing,
@@ -175,6 +176,7 @@ private class CelebornColumnarBatchSerializerInstance(schema: StructType,
       readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
     }
     numOutputRows += numRowsTotal
+    jniByteInputStream.close()
     if (cb != null) cb.close()
     allocator.close()
     isClosed = true
diff --git a/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala b/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala
index 9deb59092723..4ce974ef54f8 100644
--- a/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/vectorized/GlutenColumnarBatchSerializer.scala
@@ -59,13 +59,14 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType,
     .contextInstance()
     .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue)
 
+  private val jniByteInputStream = JniByteInputStreams.create(in)
+
   private val shuffleReaderHandle = {
     val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
     val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
     GlutenArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-    val handle = ShuffleReaderJniWrapper.make(
-      JniByteInputStreams.create(in), cSchema.memoryAddress(),
+    val handle = ShuffleReaderJniWrapper.make(jniByteInputStream, cSchema.memoryAddress(),
       NativeMemoryAllocators.contextInstance.getNativeInstanceId)
     cSchema.close()
     // Close shuffle reader instance as lately as the end of task processing,
@@ -141,6 +142,7 @@ private class GlutenColumnarBatchSerializerInstance(schema: StructType,
      readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
     }
     numOutputRows += numRowsTotal
+    jniByteInputStream.close()
     if (cb != null) cb.close()
     allocator.close()
     isClosed = true
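
Note on the pattern above: both files receive the same fix. The JNI-backed input stream was previously created inline as an argument to ShuffleReaderJniWrapper.make, which left no JVM-side reference through which it could be closed, so the stream leaked at the end of each task. Hoisting it into a field (jniByteInputStream) gives close() a handle to release it explicitly, alongside the columnar batch and the allocator. Below is a minimal, self-contained Scala sketch of the lifecycle pattern; the names in it (NativeStream, SerializerInstanceSketch, readerHandle) are illustrative stand-ins, not Gluten's actual API.

import java.io.{ByteArrayInputStream, InputStream}

// Stand-in for a stream handed to native code via JNI (hypothetical class).
class NativeStream(in: InputStream) extends AutoCloseable {
  override def close(): Unit = in.close()
}

// Sketch of the serializer-instance pattern: the stream is owned by a field,
// not created inline inside the lazy handle, so close() can release it.
class SerializerInstanceSketch(in: InputStream) extends AutoCloseable {
  private val nativeStream = new NativeStream(in)

  private lazy val readerHandle: Long = {
    // A real implementation would pass nativeStream across JNI here and
    // receive back an opaque native handle; 42L is just a placeholder.
    42L
  }

  override def close(): Unit = {
    // Deterministic cleanup at end of task, possible only because the
    // stream was hoisted into a field instead of being created inline.
    nativeStream.close()
  }
}

object SketchMain {
  def main(args: Array[String]): Unit = {
    val inst = new SerializerInstanceSketch(new ByteArrayInputStream(Array[Byte](1, 2, 3)))
    // ... read batches via readerHandle ...
    inst.close()
  }
}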