[GLUTEN-5414] [VL] Support arrow csv option and schema #5850

Merged · 19 commits · Jun 3, 2024

Changes from all commits
71 changes: 56 additions & 15 deletions .github/workflows/velox_docker.yml
@@ -59,16 +59,24 @@ jobs:
id: cache
uses: actions/cache/restore@v3
with:
- path: ./cpp/build/releases/
+ path: |
+ ./cpp/build/releases/
+ ~/.m2/repository/org/apache/arrow/
[Review comment from a project member on lines +62 to +64]
Should we modify velox_docker_cache.yml as well? Did you check that file already?
key: cache-velox-build-${{ hashFiles('./cache-key') }}
- name: Build Gluten Velox third party
if: ${{ steps.cache.outputs.cache-hit != 'true' }}
run: |
source dev/ci-velox-buildstatic.sh
- - uses: actions/upload-artifact@v2
+ - name: Upload Artifact Native
+ uses: actions/upload-artifact@v2
with:
path: ./cpp/build/releases/
name: velox-native-lib-centos-7-${{github.sha}}
+ - name: Upload Artifact Arrow Jar
+ uses: actions/upload-artifact@v2
+ with:
+ path: /root/.m2/repository/org/apache/arrow/
+ name: velox-arrow-jar-centos-7-${{github.sha}}

run-tpc-test-ubuntu:
needs: build-native-lib-centos-7
@@ -92,11 +100,16 @@ jobs:
container: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
@@ -105,6 +118,7 @@
apt-get update && apt-get install -y openjdk-8-jdk maven
fi
apt remove openjdk-11* -y
+ ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/
- name: Build and run TPCH/DS
run: |
cd $GITHUB_WORKSPACE/
@@ -140,11 +154,16 @@ jobs:
container: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
if: matrix.os == 'centos:8'
run: |
@@ -215,11 +234,16 @@ jobs:
sudo docker image prune --all --force > /dev/null
df -h
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /home/runner/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
sudo apt-get update
@@ -312,11 +336,16 @@ jobs:
sudo docker image prune --all --force > /dev/null
df -h
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /home/runner/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
sudo apt-get update
@@ -400,11 +429,16 @@ jobs:
container: centos:8
steps:
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
@@ -461,11 +495,16 @@ jobs:
container: ubuntu:22.04
steps:
- uses: actions/checkout@v2
- - name: Download All Artifacts
+ - name: Download All Native Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-7-${{github.sha}}
- path: ./cpp/build/releases
+ path: ./cpp/build/releases/
+ - name: Download All Arrow Jar Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-arrow-jar-centos-7-${{github.sha}}
+ path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
@@ -506,7 +545,9 @@ jobs:
id: cache
uses: actions/cache/restore@v3
with:
- path: ./cpp/build/releases/
+ path: |
+ ./cpp/build/releases/
+ /root/.m2/repository/org/apache/arrow/
key: cache-velox-build-centos-8-${{ hashFiles('./cache-key') }}
- name: Setup build dependency
if: ${{ steps.cache.outputs.cache-hit != 'true' }}
8 changes: 6 additions & 2 deletions .github/workflows/velox_docker_cache.yml
@@ -38,7 +38,9 @@ jobs:
uses: actions/cache/restore@v3
with:
lookup-only: true
- path: ./cpp/build/releases/
+ path: |
+ ./cpp/build/releases/
+ /root/.m2/repository/org/apache/arrow/
key: cache-velox-build-${{ hashFiles('./cache-key') }}
- name: Build Gluten Velox third party
if: steps.check-cache.outputs.cache-hit != 'true'
@@ -49,7 +51,9 @@
id: cache
uses: actions/cache/save@v3
with:
- path: ./cpp/build/releases/
+ path: |
+ ./cpp/build/releases/
+ /root/.m2/repository/org/apache/arrow/
key: cache-velox-build-${{ hashFiles('./cache-key') }}

# ccache-native-lib-ubuntu-velox-ut:
ArrowCSVFileFormat.scala

@@ -40,8 +40,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

+ import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.scanner.ScanOptions
+ import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorUnloader
import org.apache.arrow.vector.types.pojo.Schema
@@ -51,11 +53,17 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import java.net.URLDecoder
import java.util.Optional

- import scala.collection.JavaConverters.asScalaBufferConverter
+ import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter}

- class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable {
+ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
+ extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {

private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
+ private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read")
+ var fallback = false

override def isSplitable(
sparkSession: SparkSession,
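The constructor change above moves option parsing out of the per-read path: callers now parse the CSV options once and hand them to the format. A minimal sketch of that wiring, reusing the four-argument CSVOptions constructor the old buildReaderWithPartitionValues body used (the helper name is illustrative, not part of this PR):

```scala
import org.apache.spark.sql.catalyst.csv.CSVOptions

// Sketch: parse options once at planning time, then construct the format.
// The four CSVOptions arguments mirror the old in-method construction.
def makeArrowCsvFormat(
    options: Map[String, String],
    columnPruning: Boolean,
    sessionLocalTimeZone: String,
    columnNameOfCorruptRecord: String): ArrowCSVFileFormat =
  new ArrowCSVFileFormat(
    new CSVOptions(options, columnPruning, sessionLocalTimeZone, columnNameOfCorruptRecord))
```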
@@ -68,9 +76,11 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
+ val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
ArrowUtil.readSchema(
files,
fileFormat,
+ arrowConfig,
ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("infer schema"))
}
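inferSchema now converts the Spark-side options before asking Arrow to read the schema, so inference and scanning honor the same CSV dialect. ArrowCSVOptionConverter itself is added elsewhere in this PR; as a rough illustration of the idea only, it flattens Spark's CSVOptions into the kind of string key/value configuration Arrow's CSV reader consumes (the keys below are illustrative, not verified Arrow option names):

```scala
import org.apache.spark.sql.catalyst.csv.CSVOptions

// Hypothetical sketch of the conversion idea; the real converter returns
// a CsvFragmentScanOptions and covers many more options.
def toArrowCsvConfig(opts: CSVOptions): Map[String, String] =
  Map(
    "delimiter" -> opts.delimiter,        // column separator
    "header" -> opts.headerFlag.toString, // whether the first row is a header
    "null_value" -> opts.nullValue        // string to read back as null
  )
```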
@@ -89,51 +99,74 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val batchSize = sqlConf.columnBatchSize
- val caseSensitive = sqlConf.caseSensitiveAnalysis
- val columnPruning = sqlConf.csvColumnPruning &&
- !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
- val parsedOptions = new CSVOptions(
- options,
- columnPruning,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val actualFilters =
filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
(file: PartitionedFile) => {
+ val actualDataSchema = StructType(
+ dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+ val actualRequiredSchema = StructType(
+ requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
ArrowCSVFileFormat.checkHeader(
file,
- dataSchema,
- requiredSchema,
+ actualDataSchema,
+ actualRequiredSchema,
parsedOptions,
actualFilters,
broadcastedHadoopConf.value.value)
- val factory =
- ArrowUtil.makeArrowDiscovery(
+ val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
+ val allocator = ArrowBufferAllocators.contextInstance()
+ // todo predicate validation / pushdown
+ val fileNames = ArrowUtil
+ .readArrowFileColumnNames(
URLDecoder.decode(file.filePath.toString, "UTF-8"),
fileFormat,
+ arrowConfig,
ArrowBufferAllocators.contextInstance(),
- ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory")
- )
- // todo predicate validation / pushdown
- val fileFields = factory.inspect().getFields.asScala
+ pool)
+ val tokenIndexArr =
+ actualRequiredSchema
+ .map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f)))
+ .toArray
+ val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
+ val requestSchema = new StructType(
+ fileIndex
+ .map(index => StructField(fileNames(index), actualDataSchema(index).dataType)))
+ val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
+ val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
+ // TODO: support array/map/struct types in out-of-order schema reading.
+ val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
+ val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
try {
- val actualReadFields =
- ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
+ ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
+ val factory =
+ ArrowUtil.makeArrowDiscovery(
+ URLDecoder.decode(file.filePath.toString, "UTF-8"),
+ fileFormat,
+ Optional.of(arrowConfig),
+ ArrowBufferAllocators.contextInstance(),
+ pool)
+ val fields = factory.inspect().getFields
+ val actualReadFields = new Schema(
+ fileIndex.map(index => fields.get(index)).toIterable.asJava)
+ ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
ArrowCSVFileFormat
.readArrow(
ArrowBufferAllocators.contextInstance(),
file,
actualReadFields,
- caseSensitive,
- requiredSchema,
+ missingSchema,
partitionSchema,
factory,
- batchSize)
+ batchSize,
+ arrowConfig)
.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: SchemaMismatchException =>
logWarning(e.getMessage)
+ fallback = true
val iter = ArrowCSVFileFormat.fallbackReadVanilla(
dataSchema,
requiredSchema,
@@ -148,8 +181,10 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
.rowToColumn(schema, batchSize, rows)
.asInstanceOf[Iterator[InternalRow]]
case d: Exception => throw d
+ } finally {
+ cSchema.close()
+ cSchema2.close()
}

}
}
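The two ArrowSchema handles allocated above wrap off-heap C Data Interface structs, which is why the new finally block closes them on every path, including the vanilla-Spark fallback. A minimal sketch of that allocate/use/close pattern, with a plain RootAllocator standing in for Gluten's context allocator:

```scala
import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.memory.RootAllocator

object ArrowSchemaLifecycleSketch extends App {
  // Sketch: an ArrowSchema is off-heap, so it must be released even when
  // the read throws; this mirrors the try/finally added above.
  val allocator = new RootAllocator(Long.MaxValue)
  val cSchema = ArrowSchema.allocateNew(allocator)
  try {
    // export a schema into cSchema here and hand cSchema.memoryAddress()
    // down to native code
  } finally {
    cSchema.close()   // frees the off-heap struct on success and failure alike
    allocator.close() // surfaces any leaked buffers
  }
}
```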

@@ -184,28 +219,23 @@ object ArrowCSVFileFormat {
allocator: BufferAllocator,
file: PartitionedFile,
actualReadFields: Schema,
- caseSensitive: Boolean,
- requiredSchema: StructType,
+ missingSchema: StructType,
partitionSchema: StructType,
factory: FileSystemDatasetFactory,
- batchSize: Int): Iterator[ColumnarBatch] = {
- val compare = ArrowUtil.compareStringFunc(caseSensitive)
+ batchSize: Int,
+ arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
- val actualReadSchema = new StructType(
- actualReadFieldNames.map(f => requiredSchema.find(field => compare(f, field.name)).get))
val dataset = factory.finish(actualReadFields)

- val hasMissingColumns = actualReadFields.getFields.size() != requiredSchema.size

- val scanOptions = new ScanOptions(batchSize, Optional.of(actualReadFieldNames))
+ val scanOptions = new ScanOptions.Builder(batchSize)
+ .columns(Optional.of(actualReadFieldNames))
+ .fragmentScanOptions(arrowConfig)
+ .build()
val scanner = dataset.newScan(scanOptions)

val partitionVectors =
ArrowUtil.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues)

- val nullVectors = if (hasMissingColumns) {
- val missingSchema =
- new StructType(requiredSchema.filterNot(actualReadSchema.contains).toArray)
+ val nullVectors = if (missingSchema.nonEmpty) {
ArrowUtil.loadMissingColumns(batchSize, missingSchema)
} else {
Array.empty[ArrowWritableColumnVector]
Expand All @@ -225,8 +255,7 @@ object ArrowCSVFileFormat {
val batch = ArrowUtil.loadBatch(
allocator,
unloader.getRecordBatch,
- actualReadSchema,
- requiredSchema,
+ actualReadFields,
partitionVectors,
nullVectors)
batch
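readArrow now assembles its scan through the ScanOptions builder, since the old two-argument constructor had no slot for per-format options. Condensed to just that call, mirroring the builder API of the Arrow dataset jars this PR pins:

```scala
import java.util.Optional

import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions

// Sketch: one ScanOptions carries both the column projection and the
// CSV fragment options, exactly as readArrow above builds it.
def buildScanOptions(
    batchSize: Int,
    columnNames: Array[String],
    arrowConfig: CsvFragmentScanOptions): ScanOptions =
  new ScanOptions.Builder(batchSize)
    .columns(Optional.of(columnNames))  // project only the requested columns
    .fragmentScanOptions(arrowConfig)   // CSV read/parse/convert options
    .build()
```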
@@ -246,19 +275,15 @@

def checkHeader(
file: PartitionedFile,
- dataSchema: StructType,
- requiredSchema: StructType,
+ actualDataSchema: StructType,
+ actualRequiredSchema: StructType,
parsedOptions: CSVOptions,
actualFilters: Seq[Filter],
conf: Configuration): Unit = {
val isStartOfFile = file.start == 0
if (!isStartOfFile) {
return
}
- val actualDataSchema = StructType(
- dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val actualRequiredSchema = StructType(
- requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val parser =
new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema
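Taken together, the schema plumbing in this PR reduces to one mapping: required fields found in the CSV header are read by file position, and the rest become null-filled missing columns. A self-contained worked example of that split (assuming, as the real code path does, that every required field appears in the data schema):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaSplitSketch {
  // Sketch of the tokenIndexArr / fileIndex / missingIndex logic from
  // buildReaderWithPartitionValues above.
  def splitSchema(
      fileNames: Array[String],
      dataSchema: StructType,
      requiredSchema: StructType): (StructType, StructType) = {
    val tokenIndexArr = requiredSchema.map(f => dataSchema.indexOf(f)).toArray
    val (fileIdx, missingIdx) = tokenIndexArr.partition(_ < fileNames.length)
    val request = new StructType(
      fileIdx.map(i => StructField(fileNames(i), dataSchema(i).dataType)))
    val missing = new StructType(missingIdx.map(dataSchema(_)))
    (request, missing)
  }

  def main(args: Array[String]): Unit = {
    // Example: the file only has columns a and b, but c is also required.
    val data = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("c", StringType)))
    val required = StructType(Seq(StructField("a", IntegerType), StructField("c", StringType)))
    val (request, missing) = splitSchema(Array("a", "b"), data, required)
    println(request) // struct<a:int>    -> read from the file by position
    println(missing) // struct<c:string> -> filled with nulls
  }
}
```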