[GLUTEN-5476][CH] Trigger merge on insert task (#5529)
What changes were proposed in this pull request?
(Fixes: #5476)

How was this patch tested?
Tested by unit tests.
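
For context, a minimal sketch of how the new behavior can be exercised from a Spark session. The two configuration keys are the ones used in this patch; the SparkSession `spark`, the `lineitem` source table, and the table location are illustrative assumptions, not part of the change.

// Illustrative sketch only: enable merge-after-insert for the ClickHouse MergeTree backend.
spark.conf.set(
  "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
  "true")
// Parts below this size are candidates for merging once the insert task finishes.
spark.conf.set("spark.databricks.delta.optimize.minFileSize", "200000000")

spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insert_optimize_basic
    |USING clickhouse
    |LOCATION '/tmp/lineitem_mergetree_insert_optimize_basic'
    |AS SELECT * FROM lineitem
    |""".stripMargin)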
loneylee authored May 6, 2024
1 parent 414d667 commit 8efe3e4
Showing 22 changed files with 603 additions and 252 deletions.
@@ -27,7 +27,9 @@ public native long nativeInitMergeTreeWriterWrapper(
String uuid,
String taskId,
String partition_dir,
String bucket_dir);
String bucket_dir,
byte[] confArray,
long allocId);

public native String nativeMergeMTParts(
byte[] plan,
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -109,7 +110,7 @@ class ClickhouseOptimisticTransaction(

// Retain only a minimal selection of Spark writer options to avoid any potential
// compatibility issues
val options = writeOptions match {
var options = writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
@@ -119,6 +120,16 @@
}.toMap
}

spark.conf.getAll.foreach(
entry => {
if (
entry._1.startsWith(s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings")
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)
}
})

try {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreeFileFormatWriter.write(
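
Not part of the diff: a self-contained sketch of the forwarding rule added above, assuming CHBackendSettings.getBackendConfigPrefix resolves to "spark.gluten.sql.columnar.backend.ch"; the sample session entries are made up.

// Sketch of which session entries get copied into the MergeTree write options.
val backendPrefix = "spark.gluten.sql.columnar.backend.ch" // assumed value of getBackendConfigPrefix
val deltaMinFileSize = "spark.databricks.delta.optimize.minFileSize"

val sessionConf = Map(
  s"$backendPrefix.runtime_settings.mergetree.merge_after_insert" -> "true",
  deltaMinFileSize -> "200000000",
  "spark.sql.shuffle.partitions" -> "200" // unrelated key, not forwarded
)

var options = Map.empty[String, String]
sessionConf.foreach {
  case (key, value) =>
    if (key.startsWith(s"$backendPrefix.runtime_settings") || key.equalsIgnoreCase(deltaMinFileSize)) {
      options += (key -> value)
    }
}
// options now holds only the runtime_settings entry and the Delta min-file-size entry.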
@@ -16,10 +16,13 @@
*/
package org.apache.spark.sql.execution.datasources.v1

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.expression.{ExpressionBuilder, StringMapNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}

@@ -39,18 +42,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
import java.util.{ArrayList => JList, Map => JMap, UUID}

import scala.collection.JavaConverters._
import scala.collection.mutable

case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])

class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {

override def nativeConf(
options: Map[String, String],
compressionCodec: String): JMap[String, String] = {
// pass options to native so that velox can take user-specified conf to write parquet,
// i.e., compression, block size, block rows.
val sparkOptions = new mutable.HashMap[String, String]()
sparkOptions.asJava
options.asJava
}

override def createOutputWriter(
@@ -95,7 +95,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
clickhouseTableConfigs,
tableSchema.toAttributes // use table schema instead of data schema
)

val allocId = CHNativeMemoryAllocators.contextInstance.getNativeInstanceId
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val instance =
datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
@@ -104,7 +104,9 @@
uuid,
context.getTaskAttemptID.getTaskID.getId.toString,
context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str")
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
buildNativeConf(nativeConf),
allocId
)

new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path)
@@ -121,6 +123,13 @@
override def getFormatName(): String = {
"mergetree"
}

private def buildNativeConf(confs: JMap[String, String]): Array[Byte] = {
val stringMapNode: StringMapNode = ExpressionBuilder.makeStringMap(confs)
val extensionNode: AdvancedExtensionNode = ExtensionBuilder.makeAdvancedExtension(
BackendsApiManager.getTransformerApiInstance.packPBMessage(stringMapNode.toProtobuf))
PlanBuilder.makePlan(extensionNode).toProtobuf.toByteArray
}
}

object CHMergeTreeWriterInjects {
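
Not part of the diff: a sketch of the settings map that now travels from nativeConf through buildNativeConf into the native writer. The keys come from this patch; the values and the surrounding wiring are assumptions for illustration.

import java.util.{HashMap => JHashMap, Map => JMap}

// Example writer options as nativeConf would now return them (options.asJava).
val writerConf: JMap[String, String] = new JHashMap[String, String]()
writerConf.put(
  "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
  "true")
writerConf.put("spark.databricks.delta.optimize.minFileSize", "200000000")
// buildNativeConf wraps this map in a Substrait AdvancedExtension, serializes the Plan to a
// byte array, and nativeInitMergeTreeWriterWrapper receives it together with the id of the
// task-scoped native memory allocator (CHNativeMemoryAllocators.contextInstance.getNativeInstanceId).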
@@ -24,6 +24,8 @@ import io.delta.tables.ClickhouseTable

import java.io.File

import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

@@ -54,6 +56,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
"spark.databricks.delta.retentionDurationCheck.enabled",
"false"
) // otherwise RETAIN 0 HOURS will fail
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -426,5 +431,31 @@
val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
}

test("test mergetree insert with optimize basic") {
withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
) {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_insert_optimize_basic
|USING clickhouse
|LOCATION '$basePath/lineitem_mergetree_insert_optimize_basic'
| as select * from lineitem
|""".stripMargin)

val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect()
assert(ret.apply(0).get(0) == 600572)
eventually(timeout(60.seconds), interval(3.seconds)) {
assert(
new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2
)
}
}
}
}
// scalastyle:off line.size.limit
@@ -57,6 +57,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -170,8 +173,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.format("clickhouse")
.load(dataPath)
.where("l_shipdate = date'1998-09-02'")
.collect()
assert(result.apply(0).get(0) == 110501)
.count()
assert(result == 183)
}

test("test mergetree path based write with dataframe api") {
@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
}

override protected def beforeEach(): Unit = {

This file was deleted.

@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -58,6 +58,9 @@ class GlutenClickHouseTableAfterRestart
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
}

override protected def createTPCHNotNullTables(): Unit = {
@@ -161,6 +161,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
}

override def beforeAll(): Unit = {
// Some unit tests fail if /data does not exist.
assert(new File("/data").exists())

// prepare working paths
val basePathDir = new File(basePath)
if (basePathDir.exists()) {