
Commit 16033d9 (upmerge)

andygrove committed Nov 11, 2024
2 parents e52fe77 + 82a9b95
Showing 22 changed files with 1,080 additions and 1,078 deletions.
83 changes: 46 additions & 37 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -28,6 +28,8 @@
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -50,6 +52,7 @@

public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
+ protected final BufferAllocator ALLOCATOR = new RootAllocator();

/**
* The current Comet vector holding all the values read by this column reader. Owned by this
@@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader {

private final CometSchemaImporter importer;

+ private ArrowArray array = null;
+ private ArrowSchema schema = null;

public ColumnReader(
DataType type,
ColumnDescriptor descriptor,
@@ -201,53 +207,56 @@ public CometDecodedVector loadVector() {
    boolean isUuid =
        logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

-    long[] addresses = Native.currentBatch(nativeHandle);
-
-    try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = importer.importVector(array, schema);
-
-      DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
-
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
-
-      // Update whether the current vector contains any null values. This is used in the following
-      // batch(s) to determine whether we can skip loading the native vector.
-      hadNull = cometVector.hasNull();
-
-      if (dictionaryEncoding == null) {
-        if (dictionary != null) {
-          // This means the column was using dictionary encoding but now has fall-back to plain
-          // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
-          // a condition to check if we can re-use vector later.
-          dictionary = null;
-        }
-        // Either the column is not dictionary encoded, or it was using dictionary encoding but
-        // a new data page has switched back to use plain encoding. For both cases we should
-        // return plain vector.
-        currentVector = cometVector;
-        return currentVector;
-      }
-
-      // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
-      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
-      CometPlainVector dictionaryVector =
-          new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
-      if (dictionary != null) {
-        dictionary.setDictionaryVector(dictionaryVector);
-      } else {
-        dictionary = new CometDictionary(dictionaryVector);
-      }
-
-      currentVector =
-          new CometDictionaryVector(
-              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
-      currentVector =
-          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
-      return currentVector;
-    }
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
+
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+
+    FieldVector vector = importer.importVector(array, schema);
+
+    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+
+    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+    // Update whether the current vector contains any null values. This is used in the following
+    // batch(s) to determine whether we can skip loading the native vector.
+    hadNull = cometVector.hasNull();
+
+    if (dictionaryEncoding == null) {
+      if (dictionary != null) {
+        // This means the column was using dictionary encoding but now has fall-back to plain
+        // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
+        // a condition to check if we can re-use vector later.
+        dictionary = null;
+      }
+      // Either the column is not dictionary encoded, or it was using dictionary encoding but
+      // a new data page has switched back to use plain encoding. For both cases we should
+      // return plain vector.
+      currentVector = cometVector;
+      return currentVector;
+    }
+
+    // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
+    // release the previous dictionary vector and create a new one.
+    Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+    CometPlainVector dictionaryVector =
+        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+    if (dictionary != null) {
+      dictionary.setDictionaryVector(dictionaryVector);
+    } else {
+      dictionary = new CometDictionary(dictionaryVector);
+    }
+
+    currentVector =
+        new CometDictionaryVector(
+            cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+
+    currentVector =
+        new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+    return currentVector;
  }

protected void readPage() {
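For reference, the calling convention introduced above can be exercised with a small helper along these lines. This is an illustrative sketch, not code from this commit: the helper class, its parameters, and the per-call cleanup are assumptions, while Native.currentBatch, ArrowArray.allocateNew, memoryAddress(), and importer.importVector are the calls shown in the hunk above.

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.comet.parquet.Native;

// Hypothetical helper, not part of this commit.
final class CurrentBatchExample {
  static FieldVector importCurrentBatch(
      long nativeHandle, CometSchemaImporter importer, BufferAllocator allocator) {
    // Pre-allocate empty C Data Interface structs on the JVM side.
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);
    try {
      // The native reader exports the current batch into the structs at these addresses.
      Native.currentBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());
      // Importing consumes the exported data and materializes an Arrow vector.
      return importer.importVector(array, schema);
    } finally {
      // ColumnReader keeps the structs as long-lived fields instead; they are released per
      // call here only to keep this sketch self-contained.
      array.close();
      schema.close();
    }
  }
}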
common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -34,8 +34,12 @@
/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
public class MetadataColumnReader extends AbstractColumnReader {
private final BufferAllocator allocator = new RootAllocator();

private CometVector vector;

+ private ArrowArray array = null;
+ private ArrowSchema schema = null;

public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
super(type, descriptor, useDecimal128, false);
@@ -50,13 +54,17 @@ public void setBatchSize(int batchSize) {
@Override
public void readBatch(int total) {
if (vector == null) {
- long[] addresses = Native.currentBatch(nativeHandle);
- try (ArrowArray array = ArrowArray.wrap(addresses[0]);
- ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
- FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
- vector = new CometPlainVector(fieldVector, useDecimal128);
- }
+ array = ArrowArray.allocateNew(allocator);
+ schema = ArrowSchema.allocateNew(allocator);
+
+ long arrayAddr = array.memoryAddress();
+ long schemaAddr = schema.memoryAddress();
+
+ Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+ FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
+ vector = new CometPlainVector(fieldVector, useDecimal128);
}

vector.setNumValues(total);
}

6 changes: 3 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public static native void setPageV2(
* Returns the current batch constructed via 'readBatch'
*
* @param handle the handle to the native Parquet column reader
- * @return a long array with 2 elements, the first is the address to native Arrow array, and the
- * second is the address to the Arrow schema.
+ * @param arrayAddr the memory address to the ArrowArray struct
+ * @param schemaAddr the memory address to the ArrowSchema struct
*/
- public static native long[] currentBatch(long handle);
+ public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);

/** Set methods to set a constant value for the reader, so it'll return constant vectors */
public static native void setNull(long handle);
77 changes: 37 additions & 40 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -73,7 +73,7 @@ object CometConf extends ShimCometConf {
"Whether to enable native scans. When this is turned on, Spark will use Comet to " +
"read supported data sources (currently only Parquet is supported natively). Note " +
"that to enable native vectorized execution, both this config and " +
"'spark.comet.exec.enabled' need to be enabled. By default, this config is true.")
"'spark.comet.exec.enabled' need to be enabled.")
.booleanConf
.createWithDefault(true)

@@ -82,7 +82,7 @@ object CometConf extends ShimCometConf {
.doc(
"Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads " +
"ranges of consecutive data in a file in parallel. It is faster for large files and " +
"row groups but uses more resources. The parallel reader is enabled by default.")
"row groups but uses more resources.")
.booleanConf
.createWithDefault(true)

@@ -98,7 +98,7 @@ object CometConf extends ShimCometConf {
.doc(
"When enabled the parallel reader will try to merge ranges of data that are separated " +
"by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads " +
"are faster on cloud storage. The default behavior is to merge consecutive ranges.")
"are faster on cloud storage.")
.booleanConf
.createWithDefault(true)

@@ -115,7 +115,7 @@ object CometConf extends ShimCometConf {
.doc("In the parallel reader, if the read ranges submitted are skewed in sizes, this " +
"option will cause the reader to break up larger read ranges into smaller ranges to " +
"reduce the skew. This will result in a slightly larger number of connections opened to " +
"the file system but may give improved performance. The option is off by default.")
"the file system but may give improved performance.")
.booleanConf
.createWithDefault(false)

@@ -153,7 +153,7 @@ object CometConf extends ShimCometConf {
"native space. Note: each operator is associated with a separate config in the " +
"format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the " +
"config and this need to be turned on, in order for the operator to be executed in " +
"native. By default, this config is true.")
"native.")
.booleanConf
.createWithDefault(true)

@@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
"spark.comet.memory.overhead.factor")
.doc(
"Fraction of executor memory to be allocated as additional non-heap memory per executor " +
"process for Comet. Default value is 0.2.")
"process for Comet.")
.doubleConf
.checkValue(
factor => factor > 0,
@@ -247,8 +247,7 @@ object CometConf extends ShimCometConf {
"is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
"'native' is for native shuffle which has best performance in general. " +
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
"'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
"By default, this config is 'auto'.")
"'auto' is for Comet to choose the best shuffle mode based on the query plan.")
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
@@ -258,8 +257,8 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
"Whether to force enabling broadcasting for Comet native operators. By default, " +
"this config is false. Comet broadcast feature will be enabled automatically by " +
"Whether to force enabling broadcasting for Comet native operators. " +
"Comet broadcast feature will be enabled automatically by " +
"Comet extension. But for unit tests, we need this feature to force enabling it " +
"for invalid cases. So this config is only used for unit test.")
.internal()
@@ -280,27 +279,26 @@ object CometConf extends ShimCometConf {
.stringConf
.createWithDefault("zstd")

- val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
- "spark.comet.columnar.shuffle.async.enabled")
- .doc(
- "Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config " +
- "is false.")
- .booleanConf
- .createWithDefault(false)
+ val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.columnar.shuffle.async.enabled")
+ .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
+ .booleanConf
+ .createWithDefault(false)

val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
.doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
"By default, this config is 3. Note that more threads means more memory requirement to " +
"buffer shuffle data before flushing to disk. Also, more threads may not always " +
"improve performance, and should be set based on the number of cores available.")
.doc(
"Number of threads used for Comet async columnar shuffle per shuffle task. " +
"Note that more threads means more memory requirement to " +
"buffer shuffle data before flushing to disk. Also, more threads may not always " +
"improve performance, and should be set based on the number of cores available.")
.intConf
.createWithDefault(3)

val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
.doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
"By default, this config is 100. This is the upper bound of total number of shuffle " +
"This is the upper bound of total number of shuffle " +
"threads per executor. In other words, if the number of cores * the number of shuffle " +
"threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than " +
"this config. Comet will use this config as the number of shuffle threads per " +
@@ -317,8 +315,7 @@ object CometConf extends ShimCometConf {
"Higher value means more memory requirement to buffer shuffle data before " +
"flushing to disk. As Comet uses columnar shuffle which is columnar format, " +
"higher value usually helps to improve shuffle data compression ratio. This is " +
"internal config for testing purpose or advanced tuning. By default, " +
"this config is Int.Max.")
"internal config for testing purpose or advanced tuning.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)
@@ -341,8 +338,7 @@ object CometConf extends ShimCometConf {
.doc(
"Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or " +
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"By default, this config is 1.0.")
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`.")
.doubleConf
.checkValue(
factor => factor > 0,
@@ -360,11 +356,12 @@ object CometConf extends ShimCometConf {

val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
.doc("The ratio of total values to distinct values in a string column to decide whether to " +
"prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
"this config, dictionary encoding will be used on shuffling string column. This config " +
"is effective if it is higher than 1.0. By default, this config is 10.0. Note that this " +
"config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
.doc(
"The ratio of total values to distinct values in a string column to decide whether to " +
"prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
"this config, dictionary encoding will be used on shuffling string column. This config " +
"is effective if it is higher than 1.0. Note that this " +
"config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
.doubleConf
.createWithDefault(10.0)
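(Illustrative arithmetic for this ratio: a shuffled string column with 1,000,000 values and 50,000 distinct values has a ratio of 20.0, above the default threshold of 10.0, so dictionary encoding would be preferred; with 500,000 distinct values the ratio is 2.0 and plain encoding is kept.)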

@@ -377,7 +374,7 @@ object CometConf extends ShimCometConf {
val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.doc(
"Whether to enable debug mode for Comet. By default, this config is false. " +
"Whether to enable debug mode for Comet. " +
"When enabled, Comet will do additional checks for debugging purpose. For example, " +
"validating array when importing arrays from JVM at native side. Note that these " +
"checks may be expensive in performance and should only be enabled for debugging " +
@@ -437,27 +434,27 @@ object CometConf extends ShimCometConf {
"The fraction of memory from Comet memory overhead that the native memory " +
"manager can use for execution. The purpose of this config is to set aside memory for " +
"untracked data structures, as well as imprecise size estimation during memory " +
"acquisition. Default value is 0.7.")
"acquisition.")
.doubleConf
.createWithDefault(0.7)

- val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf(
- "spark.comet.parquet.enable.directBuffer")
- .doc("Whether to use Java direct byte buffer when reading Parquet. By default, this is false")
- .booleanConf
- .createWithDefault(false)
+ val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
+ conf("spark.comet.parquet.enable.directBuffer")
+ .doc("Whether to use Java direct byte buffer when reading Parquet.")
+ .booleanConf
+ .createWithDefault(false)

val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
.doc("Whether to enable pre-fetching feature of CometScan. By default is disabled.")
.doc("Whether to enable pre-fetching feature of CometScan.")
.booleanConf
.createWithDefault(false)

val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.scan.preFetch.threadNum")
.doc(
"The number of threads running pre-fetching for CometScan. Effective if " +
s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. By default it is 2. Note that more " +
s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
"pre-fetching threads means more memory requirement to store pre-fetched row groups.")
.intConf
.createWithDefault(2)
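As a usage sketch (not part of this commit), a few of the configs documented in this file could be set when building a SparkSession as shown below. The key names come from this file; the values are examples rather than recommendations, and "org.apache.spark.CometPlugin" and the parquet path are assumptions for illustration.

import org.apache.spark.sql.SparkSession;

public final class CometConfExample {
  public static void main(String[] args) {
    // Illustrative configuration only; values are not tuning advice.
    SparkSession spark =
        SparkSession.builder()
            .appName("comet-config-example")
            .config("spark.plugins", "org.apache.spark.CometPlugin")
            .config("spark.comet.exec.enabled", "true")
            .config("spark.comet.exec.shuffle.mode", "auto")
            .config("spark.comet.scan.preFetch.enabled", "true")
            .config("spark.comet.scan.preFetch.threadNum", "2")
            .getOrCreate();

    spark.read().parquet("/tmp/example.parquet").show(); // hypothetical input path
    spark.stop();
  }
}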
1 change: 1 addition & 0 deletions docs/source/user-guide/compatibility.md
@@ -123,6 +123,7 @@ The following cast operations are generally compatible with Spark except for the
| decimal | long | |
| decimal | float | |
| decimal | double | |
+ | decimal | string | There can be formatting differences in some cases due to Spark using scientific notation where Comet does not |
| string | boolean | |
| string | byte | |
| string | short | |
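To make the added decimal-to-string note concrete, here is a minimal sketch. It assumes Spark's rendering follows java.math.BigDecimal.toString(), which switches to scientific notation for small values; the exact strings Comet produces are not asserted here, only the plain-notation alternative is shown for contrast.

import java.math.BigDecimal;

public final class DecimalToStringExample {
  public static void main(String[] args) {
    BigDecimal small = new BigDecimal("0.000000123");

    // BigDecimal.toString() switches to scientific notation once the adjusted
    // exponent drops below -6, which is the Spark-style rendering referenced above.
    System.out.println(small.toString());      // 1.23E-7
    // toPlainString() never uses an exponent, illustrating a plain-notation rendering.
    System.out.println(small.toPlainString()); // 0.000000123
  }
}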
(Diff for the remaining changed files not shown.)
