[VL] IndicatorVectorPool to avoid sharing native columnar batches' ownerships among runtime instances (#6293)
zhztheplayer authored Jul 2, 2024
1 parent b0d836b commit 457898b
Showing 18 changed files with 300 additions and 144 deletions.
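
In short: after this change, ColumnarBatches.create takes only the native batch handle; the Runtime parameter is gone, since a batch's ownership is now tracked per handle (through the IndicatorVectorPool named in the commit title) rather than shared with whichever runtime instance wrapped it. A minimal before/after sketch of the call-site change (variable names are illustrative, not from the patch):

// Before: the wrapping runtime co-owned the native batch.
ColumnarBatch cb = ColumnarBatches.create(runtime, batchHandle);

// After: the handle alone identifies the batch; no Runtime is passed.
ColumnarBatch cb = ColumnarBatches.create(batchHandle);

Accordingly, the now-redundant obtainOwnership JNI entry point is deleted on both the C++ and Java sides (see the JniWrapper.cc and ColumnarBatchJniWrapper hunks below).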
@@ -216,7 +216,7 @@ object RowToVeloxColumnarExec {
       try {
         val handle = jniWrapper
           .nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress())
-        val cb = ColumnarBatches.create(runtime, handle)
+        val cb = ColumnarBatches.create(handle)
         convertTime += System.currentTimeMillis() - startNative
         cb
       } finally {
@@ -247,7 +247,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
         val batchHandle =
           jniWrapper
             .deserialize(deserializerHandle, cachedBatch.bytes)
-        val batch = ColumnarBatches.create(runtime, batchHandle)
+        val batch = ColumnarBatches.create(batchHandle)
         if (shouldSelectAttributes) {
           try {
             ColumnarBatches.select(batch, requestedColumnIndices.toArray)
@@ -16,13 +16,11 @@
  */
 package org.apache.spark.sql.execution.datasources.velox;
 
-import org.apache.gluten.exec.Runtimes;
+import org.apache.gluten.columnarbatch.ColumnarBatches;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.execution.datasources.BlockStripe;
 import org.apache.spark.sql.execution.datasources.BlockStripes;
-import org.apache.gluten.columnarbatch.ColumnarBatches;
-
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.jetbrains.annotations.NotNull;
 
@@ -53,8 +51,7 @@ public BlockStripe next() {
         return new BlockStripe() {
           @Override
           public ColumnarBatch getColumnarBatch() {
-            return ColumnarBatches.create(
-                Runtimes.contextInstance("VeloxBlockStripes"), blockAddresses[0]);
+            return ColumnarBatches.create(blockAddresses[0]);
           }
 
           @Override
@@ -0,0 +1,104 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.columnarbatch;

import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskResources$;
import org.junit.Assert;
import org.junit.Test;

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

public class ColumnarBatchTest extends VeloxBackendTestBase {

  @Test
  public void testOffloadAndLoad() {
    TaskResources$.MODULE$.runUnsafe(
        () -> {
          final int numRows = 100;
          final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
          Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
          final ColumnarBatch offloaded =
              ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
          Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
          final ColumnarBatch loaded =
              ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
          Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
          long cnt =
              StreamSupport.stream(
                      Spliterators.spliteratorUnknownSize(
                          loaded.rowIterator(), Spliterator.ORDERED),
                      false)
                  .count();
          Assert.assertEquals(numRows, cnt);
          loaded.close();
          return null;
        });
  }

  @Test
  public void testCreateByHandle() {
    TaskResources$.MODULE$.runUnsafe(
        () -> {
          final int numRows = 100;
          final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
          final ColumnarBatch offloaded =
              ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
          final long handle = ColumnarBatches.getNativeHandle(offloaded);
          final ColumnarBatch created = ColumnarBatches.create(handle);
          Assert.assertEquals(handle, ColumnarBatches.getNativeHandle(created));
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(created));
          ColumnarBatches.retain(created);
          Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(2, ColumnarBatches.getRefCnt(created));
          ColumnarBatches.retain(offloaded);
          Assert.assertEquals(3, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(3, ColumnarBatches.getRefCnt(created));
          created.close();
          Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(2, ColumnarBatches.getRefCnt(created));
          offloaded.close();
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(1, ColumnarBatches.getRefCnt(created));
          created.close();
          Assert.assertEquals(0, ColumnarBatches.getRefCnt(offloaded));
          Assert.assertEquals(0, ColumnarBatches.getRefCnt(created));
          return null;
        });
  }

  private static ColumnarBatch newArrowBatch(String schema, int numRows) {
    final ArrowWritableColumnVector[] columns =
        ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
    for (ArrowWritableColumnVector col : columns) {
      col.setValueCount(numRows);
    }
    final ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(numRows);
    return batch;
  }
}
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.test;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.ListenerApi;
import org.apache.gluten.backendsapi.velox.VeloxListenerApi;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Map;

/** For testing Velox backend without starting a Spark context. */
public abstract class VeloxBackendTestBase {
  @BeforeClass
  public static void setup() {
    final ListenerApi api = new VeloxListenerApi();
    api.onDriverStart(mockSparkContext(), mockPluginContext());
  }

  private static SparkContext mockSparkContext() {
    // Not yet implemented.
    return null;
  }

  private static PluginContext mockPluginContext() {
    return new PluginContext() {
      @Override
      public MetricRegistry metricRegistry() {
        throw new UnsupportedOperationException();
      }

      @Override
      public SparkConf conf() {
        final SparkConf conf = new SparkConf();
        conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
        return conf;
      }

      @Override
      public String executorID() {
        throw new UnsupportedOperationException();
      }

      @Override
      public String hostname() {
        throw new UnsupportedOperationException();
      }

      @Override
      public Map<String, ResourceInformation> resources() {
        throw new UnsupportedOperationException();
      }

      @Override
      public void send(Object message) throws IOException {
        throw new UnsupportedOperationException();
      }

      @Override
      public Object ask(Object message) throws Exception {
        throw new UnsupportedOperationException();
      }
    };
  }
}
@@ -16,34 +16,18 @@
  */
 package org.apache.gluten.utils;
 
-import org.apache.gluten.GlutenConfig;
-import org.apache.gluten.backendsapi.ListenerApi;
-import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
-
-import com.codahale.metrics.MetricRegistry;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.plugin.PluginContext;
-import org.apache.spark.resource.ResourceInformation;
+import org.apache.gluten.test.VeloxBackendTestBase;
 
 import org.apache.spark.util.TaskResources$;
 import org.apache.spark.util.sketch.BloomFilter;
 import org.apache.spark.util.sketch.IncompatibleMergeException;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.function.ThrowingRunnable;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
 
-public class VeloxBloomFilterTest {
-  @BeforeClass
-  public static void setup() {
-    final ListenerApi api = new VeloxListenerApi();
-    api.onDriverStart(mockSparkContext(), mockPluginContext());
-  }
-
+public class VeloxBloomFilterTest extends VeloxBackendTestBase {
   @Test
   public void testEmpty() {
@@ -191,50 +175,4 @@ private static void checkFalsePositives(BloomFilter filter, int start) {
     Assert.assertTrue(negativeFalsePositives > 0);
     Assert.assertTrue(negativeFalsePositives < attemptCount);
   }
-
-  private static SparkContext mockSparkContext() {
-    // Not yet implemented.
-    return null;
-  }
-
-  private static PluginContext mockPluginContext() {
-    return new PluginContext() {
-      @Override
-      public MetricRegistry metricRegistry() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public SparkConf conf() {
-        final SparkConf conf = new SparkConf();
-        conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
-        return conf;
-      }
-
-      @Override
-      public String executorID() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public String hostname() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Map<String, ResourceInformation> resources() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void send(Object message) throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Object ask(Object message) throws Exception {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
}
12 changes: 0 additions & 12 deletions cpp/core/jni/JniWrapper.cc
@@ -750,18 +750,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
   JNI_METHOD_END(kInvalidObjectHandle)
 }
 
-JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_obtainOwnership( // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong batchHandle) {
-  JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  auto newHandle = ctx->saveObject(batch);
-  return newHandle;
-  JNI_METHOD_END(-1L)
-}
-
 JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT
     JNIEnv* env,
     jobject wrapper,
@@ -116,13 +116,6 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
     if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
       return 0L
     }
-    if (nativeShuffleWriter == -1L) {
-      throw new IllegalStateException(
-        "Fatal: spill() called before a celeborn shuffle writer " +
-          "is created. This behavior should be" +
-          "optimized by moving memory " +
-          "allocations from make() to split()")
-    }
     logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
     // fixme pass true when being called by self
     val pushed =
@@ -80,6 +80,9 @@ public void append(Spiller spiller) {
   public long spill(MemoryTarget self, Phase phase, final long size) {
     long remainingBytes = size;
     for (Spiller spiller : spillers) {
+      if (remainingBytes <= 0) {
+        break;
+      }
       remainingBytes -= spiller.spill(self, phase, remainingBytes);
     }
     return size - remainingBytes;
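The three added lines above make the spiller chain stop once the requested bytes have been reclaimed, so later spillers are never asked to spill a zero or negative amount. A minimal, self-contained sketch of the same pattern (the Spiller interface and SpillerChain class here are illustrative stand-ins, not Gluten's types):

// Illustrative only: mirrors the early-break added in the hunk above.
interface Spiller {
  long spill(long bytes); // returns bytes actually reclaimed
}

final class SpillerChain {
  private final java.util.List<Spiller> spillers = new java.util.ArrayList<>();

  long spill(final long size) {
    long remaining = size;
    for (Spiller s : spillers) {
      if (remaining <= 0) {
        break; // budget already met; skip the remaining spillers
      }
      remaining -= s.spill(remaining);
    }
    return size - remaining; // total bytes reclaimed
  }
}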
@@ -48,8 +48,6 @@ public static ColumnarBatchJniWrapper create(Runtime runtime) {

   public native long select(long batch, int[] columnIndices);
 
-  public native long obtainOwnership(long batch);
-
   public native void close(long batch);
 
   @Override