From 3be5ae85de5ddc002dfeb8c014e4f9380c044674 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 7 Jun 2024 10:00:38 +0800
Subject: [PATCH] fixup

---
 .../backendsapi/velox/VeloxIteratorApi.scala  |  4 +-
 .../execution/VeloxAppendBatchesExec.scala    | 31 +++++++++++----
 .../org/apache/gluten/utils/Iterators.scala   | 38 ++++++++++++++++---
 3 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 5f9b5afa9976d..ca63fe4158e85 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -184,7 +184,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         resIter.close()
       }
       .recyclePayload(batch => batch.close())
-      .addToPipelineTime(pipelineTime)
+      .collectLifeMillis(millis => pipelineTime += millis)
       .asInterruptible(context)
       .create()
   }
@@ -227,7 +227,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         nativeResultIterator.close()
       }
       .recyclePayload(batch => batch.close())
-      .addToPipelineTime(pipelineTime)
+      .collectLifeMillis(millis => pipelineTime += millis)
       .create()
   }
   // scalastyle:on argcount
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
index 48c469e088f84..8c28345742047 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.JavaConverters._
 
 /**
@@ -41,7 +43,8 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
     "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches")
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append batches")
   )
 
   override def supportsColumnar: Boolean = true
@@ -52,22 +55,35 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
     val numInputBatches = longMetric("numInputBatches")
     val numOutputRows = longMetric("numOutputRows")
     val numOutputBatches = longMetric("numOutputBatches")
+    val appendTime = longMetric("appendTime")
 
     child.executeColumnar().mapPartitions { in =>
+      // Append millis = Out millis - In millis.
+      val appendMillis = new AtomicLong(0L)
+
       val appender = VeloxBatchAppender.create(
         minOutputBatchSize,
-        in.map {
-          inBatch =>
-            numInputRows += inBatch.numRows()
-            numInputBatches += 1
-            inBatch
-        }.asJava)
+        Iterators
+          .wrap(in)
+          .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
+          .create()
+          .map {
+            inBatch =>
+              numInputRows += inBatch.numRows()
+              numInputBatches += 1
+              inBatch
+          }
+          .asJava
+      )
 
-      Iterators
+      val out = Iterators
         .wrap(appender.asScala)
+        .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
         .recyclePayload(_.close())
         .recycleIterator {
           appender.close()
+          appendTime += appendMillis.get()
         }
         .create()
         .map {
@@ -76,6 +92,7 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
             numOutputBatches += 1
             outBatch
         }
+      out
     }
   }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
index 81ff2dc0b177d..1e3681355d6c3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.utils
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.TaskResources
 
 import java.util.concurrent.TimeUnit
@@ -85,12 +84,12 @@ private class IteratorCompleter[A](in: Iterator[A])(completionCallback: => Unit)
   }
 }
 
-private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetric)
+private class LifeTimeAccumulator[A](in: Iterator[A], onCollected: Long => Unit)
   extends Iterator[A] {
   private val closed = new AtomicBoolean(false)
   private val startTime = System.nanoTime()
 
-  TaskResources.addRecycler("Iterators#PipelineTimeAccumulator", 100) {
+  TaskResources.addRecycler("Iterators#LifeTimeAccumulator", 100) {
     tryFinish()
   }
 
@@ -111,9 +110,31 @@ private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetri
     if (!closed.compareAndSet(false, true)) {
       return
     }
-    pipelineTime += TimeUnit.NANOSECONDS.toMillis(
+    val lifeTime = TimeUnit.NANOSECONDS.toMillis(
       System.nanoTime() - startTime
     )
+    onCollected(lifeTime)
   }
 }
 
+private class ReadTimeAccumulator[A](in: Iterator[A], onAdded: Long => Unit) extends Iterator[A] {
+
+  override def hasNext: Boolean = {
+    val prev = System.nanoTime()
+    val out = in.hasNext
+    val after = System.nanoTime()
+    val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+    onAdded(duration)
+    out
+  }
+
+  override def next(): A = {
+    val prev = System.nanoTime()
+    val out = in.next()
+    val after = System.nanoTime()
+    val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+    onAdded(duration)
+    out
+  }
+}
+
@@ -171,8 +192,13 @@ class WrapperBuilder[A](in: Iterator[A]) { // FIXME how to make the ctor compani
     this
   }
 
-  def addToPipelineTime(pipelineTime: SQLMetric): WrapperBuilder[A] = {
-    wrapped = new PipelineTimeAccumulator[A](wrapped, pipelineTime)
+  def collectLifeMillis(onCollected: Long => Unit): WrapperBuilder[A] = {
+    wrapped = new LifeTimeAccumulator[A](wrapped, onCollected)
+    this
+  }
+
+  def collectReadMillis(onAdded: Long => Unit): WrapperBuilder[A] = {
+    wrapped = new ReadTimeAccumulator[A](wrapped, onAdded)
     this
   }
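Note (not part of the patch): a minimal sketch of how the two new WrapperBuilder hooks are meant to compose. collectLifeMillis fires once with the wrapped iterator's wall-clock lifetime (what addToPipelineTime previously added straight into the SQL metric), while collectReadMillis reports the time spent inside each hasNext/next call. Subtracting upstream read time from downstream read time gives an operator's own cost, which is the appendTime accounting above. The helper timeSelfCost and its parameter names are illustrative only, and the sketch assumes it runs inside a Spark task so TaskResources can register the recyclers these wrappers rely on.

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.gluten.utils.Iterators

    // Reports transform's own cost: time spent producing output minus time spent
    // pulling from upstream, delivered once when the output iterator is recycled.
    def timeSelfCost[A, B](input: Iterator[A], transform: Iterator[A] => Iterator[B])(
        reportMillis: Long => Unit): Iterator[B] = {
      val selfMillis = new AtomicLong(0L)
      val timedInput = Iterators
        .wrap(input)
        // Time spent waiting on upstream is subtracted...
        .collectReadMillis(inMillis => selfMillis.getAndAdd(-inMillis))
        .create()
      Iterators
        .wrap(transform(timedInput))
        // ...and time spent handing out output is added; the difference is the operator's own cost.
        .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
        .recycleIterator {
          reportMillis(selfMillis.get())
        }
        .create()
    }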