Commit

Merge branch 'apache:main' into gayangya/assert_true

gaoyangxiaozhu authored Jul 5, 2024
2 parents c51e4d1 + f8e6b75 commit 4920289
Showing 36 changed files with 697 additions and 92 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/velox_docker.yml
@@ -42,6 +42,7 @@ on:
- 'dev/**'

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MVN_CMD: 'mvn -ntp'

concurrency:
@@ -532,7 +533,7 @@ jobs:
fail-fast: false
matrix:
spark: [ "spark-3.2" ]
celeborn: [ "celeborn-0.4.1", "celeborn-0.3.2-incubating" ]
celeborn: [ "celeborn-0.5.0", "celeborn-0.4.1", "celeborn-0.3.2-incubating" ]
runs-on: ubuntu-20.04
container: ubuntu:22.04
steps:
@@ -563,8 +564,10 @@ jobs:
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }}
run: |
EXTRA_PROFILE=""
if [ "${{ matrix.celeborn }}" = "celeborn-0.4.0" ]; then
if [ "${{ matrix.celeborn }}" = "celeborn-0.4.1" ]; then
EXTRA_PROFILE="-Pceleborn-0.4"
elif [ "${{ matrix.celeborn }}" = "celeborn-0.5.0" ]; then
EXTRA_PROFILE="-Pceleborn-0.5"
fi
echo "EXTRA_PROFILE: ${EXTRA_PROFILE}"
cd /opt && mkdir -p celeborn && \
@@ -26,7 +26,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, DenseRank, Expression, Lag, Lead, Literal, NamedExpression, Rank, RowNumber}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
@@ -237,7 +237,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

wExpression.windowFunction match {
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank =>
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: NTile =>
allSupported = allSupported
case l: Lag =>
checkLagOrLead(l.third)
@@ -704,7 +704,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val columnName = s"${aliasExpr.name}_${aliasExpr.exprId.id}"
val wExpression = aliasExpr.child.asInstanceOf[WindowExpression]
wExpression.windowFunction match {
case wf @ (RowNumber() | Rank(_) | DenseRank(_) | CumeDist() | PercentRank(_)) =>
case wf @ (RowNumber() | Rank(_) | DenseRank(_)) =>
val aggWindowFunc = wf.asInstanceOf[AggregateWindowFunction]
val frame = aggWindowFunc.frame.asInstanceOf[SpecifiedWindowFrame]
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
@@ -795,6 +795,22 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ NTile(buckets: Expression) =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
val literal = buckets.asInstanceOf[Literal]
childrenNodeList.add(LiteralTransformer(literal).doTransform(args))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, wf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(wf.dataType, wf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case _ =>
throw new GlutenNotSupportException(
"unsupported window function type: " +
@@ -860,6 +876,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

/** Transform array sort to Substrait. */
override def genArraySortTransformer(
substraitExprName: String,
argument: ExpressionTransformer,
function: ExpressionTransformer,
expr: ArraySort): ExpressionTransformer = {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
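For context (not part of the diff), the sketch below shows query shapes that would exercise the genArraySortTransformer override added above; the NTile offload added in the same file is exercised by the "window ntile" test further down in this diff. The SparkSession and column alias are illustrative assumptions; the only thing taken from the patch is that ArraySort (the SQL array_sort function, with or without an explicit comparator lambda) is now routed through GenericExpressionTransformer.

import org.apache.spark.sql.SparkSession

// Assumes an existing session with the Gluten ClickHouse backend enabled.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Default ordering: ArraySort is created with an implicit comparator function.
spark.sql("select array_sort(array(3, 1, 2)) as sorted").show()

// Explicit comparator: the `function` child handled by the transformer.
spark.sql(
  "select array_sort(array(3, 1, 2), " +
    "(l, r) -> case when l < r then -1 when l > r then 1 else 0 end) as sorted"
).show()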
@@ -199,6 +199,7 @@ class RangePartitionerBoundsGenerator[K: Ordering: ClassTag, V](
case d: DecimalType =>
val decimal = row.getDecimal(i, d.precision, d.scale).toString()
node.put("value", decimal)
case _: TimestampType => node.put("value", row.getLong(i))
case _ =>
throw new IllegalArgumentException(
s"Unsupported data type ${ordering.dataType.toString}")
@@ -244,6 +245,7 @@ object RangePartitionerBoundsGenerator {
case _: StringType => true
case _: DateType => true
case _: DecimalType => true
case _: TimestampType => true
case _ => false
}
}
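As a usage-level illustration (again not part of the diff), the sketch below shows the kind of job that now reaches the TimestampType branches added above: a range shuffle keyed on a timestamp column. The session, DataFrame, and column names are assumptions for the example; the suite test that follows covers the same path through SQL.

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Assumes an existing session with the Gluten ClickHouse backend enabled.
val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

val events = Seq(
  (1, Timestamp.valueOf("2015-07-22 10:01:40")),
  (2, Timestamp.valueOf("2014-12-31 05:29:06")),
  (3, Timestamp.valueOf("2012-02-29 23:01:40"))
).toDF("id", "ts")

// Both an explicit range repartition and an ORDER BY sample range bounds
// over the timestamp column, which the bounds generator can now serialize.
events.repartitionByRange(2, $"ts").show()
events.orderBy($"ts").show()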
@@ -162,4 +162,26 @@ class GlutenClickHouseDatetimeExpressionSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("support range partition by timestamp") {
import testImplicits._
val df = Seq(
(1, Timestamp.valueOf("2015-07-22 10:01:40.123456")),
(2, Timestamp.valueOf("2014-12-31 05:29:06.123456")),
(3, Timestamp.valueOf("2015-07-22 16:01:40.123456")),
(4, Timestamp.valueOf("2012-02-29 23:01:40.123456"))
).toDF("i", "t")

df.createOrReplaceTempView("test")

val sql =
s"""
| select
| /*+ repartition(2) */
| *
| from test
| order by t
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, compareResult = true, { _ => })
}
}
@@ -978,6 +978,22 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("window ntile") {
val sql =
"""
| select n_regionkey, n_nationkey,
| first_value(n_nationkey) over (partition by n_regionkey order by n_nationkey) as
| first_v,
| ntile(4) over (partition by n_regionkey order by n_nationkey) as ntile_v
| from
| (
| select n_regionkey, if(n_nationkey = 1, null, n_nationkey) as n_nationkey from nation
| ) as t
| order by n_regionkey, n_nationkey
""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("window first value with nulls") {
val sql =
"""
@@ -78,6 +78,8 @@ class VeloxListenerApi extends ListenerApi {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
@@ -94,7 +96,7 @@
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 3.2, RedHat 7/8/9, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}
@@ -1911,4 +1911,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
val resultLength = df.collect().length
assert(resultLength > 25000 && resultLength < 35000)
}

test("Deduplicate sorting keys") {
runQueryAndCompare("select * from lineitem order by l_orderkey, l_orderkey") {
checkGlutenOperatorMatch[SortExecTransformer]
}
}
}
@@ -1135,6 +1135,32 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
df.select(max(col("txn"))).collect

}

test("drop redundant partial sort which has pre-project when offload sortAgg") {
// Spark 3.2 does not have this configuration; it is ignored there and does not affect the test result.
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
withTempView("t1") {
Seq((-1, 2), (-1, 3), (2, 3), (3, 4), (-3, 5), (4, 5))
.toDF("c1", "c2")
.createOrReplaceTempView("t1")
runQueryAndCompare("select c2, sum(if(c1<0,0,c1)) from t1 group by c2") {
df =>
{
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[HashAggregateExecTransformer]
}) == 2)
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[SortExecTransformer]
}) == 0)
}
}
}
}
}
}

class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite {
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240703
CH_COMMIT=aa71be074ad
CH_BRANCH=rebase_ch/20240704
CH_COMMIT=f617655ccea
