From bba6e1af88c527337e8860bbc4d40e1bdf28a8f9 Mon Sep 17 00:00:00 2001
From: liuneng1994
Date: Wed, 11 Sep 2024 10:43:19 +0800
Subject: [PATCH] fix load cache missing columns

---
 .../spark/rpc/GlutenExecutorEndpoint.scala    |   7 +-
 .../commands/GlutenCHCacheDataCommand.scala   |   4 +-
 ...enClickHouseMergeTreeCacheDataSuite.scala} | 195 +++++++++++++++++-
 .../substrait/rel/ExtensionTableBuilder.java  |  14 +-
 4 files changed, 210 insertions(+), 10 deletions(-)
 rename backends-clickhouse/src/test/scala/org/apache/gluten/execution/{GlutenClickHouseMergeTreeCacheDataSSuite.scala => GlutenClickHouseMergeTreeCacheDataSuite.scala} (67%)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 7f2b94eea314..559a22cb12c2 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -75,9 +75,12 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
         val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
         context.reply(CacheJobInfo(status = true, jobId))
       } catch {
-        case _: Exception =>
+        case e: Exception =>
           context.reply(
-            CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
+            CacheJobInfo(
+              status = false,
+              "",
+              s"executor: $executorId cache data failed: ${e.getMessage}."))
       }
     case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 43e3b4b7ab98..7aca290b1691 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
 
@@ -180,7 +181,8 @@ case class GlutenCHCacheDataCommand(
             ClickhouseSnapshot.genSnapshotId(snapshot),
             onePart.tablePath,
             pathToCache.toString,
-            snapshot.metadata.configuration.getOrElse("orderByKey", ""),
+            snapshot.metadata.configuration
+              .getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
             snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
             snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
             snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
similarity index 67%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
index a55067185e68..88bb00faced3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt
 
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
-class GlutenClickHouseMergeTreeCacheDataSSuite
+class GlutenClickHouseMergeTreeCacheDataSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
@@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
       })
     spark.sql("drop table lineitem_mergetree_hdfs purge")
   }
+
+  test("test cache mergetree data no partition columns") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | L_ORDERKEY bigint,
+                 | L_PARTKEY bigint,
+                 | L_SUPPKEY bigint,
+                 | L_LINENUMBER bigint,
+                 | L_QUANTITY double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT double,
+                 | L_TAX double,
+                 | L_RETURNFLAG string,
+                 | L_LINESTATUS string,
+                 | L_SHIPDATE date,
+                 | L_COMMITDATE date,
+                 | L_RECEIPTDATE date,
+                 | L_SHIPINSTRUCT string,
+                 | L_SHIPMODE string,
+                 | L_COMMENT string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(1)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         | l_returnflag,
+         | l_linestatus,
+         | sum(l_quantity) AS sum_qty,
+         | sum(l_extendedprice) AS sum_base_price,
+         | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         | avg(l_quantity) AS avg_qty,
+         | avg(l_extendedprice) AS avg_price,
+         | avg(l_discount) AS avg_disc,
+         | count(*) AS count_order
+         |FROM
+         | lineitem_mergetree_hdfs
+         |WHERE
+         | l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         | l_returnflag,
+         | l_linestatus
+         |ORDER BY
+         | l_returnflag,
+         | l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
+
+  test("test cache mergetree data with upper case column name") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | L_ORDERKEY bigint,
+                 | L_PARTKEY bigint,
+                 | L_SUPPKEY bigint,
+                 | L_LINENUMBER bigint,
+                 | L_QUANTITY double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT double,
+                 | L_TAX double,
+                 | L_RETURNFLAG string,
+                 | L_LINESTATUS string,
+                 | L_SHIPDATE date,
+                 | L_COMMITDATE date,
+                 | L_RECEIPTDATE date,
+                 | L_SHIPINSTRUCT string,
+                 | L_SHIPMODE string,
+                 | L_COMMENT string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (L_SHIPDATE)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='L_LINENUMBER,L_ORDERKEY')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val res = spark
+      .sql(s"""
+              |cache data
+              | select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+              | after L_SHIPDATE AS OF '1995-01-10'
+              | CACHEPROPERTIES(storage_policy='__hdfs_main',
+              | aaa='ccc')""".stripMargin)
+      .collect()
+    assertResult(true)(res(0).getBoolean(0))
+    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+    assertResult(22)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+    val first_cache_files = countFiles(dataPath)
+    val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(31)(metaPath.list().length)
+    assert(countFiles(dataPath) > first_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         | l_returnflag,
+         | l_linestatus,
+         | sum(l_quantity) AS sum_qty,
+         | sum(l_extendedprice) AS sum_base_price,
+         | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         | avg(l_quantity) AS avg_qty,
+         | avg(l_extendedprice) AS avg_price,
+         | avg(l_discount) AS avg_disc,
+         | count(*) AS count_order
+         |FROM
+         | lineitem_mergetree_hdfs
+         |WHERE
+         | l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         | l_returnflag,
+         | l_linestatus
+         |ORDER BY
+         | l_returnflag,
+         | l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
index 34c017d80cd1..b2b813dd9f0e 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.substrait.rel;
 
+import org.apache.gluten.expression.ConverterUtils;
+
 import java.util.List;
 import java.util.Map;
 
@@ -50,12 +52,12 @@ public static ExtensionTableNode makeExtensionTable(
         snapshotId,
         relativeTablePath,
         absoluteTablePath,
-        orderByKey,
-        lowCardKey,
-        minmaxIndexKey,
-        bfIndexKey,
-        setIndexKey,
-        primaryKey,
+        ConverterUtils.normalizeColName(orderByKey),
+        ConverterUtils.normalizeColName(lowCardKey),
+        ConverterUtils.normalizeColName(minmaxIndexKey),
+        ConverterUtils.normalizeColName(bfIndexKey),
+        ConverterUtils.normalizeColName(setIndexKey),
+        ConverterUtils.normalizeColName(primaryKey),
         partList,
         starts,
         lengths,