Commit

Merge branch 'main' into libhdfs-replace
zhouyuan authored Oct 15, 2024
2 parents ae803f7 + 74c6641 commit e78faca
Showing 101 changed files with 2,548 additions and 1,235 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check_license.yml
@@ -29,6 +29,6 @@ jobs:
- name: Check License Header
run: |
git fetch --recurse-submodules=no origin main ${{github.event.pull_request.base.sha}}
- pip install regex
+ pip install regex --break-system-packages
cd $GITHUB_WORKSPACE/
./.github/workflows/util/check.sh ${{github.event.pull_request.base.sha}}
23 changes: 11 additions & 12 deletions .github/workflows/velox_backend.yml
@@ -332,8 +332,7 @@ jobs:
cd $GITHUB_WORKSPACE/tools/gluten-it
$MVN_CMD clean install -P${{ matrix.spark }}
GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh data-gen-only --local --benchmark-type=ds -s=30.0 --threads=12
- - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory, memory isolation off
-   continue-on-error: true
+ - name: TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory, memory isolation off
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
@@ -573,7 +572,7 @@ jobs:
run-spark-test-spark32:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -628,7 +627,7 @@ jobs:
run-spark-test-spark32-slow:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -678,7 +677,7 @@ jobs:
run-spark-test-spark33:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -736,7 +735,7 @@ jobs:
run-spark-test-spark33-slow:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -787,7 +786,7 @@ jobs:
run-spark-test-spark34:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -845,7 +844,7 @@ jobs:
run-spark-test-spark34-slow:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -896,7 +895,7 @@ jobs:
run-spark-test-spark35:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -953,7 +952,7 @@ jobs:
run-spark-test-spark35-scala213:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -1010,7 +1009,7 @@ jobs:
run-spark-test-spark35-slow:
needs: build-native-lib-centos-7
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
@@ -1060,7 +1059,7 @@ jobs:

run-cpp-test-udf-test:
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
steps:
- uses: actions/checkout@v2
- name: Generate cache key
2 changes: 1 addition & 1 deletion .github/workflows/velox_backend_cache.yml
@@ -60,7 +60,7 @@ jobs:

cache-native-lib-centos-8:
runs-on: ubuntu-20.04
- container: ghcr.io/facebookincubator/velox-dev:centos8
+ container: apache/gluten:centos-8
steps:
- uses: actions/checkout@v2
- name: Generate cache key
@@ -165,7 +165,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
)

- override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
+ override def genFilterTransformerMetricsUpdater(
+     metrics: Map[String, SQLMetric],
+     extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new FilterMetricsUpdater(metrics)

override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -182,7 +184,9 @@
)

override def genProjectTransformerMetricsUpdater(
- metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+     metrics: Map[String, SQLMetric],
+     extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+   new ProjectMetricsUpdater(metrics)

override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -30,7 +30,9 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.types._

import com.google.protobuf.{Any, StringValue}
@@ -53,6 +55,19 @@ object CHHashAggregateExecTransformer {
def newStructFieldId(): Long = curId.getAndIncrement()
}

/**
 * About aggregate modes. In general, all the aggregate expressions in the same HashAggregateExec
 * share the same mode, and the aggregation is split into two stages: a partial aggregation and a
 * final merge aggregation. But there are some exceptions.
 *   - f(distinct x). This is split into four stages (before any stages are merged by
 *     `MergeTwoPhasesHashBaseAggregate`). The first two stages use `x` as a grouping key and have
 *     no aggregate functions. The last two stages drop `x` from the grouping keys and apply the
 *     aggregate function `f`.
 *   - f1(distinct x), f2(y). This is also split into four stages. The first two stages use `x` as
 *     a grouping key and compute the partial aggregate function `f2`. The last two stages drop
 *     `x` from the grouping keys and apply the aggregate functions `f1` and `f2`. The 3rd stage
 *     has different modes at the same time: `f2` is in partial-merge mode while `f1` is in
 *     partial mode.
 */
case class CHHashAggregateExecTransformer(
requiredChildDistributionExpressions: Option[Seq[Expression]],
groupingExpressions: Seq[NamedExpression],
@@ -402,13 +417,59 @@
} else {
null
}
-   val optimizationContent = s"has_required_child_distribution_expressions=" +
-     s"${requiredChildDistributionExpressions.isDefined}\n"
+   val parametersStrBuf = new StringBuffer("AggregateParams:")
+   parametersStrBuf
+     .append(s"hasPrePartialAggregate=$hasPrePartialAggregate")
+     .append("\n")
+     .append(s"hasRequiredChildDistributionExpressions=" +
+       s"${requiredChildDistributionExpressions.isDefined}")
+     .append("\n")
val optimization =
  BackendsApiManager.getTransformerApiInstance.packPBMessage(
-     StringValue.newBuilder.setValue(optimizationContent).build)
+     StringValue.newBuilder.setValue(parametersStrBuf.toString).build)
ExtensionBuilder.makeAdvancedExtension(optimization, enhancement)
}

// Checks whether there is a partial aggregation ahead of this HashAggregateExec.
// This is useful when the aggregate expressions are empty.
private def hasPrePartialAggregate(): Boolean = {
def isSameAggregation(agg1: BaseAggregateExec, agg2: BaseAggregateExec): Boolean = {
val res = agg1.groupingExpressions.length == agg2.groupingExpressions.length &&
agg1.groupingExpressions.zip(agg2.groupingExpressions).forall {
case (e1, e2) =>
e1.toAttribute == e2.toAttribute
}
res
}

def checkChild(exec: SparkPlan): Boolean = {
exec match {
case agg: BaseAggregateExec =>
isSameAggregation(this, agg)
case shuffle: ShuffleExchangeLike =>
checkChild(shuffle.child)
case iter: InputIteratorTransformer =>
checkChild(iter.child)
case inputAdapter: ColumnarInputAdapter =>
checkChild(inputAdapter.child)
case wholeStage: WholeStageTransformer =>
checkChild(wholeStage.child)
case aqeShuffleRead: AQEShuffleReadExec =>
checkChild(aqeShuffleRead.child)
case shuffle: ShuffleQueryStageExec =>
checkChild(shuffle.plan)
case _ =>
false
}
}

// It's more complex when the aggregate expressions are empty: we need to walk the plan
// to find whether there is a partial aggregation ahead. `count(distinct x)` is one such case.
if (aggregateExpressions.length > 0) {
modes.exists(mode => mode == PartialMerge || mode == Final)
} else {
checkChild(child)
}
}
}
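The staging described in the class comment above can be observed with a plain Spark query. The following is a minimal, self-contained sketch (the object name, local session, and inline data are illustrative assumptions, not part of this commit); the stage breakdown in the comments reflects vanilla Spark's physical planning for a single distinct aggregate.

```scala
import org.apache.spark.sql.SparkSession

object DistinctAggregateStagesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("distinct-agg-stages-sketch")
      .getOrCreate()
    import spark.implicits._

    // Tiny inline dataset; column b is nullable so COUNT(DISTINCT b) is non-trivial.
    Seq((0, Option.empty[Int], 1), (1, Option(1), 1), (2, Option(2), 1), (2, Option(2), 2))
      .toDF("a", "b", "c")
      .createOrReplaceTempView("data")

    // f1(distinct b) together with f2(c), grouped by a. Vanilla Spark plans this as
    // four aggregate stages (AggUtils.planAggregateWithOneDistinct):
    //   stage 1: Partial       -- grouping keys (a, b), partial sum(c)
    //   stage 2: PartialMerge  -- grouping keys (a, b), merged sum(c)
    //   stage 3: mixed modes   -- grouping key a, Partial count(b) and PartialMerge sum(c)
    //   stage 4: Final         -- grouping key a, final count(b) and sum(c)
    spark.sql("SELECT a, COUNT(DISTINCT b), SUM(c) FROM data GROUP BY a").explain()

    spark.stop()
  }
}
```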

@@ -18,6 +18,7 @@ package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,7 +34,9 @@ import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregat
* Note: this rule must be applied before the `PullOutPreProject` rule, because the
* `PullOutPreProject` rule will modify the attributes in some cases.
*/
- case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[SparkPlan] {
+ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession)
+   extends Rule[SparkPlan]
+   with Logging {

val columnarConf: GlutenConfig = GlutenConfig.getConf
val scanOnly: Boolean = columnarConf.enableScanOnly
@@ -73,7 +76,6 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S
// convert to complete mode aggregate expressions
val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
hashAgg.copy(
- requiredChildDistributionExpressions = None,
groupingExpressions = child.groupingExpressions,
aggregateExpressions = completeAggregateExpressions,
initialInputBufferOffset = 0,
@@ -77,10 +77,12 @@ object HashAggregateMetricsUpdater {
"AggregatingTransform",
"StreamingAggregatingTransform",
"MergingAggregatedTransform",
"GraceAggregatingTransform",
"GraceMergingAggregatedTransform")
val CH_PLAN_NODE_NAME = Array(
"AggregatingTransform",
"StreamingAggregatingTransform",
"MergingAggregatedTransform",
"GraceAggregatingTransform",
"GraceMergingAggregatedTransform")
}
@@ -69,6 +69,12 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTrans
}
}

test("test distinct with not-distinct") {
val sql = "select a, count(distinct(b)), sum(c) from " +
"values (0, null,1), (0,null,1), (1, 1,1), (2, 2, 1) ,(2,2,2) as data(a,b,c) group by a"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("check all data types") {
spark.createDataFrame(genTestData()).createOrReplaceTempView("all_data_types")

@@ -818,4 +818,14 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
""".stripMargin
runQueryAndCompare(sql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}

test("GLUTEN-7432 get_json_object returns array") {
val sql = """
|select
|get_json_object(a, '$.a[*].x')
|from values('{"a":[{"x":1}, {"x":5}]}'), ('{"a":[{"x":1}]}')
|as data(a)
""".stripMargin
runQueryAndCompare(sql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -255,4 +255,18 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
}
}

test("GLUTEN-7389: cast map to string diff with spark") {
withTable("test_7389") {
sql("create table test_7389(a map<string, int>) using parquet")
sql("insert into test_7389 values(map('a', 1, 'b', 2))")
compareResultsAgainstVanillaSpark(
"""
|select cast(a as string) from test_7389
|""".stripMargin,
true,
{ _ => }
)
}
}

}
@@ -175,8 +175,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}

override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
-   val transKernel = NativePlanEvaluator.create()
-   transKernel.injectWriteFilesTempPath(path)
+   NativePlanEvaluator.injectWriteFilesTempPath(path)
}

/** Generate Iterator[ColumnarBatch] for first stage. */
@@ -175,8 +175,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of memory allocations")
)

- override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
-   new FilterMetricsUpdater(metrics)
+ override def genFilterTransformerMetricsUpdater(
+     metrics: Map[String, SQLMetric],
+     extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+   new FilterMetricsUpdater(metrics, extraMetrics)

override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
@@ -192,7 +194,9 @@
)

override def genProjectTransformerMetricsUpdater(
- metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+     metrics: Map[String, SQLMetric],
+     extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+   new ProjectMetricsUpdater(metrics, extraMetrics)
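The widened updater signatures above add an `extraMetrics` parameter with a `Seq.empty` default, which keeps existing call sites source-compatible. Below is a minimal sketch of that pattern; the trait, method, and metric names are hypothetical stand-ins, not Gluten's real MetricsApi or MetricsUpdater types.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical stand-ins, only to illustrate the default-argument compatibility
// of the widened signature; not Gluten classes.
trait UpdaterFactory {
  def genFilterUpdater(
      metrics: Map[String, SQLMetric],
      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): Seq[(String, SQLMetric)]
}

object UpdaterFactorySketch extends UpdaterFactory {
  override def genFilterUpdater(
      metrics: Map[String, SQLMetric],
      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): Seq[(String, SQLMetric)] =
    metrics.toSeq ++ extraMetrics // a real updater would wrap these in a MetricsUpdater

  def demo(sc: SparkContext): Unit = {
    val base = Map("numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"))
    val extra = Seq("extraFilterMetric" -> SQLMetrics.createMetric(sc, "extra filter metric"))
    genFilterUpdater(base)                       // pre-existing call sites compile unchanged
    genFilterUpdater(base, extraMetrics = extra) // new call sites can thread extra metrics
  }
}
```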

override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -16,30 +16,11 @@
*/
package org.apache.gluten.execution

- import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.SparkPlan

case class FilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {
-   // FIXME: Should use field "condition" to store the actual executed filter expressions.
-   // To make optimization easier (like to remove filter when it actually does nothing)
-   override protected def getRemainingCondition: Expression = {
-     val scanFilters = child match {
-       // Get the filters including the manually pushed down ones.
-       case basicScanExecTransformer: BasicScanExecTransformer =>
-         basicScanExecTransformer.filterExprs()
-       // For fallback scan, we need to keep original filter.
-       case _ =>
-         Seq.empty[Expression]
-     }
-     if (scanFilters.isEmpty) {
-       condition
-     } else {
-       val remainingFilters =
-         FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-       remainingFilters.reduceLeftOption(And).orNull
-     }
-   }

override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
copy(child = newChild)
