[GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable #6627

Merged · 5 commits · Jul 31, 2024
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -163,27 +163,25 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
// These properties can be set by the same thread in last query submission.
session.sparkContext.setLocalProperty("isNativeApplicable", null)
session.sparkContext.setLocalProperty("nativeFormat", null)
session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
// FIXME: We should only use context property if having no other approaches.
// Should see if there is another way to pass these options.
session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
if (format.isDefined) {
injectFakeRowAdaptor(rc, child)
} else {
rc.withNewChildren(rc.children.map(apply))
}
} else {
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
session.sparkContext.setLocalProperty("isNativeAppliable", "false")
session.sparkContext.setLocalProperty("nativeFormat", "")

rc.withNewChildren(rc.children.map(apply))
}
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
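The reset added at the top of this rule matters because SparkContext local properties are per-thread and persist across query submissions on a reused thread, so values written for one write command would otherwise still be visible to the next one. A minimal standalone sketch of that behavior (illustration only, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

object LocalPropertyLeakSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    val sc = spark.sparkContext

    // First submission on this thread: native write is applicable, so the flag is set.
    sc.setLocalProperty("isNativeApplicable", "true")

    // A later submission on the same thread still sees the stale value
    // unless it is cleared first, which is what the rule above now does.
    assert(sc.getLocalProperty("isNativeApplicable") == "true")

    // Setting a local property to null removes it for this thread.
    sc.setLocalProperty("isNativeApplicable", null)
    assert(sc.getLocalProperty("isNativeApplicable") == null)

    spark.stop()
  }
}
```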
@@ -148,9 +148,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {

val nativeEnabled =
"true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
"true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
"true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
"true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")

if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -257,7 +257,7 @@
}

val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
if ("parquet".equals(nativeFormat)) {
if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
@@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
Member:

Would it be more straightforward to use sparkSession.sparkContext.getLocalProperty("isNativeApplicable").toBoolean?

Contributor Author:

@zhztheplayer, the property can be null and would need an extra check if we used that. Maybe just keep the current change.

I am still a little bit concerned about the use of these local properties. Perhaps we can manage to remove them at some later point.

Yes, it's not a good approach to use a local property.
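For illustration of the null concern above (not code from the PR, assuming a sparkSession in scope): getLocalProperty returns null when the property was never set on the current thread, so calling .toBoolean on the result would throw, whereas the string comparison handles the unset case.

```scala
// The property may be unset, in which case getLocalProperty returns null.
val raw = sparkSession.sparkContext.getLocalProperty("isNativeApplicable")

// raw.toBoolean would throw for a null (or non-boolean) string;
// the comparison used in this PR is safe for both cases.
val nativeEnabled = "true" == raw
```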

Contributor:

Can we use true == xxx? Why do we use the string "true"? Is it from a Gluten config?

Contributor Author:

@FelixYBW, it's the raw value we obtained for this property. The property value is kept by the Spark context and is set by Gluten in GlutenWriterColumnarRules.
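To illustrate that point (a hypothetical snippet, not from the PR, with sc standing in for the active SparkContext): local properties only carry strings, so the boolean flags are written with .toString and read back by comparing the raw string value.

```scala
// SparkContext local properties are (String, String) pairs, so a Boolean flag
// is round-tripped through strings; hence the comparison against the literal "true".
sc.setLocalProperty("staticPartitionWriteOnly", true.toString) // stored as "true"
val staticOnly = "true" == sc.getLocalProperty("staticPartitionWriteOnly")
```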

GlutenOrcWriterInjects
.getInstance()
.inferSchema(sparkSession, Map.empty[String, String], files)
@@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
@@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {

// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging

/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
Contributor Author:

@JkSelf, do you know why this check was added in the main code?

Contributor:

@PHILO-HE No need to add this check here. It is only used in the Parquet reader, not the writer.

true
} else {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
}
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
}

override def vectorTypes(
@@ -100,9 +100,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
val isParquetFormat = nativeFormat.equals("parquet")
val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
@@ -140,9 +140,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {

val nativeEnabled =
"true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
"true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
"true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
"true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")

if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -277,7 +277,7 @@
}

val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
if ("parquet".equals(nativeFormat)) {
if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files)
} else { // the vanilla spark case
OrcUtils.inferSchema(sparkSession, files, options)
@@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
@@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {

// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging

/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
true
} else {
val conf = sparkSession.sessionState.conf
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema)
}
val conf = sparkSession.sessionState.conf
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema)
}

override def vectorTypes(
@@ -97,9 +97,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
val isParquetFormat = nativeFormat.equals("parquet")
val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec