From 8797350233304cfaef570a7fef83e6b1965ce609 Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Wed, 3 Jan 2024 10:19:11 +0800 Subject: [PATCH] [VL] Followup for native write files (#4246) --- backends-clickhouse/pom.xml | 4 - backends-velox/pom.xml | 4 - .../VeloxColumnarWriteFilesExec.scala | 190 ++++++++------ gluten-core/pom.xml | 4 - .../execution/WriteFilesExecTransformer.scala | 23 +- gluten-ui/pom.xml | 3 - gluten-ut/pom.xml | 4 - .../spark/sql/sources/GlutenInsertSuite.scala | 63 ++++- pom.xml | 10 +- .../sql/hive/execution/HiveFileFormat.scala | 237 ------------------ 10 files changed, 200 insertions(+), 342 deletions(-) delete mode 100644 shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 67e97394bd73..e0f96eda1d3d 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -156,22 +156,18 @@ com.fasterxml.jackson.core jackson-databind - provided com.fasterxml.jackson.core jackson-annotations - provided com.fasterxml.jackson.core jackson-core - provided com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - provided org.apache.hadoop diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 4482521ed8ca..8f45986cdba1 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -147,22 +147,18 @@ com.fasterxml.jackson.core jackson-databind - test com.fasterxml.jackson.core jackson-annotations - test com.fasterxml.jackson.core jackson-core - test com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - test com.google.jimfs diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala index 50111ccb4ec5..18447ceb5dd6 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala @@ -17,21 +17,62 @@ package org.apache.spark.sql.execution import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.execution.WriteFilesExecTransformer +import io.glutenproject.extension.GlutenPlan import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators +import org.apache.spark.TaskContext import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, FileFormat, PartitioningUtils, WriteFilesExec, WriteFilesSpec, WriteTaskResult} -import org.apache.spark.sql.vectorized.ColumnarBatch -import shaded.parquet.com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import scala.collection.mutable +// Velox write files metrics start +// +// Follows the code in velox `HiveDataSink::close()` +// The json can be as following: +// { +// "inMemoryDataSizeInBytes":0, +// "containsNumberedFileNames":true, +// "onDiskDataSizeInBytes":307, +// "fileWriteInfos":[ +// { +// "fileSize":307, +// "writeFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", +// "targetFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" +// } +// ], +// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "rowCount":1, +// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "updateMode":"NEW", +// "name":"part1=1/part2=1" +// } +case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long) + +case class VeloxWriteFilesMetrics( + name: String, + updateMode: String, + writePath: String, + targetPath: String, + fileWriteInfos: Seq[VeloxWriteFilesInfo], + rowCount: Long, + inMemoryDataSizeInBytes: Long, + onDiskDataSizeInBytes: Long, + containsNumberedFileNames: Boolean) + +// Velox write files metrics end + class VeloxColumnarWriteFilesExec( child: SparkPlan, fileFormat: FileFormat, @@ -39,13 +80,10 @@ class VeloxColumnarWriteFilesExec( bucketSpec: Option[BucketSpec], options: Map[String, String], staticPartitions: TablePartitionSpec) - extends WriteFilesExec( - child, - fileFormat, - partitionColumns, - bucketSpec, - options, - staticPartitions) { + extends WriteFilesExec(child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) + with GlutenPlan { + + override lazy val references: AttributeSet = AttributeSet.empty override def supportsColumnar(): Boolean = true @@ -58,81 +96,77 @@ class VeloxColumnarWriteFilesExec( // during the actual execution, specifically in the doExecuteWrite method of // ColumnarWriteFilesExec, where it is available within the WriteFilesSpec. // Therefore, we use this hack method to pass the writePath. - child.session.sparkContext.setLocalProperty("writePath", writeFilesSpec.description.path) - child.executeColumnar().mapPartitionsInternal { - iter => - // Currently, the cb contains three columns: row, fragments, and context. - // The first row in the row column contains the number of written numRows. - // The fragments column contains detailed information about the file writes. - // The json can be as following: - // { - // "inMemoryDataSizeInBytes":0, - // "containsNumberedFileNames":true, - // "onDiskDataSizeInBytes":307, - // "fileWriteInfos":[ - // { - // "fileSize":307, - // "writeFileName": - // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", - // "targetFileName": - // "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" - // } - // ], - // "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", - // "rowCount":1, - // "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", - // "updateMode":"NEW", - // "name":"part1=1/part2=1" - // } - assert(iter.hasNext) - val cb = iter.next() - val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - val numWrittenRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) + WriteFilesExecTransformer.withWriteFilePath(writeFilesSpec.description.path) { + child.executeColumnar().mapPartitionsInternal { + iter => + // Currently, the cb contains three columns: row, fragments, and context. + // The first row in the row column contains the number of written numRows. + // The fragments column contains detailed information about the file writes. + assert(iter.hasNext) + val cb = iter.next() + val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + assert(loadedCb.numCols() == 3) + val numWrittenRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L val objectMapper = new ObjectMapper() - val jsonObject = objectMapper.readTree(fragments.toString) - - val fileWriteInfos = jsonObject.get("fileWriteInfos").elements() - if (jsonObject.get("fileWriteInfos").elements().hasNext) { - val writeInfo = fileWriteInfos.next(); - numBytes += writeInfo.get("fileSize").longValue() - // Get partition information. - if (jsonObject.get("name").textValue().nonEmpty) { - val targetFileName = writeInfo.get("targetFileName").textValue() - val partitionDir = jsonObject.get("name").textValue() - updatedPartitions += partitionDir - val tmpOutputPath = - writeFilesSpec.description.path + "/" + partitionDir + "/" + targetFileName - val absOutputPathObject = - writeFilesSpec.description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionDir)) - if (absOutputPathObject.nonEmpty) { - val absOutputPath = absOutputPathObject.get + "/" + targetFileName - addedAbsPathFiles(tmpOutputPath) = absOutputPath + objectMapper.registerModule(DefaultScalaModule) + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val metrics = objectMapper + .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) + logDebug(s"Velox write files metrics: $metrics") + + val fileWriteInfos = metrics.fileWriteInfos + assert(fileWriteInfos.length == 1) + val fileWriteInfo = fileWriteInfos.head + numBytes += fileWriteInfo.fileSize + val targetFileName = fileWriteInfo.targetFileName + val outputPath = writeFilesSpec.description.path + + // part1=1/part2=1 + val partitionFragment = metrics.name + // write a non-partitioned table + if (partitionFragment != "") { + updatedPartitions += partitionFragment + val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName + val customOutputPath = writeFilesSpec.description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionFragment)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName } } } - } - // TODO: need to get the partition Internal row? - val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + // Reports bytesWritten and recordsWritten to the Spark output metrics. + Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { + outputMetrics => + outputMetrics.setBytesWritten(numBytes) + outputMetrics.setRecordsWritten(numWrittenRows) + } - val result = WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary) - Iterator.single(result) - } - } + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + val stats = BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = loadedCb.numRows() - 1, + numBytes = numBytes, + numRows = numWrittenRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().") + val result = WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary) + Iterator.single(result) + } + } } override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec = diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index de033959fb3d..de9d9cca7668 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -198,22 +198,18 @@ com.fasterxml.jackson.core jackson-databind - test com.fasterxml.jackson.core jackson-annotations - test com.fasterxml.jackson.core jackson-core - test com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - test diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala index 042abe30b564..350f65251d25 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala @@ -156,8 +156,7 @@ case class WriteFilesExecTransformer( } override def doTransform(context: SubstraitContext): TransformContext = { - val writePath = child.session.sparkContext.getLocalProperty("writePath") - assert(writePath.size > 0) + val writePath = WriteFilesExecTransformer.getWriteFilePath val childCtx = child.asInstanceOf[TransformSupport].doTransform(context) val operatorId = context.nextOperatorId(this.nodeName) @@ -175,3 +174,23 @@ case class WriteFilesExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer = copy(child = newChild) } + +object WriteFilesExecTransformer { + private val writeFilePathThreadLocal = new ThreadLocal[String] + + def withWriteFilePath[T](path: String)(f: => T): T = { + val origin = writeFilePathThreadLocal.get() + writeFilePathThreadLocal.set(path) + try { + f + } finally { + writeFilePathThreadLocal.set(origin) + } + } + + def getWriteFilePath: String = { + val writeFilePath = writeFilePathThreadLocal.get() + assert(writeFilePath != null) + writeFilePath + } +} diff --git a/gluten-ui/pom.xml b/gluten-ui/pom.xml index 3a359a94cb37..7ccea38cfa37 100644 --- a/gluten-ui/pom.xml +++ b/gluten-ui/pom.xml @@ -31,17 +31,14 @@ com.fasterxml.jackson.core jackson-core - provided com.fasterxml.jackson.core jackson-databind - provided com.fasterxml.jackson.core jackson-annotations - provided diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml index c2e4a7dbf504..1c8d49f1508e 100644 --- a/gluten-ut/pom.xml +++ b/gluten-ut/pom.xml @@ -118,22 +118,18 @@ com.fasterxml.jackson.core jackson-databind - test com.fasterxml.jackson.core jackson-annotations - test com.fasterxml.jackson.core jackson-core - test com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - test org.scalatestplus diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index 165d51731302..a34e6c065c79 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -16,6 +16,67 @@ */ package org.apache.spark.sql.sources +import org.apache.spark.SparkConf +import org.apache.spark.executor.OutputMetrics +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ +import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.util.QueryExecutionListener -class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait {} +class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait { + + override def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.leafNodeDefaultParallelism", "1") + } + + test("Gluten: insert partition table") { + withTable("pt") { + spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)") + + var taskMetrics: OutputMetrics = null + val taskListener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskMetrics = taskEnd.taskMetrics.outputMetrics + } + } + + var sqlMetrics: Map[String, SQLMetric] = null + val queryListener = new QueryExecutionListener { + override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {} + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + qe.executedPlan match { + case dataWritingCommandExec: DataWritingCommandExec => + sqlMetrics = dataWritingCommandExec.cmd.metrics + case _ => + } + } + } + spark.sparkContext.addSparkListener(taskListener) + spark.listenerManager.register(queryListener) + try { + val df = + spark.sql("INSERT INTO TABLE pt partition(pt='a') SELECT * FROM VALUES(1, 'a'),(2, 'b')") + spark.sparkContext.listenerBus.waitUntilEmpty() + val writeFiles = df.queryExecution.executedPlan + .asInstanceOf[CommandResultExec] + .commandPhysicalPlan + .children + .head + assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec]) + + assert(taskMetrics.bytesWritten > 0) + assert(taskMetrics.recordsWritten == 2) + assert(sqlMetrics("numParts").value == 1) + assert(sqlMetrics("numOutputRows").value == 2) + assert(sqlMetrics("numOutputBytes").value > 0) + assert(sqlMetrics("numFiles").value == 1) + + } finally { + spark.sparkContext.removeSparkListener(taskListener) + spark.listenerManager.unregister(queryListener) + } + } + } +} diff --git a/pom.xml b/pom.xml index a225bd234c56..bedb42cc3839 100644 --- a/pom.xml +++ b/pom.xml @@ -484,31 +484,31 @@ com.fasterxml.jackson.core jackson-annotations ${fasterxml.version} - test + provided com.fasterxml.jackson.core jackson-core ${fasterxml.version} - test + provided com.fasterxml.jackson.core jackson-databind ${fasterxml.version} - test + provided com.fasterxml.jackson.datatype jackson-datatype-guava ${fasterxml.version} - test + provided com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} ${fasterxml.version} - test + provided org.apache.maven.plugins diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala deleted file mode 100644 index e970cdb67d8a..000000000000 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.hive.execution - -import io.glutenproject.execution.datasource.GlutenOrcWriterInjects -import io.glutenproject.execution.datasource.GlutenParquetWriterInjects - -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.SPECULATION_ENABLED -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil} -import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableJobConf - -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.{JobConf, Reporter} -import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} - -import scala.collection.JavaConverters._ - -/** - * `FileFormat` for writing Hive tables. - * - * TODO: implement the read logic. - */ -class HiveFileFormat(fileSinkConf: FileSinkDesc) - extends FileFormat - with DataSourceRegister - with Logging { - - def this() = this(null) - - override def shortName(): String = "hive" - - override def inferSchema( - sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - throw QueryExecutionErrors.inferSchemaUnsupportedForHiveError() - } - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val conf = job.getConfiguration - val tableDesc = fileSinkConf.getTableInfo - conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName) - - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED) - val outputCommitterClass = conf.get("mapred.output.committer.class", "") - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use an output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." - logWarning(warningMessage) - } - - // Add table properties from storage handler to hadoopConf, so any custom storage - // handler settings can be set to hadoopConf - HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false) - Utilities.copyTableJobPropertiesToConf(tableDesc, conf) - - // Avoid referencing the outer object. - val fileSinkConfSer = fileSinkConf - val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { - val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - val isParquetFormat = nativeFormat.equals("parquet") - val compressionCodec = if (fileSinkConf.compressed) { - // hive related configurations - fileSinkConf.compressCodec - } else if (isParquetFormat) { - val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) - parquetOptions.compressionCodecClassName - } else { - val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) - orcOptions.compressionCodec - } - - val nativeConf = if (isParquetFormat) { - logInfo("Use Gluten parquet write for hive") - GlutenParquetWriterInjects.getInstance().nativeConf(options, compressionCodec) - } else { - logInfo("Use Gluten orc write for hive") - GlutenOrcWriterInjects.getInstance().nativeConf(options, compressionCodec) - } - - new OutputWriterFactory { - private val jobConf = new SerializableJobConf(new JobConf(conf)) - @transient private lazy val outputFormat = - jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] - - override def getFileExtension(context: TaskAttemptContext): String = { - Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat) - } - - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - if (isParquetFormat) { - GlutenParquetWriterInjects - .getInstance() - .createOutputWriter(path, dataSchema, context, nativeConf); - } else { - GlutenOrcWriterInjects - .getInstance() - .createOutputWriter(path, dataSchema, context, nativeConf); - } - } - } - } else { - new OutputWriterFactory { - private val jobConf = new SerializableJobConf(new JobConf(conf)) - @transient private lazy val outputFormat = - jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]] - - override def getFileExtension(context: TaskAttemptContext): String = { - Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat) - } - - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema) - } - } - } - } - - override def supportFieldName(name: String): Boolean = { - fileSinkConf.getTableInfo.getOutputFileFormatClassName match { - case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => - !name.matches(".*[ ,;{}()\n\t=].*") - case "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat" => - try { - TypeInfoUtils.getTypeInfoFromTypeString(s"struct<$name:int>") - true - } catch { - case _: IllegalArgumentException => false - } - case _ => true - } - } -} - -class HiveOutputWriter( - val path: String, - fileSinkConf: FileSinkDesc, - jobConf: JobConf, - dataSchema: StructType) - extends OutputWriter - with HiveInspectors { - - private def tableDesc = fileSinkConf.getTableInfo - - private val serializer = { - val serializer = - tableDesc.getDeserializerClass.getConstructor().newInstance().asInstanceOf[Serializer] - serializer.initialize(jobConf, tableDesc.getProperties) - serializer - } - - private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter( - jobConf, - tableDesc, - serializer.getSerializedClass, - fileSinkConf, - new Path(path), - Reporter.NULL) - - /** - * Since SPARK-30201 ObjectInspectorCopyOption.JAVA change to ObjectInspectorCopyOption.DEFAULT. - * The reason is DEFAULT option can convert `UTF8String` to `Text` with bytes and we can - * compatible with non UTF-8 code bytes during write. - */ - private val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - tableDesc.getDeserializer(jobConf).getObjectInspector, - ObjectInspectorCopyOption.DEFAULT) - .asInstanceOf[StructObjectInspector] - - private val fieldOIs = - standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray - private val dataTypes = dataSchema.map(_.dataType).toArray - private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } - private val outputData = new Array[Any](fieldOIs.length) - - override def write(row: InternalRow): Unit = { - var i = 0 - while (i < fieldOIs.length) { - outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i))) - i += 1 - } - hiveWriter.write(serializer.serialize(outputData, standardOI)) - } - - override def close(): Unit = { - // Seems the boolean value passed into close does not matter. - hiveWriter.close(false) - } -}