[Gluten-core][Vl] supports DeltaLake2.2 read
mokunhua authored and 寻径 committed Oct 12, 2023
1 parent dfea063 commit bc6f0f4
Showing 6 changed files with 214 additions and 9 deletions.
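For context, the new VeloxDeltaSuite added below also shows how Delta reads are enabled. A minimal usage sketch, assuming Gluten's Velox backend is loaded via spark.plugins; the plugin class, local master, and the delta_demo table are illustrative assumptions, while the extension, catalog, and off-heap settings are taken from the test configuration in this commit:

import org.apache.spark.sql.SparkSession

// Hypothetical local session. The extension/catalog/off-heap settings mirror the new
// VeloxDeltaSuite; the plugin class and master are assumptions for a standalone example.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.plugins", "io.glutenproject.GlutenPlugin")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Illustrative column-mapped table: with columnMapping enabled, RewritePlanIfNeeded rewrites
// the scan to the physical Parquet column names so Velox can read the Delta data files.
spark.sql("""create table delta_demo (id int, name string) using delta
             tblproperties ("delta.columnMapping.mode" = "id")""")
spark.sql("""insert into delta_demo values (1, "v1"), (2, "v2")""")
spark.sql("select name from delta_demo where id = 2").show()
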
5 changes: 5 additions & 0 deletions backends-velox/pom.xml
@@ -185,6 +185,11 @@
<version>1.3.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

import java.io.File

class VeloxDeltaSuite extends WholeStageTransformerSuite {

protected val rootPath: String = getClass.getResource("/").getPath
override protected val backend: String = "velox"
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.sources.useV1SourceList", "avro")
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
}

test("column mapping mode") {
spark.sql(s"""
|create table delta_cm1 (id int, name string) using delta
|tblproperties ("delta.columnMapping.mode"= "id")
|""".stripMargin)
spark.sql(s"""
|insert into delta_cm1 values (1, "v1"), (2, "v2")
|""".stripMargin)
val df1 = runQueryAndCompare("select * from delta_cm1") { _ => }
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => }
checkLengthAndPlan(df2, 1)
checkAnswer(df2, Row("v2") :: Nil)
}
}

class VeloxTPCHDeltaSuite extends VeloxTPCHSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
}

override protected def createTPCHNotNullTables(): Unit = {
TPCHTables = TPCHTableNames.map {
table =>
val tableDir = getClass.getResource(resourcePath).getFile
val tablePath = new File(tableDir, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("delta").mode("append").saveAsTable(table)
(table, tableDF)
}.toMap
}
}
4 changes: 4 additions & 0 deletions gluten-core/pom.xml
@@ -19,6 +19,10 @@
</properties>

<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.glutenproject</groupId>
<artifactId>gluten-ui</artifactId>
@@ -285,6 +285,7 @@ class FileSourceScanExecTransformer(
case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
case "CSVFileFormat" => ReadFileFormat.TextReadFormat
case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
case _ => ReadFileFormat.UnknownFormat
}

@@ -32,18 +32,138 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.EvalPythonExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SparkRuleUtil

import scala.collection._

/**
* This rule applies the SparkPlan transformations that must happen first: 1) lake-format-related
* rewrites that cannot be done inside Spark itself, e.g. Delta Lake; 2) rewrites that must run
* before any other rule, so that information such as the plan hint tag is not lost.
*/
case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.transformUpWithSubqueries {
// If Delta column mapping (e.g. name mapping or id mapping) is enabled,
// rewrite the Delta metadata into Parquet metadata
// so that Gluten can read the Delta files with the Parquet reader.
case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) =>
transformColumnMappingPlan(p)
}
}

/** Check whether the `FileFormat` is a Delta format with column mapping enabled (name or id mapping). */
private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
true
case _ =>
false
}

/**
* Only used for Delta file formats with column mapping enabled (name or id mapping): rewrites
* the Delta metadata into Parquet metadata. Each plan should be transformed exactly once.
*/
private def transformColumnMappingPlan(sPlan: SparkPlan): SparkPlan = sPlan match {
case plan: FileSourceScanExec =>
val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
// A mapping from the logical column names in the table schema to the physical Parquet column names.
val columnNameMapping = mutable.Map.empty[String, String]
fmt.referenceSchema.foreach {
f =>
val pName = f.metadata.getString("delta.columnMapping.physicalName")
val lName = f.name
columnNameMapping += (lName -> pName)
}

// transform HadoopFsRelation
val relation = plan.relation
val newDataFields = relation.dataSchema.map(e => e.copy(columnNameMapping(e.name)))
val newPartitionFields = relation.partitionSchema.map {
e => e.copy(columnNameMapping(e.name))
}
val newFsRelation = relation.copy(
partitionSchema = StructType(newPartitionFields),
dataSchema = StructType(newDataFields)
)(session)

// Rename the output attributes to their physical names so the reader can resolve the data
// correctly; keep the column order identical to the original output.
val originColumnNames = mutable.ListBuffer.empty[String]
val transformedAttrs = mutable.ListBuffer.empty[Attribute]
val newOutput = plan.output.map {
o =>
val newAttr = o.withName(columnNameMapping(o.name))
if (!originColumnNames.contains(o.name)) {
transformedAttrs += newAttr
originColumnNames += o.name
}
newAttr
}
// transform dataFilters
val newDataFilters = plan.dataFilters.map {
e =>
e.transformDown {
case attr: AttributeReference =>
val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
if (!originColumnNames.contains(attr.name)) {
transformedAttrs += newAttr
originColumnNames += attr.name
}
newAttr
}
}
// transform partitionFilters
val newPartitionFilters = plan.partitionFilters.map {
e =>
e.transformDown {
case attr: AttributeReference =>
val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
if (!originColumnNames.contains(attr.name)) {
transformedAttrs += newAttr
originColumnNames += attr.name
}
newAttr
}
}
// Replace the logical column names in the required schema with their physical names.
val newRequiredFields = plan.requiredSchema.map {
e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata)
}
val newPlan = FileSourceScanExec(
newFsRelation,
newOutput,
StructType(newRequiredFields),
newPartitionFilters,
plan.optionalBucketSet,
plan.optionalNumCoalescedBuckets,
newDataFilters,
plan.tableIdentifier,
plan.disableBucketedScan
)

// Alias the physical names back to the original logical names.
val expr = (transformedAttrs, originColumnNames).zipped.map {
(attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
}
ProjectExec(expr, newPlan)
case _ => sPlan
}
}

// This rule will conduct the conversion from Spark plan to the plan transformer.
case class TransformPreOverrides(isAdaptiveContext: Boolean)
extends Rule[SparkPlan]
@@ -810,6 +930,7 @@ case class ColumnarOverrideRules(session: SparkSession)
}
tagBeforeTransformHitsRules :::
List(
(spark: SparkSession) => RewritePlanIfNeeded(spark),
(spark: SparkSession) => PlanOneRowRelation(spark),
(_: SparkSession) => FallbackEmptySchemaRelation(),
(_: SparkSession) => AddTransformHintRule(),
@@ -39,6 +39,9 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
protected val fileFormat: String
protected val logLevel: String = "WARN"

protected val TPCHTableNames: Seq[String] =
Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")

protected var TPCHTables: Map[String, DataFrame] = _

override def beforeAll(): Unit = {
@@ -54,15 +57,7 @@ }
}

protected def createTPCHNotNullTables(): Unit = {
TPCHTables = Seq(
"customer",
"lineitem",
"nation",
"orders",
"part",
"partsupp",
"region",
"supplier").map {
TPCHTables = TPCHTableNames.map {
table =>
val tableDir = getClass.getResource(resourcePath).getFile
val tablePath = new File(tableDir, table).getAbsolutePath
