From 457898b08ac4aa41739898f9042d65176b17785f Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 2 Jul 2024 15:02:16 +0800 Subject: [PATCH] [VL] IndicatorVectorPool to avoid sharing native columnar batches' ownerships among runtime instances (#6293) --- .../execution/RowToVeloxColumnarExec.scala | 2 +- .../ColumnarCachedBatchSerializer.scala | 2 +- .../datasources/velox/VeloxBlockStripes.java | 7 +- .../columnarbatch/ColumnarBatchTest.java | 104 ++++++++++++++++++ .../gluten/test/VeloxBackendTestBase.java | 86 +++++++++++++++ .../gluten/utils/VeloxBloomFilterTest.java | 68 +----------- cpp/core/jni/JniWrapper.cc | 12 -- ...lebornHashBasedColumnarShuffleWriter.scala | 7 -- .../gluten/memory/memtarget/Spillers.java | 3 + .../ColumnarBatchJniWrapper.java | 2 - .../gluten/columnarbatch/ColumnarBatches.java | 18 +-- .../gluten/columnarbatch/IndicatorVector.java | 16 ++- .../columnarbatch/IndicatorVectorBase.java | 33 +----- .../columnarbatch/IndicatorVectorPool.java | 66 +++++++++++ .../vectorized/ColumnarBatchOutIterator.java | 2 +- .../execution/ColumnarBuildSideRelation.scala | 4 +- .../spark/sql/execution/utils/ExecUtil.scala | 5 +- .../VeloxUniffleColumnarShuffleWriter.java | 7 -- 18 files changed, 300 insertions(+), 144 deletions(-) create mode 100644 backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java create mode 100644 backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java create mode 100644 gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index 289df1a6e54d..7bcf56f7edb0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -216,7 +216,7 @@ 
object RowToVeloxColumnarExec { try { val handle = jniWrapper .nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress()) - val cb = ColumnarBatches.create(runtime, handle) + val cb = ColumnarBatches.create(handle) convertTime += System.currentTimeMillis() - startNative cb } finally { diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala index 3f82f919b4d8..15fd51abef48 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala @@ -247,7 +247,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe val batchHandle = jniWrapper .deserialize(deserializerHandle, cachedBatch.bytes) - val batch = ColumnarBatches.create(runtime, batchHandle) + val batch = ColumnarBatches.create(batchHandle) if (shouldSelectAttributes) { try { ColumnarBatches.select(batch, requestedColumnIndices.toArray) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java index 56df7b9ad57f..f9848d4ab634 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java @@ -16,13 +16,11 @@ */ package org.apache.spark.sql.execution.datasources.velox; -import org.apache.gluten.exec.Runtimes; +import org.apache.gluten.columnarbatch.ColumnarBatches; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.execution.datasources.BlockStripe; import 
org.apache.spark.sql.execution.datasources.BlockStripes; -import org.apache.gluten.columnarbatch.ColumnarBatches; - import org.apache.spark.sql.vectorized.ColumnarBatch; import org.jetbrains.annotations.NotNull; @@ -53,8 +51,7 @@ public BlockStripe next() { return new BlockStripe() { @Override public ColumnarBatch getColumnarBatch() { - return ColumnarBatches.create( - Runtimes.contextInstance("VeloxBlockStripes"), blockAddresses[0]); + return ColumnarBatches.create(blockAddresses[0]); } @Override diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java new file mode 100644 index 000000000000..cd2ac50d350c --- /dev/null +++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.columnarbatch; + +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators; +import org.apache.gluten.test.VeloxBackendTestBase; +import org.apache.gluten.vectorized.ArrowWritableColumnVector; + +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.util.TaskResources$; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; + +public class ColumnarBatchTest extends VeloxBackendTestBase { + + @Test + public void testOffloadAndLoad() { + TaskResources$.MODULE$.runUnsafe( + () -> { + final int numRows = 100; + final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows); + Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch)); + final ColumnarBatch offloaded = + ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch); + Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded)); + final ColumnarBatch loaded = + ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded); + Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded)); + long cnt = + StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + loaded.rowIterator(), Spliterator.ORDERED), + false) + .count(); + Assert.assertEquals(numRows, cnt); + loaded.close(); + return null; + }); + } + + @Test + public void testCreateByHandle() { + TaskResources$.MODULE$.runUnsafe( + () -> { + final int numRows = 100; + final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch)); + final ColumnarBatch offloaded = + ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded)); + final long handle = ColumnarBatches.getNativeHandle(offloaded); + final ColumnarBatch created = ColumnarBatches.create(handle); + 
Assert.assertEquals(handle, ColumnarBatches.getNativeHandle(created)); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(created)); + ColumnarBatches.retain(created); + Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(2, ColumnarBatches.getRefCnt(created)); + ColumnarBatches.retain(offloaded); + Assert.assertEquals(3, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(3, ColumnarBatches.getRefCnt(created)); + created.close(); + Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(2, ColumnarBatches.getRefCnt(created)); + offloaded.close(); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(1, ColumnarBatches.getRefCnt(created)); + created.close(); + Assert.assertEquals(0, ColumnarBatches.getRefCnt(offloaded)); + Assert.assertEquals(0, ColumnarBatches.getRefCnt(created)); + return null; + }); + } + + private static ColumnarBatch newArrowBatch(String schema, int numRows) { + final ArrowWritableColumnVector[] columns = + ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema)); + for (ArrowWritableColumnVector col : columns) { + col.setValueCount(numRows); + } + final ColumnarBatch batch = new ColumnarBatch(columns); + batch.setNumRows(numRows); + return batch; + } +} diff --git a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java new file mode 100644 index 000000000000..1d7df23566df --- /dev/null +++ b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.test; + +import org.apache.gluten.GlutenConfig; +import org.apache.gluten.backendsapi.ListenerApi; +import org.apache.gluten.backendsapi.velox.VeloxListenerApi; + +import com.codahale.metrics.MetricRegistry; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.plugin.PluginContext; +import org.apache.spark.resource.ResourceInformation; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Map; + +/** For testing Velox backend without starting a Spark context. */ +public abstract class VeloxBackendTestBase { + @BeforeClass + public static void setup() { + final ListenerApi api = new VeloxListenerApi(); + api.onDriverStart(mockSparkContext(), mockPluginContext()); + } + + private static SparkContext mockSparkContext() { + // Not yet implemented. 
+ return null; + } + + private static PluginContext mockPluginContext() { + return new PluginContext() { + @Override + public MetricRegistry metricRegistry() { + throw new UnsupportedOperationException(); + } + + @Override + public SparkConf conf() { + final SparkConf conf = new SparkConf(); + conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0"); + return conf; + } + + @Override + public String executorID() { + throw new UnsupportedOperationException(); + } + + @Override + public String hostname() { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, ResourceInformation> resources() { + throw new UnsupportedOperationException(); + } + + @Override + public void send(Object message) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Object ask(Object message) throws Exception { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java index fda4003ddd20..cf568b166582 100644 --- a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java +++ b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java @@ -16,34 +16,18 @@ */ package org.apache.gluten.utils; -import org.apache.gluten.GlutenConfig; -import org.apache.gluten.backendsapi.ListenerApi; -import org.apache.gluten.backendsapi.velox.VeloxListenerApi; - -import com.codahale.metrics.MetricRegistry; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.api.plugin.PluginContext; -import org.apache.spark.resource.ResourceInformation; +import org.apache.gluten.test.VeloxBackendTestBase; + import org.apache.spark.util.TaskResources$; import org.apache.spark.util.sketch.BloomFilter; import org.apache.spark.util.sketch.IncompatibleMergeException; import org.junit.Assert; -import org.junit.BeforeClass;
import org.junit.Test; import org.junit.function.ThrowingRunnable; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Map; - -public class VeloxBloomFilterTest { - @BeforeClass - public static void setup() { - final ListenerApi api = new VeloxListenerApi(); - api.onDriverStart(mockSparkContext(), mockPluginContext()); - } +public class VeloxBloomFilterTest extends VeloxBackendTestBase { @Test public void testEmpty() { TaskResources$.MODULE$.runUnsafe( @@ -191,50 +175,4 @@ private static void checkFalsePositives(BloomFilter filter, int start) { Assert.assertTrue(negativeFalsePositives > 0); Assert.assertTrue(negativeFalsePositives < attemptCount); } - - private static SparkContext mockSparkContext() { - // Not yet implemented. - return null; - } - - private static PluginContext mockPluginContext() { - return new PluginContext() { - @Override - public MetricRegistry metricRegistry() { - throw new UnsupportedOperationException(); - } - - @Override - public SparkConf conf() { - final SparkConf conf = new SparkConf(); - conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0"); - return conf; - } - - @Override - public String executorID() { - throw new UnsupportedOperationException(); - } - - @Override - public String hostname() { - throw new UnsupportedOperationException(); - } - - @Override - public Map resources() { - throw new UnsupportedOperationException(); - } - - @Override - public void send(Object message) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Object ask(Object message) throws Exception { - throw new UnsupportedOperationException(); - } - }; - } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 23eea2db7ce6..ea5c9d271c92 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -750,18 +750,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra JNI_METHOD_END(kInvalidObjectHandle) } -JNIEXPORT 
jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_obtainOwnership( // NOLINT - JNIEnv* env, - jobject wrapper, - jlong batchHandle) { - JNI_METHOD_START - auto ctx = gluten::getRuntime(env, wrapper); - auto batch = ObjectStore::retrieve(batchHandle); - auto newHandle = ctx->saveObject(batch); - return newHandle; - JNI_METHOD_END(-1L) -} - JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT JNIEnv* env, jobject wrapper, diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala index b8e6513cf009..87b16c65bd09 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala @@ -116,13 +116,6 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) { return 0L } - if (nativeShuffleWriter == -1L) { - throw new IllegalStateException( - "Fatal: spill() called before a celeborn shuffle writer " + - "is created. 
This behavior should be" + - "optimized by moving memory " + - "allocations from make() to split()") - } logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data") // fixme pass true when being called by self val pushed = diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java index 4477e2956db7..38ed88f57778 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java @@ -80,6 +80,9 @@ public void append(Spiller spiller) { public long spill(MemoryTarget self, Phase phase, final long size) { long remainingBytes = size; for (Spiller spiller : spillers) { + if (remainingBytes <= 0) { + break; + } remainingBytes -= spiller.spill(self, phase, remainingBytes); } return size - remainingBytes; diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java index a834e13a4348..e71e9d7bee1b 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java @@ -48,8 +48,6 @@ public static ColumnarBatchJniWrapper create(Runtime runtime) { public native long select(long batch, int[] columnIndices); - public native long obtainOwnership(long batch); - public native void close(long batch); @Override diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index fc3b56c1bcad..cb68e032dc5b 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -24,6 +24,7 @@ import 
org.apache.gluten.utils.ImplicitClass; import org.apache.gluten.vectorized.ArrowWritableColumnVector; +import com.google.common.annotations.VisibleForTesting; import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.CDataDictionaryProvider; @@ -94,10 +95,11 @@ private static void transferVectors(ColumnarBatch from, ColumnarBatch target) { if (target.numCols() != from.numCols()) { throw new IllegalStateException(); } - final ColumnVector[] vectors = (ColumnVector[]) FIELD_COLUMNS.get(target); + final ColumnVector[] newVectors = new ColumnVector[from.numCols()]; for (int i = 0; i < target.numCols(); i++) { - vectors[i] = from.column(i); + newVectors[i] = from.column(i); } + FIELD_COLUMNS.set(target, newVectors); } catch (IllegalAccessException e) { throw new GlutenException(e); } @@ -127,7 +130,7 @@ public static ColumnarBatch select(ColumnarBatch batch, int[] columnIndices) { final IndicatorVector iv = getIndicatorVector(batch); long outputBatchHandle = ColumnarBatchJniWrapper.create(runtime).select(iv.handle(), columnIndices); - return create(runtime, outputBatchHandle); + return create(outputBatchHandle); case HEAVY: return new ColumnarBatch( Arrays.stream(columnIndices).mapToObj(batch::column).toArray(ColumnVector[]::new), @@ -218,7 +221,7 @@ private static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch in long handle = ColumnarBatchJniWrapper.create(runtime) .createWithArrowArray(cSchema.memoryAddress(), cArray.memoryAddress()); - ColumnarBatch output = ColumnarBatches.create(runtime, handle); + ColumnarBatch output = ColumnarBatches.create(handle); // Follow input's reference count.
This might be optimized using // automatic clean-up or once the extensibility of ColumnarBatch is enriched @@ -294,7 +297,8 @@ private static long getRefCntHeavy(ColumnarBatch input) { return refCnt; } - private static long getRefCnt(ColumnarBatch input) { + @VisibleForTesting + static long getRefCnt(ColumnarBatch input) { switch (identifyBatchType(input)) { case LIGHT: return getRefCntLight(input); @@ -348,8 +352,8 @@ private static ColumnarBatch create(IndicatorVector iv) { return new ColumnarBatch(columnVectors, numRows); } - public static ColumnarBatch create(Runtime runtime, long nativeHandle) { - return create(new IndicatorVector(runtime, nativeHandle)); + public static ColumnarBatch create(long nativeHandle) { + return create(IndicatorVector.obtain(nativeHandle)); } public static void retain(ColumnarBatch b) { diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java index 0ec5b78ce500..7fe87e95fa54 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java @@ -16,15 +16,24 @@ */ package org.apache.gluten.columnarbatch; -import org.apache.gluten.exec.Runtime; +import org.apache.spark.util.TaskResources; import java.util.concurrent.atomic.AtomicLong; public class IndicatorVector extends IndicatorVectorBase { + private final IndicatorVectorPool pool; private final AtomicLong refCnt = new AtomicLong(1L); - protected IndicatorVector(Runtime runtime, long handle) { - super(runtime, handle); + protected IndicatorVector(IndicatorVectorPool pool, long handle) { + super(handle); + this.pool = pool; + } + + static IndicatorVector obtain(long handle) { + final IndicatorVectorPool pool = + TaskResources.addResourceIfNotRegistered( + IndicatorVectorPool.class.getName(), IndicatorVectorPool::new); + return pool.obtain(handle); } @Override @@ 
-44,6 +53,7 @@ void release() { return; } if (refCnt.decrementAndGet() == 0) { + pool.remove(handle); jniWrapper.close(handle); } } diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java index fa695127adbf..1bc685bd5ceb 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java @@ -16,7 +16,7 @@ */ package org.apache.gluten.columnarbatch; -import org.apache.gluten.exec.Runtime; +import org.apache.gluten.exec.Runtimes; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Decimal; @@ -26,35 +26,14 @@ import org.apache.spark.unsafe.types.UTF8String; public abstract class IndicatorVectorBase extends ColumnVector { - private final Runtime runtime; - protected final long handle; protected final ColumnarBatchJniWrapper jniWrapper; + protected final long handle; - protected IndicatorVectorBase(Runtime runtime, long handle) { + protected IndicatorVectorBase(long handle) { super(DataTypes.NullType); - this.runtime = runtime; - this.jniWrapper = ColumnarBatchJniWrapper.create(runtime); - this.handle = takeOwnership(handle); - } - - private long takeOwnership(long handle) { - // Note: Underlying memory of returned batch still holds - // reference to the original memory manager. As - // a result, once its original resident runtime / mm is - // released, data may become invalid. Currently, it's - // the caller's responsibility to make sure the original - // runtime / mm keep alive even this function - // was called. 
- // - // Additionally, as in Gluten we have principle that runtime - // mm that were created earlier will be released - // later, this FILO practice is what helps the runtime that - // took ownership be able to access the data constantly - // because the original runtime will live longer than - // itself. - long newHandle = jniWrapper.obtainOwnership(handle); - jniWrapper.close(handle); - return newHandle; + this.jniWrapper = + ColumnarBatchJniWrapper.create(Runtimes.contextInstance("IndicatorVectorBase#init")); + this.handle = handle; } public String getType() { diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java new file mode 100644 index 000000000000..6e46742b564a --- /dev/null +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.columnarbatch; + +import org.apache.spark.util.TaskResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class IndicatorVectorPool implements TaskResource { + private static final Logger LOG = LoggerFactory.getLogger(IndicatorVectorPool.class); + // A pool for all alive indicator vectors. The reason we adopt the pool + // is, we don't want one native columnar batch (which is located via the + // long int handle through JNI bridge) to be owned by more than one IndicatorVector + // instance so release method of the native columnar batch could be guaranteed + // to be called and only called once. + private final Map<Long, IndicatorVector> uniqueInstances = new ConcurrentHashMap<>(); + + IndicatorVectorPool() {} + + @Override + public void release() throws Exception { + if (!uniqueInstances.isEmpty()) { + LOG.warn( + "There are still unreleased native columnar batches during ending the task."
+ + " Will close them automatically however the batches should be better released" + + " manually to minimize memory pressure."); + } + } + + IndicatorVector obtain(long handle) { + return uniqueInstances.computeIfAbsent(handle, h -> new IndicatorVector(this, handle)); + } + + void remove(long handle) { + if (uniqueInstances.remove(handle) == null) { + throw new IllegalStateException("Indicator vector not found in pool, this should not happen"); + } + } + + @Override + public int priority() { + return 0; + } + + @Override + public String resourceName() { + return IndicatorVectorPool.class.getName(); + } +} diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index ddf00844f9b0..9dd0404384ad 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -67,7 +67,7 @@ public ColumnarBatch nextInternal() throws IOException { if (batchHandle == -1L) { return null; // stream ended } - return ColumnarBatches.create(runtime, batchHandle); + return ColumnarBatches.create(batchHandle); } @Override diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 2f6abdc370d6..f7bcfd694d52 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -68,7 +68,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra jniWrapper .deserialize(serializeHandle, batches(batchId)) batchId += 1 - ColumnarBatches.create(runtime, handle) + ColumnarBatches.create(handle) } }) .protectInvocationFlow() @@ -124,7 +124,7 @@ case class 
ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra batchId += 1 val batchHandle = serializerJniWrapper.deserialize(serializeHandle, batchBytes) - val batch = ColumnarBatches.create(runtime, batchHandle) + val batch = ColumnarBatches.create(batchHandle) if (batch.numRows == 0) { batch.close() Iterator.empty diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala index 22b376a1b608..77f35ff48fcc 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala @@ -145,10 +145,7 @@ object ExecUtil { val newHandle = ColumnarBatches.compose(pidBatch, cb) // Composed batch already hold pidBatch's shared ref, so close is safe. ColumnarBatches.forceClose(pidBatch) - ( - 0, - ColumnarBatches - .create(Runtimes.contextInstance("ExecUtil#getShuffleDependency"), newHandle)) + (0, ColumnarBatches.create(newHandle)) }) .recyclePayload(p => ColumnarBatches.forceClose(p._2)) // FIXME why force close? .create() diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index bd205ba7a469..ca5b3ad9529f 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -165,13 +165,6 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) { return 0L; } - if (nativeShuffleWriter == -1) { - throw new IllegalStateException( - "Fatal: spill() called before a shuffle shuffle writer " - + "evaluator is created. 
This behavior should be" - + "optimized by moving memory " - + "allocations from make() to split()"); - } LOG.info("Gluten shuffle writer: Trying to push {} bytes of data", size); long pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false); LOG.info("Gluten shuffle writer: Pushed {} / {} bytes of data", pushed, size);