Skip to content

Commit

Permalink
[GLUTEN-3378][CORE] Datasource V2 data lake read support (#3843)
Browse files Browse the repository at this point in the history
* Datasource V2 data lake read support

* Remove SupportFormat

* use service loader

* combine api

* remove BatchScanExec in filter pushdown
  • Loading branch information
liujiayi771 authored Dec 5, 2023
1 parent f31cc82 commit a462434
Show file tree
Hide file tree
Showing 33 changed files with 988 additions and 243 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/velox_be.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ jobs:
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -148,7 +148,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(slow tests)
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -184,7 +184,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(other tests)
run: |
docker exec ubuntu2004-test-spark33-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
Expand Down Expand Up @@ -214,7 +214,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(slow tests)
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -250,7 +250,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(other tests)
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
Expand Down Expand Up @@ -280,7 +280,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand All @@ -307,7 +307,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -DskipTests'
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.3
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand All @@ -320,7 +320,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests'
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -360,7 +360,7 @@ jobs:
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -411,7 +411,7 @@ jobs:
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -493,7 +493,7 @@ jobs:
run: |
docker exec static-build-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests && \
cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.2'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def needOutputSchemaForPlan(): Boolean = true

override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss

override def requiredInputFilePaths(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ case class FilterExecTransformer(condition: Expression, child: SparkPlan)
private def getLeftCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
case batchScanTransformer: BatchScanExecTransformer =>
batchScanTransformer.filterExprs()
case fileScanTransformer: FileSourceScanExecTransformer =>
fileScanTransformer.filterExprs()
case basicScanExecTransformer: BasicScanExecTransformer =>
basicScanExecTransformer.filterExprs()
// For fallback scan, we need to keep original filter.
case _ =>
Seq.empty[Expression]
Expand Down
17 changes: 17 additions & 0 deletions docs/get-started/Velox.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,23 @@ After the two steps, you can query delta table by gluten/velox without scan's fa
Gluten with the Velox backend also supports column mapping for Delta tables.
About column mapping, see more [here](https://docs.delta.io/latest/delta-column-mapping.html).

## Iceberg Support

Gluten with the Velox backend supports [Iceberg](https://iceberg.apache.org/) tables. Currently, only reading COW (Copy-On-Write) tables is supported.

### How to use

First, compile the gluten-iceberg module by enabling the `iceberg` profile, as follows:

```
mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests
```

Then, add the gluten-iceberg jar to the class path (usually `$SPARK_HOME/jars`).
The gluten-iceberg jar is located in the `gluten-iceberg/target` directory.

After these two steps, you can query Iceberg tables with Gluten/Velox without the scan falling back.

# Coverage
Spark 3.3 has 387 functions in total, of which ~240 are commonly used. Velox's functions fall into two categories, Presto and Spark. Presto has 124 functions implemented. Spark has 62 functions. Spark functions are verified to have the same result as Vanilla Spark. Some Presto functions have the same result as Vanilla Spark, but others differ. Gluten prefers to use Spark functions first. If a function is not in Spark's list but is implemented in Presto, we currently offload to the Presto one until we notice a result mismatch; then we need to reimplement the function in the Spark category. Gluten currently offloads 94 functions and 14 operators; for more details, refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,6 @@ trait BackendSettingsApi {
def requiredChildOrderingForWindow(): Boolean = false

def staticPartitionWriteOnly(): Boolean = false

def requiredInputFilePaths(): Boolean = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.types.StructType

/**
 * Describes the data-source side of a scan: the schemas it reads, the partitions it produces and
 * the input file paths it covers.
 *
 * Every member is abstract; implementations supply the values for their concrete scan operator.
 */
trait BaseDataSource {

/** Returns the actual data schema of this data source scan. */
def getDataSchema: StructType

/** Returns the required partition schema, which is used to generate the partition columns. */
def getPartitionSchema: StructType

/** Returns the partitions generated by this data source scan. */
def getPartitions: Seq[InputPartition]

/**
 * Returns the input file paths, which are used to validate the partition column path.
 *
 * NOTE(review): callers elsewhere in this change describe collecting these paths as a heavy
 * operation and only invoke it when the backend requires it -- confirm before calling directly.
 */
def getInputFilePathsInternal: Seq[String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.extensions.ExtensionBuilder
Expand Down Expand Up @@ -416,53 +414,15 @@ object FilterHandler {

// Separate and compare the filter conditions in Scan and Filter.
// Push down the left conditions in Filter into Scan.
def applyFilterPushdownToScan(plan: FilterExec, reuseSubquery: Boolean): SparkPlan =
plan.child match {
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan =
filter.child match {
case fileSourceScan: FileSourceScanExec =>
val leftFilters =
getLeftFilters(fileSourceScan.dataFilters, flattenCondition(plan.condition))
// transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(
fileSourceScan.partitionFilters,
reuseSubquery)
new FileSourceScanExecTransformer(
fileSourceScan.relation,
fileSourceScan.output,
fileSourceScan.requiredSchema,
newPartitionFilters,
fileSourceScan.optionalBucketSet,
fileSourceScan.optionalNumCoalescedBuckets,
fileSourceScan.dataFilters ++ leftFilters,
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
case batchScan: BatchScanExec =>
batchScan.scan match {
case scan: FileScan =>
val leftFilters =
getLeftFilters(scan.dataFilters, flattenCondition(plan.condition))
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery)
new BatchScanExecTransformer(
batchScan.output,
scan,
leftFilters ++ newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan))
case _ =>
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
s"${batchScan.scan.getClass.toString} is not supported.")
} else {
// IF filter expressions aren't empty, we need to transform the inner operators.
val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(
newSource,
"The scan in BatchScanExec is not a FileScan")
newSource
}
}
getLeftFilters(fileSourceScan.dataFilters, flattenCondition(filter.condition))
ScanTransformerFactory.createFileSourceScanTransformer(
fileSourceScan,
reuseSubquery,
extraFilters = leftFilters)
case other =>
throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,45 @@ import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter}
import io.glutenproject.extension.ValidationResult
import io.glutenproject.substrait.`type`.ColumnTypeNode
import io.glutenproject.substrait.{SubstraitContext, SupportFormat}
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.plan.PlanBuilder
import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo}
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.InSubqueryExec
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.collect.Lists

import scala.collection.JavaConverters._

trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {

// The key of merge schema option in Parquet reader.
protected val mergeSchemaOptionKey = "mergeschema"
trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource {

/** Returns the filters that can be pushed down to native file scan */
def filterExprs(): Seq[Expression]

def outputAttributes(): Seq[Attribute]

def getPartitions: Seq[InputPartition]

def getPartitionSchemas: StructType

def getDataSchemas: StructType
/** This can be used to report FileFormat for a file based scan operator. */
val fileFormat: ReadFileFormat

// TODO: Remove this expensive call when CH support scan custom partition location.
def getInputFilePaths: Seq[String]
def getInputFilePaths: Seq[String] = {
// This is a heavy operation, and only the required backend executes the corresponding logic.
if (BackendsApiManager.getSettings.requiredInputFilePaths()) {
getInputFilePathsInternal
} else {
Seq.empty
}
}

def getSplitInfos: Seq[SplitInfo] =
/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos: Seq[SplitInfo] = {
getPartitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(_, getPartitionSchemas, fileFormat))
.genSplitInfo(_, getPartitionSchema, fileFormat))
}

def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("outputRows")
Expand Down Expand Up @@ -85,13 +87,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {
.supportFileFormatRead(
fileFormat,
schema.fields,
getPartitionSchemas.nonEmpty,
getPartitionSchema.nonEmpty,
getInputFilePaths)
) {
return ValidationResult.notOk(
s"Not supported file format or complex type for scan: $fileFormat")
}

val substraitContext = new SubstraitContext
val relNode = doTransform(substraitContext).root

Expand All @@ -102,10 +103,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {
val output = outputAttributes()
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val partitionSchemas = getPartitionSchemas
val columnTypeNodes = output.map {
attr =>
if (partitionSchemas.exists(_.name.equals(attr.name))) {
if (getPartitionSchema.exists(_.name.equals(attr.name))) {
new ColumnTypeNode(1)
} else {
new ColumnTypeNode(0)
Expand All @@ -125,11 +125,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {
exprNode,
context,
context.nextOperatorId(this.nodeName))
relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchemas)
relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchema)
TransformContext(output, output, relNode)
}

def executeInSubqueryForDynamicPruningExpression(inSubquery: InSubqueryExec): Unit = {
if (inSubquery.values().isEmpty) inSubquery.updateResult()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ class BatchScanExecTransformer(

override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions

override def getPartitionSchemas: StructType = scan match {
override def getPartitionSchema: StructType = scan match {
case fileScan: FileScan => fileScan.readPartitionSchema
case _ => new StructType()
}

override def getDataSchemas: StructType = scan match {
override def getDataSchema: StructType = scan match {
case fileScan: FileScan => fileScan.readDataSchema
case _ => new StructType()
}

override def getInputFilePaths: Seq[String] = {
override def getInputFilePathsInternal: Seq[String] = {
scan match {
case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq
case _ => Seq.empty
Expand Down
Loading

0 comments on commit a462434

Please sign in to comment.