diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 4ad40b07e82e..656e04fcc84b 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -185,6 +185,11 @@
       <version>1.3.0</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-core_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala
new file mode 100644
index 000000000000..31007588c723
--- /dev/null
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+import java.io.File
+
+class VeloxDeltaSuite extends WholeStageTransformerSuite {
+
+  protected val rootPath: String = getClass.getResource("/").getPath
+  override protected val backend: String = "velox"
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.files.maxPartitionBytes", "1g")
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set("spark.sql.sources.useV1SourceList", "avro")
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+  }
+
+  test("column mapping mode") {
+    spark.sql(s"""
+                 |create table delta_cm1 (id int, name string) using delta
+                 |tblproperties ("delta.columnMapping.mode"= "id")
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |insert into delta_cm1 values (1, "v1"), (2, "v2")
+                 |""".stripMargin)
+    val df1 = runQueryAndCompare("select * from delta_cm1") { _ => }
+    checkLengthAndPlan(df1, 2)
+    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => }
+    checkLengthAndPlan(df2, 1)
+    checkAnswer(df2, Row("v2") :: Nil)
+  }
+}
+
+class VeloxTPCHDeltaSuite extends VeloxTPCHSuite {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables = TPCHTableNames.map {
+      table =>
+        val tableDir = getClass.getResource(resourcePath).getFile
+        val tablePath = new File(tableDir, table).getAbsolutePath
+        val tableDF = spark.read.format(fileFormat).load(tablePath)
+        tableDF.write.format("delta").mode("append").saveAsTable(table)
+        (table, tableDF)
+    }.toMap
+  }
+}
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 14fa2bdfabc2..4cc37259dfd9 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -19,6 +19,10 @@
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-core_${scala.binary.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.glutenproject</groupId>
       <artifactId>gluten-ui</artifactId>
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
index ffe1b6015f2b..eb460ec9c1d1 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
@@ -285,6 +285,7 @@ class FileSourceScanExecTransformer(
       case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
       case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
       case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
       case _ => ReadFileFormat.UnknownFormat
     }
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index 2497439725b7..6b03088056ef 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -32,18 +32,138 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.EvalPythonExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SparkRuleUtil
 
+import scala.collection._
+
+/**
+ * This rule performs SparkPlan transformations that Gluten needs but that cannot be done in
+ * vanilla Spark, e.g. lake-format related rewrites for Delta Lake. It must be applied before any
+ * other rule so that later rules (such as transform hint tagging) do not lose information.
+ */
+case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformUpWithSubqueries {
+      // If Delta column mapping (name mapping or id mapping) is enabled,
+      // rewrite the Delta metadata into Parquet metadata
+      // so that Gluten can read the Delta files with the Parquet reader.
+      case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) =>
+        transformColumnMappingPlan(p)
+    }
+  }
+
+  /** Checks whether the `FileFormat` is a Delta format with column mapping (name or id mapping) enabled. */
+  private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
+    case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
+      true
+    case _ =>
+      false
+  }
+
+  /**
+   * Only used for Delta file formats with column mapping (name or id mapping) enabled: rewrites
+   * the Delta metadata into Parquet metadata. Each plan should be transformed only once.
+   */
+  private def transformColumnMappingPlan(sPlan: SparkPlan): SparkPlan = sPlan match {
+    case plan: FileSourceScanExec =>
+      val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
+      // Mapping from the table's logical column names to the physical Parquet column names.
+      val columnNameMapping = mutable.Map.empty[String, String]
+      fmt.referenceSchema.foreach {
+        f =>
+          val pName = f.metadata.getString("delta.columnMapping.physicalName")
+          val lName = f.name
+          columnNameMapping += (lName -> pName)
+      }
+
+      // Transform the HadoopFsRelation.
+      val relation = plan.relation
+      val newDataFields = relation.dataSchema.map(e => e.copy(columnNameMapping(e.name)))
+      val newPartitionFields = relation.partitionSchema.map {
+        e => e.copy(columnNameMapping(e.name))
+      }
+      val newFsRelation = relation.copy(
+        partitionSchema = StructType(newPartitionFields),
+        dataSchema = StructType(newDataFields)
+      )(session)
+
+      // Rename the output attributes to their physical names so the reader reads the data
+      // correctly; keep the column order the same as in the original output.
+      val originColumnNames = mutable.ListBuffer.empty[String]
+      val transformedAttrs = mutable.ListBuffer.empty[Attribute]
+      val newOutput = plan.output.map {
+        o =>
+          val newAttr = o.withName(columnNameMapping(o.name))
+          if (!originColumnNames.contains(o.name)) {
+            transformedAttrs += newAttr
+            originColumnNames += o.name
+          }
+          newAttr
+      }
+      // Transform dataFilters.
+      val newDataFilters = plan.dataFilters.map {
+        e =>
+          e.transformDown {
+            case attr: AttributeReference =>
+              val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
+              if (!originColumnNames.contains(attr.name)) {
+                transformedAttrs += newAttr
+                originColumnNames += attr.name
+              }
+              newAttr
+          }
+      }
+      // Transform partitionFilters.
+      val newPartitionFilters = plan.partitionFilters.map {
+        e =>
+          e.transformDown {
+            case attr: AttributeReference =>
+              val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
+              if (!originColumnNames.contains(attr.name)) {
+                transformedAttrs += newAttr
+                originColumnNames += attr.name
+              }
+              newAttr
+          }
+      }
+      // Replace the logical names in the required schema with the physical names.
+      val newRequiredFields = plan.requiredSchema.map {
+        e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata)
+      }
+      val newPlan = FileSourceScanExec(
+        newFsRelation,
+        newOutput,
+        StructType(newRequiredFields),
+        newPartitionFilters,
+        plan.optionalBucketSet,
+        plan.optionalNumCoalescedBuckets,
+        newDataFilters,
+        plan.tableIdentifier,
+        plan.disableBucketedScan
+      )
+
+      // Alias the physical names back to the original logical names.
+      val expr = (transformedAttrs, originColumnNames).zipped.map {
+        (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
+      }
+      ProjectExec(expr, newPlan)
+    case _ => sPlan
+  }
+}
+
 // This rule will conduct the conversion from Spark plan to the plan transformer.
 case class TransformPreOverrides(isAdaptiveContext: Boolean)
   extends Rule[SparkPlan]
@@ -810,6 +930,7 @@ case class ColumnarOverrideRules(session: SparkSession)
     }
     tagBeforeTransformHitsRules ::: List(
+      (spark: SparkSession) => RewritePlanIfNeeded(spark),
       (spark: SparkSession) => PlanOneRowRelation(spark),
       (_: SparkSession) => FallbackEmptySchemaRelation(),
       (_: SparkSession) => AddTransformHintRule(),
diff --git a/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
index 818489df053b..a6a9e5c976f1 100644
--- a/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
+++ b/gluten-core/src/test/scala/io/glutenproject/execution/WholeStageTransformerSuite.scala
@@ -39,6 +39,9 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
   protected val fileFormat: String
   protected val logLevel: String = "WARN"
 
+  protected val TPCHTableNames: Seq[String] =
+    Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")
+
   protected var TPCHTables: Map[String, DataFrame] = _
 
   override def beforeAll(): Unit = {
@@ -54,15 +57,7 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
   }
 
   protected def createTPCHNotNullTables(): Unit = {
-    TPCHTables = Seq(
-      "customer",
-      "lineitem",
-      "nation",
-      "orders",
-      "part",
-      "partsupp",
-      "region",
-      "supplier").map {
+    TPCHTables = TPCHTableNames.map {
       table =>
         val tableDir = getClass.getResource(resourcePath).getFile
         val tablePath = new File(tableDir, table).getAbsolutePath
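
The rewrite in RewritePlanIfNeeded hinges on the logical-to-physical name mapping that Delta stores in the reference schema under the "delta.columnMapping.physicalName" metadata key. The following self-contained sketch shows only that extraction step; the field names and physical names below are made up for illustration and are not taken from the PR.

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
import scala.collection.mutable

object ColumnMappingSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative reference schema: each field carries the physical name Delta assigned to it.
    val referenceSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = true,
        new MetadataBuilder().putString("delta.columnMapping.physicalName", "col-1a2b").build()),
      StructField("name", StringType, nullable = true,
        new MetadataBuilder().putString("delta.columnMapping.physicalName", "col-3c4d").build())
    ))

    // Same extraction as transformColumnMappingPlan: logical name -> physical Parquet name.
    val columnNameMapping = mutable.Map.empty[String, String]
    referenceSchema.foreach {
      f => columnNameMapping += (f.name -> f.metadata.getString("delta.columnMapping.physicalName"))
    }
    println(columnNameMapping) // Map("id" -> "col-1a2b", "name" -> "col-3c4d")
  }
}

The scan is rewritten to read the physical names ("col-1a2b", "col-3c4d"), and a ProjectExec on top aliases them back to the logical names ("id", "name") so the rest of the plan is unaffected.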
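
For manual verification, a minimal spark-shell style sketch that exercises the new code path, mirroring the "column mapping mode" test in VeloxDeltaSuite above. The session builder, the table name delta_cm_demo, and the inserted values are illustrative; Gluten and delta-core are assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

// Enable the Delta extension and catalog, as in the test's sparkConf.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Create a Delta table with id-based column mapping and insert a couple of rows.
spark.sql("""
  create table delta_cm_demo (id int, name string) using delta
  tblproperties ("delta.columnMapping.mode" = "id")
""")
spark.sql("""insert into delta_cm_demo values (1, "v1"), (2, "v2")""")

// With RewritePlanIfNeeded applied, the FileSourceScanExec is expected to be rewritten to the
// physical Parquet column names so the scan can be offloaded to the Velox backend.
spark.sql("select name from delta_cm_demo where id = 2").show()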