From 85866a016d8dba25944f460fcb54f0340a84e4e1 Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 11 Jun 2024 15:29:52 +0800 Subject: [PATCH 1/2] fix filter bug --- ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 86 +++++++++++++++++-- .../Parser/MergeTreeRelParser.cpp | 2 +- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 44c2af76f933..cac88b28be67 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -635,29 +635,105 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite } test("test mergetree insert with optimize basic") { - val table_name = "lineitem_mergetree_insert_optimize_basic_s3" - val dataPath = s"s3a://$BUCKET_NAME/$table_name" + val tableName = "lineitem_mergetree_insert_optimize_basic_s3" + val dataPath = s"s3a://$BUCKET_NAME/$tableName" withSQLConf( ("spark.databricks.delta.optimize.minFileSize" -> "200000000"), ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true") ) { spark.sql(s""" - |DROP TABLE IF EXISTS $table_name; + |DROP TABLE IF EXISTS $tableName; |""".stripMargin) spark.sql(s""" - |CREATE TABLE IF NOT EXISTS $table_name + |CREATE TABLE IF NOT EXISTS $tableName |USING clickhouse |LOCATION '$dataPath' | as select * from lineitem |""".stripMargin) - val ret = spark.sql(s"select count(*) from $table_name").collect() + val ret = spark.sql(s"select count(*) from $tableName").collect() assert(ret.apply(0).get(0) == 600572) assert( !new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists()) } } + + test("test mergetree with primary keys pruning by driver") { + val tableName = "lineitem_mergetree_pk_pruning_by_driver_s3" + val dataPath = s"s3a://$BUCKET_NAME/$tableName" + spark.sql( + s""" + |DROP TABLE IF EXISTS $tableName; + |""".stripMargin) + + spark.sql( + s""" + |CREATE TABLE IF NOT EXISTS $tableName + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (storage_policy='__s3_main', orderByKey='l_shipdate') + |LOCATION '$dataPath' + |""".stripMargin) + + spark.sql( + s""" + | insert into table $tableName + | select * from lineitem + |""".stripMargin) + + FileUtils.forceDelete(new File(S3_METADATA_PATH)) + + val sqlStr = + s""" + |SELECT + | sum(l_extendedprice * l_discount) AS revenue + |FROM + | $tableName + |WHERE + | l_shipdate >= date'1994-01-01' + | AND l_shipdate < date'1994-01-01' + interval 1 year + | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01 + | AND l_quantity < 24 + |""".stripMargin + + withSQLConf( + ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true")) { + runTPCHQueryBySQL(6, sqlStr) { + df => + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + + val mergetreeScan = scanExec(0) + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val plans = collect(df.queryExecution.executedPlan) { + case scanExec: BasicScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans(0).getSplitInfos.size == 1) + } + } + } } // scalastyle:off line.size.limit diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 9afa83973cd8..c36db6b7484a 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -398,7 +398,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_ google::protobuf::StringValue table; table.ParseFromString(read_rel.advanced_extension().enhancement().value()); auto merge_tree_table = parseMergeTreeTableString(table.value()); - auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context); + auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context, true); auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema()); auto names_and_types_list = input.getNamesAndTypesList(); From 955778bf433404f2a83ae00f067c9a3d93aea726 Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 11 Jun 2024 16:33:36 +0800 Subject: [PATCH 2/2] fix style --- ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 67 +++++++++---------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index cac88b28be67..c5dc3a23754e 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -663,42 +663,39 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite test("test mergetree with primary keys pruning by driver") { val tableName = "lineitem_mergetree_pk_pruning_by_driver_s3" val dataPath = s"s3a://$BUCKET_NAME/$tableName" - spark.sql( - s""" - |DROP TABLE IF EXISTS $tableName; - |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS $tableName; + |""".stripMargin) - spark.sql( - s""" - |CREATE TABLE IF NOT EXISTS $tableName - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |TBLPROPERTIES (storage_policy='__s3_main', orderByKey='l_shipdate') - |LOCATION '$dataPath' - |""".stripMargin) - - spark.sql( - s""" - | insert into table $tableName - | select * from lineitem - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS $tableName + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (storage_policy='__s3_main', orderByKey='l_shipdate') + |LOCATION '$dataPath' + |""".stripMargin) + + spark.sql(s""" + | insert into table $tableName + | select * from lineitem + |""".stripMargin) FileUtils.forceDelete(new File(S3_METADATA_PATH))