From 13d83e4cc0e3a6a9f9458d3ab688ca2050f77e2d Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 15 Aug 2024 16:46:18 +0000 Subject: [PATCH] Fix transfer vector --- .../columnarbatch/ColumnarBatchTest.java | 3 +- .../gluten/columnarbatch/ColumnarBatches.java | 18 +++++++- .../sql/vectorized/ColumnarBatchRowUtil.java | 45 +++++++++++++++++++ 3 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRowUtil.java diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java index 956c75e4d528b..3b78a40677933 100644 --- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java +++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java @@ -92,7 +92,7 @@ public void testCreateByHandle() { } @Test - public void testOffloadAndLoadWithWrites() { + public void testOffloadAndLoadReadRow() { TaskResources$.MODULE$.runUnsafe( () -> { final int numRows = 100; @@ -124,7 +124,6 @@ public void testOffloadAndLoadWithWrites() { }); } - private static ColumnarBatch newArrowBatch(String schema, int numRows) { final ArrowWritableColumnVector[] columns = ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema)); diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index 36e231143703c..a34cd331b865e 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -21,6 +21,7 @@ import org.apache.gluten.runtime.Runtimes; import org.apache.gluten.utils.ArrowAbiUtil; import org.apache.gluten.utils.ArrowUtil; +import org.apache.gluten.utils.ImplicitClass; import org.apache.gluten.vectorized.ArrowWritableColumnVector; import com.google.common.annotations.VisibleForTesting; @@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.ColumnarBatchRowUtil; import java.lang.reflect.Field; import java.util.Arrays; @@ -99,6 +101,10 @@ private static void transferVectors(ColumnarBatch from, ColumnarBatch target) { newVectors[i] = from.column(i); } FIELD_COLUMNS.set(target, newVectors); + // Light batch does not need the row. + if (isHeavyBatch(target)) { + ColumnarBatchRowUtil.setColumnarBatchRow(newVectors, target); + } } catch (IllegalAccessException e) { throw new GlutenException(e); } @@ -185,13 +191,23 @@ private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input ColumnarBatch output = ArrowAbiUtil.importToSparkColumnarBatch(allocator, arrowSchema, cArray); - // Loaded Arrow ColumnarBatch lifecycle is controlled by the caller. The GC can help clean it. + // Follow gluten input's reference count. This might be optimized using + // automatic clean-up or once the extensibility of ColumnarBatch is enriched IndicatorVector giv = (IndicatorVector) input.column(0); + ImplicitClass.ArrowColumnarBatchRetainer retainer = + new ImplicitClass.ArrowColumnarBatchRetainer(output); + for (long i = 0; i < (giv.refCnt() - 1); i++) { + retainer.retain(); + } + // close the input one for (long i = 0; i < giv.refCnt(); i++) { input.close(); } + // populate new vectors to input + transferVectors(output, input); + return output; } } diff --git a/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRowUtil.java b/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRowUtil.java new file mode 100644 index 0000000000000..a7ca37016fa74 --- /dev/null +++ b/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRowUtil.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.vectorized; + +import org.apache.gluten.exception.GlutenException; + +import java.lang.reflect.Field; + +public class ColumnarBatchRowUtil { + + private static final Field FIELD_COLUMNAR_BATCH_ROW; + + static { + try { + Field row = ColumnarBatch.class.getDeclaredField("row"); + row.setAccessible(true); + FIELD_COLUMNAR_BATCH_ROW = row; + } catch (NoSuchFieldException e) { + throw new GlutenException(e); + } + } + + public static void setColumnarBatchRow(ColumnVector[] columns, ColumnarBatch target) { + ColumnarBatchRow row = new ColumnarBatchRow(columns); + try { + FIELD_COLUMNAR_BATCH_ROW.set(target, row); + } catch (IllegalAccessException e) { + throw new GlutenException(e); + } + } +}