Skip to content

Commit

Permalink
Merge branch 'main' into drop_partial_sort
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Jul 2, 2024
2 parents b4947dc + 264ff2e commit 7375148
Show file tree
Hide file tree
Showing 130 changed files with 2,354 additions and 1,846 deletions.
45 changes: 28 additions & 17 deletions .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ jobs:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
if: matrix.os == 'centos:8'
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
if [ "${{ matrix.os }}" = "centos:7" ] || [ "${{ matrix.os }}" = "centos:8" ]; then
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
fi
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
Expand Down Expand Up @@ -292,7 +293,7 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory, memory isolation off
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q67,q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
Expand All @@ -304,7 +305,7 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q67 low memory, memory isolation on
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q67 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
Expand All @@ -315,7 +316,7 @@ jobs:
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q95 low memory, memory isolation on
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
Expand All @@ -326,19 +327,29 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory # The case currently causes crash with "free: invalid size".
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on # Disabled as error https://gist.github.com/zhztheplayer/abd5e83ccdc48730678ae7ebae479fcc
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 || true
- name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
Expand Down Expand Up @@ -455,7 +466,7 @@ jobs:
strategy:
fail-fast: false
matrix:
spark: ["spark-3.2"]
spark: [ "spark-3.2" ]
runs-on: ubuntu-20.04
container: centos:8
steps:
Expand Down Expand Up @@ -520,8 +531,8 @@ jobs:
strategy:
fail-fast: false
matrix:
spark: ["spark-3.2"]
celeborn: ["celeborn-0.4.1", "celeborn-0.3.2-incubating"]
spark: [ "spark-3.2" ]
celeborn: [ "celeborn-0.4.1", "celeborn-0.3.2-incubating" ]
runs-on: ubuntu-20.04
container: ubuntu:22.04
steps:
Expand Down Expand Up @@ -605,6 +616,10 @@ jobs:
install_arrow_deps
./dev/builddeps-veloxbe.sh --run_setup_script=OFF --enable_ep_cache=OFF --build_tests=ON \
--build_examples=ON --build_benchmarks=ON --build_protobuf=ON
- name: Gluten CPP Test
run: |
cd ./cpp/build && \
ctest -V
- uses: actions/upload-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
Expand Down Expand Up @@ -670,10 +685,6 @@ jobs:
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- name: Gluten CPP Test
run: |
cd $GITHUB_WORKSPACE/cpp/build && \
ctest -V
- name: Prepare spark.test.home for Spark 3.2.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
import org.apache.gluten.memory.memtarget.MemoryTargets;
import org.apache.gluten.memory.memtarget.Spiller;
import org.apache.gluten.memory.memtarget.Spillers;

import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.TaskResources;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* Built-in toolkit for managing native memory allocations. To use the facility, one should import
Expand All @@ -46,12 +45,12 @@ private CHNativeMemoryAllocators() {}
private static CHNativeMemoryAllocatorManager createNativeMemoryAllocatorManager(
String name,
TaskMemoryManager taskMemoryManager,
List<Spiller> spillers,
Spiller spiller,
SimpleMemoryUsageRecorder usage) {

CHManagedCHReservationListener rl =
new CHManagedCHReservationListener(
MemoryTargets.newConsumer(taskMemoryManager, name, spillers, Collections.emptyMap()),
MemoryTargets.newConsumer(taskMemoryManager, name, spiller, Collections.emptyMap()),
usage);
return new CHNativeMemoryAllocatorManagerImpl(CHNativeMemoryAllocator.createListenable(rl));
}
Expand All @@ -67,7 +66,7 @@ public static CHNativeMemoryAllocator contextInstance() {
createNativeMemoryAllocatorManager(
"ContextInstance",
TaskResources.getLocalTaskContext().taskMemoryManager(),
Collections.emptyList(),
Spillers.NOOP,
TaskResources.getSharedUsage());
TaskResources.addResource(id, manager);
}
Expand All @@ -78,7 +77,7 @@ public static CHNativeMemoryAllocator contextInstanceForUT() {
return CHNativeMemoryAllocator.getDefaultForUT();
}

public static CHNativeMemoryAllocator createSpillable(String name, Spiller... spillers) {
public static CHNativeMemoryAllocator createSpillable(String name, Spiller spiller) {
if (!TaskResources.inSparkTask()) {
throw new IllegalStateException("spiller must be used in a Spark task");
}
Expand All @@ -87,7 +86,7 @@ public static CHNativeMemoryAllocator createSpillable(String name, Spiller... sp
createNativeMemoryAllocatorManager(
name,
TaskResources.getLocalTaskContext().taskMemoryManager(),
Arrays.asList(spillers),
spiller,
TaskResources.getSharedUsage());
TaskResources.addAnonymousResource(manager);
// force add memory consumer to task memory manager, will release by inactivate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHStringTranslateTransformer(substraitExprName, srcExpr, matchingExpr, replaceExpr, original)
}

override def genSizeExpressionTransformer(
substraitExprName: String,
child: ExpressionTransformer,
original: Size): ExpressionTransformer = {
CHSizeExpressionTransformer(substraitExprName, child, original)
}

override def genLikeTransformer(
substraitExprName: String,
left: ExpressionTransformer,
Expand Down Expand Up @@ -849,6 +842,24 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHGenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
}

/**
 * Transform array filter to Substrait.
 *
 * The already-transformed `argument` and `function` are passed, in that order, as the children
 * of a `GenericExpressionTransformer` created with `substraitExprName` and the original
 * `ArrayFilter` expression.
 */
override def genArrayFilterTransformer(
    substraitExprName: String,
    argument: ExpressionTransformer,
    function: ExpressionTransformer,
    expr: ArrayFilter): ExpressionTransformer = {
  // Child order matters: the argument comes first, the function second.
  val children = Seq(argument, function)
  GenericExpressionTransformer(substraitExprName, children, expr)
}

/**
 * Transform array transform to Substrait.
 *
 * The already-transformed `argument` and `function` are passed, in that order, as the children
 * of a `GenericExpressionTransformer` created with `substraitExprName` and the original
 * `ArrayTransform` expression.
 */
override def genArrayTransformTransformer(
    substraitExprName: String,
    argument: ExpressionTransformer,
    function: ExpressionTransformer,
    expr: ArrayTransform): ExpressionTransformer = {
  // Child order matters: the argument comes first, the function second.
  val children = Seq(argument, function)
  GenericExpressionTransformer(substraitExprName, children, expr)
}

override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@ import com.google.common.collect.Lists

import java.util.Locale

case class CHSizeExpressionTransformer(
substraitExprName: String,
expr: ExpressionTransformer,
original: Size)
extends BinaryExpressionTransformer {
override def left: ExpressionTransformer = expr
// Pass legacyLiteral as second argument in substrait function
override def right: ExpressionTransformer = LiteralTransformer(original.legacySizeOfNull)
}

case class CHTruncTimestampTransformer(
substraitExprName: String,
format: ExpressionTransformer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ object CHExpressionUtil {
URL_ENCODE -> DefaultValidator(),
SKEWNESS -> DefaultValidator(),
SOUNDEX -> DefaultValidator(),
BIT_LENGTH -> DefaultValidator(),
MAKE_YM_INTERVAL -> DefaultValidator(),
MAP_ZIP_WITH -> DefaultValidator(),
ZIP_WITH -> DefaultValidator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SparkDirectoryUtil, Utils}

import java.io.IOException
import java.util
import java.util.{Locale, UUID}

class CHColumnarShuffleWriter[K, V](
Expand Down Expand Up @@ -122,7 +121,10 @@ class CHColumnarShuffleWriter[K, V](
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
new Spiller() {
override def spill(self: MemoryTarget, size: Long): Long = {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L;
}
if (nativeSplitter == 0) {
throw new IllegalStateException(
"Fatal: spill() called before a shuffle writer " +
Expand All @@ -134,8 +136,6 @@ class CHColumnarShuffleWriter[K, V](
logError(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}

override def applicablePhases(): util.Set[Spiller.Phase] = Spillers.PHASE_SET_SPILL_ONLY
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocator;
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocatorManagerImpl;
import org.apache.gluten.memory.memtarget.MemoryTargets;
import org.apache.gluten.memory.memtarget.Spillers;

import org.apache.spark.SparkConf;
import org.apache.spark.internal.config.package$;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void initMemoryManager() {
listener =
new CHManagedCHReservationListener(
MemoryTargets.newConsumer(
taskMemoryManager, "test", Collections.emptyList(), Collections.emptyMap()),
taskMemoryManager, "test", Spillers.NOOP, Collections.emptyMap()),
new SimpleMemoryUsageRecorder());

manager = new CHNativeMemoryAllocatorManagerImpl(new CHNativeMemoryAllocator(-1L, listener));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,13 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
}

// Runs bit_count and bit_get (bit positions 0 to 3) over range(100) through runQueryAndCompare
// and applies the ProjectExecTransformer operator check to the result.
test("bit_get/bit_count") {
  val bitFunctionsSql =
    "select bit_count(id), bit_get(id, 0), bit_get(id, 1), bit_get(id, 2), bit_get(id, 3) from range(100)"
  runQueryAndCompare(bitFunctionsSql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}

test("test 'EqualNullSafe'") {
runQueryAndCompare("select l_linenumber <=> l_orderkey, l_linenumber <=> null from lineitem") {
checkGlutenOperatorMatch[ProjectExecTransformer]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,4 +713,21 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
}

}

// Exercises the lambda-based array functions transform, filter and aggregate against a small
// parquet table whose rows include a null element, a null array and an empty array.
test("array functions with lambda") {
  withTable("tb_array") {
    sql("create table tb_array(ids array<int>) using parquet")
    sql("""
          |insert into tb_array values (array(1,5,2,null, 3)), (array(1,1,3,2)), (null), (array())
          |""".stripMargin)

    // Queries run in the listed order; each one gets the same operator check.
    val lambdaQueries = Seq(
      "select transform(ids, x -> x + 1) from tb_array",
      "select filter(ids, x -> x % 2 == 1) from tb_array",
      "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array"
    )
    lambdaQueries.foreach {
      query => runQueryAndCompare(query)(checkGlutenOperatorMatch[ProjectExecTransformer])
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
import org.apache.gluten.memory.nmm.NativeMemoryManager;
import org.apache.gluten.memory.nmm.NativeMemoryManagers;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
import org.apache.gluten.vectorized.ColumnarBatchOutIterator;

Expand All @@ -30,12 +28,10 @@
public final class VeloxBatchAppender {
public static ColumnarBatchOutIterator create(
int minOutputBatchSize, Iterator<ColumnarBatch> in) {
final Runtime runtime = Runtimes.contextInstance();
final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender");
final Runtime runtime = Runtimes.contextInstance("VeloxBatchAppender");
long outHandle =
VeloxBatchAppenderJniWrapper.forRuntime(runtime)
.create(
nmm.getNativeInstanceHandle(), minOutputBatchSize, new ColumnarBatchInIterator(in));
return new ColumnarBatchOutIterator(runtime, outHandle, nmm);
VeloxBatchAppenderJniWrapper.create(runtime)
.create(minOutputBatchSize, new ColumnarBatchInIterator(in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private VeloxBatchAppenderJniWrapper(Runtime runtime) {
this.runtime = runtime;
}

public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) {
public static VeloxBatchAppenderJniWrapper create(Runtime runtime) {
return new VeloxBatchAppenderJniWrapper(runtime);
}

Expand All @@ -36,6 +36,5 @@ public long handle() {
return runtime.getHandle();
}

public native long create(
long memoryManagerHandle, int minOutputBatchSize, ColumnarBatchInIterator itr);
public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.utils;

import org.apache.gluten.exec.Runtimes;

import org.apache.commons.io.IOUtils;
import org.apache.spark.util.sketch.BloomFilter;
import org.apache.spark.util.sketch.IncompatibleMergeException;
Expand All @@ -27,17 +29,15 @@
import java.io.OutputStream;

public class VeloxBloomFilter extends BloomFilter {

private final VeloxBloomFilterJniWrapper jni;
private final VeloxBloomFilterJniWrapper jni =
VeloxBloomFilterJniWrapper.create(Runtimes.contextInstance("VeloxBloomFilter"));
private final long handle;

private VeloxBloomFilter(byte[] data) {
jni = VeloxBloomFilterJniWrapper.create();
handle = jni.init(data);
}

private VeloxBloomFilter(int capacity) {
jni = VeloxBloomFilterJniWrapper.create();
handle = jni.empty(capacity);
}

Expand Down
Loading

0 comments on commit 7375148

Please sign in to comment.