[GLUTEN-6040][CH] Fix can't not load part after restart spark session (#6041)

[CH] Fix can't not load part after restart spark session
loneylee authored Jun 12, 2024
1 parent d3ccd4a commit 070022a
Showing 2 changed files with 79 additions and 6 deletions.
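
For context only, not part of the commit: a minimal sketch of the scenario the title describes, assuming a MergeTree table that an earlier Spark session already wrote to S3 with CREATE TABLE ... USING clickhouse LOCATION 's3a://<bucket>/<table>'. The table name and app name below are placeholders; the config key is the one exercised by the new test further down, and before this fix such a query could fail because the freshly started session did not load the table's existing parts.

import org.apache.spark.sql.SparkSession

object ReadAfterRestart {
  def main(args: Array[String]): Unit = {
    // A fresh session: its local MergeTree metadata cache is empty, while the
    // parts written by the previous session still sit under the S3 location.
    val spark = SparkSession.builder().appName("read-after-restart").getOrCreate()

    // Setting exercised by the new test below: let the driver prune MergeTree
    // parts by index before handing splits to the executors.
    spark.conf.set(
      "spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index",
      "true")

    // Placeholder name for a table created by the earlier session.
    val rows = spark.sql("SELECT count(*) FROM lineitem_mergetree_s3").collect()
    println(rows.head.get(0))

    spark.stop()
  }
}
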
@@ -635,29 +635,102 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
}

test("test mergetree insert with optimize basic") {
- val table_name = "lineitem_mergetree_insert_optimize_basic_s3"
- val dataPath = s"s3a://$BUCKET_NAME/$table_name"
+ val tableName = "lineitem_mergetree_insert_optimize_basic_s3"
+ val dataPath = s"s3a://$BUCKET_NAME/$tableName"

withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
) {
spark.sql(s"""
- |DROP TABLE IF EXISTS $table_name;
+ |DROP TABLE IF EXISTS $tableName;
|""".stripMargin)

spark.sql(s"""
- |CREATE TABLE IF NOT EXISTS $table_name
+ |CREATE TABLE IF NOT EXISTS $tableName
|USING clickhouse
|LOCATION '$dataPath'
| as select * from lineitem
|""".stripMargin)

- val ret = spark.sql(s"select count(*) from $table_name").collect()
+ val ret = spark.sql(s"select count(*) from $tableName").collect()
assert(ret.apply(0).get(0) == 600572)
assert(
!new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
}
}

test("test mergetree with primary keys pruning by driver") {
val tableName = "lineitem_mergetree_pk_pruning_by_driver_s3"
val dataPath = s"s3a://$BUCKET_NAME/$tableName"
spark.sql(s"""
|DROP TABLE IF EXISTS $tableName;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS $tableName
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|TBLPROPERTIES (storage_policy='__s3_main', orderByKey='l_shipdate')
|LOCATION '$dataPath'
|""".stripMargin)

spark.sql(s"""
| insert into table $tableName
| select * from lineitem
|""".stripMargin)

FileUtils.forceDelete(new File(S3_METADATA_PATH))

val sqlStr =
s"""
|SELECT
| sum(l_extendedprice * l_discount) AS revenue
|FROM
| $tableName
|WHERE
| l_shipdate >= date'1994-01-01'
| AND l_shipdate < date'1994-01-01' + interval 1 year
| AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
| AND l_quantity < 24
|""".stripMargin

withSQLConf(
("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true")) {
runTPCHQueryBySQL(6, sqlStr) {
df =>
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)

val mergetreeScan = scanExec(0)
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
assert(plans.size == 1)
assert(plans(0).getSplitInfos.size == 1)
}
}
}
}
// scalastyle:off line.size.limit
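
A note on the new test's strategy, inferred from the diff rather than stated in the commit message: instead of actually restarting the JVM, the test deletes the local metadata directory (S3_METADATA_PATH) after the insert, which is exactly the state a freshly started session would see, then runs TPC-H Q6 with driver-side index filtering enabled and asserts that the single mergetree scan still resolves to exactly one split. The core of that pattern, assuming the suite's spark, S3_METADATA_PATH, and tableName are in scope along with its existing FileUtils and File imports, is just:

FileUtils.forceDelete(new File(S3_METADATA_PATH)) // forget everything cached locally
spark.sql(s"SELECT count(*) FROM $tableName").show() // must rediscover the parts from S3
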
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -398,7 +398,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
google::protobuf::StringValue table;
table.ParseFromString(read_rel.advanced_extension().enhancement().value());
auto merge_tree_table = parseMergeTreeTableString(table.value());
- auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context);
+ auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context, true);

auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
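
The C++ side of the fix is the single extra argument above: filterRangesOnDriver, the entry point for the driver-side pruning that the new test switches on, now calls parseStorage with a third argument set to true. Judging from the commit title, this asks the parser to also restore the table's existing on-disk parts when it rebuilds the storage object from the serialized table metadata, so that parts written before a session restart become visible again; the exact semantics of the flag live in parseStorage itself, which is outside this diff.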