diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index a8e65b0539c7..5629811f4d22 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
       schema: StructType,
       batchSize: Int,
       it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val numInputRows = new SQLMetric("numInputRows")
-    val numOutputBatches = new SQLMetric("numOutputBatches")
-    val convertTime = new SQLMetric("convertTime")
     val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
       it,
       schema,
-      numInputRows,
-      numOutputBatches,
-      convertTime,
       batchSize
     )
     veloxBatch
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 9ceb6b5b6e52..aa30cc80d4db 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
 }
 
 object RowToVeloxColumnarExec {
+
+  def toColumnarBatchIterator(
+      it: Iterator[InternalRow],
+      schema: StructType,
+      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+    val numInputRows = new SQLMetric("numInputRows")
+    val numOutputBatches = new SQLMetric("numOutputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    RowToVeloxColumnarExec.toColumnarBatchIterator(
+      it,
+      schema,
+      numInputRows,
+      numOutputBatches,
+      convertTime,
+      columnBatchSize)
+  }
+
   def toColumnarBatchIterator(
       it: Iterator[InternalRow],
       schema: StructType,
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index d3fb9c3ffc70..4bd553b01235 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
 }
 
 object VeloxColumnarToRowExec {
+
+  def toRowIterator(
+      batches: Iterator[ColumnarBatch],
+      output: Seq[Attribute]): Iterator[InternalRow] = {
+    val numOutputRows = new SQLMetric("numOutputRows")
+    val numInputBatches = new SQLMetric("numInputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    toRowIterator(
+      batches,
+      output,
+      numOutputRows,
+      numInputBatches,
+      convertTime
+    )
+  }
+
   def toRowIterator(
       batches: Iterator[ColumnarBatch],
       output: Seq[Attribute],
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index db9e75a05681..7f4235fdf107 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numInputRows = metrics("numInputRows")
-    val numOutputBatches = metrics("numOutputBatches")
-    val convertTime = metrics("convertTime")
     val numRows = conf.columnBatchSize
     val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numInputRows,
-          numOutputBatches,
-          convertTime,
-          numRows)
+      it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
     }
     convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
   }
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `VeloxColumnarToRowExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numOutputRows = metrics("numOutputRows")
-    val numInputBatches = metrics("numInputBatches")
-    val convertTime = metrics("convertTime")
     val rddColumnarBatch =
       convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
     rddColumnarBatch.mapPartitions {
-      it =>
-        VeloxColumnarToRowExec.toRowIterator(
-          it,
-          selectedAttributes,
-          numOutputRows,
-          numInputBatches,
-          convertTime)
+      it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
     }
   }
 
diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 3b78a4067793..298f11073bff 100644
--- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.execution.RowToVeloxColumnarExec;
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.test.VeloxBackendTestBase;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@
 import java.util.Spliterators;
 import java.util.stream.StreamSupport;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatchTest extends VeloxBackendTestBase {
 
   @Test
@@ -95,15 +99,8 @@ public void testCreateByHandle() {
   public void testOffloadAndLoadReadRow() {
     TaskResources$.MODULE$.runUnsafe(
         () -> {
-          final int numRows = 100;
-          final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
-          final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
-          final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
-          for (int j = 0; j < numRows; j++) {
-            col0.putBoolean(j, j % 2 == 0);
-            col1.putInt(j, 15 - j);
-          }
-          col1.putNull(numRows - 1);
+          final int numRows = 20;
+          final ColumnarBatch batch = newArrowBatch(numRows);
           Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
           final ColumnarBatch offloaded =
               ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
@@ -124,6 +121,26 @@ public void testOffloadAndLoadReadRow() {
         });
   }
 
+  @Test
+  public void testToString() {
+    TaskResources$.MODULE$.runUnsafe(
+        () -> {
+          final ColumnarBatch batch = newArrowBatch(20);
+          StructType structType = new StructType();
+          structType = structType.add("a", DataTypes.BooleanType, true);
+          structType = structType.add("b", DataTypes.IntegerType, true);
+          ColumnarBatch veloxBatch =
+              RowToVeloxColumnarExec.toColumnarBatchIterator(
+                      JavaConverters.asScalaIterator(batch.rowIterator()), structType, 20)
+                  .next();
+          Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
+          Assert.assertEquals(
+              "[true,-3]\n[false,null]", ColumnarBatches.toString(veloxBatch, 18, 2));
+          veloxBatch.close();
+          return null;
+        });
+  }
+
   private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     final ArrowWritableColumnVector[] columns =
         ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
@@ -134,4 +151,23 @@ private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     batch.setNumRows(numRows);
     return batch;
   }
+
+  private static ColumnarBatch newArrowBatch(int numRows) {
+    String schema = "a boolean, b int";
+    final ArrowWritableColumnVector[] columns =
+        ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
+    ArrowWritableColumnVector col1 = columns[0];
+    ArrowWritableColumnVector col2 = columns[1];
+    for (int j = 0; j < numRows; j++) {
+      col1.putBoolean(j, j % 2 == 0);
+      col2.putInt(j, 15 - j);
+    }
+    col2.putNull(numRows - 1);
+    for (ArrowWritableColumnVector col : columns) {
+      col.setValueCount(numRows);
+    }
+    final ColumnarBatch batch = new ColumnarBatch(columns);
+    batch.setNumRows(numRows);
+    return batch;
+  }
 }
diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index d00efd7b80c7..fd9c72c36060 100644
--- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.Runtimes;
 import org.apache.gluten.utils.ArrowAbiUtil;
 import org.apache.gluten.utils.ArrowUtil;
 import org.apache.gluten.utils.ImplicitClass;
+import org.apache.gluten.utils.InternalRowUtl;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,8 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.utils.SparkArrowUtil;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
@@ -39,6 +43,8 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatches {
   private ColumnarBatches() {}
 
@@ -353,4 +359,11 @@ public static void release(ColumnarBatch b) {
   public static long getNativeHandle(ColumnarBatch batch) {
     return getIndicatorVector(batch).handle();
   }
+
+  public static String toString(ColumnarBatch batch, int start, int length) {
+    ColumnarBatch loadedBatch = ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
+    StructType type = SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
+    return InternalRowUtl.toString(
+        type, JavaConverters.asScalaIterator(loadedBatch.rowIterator()), start, length);
+  }
 }
diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000000..32d694371393
--- /dev/null
+++ b/gluten-data/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}