Commit

fixup
zhztheplayer committed Jul 3, 2024
1 parent 80bb848 commit 917653a
Showing 47 changed files with 223 additions and 230 deletions.
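
The change is a mechanical rename of the validation API used across both backends: ValidationResult.ok becomes ValidationResult.succeeded, ValidationResult.notOk(reason) becomes ValidationResult.failed(reason), and the isValid accessor becomes ok(). A minimal sketch of the shape these call sites imply; the real class lives in org.apache.gluten.extension and is not part of this diff, so the reason() accessor and the internal representation are assumptions:

    // Sketch only: inferred from the call sites below, not the actual
    // org.apache.gluten.extension.ValidationResult definition.
    sealed trait ValidationResult {
      def ok(): Boolean    // replaces the old isValid
      def reason(): String // failure message; assumed empty on success
    }

    object ValidationResult {
      private case object Succeeded extends ValidationResult {
        override def ok(): Boolean = true
        override def reason(): String = ""
      }
      private final case class Failed(message: String) extends ValidationResult {
        override def ok(): Boolean = false
        override def reason(): String = message
      }

      def succeeded: ValidationResult = Succeeded                   // was: ok
      def failed(reason: String): ValidationResult = Failed(reason) // was: notOk
    }

Call sites then read as "if (result.ok()) offload else fall back with result.reason()", which is the shape every hunk below is converted to.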
@@ -172,20 +172,20 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
     format match {
       case ParquetReadFormat =>
         if (validateFilePath) {
-          ValidationResult.ok
+          ValidationResult.succeeded
         } else {
-          ValidationResult.notOk("Validate file path failed.")
+          ValidationResult.failed("Validate file path failed.")
         }
-      case OrcReadFormat => ValidationResult.ok
-      case MergeTreeReadFormat => ValidationResult.ok
+      case OrcReadFormat => ValidationResult.succeeded
+      case MergeTreeReadFormat => ValidationResult.succeeded
       case TextReadFormat =>
         if (!hasComplexType) {
-          ValidationResult.ok
+          ValidationResult.succeeded
         } else {
-          ValidationResult.notOk("Has complex type.")
+          ValidationResult.failed("Has complex type.")
         }
-      case JsonReadFormat => ValidationResult.ok
-      case _ => ValidationResult.notOk(s"Unsupported file format $format")
+      case JsonReadFormat => ValidationResult.succeeded
+      case _ => ValidationResult.failed(s"Unsupported file format $format")
     }
   }

@@ -290,7 +290,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult =
-    ValidationResult.notOk("CH backend is unsupported.")
+    ValidationResult.failed("CH backend is unsupported.")

   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
@@ -64,7 +64,7 @@ case class CHGenerateExecTransformer(
   override protected def doGeneratorValidate(
       generator: Generator,
       outer: Boolean): ValidationResult =
-    ValidationResult.ok
+    ValidationResult.succeeded

   override protected def getRelNode(
       context: SubstraitContext,
@@ -60,7 +60,7 @@ case class CHShuffledHashJoinExecTransformer(
         right.outputSet,
         condition)
     if (shouldFallback) {
-      return ValidationResult.notOk("ch join validate fail")
+      return ValidationResult.failed("ch join validate fail")
     }
     super.doValidateInternal()
   }
@@ -118,10 +118,10 @@ case class CHBroadcastHashJoinExecTransformer(
         condition)

     if (shouldFallback) {
-      return ValidationResult.notOk("ch join validate fail")
+      return ValidationResult.failed("ch join validate fail")
     }
     if (isNullAwareAntiJoin) {
-      return ValidationResult.notOk("ch does not support NAAJ")
+      return ValidationResult.failed("ch does not support NAAJ")
     }
     super.doValidateInternal()
   }
@@ -50,7 +50,7 @@ case class CHSortMergeJoinExecTransformer(
         right.outputSet,
         condition)
     if (shouldFallback) {
-      return ValidationResult.notOk("ch join validate fail")
+      return ValidationResult.failed("ch join validate fail")
     }
     super.doValidateInternal()
   }
@@ -57,12 +57,12 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
         !columnarConf.enableColumnarBroadcastExchange ||
         !columnarConf.enableColumnarBroadcastJoin
       ) {
-        ValidationResult.notOk(
+        ValidationResult.failed(
           "columnar broadcast exchange is disabled or " +
             "columnar broadcast join is disabled")
       } else {
         if (FallbackTags.nonEmpty(bhj)) {
-          ValidationResult.notOk("broadcast join is already tagged as not transformable")
+          ValidationResult.failed("broadcast join is already tagged as not transformable")
         } else {
           val bhjTransformer = BackendsApiManager.getSparkPlanExecApiInstance
             .genBroadcastHashJoinExecTransformer(
@@ -75,7 +75,7 @@
               bhj.right,
               bhj.isNullAwareAntiJoin)
           val isBhjTransformable = bhjTransformer.doValidate()
-          if (isBhjTransformable.isValid) {
+          if (isBhjTransformable.ok()) {
             val exchangeTransformer = ColumnarBroadcastExchangeExec(mode, child)
             exchangeTransformer.doValidate()
           } else {
@@ -148,7 +148,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
         maybeExchange match {
           case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
             isBhjTransformable.tagOnFallback(bhj)
-            if (!isBhjTransformable.isValid) {
+            if (!isBhjTransformable.ok()) {
               FallbackTags.add(exchange, isBhjTransformable)
             }
           case None =>
@@ -186,7 +186,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
               bhj,
               "it's a materialized broadcast exchange or reused broadcast exchange")
           case ColumnarBroadcastExchangeExec(mode, child) =>
-            if (!isBhjTransformable.isValid) {
+            if (!isBhjTransformable.ok()) {
               throw new IllegalStateException(
                 s"BroadcastExchange has already been" +
                   s" transformed to columnar version but BHJ is determined as" +
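Both rules above follow the same flow: run doValidate() on a tentative transformer, record the outcome on the plan node, and swap in the columnar operator only when validation passes. Below is a self-contained toy of that flow, reusing the ValidationResult sketch from the top of this page; Node, tagOnFallback, and offloadOrFallback are stand-ins for illustration, not Gluten APIs.

    // Toy model of the validate-then-tag flow in FallbackBroadcastHashJoin;
    // Gluten's real rules operate on SparkPlan nodes and FallbackTags.
    object FallbackFlowSketch {
      final case class Node(name: String, var fallbackReason: Option[String] = None)

      def tagOnFallback(result: ValidationResult, node: Node): Unit =
        if (!result.ok()) node.fallbackReason = Some(result.reason())

      def offloadOrFallback(node: Node, validate: () => ValidationResult): String = {
        val result = validate()
        tagOnFallback(result, node) // mirrors isBhjTransformable.tagOnFallback(bhj)
        if (result.ok()) s"offloaded ${node.name}"
        else s"kept ${node.name} row-based: ${result.reason()}"
      }

      def main(args: Array[String]): Unit = {
        println(offloadOrFallback(Node("bhj"), () => ValidationResult.succeeded))
        println(offloadOrFallback(
          Node("naaj"),
          () => ValidationResult.failed("ch does not support NAAJ")))
      }
    }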
@@ -58,7 +58,7 @@ case class CHColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBase(c
             s"${field.dataType} is not supported in ColumnarToRowExecBase.")
       }
     }
-    ValidationResult.ok
+    ValidationResult.succeeded
   }

   override def doExecuteInternal(): RDD[InternalRow] = {
@@ -79,9 +79,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
       // Collect unsupported types.
       val unsupportedDataTypeReason = fields.collect(validatorFunc)
       if (unsupportedDataTypeReason.isEmpty) {
-        ValidationResult.ok
+        ValidationResult.succeeded
       } else {
-        ValidationResult.notOk(
+        ValidationResult.failed(
           s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
       }
     }
@@ -135,10 +135,10 @@ object VeloxBackendSettings extends BackendSettingsApi {
         } else {
           validateTypes(parquetTypeValidatorWithComplexTypeFallback)
         }
-      case DwrfReadFormat => ValidationResult.ok
+      case DwrfReadFormat => ValidationResult.succeeded
       case OrcReadFormat =>
         if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
-          ValidationResult.notOk(s"Velox ORC scan is turned off.")
+          ValidationResult.failed(s"Velox ORC scan is turned off.")
         } else {
           val typeValidator: PartialFunction[StructField, String] = {
             case StructField(_, arrayType: ArrayType, _, _)
@@ -164,7 +164,7 @@
             validateTypes(orcTypeValidatorWithComplexTypeFallback)
           }
         }
-      case _ => ValidationResult.notOk(s"Unsupported file format for $format.")
+      case _ => ValidationResult.failed(s"Unsupported file format for $format.")
     }
   }

@@ -284,8 +284,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
       .orElse(validateDataTypes())
       .orElse(validateWriteFilesOptions())
       .orElse(validateBucketSpec()) match {
-      case Some(reason) => ValidationResult.notOk(reason)
-      case _ => ValidationResult.ok
+      case Some(reason) => ValidationResult.failed(reason)
+      case _ => ValidationResult.succeeded
     }
   }

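The write-files validation above shows the idiom used throughout this object: each check returns Option[String], where None means pass, the checks are chained with orElse so the first failure wins, and a final match converts the outcome into a ValidationResult. A standalone sketch of the same idiom, reusing the ValidationResult sketch from above; the two validators here are invented for illustration, not the ones in VeloxBackendSettings.

    // Option-chained validators, first failure wins; names are hypothetical.
    object WriteValidationSketch {
      def validateCodec(codec: String): Option[String] =
        if (Set("snappy", "zstd").contains(codec)) None
        else Some(s"Unsupported compression codec: $codec.")

      def validateBuckets(numBuckets: Int): Option[String] =
        if (numBuckets >= 0) None
        else Some(s"Invalid bucket count: $numBuckets.")

      def validateWrite(codec: String, numBuckets: Int): ValidationResult =
        validateCodec(codec).orElse(validateBuckets(numBuckets)) match {
          case Some(reason) => ValidationResult.failed(reason)
          case None => ValidationResult.succeeded
        }
    }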
@@ -367,7 +367,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
         val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
         val projectTransformer = ProjectExecTransformer(projectList, child)
         val validationResult = projectTransformer.doValidate()
-        if (validationResult.isValid) {
+        if (validationResult.ok()) {
           val newChild = maybeAddAppendBatchesExec(projectTransformer)
           ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1))
         } else {
@@ -393,7 +393,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
         val projectTransformer = ProjectExecTransformer(projectList, child)
         val projectBeforeSortValidationResult = projectTransformer.doValidate()
         // Make sure we support offload hash expression
-        val projectBeforeSort = if (projectBeforeSortValidationResult.isValid) {
+        val projectBeforeSort = if (projectBeforeSortValidationResult.ok()) {
           projectTransformer
         } else {
           val project = ProjectExec(projectList, child)
@@ -406,7 +406,7 @@
         val dropSortColumnTransformer =
           ProjectExecTransformer(projectList.drop(1), sortByHashCode)
         val validationResult = dropSortColumnTransformer.doValidate()
-        if (validationResult.isValid) {
+        if (validationResult.ok()) {
           val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer)
           ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
         } else {
@@ -891,7 +891,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       case p @ LimitTransformer(SortExecTransformer(sortOrder, _, child, _), 0, count) =>
         val global = child.outputPartitioning.satisfies(AllTuples)
         val topN = TopNTransformer(count, sortOrder, global, child)
-        if (topN.doValidate().isValid) {
+        if (topN.doValidate().ok()) {
           topN
         } else {
           p
@@ -72,11 +72,11 @@ case class GenerateExecTransformer(
       generator: Generator,
       outer: Boolean): ValidationResult = {
     if (!supportsGenerate(generator, outer)) {
-      ValidationResult.notOk(
+      ValidationResult.failed(
        s"Velox backend does not support this generator: ${generator.getClass.getSimpleName}" +
          s", outer: $outer")
     } else {
-      ValidationResult.ok
+      ValidationResult.succeeded
     }
   }

@@ -66,7 +66,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
             s"VeloxColumnarToRowExec.")
       }
     }
-    ValidationResult.ok
+    ValidationResult.succeeded
   }

   override def doExecuteInternal(): RDD[InternalRow] = {
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.validate;

+import org.apache.gluten.extension.ValidationResult;
+
 import java.util.Vector;

 public class NativePlanValidationInfo {
@@ -30,11 +32,13 @@ public NativePlanValidationInfo(int isSupported, String fallbackInfo) {
     }
   }

-  public boolean isSupported() {
-    return isSupported == 1;
-  }
-
-  public Vector<String> getFallbackInfo() {
-    return fallbackInfo;
+  public ValidationResult asResult() {
+    if (isSupported == 1) {
+      return ValidationResult.succeeded();
+    }
+    return ValidationResult.failed(
+        String.format(
+            "Native validation failed: %n%s",
+            fallbackInfo.stream().reduce((l, r) -> l + "\n" + r)));
   }
 }
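
The new asResult() folds the old isSupported()/getFallbackInfo() pair into the same ValidationResult protocol the Scala side already consumes through ok(). One caveat: fallbackInfo.stream().reduce(...) returns an Optional<String>, so String.format renders the Optional wrapper in the message unless the value is unwrapped first. A hedged usage sketch from the Scala side; the instance normally arrives from native plan validation rather than being built by hand, and reason() is assumed from the sketch at the top of this page.

    // Hypothetical caller, e.g. in a Scala REPL or test; the two-argument
    // constructor follows the hunk above.
    import org.apache.gluten.validate.NativePlanValidationInfo

    val info = new NativePlanValidationInfo(0, "unsupported function: foo")
    val result = info.asResult()
    if (!result.ok()) {
      println(result.reason()) // feeds the standard fallback reporting
    }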
@@ -35,12 +35,12 @@ trait BackendSettingsApi {
       format: ReadFileFormat,
       fields: Array[StructField],
       partTable: Boolean,
-      paths: Seq[String]): ValidationResult = ValidationResult.ok
+      paths: Seq[String]): ValidationResult = ValidationResult.succeeded
   def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
-      options: Map[String, String]): ValidationResult = ValidationResult.ok
+      options: Map[String, String]): ValidationResult = ValidationResult.succeeded
   def supportNativeWrite(fields: Array[StructField]): Boolean = true
   def supportNativeMetadataColumns(): Boolean = false
   def supportNativeRowIndexColumn(): Boolean = false
@@ -113,7 +113,7 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
     if (remainingCondition == null) {
       // All the filters can be pushed down and the computing of this Filter
       // is not needed.
-      return ValidationResult.ok
+      return ValidationResult.succeeded
     }
     val substraitContext = new SubstraitContext
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
@@ -316,9 +316,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan with Gl
     BackendsApiManager.getValidatorApiInstance
       .doSchemaValidate(schema)
       .map {
-        reason => ValidationResult.notOk(s"Found schema check failure for $schema, due to: $reason")
+        reason =>
+          ValidationResult.failed(s"Found schema check failure for $schema, due to: $reason")
       }
-      .getOrElse(ValidationResult.ok)
+      .getOrElse(ValidationResult.succeeded)
   }
 }

@@ -88,7 +88,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource

     val validationResult = BackendsApiManager.getSettings
       .supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty, getInputFilePaths)
-    if (!validationResult.isValid) {
+    if (!validationResult.ok()) {
       return validationResult
     }

@@ -133,18 +133,18 @@ abstract class BatchScanExecTransformerBase(

   override def doValidateInternal(): ValidationResult = {
     if (pushedAggregate.nonEmpty) {
-      return ValidationResult.notOk(s"Unsupported aggregation push down for $scan.")
+      return ValidationResult.failed(s"Unsupported aggregation push down for $scan.")
     }

     if (
       SparkShimLoader.getSparkShims.findRowIndexColumnIndexInSchema(schema) > 0 &&
       !BackendsApiManager.getSettings.supportNativeRowIndexColumn()
     ) {
-      return ValidationResult.notOk("Unsupported row index column scan in native.")
+      return ValidationResult.failed("Unsupported row index column scan in native.")
     }

     if (hasUnsupportedColumns) {
-      return ValidationResult.notOk(s"Unsupported columns scan in native.")
+      return ValidationResult.failed(s"Unsupported columns scan in native.")
     }

     super.doValidateInternal()
@@ -147,14 +147,15 @@ abstract class BroadcastNestedLoopJoinExecTransformer(

   override protected def doValidateInternal(): ValidationResult = {
     if (!BackendsApiManager.getSettings.supportBroadcastNestedLoopJoinExec()) {
-      return ValidationResult.notOk("Broadcast Nested Loop join is not supported in this backend")
+      return ValidationResult.failed("Broadcast Nested Loop join is not supported in this backend")
     }
     if (substraitJoinType == CrossRel.JoinType.UNRECOGNIZED) {
-      return ValidationResult.notOk(s"$joinType join is not supported with BroadcastNestedLoopJoin")
+      return ValidationResult.failed(
+        s"$joinType join is not supported with BroadcastNestedLoopJoin")
     }
     (joinType, buildSide) match {
       case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
-        return ValidationResult.notOk(s"$joinType join is not supported with $buildSide")
+        return ValidationResult.failed(s"$joinType join is not supported with $buildSide")
       case _ => // continue
     }
     val substraitContext = new SubstraitContext
@@ -24,7 +24,7 @@ import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.RelBuilder
 import org.apache.gluten.utils.SubstraitUtil

-import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -112,7 +112,7 @@ case class CartesianProductExecTransformer(

   override protected def doValidateInternal(): ValidationResult = {
     if (!BackendsApiManager.getSettings.supportCartesianProductExec()) {
-      return ValidationResult.notOk("Cartesian product is not supported in this backend")
+      return ValidationResult.failed("Cartesian product is not supported in this backend")
     }
     val substraitContext = new SubstraitContext
     val expressionNode = condition.map {
@@ -95,10 +95,10 @@ case class ExpandExecTransformer(

   override protected def doValidateInternal(): ValidationResult = {
     if (!BackendsApiManager.getSettings.supportExpandExec()) {
-      return ValidationResult.notOk("Current backend does not support expand")
+      return ValidationResult.failed("Current backend does not support expand")
     }
     if (projections.isEmpty) {
-      return ValidationResult.notOk("Current backend does not support empty projections in expand")
+      return ValidationResult.failed("Current backend does not support empty projections in expand")
     }

     val substraitContext = new SubstraitContext