From d8380a453afe57d018398af670e2724d854f59c0 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sun, 23 Jun 2024 08:30:41 +0800 Subject: [PATCH] Revert "[GLUTEN-5414] [VL] Support arrow csv option and schema (#5850)" This reverts commit ad817ed51f381d037a9de250e639bb6514e94242. --- .github/workflows/velox_docker.yml | 72 +- .github/workflows/velox_docker_cache.yml | 8 +- .../datasource/ArrowCSVFileFormat.scala | 111 +-- .../datasource/ArrowCSVOptionConverter.scala | 62 -- .../datasource/ArrowConvertorRule.scala | 12 +- .../v2/ArrowCSVPartitionReaderFactory.scala | 79 +- .../gluten/datasource/v2/ArrowCSVTable.scala | 12 - .../extension/ArrowScanReplaceRule.scala | 3 +- .../datasource/csv/student_option.csv | 4 - .../datasource/csv/student_option_schema.csv | 4 - .../datasource/csv/student_option_str.csv | 4 - .../gluten/execution/TestOperator.scala | 235 +---- ep/build-velox/src/build_velox.sh | 8 +- ep/build-velox/src/get_velox.sh | 2 - .../modify_arrow_dataset_scan_option.patch | 883 ------------------ ep/build-velox/src/modify_velox.patch | 3 +- .../WholeStageTransformerSuite.scala | 20 - gluten-data/pom.xml | 4 +- .../org/apache/gluten/utils/ArrowUtil.scala | 147 ++- gluten-ut/spark32/pom.xml | 2 +- .../utils/velox/VeloxTestSettings.scala | 9 - gluten-ut/spark33/pom.xml | 2 +- .../utils/velox/VeloxTestSettings.scala | 9 - gluten-ut/spark34/pom.xml | 2 +- .../utils/velox/VeloxTestSettings.scala | 9 - gluten-ut/spark35/pom.xml | 2 +- .../utils/velox/VeloxTestSettings.scala | 11 - .../datasources/csv/GlutenCSVSuite.scala | 65 +- gluten-ut/test/pom.xml | 2 +- pom.xml | 1 - 30 files changed, 313 insertions(+), 1474 deletions(-) delete mode 100644 backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala delete mode 100644 backends-velox/src/test/resources/datasource/csv/student_option.csv delete mode 100644 backends-velox/src/test/resources/datasource/csv/student_option_schema.csv delete mode 100644 backends-velox/src/test/resources/datasource/csv/student_option_str.csv delete mode 100644 ep/build-velox/src/modify_arrow_dataset_scan_option.patch diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index 5f64c9f7e0e8b..c98ee024f2bd5 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -61,24 +61,16 @@ jobs: id: cache uses: actions/cache/restore@v3 with: - path: | - ./cpp/build/releases/ - ~/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases/ key: cache-velox-build-${{ hashFiles('./cache-key') }} - name: Build Gluten Velox third party if: ${{ steps.cache.outputs.cache-hit != 'true' }} run: | source dev/ci-velox-buildstatic.sh - - name: Upload Artifact Native - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v2 with: path: ./cpp/build/releases/ name: velox-native-lib-centos-7-${{github.sha}} - - name: Upload Artifact Arrow Jar - uses: actions/upload-artifact@v2 - with: - path: /root/.m2/repository/org/apache/arrow/ - name: velox-arrow-jar-centos-7-${{github.sha}} run-tpc-test-ubuntu: needs: build-native-lib-centos-7 @@ -110,16 +102,11 @@ jobs: container: ${{ matrix.os }} steps: - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Setup java and maven run: | if [ "${{ matrix.java }}" = "java-17" ]; then @@ -131,7 +118,7 @@ jobs: apt-get update && apt-get install -y openjdk-8-jdk maven apt remove openjdk-11* -y fi - ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/ + apt remove openjdk-11* -y - name: Build and run TPCH/DS run: | cd $GITHUB_WORKSPACE/ @@ -175,16 +162,11 @@ jobs: container: ${{ matrix.os }} steps: - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Update mirror list if: matrix.os == 'centos:8' run: | @@ -259,16 +241,11 @@ jobs: sudo docker image prune --all --force > /dev/null df -h - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /home/runner/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Setup java and maven run: | sudo apt-get update @@ -361,16 +338,11 @@ jobs: sudo docker image prune --all --force > /dev/null df -h - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /home/runner/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Setup java and maven run: | sudo apt-get update @@ -454,16 +426,11 @@ jobs: container: centos:8 steps: - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Update mirror list run: | sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true @@ -520,16 +487,11 @@ jobs: container: ubuntu:22.04 steps: - uses: actions/checkout@v2 - - name: Download All Native Artifacts + - name: Download All Artifacts uses: actions/download-artifact@v2 with: name: velox-native-lib-centos-7-${{github.sha}} - path: ./cpp/build/releases/ - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases - name: Setup java and maven run: | apt-get update && apt-get install -y openjdk-8-jdk maven wget @@ -571,9 +533,7 @@ jobs: id: cache uses: actions/cache/restore@v3 with: - path: | - ./cpp/build/releases/ - /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases/ key: cache-velox-build-centos-8-${{ hashFiles('./cache-key') }} - name: Update mirror list run: | diff --git a/.github/workflows/velox_docker_cache.yml b/.github/workflows/velox_docker_cache.yml index 3c05acf7eca21..44271c4fc0d0e 100644 --- a/.github/workflows/velox_docker_cache.yml +++ b/.github/workflows/velox_docker_cache.yml @@ -38,9 +38,7 @@ jobs: uses: actions/cache/restore@v3 with: lookup-only: true - path: | - ./cpp/build/releases/ - /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases/ key: cache-velox-build-${{ hashFiles('./cache-key') }} - name: Build Gluten Velox third party if: steps.check-cache.outputs.cache-hit != 'true' @@ -51,9 +49,7 @@ jobs: id: cache uses: actions/cache/save@v3 with: - path: | - ./cpp/build/releases/ - /root/.m2/repository/org/apache/arrow/ + path: ./cpp/build/releases/ key: cache-velox-build-${{ hashFiles('./cache-key') }} # ccache-native-lib-ubuntu-velox-ut: diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala index 7c3ca8fc8cde4..0f6813d8fc6a1 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala @@ -40,10 +40,8 @@ import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration -import org.apache.arrow.c.ArrowSchema import org.apache.arrow.dataset.file.FileSystemDatasetFactory import org.apache.arrow.dataset.scanner.ScanOptions -import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.VectorUnloader import org.apache.arrow.vector.types.pojo.Schema @@ -53,17 +51,11 @@ import org.apache.hadoop.fs.{FileStatus, Path} import java.net.URLDecoder import java.util.Optional -import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter} +import scala.collection.JavaConverters.asScalaBufferConverter -class ArrowCSVFileFormat(parsedOptions: CSVOptions) - extends FileFormat - with DataSourceRegister - with Logging - with Serializable { +class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable { private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV - private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read") - var fallback = false override def isSplitable( sparkSession: SparkSession, @@ -76,11 +68,9 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions) sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions) ArrowUtil.readSchema( files, fileFormat, - arrowConfig, ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool("infer schema")) } @@ -99,74 +89,51 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) val batchSize = sqlConf.columnBatchSize + val caseSensitive = sqlConf.caseSensitiveAnalysis val columnPruning = sqlConf.csvColumnPruning && !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val parsedOptions = new CSVOptions( + options, + columnPruning, + sparkSession.sessionState.conf.sessionLocalTimeZone, + sparkSession.sessionState.conf.columnNameOfCorruptRecord) val actualFilters = filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord)) (file: PartitionedFile) => { - val actualDataSchema = StructType( - dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) - val actualRequiredSchema = StructType( - requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) ArrowCSVFileFormat.checkHeader( file, - actualDataSchema, - actualRequiredSchema, + dataSchema, + requiredSchema, parsedOptions, actualFilters, broadcastedHadoopConf.value.value) - - val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions) - val allocator = ArrowBufferAllocators.contextInstance() - // todo predicate validation / pushdown - val fileNames = ArrowUtil - .readArrowFileColumnNames( + val factory = + ArrowUtil.makeArrowDiscovery( URLDecoder.decode(file.filePath.toString, "UTF-8"), fileFormat, - arrowConfig, ArrowBufferAllocators.contextInstance(), - pool) - val tokenIndexArr = - actualRequiredSchema - .map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f))) - .toArray - val fileIndex = tokenIndexArr.filter(_ < fileNames.length) - val requestSchema = new StructType( - fileIndex - .map(index => StructField(fileNames(index), actualDataSchema(index).dataType))) - val missingIndex = tokenIndexArr.filter(_ >= fileNames.length) - val missingSchema = new StructType(missingIndex.map(actualDataSchema(_))) + ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory") + ) + // todo predicate validation / pushdown + val fileFields = factory.inspect().getFields.asScala // TODO: support array/map/struct types in out-of-order schema reading. - val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator) - val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator) try { - ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig) - val factory = - ArrowUtil.makeArrowDiscovery( - URLDecoder.decode(file.filePath.toString, "UTF-8"), - fileFormat, - Optional.of(arrowConfig), - ArrowBufferAllocators.contextInstance(), - pool) - val fields = factory.inspect().getFields - val actualReadFields = new Schema( - fileIndex.map(index => fields.get(index)).toIterable.asJava) - ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig) + val actualReadFields = + ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive) ArrowCSVFileFormat .readArrow( ArrowBufferAllocators.contextInstance(), file, actualReadFields, - missingSchema, + caseSensitive, + requiredSchema, partitionSchema, factory, - batchSize, - arrowConfig) + batchSize) .asInstanceOf[Iterator[InternalRow]] } catch { case e: SchemaMismatchException => logWarning(e.getMessage) - fallback = true val iter = ArrowCSVFileFormat.fallbackReadVanilla( dataSchema, requiredSchema, @@ -181,10 +148,8 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions) .rowToColumn(schema, batchSize, rows) .asInstanceOf[Iterator[InternalRow]] case d: Exception => throw d - } finally { - cSchema.close() - cSchema2.close() } + } } @@ -219,23 +184,28 @@ object ArrowCSVFileFormat { allocator: BufferAllocator, file: PartitionedFile, actualReadFields: Schema, - missingSchema: StructType, + caseSensitive: Boolean, + requiredSchema: StructType, partitionSchema: StructType, factory: FileSystemDatasetFactory, - batchSize: Int, - arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = { + batchSize: Int): Iterator[ColumnarBatch] = { + val compare = ArrowUtil.compareStringFunc(caseSensitive) val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray + val actualReadSchema = new StructType( + actualReadFieldNames.map(f => requiredSchema.find(field => compare(f, field.name)).get)) val dataset = factory.finish(actualReadFields) - val scanOptions = new ScanOptions.Builder(batchSize) - .columns(Optional.of(actualReadFieldNames)) - .fragmentScanOptions(arrowConfig) - .build() + + val hasMissingColumns = actualReadFields.getFields.size() != requiredSchema.size + + val scanOptions = new ScanOptions(batchSize, Optional.of(actualReadFieldNames)) val scanner = dataset.newScan(scanOptions) val partitionVectors = ArrowUtil.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues) - val nullVectors = if (missingSchema.nonEmpty) { + val nullVectors = if (hasMissingColumns) { + val missingSchema = + new StructType(requiredSchema.filterNot(actualReadSchema.contains).toArray) ArrowUtil.loadMissingColumns(batchSize, missingSchema) } else { Array.empty[ArrowWritableColumnVector] @@ -255,7 +225,8 @@ object ArrowCSVFileFormat { val batch = ArrowUtil.loadBatch( allocator, unloader.getRecordBatch, - actualReadFields, + actualReadSchema, + requiredSchema, partitionVectors, nullVectors) batch @@ -275,8 +246,8 @@ object ArrowCSVFileFormat { def checkHeader( file: PartitionedFile, - actualDataSchema: StructType, - actualRequiredSchema: StructType, + dataSchema: StructType, + requiredSchema: StructType, parsedOptions: CSVOptions, actualFilters: Seq[Filter], conf: Configuration): Unit = { @@ -284,6 +255,10 @@ object ArrowCSVFileFormat { if (!isStartOfFile) { return } + val actualDataSchema = StructType( + dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) + val actualRequiredSchema = StructType( + requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) val parser = new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters) val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala deleted file mode 100644 index 7d6a54c2ac7a6..0000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.datasource - -import org.apache.gluten.utils.ArrowAbiUtil - -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.utils.SparkSchemaUtil - -import com.google.common.collect.ImmutableMap -import org.apache.arrow.c.ArrowSchema -import org.apache.arrow.dataset.scanner.csv.{CsvConvertOptions, CsvFragmentScanOptions} -import org.apache.arrow.memory.BufferAllocator - -import java.util - -object ArrowCSVOptionConverter { - def convert(option: CSVOptions): CsvFragmentScanOptions = { - val parseMap = new util.HashMap[String, String]() - val default = new CSVOptions( - CaseInsensitiveMap(Map()), - option.columnPruning, - SparkSchemaUtil.getLocalTimezoneID) - parseMap.put("strings_can_be_null", "true") - if (option.delimiter != default.delimiter) { - parseMap.put("delimiter", option.delimiter) - } - if (option.escapeQuotes != default.escapeQuotes) { - parseMap.put("quoting", (!option.escapeQuotes).toString) - } - - val convertOptions = new CsvConvertOptions(ImmutableMap.of()) - new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), parseMap) - } - - def schema( - requiredSchema: StructType, - cSchema: ArrowSchema, - allocator: BufferAllocator, - option: CsvFragmentScanOptions): Unit = { - val schema = SparkSchemaUtil.toArrowSchema(requiredSchema) - ArrowAbiUtil.exportSchema(allocator, schema, cSchema) - option.getConvertOptions.setArrowSchema(cSchema) - } - -} diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala index 2778710155bf9..dab1ffd3b9e3f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala @@ -19,7 +19,6 @@ package org.apache.gluten.datasource import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.datasource.v2.ArrowCSVTable import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.utils.LogicalPlanSelector import org.apache.spark.annotation.Experimental import org.apache.spark.sql.SparkSession @@ -40,7 +39,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala` @Experimental case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = LogicalPlanSelector.maybe(session, plan) { + override def apply(plan: LogicalPlan): LogicalPlan = { if (!BackendsApiManager.getSettings.enableNativeArrowReadFiles()) { return plan } @@ -50,11 +49,7 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { _, _, _) if validate(session, dataSchema, options) => - val csvOptions = new CSVOptions( - options, - columnPruning = session.sessionState.conf.csvColumnPruning, - session.sessionState.conf.sessionLocalTimeZone) - l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat(csvOptions))(session)) + l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat())(session)) case d @ DataSourceV2Relation( t @ CSVTable( name, @@ -93,8 +88,7 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { } private def checkCsvOptions(csvOptions: CSVOptions, timeZone: String): Boolean = { - csvOptions.headerFlag && !csvOptions.multiLine && - csvOptions.delimiter.length == 1 && + csvOptions.headerFlag && !csvOptions.multiLine && csvOptions.delimiter == "," && csvOptions.quote == '\"' && csvOptions.escape == '\\' && csvOptions.lineSeparator.isEmpty && diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala index 4af5022a6252a..ddc7f797fb938 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.datasource.v2 -import org.apache.gluten.datasource.{ArrowCSVFileFormat, ArrowCSVOptionConverter} +import org.apache.gluten.datasource.ArrowCSVFileFormat import org.apache.gluten.exception.SchemaMismatchException import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool @@ -31,17 +31,15 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{SerializableConfiguration, TaskResources} -import org.apache.arrow.c.ArrowSchema -import org.apache.arrow.vector.types.pojo.Schema +import org.apache.arrow.dataset.file.FileFormat import java.net.URLDecoder -import java.util.Optional -import scala.collection.JavaConverters.asJavaIterableConverter +import scala.collection.JavaConverters.asScalaBufferConverter case class ArrowCSVPartitionReaderFactory( sqlConf: SQLConf, @@ -55,9 +53,8 @@ case class ArrowCSVPartitionReaderFactory( with Logging { private val batchSize = sqlConf.parquetVectorizedReaderBatchSize + private val caseSensitive: Boolean = sqlConf.caseSensitiveAnalysis private val csvColumnPruning: Boolean = sqlConf.csvColumnPruning - private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV - var fallback = false override def supportColumnarReads(partition: InputPartition): Boolean = true @@ -70,12 +67,12 @@ case class ArrowCSVPartitionReaderFactory( partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = { val actualDataSchema = StructType( dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) - val actualRequiredSchema = StructType( + val actualReadDataSchema = StructType( readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) ArrowCSVFileFormat.checkHeader( partitionedFile, actualDataSchema, - actualRequiredSchema, + actualReadDataSchema, options, filters, broadcastedConf.value.value) @@ -90,54 +87,29 @@ case class ArrowCSVPartitionReaderFactory( ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool("FileSystemFactory")) } - val arrowConfig = ArrowCSVOptionConverter.convert(options) - val fileNames = ArrowUtil - .readArrowFileColumnNames( - URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"), - fileFormat, - arrowConfig, - ArrowBufferAllocators.contextInstance(), - pool) - val tokenIndexArr = - actualRequiredSchema.map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f))).toArray - val fileIndex = tokenIndexArr.filter(_ < fileNames.length) - val requestSchema = new StructType( - fileIndex - .map(index => StructField(fileNames(index), actualDataSchema(index).dataType))) - val missingIndex = tokenIndexArr.filter(_ >= fileNames.length) - val missingSchema = new StructType(missingIndex.map(actualDataSchema(_))) - // TODO: support array/map/struct types in out-of-order schema reading. - val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator) - val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator) + val factory = ArrowUtil.makeArrowDiscovery( + URLDecoder.decode(partitionedFile.filePath.toString(), "UTF-8"), + FileFormat.CSV, + allocator, + pool) + val parquetFileFields = factory.inspect().getFields.asScala // TODO: support array/map/struct types in out-of-order schema reading. val iter = try { - ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig) - val factory = - ArrowUtil.makeArrowDiscovery( - URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"), - fileFormat, - Optional.of(arrowConfig), - ArrowBufferAllocators.contextInstance(), - pool) - val fields = factory.inspect().getFields - val actualReadFields = new Schema( - fileIndex.map(index => fields.get(index)).toIterable.asJava) - ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig) - ArrowCSVFileFormat - .readArrow( - ArrowBufferAllocators.contextInstance(), - partitionedFile, - actualReadFields, - missingSchema, - readPartitionSchema, - factory, - batchSize, - arrowConfig) + val actualReadFields = + ArrowUtil.getRequestedField(readDataSchema, parquetFileFields, caseSensitive) + ArrowCSVFileFormat.readArrow( + allocator, + partitionedFile, + actualReadFields, + caseSensitive, + readDataSchema, + readPartitionSchema, + factory, + batchSize) } catch { case e: SchemaMismatchException => logWarning(e.getMessage) - fallback = true val iter = ArrowCSVFileFormat.fallbackReadVanilla( dataSchema, readDataSchema, @@ -153,9 +125,6 @@ case class ArrowCSVPartitionReaderFactory( partitionedFile) ArrowCSVFileFormat.rowToColumn(schema, batchSize, rows) case d: Exception => throw d - } finally { - cSchema.close() - cSchema2.close() } new PartitionReader[ColumnarBatch] { diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala index 02485975e7055..aa7f737f9cfcd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala @@ -16,13 +16,11 @@ */ package org.apache.gluten.datasource.v2 -import org.apache.gluten.datasource.ArrowCSVOptionConverter import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool import org.apache.gluten.utils.ArrowUtil import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat @@ -33,8 +31,6 @@ import org.apache.spark.util.TaskResources import org.apache.hadoop.fs.FileStatus -import scala.collection.JavaConverters.mapAsScalaMapConverter - case class ArrowCSVTable( name: String, sparkSession: SparkSession, @@ -52,17 +48,9 @@ case class ArrowCSVTable( } else { (ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool("inferSchema")) } - val parsedOptions: CSVOptions = new CSVOptions( - options.asScala.toMap, - columnPruning = sparkSession.sessionState.conf.csvColumnPruning, - sparkSession.sessionState.conf.sessionLocalTimeZone, - sparkSession.sessionState.conf.columnNameOfCorruptRecord - ) - val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions) ArrowUtil.readSchema( files.head, org.apache.arrow.dataset.file.FileFormat.CSV, - arrowConfig, allocator, pool ) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala index dba8df5cf1a1f..adfc6ca742c93 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala @@ -19,7 +19,6 @@ package org.apache.gluten.extension import org.apache.gluten.datasource.ArrowCSVFileFormat import org.apache.gluten.datasource.v2.ArrowCSVScan import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec -import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule @@ -27,7 +26,7 @@ import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanEx import org.apache.spark.sql.execution.datasources.v2.BatchScanExec case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(spark, plan) { + override def apply(plan: SparkPlan): SparkPlan = { plan.transformUp { case plan: FileSourceScanExec if plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] => ArrowFileSourceScanExec(plan) diff --git a/backends-velox/src/test/resources/datasource/csv/student_option.csv b/backends-velox/src/test/resources/datasource/csv/student_option.csv deleted file mode 100644 index 919b7387b53c0..0000000000000 --- a/backends-velox/src/test/resources/datasource/csv/student_option.csv +++ /dev/null @@ -1,4 +0,0 @@ -Name;Language -Juno;Java -Peter;Python -Celin;C++ diff --git a/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv b/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv deleted file mode 100644 index be8459a217393..0000000000000 --- a/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv +++ /dev/null @@ -1,4 +0,0 @@ -id,name,language -1,Juno,Java -2,Peter,Python -3,Celin,C++ diff --git a/backends-velox/src/test/resources/datasource/csv/student_option_str.csv b/backends-velox/src/test/resources/datasource/csv/student_option_str.csv deleted file mode 100644 index b4214b390caea..0000000000000 --- a/backends-velox/src/test/resources/datasource/csv/student_option_str.csv +++ /dev/null @@ -1,4 +0,0 @@ -Name,Language -Juno,Java -Peter,Python -,C++ diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index a892b6f313a4e..166529f193cc0 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType} import java.util.concurrent.TimeUnit @@ -588,153 +588,42 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla } test("csv scan") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student.csv" - val df = spark.read - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) - val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).toList.head - assert( - scan - .asInstanceOf[ArrowFileSourceScanExec] - .relation - .fileFormat - .isInstanceOf[ArrowCSVFileFormat]) - } - - test("csv scan with option string as null") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student_option_str.csv" - // test strings as null - val df = spark.read - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) - } - - test("csv scan with option delimiter") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student_option.csv" - val df = spark.read - .format("csv") - .option("header", "true") - .option("delimiter", ";") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) - } - - test("csv scan with schema") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student_option_schema.csv" - val schema = new StructType() - .add("id", StringType) - .add("name", StringType) - .add("language", StringType) - val df = spark.read - .schema(schema) - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]) - assert(scan.isDefined) - assert( - !scan.get - .asInstanceOf[ArrowFileSourceScanExec] - .original - .relation - .fileFormat - .asInstanceOf[ArrowCSVFileFormat] - .fallback) - } - - test("csv scan with missing columns") { - val df = runAndCompare("select languagemissing, language, id_new_col from student") { - val filePath = rootPath + "/datasource/csv/student_option_schema.csv" - val schema = new StructType() - .add("id_new_col", IntegerType) - .add("name", StringType) - .add("language", StringType) - .add("languagemissing", StringType) - val df = spark.read - .schema(schema) - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) - } - - test("csv scan with different name") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student_option_schema.csv" - val schema = new StructType() - .add("id_new_col", StringType) - .add("name", StringType) - .add("language", StringType) - val df = spark.read - .schema(schema) - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) - - val df2 = runAndCompare("select * from student_schema") { - val filePath = rootPath + "/datasource/csv/student_option_schema.csv" - val schema = new StructType() - .add("name", StringType) - .add("language", StringType) - val df = spark.read - .schema(schema) - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student_schema") + val filePath = rootPath + "/datasource/csv/student.csv" + val df = spark.read + .format("csv") + .option("header", "true") + .load(filePath) + df.createOrReplaceTempView("student") + runQueryAndCompare("select * from student") { + df => + val plan = df.queryExecution.executedPlan + assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) + assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) + val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).toList.head + assert( + scan + .asInstanceOf[ArrowFileSourceScanExec] + .relation + .fileFormat + .isInstanceOf[ArrowCSVFileFormat]) } - val plan2 = df2.queryExecution.executedPlan - assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan2.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined) } test("csv scan with filter") { - val df = runAndCompare("select * from student where Name = 'Peter'") { - val filePath = rootPath + "/datasource/csv/student.csv" - val df = spark.read - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") + val filePath = rootPath + "/datasource/csv/student.csv" + val df = spark.read + .format("csv") + .option("header", "true") + .load(filePath) + df.createOrReplaceTempView("student") + runQueryAndCompare("select * from student where Name = 'Peter'") { + df => + assert(df.queryExecution.executedPlan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty) + assert( + df.queryExecution.executedPlan + .find(s => s.isInstanceOf[ArrowFileSourceScanExec]) + .isDefined) } - assert(df.queryExecution.executedPlan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty) - assert( - df.queryExecution.executedPlan - .find(s => s.isInstanceOf[ArrowFileSourceScanExec]) - .isDefined) } test("insert into select from csv") { @@ -756,55 +645,21 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla test("csv scan datasource v2") { withSQLConf("spark.sql.sources.useV1SourceList" -> "") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student.csv" - val df = spark.read - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") - } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined) - } - } - - test("csv scan datasource v2 with filter") { - withSQLConf("spark.sql.sources.useV1SourceList" -> "") { - val df = runAndCompare("select * from student where Name = 'Peter'") { - val filePath = rootPath + "/datasource/csv/student.csv" - val df = spark.read - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") + val filePath = rootPath + "/datasource/csv/student.csv" + val df = spark.read + .format("csv") + .option("header", "true") + .load(filePath) + df.createOrReplaceTempView("student") + runQueryAndCompare("select * from student") { + checkGlutenOperatorMatch[ArrowBatchScanExec] } - - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty) - assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined) - } - } - - test("csv scan with schema datasource v2") { - withSQLConf("spark.sql.sources.useV1SourceList" -> "") { - val df = runAndCompare("select * from student") { - val filePath = rootPath + "/datasource/csv/student_option_schema.csv" - val schema = new StructType() - .add("id", StringType) - .add("name", StringType) - .add("language", StringType) - val df = spark.read - .schema(schema) - .format("csv") - .option("header", "true") - .load(filePath) - df.createOrReplaceTempView("student") + runQueryAndCompare("select * from student where Name = 'Peter'") { + df => + val plan = df.queryExecution.executedPlan + assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty) + assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined) } - val plan = df.queryExecution.executedPlan - assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined) - assert(plan.find(_.isInstanceOf[ArrowBatchScanExec]).isDefined) } } diff --git a/ep/build-velox/src/build_velox.sh b/ep/build-velox/src/build_velox.sh index 0224e95468611..5f0a242610e6f 100755 --- a/ep/build-velox/src/build_velox.sh +++ b/ep/build-velox/src/build_velox.sh @@ -282,11 +282,9 @@ function compile_arrow_java_module() { ARROW_INSTALL_DIR="${ARROW_HOME}/../../install" pushd $ARROW_HOME/java - # Because arrow-bom module need the -DprocessAllModules - mvn versions:set -DnewVersion=15.0.0-gluten -DprocessAllModules - mvn clean install -pl bom,maven/module-info-compiler-maven-plugin,vector -am \ - -DskipTests -Drat.skip -Dmaven.gitcommitid.skip -Dcheckstyle.skip -Dassembly.skipAssembly + mvn clean install -pl maven/module-info-compiler-maven-plugin -am \ + -Dmaven.test.skip -Drat.skip -Dmaven.gitcommitid.skip -Dcheckstyle.skip # Arrow C Data Interface CPP libraries mvn generate-resources -P generate-libs-cdata-all-os -Darrow.c.jni.dist.dir=$ARROW_INSTALL_DIR \ @@ -299,7 +297,7 @@ function compile_arrow_java_module() { -Dmaven.test.skip -Drat.skip -Dmaven.gitcommitid.skip -Dcheckstyle.skip -N # Arrow Java libraries - mvn install -Parrow-jni -P arrow-c-data -pl c,dataset -am \ + mvn clean install -Parrow-jni -P arrow-c-data -pl dataset,c -am \ -Darrow.c.jni.dist.dir=$ARROW_INSTALL_DIR/lib -Darrow.dataset.jni.dist.dir=$ARROW_INSTALL_DIR/lib -Darrow.cpp.build.dir=$ARROW_INSTALL_DIR/lib \ -Dmaven.test.skip -Drat.skip -Dmaven.gitcommitid.skip -Dcheckstyle.skip -Dassembly.skipAssembly popd diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index a0a7baa0da45e..4e5aea726e679 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -257,10 +257,8 @@ function apply_compilation_fixes { velox_home=$2 sudo cp ${current_dir}/modify_velox.patch ${velox_home}/ sudo cp ${current_dir}/modify_arrow.patch ${velox_home}/third_party/ - sudo cp ${current_dir}/modify_arrow_dataset_scan_option.patch ${velox_home}/third_party/ git add ${velox_home}/modify_velox.patch # to avoid the file from being deleted by git clean -dffx :/ git add ${velox_home}/third_party/modify_arrow.patch # to avoid the file from being deleted by git clean -dffx :/ - git add ${velox_home}/third_party/modify_arrow_dataset_scan_option.patch # to avoid the file from being deleted by git clean -dffx :/ cd ${velox_home} echo "Applying patch to Velox source code..." git apply modify_velox.patch diff --git a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch b/ep/build-velox/src/modify_arrow_dataset_scan_option.patch deleted file mode 100644 index 4af78c030c006..0000000000000 --- a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch +++ /dev/null @@ -1,883 +0,0 @@ -diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc -index 09ab77572..f09377cf9 100644 ---- a/cpp/src/arrow/dataset/file_csv.cc -+++ b/cpp/src/arrow/dataset/file_csv.cc -@@ -24,6 +24,7 @@ - #include - #include - -+#include "arrow/c/bridge.h" - #include "arrow/csv/options.h" - #include "arrow/csv/parser.h" - #include "arrow/csv/reader.h" -@@ -52,6 +53,9 @@ using internal::Executor; - using internal::SerialExecutor; - - namespace dataset { -+namespace { -+inline bool parseBool(const std::string& value) { return value == "true" ? true : false; } -+} // namespace - - struct CsvInspectedFragment : public InspectedFragment { - CsvInspectedFragment(std::vector column_names, -@@ -503,5 +507,33 @@ Future<> CsvFileWriter::FinishInternal() { - return Status::OK(); - } - -+Result> CsvFragmentScanOptions::from( -+ const std::unordered_map& configs) { -+ std::shared_ptr options = -+ std::make_shared(); -+ for (auto const& it : configs) { -+ auto& key = it.first; -+ auto& value = it.second; -+ if (key == "delimiter") { -+ options->parse_options.delimiter = value.data()[0]; -+ } else if (key == "quoting") { -+ options->parse_options.quoting = parseBool(value); -+ } else if (key == "column_types") { -+ int64_t schema_address = std::stol(value); -+ ArrowSchema* cSchema = reinterpret_cast(schema_address); -+ ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema)); -+ auto& column_types = options->convert_options.column_types; -+ for (auto field : schema->fields()) { -+ column_types[field->name()] = field->type(); -+ } -+ } else if (key == "strings_can_be_null") { -+ options->convert_options.strings_can_be_null = parseBool(value); -+ } else { -+ return Status::Invalid("Config " + it.first + "is not supported."); -+ } -+ } -+ return options; -+} -+ - } // namespace dataset - } // namespace arrow -diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h -index 42e3fd724..4d2825183 100644 ---- a/cpp/src/arrow/dataset/file_csv.h -+++ b/cpp/src/arrow/dataset/file_csv.h -@@ -85,6 +85,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { - struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { - std::string type_name() const override { return kCsvTypeName; } - -+ static Result> from( -+ const std::unordered_map& configs); -+ - using StreamWrapFunc = std::function>( - std::shared_ptr)>; - -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc b/cpp/src/arrow/engine/substrait/expression_internal.cc -index 5d892af9a..0f8b0448b 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.cc -+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc -@@ -1337,5 +1337,17 @@ Result> ToProto( - return std::move(out); - } - -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out) { -+ ARROW_RETURN_IF(!literal.has_map(), Status::Invalid("Literal does not have a map.")); -+ auto literalMap = literal.map(); -+ auto size = literalMap.key_values_size(); -+ for (auto i = 0; i < size; i++) { -+ substrait::Expression_Literal_Map_KeyValue keyValue = literalMap.key_values(i); -+ out.emplace(keyValue.key().string(), keyValue.value().string()); -+ } -+ return Status::OK(); -+} -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.h b/cpp/src/arrow/engine/substrait/expression_internal.h -index 2ce2ee76a..9be81b7ab 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.h -+++ b/cpp/src/arrow/engine/substrait/expression_internal.h -@@ -61,5 +61,9 @@ ARROW_ENGINE_EXPORT - Result FromProto(const substrait::AggregateFunction&, bool is_hash, - const ExtensionSet&, const ConversionOptions&); - -+ARROW_ENGINE_EXPORT -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out); -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc -index 9e670f121..02e5c7171 100644 ---- a/cpp/src/arrow/engine/substrait/serde.cc -+++ b/cpp/src/arrow/engine/substrait/serde.cc -@@ -247,6 +247,16 @@ Result DeserializeExpressions( - return FromProto(extended_expression, ext_set_out, conversion_options, registry); - } - -+Status DeserializeMap(const Buffer& buf, -+ std::unordered_map& out) { -+ // ARROW_ASSIGN_OR_RAISE(auto advanced_extension, -+ // ParseFromBuffer(buf)); -+ // return FromProto(advanced_extension, out); -+ ARROW_ASSIGN_OR_RAISE(auto literal, -+ ParseFromBuffer(buf)); -+ return FromProto(literal, out); -+} -+ - namespace { - - Result> MakeSingleDeclarationPlan( -diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h -index ab749f4a6..6312ec239 100644 ---- a/cpp/src/arrow/engine/substrait/serde.h -+++ b/cpp/src/arrow/engine/substrait/serde.h -@@ -23,6 +23,7 @@ - #include - #include - #include -+#include - #include - - #include "arrow/compute/type_fwd.h" -@@ -183,6 +184,9 @@ ARROW_ENGINE_EXPORT Result DeserializeExpressions( - const ConversionOptions& conversion_options = {}, - ExtensionSet* ext_set_out = NULLPTR); - -+ARROW_ENGINE_EXPORT Status -+DeserializeMap(const Buffer& buf, std::unordered_map& out); -+ - /// \brief Deserializes a Substrait Type message to the corresponding Arrow type - /// - /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type -diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml -index d4d3e2c0f..ce72eaa1f 100644 ---- a/java/dataset/pom.xml -+++ b/java/dataset/pom.xml -@@ -25,9 +25,10 @@ - jar - - ../../../cpp/release-build/ -- 2.5.0 - 1.11.0 - 1.11.3 -+ 0.31.0 -+ 3.25.3 - - - -@@ -47,6 +48,18 @@ - arrow-c-data - compile - -+ -+ io.substrait -+ core -+ ${substrait.version} -+ provided -+ -+ -+ com.google.protobuf -+ protobuf-java -+ ${protobuf.version} -+ provided -+ - - org.apache.arrow - arrow-memory-netty -diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc -index 8d7dafd84..89cdc39fe 100644 ---- a/java/dataset/src/main/cpp/jni_wrapper.cc -+++ b/java/dataset/src/main/cpp/jni_wrapper.cc -@@ -25,6 +25,7 @@ - #include "arrow/c/helpers.h" - #include "arrow/dataset/api.h" - #include "arrow/dataset/file_base.h" -+#include "arrow/dataset/file_csv.h" - #include "arrow/filesystem/localfs.h" - #include "arrow/filesystem/path_util.h" - #ifdef ARROW_S3 -@@ -122,6 +123,19 @@ arrow::Result> GetFileFormat( - } - } - -+arrow::Result> -+GetFragmentScanOptions(jint file_format_id, -+ const std::unordered_map& configs) { -+ switch (file_format_id) { -+#ifdef ARROW_CSV -+ case 3: -+ return arrow::dataset::CsvFragmentScanOptions::from(configs); -+#endif -+ default: -+ return arrow::Status::Invalid("Illegal file format id: " ,file_format_id); -+ } -+} -+ - class ReserveFromJava : public arrow::dataset::jni::ReservationListener { - public: - ReserveFromJava(JavaVM* vm, jobject java_reservation_listener) -@@ -460,12 +474,13 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset - /* - * Class: org_apache_arrow_dataset_jni_JniWrapper - * Method: createScanner -- * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J -+ * Signature: -+ * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;Ljava/nio/ByteBuffer;J)J - */ - JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( - JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, -- jobject substrait_projection, jobject substrait_filter, -- jlong batch_size, jlong memory_pool_id) { -+ jobject substrait_projection, jobject substrait_filter, jlong batch_size, -+ jlong file_format_id, jobject options, jlong memory_pool_id) { - JNI_METHOD_START - arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); - if (pool == nullptr) { -@@ -514,6 +529,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann - } - JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr)); - } -+ if (file_format_id != -1 && options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options)); -+ } - JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); - - auto scanner = JniGetOrThrow(scanner_builder->Finish()); -@@ -627,14 +650,31 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina - /* - * Class: org_apache_arrow_dataset_file_JniWrapper - * Method: makeFileSystemDatasetFactory -- * Signature: (Ljava/lang/String;II)J -+ * Signature: (Ljava/lang/String;IILjava/lang/String;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( -- JNIEnv* env, jobject, jstring uri, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( -+ JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject options) { - JNI_METHOD_START - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - std::shared_ptr d = - JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( -@@ -645,16 +685,33 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav - - /* - * Class: org_apache_arrow_dataset_file_JniWrapper -- * Method: makeFileSystemDatasetFactory -- * Signature: ([Ljava/lang/String;II)J -+ * Method: makeFileSystemDatasetFactoryWithFiles -+ * Signature: ([Ljava/lang/String;IIJ;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( -- JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles( -+ JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobject options) { - JNI_METHOD_START - - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - - std::vector uri_vec = ToStringVector(env, uris); -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -index aa3156905..a0b6fb168 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -@@ -17,8 +17,11 @@ - - package org.apache.arrow.dataset.file; - -+import java.util.Optional; -+ - import org.apache.arrow.dataset.jni.NativeDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.memory.BufferAllocator; - - /** -@@ -27,21 +30,34 @@ import org.apache.arrow.memory.BufferAllocator; - public class FileSystemDatasetFactory extends NativeDatasetFactory { - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -- String uri) { -- super(allocator, memoryPool, createNative(format, uri)); -+ String uri, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions)); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String uri) { -+ super(allocator, memoryPool, createNative(format, uri, Optional.empty())); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String[] uris, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions)); - } - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, - String[] uris) { -- super(allocator, memoryPool, createNative(format, uris)); -+ super(allocator, memoryPool, createNative(format, uris, Optional.empty())); - } - -- private static long createNative(FileFormat format, String uri) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); -+ private static long createNative(FileFormat format, String uri, Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - -- private static long createNative(FileFormat format, String[] uris) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); -+ private static long createNative(FileFormat format, String[] uris, -+ Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactoryWithFiles(uris, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -index c3a1a4e58..c3f8e12b3 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -@@ -17,6 +17,8 @@ - - package org.apache.arrow.dataset.file; - -+import java.nio.ByteBuffer; -+ - import org.apache.arrow.dataset.jni.JniLoader; - - /** -@@ -43,7 +45,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String uri, int fileFormat); -+ public native long makeFileSystemDatasetFactory(String uri, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a -@@ -54,7 +57,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); -+ public native long makeFileSystemDatasetFactoryWithFiles(String[] uris, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -index 637a3e8f2..6d6309140 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -@@ -80,7 +80,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::Scanner instance. - */ - public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection, -- ByteBuffer substraitFilter, long batchSize, long memoryPool); -+ ByteBuffer substraitFilter, long batchSize, long fileFormat, -+ ByteBuffer serializedFragmentScanOptions, long memoryPool); - - /** - * Get a serialized schema from native instance of a Scanner. -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -index d9abad997..3a96fe768 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -@@ -17,6 +17,9 @@ - - package org.apache.arrow.dataset.jni; - -+import java.nio.ByteBuffer; -+ -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.source.Dataset; - -@@ -40,11 +43,18 @@ public class NativeDataset implements Dataset { - if (closed) { - throw new NativeInstanceReleasedException(); - } -- -+ int fileFormat = -1; -+ ByteBuffer serialized = null; -+ if (options.getFragmentScanOptions().isPresent()) { -+ FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get(); -+ fileFormat = fragmentScanOptions.fileFormatId(); -+ serialized = fragmentScanOptions.serialize(); -+ } - long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), - options.getSubstraitProjection().orElse(null), - options.getSubstraitFilter().orElse(null), -- options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); -+ options.getBatchSize(), fileFormat, serialized, -+ context.getMemoryPool().getNativeInstanceId()); - - return new NativeScanner(context, scannerId); - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -new file mode 100644 -index 000000000..8acb2b2d4 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -@@ -0,0 +1,50 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner; -+ -+import java.nio.ByteBuffer; -+import java.util.Map; -+ -+import org.apache.arrow.dataset.substrait.util.ConvertUtil; -+ -+import io.substrait.proto.Expression; -+ -+public interface FragmentScanOptions { -+ String typeName(); -+ -+ int fileFormatId(); -+ -+ ByteBuffer serialize(); -+ -+ /** -+ * serialize the map. -+ * -+ * @param config config map -+ * @return bufer to jni call argument, should be DirectByteBuffer -+ */ -+ default ByteBuffer serializeMap(Map config) { -+ if (config.isEmpty()) { -+ return null; -+ } -+ -+ Expression.Literal literal = ConvertUtil.mapToExpressionLiteral(config); -+ ByteBuffer buf = ByteBuffer.allocateDirect(literal.getSerializedSize()); -+ buf.put(literal.toByteArray()); -+ return buf; -+ } -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -index 995d05ac3..aad71930c 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -@@ -31,6 +31,8 @@ public class ScanOptions { - private final Optional substraitProjection; - private final Optional substraitFilter; - -+ private final Optional fragmentScanOptions; -+ - /** - * Constructor. - * @param columns Projected columns. Empty for scanning all columns. -@@ -61,6 +63,7 @@ public class ScanOptions { - this.columns = columns; - this.substraitProjection = Optional.empty(); - this.substraitFilter = Optional.empty(); -+ this.fragmentScanOptions = Optional.empty(); - } - - public ScanOptions(long batchSize) { -@@ -83,6 +86,10 @@ public class ScanOptions { - return substraitFilter; - } - -+ public Optional getFragmentScanOptions() { -+ return fragmentScanOptions; -+ } -+ - /** - * Builder for Options used during scanning. - */ -@@ -91,6 +98,7 @@ public class ScanOptions { - private Optional columns; - private ByteBuffer substraitProjection; - private ByteBuffer substraitFilter; -+ private FragmentScanOptions fragmentScanOptions; - - /** - * Constructor. -@@ -136,6 +144,18 @@ public class ScanOptions { - return this; - } - -+ /** -+ * Set the FragmentScanOptions. -+ * -+ * @param fragmentScanOptions scan options -+ * @return the ScanOptions configured. -+ */ -+ public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) { -+ Preconditions.checkNotNull(fragmentScanOptions); -+ this.fragmentScanOptions = fragmentScanOptions; -+ return this; -+ } -+ - public ScanOptions build() { - return new ScanOptions(this); - } -@@ -146,5 +166,6 @@ public class ScanOptions { - columns = builder.columns; - substraitProjection = Optional.ofNullable(builder.substraitProjection); - substraitFilter = Optional.ofNullable(builder.substraitFilter); -+ fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions); - } - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -new file mode 100644 -index 000000000..08e35ede2 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -@@ -0,0 +1,51 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.util.Map; -+import java.util.Optional; -+ -+import org.apache.arrow.c.ArrowSchema; -+ -+public class CsvConvertOptions { -+ -+ private final Map configs; -+ -+ private Optional cSchema = Optional.empty(); -+ -+ public CsvConvertOptions(Map configs) { -+ this.configs = configs; -+ } -+ -+ public Optional getArrowSchema() { -+ return cSchema; -+ } -+ -+ public Map getConfigs() { -+ return configs; -+ } -+ -+ public void set(String key, String value) { -+ configs.put(key, value); -+ } -+ -+ public void setArrowSchema(ArrowSchema cSchema) { -+ this.cSchema = Optional.of(cSchema); -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -new file mode 100644 -index 000000000..88973f0a0 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -@@ -0,0 +1,97 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.io.Serializable; -+import java.nio.ByteBuffer; -+import java.util.Locale; -+import java.util.Map; -+import java.util.stream.Collectors; -+import java.util.stream.Stream; -+ -+import org.apache.arrow.dataset.file.FileFormat; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; -+ -+public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions { -+ private final CsvConvertOptions convertOptions; -+ private final Map readOptions; -+ private final Map parseOptions; -+ -+ -+ /** -+ * csv scan options, map to CPP struct CsvFragmentScanOptions. -+ * -+ * @param convertOptions same struct in CPP -+ * @param readOptions same struct in CPP -+ * @param parseOptions same struct in CPP -+ */ -+ public CsvFragmentScanOptions(CsvConvertOptions convertOptions, -+ Map readOptions, -+ Map parseOptions) { -+ this.convertOptions = convertOptions; -+ this.readOptions = readOptions; -+ this.parseOptions = parseOptions; -+ } -+ -+ public String typeName() { -+ return FileFormat.CSV.name().toLowerCase(Locale.ROOT); -+ } -+ -+ /** -+ * File format id. -+ * -+ * @return id -+ */ -+ public int fileFormatId() { -+ return FileFormat.CSV.id(); -+ } -+ -+ /** -+ * Serialize this class to ByteBuffer and then called by jni call. -+ * -+ * @return DirectByteBuffer -+ */ -+ public ByteBuffer serialize() { -+ Map options = Stream.concat(Stream.concat(readOptions.entrySet().stream(), -+ parseOptions.entrySet().stream()), -+ convertOptions.getConfigs().entrySet().stream()).collect( -+ Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -+ -+ if (convertOptions.getArrowSchema().isPresent()) { -+ options.put("column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress())); -+ } -+ return serializeMap(options); -+ } -+ -+ public static CsvFragmentScanOptions deserialize(String serialized) { -+ throw new UnsupportedOperationException("Not implemented now"); -+ } -+ -+ public CsvConvertOptions getConvertOptions() { -+ return convertOptions; -+ } -+ -+ public Map getReadOptions() { -+ return readOptions; -+ } -+ -+ public Map getParseOptions() { -+ return parseOptions; -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -new file mode 100644 -index 000000000..31a4023af ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -@@ -0,0 +1,46 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.substrait.util; -+ -+import java.util.Map; -+ -+import io.substrait.proto.Expression; -+ -+public class ConvertUtil { -+ -+ /** -+ * Convert map to substrait Expression. -+ * -+ * @return Substrait Expression -+ */ -+ public static Expression.Literal mapToExpressionLiteral(Map values) { -+ Expression.Literal.Builder literalBuilder = Expression.Literal.newBuilder(); -+ Expression.Literal.Map.KeyValue.Builder keyValueBuilder = -+ Expression.Literal.Map.KeyValue.newBuilder(); -+ Expression.Literal.Map.Builder mapBuilder = Expression.Literal.Map.newBuilder(); -+ for (Map.Entry entry : values.entrySet()) { -+ literalBuilder.setString(entry.getKey()); -+ keyValueBuilder.setKey(literalBuilder.build()); -+ literalBuilder.setString(entry.getValue()); -+ keyValueBuilder.setValue(literalBuilder.build()); -+ mapBuilder.addKeyValues(keyValueBuilder.build()); -+ } -+ literalBuilder.setMap(mapBuilder.build()); -+ return literalBuilder.build(); -+ } -+} -diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -index 0fba72892..e7903b7a4 100644 ---- a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -@@ -31,6 +31,9 @@ import java.util.HashMap; - import java.util.Map; - import java.util.Optional; - -+import org.apache.arrow.c.ArrowSchema; -+import org.apache.arrow.c.CDataDictionaryProvider; -+import org.apache.arrow.c.Data; - import org.apache.arrow.dataset.ParquetWriteSupport; - import org.apache.arrow.dataset.TestDataset; - import org.apache.arrow.dataset.file.FileFormat; -@@ -38,8 +41,11 @@ import org.apache.arrow.dataset.file.FileSystemDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.scanner.Scanner; -+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions; -+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions; - import org.apache.arrow.dataset.source.Dataset; - import org.apache.arrow.dataset.source.DatasetFactory; -+import org.apache.arrow.memory.BufferAllocator; - import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.types.pojo.ArrowType; - import org.apache.arrow.vector.types.pojo.Field; -@@ -49,6 +55,8 @@ import org.junit.ClassRule; - import org.junit.Test; - import org.junit.rules.TemporaryFolder; - -+import com.google.common.collect.ImmutableMap; -+ - public class TestAceroSubstraitConsumer extends TestDataset { - - @ClassRule -@@ -457,4 +465,42 @@ public class TestAceroSubstraitConsumer extends TestDataset { - substraitExpression.put(decodedSubstrait); - return substraitExpression; - } -+ -+ @Test -+ public void testCsvConvertOptions() throws Exception { -+ final Schema schema = new Schema(Arrays.asList( -+ Field.nullable("Id", new ArrowType.Int(32, true)), -+ Field.nullable("Name", new ArrowType.Utf8()), -+ Field.nullable("Language", new ArrowType.Utf8()) -+ ), null); -+ String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; -+ BufferAllocator allocator = rootAllocator(); -+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); -+ CDataDictionaryProvider provider = new CDataDictionaryProvider()) { -+ Data.exportSchema(allocator, schema, provider, cSchema); -+ CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";")); -+ convertOptions.setArrowSchema(cSchema); -+ CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions( -+ convertOptions, ImmutableMap.of(), ImmutableMap.of()); -+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768) -+ .columns(Optional.empty()) -+ .fragmentScanOptions(fragmentScanOptions) -+ .build(); -+ try ( -+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), -+ FileFormat.CSV, path); -+ Dataset dataset = datasetFactory.finish(); -+ Scanner scanner = dataset.newScan(options); -+ ArrowReader reader = scanner.scanBatches() -+ ) { -+ assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); -+ int rowCount = 0; -+ while (reader.loadNextBatch()) { -+ assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString()); -+ rowCount += reader.getVectorSchemaRoot().getRowCount(); -+ } -+ assertEquals(3, rowCount); -+ } -+ } -+ } - } -diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv -new file mode 100644 -index 000000000..329194609 ---- /dev/null -+++ b/java/dataset/src/test/resources/data/student.csv -@@ -0,0 +1,4 @@ -+Id;Name;Language -+1;Juno;Java -+2;Peter;Python -+3;Celin;C++ diff --git a/ep/build-velox/src/modify_velox.patch b/ep/build-velox/src/modify_velox.patch index aee406c3eae08..bfaacd085d44e 100644 --- a/ep/build-velox/src/modify_velox.patch +++ b/ep/build-velox/src/modify_velox.patch @@ -84,12 +84,11 @@ index ce4c24dbe..785a2acc6 100644 -DARROW_WITH_THRIFT=ON -DARROW_WITH_LZ4=ON -DARROW_WITH_SNAPPY=ON -@@ -69,6 +71,8 @@ if(VELOX_ENABLE_ARROW) +@@ -69,6 +71,7 @@ if(VELOX_ENABLE_ARROW) arrow_ep PREFIX ${ARROW_PREFIX} URL ${VELOX_ARROW_SOURCE_URL} + PATCH_COMMAND patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/modify_arrow.patch -+ COMMAND patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/modify_arrow_dataset_scan_option.patch URL_HASH ${VELOX_ARROW_BUILD_SHA256_CHECKSUM} SOURCE_SUBDIR cpp CMAKE_ARGS ${ARROW_CMAKE_ARGS} diff --git a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala index 95391a2c42f56..7d2d48828fb32 100644 --- a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala +++ b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala @@ -319,26 +319,6 @@ abstract class WholeStageTransformerSuite df } - /** - * Some rule on LogicalPlan will not only apply in select query, the total df.load() should in - * spark environment with gluten disabled config. - * - * @param sql - * @param f - * @return - */ - protected def runAndCompare(sql: String)(f: => Unit): DataFrame = { - var expected: Seq[Row] = null - withSQLConf(vanillaSparkConfs(): _*) { - f - expected = spark.sql(sql).collect() - } - f - val df = spark.sql(sql) - checkAnswer(df, expected) - df - } - protected def runQueryAndCompare( sqlStr: String, compareResult: Boolean = true, diff --git a/gluten-data/pom.xml b/gluten-data/pom.xml index c28490d77faad..bb84a06b4125d 100644 --- a/gluten-data/pom.xml +++ b/gluten-data/pom.xml @@ -124,7 +124,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} org.apache.arrow @@ -140,7 +140,7 @@ org.apache.arrow arrow-dataset - ${arrow-gluten.version} + ${arrow.version} io.netty diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala index a94f8f2e3d49c..99eb72c70ea3f 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala @@ -16,12 +16,16 @@ */ package org.apache.gluten.utils +import org.apache.gluten.exception.SchemaMismatchException import org.apache.gluten.vectorized.ArrowWritableColumnVector import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.vectorized.ArrowColumnVectorUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} @@ -29,15 +33,13 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.arrow.c.{ArrowSchema, CDataDictionaryProvider, Data} import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory} import org.apache.arrow.dataset.jni.NativeMemoryPool -import org.apache.arrow.dataset.scanner.FragmentScanOptions import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.ipc.message.ArrowRecordBatch import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} import org.apache.hadoop.fs.FileStatus -import java.net.{URI, URLDecoder} +import java.net.URI import java.util -import java.util.Optional import scala.collection.JavaConverters._ import scala.collection.mutable @@ -97,6 +99,26 @@ object ArrowUtil extends Logging { new Schema(fields) } + def getFormat(format: String): FileFormat = { + format match { + case "parquet" => FileFormat.PARQUET + case "orc" => FileFormat.ORC + case "csv" => FileFormat.CSV + case _ => throw new IllegalArgumentException("Unrecognizable format") + } + } + + def getFormat(format: org.apache.spark.sql.execution.datasources.FileFormat): FileFormat = { + format match { + case _: ParquetFileFormat => + FileFormat.PARQUET + case _: CSVFileFormat => + FileFormat.CSV + case _ => + throw new IllegalArgumentException("Unrecognizable format") + } + } + private def rewriteUri(encodeUri: String): String = { val decodedUri = encodeUri val uri = URI.create(decodedUri) @@ -120,49 +142,19 @@ object ArrowUtil extends Logging { def makeArrowDiscovery( encodedUri: String, format: FileFormat, - option: Optional[FragmentScanOptions], allocator: BufferAllocator, - pool: NativeMemoryPool - ): FileSystemDatasetFactory = { - val factory = - new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri), option) + pool: NativeMemoryPool): FileSystemDatasetFactory = { + val factory = new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri)) factory } - def readArrowSchema( - file: String, - format: FileFormat, - option: FragmentScanOptions, - allocator: BufferAllocator, - pool: NativeMemoryPool): Schema = { - val factory: FileSystemDatasetFactory = - makeArrowDiscovery(file, format, Optional.of(option), allocator, pool) - val schema = factory.inspect() - factory.close() - schema - } - - def readArrowFileColumnNames( - file: String, - format: FileFormat, - option: FragmentScanOptions, - allocator: BufferAllocator, - pool: NativeMemoryPool): Array[String] = { - val fileFields = ArrowUtil - .readArrowSchema(URLDecoder.decode(file, "UTF-8"), format, option, allocator, pool) - .getFields - .asScala - fileFields.map(_.getName).toArray - } - def readSchema( file: FileStatus, format: FileFormat, - option: FragmentScanOptions, allocator: BufferAllocator, pool: NativeMemoryPool): Option[StructType] = { val factory: FileSystemDatasetFactory = - makeArrowDiscovery(file.getPath.toString, format, Optional.of(option), allocator, pool) + makeArrowDiscovery(file.getPath.toString, format, allocator, pool) val schema = factory.inspect() try { Option(SparkSchemaUtil.fromArrowSchema(schema)) @@ -174,14 +166,67 @@ object ArrowUtil extends Logging { def readSchema( files: Seq[FileStatus], format: FileFormat, - option: FragmentScanOptions, allocator: BufferAllocator, pool: NativeMemoryPool): Option[StructType] = { if (files.isEmpty) { throw new IllegalArgumentException("No input file specified") } - readSchema(files.head, format, option, allocator, pool) + readSchema(files.head, format, allocator, pool) + } + + def compareStringFunc(caseSensitive: Boolean): (String, String) => Boolean = { + if (caseSensitive) { (str1: String, str2: String) => str1.equals(str2) } + else { (str1: String, str2: String) => str1.equalsIgnoreCase(str2) } + } + + // If user specify schema by .schema(newSchemaDifferentWithFile) + def checkSchema( + requiredField: DataType, + parquetFileFieldType: ArrowType, + parquetFileFields: mutable.Buffer[Field]): Unit = { + val requiredFieldType = + SparkArrowUtil.toArrowType(requiredField, SparkSchemaUtil.getLocalTimezoneID) + if (!requiredFieldType.equals(parquetFileFieldType)) { + val arrowFileSchema = parquetFileFields + .map(f => f.toString) + .reduceLeft((f1, f2) => f1 + "\n" + f2) + throw new SchemaMismatchException( + s"Not support specified schema is different with file schema\n$arrowFileSchema") + } + } + + def getRequestedField( + requiredSchema: StructType, + parquetFileFields: mutable.Buffer[Field], + caseSensitive: Boolean): Schema = { + val compareFunc = compareStringFunc(caseSensitive) + requiredSchema.foreach { + readField => + // TODO: check schema inside of complex type + val matchedFields = + parquetFileFields.filter(field => compareFunc(field.getName, readField.name)) + if (!caseSensitive && matchedFields.size > 1) { + // Need to fail if there is ambiguity, i.e. more than one field is matched + val fieldsString = matchedFields.map(_.getName).mkString("[", ", ", "]") + throw new RuntimeException( + s""" + |Found duplicate field(s) "${readField.name}": $fieldsString + + |in case-insensitive mode""".stripMargin.replaceAll("\n", " ")) + } + if (matchedFields.nonEmpty) { + checkSchema( + readField.dataType, + matchedFields.head.getFieldType.getType, + parquetFileFields) + } + } + + val requestColNames = requiredSchema.map(_.name) + new Schema(parquetFileFields.filter { + field => requestColNames.exists(col => compareFunc(col, field.getName)) + }.asJava) } def loadMissingColumns( @@ -217,14 +262,19 @@ object ArrowUtil extends Logging { def loadBatch( allocator: BufferAllocator, input: ArrowRecordBatch, - dataSchema: Schema, + dataSchema: StructType, + requiredSchema: StructType, partitionVectors: Array[ArrowWritableColumnVector] = Array.empty, nullVectors: Array[ArrowWritableColumnVector] = Array.empty): ColumnarBatch = { val rowCount: Int = input.getLength val vectors = try { - ArrowWritableColumnVector.loadColumns(rowCount, dataSchema, input, allocator) + ArrowWritableColumnVector.loadColumns( + rowCount, + SparkSchemaUtil.toArrowSchema(dataSchema), + input, + allocator) } finally { input.close() } @@ -232,8 +282,21 @@ object ArrowUtil extends Logging { val totalVectors = if (nullVectors.nonEmpty) { val finalVectors = mutable.ArrayBuffer[ArrowWritableColumnVector]() - finalVectors.appendAll(vectors) - finalVectors.appendAll(nullVectors) + val requiredIterator = requiredSchema.iterator + val compareFunc = compareStringFunc(SQLConf.get.caseSensitiveAnalysis) + while (requiredIterator.hasNext) { + val field = requiredIterator.next() + finalVectors.append(vectors + .find(vector => compareFunc(vector.getValueVector.getName, field.name)) + .getOrElse { + // The missing column need to be find in nullVectors + val nullVector = + nullVectors.find(vector => compareFunc(vector.getValueVector.getName, field.name)).get + nullVector.setValueCount(rowCount) + nullVector.retain() + nullVector + }) + } finalVectors.toArray } else { vectors diff --git a/gluten-ut/spark32/pom.xml b/gluten-ut/spark32/pom.xml index b0744589d1611..e026bb424129d 100644 --- a/gluten-ut/spark32/pom.xml +++ b/gluten-ut/spark32/pom.xml @@ -76,7 +76,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} test diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 5df53953e4cce..584ace94fbcc1 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -411,9 +411,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenCSVv2Suite] @@ -426,9 +423,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch // Early Filter and Projection Push-Down generated an invalid plan .exclude("SPARK-26208: write and read empty data to csv file with headers") @@ -441,9 +435,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") enableSuite[GlutenJsonV1Suite] // FIXME: Array direct selection fails .exclude("Complex field and type inferring") diff --git a/gluten-ut/spark33/pom.xml b/gluten-ut/spark33/pom.xml index 5f9a28e2459d3..0f0a0703c3c50 100644 --- a/gluten-ut/spark33/pom.xml +++ b/gluten-ut/spark33/pom.xml @@ -83,7 +83,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} test diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 8418cba237e31..223b2dd5e9668 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -206,9 +206,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenCSVv2Suite] @@ -221,9 +218,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch // Early Filter and Projection Push-Down generated an invalid plan .exclude("SPARK-26208: write and read empty data to csv file with headers") @@ -235,9 +229,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenJsonV1Suite] diff --git a/gluten-ut/spark34/pom.xml b/gluten-ut/spark34/pom.xml index a8d24d5fd219f..d30f9644dbb4d 100644 --- a/gluten-ut/spark34/pom.xml +++ b/gluten-ut/spark34/pom.xml @@ -83,7 +83,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} test diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 91127c4ba9bb3..8f14ab50506db 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -185,9 +185,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenCSVv2Suite] @@ -200,9 +197,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch // Early Filter and Projection Push-Down generated an invalid plan .exclude("SPARK-26208: write and read empty data to csv file with headers") @@ -214,9 +208,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenJsonV1Suite] diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml index 2bf1c93a00522..0b7337c0f057d 100644 --- a/gluten-ut/spark35/pom.xml +++ b/gluten-ut/spark35/pom.xml @@ -117,7 +117,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} test diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 6162b56519806..57aa9d4ced953 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -188,9 +188,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenCSVv2Suite] @@ -206,11 +203,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("save csv with empty fields with user defined empty values") .exclude("save csv with quote") .exclude("SPARK-13543 Write the output as uncompressed via option()") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") - // Arrow not support corrupt record - .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") enableSuite[GlutenCSVLegacyTimeParserSuite] // file cars.csv include null string, Arrow not support to read .exclude("DDL test with schema") @@ -221,9 +213,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-13543 Write the output as uncompressed via option()") // Arrow not support corrupt record .exclude("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") - .exclude("DDL test with tab separated file") - .exclude("DDL test parsing decimal type") - .exclude("test with tab delimiter and double quote") enableSuite[GlutenJsonV1Suite] // FIXME: Array direct selection fails .exclude("Complex field and type inferring") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala index 8b75dad33c385..cb7ce87f97dad 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala @@ -20,14 +20,16 @@ import org.apache.gluten.GlutenConfig import org.apache.gluten.exception.GlutenException import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row} +import org.apache.spark.sql.{AnalysisException, GlutenSQLTestsBaseTrait, Row} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DateType, IntegerType, StructType, TimestampType} +import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType, TimestampType} import org.scalatest.exceptions.TestFailedException import java.sql.{Date, Timestamp} +import scala.collection.JavaConverters.seqAsJavaListConverter + class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait { override def sparkConf: SparkConf = @@ -41,9 +43,68 @@ class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait { } class GlutenCSVv1Suite extends GlutenCSVSuite { + import testImplicits._ override def sparkConf: SparkConf = super.sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "csv") + + testGluten("SPARK-23786: Ignore column name case if spark.sql.caseSensitive is false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + withTempPath { + path => + val oschema = new StructType().add("A", StringType) + // change the row content 0 to string bbb in Gluten for test + val odf = spark.createDataFrame(List(Row("bbb")).asJava, oschema) + odf.write.option("header", true).csv(path.getCanonicalPath) + val ischema = new StructType().add("a", StringType) + val idf = spark.read + .schema(ischema) + .option("header", true) + .option("enforceSchema", false) + .csv(path.getCanonicalPath) + checkAnswer(idf, odf) + } + } + } + + testGluten("case sensitivity of filters references") { + Seq(true, false).foreach { + filterPushdown => + withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) { + withTempPath { + path => + Seq("""aaa,BBB""", """0,1""", """2,3""") + .toDF() + .repartition(1) + .write + .text(path.getCanonicalPath) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + // change the schema to Arrow schema to support read in Gluten + val readback = spark.read + .schema("aaa long, BBB long") + .option("header", true) + .csv(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(2, 3), Row(0, 1))) + checkAnswer(readback.filter($"AAA" === 2 && $"bbb" === 3), Seq(Row(2, 3))) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val readback = spark.read + .schema("aaa long, BBB long") + .option("header", true) + .csv(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(2, 3), Row(0, 1))) + checkError( + exception = intercept[AnalysisException] { + readback.filter($"AAA" === 2 && $"bbb" === 3).collect() + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`AAA`", "proposal" -> "`BBB`, `aaa`") + ) + } + } + } + } + } } class GlutenCSVv2Suite extends GlutenCSVSuite { diff --git a/gluten-ut/test/pom.xml b/gluten-ut/test/pom.xml index 25ec542deab2b..d55e6ca917e7b 100644 --- a/gluten-ut/test/pom.xml +++ b/gluten-ut/test/pom.xml @@ -90,7 +90,7 @@ org.apache.arrow arrow-c-data - ${arrow-gluten.version} + ${arrow.version} test diff --git a/pom.xml b/pom.xml index 81ce0e5d462af..1a47c50006eea 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,6 @@ 0.3.2-incubating 0.8.0 15.0.0 - 15.0.0-gluten arrow-memory-unsafe 2.7.4 UTF-8