diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java index 9ca301efb230..c041ee352c42 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java @@ -27,7 +27,9 @@ public native long nativeInitMergeTreeWriterWrapper( String uuid, String taskId, String partition_dir, - String bucket_dir); + String bucket_dir, + byte[] confArray, + long allocId); public native String nativeMergeMTParts( byte[] plan, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index c73b7e7aff9b..e31560259720 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.delta +import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings import org.apache.gluten.execution.ColumnarToRowExecBase import org.apache.spark.SparkException @@ -109,7 +110,7 @@ class ClickhouseOptimisticTransaction( // Retain only a minimal selection of Spark writer options to avoid any potential // compatibility issues - val options = writeOptions match { + var options = writeOptions match { case None => Map.empty[String, String] case Some(writeOptions) => writeOptions.options.filterKeys { @@ -119,6 +120,16 @@ class ClickhouseOptimisticTransaction( }.toMap } + spark.conf.getAll.foreach( + entry => { + if ( + entry._1.startsWith(s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings") + || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key) + ) { + options += (entry._1 -> entry._2) + } + }) + try { val tableV2 = ClickHouseTableV2.getTable(deltaLog) MergeTreeFileFormatWriter.write( diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index 64aa8863b881..8a61385fcbd0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -16,10 +16,13 @@ */ package org.apache.spark.sql.execution.datasources.v1 +import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators import org.apache.gluten.substrait.`type`.ColumnTypeNode import org.apache.gluten.substrait.SubstraitContext -import org.apache.gluten.substrait.extensions.ExtensionBuilder +import org.apache.gluten.substrait.expression.{ExpressionBuilder, StringMapNode} +import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder} import org.apache.gluten.substrait.plan.PlanBuilder import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder} @@ -39,7 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext import java.util.{ArrayList => JList, Map => JMap, 
UUID} import scala.collection.JavaConverters._ -import scala.collection.mutable + case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte]) class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { @@ -47,10 +50,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { override def nativeConf( options: Map[String, String], compressionCodec: String): JMap[String, String] = { - // pass options to native so that velox can take user-specified conf to write parquet, - // i.e., compression, block size, block rows. - val sparkOptions = new mutable.HashMap[String, String]() - sparkOptions.asJava + options.asJava } override def createOutputWriter( @@ -95,7 +95,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { clickhouseTableConfigs, tableSchema.toAttributes // use table schema instead of data schema ) - + val allocId = CHNativeMemoryAllocators.contextInstance.getNativeInstanceId val datasourceJniWrapper = new CHDatasourceJniWrapper() val instance = datasourceJniWrapper.nativeInitMergeTreeWriterWrapper( @@ -104,7 +104,9 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { uuid, context.getTaskAttemptID.getTaskID.getId.toString, context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"), - context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str") + context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"), + buildNativeConf(nativeConf), + allocId ) new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path) @@ -121,6 +123,13 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase { override def getFormatName(): String = { "mergetree" } + + private def buildNativeConf(confs: JMap[String, String]): Array[Byte] = { + val stringMapNode: StringMapNode = ExpressionBuilder.makeStringMap(confs) + val extensionNode: AdvancedExtensionNode = ExtensionBuilder.makeAdvancedExtension( + BackendsApiManager.getTransformerApiInstance.packPBMessage(stringMapNode.toProtobuf)) + PlanBuilder.makePlan(extensionNode).toProtobuf.toByteArray + } } object CHMergeTreeWriterInjects { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala index 9635b9958875..ae0cd170d3fa 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala @@ -24,6 +24,8 @@ import io.delta.tables.ClickhouseTable import java.io.File +import scala.concurrent.duration.DurationInt + // Some sqls' line length exceeds 100 // scalastyle:off line.size.limit @@ -54,6 +56,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite "spark.databricks.delta.retentionDurationCheck.enabled", "false" ) // otherwise RETAIN 0 HOURS will fail + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") } override protected def createTPCHNotNullTables(): Unit = { @@ -426,5 +431,31 @@ class GlutenClickHouseMergeTreeOptimizeSuite val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect() assert(ret.apply(0).get(0) == 600572) } + + test("test mergetree insert with optimize basic") { + withSQLConf( + ("spark.databricks.delta.optimize.minFileSize" -> "200000000"), + 
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true") + ) { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insert_optimize_basic + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_insert_optimize_basic' + | as select * from lineitem + |""".stripMargin) + + val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect() + assert(ret.apply(0).get(0) == 600572) + eventually(timeout(60.seconds), interval(3.seconds)) { + assert( + new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2 + ) + } + } + } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala index d852175367be..93f22baa2575 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala @@ -57,6 +57,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows", "100000") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") } override protected def createTPCHNotNullTables(): Unit = { @@ -170,8 +173,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .format("clickhouse") .load(dataPath) .where("l_shipdate = date'1998-09-02'") - .collect() - assert(result.apply(0).get(0) == 110501) + .count() + assert(result == 183) } test("test mergetree path based write with dataframe api") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index 49011e031962..ca5b39fff1ac 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") } override protected def beforeEach(): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala deleted file mode 100644 index 90db0f5d86c3..000000000000 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.execution - -import org.apache.gluten.GlutenConfig - -import org.apache.spark.sql.SparkSession - -import _root_.org.apache.spark.SparkConf -import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.commons.io.FileUtils - -import java.io.File - -// Some sqls' line length exceeds 100 -// scalastyle:off line.size.limit - -class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite - extends GlutenClickHouseTPCHAbstractSuite - with AdaptiveSparkPlanHelper { - private var _spark: SparkSession = _ - - override protected def spark: SparkSession = _spark - - override protected val needCopyParquetToTablePath = true - - override protected val tablesPath: String = basePath + "/tpch-data" - override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" - override protected val queriesResults: String = rootPath + "mergetree-queries-output" - - override protected def initializeSession(): Unit = { - if (_spark == null) { - _spark = SparkSession - .builder() - .appName("Gluten-UT-RemoteHS") - .config(sparkConf) - .getOrCreate() - } - } - - override protected def sparkConf: SparkConf = { - super.sparkConf - .setMaster("local[2]") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.io.compression.codec", "LZ4") - .set("spark.sql.shuffle.partitions", "5") - .set("spark.sql.autoBroadcastJoinThreshold", "10MB") - .set("spark.sql.adaptive.enabled", "true") - .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") - } - override protected def createTPCHNotNullTables(): Unit = { - createNotNullTPCHTablesInParquet(tablesPath) - } - - override protected def afterAll(): Unit = { - try { - super.afterAll() - } finally { - try { - if (_spark != null) { - try { - _spark.sessionState.catalog.reset() - } finally { - _spark.stop() - _spark = null - } - } - } finally { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - } - } - - FileUtils.forceDelete(new File(basePath)) - // init GlutenConfig in the next beforeAll - GlutenConfig.ins = null - } -} -// scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 6f10035faca9..439a1b58fd4f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows", "100000") + .set( + 
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala index 36002b7e5a0d..a673d4ba3bb4 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala @@ -58,6 +58,9 @@ class GlutenClickHouseTableAfterRestart .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows", "100000") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index e455c956d3c8..a891d6d1027b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -161,6 +161,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu } override def beforeAll(): Unit = { + // is not exist may cause some ut error + assert(new File("/data").exists()) + // prepare working paths val basePathDir = new File(basePath) if (basePathDir.exists()) { diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index ba86fe30673b..9704b3041cd9 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -323,34 +323,6 @@ std::string PlanUtil::explainPlan(DB::QueryPlan & plan) return plan_str; } -std::vector MergeTreeUtil::getAllMergeTreeParts(const Path & storage_path) -{ - if (!fs::exists(storage_path)) - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", storage_path.string()); - - // TODO: May need to check the storage format version - std::vector res; - for (const auto & entry : fs::directory_iterator(storage_path)) - { - auto filename = entry.path().filename(); - if (filename == "format_version.txt" || filename == "detached" || filename == "_delta_log") - continue; - res.push_back(entry.path()); - } - return res; -} - -DB::NamesAndTypesList MergeTreeUtil::getSchemaFromMergeTreePart(const fs::path & part_path) -{ - DB::NamesAndTypesList names_types_list; - if (!fs::exists(part_path)) - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", part_path.string()); - DB::ReadBufferFromFile readbuffer((part_path / "columns.txt").string()); - names_types_list.readText(readbuffer); - return names_types_list; -} - - NestedColumnExtractHelper::NestedColumnExtractHelper(const DB::Block & block_, bool case_insentive_) : block(block_), case_insentive(case_insentive_) { @@ -594,10 +566,21 @@ void BackendInitializerUtil::initEnvs(DB::Context::ConfigurationPtr config) spark_user = spark_user_c_str; } +DB::Field BackendInitializerUtil::toField(const String key, const String value) +{ + if (BOOL_VALUE_SETTINGS.contains(key)) + return DB::Field(value == "true" || 
value == "1"); + else if (LONG_VALUE_SETTINGS.contains(key)) + return DB::Field(std::strtoll(value.c_str(), NULL, 10)); + else + return DB::Field(value); +} + void BackendInitializerUtil::initSettings(std::map & backend_conf_map, DB::Settings & settings) { /// Initialize default setting. settings.set("date_time_input_format", "best_effort"); + settings.set("mergetree.merge_after_insert", true); for (const auto & [key, value] : backend_conf_map) { @@ -609,7 +592,8 @@ void BackendInitializerUtil::initSettings(std::map & b } else if (key.starts_with(CH_RUNTIME_SETTINGS_PREFIX)) { - settings.set(key.substr(CH_RUNTIME_SETTINGS_PREFIX.size()), value); + auto k = key.substr(CH_RUNTIME_SETTINGS_PREFIX.size()); + settings.set(k, toField(k, value)); LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value); } else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX)) @@ -624,6 +608,12 @@ void BackendInitializerUtil::initSettings(std::map & b // 4. fs.s3a.bucket.bucket_name.assumed.role.externalId (non hadoop official) settings.set(key.substr(SPARK_HADOOP_PREFIX.length()), value); } + else if (key.starts_with(SPARK_DELTA_PREFIX)) + { + auto k = key.substr(SPARK_DELTA_PREFIX.size()); + settings.set(k, toField(k, value)); + LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value); + } } /// Finally apply some fixed kvs to settings. diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index 308e22422cdb..574cdbe4c8d7 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -32,6 +32,10 @@ namespace local_engine { +static const std::unordered_set BOOL_VALUE_SETTINGS{"mergetree.merge_after_insert"}; +static const std::unordered_set LONG_VALUE_SETTINGS{ + "optimize.maxfilesize", "optimize.minFileSize", "mergetree.max_num_part_per_merge_task"}; + class BlockUtil { public: @@ -98,14 +102,6 @@ class PlanUtil static std::string explainPlan(DB::QueryPlan & plan); }; -class MergeTreeUtil -{ -public: - using Path = std::filesystem::path; - static std::vector getAllMergeTreeParts(const Path & storage_path); - static DB::NamesAndTypesList getSchemaFromMergeTreePart(const Path & part_path); -}; - class ActionsDAGUtil { public: @@ -131,6 +127,8 @@ class JNIUtils; class BackendInitializerUtil { public: + static DB::Field toField(const String key, const String value); + /// Initialize two kinds of resources /// 1. global level resources like global_context/shared_context, notice that they can only be initialized once in process lifetime /// 2. session level resources like settings/configs, they can be initialized multiple times following the lifetime of executor/driver @@ -166,6 +164,7 @@ class BackendInitializerUtil inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE = "fs.s3a.client.cached.ignore"; inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop."; inline static const std::string S3A_PREFIX = "fs.s3a."; + inline static const std::string SPARK_DELTA_PREFIX = "spark.databricks.delta."; /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written. 
@@ -228,4 +227,60 @@ class MemoryUtil static UInt64 getMemoryRSS(); }; +template +class ConcurrentDeque +{ +public: + std::optional pop_front() + { + std::lock_guard lock(mtx); + + if (deq.empty()) + return {}; + + T t = deq.front(); + deq.pop_front(); + return t; + } + + void emplace_back(T value) + { + std::lock_guard lock(mtx); + deq.emplace_back(value); + } + + void emplace_back(std::vector values) + { + std::lock_guard lock(mtx); + deq.insert(deq.end(), values.begin(), values.end()); + } + + void emplace_front(T value) + { + std::lock_guard lock(mtx); + deq.emplace_front(value); + } + + size_t size() + { + std::lock_guard lock(mtx); + return deq.size(); + } + + bool empty() + { + std::lock_guard lock(mtx); + return deq.empty(); + } + + std::deque unsafeGet() + { + return deq; + } + +private: + std::deque deq; + mutable std::mutex mtx; +}; + } diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index 9d2ea7f26ab6..c659e6f34ea1 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -68,6 +68,7 @@ int64_t initializeQuery(ReservationListenerWrapperPtr listener) listener->reserve(size); }; CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { listener->free(size); }; + CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return listener->currentMemory(); }; allocator_map.insert(allocator_id, allocator_context); return allocator_id; } diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index cff32a83f394..368015fb9278 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -26,6 +26,8 @@ namespace DB namespace ErrorCodes { extern const int DUPLICATE_DATA_PART; +extern const int NO_SUCH_DATA_PART; + } } @@ -146,9 +148,9 @@ CustomStorageMergeTree::CustomStorageMergeTree( std::atomic CustomStorageMergeTree::part_num; -MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set parts) +std::vector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set parts) { - MutableDataPartsVector data_parts; + std::vector data_parts; const auto disk = getStoragePolicy()->getDisks().at(0); for (const auto& name : parts) { @@ -158,14 +160,8 @@ MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithN data_parts.emplace_back(res.part); } - if(getStorageID().hasUUID()) - { - // the following lines will modify storage's member. 
- // So when current storage is shared (when UUID is default Nil value), - // we should avoid modify because we don't have locks here - - calculateColumnAndSecondaryIndexSizesImpl(); // without it "test mergetree optimize partitioned by one low card column" will log ERROR - } + // without it "test mergetree optimize partitioned by one low card column" will log ERROR + calculateColumnAndSecondaryIndexSizesImpl(); return data_parts; } @@ -246,6 +242,38 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( return res; } +void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach) +{ + auto lock = lockParts(); + bool removed_active_part = false; + bool restored_active_part = false; + + auto it_part = data_parts_by_info.find(part_to_detach->info); + if (it_part == data_parts_by_info.end()) + { + LOG_DEBUG(log, "No such data part {}", part_to_detach->getNameWithState()); + return; + } + + /// What if part_to_detach is a reference to *it_part? Make a new owner just in case. + /// Important to own part pointer here (not const reference), because it will be removed from data_parts_indexes + /// few lines below. + DataPartPtr part = *it_part; // NOLINT + + if (part->getState() == DataPartState::Active) + { + removePartContributionToColumnAndSecondaryIndexSizes(part); + removed_active_part = true; + } + + modifyPartState(it_part, DataPartState::Deleting); + LOG_TEST(log, "removePartFromMemory: removing {} from data_parts_indexes", part->getNameWithState()); + data_parts_indexes.erase(it_part); + + if (removed_active_part || restored_active_part) + resetObjectColumnsFromActiveParts(lock); +} + void CustomStorageMergeTree::dropPartNoWaitNoThrow(const String & /*part_name*/) { throw std::runtime_error("not implement"); diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h index 0aeee4ef98df..cd507a3ac751 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h @@ -53,8 +53,8 @@ class CustomStorageMergeTree final : public MergeTreeData std::vector getMutationsStatus() const override; bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override; std::map getUnfinishedMutationCommands() const override; - MutableDataPartsVector loadDataPartsWithNames(std::unordered_set parts); - + std::vector loadDataPartsWithNames(std::unordered_set parts); + void removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach); MergeTreeDataWriter writer; MergeTreeDataSelectExecutor reader; @@ -84,6 +84,7 @@ class CustomStorageMergeTree final : public MergeTreeData bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & /*part*/) const override { return {}; } void attachRestoredParts(MutableDataPartsVector && /*parts*/) override { throw std::runtime_error("not implement"); } + }; } diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp index 21c0fc96895d..57bb804fa9d1 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp @@ -15,9 +15,13 @@ * limitations under the License. 
*/ #include "MetaDataHelper.h" + #include #include +#include +#include +#include namespace CurrentMetrics { @@ -123,6 +127,7 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & void saveFileStatus( const DB::MergeTreeData & storage, const DB::ContextPtr& context, + const String & part_name, IDataPartStorage & data_part_storage) { const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk(); @@ -142,5 +147,61 @@ void saveFileStatus( } out->finalize(); } + + LOG_DEBUG(&Poco::Logger::get("MetaDataHelper"), "Save part {} metadata success.", part_name); +} + + +std::vector mergeParts( + std::vector selected_parts, + std::unordered_map & partition_values, + const String & new_part_uuid, + CustomStorageMergeTreePtr storage, + const String & partition_dir, + const String & bucket_dir) +{ + auto future_part = std::make_shared(); + future_part->uuid = UUIDHelpers::generateV4(); + + future_part->assign(std::move(selected_parts)); + + future_part->name = ""; + if(!partition_dir.empty()) + { + future_part->name = partition_dir + "/"; + extractPartitionValues(partition_dir, partition_values); + } + if(!bucket_dir.empty()) + { + future_part->name = future_part->name + bucket_dir + "/"; + } + future_part->name = future_part->name + new_part_uuid + "-merged"; + + auto entry = std::make_shared(future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared()); + + // Copying a vector of columns `deduplicate by columns. + DB::IExecutableTask::TaskResultCallback f = [](bool) {}; + auto task = std::make_shared( + *storage, storage->getInMemoryMetadataPtr(), false, std::vector{}, false, entry, + DB::TableLockHolder{}, f); + + task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{}); + + executeHere(task); + + std::unordered_set to_load{future_part->name}; + std::vector merged = storage->loadDataPartsWithNames(to_load); + return merged; +} + +void extractPartitionValues(const String & partition_dir, std::unordered_map & partition_values) +{ + Poco::StringTokenizer partitions(partition_dir, "/"); + for (const auto & partition : partitions) + { + Poco::StringTokenizer key_value(partition, "="); + chassert(key_value.count() == 2); + partition_values.emplace(key_value[0], key_value[1]); + } } } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h index b15a15322d10..7163ee02c1e2 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h +++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h @@ -28,6 +28,16 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & void saveFileStatus( const DB::MergeTreeData & storage, const DB::ContextPtr& context, + const String & part_name, IDataPartStorage & data_part_storage); +std::vector mergeParts( + std::vector selected_parts, + std::unordered_map & partition_values, + const String & new_part_uuid, + CustomStorageMergeTreePtr storage, + const String & partition_dir, + const String & bucket_dir); + +void extractPartitionValues(const String & partition_dir, std::unordered_map & partition_values); } diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index 40e716a56da4..c709a5f24cca 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -16,12 +16,24 @@ */ #include 
"SparkMergeTreeWriter.h" -#include #include +#include #include #include -#include #include +#include +#include + + +namespace CurrentMetrics +{ +extern const Metric LocalThread; +extern const Metric LocalThreadActive; +extern const Metric LocalThreadScheduled; +extern const Metric GlobalThread; +extern const Metric GlobalThreadActive; +extern const Metric GlobalThreadScheduled; +} using namespace DB; @@ -42,6 +54,43 @@ Block removeColumnSuffix(const DB::Block & block) return Block(columns); } +SparkMergeTreeWriter::SparkMergeTreeWriter( + CustomStorageMergeTreePtr storage_, + const DB::StorageMetadataPtr & metadata_snapshot_, + const DB::ContextPtr & context_, + const String & uuid_, + const String & partition_dir_, + const String & bucket_dir_) + : storage(storage_) + , metadata_snapshot(metadata_snapshot_) + , context(context_) + , uuid(uuid_) + , partition_dir(partition_dir_) + , bucket_dir(bucket_dir_) + , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000) +{ + const DB::Settings & settings = context->getSettingsRef(); + squashing_transform + = std::make_unique(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + if (!partition_dir.empty()) + { + extractPartitionValues(partition_dir, partition_values); + } + header = metadata_snapshot->getSampleBlock(); + + Field is_merge; + if (context->getSettings().tryGet("mergetree.merge_after_insert", is_merge)) + merge_after_insert = is_merge.get(); + + Field limit_size_field; + if (context->getSettings().tryGet("optimize.minFileSize", limit_size_field)) + merge_min_size = limit_size_field.get() <= 0 ? merge_min_size : limit_size_field.get(); + + Field limit_cnt_field; + if (context->getSettings().tryGet("mergetree.max_num_part_per_merge_task", limit_cnt_field)) + merge_limit_parts = limit_cnt_field.get() <= 0 ? merge_limit_parts : limit_cnt_field.get(); +} + void SparkMergeTreeWriter::write(DB::Block & block) { auto new_block = removeColumnSuffix(block); @@ -52,11 +101,57 @@ void SparkMergeTreeWriter::write(DB::Block & block) do_convert.execute(new_block); } - auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context); - for (auto & item : blocks_with_partition) + auto blocks_with_partition + = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context); + for (auto & item : blocks_with_partition) + { + size_t before_write_memory = 0; + if (auto * memory_tracker = CurrentThread::getMemoryTracker()) { - new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part); + CurrentThread::flushUntrackedMemory(); + before_write_memory = memory_tracker->get(); + } + + new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part); part_num++; + manualFreeMemory(before_write_memory); + /// Reset earlier to free memory + item.block.clear(); + item.partition.clear(); + } + + if (!blocks_with_partition.empty() && merge_after_insert) + checkAndMerge(); +} + +void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory) +{ + // If mergetree disk is not local fs, like remote fs s3 or hdfs, + // it may alloc memory in current thread, and free on global thread. + // Now, wo have not idea to clear global memory by used spark thread tracker. + // So we manually correct the memory usage. 
+ auto disk = storage->getStoragePolicy()->getAnyDisk(); + if (!disk->isRemote()) + return; + + std::lock_guard lock(memory_mutex); + auto * memory_tracker = CurrentThread::getMemoryTracker(); + if (memory_tracker && CurrentMemoryTracker::before_free) + { + CurrentThread::flushUntrackedMemory(); + const size_t ch_alloc = memory_tracker->get(); + if (disk->getName().contains("s3") && context->getSettings().s3_allow_parallel_part_upload && ch_alloc > before_write_memory) + { + const size_t diff_ch_alloc = before_write_memory - ch_alloc; + memory_tracker->adjustWithUntrackedMemory(diff_ch_alloc); + } + + const size_t a = memory_tracker->get(); + const size_t spark_alloc = CurrentMemoryTracker::current_memory(); + const size_t diff_alloc = spark_alloc - memory_tracker->get(); + + if (diff_alloc > 0) + CurrentMemoryTracker::before_free(diff_alloc); } } @@ -67,7 +162,61 @@ void SparkMergeTreeWriter::finalize() { auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context); for (auto & item : blocks_with_partition) + { + size_t before_write_memory = 0; + if (auto * memory_tracker = CurrentThread::getMemoryTracker()) + { + CurrentThread::flushUntrackedMemory(); + before_write_memory = memory_tracker->get(); + } + new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part); + part_num++; + manualFreeMemory(before_write_memory); + /// Reset earlier to free memory + item.block.clear(); + item.partition.clear(); + } + } + + SCOPE_EXIT({ + for (auto merge_tree_data_part : new_parts.unsafeGet()) + saveFileStatus( + *storage, context, merge_tree_data_part->name, const_cast(merge_tree_data_part->getDataPartStorage())); + }); + + if (!merge_after_insert) + return; + + // wait all merge task end and do final merge + thread_pool.wait(); + + size_t before_merge_size; + do + { + before_merge_size = new_parts.size(); + checkAndMerge(true); + thread_pool.wait(); + } while (before_merge_size != new_parts.size()); + + std::unordered_set final_parts; + for (auto merge_tree_data_part : new_parts.unsafeGet()) + final_parts.emplace(merge_tree_data_part->name); + + for (const auto & tmp_part : tmp_parts) + { + if (final_parts.contains(tmp_part)) + continue; + + GlobalThreadPool::instance().scheduleOrThrow( + [&]() -> void + { + for (auto disk : storage->getDisks()) + { + auto full_path = storage->getFullPathOnDisk(disk); + disk->removeRecursive(full_path + "/" + tmp_part); + } + }); } } @@ -76,16 +225,15 @@ SparkMergeTreeWriter::writeTempPartAndFinalize( DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) { - auto temp_part = writeTempPart(block_with_partition, metadata_snapshot); + MergeTreeDataWriter::TemporaryPart temp_part; + writeTempPart(temp_part, block_with_partition, metadata_snapshot); temp_part.finalize(); - saveFileStatus(storage, context, temp_part.part->getDataPartStorage()); return temp_part; } -MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( +void SparkMergeTreeWriter::writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part, BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot) { - MergeTreeDataWriter::TemporaryPart temp_part; Block & block = block_with_partition.block; auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); @@ -95,7 +243,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( column.type = block.getByName(column.name).type; auto minmax_idx = 
std::make_shared(); - minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); + minmax_idx->update(block, storage->getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); MergeTreePartition partition(block_with_partition.partition); @@ -121,13 +269,13 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( String part_name = part_dir; - temp_part.temporary_directory_lock = storage.getTemporaryPartDirectoryHolder(part_dir); + temp_part.temporary_directory_lock = storage->getTemporaryPartDirectoryHolder(part_dir); auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices()); /// If we need to calculate some columns to sort. if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices()) - storage.getSortingKeyAndSkipIndicesExpression(metadata_snapshot, indices)->execute(block); + storage->getSortingKeyAndSkipIndicesExpression(metadata_snapshot, indices)->execute(block); Names sort_columns = metadata_snapshot->getSortingKeyColumns(); SortDescription sort_description; @@ -157,19 +305,19 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( /// If optimize_on_insert is true, block may become empty after merge. /// There is no need to create empty part. if (expected_size == 0) - return temp_part; + return; - VolumePtr volume = storage.getStoragePolicy()->getVolume(0); + VolumePtr volume = storage->getStoragePolicy()->getVolume(0); VolumePtr data_part_volume = std::make_shared(volume->getName(), volume->getDisk(), volume->max_data_part_size); - auto new_data_part = storage.getDataPartBuilder(part_name, data_part_volume, part_dir) - .withPartFormat(storage.choosePartFormat(expected_size, block.rows())) + auto new_data_part = storage->getDataPartBuilder(part_name, data_part_volume, part_dir) + .withPartFormat(storage->choosePartFormat(expected_size, block.rows())) .withPartInfo(new_part_info) .build(); auto data_part_storage = new_data_part->getDataPartStoragePtr(); - const auto & data_settings = storage.getSettings(); + const auto & data_settings = storage->getSettings(); SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true}; SerializationInfoByName infos(columns, settings); @@ -194,7 +342,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( data_part_storage->createDirectories(); - if (storage.getSettings()->fsync_part_directory) + if (storage->getSettings()->fsync_part_directory) { const auto disk = data_part_volume->getDisk(); sync_guard = disk->getDirectorySyncGuard(full_path); @@ -203,7 +351,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( /// This effectively chooses minimal compression method: /// either default lz4 or compression method with zero thresholds on absolute and relative part size. 
- auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); + auto compression_codec = storage->getContext()->chooseCompressionCodec(0, 0); auto out = std::make_unique( new_data_part, @@ -218,21 +366,24 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart( context->getWriteSettings()); out->writeWithPermutation(block, perm_ptr); - - auto finalizer = out->finalizePartAsync(new_data_part, data_settings->fsync_after_insert, nullptr, nullptr); temp_part.part = new_data_part; - temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); - - return temp_part; + temp_part.streams.emplace_back( + MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); } std::vector SparkMergeTreeWriter::getAllPartInfo() { std::vector res; - for (const MergeTreeDataPartPtr & part : new_parts) - res.emplace_back(PartInfo{part->name, part->getMarksCount(), part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir}); + res.reserve(new_parts.size()); + + for (auto part : new_parts.unsafeGet()) + { + res.emplace_back( + PartInfo{part->name, part->getMarksCount(), part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir}); + } + return res; } @@ -268,4 +419,90 @@ String SparkMergeTreeWriter::partInfosToJson(const std::vector & part_ return result.GetString(); } +void SparkMergeTreeWriter::checkAndMerge(bool force) +{ + // Only finalize should force merge. + if (!force && new_parts.size() < merge_limit_parts) + return; + + auto doTask = [this]( + const ThreadGroupPtr & thread_group, + const std::vector prepare_merge_parts, + CustomStorageMergeTreePtr & storage, + String & partition_dir, + String & bucket_dir) -> std::vector + { + setThreadName("InsertWithMerge"); + ThreadStatus thread_status; + thread_status.attachToGroup(thread_group); + + size_t before_size = 0; + size_t after_size = 0; + for (const auto & prepare_merge_part : prepare_merge_parts) + before_size += prepare_merge_part->getBytesOnDisk(); + + std::unordered_map partition_values; + auto merged_parts + = mergeParts(prepare_merge_parts, partition_values, toString(UUIDHelpers::generateV4()), storage, partition_dir, bucket_dir); + for (const auto & merge_tree_data_part : merged_parts) + after_size += merge_tree_data_part->getBytesOnDisk(); + + LOG_DEBUG( + &Poco::Logger::get("SparkMergeTreeWriter"), + "Mergetree merge on insert finished, before merge part size {}, part count {}, after part size {}, part count {}.", + before_size, + prepare_merge_parts.size(), + after_size, + merged_parts.size()); + + return merged_parts; + }; + + std::vector selected_parts; + selected_parts.reserve(merge_limit_parts); + size_t totol_size = 0; + std::vector skip_parts; + + while (const auto merge_tree_data_part_option = new_parts.pop_front()) + { + auto merge_tree_data_part = merge_tree_data_part_option.value(); + if (merge_tree_data_part->getBytesOnDisk() >= merge_min_size) + { + skip_parts.emplace_back(merge_tree_data_part); + continue; + } + + selected_parts.emplace_back(merge_tree_data_part); + totol_size += merge_tree_data_part->getBytesOnDisk(); + if (merge_min_size > totol_size && merge_limit_parts > selected_parts.size()) + continue; + + for (auto selected_part : selected_parts) + { + tmp_parts.emplace(selected_part->name); + } + + thread_pool.scheduleOrThrow([this, doTask, selected_parts, thread_group = CurrentThread::getGroup()]() -> void + { 
new_parts.emplace_back(doTask(thread_group, selected_parts, storage, partition_dir, bucket_dir)); }); + selected_parts.clear(); + totol_size = 0; + } + + if (!selected_parts.empty()) + { + if (force && selected_parts.size() > 1) + { + for (auto selected_part : selected_parts) + tmp_parts.emplace(selected_part->name); + thread_pool.scheduleOrThrow( + [this, doTask, selected_parts, thread_group = CurrentThread::getGroup()]() -> void + { new_parts.emplace_back(doTask(thread_group, selected_parts, storage, partition_dir, bucket_dir)); }); + } + else + new_parts.emplace_back(selected_parts); + } + + new_parts.emplace_back(skip_parts); +} + } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h index 000d009fe068..5251d4cc447a 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h @@ -19,7 +19,9 @@ #include #include #include +#include #include +#include namespace DB { @@ -40,6 +42,8 @@ struct PartInfo size_t row_count; std::unordered_map partition_values; String bucket_id; + + bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; } }; class SparkMergeTreeWriter @@ -47,56 +51,44 @@ class SparkMergeTreeWriter public: static String partInfosToJson(const std::vector & part_infos); SparkMergeTreeWriter( - DB::MergeTreeData & storage_, + CustomStorageMergeTreePtr storage_, const DB::StorageMetadataPtr & metadata_snapshot_, const DB::ContextPtr & context_, const String & uuid_, const String & partition_dir_ = "", - const String & bucket_dir_ = "") - : storage(storage_) - , metadata_snapshot(metadata_snapshot_) - , context(context_) - , uuid(uuid_) - , partition_dir(partition_dir_) - , bucket_dir(bucket_dir_) - { - const DB::Settings & settings = context->getSettingsRef(); - squashing_transform - = std::make_unique(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); - if (!partition_dir.empty()) - { - Poco::StringTokenizer partitions(partition_dir, "/"); - for (const auto & partition : partitions) - { - Poco::StringTokenizer key_value(partition, "="); - chassert(key_value.count() == 2); - partition_values.emplace(key_value[0], key_value[1]); - } - } - header = metadata_snapshot->getSampleBlock(); - } + const String & bucket_dir_ = ""); void write(DB::Block & block); void finalize(); std::vector getAllPartInfo(); private: - DB::MergeTreeDataWriter::TemporaryPart - writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); + void + writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part, DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); DB::MergeTreeDataWriter::TemporaryPart writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); + void checkAndMerge(bool force = false); + void safeEmplaceBackPart(DB::MergeTreeDataPartPtr); + void safeAddPart(DB::MergeTreeDataPartPtr); + void manualFreeMemory(size_t before_write_memory); String uuid; String partition_dir; String bucket_dir; - DB::MergeTreeData & storage; + CustomStorageMergeTreePtr storage; DB::StorageMetadataPtr metadata_snapshot; DB::ContextPtr context; std::unique_ptr squashing_transform; int part_num = 1; - std::vector new_parts; + ConcurrentDeque new_parts; std::unordered_map partition_values; + std::unordered_set tmp_parts; DB::Block 
header; + bool merge_after_insert; + FreeThreadPool thread_pool; + size_t merge_min_size = 1024 * 1024 * 1024; + size_t merge_limit_parts = 10; + std::mutex memory_mutex; }; } diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp index b5c5f8cd0157..65b29c2a2d1c 100644 --- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp +++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp @@ -24,6 +24,7 @@ jclass ReservationListenerWrapper::reservation_listener_class = nullptr; jmethodID ReservationListenerWrapper::reservation_listener_reserve = nullptr; jmethodID ReservationListenerWrapper::reservation_listener_reserve_or_throw = nullptr; jmethodID ReservationListenerWrapper::reservation_listener_unreserve = nullptr; +jmethodID ReservationListenerWrapper::reservation_listener_currentMemory = nullptr; ReservationListenerWrapper::ReservationListenerWrapper(jobject listener_) : listener(listener_) { @@ -56,4 +57,12 @@ void ReservationListenerWrapper::free(int64_t size) safeCallVoidMethod(env, listener, reservation_listener_unreserve, size); CLEAN_JNIENV } + +size_t ReservationListenerWrapper::currentMemory() +{ + GET_JNIENV(env) + int64_t res = safeCallLongMethod(env, listener, reservation_listener_currentMemory); + return res; + CLEAN_JNIENV +} } diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h index 93efd497db95..1dfb3671f21b 100644 --- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h +++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h @@ -28,12 +28,16 @@ class ReservationListenerWrapper static jmethodID reservation_listener_reserve; static jmethodID reservation_listener_reserve_or_throw; static jmethodID reservation_listener_unreserve; + static jmethodID reservation_listener_currentMemory; explicit ReservationListenerWrapper(jobject listener); ~ReservationListenerWrapper(); void reserve(int64_t size); void reserveOrThrow(int64_t size); void free(int64_t size); + size_t currentMemory(); + + private: jobject listener; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 34b81d7a1591..7baad210e0f3 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -186,6 +186,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserveOrThrow", "(J)V"); local_engine::ReservationListenerWrapper::reservation_listener_unreserve = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "unreserve", "(J)J"); + local_engine::ReservationListenerWrapper::reservation_listener_currentMemory + = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "currentMemory", "()J"); + native_metrics_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/metrics/NativeMetrics;"); native_metrics_constructor = local_engine::GetMethodID(env, native_metrics_class, "", "(Ljava/lang/String;)V"); @@ -997,9 +1000,26 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW } JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper( - JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_) 
+ JNIEnv * env, + jobject, + jbyteArray plan_, + jbyteArray split_info_, + jstring uuid_, + jstring task_id_, + jstring partition_dir_, + jstring bucket_dir_, + jbyteArray conf_plan, + jlong allocator_id) { LOCAL_ENGINE_JNI_METHOD_START + auto query_context = local_engine::getAllocator(allocator_id)->query_context; + // by task update new configs ( in case of dynamic config update ) + jsize conf_plan_buf_size = env->GetArrayLength(conf_plan); + jbyte * conf_plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr); + std::string conf_plan_str; + conf_plan_str.assign(reinterpret_cast(conf_plan_buf_addr), conf_plan_buf_size); + local_engine::BackendInitializerUtil::updateConfig(query_context, &conf_plan_str); + const auto uuid_str = jstring2string(env, uuid_); const auto task_id = jstring2string(env, task_id_); const auto partition_dir = jstring2string(env, partition_dir_); @@ -1035,10 +1055,11 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW extension_table, local_engine::SerializedPlanParser::global_context); auto uuid = uuid_str + "_" + task_id; auto * writer = new local_engine::SparkMergeTreeWriter( - *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir); + storage, storage->getInMemoryMetadataPtr(), query_context, uuid, partition_dir, bucket_dir); env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT); env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT); + env->ReleaseByteArrayElements(conf_plan, conf_plan_buf_addr, JNI_ABORT); return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } @@ -1180,53 +1201,20 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn auto storage_factory = local_engine::StorageMergeTreeFactory::instance(); std::vector selected_parts = storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id, merge_tree_table.getPartNames()); - auto future_part = std::make_shared(); - future_part->uuid = DB::UUIDHelpers::generateV4(); - - future_part->assign(std::move(selected_parts)); - - future_part->name = ""; std::unordered_map partition_values; - if(!partition_dir.empty()) - { - future_part->name = partition_dir + "/"; - Poco::StringTokenizer partitions(partition_dir, "/"); - for (const auto & partition : partitions) - { - Poco::StringTokenizer key_value(partition, "="); - chassert(key_value.count() == 2); - partition_values.emplace(key_value[0], key_value[1]); - } - } - if(!bucket_dir.empty()) - { - future_part->name = future_part->name + bucket_dir + "/"; - } - future_part->name = future_part->name + uuid_str + "-merged"; - - auto entry = std::make_shared(future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared()); - - - // Copying a vector of columns `deduplicate by columns. 
- DB::IExecutableTask::TaskResultCallback f = [](bool) {}; - auto task = std::make_shared( - *storage, storage->getInMemoryMetadataPtr(), false, std::vector{}, false, entry, - DB::TableLockHolder{}, f); - - task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{}); - - executeHere(task); + std::vector loaded = + local_engine::mergeParts(selected_parts, partition_values, uuid_str, storage, partition_dir, bucket_dir); - std::unordered_set to_load{future_part->name}; - std::vector> loaded = storage->loadDataPartsWithNames(to_load); std::vector res; for (auto & partPtr : loaded) { - local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage()); - res.emplace_back( - local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, - /*partition_value*/ partition_values, - bucket_dir}); + saveFileStatus( + *storage, + local_engine::SerializedPlanParser::global_context, + partPtr->name, + const_cast(partPtr->getDataPartStorage())); + res.emplace_back(local_engine::PartInfo{ + partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir}); } auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
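
Taken together, the Scala side now serializes the effective runtime settings into a Substrait AdvancedExtension plan and hands it, along with the native memory allocator id, to nativeInitMergeTreeWriterWrapper, while the native SparkMergeTreeWriter collects freshly written parts and merges the small ones in the background when mergetree.merge_after_insert is enabled. The selection policy in checkAndMerge is roughly: skip parts that already reach optimize.minFileSize, and group the rest into merge tasks capped by that size and by mergetree.max_num_part_per_merge_task. A simplified, self-contained sketch of that grouping follows; Part and selectMergeBatches are stand-ins for illustration, whereas the real code works on MergeTreeDataPartPtr and schedules each batch on a thread pool:

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Part { std::string name; size_t bytes_on_disk; };

// Group small parts into merge batches; a batch is flushed once it reaches
// merge_min_size bytes or merge_limit_parts parts. Parts already at or above
// merge_min_size are left untouched.
std::vector<std::vector<Part>> selectMergeBatches(
    const std::vector<Part> & parts, size_t merge_min_size, size_t merge_limit_parts)
{
    std::vector<std::vector<Part>> batches;
    std::vector<Part> current;
    size_t total_size = 0;

    for (const auto & part : parts)
    {
        if (part.bytes_on_disk >= merge_min_size)
            continue; // big enough already, skip

        current.push_back(part);
        total_size += part.bytes_on_disk;

        if (total_size >= merge_min_size || current.size() >= merge_limit_parts)
        {
            batches.push_back(std::move(current));
            current.clear();
            total_size = 0;
        }
    }
    // In the real writer a leftover batch like this is only merged on the forced
    // pass issued from finalize(); the sketch simply emits it when it has >1 part.
    if (current.size() > 1)
        batches.push_back(std::move(current));
    return batches;
}

int main()
{
    const std::vector<Part> parts{{"p1", 100}, {"p2", 200}, {"p3", 2048}, {"p4", 150}};
    for (const auto & batch : selectMergeBatches(parts, /*merge_min_size=*/400, /*merge_limit_parts=*/10))
        std::cout << "merge batch of " << batch.size() << " parts\n";
}
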