Skip to content

Commit

Permalink
[GLUTEN-7600][VL] Remove EmptySchemaWorkaround (#7620)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Oct 22, 2024
1 parent 3572f80 commit 53e8161
Show file tree
Hide file tree
Showing 26 changed files with 206 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.ExpressionNames.MONOTONICALLY_INCREASING_ID
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
Expand Down Expand Up @@ -579,7 +580,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
override def extraExpressionMappings: Seq[Sig] = {
List(
Sig[CollectList](ExpressionNames.COLLECT_LIST),
Sig[CollectSet](ExpressionNames.COLLECT_SET)
Sig[CollectSet](ExpressionNames.COLLECT_SET),
Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID)
) ++
ExpressionExtensionTrait.expressionExtensionTransformer.expressionSigList ++
SparkShimLoader.getSparkShims.bloomFilterExpressionMappings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,36 @@
public final class VeloxColumnarBatches {
public static final String COMPREHENSIVE_TYPE_VELOX = "velox";

public static void checkVeloxBatch(ColumnarBatch batch) {
/**
 * Tells whether the given batch is backed by Velox, judged from its comprehensive
 * light batch type string.
 */
private static boolean isVeloxBatch(ColumnarBatch batch) {
  final String batchType = ColumnarBatches.getComprehensiveLightBatchType(batch);
  // COMPREHENSIVE_TYPE_VELOX is a non-null constant, so constant.equals(x) is
  // null-safe and equivalent to Objects.equals(x, constant).
  return COMPREHENSIVE_TYPE_VELOX.equals(batchType);
}

public static void checkVeloxBatch(ColumnarBatch batch) {
if (ColumnarBatches.isZeroColumnBatch(batch)) {
return;
}
Preconditions.checkArgument(
Objects.equals(comprehensiveType, COMPREHENSIVE_TYPE_VELOX),
isVeloxBatch(batch),
String.format(
"Expected comprehensive batch type %s, but got %s",
COMPREHENSIVE_TYPE_VELOX, comprehensiveType));
COMPREHENSIVE_TYPE_VELOX, ColumnarBatches.getComprehensiveLightBatchType(batch)));
}

/**
 * Asserts that {@code batch} is NOT already a Velox-backed batch (used before an
 * explicit conversion to Velox format).
 *
 * <p>Zero-column batches are exempt, mirroring {@code checkVeloxBatch}.
 *
 * @throws IllegalArgumentException if the batch has columns and is already a Velox batch
 */
public static void checkNonVeloxBatch(ColumnarBatch batch) {
  if (ColumnarBatches.isZeroColumnBatch(batch)) {
    return;
  }
  Preconditions.checkArgument(
      !isVeloxBatch(batch),
      String.format("Comprehensive batch type is already %s", COMPREHENSIVE_TYPE_VELOX));
}

public static ColumnarBatch toVeloxBatch(ColumnarBatch input) {
checkNonVeloxBatch(input);
if (ColumnarBatches.isZeroColumnBatch(input)) {
return input;
}
Preconditions.checkArgument(!isVeloxBatch(input));
final Runtime runtime = Runtimes.contextInstance("VeloxColumnarBatches#toVeloxBatch");
final long handle = ColumnarBatches.getNativeHandle(input);
final long outHandle = VeloxColumnarBatchJniWrapper.create(runtime).from(handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,12 @@ object VeloxBackendSettings extends BackendSettingsApi {
windowFunctions.foreach(
func => {
val windowExpression = func match {
case alias: Alias => WindowFunctionsBuilder.extractWindowExpression(alias.child)
case alias: Alias =>
val we = WindowFunctionsBuilder.extractWindowExpression(alias.child)
if (we == null) {
throw new GlutenNotSupportException(s"$func is not supported.")
}
we
case _ => throw new GlutenNotSupportException(s"$func is not supported.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.extension._
import org.apache.gluten.extension.EmptySchemaWorkaround.{FallbackEmptySchemaRelation, PlanOneRowRelation}
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
Expand Down Expand Up @@ -58,11 +57,9 @@ private object VeloxRuleApi {
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
injector.injectTransform(_ => FallbackEmptySchemaRelation())
injector.injectTransform(_ => RewriteSparkPlanRulesManager())
injector.injectTransform(_ => AddFallbackTagRule())
injector.injectTransform(_ => TransformPreOverrides())
Expand Down Expand Up @@ -99,8 +96,6 @@ private object VeloxRuleApi {
injector.inject(_ => RemoveTransitions)
injector.inject(_ => PushDownInputFileExpression.PreOffload)
injector.inject(c => FallbackOnANSIMode.apply(c.session))
injector.inject(c => PlanOneRowRelation.apply(c.session))
injector.inject(_ => FallbackEmptySchemaRelation())
injector.inject(_ => RewriteSubqueryBroadcast())
injector.inject(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.inject(c => ArrowScanReplaceRule.apply(c.session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class VeloxValidatorApi extends ValidatorApi {
/**
 * Validates whether a columnar shuffle exchange can be offloaded for the given child plan.
 *
 * @return None when validation passes; Some(reason) describing why it must fall back.
 */
override def doColumnarShuffleExchangeExecValidate(
    outputPartitioning: Partitioning,
    child: SparkPlan): Option[String] = {
  // Expression-oriented if/else instead of an early `return`; behavior is unchanged.
  if (child.output.isEmpty) {
    // Empty-schema shuffle is unsupported by the Velox backend.
    // See: https://github.com/apache/incubator-gluten/issues/7600.
    Some("Shuffle with empty schema is not supported")
  } else {
    doSchemaValidate(child.schema)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object VeloxColumnarToRowExec {
}

val runtime = Runtimes.contextInstance("ColumnarToRow")
// TODO: Pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast.
val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
val c2rId = jniWrapper.nativeColumnarToRowInit()

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,7 @@ class ColumnarShuffleWriter[K, V](
@throws[IOException]
def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
null)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
handleEmptyInput()
return
}

Expand Down Expand Up @@ -194,6 +187,11 @@ class ColumnarShuffleWriter[K, V](
cb.close()
}

if (nativeShuffleWriter == -1L) {
handleEmptyInput()
return
}

val startTime = System.nanoTime()
assert(nativeShuffleWriter != -1L)
splitResult = jniWrapper.stop(nativeShuffleWriter)
Expand Down Expand Up @@ -241,16 +239,28 @@ class ColumnarShuffleWriter[K, V](
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

// Commits an empty map output for this task: every reduce partition gets a
// zero-length entry, and only the shuffle metadata/index is written.
private def handleEmptyInput(): Unit = {
// One zero-length entry per reduce partition.
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
// NOTE(review): presumably the data temp file — null since no data was produced;
// confirm against the ShuffleBlockResolver implementation in use.
null)
// Report an (all-zero) map status so downstream stages can proceed.
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

/** Entry point of the shuffle writer; delegates to the internal implementation. */
@throws[IOException]
override def write(records: Iterator[Product2[K, V]]): Unit = internalWrite(records)

// Releases the native shuffle writer handle, if one was ever created.
// Safe to call more than once: the handle is reset to -1L after closing, and the
// guard makes subsequent calls no-ops.
private def closeShuffleWriter(): Unit = {
  if (nativeShuffleWriter == -1L) {
    // Never created, or already closed.
    return
  }
  jniWrapper.close(nativeShuffleWriter)
  // Mark as closed so repeated invocations do not double-free the native resource.
  nativeShuffleWriter = -1L
}

override def stop(success: Boolean): Option[MapStatus] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox

import org.apache.gluten.columnarbatch.{ColumnarBatches, ColumnarBatchJniWrapper}
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.{VeloxDataSourceJniWrapper, VeloxDataSourceUtil}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.GlutenRowSplitter
Expand Down Expand Up @@ -76,13 +76,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.retain(batch)
val batchHandle = {
if (batch.numCols == 0) {
// the operation will find a zero column batch from a task-local pool
ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
} else {
ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.getNativeHandle(batch)
}
ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.getNativeHandle(batch)
}
datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
batch.close()
Expand Down
Loading

0 comments on commit 53e8161

Please sign in to comment.