Skip to content

Commit

Permalink
fix: rollbacktoawaitingdeletion status change to retry on deadlock (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Aug 31, 2023
1 parent ebde622 commit 76495ae
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
* limitations under the License.
*/

package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion
package io.renku.eventlog.events.consumers.statuschange
package rollbacktoawaitingdeletion

import cats.effect.MonadCancelThrow
import cats.syntax.all._
Expand All @@ -32,13 +33,14 @@ import io.renku.eventlog.metrics.QueriesExecutionTimes
import io.renku.graph.model.events.EventStatus.{AwaitingDeletion, Deleting}
import io.renku.graph.model.events.{EventStatus, ExecutionDate}
import io.renku.graph.model.projects
import org.typelevel.log4cats.Logger
import skunk._
import skunk.data.Completion
import skunk.implicits._

import java.time.Instant

private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTimes](
private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: Logger: QueriesExecutionTimes](
now: () => Instant = () => Instant.now
) extends DbClient(Some(QueriesExecutionTimes[F]))
with statuschange.DBUpdater[F, RollbackToAwaitingDeletion] {
Expand Down Expand Up @@ -66,6 +68,10 @@ private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTi
}
}

override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] =
RollbackOp.empty
override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] = {
case SqlState.DeadlockDetected(_) =>
Logger[F].info(show"$categoryName: Deadlock happened while processing $event; retrying") >>
sr.useK(updateDB(event))
.handleErrorWith(onRollback(event))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.renku.eventlog

import cats.data.Kleisli
import cats.effect.IO
import io.renku.graph.model.events._
import io.renku.graph.model.projects
import skunk._
Expand Down Expand Up @@ -105,8 +106,8 @@ trait EventDataFetching {
protected def findEventMessage(eventId: CompoundEventId): Option[EventMessage] =
findEvent(eventId).flatMap(_._3)

protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] =
execute {
protected def findEventIO(eventId: CompoundEventId): IO[Option[(ExecutionDate, EventStatus, Option[EventMessage])]] =
executeIO {
Kleisli { session =>
val query
: Query[EventId *: projects.GitLabId *: EmptyTuple, (ExecutionDate, EventStatus, Option[EventMessage])] =
Expand All @@ -121,6 +122,9 @@ trait EventDataFetching {
}
}

protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] =
findEventIO(eventId).unsafeRunSync()

protected def findProcessingTime(eventId: CompoundEventId): List[(CompoundEventId, EventProcessingTime)] =
execute {
Kleisli { session =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package io.renku.eventlog

import cats.data.Kleisli
import cats.effect.IO
import cats.syntax.all._
import io.circe.Json
import io.renku.eventlog.events.producers.eventdelivery._
import io.renku.events.Generators.{subscriberIds, subscriberUrls}
Expand Down Expand Up @@ -84,6 +86,29 @@ trait EventLogDataProvisioning {
(eventId.id, status, maybeMessage, maybePayload, processingTimes)
}

protected def storeEventIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate = CreatedDate(Instant.now),
batchDate: BatchDate = BatchDate(Instant.now),
projectSlug: Slug = projectSlugs.generateOne,
maybeMessage: Option[EventMessage] = None,
maybeEventPayload: Option[ZippedEventPayload] = None
): IO[Unit] =
upsertProjectIO(compoundEventId.projectId, projectSlug, eventDate) >>
insertEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
maybeMessage
) >>
upsertEventPayloadIO(compoundEventId, eventStatus, maybeEventPayload)

protected def storeEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
Expand All @@ -94,21 +119,27 @@ trait EventLogDataProvisioning {
projectSlug: Slug = projectSlugs.generateOne,
maybeMessage: Option[EventMessage] = None,
maybeEventPayload: Option[ZippedEventPayload] = None
): Unit = {
upsertProject(compoundEventId, projectSlug, eventDate)
insertEvent(compoundEventId, eventStatus, executionDate, eventDate, eventBody, createdDate, batchDate, maybeMessage)
upsertEventPayload(compoundEventId, eventStatus, maybeEventPayload)
}
): Unit = storeEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
projectSlug,
maybeMessage,
maybeEventPayload
).unsafeRunSync()

protected def insertEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): Unit = execute {
protected def insertEventIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): IO[Unit] = executeIO {
Kleisli { session =>
maybeMessage match {
case None =>
Expand Down Expand Up @@ -147,6 +178,24 @@ trait EventLogDataProvisioning {
}
}

protected def insertEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): Unit = insertEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
maybeMessage
).unsafeRunSync()

protected def upsertProject(compoundEventId: CompoundEventId,
projectSlug: projects.Slug,
eventDate: EventDate
Expand All @@ -155,28 +204,33 @@ trait EventLogDataProvisioning {
protected def upsertProject(project: consumers.Project, eventDate: EventDate): Unit =
upsertProject(project.id, project.slug, eventDate)

protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit =
execute {
Kleisli { session =>
val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] =
sql"""INSERT INTO project (project_id, project_slug, latest_event_date)
protected def upsertProjectIO(projectId: projects.GitLabId,
projectSlug: projects.Slug,
eventDate: EventDate
): IO[Unit] = executeIO {
Kleisli { session =>
val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] =
sql"""INSERT INTO project (project_id, project_slug, latest_event_date)
VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder)
ON CONFLICT (project_id)
DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date
""".command
session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void
}
session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void
}
}

protected def upsertEventPayload(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): Unit = eventStatus match {
protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit =
upsertProjectIO(projectId, projectSlug, eventDate).unsafeRunSync()

protected def upsertEventPayloadIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): IO[Unit] = eventStatus match {
case TriplesGenerated | TransformingTriples | TransformationRecoverableFailure |
TransformationNonRecoverableFailure | TriplesStore | AwaitingDeletion =>
maybePayload
.map { payload =>
execute[Unit] {
executeIO {
Kleisli { session =>
val query: Command[EventId *: projects.GitLabId *: ZippedEventPayload *: EmptyTuple] = sql"""
INSERT INTO event_payload (event_id, project_id, payload)
Expand All @@ -191,10 +245,15 @@ trait EventLogDataProvisioning {
}
}
}
.getOrElse(())
case _ => ()
.getOrElse(().pure[IO])
case _ => ().pure[IO]
}

protected def upsertEventPayload(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): Unit = upsertEventPayloadIO(compoundEventId, eventStatus, maybePayload).unsafeRunSync()

protected def upsertProcessingTime(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
processingTime: EventProcessingTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,96 @@
* limitations under the License.
*/

package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion
package io.renku.eventlog.events.consumers.statuschange
package rollbacktoawaitingdeletion

import SkunkExceptionsGenerators.postgresErrors
import cats.effect.IO
import io.renku.eventlog.events.consumers.statuschange.DBUpdateResults
import cats.effect.testing.scalatest.AsyncIOSpec
import io.renku.eventlog.api.events.StatusChangeEvent.RollbackToAwaitingDeletion
import io.renku.eventlog.metrics.QueriesExecutionTimes
import io.renku.eventlog.{InMemoryEventLogDbSpec, TypeSerializers}
import io.renku.events.consumers.ConsumersModelGenerators.consumerProjects
import io.renku.events.consumers.Project
import io.renku.generators.Generators.Implicits._
import io.renku.generators.Generators.timestampsNotInTheFuture
import io.renku.generators.Generators.{exceptions, timestampsNotInTheFuture}
import io.renku.graph.model.EventsGenerators.{eventBodies, eventIds, eventStatuses}
import io.renku.graph.model.GraphModelGenerators.{projectIds, projectSlugs}
import io.renku.graph.model.events.EventStatus._
import io.renku.graph.model.events._
import io.renku.interpreters.TestLogger
import io.renku.metrics.TestMetricsRegistry
import io.renku.testtools.IOSpec
import org.scalamock.scalatest.MockFactory
import org.scalatest.OptionValues
import org.scalatest.matchers.should
import org.scalatest.wordspec.AnyWordSpec

import java.time.Instant
import org.scalatest.wordspec.AsyncWordSpec
import skunk.SqlState.DeadlockDetected

class DbUpdaterSpec
extends AnyWordSpec
with IOSpec
extends AsyncWordSpec
with AsyncIOSpec
with InMemoryEventLogDbSpec
with TypeSerializers
with should.Matchers
with MockFactory {
with OptionValues {

"updateDB" should {

s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in new TestCase {
s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in {

val project = consumerProjects.generateOne

val otherStatus = eventStatuses.filter(_ != Deleting).generateOne
val event1Id = addEvent(Deleting)
val event2Id = addEvent(otherStatus)
val event3Id = addEvent(Deleting)
sessionResource
.useK(dbUpdater updateDB RollbackToAwaitingDeletion(Project(projectId, projectSlug)))
.unsafeRunSync() shouldBe DBUpdateResults.ForProjects(
projectSlug,
Map(Deleting -> -2, AwaitingDeletion -> 2)
)

findEvent(CompoundEventId(event1Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion)
findEvent(CompoundEventId(event2Id, projectId)).map(_._2) shouldBe Some(otherStatus)
findEvent(CompoundEventId(event3Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion)
}
for {
event1Id <- addEvent(Deleting, project)
event2Id <- addEvent(otherStatus, project)
event3Id <- addEvent(Deleting, project)

_ <- sessionResource
.useK(dbUpdater updateDB RollbackToAwaitingDeletion(project))
.asserting(
_ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -2, AwaitingDeletion -> 2))
)
_ <- findEventIO(CompoundEventId(event1Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion)
_ <- findEventIO(CompoundEventId(event2Id, project.id)).asserting(_.value._2 shouldBe otherStatus)
_ <- findEventIO(CompoundEventId(event3Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion)
} yield ()
}
}

private trait TestCase {

val projectId = projectIds.generateOne
val projectSlug = projectSlugs.generateOne
"onRollback" should {

val currentTime = mockFunction[Instant]
private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO]
private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync()
val dbUpdater = new DbUpdater[IO](currentTime)
"retry on DeadlockDetected" in {

val now = Instant.now()
currentTime.expects().returning(now).anyNumberOfTimes()
val project = consumerProjects.generateOne

def addEvent(status: EventStatus): EventId = {
val eventId = CompoundEventId(eventIds.generateOne, projectId)
addEvent(Deleting, project) >>
dbUpdater
.onRollback(RollbackToAwaitingDeletion(project))
.apply(postgresErrors(DeadlockDetected).generateOne)
.asserting(_ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -1, AwaitingDeletion -> 1)))
}

storeEvent(
eventId,
status,
timestampsNotInTheFuture.generateAs(ExecutionDate),
timestampsNotInTheFuture.generateAs(EventDate),
eventBodies.generateOne,
projectSlug = projectSlug
)
eventId.id
"not be defined for an exception different than DeadlockDetected" in {
dbUpdater
.onRollback(RollbackToAwaitingDeletion(consumerProjects.generateOne))
.isDefinedAt(exceptions.generateOne) shouldBe false
}
}

private implicit val logger: TestLogger[IO] = TestLogger[IO]()
private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO]
private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync()
private lazy val dbUpdater = new DbUpdater[IO]()

private def addEvent(status: EventStatus, project: Project): IO[EventId] = {
val eventId = CompoundEventId(eventIds.generateOne, project.id)

storeEventIO(
eventId,
status,
timestampsNotInTheFuture.generateAs(ExecutionDate),
timestampsNotInTheFuture.generateAs(EventDate),
eventBodies.generateOne,
projectSlug = project.slug
).as(eventId.id)
}
}

0 comments on commit 76495ae

Please sign in to comment.