diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index cd3ce793747c..6c7b2ab87664 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -39,6 +39,7 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.Path +import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec import scala.collection.mutable.ListBuffer @@ -69,20 +70,22 @@ class ClickhouseOptimisticTransaction( additionalConstraints: Seq[Constraint]): Seq[FileAction] = { val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false) if (writingMergeTree) { - if (isOptimize) { - throw new UnsupportedOperationException("Optimize is not supported for ClickHouse") - } // TODO: update FallbackByBackendSettings for mergetree always return true val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite if (onePipeline) - pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints) - else + pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints) + else { + if (isOptimize) { + throw new UnsupportedOperationException( + "Optimize is only supported in one pipeline native write mode") + } writeMergeTree(inputData, writeOptions, additionalConstraints) + } } else { - if (isOptimize || !nativeWrite) { - super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints) + if (nativeWrite) { + pipelineWriteFiles(inputData, writeOptions, isOptimize, additionalConstraints) } else { - pipelineWriteFiles(inputData, writeOptions, isOptimize = false, additionalConstraints) + super.writeFiles(inputData, writeOptions, isOptimize, additionalConstraints) } } } @@ -217,6 +220,19 @@ class ClickhouseOptimisticTransaction( tableV2.tableName) } + /** + * Writes out the dataframe in pipeline mode after performing schema validation.Returns a list of + * actions to append these files to the reservoir. + * + * @param inputData + * Data to write out. + * @param writeOptions + * Options to decide how to write out the data. + * @param isOptimize + * Whether the operation writing this is Optimize or not. + * @param additionalConstraints + * Additional constraints on the write. + */ private def pipelineWriteFiles( inputData: Dataset[_], writeOptions: Option[DeltaOptions], @@ -232,21 +248,13 @@ class ClickhouseOptimisticTransaction( normalizeData(deltaLog, writeOptions, data) val partitioningColumns = getPartitioningColumns(partitionSchema, output) - val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats. 
- - val (committer, collectStats) = fileFormat.toString match { - case "MergeTree" => (getCommitter2(outputPath), false) - case _ => (getCommitter(outputPath), true) - } + val committer = if (writingMergeTree) getCommitter2(outputPath) else getCommitter(outputPath) // If Statistics Collection is enabled, then create a stats tracker that will be injected during // the FileFormatWriter.write call below and will collect per-file stats using // StatisticsCollection - val (optionalStatsTracker, _) = if (collectStats) { + val (optionalStatsTracker, _) = getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data) - } else { - (None, None) - } val constraints = Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints @@ -259,18 +267,18 @@ class ClickhouseOptimisticTransaction( // TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints) val checkInvariants = empty2NullPlan + // TODO: DeltaOptimizedWriterExec // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce // evenly-balanced data files already. - // TODO: val physicalPlan = - // if ( - // !isOptimize && - // shouldOptimizeWrite(writeOptions, spark.sessionState.conf) - // ) { - // DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog) - // } else { - // checkInvariants - // } - val physicalPlan = checkInvariants + val physicalPlan = + if ( + !isOptimize && + shouldOptimizeWrite(writeOptions, spark.sessionState.conf) + ) { + DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog) + } else { + checkInvariants + } val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer() @@ -296,6 +304,7 @@ class ClickhouseOptimisticTransaction( }.toMap }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) + val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats. val executedPlan = DeltaV1Writes( spark, physicalPlan, diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index 439111df1b1c..77313399ff7a 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -39,21 +39,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig -import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric import org.apache.spark.sql.types._ import org.apache.spark.util.{SystemClock, ThreadUtils} -/** - * Gluten overwrite Delta: - * - * This file is copied from Delta 3.2.1. It is modified in: - * 1. getDeltaTable supports to get ClickHouseTableV2 - * 2. runOptimizeBinJobClickhouse - * 3. 
groupFilesIntoBinsClickhouse - */ +// TODO: Remove this file once we needn't support bucket /** Base class defining abstract optimize command */ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand { @@ -152,10 +143,7 @@ case class OptimizeTableCommand( copy(child = newChild)(zOrderBy) override def run(sparkSession: SparkSession): Seq[Row] = { - // --- modified start - val table = OptimizeTableCommandOverwrites.getDeltaTable(child, "OPTIMIZE") - // --- modified end - + val table = getDeltaTable(child, "OPTIMIZE") val txn = table.startTransaction() if (txn.readVersion == -1) { throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString) @@ -268,12 +256,6 @@ class OptimizeExecutor( def optimize(): Seq[Row] = { recordDeltaOperation(txn.deltaLog, "delta.optimize") { - - // --- modified start - val isMergeTreeFormat = ClickHouseConfig - .isMergeTreeFormatEngine(txn.deltaLog.unsafeVolatileMetadata.configuration) - // --- modified end - val minFileSize = optimizeContext.minFileSize.getOrElse( sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)) val maxFileSize = optimizeContext.maxFileSize.getOrElse( @@ -288,39 +270,15 @@ class OptimizeExecutor( case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles) case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles) } - // --- modified start - val maxThreads = - sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) - val (updates, jobs) = if (isMergeTreeFormat) { - val partitionsToCompact = filesToProcess - .groupBy(file => (file.asInstanceOf[AddMergeTreeParts].bucketNum, file.partitionValues)) - .toSeq - val jobs = OptimizeTableCommandOverwrites - .groupFilesIntoBinsClickhouse(partitionsToCompact, maxFileSize) - val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { - partitionBinGroup => - // --- modified start - OptimizeTableCommandOverwrites.runOptimizeBinJobClickhouse( - txn, - partitionBinGroup._1._2, - partitionBinGroup._1._1, - partitionBinGroup._2, - maxFileSize) - // --- modified end - }.flatten - // uniform the jobs type - (updates, jobs.map(v => (v._1._2 ++ Map("bucketNum" -> v._1.toString()), v._2))) - } else { - val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq + val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq - val jobs = groupFilesIntoBins(partitionsToCompact) + val jobs = groupFilesIntoBins(partitionsToCompact) - val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => - runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) - }.flatten - (updates, jobs) - } - // --- modified end + val maxThreads = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) + val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => + runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) + }.flatten val addedFiles = updates.collect { case a: AddFile => a } val removedFiles = updates.collect { case r: RemoveFile => r } diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala index df7ef7e23409..b02eed88e672 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala +++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala @@ -26,9 +26,8 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Projection, UnsafeProjection} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate} import org.apache.spark.sql.delta.files.{FileDelayedCommitProtocol, MergeTreeDelayedCommitProtocol2} -import org.apache.spark.sql.delta.stats.DeltaFileStatistics +import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker} import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult} -import org.apache.spark.sql.types.StringType import org.apache.spark.util.Utils import scala.collection.mutable.ArrayBuffer @@ -38,7 +37,7 @@ case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol) val addedFiles: ArrayBuffer[(Map[String, String], String)] = new ArrayBuffer[(Map[String, String], String)] override def apply(stat: NativeFileWriteResult): Unit = { - if (stat.partition_id == "__NO_PARTITION_ID__") { + if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) { addedFiles.append((Map.empty[String, String], stat.filename)) } else { val partitionValues = committer.parsePartitions(stat.partition_id) @@ -61,14 +60,15 @@ case class NativeDeltaStats(projection: Projection) extends (InternalRow => Unit def result: DeltaFileStatistics = DeltaFileStatistics(results.toMap) } -case class FileDeltaColumnarWrite( - override val jobTrackerID: String, - override val description: WriteJobDescription, - override val committer: FileDelayedCommitProtocol) - extends CHColumnarWrite[FileDelayedCommitProtocol] - with Logging { - private lazy val nativeDeltaStats: Option[NativeDeltaStats] = { +trait SupportNativeDeltaStats[T <: FileCommitProtocol] extends CHColumnarWrite[T] { + + private lazy val deltaWriteJobStatsTracker: Option[DeltaJobStatisticsTracker] = + description.statsTrackers + .find(_.isInstanceOf[DeltaJobStatisticsTracker]) + .map(_.asInstanceOf[DeltaJobStatisticsTracker]) + + lazy val nativeDeltaStats: Option[NativeDeltaStats] = { deltaWriteJobStatsTracker .map( delta => { @@ -77,10 +77,7 @@ case class FileDeltaColumnarWrite( if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression } - val z = Seq( - AttributeReference("filename", StringType, nullable = false)(), - AttributeReference("partition_id", StringType, nullable = false)()) - val s = + val vanillaSchema = delta.statsColExpr .collect { case ae: AggregateExpression @@ -92,10 +89,24 @@ case class FileDeltaColumnarWrite( NativeDeltaStats( UnsafeProjection.create( exprs = Seq(r), - inputSchema = z :++ s + inputSchema = nativeStatsSchema(vanillaSchema) )) }) } + + def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] +} + +case class FileDeltaColumnarWrite( + override val jobTrackerID: String, + override val description: WriteJobDescription, + override val committer: FileDelayedCommitProtocol) + extends SupportNativeDeltaStats[FileDelayedCommitProtocol] + with Logging { + + override def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] = + NativeFileWriteResult.nativeStatsSchema(vanilla) + override def doSetupNativeTask(): Unit = { assert(description.path == committer.outputPath) val nameSpec = CreateFileNameSpec(taskAttemptContext, description) diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala index 3ac9d4c305e7..df33fde66c06 100644 --- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala @@ -22,13 +22,14 @@ import org.apache.gluten.vectorized.NativeExpressionEvaluator import org.apache.spark.internal.Logging import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow} import org.apache.spark.sql.delta.actions.{AddFile, FileAction} import org.apache.spark.sql.delta.files.MergeTreeDelayedCommitProtocol2 import org.apache.spark.sql.delta.stats.DeltaStatistics import org.apache.spark.sql.delta.util.{JsonUtils, MergeTreePartitionUtils} import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, ExecutedWriteSummary, WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta +import org.apache.spark.sql.types.{LongType, StringType} import org.apache.spark.util.Utils import org.apache.hadoop.fs.Path @@ -39,17 +40,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions -/** - * {{{ - * val schema = - * StructType( - * StructField("part_name", StringType, false) :: - * StructField("partition_id", StringType, false) :: - * StructField("record_count", LongType, false) :: - * StructField("marks_count", LongType, false) :: - * StructField("size_in_bytes", LongType, false) :: Nil) - * }}} - */ case class MergeTreeWriteResult( part_name: String, partition_id: String, @@ -62,10 +52,10 @@ case class MergeTreeWriteResult( path: Path, modificationTime: Long, hostName: Seq[String]): FileAction = { - val partitionValues = if (partition_id == "__NO_PARTITION_ID__") { - Map.empty[String, String] + val (partitionValues, part_path) = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) { + (Map.empty[String, String], part_name) } else { - MergeTreePartitionUtils.parsePartitions(partition_id) + (MergeTreePartitionUtils.parsePartitions(partition_id), s"$partition_id/$part_name") } val tags = Map[String, String]( "database" -> database, @@ -98,7 +88,7 @@ case class MergeTreeWriteResult( DeltaStatistics.NULL_COUNT -> "" ) AddFile( - part_name, + part_path, partitionValues, size_in_bytes, modificationTime, @@ -115,6 +105,32 @@ object MergeTreeWriteResult { row.getLong(2), row.getLong(3), row.getLong(4)) + + /** + * {{{ + * val schema = + * StructType( + * StructField("part_name", StringType, false) :: + * StructField("partition_id", StringType, false) :: + * StructField("record_count", LongType, false) :: <= overlap with vanilla => + * StructField("marks_count", LongType, false) :: + * StructField("size_in_bytes", LongType, false) :: + * min... + * max... + * null_count...) 
+ * }}} + */ + private val inputBasedSchema: Seq[AttributeReference] = Seq( + AttributeReference("part_name", StringType, nullable = false)(), + AttributeReference("partition_id", StringType, nullable = false)(), + AttributeReference("record_count", LongType, nullable = false)(), + AttributeReference("marks_count", LongType, nullable = false)(), + AttributeReference("size_in_bytes", LongType, nullable = false)() + ) + + def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] = { + inputBasedSchema.take(2) ++ vanilla.take(1) ++ inputBasedSchema.drop(3) ++ vanilla.drop(1) + } } case class MergeTreeCommitInfo(committer: MergeTreeDelayedCommitProtocol2) @@ -137,7 +153,7 @@ case class MergeTreeBasicWriteTaskStatsTracker() extends (MergeTreeWriteResult = private var numFiles: Int = 0 def apply(stat: MergeTreeWriteResult): Unit = { - if (stat.partition_id != "__NO_PARTITION_ID__") { + if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) { partitions.append(new GenericInternalRow(Array[Any](stat.partition_id))) } numFiles += 1 @@ -153,13 +169,19 @@ case class MergeTreeDeltaColumnarWrite( override val jobTrackerID: String, override val description: WriteJobDescription, override val committer: MergeTreeDelayedCommitProtocol2) - extends CHColumnarWrite[MergeTreeDelayedCommitProtocol2] + extends SupportNativeDeltaStats[MergeTreeDelayedCommitProtocol2] with Logging { + + override def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] = + MergeTreeWriteResult.nativeStatsSchema(vanilla) + override def doSetupNativeTask(): Unit = { assert(description.path == committer.outputPath) val writePath = StorageMeta.normalizeRelativePath(committer.outputPath) val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId - val partPrefixWithoutPartitionAndBucket = s"${UUID.randomUUID.toString}_$split" + val partPrefixWithoutPartitionAndBucket = + if (description.partitionColumns.isEmpty) s"${UUID.randomUUID.toString}_$split" + else s"_$split" logDebug( s"Pipeline write path: $writePath with part prefix: $partPrefixWithoutPartitionAndBucket") val settings = @@ -176,12 +198,14 @@ case class MergeTreeDeltaColumnarWrite( val commitInfo = MergeTreeCommitInfo(committer) val mergeTreeStat = MergeTreeBasicWriteTaskStatsTracker() val basicNativeStats = Seq(commitInfo, mergeTreeStat) - NativeStatCompute(stats)(basicNativeStats) + NativeStatCompute(stats)(basicNativeStats, nativeDeltaStats) Some { WriteTaskResult( new TaskCommitMessage(commitInfo.result), - ExecutedWriteSummary(updatedPartitions = Set.empty, stats = Seq(mergeTreeStat.result)) + ExecutedWriteSummary( + updatedPartitions = Set.empty, + stats = nativeDeltaStats.map(_.result).toSeq ++ Seq(mergeTreeStat.result)) ) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeConfig.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeConfig.scala index 055c3b9d87b8..d381ba30ad83 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeConfig.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeConfig.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.spark.sql.internal.SQLConf object RuntimeConfig { - import CHConf._ + import CHConf.runtimeConfig import SQLConf._ /** Clickhouse Configuration */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala index c2747cf1eb53..1031778cd105 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala @@ -20,9 +20,19 @@ import org.apache.spark.sql.internal.SQLConf object RuntimeSettings { - import CHConf._ + import CHConf.runtimeSettings import SQLConf._ + /** Clickhouse settings */ + // scalastyle:off line.size.limit + val MIN_INSERT_BLOCK_SIZE_ROWS = + buildConf(runtimeSettings("min_insert_block_size_rows")) + .doc("https://clickhouse.com/docs/en/operations/settings/settings#min_insert_block_size_rows") + .longConf + .createWithDefault(1048449) + // scalastyle:on line.size.limit + + /** Gluten Configuration */ val NATIVE_WRITE_RESERVE_PARTITION_COLUMNS = buildConf(runtimeSettings("gluten.write.reserve_partition_columns")) .doc("Whether reserve partition columns for Native write or not, default is false") diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/MergeTreeConf.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/MergeTreeConf.scala new file mode 100644 index 000000000000..b759f8ae5502 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/MergeTreeConf.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.gluten.backendsapi.clickhouse.CHConf + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.sql.internal.SQLConf + +/** [[CHConf]] entries for MergeTree features. */ +trait MergeTreeConfBase { + + private val CH_PREFIX: String = CHConf.runtimeSettings("merge_tree") + private val GLUTEN_PREFIX: String = CHConf.runtimeSettings("mergetree") + + private def buildCHConf(key: String): ConfigBuilder = SQLConf.buildConf(s"$CH_PREFIX.$key") + private def buildGLUTENConf(key: String): ConfigBuilder = + SQLConf.buildConf(s"$GLUTEN_PREFIX.$key") + + // scalastyle:off line.size.limit + val ASSIGN_PART_UUIDS = + buildCHConf("assign_part_uuids") + .doc(s""" Used in UT for compatibility with old compaction algorithm. + | https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#virtual-columns + |""".stripMargin) + .booleanConf + .createWithDefault(false) + + val MIN_ROWS_FOR_WIDE_PART = + buildCHConf("min_rows_for_wide_part") + .doc(s""" Used in UT for compatibility with old compaction algorithm. 
+ | https://clickhouse.com/docs/en/operations/settings/merge-tree-settings#min_bytes_for_wide_part + |""".stripMargin) + .longConf + .createWithDefault(0) + // scalastyle:on line.size.limit + + val OPTIMIZE_TASK = + buildGLUTENConf("optimize_task") + .doc(s""" Used in UT for compatibility with old compaction algorithm. + | This flag is used to notify the pipeline that the currently running task + | is a MergeTree optimize task. + | """.stripMargin) + .booleanConf + .createWithDefault(false) +} + +object MergeTreeConf extends MergeTreeConfBase diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala index 427db0aad2b5..87fc50ef3ac4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala @@ -24,9 +24,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, BasicWriteTaskStats, ExecutedWriteSummary, PartitioningUtils, WriteJobDescription, WriteTaskResult, WriteTaskStatsTracker} +import org.apache.spark.sql.types.{LongType, StringType} import org.apache.spark.util.Utils import org.apache.hadoop.fs.Path @@ -61,11 +61,6 @@ trait CHColumnarWrite[T <: FileCommitProtocol] { .map(_.newTaskInstance()) .get - lazy val deltaWriteJobStatsTracker: Option[DeltaJobStatisticsTracker] = - description.statsTrackers - .find(_.isInstanceOf[DeltaJobStatisticsTracker]) - .map(_.asInstanceOf[DeltaJobStatisticsTracker]) - lazy val (taskAttemptContext: TaskAttemptContext, jobId: String) = { // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with multi-version def createJobID(jobTrackerID: String, id: Int): JobID = { @@ -162,17 +157,8 @@ case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol) } } -/** - * {{{ - * val schema = - * StructType( - * StructField("filename", StringType, false) :: - * StructField("partition_id", StringType, false) :: - * StructField("record_count", LongType, false) :: Nil) - * }}} - */ case class NativeFileWriteResult(filename: String, partition_id: String, record_count: Long) { - lazy val relativePath: String = if (partition_id == "__NO_PARTITION_ID__") { + lazy val relativePath: String = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) { filename } else { s"$partition_id/$filename" } @@ -183,6 +169,28 @@ object NativeFileWriteResult { implicit def apply(row: InternalRow): NativeFileWriteResult = { NativeFileWriteResult(row.getString(0), row.getString(1), row.getLong(2)) } + + /** + * {{{ + * val schema = + * StructType( + * StructField("filename", StringType, false) :: + * StructField("partition_id", StringType, false) :: + * StructField("record_count", LongType, false) :: <= overlap with vanilla => + * min... + * max... + * null_count...)
+ * }}} + */ + private val inputBasedSchema: Seq[AttributeReference] = Seq( + AttributeReference("filename", StringType, nullable = false)(), + AttributeReference("partition_id", StringType, nullable = false)(), + AttributeReference("record_count", LongType, nullable = false)() + ) + + def nativeStatsSchema(vanilla: Seq[AttributeReference]): Seq[AttributeReference] = { + inputBasedSchema.take(2) ++ vanilla + } } case class NativeStatCompute(rows: Seq[InternalRow]) { @@ -204,7 +212,7 @@ case class NativeBasicWriteTaskStatsTracker( private var numWrittenRows: Long = 0 override def apply(stat: NativeFileWriteResult): Unit = { val absolutePath = s"$writeDir/${stat.relativePath}" - if (stat.partition_id != "__NO_PARTITION_ID__") { + if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) { basicWriteJobStatsTracker.newPartition(new GenericInternalRow(Array[Any](stat.partition_id))) } basicWriteJobStatsTracker.newFile(absolutePath) @@ -225,7 +233,7 @@ case class FileCommitInfo(description: WriteJobDescription) def apply(stat: NativeFileWriteResult): Unit = { val tmpAbsolutePath = s"${description.path}/${stat.relativePath}" - if (stat.partition_id != "__NO_PARTITION_ID__") { + if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) { partitions += stat.partition_id val customOutputPath = description.customPartitionLocations.get( @@ -316,4 +324,6 @@ object CHColumnarWrite { .asInstanceOf[CHColumnarWrite[FileCommitProtocol]] case other => CHDeltaColumnarWrite(jobTrackerID, description, other) } + + val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__" } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 83bde9d168fd..21d113248821 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -158,6 +158,7 @@ object CHExecUtil extends Logging { columns: Int, rows: Int): Iterator[InternalRow] = { val rowInfo = CHBlockConverterJniWrapper.convertColumnarToRow(blockAddress, null) + assert(rowInfo.fieldsNum == columns) getRowIterFromSparkRowInfo(rowInfo, columns, rows) } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala index 0f709dbd77ec..46f93f15014e 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig import org.apache.gluten.benchmarks.GenTPCHTableScripts import org.apache.spark.sql.SparkSession @@ -101,7 +101,7 @@ object RunTPCHTest { .config("spark.sql.columnVector.offheap.enabled", "true") .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", offHeapSize) - .config(CHConf.runtimeConfig("logger.level"), "error") + .config(RuntimeConfig.LOGGER_LEVEL.key, "error") .config("spark.sql.warehouse.dir", warehouse) .config( "javax.jdo.option.ConnectionURL", diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala index 3736f0f14415..866c41c510fe
100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala @@ -1025,12 +1025,11 @@ class GlutenClickHouseDeltaParquetWriteSuite } } - // FIXME: optimize - testSparkVersionLE33("test parquet optimize with the path based table") { + test("test parquet optimize with the path based table") { val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based" clearDataPath(dataPath) withSQLConf( - "spark.databricks.delta.optimize.maxFileSize" -> "1000000", + "spark.databricks.delta.optimize.maxFileSize" -> "1100000", "spark.databricks.delta.optimize.minFileSize" -> "838000") { val sourceDF = spark.sql(s""" @@ -1047,10 +1046,14 @@ class GlutenClickHouseDeltaParquetWriteSuite val clickhouseTable = DeltaTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() + // There are 75 parquet files + 2 json files after compaction + assert(countFiles(new File(dataPath)) === 77) + clickhouseTable.vacuum(0.0) if (spark32) { assert(countFiles(new File(dataPath)) === 27) } else { + // There are 25 parquet files + 4 json files after vacuum assert(countFiles(new File(dataPath)) === 29) } @@ -1060,7 +1063,7 @@ class GlutenClickHouseDeltaParquetWriteSuite withSQLConf( "spark.databricks.delta.optimize.maxFileSize" -> "10000000", - "spark.databricks.delta.optimize.minFileSize" -> "1000000") { + "spark.databricks.delta.optimize.minFileSize" -> "1100000") { val clickhouseTable = DeltaTable.forPath(spark, dataPath) clickhouseTable.optimize().executeCompaction() @@ -1069,6 +1072,7 @@ class GlutenClickHouseDeltaParquetWriteSuite if (spark32) { assert(countFiles(new File(dataPath)) === 6) } else { + // There are 3 parquet files + 7 json files + 2 checkpoint files after vacuum assert(countFiles(new File(dataPath)) === 12) } @@ -1084,6 +1088,7 @@ class GlutenClickHouseDeltaParquetWriteSuite if (spark32) { assert(countFiles(new File(dataPath)) === 5) } else { + // There is 1 parquet file + 10 json files + 2 checkpoint files after vacuum assert(countFiles(new File(dataPath)) === 13) } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala index 16ed302a02f4..d77779f95e33 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution.hive import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData import org.apache.gluten.utils.UTSystemParameters @@ -37,8 +38,6 @@ class GlutenClickHouseNativeWriteTableSuite with NativeWriteChecker { override protected def sparkConf: SparkConf = { - import org.apache.gluten.backendsapi.clickhouse.CHConf._ - var sessionTimeZone = "GMT" if (isSparkVersionGE("3.5")) { sessionTimeZone = java.util.TimeZone.getDefault.getID } @@ -67,7 +66,7 @@ class GlutenClickHouseNativeWriteTableSuite .set("spark.sql.storeAssignmentPolicy", "legacy") .set("spark.sql.warehouse.dir", getWarehouseDir)
.set("spark.sql.session.timeZone", sessionTimeZone) - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") .setMaster("local[1]") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala index 5c5f93c62c6b..59bdb4738303 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution.hive +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig, RuntimeSettings} import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite import org.apache.spark.SparkConf @@ -54,11 +56,13 @@ class GlutenClickHouseTableAfterRestart .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") .setCHConfig("user_defined_path", "/tmp/user_defined") .set("spark.sql.files.maxPartitionBytes", "20000000") .set("spark.ui.enabled", "true") - .setCHSettings("min_insert_block_size_rows", 100000) + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) + .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000") .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) .setMaster("local[2]") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala index a01c708adabe..8722c676cf31 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala @@ -16,7 +16,8 @@ */ package org.apache.gluten.execution.mergetree -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig} import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf @@ -55,8 +56,10 @@ class GlutenClickHouseMergeTreeCacheDataSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") .set("spark.gluten.soft-affinity.enabled", "true") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) .setCHSettings("mergetree.merge_after_insert", false) } @@ -398,7 +401,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite spark.sql("drop table lineitem_mergetree_hdfs purge") } - test("test cache mergetree data no partition columns") { + testSparkVersionLE33("test cache mergetree data no partition columns") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; @@ -487,7 +490,7 @@ class 
GlutenClickHouseMergeTreeCacheDataSuite spark.sql("drop table lineitem_mergetree_hdfs purge") } - test("test cache mergetree data with upper case column name") { + testSparkVersionLE33("test cache mergetree data with upper case column name") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala index 9effd64a277f..42ff6d91360f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala @@ -16,11 +16,13 @@ */ package org.apache.gluten.execution.mergetree -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig, RuntimeSettings} import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.delta.MergeTreeConf import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import io.delta.tables.ClickhouseTable @@ -49,8 +51,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") - .setCHSettings("min_insert_block_size_rows", 10000) + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) + .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "10000") .set( "spark.databricks.delta.retentionDurationCheck.enabled", "false" @@ -59,6 +63,23 @@ class GlutenClickHouseMergeTreeOptimizeSuite .setCHSettings("input_format_parquet_max_block_size", 8192) } + private def with_ut_conf(f: => Unit): Unit = { + val defaultBlockSize = RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "1048449" + + /** The old merge-path will create uuid.txt by default, so we need to enable it for UT. */ + val assign_part_uuids = MergeTreeConf.ASSIGN_PART_UUIDS.key -> true.toString + + /** + * The old merge-path uses uncompressed bytes to choose wide or compaction mode, which is more + * accurate. By Using min_rows_for_wide_part, we can more accurately control the choosing of the + * mergetree table mode. 
+ */ + val min_rows_for_wide_part = MergeTreeConf.MIN_ROWS_FOR_WIDE_PART.key -> "65536" + + val optimized = MergeTreeConf.OPTIMIZE_TASK.key -> true.toString + withSQLConf(defaultBlockSize, assign_part_uuids, optimized, min_rows_for_wide_part)(f) + } + override protected def createTPCHNotNullTables(): Unit = { createNotNullTPCHTablesInParquet(tablesPath) } @@ -76,7 +97,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite | as select * from lineitem |""".stripMargin) - spark.sql("optimize lineitem_mergetree_optimize") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize")) val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect() assertResult(600572)(ret.apply(0).get(0)) @@ -112,7 +133,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite |""".stripMargin) spark.sparkContext.setJobGroup("test", "test") - spark.sql("optimize lineitem_mergetree_optimize_p") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p")) val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test") if (spark35) { assertResult(4)(job_ids.length) @@ -151,7 +172,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite |""".stripMargin) spark.sparkContext.setJobGroup("test2", "test2") - spark.sql("optimize lineitem_mergetree_optimize_p2") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p2")) val job_ids = spark.sparkContext.statusTracker.getJobIdsForGroup("test2") if (spark32) { assertResult(7)(job_ids.length) // WILL trigger actual merge job @@ -197,7 +218,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite | as select * from lineitem |""".stripMargin) - spark.sql("optimize lineitem_mergetree_optimize_p3") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p3")) val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect() assertResult(600572)(ret.apply(0).get(0)) @@ -234,7 +255,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite | as select * from lineitem |""".stripMargin) - spark.sql("optimize lineitem_mergetree_optimize_p4") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p4")) val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect() assertResult(600572)(ret.apply(0).get(0)) @@ -272,7 +293,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite | as select * from lineitem |""".stripMargin) - spark.sql("optimize lineitem_mergetree_optimize_p5") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p5")) spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") @@ -296,7 +317,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite // 1 merged part from 2 original parts, 1 merged part from 34 original parts // and 1 original part (size 838255) - spark.sql("optimize lineitem_mergetree_optimize_p5") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p5")) spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") @@ -312,7 +333,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite } // now merge all parts (testing merging from merged parts) - spark.sql("optimize lineitem_mergetree_optimize_p5") + with_ut_conf(spark.sql("optimize lineitem_mergetree_optimize_p5")) spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") spark.sql("VACUUM lineitem_mergetree_optimize_p5 RETAIN 0 HOURS") @@ -327,7 +348,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite assertResult(600572)(ret.apply(0).get(0)) } - test("test mergetree optimize table with 
partition and bucket") { + testSparkVersionLE33("test mergetree optimize table with partition and bucket") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_optimize_p6; |""".stripMargin) @@ -374,7 +395,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite | as select * from lineitem |""".stripMargin) - spark.sql("optimize lineitem_mergetree_index") + with_ut_conf(spark.sql("optimize lineitem_mergetree_index")) spark.sql("vacuum lineitem_mergetree_index") val df = spark @@ -417,7 +438,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite .save(dataPath) val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) - clickhouseTable.optimize().executeCompaction() + with_ut_conf(clickhouseTable.optimize().executeCompaction()) clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) @@ -440,7 +461,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite // and 1 original part (size 838255) val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) - clickhouseTable.optimize().executeCompaction() + with_ut_conf(clickhouseTable.optimize().executeCompaction()) clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) @@ -456,7 +477,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite // now merge all parts (testing merging from merged parts) val clickhouseTable = ClickhouseTable.forPath(spark, dataPath) - clickhouseTable.optimize().executeCompaction() + with_ut_conf(clickhouseTable.optimize().executeCompaction()) clickhouseTable.vacuum(0.0) clickhouseTable.vacuum(0.0) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala index 76b3ba9f3724..cec4bdc105b2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution.mergetree +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings} import org.apache.gluten.execution._ import org.apache.gluten.utils.Arm @@ -58,7 +60,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") .set("spark.ui.enabled", "true") - .setCHSettings("min_insert_block_size_rows", 100000) + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) + .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000") .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) @@ -304,7 +308,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite } } - test("test mergetree path based table update") { + testSparkVersionLE33("test mergetree path based table update") { val dataPath = s"$basePath/lineitem_mergetree_update" clearDataPath(dataPath) @@ -385,7 +389,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite assertResult(600572)(df.count()) } - test("test mergetree path based table delete") { + testSparkVersionLE33("test mergetree path based table delete") { val dataPath = s"$basePath/lineitem_mergetree_delete" clearDataPath(dataPath) @@ -653,7 +657,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite } } - test("test mergetree path based write with bucket 
table") { + testSparkVersionLE33("test mergetree path based write with bucket table") { val dataPath = s"$basePath/lineitem_mergetree_bucket" clearDataPath(dataPath) @@ -772,7 +776,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite } } - test("test mergetree path based CTAS complex") { + test("test mergetree path based CTAS partition") { val dataPath = s"$basePath/lineitem_mergetree_ctas2" clearDataPath(dataPath) @@ -780,8 +784,6 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite |CREATE TABLE clickhouse.`$dataPath` |USING clickhouse |PARTITIONED BY (l_shipdate) - |CLUSTERED BY (l_orderkey) - |${if (spark32) "" else "SORTED BY (l_partkey, l_returnflag)"} INTO 4 BUCKETS | as select * from lineitem |""".stripMargin) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index b147173d255b..413448730042 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -16,7 +16,8 @@ */ package org.apache.gluten.execution.mergetree -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig, RuntimeSettings} import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf @@ -57,7 +58,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) .setCHSettings("mergetree.merge_after_insert", false) } @@ -359,7 +362,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite spark.sql("drop table lineitem_mergetree_partition_hdfs") } - test("test mergetree write with bucket table") { + testSparkVersionLE33("test mergetree write with bucket table") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs; |""".stripMargin) @@ -435,7 +438,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite spark.sql("drop table lineitem_mergetree_bucket_hdfs") } - test("test mergetree write with the path based") { + testSparkVersionLE33("test mergetree write with the path based") { val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs" val sourceDF = spark.sql(s""" @@ -500,7 +503,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite "spark.databricks.delta.optimize.minFileSize" -> "200000000", CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true", CHConf.runtimeSettings("mergetree.insert_without_local_storage") -> "true", - CHConf.runtimeSettings("min_insert_block_size_rows") -> "10000" + RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000" ) { spark.sql(s""" |DROP TABLE IF EXISTS $tableName; diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala index 2540186c9984..d689ad7ba1ca 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala @@ -16,7 +16,8 @@ */ package org.apache.gluten.execution.mergetree -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig, RuntimeSettings} import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf @@ -57,7 +58,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) .setCHSettings("mergetree.merge_after_insert", false) } @@ -359,7 +362,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite spark.sql("drop table lineitem_mergetree_partition_hdfs") } - test("test mergetree write with bucket table") { + testSparkVersionLE33("test mergetree write with bucket table") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs; |""".stripMargin) @@ -435,7 +438,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite spark.sql("drop table lineitem_mergetree_bucket_hdfs purge") } - test("test mergetree write with the path based") { + testSparkVersionLE33("test mergetree write with the path based") { val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs" val sourceDF = spark.sql(s""" @@ -500,7 +503,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite "spark.databricks.delta.optimize.minFileSize" -> "200000000", CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true", CHConf.runtimeSettings("mergetree.insert_without_local_storage") -> "true", - CHConf.runtimeSettings("min_insert_block_size_rows") -> "10000" + RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key -> "10000" ) { spark.sql(s""" |DROP TABLE IF EXISTS $tableName; diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 8e698adb4b51..edd41b6bea08 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -16,7 +16,8 @@ */ package org.apache.gluten.execution.mergetree -import org.apache.gluten.backendsapi.clickhouse.CHConf._ +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeConfig} import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf @@ -62,7 +63,9 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite 
.set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) } override protected def beforeEach(): Unit = { @@ -420,7 +423,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite } - test("test mergetree write with bucket table") { + testSparkVersionLE33("test mergetree write with bucket table") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3; |""".stripMargin) @@ -496,7 +499,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite spark.sql("drop table lineitem_mergetree_bucket_s3") } - test("test mergetree write with the path based") { + testSparkVersionLE33("test mergetree write with the path based") { val dataPath = s"s3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3" val sourceDF = spark.sql(s""" @@ -559,8 +562,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite withSQLConf( "spark.databricks.delta.optimize.minFileSize" -> "200000000", - runtimeSettings("mergetree.insert_without_local_storage") -> "true", - runtimeSettings("mergetree.merge_after_insert") -> "true" + CHConf.runtimeSettings("mergetree.insert_without_local_storage") -> "true", + CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true" ) { spark.sql(s""" |DROP TABLE IF EXISTS $tableName; @@ -619,7 +622,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite FileUtils.forceDelete(new File(S3_METADATA_PATH)) - withSQLConf(runtimeSettings("enabled_driver_filter_mergetree_index") -> "true") { + withSQLConf(CHConf.runtimeSettings("enabled_driver_filter_mergetree_index") -> "true") { runTPCHQueryBySQL(6, q6(tableName)) { df => val scanExec = collect(df.queryExecution.executedPlan) { @@ -634,7 +637,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos.size) + assertResult(1)(plans.head.getSplitInfos().size) } } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index 60ca58d9fc29..5f1d531e0dcf 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -17,7 +17,7 @@ package org.apache.gluten.execution.mergetree import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.clickhouse.CHConf +import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings} import org.apache.gluten.execution._ import org.apache.gluten.utils.Arm @@ -58,7 +58,7 @@ class GlutenClickHouseMergeTreeWriteSuite .set("spark.sql.files.maxPartitionBytes", "20000000") .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) - .setCHSettings("min_insert_block_size_rows", 100000) + .set(RuntimeSettings.MIN_INSERT_BLOCK_SIZE_ROWS.key, "100000") .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) } diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala index 767585c4c41e..b1885874f5ae 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution.mergetree +import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite @@ -42,6 +43,8 @@ class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") .set("spark.memory.offHeap.size", "4G") + .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index dcb351458102..dfb41ef6ca38 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution.metrics +import org.apache.gluten.backendsapi.clickhouse.RuntimeConfig import org.apache.gluten.execution._ import org.apache.gluten.execution.GlutenPlan @@ -49,7 +50,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite .set("spark.io.compression.codec", "LZ4") .set("spark.sql.shuffle.partitions", "1") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") - .setCHConfig("logger.level", "error") + .set(RuntimeConfig.LOGGER_LEVEL.key, "error") .setCHSettings("input_format_parquet_max_block_size", parquetMaxBlockSize) .setCHConfig("enable_pre_projection_for_join_conditions", "false") .setCHConfig("enable_streaming_aggregating", true) diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp b/cpp-ch/local-engine/Common/DebugUtils.cpp index 8cda6734670e..5bad9dfc3c1d 100644 --- a/cpp-ch/local-engine/Common/DebugUtils.cpp +++ b/cpp-ch/local-engine/Common/DebugUtils.cpp @@ -26,8 +26,10 @@ #include #include #include +#include #include #include +#include #include namespace pb_util = google::protobuf::util; @@ -289,6 +291,17 @@ static std::string showString(const NameAndColumns & block, size_t numRows, size /// +void dumpMemoryUsage(const char * type) +{ + auto logger = getLogger("QueryContextManager"); + if (!logger) + return; + auto task_id = local_engine::QueryContext::instance().currentTaskIdOrEmpty(); + task_id = task_id.empty() ? 
"" : "(" + task_id + ")"; + auto usage = local_engine::currentThreadGroupMemoryUsage(); + LOG_ERROR(logger, "{}{} Memory Usage {}", type, task_id, formatReadableSizeWithBinarySuffix(usage)); +} + void dumpPlan(DB::QueryPlan & plan, const char * type, bool force, LoggerPtr logger) { if (!logger) @@ -339,6 +352,24 @@ void headBlock(const DB::Block & block, size_t count) std::cerr << showString(block, count) << std::endl; } +void printBlockHeader(const DB::Block & block, const std::string & prefix) +{ + auto nameColumn = local_engine::STRING()->createColumn(); + auto typeColumn = local_engine::STRING()->createColumn(); + + for (const auto & column : block.getColumnsWithTypeAndName()) + { + nameColumn->insert(column.name); + typeColumn->insert(column.type->getName()); + } + + if (!prefix.empty()) + std::cerr << prefix << std::endl; + + std::cerr << Utils::showString({{"[Name]", nameColumn->getPtr()}, {"[type]", typeColumn->getPtr()}}, nameColumn->size(), 100, false) + << std::endl; +} + void headColumn(const DB::ColumnPtr & column, size_t count) { std::cerr << Utils::showString({{"Column", column}}, count, 20, false) << std::endl; diff --git a/cpp-ch/local-engine/Common/DebugUtils.h b/cpp-ch/local-engine/Common/DebugUtils.h index 7b9fb637dcc6..850da408fb22 100644 --- a/cpp-ch/local-engine/Common/DebugUtils.h +++ b/cpp-ch/local-engine/Common/DebugUtils.h @@ -29,11 +29,13 @@ class QueryPlan; namespace debug { +void dumpMemoryUsage(const char * type); void dumpPlan(DB::QueryPlan & plan, const char * type = "clickhouse plan", bool force = false, LoggerPtr = nullptr); void dumpMessage(const google::protobuf::Message & message, const char * type, bool force = false, LoggerPtr = nullptr); void headBlock(const DB::Block & block, size_t count = 10); void headColumn(const DB::ColumnPtr & column, size_t count = 10); +void printBlockHeader(const DB::Block & block, const std::string & prefix = ""); std::string showString(const DB::Block & block, size_t numRows = 20, size_t truncate = 20, bool vertical = false); inline std::string verticalShowString(const DB::Block & block, size_t numRows = 20, size_t truncate = 20) { diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index eca9ad5b18de..32d84b0e2c79 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace DB @@ -133,7 +134,7 @@ String QueryContext::currentTaskIdOrEmpty() const int64_t id = reinterpret_cast(thread_group.get()); return query_map_.get(id)->task_id; } - return ""; + return ""; } void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const @@ -170,23 +171,24 @@ void QueryContext::finalizeQuery(int64_t id) if (!CurrentThread::getGroup()) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); std::shared_ptr context = query_map_.get(id); + query_map_.erase(id); + auto query_context = context->thread_status->getQueryContext(); if (!query_context) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "query context not found"); context->thread_status->flushUntrackedMemory(); context->thread_status->finalizePerformanceCounters(); - LOG_INFO(logger_, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id)); + auto peak = context->thread_group->memory_tracker.getPeak(); + LOG_INFO(logger_, "Task {} finished, peak memory usage: {}", context->task_id, formatReadableSizeWithBinarySuffix(peak)); - if 
(currentThreadGroupMemoryUsage() > 2_MiB) - LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage()); + auto final_usage = context->thread_group->memory_tracker.get(); + if (final_usage > 2_MiB) + LOG_WARNING(logger_, "{} memory didn't release, There may be a memory leak!", formatReadableSizeWithBinarySuffix(final_usage)); logCurrentPerformanceCounters(context->thread_group->performance_counters, context->task_id); context->thread_status->detachFromGroup(); context->thread_group.reset(); context->thread_status.reset(); query_context.reset(); - { - query_map_.erase(id); - } } size_t currentThreadGroupMemoryUsage() diff --git a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp index d5a6c9641990..cf7056b57634 100644 --- a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp @@ -17,28 +17,26 @@ #include "CHColumnToSparkRow.h" #include #include -#include #include #include #include #include -#include #include #include #include #include -#include #include #include #include +#include #include namespace DB { namespace ErrorCodes { - extern const int LOGICAL_ERROR; - extern const int UNKNOWN_TYPE; +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_TYPE; } } @@ -297,11 +295,7 @@ static void writeValue( } SparkRowInfo::SparkRowInfo( - const DB::ColumnsWithTypeAndName & cols, - const DB::DataTypes & dataTypes, - size_t col_size, - size_t row_size, - const MaskVector & masks) + const DB::ColumnsWithTypeAndName & cols, const DB::DataTypes & dataTypes, size_t col_size, size_t row_size, const MaskVector & masks) : types(dataTypes) , num_rows(masks == nullptr ? row_size : masks->size()) , num_cols(col_size) @@ -461,8 +455,7 @@ std::unique_ptr CHColumnToSparkRow::convertCHColumnToSparkRow(cons const auto & col = block.getByPosition(col_idx); int64_t field_offset = spark_row_info->getFieldOffset(col_idx); - ColumnWithTypeAndName col_full{col.column->convertToFullIfNeeded(), - removeLowCardinality(col.type), col.name}; + ColumnWithTypeAndName col_full{col.column->convertToFullIfNeeded(), removeLowCardinality(col.type), col.name}; writeValue( spark_row_info->getBufferAddress(), field_offset, @@ -585,8 +578,7 @@ int64_t BackingDataLengthCalculator::getArrayElementSize(const DataTypePtr & nes return 1; else if (nested_which.isUInt16() || nested_which.isInt16() || nested_which.isDate()) return 2; - else if ( - nested_which.isUInt32() || nested_which.isInt32() || nested_which.isFloat32() || nested_which.isDate32()) + else if (nested_which.isUInt32() || nested_which.isInt32() || nested_which.isFloat32() || nested_which.isDate32()) return 4; else if ( nested_which.isUInt64() || nested_which.isInt64() || nested_which.isFloat64() || nested_which.isDateTime64() @@ -686,9 +678,7 @@ int64_t VariableLengthDataWriter::writeArray(size_t row_idx, const DB::Array & a if (elem.isNull()) bitSet(buffer_address + offset + start + 8, i); else - { writer.write(elem, buffer_address + offset + start + 8 + len_null_bitmap + i * elem_size); - } } } else @@ -957,4 +947,34 @@ void FixedLengthDataWriter::unsafeWrite(const char * __restrict src, char * __re memcpy(buffer, src, type_without_nullable->getSizeOfValueInMemory()); } +namespace SparkRowInfoJNI +{ +static jclass spark_row_info_class; +static jmethodID spark_row_info_constructor; +void init(JNIEnv * env) +{ + spark_row_info_class = CreateGlobalClassReference(env, "Lorg/apache/gluten/row/SparkRowInfo;"); + 
spark_row_info_constructor = GetMethodID(env, spark_row_info_class, "", "([J[JJJJ)V"); +} +void destroy(JNIEnv * env) +{ + env->DeleteGlobalRef(spark_row_info_class); +} +jobject create(JNIEnv * env, const SparkRowInfo & spark_row_info) +{ + auto * offsets_arr = env->NewLongArray(spark_row_info.getNumRows()); + const auto * offsets_src = spark_row_info.getOffsets().data(); + env->SetLongArrayRegion(offsets_arr, 0, spark_row_info.getNumRows(), offsets_src); + auto * lengths_arr = env->NewLongArray(spark_row_info.getNumRows()); + const auto * lengths_src = spark_row_info.getLengths().data(); + env->SetLongArrayRegion(lengths_arr, 0, spark_row_info.getNumRows(), lengths_src); + int64_t address = reinterpret_cast(spark_row_info.getBufferAddress()); + int64_t column_number = spark_row_info.getNumCols(); + int64_t total_size = spark_row_info.getTotalBytes(); + + return env->NewObject( + spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); +} +} + } diff --git a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.h b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.h index 6af580850d14..36bcfccb32f2 100644 --- a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.h +++ b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.h @@ -16,6 +16,7 @@ */ #pragma once #include +#include #include #include #include @@ -84,7 +85,7 @@ class SparkRowInfo : public boost::noncopyable using SparkRowInfoPtr = std::unique_ptr; -class CHColumnToSparkRow : private Allocator +class CHColumnToSparkRow : private Allocator // class CHColumnToSparkRow : public DB::Arena { public: @@ -198,4 +199,11 @@ class FixedLengthDataWriter const DB::WhichDataType which; }; +namespace SparkRowInfoJNI +{ +void init(JNIEnv *); +void destroy(JNIEnv *); +jobject create(JNIEnv * env, const SparkRowInfo & spark_row_info); +} + } diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp index 0d57d53ff640..97292a8b050e 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp @@ -150,16 +150,14 @@ void addMergeTreeSinkTransform( { Chain chain; // - auto stats = std::make_shared(header); + auto stats = MergeTreeStats::create(header, partition_by); chain.addSink(stats); // SparkMergeTreeWriteSettings write_settings{context}; - if (partition_by.empty()) - write_settings.partition_settings.partition_dir = SubstraitFileSink::NO_PARTITION_ID; auto sink = partition_by.empty() - ? SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getGlobalContext(), {stats}) + ? 
SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getQueryContext(), DeltaStats{header.columns()}, {stats}) : std::make_shared(header, partition_by, merge_tree_table, write_settings, context, stats); chain.addSource(sink); @@ -171,7 +169,7 @@ void addNormalFileWriterSinkTransform( const DB::QueryPipelineBuilderPtr & builder, const std::string & format_hint, const DB::Block & output, - const DB::Names & partitionCols) + const DB::Names & partition_by) { GlutenWriteSettings write_settings = GlutenWriteSettings::get(context); @@ -183,14 +181,14 @@ void addNormalFileWriterSinkTransform( FileNameGenerator generator(write_settings.task_write_filename_pattern); - auto stats = WriteStats::create(output, partitionCols); + auto stats = WriteStats::create(output, partition_by); builder->addSimpleTransform( [&](const Block & cur_header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr { if (stream_type != QueryPipelineBuilder::StreamType::Main) return nullptr; - return make_sink(context, partitionCols, cur_header, output, write_settings.task_write_tmp_dir, generator, format_hint, stats); + return make_sink(context, partition_by, cur_header, output, write_settings.task_write_tmp_dir, generator, format_hint, stats); }); builder->addSimpleTransform( [&](const Block &, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h index 60b3328f0d1b..9472c4bf51e4 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h +++ b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h @@ -116,11 +116,4 @@ class MergeSparkMergeTreeTask : public IExecutableTask using MergeSparkMergeTreeTaskPtr = std::shared_ptr; - -[[maybe_unused]] static void executeHere(MergeSparkMergeTreeTaskPtr task) -{ - while (task->executeStep()) {} -} - - } diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp index 84dbc3a8d3bb..1b4685e4ea9b 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp @@ -69,19 +69,21 @@ std::unordered_map extractPartMetaData(ReadBuffer & in) enum SupportedMetaDataStorageType { - UNKNOWN =0, + UNKNOWN = 0, ROCKSDB, LOCAL }; template -static void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) +static void +restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) { UNREACHABLE(); } template <> -void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) +void restoreMetaData( + const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) { auto data_disk = storage->getStoragePolicy()->getAnyDisk(); std::unordered_set not_exists_part; @@ -109,23 +111,23 @@ void restoreMetaData(const SparkStorageMergeTreePtr & storage, const Me for (const auto & part : not_exists_part) { - auto part_path = table_path / part; - auto metadata_file_path = part_path / METADATA_FILE_NAME; - - if (metadata_storage->existsDirectory(part_path)) - return; - else - transaction->createDirectoryRecursive(part_path); - auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string(), 
std::nullopt); - StoredObject metadata_object(key.serialize()); - auto read_settings = ReadSettings{}; - read_settings.enable_filesystem_cache = false; - auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object, read_settings)); - for (const auto & item : part_metadata) - { - auto item_path = part_path / item.first; - transaction->writeStringToFile(item_path, item.second); - } + auto part_path = table_path / part; + auto metadata_file_path = part_path / METADATA_FILE_NAME; + + if (metadata_storage->existsDirectory(part_path)) + return; + else + transaction->createDirectoryRecursive(part_path); + auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string(), std::nullopt); + StoredObject metadata_object(key.serialize()); + auto read_settings = ReadSettings{}; + read_settings.enable_filesystem_cache = false; + auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object, read_settings)); + for (const auto & item : part_metadata) + { + auto item_path = part_path / item.first; + transaction->writeStringToFile(item_path, item.second); + } } transaction->commit(); } @@ -212,25 +214,16 @@ void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTa return; auto metadata_storage = data_disk->getMetadataStorage(); if (metadata_storage->getType() == MetadataStorageType::Local) - { restoreMetaData(storage, mergeTreeTable, context); - } // None is RocksDB else if (metadata_storage->getType() == MetadataStorageType::None) - { restoreMetaData(storage, mergeTreeTable, context); - } else - { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported metadata storage type {}.", metadata_storage->getType()); - } } void saveFileStatus( - const DB::MergeTreeData & storage, - const DB::ContextPtr& context, - const String & part_name, - IDataPartStorage & data_part_storage) + const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage) { const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk(); if (!disk->isRemote()) @@ -252,11 +245,11 @@ void saveFileStatus( } -std::vector mergeParts( +MergeTreeDataPartPtr mergeParts( std::vector selected_parts, const String & new_part_uuid, SparkStorageMergeTree & storage, - const String & partition_dir, + const String & partition_dir, const String & bucket_dir) { auto future_part = std::make_shared(); @@ -264,41 +257,30 @@ std::vector mergeParts( future_part->assign(std::move(selected_parts)); future_part->part_info = MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION; - future_part->name = partition_dir.empty() ? "" : partition_dir + "/"; - if(!bucket_dir.empty()) - { + //TODO: name + future_part->name = partition_dir.empty() ? "" : partition_dir + "/"; + if (!bucket_dir.empty()) future_part->name = future_part->name + bucket_dir + "/"; - } - future_part->name = future_part->name + new_part_uuid + "-merged"; + future_part->name = future_part->name + new_part_uuid + "-merged"; - auto entry = std::make_shared(future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared()); + auto entry = std::make_shared( + future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared()); // Copying a vector of columns `deduplicate by columns. 
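// Note (descriptive, not part of the patch itself): the removed executeHere() helper is effectively
// inlined below -- the MergeSparkMergeTreeTask is driven to completion with a step loop and the
// resulting part is reloaded by its computed name, so mergeParts() now returns a single
// MergeTreeDataPartPtr instead of a vector of parts.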
DB::IExecutableTask::TaskResultCallback f = [](bool) {}; - auto task = std::make_shared( - storage, storage.getInMemoryMetadataPtr(), false, std::vector{}, false, entry, - DB::TableLockHolder{}, f); + const auto task = std::make_shared( + storage, storage.getInMemoryMetadataPtr(), false, std::vector{}, false, entry, DB::TableLockHolder{}, f); task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{}); - executeHere(task); - - std::unordered_set to_load{future_part->name}; - std::vector merged = storage.loadDataPartsWithNames(to_load); - return merged; -} - -/** TODO: Remove it. - * Extract partition values from partition directory, we implement it in the java */ -void extractPartitionValues(const String & partition_dir, std::unordered_map & partition_values) -{ - Poco::StringTokenizer partitions(partition_dir, "/"); - for (const auto & partition : partitions) + while (task->executeStep()) { - Poco::StringTokenizer key_value(partition, "="); - chassert(key_value.count() == 2); - partition_values.emplace(key_value[0], key_value[1]); } + + std::vector merged = storage.loadDataPartsWithNames({future_part->name}); + assert(merged.size() == 1); + return merged[0]; } + } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h index 827788a35a65..faa5734f572b 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h @@ -24,7 +24,6 @@ namespace local_engine { - bool isMergeTreePartMetaDataFile(const String & file_name); void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context); @@ -32,7 +31,7 @@ void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTa void saveFileStatus( const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage); -std::vector mergeParts( +MergeTreeDataPartPtr mergeParts( std::vector selected_parts, const String & new_part_uuid, SparkStorageMergeTree & storage, diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp index fae8f3ecef9e..372396598fc2 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp @@ -262,15 +262,6 @@ MergeTreeTable::MergeTreeTable(const local_engine::Write & write, const substrai table_configs.storage_policy = merge_tree_write.storage_policy(); } -std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config) -{ - auto settings = std::make_unique(); - settings->set("allow_nullable_key", Field(1)); - if (!config.storage_policy.empty()) - settings->set("storage_policy", Field(config.storage_policy)); - return settings; -} - std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_types_list) { std::unique_ptr query_info = std::make_unique(); diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h index 87b2d8403be3..773452bfc717 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h @@ -16,11 +16,9 @@ */ #pragma once +#include #include #include - -#include -#include #include #include #include @@ -96,7 +94,5 @@ struct MergeTreeTableInstance : 
MergeTreeTable explicit MergeTreeTableInstance(const std::string & info); }; -std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config); - std::unique_ptr buildQueryInfo(NamesAndTypesList & names_and_types_list); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp index d41e71fb848d..3c6b7ae824e2 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp @@ -42,12 +42,24 @@ namespace local_engine void SparkMergeTreeSink::write(const Chunk & chunk) { CurrentThread::flushUntrackedMemory(); - - /// Reset earlier, so put it in the scope - BlockWithPartition item{getHeader().cloneWithColumns(chunk.getColumns()), Row{}}; - - sink_helper->writeTempPart(item, context, part_num); - part_num++; + { + PartWithStats part_with_stats{ + .data_part = nullptr, + .delta_stats = empty_delta_stats_ + .transform( + [&](const auto & stats) + { + auto newStats = std::make_shared(stats); + newStats->update(chunk); + return newStats; + }) + .value_or(nullptr)}; + /// Reset earlier, so put it in the scope + BlockWithPartition item{getHeader().cloneWithColumns(chunk.getColumns()), Row{}}; + + sink_helper->writeTempPart(item, std::move(part_with_stats), context, part_num); + part_num++; + } } void SparkMergeTreeSink::consume(Chunk & chunk) @@ -86,6 +98,7 @@ SinkToStoragePtr SparkMergeTreeSink::create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context, + const DeltaStatsOption & delta_stats, const SinkStatsOption & stats) { if (write_settings_.partition_settings.part_name_prefix.empty()) @@ -108,7 +121,12 @@ SinkToStoragePtr SparkMergeTreeSink::create( sink_helper = std::make_shared(dest_storage, write_settings_, isRemoteStorage); const DB::Settings & settings = context->getSettingsRef(); return std::make_shared( - sink_helper, context, stats, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]); + sink_helper, + context, + delta_stats, + stats, + settings[Setting::min_insert_block_size_rows], + settings[Setting::min_insert_block_size_bytes]); } SinkHelper::SinkHelper(const SparkStorageMergeTreePtr & data_, const SparkMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_) @@ -125,70 +143,95 @@ void SinkHelper::saveMetadata(const DB::ContextPtr & context) if (!isRemoteStorage) return; - const std::deque & parts = new_parts.unsafeGet(); + const std::deque & parts = new_parts.unsafeGet(); for (const auto & merge_tree_data_part : parts) { - auto part = dest_storage().loadDataPartsWithNames({merge_tree_data_part->name}); + auto part = dest_storage().loadDataPartsWithNames({merge_tree_data_part.data_part->name}); if (part.empty()) { LOG_WARNING( &Poco::Logger::get("SparkMergeTreeWriter"), "Save metadata failed because dest storage load part name {} empty.", - merge_tree_data_part->name); + merge_tree_data_part.data_part->name); continue; } saveFileStatus( - dest_storage(), context, merge_tree_data_part->name, const_cast(part.at(0)->getDataPartStorage())); + dest_storage(), + context, + merge_tree_data_part.data_part->name, + const_cast(part.at(0)->getDataPartStorage())); } } -void SinkHelper::doMergePartsAsync(const std::vector & prepare_merge_parts) +void SinkHelper::doMergePartsAsync(const std::vector & merge_parts_with_stats) { - for (const auto & selected_part : prepare_merge_parts) - 
tmp_parts.emplace(selected_part->name); + for (const auto & selected_part : merge_parts_with_stats) + tmp_parts.emplace(selected_part.data_part->name); // check a thread group initialized in task thread currentThreadGroupMemoryUsage(); thread_pool.scheduleOrThrow( - [this, prepare_merge_parts, thread_group = CurrentThread::getGroup()]() -> void + [this, merge_parts_with_stats, thread_group = CurrentThread::getGroup()]() -> void { Stopwatch watch; CurrentThread::detachFromGroupIfNotDetached(); CurrentThread::attachToGroup(thread_group); size_t before_size = 0; - size_t after_size = 0; - for (const auto & prepare_merge_part : prepare_merge_parts) - before_size += prepare_merge_part->getBytesOnDisk(); - const auto merged_parts = mergeParts( - prepare_merge_parts, + std::vector prepare_merge_parts_; + for (const auto & prepare_merge_part : merge_parts_with_stats) + { + before_size += prepare_merge_part.data_part->getBytesOnDisk(); + prepare_merge_parts_.emplace_back(prepare_merge_part.data_part); + } + + const auto merged_part = mergeParts( + prepare_merge_parts_, toString(UUIDHelpers::generateV4()), dataRef(), write_settings.partition_settings.partition_dir, write_settings.partition_settings.bucket_dir); - for (const auto & merge_tree_data_part : merged_parts) - after_size += merge_tree_data_part->getBytesOnDisk(); - new_parts.emplace_back(merged_parts); + size_t after_size = merged_part->getBytesOnDisk(); + if (std::ranges::any_of(merge_parts_with_stats, [](const auto & part) { return part.delta_stats == nullptr; })) + { + // no stats + new_parts.emplace_back(PartWithStats{std::move(merged_part), nullptr}); + } + else + { + auto merge_stats = merge_parts_with_stats.begin()->delta_stats; + for (auto begin = merge_parts_with_stats.begin() + 1; begin != merge_parts_with_stats.end(); ++begin) + merge_stats->merge(*begin->delta_stats); + new_parts.emplace_back(PartWithStats{std::move(merged_part), std::move(merge_stats)}); + } + watch.stop(); LOG_INFO( &Poco::Logger::get("SparkMergeTreeWriter"), "Merge success. 
Before merge part size {}, part count {}, after part size {}, part count {}, " "total elapsed {} ms", - before_size, - prepare_merge_parts.size(), - after_size, - merged_parts.size(), + before_size, // before size + merge_parts_with_stats.size(), // before part count + after_size, // after size + 1, // after part count watch.elapsedMilliseconds()); }); } -void SinkHelper::writeTempPart(DB::BlockWithPartition & block_with_partition, const ContextPtr & context, int part_num) +void SinkHelper::writeTempPart( + DB::BlockWithPartition & block_with_partition, PartWithStats part_with_stats, const ContextPtr & context, int part_num) { + assert(!metadata_snapshot->hasPartitionKey()); const std::string & part_name_prefix = write_settings.partition_settings.part_name_prefix; - std::string part_dir = fmt::format("{}_{:03d}", part_name_prefix, part_num); - auto tmp = dataRef().getWriter().writeTempPart(block_with_partition, metadata_snapshot, context, part_dir); - new_parts.emplace_back(tmp.part); + std::string part_dir; + if (write_settings.is_optimize_task) + part_dir = fmt::format("{}-merged", part_name_prefix); + else + part_dir = fmt::format("{}_{:03d}", part_name_prefix, part_num); + const auto tmp = dataRef().getWriter().writeTempPart(block_with_partition, metadata_snapshot, context, part_dir); + part_with_stats.data_part = tmp.part; + new_parts.emplace_back(std::move(part_with_stats)); } void SinkHelper::checkAndMerge(bool force) @@ -199,22 +242,22 @@ void SinkHelper::checkAndMerge(bool force) if (!force && new_parts.size() < write_settings.merge_limit_parts) return; - std::vector selected_parts; + std::vector selected_parts; selected_parts.reserve(write_settings.merge_limit_parts); size_t total_size = 0; - std::vector skip_parts; + std::vector skip_parts; while (const auto merge_tree_data_part_option = new_parts.pop_front()) { auto merge_tree_data_part = merge_tree_data_part_option.value(); - if (merge_tree_data_part->getBytesOnDisk() >= write_settings.merge_min_size) + if (merge_tree_data_part.data_part->getBytesOnDisk() >= write_settings.merge_min_size) { skip_parts.emplace_back(merge_tree_data_part); continue; } selected_parts.emplace_back(merge_tree_data_part); - total_size += merge_tree_data_part->getBytesOnDisk(); + total_size += merge_tree_data_part.data_part->getBytesOnDisk(); if (write_settings.merge_min_size > total_size && write_settings.merge_limit_parts > selected_parts.size()) continue; @@ -262,13 +305,13 @@ void CopyToRemoteSinkHelper::commit(const ReadSettings & read_settings, const Wr LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Begin upload to disk {}.", dest_storage().getStoragePolicy()->getAnyDisk()->getName()); - const std::deque & parts = new_parts.unsafeGet(); + const std::deque & parts = new_parts.unsafeGet(); Stopwatch watch; for (const auto & merge_tree_data_part : parts) { - String local_relative_path = dataRef().getRelativeDataPath() + "/" + merge_tree_data_part->name; - String remote_relative_path = dest_storage().getRelativeDataPath() + "/" + merge_tree_data_part->name; + String local_relative_path = dataRef().getRelativeDataPath() + "/" + merge_tree_data_part.data_part->name; + String remote_relative_path = dest_storage().getRelativeDataPath() + "/" + merge_tree_data_part.data_part->name; std::vector files; dataRef().getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files); @@ -287,7 +330,7 @@ void CopyToRemoteSinkHelper::commit(const ReadSettings & read_settings, const Wr LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Upload 
part {} to disk {} success.", - merge_tree_data_part->name, + merge_tree_data_part.data_part->name, dest_storage().getStoragePolicy()->getAnyDisk()->getName()); } watch.stop(); @@ -307,7 +350,7 @@ void DirectSinkHelper::cleanup() // default storage need clean temp. std::unordered_set final_parts; for (const auto & merge_tree_data_part : new_parts.unsafeGet()) - final_parts.emplace(merge_tree_data_part->name); + final_parts.emplace(merge_tree_data_part.data_part->name); for (const auto & tmp_part : tmp_parts) { diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h index 828332d2d6c9..5ec374f02a31 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h @@ -55,7 +55,7 @@ class ConcurrentDeque deq.emplace_back(value); } - void emplace_back(std::vector values) + void emplace_back(const std::vector & values) { std::lock_guard lock(mtx); deq.insert(deq.end(), values.begin(), values.end()); @@ -87,13 +87,20 @@ class ConcurrentDeque mutable std::mutex mtx; }; +// +struct PartWithStats +{ + DB::MergeTreeDataPartPtr data_part; + std::shared_ptr delta_stats; // maybe null +}; +// class SinkHelper { protected: SparkStorageMergeTreePtr data; bool isRemoteStorage; - ConcurrentDeque new_parts; + ConcurrentDeque new_parts; std::unordered_set tmp_parts{}; ThreadPool thread_pool; @@ -104,7 +111,7 @@ class SinkHelper protected: virtual SparkStorageMergeTree & dest_storage() { return *data; } - void doMergePartsAsync(const std::vector & prepare_merge_parts); + void doMergePartsAsync(const std::vector & merge_parts_with_stats); void finalizeMerge(); virtual void cleanup() { } virtual void commit(const ReadSettings & read_settings, const WriteSettings & write_settings) { } @@ -112,9 +119,9 @@ class SinkHelper SparkWriteStorageMergeTree & dataRef() const { return assert_cast(*data); } public: - const std::deque & unsafeGet() const { return new_parts.unsafeGet(); } - - void writeTempPart(DB::BlockWithPartition & block_with_partition, const ContextPtr & context, int part_num); + const std::deque & unsafeGet() const { return new_parts.unsafeGet(); } + void + writeTempPart(DB::BlockWithPartition & block_with_partition, PartWithStats part_with_stats, const ContextPtr & context, int part_num); void checkAndMerge(bool force = false); void finish(const DB::ContextPtr & context); @@ -163,19 +170,21 @@ class MergeTreeStats : public WriteStatsBase partition_id, record_count, marks_count, - size_in_bytes + size_in_bytes, + stats_column_start = size_in_bytes + 1 }; - static DB::Block statsHeader() + static DB::ColumnsWithTypeAndName statsHeaderBase() { - return makeBlockHeader( - {{STRING(), "part_name"}, - {STRING(), "partition_id"}, - {BIGINT(), "record_count"}, - {BIGINT(), "marks_count"}, - {BIGINT(), "size_in_bytes"}}); + return { + {STRING(), "part_name"}, + {STRING(), "partition_id"}, + {BIGINT(), "record_count"}, + {BIGINT(), "marks_count"}, + {BIGINT(), "size_in_bytes"}}; } +protected: DB::Chunk final_result() override { size_t rows = columns_[part_name]->size(); @@ -183,15 +192,29 @@ class MergeTreeStats : public WriteStatsBase } public: - explicit MergeTreeStats(const DB::Block & input_header_) - : WriteStatsBase(input_header_, statsHeader()), columns_(statsHeader().cloneEmptyColumns()) + explicit MergeTreeStats(const DB::Block & input, const DB::Block & output) + : WriteStatsBase(input, output), columns_(output.cloneEmptyColumns()) { } - + static 
std::shared_ptr create(const DB::Block & input, const DB::Names & partition) + { + return std::make_shared(input, DeltaStats::statsHeader(input, partition, statsHeaderBase())); + } String getName() const override { return "MergeTreeStats"; } - void collectStats(const std::deque & parts, const std::string & partition) const + void collectStats(const std::deque & parts, const std::string & partition_dir) const { + const std::string & partition = partition_dir.empty() ? WriteStatsBase::NO_PARTITION_ID : partition_dir; + size_t columnSize = parts[0].delta_stats->min.size(); + assert(columns_.size() == stats_column_start + columnSize * 3); + assert(std::ranges::all_of( + parts, + [&](const auto & part) + { + return part.delta_stats->min.size() == columnSize && part.delta_stats->max.size() == columnSize + && part.delta_stats->null_count.size() == columnSize; + })); + const size_t size = parts.size() + columns_[part_name]->size(); columns_[part_name]->reserve(size); columns_[partition_id]->reserve(size); @@ -207,12 +230,23 @@ class MergeTreeStats : public WriteStatsBase for (const auto & part : parts) { - columns_[part_name]->insertData(part->name.c_str(), part->name.size()); + std::string part_name_without_partition + = partition_dir.empty() ? part.data_part->name : part.data_part->name.substr(partition_dir.size() + 1); + columns_[part_name]->insertData(part_name_without_partition.c_str(), part_name_without_partition.size()); columns_[partition_id]->insertData(partition.c_str(), partition.size()); - countColData.emplace_back(part->rows_count); - marksColData.emplace_back(part->getMarksCount()); - bytesColData.emplace_back(part->getBytesOnDisk()); + countColData.emplace_back(part.data_part->rows_count); + marksColData.emplace_back(part.data_part->getMarksCount()); + bytesColData.emplace_back(part.data_part->getBytesOnDisk()); + + for (int i = 0; i < columnSize; ++i) + { + size_t offset = stats_column_start + i; + columns_[offset]->insert(part.delta_stats->min[i]); + columns_[columnSize + offset]->insert(part.delta_stats->max[i]); + auto & nullCountData = static_cast &>(*columns_[(columnSize * 2) + offset]).getData(); + nullCountData.emplace_back(part.delta_stats->null_count[i]); + } } } }; @@ -221,15 +255,18 @@ class SparkMergeTreeSink : public DB::SinkToStorage { public: using SinkStatsOption = std::optional>; + using DeltaStatsOption = std::optional; static SinkToStoragePtr create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context, + const DeltaStatsOption & delta_stats = {}, const SinkStatsOption & stats = {}); explicit SparkMergeTreeSink( const SinkHelperPtr & sink_helper_, const ContextPtr & context_, + const DeltaStatsOption & delta_stats, const SinkStatsOption & stats, size_t min_block_size_rows, size_t min_block_size_bytes) @@ -238,6 +275,7 @@ class SparkMergeTreeSink : public DB::SinkToStorage , sink_helper(sink_helper_) , stats_(stats) , squashing(sink_helper_->metadata_snapshot->getSampleBlock(), min_block_size_rows, min_block_size_bytes) + , empty_delta_stats_(delta_stats) { } ~SparkMergeTreeSink() override = default; @@ -257,6 +295,7 @@ class SparkMergeTreeSink : public DB::SinkToStorage std::optional> stats_; Squashing squashing; Chunk squashed_chunk; + DeltaStatsOption empty_delta_stats_; int part_num = 1; }; @@ -284,12 +323,12 @@ class SparkMergeTreePartitionedFileSink final : public SparkPartitionedBaseSink assert(write_settings.partition_settings.partition_dir.empty()); 
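// Note (descriptive, the rationale is inferred rather than stated by the patch): each per-partition
// sink now prepends "<partition_id>/<uuid>" to the part name prefix (see the fmt::format call
// below), presumably so parts written for different partitions land in distinct, collision-free
// directories; the sink is then created against the query context together with the shared empty
// DeltaStats template and the MergeTreeStats collector.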
assert(write_settings.partition_settings.bucket_dir.empty()); - write_settings.partition_settings.part_name_prefix - = fmt::format("{}/{}", partition_id, write_settings.partition_settings.part_name_prefix); + write_settings.partition_settings.part_name_prefix = fmt::format( + "{}/{}{}", partition_id, toString(DB::UUIDHelpers::generateV4()), write_settings.partition_settings.part_name_prefix); write_settings.partition_settings.partition_dir = partition_id; return SparkMergeTreeSink::create( - table, write_settings, context_->getGlobalContext(), {std::dynamic_pointer_cast(stats_)}); + table, write_settings, context_->getQueryContext(), empty_delta_stats_, {std::dynamic_pointer_cast(stats_)}); } // TODO implement with bucket diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp index e584b003d2c6..d81961a4bcb6 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp @@ -38,6 +38,9 @@ SparkMergeTreeWriteSettings::SparkMergeTreeWriteSettings(const DB::ContextPtr & if (DB::Field limit_cnt_field; settings.tryGet("mergetree.max_num_part_per_merge_task", limit_cnt_field)) merge_limit_parts = limit_cnt_field.safeGet() <= 0 ? merge_limit_parts : limit_cnt_field.safeGet(); + + if (settingsEqual(context->getSettingsRef(), MergeTreeConf::OPTIMIZE_TASK, "true")) + is_optimize_task = true; } } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h index e89b2aaf5e44..84569be863cd 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h @@ -32,9 +32,19 @@ struct SparkMergeTreeWriteSettings SparkMergeTreeWritePartitionSettings partition_settings; bool merge_after_insert{true}; bool insert_without_local_storage{false}; - size_t merge_min_size = 1024 * 1024 * 1024; + bool is_optimize_task{false}; + size_t merge_min_size = 1024 * 1024 * 1024; // 1GB size_t merge_limit_parts = 10; explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context); }; + +struct MergeTreeConf +{ + inline static const String CH_CONF{"merge_tree"}; + inline static const String GLUTEN_CONF{"mergetree"}; + + inline static const String OPTIMIZE_TASK{GLUTEN_CONF + ".optimize_task"}; + // inline static const String MAX_NUM_PART_PER_MERGE_TASK{GLUTEN_CONF + ".max_num_part_per_merge_task"}; +}; } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index 95145d43fab9..f412bfac0a6b 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -154,10 +154,10 @@ std::vector SparkMergeTreeWriter::getAllPartInfo() const for (const auto & part : parts) { res.emplace_back(PartInfo{ - part->name, - part->getMarksCount(), - part->getBytesOnDisk(), - part->rows_count, + part.data_part->name, + part.data_part->getMarksCount(), + part.data_part->getBytesOnDisk(), + part.data_part->rows_count, sink_helper.write_settings.partition_settings.partition_dir, sink_helper.write_settings.partition_settings.bucket_dir}); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index c1a95a6be02c..fda49e65df73 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -34,6 +34,7 @@ namespace DB { namespace MergeTreeSetting { +extern const MergeTreeSettingsBool assign_part_uuids; extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization; extern const MergeTreeSettingsBool fsync_part_directory; extern const MergeTreeSettingsBool fsync_after_insert; @@ -463,6 +464,10 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart( new_data_part->minmax_idx = std::move(minmax_idx); data_part_storage->beginTransaction(); + + if (data_settings[MergeTreeSetting::assign_part_uuids]) + new_data_part->uuid = UUIDHelpers::generateV4(); + SyncGuardPtr sync_guard; if (new_data_part->isStoredOnDisk()) { @@ -518,6 +523,25 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart( return temp_part; } +std::unique_ptr +SparkWriteStorageMergeTree::buildMergeTreeSettings(const ContextMutablePtr & context, const MergeTreeTableSettings & config) +{ + //TODO: set settings though ASTStorage + auto settings = std::make_unique(); + + settings->set("allow_nullable_key", Field(true)); + if (!config.storage_policy.empty()) + settings->set("storage_policy", Field(config.storage_policy)); + + if (settingsEqual(context->getSettingsRef(), "merge_tree.assign_part_uuids", "true")) + settings->set("assign_part_uuids", Field(true)); + + if (String min_rows_for_wide_part; tryGetString(context->getSettingsRef(), "merge_tree.min_rows_for_wide_part", min_rows_for_wide_part)) + settings->set("min_rows_for_wide_part", Field(std::strtoll(min_rows_for_wide_part.c_str(), nullptr, 10))); + + return settings; +} + SinkToStoragePtr SparkWriteStorageMergeTree::write( const ASTPtr &, const StorageMetadataPtr & /*storage_in_memory_metadata*/, ContextPtr context, bool /*async_insert*/) { diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h index 237cf6919208..be7abd7beece 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h @@ -20,10 +20,12 @@ #include #include #include +#include #include #include #include #include +#include namespace local_engine { @@ -71,7 +73,7 @@ class SparkStorageMergeTree : public MergeTreeData std::map getUnfinishedMutationCommands() const override; std::vector loadDataPartsWithNames(const std::unordered_set & parts); void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach); - void prefetchPartDataFile(const std::unordered_set& parts) const; + void prefetchPartDataFile(const std::unordered_set & parts) const; MergeTreeDataSelectExecutor reader; MergeTreeDataMergerMutator merger_mutator; @@ -92,8 +94,8 @@ class SparkStorageMergeTree : public MergeTreeData static std::atomic part_num; SimpleIncrement increment; - void prefetchPartFiles(const std::unordered_set& parts, String file_name) const; - void prefetchMetaDataFile(const std::unordered_set& parts) const; + void prefetchPartFiles(const std::unordered_set & parts, String file_name) const; + void prefetchMetaDataFile(const std::unordered_set & parts) const; void startBackgroundMovesIfNeeded() override; std::unique_ptr getDefaultSettings() const override; LoadPartResult loadDataPart( @@ -119,6 +121,9 @@ class SparkStorageMergeTree : 
public MergeTreeData class SparkWriteStorageMergeTree final : public SparkStorageMergeTree { + static std::unique_ptr + buildMergeTreeSettings(const ContextMutablePtr & context, const MergeTreeTableSettings & config); + public: SparkWriteStorageMergeTree(const MergeTreeTable & table_, const StorageInMemoryMetadata & metadata, const ContextMutablePtr & context_) : SparkStorageMergeTree( @@ -129,7 +134,7 @@ class SparkWriteStorageMergeTree final : public SparkStorageMergeTree context_, "", MergingParams(), - buildMergeTreeSettings(table_.table_configs), + buildMergeTreeSettings(context_, table_.table_configs), false /*has_force_restore_data_flag*/) , table(table_) , writer(*this) diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp index 2d70380a8959..1f3d7f0b2598 100644 --- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp @@ -16,19 +16,19 @@ */ #include "NormalFileWriter.h" +#include +#include +#include #include #include #include -#include -#include -#include namespace local_engine { using namespace DB; -const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"}; +const std::string WriteStatsBase::NO_PARTITION_ID{"__NO_PARTITION_ID__"}; const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"}; const std::string SparkPartitionedBaseSink::BUCKET_COLUMN_NAME{"__bucket_value__"}; const std::vector FileNameGenerator::SUPPORT_PLACEHOLDERS{"{id}", "{bucket}"}; diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 998f8d624721..9e848f82d782 100644 --- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -33,6 +33,7 @@ #include #include #include +#include namespace local_engine { @@ -105,11 +106,14 @@ struct DeltaStats explicit DeltaStats(size_t size, const std::set & partition_index_ = {}) : row_count(0), min(size), max(size), null_count(size, 0), partition_index(partition_index_) { + assert(size > 0); } + bool initialized() const { return row_count > 0; } + void update(const DB::Chunk & chunk) { - row_count += chunk.getNumRows(); + assert(chunk.getNumRows() > 0); const auto & columns = chunk.getColumns(); assert(columns.size() == min.size() + partition_index.size()); for (size_t i = 0, col = 0; col < columns.size(); ++col) @@ -127,21 +131,50 @@ struct DeltaStats this->null_count[i] += null_count; DB::Field min_value, max_value; - column->getExtremes(min_value, max_value); + if (const auto * column_nullable = typeid_cast(column.get())) + column_nullable->getExtremesNullLast(min_value, max_value); + else + column->getExtremes(min_value, max_value); + assert(min[i].isNull() || min_value.getType() == min[i].getType()); assert(max[i].isNull() || max_value.getType() == max[i].getType()); - if (min[i].isNull() || min_value < min[i]) + + if (!initialized()) + { min[i] = min_value; - if (max[i].isNull() || max_value > max[i]) max[i] = max_value; - + } + else + { + min[i] = applyVisitor(DB::FieldVisitorAccurateLess(), min[i], min_value) ? min[i] : min_value; + max[i] = applyVisitor(DB::FieldVisitorAccurateLess(), max[i], max_value) ? 
max_value : max[i]; + } ++i; } + + row_count += chunk.getNumRows(); + } + + void merge(const DeltaStats & right) + { + assert(min.size() == right.min.size()); + assert(partition_index == right.partition_index); + + for (size_t i = 0; i < min.size(); ++i) + { + null_count[i] += right.null_count[i]; + min[i] = std::min(min[i], right.min[i]); + max[i] = std::max(max[i], right.max[i]); + } } }; class WriteStatsBase : public DB::ISimpleTransform { +public: + /// visible for UTs + static const std::string NO_PARTITION_ID; + protected: bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks virtual DB::Chunk final_result() = 0; @@ -181,8 +214,13 @@ class WriteStats : public WriteStatsBase { filename, partition_id, - record_count + record_count, + stats_column_start = record_count + 1 }; + static DB::ColumnsWithTypeAndName statsHeaderBase() + { + return {{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}}; + } protected: DB::Chunk final_result() override @@ -196,30 +234,28 @@ class WriteStats : public WriteStatsBase : WriteStatsBase(input_header_, output_header_), columns_(output_header_.cloneEmptyColumns()) { } - - static std::shared_ptr create(const DB::Block & input_header_, const DB::Names & partition) + static std::shared_ptr create(const DB::Block & input, const DB::Names & partition) { - return std::make_shared( - input_header_, - DeltaStats::statsHeader( - input_header_, partition, {{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}})); + return std::make_shared(input, DeltaStats::statsHeader(input, partition, statsHeaderBase())); } String getName() const override { return "WriteStats"; } - void collectStats(const String & filename, const String & partition, const DeltaStats & stats) const + void collectStats(const String & filename, const String & partition_dir, const DeltaStats & stats) const { - // 3 => filename, partition_id, record_count - constexpr size_t baseOffset = 3; - assert(columns_.size() == baseOffset + stats.min.size() + stats.max.size() + stats.null_count.size()); + const std::string & partition = partition_dir.empty() ? 
WriteStatsBase::NO_PARTITION_ID : partition_dir; + size_t columnSize = stats.min.size(); + assert(columns_.size() == stats_column_start + columnSize * 3); + assert(stats.min.size() == stats.max.size() && stats.min.size() == stats.null_count.size()); + columns_[ColumnIndex::filename]->insertData(filename.c_str(), filename.size()); columns_[partition_id]->insertData(partition.c_str(), partition.size()); auto & countColData = static_cast &>(*columns_[record_count]).getData(); countColData.emplace_back(stats.row_count); - size_t columnSize = stats.min.size(); + for (int i = 0; i < columnSize; ++i) { - size_t offset = baseOffset + i; + size_t offset = stats_column_start + i; columns_[offset]->insert(stats.min[i]); columns_[columnSize + offset]->insert(stats.max[i]); auto & nullCountData = static_cast &>(*columns_[(columnSize * 2) + offset]).getData(); @@ -236,21 +272,18 @@ struct FileNameGenerator const std::vector need_to_replace; const std::string file_pattern; - FileNameGenerator(const std::string & file_pattern) - : file_pattern(file_pattern), need_to_replace(compute_need_to_replace(file_pattern)) + FileNameGenerator(const std::string & file_pattern) : file_pattern(file_pattern), need_to_replace(compute_need_to_replace(file_pattern)) { } std::vector compute_need_to_replace(const std::string & file_pattern) { std::vector result; - for(const std::string& placeholder: SUPPORT_PLACEHOLDERS) - { + for (const std::string & placeholder : SUPPORT_PLACEHOLDERS) if (file_pattern.find(placeholder) != std::string::npos) result.push_back(true); else result.push_back(false); - } return result; } @@ -295,9 +328,6 @@ class SubstraitFileSink final : public DB::SinkToStorage } public: - /// visible for UTs - static const std::string NO_PARTITION_ID; - explicit SubstraitFileSink( const DB::ContextPtr & context, const std::string & base_path, @@ -309,7 +339,7 @@ class SubstraitFileSink final : public DB::SinkToStorage const std::shared_ptr & stats, const DeltaStats & delta_stats) : SinkToStorage(header) - , partition_id_(partition_id.empty() ? 
NO_PARTITION_ID : partition_id) + , partition_id_(partition_id) , bucketed_write_(bucketed_write) , relative_path_(relative) , format_file_(createOutputFormatFile(context, makeAbsoluteFilename(base_path, partition_id, relative), header, format_hint)) @@ -353,15 +383,13 @@ class SubstraitFileSink final : public DB::SinkToStorage class SparkPartitionedBaseSink : public DB::PartitionedSink { - public: static const std::string DEFAULT_PARTITION_NAME; static const std::string BUCKET_COLUMN_NAME; static bool isBucketedWrite(const DB::Block & input_header) { - return input_header.has(BUCKET_COLUMN_NAME) && - input_header.getPositionByName(BUCKET_COLUMN_NAME) == input_header.columns() - 1; + return input_header.has(BUCKET_COLUMN_NAME) && input_header.getPositionByName(BUCKET_COLUMN_NAME) == input_header.columns() - 1; } /// visible for UTs @@ -387,7 +415,7 @@ class SparkPartitionedBaseSink : public DB::PartitionedSink } if (isBucketedWrite(input_header)) { - DB::ASTs args {std::make_shared("%05d"), std::make_shared(BUCKET_COLUMN_NAME)}; + DB::ASTs args{std::make_shared("%05d"), std::make_shared(BUCKET_COLUMN_NAME)}; arguments.emplace_back(DB::makeASTFunction("printf", std::move(args))); } assert(!arguments.empty()); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 2ec0622311be..c39c8925f840 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -93,9 +93,6 @@ namespace dbms class LocalExecutor; } -static jclass spark_row_info_class; -static jmethodID spark_row_info_constructor; - static jclass block_stripes_class; static jmethodID block_stripes_constructor; @@ -113,9 +110,6 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) local_engine::JniErrorsGlobalState::instance().initialize(env); - spark_row_info_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/row/SparkRowInfo;"); - spark_row_info_constructor = local_engine::GetMethodID(env, spark_row_info_class, "", "([J[JJJJ)V"); - block_stripes_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;"); block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "", "(J[J[II)V"); @@ -164,6 +158,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) local_engine::BroadCastJoinBuilder::init(env); local_engine::CacheManager::initJNI(env); local_engine::SparkMergeTreeWriterJNI::init(env); + local_engine::SparkRowInfoJNI::init(env); local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; @@ -180,8 +175,8 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) local_engine::JniErrorsGlobalState::instance().destroy(env); local_engine::BroadCastJoinBuilder::destroy(env); local_engine::SparkMergeTreeWriterJNI::destroy(env); + local_engine::SparkRowInfoJNI::destroy(env); - env->DeleteGlobalRef(spark_row_info_class); env->DeleteGlobalRef(block_stripes_class); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(block_stats_class); @@ -784,9 +779,6 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_c JNIEnv * env, jclass, jlong block_address, jintArray masks) { LOCAL_ENGINE_JNI_METHOD_START - local_engine::CHColumnToSparkRow converter; - - std::unique_ptr spark_row_info = nullptr; local_engine::MaskVector mask = nullptr; DB::Block * block = reinterpret_cast(block_address); if (masks != nullptr) @@ -797,21 +789,9 @@ JNIEXPORT jobject 
@@ -784,9 +779,6 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_c
     JNIEnv * env, jclass, jlong block_address, jintArray masks)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::CHColumnToSparkRow converter;
-
-    std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = nullptr;
     local_engine::MaskVector mask = nullptr;
     DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
     if (masks != nullptr)
@@ -797,21 +789,9 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_c
             mask->push_back(safeArray.elems()[j]);
     }
 
-    spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
-
-    auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows());
-    const auto * offsets_src = spark_row_info->getOffsets().data();
-    env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src);
-    auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows());
-    const auto * lengths_src = spark_row_info->getLengths().data();
-    env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src);
-    int64_t address = reinterpret_cast<int64_t>(spark_row_info->getBufferAddress());
-    int64_t column_number = spark_row_info->getNumCols();
-    int64_t total_size = spark_row_info->getTotalBytes();
-
-    jobject spark_row_info_object
-        = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size);
-    return spark_row_info_object;
+    local_engine::CHColumnToSparkRow converter;
+    std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
+    return local_engine::SparkRowInfoJNI::create(env, *spark_row_info);
 
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
@@ -1014,7 +994,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
     auto context = local_engine::QueryContext::instance().currentQueryContext();
-    // each task using its own CustomStorageMergeTree, don't reuse
+    // each task uses its own CustomStorageMergeTree; do not reuse it across tasks
     auto temp_storage = merge_tree_table.copyToVirtualStorage(context);
     // prefetch all needed parts metadata before merge
     local_engine::restoreMetaData(temp_storage, merge_tree_table, *context);
@@ -1026,16 +1006,13 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     std::vector selected_parts
         = local_engine::StorageMergeTreeFactory::getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames());
 
-    std::vector<DB::MergeTreeDataPartPtr> loaded
-        = local_engine::mergeParts(selected_parts, uuid_str, *temp_storage, partition_dir, bucket_dir);
+    DB::MergeTreeDataPartPtr loaded = local_engine::mergeParts(selected_parts, uuid_str, *temp_storage, partition_dir, bucket_dir);
 
     std::vector<local_engine::PartInfo> res;
-    for (auto & partPtr : loaded)
-    {
-        saveFileStatus(*temp_storage, context, partPtr->name, const_cast(partPtr->getDataPartStorage()));
-        res.emplace_back(local_engine::PartInfo{
-            partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_dir, bucket_dir});
-    }
+
+    saveFileStatus(*temp_storage, context, loaded->name, const_cast(loaded->getDataPartStorage()));
+    res.emplace_back(local_engine::PartInfo{
+        loaded->name, loaded->getMarksCount(), loaded->getBytesOnDisk(), loaded->rows_count, partition_dir, bucket_dir});
 
     auto json_info = local_engine::PartInfo::toJson(res);
     return local_engine::charTojstring(env, json_info.c_str());
diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h b/cpp-ch/local-engine/tests/gluten_test_util.h
index a61612662961..7a4f2912ccd9 100644
--- a/cpp-ch/local-engine/tests/gluten_test_util.h
+++ b/cpp-ch/local-engine/tests/gluten_test_util.h
@@ -83,6 +83,8 @@ std::pair> create_plan_and_execu
 
 }
 
+using TestSettings = std::map<std::string, DB::Field>;
+
 inline std::string replaceLocalFilesWildcards(const std::string_view haystack, const std::string_view replaced)
 {
     static constexpr auto wildcard = "{replace_local_files}";
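The TestSettings alias added above is consumed by the writeMerge helper in the next file, which copies each entry into the query context before building the write pipeline. A minimal sketch of that application step, assuming the map is keyed by setting name with DB::Field values:

#include <map>
#include <string>
#include <Core/Field.h>
#include <Interpreters/Context.h>

using TestSettings = std::map<std::string, DB::Field>;

// Override query-level settings from a test-supplied map before running the plan.
static void applyTestSettings(const DB::ContextMutablePtr & context, const TestSettings & test_settings)
{
    for (const auto & [name, value] : test_settings)
        context->setSetting(name, value);
}

A test would then pass something like {{"min_insert_block_size_rows", 100000}} to tune block sizes for a single case without touching global configuration.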
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
index a36601d6afa5..9761d837d0e9 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
@@ -157,7 +157,7 @@ TEST(WritePipeline, SubstraitFileSink)
     const auto & col_a = *(x.getColumns()[0]);
     EXPECT_EQ(settings.task_write_filename_pattern, col_a.getDataAt(0));
     const auto & col_b = *(x.getColumns()[1]);
-    EXPECT_EQ(SubstraitFileSink::NO_PARTITION_ID, col_b.getDataAt(0));
+    EXPECT_EQ(WriteStatsBase::NO_PARTITION_ID, col_b.getDataAt(0));
     const auto & col_c = *(x.getColumns()[2]);
     EXPECT_EQ(10000, col_c.getInt(0));
 }
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
index a5cd3fd7f39c..7bc39ccc8a2a 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
@@ -261,18 +261,19 @@ namespace
 void writeMerge(
     std::string_view json_plan,
     const std::string & outputPath,
+    const TestSettings & test_settings,
     const std::function<void(const DB::Block &)> & callback,
     std::optional<std::string> input = std::nullopt)
 {
-    const auto context = DB::Context::createCopy(QueryContext::globalContext());
-
-    auto queryid = QueryContext::instance().initializeQuery("gtest_mergetree");
-    SCOPE_EXIT({ QueryContext::instance().finalizeQuery(queryid); });
-
+    auto query_id = QueryContext::instance().initializeQuery("gtest_mergetree");
+    SCOPE_EXIT({ QueryContext::instance().finalizeQuery(query_id); });
+    const auto context = QueryContext::instance().currentQueryContext();
+    for (const auto & x : test_settings)
+        context->setSetting(x.first, x.second);
     GlutenWriteSettings settings{.task_write_tmp_dir = outputPath};
     settings.set(context);
 
-    SparkMergeTreeWritePartitionSettings partition_settings{.part_name_prefix = "pipline_prefix"};
+    SparkMergeTreeWritePartitionSettings partition_settings{.part_name_prefix = "_1"};
     partition_settings.set(context);
 
     auto input_json = input.value_or(replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_)));
@@ -284,15 +285,20 @@ void writeMerge(
 }
 
 INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
 INCBIN(_4_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/4_one_pipeline.json");
+INCBIN(_lowcard_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/lowcard.json");
+INCBIN(_case_sensitive_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/case_sensitive.json");
 
 TEST(MergeTree, Pipeline)
 {
+    // context->setSetting("mergetree.max_num_part_per_merge_task", 1);
     writeMerge(
         EMBEDDED_PLAN(_3_mergetree_plan_),
         "tmp/lineitem_mergetree",
+        {{"min_insert_block_size_rows", 100000}
+         /*, {"optimize.minFileSize", 1024 * 1024 * 10}*/},
         [&](const DB::Block & block)
         {
             EXPECT_EQ(1, block.rows());
-            debug::headBlock(block);
+            std::cerr << debug::verticalShowString(block, 10, 50) << std::endl;
         });
 }
@@ -301,9 +307,38 @@ TEST(MergeTree, PipelineWithPartition)
     writeMerge(
         EMBEDDED_PLAN(_4_mergetree_plan_),
         "tmp/lineitem_mergetree_p",
+        {},
         [&](const DB::Block & block)
         {
             EXPECT_EQ(3815, block.rows());
-            debug::headBlock(block);
+            std::cerr << debug::showString(block, 50, 50) << std::endl;
+        });
+}
+
+TEST(MergeTree, lowcard)
+{
+    writeMerge(
+        EMBEDDED_PLAN(_lowcard_plan_),
+        "tmp/lineitem_mergetre_lowcard",
+        {},
+        [&](const DB::Block & block)
+        {
+            EXPECT_EQ(1, block.rows());
+            std::cerr <<
debug::verticalShowString(block, 10, 50) << std::endl; + }); +} + +TEST(MergeTree, case_sensitive) +{ + //TODO: case_sensitive + GTEST_SKIP(); + writeMerge( + EMBEDDED_PLAN(_case_sensitive_plan_), + "tmp/LINEITEM_MERGETREE_CASE_SENSITIVE", + {}, + [&](const DB::Block & block) + { + EXPECT_EQ(1, block.rows()); + std::cerr << debug::showString(block, 20, 50) << std::endl; }); } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/mergetree/case_sensitive.json b/cpp-ch/local-engine/tests/json/mergetree/case_sensitive.json new file mode 100644 index 000000000000..6ca35e5e6f3d --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/case_sensitive.json @@ -0,0 +1,792 @@ +{ + "extensions": [ + { + "extensionFunction": { + "functionAnchor": 3, + "name": "alias:date" + } + }, + { + "extensionFunction": { + "functionAnchor": 2, + "name": "alias:str" + } + }, + { + "extensionFunction": { + "functionAnchor": 1, + "name": "alias:fp64" + } + }, + { + "extensionFunction": { + "name": "alias:i64" + } + } + ], + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/local_engine.Write", + "common": { + "format": "mergetree" + }, + "mergetree": { + "database": "default", + "table": "lineitem_mergetree_case_sensitive", + "snapshotId": "1733308441225_0", + "orderByKey": "l_discount", + "storagePolicy": "default" + } + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "L_ORDERKEY", + "L_PARTKEY", + "L_SUPPKEY", + "L_LINENUMBER", + "L_QUANTITY", + "L_EXTENDEDPRICE", + "L_DISCOUNT", + "L_TAX", + "L_RETURNFLAG", + "L_LINESTATUS", + "L_SHIPDATE", + "L_COMMITDATE", + "L_RECEIPTDATE", + "L_SHIPINSTRUCT", + "L_SHIPMODE", + "L_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } 
+ }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "PARTITION_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [ + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31 + ] + } + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + }, + "expressions": [ + { + "scalarFunction": { + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": {} + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + } + } + } + } + ] + } + 
}, + { + "scalarFunction": { + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 1, + "outputType": { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 1, + "outputType": { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 1, + "outputType": { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 1, + "outputType": { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 7 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 2, + "outputType": { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 8 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 2, + "outputType": { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 9 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 3, + "outputType": { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 10 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 3, + "outputType": { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 11 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 3, + "outputType": { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 12 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 2, + "outputType": { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 13 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 2, + "outputType": { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 14 + } + } + } + } + } + ] + } + }, + { + "scalarFunction": { + "functionReference": 2, + "outputType": { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 15 + } + } + } + } + } + ] + } + } + ] + } + } + } + }, + "outputSchema": { + 
"nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/mergetree/lowcard.json b/cpp-ch/local-engine/tests/json/mergetree/lowcard.json new file mode 100644 index 000000000000..d16a3507791e --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/lowcard.json @@ -0,0 +1,378 @@ +{ + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/local_engine.Write", + "common": { + "format": "mergetree" + }, + "mergetree": { + "database": "default", + "table": "lineitem_mergetree_lowcard", + "snapshotId": "1733306791131_0", + "orderByKey": "tuple()", + "lowCardKey": "l_returnflag,l_linestatus,l_quantity", + "storagePolicy": "default" + } + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + 
"NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file