Commit
Merge branch 'main' into editDistance
liuneng1994 authored Jun 20, 2024
2 parents 4dc210b + 79e1d58 commit f235d76
Showing 342 changed files with 36,958 additions and 30,824 deletions.
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/bug.yml
@@ -50,6 +50,8 @@ body:
options:
- Spark-3.2.x
- Spark-3.3.x
- Spark-3.4.x
- Spark-3.5.x
validations:
required: false

36 changes: 30 additions & 6 deletions .github/workflows/velox_docker.yml
@@ -87,7 +87,7 @@ jobs:
matrix:
os: [ "ubuntu:20.04", "ubuntu:22.04" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5" ]
java: [ "java-8", "java-17" ]
java: [ "java-8", "java-11", "java-17" ]
# Spark supports JDK17 since 3.3 and later, see https://issues.apache.org/jira/browse/SPARK-33772
exclude:
- spark: spark-3.2
@@ -96,8 +96,16 @@
java: java-17
- spark: spark-3.5
java: java-17
- spark: spark-3.2
java: java-11
- spark: spark-3.3
java: java-11
- spark: spark-3.4
java: java-11
- os: ubuntu:22.04
java: java-17
- os: ubuntu:22.04
java: java-11
runs-on: ubuntu-20.04
container: ${{ matrix.os }}
steps:
@@ -116,10 +124,13 @@ jobs:
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
apt-get update && apt-get install -y openjdk-17-jdk maven
apt remove openjdk-11* -y
elif [ "${{ matrix.java }}" = "java-11" ]; then
apt-get update && apt-get install -y openjdk-11-jdk maven
else
apt-get update && apt-get install -y openjdk-8-jdk maven
apt remove openjdk-11* -y
fi
apt remove openjdk-11* -y
ls -l /root/.m2/repository/org/apache/arrow/arrow-dataset/15.0.0-gluten/
- name: Build and run TPCH/DS
run: |
@@ -141,7 +152,7 @@ jobs:
matrix:
os: [ "centos:7", "centos:8" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5" ]
java: [ "java-8", "java-17" ]
java: [ "java-8", "java-11", "java-17" ]
# Spark supports JDK17 since 3.3 and later, see https://issues.apache.org/jira/browse/SPARK-33772
exclude:
- spark: spark-3.2
@@ -150,8 +161,16 @@
java: java-17
- spark: spark-3.5
java: java-17
- spark: spark-3.2
java: java-11
- spark: spark-3.3
java: java-11
- spark: spark-3.4
java: java-11
- os: centos:7
java: java-17
- os: centos:7
java: java-11
runs-on: ubuntu-20.04
container: ${{ matrix.os }}
steps:
@@ -175,6 +194,8 @@ jobs:
run: |
if [ "${{ matrix.java }}" = "java-17" ]; then
yum update -y && yum install -y java-17-openjdk-devel wget
elif [ "${{ matrix.java }}" = "java-11" ]; then
yum update -y && yum install -y java-11-openjdk-devel wget
else
yum update -y && yum install -y java-1.8.0-openjdk-devel wget
fi
@@ -186,6 +207,8 @@ jobs:
echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
if [ "${{ matrix.java }}" = "java-17" ]; then
echo "JAVA_HOME=/usr/lib/jvm/java-17-openjdk" >> $GITHUB_ENV
elif [ "${{ matrix.java }}" = "java-11" ]; then
echo "JAVA_HOME=/usr/lib/jvm/java-11-openjdk" >> $GITHUB_ENV
else
echo "JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk" >> $GITHUB_ENV
fi
@@ -367,7 +390,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries \
--local --preset=velox --benchmark-type=ds --error-on-memleak -s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen --random-kill-tasks
--skip-data-gen --random-kill-tasks --no-session-reuse
# run-tpc-test-ubuntu-sf30:
# needs: build-native-lib-centos-7
@@ -461,7 +484,7 @@ jobs:
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
export export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
cd /opt && \
git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
cd incubator-uniffle && \
@@ -510,13 +533,14 @@ jobs:
- name: Setup java and maven
run: |
apt-get update && apt-get install -y openjdk-8-jdk maven wget
apt remove openjdk-11* -y
echo "JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64" >> $GITHUB_ENV
- name: Build for Spark ${{ matrix.spark }}
run: |
cd $GITHUB_WORKSPACE/
$MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Pceleborn -DskipTests
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with ${{ matrix.celeborn }}
run: |
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
EXTRA_PROFILE=""
if [ "${{ matrix.celeborn }}" = "celeborn-0.4.0" ]; then
EXTRA_PROFILE="-Pceleborn-0.4"
@@ -26,7 +26,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, DenseRank, Lag, Lead, NamedExpression, Rank, RowNumber}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, DenseRank, Expression, Lag, Lead, Literal, NamedExpression, Rank, RowNumber}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
@@ -225,10 +225,25 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
func => {
val aliasExpr = func.asInstanceOf[Alias]
val wExpression = WindowFunctionsBuilder.extractWindowExpression(aliasExpr.child)

def checkLagOrLead(third: Expression): Unit = {
third match {
case _: Literal =>
allSupported = allSupported
case _ =>
logInfo("Not support lag/lead function with default value not literal null")
allSupported = false
break
}
}

wExpression.windowFunction match {
case _: RowNumber | _: AggregateExpression | _: Rank | _: Lead | _: Lag |
_: DenseRank =>
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank =>
allSupported = allSupported
case l: Lag =>
checkLagOrLead(l.third)
case l: Lead =>
checkLagOrLead(l.third)
case _ =>
logDebug(s"Not support window function: ${wExpression.getClass}")
allSupported = false
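
For illustration, a minimal sketch of the distinction `checkLagOrLead` draws (assuming a SparkSession `spark` with the Gluten ClickHouse backend enabled; the query shapes are hypothetical, not taken from this commit):

```scala
// Literal default value for lag(): passes the check above, so the window stays offloaded.
spark.sql(
  "SELECT id, lag(id, 1, 0) OVER (ORDER BY id) AS prev FROM range(10)").show()

// Non-literal default value: checkLagOrLead sets allSupported = false, so the
// window operator falls back to vanilla Spark execution.
spark.sql(
  "SELECT id, lag(id, 1, id + 1) OVER (ORDER BY id) AS prev FROM range(10)").show()
```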
@@ -25,10 +25,11 @@ import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}
import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}

import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -209,46 +210,26 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
val resIter =
val nativeIter =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
false)

val iter = new CollectMetricIterator(
nativeIter,
updateNativeMetrics,
updateInputMetrics,
context.taskMetrics().inputMetrics)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
resIter.cancel()
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => resIter.close())
val iter = new Iterator[Any] {
private val inputMetrics = context.taskMetrics().inputMetrics
private var outputRowCount = 0L
private var outputVectorCount = 0L
private var metricsUpdated = false

override def hasNext: Boolean = {
val res = resIter.hasNext
// avoid to collect native metrics more than once, 'hasNext' is a idempotent operation
if (!res && !metricsUpdated) {
val nativeMetrics = resIter.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
updateInputMetrics(inputMetrics)
metricsUpdated = true
}
res
}

override def next(): Any = {
val cb = resIter.next()
outputVectorCount += 1
outputRowCount += cb.numRows()
cb
}
}
context.addTaskCompletionListener[Unit](_ => iter.close())

// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(
@@ -288,51 +269,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
materializeInput
)

val resIter = new Iterator[ColumnarBatch] {
private var outputRowCount = 0L
private var outputVectorCount = 0L
private var metricsUpdated = false

override def hasNext: Boolean = {
val res = nativeIterator.hasNext
// avoid to collect native metrics more than once, 'hasNext' is a idempotent operation
if (!res && !metricsUpdated) {
val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
metricsUpdated = true
}
res
}

override def next(): ColumnarBatch = {
val cb = nativeIterator.next()
outputVectorCount += 1
outputRowCount += cb.numRows()
cb
}
}
var closed = false
val cancelled = false

def close(): Unit = {
closed = true
nativeIterator.close()
// relationHolder.clear()
}

def cancel(): Unit = {
nativeIterator.cancel()
}
val iter = new CollectMetricIterator(nativeIterator, updateNativeMetrics, null, null)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
cancel()
iter.cancel()
}
})
context.addTaskCompletionListener[Unit](_ => close())
new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
context.addTaskCompletionListener[Unit](_ => iter.close())
new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
}

@@ -346,3 +292,47 @@ object CHIteratorApi {
}
}
}

class CollectMetricIterator(
val nativeIterator: BatchIterator,
val updateNativeMetrics: IMetrics => Unit,
val updateInputMetrics: InputMetricsWrapper => Unit,
val inputMetrics: InputMetrics
) extends Iterator[ColumnarBatch] {
private var outputRowCount = 0L
private var outputVectorCount = 0L
private var metricsUpdated = false

override def hasNext: Boolean = {
nativeIterator.hasNext
}

override def next(): ColumnarBatch = {
val cb = nativeIterator.next()
outputVectorCount += 1
outputRowCount += cb.numRows()
cb
}

def close(): Unit = {
collectStageMetrics()
nativeIterator.close()
}

def cancel(): Unit = {
collectStageMetrics()
nativeIterator.cancel()
}

private def collectStageMetrics(): Unit = {
if (!metricsUpdated) {
val nativeMetrics = nativeIterator.getMetrics.asInstanceOf[NativeMetrics]
nativeMetrics.setFinalOutputMetrics(outputRowCount, outputVectorCount)
updateNativeMetrics(nativeMetrics)
if (updateInputMetrics != null) {
updateInputMetrics(inputMetrics)
}
metricsUpdated = true
}
}
}
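
For reference, a condensed sketch of how the new class is wired at the call sites shown earlier in this diff (the surrounding names such as `nativeIter`, `updateNativeMetrics`, `updateInputMetrics`, and `context` come from those call sites; treat this as illustrative rather than verbatim source):

```scala
// Illustrative wiring, condensed from the call sites above (not verbatim source).
val iter = new CollectMetricIterator(
  nativeIter,                             // BatchIterator returned by the native kernel
  updateNativeMetrics,                    // IMetrics => Unit callback
  updateInputMetrics,                     // InputMetricsWrapper => Unit, may be null
  context.taskMetrics().inputMetrics)     // Spark InputMetrics, may be null

// Cancel the native iterator if the task is interrupted, and always close it on
// completion; both paths flush the final metrics exactly once via collectStageMetrics().
context.addTaskFailureListener((ctx, _) => if (ctx.isInterrupted()) iter.cancel())
context.addTaskCompletionListener[Unit](_ => iter.close())
```

Compared with the two anonymous iterators it replaces, final metrics are now collected in `close()`/`cancel()` rather than on the last `hasNext()`, so they are reported even when the iterator is not fully consumed.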
@@ -210,7 +210,6 @@ object CHExpressionUtil {
TIMESTAMP_MILLIS -> DefaultValidator(),
TIMESTAMP_MICROS -> DefaultValidator(),
FLATTEN -> DefaultValidator(),
RINT -> DefaultValidator(),
STACK -> DefaultValidator()
)
}
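
With `RINT` dropped from the validator map above, the ClickHouse backend no longer rejects `rint` during expression validation. A minimal sketch of the kind of projection this affects (assuming a SparkSession `spark` with the Gluten ClickHouse backend enabled; the query is hypothetical):

```scala
// rint() rounds a double to the nearest integer value; with RINT removed from the
// map above, this projection is no longer forced to fall back to vanilla Spark.
spark.sql("SELECT rint(id / 3.0) AS r FROM range(10)").show()
```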
Binary file not shown.
@@ -443,6 +443,11 @@ class GlutenClickHouseDecimalSuite
)
}

test("Fix issue(6015) allow overflow when converting decimal to integer") {
val sql = "select int(cast(id * 9999999999 as decimal(29, 2))) from range(10)"
runQueryAndCompare(sql)(checkGlutenOperatorMatch[ProjectExecTransformer])
}

def testFromRandomBase(
sql: String,
customCheck: DataFrame => Unit,