From 8584eb32d4a4f3090e5c362e1448f6690f650738 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 29 Jun 2024 15:18:41 -0700 Subject: [PATCH] feat: Use unified allocator for execution iterators --- common/src/main/scala/org/apache/comet/package.scala | 3 +++ .../scala/org/apache/comet/vector/NativeUtil.scala | 6 +++--- .../scala/org/apache/comet/vector/StreamReader.scala | 12 +++++------- .../org/apache/spark/sql/CometTPCDSQuerySuite.scala | 2 ++ 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/package.scala b/common/src/main/scala/org/apache/comet/package.scala index c9aca7538..087e9d612 100644 --- a/common/src/main/scala/org/apache/comet/package.scala +++ b/common/src/main/scala/org/apache/comet/package.scala @@ -21,7 +21,10 @@ package org.apache import java.util.Properties +import org.apache.arrow.memory.RootAllocator + package object comet { + val CometArrowAllocator = new RootAllocator(Long.MaxValue) /** * Provides access to build information about the Comet libraries. This will be used by the diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 595c0a427..89f79c9cd 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -22,18 +22,18 @@ package org.apache.comet.vector import scala.collection.mutable import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data} -import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.comet.CometArrowAllocator + class NativeUtil { import Utils._ - private val allocator = new RootAllocator(Long.MaxValue) - .newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue) + private val allocator = CometArrowAllocator private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider private val importer = new ArrowImporter(allocator) diff --git a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala index 4a08f0521..b8106a96e 100644 --- a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala +++ b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala @@ -21,20 +21,20 @@ package org.apache.comet.vector import java.nio.channels.ReadableByteChannel -import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel} import org.apache.arrow.vector.ipc.message.MessageChannelReader import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.comet.CometArrowAllocator + /** * A reader that consumes Arrow data from an input channel, and produces Comet batches. */ case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable { - private var allocator = new RootAllocator(Long.MaxValue) - .newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue) - private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator) - private var arrowReader = new ArrowStreamReader(channelReader, allocator) + private val channelReader = + new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator) + private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator) private var root = arrowReader.getVectorSchemaRoot def nextBatch(): Option[ColumnarBatch] = { @@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au if (root != null) { arrowReader.close() root.close() - allocator.close() arrowReader = null root = null - allocator = null } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 53186b131..979a49f2e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -158,6 +158,8 @@ class CometTPCDSQuerySuite conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "20g") + conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true") + conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "20g") conf