diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 67e97394bd73..e0f96eda1d3d 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -156,22 +156,18 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 4482521ed8ca..8f45986cdba1 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -147,22 +147,18 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.google.jimfs</groupId>
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 50111ccb4ec5..18447ceb5dd6 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -17,21 +17,62 @@
package org.apache.spark.sql.execution
import io.glutenproject.columnarbatch.ColumnarBatches
+import io.glutenproject.execution.WriteFilesExecTransformer
+import io.glutenproject.extension.GlutenPlan
import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.spark.TaskContext
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import shaded.parquet.com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.collection.mutable
+// Velox write files metrics start
+//
+// Follows the code in Velox `HiveDataSink::close()`.
+// The JSON produced there looks like the following:
+// {
+// "inMemoryDataSizeInBytes":0,
+// "containsNumberedFileNames":true,
+// "onDiskDataSizeInBytes":307,
+// "fileWriteInfos":[
+// {
+// "fileSize":307,
+// "writeFileName":
+// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+// "targetFileName":
+// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+// }
+// ],
+// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "rowCount":1,
+// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+// "updateMode":"NEW",
+// "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+ name: String,
+ updateMode: String,
+ writePath: String,
+ targetPath: String,
+ fileWriteInfos: Seq[VeloxWriteFilesInfo],
+ rowCount: Long,
+ inMemoryDataSizeInBytes: Long,
+ onDiskDataSizeInBytes: Long,
+ containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
+
class VeloxColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
@@ -39,13 +80,10 @@ class VeloxColumnarWriteFilesExec(
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec)
- extends WriteFilesExec(
- child,
- fileFormat,
- partitionColumns,
- bucketSpec,
- options,
- staticPartitions) {
+ extends WriteFilesExec(child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+ with GlutenPlan {
+
+ override lazy val references: AttributeSet = AttributeSet.empty
override def supportsColumnar(): Boolean = true
@@ -58,81 +96,77 @@ class VeloxColumnarWriteFilesExec(
// during the actual execution, specifically in the doExecuteWrite method of
// ColumnarWriteFilesExec, where it is available within the WriteFilesSpec.
// Therefore, we use this hack method to pass the writePath.
- child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path)
- child.executeColumnar().mapPartitionsInternal {
- iter =>
- // Currently, the cb contains three columns: row, fragments, and context.
- // The first row in the row column contains the number of written numRows.
- // The fragments column contains detailed information about the file writes.
- // The json can be as following:
- // {
- // "inMemoryDataSizeInBytes":0,
- // "containsNumberedFileNames":true,
- // "onDiskDataSizeInBytes":307,
- // "fileWriteInfos":[
- // {
- // "fileSize":307,
- // "writeFileName":
- // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
- // "targetFileName":
- // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
- // }
- // ],
- // "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
- // "rowCount":1,
- // "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
- // "updateMode":"NEW",
- // "name":"part1=1/part2=1"
- // }
- assert(iter.hasNext)
- val cb = iter.next()
- val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
- val numWrittenRows = loadedCb.column(0).getLong(0)
-
- var updatedPartitions = Set.empty[String]
- val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
- var numBytes = 0L
- for (i <- 0 until loadedCb.numRows() - 1) {
- val fragments = loadedCb.column(1).getUTF8String(i + 1)
+ WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) {
+ child.executeColumnar().mapPartitionsInternal {
+ iter =>
+          // Currently, the batch (cb) contains three columns: row, fragments, and context.
+          // The first row of the row column holds the number of written rows.
+          // The fragments column contains detailed information about the file writes.
+ assert(iter.hasNext)
+ val cb = iter.next()
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+ assert(loadedCb.numCols() == 3)
+ val numWrittenRows = loadedCb.column(0).getLong(0)
+
+ var updatedPartitions = Set.empty[String]
+ val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+ var numBytes = 0L
val objectMapper = new ObjectMapper()
- val jsonObject = objectMapper.readTree(fragments.toString)
-
- val fileWriteInfos = jsonObject.get("fileWriteInfos").elements()
- if (jsonObject.get("fileWriteInfos").elements().hasNext) {
- val writeInfo = fileWriteInfos.next();
- numBytes += writeInfo.get("fileSize").longValue()
- // Get partition information.
- if (jsonObject.get("name").textValue().nonEmpty) {
- val targetFileName = writeInfo.get("targetFileName").textValue()
- val partitionDir = jsonObject.get("name").textValue()
- updatedPartitions += partitionDir
- val tmpOutputPath =
- writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName
- val absOutputPathObject =
- writeFilesSpec.description.customPartitionLocations.get(
- PartitioningUtils.parsePathFragment(partitionDir))
- if (absOutputPathObject.nonEmpty) {
- val absOutputPath = absOutputPathObject.get + "/" + targetFileName
- addedAbsPathFiles(tmpOutputPath) = absOutputPath
+ objectMapper.registerModule(DefaultScalaModule)
+ for (i <- 0 until loadedCb.numRows() - 1) {
+ val fragments = loadedCb.column(1).getUTF8String(i + 1)
+ val metrics = objectMapper
+ .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
+ logDebug(s"Velox write files metrics: $metrics")
+
+ val fileWriteInfos = metrics.fileWriteInfos
+ assert(fileWriteInfos.length == 1)
+ val fileWriteInfo = fileWriteInfos.head
+ numBytes += fileWriteInfo.fileSize
+ val targetFileName = fileWriteInfo.targetFileName
+ val outputPath = writeFilesSpec.description.path
+
+            // e.g. part1=1/part2=1
+            val partitionFragment = metrics.name
+            // An empty partition fragment means a non-partitioned table was written.
+            if (partitionFragment != "") {
+ updatedPartitions += partitionFragment
+ val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
+ val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
+ PartitioningUtils.parsePathFragment(partitionFragment))
+ if (customOutputPath.isDefined) {
+ addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}
- }
- // TODO: need to get the partition Internal row?
- val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows)
- val summary =
- ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
+ // Reports bytesWritten and recordsWritten to the Spark output metrics.
+ Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
+ outputMetrics =>
+ outputMetrics.setBytesWritten(numBytes)
+ outputMetrics.setRecordsWritten(numWrittenRows)
+ }
- val result = WriteTaskResult(
- new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
- summary)
- Iterator.single(result)
- }
- }
+ val partitionsInternalRows = updatedPartitions.map {
+ part =>
+ val parts = new Array[Any](1)
+ parts(0) = part
+ new GenericInternalRow(parts)
+ }.toSeq
+ val stats = BasicWriteTaskStats(
+ partitions = partitionsInternalRows,
+ numFiles = loadedCb.numRows() - 1,
+ numBytes = numBytes,
+ numRows = numWrittenRows)
+ val summary =
+ ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
+ val result = WriteTaskResult(
+ new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+ summary)
+ Iterator.single(result)
+ }
+ }
}
override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
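
For reference, a minimal self-contained sketch (assumptions, not part of the change) of how a fragment JSON shaped like the one documented above binds to the new case classes through jackson-module-scala; the JSON literal and the `FragmentJsonExample` object name are illustrative only:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

    case class VeloxWriteFilesMetrics(
        name: String,
        updateMode: String,
        writePath: String,
        targetPath: String,
        fileWriteInfos: Seq[VeloxWriteFilesInfo],
        rowCount: Long,
        inMemoryDataSizeInBytes: Long,
        onDiskDataSizeInBytes: Long,
        containsNumberedFileNames: Boolean)

    object FragmentJsonExample {
      def main(args: Array[String]): Unit = {
        // Illustrative fragment payload, shaped like the Velox HiveDataSink output above.
        val json =
          """{"name":"part1=1/part2=1","updateMode":"NEW",
            |"writePath":"file:/tmp/warehouse/part1=1/part2=1",
            |"targetPath":"file:/tmp/warehouse/part1=1/part2=1",
            |"fileWriteInfos":[{"fileSize":307,"writeFileName":"f.parquet","targetFileName":"f.parquet"}],
            |"rowCount":1,"inMemoryDataSizeInBytes":0,"onDiskDataSizeInBytes":307,
            |"containsNumberedFileNames":true}""".stripMargin

        val mapper = new ObjectMapper()
        // DefaultScalaModule is required so case classes and Seq fields deserialize correctly.
        mapper.registerModule(DefaultScalaModule)
        val metrics = mapper.readValue(json, classOf[VeloxWriteFilesMetrics])
        assert(metrics.fileWriteInfos.head.fileSize == 307)
        assert(metrics.name == "part1=1/part2=1")
      }
    }
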
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index de033959fb3d..de9d9cca7668 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -198,22 +198,18 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
index 042abe30b564..350f65251d25 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
@@ -156,8 +156,7 @@ case class WriteFilesExecTransformer(
}
override def doTransform(context: SubstraitContext): TransformContext = {
- val writePath = child.session.sparkContext.getLocalProperty("writePath")
- assert(writePath.size > 0)
+ val writePath = WriteFilesExecTransformer.getWriteFilePath
val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
val operatorId = context.nextOperatorId(this.nodeName)
@@ -175,3 +174,23 @@ case class WriteFilesExecTransformer(
override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer =
copy(child = newChild)
}
+
+object WriteFilesExecTransformer {
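+  // Carries the write path from the caller (VeloxColumnarWriteFilesExec.doExecuteWrite) to
+  // doTransform on the same thread, replacing the former
+  // `sparkContext.setLocalProperty("writePath")` hack.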
+ private val writeFilePathThreadLocal = new ThreadLocal[String]
+
+ def withWriteFilePath[T](path: String)(f: => T): T = {
+ val origin = writeFilePathThreadLocal.get()
+ writeFilePathThreadLocal.set(path)
+ try {
+ f
+ } finally {
+ writeFilePathThreadLocal.set(origin)
+ }
+ }
+
+ def getWriteFilePath: String = {
+ val writeFilePath = writeFilePathThreadLocal.get()
+ assert(writeFilePath != null)
+ writeFilePath
+ }
+}
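
A short usage sketch of the new helper, mirroring how VeloxColumnarWriteFilesExec calls it above (the path literal and object name are illustrative): the caller scopes the write path for the duration of the block, and any code running on the same thread, such as doTransform, can read it back.

    import io.glutenproject.execution.WriteFilesExecTransformer

    object WriteFilePathUsageExample {
      def main(args: Array[String]): Unit = {
        val path = "/tmp/spark-warehouse/t" // illustrative path
        WriteFilesExecTransformer.withWriteFilePath(path) {
          // Visible to any code invoked on this thread inside the block,
          // e.g. doTransform during whole-stage transformation.
          assert(WriteFilesExecTransformer.getWriteFilePath == path)
        }
        // On exit the previous value (possibly null) is restored, so calling
        // getWriteFilePath outside a withWriteFilePath block fails its non-null assertion.
      }
    }
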
diff --git a/gluten-ui/pom.xml b/gluten-ui/pom.xml
index 3a359a94cb37..7ccea38cfa37 100644
--- a/gluten-ui/pom.xml
+++ b/gluten-ui/pom.xml
@@ -31,17 +31,14 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>provided</scope>
     </dependency>
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index c2e4a7dbf504..1c8d49f1508e 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -118,22 +118,18 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 165d51731302..a34e6c065c79 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -16,6 +16,67 @@
*/
package org.apache.spark.sql.sources
+import org.apache.spark.SparkConf
+import org.apache.spark.executor.OutputMetrics
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.util.QueryExecutionListener
-class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait {}
+class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait {
+
+ override def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.leafNodeDefaultParallelism", "1")
+ }
+
+ test("Gluten: insert partition table") {
+ withTable("pt") {
+ spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+
+ var taskMetrics: OutputMetrics = null
+ val taskListener = new SparkListener {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ taskMetrics = taskEnd.taskMetrics.outputMetrics
+ }
+ }
+
+ var sqlMetrics: Map[String, SQLMetric] = null
+ val queryListener = new QueryExecutionListener {
+ override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+ override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+ qe.executedPlan match {
+ case dataWritingCommandExec: DataWritingCommandExec =>
+ sqlMetrics = dataWritingCommandExec.cmd.metrics
+ case _ =>
+ }
+ }
+ }
+ spark.sparkContext.addSparkListener(taskListener)
+ spark.listenerManager.register(queryListener)
+ try {
+ val df =
+ spark.sql("INSERT INTO TABLE pt partition(pt='a') SELECT * FROM VALUES(1, 'a'),(2, 'b')")
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ val writeFiles = df.queryExecution.executedPlan
+ .asInstanceOf[CommandResultExec]
+ .commandPhysicalPlan
+ .children
+ .head
+ assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec])
+
+ assert(taskMetrics.bytesWritten > 0)
+ assert(taskMetrics.recordsWritten == 2)
+ assert(sqlMetrics("numParts").value == 1)
+ assert(sqlMetrics("numOutputRows").value == 2)
+ assert(sqlMetrics("numOutputBytes").value > 0)
+ assert(sqlMetrics("numFiles").value == 1)
+
+ } finally {
+ spark.sparkContext.removeSparkListener(taskListener)
+ spark.listenerManager.unregister(queryListener)
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index a225bd234c56..bedb42cc3839 100644
--- a/pom.xml
+++ b/pom.xml
@@ -484,31 +484,31 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
       <version>${fasterxml.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
       <version>${fasterxml.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>${fasterxml.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.datatype</groupId>
       <artifactId>jackson-datatype-guava</artifactId>
       <version>${fasterxml.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
       <version>${fasterxml.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
org.apache.maven.plugins
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
deleted file mode 100644
index e970cdb67d8a..000000000000
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.execution
-
-import io.glutenproject.execution.datasource.GlutenOrcWriterInjects
-import io.glutenproject.execution.datasource.GlutenParquetWriterInjects
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.SPECULATION_ENABLED
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.orc.OrcOptions
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableJobConf
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.{JobConf, Reporter}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-
-import scala.collection.JavaConverters._
-
-/**
- * `FileFormat` for writing Hive tables.
- *
- * TODO: implement the read logic.
- */
-class HiveFileFormat(fileSinkConf: FileSinkDesc)
- extends FileFormat
- with DataSourceRegister
- with Logging {
-
- def this() = this(null)
-
- override def shortName(): String = "hive"
-
- override def inferSchema(
- sparkSession: SparkSession,
- options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType] = {
- throw QueryExecutionErrors.inferSchemaUnsupportedForHiveError()
- }
-
- override def prepareWrite(
- sparkSession: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = {
- val conf = job.getConfiguration
- val tableDesc = fileSinkConf.getTableInfo
- conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
-
- // When speculation is on and output committer class name contains "Direct", we should warn
- // users that they may loss data if they are using a direct output committer.
- val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED)
- val outputCommitterClass = conf.get("mapred.output.committer.class", "")
- if (speculationEnabled && outputCommitterClass.contains("Direct")) {
- val warningMessage =
- s"$outputCommitterClass may be an output committer that writes data directly to " +
- "the final location. Because speculation is enabled, this output committer may " +
- "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
- "committer that does not have this behavior (e.g. FileOutputCommitter)."
- logWarning(warningMessage)
- }
-
- // Add table properties from storage handler to hadoopConf, so any custom storage
- // handler settings can be set to hadoopConf
- HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
- Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
-
- // Avoid referencing the outer object.
- val fileSinkConfSer = fileSinkConf
- val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
- val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- val isParquetFormat = nativeFormat.equals("parquet")
- val compressionCodec = if (fileSinkConf.compressed) {
- // hive related configurations
- fileSinkConf.compressCodec
- } else if (isParquetFormat) {
- val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
- parquetOptions.compressionCodecClassName
- } else {
- val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
- orcOptions.compressionCodec
- }
-
- val nativeConf = if (isParquetFormat) {
- logInfo("Use Gluten parquet write for hive")
- GlutenParquetWriterInjects.getInstance().nativeConf(options, compressionCodec)
- } else {
- logInfo("Use Gluten orc write for hive")
- GlutenOrcWriterInjects.getInstance().nativeConf(options, compressionCodec)
- }
-
- new OutputWriterFactory {
- private val jobConf = new SerializableJobConf(new JobConf(conf))
- @transient private lazy val outputFormat =
- jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-
- override def getFileExtension(context: TaskAttemptContext): String = {
- Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
- }
-
- override def newInstance(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- if (isParquetFormat) {
- GlutenParquetWriterInjects
- .getInstance()
- .createOutputWriter(path, dataSchema, context, nativeConf);
- } else {
- GlutenOrcWriterInjects
- .getInstance()
- .createOutputWriter(path, dataSchema, context, nativeConf);
- }
- }
- }
- } else {
- new OutputWriterFactory {
- private val jobConf = new SerializableJobConf(new JobConf(conf))
- @transient private lazy val outputFormat =
- jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-
- override def getFileExtension(context: TaskAttemptContext): String = {
- Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
- }
-
- override def newInstance(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema)
- }
- }
- }
- }
-
- override def supportFieldName(name: String): Boolean = {
- fileSinkConf.getTableInfo.getOutputFileFormatClassName match {
- case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
- !name.matches(".*[ ,;{}()\n\t=].*")
- case "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat" =>
- try {
- TypeInfoUtils.getTypeInfoFromTypeString(s"struct<$name:int>")
- true
- } catch {
- case _: IllegalArgumentException => false
- }
- case _ => true
- }
- }
-}
-
-class HiveOutputWriter(
- val path: String,
- fileSinkConf: FileSinkDesc,
- jobConf: JobConf,
- dataSchema: StructType)
- extends OutputWriter
- with HiveInspectors {
-
- private def tableDesc = fileSinkConf.getTableInfo
-
- private val serializer = {
- val serializer =
- tableDesc.getDeserializerClass.getConstructor().newInstance().asInstanceOf[Serializer]
- serializer.initialize(jobConf, tableDesc.getProperties)
- serializer
- }
-
- private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
- jobConf,
- tableDesc,
- serializer.getSerializedClass,
- fileSinkConf,
- new Path(path),
- Reporter.NULL)
-
- /**
- * Since SPARK-30201 ObjectInspectorCopyOption.JAVA change to ObjectInspectorCopyOption.DEFAULT.
- * The reason is DEFAULT option can convert `UTF8String` to `Text` with bytes and we can
- * compatible with non UTF-8 code bytes during write.
- */
- private val standardOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- tableDesc.getDeserializer(jobConf).getObjectInspector,
- ObjectInspectorCopyOption.DEFAULT)
- .asInstanceOf[StructObjectInspector]
-
- private val fieldOIs =
- standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
- private val dataTypes = dataSchema.map(_.dataType).toArray
- private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
- private val outputData = new Array[Any](fieldOIs.length)
-
- override def write(row: InternalRow): Unit = {
- var i = 0
- while (i < fieldOIs.length) {
- outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
- i += 1
- }
- hiveWriter.write(serializer.serialize(outputData, standardOI))
- }
-
- override def close(): Unit = {
- // Seems the boolean value passed into close does not matter.
- hiveWriter.close(false)
- }
-}