From 163b7749a1044b81630fe07640037f0f58c20a74 Mon Sep 17 00:00:00 2001
From: lwz9103
Date: Thu, 8 Aug 2024 11:32:07 +0800
Subject: [PATCH] [GLUTEN-6750] Fix optimize error if file mappings not loaded

---
 ...utenClickHouseMergeTreeOptimizeSuite.scala | 43 +++++++++++++++++++
 ...ClickHouseWholeStageTransformerSuite.scala |  5 ++-
 cpp-ch/local-engine/local_engine_jni.cpp      |  2 +
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 7989c02ba872a..65cc5d4675158 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
 import io.delta.tables.ClickhouseTable
+import org.apache.commons.io.FileUtils
 
 import java.io.File
 
@@ -500,5 +501,47 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       }
     }
   }
+
+  test("GLUTEN-6750: Optimize error if file metadata not exist") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_s3
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |CLUSTERED BY (l_orderkey)
+                 |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3'
+                 |TBLPROPERTIES (storage_policy='__s3_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bucket_s3
+                 | select /*+ REPARTITION(3) */ * from lineitem
+                 |""".stripMargin)
+
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    spark.sql("optimize lineitem_mergetree_bucket_s3")
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 4972861152fde..8945be97a2d00 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -53,8 +53,8 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
   val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
   val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
 
-  val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
-  val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
+  val S3_ACCESS_KEY = "minioadmin"
+  val S3_SECRET_KEY = "minioadmin"
 
   val CH_DEFAULT_STORAGE_DIR = "/data"
 
@@ -89,6 +89,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
       .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
.set("spark.hadoop.fs.s3a.path.style.access", "true") .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp") .set( "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type", "s3_gluten") diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c4e8ec67b106a..db0dd8b623b6c 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -995,6 +995,8 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn // each task using its own CustomStorageMergeTree, don't reuse auto temp_storage = local_engine::MergeTreeRelParser::copyToVirtualStorage(merge_tree_table, context); + // prefetch all needed parts metadata before merge + local_engine::restoreMetaData(temp_storage, merge_tree_table, *context); local_engine::TempStorageFreer freer{temp_storage->getStorageID()}; // to release temp CustomStorageMergeTree with RAII std::vector selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(