Merge branch 'main' into upgrade_folly
boneanxs authored Jul 8, 2024
2 parents 988e09c + 97d0f18 commit 0c12d48
Showing 56 changed files with 895 additions and 151 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build_bundle_package.yml
@@ -15,6 +15,9 @@

name: Build bundle package

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
cancel-in-progress: true
1 change: 1 addition & 0 deletions .github/workflows/velox_docker.yml
@@ -42,6 +42,7 @@ on:
- 'dev/**'

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MVN_CMD: 'mvn -ntp'

concurrency:
@@ -26,7 +26,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, DenseRank, Expression, Lag, Lead, Literal, NamedExpression, Rank, RowNumber}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
@@ -237,7 +237,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

wExpression.windowFunction match {
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank =>
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: NTile =>
allSupported = allSupported
case l: Lag =>
checkLagOrLead(l.third)
@@ -704,7 +704,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val columnName = s"${aliasExpr.name}_${aliasExpr.exprId.id}"
val wExpression = aliasExpr.child.asInstanceOf[WindowExpression]
wExpression.windowFunction match {
case wf @ (RowNumber() | Rank(_) | DenseRank(_) | CumeDist() | PercentRank(_)) =>
case wf @ (RowNumber() | Rank(_) | DenseRank(_)) =>
val aggWindowFunc = wf.asInstanceOf[AggregateWindowFunction]
val frame = aggWindowFunc.frame.asInstanceOf[SpecifiedWindowFrame]
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
@@ -795,6 +795,22 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ NTile(buckets: Expression) =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
val literal = buckets.asInstanceOf[Literal]
childrenNodeList.add(LiteralTransformer(literal).doTransform(args))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, wf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(wf.dataType, wf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case _ =>
throw new GlutenNotSupportException(
"unsupported window function type: " +
@@ -860,6 +876,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

/** Transform array sort to Substrait. */
override def genArraySortTransformer(
substraitExprName: String,
argument: ExpressionTransformer,
function: ExpressionTransformer,
expr: ArraySort): ExpressionTransformer = {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}

override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
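For reference, a minimal self-contained sketch (plain Spark APIs, not part of this commit; object name and data are illustrative) of the ntile semantics that the NTile branch added in this file offloads: each partition's rows, ordered by the ORDER BY key, are split into n near-equal buckets, with any remainder rows going to the lowest-numbered buckets.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.ntile

object NtileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ntile-sketch").getOrCreate()
    import spark.implicits._

    // Two partitions: regionkey 0 has 5 rows, regionkey 1 has 2 rows.
    val df = Seq((0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (1, 6), (1, 7))
      .toDF("n_regionkey", "n_nationkey")
    val w = Window.partitionBy("n_regionkey").orderBy("n_nationkey")

    // ntile(4): the 5-row partition yields buckets 1,1,2,3,4 (the extra row
    // lands in bucket 1); the 2-row partition yields buckets 1,2.
    df.withColumn("ntile_v", ntile(4).over(w)).show()
    spark.stop()
  }
}

The "window ntile" test added later in this commit exercises the same function over the nation table and compares against vanilla Spark.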
@@ -199,6 +199,7 @@ class RangePartitionerBoundsGenerator[K: Ordering: ClassTag, V](
case d: DecimalType =>
val decimal = row.getDecimal(i, d.precision, d.scale).toString()
node.put("value", decimal)
case _: TimestampType => node.put("value", row.getLong(i))
case _ =>
throw new IllegalArgumentException(
s"Unsupported data type ${ordering.dataType.toString}")
@@ -244,6 +245,7 @@ object RangePartitionerBoundsGenerator {
case _: StringType => true
case _: DateType => true
case _: DecimalType => true
case _: TimestampType => true
case _ => false
}
}
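As a side note, a small sketch (plain Spark catalyst utilities, not part of this commit; the object name and sample value are illustrative) of why row.getLong(i) is enough to serialize a TimestampType bound above: Spark stores TimestampType values internally as a Long holding microseconds since the epoch.

import java.sql.Timestamp
import org.apache.spark.sql.catalyst.util.DateTimeUtils

object TimestampBoundSketch {
  def main(args: Array[String]): Unit = {
    val ts = Timestamp.valueOf("2015-07-22 10:01:40.123456")
    // fromJavaTimestamp returns the internal representation: microseconds since
    // the epoch, i.e. the same Long that row.getLong(i) reads for a
    // TimestampType ordering column.
    val micros: Long = DateTimeUtils.fromJavaTimestamp(ts)
    println(micros)
  }
}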
@@ -1,4 +1,5 @@
{"a":1,"b":2,"c":3}
{"a":"a5", "B":"b6", "c":7}
{"a":"4"}
{"t":{"ta":"cc","tb":100,"tc":1.234}}
{"t":{"ta":"cc","tb":100,"td":"ignore"}}
@@ -162,4 +162,26 @@ class GlutenClickHouseDatetimeExpressionSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("support range partition by timestamp") {
import testImplicits._
val df = Seq(
(1, Timestamp.valueOf("2015-07-22 10:01:40.123456")),
(2, Timestamp.valueOf("2014-12-31 05:29:06.123456")),
(3, Timestamp.valueOf("2015-07-22 16:01:40.123456")),
(4, Timestamp.valueOf("2012-02-29 23:01:40.123456"))
).toDF("i", "t")

df.createOrReplaceTempView("test")

val sql =
s"""
| select
| /** repartition(2) */
| *
| from test
| order by t
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, compareResult = true, { _ => })
}
}
@@ -978,6 +978,22 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("window ntile") {
val sql =
"""
| select n_regionkey, n_nationkey,
| first_value(n_nationkey) over (partition by n_regionkey order by n_nationkey) as
| first_v,
| ntile(4) over (partition by n_regionkey order by n_nationkey) as ntile_v
| from
| (
| select n_regionkey, if(n_nationkey = 1, null, n_nationkey) as n_nationkey from nation
| ) as t
| order by n_regionkey, n_nationkey
""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("window first value with nulls") {
val sql =
"""
@@ -491,7 +491,7 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def supportTransformWriteFiles: Boolean = true

override def allowDecimalArithmetic: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss
override def allowDecimalArithmetic: Boolean = true

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(
@@ -78,6 +78,8 @@ class VeloxListenerApi extends ListenerApi {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
@@ -94,7 +96,7 @@
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 3.2, RedHat 7/8/9, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
@@ -56,9 +57,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
val numRows = GlutenConfig.getConf.maxBatchSize
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = schema
@@ -78,9 +77,7 @@
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
val numRows = GlutenConfig.getConf.maxBatchSize
val mode = BroadcastUtils.getBroadcastMode(outputPartitioning)
val relation = child.executeBroadcast()
BroadcastUtils.sparkToVeloxUnsafe(
@@ -26,10 +26,10 @@ import org.apache.arrow.vector.types.pojo.Schema

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator)
class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
extends Iterator[ColumnarBatch]
with AutoCloseable {
private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](64)
private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
private var currentBatch: Option[ColumnarBatch] = None

def enqueue(batch: ColumnarBatch): Unit = {
@@ -40,9 +40,10 @@ class VeloxWriteQueue(
schema: Schema,
allocator: BufferAllocator,
datasourceJniWrapper: DatasourceJniWrapper,
outputFileURI: String)
outputFileURI: String,
queueSize: Int)
extends AutoCloseable {
private val scanner = new VeloxColumnarBatchIterator(schema, allocator)
private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
private val writeException = new AtomicReference[Throwable]

private val writeThread = new Thread(
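For background, a minimal standalone sketch (ordinary JDK concurrency, unrelated to Gluten's classes; names and values are illustrative) of the bounded producer/consumer pattern the write queue builds on: ArrayBlockingQueue.put blocks the producer once queueSize elements are pending, so the new queueSize parameter directly caps how many batches can be buffered ahead of the writer thread.

import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  def main(args: Array[String]): Unit = {
    val queueSize = 4
    val queue = new ArrayBlockingQueue[Int](queueSize)

    // Consumer thread: drain until it sees the -1 end-of-stream marker.
    val consumer = new Thread(() => {
      var v = queue.take()
      while (v != -1) {
        println(s"consumed $v")
        v = queue.take()
      }
    })
    consumer.start()

    // Producer: put() blocks whenever queueSize items are already pending,
    // which is how a small queue size throttles the producer side.
    (0 until 16).foreach(i => queue.put(i))
    queue.put(-1)
    consumer.join()
  }
}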
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.exception.GlutenException
@@ -73,14 +74,18 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
cSchema.close()
}

// FIXME: remove this once we support push-based write.
val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)

val writeQueue =
new VeloxWriteQueue(
TaskResources.getLocalTaskContext(),
dsHandle,
arrowSchema,
allocator,
datasourceJniWrapper,
filePath)
filePath,
queueSize)

new OutputWriter {
override def write(row: InternalRow): Unit = {
@@ -1911,4 +1911,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
val resultLength = df.collect().length
assert(resultLength > 25000 && resultLength < 35000)
}

test("Deduplicate sorting keys") {
runQueryAndCompare("select * from lineitem order by l_orderkey, l_orderkey") {
checkGlutenOperatorMatch[SortExecTransformer]
}
}
}
@@ -1135,6 +1135,32 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
df.select(max(col("txn"))).collect

}

test("drop redundant partial sort which has pre-project when offload sortAgg") {
// Spark 3.2 does not have this configuration, but it does not affect the test results.
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
withTempView("t1") {
Seq((-1, 2), (-1, 3), (2, 3), (3, 4), (-3, 5), (4, 5))
.toDF("c1", "c2")
.createOrReplaceTempView("t1")
runQueryAndCompare("select c2, sum(if(c1<0,0,c1)) from t1 group by c2") {
df =>
{
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[HashAggregateExecTransformer]
}) == 2)
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[SortExecTransformer]
}) == 0)
}
}
}
}
}
}

class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite {
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240704
CH_COMMIT=f617655ccea
CH_BRANCH=rebase_ch/20240706
CH_COMMIT=25bf31bfbdf

1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -747,6 +747,7 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
settings.set("input_format_parquet_import_nested", true);
settings.set("input_format_json_read_numbers_as_strings", true);
settings.set("input_format_json_read_bools_as_numbers", false);
settings.set("input_format_json_ignore_key_case", true);
settings.set("input_format_csv_trim_whitespaces", false);
settings.set("input_format_csv_allow_cr_end_of_line", true);
settings.set("output_format_orc_string_as_string", true);