[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101) (apache#7762)

* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101)

* Fix Build due to ClickHouse/ClickHouse#71105

* Fix assert issue in debug build

* Use SPARK_DIR_NAME instead of sparkVersion

* Add UT for ClickHouse/ClickHouse#71105

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
3 people authored Nov 1, 2024
1 parent 62ed4ab commit 61ae6f6
Showing 6 changed files with 44 additions and 21 deletions.
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
fileName
}

+  /** TODO: fix the issue and test in spark 3.5 */
+  testSparkVersionLE33("write into hdfs") {
+
+    /**
+     * There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it does
+     * not close the HDFS file, so the file is not flushed. The HDFS file is closed when the
+     * LocalExecutor is destroyed, but by then the Spark committer has already moved the file.
+     */
+    val tableName = "write_into_hdfs"
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+    val format = "parquet"
+    val sql =
+      s"""
+         | select *
+         | from $format.`$tablePath`
+         | where long_field > 30
+         |""".stripMargin
+    withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
+      testFileFormatBase(tablePath, format, sql, df => {})
+    }
+  }
}
@@ -35,23 +35,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
val FLT_EPSILON = 1.19209290e-07f

- protected val sparkVersion: String = {
+ private val sparkVersion: String = {
val version = SPARK_VERSION_SHORT.split("\\.")
version(0) + "." + version(1)
}
+ val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")

- val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
- val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+ val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
+ val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
- val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
val BUCKET_NAME: String = SPARK_DIR_NAME
val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"

- val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
- val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+ val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
+ val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
- val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+ val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"

val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
@@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
eventually(timeout(60.seconds), interval(2.seconds)) {
assertResult(22)(metaPath.list().length)
@@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(1)(metaPath.list().length)
@@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
- CH_BRANCH=rebase_ch/20241030
- CH_COMMIT=847cfa6237c
+ CH_BRANCH=rebase_ch/20241101
+ CH_COMMIT=7cd7bb8ece2
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -229,11 +229,10 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const substr

QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
- DB::QueryPlanPtr query_plan;
auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context);

auto all_input_rels = rel_parser->getInputs(rel);
- assert(all_input_rels.size() == 1 || all_input_rels.size() == 2);
+ assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2);
std::vector<DB::QueryPlanPtr> input_query_plans;
rel_stack.push_back(&rel);
for (const auto * input_rel : all_input_rels)
@@ -276,7 +275,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
}
}

- query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
+ DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
for (auto & extra_plan : rel_parser->extraPlans())
{
extra_plan_holder.push_back(std::move(extra_plan));
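A minimal sketch of the input-arity handling behind the relaxed assert above, using made-up types rather than Gluten's actual RelParser and QueryPlan classes; the zero-input case presumably corresponds to leaf relations such as reads, which is why the debug-build assert now has to accept it:

```cpp
// Illustrative only: hypothetical stand-ins for rel_parser->getInputs(rel) and
// SerializedPlanParser::parseOp, showing 0-, 1-, and 2-input relations.
#include <cassert>
#include <memory>
#include <vector>

struct QueryPlan { int num_inputs = 0; };
using QueryPlanPtr = std::unique_ptr<QueryPlan>;

enum class RelKind { Read, Filter, Join };

// Hypothetical: which child relations a given kind of relation has.
std::vector<RelKind> getInputs(RelKind kind)
{
    switch (kind)
    {
        case RelKind::Read:   return {};                                // leaf: no upstream relation
        case RelKind::Filter: return {RelKind::Read};                   // unary operator
        case RelKind::Join:   return {RelKind::Filter, RelKind::Read};  // binary operator
    }
    return {};
}

QueryPlanPtr parseOp(RelKind kind)
{
    auto all_input_rels = getInputs(kind);
    // Mirrors the relaxed check: zero inputs is a legal case, not a debug-build failure.
    assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2);

    std::vector<QueryPlanPtr> input_query_plans;
    for (RelKind input_rel : all_input_rels)
        input_query_plans.push_back(parseOp(input_rel));

    // Declared and initialized in one step, as the refactored code now does.
    QueryPlanPtr query_plan = std::make_unique<QueryPlan>();
    query_plan->num_inputs = static_cast<int>(input_query_plans.size());
    return query_plan;
}

int main()
{
    return parseOp(RelKind::Join)->num_inputs == 2 ? 0 : 1; // exercises the 2-, 1-, and 0-input cases
}
```

Only the arity check and the declare-at-initialization pattern are modeled here; the real parseOp dispatches through RelParserFactory and also collects extra plans into extra_plan_holder.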
10 changes: 6 additions & 4 deletions cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
@@ -29,7 +29,7 @@ namespace DB
{
namespace ErrorCodes
{
- extern const int BAD_ARGUMENTS;
+     extern const int BAD_ARGUMENTS;
}
}

@@ -78,14 +78,16 @@ class HDFSFileWriteBufferBuilder : public WriteBufferBuilder

auto builder = DB::createHDFSBuilder(new_file_uri, context->getConfigRef());
auto fs = DB::createHDFSFS(builder.get());
- auto first = new_file_uri.find('/', new_file_uri.find("//") + 2);
+ auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
auto last = new_file_uri.find_last_of('/');
- auto dir = new_file_uri.substr(first, last - first);
+ auto dir = new_file_uri.substr(begin_of_path, last - begin_of_path);
if (hdfsCreateDirectory(fs.get(), dir.c_str()))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create dir for {} because {}", dir, std::string(hdfsGetLastError()));

+ const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);
+ const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);
DB::WriteSettings write_settings;
- return std::make_unique<DB::WriteBufferFromHDFS>(new_file_uri, context->getConfigRef(), 0, write_settings);
+ return std::make_unique<DB::WriteBufferFromHDFS>(hdfs_uri_without_path, hdfs_file_path, context->getConfigRef(), 0, write_settings);
}
};
#endif
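For clarity, here is a self-contained sketch of the URI splitting performed above in HDFSFileWriteBufferBuilder. The sample URI is made up; in the real code the two halves are handed to the two-argument WriteBufferFromHDFS constructor that this commit adopts after ClickHouse/ClickHouse#71105:

```cpp
// Illustrative only: the same std::string arithmetic as above, applied to a sample URI.
#include <iostream>
#include <string>

int main()
{
    const std::string new_file_uri = "hdfs://127.0.0.1:8020/3-3/write_into_hdfs/part-00000.parquet";

    // The first '/' after the "hdfs://host:port" authority marks the start of the path.
    const auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
    const auto last = new_file_uri.find_last_of('/');

    const std::string dir = new_file_uri.substr(begin_of_path, last - begin_of_path);
    const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);
    const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);

    std::cout << hdfs_uri_without_path << '\n'  // hdfs://127.0.0.1:8020  (endpoint for the HDFS client)
              << dir << '\n'                    // /3-3/write_into_hdfs   (created via hdfsCreateDirectory)
              << hdfs_file_path << '\n';        // /3-3/write_into_hdfs/part-00000.parquet
}
```

Separating the endpoint from the path lets the directory be created up front while only the path portion is passed on to the write buffer.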
