Merge branch 'main' into acvictor/shuffle
acvictor authored Jun 27, 2024
2 parents 92696fc + b65ecce commit a6c9a6d
Showing 119 changed files with 3,833 additions and 1,595 deletions.
25 changes: 23 additions & 2 deletions .github/workflows/build_bundle_package.yml
@@ -38,7 +38,7 @@ on:
jobs:
build-native-lib:
runs-on: ubuntu-20.04
-    container: apache/gluten:gluten-vcpkg-builder_2024_03_17
+    container: apache/gluten:gluten-vcpkg-builder_2024_05_29
steps:
- uses: actions/checkout@v2
- name: Build Gluten velox third party
@@ -53,11 +53,17 @@ jobs:
export NUM_THREADS=4
./dev/builddeps-veloxbe.sh --build_tests=OFF --build_benchmarks=OFF --enable_s3=OFF \
--enable_gcs=OFF --enable_hdfs=ON --enable_abfs=OFF
-      - uses: actions/upload-artifact@v2
+      - name: Upload native libs
+        uses: actions/upload-artifact@v2
with:
path: ./cpp/build/releases/
name: velox-native-lib-${{github.sha}}
retention-days: 1
+      - name: Upload Artifact Arrow Jar
+        uses: actions/upload-artifact@v2
+        with:
+          path: /root/.m2/repository/org/apache/arrow/
+          name: velox-arrow-jar-centos-7-${{github.sha}}

build-bundle-package-ubuntu:
if: startsWith(github.event.inputs.os, 'ubuntu')
@@ -71,6 +77,11 @@ jobs:
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
+      - name: Download All Arrow Jar Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-arrow-jar-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
apt-get update && \
@@ -99,6 +110,11 @@ jobs:
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
+      - name: Download All Arrow Jar Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-arrow-jar-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
- name: Setup java and maven
run: |
yum update -y && yum install -y java-1.8.0-openjdk-devel wget && \
@@ -130,6 +146,11 @@ jobs:
with:
name: velox-native-lib-${{github.sha}}
path: ./cpp/build/releases
+      - name: Download All Arrow Jar Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-arrow-jar-centos-7-${{github.sha}}
+          path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true && \
16 changes: 13 additions & 3 deletions .github/workflows/velox_docker.yml
@@ -120,6 +120,12 @@ jobs:
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
+      - name: Setup tzdata
+        run: |
+          if [ "${{ matrix.os }}" = "ubuntu:22.04" ]; then
+            apt-get update
+            TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
+          fi
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
@@ -515,7 +521,7 @@ jobs:
fail-fast: false
matrix:
spark: ["spark-3.2"]
celeborn: ["celeborn-0.4.0", "celeborn-0.3.2"]
celeborn: ["celeborn-0.4.1", "celeborn-0.3.2-incubating"]
runs-on: ubuntu-20.04
container: ubuntu:22.04
steps:
@@ -530,6 +536,10 @@
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
+      - name: Setup tzdata
+        run: |
+          apt-get update
+          TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
@@ -547,8 +557,8 @@
fi
echo "EXTRA_PROFILE: ${EXTRA_PROFILE}"
cd /opt && mkdir -p celeborn && \
-          wget https://archive.apache.org/dist/incubator/celeborn/${{ matrix.celeborn }}-incubating/apache-${{ matrix.celeborn }}-incubating-bin.tgz && \
-          tar xzf apache-${{ matrix.celeborn }}-incubating-bin.tgz -C /opt/celeborn --strip-components=1 && cd celeborn && \
+          wget https://archive.apache.org/dist/celeborn/${{ matrix.celeborn }}/apache-${{ matrix.celeborn }}-bin.tgz && \
+          tar xzf apache-${{ matrix.celeborn }}-bin.tgz -C /opt/celeborn --strip-components=1 && cd celeborn && \
mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
bash -c "echo -e 'CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g' > ./conf/celeborn-env.sh" && \
bash -c "echo -e 'celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64' > ./conf/celeborn-defaults.conf" && \
CHIteratorApi.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

-import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
+import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
@@ -61,6 +61,52 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
StructType(dataSchema)
}

+  private def createNativeIterator(
+      splitInfoByteArray: Array[Array[Byte]],
+      wsPlan: Array[Byte],
+      materializeInput: Boolean,
+      inputIterators: Seq[Iterator[ColumnarBatch]]): BatchIterator = {
+
+    /** Generate closeable ColumnBatch iterator. */
+    val listIterator =
+      inputIterators
+        .map {
+          case i: CloseableCHColumnBatchIterator => i
+          case it => new CloseableCHColumnBatchIterator(it)
+        }
+        .map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
+        .asJava
+    new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
+      wsPlan,
+      splitInfoByteArray,
+      listIterator,
+      materializeInput
+    )
+  }
+
+  private def createCloseIterator(
+      context: TaskContext,
+      pipelineTime: SQLMetric,
+      updateNativeMetrics: IMetrics => Unit,
+      updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
+      nativeIter: BatchIterator): CloseableCHColumnBatchIterator = {
+
+    val iter = new CollectMetricIterator(
+      nativeIter,
+      updateNativeMetrics,
+      updateInputMetrics,
+      updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull)
+
+    context.addTaskFailureListener(
+      (ctx, _) => {
+        if (ctx.isInterrupted()) {
+          iter.cancel()
+        }
+      })
+    context.addTaskCompletionListener[Unit](_ => iter.close())
+    new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
+  }

// only set file schema for text format table
private def setFileSchemaForLocalFiles(
localFilesNode: LocalFilesNode,
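A note on the hunk above: createNativeIterator wraps each upstream iterator in CloseableCHColumnBatchIterator only when it is not one already, so the closeable adapter never stacks twice. Below is a minimal, self-contained sketch of that idempotent-wrapping idiom; CloseableLike, CloseableBatchIterator, and wrapOnce are illustrative names, not Gluten APIs.

    object WrapOnceSketch {
      trait CloseableLike { def close(): Unit }

      // Adapter that makes any iterator closeable (a real one would free native buffers).
      class CloseableBatchIterator[T](underlying: Iterator[T])
        extends Iterator[T]
        with CloseableLike {
        override def hasNext: Boolean = underlying.hasNext
        override def next(): T = underlying.next()
        override def close(): Unit = ()
      }

      // Wrap at most once: reuse an existing adapter instead of layering a new one.
      def wrapOnce[T](it: Iterator[T]): CloseableBatchIterator[T] = it match {
        case already: CloseableBatchIterator[T @unchecked] => already
        case plain => new CloseableBatchIterator(plain)
      }
    }

Applied to columnar batches, this shape lets callers hand in either raw or already-wrapped iterators without risking a double close.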
@@ -198,45 +244,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch] = {

-    assert(
+    require(
inputPartition.isInstanceOf[GlutenPartition],
"CH backend only accepts GlutenPartition in GlutenWholeStageColumnarRDD.")

-    val transKernel = new CHNativeExpressionEvaluator()
-    val inBatchIters = new JArrayList[GeneralInIterator](inputIterators.map {
-      iter => new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
-    }.asJava)

val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
-    val nativeIter =
-      transKernel.createKernelWithBatchIterator(
-        inputPartition.plan,
-        splitInfoByteArray,
-        inBatchIters,
-        false)
+    val wsPlan = inputPartition.plan
+    val materializeInput = false

-    val iter = new CollectMetricIterator(
-      nativeIter,
-      updateNativeMetrics,
-      updateInputMetrics,
-      context.taskMetrics().inputMetrics)
-
-    context.addTaskFailureListener(
-      (ctx, _) => {
-        if (ctx.isInterrupted()) {
-          iter.cancel()
-        }
-      })
-    context.addTaskCompletionListener[Unit](_ => iter.close())

// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(
context,
-      new CloseableCHColumnBatchIterator(
-        iter.asInstanceOf[Iterator[ColumnarBatch]],
-        Some(pipelineTime)))
+      createCloseIterator(
+        context,
+        pipelineTime,
+        updateNativeMetrics,
+        Some(updateInputMetrics),
+        createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
+    )
}
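An editorial aside on the assert-to-require swap above, not part of the patch itself: Scala's require throws IllegalArgumentException and always executes, whereas assert throws AssertionError and can be stripped by the compiler with -Xdisable-assertions, which makes require the safer guard for caller-supplied arguments such as inputPartition. A minimal illustration with generic names:

    object RequireSketch extends App {
      def divide(a: Int, b: Int): Int = {
        // Always enforced, even in builds compiled with -Xdisable-assertions:
        require(b != 0, "divisor must be non-zero") // IllegalArgumentException on violation
        a / b
      }

      println(divide(10, 2)) // 5
    }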

// Generate Iterator[ColumnarBatch] for final stage.
@@ -252,52 +277,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
-    GlutenConfig.getConf
-
-    val transKernel = new CHNativeExpressionEvaluator()
-    val columnarNativeIterator =
-      new JArrayList[GeneralInIterator](inputIterators.map {
-        iter =>
-          new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
-      }.asJava)
-    // we need to complete dependency RDD's firstly
-    val nativeIterator = transKernel.createKernelWithBatchIterator(
-      rootNode.toProtobuf.toByteArray,
-      // Final iterator does not contain scan split, so pass empty split info to native here.
-      new Array[Array[Byte]](0),
-      columnarNativeIterator,
-      materializeInput
-    )
-
-    val iter = new CollectMetricIterator(nativeIterator, updateNativeMetrics, null, null)

-    context.addTaskFailureListener(
-      (ctx, _) => {
-        if (ctx.isInterrupted()) {
-          iter.cancel()
-        }
-      })
-    context.addTaskCompletionListener[Unit](_ => iter.close())
-    new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
-  }
-}
+    // Final iterator does not contain scan split, so pass empty split info to native here.
+    val splitInfoByteArray = new Array[Array[Byte]](0)
+    val wsPlan = rootNode.toProtobuf.toByteArray

-object CHIteratorApi {
-
-  /** Generate closeable ColumnBatch iterator. */
-  def genCloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
-    iter match {
-      case _: CloseableCHColumnBatchIterator => iter
-      case _ => new CloseableCHColumnBatchIterator(iter)
-    }
+    // we need to complete dependency RDD's firstly
+    createCloseIterator(
+      context,
+      pipelineTime,
+      updateNativeMetrics,
+      None,
+      createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}
}

class CollectMetricIterator(
val nativeIterator: BatchIterator,
val updateNativeMetrics: IMetrics => Unit,
-    val updateInputMetrics: InputMetricsWrapper => Unit,
-    val inputMetrics: InputMetrics
+    val updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
+    val inputMetrics: InputMetrics = null
) extends Iterator[ColumnarBatch] {
private var outputRowCount = 0L
private var outputVectorCount = 0L
@@ -329,9 +328,7 @@ class CollectMetricIterator(
val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
-      if (updateInputMetrics != null) {
-        updateInputMetrics(inputMetrics)
-      }
+      updateInputMetrics.foreach(_(inputMetrics))
metricsUpdated = true
}
}
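CollectMetricIterator's updateInputMetrics parameter is now an Option of a callback rather than a nullable function, so the former null check collapses to Option.foreach, as the hunk above shows. A minimal sketch of the same pattern; MetricsSink and its names are illustrative, not Gluten types:

    object OptionCallbackSketch extends App {
      final class MetricsSink(onRows: Option[Long => Unit] = None) {
        // foreach invokes the callback only when one is present; no null checks needed.
        def report(rows: Long): Unit = onRows.foreach(f => f(rows))
      }

      val silent  = new MetricsSink()                               // None: report is a no-op
      val verbose = new MetricsSink(Some(n => println(s"$n rows")))
      silent.report(10L)  // nothing happens
      verbose.report(42L) // prints "42 rows"
    }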
CHSparkPlanExecApi.scala
@@ -50,7 +50,6 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -583,14 +582,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] =
List()

-  /**
-   * Generate extended columnar post-rules.
-   *
-   * @return
-   */
-  override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] =
-    List(spark => NativeWritePostRule(spark))

override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = {
List()
}
CHBroadcastBuildSideRDD (org.apache.gluten.execution)
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution

-import org.apache.gluten.backendsapi.clickhouse.CHIteratorApi
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.utils.{BroadcastHashJoinStrategy, CHJoinValidateUtil, ShuffleHashJoinStrategy}

Expand Down Expand Up @@ -75,7 +74,7 @@ case class CHBroadcastBuildSideRDD(

override def genBroadcastBuildSideIterator(): Iterator[ColumnarBatch] = {
CHBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcasted, broadcastContext)
-    CHIteratorApi.genCloseableColumnBatchIterator(Iterator.empty)
+    Iterator.empty
}
}

CHExpressionUtil.scala
@@ -209,7 +209,6 @@ object CHExpressionUtil {
UNIX_MICROS -> DefaultValidator(),
TIMESTAMP_MILLIS -> DefaultValidator(),
TIMESTAMP_MICROS -> DefaultValidator(),
-    FLATTEN -> DefaultValidator(),
STACK -> DefaultValidator()
)
}
GlutenClickHouseDecimalSuite.scala
@@ -67,9 +67,9 @@ class GlutenClickHouseDecimalSuite
private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
(DecimalType.apply(9, 4), Seq()),
// 1: ch decimal avg is float
-    (DecimalType.apply(18, 8), Seq(1)),
+    (DecimalType.apply(18, 8), Seq()),
// 1: ch decimal avg is float, 3/10: all value is null and compare with limit
-    (DecimalType.apply(38, 19), Seq(1, 3, 10))
+    (DecimalType.apply(38, 19), Seq(3, 10))
)

private def createDecimalTables(dataType: DecimalType): Unit = {
@@ -337,7 +337,6 @@ class GlutenClickHouseDecimalSuite
allowPrecisionLoss =>
Range
.inclusive(1, 22)
-          .filter(_ != 17) // Ignore Q17 which include avg
.foreach {
sql_num =>
{