[GLUTEN-8228][VL] Remove CSV option support
jinchengchenghh committed Dec 19, 2024
Parent: 0e9aba4 · Commit: 661d944
Showing 13 changed files with 40 additions and 997 deletions.
@@ -43,7 +43,6 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.scanner.ScanOptions
- import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorUnloader
import org.apache.arrow.vector.types.pojo.Schema
@@ -76,11 +75,9 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
ArrowUtil.readSchema(
files,
fileFormat,
- arrowConfig,
ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("infer schema"))
}
@@ -116,14 +113,12 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
actualFilters,
broadcastedHadoopConf.value.value)

- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
val allocator = ArrowBufferAllocators.contextInstance()
// todo predicate validation / pushdown
val fileNames = ArrowUtil
.readArrowFileColumnNames(
URLDecoder.decode(file.filePath.toString, "UTF-8"),
fileFormat,
- arrowConfig,
ArrowBufferAllocators.contextInstance(),
pool)
val tokenIndexArr =
@@ -140,18 +135,15 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
try {
- ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
val factory =
ArrowUtil.makeArrowDiscovery(
URLDecoder.decode(file.filePath.toString, "UTF-8"),
fileFormat,
- Optional.of(arrowConfig),
ArrowBufferAllocators.contextInstance(),
pool)
val fields = factory.inspect().getFields
val actualReadFields = new Schema(
fileIndex.map(index => fields.get(index)).toIterable.asJava)
- ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
ArrowCSVFileFormat
.readArrow(
ArrowBufferAllocators.contextInstance(),
Expand All @@ -160,8 +152,7 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
missingSchema,
partitionSchema,
factory,
- batchSize,
- arrowConfig)
+ batchSize)
.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: SchemaMismatchException =>
@@ -222,13 +213,11 @@ object ArrowCSVFileFormat {
missingSchema: StructType,
partitionSchema: StructType,
factory: FileSystemDatasetFactory,
- batchSize: Int,
- arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
+ batchSize: Int): Iterator[ColumnarBatch] = {
val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
val dataset = factory.finish(actualReadFields)
val scanOptions = new ScanOptions.Builder(batchSize)
.columns(Optional.of(actualReadFieldNames))
- .fragmentScanOptions(arrowConfig)
.build()
val scanner = dataset.newScan(scanOptions)


This file was deleted.

@@ -16,7 +16,7 @@
*/
package org.apache.gluten.datasource.v2

- import org.apache.gluten.datasource.{ArrowCSVFileFormat, ArrowCSVOptionConverter}
+ import org.apache.gluten.datasource.ArrowCSVFileFormat
import org.apache.gluten.exception.SchemaMismatchException
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
@@ -40,7 +40,6 @@ import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.vector.types.pojo.Schema

import java.net.URLDecoder
- import java.util.Optional

import scala.collection.JavaConverters.asJavaIterableConverter

@@ -91,12 +90,10 @@ case class ArrowCSVPartitionReaderFactory(
ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
}
- val arrowConfig = ArrowCSVOptionConverter.convert(options)
val fileNames = ArrowUtil
.readArrowFileColumnNames(
URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
fileFormat,
- arrowConfig,
ArrowBufferAllocators.contextInstance(),
pool)
val tokenIndexArr =
@@ -113,18 +110,15 @@ case class ArrowCSVPartitionReaderFactory(
// TODO: support array/map/struct types in out-of-order schema reading.
val iter =
try {
- ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
val factory =
ArrowUtil.makeArrowDiscovery(
URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
fileFormat,
- Optional.of(arrowConfig),
ArrowBufferAllocators.contextInstance(),
pool)
val fields = factory.inspect().getFields
val actualReadFields = new Schema(
fileIndex.map(index => fields.get(index)).toIterable.asJava)
- ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
ArrowCSVFileFormat
.readArrow(
ArrowBufferAllocators.contextInstance(),
@@ -133,8 +127,7 @@ case class ArrowCSVPartitionReaderFactory(
missingSchema,
readPartitionSchema,
factory,
- batchSize,
- arrowConfig)
+ batchSize)
} catch {
case e: SchemaMismatchException =>
logWarning(e.getMessage)
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.datasource.v2

- import org.apache.gluten.datasource.ArrowCSVOptionConverter
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
import org.apache.gluten.utils.ArrowUtil
@@ -58,11 +57,9 @@ case class ArrowCSVTable(
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord
)
- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
ArrowUtil.readSchema(
files.head,
org.apache.arrow.dataset.file.FileFormat.CSV,
- arrowConfig,
allocator,
pool
)
25 changes: 22 additions & 3 deletions dev/build_arrow.sh
@@ -16,19 +16,34 @@

set -exu

+ BUILD_ARROW_JAVA=ON

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
export SUDO=sudo
source ${CURRENT_DIR}/build_helper_functions.sh
VELOX_ARROW_BUILD_VERSION=15.0.0
ARROW_PREFIX=$CURRENT_DIR/../ep/_ep/arrow_ep
BUILD_TYPE=Release

for arg in "$@"
do
case $arg in
--build_arrow_java=*)
BUILD_ARROW_JAVA=("${arg#*=}")
shift # Remove argument name from processing
;;
*)
OTHER_ARGUMENTS+=("$1")
shift # Remove generic argument from processing
;;
esac
done

function prepare_arrow_build() {
mkdir -p ${ARROW_PREFIX}/../ && pushd ${ARROW_PREFIX}/../ && sudo rm -rf arrow_ep/
wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep
cd arrow_ep
patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow.patch
- patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow_dataset_scan_option.patch
popd
}

@@ -108,5 +123,9 @@ echo "Start to build Arrow"
prepare_arrow_build
build_arrow_cpp
echo "Finished building arrow CPP"
- build_arrow_java
- echo "Finished building arrow Java"

+ if [ $BUILD_ARROW_JAVA == "ON" ]; then
+   build_arrow_java
+   echo "Finished building arrow Java"
+ fi

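With this change the Arrow Java bindings build becomes optional. A minimal usage sketch of the new flag (assuming the script is run from the Gluten repository root; only --build_arrow_java comes from this diff, the invocations themselves are illustrative):

# Build Arrow C++ only and skip the Arrow Java bindings:
./dev/build_arrow.sh --build_arrow_java=OFF

# A plain run keeps the previous behavior (BUILD_ARROW_JAVA defaults to ON),
# so both Arrow C++ and Arrow Java are built:
./dev/build_arrow.sh
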
3 changes: 2 additions & 1 deletion dev/builddeps-veloxbe.sh
@@ -30,6 +30,7 @@ VELOX_BRANCH=""
VELOX_HOME=""
VELOX_PARAMETER=""
BUILD_ARROW=ON
+ BUILD_ARROW_JAVA=ON
SPARK_VERSION=ALL

# set default number of threads as cpu cores minus 2
@@ -184,7 +185,7 @@ concat_velox_param

function build_arrow {
cd $GLUTEN_DIR/dev
- ./build_arrow.sh
+ ./build_arrow.sh --build_arrow_java=$BUILD_ARROW_JAVA
}

function build_velox {
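builddeps-veloxbe.sh now forwards BUILD_ARROW_JAVA to build_arrow.sh, which is how the CI scripts below switch the Java build off. A hedged sketch of the flow (the parsing of --build_arrow_java inside builddeps-veloxbe.sh is assumed to follow the script's existing option handling and is not part of this hunk):

# Skip the Arrow Java bindings while building the Velox backend:
./dev/builddeps-veloxbe.sh --build_arrow_java=OFF
# build_arrow() then invokes:
#   ./build_arrow.sh --build_arrow_java=OFF
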
2 changes: 1 addition & 1 deletion dev/ci-velox-buildshared-centos-8.sh
@@ -4,4 +4,4 @@ set -e

source /opt/rh/gcc-toolset-11/enable
./dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --enable_ep_cache=OFF --build_tests=ON \
- --build_examples=ON --build_benchmarks=ON
+ --build_examples=ON --build_benchmarks=ON --build_arrow_java=OFF
2 changes: 1 addition & 1 deletion dev/ci-velox-buildstatic-centos-7.sh
@@ -5,4 +5,4 @@ set -e
source /opt/rh/devtoolset-11/enable
export NUM_THREADS=4
./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=OFF --build_tests=OFF --build_benchmarks=OFF \
- --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON
+ --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON --build_arrow_java=OFF
3 changes: 0 additions & 3 deletions ep/build-velox/src/get_velox.sh
@@ -171,11 +171,8 @@ function apply_compilation_fixes {
velox_home=$2

sudo cp ${current_dir}/modify_arrow.patch ${velox_home}/CMake/resolve_dependency_modules/arrow/
- sudo cp ${current_dir}/modify_arrow_dataset_scan_option.patch ${velox_home}/CMake/resolve_dependency_modules/arrow/

git add ${velox_home}/CMake/resolve_dependency_modules/arrow/modify_arrow.patch # to avoid the file from being deleted by git clean -dffx :/
- git add ${velox_home}/CMake/resolve_dependency_modules/arrow/modify_arrow_dataset_scan_option.patch # to avoid the file from being deleted by git clean -dffx :/

}

function setup_linux {