Skip to content

Commit

Permalink
[CH] Fix load cache missing columns #7192
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Fix MergeTree cache load failed when column name is upper case

How was this patch tested?
unit tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
liuneng1994 authored Sep 11, 2024
1 parent 9997927 commit 85d90c9
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case _: Exception =>
case e: Exception =>
context.reply(
CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
CacheJobInfo(
status = false,
"",
s"executor: $executorId cache data failed: ${e.getMessage}."))
}
case GlutenCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}

Expand Down Expand Up @@ -180,7 +181,8 @@ case class GlutenCHCacheDataCommand(
ClickhouseSnapshot.genSnapshotId(snapshot),
onePart.tablePath,
pathToCache.toString,
snapshot.metadata.configuration.getOrElse("orderByKey", ""),
snapshot.metadata.configuration
.getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

class GlutenClickHouseMergeTreeCacheDataSSuite
class GlutenClickHouseMergeTreeCacheDataSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {

Expand Down Expand Up @@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

test("test cache mergetree data no partition columns") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
|(
| L_ORDERKEY bigint,
| L_PARTKEY bigint,
| L_SUPPKEY bigint,
| L_LINENUMBER bigint,
| L_QUANTITY double,
| L_EXTENDEDPRICE double,
| L_DISCOUNT double,
| L_TAX double,
| L_RETURNFLAG string,
| L_LINESTATUS string,
| L_SHIPDATE date,
| L_COMMITDATE date,
| L_RECEIPTDATE date,
| L_SHIPINSTRUCT string,
| L_SHIPMODE string,
| L_COMMENT string
|)
|USING clickhouse
|LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_hdfs
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(1)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_mergetree_hdfs
|WHERE
| l_shipdate >= date'1995-01-10'
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runSql(sqlStr)(
df => {
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(7898)(addFiles.map(_.rows).sum)
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

test("test cache mergetree data with upper case column name") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
|(
| L_ORDERKEY bigint,
| L_PARTKEY bigint,
| L_SUPPKEY bigint,
| L_LINENUMBER bigint,
| L_QUANTITY double,
| L_EXTENDEDPRICE double,
| L_DISCOUNT double,
| L_TAX double,
| L_RETURNFLAG string,
| L_LINESTATUS string,
| L_SHIPDATE date,
| L_COMMITDATE date,
| L_RECEIPTDATE date,
| L_SHIPINSTRUCT string,
| L_SHIPMODE string,
| L_COMMENT string
|)
|USING clickhouse
|PARTITIONED BY (L_SHIPDATE)
|LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='L_LINENUMBER,L_ORDERKEY')
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_hdfs
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val res = spark
.sql(s"""
|cache data
| select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
| after L_SHIPDATE AS OF '1995-01-10'
| CACHEPROPERTIES(storage_policy='__hdfs_main',
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
val first_cache_files = countFiles(dataPath)
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(31)(metaPath.list().length)
assert(countFiles(dataPath) > first_cache_files)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_mergetree_hdfs
|WHERE
| l_shipdate >= date'1995-01-10'
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runSql(sqlStr)(
df => {
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(7898)(addFiles.map(_.rows).sum)
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}
}
// scalastyle:off line.size.limit
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.substrait.rel;

import org.apache.gluten.expression.ConverterUtils;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -50,12 +52,12 @@ public static ExtensionTableNode makeExtensionTable(
snapshotId,
relativeTablePath,
absoluteTablePath,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
ConverterUtils.normalizeColName(orderByKey),
ConverterUtils.normalizeColName(lowCardKey),
ConverterUtils.normalizeColName(minmaxIndexKey),
ConverterUtils.normalizeColName(bfIndexKey),
ConverterUtils.normalizeColName(setIndexKey),
ConverterUtils.normalizeColName(primaryKey),
partList,
starts,
lengths,
Expand Down

0 comments on commit 85d90c9

Please sign in to comment.