Skip to content

Commit

Permalink
Merge branch 'gayangya/offload_project' of https://github.com/gaoyang…
Browse files Browse the repository at this point in the history
…xiaozhu/gluten into gayangya/offload_project
  • Loading branch information
gaoyangxiaozhu committed Jun 27, 2024
2 parents 1c935cc + 280df5f commit d2bd539
Show file tree
Hide file tree
Showing 84 changed files with 3,182 additions and 1,643 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ jobs:
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup tzdata
run: |
if [ "${{ matrix.os }}" = "ubuntu:22.04" ]; then
apt-get update
TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
fi
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
Expand Down Expand Up @@ -530,6 +536,10 @@ jobs:
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup tzdata
run: |
apt-get update
TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
Expand Down Expand Up @@ -61,6 +61,52 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
StructType(dataSchema)
}

/**
 * Creates the native batch iterator for the given plan bytes and input iterators.
 *
 * Every input iterator is wrapped in a [[CloseableCHColumnBatchIterator]] unless it already is
 * one, and is then handed to the native evaluator as a [[ColumnarNativeIterator]].
 *
 * @param splitInfoByteArray split infos, forwarded unchanged to the native evaluator
 * @param wsPlan plan bytes, forwarded unchanged to the native evaluator
 * @param materializeInput flag forwarded unchanged to the native evaluator
 * @param inputIterators columnar batch iterators that feed the native evaluator
 * @return the iterator produced by `createKernelWithBatchIterator`
 */
private def createNativeIterator(
    splitInfoByteArray: Array[Array[Byte]],
    wsPlan: Array[Byte],
    materializeInput: Boolean,
    inputIterators: Seq[Iterator[ColumnarBatch]]): BatchIterator = {

  // Reuse iterators that are already closeable; wrap every other iterator.
  val closeableInputs = inputIterators.map {
    case alreadyCloseable: CloseableCHColumnBatchIterator => alreadyCloseable
    case plain => new CloseableCHColumnBatchIterator(plain)
  }

  // Expose each closeable iterator to the native side as a Java list of GeneralInIterator.
  val nativeInputs = closeableInputs.map {
    closeable => new ColumnarNativeIterator(closeable.asJava).asInstanceOf[GeneralInIterator]
  }.asJava

  val evaluator = new CHNativeExpressionEvaluator()
  evaluator.createKernelWithBatchIterator(
    wsPlan,
    splitInfoByteArray,
    nativeInputs,
    materializeInput)
}

/**
 * Wraps a native batch iterator in a [[CollectMetricIterator]] and ties it to the task:
 * it is cancelled when the task fails while interrupted, and closed when the task completes.
 *
 * @param context task context used to register the failure and completion listeners
 * @param pipelineTime metric passed to the returned [[CloseableCHColumnBatchIterator]]
 * @param updateNativeMetrics callback passed to the [[CollectMetricIterator]]
 * @param updateInputMetrics optional callback passed to the [[CollectMetricIterator]]; when it
 *   is empty, null is passed in place of the task's input metrics
 * @param nativeIter the native iterator to wrap
 * @return a closeable iterator over the metric-collecting iterator
 */
private def createCloseIterator(
    context: TaskContext,
    pipelineTime: SQLMetric,
    updateNativeMetrics: IMetrics => Unit,
    updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
    nativeIter: BatchIterator): CloseableCHColumnBatchIterator = {

  // Only read the task's input metrics when there is a callback to receive them.
  val taskInputMetrics =
    if (updateInputMetrics.isDefined) context.taskMetrics().inputMetrics else null

  val metricIter = new CollectMetricIterator(
    nativeIter,
    updateNativeMetrics,
    updateInputMetrics,
    taskInputMetrics)

  // Cancel only when the failure is reported on an interrupted task.
  context.addTaskFailureListener(
    (failedCtx, _) => {
      if (failedCtx.isInterrupted()) {
        metricIter.cancel()
      }
    })
  context.addTaskCompletionListener[Unit](_ => metricIter.close())

  new CloseableCHColumnBatchIterator(metricIter, Some(pipelineTime))
}

// only set file schema for text format table
private def setFileSchemaForLocalFiles(
localFilesNode: LocalFilesNode,
Expand Down Expand Up @@ -198,45 +244,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch] = {

assert(
require(
inputPartition.isInstanceOf[GlutenPartition],
"CH backend only accepts GlutenPartition in GlutenWholeStageColumnarRDD.")

val transKernel = new CHNativeExpressionEvaluator()
val inBatchIters = new JArrayList[GeneralInIterator](inputIterators.map {
iter => new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
}.asJava)

val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
val nativeIter =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
false)
val wsPlan = inputPartition.plan
val materializeInput = false

val iter = new CollectMetricIterator(
nativeIter,
updateNativeMetrics,
updateInputMetrics,
context.taskMetrics().inputMetrics)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => iter.close())

// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(
context,
new CloseableCHColumnBatchIterator(
iter.asInstanceOf[Iterator[ColumnarBatch]],
Some(pipelineTime)))
createCloseIterator(
context,
pipelineTime,
updateNativeMetrics,
Some(updateInputMetrics),
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
)
}

// Generate Iterator[ColumnarBatch] for final stage.
Expand All @@ -252,52 +277,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
GlutenConfig.getConf

val transKernel = new CHNativeExpressionEvaluator()
val columnarNativeIterator =
new JArrayList[GeneralInIterator](inputIterators.map {
iter =>
new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
}.asJava)
// we need to complete dependency RDD's firstly
val nativeIterator = transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
// Final iterator does not contain scan split, so pass empty split info to native here.
new Array[Array[Byte]](0),
columnarNativeIterator,
materializeInput
)

val iter = new CollectMetricIterator(nativeIterator, updateNativeMetrics, null, null)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => iter.close())
new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
}
// Final iterator does not contain scan split, so pass empty split info to native here.
val splitInfoByteArray = new Array[Array[Byte]](0)
val wsPlan = rootNode.toProtobuf.toByteArray

object CHIteratorApi {

/** Generate closeable ColumnBatch iterator. */
def genCloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
iter match {
case _: CloseableCHColumnBatchIterator => iter
case _ => new CloseableCHColumnBatchIterator(iter)
}
// We need to complete the dependency RDDs first.
createCloseIterator(
context,
pipelineTime,
updateNativeMetrics,
None,
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}
}

class CollectMetricIterator(
val nativeIterator: BatchIterator,
val updateNativeMetrics: IMetrics => Unit,
val updateInputMetrics: InputMetricsWrapper => Unit,
val inputMetrics: InputMetrics
val updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
val inputMetrics: InputMetrics = null
) extends Iterator[ColumnarBatch] {
private var outputRowCount = 0L
private var outputVectorCount = 0L
Expand Down Expand Up @@ -329,9 +328,7 @@ class CollectMetricIterator(
val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
if (updateInputMetrics != null) {
updateInputMetrics(inputMetrics)
}
updateInputMetrics.foreach(_(inputMetrics))
metricsUpdated = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
Expand Down Expand Up @@ -583,14 +582,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] =
List()

/**
* Generate extended columnar post-rules.
*
* @return
*/
override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => NativeWritePostRule(spark))

override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = {
List()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.clickhouse.CHIteratorApi
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.utils.{BroadcastHashJoinStrategy, CHJoinValidateUtil, ShuffleHashJoinStrategy}

Expand Down Expand Up @@ -75,7 +74,7 @@ case class CHBroadcastBuildSideRDD(

override def genBroadcastBuildSideIterator(): Iterator[ColumnarBatch] = {
CHBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcasted, broadcastContext)
CHIteratorApi.genCloseableColumnBatchIterator(Iterator.empty)
Iterator.empty
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ object CHExpressionUtil {
UNIX_MICROS -> DefaultValidator(),
TIMESTAMP_MILLIS -> DefaultValidator(),
TIMESTAMP_MICROS -> DefaultValidator(),
FLATTEN -> DefaultValidator(),
STACK -> DefaultValidator()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ class GlutenClickHouseDecimalSuite
private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
(DecimalType.apply(9, 4), Seq()),
// 1: ch decimal avg is float
(DecimalType.apply(18, 8), Seq(1)),
(DecimalType.apply(18, 8), Seq()),
// 1: ch decimal avg is float, 3/10: all values are null and compare with limit
(DecimalType.apply(38, 19), Seq(1, 3, 10))
(DecimalType.apply(38, 19), Seq(3, 10))
)

private def createDecimalTables(dataType: DecimalType): Unit = {
Expand Down Expand Up @@ -337,7 +337,6 @@ class GlutenClickHouseDecimalSuite
allowPrecisionLoss =>
Range
.inclusive(1, 22)
.filter(_ != 17) // Ignore Q17 which include avg
.foreach {
sql_num =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMerg

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.File

import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

Expand Down Expand Up @@ -614,5 +616,45 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.count()
assertResult(600572)(result)
}

// Creates a mergetree table on HDFS via CTAS with merge_after_insert enabled, checks the row
// count, then waits until the number of files under the table path reaches the expected value.
test("test mergetree insert with optimize basic") {
  val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
  val dataPath = s"$HDFS_URL/test/$tableName"

  val dropTableSql =
    s"""
       |DROP TABLE IF EXISTS $tableName;
       |""".stripMargin

  val createTableSql =
    s"""
       |CREATE TABLE IF NOT EXISTS $tableName
       |USING clickhouse
       |LOCATION '$dataPath'
       |TBLPROPERTIES (storage_policy='__hdfs_main')
       | as select * from lineitem
       |""".stripMargin

  withSQLConf(
    "spark.databricks.delta.optimize.minFileSize" -> "200000000",
    "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true",
    "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
    "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows" -> "10000"
  ) {
    spark.sql(dropTableSql)
    spark.sql(createTableSql)

    val rowCount = spark.sql(s"select count(*) from $tableName").collect().apply(0).get(0)
    assertResult(600572)(rowCount)

    val hdfsConf = new Configuration
    hdfsConf.set("fs.defaultFS", HDFS_URL)
    val hdfs = FileSystem.get(hdfsConf)

    // Re-list the table path recursively until the file count matches or the timeout expires.
    eventually(timeout(60.seconds), interval(2.seconds)) {
      val remoteFiles = hdfs.listFiles(new Path(dataPath), true)
      val fileCount =
        Iterator.continually(remoteFiles).takeWhile(_.hasNext).map(_.next()).size
      assertResult(72)(fileCount)
    }
  }
}
}
// scalastyle:off line.size.limit
Loading

0 comments on commit d2bd539

Please sign in to comment.