[GLUTEN-6564] [CH] add a profile for mergetree build #6564
loudongfeng committed Jul 23, 2024
1 parent 08115e1 commit a7de172
Showing 26 changed files with 170 additions and 63 deletions.
35 changes: 34 additions & 1 deletion backends-clickhouse/pom.xml
@@ -13,6 +13,11 @@
  <packaging>jar</packaging>
  <name>Gluten Backends ClickHouse</name>

  <properties>
    <mergetree.source.path>src/main/dummy</mergetree.source.path>
    <mergetree.test.path>src/test/dummy</mergetree.test.path>
    <mergetree.delta.path>src/test/dummy</mergetree.delta.path>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.gluten</groupId>
@@ -249,6 +254,21 @@
    </dependency>
  </dependencies>


  <profiles>
    <profile>
      <id>mergetree</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
        <mergetree.source.path>src/main/mergetree</mergetree.source.path>
        <mergetree.test.path>src/test/mergetree</mergetree.test.path>
        <mergetree.delta.path>src/main/delta-${delta.binary.version}</mergetree.delta.path>
      </properties>
    </profile>
  </profiles>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
@@ -359,7 +379,20 @@
            </goals>
            <configuration>
              <sources>
                <source>src/main/delta-${delta.binary.version}</source>
                <source>${mergetree.source.path}</source>
                <source>${mergetree.delta.path}</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-sources-test</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>${mergetree.test.path}</source>
              </sources>
            </configuration>
          </execution>
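Note: because the mergetree profile above is activeByDefault, a plain build compiles the real MergeTree sources. To build the dummy (MergeTree-free) variant, the profile has to be switched off explicitly, e.g. with mvn clean package -P '!mergetree' (the standard Maven syntax for deactivating an active-by-default profile); the exact module list and other flags used for a full Gluten build are not shown in this diff.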
@@ -0,0 +1,29 @@
package org.apache.gluten.utils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.util.collection.BitSet

object MergeTreeUtil {
  def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = false

  def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = false

  def ifMergeTree(relation: HadoopFsRelation): Boolean = false

  def partsPartitions(relation: HadoopFsRelation,
                      selectedPartitions: Array[PartitionDirectory],
                      output: Seq[Attribute],
                      bucketedScan: Boolean,
                      optionalBucketSet: Option[BitSet],
                      optionalNumCoalescedBuckets: Option[Int],
                      disableBucketedScan: Boolean,
                      filterExprs: Seq[Expression]): Seq[InputPartition] = Nil

  def injectMergeTreeWriter(): Unit = {}

  def cleanup(): Unit = {}

}
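This dummy object keeps the rest of the ClickHouse backend compiling when the MergeTree/Delta sources are left out of the build. A minimal caller sketch, assuming a hypothetical helper named chooseScanPath that is not part of this commit: callers only ever see the MergeTreeUtil facade, so they link against whichever implementation the active profile put on the source path.

import org.apache.gluten.utils.MergeTreeUtil
import org.apache.spark.sql.execution.datasources.HadoopFsRelation

object ScanPathExample {
  // Dispatch through the facade; with the dummy object this always falls back to "parquet".
  def chooseScanPath(relation: HadoopFsRelation): String =
    if (MergeTreeUtil.ifMergeTree(relation)) "mergetree" else "parquet"
}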
@@ -0,0 +1,63 @@
package org.apache.gluten.utils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, DeltaLogFileIndex}
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.util.collection.BitSet

object MergeTreeUtil {
  def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = {
    relation.location.isInstanceOf[TahoeFileIndex] &&
    relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]
  }

  def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
    scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
  }

  def ifMergeTree(relation: HadoopFsRelation): Boolean = {
    relation.location match {
      case _: TahoeFileIndex
          if relation.fileFormat
            .isInstanceOf[DeltaMergeTreeFileFormat] => true
      case _ => false
    }
  }

  def partsPartitions(relation: HadoopFsRelation,
                      selectedPartitions: Array[PartitionDirectory],
                      output: Seq[Attribute],
                      bucketedScan: Boolean,
                      optionalBucketSet: Option[BitSet],
                      optionalNumCoalescedBuckets: Option[Int],
                      disableBucketedScan: Boolean,
                      filterExprs: Seq[Expression]): Seq[InputPartition] = {
    ClickHouseTableV2
      .partsPartitions(
        relation.location.asInstanceOf[TahoeFileIndex].deltaLog,
        relation,
        selectedPartitions,
        output,
        bucketedScan,
        optionalBucketSet,
        optionalNumCoalescedBuckets,
        disableBucketedScan,
        filterExprs
      )
  }

  def injectMergeTreeWriter(): Unit = {
    GlutenMergeTreeWriterInjects.setInstance(new CHMergeTreeWriterInjects())
  }

  def cleanup(): Unit = {
    ClickhouseSnapshot.clearAllFileStatusCache
    DeltaLog.clearCache()
  }
}
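This is the profile-selected counterpart of the dummy object: same package and the same signatures, but backed by the real Delta/ClickHouse classes. Since the build-helper plugin adds only one of the two source roots (src/main/dummy or src/main/mergetree), exactly one MergeTreeUtil ends up on the compile classpath and callers need no runtime switch. injectMergeTreeWriter() registers the ClickHouse MergeTree writer and cleanup() drops the ClickhouseSnapshot and DeltaLog caches; the call sites of cleanup() are not shown in this diff.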
@@ -34,10 +34,10 @@ import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.clickhouse.MergeTreePartFilterReturnedRange
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.util.collection.BitSet

@@ -20,19 +20,19 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.expression.{ExpressionBuilder, StringMapNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, GlutenFormatWriterInjectsBase, OutputWriter}
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, GlutenFormatWriterInjectsBase, OutputWriter}
import org.apache.spark.sql.types.StructType

import com.google.common.collect.Lists
@@ -19,16 +19,16 @@ package org.apache.spark.sql.execution.datasources.v1.clickhouse
import org.apache.gluten.execution.datasource.GlutenRowSplitter

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
@@ -16,24 +16,24 @@
*/
package org.apache.spark.sql.execution.datasources.v1.clickhouse

import org.apache.spark.{SparkException, TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.delta.constraints.Constraint
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FileFormatWriter.{ConcurrentOutputWriterSpec, OutputSpec, processStats}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.FileFormatWriter.{processStats, ConcurrentOutputWriterSpec, OutputSpec}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{SparkException, TaskContext, TaskOutputFileAlreadyExistException}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
@@ -20,8 +20,8 @@ import org.apache.gluten.vectorized.CHColumnVector

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, FakeRow, OutputWriter}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddFileTags
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, FakeRow, OutputWriter}
import org.apache.spark.util.Utils

import scala.collection.mutable.ArrayBuffer
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.utils.MergeTreeUtil
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, JniLibLoader}

import org.apache.spark.{SparkConf, SparkContext}
@@ -104,7 +105,7 @@ class CHListenerApi extends ListenerApi with Logging {
    // FIXME: The following set instances twice in local mode?
    GlutenParquetWriterInjects.setInstance(new CHParquetWriterInjects())
    GlutenOrcWriterInjects.setInstance(new CHOrcWriterInjects())
    GlutenMergeTreeWriterInjects.setInstance(new CHMergeTreeWriterInjects())
    MergeTreeUtil.injectMergeTreeWriter()
    GlutenRowSplitter.setInstance(new CHRowSplitter())
  }

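Here the direct GlutenMergeTreeWriterInjects.setInstance call is replaced by the facade's injectMergeTreeWriter(), so in the dummy build the injection becomes a no-op instead of referencing CHMergeTreeWriterInjects directly.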
@@ -27,7 +27,7 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverri
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.utils.{CHJoinValidateUtil, MergeTreeUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.{ShuffleDependency, SparkException}
Expand All @@ -44,12 +44,10 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -138,8 +136,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
      child: SparkPlan): FilterExecTransformerBase = {

    def checkMergeTreeFileFormat(relation: HadoopFsRelation): Boolean = {
      relation.location.isInstanceOf[TahoeFileIndex] &&
        relation.fileFormat.isInstanceOf[DeltaMergeTreeFileFormat]
      MergeTreeUtil.checkMergeTreeFileFormat(relation)
    }

    child match {
@@ -20,18 +20,15 @@ import org.apache.gluten.backendsapi.TransformerApi
import org.apache.gluten.execution.CHHashAggregateExecTransformer
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil, MergeTreeUtil}

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet

@@ -53,34 +50,28 @@ class CHTransformerApi extends TransformerApi with Logging {
      optionalNumCoalescedBuckets: Option[Int],
      disableBucketedScan: Boolean,
      filterExprs: Seq[Expression]): Seq[InputPartition] = {
    relation.location match {
      case index: TahoeFileIndex
          if relation.fileFormat
            .isInstanceOf[DeltaMergeTreeFileFormat] =>
        // Generate NativeMergeTreePartition for MergeTree
        ClickHouseTableV2
          .partsPartitions(
            index.deltaLog,
            relation,
            selectedPartitions,
            output,
            bucketedScan,
            optionalBucketSet,
            optionalNumCoalescedBuckets,
            disableBucketedScan,
            filterExprs
          )
      case _ =>
        // Generate FilePartition for Parquet
        CHInputPartitionsUtil(
          relation,
          requiredSchema,
          selectedPartitions,
          output,
          bucketedScan,
          optionalBucketSet,
          optionalNumCoalescedBuckets,
          disableBucketedScan).genInputPartitionSeq()

    if (MergeTreeUtil.ifMergeTree(relation)) {
      MergeTreeUtil.partsPartitions(
        relation,
        selectedPartitions,
        output,
        bucketedScan,
        optionalBucketSet,
        optionalNumCoalescedBuckets,
        disableBucketedScan,
        filterExprs)
    } else {
      // Generate FilePartition for Parquet
      CHInputPartitionsUtil(
        relation,
        requiredSchema,
        selectedPartitions,
        output,
        bucketedScan,
        optionalBucketSet,
        optionalNumCoalescedBuckets,
        disableBucketedScan).genInputPartitionSeq()
    }
  }

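With this change CHTransformerApi no longer imports TahoeFileIndex, ClickHouseTableV2 or DeltaMergeTreeFileFormat directly; the scan-partition dispatch goes through MergeTreeUtil.ifMergeTree and MergeTreeUtil.partsPartitions, so under the dummy profile the method always takes the CHInputPartitionsUtil (Parquet) path without pulling any Delta classes onto the compile path.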
@@ -22,15 +22,14 @@ import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.SelectionNode
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.utils.CHExpressionUtil
import org.apache.gluten.utils.{CHExpressionUtil, MergeTreeUtil}
import org.apache.gluten.validate.NativePlanValidationInfo
import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.delta.DeltaLogFileIndex
import org.apache.spark.sql.execution.{CommandResultExec, FileSourceScanExec, RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
@@ -79,7 +78,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
// df.summary().show(100, false)

    def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
      scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
      MergeTreeUtil.includedDeltaOperator(scanExec)
    }

val includedUnsupportedPlans = collect(plan) {