[VL] Add helper function ColumnarBatches.toString and InternalRow toString (#6458)

jinchengchenghh authored Aug 20, 2024
1 parent 45d629e commit 6d1de90
Showing 10 changed files with 216 additions and 37 deletions.

ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
       schema: StructType,
       batchSize: Int,
       it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val numInputRows = new SQLMetric("numInputRows")
-    val numOutputBatches = new SQLMetric("numOutputBatches")
-    val convertTime = new SQLMetric("convertTime")
     val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
       it,
       schema,
-      numInputRows,
-      numOutputBatches,
-      convertTime,
       batchSize
     )
     veloxBatch

RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase
 }
 
 object RowToVeloxColumnarExec {
+
+  def toColumnarBatchIterator(
+      it: Iterator[InternalRow],
+      schema: StructType,
+      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+    val numInputRows = new SQLMetric("numInputRows")
+    val numOutputBatches = new SQLMetric("numOutputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    RowToVeloxColumnarExec.toColumnarBatchIterator(
+      it,
+      schema,
+      numInputRows,
+      numOutputBatches,
+      convertTime,
+      columnBatchSize)
+  }
+
   def toColumnarBatchIterator(
       it: Iterator[InternalRow],
       schema: StructType,
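
The new three-argument overload removes the dummy-metric boilerplate from call sites. A minimal sketch of a caller (not from the patch; assumes a Velox-backed Gluten runtime and an active task context, as in the test further down):

import org.apache.gluten.execution.RowToVeloxColumnarExec
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

val schema = StructType(Seq(StructField("id", IntegerType, nullable = true)))
val rows: Iterator[InternalRow] = Seq.tabulate(4)(i => InternalRow(i)).iterator
// The unused SQLMetrics are now created inside the helper; callers pass only
// the rows, the schema, and the target batch size.
val batches: Iterator[ColumnarBatch] =
  RowToVeloxColumnarExec.toColumnarBatchIterator(rows, schema, 4096)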

VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBase
 }
 
 object VeloxColumnarToRowExec {
+
+  def toRowIterator(
+      batches: Iterator[ColumnarBatch],
+      output: Seq[Attribute]): Iterator[InternalRow] = {
+    val numOutputRows = new SQLMetric("numOutputRows")
+    val numInputBatches = new SQLMetric("numInputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    toRowIterator(
+      batches,
+      output,
+      numOutputRows,
+      numInputBatches,
+      convertTime
+    )
+  }
+
   def toRowIterator(
       batches: Iterator[ColumnarBatch],
       output: Seq[Attribute],
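
The columnar-to-row side gains the matching convenience overload. A minimal sketch (not from the patch; the batches and output attributes are assumed to come from the surrounding plan, and the package of VeloxColumnarToRowExec is assumed to mirror RowToVeloxColumnarExec):

import org.apache.gluten.execution.VeloxColumnarToRowExec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.ColumnarBatch

// Dummy metrics are created inside the helper, mirroring toColumnarBatchIterator.
def countRows(batches: Iterator[ColumnarBatch], output: Seq[Attribute]): Int =
  VeloxColumnarToRowExec.toRowIterator(batches, output).size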

ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHelper
         conf)
     }
 
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numInputRows = metrics("numInputRows")
-    val numOutputBatches = metrics("numOutputBatches")
-    val convertTime = metrics("convertTime")
     val numRows = conf.columnBatchSize
     val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numInputRows,
-          numOutputBatches,
-          convertTime,
-          numRows)
+      it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
     }
     convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
   }
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHelper
         conf)
     }
 
-    // note, these metrics are unused but just make `VeloxColumnarToRowExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numOutputRows = metrics("numOutputRows")
-    val numInputBatches = metrics("numInputBatches")
-    val convertTime = metrics("convertTime")
     val rddColumnarBatch =
       convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
     rddColumnarBatch.mapPartitions {
-      it =>
-        VeloxColumnarToRowExec.toRowIterator(
-          it,
-          selectedAttributes,
-          numOutputRows,
-          numInputBatches,
-          convertTime)
+      it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
     }
   }
 

ColumnarBatchTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.execution.RowToVeloxColumnarExec;
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.test.VeloxBackendTestBase;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@
 import java.util.Spliterators;
 import java.util.stream.StreamSupport;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatchTest extends VeloxBackendTestBase {
 
   @Test
@@ -95,7 +99,7 @@ public void testCreateByHandle() {
   public void testOffloadAndLoadReadRow() {
     TaskResources$.MODULE$.runUnsafe(
         () -> {
-          final int numRows = 100;
+          final int numRows = 20;
           final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
           final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
          final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
@@ -124,6 +128,34 @@ public void testOffloadAndLoadReadRow() {
         });
   }
 
+  @Test
+  public void testToString() {
+    TaskResources$.MODULE$.runUnsafe(
+        () -> {
+          final int numRows = 20;
+          final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
+          final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
+          final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
+          for (int j = 0; j < numRows; j++) {
+            col0.putBoolean(j, j % 2 == 0);
+            col1.putInt(j, 15 - j);
+          }
+          col1.putNull(numRows - 1);
+          StructType structType = new StructType();
+          structType = structType.add("a", DataTypes.BooleanType, true);
+          structType = structType.add("b", DataTypes.IntegerType, true);
+          ColumnarBatch veloxBatch =
+              RowToVeloxColumnarExec.toColumnarBatchIterator(
+                      JavaConverters.asScalaIterator(batch.rowIterator()), structType, numRows)
+                  .next();
+          Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
+          Assert.assertEquals(
+              "[true,-3]\n[false,null]", ColumnarBatches.toString(veloxBatch, 18, 2));
+          veloxBatch.close();
+          return null;
+        });
+  }
+
   private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     final ArrowWritableColumnVector[] columns =
         ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
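
The expected strings follow from how the test fills the batch: for row j, column a is j % 2 == 0 and column b is 15 - j, with b nulled on the last row, so rows 18 and 19 render as [true,-3] and [false,null]. Note that the InternalRowUtl helper added below joins rows with System.lineSeparator(), so the literal "\n" expectations assume a Unix separator. A quick sketch recomputing the assertions:

// Sketch: rebuild the strings testToString asserts against.
val rendered = (0 until 20).map { j =>
  val a = j % 2 == 0
  val b = if (j == 19) "null" else (15 - j).toString
  s"[$a,$b]"
}
assert(rendered.slice(0, 2).mkString("\n") == "[true,15]\n[false,14]")
assert(rendered.slice(18, 20).mkString("\n") == "[true,-3]\n[false,null]")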

ColumnarBatches.java
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.Runtimes;
 import org.apache.gluten.utils.ArrowAbiUtil;
 import org.apache.gluten.utils.ArrowUtil;
 import org.apache.gluten.utils.ImplicitClass;
+import org.apache.gluten.utils.InternalRowUtl;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.utils.SparkArrowUtil;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
@@ -39,6 +43,8 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatches {
 
   private ColumnarBatches() {}
@@ -353,4 +359,11 @@ public static void release(ColumnarBatch b) {
   public static long getNativeHandle(ColumnarBatch batch) {
     return getIndicatorVector(batch).handle();
   }
+
+  public static String toString(ColumnarBatch batch, int start, int length) {
+    ColumnarBatch loadedBatch = ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
+    StructType type = SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
+    return InternalRowUtl.toString(
+        type, JavaConverters.asScalaIterator(loadedBatch.rowIterator()), start, length);
+  }
 }
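
ColumnarBatches.toString loads the (possibly offloaded) batch into Arrow memory, derives the Spark schema from the Arrow schema via SparkArrowUtil.fromArrowSchema, and delegates the rendering to InternalRowUtl (added below). A minimal usage sketch (not from the patch; assumes a Gluten-managed batch and an active task context, since ensureLoaded uses the context allocator):

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: preview the first ten rows of a batch while debugging.
def preview(batch: ColumnarBatch): String =
  ColumnarBatches.toString(batch, 0, 10)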

InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}
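
InternalRowUtl can also be used directly when rows are already at hand; each row is deserialized to a Row, whose toString produces the bracketed form seen in the test. A minimal sketch (not from the patch):

import org.apache.gluten.utils.InternalRowUtl
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val struct = StructType(Seq(StructField("b", IntegerType, nullable = true)))
val rows = Iterator(InternalRow(15), InternalRow(14), InternalRow(13))
// Renders the two rows starting at offset 1: "[14]" and "[13]".
println(InternalRowUtl.toString(struct, rows, 1, 2))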

InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}

InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}

InternalRowUtl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = ExpressionEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+}
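
The four InternalRowUtl copies above are presumably per-Spark-version shims: the first three build the encoder with RowEncoder(struct), while this last one uses ExpressionEncoder(struct), matching newer Spark releases where RowEncoder no longer accepts a StructType directly. Either spelling yields an encoder whose deserializer maps InternalRow to Row, so the rendered output is identical:

// Sketch: the two encoder spellings used across the shims.
// Older Spark shims:  val encoder = RowEncoder(struct).resolveAndBind()
// Newer Spark shims:  val encoder = ExpressionEncoder(struct).resolveAndBind()
// Either way:         val deserializer = encoder.createDeserializer()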
