
Commit

Merge branch 'oap-project:main' into main
lhuang09287750 authored Nov 3, 2023
2 parents a5a6d82 + 55f1480 commit bb59502
Showing 267 changed files with 18,559 additions and 1,710 deletions.
67 changes: 67 additions & 0 deletions .github/workflows/velox_be.yml
@@ -193,6 +193,73 @@ jobs:
if: ${{ always() }}
run: |
docker stop ubuntu2004-test-spark33-$GITHUB_RUN_ID || true
ubuntu2004-test-spark34-slow:
runs-on: velox-self-hosted
steps:
- uses: actions/checkout@v2
- name: Setup docker container
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID -e NUM_THREADS=30 -detach 10.0.2.4:5000/gluten-dev/ubuntu:20.04 \
'cd /opt/gluten && sleep 14400'
- name: Build Gluten velox third party
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c '
cd /opt/gluten/cpp && \
./compile.sh --build_velox_backend=ON --velox_home=/opt/velox --arrow_home=/opt/velox/_build/release/third_party/arrow_ep'
- name: Build and Run unit test for Spark 3.4.1(slow tests)
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.4 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1'
- name: Exit docker container
if: ${{ always() }}
run: |
docker stop ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID || true
ubuntu2004-test-spark34:
runs-on: velox-self-hosted
steps:
- uses: actions/checkout@v2
- name: Setup docker container
run: |
docker run --rm --init --privileged --ulimit nofile=65536:65536 --ulimit core=-1 --security-opt seccomp=unconfined \
-v $PWD:/opt/gluten --name ubuntu2004-test-spark34-$GITHUB_RUN_ID -e NUM_THREADS=30 -detach 10.0.2.4:5000/gluten-dev/ubuntu:20.04 \
'cd /opt/gluten && sleep 14400'
- name: Build Gluten velox third party
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c '
cd /opt/gluten/ep/build-velox/src && \
./get_velox.sh --velox_home=/opt/velox && \
./build_velox.sh --velox_home=/opt/velox --enable_ep_cache=ON'
- name: Build Gluten CPP library
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c '
cd /opt/gluten/cpp && \
./compile.sh --build_velox_backend=ON --velox_home=/opt/velox --arrow_home=/opt/velox/_build/release/third_party/arrow_ep --build_examples=ON'
- name: Build and Run unit test for Spark 3.4.1(other tests)
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
run: |
docker stop ubuntu2004-test-spark34-$GITHUB_RUN_ID || true
ubuntu2204-test:
runs-on: velox-self-hosted
@@ -27,7 +27,6 @@
import io.glutenproject.substrait.plan.PlanNode;

import com.google.protobuf.Any;
import io.substrait.proto.Plan;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

@@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map<String, String> confs) {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public GeneralOutIterator createKernelWithBatchIterator(
Plan wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
byte[] wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
allocId,
getPlanBytesBuf(wsPlan),
wsPlan,
iterList.toArray(new GeneralInIterator[0]),
buildNativeConfNode(
GlutenConfig.getNativeBackendConf(
@@ -115,10 +114,6 @@ public GeneralOutIterator createKernelWithBatchIterator(
return createOutIterator(handle);
}

private byte[] getPlanBytesBuf(Plan planNode) {
return planNode.toByteArray();
}

private GeneralOutIterator createOutIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
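In this hunk the evaluator's createKernelWithBatchIterator switches from accepting an io.substrait.proto.Plan to accepting the already-serialized plan as byte[], which is why the Plan import and the getPlanBytesBuf helper are removed. A minimal caller-side sketch in Scala — variable names such as wsCxt, transKernel and columnarNativeIterator are taken from the Scala hunks below, everything else is assumed:

    // Serialize the Substrait plan once on the JVM side and hand raw bytes to JNI;
    // the evaluator no longer needs a protobuf dependency to re-serialize it.
    val planBytes: Array[Byte] = wsCxt.root.toProtobuf.toByteArray
    val resultIter = transKernel.createKernelWithBatchIterator(
      planBytes,              // byte[] instead of io.substrait.proto.Plan
      columnarNativeIterator, // upstream columnar input iterators
      materializeInput)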
@@ -26,7 +26,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.utils.{LogLevelUtil, SubstraitPlanPrinterUtil}
import io.glutenproject.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}

import org.apache.spark.{InterruptibleIterator, Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.{InterruptibleIterator, SparkConf, SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
fileFormats(i)),
SoftAffinityUtil.getFilePartitionLocations(f))
case _ =>
throw new UnsupportedOperationException(s"Unsupport operators.")
throw new UnsupportedOperationException(s"Unsupported input partition.")
})
wsCxt.substraitContext.initLocalFilesNodesIndex(0)
wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
val substraitPlan = wsCxt.root.toProtobuf
if (index < 3) {
if (index == 0) {
logOnLevel(
GlutenConfig.getConf.substraitPlanLogLevel,
s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil
.substraitPlanToJson(substraitPlan)}"
)
}
GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
}

/**
@@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}.asJava)
// we need to complete dependency RDD's firstly
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf,
rootNode.toProtobuf.toByteArray,
columnarNativeIterator,
materializeInput)
}
@@ -267,8 +267,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {

/** Compute for BroadcastBuildSideRDD */
override def genBroadcastBuildSideIterator(
split: Partition,
context: TaskContext,
broadcasted: Broadcast[BuildSideRelation],
broadCastContext: BroadCastHashJoinContext): Iterator[ColumnarBatch] = {
CHBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcasted, broadCastContext)
@@ -26,7 +26,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.utils.Iterators
import io.glutenproject.vectorized._

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
wsCxt.substraitContext.initLocalFilesNodesIndex(0)
wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
val substraitPlan = wsCxt.root.toProtobuf
GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
}
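As on the ClickHouse side above, GlutenPartition is now constructed with substraitPlan.toByteArray, so the partition objects that Spark serializes and ships to executors carry plain bytes rather than a protobuf message. A rough sketch of the resulting shape — field names are assumed for illustration, not the exact definition:

    import org.apache.spark.Partition

    // Sketch only: the partition holds the serialized Substrait plan as bytes,
    // which Java-serializes cheaply when Spark sends partitions to executor tasks.
    case class GlutenPartition(
        index: Int,
        plan: Array[Byte],
        locations: Array[String] = Array.empty)
      extends Partition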

/**
@@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging {
iter => new ColumnarBatchInIterator(iter.asJava)
}.asJava)
val nativeResultIterator =
transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator)
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
columnarNativeIterator)

pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)

@@ -217,8 +219,6 @@

/** Compute for BroadcastBuildSideRDD */
override def genBroadcastBuildSideIterator(
split: Partition,
context: TaskContext,
broadcasted: Broadcast[BuildSideRelation],
broadCastContext: BroadCastHashJoinContext): Iterator[ColumnarBatch] = {
val relation = broadcasted.value.asReadOnlyCopy(broadCastContext)
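Both backends also drop the unused split: Partition and context: TaskContext parameters from genBroadcastBuildSideIterator, which is why the Partition import disappears from the Spark import lines above; building the broadcast build side only needs the broadcast variable and the join context. A sketch of the trimmed method as it would appear in the shared IteratorApi trait — other members omitted, treat this as illustrative:

    // Illustrative sketch of the slimmed-down signature after this change.
    def genBroadcastBuildSideIterator(
        broadcasted: Broadcast[BuildSideRelation],
        broadCastContext: BroadCastHashJoinContext): Iterator[ColumnarBatch]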
@@ -445,7 +445,7 @@ class VeloxDataTypeValidationSuite extends VeloxWholeStageTransformerSuite {
}
}

test("Velox Parquet Write") {
ignore("Velox Parquet Write") {
withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
withTempDir {
dir =>
@@ -583,4 +583,18 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
s"select l_orderkey, substring(l_comment, $NULL_STR_COL, 3) " +
s"from $LINEITEM_TABLE limit $LENGTH")(checkOperatorMatch[ProjectExecTransformer])
}

test("left") {
runQueryAndCompare(
s"select l_orderkey, left(l_comment, 1) " +
s"from $LINEITEM_TABLE limit $LENGTH")(checkOperatorMatch[ProjectExecTransformer])

runQueryAndCompare(
s"select l_orderkey, left($NULL_STR_COL, 1) " +
s"from $LINEITEM_TABLE limit $LENGTH")(checkOperatorMatch[ProjectExecTransformer])

runQueryAndCompare(
s"select l_orderkey, left(l_comment, $NULL_STR_COL) " +
s"from $LINEITEM_TABLE limit $LENGTH")(checkOperatorMatch[ProjectExecTransformer])
}
}
@@ -97,7 +97,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
_.getMessage.toString.contains("Use Gluten partition write for hive")) == native)
}

test("test hive static partition write table") {
ignore("test hive static partition write table") {
withTable("t") {
spark.sql(
"CREATE TABLE t (c int, d long, e long)" +
@@ -127,7 +127,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
}
}

test("test hive write table") {
ignore("test hive write table") {
withTable("t") {
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
@@ -38,7 +38,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
super.sparkConf.set("spark.gluten.sql.native.writer.enabled", "true")
}

test("test write parquet with compression codec") {
ignore("test write parquet with compression codec") {
// compression codec details see `VeloxParquetDatasource.cc`
Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
.foreach {
@@ -71,7 +71,7 @@
}
}

test("test ctas") {
ignore("test ctas") {
withTable("velox_ctas") {
spark
.range(100)
@@ -82,7 +82,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
}
}

test("test parquet dynamic partition write") {
ignore("test parquet dynamic partition write") {
withTempPath {
f =>
val path = f.getCanonicalPath
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20231028
CH_COMMIT=520f2931398
CH_BRANCH=rebase_ch/20231031
CH_COMMIT=f15e8dcfa59
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/AggregateFunctionPartialMerge.h>
#include <DataTypes/DataTypeAggregateFunction.h>

2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
@@ -17,7 +17,7 @@
#include <filesystem>
#include <memory>
#include <optional>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/AggregateRelParser.cpp
@@ -16,7 +16,7 @@
*/
#include "AggregateRelParser.h"
#include <memory>
#include <AggregateFunctions/AggregateFunctionIf.h>
#include <AggregateFunctions/Combinators/AggregateFunctionIf.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
59 changes: 1 addition & 58 deletions cpp/core/benchmarks/CompressionBenchmark.cc
@@ -65,62 +65,6 @@ const int32_t kQplGzip = 2;
const int32_t kLZ4 = 3;
const int32_t kZstd = 4;

class MyMemoryPool final : public arrow::MemoryPool {
public:
explicit MyMemoryPool() {}

Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
RETURN_NOT_OK(pool_->Allocate(size, out));
stats_.UpdateAllocatedBytes(size);
// std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
// (uint64_t)*out << std::dec << std::endl; print_trace();
return arrow::Status::OK();
}

Status Reallocate(int64_t oldSize, int64_t newSize, int64_t alignment, uint8_t** ptr) override {
// auto old_ptr = *ptr;
RETURN_NOT_OK(pool_->Reallocate(oldSize, newSize, ptr));
stats_.UpdateAllocatedBytes(newSize - oldSize);
// std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " <<
// std::hex << (uint64_t)old_ptr << std::dec << " new_size = " << new_size
// << " addr = " << std::hex << (uint64_t)*ptr << std::dec << std::endl;
// print_trace();
return arrow::Status::OK();
}

void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
pool_->Free(buffer, size);
stats_.UpdateAllocatedBytes(-size);
// std::cout << "Free: size = " << size << " addr = " << std::hex <<
// (uint64_t)buffer
// << std::dec << std::endl; print_trace();
}

int64_t bytes_allocated() const override {
return stats_.bytes_allocated();
}

int64_t max_memory() const override {
return pool_->max_memory();
}

std::string backend_name() const override {
return pool_->backend_name();
}

int64_t total_bytes_allocated() const override {
return pool_->total_bytes_allocated();
}

int64_t num_allocations() const override {
throw pool_->num_allocations();
}

private:
arrow::MemoryPool* pool_ = arrow::default_memory_pool();
arrow::internal::MemoryPoolStats stats_;
};

class BenchmarkCompression {
public:
explicit BenchmarkCompression(const std::string& fileName, uint32_t compressBufferSize) {
@@ -195,8 +139,7 @@ class BenchmarkCompression {
default:
throw GlutenException("Codec not supported. Only support LZ4 or QATGzip");
}
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<MyMemoryPool>();
ipcWriteOptions.memory_pool = pool.get();
ipcWriteOptions.memory_pool = arrow::default_memory_pool();

int64_t elapseRead = 0;
int64_t numBatches = 0;
4 changes: 2 additions & 2 deletions cpp/core/jni/JniWrapper.cc
@@ -779,7 +779,6 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper

auto shuffleWriterOptions = ShuffleWriterOptions::defaults();
shuffleWriterOptions.partitioning_name = partitioningName;
shuffleWriterOptions.buffered_write = true;
if (bufferSize > 0) {
shuffleWriterOptions.buffer_size = bufferSize;
}
@@ -835,8 +834,9 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
env->ReleaseStringUTFChars(dataFileJstr, dataFileC);

auto localDirs = env->GetStringUTFChars(localDirsJstr, JNI_FALSE);
setenv(gluten::kGlutenSparkLocalDirs.c_str(), localDirs, 1);
shuffleWriterOptions.local_dirs = std::string(localDirs);
env->ReleaseStringUTFChars(localDirsJstr, localDirs);

partitionWriterCreator = std::make_shared<LocalPartitionWriterCreator>();
} else if (partitionWriterType == "celeborn") {
shuffleWriterOptions.partition_writer_type = PartitionWriterType::kCeleborn;