Skip to content

Commit

Permalink
fix load cache missing columns
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Sep 11, 2024
1 parent 5b00edd commit 445345a
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case _: Exception =>
case e: Exception =>
context.reply(
CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
CacheJobInfo(status = false, "", s"executor: $executorId cache data failed: ${e.getMessage}."))
}
case GlutenCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.commands

import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.rel.ExtensionTableBuilder

import org.apache.spark.affinity.CHAffinity
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad}
Expand All @@ -30,12 +29,11 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil

import java.net.URI
import java.util.{ArrayList => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
Expand Down Expand Up @@ -180,7 +178,7 @@ case class GlutenCHCacheDataCommand(
ClickhouseSnapshot.genSnapshotId(snapshot),
onePart.tablePath,
pathToCache.toString,
snapshot.metadata.configuration.getOrElse("orderByKey", ""),
snapshot.metadata.configuration.getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

class GlutenClickHouseMergeTreeCacheDataSSuite
class GlutenClickHouseMergeTreeCacheDataSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {

Expand Down Expand Up @@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

test("test cache mergetree data no partition columns") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
|(
| L_ORDERKEY bigint,
| L_PARTKEY bigint,
| L_SUPPKEY bigint,
| L_LINENUMBER bigint,
| L_QUANTITY double,
| L_EXTENDEDPRICE double,
| L_DISCOUNT double,
| L_TAX double,
| L_RETURNFLAG string,
| L_LINESTATUS string,
| L_SHIPDATE date,
| L_COMMITDATE date,
| L_RECEIPTDATE date,
| L_SHIPINSTRUCT string,
| L_SHIPMODE string,
| L_COMMENT string
|)
|USING clickhouse
|LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main')
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_hdfs
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(1)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_mergetree_hdfs
|WHERE
| l_shipdate >= date'1995-01-10'
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runSql(sqlStr)(
df => {
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(7898)(addFiles.map(_.rows).sum)
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

test("test cache mergetree data with upper case column name") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
|(
| L_ORDERKEY bigint,
| L_PARTKEY bigint,
| L_SUPPKEY bigint,
| L_LINENUMBER bigint,
| L_QUANTITY double,
| L_EXTENDEDPRICE double,
| L_DISCOUNT double,
| L_TAX double,
| L_RETURNFLAG string,
| L_LINESTATUS string,
| L_SHIPDATE date,
| L_COMMITDATE date,
| L_RECEIPTDATE date,
| L_SHIPINSTRUCT string,
| L_SHIPMODE string,
| L_COMMENT string
|)
|USING clickhouse
|PARTITIONED BY (L_SHIPDATE)
|LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='L_LINENUMBER,L_ORDERKEY')
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_hdfs
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val res = spark
.sql(s"""
|cache data
| select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
| after L_SHIPDATE AS OF '1995-01-10'
| CACHEPROPERTIES(storage_policy='__hdfs_main',
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
val first_cache_files = countFiles(dataPath)
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(31)(metaPath.list().length)
assert(countFiles(dataPath) > first_cache_files)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_mergetree_hdfs
|WHERE
| l_shipdate >= date'1995-01-10'
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runSql(sqlStr)(
df => {
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(7898)(addFiles.map(_.rows).sum)
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}
}
// scalastyle:off line.size.limit
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.substrait.rel;

import org.apache.gluten.expression.ConverterUtils;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -50,12 +52,12 @@ public static ExtensionTableNode makeExtensionTable(
snapshotId,
relativeTablePath,
absoluteTablePath,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
ConverterUtils.normalizeColName(orderByKey),
ConverterUtils.normalizeColName(lowCardKey),
ConverterUtils.normalizeColName(minmaxIndexKey),
ConverterUtils.normalizeColName(bfIndexKey),
ConverterUtils.normalizeColName(setIndexKey),
ConverterUtils.normalizeColName(primaryKey),
partList,
starts,
lengths,
Expand Down

0 comments on commit 445345a

Please sign in to comment.