Skip to content

Commit

Permalink
Inline circuit breaker metrics implementation (#405)
Browse files Browse the repository at this point in the history
* Inline circuit breaker metrics implementation

* Fix initialization issue

* Allow empty labels
  • Loading branch information
svroonland authored Mar 10, 2024
1 parent 35e43ad commit 8f93a91
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ import java.time.Instant
* 2) Failure rate. When the fraction of failed calls in some sample period exceeds a threshold (between 0 and 1), the
* circuit breaker is tripped. The decision to trip the Circuit Breaker is made after every call (including successful
* ones!)
*
* CircuitBreaker can record the following metrics, if a non-empty set of MetricLabels is given:
* - rezilience_circuit_breaker_state: current state (0 = closed, 1 = half-open, 2 = open)
* - rezilience_circuit_breaker_state_changes: number of state changes
* - rezilience_circuit_breaker_calls_success: number of successful calls
* - rezilience_circuit_breaker_calls_failure: number of failed calls
* - rezilience_circuit_breaker_calls_rejected: number of calls rejected in the open state
*/
trait CircuitBreaker[-E] {
self =>
Expand Down Expand Up @@ -120,15 +127,19 @@ object CircuitBreaker {
* @param isFailure
* Only failures that match according to `isFailure` are treated as failures by the circuit breaker. Other failures
* are passed on, circumventing the circuit breaker's failure counter.
* @param metricLabels
* Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application.
* No metrics are recorded if None is passed.
* @return
* The CircuitBreaker as a managed resource
*/
def withMaxFailures[E](
maxFailures: Int,
resetPolicy: Schedule[Any, Any, Any] = Retry.Schedules.exponentialBackoff(1.second, 1.minute),
isFailure: PartialFunction[E, Boolean] = isFailureAny[E]
isFailure: PartialFunction[E, Boolean] = isFailureAny[E],
metricLabels: Option[Set[MetricLabel]] = None
): ZIO[Scope, Nothing, CircuitBreaker[E]] =
make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure)
make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure, metricLabels)

/**
* Create a CircuitBreaker with the given tripping strategy
Expand All @@ -140,13 +151,17 @@ object CircuitBreaker {
* @param isFailure
* Only failures that match according to `isFailure` are treated as failures by the circuit breaker. Other failures
* are passed on, circumventing the circuit breaker's failure counter.
* @param metricLabels
* Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application.
* No metrics are recorded if None is passed.
* @return
*/
def make[E](
trippingStrategy: ZIO[Scope, Nothing, TrippingStrategy],
resetPolicy: Schedule[Any, Any, Any] =
Retry.Schedules.exponentialBackoff(1.second, 1.minute), // TODO should move to its own namespace
isFailure: PartialFunction[E, Boolean] = isFailureAny[E]
isFailure: PartialFunction[E, Boolean] = isFailureAny[E],
metricLabels: Option[Set[MetricLabel]] = None
): ZIO[Scope, Nothing, CircuitBreaker[E]] =
for {
strategy <- trippingStrategy
Expand All @@ -155,28 +170,19 @@ object CircuitBreaker {
schedule <- resetPolicy.driver
resetRequests <- Queue.bounded[Unit](1)
stateChangesHub <- Hub.sliding[StateChange](32).withFinalizer(_.shutdown)
_ <- ZStream
.fromQueue(resetRequests)
.mapZIO { _ =>
for {
_ <- schedule.next(()) // TODO handle schedule completion?
_ <- halfOpenSwitch.set(true)
_ <- state.set(HalfOpen)
now <- ZIO.clockWith(_.instant)
_ <- stateChangesHub.publish(StateChange(Open, HalfOpen, now))
} yield ()
}
.runDrain
.forkScoped
} yield new CircuitBreakerImpl[resetPolicy.State, E](
state,
resetRequests,
strategy,
stateChangesHub,
schedule,
isFailure,
halfOpenSwitch
)
cb = new CircuitBreakerImpl[resetPolicy.State, E](
state,
resetRequests,
strategy,
stateChangesHub,
schedule,
isFailure,
halfOpenSwitch,
metricLabels
)
_ <- cb.resetProcess
_ <- cb.trackStateChanges
} yield cb

private case class CircuitBreakerImpl[ScheduleState, -E](
state: Ref[State],
Expand All @@ -185,9 +191,29 @@ object CircuitBreaker {
stateChangesHub: Hub[StateChange],
schedule: Schedule.Driver[ScheduleState, Any, Any, Any],
isFailure: PartialFunction[E, Boolean],
halfOpenSwitch: Ref[Boolean]
halfOpenSwitch: Ref[Boolean],
labels: Option[Set[MetricLabel]]
) extends CircuitBreaker[E] {

override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = stateChangesHub.subscribe

val metrics = labels.map { labels =>
CircuitBreakerMetrics(
state = Metric
.gauge("rezilience_circuit_breaker_state")
.tagged(labels),
nrStateChanges = Metric.counter("rezilience_circuit_breaker_state_changes").tagged(labels),
callsSuccess = Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels),
callsFailure = Metric.counter("rezilience_circuit_breaker_calls_failure").tagged(labels),
callsRejected = Metric
.counter("rezilience_circuit_breaker_calls_rejected")
.tagged(labels)
)
}

private def withMetrics(f: CircuitBreakerMetrics => UIO[Unit]): UIO[Unit] =
ZIO.fromOption(metrics).flatMap(f).ignore

val changeToOpen: ZIO[Any, Nothing, Unit] =
for {
oldState <- state.getAndSet(Open)
Expand All @@ -204,8 +230,40 @@ object CircuitBreaker {
_ <- stateChangesHub.publish(StateChange(oldState, Closed, now))
} yield ()

val resetProcess: ZIO[Scope, Nothing, Any] = ZStream
.fromQueue(resetRequests)
.mapZIO { _ =>
for {
_ <- schedule.next(()) // TODO handle schedule completion?
_ <- halfOpenSwitch.set(true)
_ <- state.set(HalfOpen)
now <- ZIO.clockWith(_.instant)
_ <- stateChangesHub.publish(StateChange(Open, HalfOpen, now))
} yield ()
}
.runDrain
.forkScoped

val trackStateChanges: ZIO[Scope, Nothing, Any] = for {
stateChanges <- stateChanges
_ <-
ZStream
.fromQueue(stateChanges)
.tap { stateChange =>
val stateAsInt = stateChange.to match {
case State.Closed => 0
case State.HalfOpen => 1
case State.Open => 2
}

withMetrics(metrics => metrics.nrStateChanges.increment *> metrics.state.set(stateAsInt.doubleValue))
}
.runDrain
.forkScoped
} yield ()

override def apply[R, E1 <: E, A](f: ZIO[R, E1, A]): ZIO[R, CircuitBreakerCallError[E1], A] =
for {
(for {
currentState <- state.get
result <- currentState match {
case Closed =>
Expand Down Expand Up @@ -238,7 +296,14 @@ object CircuitBreaker {
}
} yield result
}
} yield result
} yield result)
.tapBoth(
{
case CircuitBreaker.CircuitBreakerOpen => withMetrics(_.callsRejected.increment)
case CircuitBreaker.WrappedError(_) => withMetrics(_.callsFailure.increment)
},
_ => withMetrics(_.callsSuccess.increment)
)

private def tapZIOOnUserDefinedFailure[R, E1 <: E, A](
f: ZIO[R, E1, A]
Expand All @@ -260,12 +325,11 @@ object CircuitBreaker {
stateChangesHub,
schedule,
pf andThen isFailure,
halfOpenSwitch
halfOpenSwitch,
labels
)

override def currentState: UIO[State] = state.get

override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = stateChangesHub.subscribe
}

private[rezilience] def isFailureAny[E]: PartialFunction[E, Boolean] = { case _ => true }
Expand All @@ -277,86 +341,4 @@ object CircuitBreaker {
callsFailure: Metric.Counter[Long],
callsRejected: Metric.Counter[Long]
)

/**
* Takes an existing CircuitBreaker and returns a new one that records metrics
*
* Metrics are
* - rezilience_circuit_breaker_state: current state (0 = closed, 1 = half-open, 2 = open)
* - rezilience_circuit_breaker_state_changes: number of state changes
* - rezilience_circuit_breaker_calls_success: number of successful calls
* - rezilience_circuit_breaker_calls_failure: number of failed calls
* - rezilience_circuit_breaker_calls_rejected: number of calls rejected in the open state
*
* Be sure to use only the returned CircuitBreaker and not the one given as parameter, otherwise no metrics will be
* recorded. Recommended usage is to create it in go, eg `cb <-
* CircuitBreaker.withMaxFailures(10).flatMap(CircuitBreaker.withMetrics(_, labels))`
*
* @param circuitBreaker
* Existing CircuitBreaker
* @param labels
* Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application.
*
* @return
* CircuitBreaker that records metrics
*/
def withMetrics[E](
circuitBreaker: CircuitBreaker[E],
labels: Set[MetricLabel]
): ZIO[Scope, Nothing, CircuitBreakerWithMetrics[E]] = {

val metrics = CircuitBreakerMetrics(
state = Metric
.gauge("rezilience_circuit_breaker_state")
.tagged(labels),
nrStateChanges = Metric.counter("rezilience_circuit_breaker_state_changes").tagged(labels),
callsSuccess = Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels),
callsFailure = Metric.counter("rezilience_circuit_breaker_calls_failure").tagged(labels),
callsRejected = Metric
.counter("rezilience_circuit_breaker_calls_rejected")
.tagged(labels)
)

for {
stateChanges <- circuitBreaker.stateChanges
_ <- ZStream
.fromQueue(stateChanges)
.tap { stateChange =>
val stateAsInt = stateChange.to match {
case State.Closed => 0
case State.HalfOpen => 1
case State.Open => 2
}

metrics.nrStateChanges.increment *> metrics.state.set(stateAsInt.doubleValue)
}
.runDrain
.forkScoped
} yield new CircuitBreakerWithMetrics[E](circuitBreaker, metrics)
}

private[rezilience] case class CircuitBreakerWithMetrics[E](
circuitBreaker: CircuitBreaker[E],
metrics: CircuitBreakerMetrics
) extends CircuitBreaker[E] {
override def apply[R, E1 <: E, A](
f: ZIO[R, E1, A]
): ZIO[R, CircuitBreakerCallError[E1], A] = circuitBreaker
.apply(f)
.tapBoth(
{
case CircuitBreaker.CircuitBreakerOpen => metrics.callsRejected.increment
case CircuitBreaker.WrappedError(_) => metrics.callsFailure.increment
},
_ => metrics.callsSuccess.increment
)

override def widen[E2](pf: PartialFunction[E2, E]): CircuitBreaker[E2] =
CircuitBreakerWithMetrics(circuitBreaker.widen[E2](pf), metrics)

override def currentState: UIO[State] = circuitBreaker.currentState

override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = circuitBreaker.stateChanges
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault {
for {
labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString)))
_ <- CircuitBreaker
.withMaxFailures(3)
.flatMap(CircuitBreaker.withMetrics(_, labels))
.withMaxFailures(3, metricLabels = Some(labels))
metricState <- Metric.gauge("rezilience_circuit_breaker_calls_state").tagged(labels).value
metricStateChanges <- Metric.counter("rezilience_circuit_breaker_calls_state_changes").tagged(labels).value
metricSuccess <- Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels).value
Expand All @@ -172,8 +171,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault {
for {
labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString)))
cb <- CircuitBreaker
.withMaxFailures(3)
.flatMap(CircuitBreaker.withMetrics(_, labels))
.withMaxFailures(3, metricLabels = Some(labels))
_ <- cb(ZIO.unit)
_ <- cb(ZIO.fail("Failed")).either
metricSuccess <- Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels).value
Expand All @@ -184,8 +182,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault {
for {
labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString)))
cb <- CircuitBreaker
.withMaxFailures(10, Schedule.exponential(1.second))
.flatMap(CircuitBreaker.withMetrics(_, labels))
.withMaxFailures(10, Schedule.exponential(1.second), metricLabels = Some(labels))

metricStateChanges = Metric.counter("rezilience_circuit_breaker_state_changes").tagged(labels)
metricState = Metric.gauge("rezilience_circuit_breaker_state").tagged(labels)
Expand Down

0 comments on commit 8f93a91

Please sign in to comment.