From 68e02e05ea041911123810ba4e2aba8ad32a0782 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 15 Dec 2024 14:07:33 -0800 Subject: [PATCH] more --- .../apache/comet/parquet/ColumnReader.java | 22 ------------------- .../comet/parquet/ConstantColumnReader.java | 4 ++-- .../comet/parquet/MetadataColumnReader.java | 9 ++++++-- .../comet/parquet/RowIndexColumnReader.java | 2 +- .../apache/comet/vector/CometPlainVector.java | 16 ++++++++++++++ .../sql/comet/CometColumnarToRowExec.scala | 10 ++++++++- 6 files changed, 35 insertions(+), 28 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 1cc42f62e..1927868a1 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -172,28 +172,6 @@ public void close() { /** Returns a decoded {@link CometDecodedVector Comet vector}. */ public CometDecodedVector loadVector() { - // Only re-use Comet vector iff: - // 1. if we're not using dictionary encoding, since with dictionary encoding, the native - // side may fallback to plain encoding and the underlying memory address for the vector - // will change as result. - // 2. if the column type is of fixed width, in other words, string/binary are not supported - // since the native side may resize the vector and therefore change memory address. - // 3. if the last loaded vector contains null values: if values of last vector are all not - // null, Arrow C data API will skip loading the native validity buffer, therefore we - // should not re-use the vector in that case. - // 4. if the last loaded vector doesn't contain any null value, but the current vector also - // are all not null, which means we can also re-use the loaded vector. - // 5. if the new number of value is the same or smaller - if ((hadNull || currentNumNulls == 0) - && currentVector != null - && dictionary == null - && currentVector.isFixedLength() - && currentVector.numValues() >= currentNumValues) { - currentVector.setNumNulls(currentNumNulls); - currentVector.setNumValues(currentNumValues); - return currentVector; - } - LOG.debug("Reloading vector"); // Close the previous vector first to release struct memory allocated to import Arrow array & diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java index 8de2376f9..5fd348eeb 100644 --- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java @@ -53,13 +53,13 @@ public ConstantColumnReader( public ConstantColumnReader( DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { - super(type, descriptor, useDecimal128); + super(type, descriptor, useDecimal128, true); this.value = value; } ConstantColumnReader( DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) { - super(type, descriptor, useDecimal128); + super(type, descriptor, useDecimal128, true); this.batchSize = batchSize; initNative(); } diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java index 13b90e256..2820c42f8 100644 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java @@ -40,9 +40,14 @@ public class MetadataColumnReader extends AbstractColumnReader { private ArrowArray array = null; private ArrowSchema schema = null; - public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) { + private boolean isConstant; + + public MetadataColumnReader( + DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) { // TODO: should we handle legacy dates & timestamps for metadata columns? super(type, descriptor, useDecimal128, false); + + this.isConstant = isConstant; } @Override @@ -62,7 +67,7 @@ public void readBatch(int total) { Native.currentBatch(nativeHandle, arrayAddr, schemaAddr); FieldVector fieldVector = Data.importVector(allocator, array, schema, null); - vector = new CometPlainVector(fieldVector, useDecimal128); + vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant); } vector.setNumValues(total); diff --git a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java b/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java index 8448318db..46e6ee67f 100644 --- a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java @@ -33,7 +33,7 @@ public class RowIndexColumnReader extends MetadataColumnReader { private long offset; public RowIndexColumnReader(StructField field, int batchSize, long[] indices) { - super(field.dataType(), TypeUtil.convertToParquet(field), false); + super(field.dataType(), TypeUtil.convertToParquet(field), false, false); this.indices = indices; setBatchSize(batchSize); } diff --git a/common/src/main/java/org/apache/comet/vector/CometPlainVector.java b/common/src/main/java/org/apache/comet/vector/CometPlainVector.java index 65cc876bd..f3803d53a 100644 --- a/common/src/main/java/org/apache/comet/vector/CometPlainVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometPlainVector.java @@ -38,11 +38,18 @@ public class CometPlainVector extends CometDecodedVector { private byte booleanByteCache; private int booleanByteCacheIndex = -1; + private boolean isReused; + public CometPlainVector(ValueVector vector, boolean useDecimal128) { this(vector, useDecimal128, false); } public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) { + this(vector, useDecimal128, isUuid, false); + } + + public CometPlainVector( + ValueVector vector, boolean useDecimal128, boolean isUuid, boolean isReused) { super(vector, vector.getField(), useDecimal128, isUuid); // NullType doesn't have data buffer. if (vector instanceof NullVector) { @@ -52,6 +59,15 @@ public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUui } isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector; + this.isReused = isReused; + } + + public boolean isReused() { + return isReused; + } + + public void setReused(boolean isReused) { + this.isReused = isReused; } @Override diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index 1322129e0..18d95a473 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.Utils +import org.apache.comet.vector.CometPlainVector + /** * Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for * the fix to be released in Spark versions. We copy the implementation here to apply the fix. @@ -158,6 +160,7 @@ case class CometColumnarToRowExec(child: SparkPlan) val writableColumnVectorClz = classOf[WritableColumnVector].getName val constantColumnVectorClz = classOf[ConstantColumnVector].getName + val cometPlainColumnVectorClz = classOf[CometPlainVector].getName // scalastyle:off line.size.limit s""" @@ -176,8 +179,13 @@ case class CometColumnarToRowExec(child: SparkPlan) | | // Comet fix for SPARK-50235 | for (int i = 0; i < ${colVars.length}; i++) { - | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz)) { + | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz || $batch.column(i) instanceof $cometPlainColumnVectorClz)) { | $batch.column(i).close(); + | } else if ($batch.column(i) instanceof $cometPlainColumnVectorClz) { + | $cometPlainColumnVectorClz cometPlainColumnVector = ($cometPlainColumnVectorClz) $batch.column(i); + | if (!cometPlainColumnVector.isReused()) { + | cometPlainColumnVector.close(); + | } | } | } |