Skip to content

Commit

Permalink
#519: fix high CPU usage with Circuit Breaker combined with ClientFl…
Browse files Browse the repository at this point in the history
…ow (#523) (#527)

timeLeftForNextElemToTimeout should return timeout (if no elements in
timeout queue) or a duration greater than our timer precision (10ms)

(cherry picked from commit dd3455)
  • Loading branch information
sebady authored and akara committed Oct 16, 2017
1 parent fead9e7 commit 4f48c56
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions squbs-ext/src/main/scala/org/squbs/streams/TimeoutBidi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ abstract class TimeoutGraphStageLogic[In, FromWrapped, Out](shape: BidiShape[In,
protected def isBuffersEmpty: Boolean

protected def timeLeftForNextElemToTimeout: Long = {
timeoutAsMillis - NANOSECONDS.toMillis(System.nanoTime() - firstElemStartTime)
val firstElemTime = firstElemStartTime
if (firstElemTime == 0) timeoutAsMillis
else {
val timeLeftInmillis = timeoutAsMillis - NANOSECONDS.toMillis(System.nanoTime() - firstElemTime)
if (MILLISECONDS.toNanos(timeLeftInmillis) < precision) NANOSECONDS.toMillis(precision)
else timeLeftInmillis
}
}

protected def expirationTime(): Long = System.nanoTime() - timeoutAsNanos - precision
Expand Down Expand Up @@ -334,9 +340,13 @@ final class TimeoutBidiUnordered[In, Out, Context](timeout: FiniteDuration,
} map { case(id, (context, _)) =>
timeouts.remove(id)
(Failure(FlowTimeoutException()), context)
} orElse Try(readyToPush.dequeue()).toOption.map { case(elem, _) => elem }
} orElse dequeueOption().map { case(elem, _) => elem }
}

private def dequeueOption(): Option[((Try[Out], Context), Long)] =
if (readyToPush.nonEmpty) Some(readyToPush.dequeue())
else None

override protected def onPullOut() = pickNextElemToPush()

override protected def onScheduledTimeout() = pickNextElemToPush()
Expand Down Expand Up @@ -437,12 +447,11 @@ final class TimeoutBidiOrdered[In, Out](timeout: FiniteDuration, cleanUp: Out =>
override def enqueueInTimeoutQueue(elem: In): Unit = timeouts.enqueue(TimeoutTracker(System.nanoTime(), false))

override def onPushFromWrapped(elem: Out, isOutAvailable: Boolean): Option[Try[Out]] = {
if (isOutAvailable) {
if (isOutAvailable && timeouts.nonEmpty) {
if (timeouts.dequeue().isTimedOut) {
tryCleanUp(elem, cleanUp)
None
}
else Some(Success(elem))
} else Some(Success(elem))
} else None
}

Expand Down

0 comments on commit 4f48c56

Please sign in to comment.