From da36f51a56e25ec3a740d438300bb67772b4442b Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 30 Aug 2023 19:10:03 +0200 Subject: [PATCH] fix: rollbacktoawaitingdeletion status change to retry on deadlock --- .../DbUpdater.scala | 14 ++- .../io/renku/eventlog/EventDataFetching.scala | 8 +- .../eventlog/EventLogDataProvisioning.scala | 115 +++++++++++++----- .../DbUpdaterSpec.scala | 110 +++++++++-------- 4 files changed, 165 insertions(+), 82 deletions(-) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdater.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdater.scala index 62e3bda31e..ba2b56dcf6 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdater.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdater.scala @@ -16,7 +16,8 @@ * limitations under the License. */ -package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion +package io.renku.eventlog.events.consumers.statuschange +package rollbacktoawaitingdeletion import cats.effect.MonadCancelThrow import cats.syntax.all._ @@ -32,13 +33,14 @@ import io.renku.eventlog.metrics.QueriesExecutionTimes import io.renku.graph.model.events.EventStatus.{AwaitingDeletion, Deleting} import io.renku.graph.model.events.{EventStatus, ExecutionDate} import io.renku.graph.model.projects +import org.typelevel.log4cats.Logger import skunk._ import skunk.data.Completion import skunk.implicits._ import java.time.Instant -private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTimes]( +private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: Logger: QueriesExecutionTimes]( now: () => Instant = () => Instant.now ) extends DbClient(Some(QueriesExecutionTimes[F])) with statuschange.DBUpdater[F, RollbackToAwaitingDeletion] { @@ -66,6 +68,10 @@ private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTi } } - override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] = - RollbackOp.empty + override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] = { + case SqlState.DeadlockDetected(_) => + Logger[F].info(show"$categoryName: Deadlock happened while processing $event; retrying") >> + sr.useK(updateDB(event)) + .handleErrorWith(onRollback(event)) + } } diff --git a/event-log/src/test/scala/io/renku/eventlog/EventDataFetching.scala b/event-log/src/test/scala/io/renku/eventlog/EventDataFetching.scala index 3580080073..3855f874a8 100644 --- a/event-log/src/test/scala/io/renku/eventlog/EventDataFetching.scala +++ b/event-log/src/test/scala/io/renku/eventlog/EventDataFetching.scala @@ -19,6 +19,7 @@ package io.renku.eventlog import cats.data.Kleisli +import cats.effect.IO import io.renku.graph.model.events._ import io.renku.graph.model.projects import skunk._ @@ -105,8 +106,8 @@ trait EventDataFetching { protected def findEventMessage(eventId: CompoundEventId): Option[EventMessage] = findEvent(eventId).flatMap(_._3) - protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] = - execute { + protected def findEventIO(eventId: CompoundEventId): IO[Option[(ExecutionDate, EventStatus, Option[EventMessage])]] = + executeIO { Kleisli { session => val query : Query[EventId *: projects.GitLabId *: EmptyTuple, (ExecutionDate, EventStatus, Option[EventMessage])] = @@ -121,6 +122,9 @@ trait EventDataFetching { } } + protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] = + findEventIO(eventId).unsafeRunSync() + protected def findProcessingTime(eventId: CompoundEventId): List[(CompoundEventId, EventProcessingTime)] = execute { Kleisli { session => diff --git a/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala b/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala index ff266aa716..8d0a4717de 100644 --- a/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala +++ b/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala @@ -19,6 +19,8 @@ package io.renku.eventlog import cats.data.Kleisli +import cats.effect.IO +import cats.syntax.all._ import io.circe.Json import io.renku.eventlog.events.producers.eventdelivery._ import io.renku.events.Generators.{subscriberIds, subscriberUrls} @@ -84,6 +86,29 @@ trait EventLogDataProvisioning { (eventId.id, status, maybeMessage, maybePayload, processingTimes) } + protected def storeEventIO(compoundEventId: CompoundEventId, + eventStatus: EventStatus, + executionDate: ExecutionDate, + eventDate: EventDate, + eventBody: EventBody, + createdDate: CreatedDate = CreatedDate(Instant.now), + batchDate: BatchDate = BatchDate(Instant.now), + projectSlug: Slug = projectSlugs.generateOne, + maybeMessage: Option[EventMessage] = None, + maybeEventPayload: Option[ZippedEventPayload] = None + ): IO[Unit] = + upsertProjectIO(compoundEventId.projectId, projectSlug, eventDate) >> + insertEventIO(compoundEventId, + eventStatus, + executionDate, + eventDate, + eventBody, + createdDate, + batchDate, + maybeMessage + ) >> + upsertEventPayloadIO(compoundEventId, eventStatus, maybeEventPayload) + protected def storeEvent(compoundEventId: CompoundEventId, eventStatus: EventStatus, executionDate: ExecutionDate, @@ -94,21 +119,27 @@ trait EventLogDataProvisioning { projectSlug: Slug = projectSlugs.generateOne, maybeMessage: Option[EventMessage] = None, maybeEventPayload: Option[ZippedEventPayload] = None - ): Unit = { - upsertProject(compoundEventId, projectSlug, eventDate) - insertEvent(compoundEventId, eventStatus, executionDate, eventDate, eventBody, createdDate, batchDate, maybeMessage) - upsertEventPayload(compoundEventId, eventStatus, maybeEventPayload) - } + ): Unit = storeEventIO(compoundEventId, + eventStatus, + executionDate, + eventDate, + eventBody, + createdDate, + batchDate, + projectSlug, + maybeMessage, + maybeEventPayload + ).unsafeRunSync() - protected def insertEvent(compoundEventId: CompoundEventId, - eventStatus: EventStatus, - executionDate: ExecutionDate, - eventDate: EventDate, - eventBody: EventBody, - createdDate: CreatedDate, - batchDate: BatchDate, - maybeMessage: Option[EventMessage] - ): Unit = execute { + protected def insertEventIO(compoundEventId: CompoundEventId, + eventStatus: EventStatus, + executionDate: ExecutionDate, + eventDate: EventDate, + eventBody: EventBody, + createdDate: CreatedDate, + batchDate: BatchDate, + maybeMessage: Option[EventMessage] + ): IO[Unit] = executeIO { Kleisli { session => maybeMessage match { case None => @@ -147,6 +178,24 @@ trait EventLogDataProvisioning { } } + protected def insertEvent(compoundEventId: CompoundEventId, + eventStatus: EventStatus, + executionDate: ExecutionDate, + eventDate: EventDate, + eventBody: EventBody, + createdDate: CreatedDate, + batchDate: BatchDate, + maybeMessage: Option[EventMessage] + ): Unit = insertEventIO(compoundEventId, + eventStatus, + executionDate, + eventDate, + eventBody, + createdDate, + batchDate, + maybeMessage + ).unsafeRunSync() + protected def upsertProject(compoundEventId: CompoundEventId, projectSlug: projects.Slug, eventDate: EventDate @@ -155,28 +204,33 @@ trait EventLogDataProvisioning { protected def upsertProject(project: consumers.Project, eventDate: EventDate): Unit = upsertProject(project.id, project.slug, eventDate) - protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit = - execute { - Kleisli { session => - val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] = - sql"""INSERT INTO project (project_id, project_slug, latest_event_date) + protected def upsertProjectIO(projectId: projects.GitLabId, + projectSlug: projects.Slug, + eventDate: EventDate + ): IO[Unit] = executeIO { + Kleisli { session => + val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] = + sql"""INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) ON CONFLICT (project_id) DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date """.command - session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void - } + session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void } + } - protected def upsertEventPayload(compoundEventId: CompoundEventId, - eventStatus: EventStatus, - maybePayload: Option[ZippedEventPayload] - ): Unit = eventStatus match { + protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit = + upsertProjectIO(projectId, projectSlug, eventDate).unsafeRunSync() + + protected def upsertEventPayloadIO(compoundEventId: CompoundEventId, + eventStatus: EventStatus, + maybePayload: Option[ZippedEventPayload] + ): IO[Unit] = eventStatus match { case TriplesGenerated | TransformingTriples | TransformationRecoverableFailure | TransformationNonRecoverableFailure | TriplesStore | AwaitingDeletion => maybePayload .map { payload => - execute[Unit] { + executeIO { Kleisli { session => val query: Command[EventId *: projects.GitLabId *: ZippedEventPayload *: EmptyTuple] = sql""" INSERT INTO event_payload (event_id, project_id, payload) @@ -191,10 +245,15 @@ trait EventLogDataProvisioning { } } } - .getOrElse(()) - case _ => () + .getOrElse(().pure[IO]) + case _ => ().pure[IO] } + protected def upsertEventPayload(compoundEventId: CompoundEventId, + eventStatus: EventStatus, + maybePayload: Option[ZippedEventPayload] + ): Unit = upsertEventPayloadIO(compoundEventId, eventStatus, maybePayload).unsafeRunSync() + protected def upsertProcessingTime(compoundEventId: CompoundEventId, eventStatus: EventStatus, processingTime: EventProcessingTime diff --git a/event-log/src/test/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdaterSpec.scala b/event-log/src/test/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdaterSpec.scala index b0fa48365d..783664869f 100644 --- a/event-log/src/test/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdaterSpec.scala +++ b/event-log/src/test/scala/io/renku/eventlog/events/consumers/statuschange/rollbacktoawaitingdeletion/DbUpdaterSpec.scala @@ -16,82 +16,96 @@ * limitations under the License. */ -package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion +package io.renku.eventlog.events.consumers.statuschange +package rollbacktoawaitingdeletion +import SkunkExceptionsGenerators.postgresErrors import cats.effect.IO -import io.renku.eventlog.events.consumers.statuschange.DBUpdateResults +import cats.effect.testing.scalatest.AsyncIOSpec import io.renku.eventlog.api.events.StatusChangeEvent.RollbackToAwaitingDeletion import io.renku.eventlog.metrics.QueriesExecutionTimes import io.renku.eventlog.{InMemoryEventLogDbSpec, TypeSerializers} +import io.renku.events.consumers.ConsumersModelGenerators.consumerProjects import io.renku.events.consumers.Project import io.renku.generators.Generators.Implicits._ -import io.renku.generators.Generators.timestampsNotInTheFuture +import io.renku.generators.Generators.{exceptions, timestampsNotInTheFuture} import io.renku.graph.model.EventsGenerators.{eventBodies, eventIds, eventStatuses} -import io.renku.graph.model.GraphModelGenerators.{projectIds, projectSlugs} import io.renku.graph.model.events.EventStatus._ import io.renku.graph.model.events._ +import io.renku.interpreters.TestLogger import io.renku.metrics.TestMetricsRegistry -import io.renku.testtools.IOSpec -import org.scalamock.scalatest.MockFactory +import org.scalatest.OptionValues import org.scalatest.matchers.should -import org.scalatest.wordspec.AnyWordSpec - -import java.time.Instant +import org.scalatest.wordspec.AsyncWordSpec +import skunk.SqlState.DeadlockDetected class DbUpdaterSpec - extends AnyWordSpec - with IOSpec + extends AsyncWordSpec + with AsyncIOSpec with InMemoryEventLogDbSpec with TypeSerializers with should.Matchers - with MockFactory { + with OptionValues { "updateDB" should { - s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in new TestCase { + s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in { + + val project = consumerProjects.generateOne + val otherStatus = eventStatuses.filter(_ != Deleting).generateOne - val event1Id = addEvent(Deleting) - val event2Id = addEvent(otherStatus) - val event3Id = addEvent(Deleting) - sessionResource - .useK(dbUpdater updateDB RollbackToAwaitingDeletion(Project(projectId, projectSlug))) - .unsafeRunSync() shouldBe DBUpdateResults.ForProjects( - projectSlug, - Map(Deleting -> -2, AwaitingDeletion -> 2) - ) - - findEvent(CompoundEventId(event1Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion) - findEvent(CompoundEventId(event2Id, projectId)).map(_._2) shouldBe Some(otherStatus) - findEvent(CompoundEventId(event3Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion) - } + for { + event1Id <- addEvent(Deleting, project) + event2Id <- addEvent(otherStatus, project) + event3Id <- addEvent(Deleting, project) + _ <- sessionResource + .useK(dbUpdater updateDB RollbackToAwaitingDeletion(project)) + .asserting( + _ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -2, AwaitingDeletion -> 2)) + ) + _ <- findEventIO(CompoundEventId(event1Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion) + _ <- findEventIO(CompoundEventId(event2Id, project.id)).asserting(_.value._2 shouldBe otherStatus) + _ <- findEventIO(CompoundEventId(event3Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion) + } yield () + } } - private trait TestCase { - - val projectId = projectIds.generateOne - val projectSlug = projectSlugs.generateOne + "onRollback" should { - val currentTime = mockFunction[Instant] - private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO] - private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync() - val dbUpdater = new DbUpdater[IO](currentTime) + "retry on DeadlockDetected" in { - val now = Instant.now() - currentTime.expects().returning(now).anyNumberOfTimes() + val project = consumerProjects.generateOne - def addEvent(status: EventStatus): EventId = { - val eventId = CompoundEventId(eventIds.generateOne, projectId) + addEvent(Deleting, project) >> + dbUpdater + .onRollback(RollbackToAwaitingDeletion(project)) + .apply(postgresErrors(DeadlockDetected).generateOne) + .asserting(_ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -1, AwaitingDeletion -> 1))) + } - storeEvent( - eventId, - status, - timestampsNotInTheFuture.generateAs(ExecutionDate), - timestampsNotInTheFuture.generateAs(EventDate), - eventBodies.generateOne, - projectSlug = projectSlug - ) - eventId.id + "not be defined for an exception different than DeadlockDetected" in { + dbUpdater + .onRollback(RollbackToAwaitingDeletion(consumerProjects.generateOne)) + .isDefinedAt(exceptions.generateOne) shouldBe false } } + + private implicit val logger: TestLogger[IO] = TestLogger[IO]() + private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO] + private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync() + private lazy val dbUpdater = new DbUpdater[IO]() + + private def addEvent(status: EventStatus, project: Project): IO[EventId] = { + val eventId = CompoundEventId(eventIds.generateOne, project.id) + + storeEventIO( + eventId, + status, + timestampsNotInTheFuture.generateAs(ExecutionDate), + timestampsNotInTheFuture.generateAs(EventDate), + eventBodies.generateOne, + projectSlug = project.slug + ).as(eventId.id) + } }