
Merge branch 'main' into GPIP
yikf authored Dec 4, 2024
2 parents b220e37 + d7331be commit 59a5af5
Showing 63 changed files with 1,585 additions and 280 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/velox_weekly.yml
@@ -38,7 +38,6 @@ jobs:
runs-on: ubuntu-20.04
container: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
@@ -72,7 +71,7 @@ jobs:
mv apache-maven-3.8.8 /usr/lib/maven && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
cd $GITHUB_WORKSPACE/ && \
git clone -b main --depth=1 https://github.com/apache/incubator-gluten.git && cd incubator-gluten/
./dev/package.sh
build-on-ubuntu:
@@ -108,13 +108,15 @@ case class FileDeltaColumnarWrite(
* {{{
* part-00000-7d672b28-c079-4b00-bb0a-196c15112918-c000.snappy.parquet
* =>
* part-00000-{}.snappy.parquet
* part-00000-{id}.snappy.parquet
* }}}
*/
val guidPattern =
""".*-([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})(?:-c(\d+)\..*)?$""".r
val fileNamePattern =
guidPattern.replaceAllIn(writeFileName, m => writeFileName.replace(m.group(1), "{}"))
guidPattern.replaceAllIn(
writeFileName,
m => writeFileName.replace(m.group(1), FileNamePlaceHolder.ID))

logDebug(s"Native staging write path: $writePath and with pattern: $fileNamePattern")
val settings =
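As context for the hunk above, here is a standalone sketch (not part of the commit; the file name is taken from the scaladoc example) showing how the GUID regex isolates the id that gets swapped for the "{id}" placeholder. Note that only the UUID is replaced; any task-counter suffix such as "-c000" is kept in the resulting pattern.

```scala
import scala.util.matching.Regex

object GuidPatternDemo extends App {
  // Same regex as in FileDeltaColumnarWrite: a UUID, optionally followed by "-c<NNN>.<ext>".
  val guidPattern: Regex =
    """.*-([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})(?:-c(\d+)\..*)?$""".r

  val writeFileName = "part-00000-7d672b28-c079-4b00-bb0a-196c15112918-c000.snappy.parquet"

  // group(1) is the UUID; replacing it with the "{id}" placeholder yields the
  // file-name pattern that is later handed to the native writer.
  val fileNamePattern =
    guidPattern.replaceAllIn(writeFileName, m => writeFileName.replace(m.group(1), "{id}"))

  println(fileNamePattern) // part-00000-{id}-c000.snappy.parquet
}
```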
@@ -246,20 +246,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

def validateBucketSpec(): Option[String] = {
if (bucketSpec.nonEmpty) {
Some("Unsupported native write: bucket write is not supported.")
} else {
None
}
}

validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDateTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
.orElse(validateWriteFilesOptions()) match {
case Some(reason) => ValidationResult.failed(reason)
case _ => ValidationResult.succeeded
}
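Side note (not from the commit): the validation above relies on each validator returning Option[String], with Some(reason) on failure, so the orElse chain surfaces the first failure and falls through to ValidationResult.succeeded when every validator returns None. A minimal sketch with made-up validator names:

```scala
// Option-based validation chain: the first Some(reason) wins, None means success.
def validateCodec(): Option[String] = None                        // passes
def validateFormat(): Option[String] = Some("unsupported format") // fails with a reason

val failure: Option[String] = validateCodec().orElse(validateFormat())
assert(failure.contains("unsupported format"))
```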
@@ -26,7 +26,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, NativeExpressionEvaluator}
import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator}

import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
@@ -322,17 +322,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}

/**
* This function used to inject the staging write path before initializing the native plan.Only
* used in a pipeline model (spark 3.5) for writing parquet or orc files.
*/
override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
val settings =
Map(
RuntimeSettings.TASK_WRITE_TMP_DIR.key -> path,
RuntimeSettings.TASK_WRITE_FILENAME.key -> fileName)
NativeExpressionEvaluator.updateQueryRuntimeSettings(settings)
}
}

class CollectMetricIterator(
@@ -450,6 +450,14 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
s"SampleTransformer metrics update is not supported in CH backend")
}

override def genUnionTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
throw new UnsupportedOperationException(
"UnionExecTransformer metrics update is not supported in CH backend")

override def genUnionTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
throw new UnsupportedOperationException(
"UnionExecTransformer metrics update is not supported in CH backend")

def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
@@ -79,6 +79,7 @@ object CHRuleApi {
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
injector.injectPreTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
injector.injectPreTransform(_ => WriteFilesWithBucketValue)

// Legacy: The legacy transform rule.
val validatorBuilder: GlutenConfig => Validator = conf =>
@@ -35,12 +35,6 @@ object RuntimeSettings {
.stringConf
.createWithDefault("")

val TASK_WRITE_FILENAME =
buildConf(runtimeSettings("gluten.task_write_filename"))
.doc("The temporary file name for writing data")
.stringConf
.createWithDefault("")

val TASK_WRITE_FILENAME_PATTERN =
buildConf(runtimeSettings("gluten.task_write_filename_pattern"))
.doc("The pattern to generate file name for writing delta parquet in spark 3.5")
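For orientation, with TASK_WRITE_FILENAME gone, the remaining knob is TASK_WRITE_FILENAME_PATTERN, whose value is a template with placeholders that the native writer fills in per file. The values below are hypothetical; only the placeholder shapes come from the commit:

```scala
// Hypothetical pattern values; real ones are built by FileDeltaColumnarWrite and
// HadoopMapReduceAdapter at runtime.
val deltaPattern    = "part-00000-{id}-c000.snappy.parquet"                 // "{id}" -> a fresh UUID
val bucketedPattern = "part-00000-<task uuid>_{bucket}-c000.snappy.parquet" // "{bucket}" -> bucket id
```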
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig

import org.apache.spark.sql.catalyst.expressions.{Alias, BitwiseAnd, Expression, HiveHash, Literal, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.WriteFilesExec

/**
* Wrap with bucket value to specify the bucket file name in native write. Native writer will remove
* this value in the final output.
*/
object WriteFilesWithBucketValue extends Rule[SparkPlan] {

val optionForHiveCompatibleBucketWrite = "__hive_compatible_bucketed_table_insertion__"

override def apply(plan: SparkPlan): SparkPlan = {
if (
GlutenConfig.getConf.enableGluten
&& GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
) {
plan.transformDown {
case writeFiles: WriteFilesExec if writeFiles.bucketSpec.isDefined =>
val bucketIdExp = getWriterBucketIdExp(writeFiles)
val wrapBucketValue = ProjectExec(
writeFiles.child.output :+ Alias(bucketIdExp, "__bucket_value__")(),
writeFiles.child)
writeFiles.copy(child = wrapBucketValue)
}
} else {
plan
}
}

private def getWriterBucketIdExp(writeFilesExec: WriteFilesExec): Expression = {
val partitionColumns = writeFilesExec.partitionColumns
val outputColumns = writeFilesExec.child.output
val dataColumns = outputColumns.filterNot(partitionColumns.contains)
val bucketSpec = writeFilesExec.bucketSpec.get
val bucketColumns = bucketSpec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
if (writeFilesExec.options.getOrElse(optionForHiveCompatibleBucketWrite, "false") == "true") {
val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
Pmod(hashId, Literal(bucketSpec.numBuckets))
// The bucket file name prefix is following Hive, Presto and Trino conversion, so this
// makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
//
// Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
// Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.

} else {
// Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
// expression, so that we can guarantee the data distribution is same between shuffle and
// bucketed data source, which enables us to only shuffle one side when join a bucketed
// table and a normal one.
HashPartitioning(bucketColumns, bucketSpec.numBuckets).partitionIdExpression
}
}
}
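For readers unfamiliar with the two bucket layouts, here is a standalone sketch (not part of the commit; the column value and bucket count are made up) of the two bucket-id expressions the rule chooses between:

```scala
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, HiveHash, Literal, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

object BucketIdDemo extends App {
  val bucketCols = Seq(Literal("some_user_key")) // hypothetical bucket column value
  val numBuckets = 8

  // Hive-compatible layout: pmod(hivehash(cols) & Int.MaxValue, numBuckets), matching
  // the bucket-file naming that Hive, Presto and Trino expect.
  val hiveBucketId =
    Pmod(BitwiseAnd(HiveHash(bucketCols), Literal(Int.MaxValue)), Literal(numBuckets))

  // Spark layout: the expression HashPartitioning uses for shuffle partition ids,
  // i.e. pmod(murmur3hash(cols), numBuckets), keeping shuffles and bucketed reads aligned.
  val sparkBucketId = HashPartitioning(bucketCols, numBuckets).partitionIdExpression

  // Both evaluate to an integer in [0, numBuckets).
  println(s"hive bucket id = ${hiveBucketId.eval()}, spark bucket id = ${sparkBucketId.eval()}")
}
```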
@@ -16,7 +16,8 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.vectorized.NativeExpressionEvaluator

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
@@ -25,11 +26,11 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, BasicWriteTaskStats, ExecutedWriteSummary, PartitioningUtils, WriteJobDescription, WriteTaskResult, WriteTaskStatsTracker}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.util.Utils

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

@@ -102,6 +103,12 @@ object CreateFileNameSpec {
}
}

// More details in local_engine::FileNameGenerator in NormalFileWriter.cpp
object FileNamePlaceHolder {
val ID = "{id}"
val BUCKET = "{bucket}"
}

/** [[HadoopMapReduceAdapter]] for [[HadoopMapReduceCommitProtocol]]. */
case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol) {
private lazy val committer: OutputCommitter = {
@@ -132,12 +139,26 @@ case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol)
GetFilename.invoke(sparkCommitter, taskContext, spec).asInstanceOf[String]
}

def getTaskAttemptTempPathAndFilename(
def getTaskAttemptTempPathAndFilePattern(
taskContext: TaskAttemptContext,
description: WriteJobDescription): (String, String) = {
val stageDir = newTaskAttemptTempPath(description.path)
val filename = getFilename(taskContext, CreateFileNameSpec(taskContext, description))
(stageDir, filename)

if (isBucketWrite(description)) {
val filePart = getFilename(taskContext, FileNameSpec("", ""))
val fileSuffix = CreateFileNameSpec(taskContext, description).suffix
(stageDir, s"${filePart}_${FileNamePlaceHolder.BUCKET}$fileSuffix")
} else {
val filename = getFilename(taskContext, CreateFileNameSpec(taskContext, description))
(stageDir, filename)
}
}

private def isBucketWrite(desc: WriteJobDescription): Boolean = {
// In Spark 3.2, bucketSpec is not defined, instead, it uses bucketIdExpression.
val bucketSpecField: Field = desc.getClass.getDeclaredField("bucketSpec")
bucketSpecField.setAccessible(true)
bucketSpecField.get(desc).asInstanceOf[Option[_]].isDefined
}
}
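To make the bucketed branch of getTaskAttemptTempPathAndFilePattern concrete, a small sketch of the pattern it builds; the two inputs are hypothetical (the real prefix comes from the committer, the suffix from CreateFileNameSpec):

```scala
// Hypothetical inputs for illustration only.
val filePart   = "part-00000-7d672b28-c079-4b00-bb0a-196c15112918" // getFilename(ctx, FileNameSpec("", ""))
val fileSuffix = "-c000.snappy.parquet"                            // CreateFileNameSpec(ctx, desc).suffix
val bucket     = "{bucket}"                                        // FileNamePlaceHolder.BUCKET

val bucketedPattern = s"${filePart}_$bucket$fileSuffix"
// => part-00000-7d672b28-c079-4b00-bb0a-196c15112918_{bucket}-c000.snappy.parquet
// The native writer (see local_engine::FileNameGenerator in NormalFileWriter.cpp) fills in
// the real bucket id per file; non-bucketed writes still pass a fully resolved file name.
```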

@@ -234,10 +255,15 @@ case class HadoopMapReduceCommitProtocolWrite(
* initializing the native plan and collect native write files metrics for each backend.
*/
override def doSetupNativeTask(): Unit = {
val (writePath, writeFileName) =
adapter.getTaskAttemptTempPathAndFilename(taskAttemptContext, description)
logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
val (writePath, writeFilePattern) =
adapter.getTaskAttemptTempPathAndFilePattern(taskAttemptContext, description)
logDebug(s"Native staging write path: $writePath and file pattern: $writeFilePattern")

val settings =
Map(
RuntimeSettings.TASK_WRITE_TMP_DIR.key -> writePath,
RuntimeSettings.TASK_WRITE_FILENAME_PATTERN.key -> writeFilePattern)
NativeExpressionEvaluator.updateQueryRuntimeSettings(settings)
}

def doCollectNativeResult(stats: Seq[InternalRow]): Option[WriteTaskResult] = {
@@ -42,14 +42,14 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
}
val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")

val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME"
val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
val BUCKET_NAME: String = SPARK_DIR_NAME
val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"

val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME"
val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
@@ -553,7 +553,7 @@ class GlutenClickHouseNativeWriteTableSuite
// spark write does not support bucketed table
// https://issues.apache.org/jira/browse/SPARK-19256
val table_name = table_name_template.format(format)
writeAndCheckRead(origin_table, table_name, fields_.keys.toSeq, isSparkVersionLE("3.3")) {
writeAndCheckRead(origin_table, table_name, fields_.keys.toSeq) {
fields =>
spark
.table("origin_table")
@@ -589,16 +589,17 @@ class GlutenClickHouseNativeWriteTableSuite
("byte_field", "byte"),
("boolean_field", "boolean"),
("decimal_field", "decimal(23,12)"),
("date_field", "date"),
("timestamp_field", "timestamp")
("date_field", "date")
// ("timestamp_field", "timestamp")
// FIXME https://github.com/apache/incubator-gluten/issues/8053
)
val origin_table = "origin_table"
withSource(genTestData(), origin_table) {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
val testFields = fields.keys.toSeq
writeAndCheckRead(origin_table, table_name, testFields, isSparkVersionLE("3.3")) {
writeAndCheckRead(origin_table, table_name, testFields) {
fields =>
spark
.table(origin_table)
@@ -658,7 +659,7 @@ class GlutenClickHouseNativeWriteTableSuite
nativeWrite {
format =>
val table_name = table_name_template.format(format)
writeAndCheckRead(origin_table, table_name, fields.keys.toSeq, isSparkVersionLE("3.3")) {
writeAndCheckRead(origin_table, table_name, fields.keys.toSeq) {
fields =>
spark
.table("origin_table")
@@ -762,7 +763,7 @@ class GlutenClickHouseNativeWriteTableSuite
format =>
val table_name = table_name_template.format(format)
spark.sql(s"drop table IF EXISTS $table_name")
withNativeWriteCheck(checkNative = isSparkVersionLE("3.3")) {
withNativeWriteCheck(checkNative = true) {
spark
.range(10000000)
.selectExpr("id", "cast('2020-01-01' as date) as p")
@@ -798,7 +799,7 @@ class GlutenClickHouseNativeWriteTableSuite
format =>
val table_name = table_name_template.format(format)
spark.sql(s"drop table IF EXISTS $table_name")
withNativeWriteCheck(checkNative = isSparkVersionLE("3.3")) {
withNativeWriteCheck(checkNative = true) {
spark
.range(30000)
.selectExpr("id", "cast(null as string) as p")
