Add helper function ColumnarBatches.toString
jinchengchenghh committed Aug 16, 2024
1 parent 745f1f3 commit c89c6fa
Showing 7 changed files with 127 additions and 45 deletions.
ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
schema: StructType,
batchSize: Int,
it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
// note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
val numInputRows = new SQLMetric("numInputRows")
val numOutputBatches = new SQLMetric("numOutputBatches")
val convertTime = new SQLMetric("convertTime")
val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
schema,
numInputRows,
numOutputBatches,
convertTime,
batchSize
)
veloxBatch
RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
}

object RowToVeloxColumnarExec {

def toColumnarBatchIterator(
it: Iterator[InternalRow],
schema: StructType,
columnBatchSize: Int): Iterator[ColumnarBatch] = {
val numInputRows = new SQLMetric("numInputRows")
val numOutputBatches = new SQLMetric("numOutputBatches")
val convertTime = new SQLMetric("convertTime")
RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
schema,
numInputRows,
numOutputBatches,
convertTime,
columnBatchSize)
}

def toColumnarBatchIterator(
it: Iterator[InternalRow],
schema: StructType,
VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
}

object VeloxColumnarToRowExec {

def toRowIterator(
batches: Iterator[ColumnarBatch],
output: Seq[Attribute]): Iterator[InternalRow] = {
val numOutputRows = new SQLMetric("numOutputRows")
val numInputBatches = new SQLMetric("numInputBatches")
val convertTime = new SQLMetric("convertTime")
toRowIterator(
batches,
output,
numOutputRows,
numInputBatches,
convertTime
)
}

def toRowIterator(
batches: Iterator[ColumnarBatch],
output: Seq[Attribute],
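For illustration only (not part of this commit): a minimal Scala sketch of how the two new metric-free overloads could be combined to round-trip rows through a Velox batch. RowToVeloxColumnarExec's package comes from the test import further below; VeloxColumnarToRowExec is assumed to live in the same package, and the code must run inside a Gluten task scope (e.g. TaskResources.runUnsafe) because the conversion allocates native memory.

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("id", IntegerType, nullable = false)))
val rows: Iterator[InternalRow] = (0 until 100).iterator.map(i => InternalRow(i))

// Rows -> Velox columnar batches; the new overload creates throwaway SQLMetrics internally.
val batches = RowToVeloxColumnarExec.toColumnarBatchIterator(rows, schema, 4096)

// Velox columnar batches -> rows, again without threading metrics through the call.
// StructType.toAttributes exists up to Spark 3.3; newer shims expose it via DataTypeUtils.
val roundTripped = VeloxColumnarToRowExec.toRowIterator(batches, schema.toAttributes)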
ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
conf)
}

// note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
val metrics = BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
SparkSession.getActiveSession.orNull.sparkContext)
val numInputRows = metrics("numInputRows")
val numOutputBatches = metrics("numOutputBatches")
val convertTime = metrics("convertTime")
val numRows = conf.columnBatchSize
val rddColumnarBatch = input.mapPartitions {
it =>
RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
localSchema,
numInputRows,
numOutputBatches,
convertTime,
numRows)
it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
}
convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
}
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
conf)
}

// note, these metrics are unused but just make `VeloxColumnarToRowExec` happy
val metrics = BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
SparkSession.getActiveSession.orNull.sparkContext)
val numOutputRows = metrics("numOutputRows")
val numInputBatches = metrics("numInputBatches")
val convertTime = metrics("convertTime")
val rddColumnarBatch =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
rddColumnarBatch.mapPartitions {
it =>
VeloxColumnarToRowExec.toRowIterator(
it,
selectedAttributes,
numOutputRows,
numInputBatches,
convertTime)
it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
}
}

ColumnarBatchTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.execution.RowToVeloxColumnarExec;
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@
import java.util.Spliterators;
import java.util.stream.StreamSupport;

import scala.collection.JavaConverters;

public class ColumnarBatchTest extends VeloxBackendTestBase {

@Test
@@ -95,15 +99,8 @@ public void testCreateByHandle() {
public void testOffloadAndLoadReadRow() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 100;
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
for (int j = 0; j < numRows; j++) {
col0.putBoolean(j, j % 2 == 0);
col1.putInt(j, 15 - j);
}
col1.putNull(numRows - 1);
final int numRows = 20;
final ColumnarBatch batch = newArrowBatch(numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
@@ -124,6 +121,26 @@ public void testOffloadAndLoadReadRow() {
});
}

@Test
public void testToString() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final ColumnarBatch batch = newArrowBatch(20);
StructType structType = new StructType();
structType = structType.add("a", DataTypes.BooleanType, true);
structType = structType.add("b", DataTypes.IntegerType, true);
ColumnarBatch veloxBatch =
RowToVeloxColumnarExec.toColumnarBatchIterator(
JavaConverters.asScalaIterator(batch.rowIterator()), structType, 20)
.next();
Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
Assert.assertEquals(
"[true,-3]\n[false,null]", ColumnarBatches.toString(veloxBatch, 18, 2));
veloxBatch.close();
return null;
});
}

private static ColumnarBatch newArrowBatch(String schema, int numRows) {
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
@@ -134,4 +151,23 @@ private static ColumnarBatch newArrowBatch(String schema, int numRows) {
batch.setNumRows(numRows);
return batch;
}

private static ColumnarBatch newArrowBatch(int numRows) {
String schema = "a boolean, b int";
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
ArrowWritableColumnVector col1 = columns[0];
ArrowWritableColumnVector col2 = columns[1];
for (int j = 0; j < numRows; j++) {
col1.putBoolean(j, j % 2 == 0);
col2.putInt(j, 15 - j);
}
col2.putNull(numRows - 1);
for (ArrowWritableColumnVector col : columns) {
col.setValueCount(numRows);
}
final ColumnarBatch batch = new ColumnarBatch(columns);
batch.setNumRows(numRows);
return batch;
}
}
ColumnarBatches.java
@@ -16,11 +16,13 @@
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
import org.apache.gluten.utils.ImplicitClass;
import org.apache.gluten.utils.InternalRowUtl;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,8 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.utils.SparkArrowUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
@@ -39,6 +43,8 @@
import java.util.Iterator;
import java.util.NoSuchElementException;

import scala.collection.JavaConverters;

public class ColumnarBatches {

private ColumnarBatches() {}
@@ -353,4 +359,11 @@ public static void release(ColumnarBatch b) {
public static long getNativeHandle(ColumnarBatch batch) {
return getIndicatorVector(batch).handle();
}

public static String toString(ColumnarBatch batch, int start, int length) {
ColumnarBatch loadedBatch = ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
StructType type = SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
return InternalRowUtl.toString(
type, JavaConverters.asScalaIterator(loadedBatch.rowIterator()), start, length);
}
}
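A hedged usage sketch, not in this commit, of the new helper for ad-hoc debugging from Scala code; `veloxBatch` is a hypothetical offloaded ColumnarBatch in scope, and a task-scoped Arrow allocator must be available because toString loads the batch via ensureLoaded first.

import org.apache.gluten.columnarbatch.ColumnarBatches

// Print the first 10 rows of a native batch, one "[v1,v2,...]" line per row.
println(ColumnarBatches.toString(veloxBatch, 0, 10))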
InternalRowUtl.scala
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

object InternalRowUtl {
def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
val encoder = RowEncoder(struct).resolveAndBind()
val deserializer = encoder.createDeserializer()
rows.map(deserializer).mkString(System.lineSeparator())
}

def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
toString(struct, rows.slice(start, start + length))
}

}
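As a rough usage illustration (not part of the commit; values mirror the new test above), the helper deserializes each InternalRow to an external Row through a resolved RowEncoder, renders it with Row.toString, and joins the requested slice with the platform line separator:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}

val struct = new StructType().add("a", BooleanType).add("b", IntegerType)
val rows = Iterator(InternalRow(true, 15), InternalRow(false, 14), InternalRow(true, 13))

// Renders only rows [start, start + length): here the first two rows.
InternalRowUtl.toString(struct, rows, start = 0, length = 2)
// "[true,15]" + System.lineSeparator() + "[false,14]" -- matches the ColumnarBatchTest assertion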
