Merge remote-tracking branch 'apache/main' into compression-level
andygrove committed Jul 11, 2024
2 parents 886f754 + 1dc092f commit 1e92732
Showing 545 changed files with 7,632 additions and 6,717 deletions.
2 changes: 1 addition & 1 deletion .github/actions/java-test/action.yaml
@@ -33,7 +33,7 @@ runs:
- name: Run Cargo build
shell: bash
run: |
cd core
cd native
cargo build
- name: Cache Maven dependencies
14 changes: 10 additions & 4 deletions .github/actions/rust-test/action.yaml
@@ -22,21 +22,27 @@ runs:
- name: Check Cargo fmt
shell: bash
run: |
cd core
cd native
cargo fmt --all -- --check --color=never
- name: Check Cargo clippy
shell: bash
run: |
cd core
cd native
cargo clippy --color=never -- -D warnings
- name: Check compilation
shell: bash
run: |
cd core
cd native
cargo check --benches
- name: Check unused dependencies
shell: bash
run: |
cd native
cargo install cargo-machete && cargo machete
- name: Cache Maven dependencies
uses: actions/cache@v4
with:
@@ -56,5 +62,5 @@ runs:
- name: Run Cargo test
shell: bash
run: |
cd core
cd native
RUST_BACKTRACE=1 cargo test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark-tpch.yml
@@ -76,8 +76,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
core/target/release/libcomet.so
core/target/release/libcomet.dylib
native/target/release/libcomet.so
native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Generate TPC-H (SF=1) table data
@@ -119,7 +119,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
path: core/target/release
path: native/target/release
- name: Run TPC-H queries
run: |
SPARK_HOME=`pwd` SPARK_TPCH_DATA=`pwd`/tpch/sf1_parquet ./mvnw -B -Prelease -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark.yml
@@ -83,8 +83,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
core/target/release/libcomet.so
core/target/release/libcomet.dylib
native/target/release/libcomet.so
native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Build tpcds-kit
@@ -134,7 +134,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
path: core/target/release
path: native/target/release
- name: Run TPC-DS queries (Sort merge join)
if: matrix.join == 'sort_merge'
run: |
49 changes: 49 additions & 0 deletions .github/workflows/miri.yml
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Run Miri Safety Checks

on:
push:
paths-ignore:
- "doc/**"
- "docs/**"
- "**.md"
pull_request:
paths-ignore:
- "doc/**"
- "docs/**"
- "**.md"
# manual trigger
# https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
workflow_dispatch:

jobs:
miri:
name: "Miri"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Miri
run: |
rustup toolchain install nightly --component miri
rustup override set nightly
cargo miri setup
- name: Test with Miri
run: |
cd native
MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
2 changes: 1 addition & 1 deletion .gitignore
@@ -8,7 +8,7 @@ derby.log
metastore_db/
spark-warehouse/
dependency-reduced-pom.xml
core/src/execution/generated
native/core/src/execution/generated
prebuild
.flattened-pom.xml
rat.txt
36 changes: 18 additions & 18 deletions Makefile
@@ -20,65 +20,65 @@
all: core jvm

core:
cd core && cargo build
cd native && cargo build
test-rust:
# We need to compile CometException so that the cargo test can pass
./mvnw compile -pl common -DskipTests $(PROFILES)
cd core && cargo build && \
cd native && cargo build && \
RUST_BACKTRACE=1 cargo test
jvm:
./mvnw clean package -DskipTests $(PROFILES)
test-jvm: core
SPARK_HOME=`pwd` COMET_CONF_DIR=$(shell pwd)/conf RUST_BACKTRACE=1 ./mvnw verify $(PROFILES)
test: test-rust test-jvm
clean:
cd core && cargo clean
cd native && cargo clean
./mvnw clean $(PROFILES)
rm -rf .dist
bench:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
format:
cd core && cargo fmt
cd native && cargo fmt
./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
./mvnw spotless:apply $(PROFILES)

core-amd64:
rustup target add x86_64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/x86_64
cp core/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
cd core && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
cp native/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
cd native && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/amd64
cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
jar -cf common/target/comet-native-x86_64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-x86_64.jar comet-native-x86_64${COMET_CLASSIFIER} jar

core-arm64:
rustup target add aarch64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/aarch64
cp core/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cp native/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/aarch64
cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
jar -cf common/target/comet-native-aarch64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-aarch64.jar comet-native-aarch64${COMET_CLASSIFIER} jar

release-linux: clean
rustup target add aarch64-apple-darwin x86_64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd core && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
./mvnw install -Prelease -DskipTests $(PROFILES) -Dmaven.gitcommitid.skip=true
benchmark-%: clean release
cd spark && COMET_CONF_DIR=$(shell pwd)/conf MAVEN_OPTS='-Xmx20g' ../mvnw exec:java -Dexec.mainClass="$*" -Dexec.classpathScope="test" -Dexec.cleanupDaemonThreads="false" -Dexec.args="$(filter-out $@,$(MAKECMDGOALS))" $(PROFILES)
4 changes: 2 additions & 2 deletions common/pom.xml
@@ -193,14 +193,14 @@ under the License.
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
<directory>${project.basedir}/../core/target/x86_64-apple-darwin/release</directory>
<directory>${project.basedir}/../native/target/x86_64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
<targetPath>org/apache/comet/darwin/x86_64</targetPath>
</resource>
<resource>
<directory>${project.basedir}/../core/target/aarch64-apple-darwin/release</directory>
<directory>${project.basedir}/../native/target/aarch64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/package.scala
@@ -21,8 +21,19 @@ package org.apache

import java.util.Properties

import org.apache.arrow.memory.RootAllocator

package object comet {

/**
* The root allocator for Comet execution. Because Arrow Java memory management is based on
* reference counting, exposed arrays increase the reference count of the underlying buffers.
* Until the reference count is zero, the memory will not be released. If the consumer side is
* finished later than the close of the allocator, the allocator will think the memory is
* leaked. To avoid this, we use a single allocator for the whole execution process.
*/
val CometArrowAllocator = new RootAllocator(Long.MaxValue)

/**
* Provides access to build information about the Comet libraries. This will be used by the
* benchmarking software to provide the source revision and repository. In addition, the build
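The new `CometArrowAllocator` replaces the per-component child allocators that previously lived in NativeUtil and StreamReader (see the diffs below). The sketch that follows is not part of this commit; it is a minimal, hypothetical Scala example against Arrow Java's allocator API, illustrating the failure mode the scaladoc above describes: a child allocator closed while a consumer still holds a buffer reference reports a leak, whereas a single long-lived root allocator sidesteps the problem. The object name `AllocatorSketch`, the allocator name, and the buffer size are illustrative assumptions.

```scala
// Hypothetical sketch (not part of this commit): why Comet shares one
// long-lived allocator instead of a short-lived child allocator per component.
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

object AllocatorSketch {
  def main(args: Array[String]): Unit = {
    val root: BufferAllocator = new RootAllocator(Long.MaxValue)

    // Old pattern: a child allocator owned by a single component.
    val child = root.newChildAllocator("NativeUtil", 0, Long.MaxValue)
    val buf = child.buffer(1024) // reference count is now 1

    // A consumer (e.g. an exported Arrow array) still holds a reference...
    buf.getReferenceManager.retain()
    buf.close() // ...so the memory is not freed yet (count back to 1)

    // Closing the child allocator while the consumer is still alive makes the
    // allocator report the outstanding buffer as a leak.
    try child.close()
    catch {
      case e: IllegalStateException => println(s"leak reported: ${e.getMessage}")
    }

    // Once the last reference is released, the close succeeds. With a single
    // process-wide allocator (CometArrowAllocator in this commit) that is
    // never closed mid-query, late consumers can release references safely.
    buf.getReferenceManager.release()
    child.close()
    root.close()
  }
}
```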
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -22,18 +22,18 @@ package org.apache.comet.vector
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

class NativeUtil {
import Utils._

private val allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue)
private val allocator = CometArrowAllocator
private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
private val importer = new ArrowImporter(allocator)

12 changes: 5 additions & 7 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
@@ -21,20 +21,20 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
*/
case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
private var allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue)
private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator)
private var arrowReader = new ArrowStreamReader(channelReader, allocator)
private val channelReader =
new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator)
private var root = arrowReader.getVectorSchemaRoot

def nextBatch(): Option[ColumnarBatch] = {
@@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au
if (root != null) {
arrowReader.close()
root.close()
allocator.close()

arrowReader = null
root = null
allocator = null
}
}
}
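For context, a hedged usage sketch of the reader whose signature appears in this diff: `StreamReader(channel, source)` produces `Option[ColumnarBatch]` from `nextBatch()` and releases its imported buffers on `close()`. Only the constructor and method names come from the diff above; the file path and the `StreamReaderSketch` object are illustrative assumptions.

```scala
// Hypothetical usage sketch of StreamReader after this change; the reader now
// draws from the shared CometArrowAllocator rather than a private allocator.
import java.io.FileInputStream
import java.nio.channels.Channels

import org.apache.comet.vector.StreamReader

object StreamReaderSketch {
  def main(args: Array[String]): Unit = {
    // Assumes an Arrow IPC stream on disk; the path is purely illustrative.
    val channel = Channels.newChannel(new FileInputStream("/tmp/shuffle-block.arrow"))
    val reader = StreamReader(channel, source = "example")
    try {
      // Each call imports the next record batch; None signals end of stream.
      Iterator
        .continually(reader.nextBatch())
        .takeWhile(_.isDefined)
        .flatten
        .foreach(batch => println(s"read batch with ${batch.numRows()} rows"))
    } finally {
      // Closing the reader releases the buffers it imported; the shared
      // CometArrowAllocator itself stays open for the rest of the process.
      reader.close()
    }
  }
}
```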
4 changes: 2 additions & 2 deletions dev/diffs/3.4.3.diff
@@ -2491,8 +2491,8 @@ index dd55fcfe42c..293e9dc2986 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
4 changes: 2 additions & 2 deletions dev/diffs/3.5.1.diff
@@ -2650,8 +2650,8 @@ index dd55fcfe42c..293e9dc2986 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)