Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha committed Nov 9, 2023
1 parent 2bb522f commit 3b41af1
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.TaskResources

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

private class PayloadCloser[A](in: Iterator[A])(closeCallback: A => Unit) extends Iterator[A] {
Expand Down Expand Up @@ -110,8 +111,10 @@ private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetri
}

private def tryFinish(): Unit = {
pipelineTime += accumulatedTime.getAndSet(
0L
pipelineTime += TimeUnit.NANOSECONDS.toMillis(
accumulatedTime.getAndSet(
0L
)
) // make sure the accumulated time is submitted once
}
}
Expand Down

0 comments on commit 3b41af1

Please sign in to comment.