
Commit

Merge branch 'apache:main' into gayangya/metadtastructsuite_fix
gaoyangxiaozhu authored Jun 27, 2024
2 parents 37ea5c1 + 7bf6cd4 commit ee0ac0c
Showing 108 changed files with 3,587 additions and 1,789 deletions.
16 changes: 13 additions & 3 deletions .github/workflows/velox_docker.yml
@@ -120,6 +120,12 @@ jobs:
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup tzdata
run: |
if [ "${{ matrix.os }}" = "ubuntu:22.04" ]; then
apt-get update
TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
fi
- name: Setup java and maven
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
@@ -515,7 +521,7 @@ jobs:
fail-fast: false
matrix:
spark: ["spark-3.2"]
celeborn: ["celeborn-0.4.0", "celeborn-0.3.2"]
celeborn: ["celeborn-0.4.1", "celeborn-0.3.2-incubating"]
runs-on: ubuntu-20.04
container: ubuntu:22.04
steps:
@@ -530,6 +536,10 @@ jobs:
with:
name: velox-arrow-jar-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Setup tzdata
run: |
apt-get update
TZ="Etc/GMT" DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
@@ -547,8 +557,8 @@
fi
echo "EXTRA_PROFILE: ${EXTRA_PROFILE}"
cd /opt && mkdir -p celeborn && \
wget https://archive.apache.org/dist/incubator/celeborn/${{ matrix.celeborn }}-incubating/apache-${{ matrix.celeborn }}-incubating-bin.tgz && \
tar xzf apache-${{ matrix.celeborn }}-incubating-bin.tgz -C /opt/celeborn --strip-components=1 && cd celeborn && \
wget https://archive.apache.org/dist/celeborn/${{ matrix.celeborn }}/apache-${{ matrix.celeborn }}-bin.tgz && \
tar xzf apache-${{ matrix.celeborn }}-bin.tgz -C /opt/celeborn --strip-components=1 && cd celeborn && \
mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
bash -c "echo -e 'CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g' > ./conf/celeborn-env.sh" && \
bash -c "echo -e 'celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64' > ./conf/celeborn-defaults.conf" && \
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
@@ -61,6 +61,52 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
StructType(dataSchema)
}

private def createNativeIterator(
splitInfoByteArray: Array[Array[Byte]],
wsPlan: Array[Byte],
materializeInput: Boolean,
inputIterators: Seq[Iterator[ColumnarBatch]]): BatchIterator = {

/** Generate closeable ColumnBatch iterator. */
val listIterator =
inputIterators
.map {
case i: CloseableCHColumnBatchIterator => i
case it => new CloseableCHColumnBatchIterator(it)
}
.map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
.asJava
new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
wsPlan,
splitInfoByteArray,
listIterator,
materializeInput
)
}

private def createCloseIterator(
context: TaskContext,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
nativeIter: BatchIterator): CloseableCHColumnBatchIterator = {

val iter = new CollectMetricIterator(
nativeIter,
updateNativeMetrics,
updateInputMetrics,
updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => iter.close())
new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
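The two helpers above factor out logic that was previously duplicated in the first-stage and final-stage iterator builders of this class: createNativeIterator builds the native kernel from the serialized plan and split infos, while createCloseIterator ties the resulting iterator to the Spark task lifecycle (cancel on interrupt, close on completion). A standalone sketch of that lifecycle pattern, not part of this commit (Releasable is a hypothetical stand-in for CollectMetricIterator):

import org.apache.spark.TaskContext

// Hypothetical stand-in for CollectMetricIterator: anything that can be cancelled and closed.
trait Releasable {
  def cancel(): Unit
  def close(): Unit
}

// Register failure and completion callbacks so the native resource is always released,
// mirroring the addTaskFailureListener/addTaskCompletionListener calls in createCloseIterator.
def bindToTask[R <: Releasable](context: TaskContext, resource: R): R = {
  context.addTaskFailureListener((ctx, _) => if (ctx.isInterrupted()) resource.cancel())
  context.addTaskCompletionListener[Unit](_ => resource.close())
  resource
}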

// only set file schema for text format table
private def setFileSchemaForLocalFiles(
localFilesNode: LocalFilesNode,
@@ -198,45 +244,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch] = {

assert(
require(
inputPartition.isInstanceOf[GlutenPartition],
"CH backend only accepts GlutenPartition in GlutenWholeStageColumnarRDD.")

val transKernel = new CHNativeExpressionEvaluator()
val inBatchIters = new JArrayList[GeneralInIterator](inputIterators.map {
iter => new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
}.asJava)

val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
val nativeIter =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
false)
val wsPlan = inputPartition.plan
val materializeInput = false

val iter = new CollectMetricIterator(
nativeIter,
updateNativeMetrics,
updateInputMetrics,
context.taskMetrics().inputMetrics)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => iter.close())

// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(
context,
new CloseableCHColumnBatchIterator(
iter.asInstanceOf[Iterator[ColumnarBatch]],
Some(pipelineTime)))
createCloseIterator(
context,
pipelineTime,
updateNativeMetrics,
Some(updateInputMetrics),
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
)
}

// Generate Iterator[ColumnarBatch] for final stage.
@@ -252,52 +277,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
GlutenConfig.getConf

val transKernel = new CHNativeExpressionEvaluator()
val columnarNativeIterator =
new JArrayList[GeneralInIterator](inputIterators.map {
iter =>
new ColumnarNativeIterator(CHIteratorApi.genCloseableColumnBatchIterator(iter).asJava)
}.asJava)
// We need to complete the dependency RDDs first
val nativeIterator = transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
// Final iterator does not contain scan split, so pass empty split info to native here.
new Array[Array[Byte]](0),
columnarNativeIterator,
materializeInput
)

val iter = new CollectMetricIterator(nativeIterator, updateNativeMetrics, null, null)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => iter.close())
new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
}
// Final iterator does not contain scan split, so pass empty split info to native here.
val splitInfoByteArray = new Array[Array[Byte]](0)
val wsPlan = rootNode.toProtobuf.toByteArray

object CHIteratorApi {

/** Generate closeable ColumnBatch iterator. */
def genCloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
iter match {
case _: CloseableCHColumnBatchIterator => iter
case _ => new CloseableCHColumnBatchIterator(iter)
}
// We need to complete the dependency RDDs first
createCloseIterator(
context,
pipelineTime,
updateNativeMetrics,
None,
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}
}

class CollectMetricIterator(
val nativeIterator: BatchIterator,
val updateNativeMetrics: IMetrics => Unit,
val updateInputMetrics: InputMetricsWrapper => Unit,
val inputMetrics: InputMetrics
val updateInputMetrics: Option[InputMetricsWrapper => Unit] = None,
val inputMetrics: InputMetrics = null
) extends Iterator[ColumnarBatch] {
private var outputRowCount = 0L
private var outputVectorCount = 0L
@@ -329,9 +328,7 @@ class CollectMetricIterator(
val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
if (updateInputMetrics != null) {
updateInputMetrics(inputMetrics)
}
updateInputMetrics.foreach(_(inputMetrics))
metricsUpdated = true
}
}
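In the refactored CollectMetricIterator, updateInputMetrics is now an Option[InputMetricsWrapper => Unit] rather than a nullable reference, so the old null check collapses to updateInputMetrics.foreach(_(inputMetrics)). A small self-contained illustration of the idiom (names are illustrative, not from the commit):

// Optional callback held as an Option instead of a nullable field.
final case class Progress(rows: Long)

class Reporter(onProgress: Option[Progress => Unit] = None) {
  def report(rows: Long): Unit =
    // foreach is a no-op when no callback was supplied, so no null check is needed.
    onProgress.foreach(_(Progress(rows)))
}

new Reporter().report(42)                              // silent: no callback registered
new Reporter(Some(p => println(p.rows))).report(42)    // prints 42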
@@ -50,7 +50,6 @@ import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -583,14 +582,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] =
List()

/**
* Generate extended columnar post-rules.
*
* @return
*/
override def genExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => NativeWritePostRule(spark))

override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = {
List()
}
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.clickhouse.CHIteratorApi
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.utils.{BroadcastHashJoinStrategy, CHJoinValidateUtil, ShuffleHashJoinStrategy}

Expand Down Expand Up @@ -75,7 +74,7 @@ case class CHBroadcastBuildSideRDD(

override def genBroadcastBuildSideIterator(): Iterator[ColumnarBatch] = {
CHBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcasted, broadcastContext)
CHIteratorApi.genCloseableColumnBatchIterator(Iterator.empty)
Iterator.empty
}
}

@@ -209,7 +209,6 @@ object CHExpressionUtil {
UNIX_MICROS -> DefaultValidator(),
TIMESTAMP_MILLIS -> DefaultValidator(),
TIMESTAMP_MICROS -> DefaultValidator(),
FLATTEN -> DefaultValidator(),
STACK -> DefaultValidator()
)
}
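Dropping FLATTEN from this validator map means the Spark flatten function is no longer flagged as unsupported by the ClickHouse backend, so queries using it should now be eligible for offload. For reference, a minimal query exercising the function (assumes a local SparkSession; not part of the commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("flatten-demo").getOrCreate()
// flatten collapses an array of arrays into a single array: [1, 2, 3, 4]
spark.sql("SELECT flatten(array(array(1, 2), array(3, 4))) AS flat").show(false)
spark.stop()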
@@ -67,9 +67,9 @@ class GlutenClickHouseDecimalSuite
private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
(DecimalType.apply(9, 4), Seq()),
// 1: ch decimal avg is float
(DecimalType.apply(18, 8), Seq(1)),
(DecimalType.apply(18, 8), Seq()),
// 1: ch decimal avg is float, 3/10: all value is null and compare with limit
(DecimalType.apply(38, 19), Seq(1, 3, 10))
(DecimalType.apply(38, 19), Seq(3, 10))
)

private def createDecimalTables(dataType: DecimalType): Unit = {
@@ -337,7 +337,6 @@ class GlutenClickHouseDecimalSuite
allowPrecisionLoss =>
Range
.inclusive(1, 22)
.filter(_ != 17) // Ignore Q17 which include avg
.foreach {
sql_num =>
{
@@ -1252,4 +1252,29 @@ class GlutenClickHouseHiveTableSuite
}
spark.sql("drop table test_tbl_3452")
}

test("GLUTEN-6235: Fix crash on ExpandTransform::work()") {
val tbl = "test_tbl_6235"
sql(s"drop table if exists $tbl")
val createSql =
s"""
|create table $tbl
|stored as textfile
|as select 1 as a1, 2 as a2, 3 as a3, 4 as a4, 5 as a5, 6 as a6, 7 as a7, 8 as a8, 9 as a9
|""".stripMargin
sql(createSql)
val select_sql =
s"""
|select
|a5,a6,a7,a8,a3,a4,a9
|,count(distinct a2) as a2
|,count(distinct a1) as a1
|,count(distinct if(a3=1,a2,null)) as a33
|,count(distinct if(a4=2,a1,null)) as a43
|from $tbl
|group by a5,a6,a7,a8,a3,a4,a9 with cube
|""".stripMargin
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
sql(s"drop table if exists $tbl")
}
}
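The GLUTEN-6235 regression test groups WITH CUBE, which Spark plans through an Expand node (the ExpandTransform in the issue title is presumably its ClickHouse-side counterpart), producing one grouping set per subset of the seven grouping columns. A rough DataFrame-API equivalent of the query, for illustration only (assumes the test table above and an active spark session):

import org.apache.spark.sql.functions._

spark.table("test_tbl_6235")
  .cube("a5", "a6", "a7", "a8", "a3", "a4", "a9")
  .agg(
    countDistinct(col("a2")).as("a2"),
    countDistinct(col("a1")).as("a1"),
    // if(a3 = 1, a2, null): when(...) without otherwise() yields null, which countDistinct ignores
    countDistinct(when(col("a3") === 1, col("a2"))).as("a33"),
    countDistinct(when(col("a4") === 2, col("a1"))).as("a43"))
  .show()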
@@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMerg

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.File

import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

@@ -614,5 +616,45 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.count()
assertResult(600572)(result)
}

test("test mergetree insert with optimize basic") {
val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
val dataPath = s"$HDFS_URL/test/$tableName"

withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows" -> "10000"
) {
spark.sql(s"""
|DROP TABLE IF EXISTS $tableName;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS $tableName
|USING clickhouse
|LOCATION '$dataPath'
|TBLPROPERTIES (storage_policy='__hdfs_main')
| as select * from lineitem
|""".stripMargin)

val ret = spark.sql(s"select count(*) from $tableName").collect()
assertResult(600572)(ret.apply(0).get(0))
val conf = new Configuration
conf.set("fs.defaultFS", HDFS_URL)
val fs = FileSystem.get(conf)

eventually(timeout(60.seconds), interval(2.seconds)) {
val it = fs.listFiles(new Path(dataPath), true)
var files = 0
while (it.hasNext) {
it.next()
files += 1
}
assertResult(72)(files)
}
}
}
}
// scalastyle:off line.size.limit
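The new mergetree-on-HDFS test polls the file system with ScalaTest's Eventually until background merges settle on the expected file count. A standalone sketch of that polling idiom, with made-up values rather than the suite's HDFS listing:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

var parts = 0 // stands in for the HDFS file count polled in the real test
eventually(timeout(60.seconds), interval(2.seconds)) {
  parts += 24           // pretend background merges make progress between polls
  assert(parts >= 72)   // fails and retries until the third poll succeeds
}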