
Commit

resolving merge conflicts
vaibhawvipul committed Jul 14, 2024
2 parents 6b60290 + c434872 commit 30d7c72
Showing 457 changed files with 7,593 additions and 10,310 deletions.
6 changes: 6 additions & 0 deletions .github/actions/rust-test/action.yaml
@@ -37,6 +37,12 @@ runs:
        cd native
        cargo check --benches
    - name: Check unused dependencies
      shell: bash
      run: |
        cd native
        cargo install cargo-machete && cargo machete
    - name: Cache Maven dependencies
      uses: actions/cache@v4
      with:
49 changes: 49 additions & 0 deletions .github/workflows/miri.yml
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Run Miri Safety Checks

on:
  push:
    paths-ignore:
      - "doc/**"
      - "docs/**"
      - "**.md"
  pull_request:
    paths-ignore:
      - "doc/**"
      - "docs/**"
      - "**.md"
  # manual trigger
  # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
  workflow_dispatch:

jobs:
  miri:
    name: "Miri"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Miri
        run: |
          rustup toolchain install nightly --component miri
          rustup override set nightly
          cargo miri setup
      - name: Test with Miri
        run: |
          cd native
          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
5 changes: 3 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -286,9 +286,10 @@ object CometConf extends ShimCometConf {
conf("spark.comet.explainFallback.enabled")
.doc(
"When this setting is enabled, Comet will provide logging explaining the reason(s) " +
"why a query stage cannot be executed natively.")
"why a query stage cannot be executed natively. Set this to false to " +
"reduce the amount of logging.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
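The hunk above changes the default of `spark.comet.explainFallback.enabled` from `false` to `true`, so the reasons a query stage falls back to Spark are logged out of the box. A minimal sketch of how a user could turn that logging back off, assuming an ordinary SparkSession setup (the app name and builder boilerplate are illustrative, not part of this commit):

```scala
import org.apache.spark.sql.SparkSession

object ExplainFallbackExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-explain-fallback-example") // illustrative name
      // Fallback logging is now on by default; set to "false" to reduce log volume.
      .config("spark.comet.explainFallback.enabled", "false")
      .getOrCreate()

    // ... run queries; Comet no longer logs why a stage could not run natively ...
    spark.stop()
  }
}
```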
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/package.scala
@@ -21,8 +21,19 @@ package org.apache

import java.util.Properties

import org.apache.arrow.memory.RootAllocator

package object comet {

/**
* The root allocator for Comet execution. Because Arrow Java memory management is based on
* reference counting, exposed arrays increase the reference count of the underlying buffers.
* Until the reference count is zero, the memory will not be released. If the consumer side is
* finished later than the close of the allocator, the allocator will think the memory is
* leaked. To avoid this, we use a single allocator for the whole execution process.
*/
val CometArrowAllocator = new RootAllocator(Long.MaxValue)

/**
* Provides access to build information about the Comet libraries. This will be used by the
* benchmarking software to provide the source revision and repository. In addition, the build
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -22,18 +22,18 @@ package org.apache.comet.vector
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

class NativeUtil {
import Utils._

private val allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue)
private val allocator = CometArrowAllocator
private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
private val importer = new ArrowImporter(allocator)

12 changes: 5 additions & 7 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
@@ -21,20 +21,20 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
*/
case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
private var allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue)
private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator)
private var arrowReader = new ArrowStreamReader(channelReader, allocator)
private val channelReader =
new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator)
private var root = arrowReader.getVectorSchemaRoot

def nextBatch(): Option[ColumnarBatch] = {
@@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au
if (root != null) {
arrowReader.close()
root.close()
allocator.close()

arrowReader = null
root = null
allocator = null
}
}
}
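The allocator changes above (package.scala, NativeUtil.scala, StreamReader.scala) replace per-instance child allocators of a fresh `RootAllocator` with the single shared `CometArrowAllocator`, so exported Arrow buffers that outlive a reader are no longer reported as leaks when that reader closes. A rough sketch of the resulting pattern, assuming the same Arrow imports used in the diff (the helper object and method are illustrative, not part of the commit):

```scala
import java.nio.channels.ReadableByteChannel

import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader

import org.apache.comet.CometArrowAllocator

object SharedAllocatorSketch {
  // Builds a stream reader on the shared, process-wide allocator.
  // Previously each reader owned a child allocator and closed it in close(),
  // which flagged still-referenced exported buffers as leaked memory.
  def openReader(channel: ReadableByteChannel): ArrowStreamReader = {
    val channelReader =
      new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
    new ArrowStreamReader(channelReader, CometArrowAllocator)
  }
}
```

With this pattern, closing a reader only releases its `VectorSchemaRoot`; the allocator itself lives for the whole execution, matching the rationale in the `CometArrowAllocator` comment.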
4 changes: 2 additions & 2 deletions dev/diffs/3.4.3.diff
@@ -2491,8 +2491,8 @@ index dd55fcfe42c..293e9dc2986 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
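The two-line change above adds one extra wildcard to each pattern because CometFilterExec now takes an additional constructor argument, and Scala positional patterns must match the full arity of the case class. A toy, self-contained illustration of the same requirement (the `FilterLike` class is hypothetical and merely stands in for the operator):

```scala
object PatternAritySketch {
  // Pretend the operator used to have four fields and gained a fifth.
  final case class FilterLike(a: Int, b: Int, c: Int, d: Int, child: String)

  def main(args: Array[String]): Unit = {
    val node = FilterLike(1, 2, 3, 4, "child-plan")
    val child = node match {
      // Old shape (four fields) would have been: case FilterLike(_, _, _, c) => c
      case FilterLike(_, _, _, _, c) => c // one more `_` after the new field
    }
    println(child)
  }
}
```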
4 changes: 2 additions & 2 deletions dev/diffs/3.5.1.diff
@@ -2650,8 +2650,8 @@ index dd55fcfe42c..293e9dc2986 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
28 changes: 14 additions & 14 deletions dev/diffs/4.0.0-preview1.diff
@@ -415,7 +415,7 @@ index 16a493b5290..3f0b70e2d59 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570..50a2ce86117 100644
index 2c24cc7d570..d46dc5e138a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -442,7 +442,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("partition pruning in broadcast hash joins with aliases") {
+ test("partition pruning in broadcast hash joins with aliases",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Given("alias with simple join condition, using attribute names only")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
@@ -452,7 +452,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("partition pruning in broadcast hash joins") {
+ test("partition pruning in broadcast hash joins",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -462,7 +462,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("different broadcast subqueries with identical children") {
+ test("different broadcast subqueries with identical children",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withTable("fact", "dim") {
spark.range(100).select(
@@ -492,7 +492,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("SPARK-32817: DPP throws error when the broadcast side is empty") {
+ test("SPARK-32817: DPP throws error when the broadcast side is empty",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@@ -502,7 +502,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("SPARK-36444: Remove OptimizeSubqueries from batch of PartitionPruning") {
+ test("SPARK-36444: Remove OptimizeSubqueries from batch of PartitionPruning",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -521,7 +521,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") {
+ test("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withTable("duplicate_keys") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US"))
@@ -531,7 +531,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("SPARK-39338: Remove dynamic pruning subquery if pruningKey's references is empty") {
+ test("SPARK-39338: Remove dynamic pruning subquery if pruningKey's references is empty",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -541,7 +541,7 @@ index 2c24cc7d570..50a2ce86117 100644

- test("SPARK-39217: Makes DPP support the pruning side has Union") {
+ test("SPARK-39217: Makes DPP support the pruning side has Union",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
val df = sql(
"""
@@ -1159,7 +1159,7 @@ index 15de4c5cc5b..6a85dfb6883 100644

setupTestData()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index b5bac8079c4..544c1ddc697 100644
index b5bac8079c4..a3731888e12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -17,7 +17,8 @@
@@ -1190,7 +1190,7 @@ index b5bac8079c4..544c1ddc697 100644

- test("join with ordering requirement") {
+ test("join with ordering requirement",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
val query = "select * from (select key, a, c, b from testView) as t1 join " +
"(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
assertProjectExec(query, 2, 2)
@@ -2590,7 +2590,7 @@ index af07aceaed1..ed0b5e6d9be 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5fbf379644f..32711763ec1 100644
index 5fbf379644f..6153046a787 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -2651,8 +2651,8 @@ index 5fbf379644f..32711763ec1 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -39,7 +39,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | true |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
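The documentation row above mirrors the new `spark.comet.explainFallback.enabled` default. For the shuffle-related rows in the same table, a hedged sketch of how those settings are typically supplied when building a session (the app name and the choice of 'auto' mode are placeholders, not mandated by this commit):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object CometShuffleConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("comet-config-sketch") // illustrative
      // Must be set before the application starts; cannot be changed afterwards.
      .set("spark.shuffle.manager",
        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
      .set("spark.comet.exec.shuffle.enabled", "true")
      .set("spark.comet.exec.shuffle.mode", "auto")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    // ... run queries ...
    spark.stop()
  }
}
```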