[GLUTEN-6067][CH] Support Spark3.5 with Scala2.13 for CH backend #6311

Merged: 1 commit, Jul 3, 2024
6 changes: 3 additions & 3 deletions backends-clickhouse/pom.xml
@@ -100,7 +100,7 @@
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
-<version>1.13.5</version>
+<version>1.17.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -126,13 +126,13 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-mockito_2.12</artifactId>
+<artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
<version>1.0.0-M2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-scalacheck_2.12</artifactId>
+<artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
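Both fixes in this pom serve cross-building: a hard-coded `_2.12` artifact suffix pins a test dependency to Scala 2.12, while `${scala.binary.version}` resolves to whichever Scala binary version the build selects. The ScalaCheck bump matters for the same reason: 1.13.5 predates Scala 2.13 and has no 2.13 artifact, whereas 1.17.0 publishes both.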
@@ -50,7 +50,7 @@ class ClickhouseOptimisticTransaction(
def this(
deltaLog: DeltaLog,
catalogTable: Option[CatalogTable],
-snapshotOpt: Option[Snapshot] = None) {
+snapshotOpt: Option[Snapshot] = None) = {
this(
deltaLog,
catalogTable,
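The added `=` is the Scala 2.13 story behind several hunks in this PR: procedure syntax (a method or auxiliary-constructor body with no `=`) is deprecated in 2.13, and the explicit form compiles identically under 2.12. The same rule drives the `: Unit =` changes in UDFMappings and ShuffledColumnarBatchRDD and the explicit `: Unit` on the abstract `inject` declaration further down. A minimal sketch, with illustrative names not taken from the PR:

class Table(val name: String) {
  // Procedure-style auxiliary constructor: deprecated in Scala 2.13.
  //   def this(name: String, owner: String) { this(name + "@" + owner) }
  // Equivalent form with an explicit `=`, accepted by 2.12 and 2.13 alike:
  def this(name: String, owner: String) = {
    this(name + "@" + owner)
  }
}

object Table {
  def main(args: Array[String]): Unit = {
    println(new Table("t1", "alice").name) // prints t1@alice
  }
}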
@@ -33,7 +33,6 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.StateCache
-import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql._
@@ -126,7 +125,27 @@ class Snapshot(
* This potentially triggers an IO operation to read the inCommitTimestamp.
* This is a lazy val, so repeated calls will not trigger multiple IO operations.
*/
-protected lazy val getInCommitTimestampOpt: Option[Long] =
+protected lazy val getInCommitTimestampOpt: Option[Long] = {
+// --- modified start
+// Backport for Scala 2.12: copied from the Scala 2.13 Option companion, so
+// that the Option.when call below resolves on both binary versions.
+implicit class OptionExtCompanion(opt: Option.type) {
+  /**
+   * When a given condition is true, evaluates the `a` argument and returns Some(a).
+   * When the condition is false, `a` is not evaluated and None is returned.
+   */
+  def when[A](cond: Boolean)(a: => A): Option[A] = if (cond) Some(a) else None
+
+  /**
+   * When a given condition is false, evaluates the `a` argument and returns Some(a).
+   * When the condition is true, `a` is not evaluated and None is returned.
+   */
+  def whenNot[A](cond: Boolean)(a: => A): Option[A] = if (!cond) Some(a) else None
+
+  /** Sum up all the `options`, substituting `default` for each `None`. */
+  def sum[N: Numeric](default: N)(options: Option[N]*): N =
+    options.map(_.getOrElse(default)).sum
+}
+// --- modified end
Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
_reconstructedProtocolMetadataAndICT.inCommitTimestamp
.getOrElse {
@@ -158,6 +177,7 @@
}
}
}
+}


private[delta] lazy val nonFileActions: Seq[Action] = {
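The backport above works through implicit-resolution precedence: on Scala 2.13, `Option.when` is a real member of the `Option` companion and wins outright; on 2.12 the companion has no such member, so the compiler falls back to the `OptionExtCompanion` extension. A self-contained sketch of the same pattern, assuming only the standard library (the helper mirrors the PR's; the surrounding object is illustrative):

object OptionCompat {
  // On Scala 2.12 this extension supplies Option.when; on 2.13 the real
  // companion method takes precedence and this conversion is never used.
  implicit class OptionExtCompanion(opt: Option.type) {
    def when[A](cond: Boolean)(a: => A): Option[A] =
      if (cond) Some(a) else None
  }

  def main(args: Array[String]): Unit = {
    def expensive(): Int = { println("evaluated"); 42 }
    // The by-name argument is only evaluated when the condition holds.
    println(Option.when(false)(expensive())) // None; nothing printed
    println(Option.when(true)(1))            // Some(1)
  }
}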
@@ -712,7 +712,7 @@ trait VacuumCommandImpl extends DeltaCommand {
// This is never going to be a path relative to `basePath` for DVs.
None
}
-case None => None
+case _ => None
}
}
}
@@ -55,7 +55,7 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
-partitionColumns: Seq[String]) {
+partitionColumns: Seq[String]) = {
this(protocol, metadata)
this.database = database
this.tableName = tableName
@@ -58,7 +58,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}
dataSchema += newField
}
-StructType(dataSchema)
+StructType(dataSchema.toSeq)
}

private def createNativeIterator(
@@ -114,7 +114,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
if (scan.fileFormat == ReadFileFormat.TextReadFormat) {
val names =
ConverterUtils.collectAttributeNamesWithoutExprId(scan.outputAttributes())
-localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala))
+localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala.toSeq))
}
}

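The `.toSeq` calls sprinkled through this PR share one cause: in Scala 2.13, `scala.Seq` is an alias for `scala.collection.immutable.Seq`, so the `mutable.Buffer` returned by `asScala` (and mutable builders such as `ArrayBuffer`) no longer conform to a declared `Seq` type; an explicit `.toSeq` copies into an immutable sequence and still compiles on 2.12. A small sketch of the failure mode, with illustrative names, using the same `JavaConverters` import style as the PR:

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object SeqCompat {
  // The result type Seq[String] means immutable.Seq on 2.13, so returning
  // the Buffer from asScala directly is a type error there; .toSeq fixes it.
  def fromJava(names: java.util.List[String]): Seq[String] =
    names.asScala.toSeq

  // Same story when a Seq is accumulated through a mutable builder.
  def build(n: Int): Seq[Int] = {
    val buf = ArrayBuffer[Int]()
    (1 to n).foreach(buf += _)
    buf.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(fromJava(java.util.Arrays.asList("a", "b")))
    println(build(3))
  }
}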
@@ -177,10 +177,12 @@ object MetricsUtil extends Logging {

/** Get all processors */
def getAllProcessorList(metricData: MetricsData): Seq[MetricsProcessor] = {
-metricData.steps.asScala.flatMap(
-  step => {
-    step.processors.asScala
-  })
+metricData.steps.asScala
+  .flatMap(
+    step => {
+      step.processors.asScala
+    })
+  .toSeq
}

/** Update extra time metric by the processors */
@@ -127,7 +127,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
sparkSession
)
}
-partitions
+partitions.toSeq
}

def genInputPartitionSeq(
@@ -117,10 +117,12 @@ abstract class MergeTreeFileFormatDataWriter(
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
// committer.commitTask(taskAttemptContext)
-val statuses = returnedMetrics.map(
-  v => {
-    v._2
-  })
+val statuses = returnedMetrics
+  .map(
+    v => {
+      v._2
+    })
+  .toSeq
new TaskCommitMessage(statuses)
}

@@ -24,6 +24,7 @@ import org.apache.spark.sql.types.DoubleType
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.immutable.ParVector

class GlutenClickHouseTPCHParquetAQEConcurrentSuite
extends GlutenClickHouseTPCHAbstractSuite
@@ -74,7 +75,7 @@ class GlutenClickHouseTPCHParquetAQEConcurrentSuite

test("fix race condition at the global variable of ColumnarOverrideRules::isAdaptiveContext") {

-val queries = ((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22)).par
+val queries = ParVector((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22): _*)
queries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(22))
queries.map(queryId => runTPCHQuery(queryId) { df => })

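The `.par` rewrite has the same cross-version motive: since 2.13, parallel collections live in the separate `org.scala-lang.modules:scala-parallel-collections` module and the `.par` extension needs an extra `CollectionConverters` import, so constructing a `ParVector` directly compiles against both 2.12 and 2.13. A sketch of the pattern, assuming the parallel-collections module is on the 2.13 classpath:

import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParDemo {
  def main(args: Array[String]): Unit = {
    // Explicit ParVector instead of ((1 to 4) ++ (1 to 4)).par.
    val queries = ParVector((1 to 4) ++ (1 to 4): _*)
    // Cap parallelism, as the test does with ForkJoinPool(22).
    queries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
    println(queries.map(q => q * q).sum) // maps over the elements in parallel
  }
}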
6 changes: 3 additions & 3 deletions backends-velox/pom.xml
@@ -87,7 +87,7 @@
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
-<version>1.13.5</version>
+<version>1.17.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -113,13 +113,13 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-mockito_2.12</artifactId>
+<artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
<version>1.0.0-M2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-scalacheck_2.12</artifactId>
+<artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
4 changes: 2 additions & 2 deletions gluten-celeborn/clickhouse/pom.xml
@@ -127,7 +127,7 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-mockito_2.12</artifactId>
+<artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
<version>1.0.0-M2</version>
<scope>test</scope>
</dependency>
@@ -138,7 +138,7 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-scalacheck_2.12</artifactId>
+<artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
6 changes: 3 additions & 3 deletions gluten-core/pom.xml
@@ -84,7 +84,7 @@
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
-<version>1.13.5</version>
+<version>1.17.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -111,13 +111,13 @@
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-mockito_2.12</artifactId>
+<artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
<version>1.0.0-M2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
-<artifactId>scalatestplus-scalacheck_2.12</artifactId>
+<artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
@@ -300,7 +300,7 @@ private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions =>
}

private[gluten] trait GlutenSparkExtensionsInjector {
-def inject(extensions: SparkSessionExtensions)
+def inject(extensions: SparkSessionExtensions): Unit
}

private[gluten] object GlutenPlugin {
@@ -265,7 +265,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
}

transformChildren(child, basicScanExecTransformers)
-basicScanExecTransformers
+basicScanExecTransformers.toSeq
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -73,7 +73,7 @@ object ConverterUtils extends Logging {
}

def collectAttributeTypeNodes(attributes: JList[Attribute]): JList[TypeNode] = {
-collectAttributeTypeNodes(attributes.asScala)
+collectAttributeTypeNodes(attributes.asScala.toSeq)
}

def collectAttributeTypeNodes(attributes: Seq[Attribute]): JList[TypeNode] = {
@@ -85,7 +85,7 @@
}

def collectAttributeNamesWithExprId(attributes: JList[Attribute]): JList[String] = {
-collectAttributeNamesWithExprId(attributes.asScala)
+collectAttributeNamesWithExprId(attributes.asScala.toSeq)
}

def collectAttributeNamesWithExprId(attributes: Seq[Attribute]): JList[String] = {
@@ -197,7 +197,7 @@ object ConverterUtils extends Logging {
val (field, nullable) = parseFromSubstraitType(typ)
StructField("", field, nullable)
}
-(StructType(fields), isNullable(substraitType.getStruct.getNullability))
+(StructType(fields.toSeq), isNullable(substraitType.getStruct.getNullability))
case Type.KindCase.LIST =>
val list = substraitType.getList
val (elementType, containsNull) = parseFromSubstraitType(list.getType)
@@ -32,7 +32,7 @@ object UDFMappings extends Logging {
val pythonUDFMap: Map[String, String] = Map()
val scalaUDFMap: Map[String, String] = Map()

-private def appendKVToMap(key: String, value: String, res: Map[String, String]) {
+private def appendKVToMap(key: String, value: String, res: Map[String, String]): Unit = {
if (key.isEmpty || value.isEmpty()) {
throw new IllegalArgumentException(s"key:$key or value:$value is empty")
}
@@ -46,7 +46,7 @@
res.put(key.toLowerCase(Locale.ROOT), value)
}

-private def parseStringToMap(input: String, res: Map[String, String]) {
+private def parseStringToMap(input: String, res: Map[String, String]): Unit = {
input.split(",").map {
item =>
val keyValue = item.split(":")
@@ -57,7 +57,7 @@
}
}

-def loadFromSparkConf(conf: SparkConf) {
+def loadFromSparkConf(conf: SparkConf): Unit = {
val strHiveUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_HIVE_UDFS, "")
if (!StringUtils.isBlank(strHiveUDFs)) {
parseStringToMap(strHiveUDFs, hiveUDFMap)
@@ -45,7 +45,12 @@ class EnumeratedApplier(session: SparkSession)
with Logging
with LogLevelUtil {
// An empirical value.
-private val aqeStackTraceIndex = 16
+private val aqeStackTraceIndex =
+  if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
+    16
+  } else {
+    14
+  }
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
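The empirical index passed to `AdaptiveContext` counts stack frames between this code and Spark's AQE entry point, and the closure encoding that produces those frames differs between Scala 2.12 and 2.13, hence the runtime switch on `scala.util.Properties.releaseVersion` (an `Option[String]` holding the standard library's version). A minimal sketch of the dispatch (the indices are the PR's; the surrounding object is illustrative):

object AqeIndex {
  // Pick the empirical stack-trace offset per Scala binary version,
  // mirroring EnumeratedApplier above (16 for 2.12, 14 otherwise).
  val aqeStackTraceIndex: Int =
    if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) 16
    else 14

  def main(args: Array[String]): Unit = {
    println(s"Scala ${scala.util.Properties.versionNumberString}: index $aqeStackTraceIndex")
  }
}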
@@ -40,7 +40,12 @@ class HeuristicApplier(session: SparkSession)
with Logging
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
-private val aqeStackTraceIndex = 19
+private val aqeStackTraceIndex =
+  if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
+    19
+  } else {
+    17
+  }
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
@@ -97,7 +97,7 @@ object Validators {
if (buffer.isEmpty) {
NoopValidator
} else {
-new ValidatorPipeline(buffer)
+new ValidatorPipeline(buffer.toSeq)
}
}
}
@@ -61,7 +61,7 @@ object GlutenOptimization {
GlutenMetadataModel(),
GlutenPropertyModel(),
GlutenExplain,
-RasRule.Factory.reuse(rules))
+RasRule.Factory.reuse(rules.toSeq))
}
}
}
@@ -263,7 +263,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
rand.shuffle(hosts)
logOnLevel(logLevel, s"get host for $f: ${hosts.distinct.mkString(",")}")
}
-hosts.distinct
+hosts.distinct.toSeq
}

def updatePartitionMap(f: FilePartition, rddId: Int): Unit = {
@@ -205,8 +205,8 @@ object GlutenImplicits {
FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
-totalPhysicalPlanDescription,
-totalFallbackNodeToReason
+totalPhysicalPlanDescription.toSeq,
+totalFallbackNodeToReason.toSeq
)
}

@@ -139,7 +139,7 @@ class ShuffledColumnarBatchRDD(
}
}

-override def clearDependencies() {
+override def clearDependencies(): Unit = {
super.clearDependencies()
dependency = null
}