Commit

Merge branch 'apache:main' into wip-fix-regexp_replace

kecookier authored Jul 2, 2024
2 parents 1330eb8 + 6b6444e commit c2f6d5e
Showing 59 changed files with 1,123 additions and 514 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/velox_docker.yml
@@ -192,10 +192,11 @@ jobs:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
- if: matrix.os == 'centos:8'
run: |
- sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
- sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
if [ "${{ matrix.os }}" = "centos:7" ] || [ "${{ matrix.os }}" = "centos:8" ]; then
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
fi
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
@@ -334,8 +335,7 @@ jobs:
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on
- if: false # Disabled as error https://gist.github.com/zhztheplayer/abd5e83ccdc48730678ae7ebae479fcc
+ - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on # Disabled as error https://gist.github.com/zhztheplayer/abd5e83ccdc48730678ae7ebae479fcc
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
@@ -345,8 +345,8 @@ jobs:
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
- -d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory # The case currently causes crash with "free: invalid size".
+ -d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 || true
+ - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
@@ -616,6 +616,10 @@ jobs:
install_arrow_deps
./dev/builddeps-veloxbe.sh --run_setup_script=OFF --enable_ep_cache=OFF --build_tests=ON \
--build_examples=ON --build_benchmarks=ON --build_protobuf=ON
+ - name: Gluten CPP Test
+ run: |
+ cd ./cpp/build && \
+ ctest -V
- uses: actions/upload-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
@@ -681,10 +685,6 @@ jobs:
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- - name: Gluten CPP Test
- run: |
- cd $GITHUB_WORKSPACE/cpp/build && \
- ctest -V
- name: Prepare spark.test.home for Spark 3.2.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
@@ -17,6 +17,7 @@
package org.apache.gluten.vectorized;

import org.apache.gluten.metrics.IMetrics;
+ import org.apache.gluten.metrics.NativeMetrics;

import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -50,7 +51,7 @@ public String getId() {

private native void nativeCancel(long nativeHandle);

- private native IMetrics nativeFetchMetrics(long nativeHandle);
+ private native String nativeFetchMetrics(long nativeHandle);

@Override
public boolean hasNextInternal() throws IOException {
@@ -72,8 +73,8 @@ public ColumnarBatch nextInternal() throws IOException {
}

@Override
- public IMetrics getMetricsInternal() throws IOException, ClassNotFoundException {
- return nativeFetchMetrics(handle);
+ public IMetrics getMetricsInternal() {
+ return new NativeMetrics(nativeFetchMetrics(handle));
}

@Override
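Note on the hunk above: the JNI method now returns metrics as a serialized String, and the Java iterator wraps it in NativeMetrics, instead of the native side constructing an IMetrics object. A minimal sketch of that flow, with a plain-Java stand-in for the native call (NativeMetrics internals are an assumption; only its one-argument constructor appears in this diff):

    // MetricsFlowSketch.java -- illustration only, not part of the commit.
    public class MetricsFlowSketch {
      // Stand-in for `private native String nativeFetchMetrics(long nativeHandle);`.
      static String fetchMetricsPayload(long handle) {
        return "{\"outputRows\":42}"; // e.g. a serialized metrics document
      }

      public static void main(String[] args) {
        long handle = 0L; // a native iterator handle in the real code
        String payload = fetchMetricsPayload(handle);
        // The diff does `new NativeMetrics(payload)`; the point is that only
        // a plain String crosses the JNI boundary.
        System.out.println("metrics payload = " + payload);
      }
    }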
@@ -612,13 +612,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHStringTranslateTransformer(substraitExprName, srcExpr, matchingExpr, replaceExpr, original)
}

- override def genSizeExpressionTransformer(
- substraitExprName: String,
- child: ExpressionTransformer,
- original: Size): ExpressionTransformer = {
- CHSizeExpressionTransformer(substraitExprName, child, original)
- }
-
override def genLikeTransformer(
substraitExprName: String,
left: ExpressionTransformer,
@@ -849,6 +842,24 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHGenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
}

+ /** Transform array filter to Substrait. */
+ override def genArrayFilterTransformer(
+ substraitExprName: String,
+ argument: ExpressionTransformer,
+ function: ExpressionTransformer,
+ expr: ArrayFilter): ExpressionTransformer = {
+ GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
+ }
+
+ /** Transform array transform to Substrait. */
+ override def genArrayTransformTransformer(
+ substraitExprName: String,
+ argument: ExpressionTransformer,
+ function: ExpressionTransformer,
+ expr: ArrayTransform): ExpressionTransformer = {
+ GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
+ }
+
override def genPreProjectForGenerate(generate: GenerateExec): SparkPlan = generate

override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
@@ -30,16 +30,6 @@ import com.google.common.collect.Lists

import java.util.Locale

- case class CHSizeExpressionTransformer(
- substraitExprName: String,
- expr: ExpressionTransformer,
- original: Size)
- extends BinaryExpressionTransformer {
- override def left: ExpressionTransformer = expr
- // Pass legacyLiteral as second argument in substrait function
- override def right: ExpressionTransformer = LiteralTransformer(original.legacySizeOfNull)
- }
-
case class CHTruncTimestampTransformer(
substraitExprName: String,
format: ExpressionTransformer,
@@ -713,4 +713,21 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
}

}

test("array functions with lambda") {
withTable("tb_array") {
sql("create table tb_array(ids array<int>) using parquet")
sql("""
|insert into tb_array values (array(1,5,2,null, 3)), (array(1,1,3,2)), (null), (array())
|""".stripMargin)
val transform_sql = "select transform(ids, x -> x + 1) from tb_array"
runQueryAndCompare(transform_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])

val filter_sql = "select filter(ids, x -> x % 2 == 1) from tb_array";
runQueryAndCompare(filter_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])

val aggregate_sql = "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array";
runQueryAndCompare(aggregate_sql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
}
@@ -134,28 +134,10 @@ class VeloxListenerApi extends ListenerApi {
) {
loadLibFromJar(loader, conf)
}
- loader
- .newTransaction()
- .loadAndCreateLink(s"libarrow.so.$ARROW_VERSION.0.0", s"libarrow.so.$ARROW_VERSION", false)
- .loadAndCreateLink(
- s"libparquet.so.$ARROW_VERSION.0.0",
- s"libparquet.so.$ARROW_VERSION",
- false)
- .commit()
}

private def loadLibWithMacOS(loader: JniLibLoader): Unit = {
- loader
- .newTransaction()
- .loadAndCreateLink(
- s"libarrow.$ARROW_VERSION.0.0.dylib",
- s"libarrow.$ARROW_VERSION.dylib",
- false)
- .loadAndCreateLink(
- s"libparquet.$ARROW_VERSION.0.0.dylib",
- s"libparquet.$ARROW_VERSION.dylib",
- false)
- .commit()
+ // Placeholder for loading shared libs on MacOS if user needs.
}

private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
@@ -216,7 +216,7 @@ object RowToVeloxColumnarExec {
try {
val handle = jniWrapper
.nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, arrowBuf.memoryAddress())
- val cb = ColumnarBatches.create(runtime, handle)
+ val cb = ColumnarBatches.create(handle)
convertTime += System.currentTimeMillis() - startNative
cb
} finally {
@@ -247,7 +247,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
val batchHandle =
jniWrapper
.deserialize(deserializerHandle, cachedBatch.bytes)
- val batch = ColumnarBatches.create(runtime, batchHandle)
+ val batch = ColumnarBatches.create(batchHandle)
if (shouldSelectAttributes) {
try {
ColumnarBatches.select(batch, requestedColumnIndices.toArray)
@@ -16,13 +16,11 @@
*/
package org.apache.spark.sql.execution.datasources.velox;

- import org.apache.gluten.exec.Runtimes;
+ import org.apache.gluten.columnarbatch.ColumnarBatches;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.datasources.BlockStripe;
import org.apache.spark.sql.execution.datasources.BlockStripes;
- import org.apache.gluten.columnarbatch.ColumnarBatches;
-
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.jetbrains.annotations.NotNull;

@@ -53,8 +51,7 @@ public BlockStripe next() {
return new BlockStripe() {
@Override
public ColumnarBatch getColumnarBatch() {
- return ColumnarBatches.create(
- Runtimes.contextInstance("VeloxBlockStripes"), blockAddresses[0]);
+ return ColumnarBatches.create(blockAddresses[0]);
}

@Override
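The ColumnarBatches.create signature change repeats at every call site in this commit (see also RowToVeloxColumnarExec and ColumnarCachedBatchSerializer above): the explicit Runtime argument is gone. A before/after sketch of one call site; the idea that the runtime is now resolved inside ColumnarBatches is our inference from the updated call sites, not something stated in the diff:

    // Illustration only, not part of the commit.
    // Before (removed in this diff): the caller supplied a Runtime explicitly.
    //   ColumnarBatch cb = ColumnarBatches.create(
    //       Runtimes.contextInstance("VeloxBlockStripes"), blockAddresses[0]);
    // After: the native handle alone identifies the batch.
    ColumnarBatch cb = ColumnarBatches.create(blockAddresses[0]);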
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskResources$;
import org.junit.Assert;
import org.junit.Test;

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

public class ColumnarBatchTest extends VeloxBackendTestBase {

@Test
public void testOffloadAndLoad() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 100;
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
loaded.rowIterator(), Spliterator.ORDERED),
false)
.count();
Assert.assertEquals(numRows, cnt);
loaded.close();
return null;
});
}

@Test
public void testCreateByHandle() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 100;
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
final long handle = ColumnarBatches.getNativeHandle(offloaded);
final ColumnarBatch created = ColumnarBatches.create(handle);
Assert.assertEquals(handle, ColumnarBatches.getNativeHandle(created));
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(1, ColumnarBatches.getRefCnt(created));
ColumnarBatches.retain(created);
Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(2, ColumnarBatches.getRefCnt(created));
ColumnarBatches.retain(offloaded);
Assert.assertEquals(3, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(3, ColumnarBatches.getRefCnt(created));
created.close();
Assert.assertEquals(2, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(2, ColumnarBatches.getRefCnt(created));
offloaded.close();
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(1, ColumnarBatches.getRefCnt(created));
created.close();
Assert.assertEquals(0, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(0, ColumnarBatches.getRefCnt(created));
return null;
});
}

private static ColumnarBatch newArrowBatch(String schema, int numRows) {
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
for (ArrowWritableColumnVector col : columns) {
col.setValueCount(numRows);
}
final ColumnarBatch batch = new ColumnarBatch(columns);
batch.setNumRows(numRows);
return batch;
}
}