[GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format (#6379)

* [GLUTEN-6378][CH] Support delta count optimizer for the MergeTree format

Support delta count optimizer for the MergeTree format:

In Delta, the PrepareDeltaScan rule optimizes the count command by answering select count(*) from table directly from the Delta metadata. This change enables the same optimization for the MergeTree format.

Close #6378.

* fix ut
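
As a rough sketch of what this change enables (the table path below is hypothetical; the query shape mirrors the new tests), a metadata-only count over a MergeTree table can now be answered from the numRecords stats recorded in the Delta log instead of scanning the parts:

// Hypothetical path; assumes a MergeTree ("clickhouse" format) table was written there.
val df = spark.read
  .format("clickhouse")
  .load("/tmp/lineitem_mergetree_count_opti")
  .groupBy()
  .count()
df.collect()
// With the stats populated, the plan collapses to a LocalTableScanExec
// (not on Spark 3.2 + Delta 2.0, per the new tests) instead of scanning MergeTree parts.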
zzcclp authored Jul 9, 2024
1 parent f12c51b commit ca35e47
Showing 4 changed files with 107 additions and 55 deletions.
@@ -144,7 +144,15 @@ object AddFileTags {
"dirName" -> dirName,
"marks" -> marks.toString
)
AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, stats, tags)
val mapper: ObjectMapper = new ObjectMapper()
val rootNode = mapper.createObjectNode()
rootNode.put("numRecords", rows)
rootNode.put("minValues", "")
rootNode.put("maxValues", "")
rootNode.put("nullCount", "")
// Write the `stats` into the delta meta log
val metricsStats = mapper.writeValueAsString(rootNode)
AddFile(name, partitionValues, bytesOnDisk, modificationTime, dataChange, metricsStats, tags)
}

def addFileToAddMergeTreeParts(addFile: AddFile): AddMergeTreeParts = {
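
For reference, a minimal sketch of the stats string the code above produces (the row count is illustrative); Delta's count optimization relies on numRecords being present in each AddFile:

import com.fasterxml.jackson.databind.ObjectMapper

// Illustrative only: mirrors the ObjectMapper usage above with a made-up row count.
val mapper = new ObjectMapper()
val rootNode = mapper.createObjectNode()
rootNode.put("numRecords", 600572L)
rootNode.put("minValues", "")
rootNode.put("maxValues", "")
rootNode.put("nullCount", "")
mapper.writeValueAsString(rootNode)
// => {"numRecords":600572,"minValues":"","maxValues":"","nullCount":""}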
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.functions._
@@ -1305,5 +1306,34 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
dataFileList = dataPathFile.list(fileFilter)
assertResult(6)(dataFileList.length)
}

test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
val dataPath = s"$basePath/lineitem_mergetree_count_opti"
clearDataPath(dataPath)

val sourceDF = spark.sql(s"""
|select * from lineitem
|""".stripMargin)

sourceDF.write
.format("clickhouse")
.partitionBy("l_shipdate", "l_returnflag")
.option("clickhouse.orderByKey", "l_orderkey")
.option("clickhouse.primaryKey", "l_orderkey")
.mode(SaveMode.Append)
.save(dataPath)

val df = spark.read
.format("clickhouse")
.load(dataPath)
.groupBy()
.count()
val result = df.collect()
assertResult(600572)(result(0).getLong(0))
// Spark 3.2 + Delta 2.0 does not support this feature
if (!sparkVersion.equals("3.2")) {
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
}
}
}
// scalastyle:off line.size.limit
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts

@@ -357,34 +358,10 @@ class GlutenClickHouseMergeTreeWriteSuite
|""".stripMargin

val df = spark.sql(sql1)
val result = df.collect()
assertResult(1)(
// in the test data, there is only 1 row with l_orderkey = 12647
result.apply(0).get(0)
df.collect().apply(0).get(0)
)
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)

// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}

val sql2 =
@@ -439,22 +416,9 @@ class GlutenClickHouseMergeTreeWriteSuite
val df = spark.sql(s"""
| select count(*) from lineitem_mergetree_delete
|""".stripMargin)
val result = df.collect()
assertResult(600571)(
result.apply(0).get(0)
df.collect().apply(0).get(0)
)
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
val mergetreeScan = scanExec.head
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}

{
@@ -1491,19 +1455,6 @@ class GlutenClickHouseMergeTreeWriteSuite
val result = df.collect()
assertResult(1)(result.length)
assertResult(10)(result(0).getLong(0))

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(1)(addFiles.size)
assertResult(10)(addFiles.head.rows)
})
}

@@ -1962,5 +1913,63 @@ class GlutenClickHouseMergeTreeWriteSuite
}
})
}

test("GLUTEN-6378: Support delta count optimizer for the MergeTree format") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_count_opti;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_count_opti
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate, l_returnflag)
|TBLPROPERTIES (orderByKey='l_orderkey',
| primaryKey='l_orderkey')
|LOCATION '$basePath/lineitem_mergetree_count_opti'
|""".stripMargin)

// dynamic partitions
spark.sql(s"""
| insert into table lineitem_mergetree_count_opti
| select * from lineitem
|""".stripMargin)

val sqlStr =
s"""
|SELECT
| count(*) AS count_order
|FROM
| lineitem_mergetree_count_opti
|""".stripMargin
runSql(sqlStr)(
df => {
val result = df.collect()
assertResult(1)(result.length)
assertResult("600572")(result(0).getLong(0).toString)

// Spark 3.2 + Delta 2.0 does not support this feature
if (!sparkVersion.equals("3.2")) {
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
}
})
}
}
// scalastyle:off line.size.limit
@@ -739,8 +739,13 @@ class GlutenClickHouseTPCHBucketSuite
runSql(SQL6)(
df => {
checkResult(df, Array(Row(600572)))
// there is a shuffle between two phase hash aggregates.
checkHashAggregateCount(df, 2)
if (sparkVersion.equals("3.2")) {
// there is a shuffle between the two-phase hash aggregates.
checkHashAggregateCount(df, 2)
} else {
// Delta will use the delta log metadata to answer this query
checkHashAggregateCount(df, 0)
}
})

// test sort aggregates
