Commit 68e02e0 by viirya, Dec 16, 2024 (1 parent: bbd6b3f)
Showing 6 changed files with 35 additions and 28 deletions.
common/src/main/java/org/apache/comet/parquet/ColumnReader.java (0 additions, 22 deletions)
@@ -172,28 +172,6 @@ public void close() {

/** Returns a decoded {@link CometDecodedVector Comet vector}. */
public CometDecodedVector loadVector() {
- // Only re-use Comet vector iff:
- // 1. if we're not using dictionary encoding, since with dictionary encoding, the native
- // side may fallback to plain encoding and the underlying memory address for the vector
- // will change as result.
- // 2. if the column type is of fixed width, in other words, string/binary are not supported
- // since the native side may resize the vector and therefore change memory address.
- // 3. if the last loaded vector contains null values: if values of last vector are all not
- // null, Arrow C data API will skip loading the native validity buffer, therefore we
- // should not re-use the vector in that case.
- // 4. if the last loaded vector doesn't contain any null value, but the current vector also
- // are all not null, which means we can also re-use the loaded vector.
- // 5. if the new number of value is the same or smaller
- if ((hadNull || currentNumNulls == 0)
-     && currentVector != null
-     && dictionary == null
-     && currentVector.isFixedLength()
-     && currentVector.numValues() >= currentNumValues) {
-   currentVector.setNumNulls(currentNumNulls);
-   currentVector.setNumValues(currentNumValues);
-   return currentVector;
- }
-
LOG.debug("Reloading vector");

// Close the previous vector first to release struct memory allocated to import Arrow array &
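For context, the guard removed above can be restated as a single predicate. The sketch below is a hypothetical paraphrase of the deleted condition, not code from the repository: the class name, method name, and the hasDictionary parameter (standing in for the reader's dictionary field) are invented for illustration.

import org.apache.comet.vector.CometDecodedVector;

// Hypothetical paraphrase of the guard deleted from loadVector() above.
// The previously loaded vector was reused only when every condition held.
final class VectorReusePredicate {
  static boolean canReuse(
      CometDecodedVector currentVector, // vector loaded for the previous batch, or null
      boolean hadNull,                  // whether that previous vector contained nulls
      boolean hasDictionary,            // whether the column is dictionary encoded
      int currentNumNulls,              // null count of the batch about to be loaded
      int currentNumValues) {           // value count of the batch about to be loaded
    return (hadNull || currentNumNulls == 0)    // validity buffer was loaded, or no nulls are coming
        && currentVector != null
        && !hasDictionary                       // dictionary pages may fall back to plain encoding natively
        && currentVector.isFixedLength()        // string/binary vectors can be resized on the native side
        && currentVector.numValues() >= currentNumValues; // new values fit into the existing vector
  }
}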
@@ -53,13 +53,13 @@ public ConstantColumnReader(

public ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
- super(type, descriptor, useDecimal128);
+ super(type, descriptor, useDecimal128, true);
this.value = value;
}

ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
- super(type, descriptor, useDecimal128);
+ super(type, descriptor, useDecimal128, true);
this.batchSize = batchSize;
initNative();
}
@@ -40,9 +40,14 @@ public class MetadataColumnReader extends AbstractColumnReader {
private ArrowArray array = null;
private ArrowSchema schema = null;

- public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
+ private boolean isConstant;
+
+ public MetadataColumnReader(
+     DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
super(type, descriptor, useDecimal128, false);
+
+ this.isConstant = isConstant;
}

@Override
@@ -62,7 +67,7 @@ public void readBatch(int total) {

Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
- vector = new CometPlainVector(fieldVector, useDecimal128);
+ vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant);
}

vector.setNumValues(total);
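Taken together, the ConstantColumnReader and MetadataColumnReader changes mean that constant metadata columns now flag the vectors they hand out. The sketch below mirrors the changed line in readBatch() above; the wrapper class and method name are invented for illustration, and Arrow and Comet are assumed to be on the classpath.

import org.apache.arrow.vector.FieldVector;
import org.apache.comet.vector.CometPlainVector;

final class MetadataVectorWrapping {
  // Mirrors the new call in readBatch(): the fourth argument is the isReused flag,
  // so a constant column's vector will not be closed by per-batch cleanup code.
  static CometPlainVector wrap(FieldVector imported, boolean useDecimal128, boolean isConstant) {
    return new CometPlainVector(imported, useDecimal128, /* isUuid */ false, /* isReused */ isConstant);
  }
}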
@@ -33,7 +33,7 @@ public class RowIndexColumnReader extends MetadataColumnReader {
private long offset;

public RowIndexColumnReader(StructField field, int batchSize, long[] indices) {
- super(field.dataType(), TypeUtil.convertToParquet(field), false);
+ super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
this.indices = indices;
setBatchSize(batchSize);
}
common/src/main/java/org/apache/comet/vector/CometPlainVector.java (16 additions, 0 deletions)
@@ -38,11 +38,18 @@ public class CometPlainVector extends CometDecodedVector {
private byte booleanByteCache;
private int booleanByteCacheIndex = -1;

+ private boolean isReused;
+
public CometPlainVector(ValueVector vector, boolean useDecimal128) {
this(vector, useDecimal128, false);
}

public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) {
+ this(vector, useDecimal128, isUuid, false);
+ }
+
+ public CometPlainVector(
+     ValueVector vector, boolean useDecimal128, boolean isUuid, boolean isReused) {
super(vector, vector.getField(), useDecimal128, isUuid);
// NullType doesn't have data buffer.
if (vector instanceof NullVector) {
@@ -52,6 +59,15 @@ public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid)
}

isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector;
+ this.isReused = isReused;
}
+
+ public boolean isReused() {
+   return isReused;
+ }
+
+ public void setReused(boolean isReused) {
+   this.isReused = isReused;
+ }

@Override
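A short, hypothetical usage of the new four-argument constructor and the isReused() accessor; the IntVector setup is illustrative only and not taken from this commit.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.comet.vector.CometPlainVector;

final class ReusedVectorExample {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator()) {
      IntVector ints = new IntVector("ints", allocator);
      ints.allocateNew(4);
      ints.setValueCount(4);

      // Mark the wrapper as reused: the producer intends to keep refilling the same
      // Arrow buffers for subsequent batches.
      CometPlainVector vector =
          new CometPlainVector(ints, /* useDecimal128 */ false, /* isUuid */ false, /* isReused */ true);

      // Consumers check the flag before closing, as the generated
      // columnar-to-row cleanup in the Scala change below does.
      if (!vector.isReused()) {
        vector.close();
      }
      ints.close();
    }
  }
}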
@@ -34,6 +34,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils

+ import org.apache.comet.vector.CometPlainVector
+
/**
* Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for
* the fix to be released in Spark versions. We copy the implementation here to apply the fix.
@@ -158,6 +160,7 @@ case class CometColumnarToRowExec(child: SparkPlan)

val writableColumnVectorClz = classOf[WritableColumnVector].getName
val constantColumnVectorClz = classOf[ConstantColumnVector].getName
+ val cometPlainColumnVectorClz = classOf[CometPlainVector].getName

// scalastyle:off line.size.limit
s"""
@@ -176,8 +179,13 @@ case class CometColumnarToRowExec(child: SparkPlan)
|
| // Comet fix for SPARK-50235
| for (int i = 0; i < ${colVars.length}; i++) {
- | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz)) {
+ | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz || $batch.column(i) instanceof $cometPlainColumnVectorClz)) {
| $batch.column(i).close();
+ | } else if ($batch.column(i) instanceof $cometPlainColumnVectorClz) {
+ | $cometPlainColumnVectorClz cometPlainColumnVector = ($cometPlainColumnVectorClz) $batch.column(i);
+ | if (!cometPlainColumnVector.isReused()) {
+ | cometPlainColumnVector.close();
+ | }
| }
| }
|
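With the class-name placeholders substituted, the generated cleanup loop has roughly the shape below. This is an illustrative expansion written as ordinary Java, with batch and numColumns standing in for the generated variable names; it is not the literal codegen output.

import org.apache.comet.vector.CometPlainVector;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class GeneratedCleanupShape {
  // Comet fix for SPARK-50235: close per-batch column vectors eagerly, but leave
  // writable, constant, and reused Comet plain vectors untouched.
  static void closeNonReusedColumns(ColumnarBatch batch, int numColumns) {
    for (int i = 0; i < numColumns; i++) {
      if (!(batch.column(i) instanceof WritableColumnVector
          || batch.column(i) instanceof ConstantColumnVector
          || batch.column(i) instanceof CometPlainVector)) {
        batch.column(i).close();
      } else if (batch.column(i) instanceof CometPlainVector) {
        CometPlainVector cometPlainColumnVector = (CometPlainVector) batch.column(i);
        if (!cometPlainColumnVector.isReused()) {
          cometPlainColumnVector.close();
        }
      }
    }
  }
}

The design intent suggested by the new flag is that vectors whose buffers a reader keeps reusing across batches should not be closed by the row-conversion path, while all other non-writable, non-constant vectors still are.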
