diff --git a/graph-commons/src/main/scala/io/renku/events/producers/EventSender.scala b/graph-commons/src/main/scala/io/renku/events/producers/EventSender.scala index a1bf2aefb7..f4ae257dc8 100644 --- a/graph-commons/src/main/scala/io/renku/events/producers/EventSender.scala +++ b/graph-commons/src/main/scala/io/renku/events/producers/EventSender.scala @@ -57,12 +57,32 @@ object EventSender { ): F[EventSender[F]] = for { consumerUrl <- consumerUrlFactory(config) sentEventsGauge <- SentEventsGauge[F] - } yield new EventSenderImpl(consumerUrl, sentEventsGauge, onBusySleep = 2 seconds, onErrorSleep = 15 seconds) + } yield new EventSenderImpl(consumerUrl, sentEventsGauge, onBusySleep = 15 seconds, onErrorSleep = 30 seconds) def apply[F[_]: Async: Logger: MetricsRegistry](consumerUrlFactory: EventConsumerUrlFactory): F[EventSender[F]] = apply(consumerUrlFactory, ConfigFactory.load) - final case class EventContext(categoryName: CategoryName, errorMessage: String) + final case class EventContext(categoryName: CategoryName, + errorMessage: String, + retries: Option[EventContext.Retries] + ) { + def nextAttempt(): EventContext = copy(retries = retries.map(_.nextAttempt())) + lazy val hasAttemptsLeft: Boolean = retries.forall(_.hasAttemptsLeft) + } + + object EventContext { + + final case class Retries(max: Int, left: Int) { + def nextAttempt(): Retries = copy(left = left - 1) + lazy val hasAttemptsLeft: Boolean = left > 0 + } + + def apply(categoryName: CategoryName, errorMessage: String): EventContext = + EventContext(categoryName, errorMessage, retries = None) + + def apply(categoryName: CategoryName, errorMessage: String, maxRetriesNumber: Int): EventContext = + EventContext(categoryName, errorMessage, Retries(maxRetriesNumber, maxRetriesNumber).some) + } } class EventSenderImpl[F[_]: Async: Logger]( @@ -113,11 +133,17 @@ class EventSenderImpl[F[_]: Async: Logger]( private def sendWithRetry(request: Request[F], context: EventContext): F[Status] = send(request)(responseMapping) - .recoverWith(retryOnServerError(Eval.always(sendWithRetry(request, context)), context)) + .recoverWith { ex => + val updatedContext = context.nextAttempt() + retryOnServerError(Eval.always(sendWithRetry(request, updatedContext)), updatedContext)(ex) + } private def retryOnServerError(retry: Eval[F[Status]], context: EventContext ): PartialFunction[Throwable, F[Status]] = { + case exception if !context.hasAttemptsLeft => + val message = s"${context.errorMessage} - no more attempts left" + Logger[F].error(exception)(message) >> new Exception(message, exception).raiseError[F, Status] case UnexpectedResponseException(TooManyRequests, _) => waitAndRetry(retry, context.errorMessage) case exception @ UnexpectedResponseException(ServiceUnavailable | GatewayTimeout | BadGateway, _) => diff --git a/graph-commons/src/test/scala/io/renku/events/producers/EventSenderSpec.scala b/graph-commons/src/test/scala/io/renku/events/producers/EventSenderSpec.scala index 066255c53c..daba036bb9 100644 --- a/graph-commons/src/test/scala/io/renku/events/producers/EventSenderSpec.scala +++ b/graph-commons/src/test/scala/io/renku/events/producers/EventSenderSpec.scala @@ -28,7 +28,7 @@ import eu.timepit.refined.auto._ import io.renku.events.CategoryName import io.renku.events.Generators._ import io.renku.generators.Generators.Implicits._ -import io.renku.generators.Generators.nonEmptyStrings +import io.renku.generators.Generators.{nonEmptyStrings, positiveInts} import io.renku.graph.config.EventConsumerUrl import io.renku.graph.metrics.SentEventsGauge import io.renku.interpreters.TestLogger @@ -36,6 +36,7 @@ import io.renku.stubbing.ExternalServiceStubbing import io.renku.testtools.IOSpec import org.http4s.Status.{Accepted, BadGateway, GatewayTimeout, NotFound, ServiceUnavailable, TooManyRequests} import org.scalamock.scalatest.MockFactory +import org.scalatest.OptionValues import org.scalatest.matchers.should import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.wordspec.AnyWordSpec @@ -48,6 +49,7 @@ class EventSenderSpec with ExternalServiceStubbing with MockFactory with should.Matchers + with OptionValues with TableDrivenPropertyChecks { forAll { @@ -55,12 +57,12 @@ class EventSenderSpec "Request Content Type" -> "SendEvent generator", "No Payload" -> { (sender: EventSender[IO], categoryName: CategoryName) => eventRequestContentNoPayloads.map(ev => - sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne)) + sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne, maxRetriesNumber = 3)) ) }, "With Payload" -> { (sender: EventSender[IO], categoryName: CategoryName) => eventRequestContentWithZippedPayloads.map(ev => - sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne)) + sender.sendEvent(ev, EventContext(categoryName, nonEmptyStrings().generateOne, maxRetriesNumber = 3)) ) } ) @@ -84,6 +86,7 @@ class EventSenderSpec Set(TooManyRequests, BadGateway, ServiceUnavailable, GatewayTimeout) foreach { errorStatus => s"retry if remote responds with status such as $errorStatus" in new TestCase { + val eventRequest = post(urlEqualTo(s"/events")).inScenario("Retry") stubFor { @@ -152,6 +155,62 @@ class EventSenderSpec reset() } } + + "fail when the specified number of retries is exceeded" in new TestCase { + + val eventRequest = post(urlEqualTo(s"/events")) + val response = aResponse().withFault(CONNECTION_RESET_BY_PEER) + + stubFor { + eventRequest.willReturn(response) + } + + (sentEventsGauge.increment _).expects(categoryName).returning(().pure[IO]).anyNumberOfTimes() + + intercept[Exception]( + callGenerator(eventSender, categoryName).generateOne.unsafeRunSync() + ).getMessage should endWith("no more attempts left") + } + } + } + + "EventContext.nextAttempt" should { + + "return a copy of the context object with decremented number of retries" in { + + val initialRetries = positiveInts().generateOne.value + val ctx = generateEventContext(maxRetries = initialRetries.some) + + ctx.retries.value.max shouldBe initialRetries + ctx.retries.value.left shouldBe initialRetries + + val nextAttempt = ctx.nextAttempt() + nextAttempt.retries.value.max shouldBe initialRetries + nextAttempt.retries.value.left shouldBe initialRetries - 1 + } + + "return the same instance if no retries specified" in { + + val ctx = generateEventContext(maxRetries = None) + + ctx.retries shouldBe None + + ctx.nextAttempt() shouldBe ctx + } + } + + "EventContext.hasAttemptsLeft" should { + + "return true when the number of retries is > 0" in { + generateEventContext(maxRetries = positiveInts().generateSome.map(_.value)).hasAttemptsLeft shouldBe true + } + + "return false when the number of retries is <= 0" in { + generateEventContext(maxRetries = 0.some).hasAttemptsLeft shouldBe false + } + + "return true when no retries specified" in { + generateEventContext(maxRetries = None).hasAttemptsLeft shouldBe true } } @@ -175,4 +234,12 @@ class EventSenderSpec requestTimeoutOverride = Some(requestTimeout) ) } + + private def generateEventContext(maxRetries: Option[Int]) = + maxRetries match { + case None => + EventContext(categoryNames.generateOne, nonEmptyStrings().generateOne) + case Some(retries) => + EventContext(categoryNames.generateOne, nonEmptyStrings().generateOne, retries) + } } diff --git a/triples-generator-api/src/main/scala/io/renku/triplesgenerator/api/events/Client.scala b/triples-generator-api/src/main/scala/io/renku/triplesgenerator/api/events/Client.scala index 320e29b390..420c77b139 100644 --- a/triples-generator-api/src/main/scala/io/renku/triplesgenerator/api/events/Client.scala +++ b/triples-generator-api/src/main/scala/io/renku/triplesgenerator/api/events/Client.scala @@ -51,28 +51,33 @@ private class ClientImpl[F[_]](eventSender: EventSender[F]) extends Client[F] { import io.circe.syntax._ override def send(event: CleanUpEvent): F[Unit] = - send(event, CleanUpEvent.categoryName) + send(event, CleanUpEvent.categoryName, maxRetriesNumber = None) override def send(event: DatasetViewedEvent): F[Unit] = - send(event, DatasetViewedEvent.categoryName) + send(event, DatasetViewedEvent.categoryName, maxRetriesNumber = 5.some) override def send(event: ProjectActivated): F[Unit] = - send(event, ProjectActivated.categoryName) + send(event, ProjectActivated.categoryName, maxRetriesNumber = None) override def send(event: ProjectViewedEvent): F[Unit] = - send(event, ProjectViewedEvent.categoryName) + send(event, ProjectViewedEvent.categoryName, maxRetriesNumber = 5.some) override def send(event: ProjectViewingDeletion): F[Unit] = - send(event, ProjectViewingDeletion.categoryName) + send(event, ProjectViewingDeletion.categoryName, maxRetriesNumber = None) override def send(event: SyncRepoMetadata): F[Unit] = - send(event, SyncRepoMetadata.categoryName) + send(event, SyncRepoMetadata.categoryName, maxRetriesNumber = None) - private def send[E](event: E, category: CategoryName)(implicit enc: Encoder[E], show: Show[E]): F[Unit] = + private def send[E](event: E, category: CategoryName, maxRetriesNumber: Option[Int])(implicit + enc: Encoder[E], + show: Show[E] + ): F[Unit] = { + val message = show"$category: sending event $event failed" eventSender.sendEvent( EventRequestContent.NoPayload(event.asJson), - EventContext(category, show"$category: sending event $event failed") + maxRetriesNumber.map(mr => EventContext(category, message, mr)).getOrElse(EventContext(category, message)) ) + } private def send[E, P](event: E, payload: P, category: CategoryName)(implicit eventEnc: Encoder[E], diff --git a/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/events/ClientSpec.scala b/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/events/ClientSpec.scala index 54fb5c1190..f7cc722bd1 100644 --- a/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/events/ClientSpec.scala +++ b/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/events/ClientSpec.scala @@ -43,7 +43,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with val event = projectActivatedEvents.generateOne - givenSending(event, ProjectActivated.categoryName, returning = ().pure[Try]) + givenSending(event, ProjectActivated.categoryName, maxRetriesNumber = None, returning = ().pure[Try]) client.send(event).success.value shouldBe () } @@ -55,7 +55,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with val event = datasetViewedEvents.generateOne - givenSending(event, DatasetViewedEvent.categoryName, returning = ().pure[Try]) + givenSending(event, DatasetViewedEvent.categoryName, maxRetriesNumber = 5.some, returning = ().pure[Try]) client.send(event).success.value shouldBe () } @@ -67,7 +67,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with val event = projectViewedEvents.generateOne - givenSending(event, ProjectViewedEvent.categoryName, returning = ().pure[Try]) + givenSending(event, ProjectViewedEvent.categoryName, maxRetriesNumber = 5.some, returning = ().pure[Try]) client.send(event).success.value shouldBe () } @@ -79,7 +79,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with val event = projectViewingDeletions.generateOne - givenSending(event, ProjectViewingDeletion.categoryName, returning = ().pure[Try]) + givenSending(event, ProjectViewingDeletion.categoryName, maxRetriesNumber = None, returning = ().pure[Try]) client.send(event).success.value shouldBe () } @@ -91,7 +91,7 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with val event = syncRepoMetadataEvents.generateOne - givenSending(event, SyncRepoMetadata.categoryName, returning = ().pure[Try]) + givenSending(event, SyncRepoMetadata.categoryName, maxRetriesNumber = None, returning = ().pure[Try]) client.send(event).success.value shouldBe () } @@ -102,16 +102,22 @@ class ClientSpec extends AnyWordSpec with should.Matchers with MockFactory with private val eventSender = mock[EventSender[Try]] val client = new ClientImpl[Try](eventSender) - def givenSending[E](event: E, categoryName: CategoryName, returning: Try[Unit])(implicit - encoder: Encoder[E], - show: Show[E] - ) = (eventSender - .sendEvent(_: EventRequestContent.NoPayload, _: EventSender.EventContext)) - .expects( - EventRequestContent.NoPayload(event.asJson), - EventSender.EventContext(categoryName, show"$categoryName: sending event $event failed") - ) - .returning(returning) + def givenSending[E](event: E, + categoryName: CategoryName, + maxRetriesNumber: Option[Int], + returning: Try[Unit] + )(implicit encoder: Encoder[E], show: Show[E]) = { + + val ctxMessage = show"$categoryName: sending event $event failed" + val ctx = maxRetriesNumber + .map(EventSender.EventContext(categoryName, ctxMessage, _)) + .getOrElse(EventSender.EventContext(categoryName, ctxMessage)) + + (eventSender + .sendEvent(_: EventRequestContent.NoPayload, _: EventSender.EventContext)) + .expects(EventRequestContent.NoPayload(event.asJson), ctx) + .returning(returning) + } def givenSending[E, P](event: E, payload: P, categoryName: CategoryName, returning: Try[Unit])(implicit eventEncoder: Encoder[E],