From 571be8cb835b5b4bd86e74b1cce0f37ccdac4ff9 Mon Sep 17 00:00:00 2001
From: Xiduo You
Date: Wed, 10 Jan 2024 09:09:33 +0800
Subject: [PATCH] [VL] Improve native write files fallback (#4329)

---
 .../backendsapi/clickhouse/CHBackend.scala    |  5 +-
 .../backendsapi/velox/VeloxBackend.scala      | 58 +++++++++++++------
 .../SubstraitToVeloxPlanValidator.cc          |  3 +
 .../backendsapi/BackendSettingsApi.scala      |  7 ++-
 .../execution/WriteFilesExecTransformer.scala | 32 +++++++---
 .../utils/velox/VeloxTestSettings.scala       |  2 -
 6 files changed, 77 insertions(+), 30 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
index 43719f973cc4..464b65ba61b3 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala
@@ -25,6 +25,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat._
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, DenseRank, Lag, Lead, NamedExpression, Rank, RowNumber}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
@@ -273,5 +274,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
 
   override def supportWriteFilesExec(
       format: FileFormat,
-      fields: Array[StructField]): Option[String] = None
+      fields: Array[StructField],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String]): Option[String] = Some("Unsupported")
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 42e06a204cc9..8de31be680e2 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -18,11 +18,13 @@ package io.glutenproject.backendsapi.velox
 
 import io.glutenproject.{GlutenConfig, GlutenPlugin, VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME}
 import io.glutenproject.backendsapi._
+import io.glutenproject.execution.WriteFilesExecTransformer
 import io.glutenproject.expression.WindowFunctionsBuilder
 import io.glutenproject.extension.ValidationResult
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
 
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Literal, NamedExpression, NthValue, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -109,7 +111,7 @@ object BackendSettings extends BackendSettingsApi {
           "StructType as element in ArrayType"
         case StructField(_, arrayType: ArrayType, _, _)
             if arrayType.elementType.isInstanceOf[ArrayType] =>
-          "A rrayType as element in ArrayType"
+          "ArrayType as element in ArrayType"
         case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] =>
           "StructType as Key in MapType"
         case StructField(_, mapType: MapType, _, _)
@@ -129,11 +131,15 @@
 
   override def supportWriteFilesExec(
       format: FileFormat,
-      fields: Array[StructField]): Option[String] = {
+      fields: Array[StructField],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String]): Option[String] = {
+
     def validateCompressionCodec(): Option[String] = {
       // Velox doesn't support brotli and lzo.
       val unSupportedCompressions = Set("brotli, lzo")
-      if (unSupportedCompressions.contains(SQLConf.get.parquetCompressionCodec.toLowerCase())) {
+      val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
+      if (unSupportedCompressions.contains(compressionCodec)) {
         Some("brotli or lzo compression codec is not support in velox backend.")
       } else {
         None
@@ -141,40 +147,58 @@
     }
 
     // Validate if all types are supported.
-    def validateDateTypes(fields: Array[StructField]): Option[String] = {
+    def validateDateTypes(): Option[String] = {
       fields.flatMap {
         field =>
           field.dataType match {
             case _: TimestampType => Some("TimestampType")
-            case struct: StructType =>
-              Some("StructType")
-            case array: ArrayType =>
-              Some("ArrayType")
-            case map: MapType =>
-              Some("MapType")
+            case _: StructType => Some("StructType")
+            case _: ArrayType => Some("ArrayType")
+            case _: MapType => Some("MapType")
             case _ => None
           }
       }.headOption
     }
 
-    def validateFieldMetadata(fields: Array[StructField]): Option[String] = {
+    def validateFieldMetadata(): Option[String] = {
       if (fields.exists(!_.metadata.equals(Metadata.empty))) {
         Some("StructField contain the metadata information.")
       } else None
     }
 
-    def validateFileFormat(format: FileFormat): Option[String] = {
+    def validateFileFormat(): Option[String] = {
       format match {
         case _: ParquetFileFormat => None
        case _: FileFormat => Some("Only parquet fileformat is supported in native write.")
       }
     }
 
-    val fileFormat = validateFileFormat(format)
-    val metadata = validateFieldMetadata(fields)
-    val dataTypes = validateDateTypes(fields)
-    val compression = validateCompressionCodec()
-    compression.orElse(dataTypes).orElse(metadata).orElse(fileFormat)
+    def validateWriteFilesOptions(): Option[String] = {
+      val maxRecordsPerFile = options
+        .get("maxRecordsPerFile")
+        .map(_.toLong)
+        .getOrElse(SQLConf.get.maxRecordsPerFile)
+      if (maxRecordsPerFile > 0) {
+        Some("Unsupported native write: maxRecordsPerFile not supported.")
+      } else {
+        None
+      }
+    }
+
+    def validateBucketSpec(): Option[String] = {
+      if (bucketSpec.nonEmpty) {
+        Some("Unsupported native write: bucket write is not supported.")
+      } else {
+        None
+      }
+    }
+
+    validateCompressionCodec()
+      .orElse(validateFileFormat())
+      .orElse(validateFieldMetadata())
+      .orElse(validateDateTypes())
+      .orElse(validateWriteFilesOptions())
+      .orElse(validateBucketSpec())
   }
 
   override def supportExpandExec(): Boolean = true
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 3cdcd945d0ff..82fd79a7a281 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -361,6 +361,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR
       case TypeKind::VARBINARY:
         break;
       default:
+        logValidateMsg(
+            "Validation failed for input type validation in WriteRel, not support partition column type: " +
+            mapTypeKindToName(types[i]->kind()));
         return false;
     }
   }
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index a79405828dfb..fa1d76a9fa80 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -21,6 +21,7 @@ import io.glutenproject.extension.ValidationResult
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -35,7 +36,11 @@ trait BackendSettingsApi {
       fields: Array[StructField],
       partTable: Boolean,
       paths: Seq[String]): ValidationResult = ValidationResult.ok
-  def supportWriteFilesExec(format: FileFormat, fields: Array[StructField]): Option[String]
+  def supportWriteFilesExec(
+      format: FileFormat,
+      fields: Array[StructField],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String]): Option[String]
   def supportExpandExec(): Boolean = false
   def supportSortExec(): Boolean = false
   def supportSortMergeJoinExec(): Boolean = true
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
index 8a8d4786dde1..d5182919f963 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/WriteFilesExecTransformer.scala
@@ -28,11 +28,15 @@ import io.glutenproject.substrait.rel.{RelBuilder, RelNode}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.internal.SQLConf
 
 import com.google.protobuf.{Any, StringValue}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
@@ -58,11 +62,11 @@ case class WriteFilesExecTransformer(
 
   override def output: Seq[Attribute] = Seq.empty
 
-  def genWriteParameters(): Any = {
-    val compressionCodec = if (options.get("parquet.compression").nonEmpty) {
-      options.get("parquet.compression").get.toLowerCase().capitalize
-    } else SQLConf.get.parquetCompressionCodec.toLowerCase().capitalize
+  private val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
+  def genWriteParameters(): Any = {
+    val compressionCodec =
+      WriteFilesExecTransformer.getCompressionCodec(caseInsensitiveOptions).capitalize
     val writeParametersStr = new StringBuffer("WriteParameters:")
     writeParametersStr.append("is").append(compressionCodec).append("=1").append("\n")
     val message = StringValue
@@ -131,15 +135,13 @@
 
     val supportedWrite = BackendsApiManager.getSettings.supportWriteFilesExec(
       fileFormat,
-      child.output.toStructType.fields)
+      child.output.toStructType.fields,
+      bucketSpec,
+      caseInsensitiveOptions)
     if (supportedWrite.nonEmpty) {
       return ValidationResult.notOk("Unsupported native write: " + supportedWrite.get)
     }
 
-    if (bucketSpec.nonEmpty) {
-      return ValidationResult.notOk("Unsupported native write: bucket write is not supported.")
-    }
-
     val substraitContext = new SubstraitContext
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
     val relNode = getRelNode(substraitContext, child.output, operatorId, null, validation = true)
@@ -158,3 +160,15 @@
   override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExecTransformer =
     copy(child = newChild)
 }
+
+object WriteFilesExecTransformer {
+  def getCompressionCodec(options: Map[String, String]): String = {
+    // From `ParquetOptions`
+    val parquetCompressionConf = options.get(ParquetOutputFormat.COMPRESSION)
+    options
+      .get("compression")
+      .orElse(parquetCompressionConf)
+      .getOrElse(SQLConf.get.parquetCompressionCodec)
+      .toLowerCase(Locale.ROOT)
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index f02a04fca3e1..d7672443aa41 100644
--- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -942,8 +942,6 @@
     .exclude("SPARK-39557 INSERT INTO statements with tables with struct defaults")
     .exclude("SPARK-39557 INSERT INTO statements with tables with map defaults")
   enableSuite[GlutenPartitionedWriteSuite]
-    // Velox doesn't support maxRecordsPerFile parameter.
-    .exclude("maxRecordsPerFile setting in non-partitioned write path")
   enableSuite[GlutenPathOptionSuite]
   enableSuite[GlutenPrunedScanSuite]
   enableSuite[GlutenResolvedDataSourceSuite]
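
Note on the codec lookup added above: WriteFilesExecTransformer.getCompressionCodec gives the writer option "compression" precedence over "parquet.compression" (ParquetOutputFormat.COMPRESSION) and only then falls back to the session Parquet codec (spark.sql.parquet.compression.codec), lower-casing the result. A minimal standalone sketch of that precedence, using a plain Map and a string default instead of the CaseInsensitiveMap and SQLConf the patch actually reads; the helper name, parameters, and the "snappy" default below are illustrative only:

    import java.util.Locale

    // First the explicit writer option, then the Parquet-specific option,
    // then the supplied session default; normalize to lower case as the patch does.
    def resolveCompressionCodec(
        options: Map[String, String],
        sessionDefault: String = "snappy"): String = {
      options
        .get("compression")
        .orElse(options.get("parquet.compression"))
        .getOrElse(sessionDefault)
        .toLowerCase(Locale.ROOT)
    }

    // resolveCompressionCodec(Map("parquet.compression" -> "ZSTD")) == "zstd"
    // resolveCompressionCodec(Map("compression" -> "gzip", "parquet.compression" -> "zstd")) == "gzip"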
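
genWriteParameters then folds the resolved codec into the Substrait extension payload as a flag-style string of the form WriteParameters:is<Codec>=1, followed by a newline. A small sketch of just that formatting step, with the codec passed in directly (the helper name is illustrative, not part of the patch):

    // Mirrors the StringBuffer appends in genWriteParameters:
    // "snappy" -> capitalize -> "Snappy" -> "WriteParameters:isSnappy=1\n".
    def writeParametersFor(codec: String): String = {
      val capitalized = codec.toLowerCase.capitalize
      new StringBuilder("WriteParameters:")
        .append("is").append(capitalized).append("=1").append("\n")
        .toString
    }

    // writeParametersFor("snappy") == "WriteParameters:isSnappy=1\n"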
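
The fallback decision itself is a chain of Option-returning checks: supportWriteFilesExec combines them with orElse, so the first check that returns Some(reason) short-circuits the rest, and doValidateInternal wraps that reason in ValidationResult.notOk. A reduced sketch of the pattern with two stand-in checks (the names and boolean inputs are illustrative, not the backend API):

    // Each check yields Some(reason) when native write must fall back, None otherwise.
    def checkFileFormat(isParquet: Boolean): Option[String] =
      if (isParquet) None else Some("Only parquet fileformat is supported in native write.")

    def checkBucketSpec(hasBuckets: Boolean): Option[String] =
      if (hasBuckets) Some("bucket write is not supported.") else None

    // orElse evaluates the next check only when the previous ones returned None,
    // so the first failing validator supplies the single fallback reason reported.
    def fallbackReason(isParquet: Boolean, hasBuckets: Boolean): Option[String] =
      checkFileFormat(isParquet).orElse(checkBucketSpec(hasBuckets))

    // fallbackReason(isParquet = true, hasBuckets = true) == Some("bucket write is not supported.")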