diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml index b0744c1ed717..a4fd3e440365 100644 --- a/.github/workflows/velox_backend.yml +++ b/.github/workflows/velox_backend.yml @@ -346,16 +346,16 @@ jobs: -d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \ -d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \ -d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 - - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory - run: | - cd tools/gluten-it \ - && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \ - --local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \ - --data-gen=skip -m=OffHeapExecutionMemory \ - -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \ - -d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \ - -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \ - -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true + # - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory + # run: | + # cd tools/gluten-it \ + # && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \ + # --local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \ + # --data-gen=skip -m=OffHeapExecutionMemory \ + # -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \ + # -d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \ + # -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \ + # -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true run-tpc-test-ubuntu-randomkill: needs: build-native-lib-centos-7 diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java index a06c552a9f6b..119dc61893d9 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java @@ -17,8 +17,8 @@ package org.apache.gluten.memory; import org.apache.spark.TaskContext; -import org.apache.spark.util.TaskResource; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResource; +import org.apache.spark.task.TaskResources; public class CHThreadGroup implements TaskResource { diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java index 1c4c1302d2ae..d53ff1718882 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java @@ -47,7 +47,8 @@ private static native long nativeBuild( int joinType, boolean hasMixedFiltCondition, boolean isExistenceJoin, - byte[] namedStruct); + byte[] namedStruct, + boolean isNullAwareAntiJoin); private StorageJoinBuilder() {} @@ -94,7 +95,8 @@ public static long build( joinType, broadCastContext.hasMixedFiltCondition(), broadCastContext.isExistenceJoin(), - toNameStruct(output).toByteArray()); + toNameStruct(output).toByteArray(), + broadCastContext.isNullAwareAntiJoin()); } /** create table named struct */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala index 9b6b2958ccc7..41def4d42c4e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala @@ -193,7 +193,8 @@ case class BroadCastHashJoinContext( hasMixedFiltCondition: Boolean, isExistenceJoin: Boolean, buildSideStructure: Seq[Attribute], - buildHashTableId: String) + buildHashTableId: String, + isNullAwareAntiJoin: Boolean = false) case class CHBroadcastHashJoinExecTransformer( leftKeys: Seq[Expression], @@ -230,9 +231,6 @@ case class CHBroadcastHashJoinExecTransformer( if (shouldFallback) { return ValidationResult.failed("ch join validate fail") } - if (isNullAwareAntiJoin) { - return ValidationResult.failed("ch does not support NAAJ") - } super.doValidateInternal() } @@ -256,7 +254,9 @@ case class CHBroadcastHashJoinExecTransformer( isMixedCondition(condition), joinType.isInstanceOf[ExistenceJoin], buildPlan.output, - buildHashTableId) + buildHashTableId, + isNullAwareAntiJoin + ) val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context) // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala index 4c49cc2d9f46..1b79e296595f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala @@ -99,7 +99,7 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala index e5da78de3fd6..d70c019c0cca 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala @@ -137,7 +137,7 @@ class GlutenClickHouseColumnarShuffleAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala index 17bd9912b032..fef884a07b51 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala @@ -124,7 +124,7 @@ class GlutenClickHouseDSV2ColumnarShuffleSuite extends GlutenClickHouseTPCHAbstr } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala index a58b6b1c1292..9c71d6f34f3b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala @@ -106,7 +106,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala index bd831e64bf38..40f442bc2948 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala @@ -343,22 +343,18 @@ class GlutenClickHouseDecimalSuite decimalTPCHTables.foreach { dt => { - val fallBack = (sql_num == 16) val compareResult = !dt._2.contains(sql_num) - val native = if (fallBack) "fallback" else "native" val compare = if (compareResult) "compare" else "noCompare" val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss" val decimalType = dt._1 test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale}) - | Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) { + | Q$sql_num[$PrecisionLoss,native,$compare]""".stripMargin) { spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}") withSQLConf( (SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) { - runTPCHQuery( - sql_num, - tpchQueries, - compareResult = compareResult, - noFallBack = !fallBack) { _ => {} } + runTPCHQuery(sql_num, tpchQueries, compareResult = compareResult) { + _ => {} + } } spark.sql(s"use default") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala index 5f9aa0dbda60..d6cf33a43420 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala @@ -147,7 +147,7 @@ class GlutenClickHouseTPCHNullableColumnarShuffleSuite extends GlutenClickHouseT } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala index e0e4d3380450..d5e1156ba9d2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala @@ -150,7 +150,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index f25a1313255f..93c13d15c6ce 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -151,7 +151,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala index 79017eab0d51..3e3899b0e115 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala @@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.InputIteratorTransformer -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import scala.collection.JavaConverters._ diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 509967125a64..a5d81b781b32 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.InputIteratorTransformer -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import scala.collection.JavaConverters._ diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 39dc7baf9695..28b874e21bac 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -251,7 +251,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala index 2aadac05d348..5f9f01fb697a 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala @@ -185,7 +185,7 @@ class GlutenClickHouseTPCHParquetAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index e21df203dac0..1db37e00f946 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -311,7 +311,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr // see issue https://github.com/Kyligence/ClickHouse/issues/93 test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { @@ -2797,5 +2797,144 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr |""".stripMargin compareResultsAgainstVanillaSpark(sql, true, { _ => }) } + + test("GLUTEN-341: Support BHJ + isNullAwareAntiJoin for the CH backend") { + def checkBHJWithIsNullAwareAntiJoin(df: DataFrame): Unit = { + val bhjs = df.queryExecution.executedPlan.collect { + case bhj: CHBroadcastHashJoinExecTransformer if bhj.isNullAwareAntiJoin => true + } + assert(bhjs.size == 1) + } + + val sql = + s""" + |SELECT + | p_brand, + | p_type, + | p_size, + | count(DISTINCT ps_suppkey) AS supplier_cnt + |FROM + | partsupp, + | part + |WHERE + | p_partkey = ps_partkey + | AND p_brand <> 'Brand#45' + | AND p_type NOT LIKE 'MEDIUM POLISHED%' + | AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) + | AND ps_suppkey NOT IN ( + | SELECT + | s_suppkey + | FROM + | supplier + | WHERE + | s_comment is null) + |GROUP BY + | p_brand, + | p_type, + | p_size + |ORDER BY + | supplier_cnt DESC, + | p_brand, + | p_type, + | p_size; + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql1 = + s""" + |SELECT + | p_brand, + | p_type, + | p_size, + | count(DISTINCT ps_suppkey) AS supplier_cnt + |FROM + | partsupp, + | part + |WHERE + | p_partkey = ps_partkey + | AND p_brand <> 'Brand#45' + | AND p_type NOT LIKE 'MEDIUM POLISHED%' + | AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) + | AND ps_suppkey NOT IN ( + | SELECT + | s_suppkey + | FROM + | supplier + | WHERE + | s_comment LIKE '%Customer%Complaints11%') + |GROUP BY + | p_brand, + | p_type, + | p_size + |ORDER BY + | supplier_cnt DESC, + | p_brand, + | p_type, + | p_size; + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql1, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql2 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (null) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql2, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql3 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50) sub(suppkey) WHERE suppkey > 100) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql3, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql4 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (60) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql4, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql5 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql5, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + } } // scalastyle:on line.size.limit diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala index 37d8acc7f2b7..7cc4f5e8100a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.types._ -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import org.apache.spark.util.collection.BitSet import com.google.protobuf.{Any, Message} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 131951a96fc8..e15270f47304 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types._ -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources class VeloxValidatorApi extends ValidatorApi { diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala index 4af5022a6252..c930cebebe69 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.{SerializableConfiguration, TaskResources} +import org.apache.spark.task.TaskResources +import org.apache.spark.util.SerializableConfiguration import org.apache.arrow.c.ArrowSchema import org.apache.arrow.vector.types.pojo.Schema diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala index 02485975e705..3eaf4e35fd21 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import org.apache.hadoop.fs.FileStatus diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index aa30cc80d4db..542c325f095e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -34,8 +34,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.task.TaskResources import org.apache.spark.unsafe.Platform -import org.apache.spark.util.TaskResources import org.apache.arrow.c.ArrowSchema import org.apache.arrow.memory.ArrowBuf diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala index 8c2a7e492a49..05e1e3eb481f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper import org.apache.spark.sql.types.DataType -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources /** * Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala index d22cc7023abf..976abb9e21fb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggreg import org.apache.spark.sql.catalyst.trees.TernaryLike import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import org.apache.spark.util.sketch.BloomFilter /** diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala index 29a12f532bea..5bc6b7c56da5 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode, LongHashedRelation} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import scala.collection.mutable.ArrayBuffer; diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java index 54994ccd4836..819d35a100c3 100644 --- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java +++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java @@ -24,7 +24,7 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.util.TaskResources$; +import org.apache.spark.task.TaskResources$; import org.junit.Assert; import org.junit.Test; diff --git a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java index cf568b166582..86b14fa46496 100644 --- a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java +++ b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java @@ -18,7 +18,7 @@ import org.apache.gluten.test.VeloxBackendTestBase; -import org.apache.spark.util.TaskResources$; +import org.apache.spark.task.TaskResources$; import org.apache.spark.util.sketch.BloomFilter; import org.apache.spark.util.sketch.IncompatibleMergeException; import org.junit.Assert; diff --git a/backends-velox/src/test/java/org/apache/gluten/vectorized/ArrowColumnVectorTest.java b/backends-velox/src/test/java/org/apache/gluten/vectorized/ArrowColumnVectorTest.java index 11330544df78..c7c13450ce88 100644 --- a/backends-velox/src/test/java/org/apache/gluten/vectorized/ArrowColumnVectorTest.java +++ b/backends-velox/src/test/java/org/apache/gluten/vectorized/ArrowColumnVectorTest.java @@ -19,7 +19,7 @@ import org.apache.spark.sql.execution.vectorized.MutableColumnarRow; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.StructType; -import org.apache.spark.util.TaskResources$; +import org.apache.spark.task.TaskResources$; import org.junit.Assert; import org.junit.Test; diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp index da301dcb89f8..0b88137b328b 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp @@ -115,7 +115,8 @@ std::shared_ptr buildJoin( jint join_type, bool has_mixed_join_condition, bool is_existence_join, - const std::string & named_struct) + const std::string & named_struct, + bool is_null_aware_anti_join) { auto join_key_list = Poco::StringTokenizer(join_keys, ","); Names key_names; @@ -191,7 +192,8 @@ std::shared_ptr buildJoin( columns_description, ConstraintsDescription(), key, - true); + true, + is_null_aware_anti_join); } void init(JNIEnv * env) diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h index a97bd77a84d0..d089d7420a8a 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h @@ -38,7 +38,8 @@ std::shared_ptr buildJoin( jint join_type, bool has_mixed_join_condition, bool is_existence_join, - const std::string & named_struct); + const std::string & named_struct, + bool is_null_aware_anti_join); void cleanBuildHashTable(const std::string & hash_table_id, jlong instance); std::shared_ptr getJoin(const std::string & hash_table_id); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp index 1d87b5f57a09..90baa754f31b 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -72,8 +73,9 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer( const ColumnsDescription & columns, const ConstraintsDescription & constraints, const String & comment, - const bool overwrite_) - : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_) + const bool overwrite_, + bool is_null_aware_anti_join_) + : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_) { storage_metadata.setColumns(columns); storage_metadata.setConstraints(constraints); @@ -104,8 +106,36 @@ void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header, std auto build_join = [&] { join = std::make_shared(analyzed_join, header, overwrite, row_count); - for (Block block : data) - join->addBlockToJoin(std::move(block), true); + // only when is_null_aware_anti_join is true, it needs to check whether is null key value exists + if (is_null_aware_anti_join) + { + is_empty_hash_table = data.empty(); + size_t total_size = 0; + for (Block block : data) + { + for (size_t i = 0; i < block.columns(); ++i) + { + const auto & column = block.getByPosition(i); + if (column.name == key_names.at(0)) + { + if (const auto * nullable = checkAndGetColumn(column.column.get())) + { + const auto & null_map_data = nullable->getNullMapData(); + // check whether there is null key value + has_null_key_value = !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()); + } + } + } + total_size += block.rows(); + join->addBlockToJoin(std::move(block), true); + } + is_empty_hash_table = (total_size < 1); + } + else + { + for (Block block : data) + join->addBlockToJoin(std::move(block), true); + } }; /// Record memory usage in Total Memory Tracker ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h index d9766c4392f4..8c9416b54457 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h @@ -46,7 +46,11 @@ class StorageJoinFromReadBuffer const DB::ColumnsDescription & columns_, const DB::ConstraintsDescription & constraints_, const String & comment, - bool overwrite_); + bool overwrite_, + bool is_null_aware_anti_join_); + + bool has_null_key_value = false; + bool is_empty_hash_table = false; /// The columns' names in right_header may be different from the names in the ColumnsDescription /// in the constructor. @@ -64,6 +68,7 @@ class StorageJoinFromReadBuffer std::shared_mutex join_mutex; std::list input_blocks; std::shared_ptr join = nullptr; + bool is_null_aware_anti_join; void readAllBlocksFromInput(DB::ReadBuffer & in); void buildJoin(DB::Blocks & data, const DB::Block header, std::shared_ptr analyzed_join); diff --git a/cpp-ch/local-engine/Operator/EarlyStopStep.cpp b/cpp-ch/local-engine/Operator/EarlyStopStep.cpp new file mode 100644 index 000000000000..ff148a203b47 --- /dev/null +++ b/cpp-ch/local-engine/Operator/EarlyStopStep.cpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + +namespace local_engine +{ +static DB::ITransformingStep::Traits getTraits() +{ + return DB::ITransformingStep::Traits + { + { + .preserves_number_of_streams = false, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +EarlyStopStep::EarlyStopStep( + const DB::DataStream & input_stream_) + : DB::ITransformingStep( + input_stream_, transformHeader(input_stream_.header), getTraits()) +{ +} + +DB::Block EarlyStopStep::transformHeader(const DB::Block& input) +{ + return input.cloneEmpty(); +} + +void EarlyStopStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) +{ + pipeline.addSimpleTransform( + [&](const DB::Block & header) + { + return std::make_shared(header); + }); +} + +void EarlyStopStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const +{ + if (!processors.empty()) + DB::IQueryPlanStep::describePipeline(processors, settings); +} + +void EarlyStopStep::updateOutputStream() +{ + output_stream = createOutputStream(input_streams.front(), transformHeader(input_streams.front().header), getDataStreamTraits()); +} + +EarlyStopTransform::EarlyStopTransform(const DB::Block &header_) + : DB::IProcessor({header_}, {EarlyStopStep::transformHeader(header_)}) +{ +} + +EarlyStopTransform::Status EarlyStopTransform::prepare() +{ + auto & output = outputs.front(); + auto & input = inputs.front(); + if (!input.isFinished()) + { + input.close(); + } + output.finish(); + return Status::Finished; +} + +void EarlyStopTransform::work() +{ +} +} diff --git a/cpp-ch/local-engine/Operator/EarlyStopStep.h b/cpp-ch/local-engine/Operator/EarlyStopStep.h new file mode 100644 index 000000000000..becfd46b83f2 --- /dev/null +++ b/cpp-ch/local-engine/Operator/EarlyStopStep.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace local_engine +{ +/// This step will return empty block. +class EarlyStopStep : public DB::ITransformingStep +{ +public: + explicit EarlyStopStep( + const DB::DataStream & input_stream_); + ~EarlyStopStep() override = default; + + String getName() const override { return "EarlyStopStep"; } + + static DB::Block transformHeader(const DB::Block& input); + + void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override; + + void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override; +private: + void updateOutputStream() override; +}; + +class EarlyStopTransform : public DB::IProcessor +{ +public: + using Status = DB::IProcessor::Status; + explicit EarlyStopTransform(const DB::Block &header_); + ~EarlyStopTransform() override = default; + + Status prepare() override; + void work() override; + String getName() const override { return "EarlyStopTransform"; } +private: + DB::Block header; +}; +} diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index 2959e1986875..6f8877523b13 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -268,6 +269,30 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q if (storage_join) { + if (join_opt_info.is_null_aware_anti_join && join.type() == substrait::JoinRel_JoinType_JOIN_TYPE_ANTI) + { + if (storage_join->has_null_key_value) + { + // if there is a null key value on the build side, it will return the empty result + auto empty_step = std::make_unique(left->getCurrentDataStream()); + left->addStep(std::move(empty_step)); + } + else if (!storage_join->is_empty_hash_table) + { + auto input_header = left->getCurrentDataStream().header; + DB::ActionsDAG filter_is_not_null_dag{input_header.getColumnsWithTypeAndName()}; + // when is_null_aware_anti_join is true, there is only one join key + const auto * key_field = filter_is_not_null_dag.getInputs()[join.expression().scalar_function().arguments().at(0).value().selection().direct_reference().struct_field().field()]; + + auto result_node = filter_is_not_null_dag.tryFindInOutputs(key_field->result_name); + // add a function isNotNull to filter the null key on the left side + const auto * cond_node = plan_parser->toFunctionNode(filter_is_not_null_dag, "isNotNull", {result_node}); + filter_is_not_null_dag.addOrReplaceInOutputs(*cond_node); + auto filter_step = std::make_unique(left->getCurrentDataStream(), std::move(filter_is_not_null_dag), cond_node->result_name, true); + left->addStep(std::move(filter_step)); + } + // other case: is_empty_hash_table, don't need to handle + } applyJoinFilter(*table_join, join, *left, *right, true); auto broadcast_hash_join = storage_join->getJoinLocked(table_join, context); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c80379a879f8..6ed9b4a43c44 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1071,7 +1071,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild jint join_type_, jboolean has_mixed_join_condition, jboolean is_existence_join, - jbyteArray named_struct) + jbyteArray named_struct, + jboolean is_null_aware_anti_join) { LOCAL_ENGINE_JNI_METHOD_START const auto hash_table_id = jstring2string(env, key); @@ -1084,7 +1085,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild DB::CompressedReadBuffer input(read_buffer_from_java_array); local_engine::configureCompressedReadBuffer(input); const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin( - hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string)); + hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string, is_null_aware_anti_join)); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 9311b71ed83e..4c84d15f3a77 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -38,14 +38,14 @@ endif() set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH}) +set(GLUTEN_PROTO_SRC_DIR + ${GLUTEN_HOME}/gluten-core/src/main/resources/org/apache/gluten/proto) +message(STATUS "Set Gluten Proto Directory in ${GLUTEN_PROTO_SRC_DIR}") + set(SUBSTRAIT_PROTO_SRC_DIR ${GLUTEN_HOME}/gluten-substrait/src/main/resources/substrait/proto) message(STATUS "Set Substrait Proto Directory in ${SUBSTRAIT_PROTO_SRC_DIR}") -set(GLUTEN_PROTO_SRC_DIR - ${GLUTEN_HOME}/gluten-substrait/src/main/resources/org/apache/gluten/proto) -message(STATUS "Set Gluten Proto Directory in ${GLUTEN_PROTO_SRC_DIR}") - find_program(CCACHE_FOUND ccache) if(CCACHE_FOUND) set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index f6394f850231..1ef2c848b102 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_09_01 +VELOX_BRANCH=2024_09_03 VELOX_HOME="" OS=`uname -s` diff --git a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala index ef1c4180b11b..ee7657c505ac 100644 --- a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala +++ b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala @@ -107,7 +107,7 @@ class GlutenClickHouseRSSColumnarMemorySortShuffleSuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala index 50220264e063..00f3bee8eb7b 100644 --- a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala +++ b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala @@ -142,7 +142,7 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index 696a3c3438b0..5355baed5b58 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.SparkSchemaUtil import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.{TaskResource, TaskResources} +import org.apache.spark.task.{TaskResource, TaskResources} import org.apache.arrow.c.ArrowSchema import org.apache.arrow.memory.BufferAllocator diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index 08f5de5801a0..448bc9ee2d13 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -128,6 +128,28 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + compile-gluten-proto + generate-sources + + compile + test-compile + + + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + src/main/resources/org/apache/gluten/proto + false + + + + org.apache.maven.plugins maven-resources-plugin diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/MemoryUsageRecorder.java b/gluten-core/src/main/java/org/apache/gluten/memory/MemoryUsageRecorder.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/MemoryUsageRecorder.java rename to gluten-core/src/main/java/org/apache/gluten/memory/MemoryUsageRecorder.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/MemoryUsageStatsBuilder.java b/gluten-core/src/main/java/org/apache/gluten/memory/MemoryUsageStatsBuilder.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/MemoryUsageStatsBuilder.java rename to gluten-core/src/main/java/org/apache/gluten/memory/MemoryUsageStatsBuilder.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/SimpleMemoryUsageRecorder.java b/gluten-core/src/main/java/org/apache/gluten/memory/SimpleMemoryUsageRecorder.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/SimpleMemoryUsageRecorder.java rename to gluten-core/src/main/java/org/apache/gluten/memory/SimpleMemoryUsageRecorder.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/KnownNameAndStats.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/KnownNameAndStats.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/KnownNameAndStats.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/KnownNameAndStats.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/LoggingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/LoggingMemoryTarget.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/LoggingMemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/LoggingMemoryTarget.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTarget.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTarget.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetUtil.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetUtil.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetUtil.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetUtil.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/NoopMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/NoopMemoryTarget.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/NoopMemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/NoopMemoryTarget.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/OverAcquire.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/OverAcquire.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/OverAcquire.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/OverAcquire.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/Spiller.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spiller.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/Spiller.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spiller.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/Spillers.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java similarity index 99% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java index 74e0cbb8779b..67edb713a7ce 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java @@ -20,7 +20,7 @@ import org.apache.spark.memory.SparkMemoryUtil; import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResources; import org.apache.spark.util.Utils; public class ThrowOnOomMemoryTarget implements MemoryTarget { diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTarget.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTarget.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTarget.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTargets.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTargets.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/TreeMemoryTargets.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java similarity index 99% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java index 76aa63aebb64..054b9ef106a7 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java @@ -30,7 +30,7 @@ import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.MemoryMode; import org.apache.spark.memory.TaskMemoryManager; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResources; import java.util.Map; import java.util.stream.Collectors; diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java diff --git a/gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java similarity index 100% rename from gluten-substrait/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/TaskListener.scala b/gluten-core/src/main/java/org/apache/gluten/task/TaskListener.scala similarity index 96% rename from gluten-substrait/src/main/scala/org/apache/gluten/utils/TaskListener.scala rename to gluten-core/src/main/java/org/apache/gluten/task/TaskListener.scala index 3ffb3206b900..d99c7e8856f7 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/TaskListener.scala +++ b/gluten-core/src/main/java/org/apache/gluten/task/TaskListener.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.utils +package org.apache.gluten.task import org.apache.spark.TaskFailedReason diff --git a/gluten-substrait/src/main/resources/org/apache/gluten/proto/config.proto b/gluten-core/src/main/resources/org/apache/gluten/proto/config.proto similarity index 100% rename from gluten-substrait/src/main/resources/org/apache/gluten/proto/config.proto rename to gluten-core/src/main/resources/org/apache/gluten/proto/config.proto diff --git a/gluten-substrait/src/main/resources/org/apache/gluten/proto/memory.proto b/gluten-core/src/main/resources/org/apache/gluten/proto/memory.proto similarity index 100% rename from gluten-substrait/src/main/resources/org/apache/gluten/proto/memory.proto rename to gluten-core/src/main/resources/org/apache/gluten/proto/memory.proto diff --git a/gluten-substrait/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala similarity index 97% rename from gluten-substrait/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala rename to gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala index 6fd2d7a3ec10..d221fafce418 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.memory -import org.apache.gluten.memory.memtarget.{DynamicOffHeapSizingMemoryTarget, KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets} +import org.apache.gluten.memory.memtarget._ import org.apache.gluten.memory.memtarget.spark.{RegularMemoryConsumer, TreeMemoryConsumer} import org.apache.gluten.proto.MemoryUsageStats diff --git a/gluten-substrait/src/main/scala/org/apache/spark/util/TaskResource.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResource.scala similarity index 97% rename from gluten-substrait/src/main/scala/org/apache/spark/util/TaskResource.scala rename to gluten-core/src/main/scala/org/apache/spark/task/TaskResource.scala index 8dfa2110b53b..ca9bbd9dd981 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/util/TaskResource.scala +++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResource.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.task /** * Manages the lifecycle for a specific type of memory resource managed by Spark. See also diff --git a/gluten-substrait/src/main/scala/org/apache/spark/util/TaskResources.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala similarity index 95% rename from gluten-substrait/src/main/scala/org/apache/spark/util/TaskResources.scala rename to gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala index 2ab2a41a4f9b..b061aa332c74 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/util/TaskResources.scala +++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala @@ -14,15 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.task + +import org.apache.gluten.task.TaskListener import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, UnknownReason} import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener} import _root_.org.apache.gluten.memory.SimpleMemoryUsageRecorder import _root_.org.apache.gluten.sql.shims.SparkShimLoader -import _root_.org.apache.gluten.utils.TaskListener import java.util import java.util.{Collections, Properties, UUID} @@ -288,7 +290,7 @@ class TaskResourceRegistry extends Logging { } /** Release all managed resources according to priority and reversed order */ - private[util] def releaseAll(): Unit = lock { + private[task] def releaseAll(): Unit = lock { val table = new util.ArrayList(priorityToResourcesMapping.entrySet()) Collections.sort( table, @@ -310,7 +312,7 @@ class TaskResourceRegistry extends Logging { } /** Release single resource by ID */ - private[util] def releaseResource(id: String): Unit = lock { + private[task] def releaseResource(id: String): Unit = lock { if (!resources.containsKey(id)) { throw new IllegalArgumentException( String.format("TaskResource with ID %s is not registered", id)) @@ -328,7 +330,7 @@ class TaskResourceRegistry extends Logging { resources.remove(id) } - private[util] def addResourceIfNotRegistered[T <: TaskResource](id: String, factory: () => T): T = + private[task] def addResourceIfNotRegistered[T <: TaskResource](id: String, factory: () => T): T = lock { if (resources.containsKey(id)) { return resources.get(id).asInstanceOf[T] @@ -338,7 +340,7 @@ class TaskResourceRegistry extends Logging { resource } - private[util] def addResource[T <: TaskResource](id: String, resource: T): T = lock { + private[task] def addResource[T <: TaskResource](id: String, resource: T): T = lock { if (resources.containsKey(id)) { throw new IllegalArgumentException( String.format("TaskResource with ID %s is already registered", id)) @@ -347,11 +349,11 @@ class TaskResourceRegistry extends Logging { resource } - private[util] def isResourceRegistered(id: String): Boolean = lock { + private[task] def isResourceRegistered(id: String): Boolean = lock { resources.containsKey(id) } - private[util] def getResource[T <: TaskResource](id: String): T = lock { + private[task] def getResource[T <: TaskResource](id: String): T = lock { if (!resources.containsKey(id)) { throw new IllegalArgumentException( String.format("TaskResource with ID %s is not registered", id)) @@ -359,7 +361,7 @@ class TaskResourceRegistry extends Logging { resources.get(id).asInstanceOf[T] } - private[util] def getSharedUsage(): SimpleMemoryUsageRecorder = lock { + private[task] def getSharedUsage(): SimpleMemoryUsageRecorder = lock { sharedUsage } } diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java index 7fe87e95fa54..3f09a3619b3a 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java @@ -16,7 +16,7 @@ */ package org.apache.gluten.columnarbatch; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResources; import java.util.concurrent.atomic.AtomicLong; diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java index 6e46742b564a..c122cc1cca48 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java @@ -16,7 +16,7 @@ */ package org.apache.gluten.columnarbatch; -import org.apache.spark.util.TaskResource; +import org.apache.spark.task.TaskResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java index f51852ab8d2b..4a57a0349916 100644 --- a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java @@ -23,8 +23,8 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.spark.memory.TaskMemoryManager; -import org.apache.spark.util.TaskResource; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResource; +import org.apache.spark.task.TaskResources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java index 04a6e0002ade..a1f07c949eba 100644 --- a/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java +++ b/gluten-data/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java @@ -17,8 +17,8 @@ package org.apache.gluten.memory.arrow.pool; import org.apache.arrow.dataset.jni.NativeMemoryPool; -import org.apache.spark.util.TaskResource; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResource; +import org.apache.spark.task.TaskResources; import org.apache.spark.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/gluten-data/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java b/gluten-data/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java index db5ac8426df0..b221db13e375 100644 --- a/gluten-data/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java +++ b/gluten-data/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java @@ -22,7 +22,7 @@ import org.apache.gluten.memory.memtarget.*; import org.apache.spark.memory.TaskMemoryManager; -import org.apache.spark.util.TaskResources; +import org.apache.spark.task.TaskResources; import java.util.Collections; import java.util.Map; diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index 5053cc2ba03d..1bf422b0a0f5 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -20,7 +20,7 @@ import org.apache.gluten.substrait.AggregationParams import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkMetricsUtil -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources trait HashAggregateMetricsUpdater extends MetricsUpdater { def updateAggregationMetrics( diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index fe1fa2ad6caf..9b8f03dd7f45 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -21,7 +21,7 @@ import org.apache.gluten.substrait.JoinParams import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkMetricsUtil -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import java.util diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala index 0ba48c62cdfe..cb01ae151035 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala @@ -18,7 +18,7 @@ package org.apache.gluten.metrics import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkMetricsUtil -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { diff --git a/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtime.scala index 63db3e58e51f..b18d4044dd69 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -27,7 +27,7 @@ import org.apache.gluten.proto.MemoryUsageStats import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} -import org.apache.spark.util.TaskResource +import org.apache.spark.task.TaskResource import org.slf4j.LoggerFactory diff --git a/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtimes.scala b/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtimes.scala index 6d5e11afeff5..bfb2465b12b9 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtimes.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/runtime/Runtimes.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.runtime -import org.apache.spark.util.{TaskResource, TaskResources} +import org.apache.spark.task.{TaskResource, TaskResources} object Runtimes { diff --git a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 0bd78cb92c5c..c9f0d06f5d06 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.SparkSchemaUtil import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.{TaskResource, TaskResources} +import org.apache.spark.task.{TaskResource, TaskResources} import org.apache.arrow.c.ArrowSchema import org.apache.arrow.memory.BufferAllocator diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index cb65dbca4db0..09805cc2f324 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import org.apache.arrow.c.ArrowSchema diff --git a/gluten-data/src/test/scala/org/apache/gluten/execution/MassiveMemoryAllocationSuite.scala b/gluten-data/src/test/scala/org/apache/gluten/execution/MassiveMemoryAllocationSuite.scala index ebfa0e6123fd..f53c99e5e035 100644 --- a/gluten-data/src/test/scala/org/apache/gluten/execution/MassiveMemoryAllocationSuite.scala +++ b/gluten-data/src/test/scala/org/apache/gluten/execution/MassiveMemoryAllocationSuite.scala @@ -22,7 +22,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import java.util.concurrent.{Callable, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicLong diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala index d3179f6bbd32..66bab72bf577 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -21,8 +21,8 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.events.GlutenBuildInfoEvent import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.GlutenSessionExtensions.{GLUTEN_SESSION_EXTENSION_NAME, SPARK_SESSION_EXTS_KEY} +import org.apache.gluten.task.TaskListener import org.apache.gluten.test.TestStats -import org.apache.gluten.utils.TaskListener import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext, TaskFailedReason} import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} @@ -31,7 +31,8 @@ import org.apache.spark.listener.GlutenListenerFactory import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.execution.ui.GlutenEventUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{SparkResourceUtil, TaskResources} +import org.apache.spark.task.TaskResources +import org.apache.spark.util.SparkResourceUtil import java.util import java.util.Collections diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala index d981de8046a9..5df4f572c398 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala @@ -28,7 +28,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{ColumnarToRowExec, LeafExecNode, SparkPlan} -import org.apache.spark.util.{SparkTaskUtil, TaskResources} +import org.apache.spark.task.TaskResources +import org.apache.spark.util.SparkTaskUtil import java.util.{Objects, Properties} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/iterator/IteratorsV1.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/iterator/IteratorsV1.scala index 3e9248c44458..1090c6944ae5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/iterator/IteratorsV1.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/iterator/IteratorsV1.scala @@ -19,7 +19,7 @@ package org.apache.gluten.utils.iterator import org.apache.gluten.utils.iterator.Iterators.WrapperBuilder import org.apache.spark.{InterruptibleIterator, TaskContext} -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean diff --git a/gluten-substrait/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java b/gluten-substrait/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java index bbc43ba5dea9..befe449186e7 100644 --- a/gluten-substrait/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java +++ b/gluten-substrait/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java @@ -24,7 +24,7 @@ import org.apache.spark.TaskContext; import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.util.TaskResources$; +import org.apache.spark.task.TaskResources$; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala b/gluten-substrait/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala index 0c91b4faf004..47ffc8812d1c 100644 --- a/gluten-substrait/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala +++ b/gluten-substrait/src/test/scala/org/apache/gluten/utils/TaskResourceSuite.scala @@ -19,7 +19,8 @@ package org.apache.gluten.utils import org.apache.spark.memory.{MemoryConsumer, MemoryMode} import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{SparkTaskUtil, TaskResource, TaskResources} +import org.apache.spark.task.{TaskResource, TaskResources} +import org.apache.spark.util.SparkTaskUtil import org.scalatest.funsuite.AnyFunSuite diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/utils/iterator/IteratorSuite.scala b/gluten-substrait/src/test/scala/org/apache/gluten/utils/iterator/IteratorSuite.scala index 1a84d671922d..9a70f235a339 100644 --- a/gluten-substrait/src/test/scala/org/apache/gluten/utils/iterator/IteratorSuite.scala +++ b/gluten-substrait/src/test/scala/org/apache/gluten/utils/iterator/IteratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.gluten.utils.iterator import org.apache.gluten.utils.iterator.Iterators.{V1, WrapperBuilder} -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources import org.scalatest.funsuite.AnyFunSuite diff --git a/gluten-substrait/src/test/scala/org/apache/spark/utils/iterator/IteratorBenchmark.scala b/gluten-substrait/src/test/scala/org/apache/spark/utils/iterator/IteratorBenchmark.scala index aa69f309aac8..31f4848f3863 100644 --- a/gluten-substrait/src/test/scala/org/apache/spark/utils/iterator/IteratorBenchmark.scala +++ b/gluten-substrait/src/test/scala/org/apache/spark/utils/iterator/IteratorBenchmark.scala @@ -20,7 +20,7 @@ import org.apache.gluten.utils.iterator.Iterators import org.apache.gluten.utils.iterator.Iterators.V1 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} -import org.apache.spark.util.TaskResources +import org.apache.spark.task.TaskResources object IteratorBenchmark extends BenchmarkBase { diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index c096603dee3b..90d1dbae89e4 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1161,6 +1161,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index ca7f19a0c58d..1115af3164eb 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1182,6 +1182,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index f0c1151b9a51..9ededd6c5370 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,6 +1027,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 9fb1f2d34f76..fd1326cb1c15 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,6 +1027,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml index 87e8db65d10c..6a11b987f12c 100644 --- a/tools/gluten-it/package/pom.xml +++ b/tools/gluten-it/package/pom.xml @@ -8,7 +8,6 @@ 1.3.0-SNAPSHOT gluten-it-package - Archetype - assembly http://maven.apache.org pom