From 1ebe7fd4345d453b4cddfd3f0868a1b8b9746d61 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Mon, 29 Jul 2024 22:22:02 +0800 Subject: [PATCH 1/5] Initial --- .../spark/sql/delta/ClickhouseOptimisticTransaction.scala | 2 +- .../spark/sql/delta/ClickhouseOptimisticTransaction.scala | 2 +- .../spark/sql/delta/ClickhouseOptimisticTransaction.scala | 2 +- .../execution/datasources/GlutenWriterColumnarRules.scala | 4 ++-- .../spark/sql/execution/datasources/FileFormatWriter.scala | 2 +- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 4 ++-- .../execution/datasources/parquet/ParquetFileFormat.scala | 6 +++--- .../apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 +- .../spark/sql/execution/datasources/FileFormatWriter.scala | 2 +- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 4 ++-- .../execution/datasources/parquet/ParquetFileFormat.scala | 6 +++--- .../apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 +- 12 files changed, 19 insertions(+), 19 deletions(-) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 4133b5c605b8..3314465c5022 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 4133b5c605b8..3314465c5022 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 9e79c4f2e984..6eec68efece3 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction( // 1. insert FakeRowAdaptor // 2. DeltaInvariantCheckerExec transform // 3. 
DeltaTaskStatisticsTracker collect null count / min values / max values - // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable', + // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable', // 'nativeFormat' in the LocalProperty of the sparkcontext super.writeFiles(inputData, writeOptions, additionalConstraints) } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 7063c3f67b80..fdc23a20cab4 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -170,7 +170,7 @@ object GlutenWriterColumnarRules { BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) // FIXME: We should only use context property if having no other approaches. // Should see if there is another way to pass these options. - session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString) + session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString) session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse("")) if (format.isDefined) { injectFakeRowAdaptor(rc, child) @@ -181,7 +181,7 @@ object GlutenWriterColumnarRules { session.sparkContext.setLocalProperty( "staticPartitionWriteOnly", BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) - session.sparkContext.setLocalProperty("isNativeAppliable", "false") + session.sparkContext.setLocalProperty("isNativeApplicable", "false") session.sparkContext.setLocalProperty("nativeFormat", "") rc.withNewChildren(rc.children.map(apply)) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index a5c857103910..9fb0d4383a22 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -148,7 +148,7 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) + "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) val staticPartitionWriteOnly = "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 34873c46b09e..0b328f433b16 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. 
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { GlutenOrcWriterInjects .getInstance() .inferSchema(sparkSession, Map.empty[String, String], files) @@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { // pass compression to job conf so that the file extension can be aware of it. val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c6b383136590..83d5362b59cc 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -210,7 +210,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. */ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { true } else { val conf = sparkSession.sessionState.conf diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 162dd342bcf0..d524671b2f04 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -100,7 +100,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. 
val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") val isParquetFormat = nativeFormat.equals("parquet") val compressionCodec = if (fileSinkConf.compressed) { diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index ebf45e76e74e..93965b6e227e 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -140,7 +140,7 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) + "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) val staticPartitionWriteOnly = "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 49ac28d73322..5bf21916a0c5 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files) } else { // the vanilla spark case OrcUtils.inferSchema(sparkSession, files, options) @@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { // pass compression to job conf so that the file extension can be aware of it. 
val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b0573f68e46d..49348feaca66 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -206,7 +206,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. */ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { true } else { val conf = sparkSession.sessionState.conf diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 7a824c43670d..5d5a44299e73 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -97,7 +97,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. 
val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) { + if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") val isParquetFormat = nativeFormat.equals("parquet") val compressionCodec = if (fileSinkConf.compressed) { From 310a02c17585fc5ffdf4d8fe019fa4a1fb087847 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 30 Jul 2024 07:43:12 +0800 Subject: [PATCH 2/5] Use == for null safe comparing --- .../datasources/GlutenWriterColumnarRules.scala | 2 +- .../execution/datasources/FileFormatWriter.scala | 4 ++-- .../datasources/orc/OrcFileFormat.scala | 4 ++-- .../datasources/parquet/ParquetFileFormat.scala | 16 ++++++---------- .../sql/hive/execution/HiveFileFormat.scala | 2 +- .../execution/datasources/FileFormatWriter.scala | 2 +- .../datasources/orc/OrcFileFormat.scala | 4 ++-- .../datasources/parquet/ParquetFileFormat.scala | 14 +++++--------- .../sql/hive/execution/HiveFileFormat.scala | 2 +- 9 files changed, 21 insertions(+), 29 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index fdc23a20cab4..889f9de7308e 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -181,7 +181,7 @@ object GlutenWriterColumnarRules { session.sparkContext.setLocalProperty( "staticPartitionWriteOnly", BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) - session.sparkContext.setLocalProperty("isNativeApplicable", "false") + session.sparkContext.setLocalProperty("isNativeApplicable", null) session.sparkContext.setLocalProperty("nativeFormat", "") rc.withNewChildren(rc.children.map(apply)) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 9fb0d4383a22..bcb08c89c08c 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -148,9 +148,9 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) + "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") val staticPartitionWriteOnly = - "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) + "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly") if (nativeEnabled) { logInfo("Use Gluten partition write for hive") diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 0b328f433b16..619fa64ace6d 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenOrcWriterInjects .getInstance() .inferSchema(sparkSession, Map.empty[String, String], files) @@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 83d5362b59cc..42a63c7ebcd1 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. 
*/ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { - true - } else { - val conf = sparkSession.sessionState.conf - conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && - schema.length <= conf.wholeStageMaxNumFields && - schema.forall(_.dataType.isInstanceOf[AtomicType]) - } + val conf = sparkSession.sessionState.conf + conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && + schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) } override def vectorTypes( diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index d524671b2f04..24743abdf96e 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -100,7 +100,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") val isParquetFormat = nativeFormat.equals("parquet") val compressionCodec = if (fileSinkConf.compressed) { diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 93965b6e227e..66db925cbada 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -140,7 +140,7 @@ object FileFormatWriter extends Logging { numStaticPartitionCols: Int = 0): Set[String] = { val nativeEnabled = - "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) + "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") val staticPartitionWriteOnly = "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 5bf21916a0c5..9891f6851d00 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. 
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files) } else { // the vanilla spark case OrcUtils.inferSchema(sparkSession, files, options) @@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable .asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val nativeConf = GlutenOrcWriterInjects diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 49348feaca66..403e31c1cb30 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { // pass compression to job conf so that the file extension can be aware of it. val conf = ContextUtil.getConfiguration(job) @@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { // Why if (false)? Such code requires comments when being written. - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) && false) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) { GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files) } else { // the vanilla spark case ParquetUtils.inferSchema(sparkSession, parameters, files) @@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging /** Returns whether the reader will return the rows as batch or not. 
*/ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { - true - } else { - val conf = sparkSession.sessionState.conf - ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled && - !WholeStageCodegenExec.isTooManyFields(conf, schema) - } + val conf = sparkSession.sessionState.conf + ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled && + !WholeStageCodegenExec.isTooManyFields(conf, schema) } override def vectorTypes( diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 5d5a44299e73..e1f071a98fa7 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -97,7 +97,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Avoid referencing the outer object. val fileSinkConfSer = fileSinkConf val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName - if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeApplicable"))) { + if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") val isParquetFormat = nativeFormat.equals("parquet") val compressionCodec = if (fileSinkConf.compressed) { From 8be4756a575c8921c1f6808de97b2ac4871dc288 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 30 Jul 2024 09:52:09 +0800 Subject: [PATCH 3/5] Reset local properties --- .../sql/execution/datasources/GlutenWriterColumnarRules.scala | 4 ++++ .../spark/sql/execution/datasources/FileFormatWriter.scala | 2 +- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 +- .../spark/sql/execution/datasources/FileFormatWriter.scala | 4 ++-- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 +- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 889f9de7308e..a70f158cb968 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -163,6 +163,10 @@ object GlutenWriterColumnarRules { BackendsApiManager.getSettings.enableNativeWriteFiles() => injectFakeRowAdaptor(rc, rc.child) case rc @ DataWritingCommandExec(cmd, child) => + // These properties can be set by the same thread in last query submission. 
+ session.sparkContext.setLocalProperty("isNativeApplicable", null) + session.sparkContext.setLocalProperty("nativeFormat", null) + session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null) if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) { val format = getNativeFormat(cmd) session.sparkContext.setLocalProperty( diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index bcb08c89c08c..96a044c0cbbe 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -257,7 +257,7 @@ object FileFormatWriter extends Logging { } val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - if ("parquet".equals(nativeFormat)) { + if ("parquet" == nativeFormat) { (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) } else { (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 24743abdf96e..eb0f6a5d97df 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -102,7 +102,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - val isParquetFormat = nativeFormat.equals("parquet") + val isParquetFormat = nativeFormat == "parquet" val compressionCodec = if (fileSinkConf.compressed) { // hive related configurations fileSinkConf.compressCodec diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 66db925cbada..f5e932337c02 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -142,7 +142,7 @@ object FileFormatWriter extends Logging { val nativeEnabled = "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") val staticPartitionWriteOnly = - "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")) + "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly") if (nativeEnabled) { logInfo("Use Gluten partition write for hive") @@ -277,7 +277,7 @@ object FileFormatWriter extends Logging { } val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - if ("parquet".equals(nativeFormat)) { + if ("parquet" == nativeFormat) { (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) } else { (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None) diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala 
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index e1f071a98fa7..b9c1622cbee5 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -99,7 +99,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) { val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat") - val isParquetFormat = nativeFormat.equals("parquet") + val isParquetFormat = nativeFormat == "parquet" val compressionCodec = if (fileSinkConf.compressed) { // hive related configurations fileSinkConf.compressCodec From fa2e4c8d20f8ca62f0a4440a0f4064d9d2eb90b5 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 30 Jul 2024 10:19:13 +0800 Subject: [PATCH 4/5] Revert a change --- .../sql/execution/datasources/GlutenWriterColumnarRules.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index a70f158cb968..531d7ad897f3 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -185,7 +185,7 @@ object GlutenWriterColumnarRules { session.sparkContext.setLocalProperty( "staticPartitionWriteOnly", BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) - session.sparkContext.setLocalProperty("isNativeApplicable", null) + session.sparkContext.setLocalProperty("isNativeApplicable", "false") session.sparkContext.setLocalProperty("nativeFormat", "") rc.withNewChildren(rc.children.map(apply)) From df5dae9bbdbda4a9afc1dadf392d483f5ae7e597 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 30 Jul 2024 10:50:27 +0800 Subject: [PATCH 5/5] Remove setting in else branch --- .../execution/datasources/GlutenWriterColumnarRules.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 531d7ad897f3..20b00601531f 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -182,12 +182,6 @@ object GlutenWriterColumnarRules { rc.withNewChildren(rc.children.map(apply)) } } else { - session.sparkContext.setLocalProperty( - "staticPartitionWriteOnly", - BackendsApiManager.getSettings.staticPartitionWriteOnly().toString) - session.sparkContext.setLocalProperty("isNativeApplicable", "false") - session.sparkContext.setLocalProperty("nativeFormat", "") - rc.withNewChildren(rc.children.map(apply)) } case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
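Note on the two idioms this series settles on: patch 2 replaces '.equals' calls with Scala's '==', which is null-safe (the local properties are unset, i.e. null, whenever no native write is planned), and patches 3-5 clear the thread-local properties up front so values set by a previous query submission on the same thread cannot leak into the next plan. Below is a minimal, Spark-free sketch of both idioms; 'LocalProps' is a hypothetical stand-in for SparkContext.setLocalProperty / getLocalProperty, not the actual Gluten or Spark implementation.

// Self-contained sketch, assuming only the Scala standard library.
object NullSafeFlagSketch {

  // Stand-in for SparkContext local properties: a per-thread String -> String
  // map where setting a key to null removes it (mirroring Spark's behavior).
  object LocalProps {
    private val props = new ThreadLocal[Map[String, String]] {
      override def initialValue(): Map[String, String] = Map.empty
    }
    def set(key: String, value: String): Unit =
      props.set(if (value == null) props.get() - key else props.get() + (key -> value))
    def get(key: String): String = props.get().getOrElse(key, null)
  }

  def main(args: Array[String]): Unit = {
    // 1. Null-safe comparison: the property may never have been set, so the
    //    check must not dereference it. Scala's == on references tolerates
    //    null, whereas calling .equals on a null value would throw NPE.
    val nativeFormat: String = LocalProps.get("nativeFormat") // null: unset
    val isParquet = nativeFormat == "parquet"                 // false, no NPE
    // val crash = nativeFormat.equals("parquet")             // would throw NPE

    // 2. Reset before each query: local properties survive across submissions
    //    on the same thread, so stale flags are cleared before planning.
    LocalProps.set("isNativeApplicable", null)
    LocalProps.set("nativeFormat", null)
    LocalProps.set("staticPartitionWriteOnly", null)

    // Only the branch that actually plans a native write sets them again.
    LocalProps.set("isNativeApplicable", "true")
    LocalProps.set("nativeFormat", "parquet")

    val nativeEnabled = "true" == LocalProps.get("isNativeApplicable")
    println(s"isParquet=$isParquet nativeEnabled=$nativeEnabled")
  }
}

Running the sketch prints isParquet=false nativeEnabled=true: the comparison against the null-valued 'nativeFormat' degrades to false instead of throwing, and the explicit reset guarantees the writer path never sees flags left over from an earlier query on the same thread.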