diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index 52f77563814e..24adf74d3e07 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -43,7 +43,6 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.dataset.file.FileSystemDatasetFactory
 import org.apache.arrow.dataset.scanner.ScanOptions
-import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.VectorUnloader
 import org.apache.arrow.vector.types.pojo.Schema
@@ -76,11 +75,9 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
     ArrowUtil.readSchema(
       files,
       fileFormat,
-      arrowConfig,
       ArrowBufferAllocators.contextInstance(),
       ArrowNativeMemoryPool.arrowPool("infer schema"))
   }
@@ -116,14 +113,12 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
         actualFilters,
         broadcastedHadoopConf.value.value)
-      val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
       val allocator = ArrowBufferAllocators.contextInstance()
       // todo predicate validation / pushdown
       val fileNames = ArrowUtil
         .readArrowFileColumnNames(
           URLDecoder.decode(file.filePath.toString, "UTF-8"),
           fileFormat,
-          arrowConfig,
           ArrowBufferAllocators.contextInstance(),
           pool)
       val tokenIndexArr =
@@ -140,18 +135,15 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
       val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
       val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
       try {
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
         val factory = ArrowUtil.makeArrowDiscovery(
           URLDecoder.decode(file.filePath.toString, "UTF-8"),
           fileFormat,
-          Optional.of(arrowConfig),
           ArrowBufferAllocators.contextInstance(),
           pool)
         val fields = factory.inspect().getFields
         val actualReadFields = new Schema(
           fileIndex.map(index => fields.get(index)).toIterable.asJava)
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
         ArrowCSVFileFormat
           .readArrow(
             ArrowBufferAllocators.contextInstance(),
@@ -160,8 +152,7 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
             missingSchema,
             partitionSchema,
             factory,
-            batchSize,
-            arrowConfig)
+            batchSize)
           .asInstanceOf[Iterator[InternalRow]]
       } catch {
         case e: SchemaMismatchException =>
@@ -222,13 +213,11 @@ object ArrowCSVFileFormat {
       missingSchema: StructType,
       partitionSchema: StructType,
       factory: FileSystemDatasetFactory,
-      batchSize: Int,
-      arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
+      batchSize: Int): Iterator[ColumnarBatch] = {
     val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
     val dataset = factory.finish(actualReadFields)
     val scanOptions = new ScanOptions.Builder(batchSize)
       .columns(Optional.of(actualReadFieldNames))
-      .fragmentScanOptions(arrowConfig)
       .build()
     val scanner = dataset.newScan(scanOptions)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
deleted file mode 100644
index 7d6a54c2ac7a..000000000000
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource
-
-import org.apache.gluten.utils.ArrowAbiUtil
-
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.utils.SparkSchemaUtil
-
-import com.google.common.collect.ImmutableMap
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.dataset.scanner.csv.{CsvConvertOptions, CsvFragmentScanOptions}
-import org.apache.arrow.memory.BufferAllocator
-
-import java.util
-
-object ArrowCSVOptionConverter {
-  def convert(option: CSVOptions): CsvFragmentScanOptions = {
-    val parseMap = new util.HashMap[String, String]()
-    val default = new CSVOptions(
-      CaseInsensitiveMap(Map()),
-      option.columnPruning,
-      SparkSchemaUtil.getLocalTimezoneID)
-    parseMap.put("strings_can_be_null", "true")
-    if (option.delimiter != default.delimiter) {
-      parseMap.put("delimiter", option.delimiter)
-    }
-    if (option.escapeQuotes != default.escapeQuotes) {
-      parseMap.put("quoting", (!option.escapeQuotes).toString)
-    }
-
-    val convertOptions = new CsvConvertOptions(ImmutableMap.of())
-    new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), parseMap)
-  }
-
-  def schema(
-      requiredSchema: StructType,
-      cSchema: ArrowSchema,
-      allocator: BufferAllocator,
-      option: CsvFragmentScanOptions): Unit = {
-    val schema = SparkSchemaUtil.toArrowSchema(requiredSchema)
-    ArrowAbiUtil.exportSchema(allocator, schema, cSchema)
-    option.getConvertOptions.setArrowSchema(cSchema)
-  }
-
-}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
index c930cebebe69..f6875e250728 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.datasource.v2
 
-import org.apache.gluten.datasource.{ArrowCSVFileFormat, ArrowCSVOptionConverter}
+import org.apache.gluten.datasource.ArrowCSVFileFormat
 import org.apache.gluten.exception.SchemaMismatchException
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
@@ -40,7 +40,6 @@ import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.vector.types.pojo.Schema
 
 import java.net.URLDecoder
-import java.util.Optional
 
 import scala.collection.JavaConverters.asJavaIterableConverter
@@ -91,12 +90,10 @@ case class ArrowCSVPartitionReaderFactory(
         ArrowBufferAllocators.contextInstance(),
         ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
     }
-    val arrowConfig = ArrowCSVOptionConverter.convert(options)
     val fileNames = ArrowUtil
       .readArrowFileColumnNames(
         URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
         fileFormat,
-        arrowConfig,
         ArrowBufferAllocators.contextInstance(),
         pool)
     val tokenIndexArr =
@@ -113,18 +110,15 @@ case class ArrowCSVPartitionReaderFactory(
     // TODO: support array/map/struct types in out-of-order schema reading.
     val iter =
       try {
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
         val factory = ArrowUtil.makeArrowDiscovery(
           URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
           fileFormat,
-          Optional.of(arrowConfig),
           ArrowBufferAllocators.contextInstance(),
           pool)
         val fields = factory.inspect().getFields
         val actualReadFields = new Schema(
           fileIndex.map(index => fields.get(index)).toIterable.asJava)
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
         ArrowCSVFileFormat
           .readArrow(
             ArrowBufferAllocators.contextInstance(),
@@ -133,8 +127,7 @@ case class ArrowCSVPartitionReaderFactory(
             missingSchema,
             readPartitionSchema,
             factory,
-            batchSize,
-            arrowConfig)
+            batchSize)
       } catch {
         case e: SchemaMismatchException =>
           logWarning(e.getMessage)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
index 3eaf4e35fd21..6c3ca0416ec5 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.datasource.v2
 
-import org.apache.gluten.datasource.ArrowCSVOptionConverter
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
 import org.apache.gluten.utils.ArrowUtil
@@ -58,11 +57,9 @@ case class ArrowCSVTable(
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord
     )
-    val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
    ArrowUtil.readSchema(
      files.head,
      org.apache.arrow.dataset.file.FileFormat.CSV,
-      arrowConfig,
      allocator,
      pool
    )
diff --git a/dev/build_arrow.sh b/dev/build_arrow.sh
index e7496350f988..508767a1a7e9 100755
--- a/dev/build_arrow.sh
+++ b/dev/build_arrow.sh
@@ -16,6 +16,8 @@
 
 set -exu
 
+BUILD_ARROW_JAVA=ON
+
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
 export SUDO=sudo
 source ${CURRENT_DIR}/build_helper_functions.sh
@@ -23,12 +25,25 @@ VELOX_ARROW_BUILD_VERSION=15.0.0
 ARROW_PREFIX=$CURRENT_DIR/../ep/_ep/arrow_ep
 BUILD_TYPE=Release
 
+for arg in "$@"
+do
+  case $arg in
+    --build_arrow_java=*)
+      BUILD_ARROW_JAVA=("${arg#*=}")
+      shift # Remove argument name from processing
+      ;;
+    *)
+      OTHER_ARGUMENTS+=("$1")
+      shift # Remove generic argument from processing
+      ;;
+  esac
+done
+
 function prepare_arrow_build() {
   mkdir -p ${ARROW_PREFIX}/../ && pushd ${ARROW_PREFIX}/../ && sudo rm -rf arrow_ep/
   wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep
   cd arrow_ep
   patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow.patch
-  patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow_dataset_scan_option.patch
   popd
 }
 
@@ -108,5 +123,9 @@ echo "Start to build Arrow"
 prepare_arrow_build
 build_arrow_cpp
 echo "Finished building arrow CPP"
-build_arrow_java
-echo "Finished building arrow Java"
+
+if [ $BUILD_ARROW_JAVA == "ON" ]; then
+  build_arrow_java
+  echo "Finished building arrow Java"
+fi
+
diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh
index 1ed6e62e29d3..69cb38ba388e 100755
--- a/dev/builddeps-veloxbe.sh
+++ b/dev/builddeps-veloxbe.sh
@@ -30,6 +30,7 @@ VELOX_BRANCH=""
 VELOX_HOME=""
 VELOX_PARAMETER=""
 BUILD_ARROW=ON
+BUILD_ARROW_JAVA=ON
 SPARK_VERSION=ALL
 
 # set default number of threads as cpu cores minus 2
@@ -184,7 +185,7 @@ concat_velox_param
 
 function build_arrow {
   cd $GLUTEN_DIR/dev
-  ./build_arrow.sh
+  ./build_arrow.sh --build_arrow_java=$BUILD_ARROW_JAVA
 }
 
 function build_velox {
diff --git a/dev/ci-velox-buildshared-centos-8.sh b/dev/ci-velox-buildshared-centos-8.sh
index ff9a62b798b5..c5edcf19f08e 100755
--- a/dev/ci-velox-buildshared-centos-8.sh
+++ b/dev/ci-velox-buildshared-centos-8.sh
@@ -4,4 +4,4 @@ set -e
 
 source /opt/rh/gcc-toolset-11/enable
 ./dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --enable_ep_cache=OFF --build_tests=ON \
-    --build_examples=ON --build_benchmarks=ON
+    --build_examples=ON --build_benchmarks=ON --build_arrow_java=OFF
diff --git a/dev/ci-velox-buildstatic-centos-7.sh b/dev/ci-velox-buildstatic-centos-7.sh
index 6895891a98e9..8bc4d53f44df 100755
--- a/dev/ci-velox-buildstatic-centos-7.sh
+++ b/dev/ci-velox-buildstatic-centos-7.sh
@@ -5,4 +5,4 @@ set -e
 source /opt/rh/devtoolset-11/enable
 export NUM_THREADS=4
 ./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=OFF --build_tests=OFF --build_benchmarks=OFF \
-    --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON
+    --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON --build_arrow_java=OFF
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 9752cf2a8a81..2f36defca4ee 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -171,11 +171,8 @@ function apply_compilation_fixes {
   velox_home=$2
   sudo cp ${current_dir}/modify_arrow.patch ${velox_home}/CMake/resolve_dependency_modules/arrow/
-  sudo cp ${current_dir}/modify_arrow_dataset_scan_option.patch ${velox_home}/CMake/resolve_dependency_modules/arrow/
   git add ${velox_home}/CMake/resolve_dependency_modules/arrow/modify_arrow.patch # to avoid the file from being deleted by git clean -dffx :/
-  git add ${velox_home}/CMake/resolve_dependency_modules/arrow/modify_arrow_dataset_scan_option.patch # to avoid the file from being deleted by git clean -dffx :/
-
 }
 
 function setup_linux {
diff --git a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch b/ep/build-velox/src/modify_arrow_dataset_scan_option.patch
deleted file mode 100644
index 4af78c030c00..000000000000
--- a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch
+++ /dev/null
@@ -1,883 +0,0 @@
-diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
-index 09ab77572..f09377cf9 100644
---- a/cpp/src/arrow/dataset/file_csv.cc
-+++ b/cpp/src/arrow/dataset/file_csv.cc
-@@ -24,6 +24,7 @@
- #include
- #include
-
-+#include "arrow/c/bridge.h"
- #include "arrow/csv/options.h"
- #include "arrow/csv/parser.h"
- #include "arrow/csv/reader.h"
-@@ -52,6 +53,9 @@
- using internal::Executor;
- using internal::SerialExecutor;
-
- namespace dataset {
-+namespace {
-+inline bool parseBool(const std::string& value) { return value == "true" ? true : false; }
-+}  // namespace
-
- struct CsvInspectedFragment : public InspectedFragment {
-   CsvInspectedFragment(std::vector<std::string> column_names,
-@@ -503,5 +507,33 @@ Future<> CsvFileWriter::FinishInternal() {
-   return Status::OK();
- }
-
-+Result<std::shared_ptr<CsvFragmentScanOptions>> CsvFragmentScanOptions::from(
-+    const std::unordered_map<std::string, std::string>& configs) {
-+  std::shared_ptr<CsvFragmentScanOptions> options =
-+      std::make_shared<CsvFragmentScanOptions>();
-+  for (auto const& it : configs) {
-+    auto& key = it.first;
-+    auto& value = it.second;
-+    if (key == "delimiter") {
-+      options->parse_options.delimiter = value.data()[0];
-+    } else if (key == "quoting") {
-+      options->parse_options.quoting = parseBool(value);
-+    } else if (key == "column_types") {
-+      int64_t schema_address = std::stol(value);
-+      ArrowSchema* cSchema = reinterpret_cast<ArrowSchema*>(schema_address);
-+      ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema));
-+      auto& column_types = options->convert_options.column_types;
-+      for (auto field : schema->fields()) {
-+        column_types[field->name()] = field->type();
-+      }
-+    } else if (key == "strings_can_be_null") {
-+      options->convert_options.strings_can_be_null = parseBool(value);
-+    } else {
-+      return Status::Invalid("Config " + it.first + "is not supported.");
-+    }
-+  }
-+  return options;
-+}
-+
- }  // namespace dataset
- }  // namespace arrow
-diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
-index 42e3fd724..4d2825183 100644
---- a/cpp/src/arrow/dataset/file_csv.h
-+++ b/cpp/src/arrow/dataset/file_csv.h
-@@ -85,6 +85,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
- struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
-   std::string type_name() const override { return kCsvTypeName; }
-
-+  static Result<std::shared_ptr<CsvFragmentScanOptions>> from(
-+      const std::unordered_map<std::string, std::string>& configs);
-+
-   using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
-       std::shared_ptr<io::InputStream>)>;
-
-diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc b/cpp/src/arrow/engine/substrait/expression_internal.cc
-index 5d892af9a..0f8b0448b 100644
---- a/cpp/src/arrow/engine/substrait/expression_internal.cc
-+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc
-@@ -1337,5 +1337,17 @@ Result<std::unique_ptr<substrait::Expression>> ToProto(
-   return std::move(out);
- }
-
-+Status FromProto(const substrait::Expression::Literal& literal,
-+                 std::unordered_map<std::string, std::string>& out) {
-+  ARROW_RETURN_IF(!literal.has_map(), Status::Invalid("Literal does not have a map."));
-+  auto literalMap = literal.map();
-+  auto size = literalMap.key_values_size();
-+  for (auto i = 0; i < size; i++) {
-+    substrait::Expression_Literal_Map_KeyValue keyValue = literalMap.key_values(i);
-+    out.emplace(keyValue.key().string(), keyValue.value().string());
-+  }
-+  return Status::OK();
-+}
-+
- }  // namespace engine
- }  // namespace arrow
-diff --git a/cpp/src/arrow/engine/substrait/expression_internal.h b/cpp/src/arrow/engine/substrait/expression_internal.h
-index 2ce2ee76a..9be81b7ab 100644
---- a/cpp/src/arrow/engine/substrait/expression_internal.h
-+++ b/cpp/src/arrow/engine/substrait/expression_internal.h
-@@ -61,5 +61,9 @@ ARROW_ENGINE_EXPORT
- Result<SubstraitCall> FromProto(const substrait::AggregateFunction&, bool is_hash,
-                                 const ExtensionSet&, const ConversionOptions&);
-
-+ARROW_ENGINE_EXPORT
-+Status FromProto(const substrait::Expression::Literal& literal,
-+                 std::unordered_map<std::string, std::string>& out);
-+
- }  // namespace engine
- }  // namespace arrow
-diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc
-index 9e670f121..02e5c7171 100644
---- a/cpp/src/arrow/engine/substrait/serde.cc
-+++ b/cpp/src/arrow/engine/substrait/serde.cc
-@@ -247,6 +247,16 @@ Result<BoundExpressions> DeserializeExpressions(
-   return FromProto(extended_expression, ext_set_out, conversion_options, registry);
- }
-
-+Status DeserializeMap(const Buffer& buf,
-+                      std::unordered_map<std::string, std::string>& out) {
-+  // ARROW_ASSIGN_OR_RAISE(auto advanced_extension,
-+  //                       ParseFromBuffer<substrait::extensions::AdvancedExtension>(buf));
-+  // return FromProto(advanced_extension, out);
-+  ARROW_ASSIGN_OR_RAISE(auto literal,
-+                        ParseFromBuffer<substrait::Expression::Literal>(buf));
-+  return FromProto(literal, out);
-+}
-+
- namespace {
-
- Result<std::shared_ptr<compute::ExecPlan>> MakeSingleDeclarationPlan(
-diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h
-index ab749f4a6..6312ec239 100644
---- a/cpp/src/arrow/engine/substrait/serde.h
-+++ b/cpp/src/arrow/engine/substrait/serde.h
-@@ -23,6 +23,7 @@
- #include <functional>
- #include <memory>
- #include <string>
-+#include <unordered_map>
- #include <vector>
-
- #include "arrow/compute/type_fwd.h"
-@@ -183,6 +184,9 @@ ARROW_ENGINE_EXPORT Result<BoundExpressions> DeserializeExpressions(
-     const ConversionOptions& conversion_options = {},
-     ExtensionSet* ext_set_out = NULLPTR);
-
-+ARROW_ENGINE_EXPORT Status
-+DeserializeMap(const Buffer& buf, std::unordered_map<std::string, std::string>& out);
-+
- /// \brief Deserializes a Substrait Type message to the corresponding Arrow type
- ///
- /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type
-diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml
-index d4d3e2c0f..ce72eaa1f 100644
---- a/java/dataset/pom.xml
-+++ b/java/dataset/pom.xml
-@@ -25,9 +25,10 @@
-     <packaging>jar</packaging>
-     <properties>
-         <arrow.cpp.build.dir>../../../cpp/release-build/</arrow.cpp.build.dir>
--        <protobuf.version>2.5.0</protobuf.version>
-         <parquet.version>1.11.0</parquet.version>
-         <avro.version>1.11.3</avro.version>
-+        <substrait.version>0.31.0</substrait.version>
-+        <protobuf.version>3.25.3</protobuf.version>
-     </properties>
-
-     <dependencies>
-@@ -47,6 +48,18 @@
-             <artifactId>arrow-c-data</artifactId>
-             <scope>compile</scope>
-         </dependency>
-+        <dependency>
-+            <groupId>io.substrait</groupId>
-+            <artifactId>core</artifactId>
-+            <version>${substrait.version}</version>
-+            <scope>provided</scope>
-+        </dependency>
-+        <dependency>
-+            <groupId>com.google.protobuf</groupId>
-+            <artifactId>protobuf-java</artifactId>
-+            <version>${protobuf.version}</version>
-+            <scope>provided</scope>
-+        </dependency>
-         <dependency>
-             <groupId>org.apache.arrow</groupId>
-             <artifactId>arrow-memory-netty</artifactId>
-diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
-index 8d7dafd84..89cdc39fe 100644
---- a/java/dataset/src/main/cpp/jni_wrapper.cc
-+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
-@@ -25,6 +25,7 @@
- #include "arrow/c/helpers.h"
- #include "arrow/dataset/api.h"
- #include "arrow/dataset/file_base.h"
-+#include "arrow/dataset/file_csv.h"
- #include "arrow/filesystem/localfs.h"
- #include "arrow/filesystem/path_util.h"
- #ifdef ARROW_S3
-@@ -122,6 +123,19 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFormat>> GetFileFormat(
-   }
- }
-
-+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
-+GetFragmentScanOptions(jint file_format_id,
-+                       const std::unordered_map<std::string, std::string>& configs) {
-+  switch (file_format_id) {
-+#ifdef ARROW_CSV
-+    case 3:
-+      return arrow::dataset::CsvFragmentScanOptions::from(configs);
-+#endif
-+    default:
-+      return arrow::Status::Invalid("Illegal file format id: " ,file_format_id);
-+  }
-+}
-+
- class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
-  public:
-   ReserveFromJava(JavaVM* vm, jobject java_reservation_listener)
-@@ -460,12 +474,13 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset
- /*
-  * Class:     org_apache_arrow_dataset_jni_JniWrapper
-  * Method:    createScanner
-- * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J
-+ * Signature:
-+ * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;Ljava/nio/ByteBuffer;J)J
-  */
- JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
-     JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
--    jobject substrait_projection, jobject substrait_filter,
--    jlong batch_size, jlong memory_pool_id) {
-+    jobject substrait_projection, jobject substrait_filter, jlong batch_size,
-+    jlong file_format_id, jobject options, jlong memory_pool_id) {
-   JNI_METHOD_START
-   arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
-   if (pool == nullptr) {
-@@ -514,6 +529,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
-     }
-     JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
-   }
-+  if (file_format_id != -1 && options != nullptr) {
-+    std::unordered_map<std::string, std::string> option_map;
-+    std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, options);
-+    JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map));
-+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
-+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
-+    JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
-+  }
-   JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));
-
-   auto scanner = JniGetOrThrow(scanner_builder->Finish());
-@@ -627,14 +650,31 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina
- /*
-  * Class:     org_apache_arrow_dataset_file_JniWrapper
-  * Method:    makeFileSystemDatasetFactory
-- * Signature: (Ljava/lang/String;II)J
-+ * Signature: (Ljava/lang/String;IILjava/lang/String;Ljava/nio/ByteBuffer)J
-  */
- JNIEXPORT jlong JNICALL
--Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
--    JNIEnv* env, jobject, jstring uri, jint file_format_id) {
-+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
-+    JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject options) {
-   JNI_METHOD_START
-   std::shared_ptr<arrow::dataset::FileFormat> file_format =
-       JniGetOrThrow(GetFileFormat(file_format_id));
-+  if (options != nullptr) {
-+    std::unordered_map<std::string, std::string> option_map;
-+    std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, options);
-+    JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map));
-+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
-+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
-+    file_format->default_fragment_scan_options = scan_options;
-+#ifdef ARROW_CSV
-+    if (file_format_id == 3) {
-+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
-+          std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
-+      csv_file_format->parse_options =
-+          std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
-+              ->parse_options;
-+    }
-+#endif
-+  }
-   arrow::dataset::FileSystemFactoryOptions options;
-   std::shared_ptr<arrow::dataset::DatasetFactory> d =
-       JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
-@@ -645,16 +685,33 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
-
- /*
-  * Class:     org_apache_arrow_dataset_file_JniWrapper
-- * Method:    makeFileSystemDatasetFactory
-- * Signature: ([Ljava/lang/String;II)J
-+ * Method:    makeFileSystemDatasetFactoryWithFiles
-+ * Signature: ([Ljava/lang/String;IIJ;Ljava/nio/ByteBuffer)J
-  */
- JNIEXPORT jlong JNICALL
--Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
--    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
-+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles(
-+    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobject options) {
-   JNI_METHOD_START
-
-   std::shared_ptr<arrow::dataset::FileFormat> file_format =
-       JniGetOrThrow(GetFileFormat(file_format_id));
-+  if (options != nullptr) {
-+    std::unordered_map<std::string, std::string> option_map;
-+    std::shared_ptr<arrow::Buffer> buffer = LoadArrowBufferFromByteBuffer(env, options);
-+    JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map));
-+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
-+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
-+    file_format->default_fragment_scan_options = scan_options;
-+#ifdef ARROW_CSV
-+    if (file_format_id == 3) {
-+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
-+          std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
-+      csv_file_format->parse_options =
-+          std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
-+              ->parse_options;
-+    }
-+#endif
-+  }
-   arrow::dataset::FileSystemFactoryOptions options;
-
-   std::vector<std::string> uri_vec = ToStringVector(env, uris);
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
-index aa3156905..a0b6fb168 100644
---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
-@@ -17,8 +17,11 @@
-
- package org.apache.arrow.dataset.file;
-
-+import java.util.Optional;
-+
- import org.apache.arrow.dataset.jni.NativeDatasetFactory;
- import org.apache.arrow.dataset.jni.NativeMemoryPool;
-+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
- import org.apache.arrow.memory.BufferAllocator;
-
- /**
-@@ -27,21 +30,34 @@ import org.apache.arrow.memory.BufferAllocator;
- public class FileSystemDatasetFactory extends NativeDatasetFactory {
-
-   public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
--                                  String uri) {
--    super(allocator, memoryPool, createNative(format, uri));
-+                                  String uri, Optional<FragmentScanOptions> fragmentScanOptions) {
-+    super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions));
-+  }
-+
-+  public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
-+                                  String uri) {
-+    super(allocator, memoryPool, createNative(format, uri, Optional.empty()));
-+  }
-+
-+  public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
-+                                  String[] uris, Optional<FragmentScanOptions> fragmentScanOptions) {
-+    super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions));
-   }
-
-   public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
-                                   String[] uris) {
--    super(allocator, memoryPool, createNative(format, uris));
-+    super(allocator, memoryPool, createNative(format, uris, Optional.empty()));
-   }
-
--  private static long createNative(FileFormat format, String uri) {
--    return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
-+  private static long createNative(FileFormat format, String uri, Optional<FragmentScanOptions> fragmentScanOptions) {
-+    return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id(),
-+        fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
-   }
-
--  private static long createNative(FileFormat format, String[] uris) {
--    return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
-+  private static long createNative(FileFormat format, String[] uris,
-+                                   Optional<FragmentScanOptions> fragmentScanOptions) {
-+    return JniWrapper.get().makeFileSystemDatasetFactoryWithFiles(uris, format.id(),
-+        fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
-   }
-
- }
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
-index c3a1a4e58..c3f8e12b3 100644
---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
-@@ -17,6 +17,8 @@
-
- package org.apache.arrow.dataset.file;
-
-+import java.nio.ByteBuffer;
-+
- import org.apache.arrow.dataset.jni.JniLoader;
-
- /**
-@@ -43,7 +45,8 @@ public class JniWrapper {
-    * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
-    * @see FileFormat
-    */
--  public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
-+  public native long makeFileSystemDatasetFactory(String uri, int fileFormat,
-+                                                  ByteBuffer serializedFragmentScanOptions);
-
-   /**
-    * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
-@@ -54,7 +57,8 @@ public class JniWrapper {
-    * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
-    * @see FileFormat
-    */
--  public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat);
-+  public native long makeFileSystemDatasetFactoryWithFiles(String[] uris, int fileFormat,
-+                                                           ByteBuffer serializedFragmentScanOptions);
-
-   /**
-    * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
-index 637a3e8f2..6d6309140 100644
---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
-@@ -80,7 +80,8 @@ public class JniWrapper {
-    * @return the native pointer of the arrow::dataset::Scanner instance.
-    */
-   public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection,
--                                   ByteBuffer substraitFilter, long batchSize, long memoryPool);
-+                                   ByteBuffer substraitFilter, long batchSize, long fileFormat,
-+                                   ByteBuffer serializedFragmentScanOptions, long memoryPool);
-
-   /**
-    * Get a serialized schema from native instance of a Scanner.
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
-index d9abad997..3a96fe768 100644
---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
-@@ -17,6 +17,9 @@
-
- package org.apache.arrow.dataset.jni;
-
-+import java.nio.ByteBuffer;
-+
-+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
- import org.apache.arrow.dataset.scanner.ScanOptions;
- import org.apache.arrow.dataset.source.Dataset;
-
-@@ -40,11 +43,18 @@ public class NativeDataset implements Dataset {
-     if (closed) {
-       throw new NativeInstanceReleasedException();
-     }
--
-+    int fileFormat = -1;
-+    ByteBuffer serialized = null;
-+    if (options.getFragmentScanOptions().isPresent()) {
-+      FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get();
-+      fileFormat = fragmentScanOptions.fileFormatId();
-+      serialized = fragmentScanOptions.serialize();
-+    }
-     long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null),
-         options.getSubstraitProjection().orElse(null),
-         options.getSubstraitFilter().orElse(null),
--        options.getBatchSize(), context.getMemoryPool().getNativeInstanceId());
-+        options.getBatchSize(), fileFormat, serialized,
-+        context.getMemoryPool().getNativeInstanceId());
-
-     return new NativeScanner(context, scannerId);
-   }
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
-new file mode 100644
-index 000000000..8acb2b2d4
---- /dev/null
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
-@@ -0,0 +1,50 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one or more
-+ * contributor license agreements. See the NOTICE file distributed with
-+ * this work for additional information regarding copyright ownership.
-+ * The ASF licenses this file to You under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ *    http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.arrow.dataset.scanner;
-+
-+import java.nio.ByteBuffer;
-+import java.util.Map;
-+
-+import org.apache.arrow.dataset.substrait.util.ConvertUtil;
-+
-+import io.substrait.proto.Expression;
-+
-+public interface FragmentScanOptions {
-+  String typeName();
-+
-+  int fileFormatId();
-+
-+  ByteBuffer serialize();
-+
-+  /**
-+   * serialize the map.
-+   *
-+   * @param config config map
-+   * @return bufer to jni call argument, should be DirectByteBuffer
-+   */
-+  default ByteBuffer serializeMap(Map<String, String> config) {
-+    if (config.isEmpty()) {
-+      return null;
-+    }
-+
-+    Expression.Literal literal = ConvertUtil.mapToExpressionLiteral(config);
-+    ByteBuffer buf = ByteBuffer.allocateDirect(literal.getSerializedSize());
-+    buf.put(literal.toByteArray());
-+    return buf;
-+  }
-+}
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
-index 995d05ac3..aad71930c 100644
---- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
-@@ -31,6 +31,8 @@ public class ScanOptions {
-   private final Optional<ByteBuffer> substraitProjection;
-   private final Optional<ByteBuffer> substraitFilter;
-
-+  private final Optional<FragmentScanOptions> fragmentScanOptions;
-+
-   /**
-    * Constructor.
-    * @param columns Projected columns. Empty for scanning all columns.
-@@ -61,6 +63,7 @@ public class ScanOptions {
-     this.columns = columns;
-     this.substraitProjection = Optional.empty();
-     this.substraitFilter = Optional.empty();
-+    this.fragmentScanOptions = Optional.empty();
-   }
-
-   public ScanOptions(long batchSize) {
-@@ -83,6 +86,10 @@ public class ScanOptions {
-     return substraitFilter;
-   }
-
-+  public Optional<FragmentScanOptions> getFragmentScanOptions() {
-+    return fragmentScanOptions;
-+  }
-+
-   /**
-    * Builder for Options used during scanning.
-    */
-@@ -91,6 +98,7 @@ public class ScanOptions {
-     private Optional<String[]> columns;
-     private ByteBuffer substraitProjection;
-     private ByteBuffer substraitFilter;
-+    private FragmentScanOptions fragmentScanOptions;
-
-     /**
-      * Constructor.
-@@ -136,6 +144,18 @@ public class ScanOptions {
-       return this;
-     }
-
-+    /**
-+     * Set the FragmentScanOptions.
-+     *
-+     * @param fragmentScanOptions scan options
-+     * @return the ScanOptions configured.
-+     */
-+    public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) {
-+      Preconditions.checkNotNull(fragmentScanOptions);
-+      this.fragmentScanOptions = fragmentScanOptions;
-+      return this;
-+    }
-+
-     public ScanOptions build() {
-       return new ScanOptions(this);
-     }
-@@ -146,5 +166,6 @@ public class ScanOptions {
-     columns = builder.columns;
-     substraitProjection = Optional.ofNullable(builder.substraitProjection);
-     substraitFilter = Optional.ofNullable(builder.substraitFilter);
-+    fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions);
-   }
- }
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
-new file mode 100644
-index 000000000..08e35ede2
---- /dev/null
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
-@@ -0,0 +1,51 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one or more
-+ * contributor license agreements. See the NOTICE file distributed with
-+ * this work for additional information regarding copyright ownership.
-+ * The ASF licenses this file to You under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ *    http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.arrow.dataset.scanner.csv;
-+
-+import java.util.Map;
-+import java.util.Optional;
-+
-+import org.apache.arrow.c.ArrowSchema;
-+
-+public class CsvConvertOptions {
-+
-+  private final Map<String, String> configs;
-+
-+  private Optional<ArrowSchema> cSchema = Optional.empty();
-+
-+  public CsvConvertOptions(Map<String, String> configs) {
-+    this.configs = configs;
-+  }
-+
-+  public Optional<ArrowSchema> getArrowSchema() {
-+    return cSchema;
-+  }
-+
-+  public Map<String, String> getConfigs() {
-+    return configs;
-+  }
-+
-+  public void set(String key, String value) {
-+    configs.put(key, value);
-+  }
-+
-+  public void setArrowSchema(ArrowSchema cSchema) {
-+    this.cSchema = Optional.of(cSchema);
-+  }
-+
-+}
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
-new file mode 100644
-index 000000000..88973f0a0
---- /dev/null
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
-@@ -0,0 +1,97 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one or more
-+ * contributor license agreements. See the NOTICE file distributed with
-+ * this work for additional information regarding copyright ownership.
-+ * The ASF licenses this file to You under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ *    http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.arrow.dataset.scanner.csv;
-+
-+import java.io.Serializable;
-+import java.nio.ByteBuffer;
-+import java.util.Locale;
-+import java.util.Map;
-+import java.util.stream.Collectors;
-+import java.util.stream.Stream;
-+
-+import org.apache.arrow.dataset.file.FileFormat;
-+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
-+
-+public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions {
-+  private final CsvConvertOptions convertOptions;
-+  private final Map<String, String> readOptions;
-+  private final Map<String, String> parseOptions;
-+
-+
-+  /**
-+   * csv scan options, map to CPP struct CsvFragmentScanOptions.
-+   *
-+   * @param convertOptions same struct in CPP
-+   * @param readOptions same struct in CPP
-+   * @param parseOptions same struct in CPP
-+   */
-+  public CsvFragmentScanOptions(CsvConvertOptions convertOptions,
-+                                Map<String, String> readOptions,
-+                                Map<String, String> parseOptions) {
-+    this.convertOptions = convertOptions;
-+    this.readOptions = readOptions;
-+    this.parseOptions = parseOptions;
-+  }
-+
-+  public String typeName() {
-+    return FileFormat.CSV.name().toLowerCase(Locale.ROOT);
-+  }
-+
-+  /**
-+   * File format id.
-+   *
-+   * @return id
-+   */
-+  public int fileFormatId() {
-+    return FileFormat.CSV.id();
-+  }
-+
-+  /**
-+   * Serialize this class to ByteBuffer and then called by jni call.
-+   *
-+   * @return DirectByteBuffer
-+   */
-+  public ByteBuffer serialize() {
-+    Map<String, String> options = Stream.concat(Stream.concat(readOptions.entrySet().stream(),
-+        parseOptions.entrySet().stream()),
-+        convertOptions.getConfigs().entrySet().stream()).collect(
-+        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-+
-+    if (convertOptions.getArrowSchema().isPresent()) {
-+      options.put("column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress()));
-+    }
-+    return serializeMap(options);
-+  }
-+
-+  public static CsvFragmentScanOptions deserialize(String serialized) {
-+    throw new UnsupportedOperationException("Not implemented now");
-+  }
-+
-+  public CsvConvertOptions getConvertOptions() {
-+    return convertOptions;
-+  }
-+
-+  public Map<String, String> getReadOptions() {
-+    return readOptions;
-+  }
-+
-+  public Map<String, String> getParseOptions() {
-+    return parseOptions;
-+  }
-+
-+}
-diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java
-new file mode 100644
-index 000000000..31a4023af
---- /dev/null
-+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java
-@@ -0,0 +1,46 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one or more
-+ * contributor license agreements. See the NOTICE file distributed with
-+ * this work for additional information regarding copyright ownership.
-+ * The ASF licenses this file to You under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ *    http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.arrow.dataset.substrait.util;
-+
-+import java.util.Map;
-+
-+import io.substrait.proto.Expression;
-+
-+public class ConvertUtil {
-+
-+  /**
-+   * Convert map to substrait Expression.
-+   *
-+   * @return Substrait Expression
-+   */
-+  public static Expression.Literal mapToExpressionLiteral(Map<String, String> values) {
-+    Expression.Literal.Builder literalBuilder = Expression.Literal.newBuilder();
-+    Expression.Literal.Map.KeyValue.Builder keyValueBuilder =
-+        Expression.Literal.Map.KeyValue.newBuilder();
-+    Expression.Literal.Map.Builder mapBuilder = Expression.Literal.Map.newBuilder();
-+    for (Map.Entry<String, String> entry : values.entrySet()) {
-+      literalBuilder.setString(entry.getKey());
-+      keyValueBuilder.setKey(literalBuilder.build());
-+      literalBuilder.setString(entry.getValue());
-+      keyValueBuilder.setValue(literalBuilder.build());
-+      mapBuilder.addKeyValues(keyValueBuilder.build());
-+    }
-+    literalBuilder.setMap(mapBuilder.build());
-+    return literalBuilder.build();
-+  }
-+}
-diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
-index 0fba72892..e7903b7a4 100644
---- a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
-+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
-@@ -31,6 +31,9 @@ import java.util.HashMap;
- import java.util.Map;
- import java.util.Optional;
-
-+import org.apache.arrow.c.ArrowSchema;
-+import org.apache.arrow.c.CDataDictionaryProvider;
-+import org.apache.arrow.c.Data;
- import org.apache.arrow.dataset.ParquetWriteSupport;
- import org.apache.arrow.dataset.TestDataset;
- import org.apache.arrow.dataset.file.FileFormat;
-@@ -38,8 +41,11 @@ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
- import org.apache.arrow.dataset.jni.NativeMemoryPool;
- import org.apache.arrow.dataset.scanner.ScanOptions;
- import org.apache.arrow.dataset.scanner.Scanner;
-+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions;
-+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
- import org.apache.arrow.dataset.source.Dataset;
- import org.apache.arrow.dataset.source.DatasetFactory;
-+import org.apache.arrow.memory.BufferAllocator;
- import org.apache.arrow.vector.ipc.ArrowReader;
- import org.apache.arrow.vector.types.pojo.ArrowType;
- import org.apache.arrow.vector.types.pojo.Field;
-@@ -49,6 +55,8 @@ import org.junit.ClassRule;
- import org.junit.Test;
- import org.junit.rules.TemporaryFolder;
-
-+import com.google.common.collect.ImmutableMap;
-+
- public class TestAceroSubstraitConsumer extends TestDataset {
-
-   @ClassRule
-@@ -457,4 +465,42 @@ public class TestAceroSubstraitConsumer extends TestDataset {
-     substraitExpression.put(decodedSubstrait);
-     return substraitExpression;
-   }
-+
-+  @Test
-+  public void testCsvConvertOptions() throws Exception {
-+    final Schema schema = new Schema(Arrays.asList(
-+        Field.nullable("Id", new ArrowType.Int(32, true)),
-+        Field.nullable("Name", new ArrowType.Utf8()),
-+        Field.nullable("Language", new ArrowType.Utf8())
-+    ), null);
-+    String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv";
-+    BufferAllocator allocator = rootAllocator();
-+    try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
-+         CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
-+      Data.exportSchema(allocator, schema, provider, cSchema);
-+      CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";"));
-+      convertOptions.setArrowSchema(cSchema);
-+      CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions(
-+          convertOptions, ImmutableMap.of(), ImmutableMap.of());
-+      ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
-+          .columns(Optional.empty())
-+          .fragmentScanOptions(fragmentScanOptions)
-+          .build();
-+      try (
-+          DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
-+              FileFormat.CSV, path);
-+          Dataset dataset = datasetFactory.finish();
-+          Scanner scanner = dataset.newScan(options);
-+          ArrowReader reader = scanner.scanBatches()
-+      ) {
-+        assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
-+        int rowCount = 0;
-+        while (reader.loadNextBatch()) {
-+          assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString());
-+          rowCount += reader.getVectorSchemaRoot().getRowCount();
-+        }
-+        assertEquals(3, rowCount);
-+      }
-+    }
-+  }
- }
-diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv
-new file mode 100644
-index 000000000..329194609
---- /dev/null
-+++ b/java/dataset/src/test/resources/data/student.csv
-@@ -0,0 +1,4 @@
-+Id;Name;Language
-+1;Juno;Java
-+2;Peter;Python
-+3;Celin;C++
diff --git a/gluten-arrow/pom.xml b/gluten-arrow/pom.xml
index ffba2682e9dc..70a40ddd3a74 100644
--- a/gluten-arrow/pom.xml
+++ b/gluten-arrow/pom.xml
@@ -87,13 +87,13 @@
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>${arrow-memory.artifact}</artifactId>
-      <version>${arrow-gluten.version}</version>
+      <version>${arrow.version}</version>
       <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-memory-core</artifactId>
-      <version>${arrow-gluten.version}</version>
+      <version>${arrow.version}</version>
       <scope>compile</scope>
     </dependency>
@@ -109,7 +109,7 @@
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
-      <version>${arrow-gluten.version}</version>
+      <version>${arrow.version}</version>
       <exclusions>
         <exclusion>
           <groupId>io.netty</groupId>
@@ -128,7 +128,7 @@
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-c-data</artifactId>
-      <version>${arrow-gluten.version}</version>
+      <version>${arrow.version}</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.arrow</groupId>
@@ -144,7 +144,7 @@
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-dataset</artifactId>
-      <version>${arrow-gluten.version}</version>
+      <version>${arrow.version}</version>
       <exclusions>
         <exclusion>
           <groupId>io.netty</groupId>
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
index a94f8f2e3d49..4575e9553927 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.arrow.c.{ArrowSchema, CDataDictionaryProvider, Data}
 import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
 import org.apache.arrow.dataset.jni.NativeMemoryPool
-import org.apache.arrow.dataset.scanner.FragmentScanOptions
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
@@ -37,7 +36,6 @@ import org.apache.hadoop.fs.FileStatus
 
 import java.net.{URI, URLDecoder}
 import java.util
-import java.util.Optional
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -120,23 +118,21 @@ object ArrowUtil extends Logging {
   def makeArrowDiscovery(
       encodedUri: String,
       format: FileFormat,
-      option: Optional[FragmentScanOptions],
       allocator: BufferAllocator,
       pool: NativeMemoryPool
   ): FileSystemDatasetFactory = {
     val factory =
-      new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri), option)
+      new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri))
     factory
   }
 
   def readArrowSchema(
       file: String,
       format: FileFormat,
-      option: FragmentScanOptions,
       allocator: BufferAllocator,
       pool: NativeMemoryPool): Schema = {
     val factory: FileSystemDatasetFactory =
-      makeArrowDiscovery(file, format, Optional.of(option), allocator, pool)
+      makeArrowDiscovery(file, format, allocator, pool)
     val schema = factory.inspect()
     factory.close()
     schema
@@ -145,11 +141,10 @@
   def readArrowFileColumnNames(
       file: String,
       format: FileFormat,
-      option: FragmentScanOptions,
       allocator: BufferAllocator,
       pool: NativeMemoryPool): Array[String] = {
     val fileFields = ArrowUtil
-      .readArrowSchema(URLDecoder.decode(file, "UTF-8"), format, option, allocator, pool)
+      .readArrowSchema(URLDecoder.decode(file, "UTF-8"), format, allocator, pool)
       .getFields
       .asScala
     fileFields.map(_.getName).toArray
@@ -158,11 +153,10 @@
   def readSchema(
       file: FileStatus,
       format: FileFormat,
-      option: FragmentScanOptions,
       allocator: BufferAllocator,
       pool: NativeMemoryPool): Option[StructType] = {
     val factory: FileSystemDatasetFactory =
-      makeArrowDiscovery(file.getPath.toString, format, Optional.of(option), allocator, pool)
+      makeArrowDiscovery(file.getPath.toString, format, allocator, pool)
     val schema = factory.inspect()
     try {
       Option(SparkSchemaUtil.fromArrowSchema(schema))
@@ -174,14 +168,13 @@
   def readSchema(
       files: Seq[FileStatus],
       format: FileFormat,
-      option: FragmentScanOptions,
       allocator: BufferAllocator,
       pool: NativeMemoryPool): Option[StructType] = {
     if (files.isEmpty) {
       throw new IllegalArgumentException("No input file specified")
     }
-    readSchema(files.head, format, option, allocator, pool)
+    readSchema(files.head, format, allocator, pool)
   }
 
   def loadMissingColumns(
diff --git a/pom.xml b/pom.xml
index 4d704dc9b448..1057e728ba04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,6 @@
     <celeborn.version>0.5.2</celeborn.version>
     <uniffle.version>0.9.1</uniffle.version>
     <arrow.version>15.0.0</arrow.version>
-    <arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
     <arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
     <hadoop.version>2.7.4</hadoop.version>
     <slf4j.version>2.0.7</slf4j.version>