diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index bdbdfed0d0d0..a8a05c40f1cd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -271,13 +271,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { } } - override def genColumnarShuffleExchange( - shuffle: ShuffleExchangeExec, - child: SparkPlan): SparkPlan = { + override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = { + val child = shuffle.child if ( - BackendsApiManager.getSettings.supportShuffleWithProject( - shuffle.outputPartitioning, - shuffle.child) + BackendsApiManager.getSettings.supportShuffleWithProject(shuffle.outputPartitioning, child) ) { val (projectColumnNumber, newPartitioning, newChild) = addProjectionForShuffleExchange(shuffle) diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java new file mode 100644 index 000000000000..1bf34b5ce3b6 --- /dev/null +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils; + +import org.apache.gluten.exec.Runtime; +import org.apache.gluten.exec.Runtimes; +import org.apache.gluten.memory.nmm.NativeMemoryManager; +import org.apache.gluten.memory.nmm.NativeMemoryManagers; +import org.apache.gluten.vectorized.ColumnarBatchInIterator; +import org.apache.gluten.vectorized.ColumnarBatchOutIterator; + +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import java.util.Iterator; + +public final class VeloxBatchAppender { + public static ColumnarBatchOutIterator create( + int minOutputBatchSize, Iterator in) { + final Runtime runtime = Runtimes.contextInstance(); + final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender"); + long outHandle = + VeloxBatchAppenderJniWrapper.forRuntime(runtime) + .create( + nmm.getNativeInstanceHandle(), minOutputBatchSize, new ColumnarBatchInIterator(in)); + return new ColumnarBatchOutIterator(runtime, outHandle, nmm); + } +} diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java new file mode 100644 index 000000000000..9e2531951ccc --- /dev/null +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils; + +import org.apache.gluten.exec.Runtime; +import org.apache.gluten.exec.RuntimeAware; +import org.apache.gluten.vectorized.ColumnarBatchInIterator; + +public class VeloxBatchAppenderJniWrapper implements RuntimeAware { + private final Runtime runtime; + + private VeloxBatchAppenderJniWrapper(Runtime runtime) { + this.runtime = runtime; + } + + public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) { + return new VeloxBatchAppenderJniWrapper(runtime); + } + + @Override + public long handle() { + return runtime.getHandle(); + } + + public native long create( + long memoryManagerHandle, int minOutputBatchSize, ColumnarBatchInIterator itr); +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index b20eccafb625..459a7886ea23 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -203,7 +203,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { resIter.close() } .recyclePayload(batch => batch.close()) - .addToPipelineTime(pipelineTime) + .collectLifeMillis(millis => pipelineTime += millis) .asInterruptible(context) .create() } @@ -246,7 +246,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { nativeResultIterator.close() } .recyclePayload(batch => batch.close()) - .addToPipelineTime(pipelineTime) + .collectLifeMillis(millis => pipelineTime += millis) .create() } // scalastyle:on argcount diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index f8af80a9b44d..66ca8660a50c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -320,9 +320,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper = HashAggregateExecPullOutHelper(aggregateExpressions, aggregateAttributes) - override def genColumnarShuffleExchange( - shuffle: ShuffleExchangeExec, - newChild: SparkPlan): SparkPlan = { + override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = { def allowHashOnMap[T](f: => T): T = { val originalAllowHash = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE) try { @@ -333,20 +331,28 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { } } + def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = { + if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) { + VeloxAppendBatchesExec(plan, GlutenConfig.getConf.veloxMinBatchSizeForShuffle) + } else { + plan + } + } + + val child = shuffle.child + shuffle.outputPartitioning match { case HashPartitioning(exprs, _) => val hashExpr = new Murmur3Hash(exprs) - val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output - val projectTransformer = ProjectExecTransformer(projectList, newChild) + val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output + val projectTransformer = ProjectExecTransformer(projectList, child) val validationResult = projectTransformer.doValidate() if (validationResult.isValid) { - ColumnarShuffleExchangeExec( - shuffle, - projectTransformer, - projectTransformer.output.drop(1)) + val newChild = maybeAddAppendBatchesExec(projectTransformer) + ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1)) } else { TransformHints.tagNotTransformable(shuffle, validationResult) - shuffle.withNewChildren(newChild :: Nil) + shuffle.withNewChildren(child :: Nil) } case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 => // scalastyle:off line.size.limit @@ -357,19 +363,20 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { allowHashOnMap { // Velox hash expression does not support null type and we also do not need to sort // null type since the value always be null. - val columnsForHash = newChild.output.filterNot(_.dataType == NullType) + val columnsForHash = child.output.filterNot(_.dataType == NullType) if (columnsForHash.isEmpty) { + val newChild = maybeAddAppendBatchesExec(child) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output) } else { val hashExpr = new Murmur3Hash(columnsForHash) - val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output - val projectTransformer = ProjectExecTransformer(projectList, newChild) + val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output + val projectTransformer = ProjectExecTransformer(projectList, child) val projectBeforeSortValidationResult = projectTransformer.doValidate() // Make sure we support offload hash expression val projectBeforeSort = if (projectBeforeSortValidationResult.isValid) { projectTransformer } else { - val project = ProjectExec(projectList, newChild) + val project = ProjectExec(projectList, child) TransformHints.tagNotTransformable(project, projectBeforeSortValidationResult) project } @@ -380,17 +387,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { ProjectExecTransformer(projectList.drop(1), sortByHashCode) val validationResult = dropSortColumnTransformer.doValidate() if (validationResult.isValid) { - ColumnarShuffleExchangeExec( - shuffle, - dropSortColumnTransformer, - dropSortColumnTransformer.output) + val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer) + ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output) } else { TransformHints.tagNotTransformable(shuffle, validationResult) - shuffle.withNewChildren(newChild :: Nil) + shuffle.withNewChildren(child :: Nil) } } } case _ => + val newChild = maybeAddAppendBatchesExec(child) ColumnarShuffleExchangeExec(shuffle, newChild, null) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala new file mode 100644 index 000000000000..8c2834574204 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.extension.GlutenPlan +import org.apache.gluten.utils.{Iterators, VeloxBatchAppender} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ + +/** + * An operator to coalesce input batches by appending the later batches to the one that comes + * earlier. + */ +case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int) + extends GlutenPlan + with UnaryExecNode { + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), + "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append batches") + ) + + override def supportsColumnar: Boolean = true + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numInputRows = longMetric("numInputRows") + val numInputBatches = longMetric("numInputBatches") + val numOutputRows = longMetric("numOutputRows") + val numOutputBatches = longMetric("numOutputBatches") + val appendTime = longMetric("appendTime") + + child.executeColumnar().mapPartitions { + in => + // Append millis = Out millis - In millis. + val appendMillis = new AtomicLong(0L) + + val appender = VeloxBatchAppender.create( + minOutputBatchSize, + Iterators + .wrap(in) + .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis)) + .create() + .map { + inBatch => + numInputRows += inBatch.numRows() + numInputBatches += 1 + inBatch + } + .asJava + ) + + val out = Iterators + .wrap(appender.asScala) + .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis)) + .recyclePayload(_.close()) + .recycleIterator { + appender.close() + appendTime += appendMillis.get() + } + .create() + .map { + outBatch => + numOutputRows += outBatch.numRows() + numOutputBatches += 1 + outBatch + } + + out + } + } + + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index cd1f21a0a31c..ae8d64a09937 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -24,6 +24,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters -class TestOperator extends VeloxWholeStageTransformerSuite { +class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper { protected val rootPath: String = getClass.getResource("/").getPath override protected val resourcePath: String = "/tpch-data-parquet-velox" @@ -703,6 +704,29 @@ class TestOperator extends VeloxWholeStageTransformerSuite { } } + test("combine small batches before shuffle") { + val minBatchSize = 15 + withSQLConf( + "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle" -> "true", + "spark.gluten.sql.columnar.maxBatchSize" -> "2", + "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" -> s"$minBatchSize" + ) { + val df = runQueryAndCompare( + "select l_orderkey, sum(l_partkey) as sum from lineitem " + + "where l_orderkey < 100 group by l_orderkey") { _ => } + checkLengthAndPlan(df, 27) + val ops = collect(df.queryExecution.executedPlan) { case p: VeloxAppendBatchesExec => p } + assert(ops.size == 1) + val op = ops.head + assert(op.minOutputBatchSize == minBatchSize) + val metrics = op.metrics + assert(metrics("numInputRows").value == 27) + assert(metrics("numInputBatches").value == 14) + assert(metrics("numOutputRows").value == 27) + assert(metrics("numOutputBatches").value == 2) + } + } + test("test OneRowRelation") { val df = sql("SELECT 1") checkAnswer(df, Row(1)) diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc index 328a7b7722f9..08c5cb1d40cb 100644 --- a/cpp/core/jni/JniCommon.cc +++ b/cpp/core/jni/JniCommon.cc @@ -65,3 +65,61 @@ gluten::Runtime* gluten::getRuntime(JNIEnv* env, jobject runtimeAware) { GLUTEN_CHECK(ctx != nullptr, "FATAL: resource instance should not be null."); return ctx; } + +std::unique_ptr gluten::makeJniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + gluten::Runtime* runtime, + std::shared_ptr writer) { + return std::make_unique(env, jColumnarBatchItr, runtime, writer); +} + +gluten::JniColumnarBatchIterator::JniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + gluten::Runtime* runtime, + std::shared_ptr writer) + : runtime_(runtime), writer_(writer) { + // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD + if (env->GetJavaVM(&vm_) != JNI_OK) { + std::string errorMessage = "Unable to get JavaVM instance"; + throw gluten::GlutenException(errorMessage); + } + serializedColumnarBatchIteratorClass_ = + createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;"); + serializedColumnarBatchIteratorHasNext_ = + getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "hasNext", "()Z"); + serializedColumnarBatchIteratorNext_ = getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "next", "()J"); + jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr); +} + +gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + env->DeleteGlobalRef(jColumnarBatchItr_); + env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_); + vm_->DetachCurrentThread(); +} + +std::shared_ptr gluten::JniColumnarBatchIterator::next() { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { + checkException(env); + return nullptr; // stream ended + } + + checkException(env); + jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_); + checkException(env); + auto batch = runtime_->objectStore()->retrieve(handle); + if (writer_ != nullptr) { + // save snapshot of the batch to file + std::shared_ptr schema = batch->exportArrowSchema(); + std::shared_ptr array = batch->exportArrowArray(); + auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get())); + GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get()))); + GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb)); + } + return batch; +} diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 5858a70e9a77..bc5cf84f6ff4 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -28,6 +28,7 @@ #include "memory/AllocationListener.h" #include "shuffle/rss/RssClient.h" #include "utils/Compression.h" +#include "utils/ResourceMap.h" #include "utils/exception.h" static jint jniVersion = JNI_VERSION_1_8; @@ -119,6 +120,12 @@ static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) } } +template +static T* jniCastOrThrow(gluten::ResourceHandle handle) { + auto instance = reinterpret_cast(handle); + GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be null."); + return instance; +} namespace gluten { class JniCommonState { @@ -251,6 +258,40 @@ DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong, jlongArray, Long) DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float) DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double) +class JniColumnarBatchIterator : public ColumnarBatchIterator { + public: + explicit JniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + Runtime* runtime, + std::shared_ptr writer); + + // singleton + JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete; + JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete; + JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete; + JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete; + + virtual ~JniColumnarBatchIterator(); + + std::shared_ptr next() override; + + private: + JavaVM* vm_; + jobject jColumnarBatchItr_; + Runtime* runtime_; + std::shared_ptr writer_; + + jclass serializedColumnarBatchIteratorClass_; + jmethodID serializedColumnarBatchIteratorHasNext_; + jmethodID serializedColumnarBatchIteratorNext_; +}; + +std::unique_ptr makeJniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + Runtime* runtime, + std::shared_ptr writer); } // namespace gluten // TODO: Move the static functions to namespace gluten diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index db498f43adbf..4e069ec7a6d6 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -58,13 +58,8 @@ static jmethodID splitResultConstructor; static jclass columnarBatchSerializeResultClass; static jmethodID columnarBatchSerializeResultConstructor; -static jclass serializedColumnarBatchIteratorClass; static jclass metricsBuilderClass; static jmethodID metricsBuilderConstructor; - -static jmethodID serializedColumnarBatchIteratorHasNext; -static jmethodID serializedColumnarBatchIteratorNext; - static jclass nativeColumnarToRowInfoClass; static jmethodID nativeColumnarToRowInfoConstructor; @@ -147,80 +142,6 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream { bool closed_ = false; }; -class JniColumnarBatchIterator : public ColumnarBatchIterator { - public: - explicit JniColumnarBatchIterator( - JNIEnv* env, - jobject jColumnarBatchItr, - Runtime* runtime, - std::shared_ptr writer) - : runtime_(runtime), writer_(writer) { - // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD - if (env->GetJavaVM(&vm_) != JNI_OK) { - std::string errorMessage = "Unable to get JavaVM instance"; - throw gluten::GlutenException(errorMessage); - } - jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr); - } - - // singleton - JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete; - JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete; - JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete; - JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete; - - virtual ~JniColumnarBatchIterator() { - JNIEnv* env; - attachCurrentThreadAsDaemonOrThrow(vm_, &env); - env->DeleteGlobalRef(jColumnarBatchItr_); - vm_->DetachCurrentThread(); - } - - std::shared_ptr next() override { - JNIEnv* env; - attachCurrentThreadAsDaemonOrThrow(vm_, &env); - if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext)) { - checkException(env); - return nullptr; // stream ended - } - - checkException(env); - jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext); - checkException(env); - auto batch = runtime_->objectStore()->retrieve(handle); - if (writer_ != nullptr) { - // save snapshot of the batch to file - std::shared_ptr schema = batch->exportArrowSchema(); - std::shared_ptr array = batch->exportArrowArray(); - auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get())); - GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get()))); - GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb)); - } - return batch; - } - - private: - JavaVM* vm_; - jobject jColumnarBatchItr_; - Runtime* runtime_; - std::shared_ptr writer_; -}; - -std::unique_ptr makeJniColumnarBatchIterator( - JNIEnv* env, - jobject jColumnarBatchItr, - Runtime* runtime, - std::shared_ptr writer) { - return std::make_unique(env, jColumnarBatchItr, runtime, writer); -} - -template -T* jniCastOrThrow(ResourceHandle handle) { - auto instance = reinterpret_cast(handle); - GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be null."); - return instance; -} - #ifdef __cplusplus extern "C" { #endif @@ -253,14 +174,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { metricsBuilderConstructor = getMethodIdOrError( env, metricsBuilderClass, "", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); - serializedColumnarBatchIteratorClass = - createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;"); - - serializedColumnarBatchIteratorHasNext = - getMethodIdOrError(env, serializedColumnarBatchIteratorClass, "hasNext", "()Z"); - - serializedColumnarBatchIteratorNext = getMethodIdOrError(env, serializedColumnarBatchIteratorClass, "next", "()J"); - nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); nativeColumnarToRowInfoConstructor = getMethodIdOrError(env, nativeColumnarToRowInfoClass, "", "([I[IJ)V"); @@ -293,7 +206,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(jniByteInputStreamClass); env->DeleteGlobalRef(splitResultClass); env->DeleteGlobalRef(columnarBatchSerializeResultClass); - env->DeleteGlobalRef(serializedColumnarBatchIteratorClass); env->DeleteGlobalRef(nativeColumnarToRowInfoClass); env->DeleteGlobalRef(byteArrayClass); env->DeleteGlobalRef(shuffleReaderMetricsClass); diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 05ecf9635eb0..34cc9001cf38 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -327,6 +327,7 @@ set(VELOX_SRCS utils/VeloxArrowUtils.cc utils/ConfigExtractor.cc utils/Common.cc + utils/VeloxBatchAppender.cc ) if (ENABLE_HDFS) diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 9da7355d1b3a..3b52eaa86b2f 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -30,6 +30,7 @@ #include "jni/JniFileSystem.h" #include "memory/VeloxMemoryManager.h" #include "substrait/SubstraitToVeloxPlanValidator.h" +#include "utils/VeloxBatchAppender.h" #include "velox/common/base/BloomFilter.h" #include @@ -246,6 +247,23 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWra JNI_METHOD_END(nullptr) } +JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchAppenderJniWrapper_create( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong memoryManagerHandle, + jint minOutputBatchSize, + jobject jIter) { + JNI_METHOD_START + auto ctx = gluten::getRuntime(env, wrapper); + auto memoryManager = jniCastOrThrow(memoryManagerHandle); + auto pool = gluten::VeloxRuntime::getLeafVeloxPool(memoryManager); + auto iter = gluten::makeJniColumnarBatchIterator(env, jIter, ctx, nullptr); + auto appender = std::make_shared( + std::make_unique(pool.get(), minOutputBatchSize, std::move(iter))); + return ctx->objectStore()->save(appender); + JNI_METHOD_END(gluten::kInvalidResourceHandle) +} + #ifdef __cplusplus } #endif diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc index cc648cf7fdd0..741ca8ab9b40 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc @@ -303,17 +303,7 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr numRows -= length; } while (numRows); } else { - if (accumulateRows_ + rv->size() < 8192) { - accumulateRows_ += rv->size(); - initAccumulateDataset(rv); - accumulateDataset_->append(rv.get()); - } else { - initAccumulateDataset(rv); - accumulateDataset_->append(rv.get()); - RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit)); - accumulateDataset_ = nullptr; - accumulateRows_ = 0; - } + RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit)); } } return arrow::Status::OK(); @@ -339,10 +329,6 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo } arrow::Status VeloxHashBasedShuffleWriter::stop() { - if (accumulateDataset_ != nullptr) { - RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit)); - accumulateRows_ = 0; - } if (options_.partitioning != Partitioning::kSingle) { for (auto pid = 0; pid < numPartitions_; ++pid) { RETURN_NOT_OK(evictPartitionBuffers(pid, false)); diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h index 142c7978bdc9..a11f84e952a6 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h @@ -303,15 +303,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); - void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) { - if (accumulateDataset_) { - return; - } - std::vector children(rv->children().size(), nullptr); - accumulateDataset_ = - std::make_shared(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children)); - } - BinaryArrayResizeState binaryArrayResizeState_{}; bool hasComplexType_ = false; diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 2855831c51ae..104b87616291 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -124,10 +124,6 @@ class VeloxShuffleWriter : public ShuffleWriter { int32_t maxBatchSize_{0}; - uint32_t accumulateRows_{0}; - - facebook::velox::RowVectorPtr accumulateDataset_; - enum EvictState { kEvictable, kUnevictable }; // stat diff --git a/cpp/velox/utils/VeloxBatchAppender.cc b/cpp/velox/utils/VeloxBatchAppender.cc new file mode 100644 index 000000000000..8fa1ade217e0 --- /dev/null +++ b/cpp/velox/utils/VeloxBatchAppender.cc @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "VeloxBatchAppender.h" + +namespace gluten { + +gluten::VeloxBatchAppender::VeloxBatchAppender( + facebook::velox::memory::MemoryPool* pool, + int32_t minOutputBatchSize, + std::unique_ptr in) + : pool_(pool), minOutputBatchSize_(minOutputBatchSize), in_(std::move(in)) {} + +std::shared_ptr VeloxBatchAppender::next() { + auto cb = in_->next(); + if (cb == nullptr) { + // Input iterator was drained. + return nullptr; + } + if (cb->numRows() >= minOutputBatchSize_) { + // Fast flush path. + return cb; + } + + auto vb = VeloxColumnarBatch::from(pool_, cb); + auto rv = vb->getRowVector(); + auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_); + buffer->append(rv.get()); + + for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) { + auto nextVb = VeloxColumnarBatch::from(pool_, nextCb); + auto nextRv = nextVb->getRowVector(); + buffer->append(nextRv.get()); + if (buffer->size() >= minOutputBatchSize_) { + // Buffer is full. + break; + } + } + return std::make_shared(buffer); +} + +int64_t VeloxBatchAppender::spillFixedSize(int64_t size) { + return in_->spillFixedSize(size); +} +} // namespace gluten diff --git a/cpp/velox/utils/VeloxBatchAppender.h b/cpp/velox/utils/VeloxBatchAppender.h new file mode 100644 index 000000000000..3698381d0add --- /dev/null +++ b/cpp/velox/utils/VeloxBatchAppender.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "memory/ColumnarBatchIterator.h" +#include "memory/VeloxColumnarBatch.h" +#include "utils/exception.h" +#include "velox/common/memory/MemoryPool.h" +#include "velox/vector/ComplexVector.h" + +namespace gluten { +class VeloxBatchAppender : public ColumnarBatchIterator { + public: + VeloxBatchAppender( + facebook::velox::memory::MemoryPool* pool, + int32_t minOutputBatchSize, + std::unique_ptr in); + + std::shared_ptr next() override; + + int64_t spillFixedSize(int64_t size) override; + + private: + facebook::velox::memory::MemoryPool* pool_; + const int32_t minOutputBatchSize_; + std::unique_ptr in_; +}; +} // namespace gluten diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 8a086f896ba4..8a1baae51092 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -101,7 +101,7 @@ trait SparkPlanExecApi { aggregateExpressions: Seq[AggregateExpression], aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper - def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec, newChild: SparkPlan): SparkPlan + def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan /** Generate ShuffledHashJoinExecTransformer. */ def genShuffledHashJoinExecTransformer( diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 39cc8ad2e2e6..6e4d37f633eb 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -101,23 +101,17 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { // Exchange transformation. case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case plan if TransformHints.isNotTransformable(plan) => - plan - case plan: ShuffleExchangeExec => - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - val child = plan.child - if ( - (child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) && - BackendsApiManager.getSettings.supportColumnarShuffleExec() - ) { - BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child) - } else { - plan.withNewChildren(Seq(child)) - } - case plan: BroadcastExchangeExec => - val child = plan.child - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - ColumnarBroadcastExchangeExec(plan.mode, child) + case p if TransformHints.isNotTransformable(p) => + p + case s: ShuffleExchangeExec + if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) && + BackendsApiManager.getSettings.supportColumnarShuffleExec() => + logDebug(s"Columnar Processing for ${s.getClass} is currently supported.") + BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s) + case b: BroadcastExchangeExec => + val child = b.child + logDebug(s"Columnar Processing for ${b.getClass} is currently supported.") + ColumnarBroadcastExchangeExec(b.mode, child) case other => other } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala index 81ff2dc0b177..1e3681355d6c 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala @@ -17,7 +17,6 @@ package org.apache.gluten.utils import org.apache.spark.{InterruptibleIterator, TaskContext} -import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.util.TaskResources import java.util.concurrent.TimeUnit @@ -85,12 +84,12 @@ private class IteratorCompleter[A](in: Iterator[A])(completionCallback: => Unit) } } -private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetric) +private class LifeTimeAccumulator[A](in: Iterator[A], onCollected: Long => Unit) extends Iterator[A] { private val closed = new AtomicBoolean(false) private val startTime = System.nanoTime() - TaskResources.addRecycler("Iterators#PipelineTimeAccumulator", 100) { + TaskResources.addRecycler("Iterators#LifeTimeAccumulator", 100) { tryFinish() } @@ -111,9 +110,31 @@ private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetri if (!closed.compareAndSet(false, true)) { return } - pipelineTime += TimeUnit.NANOSECONDS.toMillis( + val lifeTime = TimeUnit.NANOSECONDS.toMillis( System.nanoTime() - startTime ) + onCollected(lifeTime) + } +} + +private class ReadTimeAccumulator[A](in: Iterator[A], onAdded: Long => Unit) extends Iterator[A] { + + override def hasNext: Boolean = { + val prev = System.nanoTime() + val out = in.hasNext + val after = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(after - prev) + onAdded(duration) + out + } + + override def next(): A = { + val prev = System.nanoTime() + val out = in.next() + val after = System.nanoTime() + val duration = TimeUnit.NANOSECONDS.toMillis(after - prev) + onAdded(duration) + out } } @@ -171,8 +192,13 @@ class WrapperBuilder[A](in: Iterator[A]) { // FIXME how to make the ctor compani this } - def addToPipelineTime(pipelineTime: SQLMetric): WrapperBuilder[A] = { - wrapped = new PipelineTimeAccumulator[A](wrapped, pipelineTime) + def collectLifeMillis(onCollected: Long => Unit): WrapperBuilder[A] = { + wrapped = new LifeTimeAccumulator[A](wrapped, onCollected) + this + } + + def collectReadMillis(onAdded: Long => Unit): WrapperBuilder[A] = { + wrapped = new ReadTimeAccumulator[A](wrapped, onAdded) this } diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index 37de9894392c..3a2a741bef0b 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -31,8 +31,7 @@ public class ColumnarBatchOutIterator extends GeneralOutIterator implements Runt private final long iterHandle; private final NativeMemoryManager nmm; - public ColumnarBatchOutIterator(Runtime runtime, long iterHandle, NativeMemoryManager nmm) - throws IOException { + public ColumnarBatchOutIterator(Runtime runtime, long iterHandle, NativeMemoryManager nmm) { super(); this.runtime = runtime; this.iterHandle = iterHandle; diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index d76e698dcf2a..2376a1f39c1e 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -187,6 +187,7 @@ class GlutenConfig(conf: SQLConf) extends Logging { def columnarShuffleCompressionThreshold: Int = conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD) + // FIXME: Not clear: MIN or MAX ? def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) def shuffleWriterBufferSize: Int = conf @@ -295,6 +296,14 @@ class GlutenConfig(conf: SQLConf) extends Logging { def veloxBloomFilterMaxNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS) + def veloxCoalesceBatchesBeforeShuffle: Boolean = + conf.getConf(COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE) + + def veloxMinBatchSizeForShuffle: Int = + conf + .getConf(COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE) + .getOrElse(conf.getConf(COLUMNAR_MAX_BATCH_SIZE)) + def chColumnarShufflePreferSpill: Boolean = conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED) def chColumnarShuffleSpillThreshold: Long = { @@ -1395,6 +1404,23 @@ object GlutenConfig { .checkValue(_ > 0, "must be a positive number") .createWithDefault(10000) + val COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE = + buildConf("spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle") + .internal() + .doc(s"If true, combine small columnar batches together before sending to shuffle. " + + s"The default minimum output batch size is equal to $GLUTEN_MAX_BATCH_SIZE_KEY") + .booleanConf + .createWithDefault(false) + + val COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE = + buildConf("spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle") + .internal() + .doc(s"The minimum batch size for shuffle. If the batch size is smaller than this value, " + + s"it will be combined with other batches before sending to shuffle. Only functions when " + + s"${COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE.key} is set to true.") + .intConf + .createOptional + val COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED = buildConf("spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill") .internal()