[GLUTEN-5414] [VL] Support arrow csv option and schema (#5850)
Supports basic options for now; more options will follow once the Arrow patch below is merged.

apache/arrow#41646

Before this patch, the CSV read would fall back to vanilla Spark whenever the required schema differed from the file schema.
The reader now matches columns by their index in the file rather than by file column name, which sidesteps case-sensitivity issues.
Also adds a common test function for rules that apply to the logical plan.

Compiles Arrow as version 15.0.0-gluten and upgrades the arrow-dataset and arrow-c-data dependencies from 15.0.0 to 15.0.0-gluten.
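
For illustration, a minimal sketch of the index-based column matching introduced here, simplified from the buildReaderWithPartitionValues changes in this diff. The schemas and CSV header are made-up examples; only spark-sql is needed on the classpath.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object IndexMatchingSketch extends App {
  // The table's declared schema (minus any corrupt-record column) ...
  val actualDataSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("extra", StringType))) // declared in the schema but absent from the file
  // ... and the subset of columns the query actually needs.
  val actualRequiredSchema = StructType(Seq(
    StructField("name", StringType),
    StructField("extra", StringType)))
  // Column names as they appear in the CSV header; casing differs from Spark's.
  val fileNames = Array("ID", "Name")

  // Map each required field to its position in the data schema.
  val tokenIndexArr = actualRequiredSchema.map(f => actualDataSchema.indexOf(f)).toArray
  // Positions inside the file are read by index, so header casing no longer matters;
  // positions past the file's last column become null-filled missing columns.
  val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
  val requestSchema = new StructType(
    fileIndex.map(i => StructField(fileNames(i), actualDataSchema(i).dataType)))
  val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
  val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))

  println(requestSchema) // StructType(StructField(Name,StringType,true))
  println(missingSchema) // StructType(StructField(extra,StringType,true))
}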
jinchengchenghh authored Jun 3, 2024
1 parent 7a036dd commit ad817ed
Showing 30 changed files with 1,475 additions and 313 deletions.
71 changes: 56 additions & 15 deletions .github/workflows/velox_docker.yml
@@ -59,16 +59,24 @@ jobs:
id: cache
uses: actions/cache/restore@v3
with:
path: ./cpp/build/releases/
path: |
./cpp/build/releases/
~/.m2/repository/org/apache/arrow/
key: cache-velox-build-${{ hashFiles('./cache-key') }}
- name: Build Gluten Velox third party
if: ${{ steps.cache.outputs.cache-hit != 'true' }}
run: |
source dev/ci-velox-buildstatic.sh
- uses: actions/upload-artifact@v2
- name: Upload Artifact Native
uses: actions/upload-artifact@v2
with:
path: ./cpp/build/releases/
name: velox-native-lib-centos-7-${{github.sha}}
- name: Upload Artifact Arrow Jar
uses: actions/upload-artifact@v2
with:
path: /root/.m2/repository/org/apache/arrow/
name: velox-arrow-jar-centos-7-${{github.sha}}

run-tpc-test-ubuntu:
needs: build-native-lib-centos-7
@@ -92,11 +100,16 @@ jobs:
container: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
Expand All @@ -105,6 +118,7 @@ jobs:
apt-get update && apt-get install -y openjdk-8-jdk maven
fi
apt remove openjdk-11* -y
ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/
- name: Build and run TPCH/DS
run: |
cd $GITHUB_WORKSPACE/
@@ -140,11 +154,16 @@ jobs:
container: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
if: matrix.os == 'centos:8'
run: |
@@ -215,11 +234,16 @@ jobs:
sudo docker image prune --all --force > /dev/null
df -h
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /home/runner/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
sudo apt-get update
@@ -312,11 +336,16 @@ jobs:
sudo docker image prune --all --force > /dev/null
df -h
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /home/runner/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
sudo apt-get update
@@ -400,11 +429,16 @@ jobs:
container: centos:8
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
@@ -461,11 +495,16 @@ jobs:
container: ubuntu:22.04
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
- name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
path: ./cpp/build/releases/
- name: Download All Arrow Jar Artifacts
uses: actions/download-artifact@v2
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
@@ -506,7 +545,9 @@ jobs:
id: cache
uses: actions/cache/restore@v3
with:
path: ./cpp/build/releases/
path: |
./cpp/build/releases/
/root/.m2/repository/org/apache/arrow/
key: cache-velox-build-centos-8-${{ hashFiles('./cache-key') }}
- name: Setup build dependency
if: ${{ steps.cache.outputs.cache-hit != 'true' }}
8 changes: 6 additions & 2 deletions .github/workflows/velox_docker_cache.yml
@@ -38,7 +38,9 @@ jobs:
uses: actions/cache/restore@v3
with:
lookup-only: true
path: ./cpp/build/releases/
path: |
./cpp/build/releases/
/root/.m2/repository/org/apache/arrow/
key: cache-velox-build-${{ hashFiles('./cache-key') }}
- name: Build Gluten Velox third party
if: steps.check-cache.outputs.cache-hit != 'true'
@@ -49,7 +51,9 @@
id: cache
uses: actions/cache/save@v3
with:
path: ./cpp/build/releases/
path: |
./cpp/build/releases/
/root/.m2/repository/org/apache/arrow/
key: cache-velox-build-${{ hashFiles('./cache-key') }}

# ccache-native-lib-ubuntu-velox-ut:
ArrowCSVFileFormat.scala
@@ -40,8 +40,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorUnloader
import org.apache.arrow.vector.types.pojo.Schema
@@ -51,11 +53,17 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import java.net.URLDecoder
import java.util.Optional

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter}

class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable {
class ArrowCSVFileFormat(parsedOptions: CSVOptions)
extends FileFormat
with DataSourceRegister
with Logging
with Serializable {

private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read")
var fallback = false

override def isSplitable(
sparkSession: SparkSession,
@@ -68,9 +76,11 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
ArrowUtil.readSchema(
files,
fileFormat,
arrowConfig,
ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("infer schema"))
}
@@ -89,51 +99,74 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val batchSize = sqlConf.columnBatchSize
val caseSensitive = sqlConf.caseSensitiveAnalysis
val columnPruning = sqlConf.csvColumnPruning &&
!requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val parsedOptions = new CSVOptions(
options,
columnPruning,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val actualFilters =
filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
(file: PartitionedFile) => {
val actualDataSchema = StructType(
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val actualRequiredSchema = StructType(
requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
ArrowCSVFileFormat.checkHeader(
file,
dataSchema,
requiredSchema,
actualDataSchema,
actualRequiredSchema,
parsedOptions,
actualFilters,
broadcastedHadoopConf.value.value)
val factory =
ArrowUtil.makeArrowDiscovery(

val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
val allocator = ArrowBufferAllocators.contextInstance()
// todo predicate validation / pushdown
val fileNames = ArrowUtil
.readArrowFileColumnNames(
URLDecoder.decode(file.filePath.toString, "UTF-8"),
fileFormat,
arrowConfig,
ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory")
)
// todo predicate validation / pushdown
val fileFields = factory.inspect().getFields.asScala
pool)
val tokenIndexArr =
actualRequiredSchema
.map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f)))
.toArray
val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
val requestSchema = new StructType(
fileIndex
.map(index => StructField(fileNames(index), actualDataSchema(index).dataType)))
val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
// TODO: support array/map/struct types in out-of-order schema reading.
val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
try {
val actualReadFields =
ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
val factory =
ArrowUtil.makeArrowDiscovery(
URLDecoder.decode(file.filePath.toString, "UTF-8"),
fileFormat,
Optional.of(arrowConfig),
ArrowBufferAllocators.contextInstance(),
pool)
val fields = factory.inspect().getFields
val actualReadFields = new Schema(
fileIndex.map(index => fields.get(index)).toIterable.asJava)
ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
ArrowCSVFileFormat
.readArrow(
ArrowBufferAllocators.contextInstance(),
file,
actualReadFields,
caseSensitive,
requiredSchema,
missingSchema,
partitionSchema,
factory,
batchSize)
batchSize,
arrowConfig)
.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: SchemaMismatchException =>
logWarning(e.getMessage)
fallback = true
val iter = ArrowCSVFileFormat.fallbackReadVanilla(
dataSchema,
requiredSchema,
@@ -148,8 +181,10 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
.rowToColumn(schema, batchSize, rows)
.asInstanceOf[Iterator[InternalRow]]
case d: Exception => throw d
} finally {
cSchema.close()
cSchema2.close()
}

}
}

@@ -184,28 +219,23 @@ object ArrowCSVFileFormat {
allocator: BufferAllocator,
file: PartitionedFile,
actualReadFields: Schema,
caseSensitive: Boolean,
requiredSchema: StructType,
missingSchema: StructType,
partitionSchema: StructType,
factory: FileSystemDatasetFactory,
batchSize: Int): Iterator[ColumnarBatch] = {
val compare = ArrowUtil.compareStringFunc(caseSensitive)
batchSize: Int,
arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
val actualReadSchema = new StructType(
actualReadFieldNames.map(f => requiredSchema.find(field => compare(f, field.name)).get))
val dataset = factory.finish(actualReadFields)

val hasMissingColumns = actualReadFields.getFields.size() != requiredSchema.size

val scanOptions = new ScanOptions(batchSize, Optional.of(actualReadFieldNames))
val scanOptions = new ScanOptions.Builder(batchSize)
.columns(Optional.of(actualReadFieldNames))
.fragmentScanOptions(arrowConfig)
.build()
val scanner = dataset.newScan(scanOptions)

val partitionVectors =
ArrowUtil.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues)

val nullVectors = if (hasMissingColumns) {
val missingSchema =
new StructType(requiredSchema.filterNot(actualReadSchema.contains).toArray)
val nullVectors = if (missingSchema.nonEmpty) {
ArrowUtil.loadMissingColumns(batchSize, missingSchema)
} else {
Array.empty[ArrowWritableColumnVector]
@@ -225,8 +255,7 @@
val batch = ArrowUtil.loadBatch(
allocator,
unloader.getRecordBatch,
actualReadSchema,
requiredSchema,
actualReadFields,
partitionVectors,
nullVectors)
batch
@@ -246,19 +275,15 @@

def checkHeader(
file: PartitionedFile,
dataSchema: StructType,
requiredSchema: StructType,
actualDataSchema: StructType,
actualRequiredSchema: StructType,
parsedOptions: CSVOptions,
actualFilters: Seq[Filter],
conf: Configuration): Unit = {
val isStartOfFile = file.start == 0
if (!isStartOfFile) {
return
}
val actualDataSchema = StructType(
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val actualRequiredSchema = StructType(
requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val parser =
new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema
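
For context, the new reader drives the scan through Gluten helpers (ArrowUtil.makeArrowDiscovery, ArrowCSVFileFormat.readArrow). Below is a minimal sketch of the equivalent flow against the stock Arrow Java dataset API; the CsvFragmentScanOptions and ScanOptions.Builder#fragmentScanOptions calls in the diff additionally require the patched 15.0.0-gluten Arrow build (apache/arrow#41646), so the sketch sticks to plain ScanOptions. The file URI, projected column, and batch size are illustrative, and the arrow-dataset JNI native library must be loadable.

import java.util.Optional

import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator

object CsvScanSketch extends App {
  val allocator = new RootAllocator()
  // Discover the dataset and infer the file schema, as factory.inspect() does in the diff.
  val factory = new FileSystemDatasetFactory(
    allocator, NativeMemoryPool.getDefault, FileFormat.CSV, "file:///tmp/example.csv")
  val schema = factory.inspect()
  val dataset = factory.finish(schema)
  // Project a subset of columns by name; the patched build would additionally
  // attach CSV read/parse options here via fragmentScanOptions(...).
  val scanOptions = new ScanOptions(32768, Optional.of(Array("Name")))
  val scanner = dataset.newScan(scanOptions)
  val reader = scanner.scanBatches()
  while (reader.loadNextBatch()) {
    println(s"read batch with ${reader.getVectorSchemaRoot.getRowCount} rows")
  }
  reader.close(); scanner.close(); dataset.close(); factory.close(); allocator.close()
}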