diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 06fe8c34ca4a5..265750c2c78c0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -307,8 +307,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } wExpression.windowFunction match { - case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: PercentRank | - _: NTile => + case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: NTile => allSupported = allSupported case l: Lag => checkLagOrLead(l.third) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 0efc1414ce338..912a5c95de631 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -978,7 +978,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr compareResultsAgainstVanillaSpark(sql, true, { _ => }) } - test("window percent_rank") { + ignore("window percent_rank") { val sql = """ |select n_regionkey, n_nationkey, diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 0eb6126876b55..33efdbc5ec93d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -66,6 +66,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val GLUTEN_VELOX_UDF_LIB_PATHS = getBackendConfigPrefix() + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = getBackendConfigPrefix() + ".driver.udfLibraryPaths" val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = getBackendConfigPrefix() + ".internal.udfLibraryPaths" + val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = getBackendConfigPrefix() + ".udfAllowTypeConversion" val MAXIMUM_BATCH_SIZE: Int = 32768 diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 99f9faf9914a0..a2b6d5259a11e 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -27,11 +27,12 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionInfo, Unevaluable} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, ExpressionInfo, Unevaluable} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import 
org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -74,18 +75,21 @@ trait UDFSignatureBase { val expressionType: ExpressionType val children: Seq[DataType] val variableArity: Boolean + val allowTypeConversion: Boolean } case class UDFSignature( expressionType: ExpressionType, children: Seq[DataType], - variableArity: Boolean) + variableArity: Boolean, + allowTypeConversion: Boolean) extends UDFSignatureBase case class UDAFSignature( expressionType: ExpressionType, children: Seq[DataType], variableArity: Boolean, + allowTypeConversion: Boolean, intermediateAttrs: Seq[AttributeReference]) extends UDFSignatureBase @@ -130,26 +134,30 @@ object UDFResolver extends Logging { name: String, returnType: Array[Byte], argTypes: Array[Byte], - variableArity: Boolean): Unit = { + variableArity: Boolean, + allowTypeConversion: Boolean): Unit = { registerUDF( name, ConverterUtils.parseFromBytes(returnType), ConverterUtils.parseFromBytes(argTypes), - variableArity) + variableArity, + allowTypeConversion) } private def registerUDF( name: String, returnType: ExpressionType, argTypes: ExpressionType, - variableArity: Boolean): Unit = { + variableArity: Boolean, + allowTypeConversion: Boolean): Unit = { assert(argTypes.dataType.isInstanceOf[StructType]) val v = UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]()) v += UDFSignature( returnType, argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType), - variableArity) + variableArity, + allowTypeConversion) UDFNames += name logInfo(s"Registered UDF: $name($argTypes) -> $returnType") } @@ -159,13 +167,15 @@ object UDFResolver extends Logging { returnType: Array[Byte], argTypes: Array[Byte], intermediateTypes: Array[Byte], - variableArity: Boolean): Unit = { + variableArity: Boolean, + enableTypeConversion: Boolean): Unit = { registerUDAF( name, ConverterUtils.parseFromBytes(returnType), ConverterUtils.parseFromBytes(argTypes), ConverterUtils.parseFromBytes(intermediateTypes), - variableArity + variableArity, + enableTypeConversion ) } @@ -174,7 +184,8 @@ object UDFResolver extends Logging { returnType: ExpressionType, argTypes: ExpressionType, intermediateTypes: ExpressionType, - variableArity: Boolean): Unit = { + variableArity: Boolean, + allowTypeConversion: Boolean): Unit = { assert(argTypes.dataType.isInstanceOf[StructType]) val aggBufferAttributes: Seq[AttributeReference] = @@ -194,6 +205,7 @@ object UDFResolver extends Logging { returnType, argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType), variableArity, + allowTypeConversion, aggBufferAttributes) UDAFNames += name logInfo(s"Registered UDAF: $name($argTypes) -> $returnType") @@ -346,16 +358,27 @@ object UDFResolver extends Logging { } } + private def checkAllowTypeConversion: Boolean = { + SQLConf.get + .getConfString(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION, "false") + .toBoolean + } + private def getUdfExpression(name: String)(children: Seq[Expression]) = { def errorMessage: String = s"UDF $name -> ${children.map(_.dataType.simpleString).mkString(", ")} is not registered." 
+ val allowTypeConversion = checkAllowTypeConversion val signatures = UDFMap.getOrElse(name, throw new UnsupportedOperationException(errorMessage)); - - signatures.find(sig => tryBind(sig, children.map(_.dataType))) match { + signatures.find(sig => tryBind(sig, children.map(_.dataType), allowTypeConversion)) match { case Some(sig) => - UDFExpression(name, sig.expressionType.dataType, sig.expressionType.nullable, children) + UDFExpression( + name, + sig.expressionType.dataType, + sig.expressionType.nullable, + if (!allowTypeConversion && !sig.allowTypeConversion) children + else applyCast(children, sig)) case None => throw new UnsupportedOperationException(errorMessage) } @@ -365,50 +388,77 @@ object UDFResolver extends Logging { def errorMessage: String = s"UDAF $name -> ${children.map(_.dataType.simpleString).mkString(", ")} is not registered." + val allowTypeConversion = checkAllowTypeConversion val signatures = UDAFMap.getOrElse( name, throw new UnsupportedOperationException(errorMessage) ) - - signatures.find(sig => tryBind(sig, children.map(_.dataType))) match { + signatures.find(sig => tryBind(sig, children.map(_.dataType), allowTypeConversion)) match { case Some(sig) => UserDefinedAggregateFunction( name, sig.expressionType.dataType, sig.expressionType.nullable, - children, - sig.intermediateAttrs) + if (!allowTypeConversion && !sig.allowTypeConversion) children + else applyCast(children, sig), + sig.intermediateAttrs + ) case None => throw new UnsupportedOperationException(errorMessage) } } + private def tryBind( + sig: UDFSignatureBase, + requiredDataTypes: Seq[DataType], + allowTypeConversion: Boolean): Boolean = { + if ( + !tryBindStrict(sig, requiredDataTypes) && (allowTypeConversion || sig.allowTypeConversion) + ) { + tryBindWithTypeConversion(sig, requiredDataTypes) + } else { + true + } + } + // Returns true if required data types match the function signature. // If the function signature is variable arity, the number of the last argument can be zero // or more. - private def tryBind(sig: UDFSignatureBase, requiredDataTypes: Seq[DataType]): Boolean = { + private def tryBindWithTypeConversion( + sig: UDFSignatureBase, + requiredDataTypes: Seq[DataType]): Boolean = { + tryBind0(sig, requiredDataTypes, Cast.canCast) + } + + private def tryBindStrict(sig: UDFSignatureBase, requiredDataTypes: Seq[DataType]): Boolean = { + tryBind0(sig, requiredDataTypes, DataTypeUtils.sameType) + } + + private def tryBind0( + sig: UDFSignatureBase, + requiredDataTypes: Seq[DataType], + checkType: (DataType, DataType) => Boolean): Boolean = { if (!sig.variableArity) { sig.children.size == requiredDataTypes.size && - sig.children - .zip(requiredDataTypes) - .forall { case (candidate, required) => DataTypeUtils.sameType(candidate, required) } + requiredDataTypes + .zip(sig.children) + .forall { case (required, candidate) => checkType(required, candidate) } } else { // If variableArity is true, there must be at least one argument in the signature. if (requiredDataTypes.size < sig.children.size - 1) { false } else if (requiredDataTypes.size == sig.children.size - 1) { - sig.children - .dropRight(1) - .zip(requiredDataTypes) - .forall { case (candidate, required) => DataTypeUtils.sameType(candidate, required) } + requiredDataTypes + .zip(sig.children.dropRight(1)) + .forall { case (required, candidate) => checkType(required, candidate) } } else { val varArgStartIndex = sig.children.size - 1 // First check all var args has the same type with the last argument of the signature. 
if ( !requiredDataTypes .drop(varArgStartIndex) - .forall(argType => DataTypeUtils.sameType(sig.children.last, argType)) + .forall(argType => checkType(argType, sig.children.last)) ) { false } else if (varArgStartIndex == 0) { @@ -416,11 +466,38 @@ object UDFResolver extends Logging { true } else { // Whether fixed args matches. - sig.children - .dropRight(1) - .zip(requiredDataTypes.dropRight(1 + requiredDataTypes.size - sig.children.size)) - .forall { case (candidate, required) => DataTypeUtils.sameType(candidate, required) } + requiredDataTypes + .dropRight(1 + requiredDataTypes.size - sig.children.size) + .zip(sig.children.dropRight(1)) + .forall { case (required, candidate) => checkType(required, candidate) } + } + } + } + } + + private def applyCast(children: Seq[Expression], sig: UDFSignatureBase): Seq[Expression] = { + def maybeCast(expr: Expression, toType: DataType): Expression = { + if (!expr.dataType.sameType(toType)) { + Cast(expr, toType) + } else { + expr + } + } + + if (!sig.variableArity) { + children.zip(sig.children).map { case (expr, toType) => maybeCast(expr, toType) } + } else { + val fixedArgs = Math.min(children.size, sig.children.size) + val newChildren = children.take(fixedArgs).zip(sig.children.take(fixedArgs)).map { + case (expr, toType) => maybeCast(expr, toType) + } + if (children.size > sig.children.size) { + val varArgType = sig.children.last + newChildren ++ children.takeRight(children.size - sig.children.size).map { + expr => maybeCast(expr, varArgType) } + } else { + newChildren } } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 5ca5087d9ef4a..2a26719269942 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -900,12 +900,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla test("combine small batches before shuffle") { val minBatchSize = 15 - val maxBatchSize = 100 withSQLConf( "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true", "spark.gluten.sql.columnar.maxBatchSize" -> "2", - "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" -> - s"$minBatchSize~$maxBatchSize" + "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize" -> + s"$minBatchSize" ) { val df = runQueryAndCompare( "select l_orderkey, sum(l_partkey) as sum from lineitem " + @@ -921,16 +920,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla assert(metrics("numOutputRows").value == 27) assert(metrics("numOutputBatches").value == 2) } - } - test("split small batches before shuffle") { - val minBatchSize = 1 - val maxBatchSize = 4 withSQLConf( "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true", - "spark.gluten.sql.columnar.maxBatchSize" -> "100", - "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" -> - s"$minBatchSize~$maxBatchSize" + "spark.gluten.sql.columnar.maxBatchSize" -> "2" ) { val df = runQueryAndCompare( "select l_orderkey, sum(l_partkey) as sum from lineitem " + @@ -939,12 +932,12 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla val ops = collect(df.queryExecution.executedPlan) { case p: VeloxResizeBatchesExec => p } assert(ops.size == 1) val op = ops.head - assert(op.minOutputBatchSize == minBatchSize) + 
assert(op.minOutputBatchSize == 1) val metrics = op.metrics assert(metrics("numInputRows").value == 27) - assert(metrics("numInputBatches").value == 1) + assert(metrics("numInputBatches").value == 14) assert(metrics("numOutputRows").value == 27) - assert(metrics("numOutputBatches").value == 7) + assert(metrics("numOutputBatches").value == 14) } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala index 534a8d9f1c74d..008337b9400ed 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.expression +import org.apache.gluten.backendsapi.velox.VeloxBackendSettings import org.apache.gluten.tags.{SkipTestTags, UDFTest} import org.apache.spark.SparkConf @@ -88,6 +89,23 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { .sameElements(Array(Row(105L, 6, 6L, 5, 6, 11, 6L, 11L, Date.valueOf("2024-03-30"))))) } + test("test udf allow type conversion") { + withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "true") { + val df = spark.sql("""select myudf1("100"), myudf1(1), mydate('2024-03-25', 5)""") + assert( + df.collect() + .sameElements(Array(Row(105L, 6L, Date.valueOf("2024-03-30"))))) + } + + withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "false") { + assert( + spark + .sql("select mydate2('2024-03-25', 5)") + .collect() + .sameElements(Array(Row(Date.valueOf("2024-03-30"))))) + } + } + test("test udaf") { val df = spark.sql("""select | myavg(1), @@ -101,6 +119,15 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { df.collect() .sameElements(Array(Row(1.0, 1.0, 1.0, 1.0, 1L)))) } + + test("test udaf allow type conversion") { + withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "true") { + val df = spark.sql("""select myavg("1"), myavg("1.0"), mycount_if("true")""") + assert( + df.collect() + .sameElements(Array(Row(1.0, 1.0, 1L)))) + } + } } @UDFTest diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index fc5d758f8c8b2..031be791bdf01 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -609,11 +609,23 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) { RETURN_NOT_OK(finishSpill(true)); + lastEvictPid_ = -1; } - lastEvictPid_ = partitionId; - RETURN_NOT_OK(requestSpill(stop)); - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + + if (!stop) { + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + } else { + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + } + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + } + lastEvictPid_ = partitionId; return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index daeef24ce5a6d..b8d8274cb782a 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -293,7 +293,6 @@ arrow::Result> BlockPayload::readBufferAt(uint32_ arrow::Result>> BlockPayload::deserialize( 
arrow::io::InputStream* inputStream, - const std::shared_ptr& schema, const std::shared_ptr& codec, arrow::MemoryPool* pool, uint32_t& numRows, diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index 4c53065a6ed94..0a317d9c3af95 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -88,7 +88,6 @@ class BlockPayload final : public Payload { static arrow::Result>> deserialize( arrow::io::InputStream* inputStream, - const std::shared_ptr& schema, const std::shared_ptr& codec, arrow::MemoryPool* pool, uint32_t& numRows, diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index a7776f9f1074b..4b46dbd599411 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -30,6 +30,7 @@ #include "compute/VeloxPlanConverter.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" +#include "config/VeloxConfig.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" @@ -44,22 +45,23 @@ using namespace gluten; namespace { +DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); +DEFINE_bool(run_shuffle, false, "Only run shuffle write."); +DEFINE_bool(run_shuffle_read, false, "Whether to run shuffle read when run_shuffle is true."); +DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort"); DEFINE_string( partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single, random (only for test purpose)"); -DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort"); DEFINE_bool(rss, false, "Mocking rss."); DEFINE_string( compression, "lz4", "Specify the compression codec. Valid options are lz4, zstd, qat_gzip, qat_zstd, iaa_gzip"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); -DEFINE_bool(run_shuffle, false, "Only run shuffle write."); -DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_string(plan, "", "Path to input json file of the substrait plan."); DEFINE_string( @@ -76,15 +78,21 @@ DEFINE_string( "Scan mode for reading parquet data." "'stream' mode: Input file scan happens inside of the pipeline." "'buffered' mode: First read all data into memory and feed the pipeline with it."); +DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`"); struct WriterMetrics { - int64_t splitTime; - int64_t evictTime; - int64_t writeTime; - int64_t compressTime; + int64_t splitTime{0}; + int64_t evictTime{0}; + int64_t writeTime{0}; + int64_t compressTime{0}; + + int64_t bytesSpilled{0}; + int64_t bytesWritten{0}; +}; - public: - explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), compressTime(0) {} +struct ReaderMetrics { + int64_t decompressTime{0}; + int64_t deserializeTime{0}; }; void setUpBenchmark(::benchmark::internal::Benchmark* bm) { @@ -98,9 +106,10 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) { } } -std::shared_ptr -createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::vector& localDirs) { +PartitionWriterOptions createPartitionWriterOptions() { PartitionWriterOptions partitionWriterOptions{}; + // Disable writer's merge. 
+ partitionWriterOptions.mergeThreshold = 0; // Configure compression. if (FLAGS_compression == "lz4") { @@ -121,27 +130,39 @@ createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::ve partitionWriterOptions.codecBackend = CodecBackend::IAA; partitionWriterOptions.compressionType = arrow::Compression::GZIP; } + return partitionWriterOptions; +} +std::unique_ptr createPartitionWriter( + Runtime* runtime, + PartitionWriterOptions options, + const std::string& dataFile, + const std::vector& localDirs) { std::unique_ptr partitionWriter; if (FLAGS_rss) { auto rssClient = std::make_unique(dataFile); partitionWriter = std::make_unique( FLAGS_shuffle_partitions, - std::move(partitionWriterOptions), + std::move(options), runtime->memoryManager()->getArrowMemoryPool(), std::move(rssClient)); } else { partitionWriter = std::make_unique( FLAGS_shuffle_partitions, - std::move(partitionWriterOptions), + std::move(options), runtime->memoryManager()->getArrowMemoryPool(), dataFile, localDirs); } + return partitionWriter; +} +std::shared_ptr createShuffleWriter( + Runtime* runtime, + std::unique_ptr partitionWriter) { auto options = ShuffleWriterOptions{}; options.partitioning = gluten::toPartitioning(FLAGS_partitioning); - if (FLAGS_rss) { + if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") { options.shuffleWriterType = gluten::kRssSortShuffle; } else if (FLAGS_shuffle_writer == "sort") { options.shuffleWriterType = gluten::kSortShuffle; @@ -163,6 +184,8 @@ void populateWriterMetrics( if (splitTime > 0) { metrics.splitTime += splitTime; } + metrics.bytesWritten += shuffleWriter->totalBytesWritten(); + metrics.bytesSpilled += shuffleWriter->totalBytesEvicted(); } void setCpu(::benchmark::State& state) { @@ -171,7 +194,7 @@ void setCpu(::benchmark::State& state) { if (FLAGS_cpu != -1) { cpu += FLAGS_cpu; } - LOG(INFO) << "Setting CPU for thread " << state.thread_index() << " to " << cpu; + LOG(WARNING) << "Setting CPU for thread " << state.thread_index() << " to " << cpu; gluten::setCpu(cpu); } @@ -179,26 +202,56 @@ void runShuffle( Runtime* runtime, BenchmarkAllocationListener* listener, const std::shared_ptr& resultIter, - WriterMetrics& metrics) { + WriterMetrics& writerMetrics, + ReaderMetrics& readerMetrics, + bool readAfterWrite) { std::string dataFile; std::vector localDirs; bool isFromEnv; GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); - auto shuffleWriter = createShuffleWriter(runtime, dataFile, localDirs); + auto partitionWriterOptions = createPartitionWriterOptions(); + auto partitionWriter = createPartitionWriter(runtime, partitionWriterOptions, dataFile, localDirs); + auto shuffleWriter = createShuffleWriter(runtime, std::move(partitionWriter)); listener->setShuffleWriter(shuffleWriter.get()); int64_t totalTime = 0; + std::shared_ptr cSchema; { gluten::ScopedTimer timer(&totalTime); while (resultIter->hasNext()) { - GLUTEN_THROW_NOT_OK( - shuffleWriter->write(resultIter->next(), ShuffleWriter::kMaxMemLimit - shuffleWriter->cachedPayloadSize())); + auto cb = resultIter->next(); + if (!cSchema) { + cSchema = cb->exportArrowSchema(); + } + GLUTEN_THROW_NOT_OK(shuffleWriter->write(cb, ShuffleWriter::kMaxMemLimit - shuffleWriter->cachedPayloadSize())); } GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); } - populateWriterMetrics(shuffleWriter, totalTime, metrics); + populateWriterMetrics(shuffleWriter, totalTime, writerMetrics); + + if (readAfterWrite && cSchema) { + auto readerOptions = ShuffleReaderOptions{}; + 
readerOptions.shuffleWriterType = shuffleWriter->options().shuffleWriterType; + readerOptions.compressionType = partitionWriterOptions.compressionType; + readerOptions.codecBackend = partitionWriterOptions.codecBackend; + readerOptions.compressionTypeStr = partitionWriterOptions.compressionTypeStr; + + std::shared_ptr schema = + gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast(cSchema.get()))); + auto reader = runtime->createShuffleReader(schema, readerOptions); + + GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile)); + // Read all partitions. + auto iter = reader->readStream(in); + while (iter->hasNext()) { + // Read and discard. + auto cb = iter->next(); + } + readerMetrics.decompressTime = reader->getDecompressTime(); + readerMetrics.deserializeTime = reader->getDeserializeTime(); + } // Cleanup shuffle outputs cleanupShuffleOutput(dataFile, localDirs, isFromEnv); } @@ -207,20 +260,37 @@ void updateBenchmarkMetrics( ::benchmark::State& state, const int64_t& elapsedTime, const int64_t& readInputTime, - const WriterMetrics& writerMetrics) { + const WriterMetrics& writerMetrics, + const ReaderMetrics& readerMetrics) { state.counters["read_input_time"] = benchmark::Counter(readInputTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["elapsed_time"] = benchmark::Counter(elapsedTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_write_time"] = benchmark::Counter( - writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_spill_time"] = benchmark::Counter( - writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_split_time"] = benchmark::Counter( - writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_compress_time"] = benchmark::Counter( - writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + if (FLAGS_run_shuffle || FLAGS_with_shuffle) { + state.counters["shuffle_write_time"] = benchmark::Counter( + writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_spill_time"] = benchmark::Counter( + writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_compress_time"] = benchmark::Counter( + writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_decompress_time"] = benchmark::Counter( + readerMetrics.decompressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_deserialize_time"] = benchmark::Counter( + readerMetrics.deserializeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + + auto splitTime = writerMetrics.splitTime; + if (FLAGS_scan_mode == "stream") { + splitTime -= readInputTime; + } + state.counters["shuffle_split_time"] = + benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + + state.counters["shuffle_spilled_bytes"] = benchmark::Counter( + writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024); + state.counters["shuffle_write_bytes"] = benchmark::Counter( + writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1024); + } } } // namespace @@ -246,6 +316,7 @@ auto BM_Generic = [](::benchmark::State& state, } WriterMetrics writerMetrics{}; + ReaderMetrics readerMetrics{}; int64_t readInputTime = 0; int64_t elapsedTime = 0; @@ -275,7 +346,7 @@ auto BM_Generic = [](::benchmark::State& state, listenerPtr->setIterator(resultIter.get()); if (FLAGS_with_shuffle) { - runShuffle(runtime, listenerPtr, resultIter, writerMetrics); + runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false); } else { // May write the output into file. auto veloxPlan = dynamic_cast(runtime)->getVeloxPlan(); @@ -299,7 +370,7 @@ auto BM_Generic = [](::benchmark::State& state, return; } if (FLAGS_print_result) { - LOG(INFO) << maybeBatch.ValueOrDie()->ToString(); + LOG(WARNING) << maybeBatch.ValueOrDie()->ToString(); } if (!FLAGS_save_output.empty()) { GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie())); @@ -322,18 +393,18 @@ auto BM_Generic = [](::benchmark::State& state, const auto* task = rawIter->task(); const auto* planNode = rawIter->veloxPlan(); auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); - LOG(INFO) << statsStr; + LOG(WARNING) << statsStr; } } - updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); }; -auto BM_ShuffleWrite = [](::benchmark::State& state, - const std::string& inputFile, - RuntimeFactory runtimeFactory, - FileReaderType readerType) { +auto BM_ShuffleWriteRead = [](::benchmark::State& state, + const std::string& inputFile, + RuntimeFactory runtimeFactory, + FileReaderType readerType) { setCpu(state); auto listener = std::make_unique(FLAGS_memory_limit); @@ -341,31 +412,48 @@ auto BM_ShuffleWrite = [](::benchmark::State& state, auto runtime = runtimeFactory(std::move(listener)); WriterMetrics writerMetrics{}; + ReaderMetrics readerMetrics{}; int64_t readInputTime = 0; int64_t elapsedTime = 0; { ScopedTimer timer(&elapsedTime); for (auto _ : state) { auto resultIter = getInputIteratorFromFileReader(inputFile, readerType); - runShuffle(runtime, listenerPtr, resultIter, writerMetrics); + runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, FLAGS_run_shuffle_read); auto reader = static_cast(resultIter->getInputIter()); readInputTime += reader->getCollectBatchTime(); } } - updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); }; int main(int argc, char** argv) { - ::benchmark::Initialize(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, true); + std::ostringstream ss; + ss << "Setting flags from command line args: " << std::endl; + std::vector flags; + google::GetAllFlags(&flags); + auto filename = std::filesystem::path(__FILE__).filename(); + for (const auto& flag : flags) { + if (std::filesystem::path(flag.filename).filename() == filename) { + ss << " FLAGS_" << flag.name << ": default = " << flag.default_value << ", current = " << flag.current_value + << std::endl; + } + } + LOG(WARNING) << ss.str(); + + ::benchmark::Initialize(&argc, argv); + // Init Velox backend. 
auto backendConf = gluten::defaultConf(); auto sessionConf = gluten::defaultConf(); - backendConf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}); + backendConf.insert({gluten::kDebugModeEnabled, std::to_string(FLAGS_debug_mode)}); + backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)}); + backendConf.insert({gluten::kGlogSeverityLevel, std::to_string(FLAGS_minloglevel)}); if (!FLAGS_conf.empty()) { abortIfFileNotExists(FLAGS_conf); std::ifstream file(FLAGS_conf); @@ -425,7 +513,7 @@ int main(int argc, char** argv) { std::vector dataFiles{}; if (FLAGS_run_example) { - LOG(INFO) << "Running example..."; + LOG(WARNING) << "Running example..."; dataFiles.resize(2); try { substraitJsonFile = getGeneratedFilePath("example.json"); @@ -484,33 +572,23 @@ int main(int argc, char** argv) { if (!errorMsg.empty()) { LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl - << "If simulating a first stage, the usage is:" << std::endl - << "./generic_benchmark " - << "--plan /absolute-path/to/substrait_json_file " - << "--split /absolute-path/to/split_json_file_1,/abosolute-path/to/split_json_file_2,..." - << "--data /absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..." << std::endl - << "If simulating a middle stage, the usage is:" << std::endl - << "./generic_benchmark " - << "--plan /absolute-path/to/substrait_json_file " - << "--data /absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..."; - LOG(ERROR) << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***"; + << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***"; ::benchmark::Shutdown(); std::exit(EXIT_FAILURE); } } - // Check whether input files exist. - LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile; + LOG(WARNING) << "Using substrait json file: " << std::endl << substraitJsonFile; if (!splitFiles.empty()) { - LOG(INFO) << "Using " << splitFiles.size() << " input split file(s): "; + LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): "; for (const auto& splitFile : splitFiles) { - LOG(INFO) << splitFile; + LOG(WARNING) << splitFile; } } if (!dataFiles.empty()) { - LOG(INFO) << "Using " << dataFiles.size() << " input data file(s): "; + LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): "; for (const auto& dataFile : dataFiles) { - LOG(INFO) << dataFile; + LOG(WARNING) << dataFile; } } @@ -528,37 +606,28 @@ int main(int argc, char** argv) { setUpBenchmark(bm); \ } while (0) -#define SHUFFLE_WRITE_BENCHMARK(READER_TYPE) \ - do { \ - auto* bm = \ - ::benchmark::RegisterBenchmark("ShuffleWrite", BM_ShuffleWrite, dataFiles[0], runtimeFactory, READER_TYPE) \ - ->MeasureProcessCPUTime() \ - ->UseRealTime(); \ - setUpBenchmark(bm); \ +#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE) \ + do { \ + auto* bm = ::benchmark::RegisterBenchmark( \ + "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], runtimeFactory, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + setUpBenchmark(bm); \ } while (0) - LOG(INFO) << "Using options: "; - LOG(INFO) << "threads: " << FLAGS_threads; - LOG(INFO) << "iterations: " << FLAGS_iterations; - LOG(INFO) << "cpu: " << FLAGS_cpu; - LOG(INFO) << "print_result: " << FLAGS_print_result; - LOG(INFO) << "save_output: " << FLAGS_save_output; - LOG(INFO) << "batch_size: " << FLAGS_batch_size; - LOG(INFO) << "write_path: " << FLAGS_write_path; - if (dataFiles.empty()) { GENERIC_BENCHMARK(FileReaderType::kNone); } else { FileReaderType 
readerType; if (FLAGS_scan_mode == "buffered") { readerType = FileReaderType::kBuffered; - LOG(INFO) << "Using buffered mode for reading parquet data."; + LOG(WARNING) << "Using buffered mode for reading parquet data."; } else { readerType = FileReaderType::kStream; - LOG(INFO) << "Using stream mode for reading parquet data."; + LOG(WARNING) << "Using stream mode for reading parquet data."; } if (FLAGS_run_shuffle) { - SHUFFLE_WRITE_BENCHMARK(readerType); + SHUFFLE_WRITE_READ_BENCHMARK(readerType); } else { GENERIC_BENCHMARK(readerType); } diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index c3baa2f339151..345f9da8e16d1 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -159,7 +159,11 @@ setLocalDirsAndDataFileFromEnv(std::string& dataFile, std::vector& // Set local dirs. auto joinedDirs = std::string(joinedDirsC); // Split local dirs and use thread id to choose one directory for data file. - localDirs = gluten::splitPaths(joinedDirs); + auto dirs = gluten::splitPaths(joinedDirs); + for (const auto& dir : dirs) { + localDirs.push_back(arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" + generateUuid())); + std::filesystem::create_directory(localDirs.back()); + } size_t id = std::hash{}(std::this_thread::get_id()) % localDirs.size(); ARROW_ASSIGN_OR_RAISE(dataFile, gluten::createTempShuffleFile(localDirs[id])); } else { diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc index cab90b325fe50..8230724f12602 100644 --- a/cpp/velox/jni/JniUdf.cc +++ b/cpp/velox/jni/JniUdf.cc @@ -41,8 +41,8 @@ void gluten::initVeloxJniUDF(JNIEnv* env) { udfResolverClass = createGlobalClassReferenceOrError(env, kUdfResolverClassPath.c_str()); // methods - registerUDFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDF", "(Ljava/lang/String;[B[BZ)V"); - registerUDAFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDAF", "(Ljava/lang/String;[B[B[BZ)V"); + registerUDFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDF", "(Ljava/lang/String;[B[BZZ)V"); + registerUDAFMethod = getMethodIdOrError(env, udfResolverClass, "registerUDAF", "(Ljava/lang/String;[B[B[BZZ)V"); } void gluten::finalizeVeloxJniUDF(JNIEnv* env) { @@ -71,9 +71,23 @@ void gluten::jniGetFunctionSignatures(JNIEnv* env) { signature->intermediateType.length(), reinterpret_cast(signature->intermediateType.c_str())); env->CallVoidMethod( - instance, registerUDAFMethod, name, returnType, argTypes, intermediateType, signature->variableArity); + instance, + registerUDAFMethod, + name, + returnType, + argTypes, + intermediateType, + signature->variableArity, + signature->allowTypeConversion); } else { - env->CallVoidMethod(instance, registerUDFMethod, name, returnType, argTypes, signature->variableArity); + env->CallVoidMethod( + instance, + registerUDFMethod, + name, + returnType, + argTypes, + signature->variableArity, + signature->allowTypeConversion); } checkException(env); } diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index e165d4a91da8f..5c6c4470b2b56 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1404,7 +1404,7 @@ arrow::Result VeloxHashShuffleWriter::partitionBufferSizeAfterShrink(u arrow::Status VeloxHashShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) { for (auto& pid : partitionUsed_) { auto newSize = std::max(preAllocBufferSize, 
partition2RowCount_[pid]); - DLOG_IF(INFO, partitionBufferSize_[pid] != newSize) + LOG_IF(WARNING, partitionBufferSize_[pid] != newSize) << "Actual partition buffer size - current: " << partitionBufferSize_[pid] << ", newSize: " << newSize << std::endl; // Make sure the size to be allocated is larger than the size to be filled. diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index ab93d9a33d04a..4d002499c9afe 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -312,8 +312,7 @@ std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { if (hasComplexType_) { uint32_t numRows; GLUTEN_ASSIGN_OR_THROW( - auto arrowBuffers, - BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_)); + auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_)); if (numRows == 0) { // Reach EOS. return nullptr; @@ -332,7 +331,7 @@ std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { uint32_t numRows = 0; while (!merged_ || merged_->numRows() < batchSize_) { GLUTEN_ASSIGN_OR_THROW( - arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_)); + arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_)); if (numRows == 0) { reachEos_ = true; break; @@ -401,7 +400,7 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::next() { while (cachedRows_ < batchSize_) { uint32_t numRows; GLUTEN_ASSIGN_OR_THROW( - auto arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, arrowPool_, numRows, decompressTime_)); + auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_)); if (numRows == 0) { reachEos_ = true; diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index f7bc1cb13ee79..b8ea12e944aae 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -39,9 +39,13 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc) add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc) # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc) add_velox_test( - velox_operators_test SOURCES VeloxColumnarToRowTest.cc - VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc - VeloxColumnarBatchTest.cc) + velox_operators_test + SOURCES + VeloxColumnarToRowTest.cc + VeloxRowToColumnarTest.cc + VeloxColumnarBatchSerializerTest.cc + VeloxColumnarBatchTest.cc + VeloxBatchResizerTest.cc) add_velox_test( velox_plan_conversion_test SOURCES diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc new file mode 100644 index 0000000000000..aecd52f927cc8 --- /dev/null +++ b/cpp/velox/tests/VeloxBatchResizerTest.cc @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "utils/VeloxBatchResizer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; + +namespace gluten { +class ColumnarBatchArray : public ColumnarBatchIterator { + public: + explicit ColumnarBatchArray(const std::vector> batches) + : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if (cursor_ >= batches_.size()) { + return nullptr; + } + return batches_[cursor_++]; + } + + private: + const std::vector> batches_; + int32_t cursor_ = 0; +}; + +class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + RowVectorPtr newVector(size_t numRows) { + auto constant = makeConstant(1, numRows); + auto out = + std::make_shared(pool(), ROW({INTEGER()}), nullptr, numRows, std::vector{constant}); + return out; + } + + void checkResize(int32_t min, int32_t max, std::vector inSizes, std::vector outSizes) { + auto inBatches = std::vector>(); + for (const auto& size : inSizes) { + inBatches.push_back(std::make_shared(newVector(size))); + } + VeloxBatchResizer resizer(pool(), min, max, std::make_unique(std::move(inBatches))); + auto actualOutSizes = std::vector(); + while (true) { + auto next = resizer.next(); + if (next == nullptr) { + break; + } + actualOutSizes.push_back(next->numRows()); + } + ASSERT_EQ(actualOutSizes, outSizes); + } +}; + +TEST_F(VeloxBatchResizerTest, sanity) { + checkResize(100, std::numeric_limits::max(), {30, 50, 30, 40, 30}, {110, 70}); + checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30}); + checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 30}); + checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30}); + checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30}); + checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50}); + checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110}); + checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 200, 100}); + ASSERT_ANY_THROW(checkResize(0, 0, {}, {})); +} +} // namespace gluten diff --git a/cpp/velox/udf/Udaf.h b/cpp/velox/udf/Udaf.h index 2f292fbc6cb34..4555bdfdf8a34 100644 --- a/cpp/velox/udf/Udaf.h +++ b/cpp/velox/udf/Udaf.h @@ -28,6 +28,7 @@ struct UdafEntry { const char* intermediateType{nullptr}; bool variableArity{false}; + bool allowTypeConversion{false}; }; #define GLUTEN_GET_NUM_UDAF getNumUdaf diff --git a/cpp/velox/udf/Udf.h b/cpp/velox/udf/Udf.h index a32bdaefe9ec4..e0b3a70004e8c 100644 --- a/cpp/velox/udf/Udf.h +++ b/cpp/velox/udf/Udf.h @@ -27,6 +27,7 @@ struct UdfEntry { const char** argTypes; bool variableArity{false}; + bool allowTypeConversion{false}; }; #define GLUTEN_GET_NUM_UDF getNumUdf diff --git a/cpp/velox/udf/UdfLoader.cc b/cpp/velox/udf/UdfLoader.cc index 02aa410a95e10..8a99181662188 100644 --- a/cpp/velox/udf/UdfLoader.cc +++ b/cpp/velox/udf/UdfLoader.cc @@ -86,7 +86,8 @@ std::unordered_set> UdfLoader::getRegis const auto& entry = udfEntries[i]; auto 
dataType = toSubstraitTypeStr(entry.dataType); auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes); - signatures_.insert(std::make_shared(entry.name, dataType, argTypes, entry.variableArity)); + signatures_.insert(std::make_shared( + entry.name, dataType, argTypes, entry.variableArity, entry.allowTypeConversion)); } free(udfEntries); } else { @@ -110,8 +111,8 @@ std::unordered_set> UdfLoader::getRegis auto dataType = toSubstraitTypeStr(entry.dataType); auto argTypes = toSubstraitTypeStr(entry.numArgs, entry.argTypes); auto intermediateType = toSubstraitTypeStr(entry.intermediateType); - signatures_.insert( - std::make_shared(entry.name, dataType, argTypes, intermediateType, entry.variableArity)); + signatures_.insert(std::make_shared( + entry.name, dataType, argTypes, intermediateType, entry.variableArity, entry.allowTypeConversion)); } free(udafEntries); } else { diff --git a/cpp/velox/udf/UdfLoader.h b/cpp/velox/udf/UdfLoader.h index 2783beb855119..51264e67cc4d7 100644 --- a/cpp/velox/udf/UdfLoader.h +++ b/cpp/velox/udf/UdfLoader.h @@ -37,21 +37,33 @@ class UdfLoader { std::string intermediateType{}; bool variableArity; + bool allowTypeConversion; - UdfSignature(std::string name, std::string returnType, std::string argTypes, bool variableArity) - : name(name), returnType(returnType), argTypes(argTypes), variableArity(variableArity) {} + UdfSignature( + std::string name, + std::string returnType, + std::string argTypes, + bool variableArity, + bool allowTypeConversion) + : name(name), + returnType(returnType), + argTypes(argTypes), + variableArity(variableArity), + allowTypeConversion(allowTypeConversion) {} UdfSignature( std::string name, std::string returnType, std::string argTypes, std::string intermediateType, - bool variableArity) + bool variableArity, + bool allowTypeConversion) : name(name), returnType(returnType), argTypes(argTypes), intermediateType(intermediateType), - variableArity(variableArity) {} + variableArity(variableArity), + allowTypeConversion(allowTypeConversion) {} ~UdfSignature() = default; }; diff --git a/cpp/velox/udf/examples/MyUDF.cc b/cpp/velox/udf/examples/MyUDF.cc index ee20ca39d0264..db1c5d7709f01 100644 --- a/cpp/velox/udf/examples/MyUDF.cc +++ b/cpp/velox/udf/examples/MyUDF.cc @@ -222,6 +222,30 @@ class MyDateRegisterer final : public gluten::UdfRegisterer { const std::string name_ = "mydate"; const char* myDateArg_[2] = {kDate, kInteger}; }; + +// name: mydate +// signatures: +// date, integer -> bigint +// type: SimpleFunction +// enable type conversion +class MyDate2Registerer final : public gluten::UdfRegisterer { + public: + int getNumUdf() override { + return 1; + } + + void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { + udfEntries[index++] = {name_.c_str(), kDate, 2, myDateArg_, false, true}; + } + + void registerSignatures() override { + facebook::velox::registerFunction({name_}); + } + + private: + const std::string name_ = "mydate2"; + const char* myDateArg_[2] = {kDate, kInteger}; +}; } // namespace mydate std::vector>& globalRegisters() { @@ -239,6 +263,7 @@ void setupRegisterers() { registerers.push_back(std::make_shared()); registerers.push_back(std::make_shared()); registerers.push_back(std::make_shared()); + registerers.push_back(std::make_shared()); inited = true; } } // namespace diff --git a/cpp/velox/utils/VeloxBatchResizer.cc b/cpp/velox/utils/VeloxBatchResizer.cc index 7b51463068c94..56429299464ac 100644 --- a/cpp/velox/utils/VeloxBatchResizer.cc +++ b/cpp/velox/utils/VeloxBatchResizer.cc @@ 
-23,9 +23,7 @@ namespace { class SliceRowVector : public ColumnarBatchIterator { public: SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in) - : maxOutputBatchSize_(maxOutputBatchSize), in_(in) { - GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state"); - } + : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {} std::shared_ptr next() override { int32_t remainingLength = in_->size() - cursor_; @@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer( : pool_(pool), minOutputBatchSize_(minOutputBatchSize), maxOutputBatchSize_(maxOutputBatchSize), - in_(std::move(in)) {} + in_(std::move(in)) { + GLUTEN_CHECK( + minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0, + "Either minOutputBatchSize or maxOutputBatchSize should be larger than 0"); +} std::shared_ptr VeloxBatchResizer::next() { if (next_) { @@ -82,6 +84,11 @@ std::shared_ptr VeloxBatchResizer::next() { for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) { auto nextVb = VeloxColumnarBatch::from(pool_, nextCb); auto nextRv = nextVb->getRowVector(); + if (buffer->size() + nextRv->size() > maxOutputBatchSize_) { + GLUTEN_CHECK(next_ == nullptr, "Invalid state"); + next_ = std::make_unique(maxOutputBatchSize_, nextRv); + return std::make_shared(buffer); + } buffer->append(nextRv.get()); if (buffer->size() >= minOutputBatchSize_) { // Buffer is full. diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index 21f222b42690d..bd469f34c81cc 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -320,23 +320,44 @@ cd /path/to/gluten/cpp/build/velox/benchmarks --threads 1 ``` -### Run shuffle write task only +### Run shuffle write/read task only Developers can only run shuffle write task via specifying `--run-shuffle` and `--data` options. The parquet format input will be read from arrow-parquet reader and sent to shuffle writer. -This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files. +The `--run-shuffle` option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files. The round-robin partitioner is used by default. Besides, random partitioning can be used for testing purpose. By specifying option `--partitioning random`, the partitioner will generate a random partition id for each row. +To evaluate the shuffle reader performance, developers can set `--run-shuffle-read` option to add read process after the write task finishes. + +The below command will run shuffle write/read in single thread, using sort shuffle writer with 40000 partitions and random partition id. ```shell cd /path/to/gluten/cpp/build/velox/benchmarks ./generic_benchmark \ --run-shuffle \ +--run-shuffle-read \ --data /path/to/input_for_shuffle_write.parquet --shuffle-writer sort \ +--partitioning random \ +--shuffle-partitions 40000 \ --threads 1 ``` +The output should be like: + +``` +------------------------------------------------------------------------------------------------------------------------- +Benchmark Time CPU Iterations UserCounters... 
+------------------------------------------------------------------------------------------------------------------------- +ShuffleWriteRead/iterations:1/process_time/real_time/threads:1 121637629714 ns 121309450910 ns 1 elapsed_time=121.638G read_input_time=25.2637G shuffle_compress_time=10.8311G shuffle_decompress_time=4.04055G shuffle_deserialize_time=7.24289G shuffle_spill_time=0 shuffle_split_time=69.9098G shuffle_write_time=2.03274G +``` + +## Enable debug mode + +`spark.gluten.sql.debug`(debug mode) is set to false by default thereby the google glog levels are limited to only print `WARNING` or higher severity logs. +Unless `spark.gluten.sql.debug` is set in the INI file via `--conf`, the logging behavior is same as debug mode off. +Developers can use `--debug-mode` command line flag to turn on debug mode when needed, and set verbosity/severity level via command line flags `--v` and `--minloglevel`. Note that constructing and deconstructing log strings can be very time-consuming, which may cause benchmark times to be inaccurate. + ## Simulate write tasks The last operator for a write task is a file write operator, and the output from Velox pipeline only diff --git a/docs/developers/VeloxUDF.md b/docs/developers/VeloxUDF.md index c896fd6726573..25b896929a43e 100644 --- a/docs/developers/VeloxUDF.md +++ b/docs/developers/VeloxUDF.md @@ -21,18 +21,18 @@ The following steps demonstrate how to set up a UDF library project: - **Implement the Interface Functions:** Implement the following interface functions that integrate UDF into Project Gluten: - - `getNumUdf()`: - This function should return the number of UDF in the library. - This is used to allocating udfEntries array as the argument for the next function `getUdfEntries`. + - `getNumUdf()`: + This function should return the number of UDF in the library. + This is used to allocating udfEntries array as the argument for the next function `getUdfEntries`. - - `getUdfEntries(gluten::UdfEntry* udfEntries)`: - This function should populate the provided udfEntries array with the details of the UDF, including function names and signatures. + - `getUdfEntries(gluten::UdfEntry* udfEntries)`: + This function should populate the provided udfEntries array with the details of the UDF, including function names and signatures. - - `registerUdf()`: - This function is called to register the UDF to Velox function registry. - This is where users should register functions by calling `facebook::velox::exec::registerVecotorFunction` or other Velox APIs. + - `registerUdf()`: + This function is called to register the UDF to Velox function registry. + This is where users should register functions by calling `facebook::velox::exec::registerVecotorFunction` or other Velox APIs. - - The interface functions are mapped to marcos in [Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these functions: + - The interface functions are mapped to marcos in [Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these functions: ``` // Filename MyUDF.cc @@ -176,6 +176,14 @@ The output from spark-shell will be like +------------------+----------------+ ``` +## Configurations + +| Parameters | Description | +|----------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| +| spark.gluten.sql.columnar.backend.velox.udfLibraryPaths | Path to the udf/udaf libraries. 
| +| spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths | Path to the udf/udaf libraries on driver node. Only applicable on yarn-client mode. | +| spark.gluten.sql.columnar.backend.velox.udfAllowTypeConversion | Whether to inject possible `cast` to convert mismatched data types from input to one registered signatures. | + # Pandas UDFs (a.k.a. Vectorized UDFs) ## Introduction diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 26527e1c81c90..b3b7603acf530 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -212,9 +212,13 @@ object ColumnarShuffleExchangeExec extends Logging { } def useSortBasedShuffle(partitioning: Partitioning, output: Seq[Attribute]): Boolean = { + val conf = GlutenConfig.getConf + lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager && + conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER partitioning != SinglePartition && (partitioning.numPartitions >= GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold || - output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) + output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) || + isCelebornSortBasedShuffle } class DummyPairRDDWithPartitions(@transient private val sc: SparkContext, numPartitions: Int) diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index ed7a811929940..1310b75a1f857 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { def columnarShuffleCompressionThreshold: Int = conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD) - // FIXME: Not clear: MIN or MAX ? def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) def columnarToRowMemThreshold: Long = @@ -329,12 +328,11 @@ class GlutenConfig(conf: SQLConf) extends Logging { def veloxResizeBatchesShuffleInputRange: ResizeRange = { val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) - val defaultRange: ResizeRange = - ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize) - conf - .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE) - .map(ResizeRange.parse) - .getOrElse(defaultRange) + val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1) + val minSize = conf + .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE) + .getOrElse(defaultMinSize) + ResizeRange(minSize, Int.MaxValue) } def chColumnarShuffleSpillThreshold: Long = { @@ -1492,17 +1490,16 @@ object GlutenConfig { .booleanConf .createWithDefault(true) - val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE = - buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range") + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = + buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .internal() .doc( - s"The minimum and maximum batch sizes for shuffle. If the batch size is " + - s"smaller / bigger than minimum / maximum value, it will be combined with other " + - s"batches / split before sending to shuffle. 
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ed7a811929940..1310b75a1f857 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def columnarShuffleCompressionThreshold: Int =
     conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
 
-  // FIXME: Not clear: MIN or MAX ?
   def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
 
   def columnarToRowMemThreshold: Long =
@@ -329,12 +328,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def veloxResizeBatchesShuffleInputRange: ResizeRange = {
     val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
-    val defaultRange: ResizeRange =
-      ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize)
-    conf
-      .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE)
-      .map(ResizeRange.parse)
-      .getOrElse(defaultRange)
+    val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
+    val minSize = conf
+      .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+      .getOrElse(defaultMinSize)
+    ResizeRange(minSize, Int.MaxValue)
   }
 
   def chColumnarShuffleSpillThreshold: Long = {
@@ -1492,17 +1490,16 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE =
-    buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range")
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
       .internal()
       .doc(
-        s"The minimum and maximum batch sizes for shuffle. If the batch size is " +
-          s"smaller / bigger than minimum / maximum value, it will be combined with other " +
-          s"batches / split before sending to shuffle. Only functions when " +
+        s"The minimum batch size for shuffle. If the size of an input batch is " +
+          s"smaller than this value, it will be combined with other " +
+          s"batches before being sent to shuffle. Only takes effect when " +
           s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true. " +
-          s"A valid value for the option is min~max. " +
-          s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000")
-      .stringConf
+          s"Default value: 0.25 * <max batch size>.")
+      .intConf
       .createOptional
 
   val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD =
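As the method above shows, the new option only overrides the lower bound of the resize range; when unset, the minimum defaults to a quarter of the configured max batch size. A hypothetical session-level override (only the property key comes from the definition above; the value is arbitrary):

```scala
// Combine shuffle-input batches smaller than 1024 rows before they are shuffled.
// Only takes effect when resizeBatches.shuffleInput itself is enabled.
spark.conf.set(
  "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize",
  "1024")
```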
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
new file mode 100644
index 0000000000000..85c0912fd7c7a
--- /dev/null
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.integration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public class SparkJvmOptions {
+  private static final String MODULE_OPTIONS_CLASS_NAME = "org.apache.spark.launcher.JavaModuleOptions";
+
+  public static String read() {
+    try {
+      final Class<?> clazz = Class.forName(MODULE_OPTIONS_CLASS_NAME);
+      final Method method = clazz.getMethod("defaultModuleOptions");
+      return (String) method.invoke(null);
+    } catch (ClassNotFoundException e) {
+      // Could happen in Spark 3.2, which doesn't have this class yet.
+      return "";
+    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) {
+    System.out.println(read());
+  }
+}
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
index 6750b90e9e495..d186b5d0b1d63 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
@@ -332,6 +332,7 @@ public String getSparkMasterUrl() {
     @Override
     public Map<String, String> extraSparkConf() {
       final Map<String, String> extras = new HashMap<>();
+      extras.put(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS, "-Dio.netty.tryReflectionSetAccessible=true");
       extras.put(SparkLauncher.EXECUTOR_CORES, String.valueOf(resourceEnumeration.lcExecutorCores()));
       extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm", resourceEnumeration.lcExecutorHeapMem()));
       extras.put("spark.memory.offHeap.enabled", "true");
diff --git a/tools/gluten-it/sbin/gluten-it.sh b/tools/gluten-it/sbin/gluten-it.sh
index 00ff78e349977..8c1a6413b5ec2 100755
--- a/tools/gluten-it/sbin/gluten-it.sh
+++ b/tools/gluten-it/sbin/gluten-it.sh
@@ -16,8 +16,6 @@
 
 set -euf
 
-GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G -XX:ErrorFile=/var/log/java/hs_err_pid%p.log"}
-
 BASEDIR=$(dirname $0)
 
 LIB_DIR=$BASEDIR/../package/target/lib
@@ -28,32 +26,25 @@ fi
 
 JAR_PATH=$LIB_DIR/*
 
+SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH org.apache.gluten.integration.SparkJvmOptions)
+
 EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home
 
+# We temporarily disallow setting these two variables by the caller.
+SPARK_HOME=""
+SPARK_SCALA_VERSION=""
 export SPARK_HOME=${SPARK_HOME:-$EMBEDDED_SPARK_HOME}
 export SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-'2.12'}
 
 echo "SPARK_HOME set at [$SPARK_HOME]."
 echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]."
 
-$JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
-  -XX:+IgnoreUnrecognizedVMOptions \
-  --add-opens=java.base/java.lang=ALL-UNNAMED \
-  --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
-  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
-  --add-opens=java.base/java.io=ALL-UNNAMED \
-  --add-opens=java.base/java.net=ALL-UNNAMED \
-  --add-opens=java.base/java.nio=ALL-UNNAMED \
-  --add-opens=java.base/java.util=ALL-UNNAMED \
-  --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
-  --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
-  --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
-  --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
-  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-  --add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
-  --add-opens=java.base/sun.security.action=ALL-UNNAMED \
-  --add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
-  -Djdk.reflect.useDirectMethodHandle=false \
+GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"}
+
+$JAVA_HOME/bin/java \
+  $SPARK_JVM_OPTIONS \
+  $GLUTEN_IT_JVM_ARGS \
+  -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \
   -Dio.netty.tryReflectionSetAccessible=true \
   -cp $JAR_PATH \
   org.apache.gluten.integration.Cli $@