Commit b86896a

Merge branch 'main' of https://github.com/gaoyangxiaozhu/gluten into gayangya/metadatacolumns

Yangyang Gao committed Dec 6, 2023
2 parents 84ac7ff + 06b9d47 commit b86896a
Showing 59 changed files with 1,582 additions and 387 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/stale.yml
@@ -24,8 +24,12 @@ jobs:
steps:
- uses: actions/stale@v8
with:
- stale-pr-message: 'This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.'
- close-pr-message: 'This PR was closed because it has been stalled for 10 days with no activity.'
days-before-pr-stale: 45
days-before-pr-close: 10
+ stale-pr-message: 'This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.'
+ close-pr-message: 'This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it''s still valid. Thanks.'
+ days-before-issue-stale: -1 # disabled
+ days-before-issue-close: -1 # disabled
+ stale-issue-message: 'This issue is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.'
+ close-issue-message: 'This issue was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it''s still valid. Thanks.'
operations-per-run: 300
24 changes: 12 additions & 12 deletions .github/workflows/velox_be.yml
@@ -112,7 +112,7 @@ jobs:
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
+ mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2
run: |
docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -148,7 +148,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(slow tests)
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
- mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
+ mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3
run: |
docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
@@ -184,7 +184,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1(other tests)
run: |
docker exec ubuntu2004-test-spark33-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
- mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
+ mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
@@ -214,7 +214,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(slow tests)
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \
- mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
+ mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4
run: |
docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \
@@ -250,7 +250,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.1(other tests)
run: |
docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \
- mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
+ mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \
mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest'
- name: Exit docker container
if: ${{ always() }}
@@ -280,7 +280,7 @@ jobs:
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
+ mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -307,7 +307,7 @@
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -DskipTests'
+ mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.3
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -320,7 +320,7 @@
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests'
+ mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -360,7 +360,7 @@ jobs:
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
+ mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos8-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -411,7 +411,7 @@ jobs:
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests'
+ mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2
run: |
docker exec centos7-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
@@ -488,12 +488,12 @@ jobs:
cd /opt/gluten && \
sudo -E ./dev/vcpkg/setup-build-depends.sh && \
source ./dev/vcpkg/env.sh && \
- ./dev/builddeps-veloxbe.sh --build_tests=ON --build_benchmarks=ON --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON'
+ ./dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_tests=ON --build_benchmarks=ON --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON'
- name: Build for Spark 3.2.2
run: |
docker exec static-build-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
- mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests && \
+ mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests && \
cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.2'
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8)
@@ -217,4 +217,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def needOutputSchemaForPlan(): Boolean = true

override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss
+
+ override def requiredInputFilePaths(): Boolean = true
}
@@ -352,12 +352,32 @@ class MetricsApiImpl extends MetricsApi with Logging {
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
"prepareTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to prepare left list"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to process"),
"joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to merge join"),
"totaltimeSortmergejoin" -> SQLMetrics
.createTimingMetric(sparkContext, "totaltime sortmergejoin")
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of merge join"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"streamPreProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"stream preProject cpu wall time count"),
"streamPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of stream preProjection"),
"bufferPreProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"buffer preProject cpu wall time count"),
"bufferPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of buffer preProjection"),
"postProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"postProject cpu wall time count"),
"postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of postProjection")
)

override def genSortMergeJoinTransformerMetricsUpdater(
@@ -476,20 +496,13 @@ class MetricsApiImpl extends MetricsApi with Logging {
"postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of postProjection"),
"postProjectionOutputRows" -> SQLMetrics.createMetric(
sparkContext,
"number of postProjection output rows"),
"postProjectionOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of postProjection output vectors"),
"finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
"finalOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of final output vectors")
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes")
)

override def genHashJoinTransformerMetricsUpdater(
- metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdaterImpl(metrics)
+ metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)

override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
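The metric maps in the two hunks above are built entirely from Spark's public SQLMetrics factory methods (createMetric, createSizeMetric, createNanoTimingMetric) and are later filled in by the corresponding metrics updaters. A minimal, self-contained sketch of how such metrics behave outside Gluten — the object name and the added values below are illustrative, not part of this commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object SortMergeJoinMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("metrics-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Same factory calls as the diff: a plain counter, a size metric and a nano-timing metric.
    val metrics = Map(
      "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
      "numOutputBytes" -> SQLMetrics.createSizeMetric(sc, "number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sc, "totaltime of merge join")
    )

    // An operator (or its metrics updater) adds to the metrics as batches are produced.
    metrics("numOutputRows").add(100L)
    metrics("numOutputBytes").add(4096L)
    metrics("wallNanos").add(1500000L)

    // SQLMetric values are plain longs, which is what the new VeloxMetricsSuite asserts on.
    assert(metrics("numOutputRows").value == 100L)

    spark.stop()
  }
}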
@@ -66,10 +66,8 @@ case class FilterExecTransformer(condition: Expression, child: SparkPlan)
private def getLeftCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
- case batchScanTransformer: BatchScanExecTransformer =>
- batchScanTransformer.filterExprs()
- case fileScanTransformer: FileSourceScanExecTransformer =>
- fileScanTransformer.filterExprs()
+ case basicScanExecTransformer: BasicScanExecTransformer =>
+ basicScanExecTransformer.filterExprs()
// For fallback scan, we need to keep original filter.
case _ =>
Seq.empty[Expression]
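The FilterExecTransformer change above collapses the two concrete scan cases into a single case on their common parent, BasicScanExecTransformer, so any scan transformer that extends the base type keeps its manually pushed-down filters without this match needing another branch. A stand-alone sketch of the same pattern, using hypothetical stand-in types rather than Gluten's real class hierarchy:

// Hypothetical stand-ins, for illustration only.
sealed trait BasicScan { def filterExprs(): Seq[String] }
final case class BatchScan(filters: Seq[String]) extends BasicScan {
  override def filterExprs(): Seq[String] = filters
}
final case class FileSourceScan(filters: Seq[String]) extends BasicScan {
  override def filterExprs(): Seq[String] = filters
}
final case class FallbackScan(filters: Seq[String]) // deliberately not a BasicScan

object ScanFilterSketch {
  // One case on the base type covers every scan that extends it;
  // a fallback scan keeps its original filter, so nothing is collected here.
  def scanFilters(child: Any): Seq[String] = child match {
    case scan: BasicScan => scan.filterExprs()
    case _               => Seq.empty[String]
  }

  def main(args: Array[String]): Unit = {
    println(scanFilters(BatchScan(Seq("c1 > 0"))))      // List(c1 > 0)
    println(scanFilters(FileSourceScan(Seq("c2 = 1")))) // List(c2 = 1)
    println(scanFilters(FallbackScan(Seq("ignored"))))  // List()
  }
}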
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf

class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
override protected val backend: String = "velox"
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"

override def beforeAll(): Unit = {
super.beforeAll()

spark
.range(100)
.selectExpr("id as c1", "id % 3 as c2")
.write
.format("parquet")
.saveAsTable("metrics_t1")

spark
.range(200)
.selectExpr("id as c1", "id % 3 as c2")
.write
.format("parquet")
.saveAsTable("metrics_t2")
}

override protected def afterAll(): Unit = {
spark.sql("drop table metrics_t1")
spark.sql("drop table metrics_t2")

super.afterAll()
}

test("test sort merge join metrics") {
withSQLConf(
GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// without preproject
runQueryAndCompare(
"SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
) {
df =>
val smj = find(df.queryExecution.executedPlan) {
case _: SortMergeJoinExecTransformer => true
case _ => false
}
assert(smj.isDefined)
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("numOutputBytes").value > 0)
}

// with preproject
runQueryAndCompare(
"SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 + 1 = metrics_t2.c1 + 1"
) {
df =>
val smj = find(df.queryExecution.executedPlan) {
case _: SortMergeJoinExecTransformer => true
case _ => false
}
assert(smj.isDefined)
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("streamPreProjectionCpuCount").value > 0)
assert(metrics("bufferPreProjectionCpuCount").value > 0)
}
}
}

test("test shuffle hash join metrics") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// without preproject
runQueryAndCompare(
"SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
) {
df =>
val smj = find(df.queryExecution.executedPlan) {
case _: ShuffledHashJoinExecTransformer => true
case _ => false
}
assert(smj.isDefined)
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("numOutputBytes").value > 0)
}

// with preproject
runQueryAndCompare(
"SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 + 1 = metrics_t2.c1 + 1"
) {
df =>
val smj = find(df.queryExecution.executedPlan) {
case _: ShuffledHashJoinExecTransformer => true
case _ => false
}
assert(smj.isDefined)
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("streamPreProjectionCpuCount").value > 0)
assert(metrics("buildPreProjectionCpuCount").value > 0)
}
}
}
}
@@ -220,15 +220,15 @@ class VeloxTPCHV1Suite extends VeloxTPCHSuite {
class VeloxTPCHV1BhjSuite extends VeloxTPCHSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.sources.useV1SourceList", "")
.set("spark.sql.sources.useV1SourceList", "parquet")
.set("spark.sql.autoBroadcastJoinThreshold", "30M")
}
}

class VeloxTPCHV2Suite extends VeloxTPCHSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.sources.useV1SourceList", "parquet")
.set("spark.sql.sources.useV1SourceList", "")
}
}

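The suite swap above hinges on spark.sql.sources.useV1SourceList, the stock Spark conf that lists the formats still served by the V1 file source: with "parquet" in the list a parquet read plans a V1 FileSourceScanExec, while an empty list routes it through the DataSource V2 BatchScanExec path. A small local sketch that makes the difference visible in the physical plan — the output path is made up, and it relies on this being a runtime SQL conf in stock Spark 3.x:

import org.apache.spark.sql.SparkSession

object SourceListSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("useV1SourceList-sketch").getOrCreate()
    val path = "/tmp/usev1sourcelist-sketch" // illustrative temp location
    spark.range(10).write.mode("overwrite").parquet(path)

    // V1 path: "parquet" listed => the plan contains a FileScan (FileSourceScanExec) node.
    spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
    println(spark.read.parquet(path).queryExecution.executedPlan)

    // V2 path: empty list => the plan contains a BatchScan (BatchScanExec) node.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    println(spark.read.parquet(path).queryExecution.executedPlan)

    spark.stop()
  }
}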
15 changes: 4 additions & 11 deletions dev/builddeps-veloxbe.sh
@@ -21,7 +21,6 @@ ENABLE_GCS=OFF
ENABLE_S3=OFF
ENABLE_HDFS=OFF
ENABLE_EP_CACHE=OFF
- SKIP_BUILD_EP=OFF
ARROW_ENABLE_CUSTOM_CODEC=OFF
ENABLE_VCPKG=OFF

@@ -82,10 +81,6 @@ do
ENABLE_EP_CACHE=("${arg#*=}")
shift # Remove argument name from processing
;;
- --skip_build_ep=*)
- SKIP_BUILD_EP=("${arg#*=}")
- shift # Remove argument name from processing
- ;;
--enable_vcpkg=*)
ENABLE_VCPKG=("${arg#*=}")
shift # Remove argument name from processing
@@ -104,12 +99,10 @@ if [ "$ENABLE_VCPKG" = "ON" ]; then
fi

##install velox
if [ "$SKIP_BUILD_EP" != "ON" ]; then
cd $GLUTEN_DIR/ep/build-velox/src
./get_velox.sh --enable_hdfs=$ENABLE_HDFS --build_protobuf=$BUILD_PROTOBUF --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS
./build_velox.sh --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS --build_type=$BUILD_TYPE --enable_hdfs=$ENABLE_HDFS \
--enable_ep_cache=$ENABLE_EP_CACHE --build_tests=$BUILD_TESTS --build_benchmarks=$BUILD_BENCHMARKS
fi
cd $GLUTEN_DIR/ep/build-velox/src
./get_velox.sh --enable_hdfs=$ENABLE_HDFS --build_protobuf=$BUILD_PROTOBUF --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS
./build_velox.sh --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS --build_type=$BUILD_TYPE --enable_hdfs=$ENABLE_HDFS \
--enable_ep_cache=$ENABLE_EP_CACHE --build_tests=$BUILD_TESTS --build_benchmarks=$BUILD_BENCHMARKS

## compile gluten cpp
cd $GLUTEN_DIR/cpp