Commit: fixup

zhztheplayer committed Jul 1, 2024
1 parent ae2d125 commit 7060440
Showing 14 changed files with 97 additions and 76 deletions.
@@ -216,7 +216,7 @@ object RowToVeloxColumnarExec {
try {
val handle = jniWrapper
.nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress())
-val cb = ColumnarBatches.create(runtime, handle)
+val cb = ColumnarBatches.create(handle)
convertTime += System.currentTimeMillis() - startNative
cb
} finally {
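This hunk shows the commit's core API change, repeated at call sites throughout the diff: ColumnarBatches.create no longer takes an explicit Runtime argument. A minimal sketch of a call site under the new signature; the class and variable names below are illustrative, not taken from the diff:

    import org.apache.gluten.columnarbatch.ColumnarBatches;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class CreateCallSite {
      // `batchHandle` stands in for a handle returned by a JNI call such as
      // ColumnarBatchJniWrapper#select or a serializer's deserialize().
      ColumnarBatch wrap(long batchHandle) {
        // Before: ColumnarBatches.create(runtime, batchHandle)
        // After: the handle alone identifies the native batch; the backing
        // IndicatorVector is deduplicated in a task-level pool (see below).
        return ColumnarBatches.create(batchHandle);
      }
    }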
@@ -247,7 +247,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
val batchHandle =
jniWrapper
.deserialize(deserializerHandle, cachedBatch.bytes)
-val batch = ColumnarBatches.create(runtime, batchHandle)
+val batch = ColumnarBatches.create(batchHandle)
if (shouldSelectAttributes) {
try {
ColumnarBatches.select(batch, requestedColumnIndices.toArray)
@@ -16,13 +16,11 @@
*/
package org.apache.spark.sql.execution.datasources.velox;

-import org.apache.gluten.exec.Runtimes;
+import org.apache.gluten.columnarbatch.ColumnarBatches;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.datasources.BlockStripe;
import org.apache.spark.sql.execution.datasources.BlockStripes;
-import org.apache.gluten.columnarbatch.ColumnarBatches;
-
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.jetbrains.annotations.NotNull;

@@ -53,8 +51,7 @@ public BlockStripe next() {
return new BlockStripe() {
@Override
public ColumnarBatch getColumnarBatch() {
-return ColumnarBatches.create(
-    Runtimes.contextInstance("VeloxBlockStripes"), blockAddresses[0]);
+return ColumnarBatches.create(blockAddresses[0]);
}

@Override
12 changes: 0 additions & 12 deletions cpp/core/jni/JniWrapper.cc
@@ -750,18 +750,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNI_METHOD_END(kInvalidObjectHandle)
}

-JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_obtainOwnership( // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong batchHandle) {
-  JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  auto newHandle = ctx->saveObject(batch);
-  return newHandle;
-  JNI_METHOD_END(-1L)
-}
-
JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT
JNIEnv* env,
jobject wrapper,
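The deleted obtainOwnership entry point re-registered a batch with the caller's runtime and handed back a fresh handle. Its Java-side usage, removed from IndicatorVectorBase further down, amounted to the following sketch; the class and variable names are illustrative:

    import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper;

    class OldOwnershipTransfer {
      // Pre-commit flow: re-register the batch under the caller's runtime,
      // then drop the old registration. With handles now deduplicated by
      // IndicatorVectorPool, this transfer step disappears entirely.
      long transfer(ColumnarBatchJniWrapper jniWrapper, long handle) {
        long newHandle = jniWrapper.obtainOwnership(handle);
        jniWrapper.close(handle);
        return newHandle;
      }
    }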
@@ -116,13 +116,6 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
-if (nativeShuffleWriter == -1L) {
-  throw new IllegalStateException(
-    "Fatal: spill() called before a celeborn shuffle writer " +
-      "is created. This behavior should be" +
-      "optimized by moving memory " +
-      "allocations from make() to split()")
-}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
// fixme pass true when being called by self
val pushed =
@@ -48,8 +48,6 @@ public static ColumnarBatchJniWrapper create(Runtime runtime) {

public native long select(long batch, int[] columnIndices);

-public native long obtainOwnership(long batch);
-
public native void close(long batch);

@Override
@@ -127,7 +127,7 @@ public static ColumnarBatch select(ColumnarBatch batch, int[] columnIndices) {
final IndicatorVector iv = getIndicatorVector(batch);
long outputBatchHandle =
ColumnarBatchJniWrapper.create(runtime).select(iv.handle(), columnIndices);
-return create(runtime, outputBatchHandle);
+return create(outputBatchHandle);
case HEAVY:
return new ColumnarBatch(
Arrays.stream(columnIndices).mapToObj(batch::column).toArray(ColumnVector[]::new),
@@ -218,7 +218,7 @@ private static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch in
long handle =
ColumnarBatchJniWrapper.create(runtime)
.createWithArrowArray(cSchema.memoryAddress(), cArray.memoryAddress());
-ColumnarBatch output = ColumnarBatches.create(runtime, handle);
+ColumnarBatch output = ColumnarBatches.create(handle);

// Follow input's reference count. This might be optimized using
// automatic clean-up or once the extensibility of ColumnarBatch is enriched
@@ -348,8 +348,8 @@ private static ColumnarBatch create(IndicatorVector iv) {
return new ColumnarBatch(columnVectors, numRows);
}

-public static ColumnarBatch create(Runtime runtime, long nativeHandle) {
-  return create(new IndicatorVector(runtime, nativeHandle));
+public static ColumnarBatch create(long nativeHandle) {
+  return create(IndicatorVector.obtain(nativeHandle));
}

public static void retain(ColumnarBatch b) {
@@ -16,15 +16,24 @@
*/
package org.apache.gluten.columnarbatch;

-import org.apache.gluten.exec.Runtime;
+import org.apache.spark.util.TaskResources;

import java.util.concurrent.atomic.AtomicLong;

public class IndicatorVector extends IndicatorVectorBase {
+private final IndicatorVectorPool pool;
private final AtomicLong refCnt = new AtomicLong(1L);

-  protected IndicatorVector(Runtime runtime, long handle) {
-    super(runtime, handle);
+  protected IndicatorVector(IndicatorVectorPool pool, long handle) {
+    super(handle);
+    this.pool = pool;
}

+  static IndicatorVector obtain(long handle) {
+    final IndicatorVectorPool pool =
+        TaskResources.addResourceIfNotRegistered(
+            IndicatorVectorPool.class.getName(), IndicatorVectorPool::new);
+    return pool.obtain(handle);
+  }

@Override
@@ -44,6 +53,7 @@ void release() {
return;
}
if (refCnt.decrementAndGet() == 0) {
+pool.remove(handle);
jniWrapper.close(handle);
}
}
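Taken together, IndicatorVector now couples a per-instance reference count with the task-level pool: the first wrap of a handle creates the instance, and the final release removes it from the pool before closing the native batch. A simplified, self-contained model of that lifecycle; the real classes also register with TaskResources and close through JNI:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class PooledHandle {
      private static final ConcurrentHashMap<Long, PooledHandle> POOL = new ConcurrentHashMap<>();
      private final AtomicLong refCnt = new AtomicLong(1L);
      private final long handle;

      private PooledHandle(long handle) {
        this.handle = handle;
      }

      // Mirrors IndicatorVector.obtain: at most one instance per native handle.
      static PooledHandle obtain(long handle) {
        return POOL.computeIfAbsent(handle, PooledHandle::new);
      }

      void retain() {
        refCnt.incrementAndGet();
      }

      void release() {
        if (refCnt.decrementAndGet() == 0) {
          POOL.remove(handle); // unregister first ...
          closeNative(handle); // ... then free the native batch exactly once
        }
      }

      private static void closeNative(long handle) {
        // stands in for ColumnarBatchJniWrapper#close(handle)
      }
    }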
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.columnarbatch;

-import org.apache.gluten.exec.Runtime;
+import org.apache.gluten.exec.Runtimes;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
@@ -26,35 +26,14 @@
import org.apache.spark.unsafe.types.UTF8String;

public abstract class IndicatorVectorBase extends ColumnVector {
-private final Runtime runtime;
-protected final long handle;
protected final ColumnarBatchJniWrapper jniWrapper;
+protected final long handle;

-protected IndicatorVectorBase(Runtime runtime, long handle) {
+protected IndicatorVectorBase(long handle) {
super(DataTypes.NullType);
-    this.runtime = runtime;
-    this.jniWrapper = ColumnarBatchJniWrapper.create(runtime);
-    this.handle = takeOwnership(handle);
-  }
-
-  private long takeOwnership(long handle) {
-    // Note: Underlying memory of returned batch still holds
-    // reference to the original memory manager. As
-    // a result, once its original resident runtime / mm is
-    // released, data may become invalid. Currently, it's
-    // the caller's responsibility to make sure the original
-    // runtime / mm keep alive even this function
-    // was called.
-    //
-    // Additionally, as in Gluten we have principle that runtime
-    // mm that were created earlier will be released
-    // later, this FILO practice is what helps the runtime that
-    // took ownership be able to access the data constantly
-    // because the original runtime will live longer than
-    // itself.
-    long newHandle = jniWrapper.obtainOwnership(handle);
-    jniWrapper.close(handle);
-    return newHandle;
+    this.jniWrapper =
+        ColumnarBatchJniWrapper.create(Runtimes.contextInstance("IndicatorVectorBase#init"));
+    this.handle = handle;
}

public String getType() {
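With the Runtime parameter gone, IndicatorVectorBase builds its JNI wrapper from the per-task context runtime instead of one supplied by the caller; the string passed to Runtimes.contextInstance is a label naming the call site. A minimal sketch of that pattern, with an illustrative class name:

    import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper;
    import org.apache.gluten.exec.Runtimes;

    class ContextWrapperFactory {
      // The wrapper is bound to whatever runtime is current for the task,
      // so callers no longer thread a Runtime through constructors.
      ColumnarBatchJniWrapper wrapperForCurrentTask() {
        return ColumnarBatchJniWrapper.create(Runtimes.contextInstance("IndicatorVectorBase#init"));
      }
    }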
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.spark.util.TaskResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IndicatorVectorPool implements TaskResource {
private static final Logger LOG = LoggerFactory.getLogger(IndicatorVectorPool.class);
// A pool of all live indicator vectors. The pool ensures that a native
// columnar batch (located via its long handle through the JNI bridge) is
// never owned by more than one IndicatorVector instance, so the native
// batch's release method is guaranteed to be called exactly once.
private final Map<Long, IndicatorVector> uniqueInstances = new ConcurrentHashMap<>();

IndicatorVectorPool() {}

@Override
public void release() throws Exception {
if (!uniqueInstances.isEmpty()) {
LOG.warn(
"There are still unreleased native columnar batches when the task ends."
+ " They will be closed automatically, but the batches should be released"
+ " manually to minimize memory pressure.");
}
}

IndicatorVector obtain(long handle) {
return uniqueInstances.computeIfAbsent(handle, h -> new IndicatorVector(this, handle));
}

void remove(long handle) {
if (uniqueInstances.remove(handle) == null) {
throw new IllegalStateException("Indicator vector not found in pool, this should not happen");
}
}

@Override
public int priority() {
return 0;
}

@Override
public String resourceName() {
return IndicatorVectorPool.class.getName();
}
}
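The pool's contract is that wrapping the same native handle twice yields the same IndicatorVector, which is what makes the close-exactly-once guarantee hold. A hypothetical illustration, placed in the same package since obtain is package-private, and assuming it runs inside a Spark task so TaskResources is active:

    package org.apache.gluten.columnarbatch;

    class PoolDeduplicationExample {
      static void demo(long nativeBatchHandle) {
        IndicatorVector a = IndicatorVector.obtain(nativeBatchHandle);
        IndicatorVector b = IndicatorVector.obtain(nativeBatchHandle);
        assert a == b; // both refer to the pooled instance for this handle
      }
    }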
@@ -67,7 +67,7 @@ public ColumnarBatch nextInternal() throws IOException {
if (batchHandle == -1L) {
return null; // stream ended
}
-return ColumnarBatches.create(runtime, batchHandle);
+return ColumnarBatches.create(batchHandle);
}

@Override
@@ -68,7 +68,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
jniWrapper
.deserialize(serializeHandle, batches(batchId))
batchId += 1
-ColumnarBatches.create(runtime, handle)
+ColumnarBatches.create(handle)
}
})
.protectInvocationFlow()
@@ -124,7 +124,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
batchId += 1
val batchHandle =
serializerJniWrapper.deserialize(serializeHandle, batchBytes)
-val batch = ColumnarBatches.create(runtime, batchHandle)
+val batch = ColumnarBatches.create(batchHandle)
if (batch.numRows == 0) {
batch.close()
Iterator.empty
@@ -145,10 +145,7 @@ object ExecUtil {
val newHandle = ColumnarBatches.compose(pidBatch, cb)
// Composed batch already hold pidBatch's shared ref, so close is safe.
ColumnarBatches.forceClose(pidBatch)
-(
-  0,
-  ColumnarBatches
-    .create(Runtimes.contextInstance("ExecUtil#getShuffleDependency"), newHandle))
+(0, ColumnarBatches.create(newHandle))
})
.recyclePayload(p => ColumnarBatches.forceClose(p._2)) // FIXME why force close?
.create()
@@ -165,13 +165,6 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L;
}
-if (nativeShuffleWriter == -1) {
-  throw new IllegalStateException(
-      "Fatal: spill() called before a shuffle shuffle writer "
-          + "evaluator is created. This behavior should be"
-          + "optimized by moving memory "
-          + "allocations from make() to split()");
-}
LOG.info("Gluten shuffle writer: Trying to push {} bytes of data", size);
long pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false);
LOG.info("Gluten shuffle writer: Pushed {} / {} bytes of data", pushed, size);
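Both here and in the Celeborn writer above, the defensive nativeShuffleWriter guard is removed from spill(), leaving only the phase check and the native evict call. A reconstruction of the simplified body from the visible context lines; the trailing return and the surrounding field declarations are assumptions:

    // Assumed shape of the remaining spill() body; LOG, jniWrapper and
    // nativeShuffleWriter are fields of the enclosing shuffle writer.
    public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
      if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
        return 0L; // only spill-only phases trigger a native evict
      }
      LOG.info("Gluten shuffle writer: Trying to push {} bytes of data", size);
      long pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false);
      LOG.info("Gluten shuffle writer: Pushed {} / {} bytes of data", pushed, size);
      return pushed;
    }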
