diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index a8e65b0539c7c..5629811f4d226 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
       schema: StructType,
       batchSize: Int,
       it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val numInputRows = new SQLMetric("numInputRows")
-    val numOutputBatches = new SQLMetric("numOutputBatches")
-    val convertTime = new SQLMetric("convertTime")
     val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
       it,
       schema,
-      numInputRows,
-      numOutputBatches,
-      convertTime,
       batchSize
     )
     veloxBatch
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 2f3e88f9af9cb..4a6902cabaf9a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
 }
 
 object RowToVeloxColumnarExec {
+
+  def toColumnarBatchIterator(
+      it: Iterator[InternalRow],
+      schema: StructType,
+      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+    val numInputRows = new SQLMetric("numInputRows")
+    val numOutputBatches = new SQLMetric("numOutputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    RowToVeloxColumnarExec.toColumnarBatchIterator(
+      it,
+      schema,
+      numInputRows,
+      numOutputBatches,
+      convertTime,
+      columnBatchSize)
+  }
+
   def toColumnarBatchIterator(
       it: Iterator[InternalRow],
       schema: StructType,
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 2c46893e4576a..f37d6b33fe9fa 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
 }
 
 object VeloxColumnarToRowExec {
+
+  def toRowIterator(
+      batches: Iterator[ColumnarBatch],
+      output: Seq[Attribute]): Iterator[InternalRow] = {
+    val numOutputRows = new SQLMetric("numOutputRows")
+    val numInputBatches = new SQLMetric("numInputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    toRowIterator(
+      batches,
+      output,
+      numOutputRows,
+      numInputBatches,
+      convertTime
+    )
+  }
+
   def toRowIterator(
       batches: Iterator[ColumnarBatch],
       output: Seq[Attribute],
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 15fd51abef489..06ef9ffdfab84 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numInputRows = metrics("numInputRows")
-    val numOutputBatches = metrics("numOutputBatches")
-    val convertTime = metrics("convertTime")
     val numRows = conf.columnBatchSize
     val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numInputRows,
-          numOutputBatches,
-          convertTime,
-          numRows)
+      it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
     }
     convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
   }
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `VeloxColumnarToRowExec` happy
-    val metrics = BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numOutputRows = metrics("numOutputRows")
-    val numInputBatches = metrics("numInputBatches")
-    val convertTime = metrics("convertTime")
     val rddColumnarBatch =
       convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
     rddColumnarBatch.mapPartitions {
-      it =>
-        VeloxColumnarToRowExec.toRowIterator(
-          it,
-          selectedAttributes,
-          numOutputRows,
-          numInputBatches,
-          convertTime)
+      it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
     }
   }
 
diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index cd2ac50d350c3..55399bda83073 100644
--- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.execution.RowToVeloxColumnarExec;
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.test.VeloxBackendTestBase;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@
 import java.util.Spliterators;
 import java.util.stream.StreamSupport;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatchTest extends VeloxBackendTestBase {
 
   @Test
@@ -91,6 +95,25 @@ public void testCreateByHandle() {
         });
   }
 
+  @Test
+  public void testToString() {
+    TaskResources$.MODULE$.runUnsafe(
+        () -> {
+          final ColumnarBatch batch = newArrowBatch(20);
+          StructType structType = new StructType();
+          structType = structType.add("a", DataTypes.BooleanType, true);
+          structType = structType.add("b", DataTypes.IntegerType, true);
+          ColumnarBatch veloxBatch =
+              RowToVeloxColumnarExec.toColumnarBatchIterator(
+                      JavaConverters.asScalaIterator(batch.rowIterator()), structType, 20)
+                  .next();
+          Assert.assertEquals(
+              "0: {true, 15}\n1: {false, 14}", ColumnarBatches.toString(veloxBatch, 0, 2));
+          veloxBatch.close();
+          return null;
+        });
+  }
+
   private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     final ArrowWritableColumnVector[] columns =
         ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
@@ -101,4 +124,22 @@ private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     batch.setNumRows(numRows);
     return batch;
   }
+
+  private static ColumnarBatch newArrowBatch(int numRows) {
+    String schema = "a boolean, b int";
+    final ArrowWritableColumnVector[] columns =
+        ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
+    ArrowWritableColumnVector col1 = columns[0];
+    ArrowWritableColumnVector col2 = columns[1];
+    for (int j = 0; j < numRows; j++) {
+      col1.putBoolean(j, j % 2 == 0);
+      col2.putInt(j, 15 - j);
+    }
+    for (ArrowWritableColumnVector col : columns) {
+      col.setValueCount(numRows);
+    }
+    final ColumnarBatch batch = new ColumnarBatch(columns);
+    batch.setNumRows(numRows);
+    return batch;
+  }
 }
diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h
index 55da2dac6a53a..b4ab5f220f08c 100644
--- a/cpp/core/memory/ColumnarBatch.h
+++ b/cpp/core/memory/ColumnarBatch.h
@@ -52,7 +52,9 @@ class ColumnarBatch {
   // Serializes one single row to byte array that can be accessed as Spark-compatible unsafe row.
   virtual std::vector<char> toUnsafeRow(int32_t rowId) const;
 
-  virtual std::string toString(int32_t start, int32_t length) const = 0;
+  virtual std::string toString(int32_t start, int32_t length) const {
+    throw GlutenException("Not implemented");
+  }
 
   friend std::ostream& operator<<(std::ostream& os, const ColumnarBatch& columnarBatch);
 
diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 275df41cd55ca..afe71ea75a612 100644
--- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -100,7 +100,6 @@ private static void transferVectors(ColumnarBatch from, ColumnarBatch target) {
         newVectors[i] = from.column(i);
       }
       FIELD_COLUMNS.set(target, newVectors);
-      System.out.println();
     } catch (IllegalAccessException e) {
       throw new GlutenException(e);
     }
@@ -382,8 +381,7 @@ public static long getNativeHandle(ColumnarBatch batch) {
   }
 
   public static String toString(ColumnarBatch batch, int start, int length) {
-    return ColumnarBatchJniWrapper
-        .create(Runtimes.contextInstance("ColumnarBatches#toString"))
+    return ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatches#toString"))
         .toString(getNativeHandle(batch), start, length);
   }
 }
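
Usage sketch (not part of the patch): a minimal Scala round trip through the new zero-metric overloads introduced above. The object name, the example schema, and the batch size of 4096 are illustrative assumptions; it also assumes a Velox-enabled Spark task context is active (e.g. inside TaskResources.runUnsafe, as in the test).

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper showing the convenience overloads; not part of the patch.
object ConvenienceOverloadsSketch {
  def roundTrip(rows: Iterator[InternalRow]): Iterator[InternalRow] = {
    val schema = new StructType()
      .add("a", BooleanType, nullable = true)
      .add("b", IntegerType, nullable = true)

    // New overload: no SQLMetric arguments; dummy metrics are created internally.
    val batches: Iterator[ColumnarBatch] =
      RowToVeloxColumnarExec.toColumnarBatchIterator(rows, schema, 4096)

    // Matching metric-free overload for the reverse conversion.
    val output = Seq(
      AttributeReference("a", BooleanType)(),
      AttributeReference("b", IntegerType)())
    VeloxColumnarToRowExec.toRowIterator(batches, output)
  }
}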