From f43313be37febdc2eb3baad33aea7e48a88f31f7 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 27 Jun 2024 09:24:33 +0800 Subject: [PATCH] fixup --- .../VeloxCelebornColumnarBatchSerializer.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index 699626db12c50..1f125a164c8b6 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -19,7 +19,6 @@ package org.apache.spark.shuffle import org.apache.gluten.GlutenConfig import org.apache.gluten.exec.Runtimes import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators -import org.apache.gluten.memory.nmm.NativeMemoryManagers import org.apache.gluten.utils.ArrowAbiUtil import org.apache.gluten.vectorized._ @@ -65,7 +64,7 @@ private class CelebornColumnarBatchSerializerInstance( extends SerializerInstance with Logging { - private val nmm = NativeMemoryManagers.contextInstance("ShuffleReader") + private val runtime = Runtimes.contextInstance("CelebornShuffleReader") private val shuffleReaderHandle = { val allocator: BufferAllocator = ArrowBufferAllocators @@ -86,12 +85,11 @@ private class CelebornColumnarBatchSerializerInstance( GlutenConfig.getConf.columnarShuffleCodecBackend.orNull val shuffleWriterType = conf.get("spark.celeborn.client.spark.shuffle.writer", "hash").toLowerCase(Locale.ROOT) - val jniWrapper = ShuffleReaderJniWrapper.create() + val jniWrapper = ShuffleReaderJniWrapper.create(runtime) val batchSize = GlutenConfig.getConf.maxBatchSize val handle = jniWrapper .make( cSchema.memoryAddress(), - nmm.getNativeInstanceHandle, compressionCodec, compressionCodecBackend, batchSize, @@ -119,11 +117,10 @@ private class CelebornColumnarBatchSerializerInstance( with TaskResource { private val byteIn: JniByteInputStream = JniByteInputStreams.create(in) private val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator( - Runtimes.contextInstance(), + runtime, ShuffleReaderJniWrapper - .create() - .readStream(shuffleReaderHandle, byteIn), - nmm) + .create(runtime) + .readStream(shuffleReaderHandle, byteIn)) private var cb: ColumnarBatch = _