Skip to content

Commit

Permalink
Revert "[GLUTEN-3378][CORE] Datasource V2 data lake read support (apa…
Browse files Browse the repository at this point in the history
…che#3843)"

This reverts commit a462434.
  • Loading branch information
loneylee committed Dec 7, 2023
1 parent 82d0a14 commit 809fae8
Show file tree
Hide file tree
Showing 33 changed files with 242 additions and 989 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/velox_be.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ jobs:
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -148,7 +148,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(slow tests)
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -184,7 +184,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(other tests)
run: |
docker exec ubuntu2004-test-spark33-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
Expand Down Expand Up @@ -214,7 +214,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(slow tests)
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -250,7 +250,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(other tests)
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
Expand Down Expand Up @@ -280,7 +280,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand All @@ -307,7 +307,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -DskipTests'
mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.3
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand All @@ -320,7 +320,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -DskipTests'
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -360,7 +360,7 @@ jobs:
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -411,7 +411,7 @@ jobs:
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
Expand Down Expand Up @@ -493,7 +493,7 @@ jobs:
run: |
docker exec static-build-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests && \
mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests && \
cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.2'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def needOutputSchemaForPlan(): Boolean = true

override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss

override def requiredInputFilePaths(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ case class FilterExecTransformer(condition: Expression, child: SparkPlan)
private def getLeftCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
case basicScanExecTransformer: BasicScanExecTransformer =>
basicScanExecTransformer.filterExprs()
case batchScanTransformer: BatchScanExecTransformer =>
batchScanTransformer.filterExprs()
case fileScanTransformer: FileSourceScanExecTransformer =>
fileScanTransformer.filterExprs()
// For fallback scan, we need to keep original filter.
case _ =>
Seq.empty[Expression]
Expand Down
17 changes: 0 additions & 17 deletions docs/get-started/Velox.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,23 +260,6 @@ After the two steps, you can query delta table by gluten/velox without scan's fa
Gluten with the Velox backend also supports column mapping for Delta tables.
For more about column mapping, see [here](https://docs.delta.io/latest/delta-column-mapping.html).

## Iceberg Support

Gluten with the Velox backend supports [Iceberg](https://iceberg.apache.org/) tables. Currently, only reading COW (Copy-On-Write) tables is supported.

### How to use

First, compile the gluten-iceberg module with the `iceberg` profile, as follows:

```
mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests
```

Then, add the additional gluten-iceberg jar to the class path (usually `$SPARK_HOME/jars`).
The gluten-iceberg jar is in the `gluten-iceberg/target` directory.

After these two steps, you can query Iceberg tables with Gluten/Velox without the scan falling back.

# Coverage
Spark 3.3 has 387 functions in total, of which about 240 are commonly used. Velox's functions fall into two categories, Presto and Spark. Presto has 124 functions implemented; Spark has 62. Spark functions are verified to produce the same results as vanilla Spark. Some Presto functions produce the same results as vanilla Spark, but others differ. Gluten prefers to use Spark functions first. If a function is not in Spark's list but is implemented in Presto, we currently offload it to the Presto implementation until we notice a result mismatch, at which point we need to reimplement the function in the Spark category. Gluten currently offloads 94 functions and 14 operators; for more details, refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,4 @@ trait BackendSettingsApi {
def requiredChildOrderingForWindow(): Boolean = false

def staticPartitionWriteOnly(): Boolean = false

def requiredInputFilePaths(): Boolean = false
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.extensions.ExtensionBuilder
Expand Down Expand Up @@ -414,15 +416,53 @@ object FilterHandler {

// Separate and compare the filter conditions in Scan and Filter.
// Push down the left conditions in Filter into Scan.
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan =
filter.child match {
def applyFilterPushdownToScan(plan: FilterExec, reuseSubquery: Boolean): SparkPlan =
plan.child match {
case fileSourceScan: FileSourceScanExec =>
val leftFilters =
getLeftFilters(fileSourceScan.dataFilters, flattenCondition(filter.condition))
ScanTransformerFactory.createFileSourceScanTransformer(
fileSourceScan,
reuseSubquery,
extraFilters = leftFilters)
getLeftFilters(fileSourceScan.dataFilters, flattenCondition(plan.condition))
// transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(
fileSourceScan.partitionFilters,
reuseSubquery)
new FileSourceScanExecTransformer(
fileSourceScan.relation,
fileSourceScan.output,
fileSourceScan.requiredSchema,
newPartitionFilters,
fileSourceScan.optionalBucketSet,
fileSourceScan.optionalNumCoalescedBuckets,
fileSourceScan.dataFilters ++ leftFilters,
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
case batchScan: BatchScanExec =>
batchScan.scan match {
case scan: FileScan =>
val leftFilters =
getLeftFilters(scan.dataFilters, flattenCondition(plan.condition))
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery)
new BatchScanExecTransformer(
batchScan.output,
scan,
leftFilters ++ newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan))
case _ =>
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
s"${batchScan.scan.getClass.toString} is not supported.")
} else {
// IF filter expressions aren't empty, we need to transform the inner operators.
val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(
newSource,
"The scan in BatchScanExec is not a FileScan")
newSource
}
}
case other =>
throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,43 @@ import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter}
import io.glutenproject.extension.ValidationResult
import io.glutenproject.substrait.`type`.ColumnTypeNode
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.{SubstraitContext, SupportFormat}
import io.glutenproject.substrait.plan.PlanBuilder
import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo}
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.InSubqueryExec
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.collect.Lists

import scala.collection.JavaConverters._

trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource {
trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {

// The key of merge schema option in Parquet reader.
protected val mergeSchemaOptionKey = "mergeschema"

/** Returns the filters that can be pushed down to native file scan */
def filterExprs(): Seq[Expression]

def outputAttributes(): Seq[Attribute]

/** This can be used to report FileFormat for a file based scan operator. */
val fileFormat: ReadFileFormat
def getPartitions: Seq[InputPartition]

def getPartitionSchemas: StructType

def getDataSchemas: StructType

// TODO: Remove this expensive call when CH support scan custom partition location.
def getInputFilePaths: Seq[String] = {
// This is a heavy operation, and only the required backend executes the corresponding logic.
if (BackendsApiManager.getSettings.requiredInputFilePaths()) {
getInputFilePathsInternal
} else {
Seq.empty
}
}
def getInputFilePaths: Seq[String]

/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos: Seq[SplitInfo] = {
def getSplitInfos: Seq[SplitInfo] =
getPartitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(_, getPartitionSchema, fileFormat))
}
.genSplitInfo(_, getPartitionSchemas, fileFormat))

def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("outputRows")
Expand Down Expand Up @@ -87,12 +85,13 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
.supportFileFormatRead(
fileFormat,
schema.fields,
getPartitionSchema.nonEmpty,
getPartitionSchemas.nonEmpty,
getInputFilePaths)
) {
return ValidationResult.notOk(
s"Not supported file format or complex type for scan: $fileFormat")
}

val substraitContext = new SubstraitContext
val relNode = doTransform(substraitContext).root

Expand All @@ -103,9 +102,10 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
val output = outputAttributes()
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val partitionSchemas = getPartitionSchemas
val columnTypeNodes = output.map {
attr =>
if (getPartitionSchema.exists(_.name.equals(attr.name))) {
if (partitionSchemas.exists(_.name.equals(attr.name))) {
new ColumnTypeNode(1)
} else {
new ColumnTypeNode(0)
Expand All @@ -125,7 +125,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
exprNode,
context,
context.nextOperatorId(this.nodeName))
relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchema)
relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchemas)
TransformContext(output, output, relNode)
}

/**
 * Calls `updateResult()` on `inSubquery` only when `inSubquery.values()` is empty; otherwise
 * does nothing.
 *
 * NOTE(review): presumably this forces evaluation of an IN-subquery used by a dynamic pruning
 * expression before its values are read -- confirm against Spark's `InSubqueryExec`.
 *
 * @param inSubquery
 *   the subquery expression to check and, if needed, update
 */
def executeInSubqueryForDynamicPruningExpression(inSubquery: InSubqueryExec): Unit = {
  val resultMissing = inSubquery.values().isEmpty
  if (resultMissing) {
    inSubquery.updateResult()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ class BatchScanExecTransformer(

override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions

override def getPartitionSchema: StructType = scan match {
override def getPartitionSchemas: StructType = scan match {
case fileScan: FileScan => fileScan.readPartitionSchema
case _ => new StructType()
}

override def getDataSchema: StructType = scan match {
override def getDataSchemas: StructType = scan match {
case fileScan: FileScan => fileScan.readDataSchema
case _ => new StructType()
}

override def getInputFilePathsInternal: Seq[String] = {
override def getInputFilePaths: Seq[String] = {
scan match {
case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq
case _ => Seq.empty
Expand Down
Loading

0 comments on commit 809fae8

Please sign in to comment.