diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index bd1ea255f82d0..822d656ff1821 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -23,14 +23,15 @@ import io.glutenproject.substrait.`type`.ColumnTypeNode import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.plan.PlanBuilder import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo} +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} import org.apache.spark.sql.vectorized.ColumnarBatch + import com.google.common.collect.Lists -import scala.collection.JavaConverters._ -import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat +import scala.collection.JavaConverters._ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala b/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala new file mode 100644 index 0000000000000..da0305acd148d --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +/** + * Data sources v2 transformer should implement this trait so that they can register an alias to + * their data source v2 transformer. This allows users to give the data source v2 transformer alias + * as the format type over the fully qualified class name. + */ +trait DataSourceV2TransformerRegister { + + /** + * The scan class name that this data source v2 transformer provider adapts. This is overridden by + * children to provide a alias for the data source v2 transformer. For example: + * + * {{{ + * override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" + * }}} + */ + def scanClassName(): String + + def createDataSourceV2Transformer( + batchScan: BatchScanExec, + partitionFilters: Seq[Expression]): BatchScanExecTransformer +} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala index 9b21c8e7e0964..e0a8766029a66 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala @@ -24,12 +24,14 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} -import scala.reflect.runtime.{universe => ru} +import java.util.ServiceLoader +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ object ScanTransformerFactory { - private val IcebergScanClassName = "org.apache.iceberg.spark.source.SparkBatchQueryScan" - private val IcebergTransformerClassName = "io.glutenproject.execution.IcebergScanTransformer" + private val dataSourceV2TransformerMap = new ConcurrentHashMap[String, Class[_]]() def createFileSourceScanTransformer( scanExec: FileSourceScanExec, @@ -87,8 +89,12 @@ object ScanTransformerFactory { } val scan = batchScanExec.scan scan match { - case _ if scan.getClass.getName == IcebergScanClassName => - createBatchScanTransformer(IcebergTransformerClassName, batchScanExec, newPartitionFilters) + case _ if dataSourceV2TransformerExists(scan.getClass.getName) => + val cls = lookupDataSourceV2Transformer(scan.getClass.getName) + cls + .newInstance() + .asInstanceOf[DataSourceV2TransformerRegister] + .createDataSourceV2Transformer(batchScanExec, newPartitionFilters) case _ => new BatchScanExecTransformer( batchScanExec.output, @@ -100,18 +106,30 @@ object ScanTransformerFactory { def supportedBatchScan(scan: Scan): Boolean = scan match { case _: FileScan => true - case _ if scan.getClass.getName == IcebergScanClassName => true + case _ if dataSourceV2TransformerExists(scan.getClass.getName) => true case _ => false } - private def createBatchScanTransformer( - className: String, - params: Any*): BatchScanExecTransformer = { - val classMirror = ru.runtimeMirror(getClass.getClassLoader) - val classModule = classMirror.staticModule(className) - val mirror = classMirror.reflectModule(classModule) - val apply = mirror.symbol.typeSignature.member(ru.TermName("apply")).asMethod - val objMirror = classMirror.reflect(mirror.instance) - objMirror.reflectMethod(apply)(params: _*).asInstanceOf[BatchScanExecTransformer] + private def lookupDataSourceV2Transformer(scanClassName: String): Class[_] = { + dataSourceV2TransformerMap.computeIfAbsent( + scanClassName, + _ => { + val loader = Option(Thread.currentThread().getContextClassLoader) + .getOrElse(getClass.getClassLoader) + val serviceLoader = ServiceLoader.load(classOf[DataSourceV2TransformerRegister], loader) + serviceLoader.asScala + .filter(_.scanClassName().equalsIgnoreCase(scanClassName)) + .toList match { + case head :: Nil => + // there is exactly one registered alias + head.getClass + case _ => null + } + } + ) + } + + private def dataSourceV2TransformerExists(scanClassName: String): Boolean = { + lookupDataSourceV2Transformer(scanClassName) != null } } diff --git a/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister b/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister new file mode 100644 index 0000000000000..658967bb99b6c --- /dev/null +++ b/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister @@ -0,0 +1 @@ +io.glutenproject.execution.IcebergTransformerProvider \ No newline at end of file diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala new file mode 100644 index 0000000000000..17d146da2021d --- /dev/null +++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +class IcebergTransformerProvider extends DataSourceV2TransformerRegister { + + override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" + + override def createDataSourceV2Transformer( + batchScan: BatchScanExec, + partitionFilters: Seq[Expression]): BatchScanExecTransformer = { + IcebergScanTransformer.apply(batchScan, partitionFilters) + } +} diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 5ce4d837c7915..2dcfe832e1575 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -22,7 +22,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.softaffinity.SoftAffinityUtil import org.apache.spark.sql.connector.read.{InputPartition, Scan} -import org.apache.iceberg.{FileFormat, FileScanTask, ScanTask} +import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask} import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} @@ -30,6 +30,7 @@ import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ object GlutenIcebergSourceUtil { + def genSplitInfo(inputPartition: InputPartition, index: Int): SplitInfo = inputPartition match { case partition: SparkInputPartition => val paths = new JArrayList[String]() @@ -39,43 +40,39 @@ object GlutenIcebergSourceUtil { var fileFormat = ReadFileFormat.UnknownFormat val tasks = partition.taskGroup[ScanTask]().tasks().asScala - if (tasks.forall(_.isInstanceOf[FileScanTask])) { - tasks.map(_.asInstanceOf[FileScanTask]).foreach { - task => - paths.add(task.file().path().toString) - starts.add(task.start()) - lengths.add(task.length()) - partitionColumns.add(new JHashMap[String, String]()) - val currentFileFormat = task.file().format() match { - case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat - case FileFormat.ORC => ReadFileFormat.OrcReadFormat - case _ => - throw new UnsupportedOperationException( - "Iceberg Only support parquet and orc file format.") - } - if (fileFormat == ReadFileFormat.UnknownFormat) { - fileFormat = currentFileFormat - } else if (fileFormat != currentFileFormat) { + asFileScanTask(tasks.toList).foreach { + task => + paths.add(task.file().path().toString) + starts.add(task.start()) + lengths.add(task.length()) + partitionColumns.add(new JHashMap[String, String]()) + val currentFileFormat = task.file().format() match { + case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => ReadFileFormat.OrcReadFormat + case _ => throw new UnsupportedOperationException( - s"Only one file format is supported, " + - s"find different file format $fileFormat and $currentFileFormat") - } - } - val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( - paths.asScala.toArray, - inputPartition.preferredLocations()) - IcebergLocalFilesBuilder.makeIcebergLocalFiles( - index, - paths, - starts, - lengths, - partitionColumns, - fileFormat, - preferredLoc.toList.asJava - ) - } else { - throw new UnsupportedOperationException("Only support iceberg FileScanTask.") + "Iceberg Only support parquet and orc file format.") + } + if (fileFormat == ReadFileFormat.UnknownFormat) { + fileFormat = currentFileFormat + } else if (fileFormat != currentFileFormat) { + throw new UnsupportedOperationException( + s"Only one file format is supported, " + + s"find different file format $fileFormat and $currentFileFormat") + } } + val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( + paths.asScala.toArray, + inputPartition.preferredLocations()) + IcebergLocalFilesBuilder.makeIcebergLocalFiles( + index, + paths, + starts, + lengths, + partitionColumns, + fileFormat, + preferredLoc.toList.asJava + ) case _ => throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") } @@ -83,10 +80,9 @@ object GlutenIcebergSourceUtil { def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match { case scan: SparkBatchQueryScan => val tasks = scan.tasks().asScala - tasks.map(_.asCombinedScanTask()).foreach { + asFileScanTask(tasks.toList).foreach { task => - val file = task.files().asScala.head.file() - file.format() match { + task.file().format() match { case FileFormat.PARQUET => return ReadFileFormat.ParquetReadFormat case FileFormat.ORC => return ReadFileFormat.OrcReadFormat case _ => @@ -97,4 +93,14 @@ object GlutenIcebergSourceUtil { throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.") } + private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = { + if (tasks.forall(_.isFileScanTask)) { + tasks.map(_.asFileScanTask()) + } else if (tasks.forall(_.isInstanceOf[CombinedScanTask])) { + tasks.flatMap(_.asCombinedScanTask().tasks().asScala) + } else { + throw new UnsupportedOperationException( + "Only support iceberg CombinedScanTask and FileScanTask.") + } + } } diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala new file mode 100644 index 0000000000000..97c590dce212a --- /dev/null +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.SparkConf + +class VeloxIcebergSuite extends WholeStageTransformerSuite { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.spark_catalog.type", "hadoop") + .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox") + } + + test("iceberg transformer exists") { + spark.sql(""" + |create table iceberg_tb using iceberg as + |(select 1 as col1, 2 as col2, 3 as col3) + |""".stripMargin) + + runQueryAndCompare(""" + |select * from iceberg_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } +} diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala index 0a76a30e0dfbe..b8693a48ccab8 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala @@ -53,4 +53,31 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite { (table, tableDF) }.toMap } + + test("iceberg transformer exists") { + runQueryAndCompare(""" + |SELECT + | l_orderkey, + | o_orderdate + |FROM + | orders, + | lineitem + |WHERE + | l_orderkey = o_orderkey + |ORDER BY + | l_orderkey, + | o_orderdate + |LIMIT + | 10; + |""".stripMargin) { + df => + { + assert( + getExecutedPlan(df).count( + plan => { + plan.isInstanceOf[IcebergScanTransformer] + }) == 2) + } + } + } }