diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 554b3791dad3..75003d51c195 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -307,6 +307,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { FilterExecTransformer(condition, child) } + override def genSparkPartialProjectColumnarExec(original: ProjectExec): GlutenPlan = { + SparkPartialProjectColumnarExec.create(original) + } + + override def genProjectColumnarExec(original: ProjectExec): GlutenPlan = { + ProjectColumnarExec(original.projectList, original.child) + } + /** Generate HashAggregateExecTransformer. */ override def genHashAggregateExecTransformer( requiredChildDistributionExpressions: Option[Seq[Expression]], diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala index 2778710155bf..34583d0449e2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala @@ -87,7 +87,7 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { options, columnPruning = session.sessionState.conf.csvColumnPruning, session.sessionState.conf.sessionLocalTimeZone) - checkSchema(dataSchema) && + SparkSchemaUtil.checkSchema(dataSchema) && checkCsvOptions(csvOptions, session.sessionState.conf.sessionLocalTimeZone) && dataSchema.nonEmpty } @@ -106,13 +106,4 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { SparkShimLoader.getSparkShims.dateTimestampFormatInReadIsDefaultValue(csvOptions, timeZone) } - private def checkSchema(schema: StructType): Boolean = { - try { - SparkSchemaUtil.toArrowSchema(schema) - true - } catch { - case _: Exception => - false - } - } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala new file mode 100644 index 000000000000..ecc0543fc5d8 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.iterator.Iterators +import org.apache.gluten.vectorized.ArrowWritableColumnVector +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MutableProjection, NamedExpression, SortOrder, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ExplainUtils, OrderPreservingNodeShim, PartitioningPreservingNodeShim, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.utils.SparkSchemaUtil +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} + +case class ProjectColumnarExec(projectList: Seq[NamedExpression], child: SparkPlan) + extends UnaryExecNode + with PartitioningPreservingNodeShim + with OrderPreservingNodeShim + with GlutenPlan { + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "time of project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch to velox") + ) + + private val (mutableProjectList, projectIndexInChild, projectIndexes) = removeAttributeReferenceFromProjectList() + + private val mutableProjectOutput = mutableProjectList.map(_.toAttribute) + + override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering + + override protected def outputExpressions: Seq[NamedExpression] = projectList + + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) + + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def supportsColumnar: Boolean = true + + + + // Only the mutable data type is valid + private def validateDataType(dataType: DataType): Boolean = { + dataType match { + case _: BooleanType => true + case _: ByteType => true + case _: ShortType => true + case _: IntegerType => true + case _: LongType => true + case _: FloatType => true + case _: DoubleType => true + case _: TimestampType => true + case _: DateType => true + case _: DecimalType => true + case _ => false + } + } + + // Return the expression in projectList to MutableProjection + // Return the index in child output + // Return the index in this operator output + private def removeAttributeReferenceFromProjectList(): (Seq[NamedExpression], Seq[Int], Seq[Int]) = { + val childOutput = child.output + val (attrs, notAttrs) = projectList.zipWithIndex.partition(e => e._1.isInstanceOf[AttributeReference]) + (notAttrs.map(_._1), attrs.map( a => childOutput.indexWhere(a._1.exprId == _.exprId)), attrs.map(_._2)) + } + +// override def doExecuteColumnar(): 
RDD[ColumnarBatch] = { +// val timeMetric = longMetric("time") +// val c2rTime = longMetric("column_to_row_time") +// val r2cTime = longMetric("row_to_column_time") +// child.executeColumnar().mapPartitions { +// batches => +// val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { +// override def hasNext: Boolean = batches.hasNext +// +// override def next(): Iterator[ColumnarBatch] = { +// val batch = batches.next() +// if (batch.numRows == 0) { +// Iterator.empty +// } else { +// val start = System.currentTimeMillis() +// val proj = MutableProjection.create(mutableProjectList, child.output) +// val numRows = batch.numRows() +// val c2rStart = System.currentTimeMillis() +// val arrowBatch = +// ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), batch) +// c2rTime += System.currentTimeMillis() - c2rStart +// val selectedBatch = ColumnarBatches.select(batch, projectIndexInChild.toArray) +// +// val schema = +// SparkShimLoader.getSparkShims.structFromAttributes(mutableProjectOutput) +// val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector +// .allocateColumns(numRows, schema) +// .map { +// vector => +// vector.setValueCount(numRows) +// vector.asInstanceOf[WritableColumnVector] +// } +// val targetRow = new MutableColumnarRow(vectors) +// for (i <- 0 until numRows) { +// targetRow.rowId = i +// proj.target(targetRow).apply(arrowBatch.getRow(i)) +// } +// val r2cStart = System.currentTimeMillis() +// val targetBatch = +// new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows) +// val veloxBatch = ColumnarBatches +// .ensureOffloaded(ArrowBufferAllocators.contextInstance(), targetBatch) +// r2cTime += System.currentTimeMillis() - r2cStart +// val composeBatch = if (selectedBatch.numCols() == 0) { +// ColumnarBatches.retain(veloxBatch) +// veloxBatch +// } else { +// val composeBatch = ColumnarBatches.composeWithReorder(selectedBatch, projectIndexes.toArray, veloxBatch) +// veloxBatch.close() +// composeBatch +// } +// timeMetric += System.currentTimeMillis() - start +// arrowBatch.close() +// targetBatch.close() +// selectedBatch.close() +// Iterator.single(composeBatch) +// } +// } +// } +// Iterators +// .wrap(res.flatten) +// .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which +// // is not allowed by Gluten iterators. E.g. 
GroupedIterator#fetchNextGroupIterator +// .recyclePayload(_.close()) +// .create() +// } +// } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val timeMetric = longMetric("time") + val c2rTime = longMetric("column_to_row_time") + val r2cTime = longMetric("row_to_column_time") + val childOutput = child.output + child.executeColumnar().mapPartitions { + batches => + val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { + override def hasNext: Boolean = batches.hasNext + + override def next(): Iterator[ColumnarBatch] = { + val batch = batches.next() + if (batch.numRows == 0) { + Iterator.empty + } else { + val start = System.currentTimeMillis() + val proj = UnsafeProjection.create(mutableProjectList, childOutput) + val c2rStart = System.currentTimeMillis() + val rows = VeloxColumnarToRowExec.toRowIterator(Iterator.single(batch), childOutput).map(proj) + c2rTime += System.currentTimeMillis() - c2rStart + val selectedBatch = ColumnarBatches.select(batch, projectIndexInChild.toArray) + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(mutableProjectOutput) + val iter = RowToVeloxColumnarExec.toColumnarBatchIterator( + rows, + schema, + batch.numRows()).map { + b => if (selectedBatch.numCols() == 0) { + ColumnarBatches.retain(b) + selectedBatch.close() + b + } else { + print("project batch" + ColumnarBatches.toString(b, 0, 20)) + val composeBatch = ColumnarBatches.composeWithReorder(selectedBatch, projectIndexes.toArray, b) + b.close() + selectedBatch.close() + composeBatch + } + } + timeMetric += System.currentTimeMillis() - start + iter + } + } + } + Iterators + .wrap(res.flatten) + .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which + // is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator + .recyclePayload(_.close()) + .create() + } + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("MutableProject", mutableProjectList)} + |""".stripMargin + } + + override protected def doValidateInternal(): ValidationResult = { + if (!GlutenConfig.getConf.enableProjectColumnarExec) { + return ValidationResult.failed("Config disable this feature") + } + if (mutableProjectOutput.exists(f => !validateDataType(f.dataType))) { + return ValidationResult.failed("Output type is not mutable") + } + if (mutableProjectOutput.size == projectList.size) { + return ValidationResult.failed("All the project list is needed") + } + ValidationResult.succeeded + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index aa30cc80d4db..6856309ab1e1 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -48,6 +48,9 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") + + // Instead of creating a new config we are reusing columnBatchSize. In the future if we do + // combine with some of the Arrow conversion tools we will need to unify some of the configs. 
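+  // Hedged illustration (not part of the original patch): `maxBatchSize` below is the existing
+  // Gluten batch-size setting referred to in the comment above, so each ColumnarBatch emitted by
+  // this row-to-columnar conversion holds at most that many rows. Assuming the config key is
+  // `spark.gluten.sql.columnar.maxBatchSize`, it could be tuned per session, e.g.:
+  //   spark.conf.set("spark.gluten.sql.columnar.maxBatchSize", "4096")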
val numRows = GlutenConfig.getConf.maxBatchSize // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala new file mode 100644 index 000000000000..ba9972fed75d --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.extension.columnar.validator.Validator.Passed +import org.apache.gluten.extension.columnar.validator.Validators.FallbackComplexExpressions +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.iterator.Iterators +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection} +import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.hive.HiveUdfUtil +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +import scala.collection.mutable.ListBuffer + +/** + * Change the Project to ProjectExecTransformer + SparkPartialProjectColumnarExec e.g. 
sum(myudf(a) + * + b + hash(c)), child is (a, b,c ) SparkPartialProjectColumnarExec (a, b, c, myudf(a)), + * ProjectExecTransformer(myudf(a) + b + hash(c)) + * + * @param original + * extract the ScalaUDF from original project list as Alias in UnsafeProjection and + * AttributeReference in SparkPartialProjectColumnarExec output + * @param child + * child plan + */ +case class SparkPartialProjectColumnarExec(original: ProjectExec, child: SparkPlan)( + replacedAliasUdf: ListBuffer[Alias]) + extends UnaryExecNode + with GlutenPlan { + + private val debug = GlutenConfig.getConf.debug + + private val projectAttributes: ListBuffer[Attribute] = ListBuffer() + private val projectIndexInChild: ListBuffer[Int] = ListBuffer() + private var UDFAttrNotExists = false + private var hasComplexDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType)) + if (!hasComplexDataType) { + getProjectIndexInChildOutput(replacedAliasUdf) + } + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "time of project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch to velox") + ) + + override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute) + + final override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} doesn't support doExecute") + } + + final override protected def otherCopyArgs: Seq[AnyRef] = { + replacedAliasUdf :: Nil + } + + final override lazy val supportsColumnar: Boolean = true + + private def validateExpression(expr: Expression): Boolean = { + expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children + .forall(validateExpression) + } + + private def validateDataType(dataType: DataType): Boolean = { + dataType match { + case _: BooleanType => true + case _: ByteType => true + case _: ShortType => true + case _: IntegerType => true + case _: LongType => true + case _: FloatType => true + case _: DoubleType => true + case _: StringType => true + case _: TimestampType => true + case _: DateType => true + case _: BinaryType => true + case _: DecimalType => true + case YearMonthIntervalType.DEFAULT => true + case _: NullType => true + case _ => false + } + } + + private def getProjectIndexInChildOutput(exprs: Seq[Expression]): Unit = { + exprs.foreach { + case a: AttributeReference => + val index = child.output.indexWhere(s => s.exprId.equals(a.exprId)) + // Some child operator as HashAggregateTransformer will not have udf child column + if (index < 0) { + UDFAttrNotExists = true + log.debug(s"Expression $a should exist in child output ${child.output}") + return + } else if (!validateDataType(a.dataType)) { + hasComplexDataType = true + log.debug(s"Expression $a contains unsupported data type ${a.dataType}") + } else if (!projectIndexInChild.contains(index)) { + projectAttributes.append(a.toAttribute) + projectIndexInChild.append(index) + } + case p => getProjectIndexInChildOutput(p.children) + } + } + + override protected def doValidateInternal(): ValidationResult = { + if (!GlutenConfig.getConf.enableColumnarPartialProject) { + return ValidationResult.failed("Config disable this feature") + } + if (UDFAttrNotExists) { + ValidationResult.failed("Attribute in the UDF does not exists in its child") + } else if (hasComplexDataType) { + ValidationResult.failed("Attribute 
in the UDF contains unsupported type") + } else if (projectAttributes.size == child.output.size) { + ValidationResult.failed("UDF need all the columns in child output") + } else if (original.output.isEmpty) { + ValidationResult.failed("Project fallback because output is empty") + } else if (replacedAliasUdf.isEmpty) { + ValidationResult.failed("No UDF") + } else if (replacedAliasUdf.size > original.output.size) { + // e.g. udf1(col) + udf2(col), it will introduce 2 cols for r2c + ValidationResult.failed("Number of RowToColumn columns is more than ProjectExec") + } else if (!original.projectList.forall(validateExpression(_))) { + ValidationResult.failed("Contains expression not supported") + } else if (isComplexExpression()) { + ValidationResult.failed("Fallback by complex expression") + } else { + ValidationResult.succeeded + } + } + + private def isComplexExpression(): Boolean = { + new FallbackComplexExpressions(GlutenConfig.getConf.fallbackExpressionsThreshold) + .validate(original) match { + case Passed => false + case _ => true + } + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val totalTime = longMetric("time") + val c2r = longMetric("column_to_row_time") + val r2c = longMetric("row_to_column_time") + val isMutable = canUseMutableProjection() + child.executeColumnar().mapPartitions { + batches => + val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { + override def hasNext: Boolean = batches.hasNext + + override def next(): Iterator[ColumnarBatch] = { + val batch = batches.next() + if (batch.numRows == 0) { + Iterator.empty + } else { + val start = System.currentTimeMillis() + val childData = ColumnarBatches.select(batch, projectIndexInChild.toArray) + val projectedBatch = if (isMutable) { + getProjectedBatchArrow(childData, c2r, r2c) + } else getProjectedBatch(childData, c2r, r2c) + val batchIterator = projectedBatch.map { + b => +// print("batch 1" + ColumnarBatches.toString(batch, 0, 20) + "\n") +// print("batch 2" + ColumnarBatches.toString(b, 0, 20) + "\n") + val compositeBatch = if (b.numCols() != 0) { + val handle = ColumnarBatches.compose(batch, b) + b.close() + ColumnarBatches.create(handle) + } else { + b.close() + ColumnarBatches.retain(batch) + batch + } + if (debug && compositeBatch.numCols() != output.length) { + throw new IllegalStateException( + s"Composite batch column number is ${compositeBatch.numCols()}, " + + s"output size is ${output.length}, " + + s"original batch column number is ${batch.numCols()}") + } + compositeBatch + } + childData.close() + totalTime += System.currentTimeMillis() - start + batchIterator + } + } + } + Iterators + .wrap(res.flatten) + .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which + // is not allowed by Gluten iterators. E.g. 
GroupedIterator#fetchNextGroupIterator + .recyclePayload(_.close()) + .create() + + } + } + + // scalastyle:off line.size.limit + // String type cannot use MutableProjection + // Otherwise will throw java.lang.UnsupportedOperationException: Datatype not supported StringType + // at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.update(MutableColumnarRow.java:224) + // at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) + // scalastyle:on line.size.limit + private def canUseMutableProjection(): Boolean = { + replacedAliasUdf.forall( + r => + r.dataType match { + case StringType | BinaryType => false + case _ => true + }) + } + + /** + * add c2r and r2c for unsupported expression child data c2r get Iterator[InternalRow], then call + * Spark project, then r2c + */ + private def getProjectedBatch( + childData: ColumnarBatch, + c2r: SQLMetric, + r2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = UnsafeProjection.create(replacedAliasUdf, projectAttributes) + val numOutputRows = new SQLMetric("numOutputRows") + val numInputBatches = new SQLMetric("numInputBatches") + val rows = VeloxColumnarToRowExec + .toRowIterator( + Iterator.single[ColumnarBatch](childData), + projectAttributes, + numOutputRows, + numInputBatches, + c2r) + .map(proj) + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + RowToVeloxColumnarExec.toColumnarBatchIterator( + rows, + schema, + numOutputRows, + numInputBatches, + r2c, + childData.numRows()) + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + private def getProjectedBatchArrow( + childData: ColumnarBatch, + c2r: SQLMetric, + r2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = MutableProjection.create(replacedAliasUdf, projectAttributes) + val numRows = childData.numRows() + val start = System.currentTimeMillis() + val arrowBatch = + ColumnarBatches.ensureLoaded( + ArrowBufferAllocators.contextInstance(), + childData) + c2r += System.currentTimeMillis() - start + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector + .allocateColumns(numRows, schema) + .map { + vector => + vector.setValueCount(numRows) + vector + } + val targetRow = new MutableColumnarRow(vectors) + for (i <- 0 until numRows) { + targetRow.rowId = i + proj.target(targetRow).apply(arrowBatch.getRow(i)) + } + val targetBatch = new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows) + val start2 = System.currentTimeMillis() + val veloxBatch = + ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), targetBatch) + r2c += System.currentTimeMillis() - start2 + Iterators + .wrap(Iterator.single(veloxBatch)) + .recycleIterator({ + arrowBatch.close() + targetBatch.close() + }) + .create() + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("ScalaUDF", replacedAliasUdf)} + |${ExplainUtils.generateFieldString("ProjectOutput", projectAttributes)} + |${ExplainUtils.generateFieldString("ProjectInputIndex", 
projectIndexInChild)} + |""".stripMargin + } + + override def simpleString(maxFields: Int): String = + super.simpleString(maxFields) + " PartialProject " + replacedAliasUdf + + override protected def withNewChildInternal( + newChild: SparkPlan): SparkPartialProjectColumnarExec = { + copy(child = newChild)(replacedAliasUdf) + } +} + +object SparkPartialProjectColumnarExec { + + val projectPrefix = "_SparkPartialProject" + + private def containsUDF(expr: Expression): Boolean = { + if (expr == null) return false + expr match { + case _: ScalaUDF => true + case h if HiveUdfUtil.isHiveUdf(h) => true + case p => p.children.exists(c => containsUDF(c)) + } + } + + private def replaceByAlias(expr: Expression, replacedAliasUdf: ListBuffer[Alias]): Expression = { + val replaceIndex = replacedAliasUdf.indexWhere(r => r.child.equals(expr)) + if (replaceIndex == -1) { + val replace = Alias(expr, s"$projectPrefix${replacedAliasUdf.size}")() + replacedAliasUdf.append(replace) + replace.toAttribute + } else { + replacedAliasUdf(replaceIndex).toAttribute + } + } + + private def isConditionalExpression(expr: Expression): Boolean = expr match { + case _: If => true + case _: CaseWhen => true + case _: NaNvl => true + case _: Coalesce => true + case _ => false + } + + private def replaceExpressionUDF( + expr: Expression, + replacedAliasUdf: ListBuffer[Alias]): Expression = { + if (expr == null) return null + expr match { + case u: ScalaUDF => + replaceByAlias(u, replacedAliasUdf) + case h if HiveUdfUtil.isHiveUdf(h) => + replaceByAlias(h, replacedAliasUdf) + case au @ Alias(_: ScalaUDF, _) => + val replaceIndex = replacedAliasUdf.indexWhere(r => r.exprId == au.exprId) + if (replaceIndex == -1) { + replacedAliasUdf.append(au) + au.toAttribute + } else { + replacedAliasUdf(replaceIndex).toAttribute + } + // Alias(HiveSimpleUDF) not exists, only be Alias(ToPrettyString(HiveSimpleUDF)), + // so don't process this condition + case x if isConditionalExpression(x) => + // For example: + // myudf is udf((x: Int) => x + 1) + // if (isnull(cast(l_extendedprice#9 as bigint))) null + // else myudf(knownnotnull(cast(l_extendedprice#9 as bigint))) + // if we extract else branch, and use the data child l_extendedprice, + // the result is incorrect for null value + if (containsUDF(expr)) { + replaceByAlias(expr, replacedAliasUdf) + } else expr + case p => p.withNewChildren(p.children.map(c => replaceExpressionUDF(c, replacedAliasUdf))) + } + } + + def create(original: ProjectExec): ProjectExecTransformer = { + val replacedAliasUdf: ListBuffer[Alias] = ListBuffer() + val newProjectList = original.projectList.map { + p => replaceExpressionUDF(p, replacedAliasUdf).asInstanceOf[NamedExpression] + } + val partialProject = SparkPartialProjectColumnarExec(original, original.child)(replacedAliasUdf) + ProjectExecTransformer(newProjectList, partialProject) + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala index 4bd553b01235..34796655f9c7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala @@ -114,7 +114,6 @@ object VeloxColumnarToRowExec { convertTime ) } - def toRowIterator( batches: Iterator[ColumnarBatch], output: Seq[Attribute], diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala 
b/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala new file mode 100644 index 000000000000..76864558960c --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.expressions.Expression + +object HiveUdfUtil { + def isHiveUdf(expr: Expression): Boolean = expr match { + case _: HiveSimpleUDF => true + case _: HiveGenericUDF => true + case _: HiveUDAFFunction => true + case _: HiveGenericUDTF => true + case _ => false + } + +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/ProjectColumnarSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/ProjectColumnarSuite.scala new file mode 100644 index 000000000000..4f4c7d93dbab --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/ProjectColumnarSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.expression + +import org.apache.gluten.execution.{ProjectColumnarExec, WholeStageTransformerSuite} +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, NullPropagation} +import org.apache.spark.sql.functions.udf + +import java.io.File + +class ProjectColumnarSuite extends WholeStageTransformerSuite { + disableFallbackCheck + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.sources.useV1SourceList", "avro") + .set( + "spark.sql.optimizer.excludedRules", + ConstantFolding.ruleName + "," + + NullPropagation.ruleName) + .set("spark.gluten.sql.columnar.partial.project", "false") + } + + override def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = getClass.getResource(resourcePath).getFile + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format(fileFormat).load(tablePath) + tableDF.createOrReplaceTempView(table) + + val plusOne = udf((x: Long) => x + 1) + spark.udf.register("plus_one", plusOne) + val noArgument = udf(() => 15) + spark.udf.register("no_argument", noArgument) + + } + + test("test plus_one") { + runQueryAndCompare("SELECT sum(plus_one(cast(l_orderkey as long))) from lineitem") { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test plus_one with column used twice") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test plus_one without cast") { + runQueryAndCompare("SELECT sum(plus_one(l_orderkey) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test plus_one with many columns") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_partkey))" + + "from lineitem " + + "where l_orderkey < 3") { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test plus_one with many columns in project") { + runQueryAndCompare("SELECT plus_one(cast(l_orderkey as long)), hash(l_partkey), l_partkey from lineitem") { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test function no argument") { + runQueryAndCompare("""SELECT no_argument(), l_orderkey + | from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("test nondeterministic function input_file_name") { + val df = spark.sql("""SELECT input_file_name(), l_orderkey + | from lineitem limit 100""".stripMargin) + df.collect() + assert( + df.queryExecution.executedPlan + .find(p => p.isInstanceOf[ProjectColumnarExec]) + .isEmpty) + } + + test("udf in agg simple") { + runQueryAndCompare("""select sum(hash(plus_one(l_extendedprice)) + hash(l_orderkey) ) as revenue + | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("udf in agg") { + runQueryAndCompare("""select sum(hash(plus_one(l_extendedprice)) * l_discount + | + hash(l_orderkey) + hash(l_comment)) as revenue 
+ | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("udf in agg has 1 column") { + runQueryAndCompare( + """select sum(hash(plus_one(l_extendedprice)) *l_discount + hash(l_orderkey) + | + hash(l_partkey) + hash(l_suppkey) + hash(l_linenumber) + hash(l_comment) + | + hash(l_shipinstruct)) as revenue + | from lineitem;""".stripMargin) { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + + test("udf in agg has 2 column") { + runQueryAndCompare( + """select max_by(hash(plus_one(l_extendedprice)) *l_discount + hash(l_orderkey) + | + hash(l_partkey) + hash(l_suppkey) + hash(l_linenumber) + hash(l_comment) + | + hash(l_shipinstruct), l_extendedprice) as revenue + | from lineitem;""".stripMargin) { + checkGlutenOperatorMatch[ProjectColumnarExec] + } + } + +} + diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala new file mode 100644 index 000000000000..318df93fd702 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.expression + +import org.apache.gluten.execution.{SparkPartialProjectColumnarExec, WholeStageTransformerSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, NullPropagation} +import org.apache.spark.sql.functions.udf + +import java.io.File + +class UDFPartialProjectSuite extends WholeStageTransformerSuite { + disableFallbackCheck + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.sources.useV1SourceList", "avro") + .set( + "spark.sql.optimizer.excludedRules", + ConstantFolding.ruleName + "," + + NullPropagation.ruleName) + .set("spark.gluten.sql.debug", "false") + } + + override def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = getClass.getResource(resourcePath).getFile + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format(fileFormat).load(tablePath) + tableDF.createOrReplaceTempView(table) + + val plusOne = udf((x: Long) => x + 1) + spark.udf.register("plus_one", plusOne) + val noArgument = udf(() => 15) + spark.udf.register("no_argument", noArgument) + + } + + ignore("test plus_one") { + runQueryAndCompare("SELECT sum(plus_one(cast(l_orderkey as long))) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + ignore("test plus_one with column used twice") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + ignore("test plus_one without cast") { + runQueryAndCompare("SELECT sum(plus_one(l_orderkey) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test plus_one with many columns") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_partkey))" + + "from lineitem " + + "where l_orderkey < 3") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test plus_one with many columns in project") { + runQueryAndCompare("SELECT plus_one(cast(l_orderkey as long)), hash(l_partkey) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test function no argument") { + runQueryAndCompare("""SELECT no_argument(), l_orderkey + | from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test nondeterministic function input_file_name") { + val df = spark.sql("""SELECT input_file_name(), l_orderkey + | from lineitem limit 100""".stripMargin) + df.collect() + assert( + df.queryExecution.executedPlan + .find(p => p.isInstanceOf[SparkPartialProjectColumnarExec]) + .isEmpty) + } + + test("udf in agg simple") { + runQueryAndCompare("""select sum(hash(plus_one(l_extendedprice)) + hash(l_orderkey) ) as revenue + | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("udf in agg") { + runQueryAndCompare("""select 
sum(hash(plus_one(l_extendedprice)) * l_discount + | + hash(l_orderkey) + hash(l_comment)) as revenue + | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + +} diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 1662b200bf8a..3e0d01035cf6 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -687,6 +687,28 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra JNI_METHOD_END(kInvalidObjectHandle) } +JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_composeWithReorder( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong cb1, + jintArray jcb1Indices, + jlong cb2) { + JNI_METHOD_START + auto ctx = gluten::getRuntime(env, wrapper); + auto safeArray = gluten::getIntArrayElementsSafe(env, jcb1Indices); + int size = env->GetArrayLength(jcb1Indices); + std::vector cb1Indices; + cb1Indices.reserve(size); + for (int32_t i = 0; i < size; i++) { + cb1Indices.push_back(safeArray.elems()[i]); + } + auto batch1 = ObjectStore::retrieve(cb1); + auto batch2 = ObjectStore::retrieve(cb2); + auto newBatch = CompositeReorderColumnarBatch::create(std::move(batch1), std::move(cb1Indices), std::move(batch2)); + return ctx->saveObject(newBatch); + JNI_METHOD_END(kInvalidObjectHandle) +} + JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( // NOLINT JNIEnv* env, jobject wrapper, diff --git a/cpp/core/memory/ColumnarBatch.cc b/cpp/core/memory/ColumnarBatch.cc index 23567535d50a..200bbf70ba0a 100644 --- a/cpp/core/memory/ColumnarBatch.cc +++ b/cpp/core/memory/ColumnarBatch.cc @@ -213,4 +213,66 @@ void CompositeColumnarBatch::ensureUnderlyingBatchCreated() { arrow::RecordBatch::Make(std::make_shared(fields), numRows(), arrays)); } +std::shared_ptr CompositeReorderColumnarBatch::create( + std::shared_ptr batch1, + std::vector cb1ColumnIndices, + std::shared_ptr batch2) { + int32_t numRows = batch1->numRows(); + if (batch2->numRows() != numRows) { + throw GlutenException( + "Mismatched row counts among the input batches during creating CompositeReorderColumnarBatch"); + } + + GLUTEN_CHECK( + batch1->numColumns() == cb1ColumnIndices.size(), + "Number column of batch1 should be equal to the number of its index"); + + int32_t numColumns = batch1->numColumns() + batch2->numColumns(); + return std::shared_ptr(new CompositeReorderColumnarBatch( + numColumns, numRows, std::move(batch1), std::move(cb1ColumnIndices), std::move(batch2))); +} + +std::string CompositeReorderColumnarBatch::getType() const { + return "composite_reorder"; +} + +int64_t CompositeReorderColumnarBatch::numBytes() { + if (compositeBatch_) { + return compositeBatch_->numBytes(); + } else { + int64_t numBytes = batch1_->numBytes() + batch2_->numBytes(); + return numBytes; + } +} + +std::shared_ptr CompositeReorderColumnarBatch::exportArrowArray() { + ensureUnderlyingBatchCreated(); + return compositeBatch_->exportArrowArray(); +} + +std::shared_ptr CompositeReorderColumnarBatch::exportArrowSchema() { + ensureUnderlyingBatchCreated(); + return compositeBatch_->exportArrowSchema(); +} + +std::vector CompositeReorderColumnarBatch::toUnsafeRow(int32_t rowId) const { + throw gluten::GlutenException("#toUnsafeRow of CompositeReorderColumnarBatch is not implemented"); +} + +CompositeReorderColumnarBatch::CompositeReorderColumnarBatch( + int32_t numColumns, + int32_t numRows, + std::shared_ptr batch1, + std::vector cb1ColumnIndices, + std::shared_ptr batch2) + : 
ColumnarBatch(numColumns, numRows) { + this->batch1_ = std::move(batch1); + this->batch2_ = std::move(batch2); + this->cb1ColumnIndices_ = std::move(cb1ColumnIndices); +} + +void CompositeReorderColumnarBatch::ensureUnderlyingBatchCreated() { + throw gluten::GlutenException("#ensureUnderlyingBatchCreated of CompositeReorderColumnarBatch is not implemented"); +} + } // namespace gluten diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h index fd8189aa6a20..ea3dae7b318c 100644 --- a/cpp/core/memory/ColumnarBatch.h +++ b/cpp/core/memory/ColumnarBatch.h @@ -136,6 +136,56 @@ class CompositeColumnarBatch final : public ColumnarBatch { std::shared_ptr compositeBatch_ = nullptr; }; +/** + * A columnar batch implementations that creates a view on top of a couple of child batches. + * Fields in the child batches will be reorganized in the parent batch. + */ +class CompositeReorderColumnarBatch final : public ColumnarBatch { + public: + static std::shared_ptr create( + std::shared_ptr batch1, + std::vector cb1ColumnIndices, + std::shared_ptr batch2); + + std::string getType() const override; + + int64_t numBytes() override; + + std::shared_ptr exportArrowArray() override; + + std::shared_ptr exportArrowSchema() override; + + const std::shared_ptr getBatch1() const { + return batch1_; + } + + const std::shared_ptr getBatch2() const { + return batch2_; + } + + const std::vector getBatch1ColumnIndices() const { + return cb1ColumnIndices_; + } + + std::vector toUnsafeRow(int32_t rowId) const override; + + private: + explicit CompositeReorderColumnarBatch( + int32_t numColumns, + int32_t numRows, + std::shared_ptr batch1, + std::vector cb1ColumnIndices, + std::shared_ptr batch2); + + // We use ArrowColumnarBatch as the way to compose columnar batches + void ensureUnderlyingBatchCreated(); + + std::shared_ptr batch1_; + std::shared_ptr batch2_; + std::vector cb1ColumnIndices_; + std::shared_ptr compositeBatch_ = nullptr; +}; + std::shared_ptr createZeroColumnBatch(int32_t numRows); } // namespace gluten diff --git a/cpp/velox/memory/VeloxColumnarBatch.cc b/cpp/velox/memory/VeloxColumnarBatch.cc index 0d8db312721a..d0ce74bb0b5e 100644 --- a/cpp/velox/memory/VeloxColumnarBatch.cc +++ b/cpp/velox/memory/VeloxColumnarBatch.cc @@ -118,6 +118,30 @@ std::shared_ptr VeloxColumnarBatch::from( auto compositeVeloxVector = makeRowVector(childNames, childVectors, cb->numRows(), pool); return std::make_shared(compositeVeloxVector); } + if (cb->getType() == "composite_reorder") { + // Initialize children as batch2 children, then insert specified columns in batch1 + auto composite = std::dynamic_pointer_cast(cb); + auto vector1 = from(pool, composite->getBatch1())->getRowVector(); + auto vector2 = from(pool, composite->getBatch2())->getRowVector(); + auto& vector1Names = facebook::velox::asRowType(vector1->type())->names(); + auto& vector2Names = facebook::velox::asRowType(vector2->type())->names(); + std::vector childNames; + std::vector childVectors; + childNames.reserve(composite->numColumns()); + childVectors.reserve(composite->numColumns()); + auto& batch1Indices = composite->getBatch1ColumnIndices(); + for (int32_t i = 0, cb1Index = 0, cb2Index = 0; i < composite->numColumns(); i++) { + if (batch1Indices[cb1Index] == i) { + childNames.push_back(vector1Names[cb1Index]); + childVectors.push_back(vector1->childAt(cb1Index++)); + } else { + childNames.push_back(vector2Names[cb2Index]); + childVectors.push_back(vector2->childAt(cb2Index++)); + } + } + auto compositeVeloxVector = 
makeRowVector(std::move(childNames), std::move(childVectors), cb->numRows(), pool); + return std::make_shared(compositeVeloxVector); + } auto vp = velox::importFromArrowAsOwner(*cb->exportArrowSchema(), *cb->exportArrowArray(), pool); return std::make_shared(std::dynamic_pointer_cast(vp)); } diff --git a/cpp/velox/memory/VeloxColumnarBatch.h b/cpp/velox/memory/VeloxColumnarBatch.h index 6c79f2772d2d..d09684223ed4 100644 --- a/cpp/velox/memory/VeloxColumnarBatch.h +++ b/cpp/velox/memory/VeloxColumnarBatch.h @@ -42,6 +42,7 @@ class VeloxColumnarBatch final : public ColumnarBatch { std::shared_ptr exportArrowSchema() override; std::shared_ptr exportArrowArray() override; std::vector toUnsafeRow(int32_t rowId) const override; + std::shared_ptr select(facebook::velox::memory::MemoryPool* pool, std::vector columnIndices); facebook::velox::RowVectorPtr getRowVector() const; facebook::velox::RowVectorPtr getFlattenedRowVector(); diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc index 75fa3c3d3262..a247b4021a42 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc @@ -21,6 +21,248 @@ #include "velox/vector/arrow/Bridge.h" using namespace facebook::velox; +namespace { + +inline int64_t calculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; +} + +inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { + return nullBitsetWidthInBytes + 8L * index; +} + +inline bool isNull(uint8_t* buffer_address, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t value = *((int64_t*)(buffer_address + wordOffset)); + return (value & mask) != 0; +} + +int32_t getTotalStringSize( + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress) { + size_t size = 0; + for (auto pos = 0; pos < numRows; pos++) { + if (isNull(memoryAddress + offsets[pos], columnIdx)) { + continue; + } + + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + if (!StringView::isInline(length)) { + size += length; + } + } + return size; +} + +template +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + using T = typename TypeTraits::NativeType; + auto typeWidth = sizeof(T); + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->template mutableRawValues(); +#if defined(__x86_64__) + auto shift = _tzcnt_u32(typeWidth); +#else + auto shift = __builtin_ctz((uint32_t)typeWidth); +#endif + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset); + uint8_t* destptr = rawValues + (pos << shift); + memcpy(destptr, srcptr, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + auto typeWidth = 
sizeof(int128_t); +#if defined(__x86_64__) + auto shift = _tzcnt_u32(typeWidth); +#else + auto shift = __builtin_ctz((uint32_t)typeWidth); +#endif + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + uint8_t* destptr = rawValues + (pos << shift); + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + uint8_t bytesValue[length]; + memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length); + uint8_t bytesValue2[16]{}; + for (int k = length - 1; k >= 0; k--) { + bytesValue2[length - 1 - k] = bytesValue[k]; + } + if (int8_t(bytesValue[0]) < 0) { + memset(bytesValue2 + length, 255, 16 - length); + } + memcpy(destptr, bytesValue2, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset); + bits::setBit(rawValues, pos, value); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + column->set(pos, Timestamp::fromMicros(value)); + } else { + column->setNull(pos, true); + } + } + return column; +} + +VectorPtr createFlatVectorStringView( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress); + char* rawBuffer = column->getRawStringBufferWithSpace(size, true); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset; + if (StringView::isInline(length)) { + column->set(pos, StringView(reinterpret_cast(valueSrcPtr), length)); + } else { + memcpy(rawBuffer, valueSrcPtr, length); + column->setNoCopy(pos, StringView(rawBuffer, length)); + rawBuffer += length; + } + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + 
int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& /*type*/, + int32_t /*columnIdx*/, + int32_t numRows, + int64_t /*fieldOffset*/, + std::vector& /*offsets*/, + uint8_t* /*memoryAddress*/, + memory::MemoryPool* pool) { + auto nulls = allocateNulls(numRows, pool, bits::kNull); + return std::make_shared>( + pool, + UNKNOWN(), + nulls, + numRows, + nullptr, // values + std::vector{}); // stringBuffers +} + +bool supporteType(const RowTypePtr rowType) { + for (auto i = 0; i < rowType->size(); i++) { + auto kind = rowType->childAt(i)->kind(); + switch (kind) { + case TypeKind::ARRAY: + case TypeKind::MAP: + case TypeKind::ROW: + return false; + default: + break; + } + } + return true; +} + +} // namespace + namespace gluten { VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( struct ArrowSchema* cSchema, @@ -32,6 +274,10 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( std::shared_ptr VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + if (supporteType(asRowType(rowType_))) { + return convertPrimitive(numRows, rowLength, memoryAddress); + } + std::vector> data; int64_t offset = 0; for (auto i = 0; i < numRows; i++) { @@ -41,4 +287,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_ auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, pool_.get()); return std::make_shared(std::dynamic_pointer_cast(vp)); } + +std::shared_ptr +VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + auto numFields = rowType_->size(); + int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields); + std::vector offsets; + offsets.resize(numRows); + for (auto i = 1; i < numRows; i++) { + offsets[i] = offsets[i - 1] + rowLength[i - 1]; + } + + std::vector columns; + columns.resize(numFields); + + for (auto i = 0; i < numFields; i++) { + auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i); + auto& type = rowType_->childAt(i); + columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get()); + } + + auto rowVector = std::make_shared(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns)); + return std::make_shared(rowVector); +} } // namespace gluten diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h index 30006c4f0757..1fbc4f6021a7 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h @@ -33,7 +33,9 @@ class VeloxRowToColumnarConverter final : public RowToColumnarConverter { std::shared_ptr convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); - protected: + private: + std::shared_ptr convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); + facebook::velox::TypePtr rowType_; std::shared_ptr pool_; }; diff --git a/cpp/velox/tests/VeloxColumnarBatchTest.cc b/cpp/velox/tests/VeloxColumnarBatchTest.cc index ba66afb40fdf..34c7dfa86dce 100644 --- a/cpp/velox/tests/VeloxColumnarBatchTest.cc +++ b/cpp/velox/tests/VeloxColumnarBatchTest.cc @@ -61,4 +61,31 @@ 
TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) { auto batchOfMap = std::make_shared(inputOfMap); ASSERT_NO_THROW(batchOfMap->getFlattenedRowVector()); } + +TEST_F(VeloxColumnarBatchTest, fromCompositeReorderColumnarBatch) { + std::vector children1 = { + makeNullableFlatVector({1, 2, 3, std::nullopt, 4}), + makeNullableFlatVector({1, -1, std::nullopt, std::nullopt, -2}), + makeNullableFlatVector({1, 2, 3, 4, std::nullopt})}; + + std::vector children2 = { + makeNullableFlatVector({10, 20, 30, std::nullopt, 40}), + makeNullableFlatVector({12, -1, std::nullopt, std::nullopt, -2}), + makeNullableFlatVector({13, 2, 3, 4, std::nullopt})}; + + auto batch1 = std::make_shared(makeRowVector(children1)); + auto batch2 = std::make_shared(makeRowVector(children2)); + std::vector cb1Indices = {1, 4}; + auto reorderBatch = CompositeReorderColumnarBatch::create(batch1, cb1Indices, batch2); + auto vector = VeloxColumnarBatch::from(pool(), reorderBatch)->getRowVector(); + std::cout << "vector content " << vector->toString(0, 10) << std::endl; + std::vector childrenExpected = { + makeNullableFlatVector({10, 20, 30, std::nullopt, 40}), + makeNullableFlatVector({1, -1, std::nullopt, std::nullopt, -2}), + makeNullableFlatVector({12, -1, std::nullopt, std::nullopt, -2}), + makeNullableFlatVector({13, 2, 3, 4, std::nullopt}), + makeNullableFlatVector({1, 2, 3, 4, std::nullopt})}; + auto expectedVector = makeRowVector(childrenExpected); + test::assertEqualVectors(vector, expectedVector); +} } // namespace gluten diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc b/cpp/velox/tests/VeloxRowToColumnarTest.cc index c784dbd59c34..0d11dd4acbc9 100644 --- a/cpp/velox/tests/VeloxRowToColumnarTest.cc +++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc @@ -87,10 +87,58 @@ TEST_F(VeloxRowToColumnarTest, allTypes) { makeNullableFlatVector( {std::nullopt, true, false, std::nullopt, true, true, false, true, std::nullopt, std::nullopt}), makeFlatVector( - {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", "boB7", "ALICE8", "BOB9"}), + {"alice0", + "bob1", + "alice2", + "bob3", + "Alice4", + "Bob5123456789098766notinline", + "AlicE6", + "boB7", + "ALICE8", + "BOB9"}), makeNullableFlatVector( {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", std::nullopt, "alicE", std::nullopt, "boB"}), }); testRowVectorEqual(vector); } + +TEST_F(VeloxRowToColumnarTest, bigint) { + auto vector = makeRowVector({ + makeNullableFlatVector({1, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}), + }); + testRowVectorEqual(vector); +} + +TEST_F(VeloxRowToColumnarTest, decimal) { + auto vector = makeRowVector({ + makeNullableFlatVector( + {123456, HugeInt::build(1045, 1789), 3678, std::nullopt, 4, std::nullopt, 5, 687987, std::nullopt, 7}, + DECIMAL(38, 2)), + makeNullableFlatVector( + {178987, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}, DECIMAL(12, 3)), + }); + testRowVectorEqual(vector); +} + +TEST_F(VeloxRowToColumnarTest, timestamp) { + auto vector = makeRowVector({ + makeNullableFlatVector( + {Timestamp(-946684800, 0), + Timestamp(-7266, 0), + Timestamp(0, 0), + Timestamp(946684800, 0), + Timestamp(9466848000, 0), + Timestamp(94668480000, 0), + Timestamp(946729316, 0), + Timestamp(946729316, 0), + Timestamp(946729316, 0), + Timestamp(7266, 0), + Timestamp(-50049331200, 0), + Timestamp(253405036800, 0), + Timestamp(-62480037600, 0), + std::nullopt}), + }); + testRowVectorEqual(vector); +} } // namespace gluten diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index a55926d76d12..fa1758cc1077 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ +import org.apache.gluten.extension.GlutenPlan import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} @@ -82,6 +84,10 @@ trait SparkPlanExecApi { child: SparkPlan): ProjectExecTransformer = ProjectExecTransformer.createUnsafe(projectList, child) + def genSparkPartialProjectColumnarExec(original: ProjectExec): GlutenPlan = null + + def genProjectColumnarExec(original: ProjectExec): GlutenPlan = null + /** Generate HashAggregateExecTransformer. */ def genHashAggregateExecTransformer( requiredChildDistributionExpressions: Option[Seq[Expression]], diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index c5ba3a8a7839..09033ee27aca 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -678,19 +678,31 @@ object ExpressionConverter extends SQLConfHelper with Logging { } private def getAndCheckSubstraitName(expr: Expression, expressionsMap: Map[Class[_], String]) = { + val res = getSubstraitName(expr, expressionsMap) + if (res._1 != null) { + res._1 + } else { + throw new GlutenNotSupportException(res._2) + } + } + + def getSubstraitName( + expr: Expression, + expressionsMap: Map[Class[_], String]): (String, String) = { TestStats.addExpressionClassName(expr.getClass.getName) // Check whether Gluten supports this expression val substraitExprNameOpt = expressionsMap.get(expr.getClass) if (substraitExprNameOpt.isEmpty) { - throw new GlutenNotSupportException( + return ( + null, s"Not supported to map spark function name" + s" to substrait function name: $expr, class name: ${expr.getClass.getSimpleName}.") } val substraitExprName = substraitExprNameOpt.get // Check whether each backend supports this expression if (!BackendsApiManager.getValidatorApiInstance.doExprValidate(substraitExprName, expr)) { - throw new GlutenNotSupportException(s"Not supported: $expr.") + return (null, s"Not supported: $expr.") } - substraitExprName + (substraitExprName, "OK") } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala index b7a30f7e177a..cafb72fbd329 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala @@ -39,7 +39,9 @@ object MiscColumnarRules { OffloadOthers(), OffloadAggregate(), OffloadExchange(), - OffloadJoin() + OffloadJoin(), + OffloadProjectPartial(), + OffloadProjectColumnar() ) ) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 70b85165c37b..c4aed68758ef 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -120,6 +120,39 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { } } +case class OffloadProjectPartial() extends OffloadSingleNode with LogLevelUtil { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case p: ProjectExec if FallbackTags.nonEmpty(p) => + val original = p + val partialProject = BackendsApiManager.getSparkPlanExecApiInstance + .genSparkPartialProjectColumnarExec(original) + if (partialProject != null) { + val projectTransformer = partialProject.asInstanceOf[ProjectExecTransformer] + if (projectTransformer.doValidate().ok()) { + val project = projectTransformer.child.asInstanceOf[GlutenPlan] + if (project.doValidate().ok()) { + partialProject + } else p + } else p + } else p + case other => other + } +} + +case class OffloadProjectColumnar() extends OffloadSingleNode with LogLevelUtil { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case p: ProjectExec if FallbackTags.nonEmpty(p) => + val projectColumnar = BackendsApiManager.getSparkPlanExecApiInstance + .genProjectColumnarExec(p) + if (projectColumnar != null) { + if (projectColumnar.doValidate().ok()) { + projectColumnar + } else p + } else p + case other => other + } +} + // Join transformation. 
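The two rules above only fire for a ProjectExec that has already been tagged for fallback, and they hand back the original plan whenever the generated node fails validation. Below is a minimal, hedged sketch of the kind of query that exercises OffloadProjectPartial, assuming a Gluten-enabled session with Hive support; the CustomerUDF class name and the config values are taken from elsewhere in this patch, while the in-memory `lineitem` view, the object name, and the choice of companion expressions are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object PartialProjectExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .appName("partial-project-example")
      // Gluten must be on the classpath; these values mirror the test conf used later in this patch.
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "1024MB")
      .enableHiveSupport()
      .getOrCreate()

    // Illustrative stand-in for the TPC-H lineitem table that the test suites below read from parquet.
    spark.range(10).selectExpr("cast(id as string) as l_comment").createOrReplaceTempView("lineitem")

    // The Hive UDF cannot be offloaded, while hash()/length() typically can, so the resulting
    // ProjectExec is only partially supported and becomes a candidate for OffloadProjectPartial
    // instead of a whole-node row-based fallback.
    spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.gluten.execution.CustomerUDF'")
    val df = spark.sql("SELECT hash(testUDF(l_comment)) AS h, length(l_comment) AS len FROM lineitem")
    df.explain()
    df.show()
  }
}
```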
case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index f1cb4792383b..24cebf960363 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -116,7 +116,7 @@ object Validators { } } - private class FallbackComplexExpressions(threshold: Int) extends Validator { + class FallbackComplexExpressions(threshold: Int) extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { if (plan.expressions.exists(e => ExpressionUtils.getExpressionTreeDepth(e) > threshold)) { return fail( diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java index 37376951c543..cbac719d6003 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java @@ -44,6 +44,16 @@ public static ColumnarBatchJniWrapper create(Runtime runtime) { public native long compose(long[] batches); + /** + * Reorder column to generate a new ColumnarBatch. + * + * @param cb1 column batch + * @param cb1ColumnIndices the index in result columnar batch + * @param cb2 column batch + * @return compose batch handle + */ + public native long composeWithReorder(long cb1, int[] cb1ColumnIndices, long cb2); + public native void exportToArrow(long batch, long cSchema, long cArray); public native long select(long batch, int[] columnIndices); diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index fd9c72c36060..88e281c048a0 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -315,6 +315,14 @@ public static long compose(ColumnarBatch... 
batches) { .compose(handles); } + // If cb1ColumnIndices is different with cb1 columns, ignore the front column in cb1 + public static ColumnarBatch composeWithReorder( + ColumnarBatch cb1, int[] cb1ColumnIndices, ColumnarBatch cb2) { + long handle = ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatches#composeWithReorder")) + .composeWithReorder(getNativeHandle(cb1), cb1ColumnIndices, getNativeHandle(cb2)); + return ColumnarBatches.create(handle); + } + private static ColumnarBatch create(IndicatorVector iv) { int numColumns = Math.toIntExact(iv.getNumColumns()); int numRows = Math.toIntExact(iv.getNumRows()); diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java index dfd570debc0a..b6624718a00e 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java @@ -63,6 +63,7 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteOrder; import java.util.Arrays; import java.util.Date; @@ -718,6 +719,16 @@ public Decimal getDecimal(int rowId, int precision, int scale) { return accessor.getDecimal(rowId, precision, scale); } + public void putDecimal(int rowId, Decimal value, int precision) { + if (precision <= Decimal.MAX_INT_DIGITS()) { + putInt(rowId, (int) value.toUnscaledLong()); + } else if (precision <= Decimal.MAX_LONG_DIGITS()) { + putLong(rowId, value.toUnscaledLong()); + } else { + writer.setBytes(rowId, value.toJavaBigDecimal()); + } + } + @Override public UTF8String getUTF8String(int rowId) { if (isNullAt(rowId)) { @@ -1255,9 +1266,8 @@ void setNull(int rowId) { throw new UnsupportedOperationException(); } - void setNotNull(int rowId) { - throw new UnsupportedOperationException(); - } + // No need to setNotNull then setValue, only setValue is enough + void setNotNull(int rowId) {} void setNulls(int rowId, int count) { throw new UnsupportedOperationException(); diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala index b49077bd2740..8e66981ac72f 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala @@ -37,6 +37,16 @@ object SparkSchemaUtil { SparkArrowUtil.toArrowSchema(schema, timeZoneId) } + def checkSchema(schema: StructType): Boolean = { + try { + SparkSchemaUtil.toArrowSchema(schema) + true + } catch { + case _: Exception => + false + } + } + def isTimeZoneIDEquivalentToUTC(zoneId: String): Boolean = { getTimeZoneIDOffset(zoneId) == 0 } diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml index a016eccaed20..47a0b92841b1 100644 --- a/gluten-ut/pom.xml +++ b/gluten-ut/pom.xml @@ -79,6 +79,11 @@ spark-core_${scala.binary.version} test-jar + + org.apache.spark + spark-hive_${scala.binary.version} + test-jar + org.apache.spark spark-sql_${scala.binary.version} diff --git a/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java b/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java new file mode 100644 index 000000000000..257bd07021f2 --- /dev/null +++ b/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; + +/** + * UDF that generates the link id (MD5 hash) of a URL. Used to join with link join. + * + *
Usage example: + * + *
CREATE TEMPORARY FUNCTION linkid AS 'com.pinterest.hadoop.hive.LinkIdUDF'; + */ +@Description( + name = "linkid", + value = "_FUNC_(String) - Returns linkid as String, it's the MD5 hash of url.") +public class CustomerUDF extends UDF { + public String evaluate(String url) { + if (url == null || url == "") { + return ""; + } + return "extendedudf" + url; + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala new file mode 100644 index 000000000000..cc9f6f1d893a --- /dev/null +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.gluten.execution.CustomerUDF + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.{GlutenTestsBaseTrait, QueryTest, SparkSession} +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.test.SQLTestUtils + +import org.scalatest.BeforeAndAfterAll + +import java.io.File + +trait GlutenTestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { + override protected val enableAutoThreadAudit = false + +} + +object GlutenTestHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, + "org.apache.spark.sql.hive.execution.PairSerDe") + .set(WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) + // SPARK-8910 + .set(UI_ENABLED, false) + .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) + // Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes + // from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764. 
+ .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.driver.memory", "1G") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.sql.files.maxPartitionBytes", "134217728") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + // Disable ConvertToLocalRelation for better test coverage. Test cases built on + // LocalRelation will exercise the optimization rules better by disabling it as + // this rule may potentially block testing of other optimization rules such as + // ConstantPropagation etc. + .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + ), + false + ) {} + +class GlutenHiveUDFSuite + extends QueryTest + with GlutenTestHiveSingleton + with SQLTestUtils + with GlutenTestsBaseTrait { + override protected val spark: SparkSession = GlutenTestHive.sparkSession + protected val hiveContext: TestHiveContext = GlutenTestHive + protected val hiveClient: HiveClient = + spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client + + override protected def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = + getClass.getResource("").getPath + "/../../../../../../../../../../../" + + "/backends-velox/src/test/resources/tpch-data-parquet-velox/" + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format("parquet").load(tablePath) + tableDF.createOrReplaceTempView(table) + } + + override protected def afterAll(): Unit = { + try { + hiveContext.reset() + } finally { + super.afterAll() + } + } + + override protected def shouldRun(testName: String): Boolean = { + false + } + + test("customer udf") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select testUDF(l_comment) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("customer udf wrapped in function") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select hash(testUDF(l_comment)) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("example") { + spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.UDFSubstr';") + spark.sql("select testUDF('l_commen', 1, 5)").show() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + +} diff --git a/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java b/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java new file mode 100644 index 000000000000..257bd07021f2 --- /dev/null +++ b/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; + +/** + * UDF that generates the link id (MD5 hash) of a URL. Used to join with link join. + * + *
Usage example: + * + *
CREATE TEMPORARY FUNCTION linkid AS 'com.pinterest.hadoop.hive.LinkIdUDF'; + */ +@Description( + name = "linkid", + value = "_FUNC_(String) - Returns linkid as String, it's the MD5 hash of url.") +public class CustomerUDF extends UDF { + public String evaluate(String url) { + if (url == null || url == "") { + return ""; + } + return "extendedudf" + url; + } +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index e8d8730e9366..8dc6baed39b0 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.GlutenFallbackSuite -import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite +import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQuerySuite, GlutenHiveUDFSuite} import org.apache.spark.sql.sources._ // Some settings' line length exceeds 100 @@ -1231,6 +1231,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetFileMetadataStructRowIndexSuite] enableSuite[GlutenTableLocationSuite] enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite] + enableSuite[GlutenHiveUDFSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala new file mode 100644 index 000000000000..cc9f6f1d893a --- /dev/null +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution + +import org.apache.gluten.execution.CustomerUDF + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.{GlutenTestsBaseTrait, QueryTest, SparkSession} +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.test.SQLTestUtils + +import org.scalatest.BeforeAndAfterAll + +import java.io.File + +trait GlutenTestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { + override protected val enableAutoThreadAudit = false + +} + +object GlutenTestHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, + "org.apache.spark.sql.hive.execution.PairSerDe") + .set(WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) + // SPARK-8910 + .set(UI_ENABLED, false) + .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) + // Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes + // from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764. + .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.driver.memory", "1G") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.sql.files.maxPartitionBytes", "134217728") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + // Disable ConvertToLocalRelation for better test coverage. Test cases built on + // LocalRelation will exercise the optimization rules better by disabling it as + // this rule may potentially block testing of other optimization rules such as + // ConstantPropagation etc. 
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + ), + false + ) {} + +class GlutenHiveUDFSuite + extends QueryTest + with GlutenTestHiveSingleton + with SQLTestUtils + with GlutenTestsBaseTrait { + override protected val spark: SparkSession = GlutenTestHive.sparkSession + protected val hiveContext: TestHiveContext = GlutenTestHive + protected val hiveClient: HiveClient = + spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client + + override protected def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = + getClass.getResource("").getPath + "/../../../../../../../../../../../" + + "/backends-velox/src/test/resources/tpch-data-parquet-velox/" + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format("parquet").load(tablePath) + tableDF.createOrReplaceTempView(table) + } + + override protected def afterAll(): Unit = { + try { + hiveContext.reset() + } finally { + super.afterAll() + } + } + + override protected def shouldRun(testName: String): Boolean = { + false + } + + test("customer udf") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select testUDF(l_comment) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("customer udf wrapped in function") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select hash(testUDF(l_comment)) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("example") { + spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.UDFSubstr';") + spark.sql("select testUDF('l_commen', 1, 5)").show() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + +} diff --git a/pom.xml b/pom.xml index 4de4e2158a7f..c9fab345e594 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,7 @@ -explaintypes -target:jvm-1.8 -Wconf:cat=deprecation:wv,any:e - -Wunused:imports + @@ -663,6 +663,13 @@ test-jar test + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test-jar + test + org.apache.hadoop hadoop-client @@ -841,7 +848,7 @@ ${scala.recompile.mode} -Wconf:msg=While parsing annotations in:silent,any:e - -Ywarn-unused:imports + -deprecation -feature -Wconf:cat=deprecation:wv,any:e diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 5c032d4b0ef4..d4cfe3c50dd0 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -444,6 +444,10 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE) + def enableColumnarPartialProject: Boolean = conf.getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT) + + def enableProjectColumnarExec: Boolean = conf.getConf(ENABLE_PROJECT_COLUMNAR_EXEC) + def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL) def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE) @@ -1846,6 +1850,18 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val ENABLE_COLUMNAR_PARTIAL_PROJECT = + buildConf("spark.gluten.sql.columnar.partial.project") + 
.doc("Execute the part of a project that is not supported by the backend in Spark, and offload the rest") + .booleanConf + .createWithDefault(true) + + val ENABLE_PROJECT_COLUMNAR_EXEC = + buildConf("spark.gluten.sql.columnar.project.exec") + .doc("Enable ProjectColumnarExec, a project operator whose input and output are columnar batches") + .booleanConf + .createWithDefault(true) + + val ENABLE_COMMON_SUBEXPRESSION_ELIMINATE = + buildConf("spark.gluten.sql.commonSubexpressionEliminate") + .internal()