Merge branch 'apache:main' into gayangya/input_file_name_ut_refactor
gaoyangxiaozhu authored Jul 3, 2024
2 parents 4fc8f58 + 80bb848 commit 1c51a13
Showing 114 changed files with 1,628 additions and 584 deletions.
26 changes: 26 additions & 0 deletions NOTICE
@@ -0,0 +1,26 @@
Apache Gluten(incubating)
Copyright 2023-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

Apache Spark
Copyright 2014 and onwards The Apache Software Foundation.

Apache Celeborn
Copyright 2022-2024 The Apache Software Foundation.

Apache Uniffle (incubating)
Copyright 2022 and onwards The Apache Software Foundation.

Apache Iceberg
Copyright 2017-2024 The Apache Software Foundation.

Apache Parquet MR
Copyright 2014-2024 The Apache Software Foundation.

Apache ORC
Copyright 2013 and onwards The Apache Software Foundation.

Apache Thrift
Copyright (C) 2006 - 2019, The Apache Software Foundation.
6 changes: 3 additions & 3 deletions backends-clickhouse/pom.xml
@@ -100,7 +100,7 @@
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<version>1.13.5</version>
<version>1.17.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -126,13 +126,13 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
<artifactId>scalatestplus-mockito_2.12</artifactId>
<artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
<version>1.0.0-M2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
<artifactId>scalatestplus-scalacheck_2.12</artifactId>
<artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
@@ -50,7 +50,7 @@ class ClickhouseOptimisticTransaction(
def this(
deltaLog: DeltaLog,
catalogTable: Option[CatalogTable],
snapshotOpt: Option[Snapshot] = None) {
snapshotOpt: Option[Snapshot] = None) = {
this(
deltaLog,
catalogTable,
@@ -33,7 +33,6 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.StateCache
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql._
@@ -126,7 +125,27 @@ class Snapshot(
* This potentially triggers an IO operation to read the inCommitTimestamp.
* This is a lazy val, so repeated calls will not trigger multiple IO operations.
*/
protected lazy val getInCommitTimestampOpt: Option[Long] =
protected lazy val getInCommitTimestampOpt: Option[Long] = {
// --- modified start
// This implicit is for scala 2.12, copy from scala 2.13
implicit class OptionExtCompanion(opt: Option.type) {
/**
* When a given condition is true, evaluates the a argument and returns Some(a).
* When the condition is false, a is not evaluated and None is returned.
*/
def when[A](cond: Boolean)(a: => A): Option[A] = if (cond) Some(a) else None

/**
* When a given condition is false, evaluates the a argument and returns Some(a).
* When the condition is true, a is not evaluated and None is returned.
*/
def whenNot[A](cond: Boolean)(a: => A): Option[A] = if (!cond) Some(a) else None

/** Sum up all the `options`, substituting `default` for each `None`. */
def sum[N: Numeric](default: N)(options: Option[N]*): N =
options.map(_.getOrElse(default)).sum
}
// --- modified end
Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
_reconstructedProtocolMetadataAndICT.inCommitTimestamp
.getOrElse {
@@ -158,6 +177,7 @@
}
}
}
}


private[delta] lazy val nonFileActions: Seq[Action] = {
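The hunk above copies Option.when (plus whenNot and sum) onto the Option companion object because Scala 2.12 does not provide these helpers; they only arrived in the 2.13 standard library. Below is a minimal, self-contained sketch of the same enrich-the-companion pattern, assuming a plain Scala 2.12 build; the object and value names are illustrative and not part of the commit.

object OptionWhenCompatSketch {
  // On Scala 2.12 this implicit supplies Option.when; on 2.13 the built-in member
  // takes precedence and the implicit conversion is simply never triggered.
  implicit class OptionExtCompanion(opt: Option.type) {
    def when[A](cond: Boolean)(a: => A): Option[A] = if (cond) Some(a) else None
  }

  def main(args: Array[String]): Unit = {
    val ictEnabled = true
    println(Option.when(ictEnabled)("read in-commit timestamp")) // Some(read in-commit timestamp)
    println(Option.when(!ictEnabled)(sys.error("never evaluated"))) // None; by-name argument untouched
  }
}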
@@ -712,7 +712,7 @@ trait VacuumCommandImpl extends DeltaCommand {
// This is never going to be a path relative to `basePath` for DVs.
None
}
case None => None
case _ => None
}
}
}
@@ -55,7 +55,7 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String]) {
partitionColumns: Seq[String]) = {
this(protocol, metadata)
this.database = database
this.tableName = tableName
@@ -17,6 +17,7 @@
package org.apache.gluten.vectorized;

import org.apache.gluten.metrics.IMetrics;
import org.apache.gluten.metrics.NativeMetrics;

import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -50,7 +51,7 @@ public String getId() {

private native void nativeCancel(long nativeHandle);

private native IMetrics nativeFetchMetrics(long nativeHandle);
private native String nativeFetchMetrics(long nativeHandle);

@Override
public boolean hasNextInternal() throws IOException {
@@ -72,8 +73,8 @@ public ColumnarBatch nextInternal() throws IOException {
}

@Override
public IMetrics getMetricsInternal() throws IOException, ClassNotFoundException {
return nativeFetchMetrics(handle);
public IMetrics getMetricsInternal() {
return new NativeMetrics(nativeFetchMetrics(handle));
}

@Override
@@ -58,7 +58,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}
dataSchema += newField
}
StructType(dataSchema)
StructType(dataSchema.toSeq)
}

private def createNativeIterator(
@@ -114,7 +114,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
if (scan.fileFormat == ReadFileFormat.TextReadFormat) {
val names =
ConverterUtils.collectAttributeNamesWithoutExprId(scan.outputAttributes())
localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala))
localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala.toSeq))
}
}

@@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddTransformHintRule
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -146,7 +146,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {

child match {
case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) =>
// For the validation phase of the AddTransformHintRule
// For the validation phase of the AddFallbackTagRule
CHFilterExecTransformer(condition, child)
case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) =>
// For the transform phase, the FileSourceScanExec is already transformed
@@ -226,7 +226,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
// FIXME: The operation happens inside ReplaceSingleNode().
// Caller may not know it adds project on top of the shuffle.
val project = TransformPreOverrides().apply(
AddTransformHintRule().apply(
AddFallbackTagRule().apply(
ProjectExec(plan.child.output ++ projectExpressions, plan.child)))
var newExprs = Seq[Expression]()
for (i <- exprs.indices) {
@@ -251,7 +251,7 @@
// FIXME: The operation happens inside ReplaceSingleNode().
// Caller may not know it adds project on top of the shuffle.
val project = TransformPreOverrides().apply(
AddTransformHintRule().apply(
AddFallbackTagRule().apply(
ProjectExec(plan.child.output ++ projectExpressions, plan.child)))
var newOrderings = Seq[SortOrder]()
for (i <- orderings.indices) {
@@ -842,6 +842,24 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHGenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
}

/** Transform array filter to Substrait. */
override def genArrayFilterTransformer(
substraitExprName: String,
argument: ExpressionTransformer,
function: ExpressionTransformer,
expr: ArrayFilter): ExpressionTransformer = {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

/** Transform array transform to Substrait. */
override def genArrayTransformTransformer(
substraitExprName: String,
argument: ExpressionTransformer,
function: ExpressionTransformer,
expr: ArrayTransform): ExpressionTransformer = {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
@@ -19,7 +19,7 @@ package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits
import org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits
import org.apache.gluten.utils.PhysicalPlanSelector

import org.apache.spark.sql.SparkSession
@@ -61,7 +61,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
"columnar broadcast exchange is disabled or " +
"columnar broadcast join is disabled")
} else {
if (TransformHints.isNotTransformable(bhj)) {
if (FallbackTags.nonEmpty(bhj)) {
ValidationResult.notOk("broadcast join is already tagged as not transformable")
} else {
val bhjTransformer = BackendsApiManager.getSparkPlanExecApiInstance
@@ -83,8 +83,8 @@
}
}
}
TransformHints.tagNotTransformable(bhj, isTransformable)
TransformHints.tagNotTransformable(exchange, isTransformable)
FallbackTags.add(bhj, isTransformable)
FallbackTags.add(exchange, isTransformable)
case _ =>
// Skip. This might be the case that the exchange was already
// executed in earlier stage
@@ -116,7 +116,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
// Currently their doBroadcast() methods just propagate child's broadcast
// payloads which is not right in speaking of columnar.
if (!enableColumnarBroadcastJoin) {
TransformHints.tagNotTransformable(
FallbackTags.add(
bhj,
"columnar BroadcastJoin is not enabled in BroadcastHashJoinExec")
} else {
@@ -149,7 +149,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
isBhjTransformable.tagOnFallback(bhj)
if (!isBhjTransformable.isValid) {
TransformHints.tagNotTransformable(exchange, isBhjTransformable)
FallbackTags.add(exchange, isBhjTransformable)
}
case None =>
// we are in AQE, find the hidden exchange
@@ -182,7 +182,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
// to conform to the underlying exchange's type, columnar or vanilla
exchange match {
case BroadcastExchangeExec(mode, child) =>
TransformHints.tagNotTransformable(
FallbackTags.add(
bhj,
"it's a materialized broadcast exchange or reused broadcast exchange")
case ColumnarBroadcastExchangeExec(mode, child) =>
@@ -199,7 +199,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl
}
} catch {
case e: UnsupportedOperationException =>
TransformHints.tagNotTransformable(
FallbackTags.add(
p,
s"${e.getMessage}, original Spark plan is " +
s"${p.getClass}(${p.children.toList.map(_.getClass)})")
@@ -177,10 +177,12 @@ object MetricsUtil extends Logging {

/** Get all processors */
def getAllProcessorList(metricData: MetricsData): Seq[MetricsProcessor] = {
metricData.steps.asScala.flatMap(
step => {
step.processors.asScala
})
metricData.steps.asScala
.flatMap(
step => {
step.processors.asScala
})
.toSeq
}

/** Update extra time metric by the processors */
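The extra .toSeq calls here and in the nearby files look like Scala 2.13 cross-building fixes: asScala produces a mutable Buffer, and under 2.13 the unqualified Seq type aliases immutable.Seq, so a method declared to return Seq[...] no longer accepts the Buffer directly. A small sketch of the difference, using only the standard library; the names are illustrative, not from this commit.

import scala.collection.JavaConverters._

object ToSeqCompatSketch {
  def processorNames(steps: java.util.List[String]): Seq[String] = {
    // steps.asScala is a mutable.Buffer. On Scala 2.12 a Buffer already conforms to Seq,
    // but on 2.13 Seq means immutable.Seq, so the explicit .toSeq copy is required.
    steps.asScala.map(_.toUpperCase).toSeq
  }

  def main(args: Array[String]): Unit = {
    println(processorNames(java.util.Arrays.asList("filter", "aggregating", "expression")))
  }
}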
@@ -127,7 +127,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
sparkSession
)
}
partitions
partitions.toSeq
}

def genInputPartitionSeq(
@@ -117,10 +117,12 @@ abstract class MergeTreeFileFormatDataWriter(
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
// committer.commitTask(taskAttemptContext)
val statuses = returnedMetrics.map(
v => {
v._2
})
val statuses = returnedMetrics
.map(
v => {
v._2
})
.toSeq
new TaskCommitMessage(statuses)
}

@@ -24,6 +24,7 @@ import org.apache.spark.sql.types.DoubleType
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

class GlutenClickHouseTPCHParquetAQEConcurrentSuite
extends GlutenClickHouseTPCHAbstractSuite
@@ -74,7 +75,7 @@ class GlutenClickHouseTPCHParquetAQEConcurrentSuite

test("fix race condition at the global variable of ColumnarOverrideRules::isAdaptiveContext") {

val queries = ((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22)).par
val queries = ParVector((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22): _*)
queries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(22))
queries.map(queryId => runTPCHQuery(queryId) { df => })

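Switching from .par to an explicit ParVector is likewise consistent with Scala 2.13 support: parallel collections moved to the separate scala-parallel-collections module there, and the .par extension method additionally needs an import of scala.collection.parallel.CollectionConverters._, whereas constructing a ParVector directly compiles unchanged on both versions. A rough sketch under that assumption; the pool size and element values are illustrative.

import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParVectorSketch {
  def main(args: Array[String]): Unit = {
    // Build the parallel collection explicitly instead of calling .par on the Range.
    val queries = ParVector((1 to 22) ++ (1 to 22): _*)
    queries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
    val results = queries.map(id => s"query $id finished")
    results.seq.foreach(println)
  }
}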
@@ -211,4 +211,19 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
spark.sql("drop table test")
}

test("intersect all") {
spark.sql("create table t1 (a int, b string) using parquet")
spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
spark.sql("create table t2 (a int, b string) using parquet")
spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
runQueryAndCompare(
"""
|SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
|""".stripMargin
)(df => checkFallbackOperators(df, 0))
spark.sql("drop table t1")
spark.sql("drop table t2")
}

}
@@ -713,4 +713,21 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
}

}

test("array functions with lambda") {
withTable("tb_array") {
sql("create table tb_array(ids array<int>) using parquet")
sql("""
|insert into tb_array values (array(1,5,2,null, 3)), (array(1,1,3,2)), (null), (array())
|""".stripMargin)
val transform_sql = "select transform(ids, x -> x + 1) from tb_array"
runQueryAndCompare(transform_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])

val filter_sql = "select filter(ids, x -> x % 2 == 1) from tb_array";
runQueryAndCompare(filter_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])

val aggregate_sql = "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array";
runQueryAndCompare(aggregate_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
}