fix: rollbacktoawaitingdeletion status change to retry on deadlock #1682

Merged
merged 1 commit on Aug 31, 2023
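
In short, the PR replaces the no-op `onRollback` of the `rollbacktoawaitingdeletion` `DbUpdater` with a partial function that catches Postgres deadlock errors, logs them, and re-runs the status-change query; the test helpers and the spec are reworked to an async `IO` style along the way. A minimal, self-contained sketch of the retry pattern, assuming plain cats-effect `IO` and a hypothetical `DeadlockDetected` exception standing in for skunk's `SqlState.DeadlockDetected` extractor:

```scala
import cats.effect.{IO, IOApp}

object DeadlockRetrySketch extends IOApp.Simple {

  // Hypothetical stand-in for skunk's SqlState.DeadlockDetected extractor.
  final case class DeadlockDetected(msg: String) extends RuntimeException(msg)

  // Retry `action` whenever it fails with a deadlock, mirroring the recursive
  // `handleErrorWith(onRollback(event))` structure in the PR.
  def retryOnDeadlock[A](action: IO[A]): PartialFunction[Throwable, IO[A]] = {
    case DeadlockDetected(_) =>
      IO.println("Deadlock happened; retrying") >>
        action.handleErrorWith(e => retryOnDeadlock(action).applyOrElse(e, IO.raiseError(_: Throwable)))
  }

  // Demo: fail twice with a deadlock, then succeed.
  def run: IO[Unit] =
    IO.ref(0).flatMap { attempts =>
      val update = attempts.getAndUpdate(_ + 1).flatMap {
        case n if n < 2 => IO.raiseError(DeadlockDetected(s"attempt $n"))
        case n          => IO.pure(s"updated on attempt $n")
      }
      update
        .handleErrorWith(e => retryOnDeadlock(update).applyOrElse(e, IO.raiseError(_: Throwable)))
        .flatMap(msg => IO.println(msg))
    }
}
```

Running the demo should print the retry message twice before the final "updated on attempt 2".
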
@@ -16,7 +16,8 @@
* limitations under the License.
*/

package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion
package io.renku.eventlog.events.consumers.statuschange
package rollbacktoawaitingdeletion

import cats.effect.MonadCancelThrow
import cats.syntax.all._
@@ -32,13 +33,14 @@ import io.renku.eventlog.metrics.QueriesExecutionTimes
import io.renku.graph.model.events.EventStatus.{AwaitingDeletion, Deleting}
import io.renku.graph.model.events.{EventStatus, ExecutionDate}
import io.renku.graph.model.projects
import org.typelevel.log4cats.Logger
import skunk._
import skunk.data.Completion
import skunk.implicits._

import java.time.Instant

private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTimes](
private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: Logger: QueriesExecutionTimes](
now: () => Instant = () => Instant.now
) extends DbClient(Some(QueriesExecutionTimes[F]))
with statuschange.DBUpdater[F, RollbackToAwaitingDeletion] {
@@ -66,6 +68,10 @@ private[statuschange] class DbUpdater[F[_]: MonadCancelThrow: QueriesExecutionTi
}
}

override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] =
RollbackOp.empty
override def onRollback(event: RollbackToAwaitingDeletion)(implicit sr: SessionResource[F]): RollbackOp[F] = {
case SqlState.DeadlockDetected(_) =>
Logger[F].info(show"$categoryName: Deadlock happened while processing $event; retrying") >>
sr.useK(updateDB(event))
.handleErrorWith(onRollback(event))
}
}
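
Besides the retry itself, the hunk above adds a `Logger[F]` context bound to `DbUpdater` so the retry is reported via log4cats. A small illustrative sketch of that tagless-final logging pattern (the `Worker` class and the `Slf4jLogger` wiring are assumptions for the example, not code from the PR):

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

// A component that only asks for the capabilities it needs via context bounds,
// mirroring `DbUpdater[F[_]: MonadCancelThrow: Logger: QueriesExecutionTimes]`.
class Worker[F[_]: Logger](name: String) {
  def process(item: String): F[Unit] =
    Logger[F].info(show"$name: processing $item")
}

object LoggerSketch extends IOApp.Simple {
  def run: IO[Unit] =
    Slf4jLogger.create[IO].flatMap { implicit logger: Logger[IO] =>
      new Worker[IO]("rollback-updater").process("event-1")
    }
}
```
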
@@ -19,6 +19,7 @@
package io.renku.eventlog

import cats.data.Kleisli
import cats.effect.IO
import io.renku.graph.model.events._
import io.renku.graph.model.projects
import skunk._
@@ -105,8 +106,8 @@ trait EventDataFetching {
protected def findEventMessage(eventId: CompoundEventId): Option[EventMessage] =
findEvent(eventId).flatMap(_._3)

protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] =
execute {
protected def findEventIO(eventId: CompoundEventId): IO[Option[(ExecutionDate, EventStatus, Option[EventMessage])]] =
executeIO {
Kleisli { session =>
val query
: Query[EventId *: projects.GitLabId *: EmptyTuple, (ExecutionDate, EventStatus, Option[EventMessage])] =
@@ -121,6 +122,9 @@
}
}

protected def findEvent(eventId: CompoundEventId): Option[(ExecutionDate, EventStatus, Option[EventMessage])] =
findEventIO(eventId).unsafeRunSync()

protected def findProcessingTime(eventId: CompoundEventId): List[(CompoundEventId, EventProcessingTime)] =
execute {
Kleisli { session =>
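
The test-helper changes in this and the following file share one pattern: each blocking helper gains an `IO`-returning twin (`findEventIO`, `executeIO`, …) and the original stays as a thin `.unsafeRunSync()` wrapper, so existing synchronous specs keep working while new async specs compose the `IO` directly. A generic sketch of the pattern, with an invented `fetchUser` helper:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // IORuntime needed for unsafeRunSync()

trait UserFetching {

  // New effectful variant, usable from AsyncIOSpec-style tests via asserting(...).
  protected def fetchUserIO(id: Long): IO[Option[String]] =
    IO.pure(if (id == 1L) Some("alice") else None)

  // Old synchronous API preserved as a thin wrapper over the IO variant.
  protected def fetchUser(id: Long): Option[String] =
    fetchUserIO(id).unsafeRunSync()
}
```
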
@@ -19,6 +19,8 @@
package io.renku.eventlog

import cats.data.Kleisli
import cats.effect.IO
import cats.syntax.all._
import io.circe.Json
import io.renku.eventlog.events.producers.eventdelivery._
import io.renku.events.Generators.{subscriberIds, subscriberUrls}
@@ -84,6 +86,29 @@ trait EventLogDataProvisioning {
(eventId.id, status, maybeMessage, maybePayload, processingTimes)
}

protected def storeEventIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate = CreatedDate(Instant.now),
batchDate: BatchDate = BatchDate(Instant.now),
projectSlug: Slug = projectSlugs.generateOne,
maybeMessage: Option[EventMessage] = None,
maybeEventPayload: Option[ZippedEventPayload] = None
): IO[Unit] =
upsertProjectIO(compoundEventId.projectId, projectSlug, eventDate) >>
insertEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
maybeMessage
) >>
upsertEventPayloadIO(compoundEventId, eventStatus, maybeEventPayload)

protected def storeEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
@@ -94,21 +119,27 @@
projectSlug: Slug = projectSlugs.generateOne,
maybeMessage: Option[EventMessage] = None,
maybeEventPayload: Option[ZippedEventPayload] = None
): Unit = {
upsertProject(compoundEventId, projectSlug, eventDate)
insertEvent(compoundEventId, eventStatus, executionDate, eventDate, eventBody, createdDate, batchDate, maybeMessage)
upsertEventPayload(compoundEventId, eventStatus, maybeEventPayload)
}
): Unit = storeEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
projectSlug,
maybeMessage,
maybeEventPayload
).unsafeRunSync()

protected def insertEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): Unit = execute {
protected def insertEventIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): IO[Unit] = executeIO {
Kleisli { session =>
maybeMessage match {
case None =>
@@ -147,6 +178,24 @@
}
}

protected def insertEvent(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
executionDate: ExecutionDate,
eventDate: EventDate,
eventBody: EventBody,
createdDate: CreatedDate,
batchDate: BatchDate,
maybeMessage: Option[EventMessage]
): Unit = insertEventIO(compoundEventId,
eventStatus,
executionDate,
eventDate,
eventBody,
createdDate,
batchDate,
maybeMessage
).unsafeRunSync()

protected def upsertProject(compoundEventId: CompoundEventId,
projectSlug: projects.Slug,
eventDate: EventDate
@@ -155,28 +204,33 @@
protected def upsertProject(project: consumers.Project, eventDate: EventDate): Unit =
upsertProject(project.id, project.slug, eventDate)

protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit =
execute {
Kleisli { session =>
val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] =
sql"""INSERT INTO project (project_id, project_slug, latest_event_date)
protected def upsertProjectIO(projectId: projects.GitLabId,
projectSlug: projects.Slug,
eventDate: EventDate
): IO[Unit] = executeIO {
Kleisli { session =>
val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] =
sql"""INSERT INTO project (project_id, project_slug, latest_event_date)
VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder)
ON CONFLICT (project_id)
DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date
""".command
session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void
}
session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void
}
}

protected def upsertEventPayload(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): Unit = eventStatus match {
protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit =
upsertProjectIO(projectId, projectSlug, eventDate).unsafeRunSync()

protected def upsertEventPayloadIO(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): IO[Unit] = eventStatus match {
case TriplesGenerated | TransformingTriples | TransformationRecoverableFailure |
TransformationNonRecoverableFailure | TriplesStore | AwaitingDeletion =>
maybePayload
.map { payload =>
execute[Unit] {
executeIO {
Kleisli { session =>
val query: Command[EventId *: projects.GitLabId *: ZippedEventPayload *: EmptyTuple] = sql"""
INSERT INTO event_payload (event_id, project_id, payload)
Expand All @@ -191,10 +245,15 @@ trait EventLogDataProvisioning {
}
}
}
.getOrElse(())
case _ => ()
.getOrElse(().pure[IO])
case _ => ().pure[IO]
}

protected def upsertEventPayload(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
maybePayload: Option[ZippedEventPayload]
): Unit = upsertEventPayloadIO(compoundEventId, eventStatus, maybePayload).unsafeRunSync()

protected def upsertProcessingTime(compoundEventId: CompoundEventId,
eventStatus: EventStatus,
processingTime: EventProcessingTime
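
`storeEventIO` above simply sequences the three provisioning steps with `>>`, and the spec below builds `addEvent` on top of it with `.as(eventId.id)`. A stripped-down sketch of that composition style, using placeholder step names:

```scala
import cats.effect.IO

object ProvisioningSketch {

  // Three independent effectful steps, stand-ins for upsertProjectIO,
  // insertEventIO and upsertEventPayloadIO.
  def upsertProject: IO[Unit] = IO.println("upsert project")
  def insertEvent:   IO[Unit] = IO.println("insert event")
  def upsertPayload: IO[Unit] = IO.println("upsert payload")

  // `>>` sequences the steps, discarding the intermediate Units;
  // `.as` fixes the final result, as in `storeEventIO(...).as(eventId.id)`.
  def storeEvent(eventId: String): IO[String] =
    (upsertProject >> insertEvent >> upsertPayload).as(eventId)
}
```
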
@@ -16,82 +16,96 @@
* limitations under the License.
*/

package io.renku.eventlog.events.consumers.statuschange.rollbacktoawaitingdeletion
package io.renku.eventlog.events.consumers.statuschange
package rollbacktoawaitingdeletion

import SkunkExceptionsGenerators.postgresErrors
import cats.effect.IO
import io.renku.eventlog.events.consumers.statuschange.DBUpdateResults
import cats.effect.testing.scalatest.AsyncIOSpec
import io.renku.eventlog.api.events.StatusChangeEvent.RollbackToAwaitingDeletion
import io.renku.eventlog.metrics.QueriesExecutionTimes
import io.renku.eventlog.{InMemoryEventLogDbSpec, TypeSerializers}
import io.renku.events.consumers.ConsumersModelGenerators.consumerProjects
import io.renku.events.consumers.Project
import io.renku.generators.Generators.Implicits._
import io.renku.generators.Generators.timestampsNotInTheFuture
import io.renku.generators.Generators.{exceptions, timestampsNotInTheFuture}
import io.renku.graph.model.EventsGenerators.{eventBodies, eventIds, eventStatuses}
import io.renku.graph.model.GraphModelGenerators.{projectIds, projectSlugs}
import io.renku.graph.model.events.EventStatus._
import io.renku.graph.model.events._
import io.renku.interpreters.TestLogger
import io.renku.metrics.TestMetricsRegistry
import io.renku.testtools.IOSpec
import org.scalamock.scalatest.MockFactory
import org.scalatest.OptionValues
import org.scalatest.matchers.should
import org.scalatest.wordspec.AnyWordSpec

import java.time.Instant
import org.scalatest.wordspec.AsyncWordSpec
import skunk.SqlState.DeadlockDetected

class DbUpdaterSpec
extends AnyWordSpec
with IOSpec
extends AsyncWordSpec
with AsyncIOSpec
with InMemoryEventLogDbSpec
with TypeSerializers
with should.Matchers
with MockFactory {
with OptionValues {

"updateDB" should {

s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in new TestCase {
s"change status of all the event in the $Deleting status of a given project to $AwaitingDeletion" in {

val project = consumerProjects.generateOne

val otherStatus = eventStatuses.filter(_ != Deleting).generateOne
val event1Id = addEvent(Deleting)
val event2Id = addEvent(otherStatus)
val event3Id = addEvent(Deleting)
sessionResource
.useK(dbUpdater updateDB RollbackToAwaitingDeletion(Project(projectId, projectSlug)))
.unsafeRunSync() shouldBe DBUpdateResults.ForProjects(
projectSlug,
Map(Deleting -> -2, AwaitingDeletion -> 2)
)

findEvent(CompoundEventId(event1Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion)
findEvent(CompoundEventId(event2Id, projectId)).map(_._2) shouldBe Some(otherStatus)
findEvent(CompoundEventId(event3Id, projectId)).map(_._2) shouldBe Some(AwaitingDeletion)
}
for {
event1Id <- addEvent(Deleting, project)
event2Id <- addEvent(otherStatus, project)
event3Id <- addEvent(Deleting, project)

_ <- sessionResource
.useK(dbUpdater updateDB RollbackToAwaitingDeletion(project))
.asserting(
_ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -2, AwaitingDeletion -> 2))
)
_ <- findEventIO(CompoundEventId(event1Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion)
_ <- findEventIO(CompoundEventId(event2Id, project.id)).asserting(_.value._2 shouldBe otherStatus)
_ <- findEventIO(CompoundEventId(event3Id, project.id)).asserting(_.value._2 shouldBe AwaitingDeletion)
} yield ()
}
}

private trait TestCase {

val projectId = projectIds.generateOne
val projectSlug = projectSlugs.generateOne
"onRollback" should {

val currentTime = mockFunction[Instant]
private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO]
private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync()
val dbUpdater = new DbUpdater[IO](currentTime)
"retry on DeadlockDetected" in {

val now = Instant.now()
currentTime.expects().returning(now).anyNumberOfTimes()
val project = consumerProjects.generateOne

def addEvent(status: EventStatus): EventId = {
val eventId = CompoundEventId(eventIds.generateOne, projectId)
addEvent(Deleting, project) >>
dbUpdater
.onRollback(RollbackToAwaitingDeletion(project))
.apply(postgresErrors(DeadlockDetected).generateOne)
.asserting(_ shouldBe DBUpdateResults.ForProjects(project.slug, Map(Deleting -> -1, AwaitingDeletion -> 1)))
}

storeEvent(
eventId,
status,
timestampsNotInTheFuture.generateAs(ExecutionDate),
timestampsNotInTheFuture.generateAs(EventDate),
eventBodies.generateOne,
projectSlug = projectSlug
)
eventId.id
"not be defined for an exception different than DeadlockDetected" in {
dbUpdater
.onRollback(RollbackToAwaitingDeletion(consumerProjects.generateOne))
.isDefinedAt(exceptions.generateOne) shouldBe false
}
}

private implicit val logger: TestLogger[IO] = TestLogger[IO]()
private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO]
private implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync()
private lazy val dbUpdater = new DbUpdater[IO]()

private def addEvent(status: EventStatus, project: Project): IO[EventId] = {
val eventId = CompoundEventId(eventIds.generateOne, project.id)

storeEventIO(
eventId,
status,
timestampsNotInTheFuture.generateAs(ExecutionDate),
timestampsNotInTheFuture.generateAs(EventDate),
eventBodies.generateOne,
projectSlug = project.slug
).as(eventId.id)
}
}
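
The spec itself moves from the synchronous `AnyWordSpec`/`IOSpec` combination to `AsyncWordSpec` with `AsyncIOSpec`, so each test returns an `IO` and assertions are attached with `.asserting`. A stripped-down sketch of that testing style, with an invented `parse` function as the code under test:

```scala
import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import org.scalatest.matchers.should
import org.scalatest.wordspec.AsyncWordSpec

class ParserSpec extends AsyncWordSpec with AsyncIOSpec with should.Matchers {

  // A tiny effectful function standing in for the DB update under test.
  private def parse(s: String): IO[Int] =
    IO(s.trim.toInt)

  "parse" should {

    "return the parsed number" in {
      parse(" 42 ").asserting(_ shouldBe 42)
    }

    "fail on garbage input" in {
      parse("nope").assertThrows[NumberFormatException]
    }
  }
}
```
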