Skip to content

Commit

Permalink
Fix transfer vector
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Aug 15, 2024
1 parent 0d1aa7a commit 13d83e4
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testCreateByHandle() {
}

@Test
public void testOffloadAndLoadWithWrites() {
public void testOffloadAndLoadReadRow() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 100;
Expand Down Expand Up @@ -124,7 +124,6 @@ public void testOffloadAndLoadWithWrites() {
});
}


private static ColumnarBatch newArrowBatch(String schema, int numRows) {
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
import org.apache.gluten.utils.ImplicitClass;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -33,6 +34,7 @@
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarBatchRowUtil;

import java.lang.reflect.Field;
import java.util.Arrays;
Expand Down Expand Up @@ -99,6 +101,10 @@ private static void transferVectors(ColumnarBatch from, ColumnarBatch target) {
newVectors[i] = from.column(i);
}
FIELD_COLUMNS.set(target, newVectors);
// Light batch does not need the row.
if (isHeavyBatch(target)) {
ColumnarBatchRowUtil.setColumnarBatchRow(newVectors, target);
}
} catch (IllegalAccessException e) {
throw new GlutenException(e);
}
Expand Down Expand Up @@ -185,13 +191,23 @@ private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input
ColumnarBatch output =
ArrowAbiUtil.importToSparkColumnarBatch(allocator, arrowSchema, cArray);

// The lifecycle of the loaded Arrow ColumnarBatch is controlled by the caller; the GC can help
// clean it up. Mirror the reference count of the Gluten input batch on the output. This might be
// optimized with automatic clean-up, or once ColumnarBatch becomes more extensible.
IndicatorVector giv = (IndicatorVector) input.column(0);
ImplicitClass.ArrowColumnarBatchRetainer retainer =
new ImplicitClass.ArrowColumnarBatchRetainer(output);
for (long i = 0; i < (giv.refCnt() - 1); i++) {
retainer.retain();
}

// close the input one
for (long i = 0; i < giv.refCnt(); i++) {
input.close();
}

// populate new vectors to input
transferVectors(output, input);

return output;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.vectorized;

import org.apache.gluten.exception.GlutenException;

import java.lang.reflect.Field;

/**
 * Reflection helper that overwrites the {@code row} field declared on {@link ColumnarBatch}.
 *
 * <p>NOTE(review): this class is declared in {@code org.apache.spark.sql.vectorized}, presumably
 * so that it can call the {@code ColumnarBatchRow} constructor - confirm against the Spark
 * version in use.
 */
public class ColumnarBatchRowUtil {

  /** Accessible handle on {@code ColumnarBatch.row}, resolved once when this class is loaded. */
  private static final Field FIELD_COLUMNAR_BATCH_ROW = lookupRowField();

  /**
   * Resolves the {@code row} field declared on {@link ColumnarBatch} and makes it accessible.
   *
   * @return the accessible {@code row} field
   * @throws GlutenException if {@link ColumnarBatch} declares no field named {@code row}
   */
  private static Field lookupRowField() {
    final Field rowField;
    try {
      rowField = ColumnarBatch.class.getDeclaredField("row");
    } catch (NoSuchFieldException e) {
      throw new GlutenException(e);
    }
    rowField.setAccessible(true);
    return rowField;
  }

  /**
   * Replaces the {@code row} field of {@code target} with a new {@code ColumnarBatchRow} built
   * from {@code columns}.
   *
   * @param columns the column vectors handed to the new {@code ColumnarBatchRow}
   * @param target the batch whose {@code row} field is overwritten
   * @throws GlutenException if the reflective write is rejected
   */
  public static void setColumnarBatchRow(ColumnVector[] columns, ColumnarBatch target) {
    try {
      FIELD_COLUMNAR_BATCH_ROW.set(target, new ColumnarBatchRow(columns));
    } catch (IllegalAccessException e) {
      throw new GlutenException(e);
    }
  }
}

0 comments on commit 13d83e4

Please sign in to comment.