diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
index e8ddbd12f1fc..88a34a786a8c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
     spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
     fileName
   }
+
+  /** TODO: fix the issue and test in spark 3.5 */
+  testSparkVersionLE33("write into hdfs") {
+
+    /**
+     * There is a bug in the pipeline write to HDFS: when the pipeline returns a column batch, it
+     * does not close the HDFS file, so the file is never flushed. The file is only closed when the
+     * LocalExecutor is destroyed, but by then the Spark committer has already moved the file.
+     */
+    val tableName = "write_into_hdfs"
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+    val format = "parquet"
+    val sql =
+      s"""
+         | select *
+         | from $format.`$tablePath`
+         | where long_field > 30
+         |""".stripMargin
+    withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
+      testFileFormatBase(tablePath, format, sql, df => {})
+    }
+  }
 }
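The doc comment above describes an ordering constraint: the native output buffer must be flushed and closed before the Spark commit protocol moves the task file. A minimal sketch of that invariant, with NativeHdfsWriter and Committer as hypothetical stand-ins rather than Gluten or ClickHouse APIs:

// Illustration only: the close-before-commit invariant described in the test comment.
// NativeHdfsWriter and Committer are hypothetical stand-ins, not Gluten/ClickHouse classes.
#include <cassert>
#include <iostream>
#include <string>

struct NativeHdfsWriter
{
    bool closed = false;
    void write(const std::string &) { /* data stays buffered until close() */ }
    void close() { closed = true; } // flushes buffered data and seals the HDFS file
};

struct Committer
{
    // The Spark commit protocol renames the task file into the final table location.
    // If the writer has not been closed yet, the moved file is incomplete.
    static void commitTask(const NativeHdfsWriter & writer)
    {
        assert(writer.closed && "close the writer before the committer moves the file");
        std::cout << "task file committed" << std::endl;
    }
};

int main()
{
    NativeHdfsWriter writer;
    writer.write("column batch");
    writer.close(); // must happen here, not when the executor is destroyed
    Committer::commitTask(writer);
    return 0;
}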
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index b3e1bd21e957..0bd19dd97172 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -35,23 +35,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
   val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
   val FLT_EPSILON = 1.19209290e-07f
 
-  protected val sparkVersion: String = {
+  private val sparkVersion: String = {
     val version = SPARK_VERSION_SHORT.split("\\.")
     version(0) + "." + version(1)
   }
+  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
 
-  val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
-  val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+  val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
+  val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
   val S3_ENDPOINT = "s3://127.0.0.1:9000/"
   val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
-  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
   val BUCKET_NAME: String = SPARK_DIR_NAME
   val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
 
-  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
-  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
+  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
   val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
-  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
 
   val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
   val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index bf3be1e52907..a85a9094d38f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
             | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
             | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     eventually(timeout(60.seconds), interval(2.seconds)) {
       assertResult(22)(metaPath.list().length)
@@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
             | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
     val dataPath = new File(HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
     assertResult(true)(res1(0).getBoolean(0))
     assertResult(1)(metaPath.list().length)
@@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
             | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 6bdb05c33269..445cd99068a1 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241030
-CH_COMMIT=847cfa6237c
\ No newline at end of file
+CH_BRANCH=rebase_ch/20241101
+CH_COMMIT=7cd7bb8ece2
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 32ead1070880..9799933b3385 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -229,11 +229,10 @@ std::unique_ptr SerializedPlanParser::createExecutor(const substr
 
 QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
 {
-    DB::QueryPlanPtr query_plan;
     auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context);
 
     auto all_input_rels = rel_parser->getInputs(rel);
-    assert(all_input_rels.size() == 1 || all_input_rels.size() == 2);
+    assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2);
     std::vector<DB::QueryPlanPtr> input_query_plans;
     rel_stack.push_back(&rel);
     for (const auto * input_rel : all_input_rels)
@@ -276,7 +275,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
         }
     }
 
-    query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
+    DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
     for (auto & extra_plan : rel_parser->extraPlans())
     {
         extra_plan_holder.push_back(std::move(extra_plan));
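The relaxed assert above now admits relations with no inputs (presumably source relations such as a read), in addition to the unary and binary cases. A toy sketch of that arity rule, using illustrative Rel and QueryPlan types rather than the actual Substrait or Gluten classes:

// Toy model of the input-arity check in parseOp; Rel and QueryPlan are illustrative only.
#include <cassert>
#include <memory>
#include <string>
#include <vector>

struct Rel
{
    std::string kind;                         // e.g. "read", "filter", "join"
    std::vector<std::shared_ptr<Rel>> inputs; // 0 for sources, 1 for unary ops, 2 for joins
};

struct QueryPlan
{
    std::string description;
};

std::unique_ptr<QueryPlan> parseOp(const Rel & rel)
{
    // A source relation such as a read has no input, so an arity of 0 is legal.
    assert(rel.inputs.size() == 0 || rel.inputs.size() == 1 || rel.inputs.size() == 2);

    std::vector<std::unique_ptr<QueryPlan>> input_plans;
    for (const auto & input : rel.inputs)
        input_plans.push_back(parseOp(*input));

    auto plan = std::make_unique<QueryPlan>();
    plan->description = rel.kind + " over " + std::to_string(input_plans.size()) + " input(s)";
    return plan;
}

int main()
{
    auto read = std::make_shared<Rel>(Rel{"read", {}});
    Rel filter{"filter", {read}};
    auto plan = parseOp(filter); // read has 0 inputs, filter has 1
    return plan ? 0 : 1;
}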
diff --git a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
index ea93480b7683..6926b86a34e0 100644
--- a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
@@ -29,7 +29,7 @@
 namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
 }
 }
@@ -78,14 +78,16 @@ class HDFSFileWriteBufferBuilder : public WriteBufferBuilder
 
         auto builder = DB::createHDFSBuilder(new_file_uri, context->getConfigRef());
         auto fs = DB::createHDFSFS(builder.get());
-        auto first = new_file_uri.find('/', new_file_uri.find("//") + 2);
+        auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
         auto last = new_file_uri.find_last_of('/');
-        auto dir = new_file_uri.substr(first, last - first);
+        auto dir = new_file_uri.substr(begin_of_path, last - begin_of_path);
         if (hdfsCreateDirectory(fs.get(), dir.c_str()))
             throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create dir for {} because {}", dir, std::string(hdfsGetLastError()));
 
+        const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);
+        const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);
         DB::WriteSettings write_settings;
-        return std::make_unique<DB::WriteBufferFromHDFS>(new_file_uri, context->getConfigRef(), 0, write_settings);
+        return std::make_unique<DB::WriteBufferFromHDFS>(hdfs_uri_without_path, hdfs_file_path, context->getConfigRef(), 0, write_settings);
     }
 };
 #endif
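For reference, the path split performed in HDFSFileWriteBufferBuilder above can be reproduced in isolation. A small standalone sketch (the URI is an example built from the test's tablePath for Spark 3.3; the part-file name is illustrative) showing what begin_of_path, dir, hdfs_file_path and hdfs_uri_without_path evaluate to:

// Standalone reproduction of the URI split above; no HDFS involved.
#include <iostream>
#include <string>

int main()
{
    // Example URI: $HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/ plus an illustrative part file.
    const std::string new_file_uri = "hdfs://127.0.0.1:8020/3-3/write_into_hdfs/part-00000.parquet";

    // First '/' after the "hdfs://host:port" authority.
    const auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
    const auto last = new_file_uri.find_last_of('/');

    const std::string dir = new_file_uri.substr(begin_of_path, last - begin_of_path); // "/3-3/write_into_hdfs"
    const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);            // "/3-3/write_into_hdfs/part-00000.parquet"
    const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);  // "hdfs://127.0.0.1:8020"

    std::cout << dir << '\n' << hdfs_file_path << '\n' << hdfs_uri_without_path << '\n';
    return 0;
}

Splitting the URI this way hands the write buffer the HDFS endpoint and the absolute file path as separate arguments instead of the single full URI used before.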