Skip to content

Commit

Permalink
fix: PROJECT_VIEWED & DATASET_VIEWED events to get max retries (#1650)
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Aug 8, 2023
1 parent de50721 commit 17b9f64
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,32 @@ object EventSender {
): F[EventSender[F]] = for {
consumerUrl <- consumerUrlFactory(config)
sentEventsGauge <- SentEventsGauge[F]
} yield new EventSenderImpl(consumerUrl, sentEventsGauge, onBusySleep = 2 seconds, onErrorSleep = 15 seconds)
} yield new EventSenderImpl(consumerUrl, sentEventsGauge, onBusySleep = 15 seconds, onErrorSleep = 30 seconds)

def apply[F[_]: Async: Logger: MetricsRegistry](consumerUrlFactory: EventConsumerUrlFactory): F[EventSender[F]] =
apply(consumerUrlFactory, ConfigFactory.load)

final case class EventContext(categoryName: CategoryName, errorMessage: String)
final case class EventContext(categoryName: CategoryName,
errorMessage: String,
retries: Option[EventContext.Retries]
) {
def nextAttempt(): EventContext = copy(retries = retries.map(_.nextAttempt()))
lazy val hasAttemptsLeft: Boolean = retries.forall(_.hasAttemptsLeft)
}

object EventContext {

final case class Retries(max: Int, left: Int) {
def nextAttempt(): Retries = copy(left = left - 1)
lazy val hasAttemptsLeft: Boolean = left > 0
}

def apply(categoryName: CategoryName, errorMessage: String): EventContext =
EventContext(categoryName, errorMessage, retries = None)

def apply(categoryName: CategoryName, errorMessage: String, maxRetriesNumber: Int): EventContext =
EventContext(categoryName, errorMessage, Retries(maxRetriesNumber, maxRetriesNumber).some)
}
}

class EventSenderImpl[F[_]: Async: Logger](
Expand Down Expand Up @@ -113,11 +133,17 @@ class EventSenderImpl[F[_]: Async: Logger](

private def sendWithRetry(request: Request[F], context: EventContext): F[Status] =
send(request)(responseMapping)
.recoverWith(retryOnServerError(Eval.always(sendWithRetry(request, context)), context))
.recoverWith { ex =>
val updatedContext = context.nextAttempt()
retryOnServerError(Eval.always(sendWithRetry(request, updatedContext)), updatedContext)(ex)
}

private def retryOnServerError(retry: Eval[F[Status]],
context: EventContext
): PartialFunction[Throwable, F[Status]] = {
case exception if !context.hasAttemptsLeft =>
val message = s"${context.errorMessage} - no more attempts left"
Logger[F].error(exception)(message) >> new Exception(message, exception).raiseError[F, Status]
case UnexpectedResponseException(TooManyRequests, _) =>
waitAndRetry(retry, context.errorMessage)
case exception @ UnexpectedResponseException(ServiceUnavailable | GatewayTimeout | BadGateway, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ import eu.timepit.refined.auto._
import io.renku.events.CategoryName
import io.renku.events.Generators._
import io.renku.generators.Generators.Implicits._
import io.renku.generators.Generators.nonEmptyStrings
import io.renku.generators.Generators.{nonEmptyStrings, positiveInts}
import io.renku.graph.config.EventConsumerUrl
import io.renku.graph.metrics.SentEventsGauge
import io.renku.interpreters.TestLogger
import io.renku.stubbing.ExternalServiceStubbing
import io.renku.testtools.IOSpec
import org.http4s.Status.{Accepted, BadGateway, GatewayTimeout, NotFound, ServiceUnavailable, TooManyRequests}
import org.scalamock.scalatest.MockFactory
import org.scalatest.OptionValues
import org.scalatest.matchers.should
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec
Expand All @@ -48,19 +49,20 @@ class EventSenderSpec
with ExternalServiceStubbing
with MockFactory
with should.Matchers
with OptionValues
with TableDrivenPropertyChecks {

forAll {
Table(
"Request Content Type" -> "SendEvent generator",
"No Payload" -> { (sender: EventSender[IO], categoryName: CategoryName) =>
eventRequestContentNoPayloads.map(ev =>
sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne))
sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne, maxRetriesNumber = 3))
)
},
"With Payload" -> { (sender: EventSender[IO], categoryName: CategoryName) =>
eventRequestContentWithZippedPayloads.map(ev =>
sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne))
sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne, maxRetriesNumber = 3))
)
}
)
Expand All @@ -84,6 +86,7 @@ class EventSenderSpec

Set(TooManyRequests, BadGateway, ServiceUnavailable, GatewayTimeout) foreach { errorStatus =>
s"retry if remote responds with status such as $errorStatus" in new TestCase {

val eventRequest = post(urlEqualTo(s"/events")).inScenario("Retry")

stubFor {
Expand Down Expand Up @@ -152,6 +155,62 @@ class EventSenderSpec
reset()
}
}

"fail when the specified number of retries is exceeded" in new TestCase {

val eventRequest = post(urlEqualTo(s"/events"))
val response = aResponse().withFault(CONNECTION_RESET_BY_PEER)

stubFor {
eventRequest.willReturn(response)
}

(sentEventsGauge.increment _).expects(categoryName).returning(().pure[IO]).anyNumberOfTimes()

intercept[Exception](
callGenerator(eventSender, categoryName).generateOne.unsafeRunSync()
).getMessage should endWith("no more attempts left")
}
}
}

"EventContext.nextAttempt" should {

"return a copy of the context object with decremented number of retries" in {

val initialRetries = positiveInts().generateOne.value
val ctx = generateEventContext(maxRetries = initialRetries.some)

ctx.retries.value.max shouldBe initialRetries
ctx.retries.value.left shouldBe initialRetries

val nextAttempt = ctx.nextAttempt()
nextAttempt.retries.value.max shouldBe initialRetries
nextAttempt.retries.value.left shouldBe initialRetries - 1
}

"return the same instance if no retries specified" in {

val ctx = generateEventContext(maxRetries = None)

ctx.retries shouldBe None

ctx.nextAttempt() shouldBe ctx
}
}

"EventContext.hasAttemptsLeft" should {

"return true when the number of retries is > 0" in {
generateEventContext(maxRetries = positiveInts().generateSome.map(_.value)).hasAttemptsLeft shouldBe true
}

"return false when the number of retries is <= 0" in {
generateEventContext(maxRetries = 0.some).hasAttemptsLeft shouldBe false
}

"return true when no retries specified" in {
generateEventContext(maxRetries = None).hasAttemptsLeft shouldBe true
}
}

Expand All @@ -175,4 +234,12 @@ class EventSenderSpec
requestTimeoutOverride = Some(requestTimeout)
)
}

private def generateEventContext(maxRetries: Option[Int]) =
maxRetries match {
case None =>
EventContext(categoryNames.generateOne, nonEmptyStrings().generateOne)
case Some(retries) =>
EventContext(categoryNames.generateOne, nonEmptyStrings().generateOne, retries)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,33 @@ private class ClientImpl[F[_]](eventSender: EventSender[F]) extends Client[F] {
import io.circe.syntax._

override def send(event: CleanUpEvent): F[Unit] =
send(event, CleanUpEvent.categoryName)
send(event, CleanUpEvent.categoryName, maxRetriesNumber = None)

override def send(event: DatasetViewedEvent): F[Unit] =
send(event, DatasetViewedEvent.categoryName)
send(event, DatasetViewedEvent.categoryName, maxRetriesNumber = 5.some)

override def send(event: ProjectActivated): F[Unit] =
send(event, ProjectActivated.categoryName)
send(event, ProjectActivated.categoryName, maxRetriesNumber = None)

override def send(event: ProjectViewedEvent): F[Unit] =
send(event, ProjectViewedEvent.categoryName)
send(event, ProjectViewedEvent.categoryName, maxRetriesNumber = 5.some)

override def send(event: ProjectViewingDeletion): F[Unit] =
send(event, ProjectViewingDeletion.categoryName)
send(event, ProjectViewingDeletion.categoryName, maxRetriesNumber = None)

override def send(event: SyncRepoMetadata): F[Unit] =
send(event, SyncRepoMetadata.categoryName)
send(event, SyncRepoMetadata.categoryName, maxRetriesNumber = None)

private def send[E](event: E, category: CategoryName)(implicit enc: Encoder[E], show: Show[E]): F[Unit] =
private def send[E](event: E, category: CategoryName, maxRetriesNumber: Option[Int])(implicit
enc: Encoder[E],
show: Show[E]
): F[Unit] = {
val message = show"$category: sending event $event failed"
eventSender.sendEvent(
EventRequestContent.NoPayload(event.asJson),
EventContext(category, show"$category: sending event $event failed")
maxRetriesNumber.map(mr => EventContext(category, message, mr)).getOrElse(EventContext(category, message))
)
}

private def send[E, P](event: E, payload: P, category: CategoryName)(implicit
eventEnc: Encoder[E],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with

val event = projectActivatedEvents.generateOne

givenSending(event, ProjectActivated.categoryName, returning = ().pure[Try])
givenSending(event, ProjectActivated.categoryName, maxRetriesNumber = None, returning = ().pure[Try])

client.send(event).success.value shouldBe ()
}
Expand All @@ -55,7 +55,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with

val event = datasetViewedEvents.generateOne

givenSending(event, DatasetViewedEvent.categoryName, returning = ().pure[Try])
givenSending(event, DatasetViewedEvent.categoryName, maxRetriesNumber = 5.some, returning = ().pure[Try])

client.send(event).success.value shouldBe ()
}
Expand All @@ -67,7 +67,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with

val event = projectViewedEvents.generateOne

givenSending(event, ProjectViewedEvent.categoryName, returning = ().pure[Try])
givenSending(event, ProjectViewedEvent.categoryName, maxRetriesNumber = 5.some, returning = ().pure[Try])

client.send(event).success.value shouldBe ()
}
Expand All @@ -79,7 +79,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with

val event = projectViewingDeletions.generateOne

givenSending(event, ProjectViewingDeletion.categoryName, returning = ().pure[Try])
givenSending(event, ProjectViewingDeletion.categoryName, maxRetriesNumber = None, returning = ().pure[Try])

client.send(event).success.value shouldBe ()
}
Expand All @@ -91,7 +91,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with

val event = syncRepoMetadataEvents.generateOne

givenSending(event, SyncRepoMetadata.categoryName, returning = ().pure[Try])
givenSending(event, SyncRepoMetadata.categoryName, maxRetriesNumber = None, returning = ().pure[Try])

client.send(event).success.value shouldBe ()
}
Expand All @@ -102,16 +102,22 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with
private val eventSender = mock[EventSender[Try]]
val client = new ClientImpl[Try](eventSender)

def givenSending[E](event: E, categoryName: CategoryName, returning: Try[Unit])(implicit
encoder: Encoder[E],
show: Show[E]
) = (eventSender
.sendEvent(_: EventRequestContent.NoPayload, _: EventSender.EventContext))
.expects(
EventRequestContent.NoPayload(event.asJson),
EventSender.EventContext(categoryName, show"$categoryName: sending event $event failed")
)
.returning(returning)
def givenSending[E](event: E,
categoryName: CategoryName,
maxRetriesNumber: Option[Int],
returning: Try[Unit]
)(implicit encoder: Encoder[E], show: Show[E]) = {

val ctxMessage = show"$categoryName: sending event $event failed"
val ctx = maxRetriesNumber
.map(EventSender.EventContext(categoryName, ctxMessage, _))
.getOrElse(EventSender.EventContext(categoryName, ctxMessage))

(eventSender
.sendEvent(_: EventRequestContent.NoPayload, _: EventSender.EventContext))
.expects(EventRequestContent.NoPayload(event.asJson), ctx)
.returning(returning)
}

def givenSending[E, P](event: E, payload: P, categoryName: CategoryName, returning: Try[Unit])(implicit
eventEncoder: Encoder[E],
Expand Down

0 comments on commit 17b9f64

Please sign in to comment.