Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jun 27, 2024
1 parent e4343c3 commit f43313b
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
import org.apache.gluten.exec.Runtimes
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.nmm.NativeMemoryManagers
import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.gluten.vectorized._

Expand Down Expand Up @@ -65,7 +64,7 @@ private class CelebornColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {

private val nmm = NativeMemoryManagers.contextInstance("ShuffleReader")
private val runtime = Runtimes.contextInstance("CelebornShuffleReader")

private val shuffleReaderHandle = {
val allocator: BufferAllocator = ArrowBufferAllocators
Expand All @@ -86,12 +85,11 @@ private class CelebornColumnarBatchSerializerInstance(
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
val shuffleWriterType =
conf.get("spark.celeborn.client.spark.shuffle.writer", "hash").toLowerCase(Locale.ROOT)
val jniWrapper = ShuffleReaderJniWrapper.create()
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val batchSize = GlutenConfig.getConf.maxBatchSize
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
nmm.getNativeInstanceHandle,
compressionCodec,
compressionCodecBackend,
batchSize,
Expand Down Expand Up @@ -119,11 +117,10 @@ private class CelebornColumnarBatchSerializerInstance(
with TaskResource {
private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
private val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator(
Runtimes.contextInstance(),
runtime,
ShuffleReaderJniWrapper
.create()
.readStream(shuffleReaderHandle, byteIn),
nmm)
.create(runtime)
.readStream(shuffleReaderHandle, byteIn))

private var cb: ColumnarBatch = _

Expand Down

0 comments on commit f43313b

Please sign in to comment.