Revert "[GLUTEN-5414] [VL] Support arrow csv option and schema (apach…
Browse files Browse the repository at this point in the history
…e#5850)"

This reverts commit ad817ed.
WangGuangxin committed Jun 19, 2024
1 parent e4388e6 commit 7f79100
Showing 30 changed files with 312 additions and 1,474 deletions.
71 changes: 15 additions & 56 deletions .github/workflows/velox_docker.yml
@@ -61,24 +61,16 @@ jobs:
         id: cache
         uses: actions/cache/restore@v3
         with:
-          path: |
-            ./cpp/build/releases/
-            ~/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases/
           key: cache-velox-build-${{ hashFiles('./cache-key') }}
       - name: Build Gluten Velox third party
         if: ${{ steps.cache.outputs.cache-hit != 'true' }}
         run: |
           source dev/ci-velox-buildstatic.sh
-      - name: Upload Artifact Native
-        uses: actions/upload-artifact@v2
+      - uses: actions/upload-artifact@v2
         with:
           path: ./cpp/build/releases/
           name: velox-native-lib-centos-7-${{github.sha}}
-      - name: Upload Artifact Arrow Jar
-        uses: actions/upload-artifact@v2
-        with:
-          path: /root/.m2/repository/org/apache/arrow/
-          name: velox-arrow-jar-centos-7-${{github.sha}}
 
   run-tpc-test-ubuntu:
     needs: build-native-lib-centos-7
@@ -102,16 +94,11 @@ jobs:
     container: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Setup java and maven
         run: |
           if [ "${{ matrix.java }}" = "java-17" ]; then
@@ -120,7 +107,6 @@ jobs:
             apt-get update && apt-get install -y openjdk-8-jdk maven
           fi
           apt remove openjdk-11* -y
-          ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/
       - name: Build and run TPCH/DS
         run: |
           cd $GITHUB_WORKSPACE/
@@ -156,16 +142,11 @@ jobs:
     container: ${{ matrix.os }}
     steps:
      - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Update mirror list
         if: matrix.os == 'centos:8'
         run: |
@@ -236,16 +217,11 @@ jobs:
           sudo docker image prune --all --force > /dev/null
           df -h
       - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /home/runner/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Setup java and maven
         run: |
           sudo apt-get update
@@ -338,16 +314,11 @@ jobs:
           sudo docker image prune --all --force > /dev/null
           df -h
       - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /home/runner/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Setup java and maven
         run: |
           sudo apt-get update
@@ -431,16 +402,11 @@ jobs:
     container: centos:8
     steps:
      - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Update mirror list
         run: |
           sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
@@ -497,16 +463,11 @@ jobs:
     container: ubuntu:22.04
     steps:
      - uses: actions/checkout@v2
-      - name: Download All Native Artifacts
+      - name: Download All Artifacts
        uses: actions/download-artifact@v2
        with:
          name: velox-native-lib-centos-7-${{github.sha}}
-          path: ./cpp/build/releases/
-      - name: Download All Arrow Jar Artifacts
-        uses: actions/download-artifact@v2
-        with:
-          name: velox-arrow-jar-centos-7-${{github.sha}}
-          path: /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases
       - name: Setup java and maven
         run: |
           apt-get update && apt-get install -y openjdk-8-jdk maven wget
@@ -547,9 +508,7 @@ jobs:
         id: cache
         uses: actions/cache/restore@v3
         with:
-          path: |
-            ./cpp/build/releases/
-            /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases/
           key: cache-velox-build-centos-8-${{ hashFiles('./cache-key') }}
       - name: Update mirror list
         run: |
8 changes: 2 additions & 6 deletions .github/workflows/velox_docker_cache.yml
@@ -38,9 +38,7 @@ jobs:
         uses: actions/cache/restore@v3
         with:
           lookup-only: true
-          path: |
-            ./cpp/build/releases/
-            /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases/
           key: cache-velox-build-${{ hashFiles('./cache-key') }}
       - name: Build Gluten Velox third party
         if: steps.check-cache.outputs.cache-hit != 'true'
@@ -51,9 +49,7 @@ jobs:
         id: cache
         uses: actions/cache/save@v3
         with:
-          path: |
-            ./cpp/build/releases/
-            /root/.m2/repository/org/apache/arrow/
+          path: ./cpp/build/releases/
           key: cache-velox-build-${{ hashFiles('./cache-key') }}
 
   # ccache-native-lib-ubuntu-velox-ut:
@@ -40,10 +40,8 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.dataset.file.FileSystemDatasetFactory
 import org.apache.arrow.dataset.scanner.ScanOptions
-import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.VectorUnloader
 import org.apache.arrow.vector.types.pojo.Schema
@@ -53,17 +51,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import java.net.URLDecoder
 import java.util.Optional
 
-import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter}
+import scala.collection.JavaConverters.asScalaBufferConverter
 
-class ArrowCSVFileFormat(parsedOptions: CSVOptions)
-  extends FileFormat
-  with DataSourceRegister
-  with Logging
-  with Serializable {
+class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable {
 
   private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
-  private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read")
-  var fallback = false
 
   override def isSplitable(
       sparkSession: SparkSession,
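
Context for the class-signature change above: the reverted commit had moved CSV option parsing into the constructor, so each format instance carried a CSVOptions plus a memory pool and a mutable fallback flag; the restored class is stateless and rebuilds options per call. A minimal, self-contained Scala sketch of the two shapes — the names below are illustrative stand-ins, not the real Gluten/Spark classes:

// Illustrative stand-ins; the real types are Spark's CSVOptions and
// Gluten's ArrowCSVFileFormat.
final case class CsvOpts(delimiter: String, header: Boolean)

// Pre-revert shape: options parsed once, carried as constructor state.
class FormatWithCtorOptions(parsedOptions: CsvOpts) {
  def describe: String = s"delimiter='${parsedOptions.delimiter}'"
}

// Restored shape: stateless format; options are rebuilt from the per-query
// options map inside each buildReader/inferSchema call.
class StatelessFormat {
  def buildReader(options: Map[String, String]): String => String = {
    val opts = CsvOpts(
      options.getOrElse("delimiter", ","),
      options.get("header").contains("true"))
    file => s"read $file with delimiter '${opts.delimiter}'"
  }
}

The stateless shape keeps the format trivially Serializable without capturing parsed option state (the restored declaration above still mixes in Serializable).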
@@ -76,11 +68,9 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
     ArrowUtil.readSchema(
       files,
       fileFormat,
-      arrowConfig,
       ArrowBufferAllocators.contextInstance(),
       ArrowNativeMemoryPool.arrowPool("infer schema"))
   }
@@ -99,74 +89,51 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val batchSize = sqlConf.columnBatchSize
+    val caseSensitive = sqlConf.caseSensitiveAnalysis
+    val columnPruning = sqlConf.csvColumnPruning &&
+      !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val parsedOptions = new CSVOptions(
+      options,
+      columnPruning,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
     val actualFilters =
       filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
     (file: PartitionedFile) => {
-      val actualDataSchema = StructType(
-        dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-      val actualRequiredSchema = StructType(
-        requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
       ArrowCSVFileFormat.checkHeader(
         file,
-        actualDataSchema,
-        actualRequiredSchema,
+        dataSchema,
+        requiredSchema,
         parsedOptions,
         actualFilters,
         broadcastedHadoopConf.value.value)
 
-      val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
-      val allocator = ArrowBufferAllocators.contextInstance()
-      // todo predicate validation / pushdown
-      val fileNames = ArrowUtil
-        .readArrowFileColumnNames(
+      val factory =
+        ArrowUtil.makeArrowDiscovery(
           URLDecoder.decode(file.filePath.toString, "UTF-8"),
-          fileFormat,
-          arrowConfig,
           ArrowBufferAllocators.contextInstance(),
-          pool)
-      val tokenIndexArr =
-        actualRequiredSchema
-          .map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f)))
-          .toArray
-      val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
-      val requestSchema = new StructType(
-        fileIndex
-          .map(index => StructField(fileNames(index), actualDataSchema(index).dataType)))
-      val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
-      val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
+          ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory")
+        )
+      // todo predicate validation / pushdown
+      val fileFields = factory.inspect().getFields.asScala
-      // TODO: support array/map/struct types in out-of-order schema reading.
-      val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
-      val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
       try {
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, arrowConfig)
-        val factory =
-          ArrowUtil.makeArrowDiscovery(
-            URLDecoder.decode(file.filePath.toString, "UTF-8"),
-            fileFormat,
-            Optional.of(arrowConfig),
-            ArrowBufferAllocators.contextInstance(),
-            pool)
-        val fields = factory.inspect().getFields
-        val actualReadFields = new Schema(
-          fileIndex.map(index => fields.get(index)).toIterable.asJava)
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, arrowConfig)
+        val actualReadFields =
+          ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
         ArrowCSVFileFormat
           .readArrow(
            ArrowBufferAllocators.contextInstance(),
            file,
            actualReadFields,
-            missingSchema,
+            caseSensitive,
+            requiredSchema,
            partitionSchema,
            factory,
-            batchSize,
-            arrowConfig)
+            batchSize)
          .asInstanceOf[Iterator[InternalRow]]
       } catch {
         case e: SchemaMismatchException =>
           logWarning(e.getMessage)
-          fallback = true
           val iter = ArrowCSVFileFormat.fallbackReadVanilla(
             dataSchema,
             requiredSchema,
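
The removed block above mapped each required column to its index in the data schema and treated indices at or beyond the file's column count as missing columns to be null-filled; the restored code delegates the same selection to ArrowUtil.getRequestedField. A self-contained sketch of the removed index arithmetic, with plain strings standing in for StructType fields (the comparison against fileNames.length apparently assumes the file's columns are a prefix of the declared schema):

// Stand-ins for the removed tokenIndexArr/fileIndex/missingSchema logic.
val dataSchema = Seq("a", "b", "c", "d") // declared table schema
val requiredSchema = Seq("b", "d")       // columns the query asks for
val fileNames = Seq("a", "b", "c")       // columns actually present in the CSV

val tokenIndexArr = requiredSchema.map(dataSchema.indexOf)
val fileIndex = tokenIndexArr.filter(_ < fileNames.length)     // List(1)
val missingIndex = tokenIndexArr.filter(_ >= fileNames.length) // List(3)

println(fileIndex.map(dataSchema(_)))    // List(b): read from the file
println(missingIndex.map(dataSchema(_))) // List(d): fill with nulls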
@@ -181,10 +148,8 @@ class ArrowCSVFileFormat(parsedOptions: CSVOptions)
             .rowToColumn(schema, batchSize, rows)
             .asInstanceOf[Iterator[InternalRow]]
         case d: Exception => throw d
-      } finally {
-        cSchema.close()
-        cSchema2.close()
       }
 
     }
   }
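
The dropped finally block paired with the two ArrowSchema.allocateNew calls removed in the previous hunk: the reverted code passed the requested schema to native code through Arrow's C Data Interface, and those off-heap structs must be released by hand. A minimal sketch of that allocate/use/close lifecycle using stock Arrow Java (org.apache.arrow.c); the empty try body is a placeholder:

import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.memory.RootAllocator

val allocator = new RootAllocator(Long.MaxValue)
val cSchema = ArrowSchema.allocateNew(allocator)
try {
  // ... export a schema into cSchema and hand it to native code ...
} finally {
  // C Data Interface structs are off-heap and not GC-managed; they must be
  // closed explicitly, which is what the reverted finally block did for
  // cSchema and cSchema2.
  cSchema.close()
  allocator.close()
}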

@@ -219,23 +184,28 @@ object ArrowCSVFileFormat {
       allocator: BufferAllocator,
       file: PartitionedFile,
       actualReadFields: Schema,
-      missingSchema: StructType,
+      caseSensitive: Boolean,
+      requiredSchema: StructType,
       partitionSchema: StructType,
       factory: FileSystemDatasetFactory,
-      batchSize: Int,
-      arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
+      batchSize: Int): Iterator[ColumnarBatch] = {
+    val compare = ArrowUtil.compareStringFunc(caseSensitive)
     val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
+    val actualReadSchema = new StructType(
+      actualReadFieldNames.map(f => requiredSchema.find(field => compare(f, field.name)).get))
     val dataset = factory.finish(actualReadFields)
-    val scanOptions = new ScanOptions.Builder(batchSize)
-      .columns(Optional.of(actualReadFieldNames))
-      .fragmentScanOptions(arrowConfig)
-      .build()
+
+    val hasMissingColumns = actualReadFields.getFields.size() != requiredSchema.size
+
+    val scanOptions = new ScanOptions(batchSize, Optional.of(actualReadFieldNames))
     val scanner = dataset.newScan(scanOptions)
 
     val partitionVectors =
       ArrowUtil.loadPartitionColumns(batchSize, partitionSchema, file.partitionValues)
 
-    val nullVectors = if (missingSchema.nonEmpty) {
+    val nullVectors = if (hasMissingColumns) {
+      val missingSchema =
+        new StructType(requiredSchema.filterNot(actualReadSchema.contains).toArray)
       ArrowUtil.loadMissingColumns(batchSize, missingSchema)
     } else {
       Array.empty[ArrowWritableColumnVector]
@@ -255,7 +225,8 @@ object ArrowCSVFileFormat {
           val batch = ArrowUtil.loadBatch(
             allocator,
             unloader.getRecordBatch,
-            actualReadFields,
+            actualReadSchema,
+            requiredSchema,
             partitionVectors,
             nullVectors)
           batch
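
The restored scan setup above uses arrow-dataset's plain two-argument ScanOptions constructor; the reverted builder variant additionally attached CsvFragmentScanOptions, a type that exists only in Gluten's patched Arrow build (the 15.0.0-gluten jars the workflow changes stop caching). A sketch of the restored call shape; the column names here are made up:

import java.util.Optional
import org.apache.arrow.dataset.scanner.ScanOptions

val batchSize = 4096
val columns = Array("name", "age") // hypothetical projection
// Stock arrow-dataset API: a batch size plus an optional column projection.
val scanOptions = new ScanOptions(batchSize, Optional.of(columns))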
@@ -275,15 +246,19 @@ object ArrowCSVFileFormat {
 
   def checkHeader(
       file: PartitionedFile,
-      actualDataSchema: StructType,
-      actualRequiredSchema: StructType,
+      dataSchema: StructType,
+      requiredSchema: StructType,
       parsedOptions: CSVOptions,
       actualFilters: Seq[Filter],
       conf: Configuration): Unit = {
     val isStartOfFile = file.start == 0
     if (!isStartOfFile) {
       return
     }
+    val actualDataSchema = StructType(
+      dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val actualRequiredSchema = StructType(
+      requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     val parser =
       new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
     val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema
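
After the revert, checkHeader derives the corrupt-record-free schemas itself instead of receiving them pre-filtered from the caller. A self-contained sketch of that filtering step, simplified to strings (the real code filters StructType fields against parsedOptions.columnNameOfCorruptRecord):

val columnNameOfCorruptRecord = "_corrupt_record" // Spark's default name
val dataSchema = Seq("name", "age", columnNameOfCorruptRecord)
val requiredSchema = Seq("name", columnNameOfCorruptRecord)

// checkHeader drops the corrupt-record column before building the
// UnivocityParser and comparing the CSV header against the schema.
val actualDataSchema = dataSchema.filterNot(_ == columnNameOfCorruptRecord)
val actualRequiredSchema = requiredSchema.filterNot(_ == columnNameOfCorruptRecord)

println(actualDataSchema)     // List(name, age)
println(actualRequiredSchema) // List(name)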