[GLUTEN-6750] Fix optimize error if file mappings not loaded
lwz9103 committed Aug 8, 2024
1 parent e60dfd2 commit 163b774
Showing 3 changed files with 48 additions and 2 deletions.
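
In short: running OPTIMIZE on an S3-backed MergeTree table could fail when the parts' metadata had never been loaded onto the local disk. The native merge path now prefetches (restores) the metadata of all selected parts before merging, and a new regression test exercises the scenario by deleting the local metadata directory between an INSERT and an OPTIMIZE.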
@@ -21,6 +21,7 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
 import io.delta.tables.ClickhouseTable
+import org.apache.commons.io.FileUtils
 
 import java.io.File
@@ -500,5 +501,47 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       }
     }
   }
+
+  test("GLUTEN-6750: Optimize error if file metadata not exist") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_s3
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |CLUSTERED BY (l_orderkey)
+                 |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_partkey)"} INTO 4 BUCKETS
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3'
+                 |TBLPROPERTIES (storage_policy='__s3_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bucket_s3
+                 | select /*+ REPARTITION(3) */ * from lineitem
+                 |""".stripMargin)
+
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    spark.sql("optimize lineitem_mergetree_bucket_s3")
+  }
 }
 // scalastyle:off line.size.limit
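
The new test reproduces the failure end to end: it creates a partitioned, bucketed MergeTree table on MinIO/S3, inserts data, deletes the local metadata directory (S3_METADATA_PATH) to simulate file mappings that were never loaded, and then runs OPTIMIZE, which failed before this fix.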
@@ -53,8 +53,8 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
   val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
   val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
 
-  val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
-  val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
+  val S3_ACCESS_KEY = "minioadmin"
+  val S3_SECRET_KEY = "minioadmin"
 
   val CH_DEFAULT_STORAGE_DIR = "/data"
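
Note: the random-looking credentials are replaced with minioadmin/minioadmin, MinIO's well-known default access and secret keys.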

@@ -89,6 +89,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
       .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
       .set("spark.hadoop.fs.s3a.path.style.access", "true")
       .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp")
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
         "s3_gluten")
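
For context, a minimal sketch of the S3/MinIO wiring the suite ends up with. Only the keys visible in this diff are taken from the source; the s3a credential property names and the endpoint value are assumptions for illustration.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Assumed s3a credential keys; the suite keeps the values in
  // S3_ACCESS_KEY / S3_SECRET_KEY (now MinIO's defaults).
  .set("spark.hadoop.fs.s3a.access.key", "minioadmin")
  .set("spark.hadoop.fs.s3a.secret.key", "minioadmin")
  .set("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000") // assumed MINIO_ENDPOINT
  .set("spark.hadoop.fs.s3a.path.style.access", "true")
  .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
  // Added in this commit: local scratch root for the native ClickHouse engine;
  // restored part metadata is cached beneath it.
  .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp")
  .set(
    "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
    "s3_gluten")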
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/local_engine_jni.cpp
@@ -995,6 +995,8 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper
     // each task using its own CustomStorageMergeTree, don't reuse
     auto temp_storage
         = local_engine::MergeTreeRelParser::copyToVirtualStorage(merge_tree_table, context);
+    // prefetch all needed parts metadata before merge
+    local_engine::restoreMetaData(temp_storage, merge_tree_table, *context);
 
     local_engine::TempStorageFreer freer{temp_storage->getStorageID()}; // to release temp CustomStorageMergeTree with RAII
     std::vector<DB::DataPartPtr> selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(
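
Why this fixes the bug: the getDataPartsByNames lookup below resolves the selected parts through the temporary storage and expects each part's metadata files to exist on local disk. For a table whose data and metadata live on S3, a process that never loaded the file mappings (for example, after the local metadata directory was removed, as in the new test) has nothing on disk, so the lookup failed. As the added comment says, restoreMetaData now prefetches the needed part metadata before the merge begins.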
