Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #111 from zalando-nakadi/retryOnFailedPublishEvent
Browse files Browse the repository at this point in the history
Add ability to retry on failure of publishing events
  • Loading branch information
mdedetrich authored Jun 8, 2019
2 parents 0d9b5e5 + dbb3fbc commit 99f002c
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 21 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ reconnect after a minute. Note that when you are deploying a new instance of you
more instances than partitions the above code will handle this situation (when the old instance will terminate
and disconnect from the stream it will free up some slots, so the new instance will eventually reconnect)

#### Automatically retrying sending of events

Kanadi has a configuration option `kanadi.http-config.failed-publish-event-retry` which allows Kanadi to automatically
resend events should they fail. The setting can also be set using the environment variable
`KANADI_HTTP_CONFIG_FAILED_PUBLISH_EVENT_RETRY`. By default this setting is `false` since enabling this can cause
events to be sent out of order, in other words you shouldn't enable it if you (or your consumers) rely on ordering
of events. Kanadi will only resend the events which actually failed to send and it will refuse to send
events which failed due to schema validation (since resending such events is pointless).

Since Nakadi will only fail to publish an event in extreme circumstances (e.g. under heavy load) the retry
uses an exponential backoff which can be configured with `kanadi.exponential-backoff-config` settings (see
`reference.conf` for information on the settings). If the maximum number of retries is reached then `Events.publish`
will fail with the original `Events.Errors.EventValidation` exception.

#### Modifying the akka-stream source

It is possible to modify the underlying akka stream when using `Subscriptions.eventsStreamed` or
Expand Down
11 changes: 11 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,16 @@ kanadi {
no-empty-slots-cursor-reset-retry-delay = ${?KANADI_HTTP_CONFIG_NO_EMPTY_SLOTS_CURSOR_RESET_RETRY_DELAY}
server-disconnect-retry-delay = "10 seconds"
server-disconnect-retry-delay = ${?KANADI_HTTP_CONFIG_SERVER_DISCONNECT_RETRY_DELAY}
failed-publish-event-retry = false
failed-publish-event-retry = ${?KANADI_HTTP_CONFIG_FAILED_PUBLISH_EVENT_RETRY}
}

# Backoff settings used when retrying a failed event publish
# (only relevant when http-config.failed-publish-event-retry is enabled).
# Each key may be overridden by the environment variable on the following line.
exponential-backoff-config {
# Delay handed to the first publish attempt as its starting interval.
initial-delay = 100 millis
initial-delay = ${?KANADI_EXPONENTIAL_BACKOFF_CONFIG_INITIAL_DELAY}
# Base that is raised to the power of the retry count and multiplied with the interval.
backoff-factor = 1.5
backoff-factor = ${?KANADI_EXPONENTIAL_BACKOFF_CONFIG_BACKOFF_FACTOR}
# Retry count limit after which publishing gives up and fails.
max-retries = 5
max-retries = ${?KANADI_EXPONENTIAL_BACKOFF_CONFIG_MAX_RETRIES}
}
}
5 changes: 4 additions & 1 deletion src/main/scala/org/zalando/kanadi/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.zalando.kanadi
import java.net.URI

import net.ceedubs.ficus.Ficus._
import org.zalando.kanadi.models.HttpConfig
import org.zalando.kanadi.models.{ExponentialBackoffConfig, HttpConfig}
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase

Expand All @@ -13,4 +13,7 @@ trait Config {
lazy val nakadiUri: URI = new URI(config.as[String]("kanadi.nakadi.uri"))

implicit lazy val kanadiHttpConfig: HttpConfig = config.as[HttpConfig]("kanadi.http-config")

implicit lazy val kanadiExponentialBackoffConfig: ExponentialBackoffConfig =
config.as[ExponentialBackoffConfig]("kanadi.exponential-backoff-config")
}
120 changes: 101 additions & 19 deletions src/main/scala/org/zalando/kanadi/api/Events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import cats.syntax.either._
import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit}
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._
Expand All @@ -25,8 +24,16 @@ import org.mdedetrich.webmodels.circe._
import org.zalando.kanadi.models._

import scala.concurrent.{ExecutionContext, Future}

sealed abstract class Event[T](val data: T)
import scala.concurrent.duration._
import scala.language.postfixOps

sealed abstract class Event[T](val data: T) {

  /**
    * Metadata attached to this event, when its concrete kind carries any.
    *
    * @return `Some` for [[Event.DataChange]] and [[Event.Business]], `None` for [[Event.Undefined]]
    */
  def getMetadata: Option[Metadata] = this match {
    case _: Event.Undefined[_]           => None
    case dataChange: Event.DataChange[_] => Some(dataChange.metadata)
    case business: Event.Business[_]     => Some(business.metadata)
  }
}

object Event {
final case class DataChange[T](override val data: T,
Expand Down Expand Up @@ -246,10 +253,12 @@ object Events {
}
}

case class Events(baseUri: URI, oAuth2TokenProvider: Option[OAuth2TokenProvider] = None)(implicit
kanadiHttpConfig: HttpConfig,
http: HttpExt,
materializer: Materializer)
case class Events(baseUri: URI, oAuth2TokenProvider: Option[OAuth2TokenProvider] = None)(
implicit
kanadiHttpConfig: HttpConfig,
exponentialBackoffConfig: ExponentialBackoffConfig,
http: HttpExt,
materializer: Materializer)
extends EventsInterface {
private val baseUri_ = Uri(baseUri.toString)
protected val logger: LoggerTakingImplicit[FlowId] = Logger.takingImplicit[FlowId](classOf[Events])
Expand Down Expand Up @@ -278,6 +287,81 @@ case class Events(baseUri: URI, oAuth2TokenProvider: Option[OAuth2TokenProvider]
implicit encoder: Encoder[T],
flowId: FlowId = randomFlowId(),
executionContext: ExecutionContext
): Future[Unit] =
if (kanadiHttpConfig.failedPublishEventRetry) {
publishWithRecover(name, events, List.empty, fillMetadata, exponentialBackoffConfig.initialDelay, 0)
} else publishBase(name, events, fillMetadata)

/**
  * Publishes `events` and, if the publish fails with [[Events.Errors.EventValidation]], resends only the events
  * that failed for a reason other than schema validation, waiting between attempts.
  *
  * Events that failed in the validating step, or that were already submitted, are never resent.
  *
  * @param name event type to publish to
  * @param events events to send in this attempt
  * @param currentNotValidEvents batch item responses from earlier attempts that were excluded from retrying
  * @param fillMetadata forwarded unchanged to `publishBase`
  * @param currentDuration delay that was used for the previous attempt (the initial delay on the first call)
  * @param count number of retries already performed
  * @return a future that completes once a publish attempt succeeds, or fails with
  *         [[Events.Errors.EventValidation]] holding every reported item that was not submitted when the
  *         retry limit is reached or nothing retryable is left
  */
private[api] def publishWithRecover[T](name: EventTypeName,
                                       events: List[Event[T]],
                                       currentNotValidEvents: List[Events.BatchItemResponse],
                                       fillMetadata: Boolean,
                                       currentDuration: FiniteDuration,
                                       count: Int)(
    implicit encoder: Encoder[T],
    flowId: FlowId = randomFlowId(),
    executionContext: ExecutionContext
): Future[Unit] =
  publishBase(name, events, fillMetadata).recoverWith {
    case Events.Errors.EventValidation(errors) =>
      // Every item reported so far (this attempt plus earlier ones) that was not submitted;
      // this is what the caller receives when we give up.
      def notSubmitted =
        (errors ++ currentNotValidEvents).filter(_.publishingStatus != Events.PublishingStatus.Submitted)

      // `count` is the number of retries already done, so stop as soon as it reaches the limit
      // (a strict `>` would allow one retry more than configured).
      if (count >= exponentialBackoffConfig.maxRetries) {
        val finalEvents = notSubmitted
        logger.error(
          s"Max retry failed for publishing events, event id's still not submitted are ${finalEvents.flatMap(_.eid.map(_.id)).mkString(",")}")
        Future.failed(Events.Errors.EventValidation(finalEvents))
      } else {
        // `notValid`: pointless to resend (failed schema validation) or already submitted.
        // `retry`: everything else, i.e. items worth sending again.
        val (notValid, retry) = errors.partition(
          response =>
            response.step
              .contains(Events.Step.Validating) || response.publishingStatus == Events.PublishingStatus.Submitted)

        // Keep exactly those events whose id shows up in a retryable response. Events whose id
        // cannot be determined cannot be matched to a response and are therefore not resent.
        val toRetry = events.filter { event =>
          eventWithUndefinedEventIdFallback(event).exists(eid => retry.exists(_.eid.contains(eid)))
        }

        val invalidSchemaEvents = notValid.filter(_.publishingStatus != Events.PublishingStatus.Submitted)

        // Report only the events that really failed validation, not the already submitted ones.
        if (invalidSchemaEvents.nonEmpty)
          logger.error(
            s"Events ${invalidSchemaEvents.flatMap(_.eid).map(_.id).mkString(",")} did not pass validation schema, not submitting")

        if (toRetry.isEmpty) {
          // Nothing can be resent, so waiting and posting an empty batch would only hide the failure.
          val finalEvents = notSubmitted
          logger.error(
            s"No events left that can be retried, event id's still not submitted are ${finalEvents.flatMap(_.eid.map(_.id)).mkString(",")}")
          Future.failed(Events.Errors.EventValidation(finalEvents))
        } else {
          // NOTE(review): the previous delay (not the initial delay) is fed back into `calculate`,
          // so the backoff factor compounds across attempts - confirm this is intended.
          val newDuration = exponentialBackoffConfig.calculate(count, currentDuration)

          logger.warn(
            s"Events with eid's ${retry.flatMap(_.eid).map(_.id).mkString(",")} failed to submit, retrying in ${newDuration.toMillis} millis")

          val newNotValidEvents = (currentNotValidEvents ++ notValid).distinct

          akka.pattern.after(newDuration, http.system.scheduler)(
            publishWithRecover(name, toRetry, newNotValidEvents, fillMetadata, newDuration, count + 1))
        }
      }
  }

/**
  * Resolves the event id of an event. The id is taken from the event's metadata when the event has metadata.
  * Otherwise (an [[Event.Undefined]]) the event data is encoded to JSON and the first "eid" field found in it
  * is decoded; "eid" is not mandatory for such events, so there may be none.
  *
  * @param event the event whose id should be determined
  * @param encoder encoder used to turn the event data into JSON for the fallback lookup
  * @tparam T type of the event data
  * @return the event id, or `None` if there is no metadata and no decodable "eid" field in the data
  */
private[api] def eventWithUndefinedEventIdFallback[T](event: Event[T])(
    implicit encoder: Encoder[T]): Option[EventId] =
  event.getMetadata match {
    case Some(metadata) => Some(metadata.eid)
    case None =>
      // Only encode the payload when there is no metadata to read the id from.
      for {
        eidJson <- (event.data.asJson \\ "eid").headOption
        eventId <- eidJson.as[EventId].toOption
      } yield eventId
  }

private[api] def publishBase[T](name: EventTypeName, events: List[Event[T]], fillMetadata: Boolean = true)(
implicit encoder: Encoder[T],
flowId: FlowId = randomFlowId(),
executionContext: ExecutionContext
): Future[Unit] = {
val uri =
baseUri_.withPath(baseUri_.path / "event-types" / name.name / "events")
Expand Down Expand Up @@ -308,18 +392,16 @@ case class Events(baseUri: URI, oAuth2TokenProvider: Option[OAuth2TokenProvider]
_ = logger.debug(request.toString)
response <- http.singleRequest(request)
result <- {
if (response.status.isSuccess()) {
response.discardEntityBytes()
Future.successful(())
} else {
response.status match {
case StatusCodes.UnprocessableEntity =>
Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`))
.to[List[Events.BatchItemResponse]]
.map(x => throw Events.Errors.EventValidation(x))
case _ =>
processNotSuccessful(response)
}
response.status match {
case StatusCodes.UnprocessableEntity | StatusCodes.MultiStatus =>
Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`))
.to[List[Events.BatchItemResponse]]
.map(x => throw Events.Errors.EventValidation(x))
case s if s.isSuccess() =>
response.discardEntityBytes()
Future.successful(())
case _ =>
processNotSuccessful(response)
}
}
} yield result
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.zalando.kanadi.models

import scala.concurrent.duration._

/**
  * Settings for an exponential backoff.
  *
  * @param initialDelay delay to start from
  * @param backoffFactor base of the exponential growth
  * @param maxRetries retry limit
  */
final case class ExponentialBackoffConfig(initialDelay: FiniteDuration, backoffFactor: Double, maxRetries: Int) {

  /**
    * Scales `interval` by `backoffFactor` raised to the power of `retry`.
    *
    * @param retry exponent applied to `backoffFactor`
    * @param interval duration to scale
    * @return the scaled duration
    * @throws Exception if the scaled duration is not finite
    */
  def calculate(retry: Int, interval: FiniteDuration): FiniteDuration = {
    val multiplier = Math.pow(backoffFactor, retry.toDouble)
    // Multiplying by a Double yields a general Duration; only a finite result is acceptable.
    Some(interval * multiplier)
      .collect { case finite: FiniteDuration => finite }
      .getOrElse(throw new Exception("Expected FiniteDuration"))
  }
}
3 changes: 2 additions & 1 deletion src/main/scala/org/zalando/kanadi/models/HttpConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ final case class HttpConfig(censorOAuth2Token: Boolean,
singleStringChunkLength: Int,
eventListChunkLength: Int,
noEmptySlotsCursorResetRetryDelay: FiniteDuration,
serverDisconnectRetryDelay: FiniteDuration)
serverDisconnectRetryDelay: FiniteDuration,
failedPublishEventRetry: Boolean)
Loading

0 comments on commit 99f002c

Please sign in to comment.